flowchart TD
rr[register-run]
er{end-run?}
it[ingest-telemetry]
us1[update-system-state]
us2[update-system-state]
us3[update-system-state]
bg[build-gtfs-realtime]
sg[save-gtfs-feed-messages]
mr[manage-run-lifecycle]
rr --> us1
us1 --> er
er -- "no" --> it
it --> us2
us2 --> er
er -- "no / every 15 s"--> bg
bg --> sg
sg --> er
er -- "yes" --> us3
us3 --> mr
Requisites
Flow
Databús
Interfaces
- /run endpoint to start a "run" (an instance of a trip). Note: schema in the API docs (databus.yml, OpenAPI).
- transit/vehicle/<vehicle_id>/position topic with location pings (every 7 s). Note: schema based on the /position REST API endpoint (databus.yml, AsyncAPI).
Note: the OpenAPI schema must be generated automatically with DRF Spectacular (@Kroenenn).
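As a rough illustration of the position topic above, the following sketch extracts the vehicle ID from the topic name and parses one ping. The payload field names (latitude, longitude, timestamp, speed) are an assumption borrowed from the XADD example in this document, and JSON encoding is also assumed; the authoritative schema is the one in databus.yml. `PositionPing`, `vehicle_id_from_topic`, and `parse_ping` are hypothetical names.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class PositionPing:
    """One location ping; field names are assumed, not taken from databus.yml."""
    vehicle_id: str
    latitude: float
    longitude: float
    timestamp: int
    speed: float


def vehicle_id_from_topic(topic: str) -> str:
    """Extract <vehicle_id> from transit/vehicle/<vehicle_id>/position."""
    parts = topic.split("/")
    valid = (
        len(parts) == 4
        and parts[0] == "transit"
        and parts[1] == "vehicle"
        and parts[2] != ""
        and parts[3] == "position"
    )
    if not valid:
        raise ValueError(f"unexpected topic: {topic!r}")
    return parts[2]


def parse_ping(topic: str, payload: str) -> PositionPing:
    """Build a PositionPing from a topic name and a JSON payload (assumed encoding)."""
    data = json.loads(payload)
    return PositionPing(
        vehicle_id=vehicle_id_from_topic(topic),
        latitude=float(data["latitude"]),
        longitude=float(data["longitude"]),
        timestamp=int(data["timestamp"]),
        speed=float(data["speed"]),
    )
```

Keeping the topic parsing separate from the payload parsing means a malformed topic fails early, before any payload is decoded.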
Processes
- register_run: receives POST /run and updates the transit system state (SADD runs:in_progress run_id via update_system_state).
- ingest_telemetry: receives a message from transit/vehicle/<vehicle_id>/position and updates the location via update_system_state.
- update_system_state: updates the vehicle state (XADD vehicles:ABC123:locations * latitude 37.7749 longitude -122.4194 timestamp 1713881558 speed 45). Note: all necessary formatting and computation happens here.
- build_gtfs_realtime: triggered by Celery Beat; gets the runs in progress (SMEMBERS runs:in_progress), grabs the latest data for each (XREVRANGE vehicles:ABC123:locations + - COUNT 1), then builds and publishes the feed as .pb and .json. Note: no formatting or computation here, only copy, pack, and build. Emits: publication_assertion in topic gtfs:realtime.
- save_gtfs_feed_messages: triggered by publication_assertion (tasks.save_gtfs_feed_messages.delay()); gets the .pb and uses gtfs-io to transform it into PyArrow models, which are stored as Parquet.
- end_run: receives a simulated API call to end the run (PATCH /run/<run_id>); the orchestrator asks to update the system state to exclude the run (update_system_state) and initiates the run lifecycle management process.
- manage_run_lifecycle: gets the run's data and saves the "traces" to the database (not executed in this first vertical).
URLs
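The Redis commands named in the Databús process list (SADD, SMEMBERS, XADD, XREVRANGE) can be sketched as small helper functions. This is a minimal sketch, not the project's implementation: `InMemoryState` is a hypothetical stand-in that imitates only the handful of redis-py method names used here so the example runs without a server, and the helper names are illustrative. The source does not say how a run ID maps to a vehicle ID, so the helpers take the vehicle ID directly. SREM for ending a run is an assumption, since the source only says the run is excluded.

```python
from collections import defaultdict

RUNS_KEY = "runs:in_progress"


class InMemoryState:
    """Hypothetical stand-in imitating a few redis-py calls (sadd, srem,
    smembers, xadd, xrevrange). Values are stored as strings, as Redis would."""

    def __init__(self):
        self.sets = defaultdict(set)
        self.streams = defaultdict(list)
        self._seq = 0

    def sadd(self, name, *values):
        before = len(self.sets[name])
        self.sets[name].update(str(v) for v in values)
        return len(self.sets[name]) - before

    def srem(self, name, *values):
        before = len(self.sets[name])
        self.sets[name].difference_update(str(v) for v in values)
        return before - len(self.sets[name])

    def smembers(self, name):
        return set(self.sets[name])

    def xadd(self, name, fields, id="*"):
        # Simplified IDs: a counter instead of Redis's <ms>-<seq> format.
        self._seq += 1
        entry_id = f"{self._seq}-0"
        self.streams[name].append((entry_id, {k: str(v) for k, v in fields.items()}))
        return entry_id

    def xrevrange(self, name, max="+", min="-", count=None):
        entries = list(reversed(self.streams[name]))
        return entries if count is None else entries[:count]


def locations_key(vehicle_id):
    return f"vehicles:{vehicle_id}:locations"


def register_run(state, run_id):
    """SADD runs:in_progress run_id"""
    state.sadd(RUNS_KEY, run_id)


def update_vehicle_location(state, vehicle_id, latitude, longitude, timestamp, speed):
    """XADD vehicles:<vehicle_id>:locations * latitude .. longitude .. timestamp .. speed .."""
    fields = {"latitude": latitude, "longitude": longitude,
              "timestamp": timestamp, "speed": speed}
    return state.xadd(locations_key(vehicle_id), fields)


def latest_location(state, vehicle_id):
    """XREVRANGE vehicles:<vehicle_id>:locations + - COUNT 1"""
    entries = state.xrevrange(locations_key(vehicle_id), "+", "-", count=1)
    return entries[0][1] if entries else None


def end_run(state, run_id):
    """Assumed SREM runs:in_progress run_id to exclude the run."""
    state.srem(RUNS_KEY, run_id)
```

A short usage example: after `register_run(state, "run-1")` and two calls to `update_vehicle_location` for the same vehicle, `latest_location` returns only the most recent entry, which is the single record build_gtfs_realtime would copy into the feed.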
Infobús
Interfaces
wss://[dominio]/ws/route/<route_id>
Processes
- gtfs_realtime_polling: every N seconds, requests new feed messages from the URL provided for each GTFSProvider. For every TransitSystem and every GTFSProvider: poll with gtfs-io and keep the result as pure Python dataclasses. Backup: store the blob in Redis. The WebSocket logic acts on the Python objects that gtfs-io returns.
- publish_gtfs_realtime_updates: publishes to the WebSocket endpoints.
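One way the publishing step could fan updates out to the per-route WebSocket endpoints is to group the polled objects by route before sending. The sketch below assumes a plain dataclass as the polled object; the actual shape of what gtfs-io returns is not shown in this document, so `VehicleUpdate`, `ws_path`, and `messages_by_route` are hypothetical, and JSON is an assumed wire format.

```python
import json
from collections import defaultdict
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class VehicleUpdate:
    """Hypothetical polled object; not the real gtfs-io model."""
    route_id: str
    vehicle_id: str
    latitude: float
    longitude: float
    timestamp: int


def ws_path(route_id: str) -> str:
    """Path part of wss://[dominio]/ws/route/<route_id>."""
    return f"/ws/route/{route_id}"


def messages_by_route(updates):
    """Group updates by route and serialize one JSON message per endpoint path."""
    grouped = defaultdict(list)
    for update in updates:
        grouped[ws_path(update.route_id)].append(asdict(update))
    return {path: json.dumps(items) for path, items in grouped.items()}
```

Each key of the returned dictionary identifies one endpoint, so the publisher only needs to send each value to the clients subscribed to that path.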