In [1]:
spark

In [16]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import *
import pandas as pd
import os
from pyspark.sql.types import IntegerType, DoubleType

In [3]:

#initialize Spark session
spark = SparkSession.builder.appName("Clean").getOrCreate()

#Load data from Google Cloud Storage
gcs_path = "gs://my-bigdata-project-lh/landing/itineraries.csv"
df = spark.read.csv(gcs_path, header=True, inferSchema=True)


#Show first 5 rows
df.show(5)

24/12/02 20:03:57 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
24/12/02 20:06:38 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+--------------------+----------+----------+---------------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+-------------------+-------------------+----------------------------+-------------------------+----------------+-----------------+
|               legId|searchDate|flightDate|startingAirport|destinationAirport|fareBasisCode|travelDuration|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining|totalTravelDistance|segmentsDepartureTimeEpochSeconds|segmentsDepartureTimeRaw|segmentsArrivalTimeEpochSeconds|segmentsArrivalTimeRaw|segmentsArrivalAirportCode|segmentsDepartureAirportCode|segmentsAirlineName|segmentsAirlineCode|segmentsEquipmentDescription|segmentsDurationInSeconds|segmentsDi

In [6]:
num_records = df.count()

                                                                                

In [7]:
# From the stats in eda, totalFare and baseFare have some extremely low value 
# Set a minimum fare threshold
min_fare_threshold = 50

# Filter out records with baseFare or totalFare below the threshold
df_filtered = df.filter((df.baseFare >= min_fare_threshold) & (df.totalFare >= min_fare_threshold))

# Display filtered statistics
df_filtered.describe(["baseFare", "totalFare", "seatsRemaining", "elapsedDays", "totalTravelDistance"]).show()

# Inspect rows with low fares to understand the data
df_low_fare = df.filter((df.baseFare < min_fare_threshold) | (df.totalFare < min_fare_threshold))
df_low_fare.show(5)

                                                                                

+-------+-----------------+------------------+-----------------+-------------------+-------------------+
|summary|         baseFare|         totalFare|   seatsRemaining|        elapsedDays|totalTravelDistance|
+-------+-----------------+------------------+-----------------+-------------------+-------------------+
|  count|         80298120|          80298120|         80298120|           80298120|           75664849|
|   mean|298.8108562180906|346.13909576468325|6.082574984320928|0.14987510292893533| 1615.2010937866273|
| stddev|180.6529988522368|194.43098823645911| 2.79351604666186|0.35697522763777334|  855.7658866219672|
|    min|             50.0|              67.1|                0|                  0|                 89|
|    max|          7662.33|           8260.61|               10|                  2|               7252|
+-------+-----------------+------------------+-----------------+-------------------+-------------------+

+--------------------+----------+----------+----------

In [9]:
# Find columns that contain null values and count them
null_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])

# Show the count of nulls for each column
null_counts.show()

#Fill null value in segmentsEquipmentDescription
df = df.fillna({"segmentsEquipmentDescription": ""})

#Calculate the percentage of nulls in totalTravelDistance
null_totalTravelDistance_count = df.filter(F.col("totalTravelDistance").isNull()).count()
null_percentage = (null_totalTravelDistance_count / num_records) * 100

# Print the null percentage in totalTravelDistance
print(f"Null percentage in totalTravelDistance: {null_percentage}%")

#drop the null values in the columns 
df_cleaned = df.dropna(subset=['totalTravelDistance'])

null_counts_after_drop = df_cleaned.select(F.count(F.when(F.col("totalTravelDistance").isNull(), "totalTravelDistance")).alias("totalTravelDistance"))
null_counts_after_drop.show()

# Check if all null values in segmentsEquipmentDescription are gone
null_counts_segments_equipment = df_cleaned.select(F.count(F.when(F.col("segmentsEquipmentDescription").isNull(), "segmentsEquipmentDescription")).alias("segmentsEquipmentDescription"))
null_counts_segments_equipment.show()


                                                                                

+-----+----------+----------+---------------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+-------------------+-------------------+----------------------------+-------------------------+----------------+-----------------+
|legId|searchDate|flightDate|startingAirport|destinationAirport|fareBasisCode|travelDuration|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining|totalTravelDistance|segmentsDepartureTimeEpochSeconds|segmentsDepartureTimeRaw|segmentsArrivalTimeEpochSeconds|segmentsArrivalTimeRaw|segmentsArrivalAirportCode|segmentsDepartureAirportCode|segmentsAirlineName|segmentsAirlineCode|segmentsEquipmentDescription|segmentsDurationInSeconds|segmentsDistance|segmentsCabinCode|
+---

                                                                                

Null percentage in totalTravelDistance: 7.419800979934527%


                                                                                

+-------------------+
|totalTravelDistance|
+-------------------+
|                  0|
+-------------------+





+----------------------------+
|segmentsEquipmentDescription|
+----------------------------+
|                           0|
+----------------------------+



                                                                                

In [10]:
#double check the entire data set
clean_null_counts = df_cleaned.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_cleaned.columns])
clean_null_counts.show()



+-----+----------+----------+---------------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+-------------------+-------------------+----------------------------+-------------------------+----------------+-----------------+
|legId|searchDate|flightDate|startingAirport|destinationAirport|fareBasisCode|travelDuration|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining|totalTravelDistance|segmentsDepartureTimeEpochSeconds|segmentsDepartureTimeRaw|segmentsArrivalTimeEpochSeconds|segmentsArrivalTimeRaw|segmentsArrivalAirportCode|segmentsDepartureAirportCode|segmentsAirlineName|segmentsAirlineCode|segmentsEquipmentDescription|segmentsDurationInSeconds|segmentsDistance|segmentsCabinCode|
+---

                                                                                

In [11]:
# Define columns to drop
columns_to_drop = [
    "legId",  
    "segmentsDepartureTimeEpochSeconds", 
    "travelDuration",
    "elapsedDays",
    "totalTravelDistance",
    "segmentsDepartureTimeRaw",
    "segmentsArrivalTimeRaw",
    "segmentsArrivalTimeEpochSeconds", 
    "segmentsEquipmentDescription", 
    "segmentsAirlineCode", 
    "segmentsArrivalAirportCode", 
    "segmentsDepartureAirportCode", 
    "segmentsDistance", 
    "segmentsDurationInSeconds", 
]

df_cleaned = df.drop(*columns_to_drop)

print("Columns after dropping:")
print(df_cleaned.columns)

df_cleaned.limit(5).toPandas()

Columns after dropping:
['searchDate', 'flightDate', 'startingAirport', 'destinationAirport', 'fareBasisCode', 'isBasicEconomy', 'isRefundable', 'isNonStop', 'baseFare', 'totalFare', 'seatsRemaining', 'segmentsAirlineName', 'segmentsCabinCode']


Unnamed: 0,searchDate,flightDate,startingAirport,destinationAirport,fareBasisCode,isBasicEconomy,isRefundable,isNonStop,baseFare,totalFare,seatsRemaining,segmentsAirlineName,segmentsCabinCode
0,2022-04-16,2022-04-17,ATL,BOS,LA0NX0MC,False,False,True,217.67,248.6,9,Delta,coach
1,2022-04-16,2022-04-17,ATL,BOS,LA0NX0MC,False,False,True,217.67,248.6,4,Delta,coach
2,2022-04-16,2022-04-17,ATL,BOS,LA0NX0MC,False,False,True,217.67,248.6,9,Delta,coach
3,2022-04-16,2022-04-17,ATL,BOS,LA0NX0MC,False,False,True,217.67,248.6,8,Delta,coach
4,2022-04-16,2022-04-17,ATL,BOS,LA0NX0MC,False,False,True,217.67,248.6,9,Delta,coach


In [14]:
df_cleaned = df_cleaned.withColumn("seatsRemaining", df_cleaned["seatsRemaining"].cast(IntegerType()))
df_cleaned = df_cleaned.withColumn("baseFare", df_cleaned["baseFare"].cast(DoubleType()))
df_cleaned = df_cleaned.withColumn("totalFare", df_cleaned["totalFare"].cast(DoubleType()))

In [15]:
baseFare_pd = df_cleaned.select("baseFare").limit(1000).toPandas()
print(baseFare_pd.head())

   baseFare
0    217.67
1    217.67
2    217.67
3    217.67
4    217.67


In [18]:
#Calculate Q1 and Q3
Q1 = df_cleaned.approxQuantile("baseFare", [0.25], 0.01)[0]  # 25th percentile
Q3 = df_cleaned.approxQuantile("baseFare", [0.75], 0.01)[0]  # 75th percentile
IQR = Q3 - Q1

# Calculate lower and upper bounds
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

print(f"IQR: {IQR}, Lower Bound: {lower_bound}, Upper Bound: {upper_bound}")




IQR: 236.85000000000002, Lower Bound: -196.77500000000003, Upper Bound: 750.625


                                                                                

In [19]:
#Calculate Q1 and Q3
Q1 = df_cleaned.approxQuantile("totalFare", [0.25], 0.01)[0]  # 25th percentile
Q3 = df_cleaned.approxQuantile("totalFare", [0.75], 0.01)[0]  # 75th percentile
IQR = Q3 - Q1

# Calculate lower and upper bounds
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

print(f"IQR: {IQR}, Lower Bound: {lower_bound}, Upper Bound: {upper_bound}")



IQR: 252.50000000000003, Lower Bound: -182.15000000000006, Upper Bound: 827.8500000000001


                                                                                

In [20]:
filtered_df = df_cleaned.filter((col("baseFare") >= 0) & (col("baseFare") <= 754.89))
filtered_df.select(
    min("baseFare").alias("min_baseFare"),
    max("baseFare").alias("max_baseFare"),
    mean("baseFare").alias("mean_baseFare")
).show()



+------------+------------+------------------+
|min_baseFare|max_baseFare|     mean_baseFare|
+------------+------------+------------------+
|        0.01|      754.89|281.97008948295854|
+------------+------------+------------------+



                                                                                

In [21]:
filtered_df = df_cleaned.filter((col("totalFare") >= 0) & (col("totalFare") <= 830.85))
filtered_df.select(
    min("totalFare").alias("min_totalFare"),
    max("totalFare").alias("max_totalFare"),
    mean("totalFare").alias("mean_totalFare")
).show()



+-------------+-------------+------------------+
|min_totalFare|max_totalFare|    mean_totalFare|
+-------------+-------------+------------------+
|        19.59|       830.85|328.59731282624017|
+-------------+-------------+------------------+



                                                                                

In [22]:
sampled_df = filtered_df.sample(withReplacement=False, fraction=0.1)

In [23]:
sampled_df.show(5)
sampled_df.printSchema()
sampled_df.count()

+----------+----------+---------------+------------------+-------------+--------------+------------+---------+--------+---------+--------------+--------------------+-----------------+
|searchDate|flightDate|startingAirport|destinationAirport|fareBasisCode|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining| segmentsAirlineName|segmentsCabinCode|
+----------+----------+---------------+------------------+-------------+--------------+------------+---------+--------+---------+--------------+--------------------+-----------------+
|2022-04-16|2022-04-17|            ATL|               BOS|     V0AJZNN1|         false|       false|    false|  213.02|    252.6|             3|American Airlines...|     coach||coach|
|2022-04-16|2022-04-17|            ATL|               BOS|     L0AJZNN1|         false|       false|    false|  260.47|   302.11|             1|American Airlines...|     coach||coach|
|2022-04-16|2022-04-17|            ATL|               BOS|          YNR|        

                                                                                

8089613

In [24]:
#Write to Parquet
output_file_path = 'gs://my-bigdata-project-lh/cleaned/cleaned_data.parquet'
filtered_df.write.parquet(output_file_path, mode='overwrite')

                                                                                