A production-ready Apache Kafka implementation for real-time browser event analytics and background task processing using Python. This project demonstrates modern data streaming patterns for web analytics, user behavior tracking, performance monitoring, and asynchronous task execution with event-driven workflows.
- Containerized Infrastructure: Single-command deployment with Docker Compose (Kafka, Redis, Kafka UI, Flower)
- Kafka UI Dashboard: Visual monitoring at http://localhost:8080
- Background Task Processing: Celery-powered async task execution with retry mechanisms
- Flower Monitoring: Real-time Celery task monitoring at http://localhost:5555
- REST API: FastAPI-based task management at http://localhost:8000
- Event-Driven Architecture: Tasks emit completion/failure events to Kafka topics (see the sketch after this list)
- Python SDK: Production-grade producers and consumers
- Real-World Analytics: Browser events, e-commerce tracking, performance metrics
- Modern Python Stack: UV package manager, structured project layout
- Multi-Topic Processing: Handles both browser events and task events
- Developer Friendly: Easy setup, comprehensive monitoring, clear documentation
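The event-driven piece is the core pattern here: a Celery task finishes its work, then publishes a completion or failure event to the `task-events` topic. A minimal sketch of that pattern, assuming a Redis broker on localhost and kafka-python as the client (the task name and event schema below are illustrative, not necessarily those used in `tasks.py`):

```python
# Sketch of the task-to-Kafka pattern; task name, event schema, and
# URLs are illustrative, not necessarily those used in tasks.py.
import json
import time

from celery import Celery
from kafka import KafkaProducer

app = Celery("tasks", broker="redis://localhost:6379/0")
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

@app.task(bind=True)
def data_processing(self, duration=5):
    event = {"task_id": self.request.id, "status": "FAILED"}
    try:
        time.sleep(duration)          # stand-in for the real work
        event["status"] = "COMPLETED"
    finally:
        producer.send("task-events", event)  # emit the outcome either way
        producer.flush()
```

Emitting from a `finally` block keeps the `task-events` stream consistent whether the task succeeds or raises.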
┌───────────────┐    ┌───────────────┐    ┌───────────────┐    ┌───────────────┐
│    FastAPI    │    │    Celery     │    │    Apache     │    │   Kafka UI    │
│   Task API    │───▶│    Workers    │───▶│     Kafka     │◀───│   Dashboard   │
│ (Port: 8000)  │    │ (Background)  │    │ (Port: 9092)  │    │ (Port: 8080)  │
└───────────────┘    └───────────────┘    └───────────────┘    └───────────────┘
        │                    │                    │                    │
        ▼                    ▼                    ▼                    ▼
┌───────────────┐    ┌───────────────┐    ┌───────────────┐    ┌───────────────┐
│    Browser    │    │    Flower     │    │    Topics:    │    │  Multi-Topic  │
│    Events     │    │   Celery UI   │    │ • browser-    │    │   Consumer    │
│   Producer    │    │ (Port: 5555)  │    │   agent       │    │    Engine     │
└───────────────┘    └───────────────┘    │ • task-events │    └───────────────┘
        │                    │            └───────────────┘            │
        ▼                    ▼                    ▲                    ▼
┌───────────────┐    ┌───────────────┐           │            ┌───────────────┐
│     Redis     │    │  Task Status  │           │            │   Real-time   │
│    Broker     │───▶│   In-Memory   │───────────┘            │  Monitoring   │
│ (Port: 6379)  │    │    Storage    │                        │   & Alerts    │
└───────────────┘    └───────────────┘                        └───────────────┘
- Docker & Docker Compose (for infrastructure)
- Python 3.8+ with UV package manager or pip
- Git for cloning
# Clone the repository
git clone https://github.com/yesabhishek/kafka-python-demo kafka-demo
cd kafka-demo
# Install dependencies
pip install -r requirements.txt
# Or with UV (if available)
uv sync
# Activate virtual environment (if using UV)
source .venv/bin/activate # Linux/Mac
# or
.venv\Scripts\activate # Windows
# Start all services (Kafka, Redis, Kafka UI, Flower)
docker-compose up -d
# Verify services are running
docker-compose ps
# Create Kafka topics
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
--create --topic browser-agent \
--bootstrap-server localhost:9092 \
--partitions 3 --replication-factor 1 \
--if-not-exists
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
--create --topic task-events \
--bootstrap-server localhost:9092 \
--partitions 3 --replication-factor 1 \
--if-not-exists
# Verify topics created
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
--list --bootstrap-server localhost:9092
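If you'd rather create the topics from Python than shell into the container, kafka-python's admin client can do the same thing. A sketch, assuming the broker is reachable on `localhost:9092`:

```python
# Programmatic alternative to the kafka-topics.sh commands above.
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
topics = [
    NewTopic(name="browser-agent", num_partitions=3, replication_factor=1),
    NewTopic(name="task-events", num_partitions=3, replication_factor=1),
]
try:
    admin.create_topics(new_topics=topics)
except TopicAlreadyExistsError:
    pass  # mirrors the --if-not-exists flag
finally:
    admin.close()
```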
Open 4 terminal windows and run these commands:
Terminal 1: Start Celery Worker
# Start background task worker
celery -A tasks worker --loglevel=info
# Alternative: Start with more detailed logs
celery -A tasks worker --loglevel=debug
Terminal 2: Start FastAPI Server
# Start task management API
python api.py
# Alternative: Start with auto-reload
uvicorn api:app --reload --host 0.0.0.0 --port 8000
Terminal 3: Start Multi-Topic Consumer
# Start event consumer (handles both browser and task events)
python task_consumer.py
Terminal 4: Test the System
# Test browser events
python producer.py
# Test background tasks
python test_demo.py
# Or create tasks via API
curl -X POST "http://localhost:8000/api/tasks" \
-H "Content-Type: application/json" \
-d '{"type": "data_processing", "duration": 5, "simulate_failure": false}'
kafka-demo/
├── api.py              # FastAPI server for task management
├── tasks.py            # Celery task definitions and processors
├── task_consumer.py    # Multi-topic Kafka consumer (browser + task events)
├── consumer.py         # Original browser event consumer
├── producer.py         # Browser event producer (analytics data)
├── test_demo.py        # Automated testing script
├── docker-compose.yml  # Infrastructure setup (Kafka, Redis, Flower)
├── requirements.txt    # Python dependencies
├── pyproject.toml      # Python project configuration (optional)
├── README.md           # This documentation
├── .gitignore          # Git ignore rules
├── .ropeproject/       # IDE configuration
└── .venv/              # Virtual environment (if using UV)
| Service | URL | Description |
|---|---|---|
| FastAPI Docs | http://localhost:8000/docs | Interactive API documentation and testing |
| FastAPI API | http://localhost:8000 | Task management REST API |
| Flower Dashboard | http://localhost:5555 | Celery task monitoring and management |
| Kafka UI | http://localhost:8080 | Kafka topics, messages, and consumer monitoring |
# Create a new background task
POST http://localhost:8000/api/tasks
{
"type": "data_processing",
"duration": 10,
"data": {"input": "sample data"},
"simulate_failure": false
}
# Get task status
GET http://localhost:8000/api/tasks/{task_id}
# List all tasks (with optional filtering)
GET http://localhost:8000/api/tasks?status=COMPLETED&limit=20
# Cancel a running task
DELETE http://localhost:8000/api/tasks/{task_id}
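The same workflow works from Python with `requests`. A sketch that creates a task and polls it to completion; the `task_id` response field is an assumption here, so check `/docs` for the authoritative schema:

```python
# Create a task through the REST API and poll until it finishes.
import time

import requests

BASE = "http://localhost:8000"

resp = requests.post(
    f"{BASE}/api/tasks",
    json={"type": "data_processing", "duration": 5, "simulate_failure": False},
)
resp.raise_for_status()
task_id = resp.json()["task_id"]  # assumed field name; verify against /docs

while True:
    status = requests.get(f"{BASE}/api/tasks/{task_id}").json()
    if status.get("status") in ("COMPLETED", "FAILED"):
        print(status)
        break
    time.sleep(1)  # poll once per second
```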
# Start worker with specific concurrency
celery -A tasks worker --loglevel=info --concurrency=4
# Start worker with specific queue
celery -A tasks worker --loglevel=info --queues=high_priority,default
# Monitor worker status
celery -A tasks inspect active
# Purge all pending tasks
celery -A tasks purge
# Show worker statistics
celery -A tasks inspect stats
# Shutdown workers gracefully
celery -A tasks control shutdown
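The retry mechanisms mentioned in the features list are usually declared on the task itself. A sketch of how a self-retrying task might look in `tasks.py` (the retry settings below are illustrative, not the project's actual configuration):

```python
# Sketch of a self-retrying Celery task; retry settings are illustrative.
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379/0")

@app.task(
    bind=True,
    autoretry_for=(ConnectionError,),  # retry on transient failures
    retry_backoff=True,                # exponential backoff between attempts
    max_retries=3,
)
def send_email(self, recipient):
    print(f"sending email to {recipient}")
    # A raised ConnectionError here would trigger an automatic retry.
```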
# Start Flower with custom settings
celery -A tasks flower --port=5555 --basic_auth=admin:password
# Start Flower with persistent data
celery -A tasks flower --persistent=True --db=flower.db
# Flower is automatically started via Docker Compose in this setup
# Check all Docker services
docker-compose ps
# Check Kafka broker
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
--bootstrap-server localhost:9092 --list
# Check Redis connection
docker exec -it redis redis-cli ping
# View consumer groups
docker exec -it kafka /opt/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 --list
# Check topic details
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
--describe --topic task-events \
--bootstrap-server localhost:9092
# Test API health
curl http://localhost:8000/
# Check Celery worker status
celery -A tasks inspect ping
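Those checks are easy to script in one place. A small sketch that pings each dependency from Python and raises if any is down:

```python
# One-shot health check for Redis, Kafka, and the API.
import redis
import requests
from kafka import KafkaProducer

redis.Redis(host="localhost", port=6379).ping()  # raises if Redis is down
print("Redis OK")

KafkaProducer(bootstrap_servers="localhost:9092").close()  # raises NoBrokersAvailable
print("Kafka OK")

requests.get("http://localhost:8000/").raise_for_status()  # raises on HTTP errors
print("API OK")
```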
# View all service logs
docker-compose logs -f
# View specific service logs
docker-compose logs -f kafka
docker-compose logs -f redis
docker-compose logs -f flower
# Follow application logs
tail -f celery.log # If you configure file logging
python task_consumer.py # Real-time event processing logs
# Generate realistic browser events
python producer.py
# Monitor events in real-time
python task_consumer.py
# Or use the original consumer
python consumer.py
# Create different types of tasks
curl -X POST "http://localhost:8000/api/tasks" -H "Content-Type: application/json" -d '{"type": "email_sending", "duration": 3}'
curl -X POST "http://localhost:8000/api/tasks" -H "Content-Type: application/json" -d '{"type": "report_generation", "duration": 10}'
curl -X POST "http://localhost:8000/api/tasks" -H "Content-Type: application/json" -d '{"type": "data_processing", "duration": 5, "simulate_failure": true}'
# Monitor tasks in Flower: http://localhost:5555
# Check task status via API: http://localhost:8000/docs
# Tasks automatically emit events to Kafka when they complete
# Watch the task_consumer.py output to see:
# Task Completed events
# Task Failed events
# Browser Events
# View events in Kafka UI: http://localhost:8080
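The multi-topic subscription in `task_consumer.py` boils down to passing both topic names to a single consumer. A minimal sketch of that pattern (the group id and handler logic are illustrative):

```python
# Minimal multi-topic consumer mirroring task_consumer.py's pattern.
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "browser-agent",
    "task-events",
    bootstrap_servers="localhost:9092",
    group_id="demo-consumer",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    auto_offset_reset="earliest",
)

# Dispatch on message.topic to handle each event stream differently.
for message in consumer:
    if message.topic == "task-events":
        print(f"Task event: {message.value}")
    else:
        print(f"Browser event: {message.value}")
```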
1. Services Not Starting
# Check Docker services
docker-compose ps
docker-compose logs
# Restart all services
docker-compose down && docker-compose up -d
2. Celery Connection Issues
# Check Redis connection
docker exec -it redis redis-cli ping
# Restart Celery worker
# Ctrl+C in celery terminal, then restart:
celery -A tasks worker --loglevel=info
3. Kafka Connection Issues
# Verify Kafka is ready
docker-compose logs kafka | grep "started"
# Test Kafka connectivity (constructing a producer fails if no broker is reachable)
python -c "from kafka import KafkaProducer; KafkaProducer(bootstrap_servers='localhost:9092'); print('✅ Kafka OK')"
# Recreate topics if needed (delete, then re-run the create commands above)
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
--delete --topic task-events --bootstrap-server localhost:9092
4. Task Events Not Appearing
# Check if task-events topic exists
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
--list --bootstrap-server localhost:9092
# Verify the consumer is subscribed to the correct topics:
# it should show both 'browser-agent' and 'task-events'
5. Python Dependencies
# Using pip
pip install -r requirements.txt
# Using UV
uv sync --reinstall
# Check specific packages
python -c "import kafka, celery, fastapi; print('β
All imports OK')"