In [0]:
%pip install tqdm pandas requests


In [0]:
dbutils.library.restartPython()


In [0]:
import os
import requests
from bs4 import BeautifulSoup
from time import sleep
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import pandas as pd

# Create a session to handle retries
session = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
session.mount('http://', HTTPAdapter(max_retries=retries))
session.mount('https://', HTTPAdapter(max_retries=retries))

# Define the base URL for scraping and the local directory to save the files
scrape_url = "https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page"
local_dir = "/tmp/nyc_taxi_data_2019"  # Local temporary storage in Databricks

# Create the local directory if it doesn't exist
os.makedirs(local_dir, exist_ok=True)

# Function to download a file with retries
def download_file(url, output_path):
    try:
        response = session.get(url, stream=True)
        response.raise_for_status()  # Raise an error for bad status codes
        with open(output_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
        print(f"Successfully downloaded {output_path}")
    except requests.exceptions.RequestException as e:
        print(f"Failed to download {url}: {e}")

# Scrape the website to find the correct URLs
response = session.get(scrape_url)
soup = BeautifulSoup(response.text, 'html.parser')
links = soup.find_all('a', href=True)

# Filter and download parquet files for the year 2019
parquet_files = []
for link in links:
    href = link['href']
    if '2019' in href and href.endswith('.parquet'):
        filename = href.split('/')[-1]
        url = href
        output_path = os.path.join(local_dir, filename)
        download_file(url, output_path)
        parquet_files.append(output_path)
        sleep(1)  # Sleep for a short time between requests to avoid overloading the server

print("Download completed.")

# Copy the downloaded files to DBFS
dbfs_dir = "/mnt/nyc_taxi_data_2019"  # Change to your desired DBFS path

# Create the DBFS directory if it doesn't exist
dbutils.fs.mkdirs(dbfs_dir)

# Copy files to DBFS
for local_file in parquet_files:
    dbfs_file = os.path.join(dbfs_dir, os.path.basename(local_file))
    dbutils.fs.cp(f"file:{local_file}", dbfs_file)
    print(f"Copied {local_file} to {dbfs_file}")

print("Files copied to DBFS.")

# Read and display data from each parquet file in DBFS using Spark
for local_file in parquet_files:
    dbfs_file = os.path.join(dbfs_dir, os.path.basename(local_file))
    df = spark.read.parquet(dbfs_file)
    df.show(5)

print("Data loaded and displayed.")


In [0]:
import pandas as pd 
import os

def clean_and_transform_spark(df, taxi_type):
    if taxi_type == "yellow":
        required_columns = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'trip_distance', 'fare_amount']
        df = df.dropna(subset=required_columns)
        df = df.withColumn('pickup_datetime', F.to_timestamp('tpep_pickup_datetime')) \
               .withColumn('dropoff_datetime', F.to_timestamp('tpep_dropoff_datetime'))
    elif taxi_type == "green":
        required_columns = ['lpep_pickup_datetime', 'lpep_dropoff_datetime', 'trip_distance', 'fare_amount']
        df = df.dropna(subset=required_columns)
        df = df.withColumn('pickup_datetime', F.to_timestamp('lpep_pickup_datetime')) \
               .withColumn('dropoff_datetime', F.to_timestamp('lpep_dropoff_datetime'))
    elif taxi_type == "hvfht":
        required_columns = ['pickup_datetime', 'dropoff_datetime', 'trip_miles', 'base_passenger_fare']
        df = df.dropna(subset=required_columns)
        df = df.withColumn('pickup_datetime', F.to_timestamp('pickup_datetime')) \
               .withColumn('dropoff_datetime', F.to_timestamp('dropoff_datetime'))
    
    df = df.withColumn('trip_duration', (F.col('dropoff_datetime').cast('long') - F.col('pickup_datetime').cast('long')) / 60) \
           .withColumn('average_speed', F.col('trip_distance') / (F.col('trip_duration') / 60))
    
    return df

# Read, clean, and display data from each parquet file in DBFS using Spark
from pyspark.sql import functions as F

for local_file in parquet_files:
    dbfs_file = os.path.join(dbfs_dir, os.path.basename(local_file))
    taxi_type = ""
    if "yellow" in dbfs_file:
        taxi_type = "yellow"
    elif "green" in dbfs_file:
        taxi_type = "green"
    elif "hvfht" in dbfs_file:
        taxi_type = "hvfht"
    
    if taxi_type:
        df = spark.read.parquet(dbfs_file)
        df = clean_and_transform_spark(df, taxi_type)
        df.show(5)

print("Data loaded, cleaned, and displayed.")

In [0]:
import pandas as pd
import os
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, filename='data_processing.log', 
                    format='%(asctime)s - %(levelname)s - %(message)s')

def clean_and_transform(df, file_type):
    try:
        if file_type == 'yellow':
            required_columns = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'trip_distance', 'fare_amount']
            df['pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
            df['dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])
        elif file_type == 'green':
            required_columns = ['lpep_pickup_datetime', 'lpep_dropoff_datetime', 'trip_distance', 'fare_amount']
            df['pickup_datetime'] = pd.to_datetime(df['lpep_pickup_datetime'])
            df['dropoff_datetime'] = pd.to_datetime(df['lpep_dropoff_datetime'])
        elif file_type == 'hvfht':
            required_columns = ['pickup_datetime', 'dropoff_datetime', 'trip_miles', 'base_passenger_fare']
        else:
            logging.error(f"Unknown file type: {file_type}")
            return None, None
        
        for col in required_columns:
            if col not in df.columns:
                logging.error(f"Missing column {col} in {file_type} taxi data")
                return None, None
        
        # Remove trips with missing or corrupt data
        df.dropna(subset=required_columns, inplace=True)
        
        if file_type in ['yellow', 'green']:
            df['trip_duration'] = (df['dropoff_datetime'] - df['pickup_datetime']).dt.total_seconds() / 60
            df['average_speed'] = df['trip_distance'] / (df['trip_duration'] / 60)
            fare_column = 'fare_amount'
        else:
            df['trip_duration'] = (pd.to_datetime(df['dropoff_datetime']) - pd.to_datetime(df['pickup_datetime'])).dt.total_seconds() / 60
            df['average_speed'] = df['trip_miles'] / (df['trip_duration'] / 60)
            fare_column = 'base_passenger_fare'
        
        # Aggregate data
        df['date'] = df['pickup_datetime'].dt.date
        daily_aggregates = df.groupby('date').agg({
            'trip_duration': 'count',
            fare_column: 'mean'
        }).rename(columns={'trip_duration': 'total_trips', fare_column: 'average_fare'})
        
        return df, daily_aggregates
    except Exception as e:
        logging.error(f"Error processing {file_type} taxi data: {e}")
        return None, None

def process_all_files(data_folder):
    processed_data = []
    daily_aggregates = []
    
    for file_name in os.listdir(data_folder):
        file_path = os.path.join(data_folder, file_name)
        if 'yellow' in file_name:
            file_type = 'yellow'
        elif 'green' in file_name:
            file_type = 'green'
        elif 'hvfht' in file_name:
            file_type = 'hvfht'
        else:
            logging.error(f"Unknown file type for file: {file_name}")
            continue
        
        df = pd.read_parquet(file_path)
        df, daily_agg = clean_and_transform(df, file_type)
        if df is not None:
            processed_data.append(df)
        if daily_agg is not None:
            daily_aggregates.append(daily_agg)
    
    if processed_data:
        processed_data = pd.concat(processed_data)
    else:
        processed_data = pd.DataFrame()
    
    if daily_aggregates:
        daily_aggregates = pd.concat(daily_aggregates)
    else:
        daily_aggregates = pd.DataFrame()
    
    return processed_data, daily_aggregates

if __name__ == "__main__":
    data_folder = "/tmp/nyc_taxi_data_2019"
    processed_data, daily_aggregates = process_all_files(data_folder)
    if not processed_data.empty:
        processed_data.to_parquet("/dbfs/tmp/processed_data.parquet", index=False)
    if not daily_aggregates.empty:
        daily_aggregates.to_parquet("/dbfs/tmp/daily_aggregates.parquet", index=True)



[0;31m---------------------------------------------------------------------------[0m
[0;31mThe Python process exited with exit code 137 (SIGKILL: Killed). This may have been caused by an OOM error. Check your command's memory usage.[0m
[0;31m[0m
[0;31m---------------------------------------------------------------------------[0m
[0;31mThe last 10 KB of the process's stderr and stdout can be found below. See driver logs for full logs.[0m
[0;31m---------------------------------------------------------------------------[0m
[0;31mLast messages on stderr:[0m
[0;31mSun Jul 14 17:20:18 2024 Connection to spark from PID  1397[0m
[0;31mSun Jul 14 17:20:18 2024 Initialized gateway on port 36775[0m
[0;31mSun Jul 14 17:20:18 2024 Connected to spark.[0m
[0;31m---------------------------------------------------------------------------[0m
[0;31mLast messages on stdout:[0m
[0;31m      2.5|  0.0|    0.5|       0.0|         0.0|                  0.3|         3.3|                

In [0]:
# Function to clean, transform, and aggregate data for Spark DataFrame
import pandas as pd
import os
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

def clean_transform_aggregate_spark(df, taxi_type):
    if taxi_type == "yellow":
        required_columns = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'trip_distance', 'fare_amount']
        df = df.dropna(subset=required_columns)
        df = df.withColumn('pickup_datetime', F.to_timestamp('tpep_pickup_datetime')) \
               .withColumn('dropoff_datetime', F.to_timestamp('tpep_dropoff_datetime'))
    elif taxi_type == "green":
        required_columns = ['lpep_pickup_datetime', 'lpep_dropoff_datetime', 'trip_distance', 'fare_amount']
        df = df.dropna(subset=required_columns)
        df = df.withColumn('pickup_datetime', F.to_timestamp('lpep_pickup_datetime')) \
               .withColumn('dropoff_datetime', F.to_timestamp('lpep_dropoff_datetime'))
    elif taxi_type == "hvfht":
        required_columns = ['pickup_datetime', 'dropoff_datetime', 'base_passenger_fare']
        if 'trip_miles' in df.columns:
            required_columns.append('trip_miles')
        df = df.dropna(subset=required_columns)
        df = df.withColumn('pickup_datetime', F.to_timestamp('pickup_datetime')) \
               .withColumn('dropoff_datetime', F.to_timestamp('dropoff_datetime'))
    
    if 'trip_miles' in df.columns:
        df = df.withColumn('trip_duration', (F.col('dropoff_datetime').cast('long') - F.col('pickup_datetime').cast('long')) / 60) \
               .withColumn('average_speed', F.col('trip_miles') / (F.col('trip_duration') / 60))
    elif 'trip_distance' in df.columns:
        df = df.withColumn('trip_duration', (F.col('dropoff_datetime').cast('long') - F.col('pickup_datetime').cast('long')) / 60) \
               .withColumn('average_speed', F.col('trip_distance') / (F.col('trip_duration') / 60))
    
    df = df.withColumn('date', F.to_date('pickup_datetime'))
    
    # Aggregate data to calculate total trips and average fare per day
    if 'fare_amount' in df.columns:
        daily_aggregates = df.groupBy('date').agg(
            F.count('*').alias('total_trips'),
            F.mean('fare_amount').alias('average_fare')
        )
    elif 'base_passenger_fare' in df.columns:
        daily_aggregates = df.groupBy('date').agg(
            F.count('*').alias('total_trips'),
            F.mean('base_passenger_fare').alias('average_fare')
        )
    
    return df, daily_aggregates

# Read, clean, transform, aggregate, and display data from each parquet file in DBFS using Spark
all_processed_data = []
all_daily_aggregates = []

for local_file in parquet_files:
    dbfs_file = os.path.join(dbfs_dir, os.path.basename(local_file))
    taxi_type = ""
    if "yellow" in dbfs_file:
        taxi_type = "yellow"
    elif "green" in dbfs_file:
        taxi_type = "green"
    elif "hvfht" in dbfs_file:
        taxi_type = "hvfht"
    
    if taxi_type:
        df = spark.read.parquet(dbfs_file)
        print(f"Schema for {taxi_type} taxi data:")
        df.printSchema()  # Print the schema for debugging purposes
        processed_df, daily_agg_df = clean_transform_aggregate_spark(df, taxi_type)
        
        processed_df.show(5)
        daily_agg_df.show(5)
        
        all_processed_data.append(processed_df)
        all_daily_aggregates.append(daily_agg_df)

# Concatenate all processed data and daily aggregates
final_processed_data = all_processed_data[0]
final_daily_aggregates = all_daily_aggregates[0]

for df in all_processed_data[1:]:
    final_processed_data = final_processed_data.union(df)

for df in all_daily_aggregates[1:]:
    final_daily_aggregates = final_daily_aggregates.union(df)

# Save the final processed data and daily aggregates to parquet files
final_processed_data.write.parquet("/dbfs/tmp/processed_data.parquet", mode='overwrite')
final_daily_aggregates.write.parquet("/dbfs/tmp/daily_aggregates.parquet", mode='overwrite')

print("Data loaded, cleaned, transformed, aggregated, and saved.")

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-2750058774820529>:53[0m
[1;32m     50[0m all_processed_data [38;5;241m=[39m []
[1;32m     51[0m all_daily_aggregates [38;5;241m=[39m []
[0;32m---> 53[0m [38;5;28;01mfor[39;00m local_file [38;5;129;01min[39;00m parquet_files:
[1;32m     54[0m     dbfs_file [38;5;241m=[39m os[38;5;241m.[39mpath[38;5;241m.[39mjoin(dbfs_dir, os[38;5;241m.[39mpath[38;5;241m.[39mbasename(local_file))
[1;32m     55[0m     taxi_type [38;5;241m=[39m [38;5;124m"[39m[38;5;124m"[39m

[0;31mNameError[0m: name 'parquet_files' is not defined

In [0]:
# Read, clean, transform, aggregate, and display data from each parquet file in DBFS using Spark
all_processed_data = []
all_daily_aggregates = []

for local_file in parquet_files:
    dbfs_file = os.path.join(dbfs_dir, os.path.basename(local_file))
    taxi_type = ""
    if "yellow" in dbfs_file:
        taxi_type = "yellow"
    elif "green" in dbfs_file:
        taxi_type = "green"
    elif "fhv" in dbfs_file:
        taxi_type = "fhv"
    elif "hvfht" in dbfs_file:
        taxi_type = "hvfht"

if taxi_type:
        df = spark.read.parquet(dbfs_file)
        print(f"Schema for {taxi_type} taxi data:")
        df.printSchema()  # Print the schema for debugging purposes
        processed_df, daily_agg_df = clean_transform_aggregate_spark(df, taxi_type)
        
        processed_df.show(5)
        daily_agg_df.show(5)
        
        all_processed_data.append(processed_df)
        all_daily_aggregates.append(daily_agg_df)

# Concatenate all processed data and daily aggregates
final_processed_data = all_processed_data[0]
final_daily_aggregates = all_daily_aggregates[0]

for df in all_processed_data[1:]:
    final_processed_data = final_processed_data.union(df)

for df in all_daily_aggregates[1:]:
    final_daily_aggregates = final_daily_aggregates.union(df)

# Save the final processed data and daily aggregates to parquet files
final_processed_data.write.parquet("/dbfs/tmp/processed_data.parquet", mode='overwrite')
final_daily_aggregates.write.parquet("/dbfs/tmp/daily_aggregates.parquet", mode='overwrite')

print("Data loaded, cleaned, transformed, aggregated, and saved.")

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-2750058774820525>:5[0m
[1;32m      2[0m all_processed_data [38;5;241m=[39m []
[1;32m      3[0m all_daily_aggregates [38;5;241m=[39m []
[0;32m----> 5[0m [38;5;28;01mfor[39;00m local_file [38;5;129;01min[39;00m parquet_files:
[1;32m      6[0m     dbfs_file [38;5;241m=[39m os[38;5;241m.[39mpath[38;5;241m.[39mjoin(dbfs_dir, os[38;5;241m.[39mpath[38;5;241m.[39mbasename(local_file))
[1;32m      7[0m     taxi_type [38;5;241m=[39m [38;5;124m"[39m[38;5;124m"[39m

[0;31mNameError[0m: name 'parquet_files' is not defined

In [0]:
# Read, clean, transform, aggregate, and display data from each parquet file in DBFS using Spark
all_processed_data = []
all_daily_aggregates = []

for local_file in parquet_files:
    dbfs_file = os.path.join("/dbfs" + dbfs_dir, os.path.basename(local_file))
    taxi_type = ""
    if "yellow" in dbfs_file:
        taxi_type = "yellow"
    elif "green" in dbfs_file:
        taxi_type = "green"
    elif "fhv" in dbfs_file:
        taxi_type = "fhv"
    elif "hvfht" in dbfs_file:
        taxi_type = "hvfht"
    
    if taxi_type:
        df = spark.read.parquet(dbfs_file)
        print(f"Schema for {taxi_type} taxi data:")
        df.printSchema()  # Print the schema for debugging purposes
        processed_df, daily_agg_df = clean_transform_aggregate_spark(df, taxi_type)
        
        processed_df.show(5)
        daily_agg_df.show(5)
        
        all_processed_data.append(processed_df)
        all_daily_aggregates.append(daily_agg_df)

# Concatenate all processed data and daily aggregates
final_processed_data = all_processed_data[0]
final_daily_aggregates = all_daily_aggregates[0]

for df in all_processed_data[1:]:
    final_processed_data = final_processed_data.union(df)

for df in all_daily_aggregates[1:]:
    final_daily_aggregates = final_daily_aggregates.union(df)

# Save the final processed data and daily aggregates to parquet files
final_processed_data.write.parquet("/dbfs/tmp/processed_data.parquet", mode='overwrite')
final_daily_aggregates.write.parquet("/dbfs/tmp/daily_aggregates.parquet", mode='overwrite')

print("Data loaded, cleaned, transformed, aggregated, and saved.")

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-2436500203063026>:5[0m
[1;32m      2[0m all_processed_data [38;5;241m=[39m []
[1;32m      3[0m all_daily_aggregates [38;5;241m=[39m []
[0;32m----> 5[0m [38;5;28;01mfor[39;00m local_file [38;5;129;01min[39;00m parquet_files:
[1;32m      6[0m     dbfs_file [38;5;241m=[39m os[38;5;241m.[39mpath[38;5;241m.[39mjoin([38;5;124m"[39m[38;5;124m/dbfs[39m[38;5;124m"[39m [38;5;241m+[39m dbfs_dir, os[38;5;241m.[39mpath[38;5;241m.[39mbasename(local_file))
[1;32m      7[0m     taxi_type [38;5;241m=[39m [38;5;124m"[39m[38;5;124m"[39m

[0;31mNameError[0m: name 'parquet_files' is not defined

In [0]:
elif taxi_type == "fhvhv":
        required_columns = ['pickup_datetime', 'dropoff_datetime', 'base_passenger_fare']
        if 'trip_miles' in df.columns:
            required_columns.append('trip_miles')
        df = df.dropna(subset=required_columns)
        df = df.withColumn('pickup_datetime', F.to_timestamp('pickup_datetime')) \
               .withColumn('dropoff_datetime', F.to_timestamp('dropoff_datetime'))


               elif "fhv" in dbfs_file:
        taxi_type = "fhv"

In [0]:
import pandas as pd
import os

def clean_and_transform_yellow(df):
    required_columns = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'trip_distance', 'fare_amount']
    if not all(col in df.columns for col in required_columns):
        raise KeyError(f"Missing required columns in yellow taxi data: {required_columns}")
    
    df['pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
    df['dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])
    df['trip_duration'] = (df['dropoff_datetime'] - df['pickup_datetime']).dt.total_seconds() / 60
    df['average_speed'] = df['trip_distance'] / (df['trip_duration'] / 60)
    
    df['date'] = df['pickup_datetime'].dt.date
    daily_aggregates = df.groupby('date').agg({
        'trip_duration': 'count',
        'fare_amount': 'mean'
    }).rename(columns={'trip_duration': 'total_trips', 'fare_amount': 'average_fare'})
    
    return df, daily_aggregates

def clean_and_transform_green(df):
    required_columns = ['lpep_pickup_datetime', 'lpep_dropoff_datetime', 'trip_distance', 'fare_amount']
    if not all(col in df.columns for col in required_columns):
        raise KeyError(f"Missing required columns in green taxi data: {required_columns}")

    df['pickup_datetime'] = pd.to_datetime(df['lpep_pickup_datetime'])
    df['dropoff_datetime'] = pd.to_datetime(df['lpep_dropoff_datetime'])
    df['trip_duration'] = (df['dropoff_datetime'] - df['pickup_datetime']).dt.total_seconds() / 60
    df['average_speed'] = df['trip_distance'] / (df['trip_duration'] / 60)
    
    df['date'] = df['pickup_datetime'].dt.date
    daily_aggregates = df.groupby('date').agg({
        'trip_duration': 'count',
        'fare_amount': 'mean'
    }).rename(columns={'trip_duration': 'total_trips', 'fare_amount': 'average_fare'})
    
    return df, daily_aggregates

def clean_and_transform_hvfht(df):
    required_columns = ['pickup_datetime', 'dropoff_datetime', 'trip_miles', 'base_passenger_fare']
    if not all(col in df.columns for col in required_columns):
        raise KeyError(f"Missing required columns in HVFHT data: {required_columns}")
    
    df.dropna(inplace=True)
    
    df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'])
    df['dropoff_datetime'] = pd.to_datetime(df['dropoff_datetime'])
    df['trip_duration'] = (df['dropoff_datetime'] - df['pickup_datetime']).dt.total_seconds() / 60
    df['average_speed'] = df['trip_miles'] / (df['trip_duration'] / 60)
    
    df['date'] = df['pickup_datetime'].dt.date
    daily_aggregates = df.groupby('date').agg({
        'trip_duration': 'count',
        'base_passenger_fare': 'mean'
    }).rename(columns={'trip_duration': 'total_trips', 'base_passenger_fare': 'average_fare'})
    
    return df, daily_aggregates

def process_all_files(data_folder):
    processed_data = []
    daily_aggregates = []
    
    for file_name in os.listdir(data_folder):
        file_path = os.path.join(data_folder, file_name)
        df = pd.read_parquet(file_path)
        
        try:
            if 'yellow' in file_name:
                df, daily_agg = clean_and_transform_yellow(df)
            elif 'green' in file_name:
                df, daily_agg = clean_and_transform_green(df)
            elif 'hvfht' in file_name:
                df, daily_agg = clean_and_transform_hvfht(df)
            else:
                continue  # skip files that do not match any known type
            
            processed_data.append(df)
            daily_aggregates.append(daily_agg)
        
        except KeyError as e:
            print(f"Skipping file {file_name} due to missing columns: {e}")
    
    return pd.concat(processed_data), pd.concat(daily_aggregates)

if __name__ == "__main__":
    data_folder = "/tmp/nyc_taxi_data_2019"
    processed_data, daily_aggregates = process_all_files(data_folder)
    processed_data.to_parquet("/dbfs/tmp/processed_data.parquet", index=False)
    daily_aggregates.to_parquet("/dbfs/tmp/daily_aggregates.parquet", index=True)


In [0]:
# Sample code to inspect the schema
data_folder = "/mnt/nyc_taxi_data_2019"
file_path = f"{data_folder}/yellow_tripdata_2019-01.parquet"
df = spark.read.parquet(file_path)
df.printSchema()


In [0]:
# Sample code to inspect the schema
data_folder = "/mnt/nyc_taxi_data_2019"
file_path = f"{data_folder}/green_tripdata_2019-01.parquet"
df = spark.read.parquet(file_path)
df.printSchema()


In [0]:
# Sample code to inspect the schema
data_folder = "/mnt/nyc_taxi_data_2019"
file_path = f"{data_folder}/fhv_tripdata_2019-01.parquet"
df = spark.read.parquet(file_path)
df.printSchema()


In [0]:
data_folder = "/mnt/nyc_taxi_data_2019"
file_path = f"{data_folder}/fhvhv_tripdata_2019-02.parquet"
df = spark.read.parquet(file_path)
df.printSchema()

In [0]:
# Sample code to inspect the schema
data_folder = "/mnt/nyc_taxi_data_2019"
file_path = f"{data_folder}/fhvhv_tripdata_2019-02.parquet"
df = spark.read.parquet(file_path)
df.printSchema()


In [0]:
import pandas as pd
import os

def clean_and_transform_yellow(df):
    # Remove trips with missing or corrupt data
    df.dropna(inplace=True)
    
    # Derive new columns
    df['pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
    df['dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])
    df['trip_duration'] = (df['dropoff_datetime'] - df['pickup_datetime']).dt.total_seconds() / 60
    df['average_speed'] = df['trip_distance'] / (df['trip_duration'] / 60)
    
    # Aggregate data
    df['date'] = df['pickup_datetime'].dt.date
    daily_aggregates = df.groupby('date').agg({
        'trip_duration': 'count',
        'fare_amount': 'mean'
    }).rename(columns={'trip_duration': 'total_trips', 'fare_amount': 'average_fare'})
    
    return df, daily_aggregates

def clean_and_transform_green(df):
    # Remove trips with missing or corrupt data
    df.dropna(inplace=True)
    
    # Derive new columns
    df['pickup_datetime'] = pd.to_datetime(df['lpep_pickup_datetime'])
    df['dropoff_datetime'] = pd.to_datetime(df['lpep_dropoff_datetime'])
    df['trip_duration'] = (df['dropoff_datetime'] - df['pickup_datetime']).dt.total_seconds() / 60
    df['average_speed'] = df['trip_distance'] / (df['trip_duration'] / 60)
    
    # Aggregate data
    df['date'] = df['pickup_datetime'].dt.date
    daily_aggregates = df.groupby('date').agg({
        'trip_duration': 'count',
        'fare_amount': 'mean'
    }).rename(columns={'trip_duration': 'total_trips', 'fare_amount': 'average_fare'})
    
    return df, daily_aggregates

def clean_and_transform_fhvht(df):
    # Remove trips with missing or corrupt data
    df.dropna(inplace=True)
    
    # Derive new columns
    df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'])
    df['dropoff_datetime'] = pd.to_datetime(df['dropoff_datetime'])
    df['trip_duration'] = (df['dropoff_datetime'] - df['pickup_datetime']).dt.total_seconds() / 60
    df['average_speed'] = df['trip_miles'] / (df['trip_duration'] / 60)
    
    # Aggregate data
    df['date'] = df['pickup_datetime'].dt.date
    daily_aggregates = df.groupby('date').agg({
        'trip_duration': 'count',
        'base_passenger_fare': 'mean'
    }).rename(columns={'trip_duration': 'total_trips', 'base_passenger_fare': 'average_fare'})
    
    return df, daily_aggregates

def process_all_files(data_folder):
    processed_data = []
    daily_aggregates = []
    
    for file_name in os.listdir(data_folder):
        file_path = os.path.join(data_folder, file_name)
        df = pd.read_parquet(file_path)
        
        if 'yellow' in file_name:
            df, daily_agg = clean_and_transform_yellow(df)
        elif 'green' in file_name:
            df, daily_agg = clean_and_transform_green(df)
        else:
            df, daily_agg = clean_and_transform_fhvht(df)
        
        processed_data.append(df)
        daily_aggregates.append(daily_agg)
    
    return pd.concat(processed_data), pd.concat(daily_aggregates)

if __name__ == "__main__":
    data_folder = "/tmp/nyc_taxi_data_2019"
    processed_data, daily_aggregates = process_all_files(data_folder)
    processed_data.to_parquet("/dbfs/tmp/processed_data.parquet", index=False)
    daily_aggregates.to_parquet("/dbfs/tmp/daily_aggregates.parquet", index=True)


[0;31m---------------------------------------------------------------------------[0m
[0;31mKeyError[0m                                  Traceback (most recent call last)
File [0;32m/databricks/python/lib/python3.9/site-packages/pandas/core/indexes/base.py:3621[0m, in [0;36mIndex.get_loc[0;34m(self, key, method, tolerance)[0m
[1;32m   3620[0m [38;5;28;01mtry[39;00m:
[0;32m-> 3621[0m     [38;5;28;01mreturn[39;00m [38;5;28;43mself[39;49m[38;5;241;43m.[39;49m[43m_engine[49m[38;5;241;43m.[39;49m[43mget_loc[49m[43m([49m[43mcasted_key[49m[43m)[49m
[1;32m   3622[0m [38;5;28;01mexcept[39;00m [38;5;167;01mKeyError[39;00m [38;5;28;01mas[39;00m err:

File [0;32m/databricks/python/lib/python3.9/site-packages/pandas/_libs/index.pyx:136[0m, in [0;36mpandas._libs.index.IndexEngine.get_loc[0;34m()[0m

File [0;32m/databricks/python/lib/python3.9/site-packages/pandas/_libs/index.pyx:163[0m, in [0;36mpandas._libs.index.IndexEngine.get_loc[0;34m()[0m

File

In [0]:
import pandas as pd
import os

def clean_and_transform_yellow(df):
    # Remove trips with missing or corrupt data
    df.dropna(inplace=True)
    
    # Derive new columns
    df['pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
    df['dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])
    df['trip_duration'] = (df['dropoff_datetime'] - df['pickup_datetime']).dt.total_seconds() / 60
    df['average_speed'] = df['trip_distance'] / (df['trip_duration'] / 60)
    
    # Aggregate data
    df['date'] = df['pickup_datetime'].dt.date
    daily_aggregates = df.groupby('date').agg({
        'trip_duration': 'count',
        'fare_amount': 'mean'
    }).rename(columns={'trip_duration': 'total_trips', 'fare_amount': 'average_fare'})
    
    return df, daily_aggregates

def clean_and_transform_green(df):
    # Remove trips with missing or corrupt data
    df.dropna(inplace=True)
    
    # Derive new columns
    df['pickup_datetime'] = pd.to_datetime(df['lpep_pickup_datetime'])
    df['dropoff_datetime'] = pd.to_datetime(df['lpep_dropoff_datetime'])
    df['trip_duration'] = (df['dropoff_datetime'] - df['pickup_datetime']).dt.total_seconds() / 60
    df['average_speed'] = df['trip_distance'] / (df['trip_duration'] / 60)
    
    # Aggregate data
    df['date'] = df['pickup_datetime'].dt.date
    daily_aggregates = df.groupby('date').agg({
        'trip_duration': 'count',
        'fare_amount': 'mean'
    }).rename(columns={'trip_duration': 'total_trips', 'fare_amount': 'average_fare'})
    
    return df, daily_aggregates

def clean_and_transform_fhv(df):
    # Remove trips with missing or corrupt data
    df.dropna(inplace=True)
    
    # Derive new columns
    df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'])
    df['dropoff_datetime'] = pd.to_datetime(df['dropoff_datetime'])
    df['trip_duration'] = (df['dropoff_datetime'] - df['pickup_datetime']).dt.total_seconds() / 60
    df['average_speed'] = df['trip_miles'] / (df['trip_duration'] / 60)
    
    # Aggregate data
    df['date'] = df['pickup_datetime'].dt.date
    daily_aggregates = df.groupby('date').agg({
        'trip_duration': 'count',
        'base_passenger_fare': 'mean'
    }).rename(columns={'trip_duration': 'total_trips', 'base_passenger_fare': 'average_fare'})
    
    return df, daily_aggregates

def clean_and_transform_hvfht(df):
    # Remove trips with missing or corrupt data
    df.dropna(inplace=True)
    
    # Derive new columns
    df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'])
    df['dropoff_datetime'] = pd.to_datetime(df['dropoff_datetime'])
    df['trip_duration'] = (df['dropoff_datetime'] - df['pickup_datetime']).dt.total_seconds() / 60
    df['average_speed'] = df['trip_miles'] / (df['trip_duration'] / 60)
    
    # Aggregate data
    df['date'] = df['pickup_datetime'].dt.date
    daily_aggregates = df.groupby('date').agg({
        'trip_duration': 'count',
        'base_passenger_fare': 'mean'
    }).rename(columns={'trip_duration': 'total_trips', 'base_passenger_fare': 'average_fare'})
    
    return df, daily_aggregates

def process_all_files(data_folder):
    processed_data = []
    daily_aggregates = []
    
    for file_name in os.listdir(data_folder):
        file_path = os.path.join(data_folder, file_name)
        df = pd.read_parquet(file_path)
        
        if 'yellow' in file_name:
            df, daily_agg = clean_and_transform_yellow(df)
        elif 'green' in file_name:
            df, daily_agg = clean_and_transform_green(df)
        elif 'fhv' in file_name:
            df, daily_agg = clean_and_transform_fhv(df)
        elif 'hvfht' in file_name:
            df, daily_agg = clean_and_transform_hvfht(df)
        else:
            continue  # skip files that do not match any known type
        
        processed_data.append(df)
        daily_aggregates.append(daily_agg)
    
    return pd.concat(processed_data), pd.concat(daily_aggregates)

if __name__ == "__main__":
    data_folder = "/tmp/nyc_taxi_data_2019"
    processed_data, daily_aggregates = process_all_files(data_folder)
    processed_data.to_parquet("/dbfs/tmp/processed_data.parquet", index=False)
    daily_aggregates.to_parquet("/dbfs/tmp/daily_aggregates.parquet", index=True)


[0;31m---------------------------------------------------------------------------[0m
[0;31mKeyError[0m                                  Traceback (most recent call last)
File [0;32m/databricks/python/lib/python3.9/site-packages/pandas/core/indexes/base.py:3621[0m, in [0;36mIndex.get_loc[0;34m(self, key, method, tolerance)[0m
[1;32m   3620[0m [38;5;28;01mtry[39;00m:
[0;32m-> 3621[0m     [38;5;28;01mreturn[39;00m [38;5;28;43mself[39;49m[38;5;241;43m.[39;49m[43m_engine[49m[38;5;241;43m.[39;49m[43mget_loc[49m[43m([49m[43mcasted_key[49m[43m)[49m
[1;32m   3622[0m [38;5;28;01mexcept[39;00m [38;5;167;01mKeyError[39;00m [38;5;28;01mas[39;00m err:

File [0;32m/databricks/python/lib/python3.9/site-packages/pandas/_libs/index.pyx:136[0m, in [0;36mpandas._libs.index.IndexEngine.get_loc[0;34m()[0m

File [0;32m/databricks/python/lib/python3.9/site-packages/pandas/_libs/index.pyx:163[0m, in [0;36mpandas._libs.index.IndexEngine.get_loc[0;34m()[0m

File

In [0]:
import pandas as pd
import os

def clean_and_transform(file_path, taxi_type):
    df = pd.read_parquet(file_path)
    
    # Remove trips with missing or corrupt data
    df.dropna(inplace=True)
    
    # Schema-specific processing
    if taxi_type in ['yellow']:
        # Columns specific to yellow taxi
        df['pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
        df['dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])
        df['trip_duration'] = (df['dropoff_datetime'] - df['pickup_datetime']).dt.total_seconds() / 60
        df['average_speed'] = df['trip_distance'] / (df['trip_duration'] / 60)
    elif taxi_type in [ 'green']:
        # Columns specific to green taxi
        df['pickup_datetime'] = pd.to_datetime(df['lpep_pickup_datetime'])
        df['dropoff_datetime'] = pd.to_datetime(df['lpep_dropoff_datetime'])
        df['trip_duration'] = (df['dropoff_datetime'] - df['pickup_datetime']).dt.total_seconds() / 60
        df['average_speed'] = df['trip_miles'] / (df['trip_duration'] / 60)
    elif taxi_type in ['fhv', 'hvfht']:
        # Columns specific to FHV and HVFHT taxi
        df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'])
        df['dropoff_datetime'] = pd.to_datetime(df['dropoff_datetime'])
        df['trip_duration'] = (df['dropoff_datetime'] - df['pickup_datetime']).dt.total_seconds() / 60
        df['average_speed'] = df['trip_miles'] / (df['trip_duration'] / 60)
    
    # Aggregate data
    df['date'] = df['pickup_datetime'].dt.date
    daily_aggregates = df.groupby('date').agg({
        'trip_duration': 'count',
        'fare_amount': 'mean' if taxi_type in ['yellow', 'green'] else 'base_passenger_fare'
    }).rename(columns={'trip_duration': 'total_trips', 'fare_amount': 'average_fare' if taxi_type in ['yellow', 'green'] else 'base_passenger_fare'})
    
    return df, daily_aggregates

def process_all_files(data_folder):
    processed_data = []
    daily_aggregates = []
    
    for file_name in os.listdir(data_folder):
        file_path = os.path.join(data_folder, file_name)
        taxi_type = file_name.split('_')[0]
        df, daily_agg = clean_and_transform(file_path, taxi_type)
        processed_data.append(df)
        daily_aggregates.append(daily_agg)
    
    return pd.concat(processed_data), pd.concat(daily_aggregates)

if __name__ == "__main__":
    data_folder = "/tmp/nyc_taxi_data_2019"
    processed_data, daily_aggregates = process_all_files(data_folder)
    processed_data.to_parquet("/dbfs/tmp/processed_data.parquet", index=False)
    daily_aggregates.to_parquet("/dbfs/tmp/daily_aggregates.parquet", index=True)


[0;31m---------------------------------------------------------------------------[0m
[0;31mKeyError[0m                                  Traceback (most recent call last)
File [0;32m/databricks/python/lib/python3.9/site-packages/pandas/core/indexes/base.py:3621[0m, in [0;36mIndex.get_loc[0;34m(self, key, method, tolerance)[0m
[1;32m   3620[0m [38;5;28;01mtry[39;00m:
[0;32m-> 3621[0m     [38;5;28;01mreturn[39;00m [38;5;28;43mself[39;49m[38;5;241;43m.[39;49m[43m_engine[49m[38;5;241;43m.[39;49m[43mget_loc[49m[43m([49m[43mcasted_key[49m[43m)[49m
[1;32m   3622[0m [38;5;28;01mexcept[39;00m [38;5;167;01mKeyError[39;00m [38;5;28;01mas[39;00m err:

File [0;32m/databricks/python/lib/python3.9/site-packages/pandas/_libs/index.pyx:136[0m, in [0;36mpandas._libs.index.IndexEngine.get_loc[0;34m()[0m

File [0;32m/databricks/python/lib/python3.9/site-packages/pandas/_libs/index.pyx:163[0m, in [0;36mpandas._libs.index.IndexEngine.get_loc[0;34m()[0m

File

In [0]:
# Remove trips with missing or corrupt data
# Assuming 'pickup_datetime', 'dropoff_datetime', 'trip_distance', and 'fare_amount' are essential columns
df = df.dropna(subset=['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'trip_distance', 'fare_amount'])


In [0]:
# Convert datetime columns to datetime objects
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])

[0;31m---------------------------------------------------------------------------[0m
[0;31mValueError[0m                                Traceback (most recent call last)
File [0;32m<command-783755515913070>:2[0m
[1;32m      1[0m [38;5;66;03m# Convert datetime columns to datetime objects[39;00m
[0;32m----> 2[0m df[[38;5;124m'[39m[38;5;124mtpep_pickup_datetime[39m[38;5;124m'[39m] [38;5;241m=[39m pd[38;5;241m.[39mto_datetime(df[[38;5;124m'[39m[38;5;124mtpep_pickup_datetime[39m[38;5;124m'[39m])
[1;32m      3[0m df[[38;5;124m'[39m[38;5;124mtpep_dropoff_datetime[39m[38;5;124m'[39m] [38;5;241m=[39m pd[38;5;241m.[39mto_datetime(df[[38;5;124m'[39m[38;5;124mtpep_dropoff_datetime[39m[38;5;124m'[39m])

File [0;32m/databricks/python/lib/python3.9/site-packages/pandas/core/tools/datetimes.py:1046[0m, in [0;36mto_datetime[0;34m(arg, errors, dayfirst, yearfirst, utc, format, exact, unit, infer_datetime_format, origin, cache)[0m
[1;32m   1044[0m     

In [0]:
# Derive new columns: trip duration in minutes and average speed in miles per hour
df['trip_duration'] = (df['dropoff_datetime'] - df['pickup_datetime']).dt.total_seconds() / 60
df['average_speed'] = df['trip_distance'] / (df['trip_duration'] / 60)  # converting duration to hours

In [0]:
# Aggregate data to calculate total trips and average fare per day
df['day'] = df['pickup_datetime'].dt.date
aggregated_data = df.groupby('day').agg(
    total_trips=('day', 'size'),
    average_fare=('fare_amount', 'mean')
).reset_index()

# Display the aggregated data
print(aggregated_data)


In [0]:
import os

file_path = '/tmp/nyc_taxi_data_2019'
file_size = os.path.getsize(file_path) / (1024 * 1024)  # Size in MB
print(f"File size: {file_size:.2f} MB")


In [0]:
pip install dask

In [0]:
'%pip --disable-pip-version-check install dask dataframe'

In [0]:
import dask.dataframe as dd

# Load the Parquet file using Dask
df = dd.read_parquet('/tmp/nyc_taxi_data_2019')

# Remove trips with missing or corrupt data
df = df.dropna(subset=['pickup_datetime', 'dropoff_datetime', 'trip_distance', 'fare_amount'])

# Convert datetime columns to datetime objects
df['pickup_datetime'] = dd.to_datetime(df['pickup_datetime'])
df['dropoff_datetime'] = dd.to_datetime(df['dropoff_datetime'])

# Derive new columns: trip duration in minutes and average speed in miles per hour
df['trip_duration'] = (df['dropoff_datetime'] - df['pickup_datetime']).dt.total_seconds() / 60
df['average_speed'] = df['trip_distance'] / (df['trip_duration'] / 60)  # converting duration to hours

# Aggregate data to calculate total trips and average fare per day
df['day'] = df['pickup_datetime'].dt.date
aggregated_data = df.groupby('day').agg(
    total_trips=('day', 'size'),
    average_fare=('fare_amount', 'mean')
).compute().reset_index()

# Display the aggregated data
print(aggregated_data.head())


[0;31m---------------------------------------------------------------------------[0m
[0;31mImportError[0m                               Traceback (most recent call last)
File [0;32m/local_disk0/.ephemeral_nfs/envs/pythonEnv-26985d3b-357a-4fcb-bef6-2c0da7e59a90/lib/python3.9/site-packages/dask/dataframe/__init__.py:52[0m
[1;32m     51[0m [38;5;28;01mfrom[39;00m [38;5;21;01mdask[39;00m[38;5;21;01m.[39;00m[38;5;21;01mbase[39;00m [38;5;28;01mimport[39;00m compute
[0;32m---> 52[0m [38;5;28;01mfrom[39;00m [38;5;21;01mdask[39;00m[38;5;21;01m.[39;00m[38;5;21;01mdataframe[39;00m [38;5;28;01mimport[39;00m backends, dispatch, methods, rolling
[1;32m     53[0m [38;5;28;01mfrom[39;00m [38;5;21;01mdask[39;00m[38;5;21;01m.[39;00m[38;5;21;01mdataframe[39;00m[38;5;21;01m.[39;00m[38;5;21;01m_testing[39;00m [38;5;28;01mimport[39;00m test_dataframe

File [0;32m/databricks/python_shell/dbruntime/PythonPackageImportsInstrumentation/__init__.py:171[0m, in [0;3

In [0]:
import pandas as pd 

# load the Parquet file
df=pd.read_parquet('/tmp/nyc_taxi_data_2019')

[0;31m---------------------------------------------------------------------------[0m
[0;31mThe Python process exited with exit code 137 (SIGKILL: Killed). This may have been caused by an OOM error. Check your command's memory usage.[0m
[0;31m[0m
[0;31m[0m
[0;31m[0m
[0;31mThe last 10 KB of the process's stderr and stdout can be found below. See driver logs for full logs.[0m
[0;31m---------------------------------------------------------------------------[0m
[0;31mLast messages on stderr:[0m
[0;31mThu Jul 11 07:05:53 2024 Connection to spark from PID  1083[0m
[0;31mThu Jul 11 07:05:53 2024 Initialized gateway on port 40959[0m
[0;31mThu Jul 11 07:05:54 2024 Connected to spark.[0m
[0;31m---------------------------------------------------------------------------[0m
[0;31mLast messages on stdout:[0m
[0;31m       N|[0m
[0;31m           HV0005|              B02510|                NULL|2019-11-01 00:04:01|               NULL|2019-11-01 00:13:25|2019-11-01 00:24:09| 

In [0]:
# Remove trips with missing or corrupt data
# Assuming 'pickup_datetime', 'dropoff_datetime', 'trip_distance', and 'fare_amount' are essential columns
df = df.dropna(subset=['pickup_datetime', 'dropoff_datetime', 'trip_distance', 'fare_amount'])


In [0]:
# Convert datetime columns to datetime objects
df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'])
df['dropoff_datetime'] = pd.to_datetime(df['dropoff_datetime'])

In [0]:
# Derive new columns: trip duration in minutes and average speed in miles per hour
df['trip_duration'] = (df['dropoff_datetime'] - df['pickup_datetime']).dt.total_seconds() / 60
df['average_speed'] = df['trip_distance'] / (df['trip_duration'] / 60)  # converting duration to hours

In [0]:
# Aggregate data to calculate total trips and average fare per day
df['day'] = df['pickup_datetime'].dt.date
aggregated_data = df.groupby('day').agg(
    total_trips=('day', 'size'),
    average_fare=('fare_amount', 'mean')
).reset_index()

# Display the aggregated data
print(aggregated_data)


In [0]:
import pandas as pd

# Load the Parquet file
df = pd.read_parquet('/tmp/nyc_taxi_data_2019')

# Remove trips with missing or corrupt data
# Assuming 'pickup_datetime', 'dropoff_datetime', 'trip_distance', and 'fare_amount' are essential columns
df = df.dropna(subset=['pickup_datetime', 'dropoff_datetime', 'trip_distance', 'fare_amount'])

# Convert datetime columns to datetime objects
df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'])
df['dropoff_datetime'] = pd.to_datetime(df['dropoff_datetime'])

# Derive new columns: trip duration in minutes and average speed in miles per hour
df['trip_duration'] = (df['dropoff_datetime'] - df['pickup_datetime']).dt.total_seconds() / 60
df['average_speed'] = df['trip_distance'] / (df['trip_duration'] / 60)  # converting duration to hours

# Aggregate data to calculate total trips and average fare per day
df['day'] = df['pickup_datetime'].dt.date
aggregated_data = df.groupby('day').agg(
    total_trips=('day', 'size'),
    average_fare=('fare_amount', 'mean')
).reset_index()

# Display the aggregated data
print(aggregated_data)


[0;31m---------------------------------------------------------------------------[0m
[0;31mThe Python process exited with exit code 137 (SIGKILL: Killed). This may have been caused by an OOM error. Check your command's memory usage.[0m
[0;31m[0m
[0;31m---------------------------------------------------------------------------[0m
[0;31mThe last 10 KB of the process's stderr and stdout can be found below. See driver logs for full logs.[0m
[0;31m---------------------------------------------------------------------------[0m
[0;31mLast messages on stderr:[0m
[0;31mWed Jul 10 06:29:22 2024 Connection to spark from PID  878[0m
[0;31mWed Jul 10 06:29:22 2024 Initialized gateway on port 46489[0m
[0;31mWed Jul 10 06:29:23 2024 Connected to spark.[0m
[0;31m---------------------------------------------------------------------------[0m
[0;31mLast messages on stdout:[0m
[0;31m       N|[0m
[0;31m           HV0005|              B02510|                null|2019-11-01 00:04:01

In [0]:
import pandas as pd
from pyspark.sql import SparkSession
import os

def clean_and_transform(pdf):
    # Remove trips with missing or corrupt data
    pdf.dropna(inplace=True)
    
    # Derive new columns
    pdf['pickup_datetime'] = pd.to_datetime(pdf['tpep_pickup_datetime'])
    pdf['dropoff_datetime'] = pd.to_datetime(pdf['tpep_dropoff_datetime'])
    pdf['trip_duration'] = (pdf['dropoff_datetime'] - pdf['pickup_datetime']).dt.total_seconds() / 60
    pdf['average_speed'] = pdf['trip_distance'] / (pdf['trip_duration'] / 60)
    
    # Aggregate data
    pdf['date'] = pdf['pickup_datetime'].dt.date
    daily_aggregates = pdf.groupby('date').agg({
        'trip_duration': 'count',
        'fare_amount': 'mean'
    }).rename(columns={'trip_duration': 'total_trips', 'fare_amount': 'average_fare'})
    
    return pdf, daily_aggregates

def process_all_files(data_folder):
    spark = SparkSession.builder.appName("NYC Taxi Data Processing").getOrCreate()
    
    processed_data = []
    daily_aggregates_list = []
    
    for file_name in os.listdir(data_folder):
        file_path = os.path.join(data_folder, file_name)
        
        # Load the Parquet file into a Spark DataFrame
        sdf = spark.read.parquet(file_path)
        
        # Convert Spark DataFrame to Pandas DataFrame
        pdf = sdf.toPandas()
        
        # Clean and transform the data
        cleaned_pdf, daily_agg = clean_and_transform(pdf)
        
        # Convert cleaned Pandas DataFrame back to Spark DataFrame
        cleaned_sdf = spark.createDataFrame(cleaned_pdf)
        
        # Collect daily aggregates
        daily_aggregates_list.append(daily_agg)
        
        # Append the cleaned Spark DataFrame to the list
        processed_data.append(cleaned_sdf)
    
    # Concatenate all cleaned Spark DataFrames
    if processed_data:
        final_sdf = processed_data[0]
        for sdf in processed_data[1:]:
            final_sdf = final_sdf.union(sdf)
    
    # Save the final cleaned data
    final_sdf.write.parquet("/dbfs/tmp/processed_data.parquet", mode="overwrite")
    
    # Concatenate all daily aggregates
    if daily_aggregates_list:
        final_daily_aggregates = pd.concat(daily_aggregates_list)
        final_daily_aggregates.to_parquet("/dbfs/tmp/daily_aggregates.parquet", index=True)

if __name__ == "__main__":
    data_folder = "/tmp/nyc_taxi_data_2019"
    process_all_files(data_folder)


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-712169100413798>:67[0m
[1;32m     65[0m [38;5;28;01mif[39;00m [38;5;18m__name__[39m [38;5;241m==[39m [38;5;124m"[39m[38;5;124m__main__[39m[38;5;124m"[39m:
[1;32m     66[0m     data_folder [38;5;241m=[39m [38;5;124m"[39m[38;5;124m/tmp/nyc_taxi_data_2019[39m[38;5;124m"[39m
[0;32m---> 67[0m     process_all_files(data_folder)

File [0;32m<command-712169100413798>:34[0m, in [0;36mprocess_all_files[0;34m(data_folder)[0m
[1;32m     31[0m file_path [38;5;241m=[39m os[38;5;241m.[39mpath[38;5;241m.[39mjoin(data_folder, file_name)
[1;32m     33[0m [38;5;66;03m# Load the Parquet file into a Spark DataFrame[39;00m
[0;32m---> 34[0m sdf [38;5;241m=[39m [43mspark[49m[38;5;241;43m.[39;49m[43mread[49m[38;5;241;43m.[39;49m[43mparquet[49m[43m([49m[43mfil

In [0]:
import pandas as pd
import os

def clean_and_transform(file_path):
    df = pd.read_parquet(file_path)
    
    # Remove trips with missing or corrupt data
    df.dropna(inplace=True)
    
    # Derive new columns
    df['pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
    df['dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])
    df['trip_duration'] = (df['dropoff_datetime'] - df['pickup_datetime']).dt.total_seconds() / 60
    df['average_speed'] = df['trip_distance'] / (df['trip_duration'] / 60)
    
    # Aggregate data
    df['date'] = df['pickup_datetime'].dt.date
    daily_aggregates = df.groupby('date').agg({
        'trip_duration': 'count',
        'fare_amount': 'mean'
    }).rename(columns={'trip_duration': 'total_trips', 'fare_amount': 'average_fare'})
    
    return df, daily_aggregates

def process_all_files(data_folder):
    processed_data = []
    daily_aggregates = []
    
    for file_name in os.listdir(data_folder):
        file_path = os.path.join(data_folder, file_name)
        df, daily_agg = clean_and_transform(file_path)
        processed_data.append(df)
        daily_aggregates.append(daily_agg)
    
    return pd.concat(processed_data), pd.concat(daily_aggregates)

if __name__ == "__main__":
    data_folder = "/dbfs/tmp/2019"
    processed_data, daily_aggregates = process_all_files(data_folder)
    processed_data.to_parquet("/dbfs/tmp/processed_data.parquet", index=False)
    daily_aggregates.to_parquet("/dbfs/tmp/daily_aggregates.parquet", index=True)


In [0]:
# Read and display processed data
processed_data = pd.read_parquet("/dbfs/tmp/processed_data.parquet")
daily_aggregates = pd.read_parquet("/dbfs/tmp/daily_aggregates.parquet")

print(processed_data.head())
print(daily_aggregates.head())


In [0]:
CREATE TABLE trips (
    id INTEGER PRIMARY KEY,
    pickup_datetime TEXT,
    dropoff_datetime TEXT,
    trip_duration REAL,
    trip_distance REAL,
    fare_amount REAL,
    passenger_count INTEGER,
    average_speed REAL
);

CREATE TABLE daily_aggregates (
    date TEXT PRIMARY KEY,
    total_trips INTEGER,
    average_fare REAL
);


In [0]:
import sqlite3
import pandas as pd

def load_data_to_sqlite(processed_data_file, daily_aggregates_file, db_name):
    conn = sqlite3.connect(db_name)
    processed_data = pd.read_csv(processed_data_file)
    daily_aggregates = pd.read_csv(daily_aggregates_file)
    
    processed_data.to_sql('trips', conn, if_exists='replace', index=False)
    daily_aggregates.to_sql('daily_aggregates', conn, if_exists='replace', index=False)
    
    conn.close()

if __name__ == "__main__":
    load_data_to_sqlite("processed_data.csv", "daily_aggregates.csv", "ny_taxi.db")


In [0]:
-- Peak hours for taxi usage
SELECT strftime('%H', pickup_datetime) as hour, COUNT(*) as total_trips
FROM trips
GROUP BY hour
ORDER BY total_trips DESC
LIMIT 10;

-- Passenger count effect on trip fare
SELECT passenger_count, AVG(fare_amount) as average_fare
FROM trips
GROUP BY passenger_count;

-- Trends in usage over the year
SELECT date, total_trips, average_fare
FROM daily_aggregates
ORDER BY date;


In [0]:
import sqlite3
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

def create_visualizations(db_name):
    conn = sqlite3.connect(db_name)
    
    # Peak hours visualization
    query = "SELECT strftime('%H', pickup_datetime) as hour, COUNT(*) as total_trips FROM trips GROUP BY hour ORDER BY total_trips DESC LIMIT 10;"
    peak_hours = pd.read_sql(query, conn)
    sns.barplot(x='hour', y='total_trips', data=peak_hours)
    plt.title("Peak Hours for Taxi Usage")
    plt.show()
    
    # Passenger count effect on fare
    query = "SELECT passenger_count, AVG(fare_amount) as average_fare FROM trips GROUP BY passenger_count;"
    passenger_fares = pd.read_sql(query, conn)
    sns.barplot(x='passenger_count', y='average_fare', data=passenger_fares)
    plt.title("Effect of Passenger Count on Fare")
    plt.show()
    
    # Trends over the year
    query = "SELECT date, total_trips, average_fare FROM daily_aggregates ORDER BY date;"
    trends = pd.read_sql(query, conn)
    trends['date'] = pd.to_datetime(trends['date'])
    fig, ax1 = plt.subplots()
    ax2 = ax1.twinx()
    ax1.plot(trends['date'], trends['total_trips'], 'g-')
    ax2.plot(trends['date'], trends['average_fare'], 'b-')
    ax1.set_xlabel('Date')
    ax1.set_ylabel('Total Trips', color='g')
    ax2.set_ylabel('Average Fare', color='b')
    plt.title("Trends in Taxi Usage Over the Year")
    plt.show()
    
    conn.close()

if __name__ == "__main__":
    create_visualizations("ny_taxi.db")


# New York Taxi Data Processing

## Project Overview
This project processes New York Taxi Trip data for the year 2019 to derive analytical insights and load the processed data into a SQLite database for further analysis.

## Environment Setup
- Python 3.7+
- Pandas
- Requests
- TQDM
- SQLite3
- Matplotlib
- Seaborn

## Running the Project
1. Clone the repository.
2. Run the data extraction script:
    ```bash
    python data_extraction.py
    ```
3. Run the data processing script:
    ```bash
    python data_processing.py
    ```
4. Load the processed data into SQLite:
    ```bash
    python data_loading.py
    ```
5. Generate visualizations:
    ```bash
    python data_analysis.py
    ```

## Data Analysis
- Peak hours for taxi usage.
- Effect of passenger count on trip fare.
- Trends in taxi usage over the year.
