Session setup

In [1]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # Import functions with an alias for clarity
import warnings


# Suppress specific warnings if needed (optional)
# warnings.filterwarnings("ignore", category=FutureWarning, module="pyspark.sql.pandas.conversion")

# Memory settings.
# No .master() is set here; with a plain pip-installed PySpark this presumably runs as a
# local session, where the executors live inside the driver JVM. In that case
# spark.executor.memory is ignored and spark.driver.memory is the effective heap limit,
# so the driver is the one that needs the larger allowance for caching and writing a
# full year of flight records. A driver JVM that dies (e.g. out of memory) shows up in
# Python as ConnectionRefusedError on the next Spark call.
# NOTE(review): 4g is an assumption about what the host can spare — adjust to the machine.
# Driver memory only applies when this cell launches the JVM; getOrCreate() on an
# already-running session keeps the old value, so restart the kernel after changing it.
DRIVER_MEMORY = "4g"
EXECUTOR_MEMORY = "2g"  # only meaningful when running against a cluster manager

# Create or get a SparkSession
spark = SparkSession.builder \
    .appName("SparkML Lab - Enhanced") \
    .config("spark.sql.shuffle.partitions", 4) \
    .config("spark.executor.memory", EXECUTOR_MEMORY) \
    .config("spark.driver.memory", DRIVER_MEMORY) \
    .getOrCreate()

# Enable Arrow-based columnar data transfers (improves Spark -> Pandas conversion performance)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Optional: Enable eager evaluation for better interactive display in notebooks
# spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

# Legacy alias for older code. Note this is the SparkSession itself, not a pyspark SQLContext.
sqlContext = spark

print(f"Spark Session Initialized. Spark version: {spark.version}")

Spark Session Initialized. Spark version: 3.5.3


Loading data from the 2009.csv file with every field read as a string (schema inference turned off)

In [2]:
# Location of the raw 2009 flights CSV, relative to the notebook's working directory.
data_path = "./data/2009.csv"

# First pass: read every column as a plain string (inferSchema off) purely so that the
# header names can be inspected before an explicit schema is written for them.
df_string = spark.read.csv(
    data_path,
    header=True,
    inferSchema=False,
    ignoreLeadingWhiteSpace=True,
    ignoreTrailingWhiteSpace=True,
)

# Show the header names exactly as read; they may carry stray whitespace or hyphens.
print("Original Column Names:")
print(df_string.columns)


Original Column Names:
['FL_DATE', 'OP_CARRIER', 'OP_CARRIER_FL_NUM', 'ORIGIN', 'DEST', 'CRS_DEP_TIME', 'DEP_TIME', 'DEP_DELAY', 'TAXI_OUT', 'WHEELS_OFF', 'WHEELS_ON', 'TAXI_IN', 'CRS_ARR_TIME', 'ARR_TIME', 'ARR_DELAY', 'CANCELLED', 'CANCELLATION_CODE', 'DIVERTED', 'CRS_ELAPSED_TIME', 'ACTUAL_ELAPSED_TIME', 'AIR_TIME', 'DISTANCE', 'CARRIER_DELAY', 'WEATHER_DELAY', 'NAS_DELAY', 'SECURITY_DELAY', 'LATE_AIRCRAFT_DELAY', 'Unnamed: 27']


DATA CLEANING

In [3]:

# Normalise the raw header names: trim surrounding whitespace and turn hyphens into
# underscores. Other characters (e.g. the space and colon in 'Unnamed: 27') are left as-is.
clean_columns = list(
    map(lambda raw_name: raw_name.strip().replace("-", "_"), df_string.columns)
)

# toDF renames positionally, so each cleaned name replaces the original at the same index.
df_renamed = df_string.toDF(*clean_columns)

print("Cleaned Column Names:")
print(df_renamed.columns)


Cleaned Column Names:
['FL_DATE', 'OP_CARRIER', 'OP_CARRIER_FL_NUM', 'ORIGIN', 'DEST', 'CRS_DEP_TIME', 'DEP_TIME', 'DEP_DELAY', 'TAXI_OUT', 'WHEELS_OFF', 'WHEELS_ON', 'TAXI_IN', 'CRS_ARR_TIME', 'ARR_TIME', 'ARR_DELAY', 'CANCELLED', 'CANCELLATION_CODE', 'DIVERTED', 'CRS_ELAPSED_TIME', 'ACTUAL_ELAPSED_TIME', 'AIR_TIME', 'DISTANCE', 'CARRIER_DELAY', 'WEATHER_DELAY', 'NAS_DELAY', 'SECURITY_DELAY', 'LATE_AIRCRAFT_DELAY', 'Unnamed: 27']


Applying an explicit schema and inspecting the resulting data types

In [4]:
from pyspark.sql.types import *

# Explicit schema for the 2009 flights CSV. Scheduled times (CRS_DEP_TIME, CRS_ARR_TIME)
# and identifiers are kept as strings; actual times, delays, durations, distances and the
# CANCELLED/DIVERTED flags are doubles.
# NOTE(review): DEP_TIME/ARR_TIME are doubles while their CRS_* counterparts are strings —
# confirm that asymmetry is intended.
# NOTE(review): FL_DATE as DateType relies on Spark's default dateFormat (yyyy-MM-dd) — confirm
# against the file.
flight_schema = StructType([
    StructField("FL_DATE", DateType(), True),
    StructField("OP_CARRIER", StringType(), True),
    StructField("OP_CARRIER_FL_NUM", StringType(), True),
    StructField("ORIGIN", StringType(), True),
    StructField("DEST", StringType(), True),
    StructField("CRS_DEP_TIME", StringType(), True),
    StructField("DEP_TIME", DoubleType(), True),
    StructField("DEP_DELAY", DoubleType(), True),
    StructField("TAXI_OUT", DoubleType(), True),
    StructField("WHEELS_OFF", DoubleType(), True),
    StructField("WHEELS_ON", DoubleType(), True),
    StructField("TAXI_IN", DoubleType(), True),
    StructField("CRS_ARR_TIME", StringType(), True),
    StructField("ARR_TIME", DoubleType(), True),
    StructField("ARR_DELAY", DoubleType(), True),
    StructField("CANCELLED", DoubleType(), True),
    StructField("CANCELLATION_CODE", StringType(), True),
    StructField("DIVERTED", DoubleType(), True),
    StructField("CRS_ELAPSED_TIME", DoubleType(), True),
    StructField("ACTUAL_ELAPSED_TIME", DoubleType(), True),
    StructField("AIR_TIME", DoubleType(), True),
    StructField("DISTANCE", DoubleType(), True),
    StructField("CARRIER_DELAY", DoubleType(), True),
    StructField("WEATHER_DELAY", DoubleType(), True),
    StructField("NAS_DELAY", DoubleType(), True),
    StructField("SECURITY_DELAY", DoubleType(), True),
    StructField("LATE_AIRCRAFT_DELAY", DoubleType(), True),
    StructField("Unnamed: 27", StringType(), True),
])

# With header=true and a user-supplied schema, Spark skips the header row and applies the
# schema *by position*, so a schema whose field order differs from the file would silently
# mislabel columns. Check the schema against the cleaned header names from the string load
# instead of blindly renaming with toDF afterwards.
assert flight_schema.fieldNames() == clean_columns, (
    "flight_schema does not match the CSV header order: "
    f"{flight_schema.fieldNames()} != {clean_columns}"
)

# Load data with the explicit schema. In Spark's default (PERMISSIVE) CSV mode, values that
# fail to parse into the declared type become null rather than raising.
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("ignoreLeadingWhiteSpace", "true") \
    .option("ignoreTrailingWhiteSpace", "true") \
    .schema(flight_schema) \
    .load(data_path)

# Print the schema to verify data types
print("DataFrame Schema:")
df.printSchema()

# Cache the DataFrame for later cells; the count() action below materializes the cache.
df.cache()

print(f"\nNumber of records: {df.count()}")

DataFrame Schema:
root
 |-- FL_DATE: date (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: string (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- CRS_DEP_TIME: string (nullable = true)
 |-- DEP_TIME: double (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- TAXI_OUT: double (nullable = true)
 |-- WHEELS_OFF: double (nullable = true)
 |-- WHEELS_ON: double (nullable = true)
 |-- TAXI_IN: double (nullable = true)
 |-- CRS_ARR_TIME: string (nullable = true)
 |-- ARR_TIME: double (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- DIVERTED: double (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = true)
 |-- ACTUAL_ELAPSED_TIME: double (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- CARRIER_DELAY: double (nullable = true)
 |-- WEAT

Saving as Partitioned Parquet

In [7]:
# Write the flights to Parquet, partitioned by OP_CARRIER (airline): each carrier's rows go to
# its own OP_CARRIER=<code>/ sub-directory, so per-carrier reads can skip the other airlines.
# When read back with spark.read.parquet, OP_CARRIER is rebuilt from the directory names and
# appears as the last column of the schema.
output_path = "./output/flight_2009_parquet"

(
    df.write
    .mode("overwrite")
    .partitionBy("OP_CARRIER")
    .parquet(output_path)
)

print(f"✅ Parquet saved to: {output_path}")


ConnectionRefusedError: [Errno 111] Connection refused