Option to store events inside a Postgres database #724

Closed · wants to merge 13 commits

Conversation

@bosonogi (Contributor) commented Jul 24, 2017

This backward-compatible change introduces a new persistence mode that is more reliable for a long-running process.

With shelve-based persistence, data is written to disk only when Flower is stopped cleanly. If the process is terminated abnormally for any reason, the in-memory data is lost. This unfortunately happens quite often when running Flower inside a Docker container.

The Postgres persistence implemented here works by adding an event callback to the EventsState instance that inserts events into the database as they happen (excluding worker heartbeat events). On process start, these events are replayed to recreate the State instance as it was.
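For illustration, a minimal sketch of this mechanism (pg_store_event is an illustrative name and the connection handling is simplified; the events table and INSERT statement match the schema discussed in the review below):

import json
from datetime import datetime, timezone

_add_event = """INSERT INTO events (time, data) VALUES (%s, %s)"""

def pg_store_event(event):
    # Heartbeats arrive every few seconds and carry no task data; skip them.
    if event.get('type') == 'worker-heartbeat':
        return
    # Event timestamps are epoch floats; convert for the TIMESTAMP column.
    when = datetime.fromtimestamp(event['timestamp'], tz=timezone.utc)
    cursor.execute(_add_event, (when, json.dumps(event)))
    connection.commit()

A callback like this is attached to the EventsState instance so every event is persisted as it is applied; on startup the stored rows are replayed to rebuild the state.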

Note that migrating data persisted in shelf databases is not possible (or at least not easy) -- such a migration would need to recreate events from aggregated state.

Please review and comment; if this feature is accepted, I will add documentation for it.

@johnarnold (Contributor)

Cool idea. If this gets support, perhaps I will write a redis driver.

@mher (Owner) commented Aug 21, 2017

I think reading task info directly from the Celery result backend is a better approach (#542 describes it pretty well). It avoids creating a redundant copy of tasks, querying can be faster, and the implementation is simple: a single configuration option plus a server-side data processor for DataTables.
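A rough sketch of that direction (not code from this PR; app is an assumed Celery application instance):

from celery.result import AsyncResult

def task_info_from_backend(app, task_id):
    # Read state and result straight from the configured result backend;
    # no event stream or in-memory cache is involved.
    res = AsyncResult(task_id, app=app)
    return {'task_id': task_id, 'state': res.state, 'result': res.result}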

@bosonogi (Contributor, Author)

@mher The result store does not contain all metadata for tasks, so I don't see how that idea could work. Or is this result-backend specific? Also, what about tasks decorated with task(ignore_result=True)? Would it be OK for them not to show up in Flower?

Storing raw events appears most useful from the auditing/monitoring standpoint mentioned by @wiltzius in #542, as events in themselves potentially contain more information than the task objects that are constructed by processing them. However, I agree that this approach needs more work, and I will continue to improve this PR as I use it.

@mher (Owner) commented Aug 27, 2017

I think the ability to read directly from the result backend can be optional, and by default flower can show task info extracted from celery events. It is acceptable to omit some task attributes. The main ones (task name, args/kwargs, etc.) are available in the result backend.

@mher (Owner) commented Aug 27, 2017

Storing events in flower has several limitations:

  1. Events do not necessarily contain full task information. Celery truncates events if they exceed a size threshold.
  2. It doesn't scale well. Flower will not be able to handle events from all workers and will become a limit to horizontal scalability.
  3. The current task search implementation is not optimal and has O(n) complexity. It will not work well with big task sets.

@johnarnold (Contributor)

What is the delta of task info between events and the results backend?

@bosonogi (Contributor, Author)

@johnarnold It's best to illustrate it with an example. Here are the data for a simple "echo" task:

Events related to the task

[
    {
        "eta": null,
        "pid": 1,
        "args": "['hi']",
        "name": "daneel.basic.echo",
        "type": "task-received",
        "uuid": "3e57bf51-6c97-4f9d-81ed-d951ef2150c6",
        "clock": 437165,
        "kwargs": "{}",
        "expires": null,
        "retries": 0,
        "root_id": "3e57bf51-6c97-4f9d-81ed-d951ef2150c6",
        "hostname": "celery@worker1.korriban-hub",
        "parent_id": null,
        "timestamp": 1505139192.2699912,
        "utcoffset": 0,
        "local_received": 1505139192.2711556
    },
    {
        "pid": 1,
        "type": "task-started",
        "uuid": "3e57bf51-6c97-4f9d-81ed-d951ef2150c6",
        "clock": 437166,
        "hostname": "celery@worker1.korriban-hub",
        "timestamp": 1505139192.274833,
        "utcoffset": 0,
        "local_received": 1505139192.2774658
    },
    {
        "pid": 1,
        "type": "task-succeeded",
        "uuid": "3e57bf51-6c97-4f9d-81ed-d951ef2150c6",
        "clock": 437167,
        "result": "'hi'",
        "runtime": 0.07660293905064464,
        "hostname": "celery@worker1.korriban-hub",
        "timestamp": 1505139192.3503628,
        "utcoffset": 0,
        "local_received": 1505139192.3522208
    }
]

celery.events.state.Task object (inspected from PyCharm's debugger)

These objects are constructed by Celery based on events, and are serialized/deserialized when using the shelve-based persistence mode (through flower.events.EventsState).

<Task: daneel.basic.echo(3e57bf51-6c97-4f9d-81ed-d951ef2150c6) SUCCESS clock:437167>
 _fields = {tuple} <class 'tuple'>: ('uuid', 'name', 'state', 'received', 'sent', 'started', 'rejected', 'succeeded', 'failed', 'retried', 'revoked', 'args', 'kwargs', 'eta', 'expires', 'retries', 'worker', 'result', 'exception', 'timestamp', 'runtime', 'traceback', 'exchang
 _info_fields = {tuple} <class 'tuple'>: ('args', 'kwargs', 'retries', 'result', 'eta', 'runtime', 'expires', 'exception', 'exchange', 'routing_key', 'root_id', 'parent_id')
 _serializer_handlers = {dict} {'children': <bound method Task._serializable_children of <Task: daneel.basic.echo(3e57bf51-6c97-4f9d-81ed-d951ef2150c6) SUCCESS clock:437167>>, 'root': <bound method Task._serializable_root of <Task: daneel.basic.echo(3e57bf51-6c97-4f9d-81ed-d951ef2150c6)
 args = {str} '[\\'hi\\']'
 children = {WeakSet} <_weakrefset.WeakSet object at 0x7fcf2b602588>
 client = {NoneType} None
 clock = {int} 437167
 cluster_state = {EventsState} <State: events=1701 tasks=563>
 eta = {NoneType} None
 exception = {NoneType} None
 exchange = {NoneType} None
 expires = {NoneType} None
 failed = {NoneType} None
 hostname = {str} 'celery@worker1.korriban-hub'
 id = {str} '3e57bf51-6c97-4f9d-81ed-d951ef2150c6'
 kwargs = {str} '{}'
 local_received = {float} 1505139192.3522208
 merge_rules = {dict} {'RECEIVED': ('name', 'args', 'kwargs', 'parent_id', 'root_id', 'retries', 'eta', 'expires')}
 name = {str} 'daneel.basic.echo'
 origin = {str} 'celery@worker1.korriban-hub.None'
 parent = {NoneType} None
 parent_id = {NoneType} None
 pid = {int} 1
 ready = {bool} True
 received = {float} 1505139192.2699912
 rejected = {NoneType} None
 result = {str} '\\'hi\\''
 retried = {NoneType} None
 retries = {int} 0
 revoked = {NoneType} None
 root = {Task} <Task: daneel.basic.echo(3e57bf51-6c97-4f9d-81ed-d951ef2150c6) SUCCESS clock:437167>
 root_id = {str} '3e57bf51-6c97-4f9d-81ed-d951ef2150c6'
 routing_key = {NoneType} None
 runtime = {float} 0.07660293905064464
 sent = {NoneType} None
 started = {float} 1505139192.274833
 state = {str} 'SUCCESS'
 succeeded = {float} 1505139192.3503628
 timestamp = {float} 1505139192.3503628
 traceback = {NoneType} None
 type = {str} 'task-succeeded'
 utcoffset = {int} 0
 uuid = {str} '3e57bf51-6c97-4f9d-81ed-d951ef2150c6'
 worker = {Worker} <Worker: celery@worker1.korriban-hub (OFFLINE clock:0)>

Task data present in a Postgres result store

+------+--------------------------------------+----------+--------------------------------------+----------------------------+-------------+
| id   | task_id                              | status   | result                               | date_done                  | traceback   |
|------+--------------------------------------+----------+--------------------------------------+----------------------------+-------------|
| 41   | 3e57bf51-6c97-4f9d-81ed-d951ef2150c6 | SUCCESS  | \x80049506000000000000008c026869942e | 2017-09-11 14:13:12.278151 | <null>      |
+------+--------------------------------------+----------+--------------------------------------+----------------------------+-------------+

So as you can see, the delta of information is actually pretty subtle (if you don't count the result store). More relevant (for me, anyway) is the difference in format and approach (event stream vs. object store).

@johnarnold (Contributor)

Are there any critical fields that are in the event stream that are not in the results backend?

Or, are there any timing behaviors that make the event stream more usable (e.g. "this doesn't post to the results backend until the task completes, but shows up in the event stream sooner")?

We use a results backend, and I suspect many/most others do as well. Reconstructing state from event stream seems more difficult than just grabbing state explicitly from the results backend if it's available. The event stream seems like it's intended for triggering actions downstream, rather than being something like a transaction log.

So, for my purposes, I tend to agree with mher that coding against the results backend would be more straightforward (and possibly less error-prone). But, at the risk of adding complexity, it's possible to support both persisted event data (for cases where the user doesn't have a results backend) and direct lookups to the backend.

@bosonogi (Contributor, Author)

Are there any critical fields that are in the event stream that are not in the results backend?

Some of the more relevant things that are missing in the results backend:

  • task name
  • task inputs
  • worker that executed the task
  • runtime
  • retries

And you don't have anything on tasks that have been revoked, or on tasks that ignore their result.

Reconstructing state from event stream seems more difficult than just grabbing state explicitly from the results backend if it's available.

Task state reconstruction doesn't need to be reimplemented: you just "replay" the relevant events and Celery rebuilds the state the way it usually would (see the sketch below). However, when this data is analyzed by software other than Flower, task object reconstruction is not always necessary -- in our case the events relate more to what was happening on the cluster than to the specific results of tasks, so a timeline of events is more useful (and if you need the results, you can get them from "task-succeeded" events).
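A minimal sketch of such a replay, assuming the stored rows yield the original event dicts (load_events is an illustrative helper):

from celery.events.state import State

state = State()
for event in load_events():  # e.g. SELECT data FROM events ORDER BY time
    state.event(event)       # Celery rebuilds Task/Worker objects as usual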

At the moment, the biggest issue with my approach is that the in-memory task object cache has problems. Most notably, when the LRU cache evicts an item, the task heap is not updated, so you get unexpected behavior when displaying those tasks in Flower. In our case it is not a problem if only recent tasks are accessible in Flower while all events are persisted (and available for offline analysis); so Flower's current in-memory model is not an issue, but the current in-memory caching mechanism would need to be improved (which is something I plan to try as soon as I get the chance).

@johnarnold (Contributor)

Just for reference, here's the redis results backend output for a task:

"{
  \"result\":  {
    \"app\":  \"0.8.1\"
  },
   \"children\":  [
    
  ],
   \"status\":  \"SUCCESS\",
   \"task_id\":  \"24a62f2a-3ea2-4e8d-a5fa-29238d1b6b1d\",
   \"traceback\":  null
}"

@johnarnold (Contributor)

I've been thinking about this a bit. Like @bosonogi, I run flower in docker containers, and somewhat regularly have to restart it (unrelated, but it appears that there's a bug when you hit max tasks which makes flower inoperative). On restart, I lose my state. Persisting to disk from an immutable docker container is possible but kind of an anti-pattern. I actually run two instances of flower behind an HTTP load balancer (for HA).

If I reboot a flower container, it will effectively miss events from the celery workers for however long it's down, even after it comes back up (and restores event state, etc.). This seems unacceptable if you're using the API as your primary interface to the service -- you need to check a task result and it's unavailable due to the missing events.

Also, with multiple instances of flower, would each one need its own event database for persistence?

I wonder if it would make sense to have celery.events.snapshot back up the cluster event state to a database regularly? Then a flower instance booting up could load current and historical state from the database, and/or search the state database.
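For reference, a sketch of what that could look like with Celery's snapshot camera API (write_state_to_db is a hypothetical helper):

from celery.events.snapshot import Polaroid

class DatabaseCamera(Polaroid):
    clear_after = True  # drop buffered events once each snapshot is taken

    def on_shutter(self, state):
        # Called on every timer tick with the current cluster state.
        write_state_to_db(state)

A camera like this runs via celery events --camera=mymodule.DatabaseCamera --frequency=2.0, so it snapshots on a tick rather than per event.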

@johnarnold (Contributor)

Hmm, this sure feels a lot like trying to recreate https://django-celery-monitor.readthedocs.io/en/latest/ without Django...

@johnarnold (Contributor)

@bosonogi I took a look at the code.

It looks like you're adding all events, with the event timestamp and a JSON dump of the object. Done that way, each instance of flower would need its own database. It should work fine as a persistence story, but doesn't seem a whole lot better than plain file persistence.

Another way to do it would be to expand the schema and make an actual "events database" with separate tables for worker and task events. With that approach, you could index different columns for lookup performance, and multiple instances of flower could "share" a database (e.g. not duplicate an identical event record). That would give you a reliability story: if one flower instance was down, the other would still be writing events.

This is sort of like the celery "snapshot" functionality, but writes could be done on the fly as events come in, rather than on a timer/tick. Thoughts?

@bosonogi (Contributor, Author) commented Oct 4, 2017

@johnarnold For persisting events, maybe we should look to the message broker -- there is one of those, after all, so there can be one database as well that multiple flower instances can hook into in read-only mode.

The event data is already indexed with the current schema (a GIN index on a JSONB column), which gives you plenty of flexible and efficient options for querying (https://www.postgresql.org/docs/9.6/static/functions-json.html).
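For example, a sketch against this PR's events table (cursor is an assumed database cursor):

import json

# @> is JSONB containment, served by the GIN index; this matches the
# task-received events shown earlier, which carry both "type" and "name".
_received_for_task = """
    SELECT time, data FROM events
    WHERE data @> %s::jsonb
    ORDER BY time
"""
cursor.execute(_received_for_task,
               (json.dumps({"type": "task-received",
                            "name": "daneel.basic.echo"}),))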

@johnarnold (Contributor)

@bosonogi take a look at celery/celery#4490 when you have a chance.

If I can get that merged, and expand #772, we would have a straightforward way for flower to get most of the task state from the results backend in the event of an Event cache miss. Not a silver bullet, but it plays well with the Event cache.

@johnarnold (Contributor)

Also, how does performance look when loading a large number of events back into memory? Say, 1M or 10M tasks?

It seems like there should be an easy way to truncate the database to keep the size under control.
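One simple option (a sketch, not part of this PR) would be pruning by retention window, which the existing time index supports:

_prune_events = """DELETE FROM events WHERE time < now() - interval '30 days'"""
cursor.execute(_prune_events)
connection.commit()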

(
    id SERIAL PRIMARY KEY,
    time TIMESTAMP,
    data JSONB NOT NULL

Review comment (@johnarnold): add

    unique (time, data)

to make sure you don't store duplicate events when you have multiple writers.

    "CREATE INDEX event_time_index ON events (time ASC)",
)

_add_event = """INSERT INTO events (time, data) VALUES (%s, %s)"""

Review comment (@johnarnold): change this to

    _add_event = """INSERT INTO events (time, data) VALUES (%s, %s) ON CONFLICT DO NOTHING"""

PostgreSQL 9.5+ supports ON CONFLICT resolution (for upsert, but also good for avoiding dups).

@johnarnold left a review:

@bosonogi Please take a look at these requested changes.

Edit: PR: bosonogi#1

cursor.execute(_all_tables)
tables = cursor.fetchone()
if tables is None:
    logger.debug('Database empty, executing schema definition.')
Review comment (@johnarnold):

This can cause a problem if the database is not empty. It should just check that the events table exists.
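A sketch of the suggested check (standard information_schema; logger and cursor as in the surrounding code):

_events_table_exists = """
    SELECT 1 FROM information_schema.tables WHERE table_name = 'events'
"""
cursor.execute(_events_table_exists)
if cursor.fetchone() is None:
    logger.debug('events table missing, executing schema definition.')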

cursor = connection.cursor()
try:
    cursor.execute(_all_events)
    for row in cursor:
Review comment (@johnarnold):

If the events table does not exist yet, this raises:

pg8000.core.ProgrammingError: ('ERROR', 'ERROR', '42P01', 'relation "events" does not exist', '18', 'parse_relation.c', '1160', 'parserOpenTable')

Need to catch the exception.

@bosonogi (Contributor, Author)

@johnarnold I have been delaying the inevitable by setting FLOWER_MAX_TASKS to 100000 and deleting less relevant tasks from the database to keep their total number below that.

I didn't precisely measure startup time with event replay at these numbers, but it seems to be on the order of 10-20 seconds.

An idea I contemplated and intend to explore is making Flower stateless (or pretty much stateless). I imagined doing this by having it rely on an external HTTP service to get task data through a well-defined API. That way there could be different implementations of task data persistence independent of Flower, and it might become possible to avoid the redundancy that currently exists for storing this data (the celery task cache, flower's cache, the result store, the event store...). Also, this would make it easy to spin up multiple Flowers sharing a single task data storage service.

This sounds like a lot of work however, so I'll be thrilled if your approach turns out to be good enough :)

@johnarnold (Contributor)

@bosonogi I think it will be "good enough" for most users, especially if Celery will take my PR to add properties to the results backend.

Not sure if you saw it, I made all the suggested changes for this PR, and tested it successfully. See: bosonogi#1

@bosonogi (Contributor, Author)

@johnarnold I don't see any new activity on the PR. Did you forget to push?

@johnarnold (Contributor)

@bosonogi I submitted a PR to your branch. Merge the PR into your branch and it will show up here.

@bosonogi (Contributor, Author)

@johnarnold There has been a misunderstanding. When you wrote:

Not sure if you saw it, I made all the suggested changes for this PR, and tested it successfully. See: bosonogi#1

I understood it to mean that you had made the changes I suggested while reviewing bosonogi#1 (suggestions which got no reply, so I assumed you did not see them).

@johnarnold (Contributor)

@bosonogi Sorry about the mix-up. I updated the PR.

Fix up schema loading and event loading
@johnarnold (Contributor)

@mher I've verified this works. Merge?

@johnarnold (Contributor)

@bosonogi I submitted another PR against your branch, please review & approve.

@punit-kulal commented Aug 29, 2019

Why hasn't this pull request been merged into the main branch yet?
@johnarnold
@bosonogi

@bosonogi (Contributor, Author)

@punit-kulal We didn't quite finish the work on bosonogi#2, so this PR was left hanging...

Meanwhile, I've started https://github.com/bosonogi/celery_pantry and am using that as a persistent task data store with a HTTP API.

The idea at the time was to make Flower get task data from there instead of storing it itself (as a step toward making it stateless), but I haven't yet had the chance to pursue this approach.

@mher (Owner) commented Jun 26, 2020

Closing this pull request because it is not complete.

@mher closed this Jun 26, 2020