Skip to content

Commit

Permalink
Bug 1068438 - Support multiple Pulse users per PulseGuardian account.
Browse files Browse the repository at this point in the history
Fix queue deletion.  Implement confirmation dialog when deleting queues
(and users).  PEP-8 fixes.  Misc. other fixes.
  • Loading branch information
Mark Côté committed Oct 3, 2014
1 parent af12458 commit a028d9d
Show file tree
Hide file tree
Showing 24 changed files with 3,813 additions and 2,958 deletions.
72 changes: 41 additions & 31 deletions pulseguardian/guardian.py
Expand Up @@ -8,7 +8,7 @@
import optparse

from model.base import init_db, db_session
from model.user import User
from model.user import PulseUser
from model.queue import Queue
from management import PulseManagementAPI
from sendemail import sendemail
Expand All @@ -18,7 +18,8 @@
logger = logging.getLogger(__name__)
handler = logging.handlers.RotatingFileHandler(config.GUARDIAN_LOG_PATH, mode='a+',
maxBytes=config.MAX_LOG_SIZE)
formatter = logging.Formatter("%(asctime)s - %(levelname)s: %(message)s", "%Y-%m-%d %H:%M:%S")
formatter = logging.Formatter("%(asctime)s - %(levelname)s: %(message)s",
"%Y-%m-%d %H:%M:%S")
handler.setFormatter(formatter)

logger.addHandler(handler)
Expand All @@ -40,9 +41,11 @@ class PulseGuardian(object):
:param on_delete: Callback called with a queue's name when it's deleted.
"""
def __init__(self, api, emails=True, warn_queue_size=config.warn_queue_size,
del_queue_size=config.del_queue_size, on_warn=None, on_delete=None):
del_queue_size=config.del_queue_size, on_warn=None,
on_delete=None):
if del_queue_size < warn_queue_size:
raise ValueError("Deletion threshold can't be smaller than the warning threshold.")
raise ValueError("Deletion threshold can't be smaller than the "
"warning threshold.")

self.api = api

Expand All @@ -53,16 +56,17 @@ def __init__(self, api, emails=True, warn_queue_size=config.warn_queue_size,
self.on_warn = on_warn
self.on_delete = on_delete

def delete_zombie_queues(self, queues):
def clear_deleted_queues(self, queues):
db_queues = Queue.query.all()

# Filter queues that are in the database but no longer on RabbitMQ.
alive_queues_names = set(q['name'] for q in queues)
zombie_queues = set(q for q in db_queues if q.name
not in alive_queues_names)
deleted_queues = set(q for q in db_queues if q.name
not in alive_queues_names)

# Delete those queues.
for queue in zombie_queues:
for queue in deleted_queues:
logger.info("Queue '{0}' has been deleted.".format(queue))
db_session.delete(queue)
db_session.commit()

Expand Down Expand Up @@ -94,26 +98,28 @@ def update_queue_information(self, queue_data):

# If no client is currently consuming the queue, just skip it.
if queue_data['consumers'] == 0:
logger.debug("Queue '{0}' skipped (no owner, no current consumer).".format(q_name))
logger.debug("Queue '{0}' skipped (no owner, no current "
"consumer).".format(q_name))
return queue

# Otherwise look for its user.
owner_name = self.api.queue_owner(queue_data)

user = User.query.filter(User.username == owner_name).first()
pulse_user = PulseUser.query.filter(
PulseUser.username == owner_name).first()

# If the queue was created by a user that isn't in the
# pulseguardian database, skip the queue.
if user is None:
if pulse_user is None:
logger.info(
"Queue '{0}' owner, {1}, isn't in the db. Creating the user.".format(q_name, owner_name))
user = User.new_user(username=owner_name, email='', password='',
management_api=None)
"Queue '{0}' owner, {1}, isn't in the db. Creating the "
"user.".format(q_name, owner_name))
pulse_user = PulseUser.new_user(owner_name)

# Assign the user to the queue.
logger.info(
"Assigning queue '{0}' to user {1}.".format(q_name, user))
queue.owner = user
"Assigning queue '{0}' to user {1}.".format(q_name, pulse_user))
queue.owner = pulse_user
db_session.add(queue)
db_session.commit()

Expand All @@ -129,34 +135,38 @@ def monitor_queues(self, queues):
# If a queue is over the deletion size, regardless of it having an
# owner or not, delete it.
if queue.size > self.del_queue_size:
logger.warning("Queue '{0}' deleted. Queue size = {1}; del_queue_size = {2}".format(
logger.warning("Queue '{0}' deleted. Queue size = {1}; "
"del_queue_size = {2}".format(
queue.name, queue.size, self.del_queue_size))
if queue.owner:
self.deletion_email(queue.owner, queue_data)
if queue.owner and queue.owner.owner:
self.deletion_email(queue.owner.owner, queue_data)
if self.on_delete:
self.on_delete(queue.name)
self.api.delete_queue(vhost=queue_data['vhost'], queue=queue.name)
self.api.delete_queue(vhost=queue_data['vhost'],
queue=queue.name)
db_session.delete(queue)
db_session.commit()
continue

if queue.owner is None:
if queue.owner is None or queue.owner.owner is None:
continue

if queue.size > self.warn_queue_size and not queue.warned:
logger.warning("Warning queue '{0}' owner. Queue size = {1}; warn_queue_size = {2}".format(
logger.warning("Warning queue '{0}' owner. Queue size = {1}; "
"warn_queue_size = {2}".format(
queue.name, queue.size, self.warn_queue_size))
queue.warned = True
if self.on_warn:
self.on_warn(queue.name)
self.warning_email(queue.owner, queue_data)
self.warning_email(queue.owner.owner, queue_data)
elif queue.size <= self.warn_queue_size and queue.warned:
# A previously warned queue got out of the warning threshold;
# its owner should not be warned again.
logger.warning("Queue '{0}' was in warning zone but is OK now".format(
queue.name, queue.size, self.del_queue_size))
logger.warning("Queue '{0}' was in warning zone but is OK "
"now".format(queue.name, queue.size,
self.del_queue_size))
queue.warned = False
self.back_to_normal_email(queue.owner, queue_data)
self.back_to_normal_email(queue.owner.owner, queue_data)

# Commit any changes to the queue.
db_session.add(queue)
Expand Down Expand Up @@ -193,7 +203,8 @@ def warning_email(self, user, queue_data):
def deletion_email(self, user, queue_data):
exchange = self._exchange_from_queue(queue_data)

subject = 'Pulse warning: queue "{0}" has been deleted'.format(queue_data['name'])
subject = 'Pulse warning: queue "{0}" has been deleted'.format(
queue_data['name'])
body = '''Your queue "{0}" on exchange "{1}" has been
deleted after exceeding the maximum number of unread messages. Upon deletion
there were {2} messages in the queue, out of a maximum {3} messages.
Expand All @@ -211,7 +222,8 @@ def deletion_email(self, user, queue_data):
def back_to_normal_email(self, user, queue_data):
exchange = self._exchange_from_queue(queue_data)

subject = 'Pulse warning: queue "{0}" is back to normal'.format(queue_data['name'])
subject = 'Pulse warning: queue "{0}" is back to normal'.format(
queue_data['name'])
body = '''Your queue "{0}" on exchange "{1}" is
now back to normal ({2} ready messages, {3} total messages).
'''.format(queue_data['name'], exchange, queue_data['messages_ready'],
Expand All @@ -227,11 +239,9 @@ def guard(self):
logger.info("PulseGuardian started")
while True:
queues = self.api.queues()

if queues:
self.monitor_queues(queues)
self.delete_zombie_queues(queues)

self.clear_deleted_queues(queues)
time.sleep(config.polling_interval)


Expand Down
14 changes: 7 additions & 7 deletions pulseguardian/management.py
Expand Up @@ -42,11 +42,11 @@ def __init__(self, host=DEFAULT_RABBIT_HOST,

def _api_request(self, path, method='GET', data=None):
session = requests.Session()
request = requests.Request(
method, 'http://{0}:{1}/api/{2}'.format(
self.host, self.management_port, path),
auth=(self.management_user, self.management_password),
data=json.dumps(data)).prepare()
url = 'http://{0}:{1}/api/{2}'.format(self.host, self.management_port,
path)
request = requests.Request(method, url, auth=(self.management_user,
self.management_password),
data=json.dumps(data)).prepare()
request.headers['Content-type'] = 'application/json'
response = None

Expand All @@ -64,8 +64,8 @@ def _api_request(self, path, method='GET', data=None):
return response.json()
except ValueError:
raise PulseManagementException(
"Error when calling '{0} {1}' with data={2}. Received : {3}".format(method, path,
data, response.content))
"Error when calling '{0} {1}' with data={2}. "
"Received: {3}".format(method, path, data, response.content))

# Queues

Expand Down
80 changes: 80 additions & 0 deletions pulseguardian/model/pulse_user.py
@@ -0,0 +1,80 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import re

from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship

from base import Base, db_session
from queue import Queue


class PulseUser(Base):
"""User class, linked to a rabbitmq user (with the same username).
Provides access to a user's queues.
"""

__tablename__ = 'pulse_users'

id = Column(Integer, primary_key=True)
owner_id = Column(Integer, ForeignKey('users.id'), nullable=True)
username = Column(String(255), unique=True)

queues = relationship(
Queue, backref='owner', cascade='save-update, merge, delete')

@staticmethod
def new_user(username, password='', owner=None, management_api=None):
"""Initializes a new user, generating a salt and encrypting
his password. Then creates a RabbitMQ user if needed and sets
permissions.
"""
pulse_user = PulseUser(owner=owner, username=username)

if management_api is not None:
management_api.create_user(username=username, password=password)

read_perms = '^(queue/{0}/.*|exchange/.*)'.format(username)
write_conf_perms = '^(queue/{0}/.*|exchange/{0}/.*)'.format(
username)

management_api.set_permission(username=username,
vhost='/',
read=read_perms,
configure=write_conf_perms,
write=write_conf_perms)

db_session.add(pulse_user)
db_session.commit()

return pulse_user

@staticmethod
def strong_password(password):
return (re.findall('[0-9]', password) and
re.findall('[a-zA-Z]', password) and len(password) >= 6)

def change_password(self, new_password, management_api):
""""Changes" a user's password by deleting his rabbitmq account
and recreating it with the new password.
"""
try:
management_api.delete_user(self.username)
except management_api.exception:
pass

management_api.create_user(username=self.username,
password=new_password)
management_api.set_permission(username=self.username, vhost='/',
read='.*', configure='.*', write='.*')

db_session.add(self)
db_session.commit()

def __repr__(self):
return "<PulseUser(username='{0}', owner='{1}')>".format(self.username,
self.owner)

__str__ = __repr__
4 changes: 2 additions & 2 deletions pulseguardian/model/queue.py
Expand Up @@ -2,7 +2,7 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from sqlalchemy import Column, String, Integer, Boolean, ForeignKey
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String

from base import Base

Expand All @@ -11,7 +11,7 @@ class Queue(Base):
__tablename__ = 'queues'

name = Column(String(255), primary_key=True)
owner_id = Column(Integer, ForeignKey('users.id'))
owner_id = Column(Integer, ForeignKey('pulse_users.id'), nullable=True)
size = Column(Integer)

warned = Column(Boolean)
Expand Down

0 comments on commit a028d9d

Please sign in to comment.