Implement MQ support #18

Merged
merged 10 commits into from Dec 29, 2014

4 participants
@djmitche
Contributor

djmitche commented Dec 24, 2014

Blueprints should have easy access to reliable message production and, in processes other than the web server, consumption.

@djmitche djmitche self-assigned this Apr 4, 2014

@djmitche djmitche added the difficult label Sep 5, 2014

@djmitche
Contributor

djmitche commented Dec 9, 2014

Specifically, I think this will mean sending to Pulse, and probably using MozillaPulse, although all I can find on that project is a tarball on pypi, so maybe not.

We might also consider re-using WSME as a schema definition for the message bodies.

@markrcote, any quick guidance?


@markrcote

markrcote commented Dec 12, 2014

Heya. The main source of documentation for Pulse is the wiki page: https://wiki.mozilla.org/Auto-tools/Projects/Pulse

MozillaPulse (or just mozillapulse) is the official Python lib for publishing to or consuming from Pulse. Note that it kind of sucks, so we'll be rewriting it at some point. But it's probably your best bet for now.
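
For reference, the basic consumer pattern from the Pulse wiki looks roughly like the sketch below; the class name, applabel, and topic come from the wiki's examples and are illustrative rather than verified here:

from mozillapulse import consumers

def got_message(body, message):
    print(body)     # the decoded message payload
    message.ack()   # acknowledge so the message is not redelivered

pulse = consumers.PulseTestConsumer(applabel='my-app-label')
pulse.configure(topic='#', callback=got_message)
pulse.listen()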


@djmitche
Contributor

djmitche commented Dec 16, 2014

We'll need generic MQ support for #13. That should be written in such a way that it can use either AMQP or SQS, since we'll be moving this to Amazon soon.

#118 is about pulse specifically, so I'll copy Mark's comments over there.


@djmitche djmitche modified the milestone: v1.1.0 Dec 16, 2014

@djmitche
Contributor

djmitche commented Dec 23, 2014

Using SQS itself is pretty simple:

import datetime
import urllib

import kombu

broker_url = 'sqs://%s:%s@' % (urllib.quote(access_key, safe=''),
                               urllib.quote(secret_key, safe=''))

with kombu.Connection(broker_url, transport_options=dict(region='us-east-1')) as conn:
    simple_queue = conn.SimpleQueue('dustin_test_queue')
    message = 'hello world, sent at %s' % datetime.datetime.today()
    simple_queue.put(message)
    print('Sent: %s' % message)

    message = simple_queue.get(block=True, timeout=20)
    print("Received: %s" % message.payload)
    message.ack()
    simple_queue.close()

And anyway RelengAPI won't be doing any consuming (except Celery's consumption, but that's unrelated). So I think that the only infrastructural bits required here are around configuration and documentation. Ideally within a handler I could just write

from relengapi.lib import mq

q = mq.Queue('myapp', 'myqueuename')

@route(..)
def my_route():
    ...
    q.put(message)

And that would handle the creation of connections based on the configuration, along with dynamic creation of kombu queue objects.
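
For concreteness, here is a minimal sketch of what such a relengapi.lib.mq wrapper could look like, assuming kombu underneath and hypothetical MQ_BROKER_URL / MQ_TRANSPORT_OPTIONS config keys (none of these names are final):

from flask import current_app
import kombu


class Queue(object):
    """Lazily-connected queue; safe to create at import time."""

    def __init__(self, blueprint_name, queue_name):
        self.name = '%s-%s' % (blueprint_name, queue_name)
        self._queue = None

    def _get_queue(self):
        # connect on first use, once the app config is available
        if self._queue is None:
            conn = kombu.Connection(
                current_app.config['MQ_BROKER_URL'],
                transport_options=current_app.config.get('MQ_TRANSPORT_OPTIONS', {}))
            self._queue = conn.SimpleQueue(self.name)
        return self._queue

    def put(self, message):
        self._get_queue().put(message)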


@djmitche
Contributor

djmitche commented Dec 23, 2014

Well, I learned some things: http://code.v.igoro.us/posts/2014/12/kombu-sqs.html

We want to be able to send pretty arbitrary strings, since they'll be consumed by things not running Kombu. So we should probably just use Boto directly. It'll be nice to have a "debug" mode that logs the messages produced, but does not actually send them.
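
A rough sketch of that Boto-direct approach with such a debug mode; the function name and DEBUG flag are made up for illustration:

import logging

import boto.sqs
from boto.sqs.message import RawMessage

logger = logging.getLogger(__name__)
DEBUG = False  # hypothetical setting: log messages instead of sending them


def sqs_write_raw(region_name, queue_name, body):
    if DEBUG:
        logger.debug("would send to %s/%s: %r", region_name, queue_name, body)
        return
    conn = boto.sqs.connect_to_region(region_name)
    queue = conn.get_queue(queue_name)
    msg = RawMessage()
    msg.set_body(body)  # arbitrary string; no kombu framing for consumers
    queue.write(msg)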


@djmitche
Contributor

djmitche commented Dec 24, 2014

OK, I got something put together. You can send a message like this:

    @app.route("/inject")
    @api.apimethod(None)
    def inject():
        # ...
        app.aws.sqs_write('thing',
                          VersionInfo(distributions=dists, blueprints=blueprints))

Here 'thing' references the settings, which specify a region and SQS queue name, and implicitly the AWS credentials used to access them. The second argument is the body; it is treated as a WSME type, JSONified, and then base64-encoded (a sketch of that serialization follows the to-do list).

To Do:

  • tests for the new AWS module (using moto)
  • update setup.py to require boto (and moto for tests)
  • developer documentation
  • deployment documentation
  • 'raw' mode, with a string to be handed directly to SQS
  • receiving messages and calling some function in the appropriate context -- in a different process?
  • refactor to give region/queue config directly in the blueprint configs, rather than with a mapping
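
A sketch of that serialization step, using WSME's wsme.rest.json.tojson helper (the wrapper function here is illustrative, not the actual sqs_write internals):

import base64
import json

import wsme.rest.json


def serialize_body(body):
    # WSME type -> plain JSON-able structure -> JSON string -> base64
    jsonified = json.dumps(wsme.rest.json.tojson(type(body), body))
    return base64.b64encode(jsonified)
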
@djmitche
Contributor

djmitche commented Dec 27, 2014

OK, sending messages is complete and ready for you to have a look. Now, on to receiving messages, and I'd like a few more opinions on the design (@mrrrgn? @Callek?)

The immediate need to receive messages comes from BuildAPI. It sends job messages to masters on one queue, and when the masters act on that, they send back a completion message on another queue. BuildAPI consumes from that queue and updates the DB record for the job. In the current implementation of BuildAPI, that consumer is running in a thread in each WSGI process.

So we have a few options:

  • don't implement receiving at all (masters can just make an authenticated REST call instead)
  • implement receiving, and set up consuming threads in each WSGI process on the webheads
  • implement receiving, and set up a separate relengapi command to run the consuming threads in a different process (and, in AWS, run that in a different auto-scaling group, based on queue lengths). This would be similar to relengapi badpenny-cron and celery -A relengapi.

The advantage of the first is simplicity, but REST calls lack a queue's automatic retry-until-success behavior. The second is easier to deploy, but ties the scaling of the queue consumers to the scaling of the webheads. The third is a little harder (and maybe more expensive) to deploy, but probably the most correct.

In any case but the first, I think that the syntax would look like this:

@bp.record
def init_blueprint(state):
    app = state.app
    region, queue = app.config['MYBLUEPRINT']['my_queue']
    @app.aws.sqs_listener(region, queue)
    def my_queue_message(queue, message):
        ...

The @bp.record is (I think) required because the sqs_listener decorator needs the region and queue name, and those are only available after the app config is loaded.
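
For concreteness, the per-listener consumer loop behind such a decorator could look something like this sketch (Boto-based, with the leave-it-invisible retry semantics described above; names are illustrative):

import logging

import boto.sqs

logger = logging.getLogger(__name__)


def _listen(region_name, queue_name, listener):
    conn = boto.sqs.connect_to_region(region_name)
    queue = conn.get_queue(queue_name)
    while True:
        msg = queue.read(wait_time_seconds=20)  # long-poll
        if msg is None:
            continue
        try:
            listener(queue, msg)
        except Exception:
            logger.exception("while invoking %r", listener)
            # leave the message un-deleted; it becomes visible again
            # after the visibility timeout and will be re-delivered
            continue
        queue.delete_message(msg)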


@ghost

ghost commented Dec 29, 2014

Heck yeah. This looks most excellent to me. Regarding the receiving implementation, my support falls behind number 3. It will require some extra work to deploy, but I don't see why we couldn't just start out by running a consumer process as the default on every web host. That would make it only about as complicated as choice 2, but still more flexible (we could give it a unique auto-scaling group later).

The necessity of using @bp.record doesn't seem like a huge deal either, FWIW, since it seems to be idiomatic for Flask projects. Interested in reading input from others.


@djmitche djmitche added the r? label Dec 29, 2014

@djmitche
Contributor

djmitche commented Dec 29, 2014

OK, implemented! And that being the last piece, this is ready for review.


+
+ Generic methods:
+
+ .. py:method:: connect_to(service_name, region_name)

@Callek
Contributor

Callek Dec 29, 2014

For all these docs, I'd much rather have the documentation inline rather than in the .rst. This is info that would help in reading the code as well.


@djmitche
Contributor

djmitche Dec 29, 2014

That's out of scope for this bug.


+        except Exception:
+            logger.exception("While getting queue %r in region %s; listening cancelled",
+                             queue_name, region_name)
+            return

@Callek
Contributor

Callek Dec 29, 2014

Total relengapi coverage dropped (0.22%)

This except block was one of the uncovered pieces.


+            # note that we do nothing with the message; it will
+            # remain invisible for a while, then reappear and maybe
+            # cause another exception
+            continue

@Callek
Contributor

Callek Dec 29, 2014

This except block was also uncovered.


relengapi/lib/aws.py
+    # threads will be killed during process shutdown
+    if not _testing:  # pragma: no cover
+        while True:
+            time.sleep(2 ** 31)

@Callek
Contributor

Callek Dec 29, 2014

Note that thd.daemon = True is not generally recommended, since it doesn't allow resource cleanup at thread termination.


@djmitche
Contributor

djmitche Dec 29, 2014

That's the idea here :)


+            logger.exception("while invoking %r", listener)
+            # note that we do nothing with the message; it will
+            # remain invisible for a while, then reappear and maybe
+            # cause another exception

@Callek
Contributor

Callek Dec 29, 2014

Not a fan of "do nothing with the message", which in this case even means "do not log it".


@djmitche
Contributor

djmitche Dec 29, 2014

What do you think logger.exception does? :)

The message goes back in the queue, and once it's visible again, it will be re-delivered. So this is the typical consumer resilience pattern.


@Callek
Contributor

Callek Dec 29, 2014

I didn't think logger.exception output the passed-in arg values (e.g. the message itself), which is what I was trying to suggest here. If my memory of that is incorrect, feel free to ignore.


@Callek
Contributor

Callek commented Dec 29, 2014

With my comments, I'm hesitantly calling this 👍 (hesitant only because I'm not too familiar with boto/SQS), and I would like some guidance here in doc form on when to use what queue feature.

Follow-ups could be allowing multiple authentication sources (e.g. multiple API keys for different groups/engines; for instance, one key to do the SQS stuff, but another more restricted key that can create/stop instances).

No matter what, for me to have full personal confidence I'd need to see a consumer of this API, which I don't expect in this PR.


@djmitche
Contributor

djmitche commented Dec 29, 2014

coverage is fixed


@djmitche
Contributor

djmitche commented Dec 29, 2014

AWS keys can have arbitrary permissions -- it's very unusual to have a single app using multiple credentials. So I don't think we need to support that.
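
(For illustration only: an IAM policy that restricts a key to a single SQS queue might look like the following; the account ID and queue name are placeholders.)

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sqs:GetQueueUrl",
        "sqs:SendMessage",
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage"
      ],
      "Resource": "arn:aws:sqs:us-east-1:123456789012:my-queue"
    }
  ]
}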


@djmitche
Contributor

djmitche commented Dec 29, 2014

Also, there's only one queue feature (SQS). Celery uses a queue internally, and Badpenny uses Celery, but that doesn't mean they're queues. If you need one of these tools, which to choose is obvious.

The first use-case for this functionality will be BuildAPI, which sends messages to masters and waits for messages back indicating the task has been completed. The masters aren't running RelengAPI or Celery, nor do I want them to. Other use-cases may be interfacing with SQS from other languages, too (e.g., taskcluster).


@Callek
Contributor

Callek commented Dec 29, 2014

...unless we want to support keys that are themselves attached to different AWS accounts, (e.g. releng + IT, or releng + ATeam, etc.)


@ghost

ghost commented Dec 29, 2014

This is a 👍 from me


djmitche added a commit to djmitche/build-relengapi that referenced this pull request Dec 29, 2014

@moz-v2v-gh moz-v2v-gh merged commit 1ff4986 into mozilla:master Dec 29, 2014

1 check passed

continuous-integration/travis-ci The Travis CI build passed

@djmitche djmitche modified the milestones: v1.2.0, v1.1.0 Jan 16, 2015

@djmitche djmitche modified the milestones: v1.1.0, v1.2.0 Jan 16, 2015

djmitche pushed a commit to djmitche/build-relengapi that referenced this pull request May 27, 2015

Morgan Phillips
Merge pull request #18 from mrrrgn/release-hide
Removes the ability of users to see release-* builders

djmitche pushed a commit to djmitche/build-relengapi that referenced this pull request May 27, 2015

Merge pull request #18 from Callek/issue18
Conflicts:
	relengapi/blueprints/slaveloan/__init__.py