
Vehicle Trips Data Engineering Pipeline

A scalable data engineering solution for ingesting, processing, and analyzing vehicle trip data with real-time monitoring capabilities.

🚀 Features

Mandatory Features ✅

  • Automated Data Ingestion: REST API endpoint for CSV file upload with chunked processing
  • Trip Grouping: Automatic grouping of trips by similar origin, destination, and time of day
  • Weekly Analytics: Get weekly average trips for regions or bounding box areas
  • Real-time Status Updates: Server-Sent Events (SSE) for ingestion progress - no polling needed
  • Scalable Architecture: Designed to handle 100M+ entries with optimized indexing and aggregation
  • SQL Database: PostgreSQL with advanced indexing and materialized views

Bonus Features ✅

  • Containerized Solution: Complete Docker Compose setup
  • Cloud Architecture: AWS deployment guide included
  • SQL Analysis Queries: Pre-built queries for business intelligence
  • Monitoring Stack: Prometheus + Grafana integration
  • Nginx Reverse Proxy: Production-ready load balancing

πŸ—οΈ Architecture

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚   Nginx Proxy   β”‚    β”‚   FastAPI App   β”‚    β”‚   PostgreSQL    β”‚
β”‚   (Port 80)     │◄───│   (Port 8000)   │◄───│   (Port 5432)   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                              β”‚
                              β–Ό
                       β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                       β”‚     Redis       β”‚
                       β”‚   (Port 6379)   β”‚
                       β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Scalability Proof

The solution scales to 100M+ entries through:

  1. Chunked Processing: Data is ingested in configurable chunks (default 10K rows), so memory use stays bounded (see the sketch after this list)
  2. Optimized Indexing: Composite indexes on frequently queried columns
  3. Materialized Views: Pre-computed aggregations, so analytics queries read summary rows instead of scanning raw trips
  4. Spatial Indexing: GIN indexes for geospatial queries
  5. Connection Pooling: Database connection management for concurrent requests
  6. Async Processing: Non-blocking background job processing
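
A minimal sketch of the chunked-ingestion idea, assuming pandas and SQLAlchemy are available; the staging-table name and connection string are illustrative, not the service's actual schema:

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql://user:pass@localhost:5432/trips_db")

def ingest_csv(path: str, chunk_size: int = 10_000) -> int:
    """Load a CSV into the database without reading it all into memory."""
    total = 0
    # chunksize makes read_csv yield DataFrames of at most chunk_size rows,
    # so memory use is bounded regardless of the file size
    for chunk in pd.read_csv(path, chunksize=chunk_size):
        chunk.to_sql("trips_staging", engine, if_exists="append", index=False)
        total += len(chunk)
    return total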

🚀 Quick Start

Prerequisites

  • Docker & Docker Compose
  • 4GB+ RAM available
  • 10GB+ disk space

1. Clone and Setup

git clone <repository-url>
cd vehicle-trips-pipeline

2. Start Services

# Basic setup
docker-compose up -d

# With monitoring (optional)
docker-compose --profile monitoring up -d

3. Verify Installation

# Check API health
curl http://localhost:8000/health

# Check all services
make health

📊 Usage Examples

1. Ingest Data

# Upload CSV file
curl -X POST "http://localhost:8000/ingest" \
  -F "file=@sample_data.csv" \
  -F "chunk_size=10000"

2. Monitor Ingestion (Real-time)

# Get job status
curl http://localhost:8000/ingest/{job_id}/status

# Stream real-time updates (SSE)
curl -N http://localhost:8000/ingest/{job_id}/stream
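
A minimal Python consumer for the SSE stream, assuming the requests package is installed; the payload fields (e.g. status) are illustrative:

import json
import requests

def follow_ingestion(job_id: str, base_url: str = "http://localhost:8000"):
    url = f"{base_url}/ingest/{job_id}/stream"
    with requests.get(url, stream=True) as resp:
        resp.raise_for_status()
        # SSE frames arrive as "data: <payload>" lines separated by blanks
        for line in resp.iter_lines(decode_unicode=True):
            if line and line.startswith("data:"):
                event = json.loads(line[len("data:"):].strip())
                print(event)
                if event.get("status") in ("completed", "failed"):
                    break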

3. Get Weekly Analytics

# By region
curl -X POST "http://localhost:8000/analytics/weekly-trips" \
  -H "Content-Type: application/json" \
  -d '{"region": "Prague"}'

# By bounding box
curl -X POST "http://localhost:8000/analytics/weekly-trips" \
  -H "Content-Type: application/json" \
  -d '{
    "bounding_box": {
      "min_lat": 50.0,
      "max_lat": 50.2,
      "min_lon": 14.0,
      "max_lon": 14.5
    }
  }'
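
Conceptually, the weekly figure is the trip count per week averaged across the weeks present in the data. A minimal sketch of that computation (the ISO-week definition is an assumption; the actual service-side aggregation may differ):

from collections import Counter
from datetime import datetime

def weekly_average(timestamps: list[datetime]) -> float:
    # Count trips per ISO (year, week), then average over observed weeks
    per_week = Counter(ts.isocalendar()[:2] for ts in timestamps)
    return sum(per_week.values()) / len(per_week) if per_week else 0.0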

4. Get Trip Groups

# Get grouped similar trips
curl "http://localhost:8000/analytics/trip-groups?limit=50&min_trip_count=5"

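A minimal sketch of the kind of similarity key such grouping can use, snapping trip endpoints to a coarse coordinate grid and bucketing by hour of day; illustrative only, the service's actual grouping logic may differ:

from collections import defaultdict

def cell_key(lat: float, lon: float, hour: int, cell: float = 0.01):
    # Snap coordinates to a ~1 km grid cell and bucket by hour of day,
    # so trips with nearby endpoints at similar times share a key
    return (round(lat / cell), round(lon / cell), hour)

def group_trips(trips):
    groups = defaultdict(list)
    for t in trips:
        key = (cell_key(*t["origin"], t["hour"]),
               cell_key(*t["destination"], t["hour"]))
        groups[key].append(t)
    return groups
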
🔬 Migrations, Partitioning, and Scalability Proof

Database migrations

  • Alembic configured at database/migrations/
  • Apply latest migrations:
alembic -c database/migrations/alembic.ini upgrade head

Table partitioning

  • trips is range-partitioned by year with sample partitions (2023-2025) created in the initial migration.
  • Add a new partition yearly, e.g. for 2026:
CREATE TABLE IF NOT EXISTS trips_2026 PARTITION OF trips FOR VALUES FROM (2026) TO (2027);
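
A minimal sketch of automating that yearly step with psycopg (an assumed dependency), e.g. from a scheduled job:

from datetime import date
import psycopg

def create_next_partition(conninfo: str) -> None:
    year = date.today().year + 1
    ddl = (
        f"CREATE TABLE IF NOT EXISTS trips_{year} PARTITION OF trips "
        f"FOR VALUES FROM ({year}) TO ({year + 1})"
    )
    # the with-block commits the transaction on clean exit
    with psycopg.connect(conninfo) as conn:
        conn.execute(ddl)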

Benchmark

  • Generate a synthetic CSV and ingest it via the API; the script prints throughput:
export API_KEY=your-long-key
python3 scripts/bench_ingest.py --rows 200000 --chunk 10000

Query plans

  • Run EXPLAIN ANALYZE on key queries:
make explain
  • SQL used: scripts/explain_queries.sql

🔧 API Documentation

Core Endpoints

Endpoint                   Method  Description
/health                    GET     Health check
/ingest                    POST    Upload CSV for processing
/ingest/{job_id}/status    GET     Get job status
/ingest/{job_id}/stream    GET     Real-time job updates (SSE)
/analytics/weekly-trips    POST    Weekly trip averages
/analytics/trip-groups     GET     Grouped similar trips
/analytics/regions         GET     Available regions

Data Format

CSV files should contain these columns:

  • region: Geographic region name
  • origin_coord: Origin coordinates (WKT POINT format or a quoted "lat,lon" pair)
  • destination_coord: Destination coordinates (same formats)
  • datetime: Trip timestamp (ISO 8601 format)
  • datasource: Data source identifier

Example:

region,origin_coord,destination_coord,datetime,datasource
Prague,POINT (14.4378 50.0755),POINT (14.4656 50.0596),2024-01-15T10:30:00,mobile_app
Turin,"45.0703,7.6869","45.0599,7.6797",2024-01-15T14:22:00,cheap_mobile
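
A minimal sketch of normalizing the two accepted coordinate formats (WKT POINT stores "lon lat"; the plain pair is "lat,lon"); illustrative, not necessarily the repo's actual parser:

import re

POINT_RE = re.compile(r"POINT\s*\(\s*(-?\d+\.?\d*)\s+(-?\d+\.?\d*)\s*\)")

def parse_coord(value: str) -> tuple[float, float]:
    """Return (lat, lon) from either supported format."""
    m = POINT_RE.match(value.strip())
    if m:
        # WKT stores longitude first
        lon, lat = float(m.group(1)), float(m.group(2))
        return lat, lon
    lat, lon = (float(p) for p in value.split(","))
    return lat, lon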

🛠️ Development

Local Development

# Install dependencies
pip install -r requirements.txt

# Run with auto-reload
uvicorn app:app --reload --host 0.0.0.0 --port 8000

Testing

# Run tests
make test

# Load testing
docker-compose exec api python load_test.py

Database Management

# Run migrations
make migrate

# Backup database
make backup-db

# View logs
make logs-api

📈 Monitoring

With the monitoring profile enabled, Prometheus and Grafana dashboards are available; see docker-compose.yml for the exposed ports.

Key Metrics

  • Request latency and throughput
  • Database connection pool usage
  • Memory and CPU utilization
  • Job processing rates
  • Error rates by endpoint

☁️ Cloud Deployment (AWS)

Terraform Infrastructure

Complete AWS infrastructure deployment using Terraform with production-ready configuration.

Quick Deploy

# 1. Configure Terraform
cd terraform
cp terraform.tfvars.example terraform.tfvars
# Edit terraform.tfvars with your settings

# 2. Deploy infrastructure
terraform init
terraform plan
terraform apply

# 3. Build and push Docker image
ECR_URI=$(terraform output -raw ecr_repository_url)
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin ${ECR_URI%/*}
docker build -t trips-api .
docker tag trips-api:latest $ECR_URI:latest
docker push $ECR_URI:latest

# 4. Update ECS service
aws ecs update-service \
  --cluster $(terraform output -raw ecs_cluster_name) \
  --service $(terraform output -raw ecs_service_name) \
  --force-new-deployment

# 5. Get application URL
echo "API URL: $(terraform output -raw load_balancer_url)"

Required Configuration

Edit terraform.tfvars:

aws_region   = "us-east-1"
project_name = "trips-pipeline"
environment  = "production"

# Security (CHANGE THESE!)
db_password      = "YourSecurePassword123!"
redis_auth_token = "YourSecureRedisToken456!"
alarm_email      = "your-admin@company.com"

# Scaling
ecs_desired_count  = 3
ecs_max_capacity   = 10
rds_instance_class = "db.r5.large"
redis_node_type    = "cache.r5.large"

Infrastructure Components

  • ECS Fargate: Auto-scaling container service (1-10 tasks)
  • RDS PostgreSQL: Multi-AZ with read replicas and automated backups
  • ElastiCache Redis: Replication group with encryption
  • Application Load Balancer: Health checks and SSL termination
  • S3 Buckets: File uploads and automated backups
  • VPC: Private/public subnets across 2 availability zones
  • CloudWatch: Monitoring, alarms, and log aggregation

Cost Estimates (Monthly)

Environment   ECS       RDS       Redis     Other  Total
Development   $30-60    $50-100   $30-60    $70    $180-290
Production    $120-300  $200-400  $150-300  $100   $570-1100

Monitoring & Alerts

  • Auto-scaling based on CPU/Memory (70% threshold)
  • CloudWatch alarms for all critical metrics
  • SNS notifications to configured email
  • Performance Insights for database monitoring
  • Application logs in CloudWatch

CI/CD Integration

GitHub Actions workflow included for automated deployments:

The workflow (.github/workflows/deploy.yml):

  • Builds and pushes the Docker image to ECR
  • Deploys infrastructure changes via Terraform
  • Updates the ECS service with the new image
  • Runs health checks and rolls back on failure

🔍 SQL Analysis Queries

Bonus Question Answers

Q1: From the two most commonly appearing regions, which has the latest datasource?

-- See bonus_queries.sql for the complete implementation
WITH top_regions AS (
    SELECT region, COUNT(*) AS trip_count
    FROM trips GROUP BY region ORDER BY trip_count DESC LIMIT 2
)
-- ... (full query in bonus_queries.sql)

Q2: In which regions has the "cheap_mobile" datasource appeared?

SELECT DISTINCT region, total_trips, first_appearance, last_appearance
FROM datasources
WHERE name = 'cheap_mobile'
ORDER BY last_appearance DESC;

🔧 Configuration

Environment Variables

# Database
DATABASE_URL=postgresql://user:pass@host:port/db

# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=password

# Performance
DEFAULT_CHUNK_SIZE=10000
MAX_WORKERS=4

# Security
CORS_ORIGINS=*
API_KEY=optional-api-key
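
A minimal sketch of reading these variables with pydantic-settings (an assumed dependency; field names mirror the variables above):

from typing import Optional
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    database_url: str = "postgresql://user:pass@localhost:5432/trips_db"
    redis_host: str = "localhost"
    redis_port: int = 6379
    redis_password: Optional[str] = None
    default_chunk_size: int = 10_000
    max_workers: int = 4
    cors_origins: str = "*"
    api_key: Optional[str] = None

# field names match the environment variables case-insensitively
settings = Settings()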

Scaling Configuration

# docker-compose.override.yml for scaling
services:
  api:
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 4G
          cpus: '2.0'

🚨 Troubleshooting

Common Issues

1. High Memory Usage

# Reduce chunk size
export DEFAULT_CHUNK_SIZE=5000
docker-compose restart api

2. Database Connection Issues

# Check PostgreSQL logs
docker-compose logs postgres

# Test connection
docker-compose exec postgres psql -U trips_user -d trips_db -c "SELECT 1;"

3. Slow Query Performance

-- Check index usage
EXPLAIN ANALYZE SELECT * FROM trips WHERE region = 'Prague';

-- Monitor slow queries (requires the pg_stat_statements extension;
-- on PostgreSQL 13+ the column is named mean_exec_time)
SELECT query, mean_time, calls FROM pg_stat_statements ORDER BY mean_time DESC;

📝 License

MIT License - See LICENSE file for details

🤝 Contributing

  1. Fork the repository
  2. Create feature branch (git checkout -b feature/amazing-feature)
  3. Commit changes (git commit -m 'Add amazing feature')
  4. Push to branch (git push origin feature/amazing-feature)
  5. Open Pull Request

📞 Support

  • Issues: GitHub Issues
  • Documentation: /docs endpoint when running
  • Monitoring: Grafana dashboards
  • Logs: make logs command
