In [1]:
#code to correct table output in JupyterNotebook
#source: https://stackoverflow.com/questions/48357459/make-cell-output-in-jupyter-notebook-scroll-horizontally

from IPython.display import display, HTML
# Inject a CSS rule so <pre> output keeps its whitespace (no wrapping) and wide tables can scroll.
pre_no_wrap_css = HTML("<style>pre { white-space: pre !important; }</style>")
display(pre_no_wrap_css)

In [2]:
#testing spark
# Bare expression: the notebook displays whatever object is bound to `spark`.
# NOTE(review): `spark` is referenced here before any cell in this notebook creates it,
# so this presumably relies on the kernel pre-defining a session - confirm on a fresh kernel.
spark

In [4]:
#importing required libraries
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, isnull, when, count, udf, avg, min, max, stddev, to_timestamp, rand

In [5]:
#spark setup
# Location of the raw CSV to load.
dataset_path = "gs://my-mlproject-bucket-rj/landing/itineraries.csv"

# getOrCreate() returns the already-running session when one exists.
spark = SparkSession.builder.appName("EDA_and_Cleaning").getOrCreate()

# Treat the first row as the header and let Spark infer the column types.
csv_reader = spark.read.options(header=True, inferSchema=True)
df = csv_reader.csv(dataset_path)

24/11/21 20:36:17 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

In [7]:
#inspecting the loaded schema (column names, types, nullability); nothing is dropped in this cell
df.printSchema()

root
 |-- legId: string (nullable = true)
 |-- searchDate: date (nullable = true)
 |-- flightDate: date (nullable = true)
 |-- startingAirport: string (nullable = true)
 |-- destinationAirport: string (nullable = true)
 |-- fareBasisCode: string (nullable = true)
 |-- travelDuration: string (nullable = true)
 |-- elapsedDays: integer (nullable = true)
 |-- isBasicEconomy: boolean (nullable = true)
 |-- isRefundable: boolean (nullable = true)
 |-- isNonStop: boolean (nullable = true)
 |-- baseFare: double (nullable = true)
 |-- totalFare: double (nullable = true)
 |-- seatsRemaining: integer (nullable = true)
 |-- totalTravelDistance: integer (nullable = true)
 |-- segmentsDepartureTimeEpochSeconds: string (nullable = true)
 |-- segmentsDepartureTimeRaw: string (nullable = true)
 |-- segmentsArrivalTimeEpochSeconds: string (nullable = true)
 |-- segmentsArrivalTimeRaw: string (nullable = true)
 |-- segmentsArrivalAirportCode: string (nullable = true)
 |-- segmentsDepartureAirportCode: s

In [None]:
#listing all columns
# df.columns is a plain Python list of the column names, in schema order.
column_names = df.columns
print("Columns:", column_names)

Columns: ['legId', 'searchDate', 'flightDate', 'startingAirport', 'destinationAirport', 'fareBasisCode', 'travelDuration', 'elapsedDays', 'isBasicEconomy', 'isRefundable', 'isNonStop', 'baseFare', 'totalFare', 'seatsRemaining', 'totalTravelDistance', 'segmentsDepartureTimeEpochSeconds', 'segmentsDepartureTimeRaw', 'segmentsArrivalTimeEpochSeconds', 'segmentsArrivalTimeRaw', 'segmentsArrivalAirportCode', 'segmentsDepartureAirportCode', 'segmentsAirlineName', 'segmentsAirlineCode', 'segmentsEquipmentDescription', 'segmentsDurationInSeconds', 'segmentsDistance', 'segmentsCabinCode']


In [9]:
#dropping unnecessary columns
# Columns removed from the raw frame. What remains: searchDate, flightDate,
# startingAirport, destinationAirport, fareBasisCode, isBasicEconomy, isRefundable,
# isNonStop, seatsRemaining, totalTravelDistance and segmentsCabinCode.
columns_to_drop = [
    'legId',
    'travelDuration',
    'elapsedDays',
    'baseFare',
    'totalFare',
    'segmentsDepartureTimeEpochSeconds',
    'segmentsDepartureTimeRaw',
    'segmentsArrivalTimeEpochSeconds',
    'segmentsArrivalTimeRaw',
    'segmentsArrivalAirportCode',
    'segmentsDepartureAirportCode',
    'segmentsAirlineName',
    'segmentsAirlineCode',
    'segmentsEquipmentDescription',
    'segmentsDurationInSeconds',
    'segmentsDistance',
]
sdf = df.drop(*columns_to_drop)


In [11]:
#counting the amount of non-null records in each column
# count() ignores nulls, so one count per column gives that column's non-null total.
non_null_exprs = [count(col(name)).alias(name) for name in sdf.columns]
# A single aggregate row comes back; turn it into {column name: non-null count}.
non_null_row = sdf.select(*non_null_exprs).first()
non_null_values = non_null_row.asDict()
for k, v in non_null_values.items():
    print(f"{k}: {v}")



searchDate: 82138753
flightDate: 82138753
startingAirport: 82138753
destinationAirport: 82138753
fareBasisCode: 82138753
isBasicEconomy: 82138753
isRefundable: 82138753
isNonStop: 82138753
seatsRemaining: 82138753
totalTravelDistance: 76044221
segmentsCabinCode: 82138753



                                                                                

In [12]:
#percentage with total travel distance missing
# Row totals are copied from the non-null counts printed by the previous cell.
total_records = 82138753          # non-null count reported for every other column
records_with_distance = 76044221  # non-null count reported for totalTravelDistance
missing_distance = total_records - records_with_distance

# Scale to an actual percentage (x100) and print it, the same way the duplicate
# statistics are reported later in this notebook; previously this was an
# undisplayed fraction despite the name.
percentage = (missing_distance / total_records) * 100
print(f'# of records missing totalTravelDistance = {missing_distance}')
print(f'% of records missing totalTravelDistance = {percentage:.2f}%')


In [13]:
#removing records with null values
# totalTravelDistance is the only column whose non-null count was below the row
# total in the output above, so filtering on it alone removes every null.
has_distance = col("totalTravelDistance").isNotNull()
sdf = sdf.where(has_distance)

In [14]:
#checking for null records-removal
# Recount non-null values per column; count() ignores nulls, so after the filter
# every column should report the same total.
non_null_exprs = [count(col(name)).alias(name) for name in sdf.columns]
non_null_row = sdf.select(*non_null_exprs).first()
non_null_values = non_null_row.asDict()
for k, v in non_null_values.items():
    print(f"{k}: {v}")



searchDate: 76044221
flightDate: 76044221
startingAirport: 76044221
destinationAirport: 76044221
fareBasisCode: 76044221
isBasicEconomy: 76044221
isRefundable: 76044221
isNonStop: 76044221
seatsRemaining: 76044221
totalTravelDistance: 76044221
segmentsCabinCode: 76044221



                                                                                

In [18]:
#removing duplicate records
# Rows are compared on every remaining column. legId and the other dropped columns
# are no longer part of the frame, so they play no role in deciding what is a duplicate.
sdf = sdf.distinct()

In [19]:
#checking for duplicate records-removal
# Recount non-null values per column on the de-duplicated frame; count() ignores nulls.
non_null_exprs = [count(col(name)).alias(name) for name in sdf.columns]
non_null_row = sdf.select(*non_null_exprs).first()
non_null_values = non_null_row.asDict()
for k, v in non_null_values.items():
    print(f"{k}: {v}")



searchDate: 50061920
flightDate: 50061920
startingAirport: 50061920
destinationAirport: 50061920
fareBasisCode: 50061920
isBasicEconomy: 50061920
isRefundable: 50061920
isNonStop: 50061920
seatsRemaining: 50061920
totalTravelDistance: 50061920
segmentsCabinCode: 50061920



                                                                                

In [33]:
# Row totals copied from the printed non-null counts: before and after duplicate removal.
rows_before_dedup = 76044221
rows_after_dedup = 50061920

# Number of rows removed as duplicates, and their share of the pre-dedup total.
dups = rows_before_dedup - rows_after_dedup
dup_percent = (dups / rows_before_dedup) * 100
print(f'# of duplicate records = {dups}')
print(f'% of records that were duplicates = {dup_percent:.2f}%')

# of duplicate records = 25982301
% of records that were duplicates = 34.17%


In [22]:
#writing the data to the "/cleaned" folder
# Configure a Parquet writer that replaces any existing output, then save.
cleaned_writer = sdf.write.format("parquet").mode("overwrite")
cleaned_writer.save("gs://my-mlproject-bucket-rj/cleaned")

                                                                                