# Data Pipeline Validation & Dashboard - Google Colab Notebook

This notebook provides a comprehensive validation and showcase of the data pipeline functionality.

**Features:**
- AWS S3 data loading
- Data transformation pipeline
- Metrics calculation and visualization
- Interactive dashboard
- Complete end-to-end workflow

**Author:** mrohitth  
**Date:** 2026-01-16

## 1. Setup and Installation

First, let's clone the repository and install all required dependencies.

In [None]:
# Clone the repository
# Fetch the project sources into the current working directory.
!git clone https://github.com/mrohitth/test_project1.git
# Switch the notebook into the checkout; later cells refer to repository files
# (requirements.txt, data/...) by relative path.
%cd test_project1

In [None]:
# Install required packages
# -q keeps pip's output short; -r reads the package list from the
# repository's requirements.txt (relative to the current directory).
!pip install -q -r requirements.txt

# Additional packages for Colab
# ipywidgets is imported by the data-source configuration cell below.
!pip install -q ipywidgets

print("âœ… All packages installed successfully!")

## 2. AWS S3 Configuration

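Enter your AWS credentials so the notebook can access S3. The S3 loader is only invoked if you choose the `S3 Bucket` data source (Section 5) or upload results to S3 (Section 12).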

In [None]:
import os
import getpass

# Collect the credentials interactively; getpass keeps the two key values
# from being echoed back while they are typed.
print("Please enter your AWS credentials:")
AWS_ACCESS_KEY_ID = getpass.getpass('AWS Access Key ID: ')
AWS_SECRET_ACCESS_KEY = getpass.getpass('AWS Secret Access Key: ')

# An empty reply falls back to the default region.
region_reply = input('AWS Region (default: us-east-1): ')
AWS_REGION = region_reply if region_reply else 'us-east-1'

# Publish all three values as environment variables for this process.
os.environ.update({
    'AWS_ACCESS_KEY_ID': AWS_ACCESS_KEY_ID,
    'AWS_SECRET_ACCESS_KEY': AWS_SECRET_ACCESS_KEY,
    'AWS_DEFAULT_REGION': AWS_REGION,
})

print("\nâœ… AWS credentials configured!")

## 3. Import Required Modules

In [None]:
import sys
sys.path.insert(0, '/content/test_project1/src')

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import json

# Import custom modules
from s3_loader import S3DataLoader
from load_data import load_data, validate_data, get_project_root

# Set plotting style
# Global look for every figure in the notebook: a matplotlib style sheet
# followed by seaborn's "husl" colour palette.
# NOTE(review): presumably the order matters (a style sheet can reset the
# colour cycle that set_palette configures) - confirm before reordering.
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")

print("âœ… Modules imported successfully!")

## 4. User Input - Data Source Configuration

In [None]:
from ipywidgets import widgets
from IPython.display import display

# Build the data-source form: one dropdown selecting the source plus the
# inputs each source needs. The loading cell reads these widgets' `.value`.
print("ðŸ“Š Data Source Configuration\n")

# Source selector; synthetic data is the pre-selected choice.
data_source_type = widgets.Dropdown(
    description='Data Source:',
    options=['S3 Bucket', 'Local File', 'Generate Sample Data'],
    value='Generate Sample Data',
    style=dict(description_width='initial'),
)

# Inputs for the 'S3 Bucket' source.
s3_bucket = widgets.Text(
    description='S3 Bucket:',
    value='my-data-bucket',
    style=dict(description_width='initial'),
)
s3_key = widgets.Text(
    description='S3 Key:',
    value='data/input/data.csv',
    style=dict(description_width='initial'),
)

# Input for the 'Local File' source.
local_file = widgets.Text(
    description='Local File:',
    value='data/raw/sample.csv',
    style=dict(description_width='initial'),
)

# Row count for the 'Generate Sample Data' source.
sample_size = widgets.IntSlider(
    description='Sample Size:',
    value=5000,
    min=100,
    max=50000,
    step=100,
    style=dict(description_width='initial'),
)

# Render the controls in the same top-to-bottom order as they were declared.
for control in (data_source_type, s3_bucket, s3_key, local_file, sample_size):
    display(control)

print("\nâœ… Configuration inputs ready!")

## 5. Data Loading

Load data from the configured source.

In [None]:
def generate_sample_data(n_records=5000):
    """
    Generate synthetic taxi trip data for demonstration.

    Pickup times are spread at random (to the minute) over the 30 days
    before the call, and each drop-off is its own pickup plus a 5-59 minute
    ride, so ``dropoff_datetime - pickup_datetime`` is always a positive,
    plausible duration.  Roughly 2% of ``tip_amount`` values are replaced by
    NaN so the cleaning steps have something to handle.

    The random draws use a fixed seed, so every field except the timestamps
    (which are relative to the current time) is identical between runs.

    Args:
        n_records: Number of trips (rows) to generate.

    Returns:
        pandas.DataFrame with one row per trip and the columns trip_id,
        pickup_datetime, dropoff_datetime, vendor_id, passenger_count,
        trip_distance, fare_amount, tip_amount, total_amount, payment_type,
        pickup_location, dropoff_location and speed_mph.
    """
    # A private generator keeps the data reproducible without reseeding
    # NumPy's global random state for the rest of the notebook.
    rng = np.random.default_rng(42)

    boroughs = ['Manhattan', 'Brooklyn', 'Queens', 'Bronx']
    window_minutes = 30 * 24 * 60  # 30 days, expressed in minutes

    # Random minute offsets give pickups a realistic spread of dates AND
    # hours; whole-day steps would put every trip at the same time of day.
    window_start = pd.Timestamp(datetime.now()) - pd.Timedelta(days=30)
    pickup_offsets = rng.integers(0, window_minutes, size=n_records)
    pickups = pd.to_timedelta(pickup_offsets, unit='min') + window_start

    # Derive each drop-off from its own pickup so durations are never
    # negative or days long.
    ride_minutes = rng.integers(5, 60, size=n_records)
    dropoffs = pickups + pd.to_timedelta(ride_minutes, unit='min')

    data = {
        'trip_id': range(1, n_records + 1),
        'pickup_datetime': pickups,
        'dropoff_datetime': dropoffs,
        'vendor_id': rng.choice([1, 2, 3], n_records),
        'passenger_count': rng.choice([1, 2, 3, 4, 5, 6], n_records),
        'trip_distance': rng.uniform(0.5, 30, n_records),
        'fare_amount': rng.uniform(2.5, 100, n_records),
        'tip_amount': rng.uniform(0, 50, n_records),
        'total_amount': rng.uniform(2.5, 150, n_records),
        'payment_type': rng.choice(['Credit Card', 'Cash', 'Mobile'], n_records),
        'pickup_location': rng.choice(boroughs, n_records),
        'dropoff_location': rng.choice(boroughs, n_records),
        'speed_mph': rng.uniform(5, 60, n_records),
    }

    df = pd.DataFrame(data)

    # Add some missing values for realism (2% of rows, chosen without repeats)
    n_missing = int(n_records * 0.02)
    missing_rows = rng.choice(df.index.to_numpy(), size=n_missing, replace=False)
    df.loc[missing_rows, 'tip_amount'] = np.nan

    return df


# Read the dropdown once, then dispatch to the loader for that source.
print("ðŸ“¥ Loading data...\n")

selected_source = data_source_type.value

if selected_source == 'S3 Bucket':
    bucket_name, object_key = s3_bucket.value, s3_key.value
    print(f"Loading from S3: s3://{bucket_name}/{object_key}")
    s3_loader = S3DataLoader()
    df = s3_loader.load_csv_from_s3(bucket_name, object_key)
    print(f"âœ… Loaded {len(df)} records from S3")

elif selected_source == 'Local File':
    local_path = local_file.value
    print(f"Loading from local file: {local_path}")
    df = load_data(local_path)
    print(f"âœ… Loaded {len(df)} records from local file")

else:
    # Any other selection means 'Generate Sample Data'.
    requested_rows = sample_size.value
    print(f"Generating {requested_rows} sample records...")
    df = generate_sample_data(requested_rows)
    print(f"âœ… Generated {len(df)} sample records")

# Quick look at what was loaded: dimensions, column names, first rows.
print(f"\nDataset shape: {df.shape}")
print(f"Columns: {df.columns.tolist()}")
print("\nFirst few rows:")
display(df.head())

## 6. Data Transformation Pipeline

Apply data cleaning and transformation.

In [None]:
# Clean `df` into `df_clean`: de-duplicate, fill/drop missing values, drop
# out-of-range rows, normalise dtypes and add derived time features.
# Also sets original_count, final_count and success_rate for later cells.
print("ðŸ”„ Starting data transformation pipeline...\n")

# Store original count
original_count = len(df)
print(f"Original record count: {original_count:,}")

# 1. Remove duplicates
# .copy() gives an independent frame, so the column assignments below modify
# df_clean itself instead of a possible view of df.
df_clean = df.drop_duplicates(subset=['trip_id']).copy()
duplicates_removed = original_count - len(df_clean)
print(f"Duplicates removed: {duplicates_removed}")

# 2. Handle missing values
before_missing = df_clean.isnull().sum().sum()
# Assign the filled column back: calling fillna(inplace=True) on a column
# selection is chained assignment, which pandas deprecates and which does
# not update the parent frame under Copy-on-Write.
df_clean['tip_amount'] = df_clean['tip_amount'].fillna(df_clean['tip_amount'].median())
df_clean = df_clean.dropna(subset=['pickup_datetime', 'vendor_id', 'passenger_count'])
after_missing = df_clean.isnull().sum().sum()
print(f"Missing values handled: {before_missing - after_missing}")

# 3. Validate ranges
before_validation = len(df_clean)
in_range = (
    (df_clean['passenger_count'] > 0) &
    (df_clean['passenger_count'] <= 8) &
    (df_clean['trip_distance'] >= 0) &
    (df_clean['fare_amount'] >= 0) &
    (df_clean['total_amount'] >= 0)
)
# Copy again: a boolean-filtered frame that is assigned into afterwards
# would otherwise trigger SettingWithCopyWarning.
df_clean = df_clean[in_range].copy()
invalid_records = before_validation - len(df_clean)
print(f"Invalid records removed: {invalid_records}")

# 4. Convert data types
df_clean['pickup_datetime'] = pd.to_datetime(df_clean['pickup_datetime'])
df_clean['dropoff_datetime'] = pd.to_datetime(df_clean['dropoff_datetime'])
df_clean['passenger_count'] = df_clean['passenger_count'].astype(int)

# 5. Add derived features
df_clean['trip_duration_minutes'] = (
    df_clean['dropoff_datetime'] - df_clean['pickup_datetime']
).dt.total_seconds() / 60

df_clean['hour_of_day'] = df_clean['pickup_datetime'].dt.hour
df_clean['day_of_week'] = df_clean['pickup_datetime'].dt.dayofweek
# dayofweek is 0=Monday ... 6=Sunday, so 5 and 6 are Saturday and Sunday
df_clean['is_weekend'] = df_clean['day_of_week'].isin([5, 6])

final_count = len(df_clean)
# An empty input would otherwise raise ZeroDivisionError here
success_rate = (final_count / original_count) * 100 if original_count else 0.0

print(f"\nâœ… Transformation complete!")
print(f"Final record count: {final_count:,}")
print(f"Records removed: {original_count - final_count:,}")
print(f"Success rate: {success_rate:.2f}%")

# Display summary
print("\nTransformed data sample:")
display(df_clean.head())

## 7. Metrics Calculation

Calculate comprehensive metrics for the dataset.

In [None]:
# Summarise df_clean into the `metrics` dict (plain Python ints/floats so it
# serialises to JSON as numbers), print it, and keep a DataFrame copy.
print("ðŸ“Š Calculating metrics...\n")

total_fares = df_clean['fare_amount'].sum()
total_tips = df_clean['tip_amount'].sum()
# Guard the ratio so an empty or zero-fare dataset reports 0% instead of NaN/inf
tip_rate = float(total_tips / total_fares * 100) if total_fares else 0.0

# Calculate metrics
# int()/float() convert NumPy scalars to native numbers; a NumPy integer
# would otherwise be written as a string by json.dump(..., default=str).
metrics = {
    'Total Trips': len(df_clean),
    'Unique Vendors': int(df_clean['vendor_id'].nunique()),
    'Total Revenue': float(df_clean['total_amount'].sum()),
    'Average Fare': float(df_clean['fare_amount'].mean()),
    'Average Trip Distance': float(df_clean['trip_distance'].mean()),
    'Average Passengers': float(df_clean['passenger_count'].mean()),
    'Average Trip Duration (min)': float(df_clean['trip_duration_minutes'].mean()),
    'Average Speed (mph)': float(df_clean['speed_mph'].mean()),
    'Total Tips': float(total_tips),
    'Average Tip': float(df_clean['tip_amount'].mean()),
    'Tip Rate (% of fare)': tip_rate,
    'Weekend Trips': int(df_clean['is_weekend'].sum()),
    'Weekday Trips': int((~df_clean['is_weekend']).sum()),
}

# Display metrics
print("=" * 60)
print("PIPELINE METRICS SUMMARY")
print("=" * 60)
for key, value in metrics.items():
    if isinstance(value, float):
        # Monetary metrics get a "$" prefix; the tip *rate* is a percentage.
        # Written out explicitly rather than relying on and/or precedence.
        is_currency = (
            any(word in key for word in ('Revenue', 'Fare', 'Tip'))
            and 'Rate' not in key
        )
        if is_currency:
            print(f"{key:.<40} ${value:,.2f}")
        else:
            print(f"{key:.<40} {value:,.2f}")
    else:
        print(f"{key:.<40} {value:,}")
print("=" * 60)

# Store metrics for later use
metrics_df = pd.DataFrame([metrics]).T
metrics_df.columns = ['Value']

print("\nâœ… Metrics calculated successfully!")

## 8. Data Visualizations

Create comprehensive visualizations of the data.

In [None]:
# Draw a 3x2 overview figure of df_clean: vendor counts and revenue,
# distance and fare histograms, hourly volume, and payment mix.
fig, axes = plt.subplots(3, 2, figsize=(16, 14))
fig.suptitle('Data Pipeline Analysis Dashboard', fontsize=20, fontweight='bold', y=1.00)

# Name each panel once instead of indexing the grid on every call.
(ax_trips, ax_revenue), (ax_distance, ax_fare), (ax_hourly, ax_payment) = axes

# Panel 1: number of trips per vendor
vendor_counts = df_clean['vendor_id'].value_counts().sort_index()
ax_trips.bar(vendor_counts.index, vendor_counts.values, color='steelblue', alpha=0.8)
ax_trips.set_title('Trip Distribution by Vendor', fontsize=14, fontweight='bold')
ax_trips.set_xlabel('Vendor ID')
ax_trips.set_ylabel('Number of Trips')
ax_trips.grid(axis='y', alpha=0.3)

# Panel 2: summed total_amount per vendor
revenue_by_vendor = df_clean.groupby('vendor_id')['total_amount'].sum().sort_index()
ax_revenue.bar(revenue_by_vendor.index, revenue_by_vendor.values, color='green', alpha=0.8)
ax_revenue.set_title('Total Revenue by Vendor', fontsize=14, fontweight='bold')
ax_revenue.set_xlabel('Vendor ID')
ax_revenue.set_ylabel('Revenue ($)')
ax_revenue.grid(axis='y', alpha=0.3)

# Panel 3: histogram of trip distances with the mean marked
mean_distance = df_clean['trip_distance'].mean()
ax_distance.hist(df_clean['trip_distance'], bins=50, color='coral', alpha=0.7, edgecolor='black')
ax_distance.set_title('Trip Distance Distribution', fontsize=14, fontweight='bold')
ax_distance.set_xlabel('Distance (miles)')
ax_distance.set_ylabel('Frequency')
ax_distance.axvline(mean_distance, color='red', linestyle='--',
                    label=f'Mean: {mean_distance:.2f}')
ax_distance.legend()
ax_distance.grid(axis='y', alpha=0.3)

# Panel 4: histogram of fares with the mean marked
mean_fare = df_clean['fare_amount'].mean()
ax_fare.hist(df_clean['fare_amount'], bins=50, color='purple', alpha=0.7, edgecolor='black')
ax_fare.set_title('Fare Amount Distribution', fontsize=14, fontweight='bold')
ax_fare.set_xlabel('Fare ($)')
ax_fare.set_ylabel('Frequency')
ax_fare.axvline(mean_fare, color='red', linestyle='--',
                label=f'Mean: ${mean_fare:.2f}')
ax_fare.legend()
ax_fare.grid(axis='y', alpha=0.3)

# Panel 5: trips per pickup hour, drawn as a filled line
hourly_trips = df_clean['hour_of_day'].value_counts().sort_index()
ax_hourly.plot(hourly_trips.index, hourly_trips.values, marker='o',
               linewidth=2, markersize=8, color='teal')
ax_hourly.fill_between(hourly_trips.index, hourly_trips.values, alpha=0.3, color='teal')
ax_hourly.set_title('Trips by Hour of Day', fontsize=14, fontweight='bold')
ax_hourly.set_xlabel('Hour')
ax_hourly.set_ylabel('Number of Trips')
ax_hourly.set_xticks(range(0, 24, 2))
ax_hourly.grid(alpha=0.3)

# Panel 6: share of each payment type
payment_dist = df_clean['payment_type'].value_counts()
colors = plt.cm.Set3(range(len(payment_dist)))
ax_payment.pie(payment_dist.values, labels=payment_dist.index, autopct='%1.1f%%',
               colors=colors, startangle=90)
ax_payment.set_title('Payment Type Distribution', fontsize=14, fontweight='bold')

plt.tight_layout()
plt.show()

print("\nâœ… Visualizations created successfully!")

## 9. Advanced Analytics

Additional insights and analysis.

In [None]:
# Draw a 2x2 figure of secondary analyses on df_clean: fare by passenger
# count, weekend vs weekday, most frequent routes, and speed per vendor.
fig, axes = plt.subplots(2, 2, figsize=(16, 10))
fig.suptitle('Advanced Analytics Dashboard', fontsize=20, fontweight='bold', y=1.00)

# 1. Average Fare by Passenger Count
avg_fare_by_passengers = df_clean.groupby('passenger_count')['fare_amount'].mean()
axes[0, 0].bar(avg_fare_by_passengers.index, avg_fare_by_passengers.values, 
               color='darkblue', alpha=0.7)
axes[0, 0].set_title('Average Fare by Passenger Count', fontsize=14, fontweight='bold')
axes[0, 0].set_xlabel('Number of Passengers')
axes[0, 0].set_ylabel('Average Fare ($)')
axes[0, 0].grid(axis='y', alpha=0.3)

# 2. Weekend vs Weekday Comparison
# Reindex to both groups so the two bar positions always have a value: a
# dataset containing only weekdays (or only weekends) would otherwise yield
# a single row and a shape mismatch against the two x positions.
weekend_stats = (
    df_clean.groupby('is_weekend')['total_amount']
    .agg(['count', 'sum', 'mean'])
    .reindex([False, True], fill_value=0)
)
weekend_labels = ['Weekday', 'Weekend']
x = np.arange(len(weekend_labels))
width = 0.35

axes[0, 1].bar(x - width/2, weekend_stats['count'], width, label='Trip Count', color='orange', alpha=0.7)
axes[0, 1].set_ylabel('Trip Count', color='orange')
axes[0, 1].tick_params(axis='y', labelcolor='orange')

# Second y-axis: average revenue is on a different scale than trip counts
ax2 = axes[0, 1].twinx()
ax2.bar(x + width/2, weekend_stats['mean'], width, label='Avg Revenue', color='blue', alpha=0.7)
ax2.set_ylabel('Average Revenue ($)', color='blue')
ax2.tick_params(axis='y', labelcolor='blue')

axes[0, 1].set_title('Weekend vs Weekday Analysis', fontsize=14, fontweight='bold')
axes[0, 1].set_xticks(x)
axes[0, 1].set_xticklabels(weekend_labels)
axes[0, 1].grid(alpha=0.3)

# 3. Top Routes
top_routes = df_clean.groupby(['pickup_location', 'dropoff_location']).size().nlargest(10)
route_labels = [f"{p} â†’ {d}" for p, d in top_routes.index]
axes[1, 0].barh(range(len(top_routes)), top_routes.values, color='mediumseagreen', alpha=0.8)
axes[1, 0].set_yticks(range(len(top_routes)))
axes[1, 0].set_yticklabels(route_labels, fontsize=9)
axes[1, 0].set_title('Top 10 Routes', fontsize=14, fontweight='bold')
axes[1, 0].set_xlabel('Number of Trips')
# Largest route at the top of the horizontal bar chart
axes[1, 0].invert_yaxis()
axes[1, 0].grid(axis='x', alpha=0.3)

# 4. Speed Distribution by Vendor
# One semi-transparent histogram per vendor, overlaid on the same axes
vendors = df_clean['vendor_id'].unique()
for vendor in sorted(vendors):
    vendor_data = df_clean[df_clean['vendor_id'] == vendor]['speed_mph']
    axes[1, 1].hist(vendor_data, bins=30, alpha=0.5, label=f'Vendor {vendor}')

axes[1, 1].set_title('Speed Distribution by Vendor', fontsize=14, fontweight='bold')
axes[1, 1].set_xlabel('Speed (mph)')
axes[1, 1].set_ylabel('Frequency')
axes[1, 1].legend()
axes[1, 1].grid(axis='y', alpha=0.3)

plt.tight_layout()
plt.show()

print("\nâœ… Advanced analytics complete!")

## 10. Dashboard Summary Table

In [None]:
# Per-vendor performance table. Named aggregation ties every output column
# to its source column and statistic, so the labels cannot drift out of
# step with the aggregation order.
vendor_summary = (
    df_clean.groupby('vendor_id')
    .agg(**{
        'Total Trips': ('trip_id', 'count'),
        'Total Revenue': ('total_amount', 'sum'),
        'Avg Revenue per Trip': ('total_amount', 'mean'),
        'Avg Fare': ('fare_amount', 'mean'),
        'Avg Distance (mi)': ('trip_distance', 'mean'),
        'Avg Passengers': ('passenger_count', 'mean'),
        'Avg Speed (mph)': ('speed_mph', 'mean'),
        'Total Tips': ('tip_amount', 'sum'),
        'Avg Tip': ('tip_amount', 'mean'),
    })
    .round(2)
)

# Frame the rendered table with a banner and rules.
rule = "=" * 120
print("\n" + rule)
print("VENDOR PERFORMANCE SUMMARY")
print(rule)
display(vendor_summary)
print(rule)

## 11. Export Results

Save the processed data and metrics.

In [None]:
from google.colab import files
import io

# Write the cleaned data, the metrics and the vendor summary to files in the
# current directory, then hand each file to Colab's download helper.
print("ðŸ’¾ Exporting results...\n")

# One timestamp for the whole export so all three artifacts share a suffix;
# separate now() calls could straddle a second boundary.
run_stamp = datetime.now().strftime("%Y%m%d_%H%M%S")


def _json_native(value):
    """Convert NumPy scalars to plain Python numbers; stringify anything else."""
    to_python = getattr(value, 'item', None)
    return to_python() if callable(to_python) else str(value)


# 1. Export transformed data
csv_filename = f'transformed_data_{run_stamp}.csv'
df_clean.to_csv(csv_filename, index=False)
print(f"âœ… Transformed data saved: {csv_filename}")

# 2. Export metrics
# default=_json_native keeps NumPy integer metrics as JSON numbers; a plain
# default=str would write them as quoted strings.
metrics_filename = f'pipeline_metrics_{run_stamp}.json'
with open(metrics_filename, 'w', encoding='utf-8') as f:
    json.dump(metrics, f, indent=2, default=_json_native)
print(f"âœ… Metrics saved: {metrics_filename}")

# 3. Export vendor summary
summary_filename = f'vendor_summary_{run_stamp}.csv'
vendor_summary.to_csv(summary_filename)
print(f"âœ… Vendor summary saved: {summary_filename}")

# 4. Download files
print("\nðŸ“¥ Downloading files to your computer...")
for artifact in (csv_filename, metrics_filename, summary_filename):
    files.download(artifact)

print("\nâœ… All results exported successfully!")

## 12. Optional: Upload Results to S3

In [None]:
# Upload results back to S3 (optional)
# Asks for confirmation, then sends the cleaned DataFrame and the two
# exported files to the chosen bucket under the chosen key prefix.
# Stray whitespace is stripped from every reply, and "y" is accepted as well
# as "yes", so a trailing space no longer silently skips the upload or
# corrupts the bucket name.
upload_reply = input("Upload results to S3? (yes/no): ").strip().lower()
upload_to_s3 = upload_reply in ('y', 'yes')

if upload_to_s3:
    output_bucket = input("S3 Bucket for output: ").strip()
    # The prefix is prepended verbatim to each file name, so include the
    # trailing "/" if the files should land in a "folder".
    output_prefix = input("S3 Prefix (e.g., 'output/'): ").strip()
    
    s3_loader = S3DataLoader()
    
    # Upload transformed data
    s3_loader.upload_dataframe(
        df_clean, 
        output_bucket, 
        f"{output_prefix}{csv_filename}"
    )
    print(f"âœ… Uploaded {csv_filename} to S3")
    
    # Upload metrics
    s3_loader.upload_file(
        metrics_filename,
        output_bucket,
        f"{output_prefix}{metrics_filename}"
    )
    print(f"âœ… Uploaded {metrics_filename} to S3")
    
    # Upload vendor summary
    s3_loader.upload_file(
        summary_filename,
        output_bucket,
        f"{output_prefix}{summary_filename}"
    )
    print(f"âœ… Uploaded {summary_filename} to S3")
    
    print("\nâœ… All results uploaded to S3!")
else:
    print("Skipping S3 upload.")

## 13. Interactive Dashboard (Streamlit in Colab)

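Prepare the interactive dashboard: install Streamlit, save the cleaned data where the dashboard can read it, and print the command for launching it. (This cell does not start the dashboard itself.)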

In [None]:
# Install Streamlit tunnel for Colab
!pip install -q streamlit pyngrok

# Save current data for dashboard
df_clean.to_csv('data/processed/current_data.csv', index=False)

print("\nðŸš€ Starting dashboard...")
print("\nNote: You can run the dashboard with:")
print("!streamlit run dashboard/app.py &")
print("\nThen use ngrok to expose it publicly.")
print("\nFor this demo, the dashboard files are prepared and ready to run.")

## 14. Final Summary and Report

In [None]:
# Closing report: recap the run using values computed by earlier cells
# (record counts, success rate, headline metrics, exported file names).
banner = "=" * 80

print("\n" + banner)
print("PIPELINE EXECUTION COMPLETE - FINAL SUMMARY")
print(banner)

# When the report was printed and where the data came from
finished_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"\nExecution Time: {finished_at}")
print(f"\nData Source: {data_source_type.value}")

# Volume before and after the transformation step
print(f"Original Records: {original_count:,}")
print(f"Final Records: {final_count:,}")
print(f"Success Rate: {success_rate:.2f}%")

# Headline money figures taken from the metrics dict
total_revenue = metrics['Total Revenue']
average_fare = metrics['Average Fare']
print(f"\nTotal Revenue Processed: ${total_revenue:,.2f}")
print(f"Average Transaction: ${average_fare:,.2f}")

# Files written by the export step
print("\nFiles Generated:")
for artifact in (csv_filename, metrics_filename, summary_filename):
    print(f"  - {artifact}")

print("\n" + banner)
print("âœ… ALL PIPELINE TASKS COMPLETED SUCCESSFULLY!")
print(banner)