In [1]:
# Standard library imports
import os
from typing import List, Union, Optional

# PySpark SQL imports for data types and structures
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

# PySpark SQL imports for functions
from pyspark.sql.functions import col

### Data manipulation with PySpark

In [2]:
# Step 1: Import the necessary library
from pyspark.sql import SparkSession

In [3]:
# Step 2: Build (or reuse) the SparkSession, the entry point to Spark functionality.
#   - spark.driver.memory: memory requested for the driver ("4g").
#   - appName:             name shown for this application in the Spark UI.
#   - spark.driver.host:   driver host pinned to the loopback address.
#   - getOrCreate():       returns the existing session, or creates one if none exists.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "4g")
    .appName("ReadSimpleParquet")
    .config("spark.driver.host", "127.0.0.1")
    .getOrCreate()
)

print(f"SparkSession created successfully. Spark version: {spark.version}")

SparkSession created successfully. Spark version: 3.5.4


In [4]:
# Collect every (key, value) configuration pair from the running SparkContext.
all_configs = spark.sparkContext.getConf().getAll()

# Print one "key: value" line per setting, sorted for readability.
print("--- All Spark Configurations ---")
for key, value in sorted(all_configs):
    print(f"{key}: {value}")

--- All Spark Configurations ---
spark.app.id: local-1750692345921
spark.app.name: ReadSimpleParquet
spark.app.startTime: 1750692344286
spark.app.submitTime: 1750692344038
spark.driver.extraJavaOptions: -Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.

### Read the DataFrame

In [3]:
parquet_path = "Combined_Flights_2019.parquet"

# Step 4: Read the Parquet file into a Spark DataFrame
print(f"\nAttempting to read Parquet file from: {parquet_path}")

try:
    # spark.read.parquet() takes the schema from the Parquet file itself.
    # The result is a DataFrame, a distributed table-like data structure.
    df = spark.read.parquet(parquet_path)

    # Step 5: Verify the data has been loaded
    print("\nFile read successfully! Here is a summary of the DataFrame:")

    # 5a. Print the schema (column names and data types)
    print("\n--- DataFrame Schema ---")
    df.printSchema()

    # 5b. Show the first 10 rows of the data
    print("\n--- DataFrame Content (Top 10 rows) ---")
    df.show(10)

    # 5c. Show the total number of records (this triggers a full scan of the data)
    record_count = df.count()
    print(f"\nTotal number of records found: {record_count}")

except Exception as e:
    # Report the failure without aborting the notebook; `df` is left undefined
    # (or stale) in that case, so check this output before continuing.
    print("\nERROR: Could not read the file. Please check the path and file integrity.")
    print(f"Details: {e}")

# NOTE: the SparkSession is deliberately NOT stopped here. The cells below keep
# using `spark`; the session is stopped once, in the last cell of the notebook.


Attempting to read Parquet file from: Combined_Flights_2019.parquet

ERROR: Could not read the file. Please check the path and file integrity.
Details: [UNABLE_TO_INFER_SCHEMA] Unable to infer schema for Parquet. It must be specified manually.

Stopping the SparkSession.


In [6]:
# Relative path of the 2019 flights Parquet data used by the cells below.
parquet_path = "Combined_Flights_2019.parquet"

In [8]:
# Load the Parquet data at `parquet_path` into a DataFrame (schema comes from the file).
df = spark.read.parquet(parquet_path)

In [9]:
# Preview the first 10 rows of the loaded DataFrame.
df.show(10)

+-------------------+---------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+---------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+-----------------+
|         FlightDate|  Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|

In [15]:
# Cast DivAirportLandings to double. This must run before the coalesce/write
# below, which consume `df_converted`.
# NOTE(review): presumably done so this file's column type lines up with the
# other yearly files - confirm which type is the intended target.
df_converted = df.withColumn(
    "DivAirportLandings",
    col("DivAirportLandings").cast(DoubleType())
)

# Reduce the DataFrame to a single partition before writing.
df_single_partition = df_converted.coalesce(1)

# Write the converted data, replacing any existing output at this path.
df_single_partition.write.mode("overwrite").parquet("flights_2019_fixed.parquet")

In [16]:
# Print the schema of the DataFrame as read from disk. Note this is `df`, not
# `df_converted`, so the DivAirportLandings cast is not reflected in this output.
df.printSchema()

root
 |-- FlightDate: timestamp_ntz (nullable = true)
 |-- Airline: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Cancelled: boolean (nullable = true)
 |-- Diverted: boolean (nullable = true)
 |-- CRSDepTime: long (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- DepDelayMinutes: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- ArrDelayMinutes: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Year: long (nullable = true)
 |-- Quarter: long (nullable = true)
 |-- Month: long (nullable = true)
 |-- DayofMonth: long (nullable = true)
 |-- DayOfWeek: long (nullable = true)
 |-- Marketing_Airline_Network: string (nullable = true)
 |-- Operated_or_Branded_Code_Share_Partners: string (nullable = true)
 |-- DOT_ID

In [5]:

def read_parquet_folder(
    spark: SparkSession,
    data_path: Union[str, List[str]],
    golden_schema_path: Optional[str] = None
) -> DataFrame:
    """
    Read Parquet data from one or more paths with a single, explicitly applied schema.

    The schema is taken from one reference ("golden") Parquet file and handed to
    ``spark.read.schema(...)`` for the read of ``data_path``. No casting is done
    here; how files whose stored types differ from the golden schema are handled
    is left to Spark's Parquet reader.

    The function operates in two modes:
    1.  Explicit (recommended): if ``golden_schema_path`` is given (non-empty),
        that file's schema is used.
    2.  Auto-detect: otherwise ``data_path`` is read once to list its input
        files, and the lexicographically smallest file path is used as the
        schema source, so the choice does not depend on file-listing order.

    Args:
        spark (SparkSession): The active SparkSession.
        data_path (Union[str, List[str]]): A directory/file path, or a list of
            such paths. A list is unpacked into separate path arguments.
        golden_schema_path (Optional[str], optional): Path to the reference
            Parquet file whose schema is enforced. If None (or empty), a file
            is auto-detected as described above.

    Returns:
        DataFrame: A Spark DataFrame over ``data_path`` read with the golden schema.

    Raises:
        ValueError: If ``data_path`` is an empty list, or if auto-detection
            finds no input files.
        Exception: Any error raised by the Spark read operations. Every error
            is printed and then re-raised unchanged.
    """
    print("--- Reading Parquet folder with enforced schema ---")

    try:
        # DataFrameReader.parquet() takes paths as separate positional
        # arguments, so normalise to a list and unpack it on every call.
        paths = [data_path] if isinstance(data_path, str) else list(data_path)
        if not paths:
            raise ValueError("data_path must contain at least one path.")

        if golden_schema_path:
            schema_source_path = golden_schema_path
        else:
            # This read is only used to enumerate the underlying files.
            input_files = spark.read.parquet(*paths).inputFiles()
            if not input_files:
                raise ValueError(f"Could not find any Parquet files in the path: {data_path}")
            # min() rather than [0]: the listing order is not guaranteed.
            schema_source_path = min(input_files)

        # Read the reference file only to obtain its schema object.
        target_schema = spark.read.parquet(schema_source_path).schema

        # Final read: all requested paths, with the golden schema applied.
        return spark.read.schema(target_schema).parquet(*paths)

    except Exception as e:
        print(f"\nERROR: Failed during the read process. Details: {e}")
        raise

In [21]:
# Read everything under data/ using the 2018 file as the schema reference
# (this forces the `long` variant of the schema).
df_explicit = read_parquet_folder(
    spark=spark,
    data_path="data/",
    golden_schema_path="data/Combined_Flights_2018.parquet",
)

--- Reading Parquet folder with enforced schema ---


In [22]:
# Count the rows of the combined DataFrame (an action: runs a Spark job over the data).
df_explicit.count()

18803593

In [23]:
# Preview the first 10 rows of the combined DataFrame.
df_explicit.show(10)

+-------------------+-----------------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+--------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+-----------------+
|         FlightDate|          Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|D

In [24]:
# Print the schema that was applied to the combined read.
df_explicit.printSchema()

root
 |-- FlightDate: timestamp_ntz (nullable = true)
 |-- Airline: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Cancelled: boolean (nullable = true)
 |-- Diverted: boolean (nullable = true)
 |-- CRSDepTime: long (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- DepDelayMinutes: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- ArrDelayMinutes: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Year: long (nullable = true)
 |-- Quarter: long (nullable = true)
 |-- Month: long (nullable = true)
 |-- DayofMonth: long (nullable = true)
 |-- DayOfWeek: long (nullable = true)
 |-- Marketing_Airline_Network: string (nullable = true)
 |-- Operated_or_Branded_Code_Share_Partners: string (nullable = true)
 |-- DOT_ID

### Group by

In [25]:
# Import the functions module under an alias rather than importing sum/min/max
# by name, which would shadow Python's built-ins for the rest of the notebook.
from pyspark.sql import functions as F

# Per-airline summary: total departure delay, mean/min/max air time, and the
# number of rows with a non-null FlightDate.
df_explicit.groupBy("Airline").agg(
    F.sum("DepDelayMinutes").alias("TotalDepDelayMinutes"),
    F.avg("AirTime").alias("AverageAirtime"),  # column name cased as in the schema
    F.count("FlightDate").alias("TotalFlights"),
    F.min("AirTime").alias("MinAirTime"),
    F.max("AirTime").alias("MaxAirTime"),
).show()

+--------------------+--------------------+------------------+------------+----------+----------+
|             Airline|TotalDepDelayMinutes|    AverageAirtime|TotalFlights|MinAirTime|MaxAirTime|
+--------------------+--------------------+------------------+------------+----------+----------+
|GoJet Airlines, L...|           2987864.0| 75.36526416623683|      181550|      13.0|     236.0|
|   Endeavor Air Inc.|           6430948.0| 70.02952781269916|      588733|      14.0|     256.0|
|       Allegiant Air|           4247661.0|117.29545510696046|      300015|      17.0|     315.0|
|SkyWest Airlines ...|         2.6038771E7| 76.71127614515942|     1965533|       4.0|     327.0|
|      Virgin America|            187977.0|191.06103888532618|       17670|      37.0|     432.0|
|         Horizon Air|           1969357.0| 71.46498950500296|      303205|       9.0|     295.0|
|United Air Lines ...|         2.0599582E7|155.60338560099026|     1555692|      13.0|     684.0|
|Air Wisconsin Air..

### Data Filters

In [26]:
# Preview the first 10 rows again before filtering.
df_explicit.show(10)

+-------------------+-----------------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+--------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+-----------------+
|         FlightDate|          Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|D

In [27]:
# Keep flights that depart from ABY and have an air time above 30.
# Chained filters are combined with AND.
df_2 = (
    df_explicit
    .where(col("Origin") == "ABY")
    .where(col("AirTime") > 30)
)

In [28]:
# Number of rows that pass the Origin/AirTime filter.
df_2.count()

2227

In [29]:
# Show the filtered rows (show() with no argument prints at most 20 rows).
df_2.show()

+-------------------+-----------------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+--------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+-----------------+
|         FlightDate|          Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|D

In [31]:
# Summary statistics (count, mean, stddev, min, max) for the filtered rows.
df_2.describe().show()

+-------+--------------------+------+----+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+-----------------+-----------------+--------+------------------+------------------+------------------+------------------+------------------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+--------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+------------+---------+-------------+-------------+-------+-------------------+--------------------+----------+------------------+------------------+------------------+-----------------+------------------+------------------+-------------------+-----

### Joins 

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, broadcast

# Employees: one tuple per row, in the column order given by emp_columns.
emp_columns = ["emp_id", "emp_name", "dept_id"]
emp_data = [
    (1, "Alice", 10),
    (2, "Bob", 20),
    (3, "Charlie", 10),
    (4, "David", None),  # no department assigned
    (5, "Eve", 20),
]
employees_df = spark.createDataFrame(emp_data, emp_columns)

# Departments: one tuple per row, in the column order given by dept_columns.
dept_columns = ["dept_id", "dept_name"]
dept_data = [
    (10, "Engineering"),
    (20, "Marketing"),
    (30, "HR"),  # no employee references this department
]
depts_df = spark.createDataFrame(dept_data, dept_columns)

# Display both small tables, each under its own heading.
for heading, frame in (
    ("--- Employees DataFrame ---", employees_df),
    ("--- Departments DataFrame ---", depts_df),
):
    print(heading)
    frame.show()

--- Employees DataFrame ---
+------+--------+-------+
|emp_id|emp_name|dept_id|
+------+--------+-------+
|     1|   Alice|     10|
|     2|     Bob|     20|
|     3| Charlie|     10|
|     4|   David|   NULL|
|     5|     Eve|     20|
+------+--------+-------+

--- Departments DataFrame ---
+-------+-----------+
|dept_id|  dept_name|
+-------+-----------+
|     10|Engineering|
|     20|  Marketing|
|     30|         HR|
+-------+-----------+



In [6]:
# Inner join on the shared 'dept_id' column: only employees whose dept_id has
# a matching department are kept.
inner_join_df = employees_df.join(depts_df, on="dept_id", how="inner")

print("--- Inner Join ---")
inner_join_df.show()

--- Inner Join ---
+-------+------+--------+-----------+
|dept_id|emp_id|emp_name|  dept_name|
+-------+------+--------+-----------+
|     10|     1|   Alice|Engineering|
|     10|     3| Charlie|Engineering|
|     20|     2|     Bob|  Marketing|
|     20|     5|     Eve|  Marketing|
+-------+------+--------+-----------+



In [7]:
# Left join on the shared 'dept_id' column: every employee is kept, and
# dept_name is NULL where no department matches.
left_join_df = employees_df.join(depts_df, on="dept_id", how="left")

print("--- Left Join ---")
left_join_df.show()

--- Left Join ---
+-------+------+--------+-----------+
|dept_id|emp_id|emp_name|  dept_name|
+-------+------+--------+-----------+
|     10|     1|   Alice|Engineering|
|     20|     2|     Bob|  Marketing|
|     10|     3| Charlie|Engineering|
|   NULL|     4|   David|       NULL|
|     20|     5|     Eve|  Marketing|
+-------+------+--------+-----------+



### Complex joins with multiple conditions

In [13]:
# Join conditions. When `on` receives a list of Column expressions, PySpark
# combines the entries with AND, so each condition is listed separately.
join_conditions_list = [
    # Condition 1: the employee's dept_id must equal the department's dept_id
    employees_df.dept_id == depts_df.dept_id,

    # Condition 2: only the "Marketing" department may match
    depts_df.dept_name == "Marketing",
]

In [14]:
# Left join using the expression-based conditions: every employee row is kept.
# Because `on` is an expression rather than a column name, the result carries
# the dept_id column from both sides.
sales_with_promo_df = employees_df.join(depts_df, on=join_conditions_list, how="left")

In [15]:
# Display the result of the multi-condition left join.
sales_with_promo_df.show()

+------+--------+-------+-------+---------+
|emp_id|emp_name|dept_id|dept_id|dept_name|
+------+--------+-------+-------+---------+
|     1|   Alice|     10|   NULL|     NULL|
|     2|     Bob|     20|     20|Marketing|
|     3| Charlie|     10|   NULL|     NULL|
|     4|   David|   NULL|   NULL|     NULL|
|     5|     Eve|     20|     20|Marketing|
+------+--------+-------+-------+---------+



In [16]:
# Stop the SparkSession at the end of the notebook; any cell run after this
# needs a new session to be created first.
spark.stop()