### Python Script Description

This Python script performs data processing and visualization using PySpark and Matplotlib. Below is a step-by-step breakdown of the script's functionality:

1. **Initialize a Spark Session**:
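    - The script begins by creating a Spark session with `SparkSession.builder`, running in local mode (`local[*]`, one worker thread per available core) under the application name `Read Parquet File`. It sets the following options:
      - `spark.driver.host` is set to `localhost` to bind the driver to the local machine.
      - `spark.executor.extraJavaOptions` passes `-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35`, which selects the G1 garbage collector and starts its concurrent marking cycle once heap occupancy reaches 35%.
      - `spark.driver.memory` is set to `16g` and `spark.executor.memory` to `8g`. In local mode the executor threads run inside the driver JVM, so the driver setting is the one that determines the available heap.
      - `spark.driver.maxResultSize` is set to `4g`, which caps the total size of serialized results collected to the driver (for example by `toPandas()`).
      - `spark.network.timeout` is raised from its default of 120 seconds to 1200 seconds to prevent network timeout errors during long-running tasks.
      - `spark.sql.shuffle.partitions` is set to 200, which is also Spark's default. It controls how many partitions are produced by shuffles in joins and aggregations.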

2. **Read a Parquet File**:
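    - The script reads data from the Parquet file `./datasets/filtered_fields_datasets/yellow_tripdata_2023_06_distance_amount.parquet`, a reduced file that contains only the `trip_distance` and `total_amount` columns.
    - The DataFrame is repartitioned into 10 partitions and marked with `persist()`, so later actions reuse the cached data (held in memory and spilled to disk if needed, under the default storage level) instead of rereading the file.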

3. **Display the Schema**:
    - The schema of the loaded DataFrame is printed to the console to provide an overview of the data structure, including column names and data types.

4. **Count the Total Number of Records**:
    - The script counts the total number of records (rows) in the DataFrame and prints this value to the console.

5. **Convert to Pandas DataFrame**:
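    - The Spark DataFrame is converted to a pandas DataFrame with `toPandas()`, which collects every row to the driver. This step is needed because Matplotlib plots in-memory array-like data (such as pandas Series or NumPy arrays) and cannot read a distributed Spark DataFrame directly.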

6. **Plot 1: Trip Distance vs. Total Amount (All Data)**:
    - A scatter plot is created to visualize the relationship between `trip_distance` and `total_amount` for all the data points.
    - The plot is saved as `trip_distance_vs_total_amount_all_data.png` to the local file system.

7. **Calculate Mean and Standard Deviation**:
    - The script calculates the mean and the sample standard deviation (pandas' default for `.std()`, `ddof=1`) of the `trip_distance` and `total_amount` columns.

8. **Filter Data Based on Standard Deviations**:
    - A filtered pandas DataFrame, `normalized_pdf`, keeps only the rows where both `trip_distance` and `total_amount` lie strictly within 2 standard deviations of their respective means. Despite the variable name, the values are not rescaled; this step only removes outliers.

9. **Plot 2: Trip Distance vs. Total Amount (Normalized Data)**:
    - A second scatter plot shows the relationship between `trip_distance` and `total_amount` for the filtered rows only (the data the script labels "normalized").
    - This plot is saved as `trip_distance_vs_total_amount_normalized_data.png`.

10. **Stop the Spark Session**:
    - Finally, the script stops the Spark session to release resources.
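
Steps 7 and 8 can be illustrated on a small scale. The sketch below is not part of the script: the helper `within_k_sigma` and the toy values are hypothetical, chosen only to show that the pair of strict inequalities per column is equivalent to the single test `|x - mean| < 2 * std`.

```python
import pandas as pd


def within_k_sigma(frame, columns, k=2.0):
    """Keep rows where every listed column lies strictly within
    k sample standard deviations of that column's mean."""
    mask = pd.Series(True, index=frame.index)
    for col in columns:
        mean = frame[col].mean()
        std = frame[col].std()  # pandas default: sample std (ddof=1)
        # Same as (x > mean - k*std) & (x < mean + k*std)
        mask &= (frame[col] - mean).abs() < k * std
    return frame[mask]


# Synthetic toy data (NOT taken from the taxi dataset): nine ordinary
# trips plus one extreme trip in the last row.
toy = pd.DataFrame({
    "trip_distance": [1.0, 2.0, 1.5, 2.5, 3.0, 1.2, 2.2, 1.8, 2.8, 100.0],
    "total_amount": [10.0, 15.0, 12.0, 18.0, 20.0, 11.0, 16.0, 14.0, 19.0, 400.0],
})

filtered = within_k_sigma(toy, ["trip_distance", "total_amount"])
print(f"kept {len(filtered)} of {len(toy)} rows")
```

Because the mean and standard deviation are computed on data that still contains the outliers, a few extreme values widen the accepted band. This is a property of the technique rather than of this particular sketch.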

The script uses PySpark to load and count the data and Matplotlib to plot it. Note that `toPandas()` brings all 3,307,234 rows onto the driver, so the statistics, filtering, and plotting run on a single machine. This works here because the file holds only two numeric columns.


In [1]:
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt

# Initialize a Spark session
# local[*] = run locally with as many worker threads as there are cores
spark = SparkSession.builder \
    .config("spark.driver.host", "localhost") \
    .appName("Read Parquet File") \
    .master("local[*]") \
    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35") \
    .config("spark.driver.memory", "16g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.maxResultSize", "4g") \
    .config("spark.network.timeout", "1200s") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Read the Parquet file
# parquet_file = "./datasets/combined_datasets/yellow_tripdata_2023.parquet" # too large apparently
parquet_file = "./datasets/filtered_fields_datasets/yellow_tripdata_2023_06_distance_amount.parquet"
df = spark.read.parquet(parquet_file).repartition(10).persist()

# Show the schema of the DataFrame
df.printSchema()

# Get the total number of records
total_records = df.count()

print(f"Total number of records: {total_records}")

# Convert to Pandas DataFrame for plotting
pdf = df.toPandas()

# Plot 1: All data
plt.figure(figsize=(10, 6))
plt.scatter(pdf['trip_distance'], pdf['total_amount'], alpha=0.5, color='blue')
plt.title('Trip Distance vs Total Amount (All Data)')
plt.xlabel('Trip Distance (miles)')
plt.ylabel('Total Amount ($)')
plt.grid(True)
plt.savefig('trip_distance_vs_total_amount_all_data.png')  # Save the plot as a PNG file
plt.close()  # Close the plot to free memory

# Calculate mean and standard deviation
mean_trip_distance = pdf['trip_distance'].mean()
std_trip_distance = pdf['trip_distance'].std()

mean_total_amount = pdf['total_amount'].mean()
std_total_amount = pdf['total_amount'].std()

# Filter data to include only values within 2 standard deviations from the mean
normalized_pdf = pdf[
    (pdf['trip_distance'] > mean_trip_distance - 2 * std_trip_distance) & 
    (pdf['trip_distance'] < mean_trip_distance + 2 * std_trip_distance) &
    (pdf['total_amount'] > mean_total_amount - 2 * std_total_amount) &
    (pdf['total_amount'] < mean_total_amount + 2 * std_total_amount)
]

# Plot 2: Normalized data
plt.figure(figsize=(10, 6))
plt.scatter(normalized_pdf['trip_distance'], normalized_pdf['total_amount'], alpha=0.5, color='green')
plt.title('Trip Distance vs Total Amount (Normalized Data)')
plt.xlabel('Trip Distance (miles)')
plt.ylabel('Total Amount ($)')
plt.grid(True)
plt.savefig('trip_distance_vs_total_amount_normalized_data.png')  # Save the plot as a PNG file
plt.close()  # Close the plot to free memory

spark.stop()

root
 |-- trip_distance: double (nullable = true)
 |-- total_amount: double (nullable = true)

Total number of records: 3307234
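
The schema and record count above allow a rough estimate of how much data `toPandas()` moves to the driver. The arithmetic below is a back-of-the-envelope sketch: it counts only the raw 8-byte values and ignores the pandas index, null handling, and Spark's serialization overhead, so the real footprint will be somewhat larger.

```python
ROWS = 3_307_234        # record count printed by the script
COLUMNS = 2             # trip_distance and total_amount, both double
BYTES_PER_DOUBLE = 8    # a double is a 64-bit float


def estimate_raw_bytes(rows, columns, bytes_per_value=BYTES_PER_DOUBLE):
    """Raw payload size of a dense table of fixed-width values."""
    return rows * columns * bytes_per_value


raw_bytes = estimate_raw_bytes(ROWS, COLUMNS)
max_result_bytes = 4 * 1024**3  # spark.driver.maxResultSize = 4g

print(f"raw payload: {raw_bytes / 1024**2:.1f} MiB")
print(f"share of maxResultSize: {raw_bytes / max_result_bytes:.1%}")
```

At roughly 50 MiB, the payload is far below both the 4g result limit and the 16g driver heap, which is consistent with the script collecting this reduced file without trouble.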
