Streaming Data Loader is a high-performance, async-powered microservice for processing data streams from Kafka and bulk-loading them into Elasticsearch. It combines modern Python tooling with observability best practices to provide reliable, scalable, and debuggable data pipelines.
It's fast, resilient, and production-ready: ideal for teams that need a lightweight alternative to complex ETL systems.
- Asynchronous processing with `asyncio`, `aiokafka`, and `aiohttp`
- Batch insertions for throughput efficiency
- Retry & fault-tolerant logic for Kafka and Elasticsearch
- Configurable via `.env` and `pydantic-settings`
- Docker & Kubernetes ready
- Prometheus + Grafana monitoring included
- Tested with `pytest`, including integration scenarios
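A minimal sketch of the async batch-consume pattern listed above. To keep it self-contained, an `asyncio.Queue` stands in for the Kafka consumer, and appending to a list stands in for the Elasticsearch bulk insert; all names here are illustrative, not the project's actual API:

```python
import asyncio

BATCH_SIZE = 3
BATCH_TIMEOUT = 0.05  # seconds to wait before flushing a partial batch

async def consume_batches(queue: asyncio.Queue, sink: list) -> None:
    """Drain messages into fixed-size batches and 'bulk insert' each batch."""
    batch: list = []
    while True:
        try:
            msg = await asyncio.wait_for(queue.get(), timeout=BATCH_TIMEOUT)
        except asyncio.TimeoutError:
            msg = None  # timeout: flush whatever we have
        if msg is not None and msg != "STOP":
            batch.append(msg)
        # Flush on a full batch, on timeout, or on shutdown.
        if batch and (len(batch) >= BATCH_SIZE or msg in (None, "STOP")):
            sink.append(list(batch))  # stand-in for an Elasticsearch bulk call
            batch.clear()
        if msg == "STOP":
            return

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(7):
        queue.put_nowait(f"event-{i}")
    queue.put_nowait("STOP")  # sentinel for a clean shutdown
    sink: list = []
    await consume_batches(queue, sink)
    return sink

batches = asyncio.run(main())
print(batches)  # 7 events -> batches of 3, 3, 1
```

The same flush-on-size-or-timeout loop is what keeps throughput high (large bulk requests) without letting latency grow unbounded on a slow topic.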
```bash
docker-compose up --build
```
- http://localhost:9090 → Prometheus
- http://localhost:3000 → Grafana (admin / admin)
- http://localhost:8080 → Kafka UI
```bash
./k8s-deploy.sh
./k8s-clean.sh
```
This project uses Hexagonal Architecture (Ports and Adapters), ensuring modularity, extensibility, and clean separation of concerns.
```
Kafka → KafkaConsumerService → EventService → ElasticsearchClientService → Elasticsearch
                  │                  │
                  └──→ Metrics + Logging (Prometheus + JSON logs)
```
- Input Ports: Kafka Consumer (aiokafka), deserialization, batching
- Application Core: Event transformation, validation, retry logic
- Output Ports: Async Elasticsearch client, bulk insert, failure handling
- Infrastructure: Docker, Kubernetes, logging, metrics, monitoring
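In a hexagonal layout, the core depends only on port interfaces, so adapters can be swapped without touching business logic. A sketch of what such ports might look like here, assuming `typing.Protocol`; the names `EventSource`, `EventSink`, and `InMemorySink` are hypothetical, not the project's actual classes:

```python
import asyncio
from typing import Iterable, Protocol

class EventSource(Protocol):
    """Input port: anything that can yield raw events (e.g. a Kafka consumer)."""
    async def poll(self, max_records: int) -> list[dict]: ...

class EventSink(Protocol):
    """Output port: anything that can bulk-store events (e.g. Elasticsearch)."""
    async def bulk_insert(self, events: Iterable[dict]) -> int: ...

class InMemorySink:
    """Test adapter satisfying the output port without a real Elasticsearch."""
    def __init__(self) -> None:
        self.stored: list[dict] = []

    async def bulk_insert(self, events: Iterable[dict]) -> int:
        batch = list(events)
        self.stored.extend(batch)
        return len(batch)

sink = InMemorySink()
inserted = asyncio.run(sink.bulk_insert([{"id": 1}, {"id": 2}]))
print(inserted)  # 2
```

This is also what makes the integration tests cheap: the core is exercised against in-memory adapters, and only the adapter layer needs real Kafka and Elasticsearch.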
- ✅ True async data pipeline → lower latency, better throughput
- ✅ No heavyweight config DSL → Python code, `pyproject.toml`, `.env`
- ✅ Built-in retries & fault handling → robust out of the box
- ✅ JSON logging and metric labeling for full observability
- ✅ Open-source & customizable → perfect for modern data teams
Prometheus scrapes metrics on `/metrics` (port `8000`). Dashboards are automatically provisioned in Grafana.
| Metric | Description |
|---|---|
| `messages_processed_total` | Total number of processed messages |
| `errors_total` | Total errors during processing |
| `consume_duration_seconds` | Time spent reading from Kafka |
| `response_duration_seconds` | Time to insert into Elasticsearch |
| `transform_duration_seconds` | Time spent transforming messages |
| `batch_processing_duration_seconds` | Full batch processing time |
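With the standard `prometheus_client` library, metric definitions matching the table above could look like this sketch. The label names (`topic`, `stage`) are assumptions for illustration, not necessarily the project's actual label set:

```python
from prometheus_client import REGISTRY, Counter, Histogram

# Counters and histograms named after the table above.
MESSAGES_PROCESSED = Counter(
    "messages_processed_total", "Total number of processed messages", ["topic"]
)
ERRORS = Counter("errors_total", "Total errors during processing", ["stage"])
CONSUME_DURATION = Histogram(
    "consume_duration_seconds", "Time spent reading from Kafka"
)

def record_message(topic: str) -> None:
    MESSAGES_PROCESSED.labels(topic=topic).inc()

# start_http_server(8000) would expose these on /metrics, as noted above.
record_message("events")
record_message("events")
value = REGISTRY.get_sample_value(
    "messages_processed_total", {"topic": "events"}
)
print(value)  # 2.0
```

Histograms give you both a rate and a latency distribution per scrape, which is why the duration metrics above are histograms rather than plain gauges.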
```bash
pytest -v
```
Includes:
- ✅ Unit tests
- ✅ Integration tests (Kafka → ES)
- ✅ Metrics verification
- ✅ Config validation
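A unit-test sketch in the `pytest` style used above; `transform` is a toy stand-in for the real event-transformation logic, not the project's actual function:

```python
def transform(raw: dict) -> dict:
    """Toy transformation: rename a field and tag the event source."""
    return {"message": raw["msg"], "source": "kafka"}

def test_transform_renames_field():
    event = transform({"msg": "hello"})
    assert event == {"message": "hello", "source": "kafka"}

# Collected automatically by `pytest -v`; also runnable directly:
test_transform_renames_field()
```

Because the pipeline core is pure async Python, most of it can be covered by fast unit tests like this, leaving only the adapter edges to the slower Kafka → ES integration scenarios.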
- Python 3.12 + `asyncio`
- Kafka + `aiokafka`
- Elasticsearch Bulk API
- Pydantic + `dotenv`
- `poetry`
- Prometheus + Grafana
- Docker + `docker-compose`
- Kubernetes-ready
- JSON logging (`python-json-logger`)
```
streaming-data-loader/
├── configs/               # Prometheus / Grafana
├── src/                   # Main source code
│   ├── domain/
│   ├── ports/
│   ├── services/          # Event processing logic
│   ├── config.py          # Settings & env config
│   ├── logger.py          # JSON logger setup
│   └── metrics.py         # Prometheus metrics
├── tests/                 # Unit & integration tests
├── k8s/                   # Kubernetes manifests
├── docker-compose.yml
├── Dockerfile
├── deploy.sh / clean.sh
└── pyproject.toml
```
If you find this project useful, give it a star, fork it, or mention it in your next data project!
Anatoly Dudko
GitHub @aDudko • LinkedIn