# Silver Batch Processing

In [1]:
import os

# Importing Dependencies
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, regexp_replace

from IPython.display import display, HTML
# Inject CSS so <pre> output keeps its whitespace and does not wrap;
# wide tabular text output then stays on one line per row.
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [2]:
# Build (or reuse) a local SparkSession named "SilverBatchProcessor"
spark = (
    SparkSession.builder
    .appName("SilverBatchProcessor")
    .master("local[*]")
    .getOrCreate()
)

# The default time parser had trouble turning this dataset's date strings into
# dates, so switch the session to the legacy parser policy.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/12 03:11:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/01/12 03:11:16 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### Read Raw dataset from Bronze

In [3]:
# Relative path of the raw (bronze) airline CSV
csv_path = '../data/Raw_Airline_data.csv'

In [4]:
# Load the bronze CSV: the first line is treated as the header and the column
# types are inferred from the data.
df = spark.read.csv(csv_path, header=True, inferSchema=True)

# Count the rows once so the next cell can report the total
row_count = df.count()

                                                                                

In [5]:
# Report how many records were read from the bronze file
print('There are {} records...'.format(row_count))

There are 394476 records...


In [6]:
# Checking out the schema: one "name | type | nullable" line per column
schema = df.schema
for field in schema.fields:
    print(field.name, field.dataType, field.nullable, sep=" | ")


Passenger ID | StringType() | True
First Name | StringType() | True
Last Name | StringType() | True
Gender | StringType() | True
Age | IntegerType() | True
Nationality | StringType() | True
Airport Name | StringType() | True
Airport Country Code | StringType() | True
Country Name | StringType() | True
Airport Continent | StringType() | True
Continents | StringType() | True
Departure Date | StringType() | True
Arrival Airport | StringType() | True
Pilot Name | StringType() | True
Flight Status | StringType() | True


In [7]:
# Preview the first five rows of the raw data
df.show(n=5)

+------------+----------+---------+------+---+-----------+--------------------+--------------------+-------------+-----------------+-------------+--------------+---------------+-------------------+-------------+
|Passenger ID|First Name|Last Name|Gender|Age|Nationality|        Airport Name|Airport Country Code| Country Name|Airport Continent|   Continents|Departure Date|Arrival Airport|         Pilot Name|Flight Status|
+------------+----------+---------+------+---+-----------+--------------------+--------------------+-------------+-----------------+-------------+--------------+---------------+-------------------+-------------+
|      ABVWIg|    Edithe|   Leggis|Female| 62|      Japan|    Coldfoot Airport|                  US|United States|              NAM|North America|     6/28/2022|            CXF|Fransisco Hazeldine|      On Time|
|      jkXXAX|    Elwood|     Catt|  Male| 62|  Nicaragua|   Kugluktuk Airport|                  CA|       Canada|              NAM|North America|    12

### Removing Duplicates

In [None]:
# Remove rows that are exact duplicates across every column, then report
# how many records remain.
df_deduped = df.dropDuplicates()
de_duped_row_count = df_deduped.count()
print('There are {} records after deduping...'.format(de_duped_row_count))

[Stage 8:===>                                                     (1 + 14) / 15]

There are 295857 records after deduping...


                                                                                

### Handle Null Values

In [None]:
# Drop every row that has a null in at least one column
df_no_nulls = df_deduped.dropna(how="any")
df_no_nulls_count = df_no_nulls.count()
# Name the step this count belongs to (null removal, not deduplication)
print(f'There are {df_no_nulls_count} records after dropping nulls...')

[Stage 14:===>                                                    (1 + 14) / 15]

There are 295857 records after deduping...


                                                                                

### Standardize Date Field

In [None]:
# Some dates are separated by '-' and others by '/', so this field needs to be
# standardized; preview the raw values without truncation first.
(
    df_no_nulls
    .select("Departure Date")
    .show(truncate=False)
)



+--------------+
|Departure Date|
+--------------+
|09-10-2022    |
|4/15/2022     |
|3/27/2022     |
|02-01-2022    |
|8/27/2022     |
|4/13/2022     |
|3/14/2022     |
|11/30/2022    |
|11/24/2022    |
|08-10-2022    |
|7/26/2022     |
|6/13/2022     |
|09-08-2022    |
|1/29/2022     |
|9/27/2022     |
|4/17/2022     |
|11/30/2022    |
|12/13/2022    |
|08-07-2022    |
|10/20/2022    |
+--------------+
only showing top 20 rows



                                                                                

In [None]:

# Departure dates use two separators ('-' and '/'); rewrite every '-' as '/'
# so that a single pattern can parse the whole column.
# NOTE(review): this assumes the dash-separated values are month-first like the
# slash-separated ones — confirm against the data source.
df_standardized = df_no_nulls.withColumn(
    "Departure Date", regexp_replace("Departure Date", "-", "/")
)

# Convert the standardized strings to a proper date column
df_silver = df_standardized.withColumn(
    "Departure Date", to_date("Departure Date", "MM/dd/yyyy")
)

# With ANSI mode off, to_date() returns NULL rather than raising when a value
# does not match the pattern, which would silently put nulls back into the data
# after the null-handling step. Count them so a parsing problem is visible
# before the silver table is saved.
unparsed_date_count = df_silver.filter(df_silver["Departure Date"].isNull()).count()
if unparsed_date_count:
    print(f'WARNING: {unparsed_date_count} "Departure Date" values could not be parsed and are now null.')

In [None]:
# Preview the silver DataFrame (first 20 rows, long values truncated)
df_silver.show(n=20, truncate=True)



+------------+----------+-----------+------+---+-------------+--------------------+--------------------+------------------+-----------------+-------------+--------------+---------------+-------------------+-------------+
|Passenger ID|First Name|  Last Name|Gender|Age|  Nationality|        Airport Name|Airport Country Code|      Country Name|Airport Continent|   Continents|Departure Date|Arrival Airport|         Pilot Name|Flight Status|
+------------+----------+-----------+------+---+-------------+--------------------+--------------------+------------------+-----------------+-------------+--------------+---------------+-------------------+-------------+
|      aogYWd|    Vernen|   Ivakhnov|  Male| 90|        China|     Mullewa Airport|                  AU|         Australia|               OC|      Oceania|    2022-09-10|            MXU|     Lidia Kleinert|      Delayed|
|      AbSraZ|   Janenna|     Goozee|Female| 80|United States|New Stuyahok Airport|                  US|     United 

                                                                                

### Saving the silver table to the silver data layer (just a CSV in this case)

In [15]:
# Destination directory for the silver DataFrame
output_path = '../data/silver_data'

# Collapse to one partition so the output is a single CSV file with a header
# row, overwriting whatever an earlier run left at this path.
(
    df_silver
    .coalesce(1)
    .write
    .csv(output_path, mode="overwrite", header=True)
)

print("Silver data saved to: {}".format(output_path))

                                                                                

Silver data saved to: ../data/silver_data


#### Reloading silver data just to show it saved correctly

In [16]:
# Path to the folder
silver_data_folder = '../data/silver_data'

# Find the actual CSV file in the folder. The listing is sorted so the choice is
# deterministic, and an empty result raises a clear error instead of a bare
# IndexError.
csv_files = sorted(f for f in os.listdir(silver_data_folder) if f.endswith('.csv'))
if not csv_files:
    raise FileNotFoundError(
        f"No .csv file found in {silver_data_folder!r}; run the save step first."
    )
csv_file = csv_files[0]
silver_data_path = os.path.join(silver_data_folder, csv_file)

# Read the CSV file into a DataFrame. No schema is supplied or inferred here,
# which is enough for a visual check of the saved data.
df_reloaded = spark.read.option("header", True).csv(silver_data_path)

# Show the reloaded DataFrame
df_reloaded.show()

+------------+----------+-----------+------+---+-------------+--------------------+--------------------+------------------+-----------------+-------------+--------------+---------------+-------------------+-------------+
|Passenger ID|First Name|  Last Name|Gender|Age|  Nationality|        Airport Name|Airport Country Code|      Country Name|Airport Continent|   Continents|Departure Date|Arrival Airport|         Pilot Name|Flight Status|
+------------+----------+-----------+------+---+-------------+--------------------+--------------------+------------------+-----------------+-------------+--------------+---------------+-------------------+-------------+
|      aogYWd|    Vernen|   Ivakhnov|  Male| 90|        China|     Mullewa Airport|                  AU|         Australia|               OC|      Oceania|    2022-09-10|            MXU|     Lidia Kleinert|      Delayed|
|      AbSraZ|   Janenna|     Goozee|Female| 80|United States|New Stuyahok Airport|                  US|     United 