Data Extraction

Task: Automate downloading the NYC taxi trip-record dataset for the year 2019.

Write a script that downloads the 2019 Parquet files (the code below fetches January–March).
Ensure the script handles network errors and retries failed requests.

In [0]:
%pip install tqdm
%pip install pandas
%pip install requests

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
import os
import requests
from bs4 import BeautifulSoup
from time import sleep
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import pandas as pd

# One shared HTTP session for every request in this cell. Its adapters apply
# the Retry policy: at most 5 retries with exponential backoff (factor 1),
# including on 429/500/502/503/504 responses.
session = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
for scheme in ('http://', 'https://'):
    session.mount(scheme, HTTPAdapter(max_retries=retries))

# Page that lists the download links, and the driver-local folder
# (temporary storage in Databricks) that receives the files.
scrape_url = "https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page"
local_dir = "/tmp/nyc_taxi_data_2019"
os.makedirs(local_dir, exist_ok=True)  # no error if the folder already exists

# Function to download a file with retries
def download_file(url, output_path, timeout=60):
    """Stream ``url`` to ``output_path`` through the shared retrying ``session``.

    Retries on connection errors and 429/5xx responses are performed by the
    ``Retry`` policy mounted on ``session``; this function only reports the
    final outcome.

    Args:
        url: Absolute URL of the file to download.
        output_path: Local path the response body is written to.
        timeout: Seconds ``requests`` waits to connect / between received
            bytes. Without a timeout a stalled connection can block forever.

    Returns:
        True if the file was downloaded completely, False otherwise.
    """
    # Write to a temporary name and rename only after the whole body arrived,
    # so a failed or interrupted download never leaves a truncated file under
    # the final name (and never clobbers an older good copy).
    part_path = output_path + '.part'
    try:
        # Using the response as a context manager releases the streamed
        # connection even when raise_for_status() or iteration fails.
        with session.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()  # Raise an error for bad status codes
            with open(part_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)
        os.replace(part_path, output_path)
    except requests.exceptions.RequestException as e:
        print(f"Failed to download {url}: {e}")
        if os.path.exists(part_path):
            os.remove(part_path)
        return False
    print(f"Successfully downloaded {output_path}")
    return True

# Scrape the TLC trip-record page for links to the monthly Parquet files.
from urllib.parse import urljoin, urlparse  # only this step needs URL helpers

response = session.get(scrape_url, timeout=60)
# Fail loudly on an error page instead of silently finding zero links.
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
links = soup.find_all('a', href=True)

# Filter and download parquet files for the first three months of 2019
parquet_files = []
months = ['01', '02', '03']
for link in links:
    # Normalise the href: strip surrounding whitespace and resolve it against
    # the page URL in case it is relative.
    href = link['href'].strip()
    if any(f'2019-{month}' in href for month in months) and href.endswith('.parquet'):
        url = urljoin(scrape_url, href)
        filename = os.path.basename(urlparse(url).path)
        output_path = os.path.join(local_dir, filename)
        if output_path in parquet_files:
            continue  # the page may link the same file more than once
        download_file(url, output_path)
        # Only record files that actually exist on disk, so the DBFS copy and
        # Spark read below never try to use a download that failed.
        if os.path.exists(output_path):
            parquet_files.append(output_path)
        sleep(1)  # Sleep for a short time between requests to avoid overloading the server

print("Download completed.")

# Mirror every downloaded Parquet file from the driver's local disk into DBFS.
dbfs_dir = "/mnt/nyc_taxi_data_2019"  # Change to your desired DBFS path
dbutils.fs.mkdirs(dbfs_dir)  # Create the DBFS directory if it doesn't exist

for local_file in parquet_files:
    # Keep the original file name; the "file:" prefix tells dbutils the source
    # is on the local filesystem.
    target = os.path.join(dbfs_dir, os.path.basename(local_file))
    dbutils.fs.cp("file:" + local_file, target)
    print(f"Copied {local_file} to {target}")

print("Files copied to DBFS.")

# Confirm each copy is readable: load it back from DBFS with Spark and show
# its first 5 rows.
dbfs_paths = [os.path.join(dbfs_dir, os.path.basename(path)) for path in parquet_files]
for dbfs_file in dbfs_paths:
    df = spark.read.parquet(dbfs_file)
    df.show(5)

print("Data loaded and displayed.")


Successfully downloaded /tmp/nyc_taxi_data_2019/yellow_tripdata_2019-01.parquet
Successfully downloaded /tmp/nyc_taxi_data_2019/green_tripdata_2019-01.parquet
Successfully downloaded /tmp/nyc_taxi_data_2019/fhv_tripdata_2019-01.parquet
Successfully downloaded /tmp/nyc_taxi_data_2019/yellow_tripdata_2019-02.parquet
Successfully downloaded /tmp/nyc_taxi_data_2019/green_tripdata_2019-02.parquet
Successfully downloaded /tmp/nyc_taxi_data_2019/fhv_tripdata_2019-02.parquet
Successfully downloaded /tmp/nyc_taxi_data_2019/fhvhv_tripdata_2019-02.parquet
Successfully downloaded /tmp/nyc_taxi_data_2019/yellow_tripdata_2019-03.parquet
Successfully downloaded /tmp/nyc_taxi_data_2019/green_tripdata_2019-03.parquet
Successfully downloaded /tmp/nyc_taxi_data_2019/fhv_tripdata_2019-03.parquet
Successfully downloaded /tmp/nyc_taxi_data_2019/fhvhv_tripdata_2019-03.parquet
Download completed.
Copied /tmp/nyc_taxi_data_2019/yellow_tripdata_2019-01.parquet to /mnt/nyc_taxi_data_2019/yellow_tripdata_2019-01.

Data Processing

Task: Clean and transform the data using Python and Pandas.
Remove any trips that have missing or corrupt data.
Derive new columns such as trip duration and average speed.
Aggregate data to calculate total trips and average fare per day.

In [0]:
import os
import pandas as pd
from datetime import datetime

# Define local directory
local_dir = "/tmp/nyc_taxi_data_2019"

# Function to process data
def process_data(file_path, pickup_col, dropoff_col, distance_col, fare_col, year=None):
    """Load one trip-record file, clean it, and aggregate it by pickup day.

    The four source columns are renamed to a common schema so yellow, green and
    FHVHV files can all be handled by the same code.

    Args:
        file_path: Path of the Parquet file to read.
        pickup_col: Name of the pickup-timestamp column in that file.
        dropoff_col: Name of the dropoff-timestamp column.
        distance_col: Name of the trip-distance column.
        fare_col: Name of the fare column.
        year: Optional calendar year. When given, trips whose pickup falls in a
            different year are dropped as corrupt. The default (None) keeps all
            years, which matches the previous behaviour.

    Returns:
        ``(df, agg_df)``.
        ``df`` holds the cleaned trips with columns ``pickup_datetime``,
        ``dropoff_datetime``, ``trip_distance``, ``fare_amount``,
        ``trip_duration`` (minutes), ``average_speed`` (distance per hour, i.e.
        mph for distances in miles) and ``pickup_date``.
        ``agg_df`` has one row per ``pickup_date`` with ``total_trips`` and
        ``average_fare``.
    """
    source_cols = [pickup_col, dropoff_col, distance_col, fare_col]

    # Read only the four needed columns. The trip files carry many more, and
    # loading them all just to discard them wastes memory on large months.
    df = pd.read_parquet(file_path, columns=source_cols)

    # Fix the column order, then drop rows with any missing value
    df = df[source_cols].dropna()

    # Rename to the common schema
    df.columns = ['pickup_datetime', 'dropoff_datetime', 'trip_distance', 'fare_amount']

    # Convert datetime columns
    df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'])
    df['dropoff_datetime'] = pd.to_datetime(df['dropoff_datetime'])

    # Derive new columns: trip duration (minutes) and average speed (mph)
    df['trip_duration'] = (df['dropoff_datetime'] - df['pickup_datetime']).dt.total_seconds() / 60
    df['average_speed'] = df['trip_distance'] / (df['trip_duration'] / 60)

    # Remove invalid data: non-positive durations (their speed is inf/NaN)
    valid = (df['trip_duration'] > 0) & (df['average_speed'].notnull())
    if year is not None:
        # Pickups stamped outside the requested year (e.g. 2008 records in a
        # 2019 file) would otherwise show up as extra days in the aggregate.
        valid &= df['pickup_datetime'].dt.year == year
    df = df[valid]

    # Aggregate data: total trips and average fare per day
    df['pickup_date'] = df['pickup_datetime'].dt.date
    agg_df = df.groupby('pickup_date').agg(
        total_trips=('trip_distance', 'count'),
        average_fare=('fare_amount', 'mean')
    ).reset_index()

    return df, agg_df

# Source column names differ by taxi type: each entry is
# (taxi type, pickup col, dropoff col, distance col, fare col).
taxi_column_specs = [
    ('yellow', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'trip_distance', 'fare_amount'),
    ('green', 'lpep_pickup_datetime', 'lpep_dropoff_datetime', 'trip_distance', 'fare_amount'),
    ('fhvhv', 'pickup_datetime', 'dropoff_datetime', 'trip_miles', 'base_passenger_fare'),
]

# Clean and summarise every month/type combination. A missing or unreadable
# file is reported and skipped rather than stopping the loop.
for month in ['01', '02', '03']:
    for taxi_type, *source_cols in taxi_column_specs:
        file_path = os.path.join(local_dir, f'{taxi_type}_tripdata_2019-{month}.parquet')
        try:
            df, agg_df = process_data(file_path, *source_cols)
            print(f"Description of {taxi_type} data for 2019-{month}:")
            print(df.describe())
            print(agg_df.head())
        except Exception as e:
            print(f"Error processing {file_path}: {e}")

print("Data processing completed.")



Description of yellow data for 2019-01:
       trip_distance   fare_amount  trip_duration  average_speed
count   7.690060e+06  7.690060e+06   7.690060e+06   7.690060e+06
mean    2.832390e+00  1.243029e+01   1.658014e+01   1.318942e+01
std     3.775024e+00  2.250695e+02   7.516960e+01   1.374931e+02
min     0.000000e+00 -3.620000e+02   1.666667e-02   0.000000e+00
25%     9.000000e-01  6.000000e+00   6.116667e+00   7.594937e+00
50%     1.540000e+00  9.000000e+00   1.020000e+01   1.010204e+01
75%     2.840000e+00  1.350000e+01   1.666667e+01   1.377049e+01
max     8.318000e+02  6.232599e+05   4.364802e+04   7.344000e+04
  pickup_date  total_trips  average_fare
0  2001-02-02            1      2.500000
1  2003-01-01            1      0.000000
2  2008-12-31           22     12.636364
3  2009-01-01           48     13.510417
4  2018-11-28           10     70.015000
Description of green data for 2019-01:
       trip_distance    fare_amount  trip_duration  average_speed
count  671386.000000  67

Data Loading

Task: Load the processed data into an SQLite database.
Design and implement a schema suitable for querying trip metrics.
Use SQL to load data into the database efficiently.

In [0]:
import os
import pandas as pd
import sqlite3
from datetime import datetime

# Define local directory
local_dir = "/tmp/nyc_taxi_data_2019"

# Function to process data
def process_data(file_path, pickup_col, dropoff_col, distance_col, fare_col, year=None):
    """Load one trip-record file, clean it, and aggregate it by pickup day.

    The four source columns are renamed to a common schema so yellow, green and
    FHVHV files can all be handled by the same code.

    Args:
        file_path: Path of the Parquet file to read.
        pickup_col: Name of the pickup-timestamp column in that file.
        dropoff_col: Name of the dropoff-timestamp column.
        distance_col: Name of the trip-distance column.
        fare_col: Name of the fare column.
        year: Optional calendar year. When given, trips whose pickup falls in a
            different year are dropped as corrupt. The default (None) keeps all
            years, which matches the previous behaviour.

    Returns:
        ``(df, agg_df)``.
        ``df`` holds the cleaned trips with columns ``pickup_datetime``,
        ``dropoff_datetime``, ``trip_distance``, ``fare_amount``,
        ``trip_duration`` (minutes), ``average_speed`` (distance per hour) and
        ``pickup_date``.
        ``agg_df`` has one row per ``pickup_date`` with ``total_trips`` and
        ``average_fare``.
    """
    source_cols = [pickup_col, dropoff_col, distance_col, fare_col]

    # Read only the four needed columns instead of the whole (wide) file.
    df = pd.read_parquet(file_path, columns=source_cols)
    df = df[source_cols].dropna()
    df.columns = ['pickup_datetime', 'dropoff_datetime', 'trip_distance', 'fare_amount']
    df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'])
    df['dropoff_datetime'] = pd.to_datetime(df['dropoff_datetime'])
    df['trip_duration'] = (df['dropoff_datetime'] - df['pickup_datetime']).dt.total_seconds() / 60
    df['average_speed'] = df['trip_distance'] / (df['trip_duration'] / 60)

    # Drop non-positive durations (their speed is inf/NaN) and, optionally,
    # pickups outside the requested year.
    valid = (df['trip_duration'] > 0) & (df['average_speed'].notnull())
    if year is not None:
        valid &= df['pickup_datetime'].dt.year == year
    df = df[valid]

    df['pickup_date'] = df['pickup_datetime'].dt.date
    agg_df = df.groupby('pickup_date').agg(
        total_trips=('trip_distance', 'count'),
        average_fare=('fare_amount', 'mean')
    ).reset_index()
    return df, agg_df

# Connect to SQLite database (or create it)
conn = sqlite3.connect('/tmp/nyc_taxi_data_2019.db')
cursor = conn.cursor()

try:
    # Schema: one detail table and one daily-aggregate table per taxi type and
    # month (e.g. yellow_trips_01 / yellow_agg_01).
    for taxi_type in ['yellow', 'green', 'fhvhv']:
        for month in ['01', '02', '03']:
            cursor.execute(f"""
            CREATE TABLE IF NOT EXISTS {taxi_type}_trips_{month} (
                pickup_datetime TEXT,
                dropoff_datetime TEXT,
                trip_distance REAL,
                fare_amount REAL,
                trip_duration REAL,
                average_speed REAL,
                pickup_date DATE
            )
            """)
            cursor.execute(f"""
            CREATE TABLE IF NOT EXISTS {taxi_type}_agg_{month} (
                pickup_date DATE,
                total_trips INTEGER,
                average_fare REAL
            )
            """)
    conn.commit()

    # Process each file and load into SQLite
    for month in ['01', '02', '03']:
        for taxi_type, pickup_col, dropoff_col, distance_col, fare_col in [
            ('yellow', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'trip_distance', 'fare_amount'),
            ('green', 'lpep_pickup_datetime', 'lpep_dropoff_datetime', 'trip_distance', 'fare_amount'),
            ('fhvhv', 'pickup_datetime', 'dropoff_datetime', 'trip_miles', 'base_passenger_fare')
        ]:
            file_path = os.path.join(local_dir, f'{taxi_type}_tripdata_2019-{month}.parquet')
            trips_table = f'{taxi_type}_trips_{month}'
            agg_table = f'{taxi_type}_agg_{month}'

            try:
                df, agg_df = process_data(file_path, pickup_col, dropoff_col, distance_col, fare_col)

                # Empty the tables and append instead of if_exists='replace'.
                # 'replace' drops each table and lets pandas recreate it with
                # inferred column types, discarding the schema declared above.
                # Clearing first keeps re-runs from duplicating rows.
                cursor.execute(f'DELETE FROM {trips_table}')
                df.to_sql(trips_table, conn, if_exists='append', index=False, chunksize=100_000)
                cursor.execute(f'DELETE FROM {agg_table}')
                agg_df.to_sql(agg_table, conn, if_exists='append', index=False)
                conn.commit()

                print(f"Data for {taxi_type} 2019-{month} loaded into SQLite.")
            except Exception as e:
                conn.rollback()  # discard any statement not yet committed
                print(f"Error processing {file_path}: {e}")
finally:
    # Close the connection even if table creation or a load step raises.
    conn.close()

print("Data loading completed.")


Data for yellow 2019-01 loaded into SQLite.
Data for green 2019-01 loaded into SQLite.
Error processing /tmp/nyc_taxi_data_2019/fhvhv_tripdata_2019-01.parquet: [Errno 2] No such file or directory: '/tmp/nyc_taxi_data_2019/fhvhv_tripdata_2019-01.parquet'
Data for yellow 2019-02 loaded into SQLite.
Data for green 2019-02 loaded into SQLite.
Data for fhvhv 2019-02 loaded into SQLite.
Data for yellow 2019-03 loaded into SQLite.
Data for green 2019-03 loaded into SQLite.
Data for fhvhv 2019-03 loaded into SQLite.
Data loading completed.


Data Analysis and Reporting

Task: Generate insights and reports from the database.
Develop SQL queries to answer the following questions:
    What are the peak hours for taxi usage?
    How does passenger count affect the trip fare?
    What are the trends in usage over the year?

In [0]:
%sql
-- Peak hours: yellow-taxi pickups per hour of day for Jan–Mar 2019,
-- busiest hour first.
-- NOTE(review): the loading step writes yellow_trips_01..03 into a SQLite file,
-- not the Spark metastore; confirm these tables are registered for %sql.
WITH yellow_pickups AS (
    SELECT pickup_datetime FROM yellow_trips_01
    UNION ALL
    SELECT pickup_datetime FROM yellow_trips_02
    UNION ALL
    SELECT pickup_datetime FROM yellow_trips_03
)
SELECT
    HOUR(pickup_datetime) AS hour,
    COUNT(*)              AS trip_count
FROM yellow_pickups
GROUP BY hour
ORDER BY trip_count DESC;

In [0]:
%sql
-- Pickups per hour of day, listed in ascending hour order.
-- NOTE(review): no `trip_data` table is created anywhere in this notebook (the
-- loading step writes per-month tables such as yellow_trips_01 to SQLite);
-- confirm where this table comes from.
select
    extract(hour from pickup_datetime) as hour,
    count(*)                           as trip_count
from trip_data
group by hour
order by hour;


In [0]:
%sql
-- Average fare for each passenger count, in passenger-count order.
-- NOTE(review): the processing/loading steps above do not keep passenger_count,
-- and no `taxi_trip_data` table is created in this notebook; confirm its source.
select
    passenger_count,
    avg(fare_amount) as average_fare
from taxi_trip_data
group by passenger_count
order by passenger_count;


Create visualizations to represent the findings.

In [0]:
import sqlite3
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

def create_visualizations(db_name):
    """Plot peak hours, average fare by passenger count, and daily trends.

    Queries a SQLite database and shows three figures. The queries expect:
    - a ``trips`` table with ``pickup_datetime``, ``passenger_count`` and
      ``fare_amount``;
    - a ``daily_aggregates`` table with ``date``, ``total_trips`` and
      ``average_fare``.

    NOTE(review): the loading step in this notebook writes per-month tables
    such as ``yellow_trips_01`` / ``yellow_agg_01`` and does not keep
    ``passenger_count``. Confirm these tables (or equivalent views) exist.

    Args:
        db_name: Path of the SQLite database file to read.

    Raises:
        Whatever ``pd.read_sql`` raises when a query fails (e.g. a missing
        table). The connection is closed in that case as well.
    """
    conn = sqlite3.connect(db_name)
    try:
        # Peak hours: the 10 busiest pickup hours ("00".."23"), busiest first
        query = "SELECT strftime('%H', pickup_datetime) as hour, COUNT(*) as total_trips FROM trips GROUP BY hour ORDER BY total_trips DESC LIMIT 10;"
        peak_hours = pd.read_sql(query, conn)
        fig, ax = plt.subplots()
        sns.barplot(x='hour', y='total_trips', data=peak_hours, ax=ax)
        ax.set_title("Peak Hours for Taxi Usage")
        ax.set_xlabel("Hour of day")
        ax.set_ylabel("Total trips")
        plt.show()

        # Passenger count effect on fare
        query = "SELECT passenger_count, AVG(fare_amount) as average_fare FROM trips GROUP BY passenger_count;"
        passenger_fares = pd.read_sql(query, conn)
        fig, ax = plt.subplots()
        sns.barplot(x='passenger_count', y='average_fare', data=passenger_fares, ax=ax)
        ax.set_title("Effect of Passenger Count on Fare")
        ax.set_xlabel("Passenger count")
        ax.set_ylabel("Average fare")
        plt.show()

        # Trends over time: trip count on the left axis (green), average fare
        # on a twin right axis (blue), sharing the date axis.
        query = "SELECT date, total_trips, average_fare FROM daily_aggregates ORDER BY date;"
        trends = pd.read_sql(query, conn)
        trends['date'] = pd.to_datetime(trends['date'])
        fig, ax1 = plt.subplots()
        ax2 = ax1.twinx()
        ax1.plot(trends['date'], trends['total_trips'], 'g-')
        ax2.plot(trends['date'], trends['average_fare'], 'b-')
        ax1.set_xlabel('Date')
        ax1.set_ylabel('Total Trips', color='g')
        ax2.set_ylabel('Average Fare', color='b')
        ax1.set_title("Trends in Taxi Usage Over the Year")
        plt.show()
    finally:
        # Release the connection even when a query fails.
        conn.close()

if __name__ == "__main__":
    # Read the database written by the loading step above. Nothing in this
    # notebook populates a separate "ny_taxi.db"; connecting to it would only
    # create an empty file.
    # NOTE(review): that database holds per-month tables (e.g. yellow_trips_01),
    # not the `trips` / `daily_aggregates` tables create_visualizations queries;
    # those tables or views still need to be created.
    create_visualizations('/tmp/nyc_taxi_data_2019.db')
