Kafka Python with Celery Task Processing

A production-ready Apache Kafka implementation for real-time browser event analytics and background task processing using Python. This project demonstrates modern data streaming patterns for web analytics, user behavior tracking, performance monitoring, and asynchronous task execution with event-driven workflows.

Features

  • Containerized Infrastructure: Single-command deployment with Docker Compose (Kafka, Redis, Flower)
  • Kafka UI Dashboard: Visual monitoring at http://localhost:8080
  • Background Task Processing: Celery-powered async task execution with retry mechanisms
  • Flower Monitoring: Real-time Celery task monitoring at http://localhost:5555
  • REST API: FastAPI-based task management at http://localhost:8000
  • Event-Driven Architecture: Tasks emit completion/failure events to Kafka topics
  • Python SDK: Production-grade producers and consumers
  • Real-World Analytics: Browser events, e-commerce tracking, performance metrics
  • Modern Python Stack: UV package manager, structured project layout
  • Multi-Topic Processing: Handles both browser events and task events
  • Developer Friendly: Easy setup, comprehensive monitoring, clear documentation

Architecture

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   FastAPI       │    │   Celery        │    │   Apache        │    │   Kafka UI      │
│   Task API      │───▶│   Workers       │───▶│   Kafka         │◀───│   Dashboard     │
│   (Port: 8000)  │    │   (Background)  │    │   (Port: 9092)  │    │   (Port: 8080)  │
└─────────────────┘    └─────────────────┘    └─────────────────┘    └─────────────────┘
         │                       │                       │                       │
         ▼                       ▼                       ▼                       ▼
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Browser       │    │   Flower        │    │   Topics:       │    │   Multi-Topic   │
│   Events        │    │   Celery UI     │    │   • browser-    │    │   Consumer      │
│   Producer      │    │   (Port: 5555)  │    │     agent       │    │   Engine        │
└─────────────────┘    └─────────────────┘    │   • task-events │    └─────────────────┘
         │                       │             └─────────────────┘                │
         ▼                       ▼                       ▲                        ▼
┌─────────────────┐    ┌─────────────────┐              │              ┌─────────────────┐
│   Redis         │    │   Task Status   │              │              │   Real-time     │
│   Broker        │───▶│   In-Memory     │──────────────┘              │   Monitoring    │
│   (Port: 6379)  │    │   Storage       │                             │   & Alerts      │
└─────────────────┘    └─────────────────┘                             └─────────────────┘
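The worker → Kafka leg of the diagram (Celery task finishes, event lands on the task-events topic) can be sketched in a few lines of Python. This is a minimal illustration assuming the kafka-python client; `build_task_event` and its field names are hypothetical, not necessarily the schema used by the repo's tasks.py:

```python
import json
import time
import uuid


def build_task_event(task_id: str, task_type: str, status: str, result=None) -> dict:
    """Shape of the event a worker might publish after a task finishes.

    Field names are illustrative, not the repo's actual schema.
    """
    return {
        "event_id": str(uuid.uuid4()),
        "task_id": task_id,
        "task_type": task_type,
        "status": status,  # e.g. "COMPLETED" or "FAILED"
        "result": result,
        "timestamp": time.time(),
    }


def publish_task_event(event: dict, bootstrap: str = "localhost:9092") -> None:
    """Send the event to the task-events topic (requires a running broker)."""
    from kafka import KafkaProducer  # lazy import: no broker needed to read this file

    producer = KafkaProducer(
        bootstrap_servers=bootstrap,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    producer.send("task-events", event)
    producer.flush()

# Usage (with the stack running):
#   publish_task_event(build_task_event("abc123", "data_processing", "COMPLETED"))
```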

Quick Start

Prerequisites

  • Docker & Docker Compose (for infrastructure)
  • Python 3.8+ with UV package manager or pip
  • Git for cloning

1. Setup Environment

# Clone the repository
git clone https://github.com/yesabhishek/kafka-python-demo
cd kafka-python-demo

# Install dependencies
pip install -r requirements.txt

# Or with UV (if available)
uv sync

# Activate virtual environment (if using UV)
source .venv/bin/activate  # Linux/Mac
# or
.venv\Scripts\activate     # Windows

2. Start Infrastructure

# Start all services (Kafka, Redis, Kafka UI, Flower)
docker-compose up -d

# Verify services are running
docker-compose ps

# Create Kafka topics
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --create --topic browser-agent \
  --bootstrap-server localhost:9092 \
  --partitions 3 --replication-factor 1 \
  --if-not-exists

docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --create --topic task-events \
  --bootstrap-server localhost:9092 \
  --partitions 3 --replication-factor 1 \
  --if-not-exists
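If you prefer to create the topics from Python rather than via docker exec, the kafka-python admin client can do the same job. A sketch mirroring the two kafka-topics.sh commands above (assumes kafka-python is installed and the broker is reachable on localhost:9092):

```python
# Topic specs mirroring the kafka-topics.sh commands: (name, partitions, replication factor)
TOPIC_SPECS = [
    ("browser-agent", 3, 1),
    ("task-events", 3, 1),
]


def create_topics(specs=TOPIC_SPECS, bootstrap="localhost:9092"):
    """Create the topics, ignoring any that already exist (like --if-not-exists)."""
    # Imported lazily so the snippet can be read without a broker running.
    from kafka.admin import KafkaAdminClient, NewTopic
    from kafka.errors import TopicAlreadyExistsError

    admin = KafkaAdminClient(bootstrap_servers=bootstrap)
    topics = [
        NewTopic(name=name, num_partitions=parts, replication_factor=rf)
        for name, parts, rf in specs
    ]
    try:
        admin.create_topics(new_topics=topics)
    except TopicAlreadyExistsError:
        pass
    finally:
        admin.close()
```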

# Verify topics created
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --list --bootstrap-server localhost:9092

3. Start Application Services

Open 4 terminal windows and run these commands:

Terminal 1: Start Celery Worker

# Start background task worker
celery -A tasks worker --loglevel=info

# Alternative: Start with more detailed logs
celery -A tasks worker --loglevel=debug

Terminal 2: Start FastAPI Server

# Start task management API
python api.py

# Alternative: Start with auto-reload
uvicorn api:app --reload --host 0.0.0.0 --port 8000

Terminal 3: Start Multi-Topic Consumer

# Start event consumer (handles both browser and task events)
python task_consumer.py

Terminal 4: Test the System

# Test browser events
python producer.py

# Test background tasks
python test_demo.py

# Or create tasks via API
curl -X POST "http://localhost:8000/api/tasks" \
  -H "Content-Type: application/json" \
  -d '{"type": "data_processing", "duration": 5, "simulate_failure": false}'

Project Structure

kafka-demo/
β”œβ”€β”€ api.py                   # FastAPI server for task management
β”œβ”€β”€ tasks.py                 # Celery task definitions and processors
β”œβ”€β”€ task_consumer.py         # Multi-topic Kafka consumer (browser + task events)
β”œβ”€β”€ consumer.py              # Original browser event consumer
β”œβ”€β”€ producer.py              # Browser event producer (analytics data)
β”œβ”€β”€ test_demo.py             # Automated testing script
β”œβ”€β”€ docker-compose.yml       # Infrastructure setup (Kafka, Redis, Flower)
β”œβ”€β”€ requirements.txt         # Python dependencies
β”œβ”€β”€ pyproject.toml           # Python project configuration (optional)
β”œβ”€β”€ README.md                # This documentation
β”œβ”€β”€ .gitignore              # Git ignore rules
β”œβ”€β”€ .ropeproject/           # IDE configuration
└── .venv/                  # Virtual environment (if using UV)

Monitoring & Operations

Web Interfaces

Service            URL                          Description
FastAPI Docs       http://localhost:8000/docs   Interactive API documentation and testing
FastAPI API        http://localhost:8000        Task management REST API
Flower Dashboard   http://localhost:5555        Celery task monitoring and management
Kafka UI           http://localhost:8080        Kafka topics, messages, and consumer monitoring

API Endpoints

# Create a new background task
POST http://localhost:8000/api/tasks
{
  "type": "data_processing",
  "duration": 10,
  "data": {"input": "sample data"},
  "simulate_failure": false
}

# Get task status
GET http://localhost:8000/api/tasks/{task_id}

# List all tasks (with optional filtering)
GET http://localhost:8000/api/tasks?status=COMPLETED&limit=20

# Cancel a running task
DELETE http://localhost:8000/api/tasks/{task_id}
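The same endpoints can be driven from Python using only the standard library. A sketch: the request body follows the POST example above; everything else (exact response fields, error handling) is left to the API:

```python
import json
import urllib.request

API = "http://localhost:8000/api/tasks"


def task_payload(task_type: str, duration: int, simulate_failure: bool = False) -> bytes:
    """JSON body matching the POST example above."""
    return json.dumps({
        "type": task_type,
        "duration": duration,
        "simulate_failure": simulate_failure,
    }).encode("utf-8")


def create_task(task_type: str, duration: int) -> dict:
    """POST a new background task; requires the FastAPI server to be running."""
    req = urllib.request.Request(
        API,
        data=task_payload(task_type, duration),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


def get_task(task_id: str) -> dict:
    """GET the status of a single task by id."""
    with urllib.request.urlopen(f"{API}/{task_id}") as resp:
        return json.load(resp)
```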

Celery Commands

# Start worker with specific concurrency
celery -A tasks worker --loglevel=info --concurrency=4

# Start worker with specific queue
celery -A tasks worker --loglevel=info --queues=high_priority,default

# Monitor worker status
celery -A tasks inspect active

# Purge all pending tasks
celery -A tasks purge

# Show worker statistics
celery -A tasks inspect stats

# Shutdown workers gracefully
celery -A tasks control shutdown
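The retry mechanisms mentioned in the Features list come down to a retry-with-backoff loop around the task body. Celery provides this natively (`self.retry`, `autoretry_for`, `retry_backoff`), but the underlying pattern is worth seeing on its own. A plain-Python sketch, not the repo's tasks.py:

```python
import time


def run_with_retries(task, max_retries: int = 3, base_delay: float = 0.0):
    """Run task(); on failure, wait base_delay * 2**attempt and try again.

    Mirrors the exponential-backoff behaviour of a Celery task configured
    with autoretry_for (illustrative only).
    """
    for attempt in range(max_retries + 1):
        try:
            return task()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: let the failure propagate
            time.sleep(base_delay * (2 ** attempt))


# Example: a flaky task that fails twice, then succeeds on the third call.
calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "done"
```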

Flower Commands

# Start Flower with custom settings
celery -A tasks flower --port=5555 --basic_auth=admin:password

# Start Flower with persistent data
celery -A tasks flower --persistent=True --db=flower.db

# Flower is automatically started via Docker Compose in this setup

Health Checks

# Check all Docker services
docker-compose ps

# Check Kafka broker
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --list

# Check Redis connection
docker exec -it redis redis-cli ping

# View consumer groups
docker exec -it kafka /opt/kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 --list

# Check topic details
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --describe --topic task-events \
  --bootstrap-server localhost:9092

# Test API health
curl http://localhost:8000/

# Check Celery worker status
celery -A tasks inspect ping

Log Monitoring

# View all service logs
docker-compose logs -f

# View specific service logs
docker-compose logs -f kafka
docker-compose logs -f redis
docker-compose logs -f flower

# Follow application logs
tail -f celery.log         # If you configure file logging
python task_consumer.py    # Real-time event processing logs

Usage Examples

1. Browser Event Analytics

# Generate realistic browser events
python producer.py

# Monitor events in real-time
python task_consumer.py
# Or use the original consumer
python consumer.py
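producer.py's job, generating synthetic browser events and sending them to the browser-agent topic, looks roughly like this. A sketch assuming kafka-python; the event types and fields are illustrative, not the repo's exact schema:

```python
import json
import random
import time
import uuid

EVENT_TYPES = ["page_view", "click", "scroll", "add_to_cart", "checkout"]


def random_browser_event() -> dict:
    """One synthetic analytics event, in the spirit of producer.py."""
    return {
        "event_id": str(uuid.uuid4()),
        "event_type": random.choice(EVENT_TYPES),
        "user_id": f"user-{random.randint(1, 100)}",
        "url": f"https://example.com/page/{random.randint(1, 20)}",
        "timestamp": time.time(),
    }


def stream_events(count: int = 10, bootstrap: str = "localhost:9092") -> None:
    """Send `count` events to the browser-agent topic (needs a running broker)."""
    from kafka import KafkaProducer  # lazy import: no broker needed to read this file

    producer = KafkaProducer(
        bootstrap_servers=bootstrap,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    for _ in range(count):
        producer.send("browser-agent", random_browser_event())
    producer.flush()
```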

2. Background Task Processing

# Create different types of tasks
curl -X POST "http://localhost:8000/api/tasks" -H "Content-Type: application/json" -d '{"type": "email_sending", "duration": 3}'

curl -X POST "http://localhost:8000/api/tasks" -H "Content-Type: application/json" -d '{"type": "report_generation", "duration": 10}'

curl -X POST "http://localhost:8000/api/tasks" -H "Content-Type: application/json" -d '{"type": "data_processing", "duration": 5, "simulate_failure": true}'

# Monitor tasks in Flower: http://localhost:5555
# Check task status via API: http://localhost:8000/docs

3. Event-Driven Workflows

# Tasks automatically emit events to Kafka when they complete
# Watch the task_consumer.py output to see:
# Task Completed events
# Task Failed events
# Browser Events

# View events in Kafka UI: http://localhost:8080
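task_consumer.py's multi-topic loop amounts to subscribing to both topics and dispatching on `message.topic`. A minimal sketch (assumes kafka-python; the handler labels and group id are illustrative):

```python
import json


def dispatch(topic: str, event: dict) -> str:
    """Route a decoded event by topic; returns a label for logging."""
    if topic == "task-events":
        status = event.get("status", "UNKNOWN")
        return f"Task {status}"  # e.g. "Task COMPLETED" / "Task FAILED"
    if topic == "browser-agent":
        return f"Browser {event.get('event_type', 'event')}"
    return f"Unhandled topic: {topic}"


def consume(bootstrap: str = "localhost:9092") -> None:
    """Blocking consume loop over both topics (needs a running broker)."""
    from kafka import KafkaConsumer  # lazy import

    consumer = KafkaConsumer(
        "browser-agent",
        "task-events",
        bootstrap_servers=bootstrap,
        group_id="multi-topic-demo",
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    )
    for message in consumer:
        print(dispatch(message.topic, message.value))
```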

Troubleshooting

Common Issues

1. Services Not Starting

# Check Docker services
docker-compose ps
docker-compose logs

# Restart all services
docker-compose down && docker-compose up -d

2. Celery Connection Issues

# Check Redis connection
docker exec -it redis redis-cli ping

# Restart Celery worker
# Ctrl+C in celery terminal, then restart:
celery -A tasks worker --loglevel=info

3. Kafka Connection Issues

# Verify Kafka is ready
docker-compose logs kafka | grep "started"

# Verify the kafka-python client is installed (import check only, not broker connectivity)
python -c "from kafka import KafkaProducer; print('✅ kafka-python OK')"

# Recreate topics if needed
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --delete --topic task-events --bootstrap-server localhost:9092

4. Task Events Not Appearing

# Check if task-events topic exists
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --list --bootstrap-server localhost:9092

# Verify consumer is subscribed to correct topics
# Should show both 'browser-agent' and 'task-events'

5. Python Dependencies

# Using pip
pip install -r requirements.txt

# Using UV
uv sync --reinstall

# Check specific packages
python -c "import kafka, celery, fastapi; print('✅ All imports OK')"
