Celery and RQ support #49

Closed
jfhbrook opened this issue Apr 13, 2019 · 5 comments

Comments

jfhbrook commented Apr 13, 2019

6.0.0 has a way of supporting fairly arbitrary backends. That means that we could make an event emitter that can hook onto a celery app. Maybe like:

from celery import Celery
from pyee import CeleryEventEmitter

app = Celery('events', broker='pyamqp://guest@localhost//')

# Creates a unique @app.task that receives emit calls for this
# EE from celery and executes the standard emit behavior
ee = CeleryEventEmitter(app=app)

# Register handlers like normal
@ee.on('data')
def handler(data):
    print(data)

then you can spin up workers like normal:

celery -A example_app worker --loglevel=info

and emit from your main program:

from example_app import ee

# Calls our generated celery task with the payload;
# actual emit code happens on the workers
ee.emit('data', dict(some='data'))

See: http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#application
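The core of the idea can be sketched without Celery itself: emit() hands the call to an enqueue function (standing in for the generated @app.task's .delay), and the worker side runs the real, local emit. Everything here is a hypothetical sketch of the proposed design, not anything pyee ships; a tiny local emitter stands in for pyee's EventEmitter.

```python
class BaseEmitter:
    """Tiny stand-in for pyee's EventEmitter (local, synchronous)."""

    def __init__(self):
        self._handlers = {}

    def on(self, event):
        def decorator(fn):
            self._handlers.setdefault(event, []).append(fn)
            return fn
        return decorator

    def emit_local(self, event, *args):
        """What would run on the worker: fire handlers in-process."""
        for fn in self._handlers.get(event, ()):
            fn(*args)


class QueueBackedEmitter(BaseEmitter):
    """emit() enqueues instead of firing handlers in-process."""

    def __init__(self, enqueue):
        super().__init__()
        # Stands in for the generated @app.task's .delay
        self._enqueue = enqueue

    def emit(self, event, *args):
        self._enqueue(event, args)


# Demo with an in-memory "broker": the enqueue function immediately
# invokes the worker-side emit, as a real broker round-trip would.
ee = QueueBackedEmitter(enqueue=lambda event, args: ee.emit_local(event, *args))

seen = []

@ee.on('data')
def handler(data):
    seen.append(data)

ee.emit('data', {'some': 'data'})  # seen is now [{'some': 'data'}]
```

In the real thing, the enqueue function would serialize the event name and payload through the broker, so payloads would need to be JSON- or pickle-safe.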

@jfhbrook jfhbrook changed the title Celery support Celery (and RQ?) support Apr 13, 2019
@jfhbrook

Likely in this case there would be no special error handling, since Celery doesn't by default try to do anything smart with errors either.

jfhbrook commented Apr 13, 2019

RQ is a little weirder:

http://python-rq.org/

from pyee import RQEventEmitter

# Sets a (name, BaseEventEmitter()) key/val pair in a
# shared dict namespace. The worker and the
# client have to agree on these names, so no
# autogeneration on the fly unfortunately.
ee = RQEventEmitter(name='a_unique_deterministic_name')

# Registers the event with the underlying base EE
@ee.on('data')
def handler(data):
    print(data)

Then you spin up your rq worker:

rq worker

and then call the EE in your app:

from redis import Redis
from rq import Queue

from sample_app import ee

# You only need to set up a queue object if you're emitting,
# so in this example we do it after ee is assembled
ee.queue = Queue(connection=Redis())

# Enqueues a global event handler function (pyee._rq.emit or
# something) with the name of the EE, the name of the event,
# and the payload. On the worker, the underlying base EE gets
# called.
ee.emit('data', dict(some='data'))

RQ seems to allow returning results but doesn't require us to do so.

Error handling is a little involved: http://python-rq.org/docs/exceptions/. I think the best bet is going to be no special behavior: since everything runs synchronously by default, raised exceptions will trigger the standard DLQ behavior.

@jfhbrook jfhbrook changed the title Celery (and RQ?) support Celery (and RQ? SQS?) support Apr 13, 2019

jfhbrook commented Apr 13, 2019

Fun fact: Celery supports not just RabbitMQ but also Redis, SQS, and *checks notes* Zookeeper.

http://docs.celeryproject.org/en/latest/getting-started/brokers/

@jfhbrook jfhbrook changed the title Celery (and RQ? SQS?) support Celery and RQ support Apr 13, 2019

jfhbrook commented Apr 13, 2019

One edge case here is what happens if a worker emits an event on its own emitter.

I would hope that Celery handles this - that if you call app.handler on the worker it gets inserted into the queue like everything else. I would hope!

I think RQ doesn't have any special handling here, but I think we can say that if the queue is set then we submit to the queue, and otherwise either raise or emit on the underlying (configurable?).
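That rule — submit to the queue if one is set, otherwise either raise or fall back to a plain local emit — can be sketched in a few lines. The `fallback` parameter and the class are hypothetical, not anything pyee ships:

```python
class WorkerSafeEmitter:
    """Sketch of the rule above: if a queue is set, submit to it;
    otherwise either raise or emit on the underlying local emitter.
    The fallback parameter is a hypothetical configuration knob."""

    def __init__(self, fallback='raise'):
        self.queue = None
        self.fallback = fallback
        self._handlers = {}

    def on(self, event, fn):
        self._handlers.setdefault(event, []).append(fn)

    def emit_local(self, event, *args):
        for fn in self._handlers.get(event, ()):
            fn(*args)

    def emit(self, event, *args):
        if self.queue is not None:
            # Emitting side: hand off to the queue
            # (rq.Queue.enqueue in the real thing).
            self.queue.enqueue(self.emit_local, event, *args)
        elif self.fallback == 'local':
            # Worker side, configured to emit in-process instead.
            self.emit_local(event, *args)
        else:
            # Worker side, default: refuse rather than guess.
            raise RuntimeError('emit() called with no queue configured')


seen = []

ee = WorkerSafeEmitter(fallback='local')
ee.on('data', seen.append)
ee.emit('data', {'some': 'data'})   # no queue: falls back to local emit

strict = WorkerSafeEmitter()        # default fallback='raise'
try:
    strict.emit('data', {'some': 'data'})
except RuntimeError:
    seen.append('raised')
```

Defaulting to raise seems safer: silently emitting in-process on a worker would hide the fact that downstream handlers on other workers never ran.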

@jfhbrook

Over the years, I've reached the conclusion that the EventEmitter API has a lot of weaknesses as compared to alternative APIs, such as futures, coroutines and channels. For example, it's not type safe and errors regularly lose their context. Moreover, the fact that error handling diverges so wildly between subclasses is itself indicative of a leaky abstraction.

Meanwhile, the Celery and RQ abstractions are tailor-fit for their use cases. Any adaptation for pyee I'd make would necessarily lose fidelity - after all, it has to adapt to the behavior of message queues, which are pretty different from fire-and-forget events.

I've come to see pyee less as a good idea or an itch that needs scratching, and more as a tool for when you need or want to use Node-like abstractions. For instance, it would be appropriate if you were doing a straight port of some Node.js code, and it would be defensible if you were a Node.js developer who found yourself straying temporarily into Python. However, I would not recommend introducing pyee as a layer of abstraction into a Python project otherwise - which would certainly be the case with a Celery or RQ backend.

Part of me thinks it would be fun to implement this feature. But given I think actually using it would be a bad idea, I think it would be best to close this issue.
