This app implements the consumer component of the classic producer-consumer architecture. It uses Celery as the task management framework and a SQL database to store the incoming data stream. The Celery worker is bound to a queue at initialization. Whenever a new message arrives in the queue, the consumer automatically picks it up, validates the data, and stores it in the database.
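The validate-then-store flow can be sketched roughly as follows. This is a minimal illustration using only the standard library, not the app's actual code: the real app uses Marshmallow for validation and SQLAlchemy for storage, and here a plain function and an in-memory SQLite database stand in for them. The `messages` table, the `REQUIRED_FIELDS` schema, and the function names are hypothetical.

```python
import sqlite3

# Hypothetical schema: field name -> expected Python type.
REQUIRED_FIELDS = {"id": int, "payload": str}


def validate(message):
    """Return a cleaned copy of the message, or raise ValueError."""
    errors = {}
    for name, expected_type in REQUIRED_FIELDS.items():
        if name not in message:
            errors[name] = "missing"
        elif not isinstance(message[name], expected_type):
            errors[name] = "expected " + expected_type.__name__
    if errors:
        raise ValueError(errors)
    # Drop any fields that are not part of the schema.
    return {name: message[name] for name in REQUIRED_FIELDS}


def process_message(conn, message):
    """Validate one message and store it. Returns True if it was stored."""
    try:
        row = validate(message)
    except ValueError:
        return False  # invalid messages are rejected, not stored
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT INTO messages (id, payload) VALUES (?, ?)",
            (row["id"], row["payload"]),
        )
    return True


# In-memory database standing in for the real SQL database.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE messages (id INTEGER PRIMARY KEY, payload TEXT NOT NULL)"
)
```

In the real app a function like `process_message` would presumably be registered as a Celery task, so the worker calls it once per message taken from the queue.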
This model scales horizontally and can sustain a high rate of incoming messages. The same application can be deployed to multiple servers (depending on the load), and whichever instance is idle will process the next incoming message.
The following steps will get you a copy of the app (single instance) on your local system:
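The idea of several idle instances competing for messages from one queue can be illustrated with a small standard-library sketch. Threads and `queue.Queue` stand in for separate servers and the message broker; `run_workers` is a hypothetical helper, not part of the app.

```python
import queue
import threading


def run_workers(messages, worker_count):
    """Process messages with several workers that share one queue.

    Returns a dict mapping worker id -> list of messages it handled.
    """
    shared = queue.Queue()
    for message in messages:
        shared.put(message)

    # Each worker appends only to its own list, so no lock is needed.
    handled = {worker_id: [] for worker_id in range(worker_count)}

    def worker(worker_id):
        while True:
            try:
                # Whichever worker is free takes the next message.
                message = shared.get_nowait()
            except queue.Empty:
                return  # queue drained, worker exits
            handled[worker_id].append(message)
            shared.task_done()

    threads = [
        threading.Thread(target=worker, args=(worker_id,))
        for worker_id in range(worker_count)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return handled
```

Each message is taken by exactly one worker, which is the property that lets more instances be added as the load grows. How the messages are split between workers is not deterministic.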
git clone https://github.com/sohaibfarooqi/consumer.git
cd consumer
virtualenv -p python3 .venv
source .venv/bin/activate
pip install -r requirements/dev.txt
Set the environment variable CELERY_BROKER_URL=amqp://myuser:mypassword@localhost:5672/myvhost
Set the environment variable DATABASE_URI=postgresql+psycopg2://<user>:<password>@localhost/<database>
alembic upgrade head
celery -A consumer worker -l info -Q <queue_name>
At this point the app is ready to process incoming messages.
Use the following commands to run the tests and generate a coverage report:
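The two environment variables from the steps above are the only configuration the setup mentions. A minimal sketch of how they might be read and checked at startup is shown here. The `load_settings` function and `MissingSettingError` class are hypothetical; the app's actual configuration code may look different.

```python
import os


class MissingSettingError(RuntimeError):
    """Raised when a required environment variable is not set."""


def load_settings(environ=None):
    """Read the required settings from the environment.

    `environ` defaults to os.environ; passing a dict makes testing easy.
    """
    environ = os.environ if environ is None else environ
    settings = {}
    for name in ("CELERY_BROKER_URL", "DATABASE_URI"):
        value = environ.get(name)
        if not value:
            raise MissingSettingError(
                "environment variable " + name + " is not set"
            )
        settings[name] = value
    return settings
```

Failing early with a clear message is usually easier to debug than a connection error that surfaces later when the worker first tries to reach the broker or the database.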
pip install -r requirements/test.txt
pytest
pytest --cov=consumer tests/
- Celery: Distributed queue management.
- SQLAlchemy: SQL ORM.
- Alembic: Migration management.
- Marshmallow: Schema validation.
- Isort: Sorting and arranging imports.
- Autopep8: Code styling to conform with PEP 8.
- Pytest: Running test cases.
Git pre-commit hooks can be very useful for automating things like code formatting, running linters, and checking for missing migrations. Use the following commands to enable them:
- Run
pre-commit install
to enable the hook in your git repo. The hook will run automatically on each commit.
- Run
git commit -m "Your message" -n
to skip the hook if needed.