# 00 Data Ingestion Pipeline
## inDrive Hackathon 2024 - Case 2: Geotracks Analysis

This notebook handles the ingestion and initial validation of anonymized trip geotrack data.

### Setup and Imports

In [None]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# Validation libraries
from typing import Optional, List, Dict, Tuple
import logging

# Configure root logging once for the whole notebook; later cells log through
# the module-level `logger` created here.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Print the library versions so results can be tied to a concrete environment.
print("Data Ingestion Pipeline - Ready!")
print("Pandas version: " + str(pd.__version__))
print("Numpy version: " + str(np.__version__))

### Data Schema Definition

In [None]:
# Required input columns mapped to the dtype expected straight out of read_csv.
# 'timestamp' is read as text ('object') and parsed to datetime afterwards.
EXPECTED_COLUMNS = dict(
    trip_id='object',
    timestamp='object',
    lat='float64',
    lon='float64',
)

# Data-quality thresholds. In this notebook only the lat/lon ranges are
# applied (by load_trip_data); the duration and speed limits are defined
# here for later processing steps.
DATA_CONSTRAINTS = dict(
    lat_range=(-90.0, 90.0),    # degrees
    lon_range=(-180.0, 180.0),  # degrees
    min_trip_duration=60,       # seconds
    max_trip_duration=7200,     # seconds (2 hours)
    min_speed=0.5,              # km/h
    max_speed=150.0,            # km/h
)

def validate_schema(df: pd.DataFrame) -> bool:
    """Check that *df* carries every column listed in EXPECTED_COLUMNS.

    A missing column is a hard failure: it is logged at ERROR and the
    function returns False. A column whose dtype differs from the expected
    one is only logged at WARNING and does not fail validation. The
    'timestamp' dtype is not checked because the caller parses that column
    to datetime after this check runs.

    Args:
        df: Raw DataFrame, as returned by ``pd.read_csv``.

    Returns:
        True when all required columns are present, otherwise False.
    """
    absent = set(EXPECTED_COLUMNS) - set(df.columns)
    if absent:
        logger.error(f"Missing required columns: {absent}")
        return False

    # Runs on the raw frame, i.e. BEFORE any timestamp conversion.
    dtype_checked = (
        (name, wanted)
        for name, wanted in EXPECTED_COLUMNS.items()
        if name != 'timestamp'
    )
    for name, wanted in dtype_checked:
        actual = df[name].dtype
        if pd.api.types.is_dtype_equal(actual, wanted):
            continue
        logger.warning(f"Column {name} has type {actual}, expected {wanted}")

    logger.info("Schema validation passed")
    return True

print("Schema validation functions defined")

### Data Loading Functions

In [None]:
def load_trip_data(file_path: str) -> pd.DataFrame:
    """Load trip geotrack points from a CSV file and apply basic cleaning.

    Steps, in order:
      1. read the CSV and check required columns with ``validate_schema``;
      2. parse ``timestamp`` to datetime;
      3. drop rows missing ``trip_id`` or ``timestamp`` (they cannot be
         placed on any track);
      4. drop duplicate ``(trip_id, timestamp)`` points;
      5. drop points whose lat/lon fall outside DATA_CONSTRAINTS ranges
         (NaN coordinates fail the range test and are dropped as well);
      6. sort points by trip and time and reset the index.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        A new DataFrame that owns its data (not a filtered view of an
        intermediate frame), so callers can safely add columns to it.

    Raises:
        ValueError: if required columns are missing.
        Any exception raised while reading/parsing (e.g. FileNotFoundError,
        unparsable timestamps) is logged and re-raised unchanged.
    """
    try:
        logger.info(f"Loading data from: {file_path}")

        df = pd.read_csv(file_path)
        logger.info(f"Loaded {len(df)} records")

        if not validate_schema(df):
            raise ValueError("Schema validation failed")

        df['timestamp'] = pd.to_datetime(df['timestamp'])
        logger.info("Timestamp conversion successful")

        # Rows without a trip id or a time cannot belong to any track.
        before = len(df)
        df = df.dropna(subset=['trip_id', 'timestamp'])
        if len(df) < before:
            logger.warning(f"Removed {before - len(df)} records with missing trip_id/timestamp")

        before = len(df)
        df = df.drop_duplicates(subset=['trip_id', 'timestamp'])
        if len(df) < before:
            logger.warning(f"Removed {before - len(df)} duplicate records")

        # Inclusive range checks; NaN compares False and is therefore dropped.
        lat_min, lat_max = DATA_CONSTRAINTS['lat_range']
        lon_min, lon_max = DATA_CONSTRAINTS['lon_range']
        valid_coords = (
            df['lat'].between(lat_min, lat_max)
            & df['lon'].between(lon_min, lon_max)
        )
        invalid_count = int((~valid_coords).sum())
        if invalid_count > 0:
            logger.warning(f"Removed {invalid_count} records with invalid coordinates")

        # Order points along each track and materialise a fresh frame, so
        # later column assignments do not hit chained-assignment issues on
        # a boolean-filtered slice.
        df = (
            df.loc[valid_coords]
            .sort_values(['trip_id', 'timestamp'])
            .reset_index(drop=True)
        )

        logger.info(f"Data loading complete. Final count: {len(df)} records")
        return df

    except Exception as e:
        logger.error(f"Error loading data: {e}")
        raise

def generate_sample_data(n_trips: int = 1000,
                         seed: Optional[int] = None,
                         points_per_trip: int = 1) -> pd.DataFrame:
    """Generate synthetic trip geotracks around central Moscow for demos.

    Trips cycle through five area clusters (given as lat/lon offsets from the
    Moscow centre). Each trip starts at a uniformly random second within the
    7 days before the call.

    Args:
        n_trips: Number of trips. Values <= 0 produce an empty DataFrame that
            still has the expected columns and a datetime ``timestamp``.
        seed: Optional seed for a private ``np.random.RandomState``. When
            None (default), draws come from NumPy's global random state as
            before.
        points_per_trip: GPS points per trip (default 1: one point per trip,
            as before). Additional points follow the first one at 5-second
            intervals as a small random walk (per-step std 0.0003 degrees,
            roughly 30 m).

    Returns:
        DataFrame with columns trip_id, timestamp, lat, lon.

    Raises:
        ValueError: if ``points_per_trip`` < 1.
    """
    if points_per_trip < 1:
        raise ValueError(f"points_per_trip must be >= 1, got {points_per_trip}")

    logger.info(f"Generating {n_trips} sample trips")

    # The np.random module and RandomState share the randint/normal API, so
    # seed=None keeps using the global stream exactly as the original did.
    rng = np.random if seed is None else np.random.RandomState(seed)

    # Moscow coordinates as base
    center_lat, center_lon = 55.7558, 37.6176

    # (lat_mean, lat_std, lon_mean, lon_std) offsets in degrees, chosen by i % 5.
    clusters = (
        (0.15, 0.02, 0.20, 0.02),     # Airport area
        (0.0, 0.01, 0.0, 0.01),       # City center
        (-0.05, 0.02, -0.10, 0.02),   # Business district
        (0.08, 0.03, -0.05, 0.03),    # Residential north
        (-0.02, 0.015, 0.08, 0.02),   # Shopping/entertainment
    )
    step_seconds = 5        # time between consecutive points of one trip
    step_std_deg = 0.0003   # random-walk step size for extra points

    # Computed once so every trip shares the same 7-day window.
    base_time = datetime.now() - timedelta(days=7)
    window_seconds = 7 * 24 * 3600

    rows = []
    for i in range(n_trips):
        trip_id = f"T-2024-{i:06d}"
        timestamp = base_time + timedelta(seconds=int(rng.randint(0, window_seconds)))

        lat_mu, lat_sd, lon_mu, lon_sd = clusters[i % len(clusters)]
        lat = center_lat + rng.normal(lat_mu, lat_sd)
        lon = center_lon + rng.normal(lon_mu, lon_sd)

        for point_idx in range(points_per_trip):
            if point_idx:
                timestamp += timedelta(seconds=step_seconds)
                lat += rng.normal(0.0, step_std_deg)
                lon += rng.normal(0.0, step_std_deg)
            rows.append({'trip_id': trip_id, 'timestamp': timestamp, 'lat': lat, 'lon': lon})

    # Explicit columns keep the schema even when no rows were generated, and
    # to_datetime guarantees a datetime dtype for the empty case too.
    df = pd.DataFrame(rows, columns=['trip_id', 'timestamp', 'lat', 'lon'])
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    logger.info(f"Generated {df['trip_id'].nunique()} sample trips ({len(df)} points)")

    return df

print("Data loading functions defined")

### Load or Generate Data

In [None]:
# Try to load real data, fall back to synthetic sample data.
data_file_path = "../data/trip_data.csv"  # Adjust path as needed

try:
    df_trips = load_trip_data(data_file_path)
    data_source = "real"
except Exception as e:
    if isinstance(e, FileNotFoundError):
        logger.info(f"Real data not available ({e}), generating sample data")
    else:
        # The file exists but could not be ingested (bad schema, unparsable
        # timestamps, ...). Still fall back so the notebook keeps running,
        # but make it loud: everything below will describe SAMPLE data.
        logger.warning(f"Real data at {data_file_path} failed to load ({e}); falling back to sample data")
    df_trips = generate_sample_data(n_trips=2000)
    data_source = "sample"

print("\n=== DATA SUMMARY ====")
print(f"Data source: {data_source}")
print(f"Total records: {len(df_trips):,}")
print(f"Date range: {df_trips['timestamp'].min()} to {df_trips['timestamp'].max()}")
print(f"Unique trips: {df_trips['trip_id'].nunique():,}")
print(f"Columns: {list(df_trips.columns)}")

### Data Quality Assessment

In [None]:
def assess_data_quality(df: pd.DataFrame) -> Dict[str, object]:
    """Summarise basic data-quality statistics for a trips DataFrame.

    *df* is NOT modified: hour/day-of-week are derived on the fly rather than
    added as columns, so the caller's frame (and anything exported from it)
    keeps its original schema.

    Args:
        df: Frame with 'trip_id', 'timestamp' (datetime64), 'lat' and 'lon'.

    Returns:
        Dict with, in order:
          total_records, unique_trips,
          date_range: (min timestamp, max timestamp),
          missing_values: {column: null count},
          lat_range / lon_range: (min, max),
          hourly_distribution: {hour 0-23: count},
          daily_distribution: {day of week, 0=Monday: count},
          coordinate_std: {'lat_std': float, 'lon_std': float}.
        Counts are Python ints and statistics Python floats, so everything
        except date_range can be passed directly to ``json.dump``.
    """
    timestamps = df['timestamp']

    def _sorted_counts(series: pd.Series) -> Dict[int, int]:
        # {value: count} ordered by value, with native ints for JSON output.
        return {int(k): int(v) for k, v in series.value_counts().sort_index().items()}

    return {
        'total_records': len(df),
        'unique_trips': int(df['trip_id'].nunique()),
        'date_range': (timestamps.min(), timestamps.max()),
        'missing_values': {col: int(n) for col, n in df.isnull().sum().items()},
        'lat_range': (float(df['lat'].min()), float(df['lat'].max())),
        'lon_range': (float(df['lon'].min()), float(df['lon'].max())),
        'hourly_distribution': _sorted_counts(timestamps.dt.hour),
        'daily_distribution': _sorted_counts(timestamps.dt.dayofweek),
        'coordinate_std': {
            'lat_std': float(df['lat'].std()),
            'lon_std': float(df['lon'].std()),
        },
    }

# Run the quality assessment on the loaded trips and print a short summary.
quality_report = assess_data_quality(df_trips)

print("\n=== DATA QUALITY REPORT ====")
print("Total records: {:,}".format(quality_report['total_records']))
print("Unique trips: {:,}".format(quality_report['unique_trips']))
print("Missing values: {}".format(quality_report['missing_values']))
print("Latitude range: {:.4f} to {:.4f}".format(*quality_report['lat_range']))
print("Longitude range: {:.4f} to {:.4f}".format(*quality_report['lon_range']))
print("Geographic spread (std): lat={:.4f}, lon={:.4f}".format(
    quality_report['coordinate_std']['lat_std'],
    quality_report['coordinate_std']['lon_std'],
))

### Data Visualization

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Set up plotting style. The 'seaborn-v0_8' style name does not exist in older
# matplotlib releases, so apply it only when available instead of raising.
if 'seaborn-v0_8' in plt.style.available:
    plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

fig, axes = plt.subplots(2, 2, figsize=(15, 12))

# 1. Geographic distribution
axes[0, 0].scatter(df_trips['lon'], df_trips['lat'], alpha=0.5, s=1)
axes[0, 0].set_xlabel('Longitude')
axes[0, 0].set_ylabel('Latitude')
axes[0, 0].set_title('Geographic Distribution of Trips')
axes[0, 0].grid(True, alpha=0.3)

# 2. Temporal distribution by hour; zero-filled so all 24 hours are shown.
hourly_counts = (
    df_trips['timestamp'].dt.hour
    .value_counts()
    .reindex(range(24), fill_value=0)
)
axes[0, 1].bar(hourly_counts.index, hourly_counts.values)
axes[0, 1].set_xlabel('Hour of Day')
axes[0, 1].set_ylabel('Number of Trips')
axes[0, 1].set_title('Trip Distribution by Hour')
axes[0, 1].grid(True, alpha=0.3)

# 3. Daily distribution. dayofweek is 0=Monday..6=Sunday; reindexing to all
# seven days keeps every bar under its own weekday label even when some
# weekdays have no records.
day_names = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
daily_counts = (
    df_trips['timestamp'].dt.dayofweek
    .value_counts()
    .reindex(range(len(day_names)), fill_value=0)
)
axes[1, 0].bar(daily_counts.index, daily_counts.values)
axes[1, 0].set_xticks(range(len(day_names)))
axes[1, 0].set_xticklabels(day_names)
axes[1, 0].set_xlabel('Day of Week')
axes[1, 0].set_ylabel('Number of Trips')
axes[1, 0].set_title('Trip Distribution by Day of Week')
axes[1, 0].grid(True, alpha=0.3)

# 4. Coordinate density heatmap
try:
    # Cap at 1000 points; a fixed random_state makes the figure reproducible.
    sample_df = df_trips.sample(min(1000, len(df_trips)), random_state=0)

    # Create 2D histogram
    h = axes[1, 1].hist2d(sample_df['lon'], sample_df['lat'], bins=50, cmap='YlOrRd')
    axes[1, 1].set_xlabel('Longitude')
    axes[1, 1].set_ylabel('Latitude')
    axes[1, 1].set_title('Trip Density Heatmap')
    plt.colorbar(h[3], ax=axes[1, 1])
except Exception as e:
    # Best-effort panel: show the error in place rather than abort the figure.
    axes[1, 1].text(0.5, 0.5, f'Heatmap error: {str(e)}',
                    ha='center', va='center', transform=axes[1, 1].transAxes)

plt.tight_layout()
plt.show()

print("\nData visualization complete!")

### Export Processed Data

In [None]:
# Export processed data for next steps
import json
import os

output_path = "../data/processed/01_ingested_trips.csv"
quality_report_path = "../data/processed/01_quality_report.json"


def _json_fallback(obj):
    """``json.dump`` fallback for values it cannot encode natively.

    NumPy scalars are unwrapped to Python numbers and datetimes are written as
    ISO-8601 strings; anything else still raises TypeError, as json.dump would.
    """
    if isinstance(obj, np.generic):
        return obj.item()
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


# Save processed data. The directory is derived from the path itself so the
# two cannot drift apart if the path is edited.
os.makedirs(os.path.dirname(output_path), exist_ok=True)
df_trips.to_csv(output_path, index=False)
logger.info(f"Processed data saved to: {output_path}")

# Save quality report; date_range holds timestamps, converted to ISO strings.
quality_report_json = quality_report.copy()
quality_report_json['date_range'] = [ts.isoformat() for ts in quality_report['date_range']]

os.makedirs(os.path.dirname(quality_report_path), exist_ok=True)
with open(quality_report_path, 'w', encoding='utf-8') as f:
    json.dump(quality_report_json, f, indent=2, default=_json_fallback)

logger.info(f"Quality report saved to: {quality_report_path}")

print("\n=== INGESTION COMPLETE ====")
print(f"✅ {len(df_trips):,} records ({df_trips['trip_id'].nunique():,} trips) processed successfully")
print(f"✅ Data exported to: {output_path}")
print(f"✅ Quality report saved to: {quality_report_path}")
print("\nReady for next step: 01_preprocessing.ipynb")