DO NOT MERGE - Adding RabbitMQ connection class. #1211

Closed
wants to merge 2 commits into
from

Projects

None yet

6 participants

@twobraids
Member

this looks fine, let's try it to see if it works...

@peterbe peterbe commented on the diff May 1, 2013
socorro/external/rabbitmq/connection_context.py
+ port=local_config.port,
+ virtual_host=local_config.virtual_host,
+ credentials=credentials
+ )
+
+ self.conn = pika.BlockingConnection(self.connection_params)
+
+ self.operational_exceptions = (
+ pika.exceptions.AMQPConnectionError,
+ pika.exceptions.ChannelClosed,
+ pika.exceptions.ConnectionClosed,
+ pika.exceptions.NoFreeChannels,
+
+ socket.timeout
+ )
+
@peterbe
peterbe May 1, 2013 collaborator

This is nitpickery but I think it's worth dipping in.

In the above code you have three different styles of indentation of parameters.

  • One in line with the first (
  • One with 28 spaces of indentation
  • One with 2 spaces of indentation

The pep8 module will allow the first type only but I think none of them are right actually. (The first one is wrong because the lines are 89 characters wide).

The ideal I try to stick to is this format:

credentials = pika.credentials.PlainCredentials(
    local_config.rabbitmq_user,
    local_config.rabbitmq_password,
)

That's going to pass the pep8 test and it has the main advantage that if you one day decide to add a third parameter no other lines need to change.
Or, more importantly; say you decide to call the variable credentials something shorter (or longer) like creds, then you only need to change one line without having to "re-dent" any other lines.

I hope this helps. Progress trumps perfection but this usually helps going forward because diffs become easier to read.

@selenamarie
Collaborator

Reminder to add pika to requirements/prod.txt

@rhelmer rhelmer commented on the diff May 2, 2013
socorro/external/rabbitmq/connection_context.py
+ name='port',
+ default=5672,
+ doc='the port for the RabbitMQ server',
+ )
+ required_config.add_option(
+ name='rabbitmq_user',
+ default='guest',
+ doc='the name of the user within the RabbitMQ instance',
+ )
+ required_config.add_option(
+ name='rabbitmq_password',
+ default='guest',
+ doc="the user's RabbitMQ password",
+ )
+
+ #--------------------------------------------------------------------------
@rhelmer
rhelmer May 2, 2013 Mozilla member

what are these comments for?

@rhelmer rhelmer commented on the diff May 2, 2013
socorro/external/rabbitmq/connection_context.py
+ classes may implement it."""
+ pass
+
+ #--------------------------------------------------------------------------
+ def force_reconnect(self):
+ pass
+
+
+#==============================================================================
+class ConnectionContextPooled(ConnectionContext): # pragma: no cover
+ """a configman compliant class that pools RabbitMQ connections"""
+ #--------------------------------------------------------------------------
+ def __init__(self, config, local_config=None):
+ super(ConnectionContextPooled, self).__init__(config, local_config)
+ #self.config.logger.debug("PostgresPooled - "
+ # "setting up connection pool")
@rhelmer
rhelmer May 2, 2013 Mozilla member

if this is useful debugging leave it on, or remove it

@rhelmer
Member
rhelmer commented May 2, 2013

This looks great overall. I don't understand the comment style in connection_context.py (the other class is not like this at all). In general I'd say if debug messages are useful leave them uncommented (can be turned on/off at runtime that way).

From the presence of basic_ack I assume this is using transactions and can handle both worker and queue failures, but I haven't read it thoroughly enough to understand how it fits together yet.

@erikrose erikrose commented on the diff May 3, 2013
socorro/external/rabbitmq/crashstorage.py
+ def new_crashes(self):
+ channel = self.rabbitmq.connection()
+ data = channel.basic_get(queue="socorro.priority")
+ if data == (None, None, None):
+ data = channel.basic_get(queue="socorro.normal")
+
+ while data != (None, None, None):
+ self.internal_cache[data[2]] = data[0]
+ yield data
+ data = channel.basic_get(queue="socorro.priority")
+ if data == (None, None, None):
+ data = channel.basic_get(queue="socorro.normal")
+
+
+ def ack_crash(self, crash_id):
+ if crash_id in self.internal_cache:
@erikrose
erikrose May 3, 2013 Mozilla member

A bit briefer would be to do a to_ack = self.internal_cache.pop(crash_id) here. Then you don't need the separate line for the del. The if/else bit can turn into a try/except KeyError.

@erikrose erikrose commented on the diff May 3, 2013
socorro/external/rabbitmq/crashstorage.py
+
+
+ def _save_raw_crash_transaction(self, channel, crash_id):
+ channel.basic_publish(
+ exchange='',
+ routing_key='socorro.normal',
+ body=crash_id,
+ properties=pika.BasicProperties(
+ delivery_mode = 2, # make message persistent
+ ))
+
+
+ def new_crashes(self):
+ channel = self.rabbitmq.connection()
+ data = channel.basic_get(queue="socorro.priority")
+ if data == (None, None, None):
@erikrose
erikrose May 3, 2013 Mozilla member

I'd far rather unpack this here so we don't need all the numerical indices below.

@erikrose erikrose commented on the diff May 3, 2013
socorro/external/rabbitmq/crashstorage.py
+ )
+
+
+ def _save_raw_crash_transaction(self, channel, crash_id):
+ channel.basic_publish(
+ exchange='',
+ routing_key='socorro.normal',
+ body=crash_id,
+ properties=pika.BasicProperties(
+ delivery_mode = 2, # make message persistent
+ ))
+
+
+ def new_crashes(self):
+ channel = self.rabbitmq.connection()
+ data = channel.basic_get(queue="socorro.priority")
@erikrose
erikrose May 3, 2013 Mozilla member

Rather than repeat some nontrivial code here (the get from the priority queue and the fallback to the normal one), why not emulate a do…while loop using a bool flag?

@brandonsavage

Closed and superseded by #1223

@erikrose erikrose referenced this pull request May 6, 2013
Closed

DO NOT MERGE rabbitmq WIP #1223

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment