# Data Preprocessing and Standardization Pipeline

This notebook preprocesses and standardizes taxi trip data stored in Parquet format. It performs the following steps:

1. **Initialization**: Setting up a Spark session for efficient data processing and configuring session parameters.
2. **Loading and Inspecting Data**: Reading multiple Parquet files from a specified directory and inspecting their schemas to understand the structure and types of the data columns.
3. **Schema Standardization**: Defining and applying a function to standardize the schema across all datasets, ensuring consistent data types and column names for seamless merging.
4. **Combining DataFrames**: Merging all standardized datasets into a single DataFrame, which is essential for subsequent data analysis or machine learning tasks.
5. **Saving the Processed Data**: Writing the combined DataFrame back to disk in Parquet format, making it available for further analysis in subsequent stages of the project.
6. **Session Cleanup**: Stopping the Spark session to release resources and conclude the data preprocessing pipeline.

This notebook ensures that all datasets are in a consistent format and structure, paving the way for accurate and efficient data analysis.

## Importing Necessary Libraries
- `os`: Used to list the contents of the data directory and to build file paths.
- `pyspark.sql.SparkSession`: The entry point to programming Spark with the Dataset and DataFrame API.
- `pyspark.sql.functions.col`: Returns a `Column` expression that refers to a DataFrame column by name.

In [2]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

## Initializing Spark Session
Initialize a Spark session with specific configurations:
- **appName**: The name of our Spark application, useful for identifying the application in logs and UIs.
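- **spark.sql.repl.eagerEval.enabled**: Enables eager evaluation in the REPL, so that DataFrames are rendered as tables when displayed in a notebook without an explicit `show()` call. It affects display only, not performance.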
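- **spark.sql.parquet.cacheMetadata**: Turns on caching of Parquet schema metadata, which can speed up repeated queries over static data. This option originates from older Spark releases and may have no effect in recent versions.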
- **spark.sql.session.timeZone**: Sets the session's time zone to UTC.

In [3]:
spark = (
    SparkSession.builder.appName("MAST30034 Project 1")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .getOrCreate()
)

24/08/23 17:01:54 WARN Utils: Your hostname, apples-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.13.11.182 instead (on interface en0)
24/08/23 17:01:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/23 17:01:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
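
The same settings can also be kept in one dictionary and applied in a loop, which makes them easier to review or reuse across notebooks. The following is a minimal sketch, not part of the original notebook: `SPARK_SETTINGS`, `apply_settings`, and `make_session` are hypothetical names, and `builder` is assumed to be `SparkSession.builder` (or any object with a chainable `config(key, value)` method).

```python
# Settings copied from the cell above; values are written as strings here.
SPARK_SETTINGS = {
    "spark.sql.repl.eagerEval.enabled": "true",
    "spark.sql.parquet.cacheMetadata": "true",
    "spark.sql.session.timeZone": "Etc/UTC",
}


def apply_settings(builder, settings):
    """Call builder.config(key, value) for every entry and return the builder."""
    for key, value in settings.items():
        builder = builder.config(key, value)
    return builder


def make_session(app_name="MAST30034 Project 1"):
    """Build (or reuse) a Spark session with SPARK_SETTINGS applied."""
    # Imported lazily so the helpers above can be used without Spark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    return apply_settings(builder, SPARK_SETTINGS).getOrCreate()
```

With this layout, `spark = make_session()` would replace the builder chain, and adding or removing a setting touches only the dictionary.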


## Reading and Displaying Schema of Parquet Files
The following code reads all `.parquet` files in the specified directory, prints the schema for each file, and displays the path of the file being processed.

- **data_path**: The directory path where the Parquet files are stored.
- **parquet_files**: A list of paths to all Parquet files in the `data_path` directory.
- **df.printSchema()**: Displays the schema (i.e., structure) of each DataFrame loaded from a Parquet file, helping us understand the data types and structure of our datasets.

The output below shows the schema of the Parquet files, providing an understanding of the structure and types of the data columns.

In [4]:
data_path = '/Users/jennymai/Desktop/data_sci/mast_project1/data/landing'

parquet_files = [os.path.join(data_path, f) for f in os.listdir(data_path) if f.endswith('.parquet')]

for file in parquet_files:
    print(f"Schema for file: {file}")
    df = spark.read.parquet(file)
    df.printSchema()
    print("-" * 80)

Schema for file: /Users/jennymai/Desktop/data_sci/mast_project1/data/landing/yellow_2022_12.parquet

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)

--------------------------------------------------------------------------------
Schema for file: /Users/jennymai/Desktop/data_sci
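
Reading the printed schemas one by one makes it easy to miss small differences between files. A minimal sketch of an automated comparison follows; it assumes each schema is supplied as the list of `(name, type)` pairs that `df.dtypes` returns. The function name `schema_differences` and the two example schemas are hypothetical and do not describe the actual files.

```python
from collections import defaultdict


def schema_differences(schemas):
    """Find columns whose spelling or type differs between files.

    schemas maps a file label to a list of (column_name, dtype) pairs.
    Columns are matched case-insensitively. The result maps the lowercased
    column name to {file_label: (name, dtype)} for every column that is
    spelled differently, typed differently, or missing from some file.
    """
    seen = defaultdict(dict)
    for label, dtypes in schemas.items():
        for name, dtype in dtypes:
            seen[name.lower()][label] = (name, dtype)

    diffs = {}
    for key, per_file in seen.items():
        variants = set(per_file.values())
        missing_somewhere = len(per_file) < len(schemas)
        if len(variants) > 1 or missing_somewhere:
            diffs[key] = per_file
    return diffs


# Invented example: one type mismatch and one capitalization mismatch.
example = {
    "file_a": [("VendorID", "bigint"), ("airport_fee", "double")],
    "file_b": [("VendorID", "int"), ("Airport_fee", "double")],
}
for column, per_file in schema_differences(example).items():
    print(column, per_file)
```

In the notebook, the input could be built with something like `{file: spark.read.parquet(file).dtypes for file in parquet_files}`. Any column reported here is a candidate for a cast or rename in the standardization step.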

## Standardizing Schema Across Parquet Files
This section defines a function to standardize the schema of the Parquet files. We will apply this function to each DataFrame and then combine all the DataFrames into a single DataFrame with a consistent schema.

- **standardize_schema function**: This function standardizes the schema by ensuring that specific columns have consistent data types across all DataFrames. This is important for when we later combine these DataFrames.
- **Combining DataFrames**: We iterate over each Parquet file, apply the standardization, and then combine the resulting DataFrames into a single DataFrame (`sdf`) using `unionByName`, which ensures that the columns are aligned by name.
- **Printing Final Schema**: Finally, we print the schema of the combined DataFrame to verify that all columns have consistent data types and names.

In [5]:
def standardize_schema(df):
    """
    Standardizes the schema of a DataFrame by:
    - Casting specific columns to consistent data types.
    - Renaming the 'Airport_fee' column to 'airport_fee' for consistency.
    
    Parameters:
    df (DataFrame): The input DataFrame to be standardized.

    Returns:
    DataFrame: A new DataFrame with standardized schema.
    """
    return (df
            .withColumn("VendorID", col("VendorID").cast("int"))  # Casting 'VendorID' to integer
            .withColumn("passenger_count", col("passenger_count").cast("double"))  # Casting 'passenger_count' to double
            .withColumn("RatecodeID", col("RatecodeID").cast("double"))  # Casting 'RatecodeID' to double
            .withColumn("PULocationID", col("PULocationID").cast("int"))  # Casting 'PULocationID' to integer
            .withColumn("DOLocationID", col("DOLocationID").cast("int"))  # Casting 'DOLocationID' to integer
            .withColumnRenamed("Airport_fee", "airport_fee")  # Renaming 'Airport_fee' to 'airport_fee' (a no-op if the column is absent)
           )

# Recreate the list of parquet files from the data path
parquet_files = [os.path.join(data_path, f) for f in os.listdir(data_path) if f.endswith('.parquet')]

sdf = None  # Accumulator for the combined DataFrame; stays None until the first file is read

for file in parquet_files:
    df = spark.read.parquet(file)  # Read each parquet file into a DataFrame
    standardized_df = standardize_schema(df)  # Apply schema standardization
    
    if sdf is None:
        sdf = standardized_df  # Set the first DataFrame as the initial value of sdf
    else:
        sdf = sdf.unionByName(standardized_df)  # Union the standardized DataFrame with the accumulating sdf

sdf.printSchema()  # Print the schema of the final combined DataFrame

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
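
The accumulation loop can also be expressed with `functools.reduce`, which removes the `None` sentinel and gives a clear error when the directory contains no Parquet files (the loop version would instead fail on `sdf.printSchema()` with `sdf` still `None`). This is a minimal sketch under that assumption; `union_all` is a hypothetical helper name, and it relies only on each element having a `unionByName` method, as Spark DataFrames do.

```python
from functools import reduce


def union_all(frames):
    """Union DataFrames pairwise by column name.

    Raises ValueError when no DataFrames are supplied.
    """
    frames = list(frames)
    if not frames:
        raise ValueError("no DataFrames to combine")
    return reduce(lambda left, right: left.unionByName(right), frames)


# Possible use in this notebook (not executed here):
# sdf = union_all(
#     standardize_schema(spark.read.parquet(f)) for f in parquet_files
# )
```

If some files lack a column entirely, `unionByName` also accepts `allowMissingColumns=True` in Spark 3.1 and later, which fills the missing column with nulls.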



## Writing the Final DataFrame to Parquet and Stopping Spark Session
In this final step, we write the combined and standardized DataFrame to disk in Parquet format and then stop the Spark session to release resources.

- **Writing the DataFrame**: The `sdf.write.parquet()` function writes the combined DataFrame (`sdf`) to the specified directory in Parquet format. The `mode='overwrite'` parameter ensures that any existing files in the target directory are replaced.
- **Stopping the Spark Session**: The `spark.stop()` command stops the Spark session, which is important to free up resources and clean up the environment after the data processing is complete.

In [6]:
# Write the combined, standardized DataFrame to a Parquet directory (Spark writes one or more part files)
sdf.write.parquet('/Users/jennymai/Desktop/data_sci/mast_project1/data/raw', mode='overwrite')

In [7]:
# Stop the Spark session to release resources
spark.stop()
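
Because the write produces a directory rather than a single file, a quick filesystem check can confirm that it completed. The sketch below is an addition, not part of the original notebook: `summarize_parquet_output` is a hypothetical helper, and it assumes Spark's usual output layout of `part-*.parquet` files plus a `_SUCCESS` marker written when the job commits. It uses only `pathlib`, so it works even after the session has been stopped.

```python
from pathlib import Path


def summarize_parquet_output(output_dir):
    """Report whether a Spark Parquet output directory looks complete."""
    out = Path(output_dir)
    # Spark names its data files part-<n>-<uuid>...parquet by default.
    part_files = sorted(out.glob("part-*.parquet"))
    return {
        "has_success_marker": (out / "_SUCCESS").is_file(),
        "part_files": len(part_files),
        "total_bytes": sum(p.stat().st_size for p in part_files),
    }


# Possible use (not executed here):
# summarize_parquet_output('/Users/jennymai/Desktop/data_sci/mast_project1/data/raw')
```

A result with no success marker or zero part files suggests the write did not finish or went to a different location.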