In [None]:
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 4.0 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=47fd3ffd87cab55190f871cafa97f3e423ad6b488964b8ae29c6f4b8994d83dc
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
import os
import pandas as pd
from pyspark.sql import SparkSession, functions as F

# Create the Spark session for the PySpark cells (getOrCreate reuses a session
# that already exists in this kernel, so re-running the cell is harmless).
spark = (
    SparkSession.builder
    .appName("TaxiDataProcessing")
    .getOrCreate()
)

def clean_and_transform_spark(df, taxi_type):
    if taxi_type == "yellow":
        required_columns = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'trip_distance', 'fare_amount']
        df = df.dropna(subset=required_columns)
        df = df.withColumn('pickup_datetime', F.to_timestamp('tpep_pickup_datetime')) \
               .withColumn('dropoff_datetime', F.to_timestamp('tpep_dropoff_datetime'))
    elif taxi_type == "green":
        required_columns = ['lpep_pickup_datetime', 'lpep_dropoff_datetime', 'trip_distance', 'fare_amount']
        df = df.dropna(subset=required_columns)
        df = df.withColumn('pickup_datetime', F.to_timestamp('lpep_pickup_datetime')) \
               .withColumn('dropoff_datetime', F.to_timestamp('lpep_dropoff_datetime'))
    '''elif taxi_type == "hvfht":
        required_columns = ['pickup_datetime', 'dropoff_datetime', 'trip_miles', 'base_passenger_fare']
        df = df.dropna(subset=required_columns)
        df = df.withColumn('pickup_datetime', F.to_timestamp('pickup_datetime')) \
               .withColumn('dropoff_datetime', F.to_timestamp('dropoff_datetime'))'''

    df = df.withColumn('trip_duration',
                       (F.col('dropoff_datetime').cast('long') - F.col('pickup_datetime').cast('long')) / 60) \
           .withColumn('average_speed', F.col('trip_distance') / (F.col('trip_duration') / 60))

    return df

# Define paths
use_google_drive = True  # Set to False if using local upload
if use_google_drive:
    # NOTE(review): assumes Google Drive is already mounted at /content/drive;
    # nothing in this notebook mounts it.
    base_path = '/content/drive/My Drive/nyc_taxi_data_2019'  # Update with your Google Drive path
else:
    base_path = '/content/'  # Default path for local uploads

# The taxi type is inferred from each file *name*, so names must contain
# "yellow" or "green" (e.g. 'yellow_tripdata_2019-01.parquet'). The previous
# placeholders ('file1.parquet', ...) matched neither and were silently skipped.
parquet_files = [
    'yellow_tripdata_2019-01.parquet',
    'green_tripdata_2019-01.parquet',
]  # Update with your actual parquet file names

# Read, clean, and display data from each parquet file
processed_files, skipped_files, failed_files = [], [], []
for local_file in parquet_files:
    file_path = os.path.join(base_path, local_file)
    # Look only at the file name so directory names cannot affect the match.
    file_name = os.path.basename(file_path).lower()
    if "yellow" in file_name:
        taxi_type = "yellow"
    elif "green" in file_name:
        taxi_type = "green"
    else:
        taxi_type = ""

    if not taxi_type:
        print(f"Skipping {file_path}: cannot infer taxi type (yellow/green) from file name")
        skipped_files.append(file_path)
        continue

    try:
        df = spark.read.parquet(file_path)
        df = clean_and_transform_spark(df, taxi_type)
        df.show(5)
        processed_files.append(file_path)
    except Exception as e:
        # Best-effort: report and continue with the remaining files.
        print(f"Error processing file {file_path}: {e}")
        failed_files.append(file_path)

print(
    f"Data loaded, cleaned, and displayed for {len(processed_files)} file(s); "
    f"{len(skipped_files)} skipped, {len(failed_files)} failed."
)


Data loaded, cleaned, and displayed.


In [None]:
import os
import pandas as pd
from datetime import datetime

# Folder that holds the monthly '<taxi_type>_tripdata_2019-<MM>.parquet' files
# read by the pandas pipeline (Google Drive, as mounted in Colab).
local_dir = (
    "/content/drive/My Drive/nyc_taxi_data_2019"
)

# Function to process data
def process_data(file_path, pickup_col, dropoff_col, distance_col, fare_col):
    """Load one trip parquet file, clean it, and aggregate it per pickup day.

    Parameters
    ----------
    file_path : str
        Path of the parquet file to read.
    pickup_col, dropoff_col, distance_col, fare_col : str
        Names of the pickup-time, dropoff-time, distance and fare columns in
        the file. They are renamed to ``pickup_datetime``, ``dropoff_datetime``,
        ``trip_distance`` and ``fare_amount`` respectively.

    Returns
    -------
    tuple[pandas.DataFrame, pandas.DataFrame]
        ``(trips, daily)``. ``trips`` holds the cleaned rows plus derived
        ``trip_duration`` (minutes), ``average_speed`` (mph, assuming the
        distance is in miles) and ``pickup_date`` columns. ``daily`` has one
        row per ``pickup_date`` with ``total_trips`` and ``average_fare``.

    Raises
    ------
    Exception
        Whatever ``pandas.read_parquet`` raises for a missing/unreadable file
        or for requested columns that are not in the file.
    """
    source_cols = [pickup_col, dropoff_col, distance_col, fare_col]
    target_cols = ['pickup_datetime', 'dropoff_datetime', 'trip_distance', 'fare_amount']

    # Read only the four columns that are used instead of the whole file.
    raw = pd.read_parquet(file_path, columns=source_cols)

    # Fix the column order, drop incomplete rows, then rename positionally to
    # the schema shared by all taxi types.
    trips = raw[source_cols].dropna().set_axis(target_cols, axis='columns')

    pickup = pd.to_datetime(trips['pickup_datetime'])
    dropoff = pd.to_datetime(trips['dropoff_datetime'])
    duration_min = (dropoff - pickup).dt.total_seconds() / 60

    trips = trips.assign(
        pickup_datetime=pickup,
        dropoff_datetime=dropoff,
        trip_duration=duration_min,
        # Zero/negative durations yield inf/NaN/negative speeds; those rows
        # are removed by the validity filter below.
        average_speed=trips['trip_distance'] / (duration_min / 60),
    )

    # Remove invalid data: non-positive durations and undefined speeds.
    valid = (trips['trip_duration'] > 0) & trips['average_speed'].notna()
    # `assign` on the filtered frame returns a new frame. The old code wrote a
    # column into a boolean-filtered slice, triggering SettingWithCopyWarning.
    trips = trips.loc[valid].assign(pickup_date=lambda d: d['pickup_datetime'].dt.date)

    # Aggregate data: total trips and average fare per day
    daily = (
        trips.groupby('pickup_date')
        .agg(
            total_trips=('trip_distance', 'count'),
            average_fare=('fare_amount', 'mean'),
        )
        .reset_index()
    )

    return trips, daily

# Process each file and display results.
# For every (month, taxi type) pair the file is cleaned with `process_data`,
# then its summary statistics and first daily aggregates are printed. A
# failure on one file is reported and the loop moves on to the next.
# FHVHV files name their columns differently ('pickup_datetime',
# 'dropoff_datetime', 'trip_miles', 'base_passenger_fare') and are not
# processed here.
taxi_column_specs = [
    ('yellow', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'trip_distance', 'fare_amount'),
    ('green', 'lpep_pickup_datetime', 'lpep_dropoff_datetime', 'trip_distance', 'fare_amount'),
]

for month in ['01']:
    for taxi_type, *source_cols in taxi_column_specs:
        pickup_col, dropoff_col, distance_col, fare_col = source_cols
        file_name = f'{taxi_type}_tripdata_2019-{month}.parquet'
        file_path = os.path.join(local_dir, file_name)
        try:
            df, agg_df = process_data(file_path, pickup_col, dropoff_col, distance_col, fare_col)
            print(f"Description of {taxi_type} data for 2019-{month}:")
            print(df.describe())
            print(agg_df.head())
        except Exception as e:
            print(f"Error processing {file_path}: {e}")

print("Data processing completed.")

Description of yellow data for 2019-01:
                  pickup_datetime            dropoff_datetime  trip_distance  \
count                     7690060                     7690060   7.690060e+06   
mean   2019-01-17 00:59:01.944673  2019-01-17 01:15:36.753087   2.832390e+00   
min           2001-02-02 14:55:07         2001-02-02 15:07:27   0.000000e+00   
25%    2019-01-09 17:39:55.750000         2019-01-09 17:56:01   9.000000e-01   
50%    2019-01-16 22:15:47.500000  2019-01-16 22:30:22.500000   1.540000e+00   
75%           2019-01-24 19:12:24         2019-01-24 19:27:59   2.840000e+00   
max           2088-01-24 00:25:39         2088-01-24 07:28:25   8.318000e+02   
std                           NaN                         NaN   3.775024e+00   

        fare_amount  trip_duration  average_speed  
count  7.690060e+06   7.690060e+06   7.690060e+06  
mean   1.243029e+01   1.658014e+01   1.318942e+01  
min   -3.620000e+02   1.666667e-02   0.000000e+00  
25%    6.000000e+00   6.116667e