# Get data into a PySpark DataFrame

In [1]:
# GCS location of the raw flight-itinerary CSV file
url = "gs://my-bigdata-project-mp/landing/itineraries.csv"

# Read the CSV into a PySpark DataFrame: the first row supplies the column names and
# Spark infers each column's type (inferSchema requires an extra pass over the data).
clean_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(url)
)

                                                                                

In [2]:
# Display the initial schema, before any cleaning changes are applied.
clean_df.printSchema()

root
 |-- legId: string (nullable = true)
 |-- searchDate: date (nullable = true)
 |-- flightDate: date (nullable = true)
 |-- startingAirport: string (nullable = true)
 |-- destinationAirport: string (nullable = true)
 |-- fareBasisCode: string (nullable = true)
 |-- travelDuration: string (nullable = true)
 |-- elapsedDays: integer (nullable = true)
 |-- isBasicEconomy: boolean (nullable = true)
 |-- isRefundable: boolean (nullable = true)
 |-- isNonStop: boolean (nullable = true)
 |-- baseFare: double (nullable = true)
 |-- totalFare: double (nullable = true)
 |-- seatsRemaining: integer (nullable = true)
 |-- totalTravelDistance: integer (nullable = true)
 |-- segmentsDepartureTimeEpochSeconds: string (nullable = true)
 |-- segmentsDepartureTimeRaw: string (nullable = true)
 |-- segmentsArrivalTimeEpochSeconds: string (nullable = true)
 |-- segmentsArrivalTimeRaw: string (nullable = true)
 |-- segmentsArrivalAirportCode: string (nullable = true)
 |-- segmentsDepartureAirportCode: s

# Remove NULLs

#### From the exploratory data analysis (EDA), we know that the only columns containing NULLs are:
- totalTravelDistance
- segmentsEquipmentDescription

In [4]:
# Count records BEFORE dropping nulls; this is the baseline for the counts below.
# NOTE(review): clean_df is not cached, so each later count() presumably re-reads
# the source CSV — consider caching if these counts are slow.
clean_df.count()

                                                                                

82138753

In [5]:
# Drop rows whose totalTravelDistance is NULL, then count the remaining records
# (compare with the previous count to see how many rows were removed).
clean_df = clean_df.filter("totalTravelDistance is not NULL")
clean_df.count()

                                                                                

76044221

In [6]:
# Drop rows whose segmentsEquipmentDescription is NULL, then count the remaining records
# (compare with the previous count to see how many rows were removed).
clean_df = clean_df.filter("segmentsEquipmentDescription is not NULL")
clean_df.count()

                                                                                

74754290

# Remove Outliers

In [7]:
# Although this conversion could be part of the feature-engineering process,
# we do it here so that travel-duration outliers can be removed before feature engineering starts.


# Extract the hours and minutes from travelDuration and combine them into a new column, travelDurationMinutes.

# These functions are used to extract the numbers from the strings in travelDuration.
from pyspark.sql.functions import regexp_extract, col, when, expr

def transform_travel_duration(spark_df):
    """Replace the ISO-8601 ``travelDuration`` string with an integer minute count.

    Durations are expected to look like ``"PT2H29M"``, ``"PT5H"`` or ``"PT20M"``
    (hours and/or minutes). A leading day component such as ``"P1DT2H"`` is also
    converted (1 day = 1440 minutes). Any component that is absent counts as 0,
    and a NULL ``travelDuration`` yields 0.

    Args:
        spark_df: PySpark DataFrame with a string column ``travelDuration``.

    Returns:
        A new DataFrame with an integer column ``travelDurationMinutes`` appended
        and the ``travelDuration`` column removed.
    """

    def duration_part(pattern):
        # regexp_extract returns "" when the pattern (or its group) does not match.
        # Map that to 0 explicitly: cast("") -> NULL only holds with ANSI mode off;
        # with spark.sql.ansi.enabled=true the cast raises an error instead.
        raw = regexp_extract(col("travelDuration"), pattern, 1)
        return when(raw != "", raw.cast("int")).otherwise(0)

    days = duration_part("P(\\d+)D")               # "P1DT2H"  -> 1
    hours = duration_part("T(\\d+)H")              # "PT2H29M" -> 2
    minutes = duration_part("T(?:\\d+H)?(\\d+)M")  # "PT2H29M" -> 29, "PT20M" -> 20

    return (
        spark_df
        .withColumn("travelDurationMinutes", days * (24 * 60) + hours * 60 + minutes)
        # travelDuration is fully represented by travelDurationMinutes now, so drop it.
        .drop("travelDuration")
    )

In [8]:
#FUNCTION 1
# set_min_max_col takes a PySpark DataFrame, a column name, and min/max bounds, and
# keeps only the rows whose value in that column lies in the closed interval [min, max].
#   - Both bounds are INCLUSIVE: a value exactly equal to min or max is kept.
#   - Rows where the column is NULL are dropped too (the comparison is NULL, which the filter treats as false).


def set_min_max_col(spark_df, col_name: str, min: float, max: float):
    """Return the rows of ``spark_df`` whose ``col_name`` value is within [min, max].

    Args:
        spark_df: PySpark DataFrame to filter.
        col_name: Name of the numeric column to bound.
        min: Inclusive lower bound (shadows the builtin; name kept for caller compatibility).
        max: Inclusive upper bound (shadows the builtin; name kept for caller compatibility).

    Returns:
        A new, filtered DataFrame; ``spark_df`` itself is not modified.
    """
    # Column.between(lower, upper) is defined as (col >= lower) & (col <= upper).
    return spark_df.where(col(col_name).between(min, max))


#FUNCTION 2
# Applies set_min_max_col once per bounded column.
# The bounds were chosen manually from findings in the Exploratory Data Analysis script.

# (column name, inclusive lower bound, inclusive upper bound)
OUTLIER_BOUNDS = (
    ("elapsedDays", 0, 1.2),              # integer column, so this keeps only 0 and 1
    ("baseFare", 0, 740),
    ("totalFare", 0, 825),
    ("travelDurationMinutes", 0, 1000),   # call transform_travel_duration() first to create this column
    ("totalTravelDistance", 0, 4700),
    ("seatsRemaining", 0, 20),
)


def trim_outliers(spark_df, bounds=OUTLIER_BOUNDS):
    """Remove outlier rows by enforcing inclusive min/max bounds on several columns.

    Args:
        spark_df: PySpark DataFrame containing every column named in ``bounds``.
        bounds: Iterable of ``(col_name, min, max)`` triples; defaults to
            ``OUTLIER_BOUNDS``. Passing a custom table allows other thresholds
            without editing this function.

    Returns:
        A new DataFrame keeping only rows that satisfy every bound.
    """
    for col_name, lower, upper in bounds:
        spark_df = set_min_max_col(spark_df, col_name, lower, upper)
    return spark_df

In [9]:
# Convert travelDuration into a numeric column (travelDurationMinutes) so that its outliers can be trimmed as well.
clean_df = transform_travel_duration(clean_df)

In [10]:
# Drop outlier rows in every column bounded by trim_outliers
# (requires travelDurationMinutes, created by transform_travel_duration in the previous cell).
clean_df = trim_outliers(clean_df)

In [11]:
# Count records after enforcing the min/max bounds on the numeric columns.
clean_df.count()

                                                                                

72733945

# Display the final schema and write the data to /cleaned in our Google Cloud Storage bucket

In [12]:
# Print the final schema of the cleaned DataFrame (travelDuration has been replaced by travelDurationMinutes).
clean_df.printSchema()

root
 |-- legId: string (nullable = true)
 |-- searchDate: date (nullable = true)
 |-- flightDate: date (nullable = true)
 |-- startingAirport: string (nullable = true)
 |-- destinationAirport: string (nullable = true)
 |-- fareBasisCode: string (nullable = true)
 |-- elapsedDays: integer (nullable = true)
 |-- isBasicEconomy: boolean (nullable = true)
 |-- isRefundable: boolean (nullable = true)
 |-- isNonStop: boolean (nullable = true)
 |-- baseFare: double (nullable = true)
 |-- totalFare: double (nullable = true)
 |-- seatsRemaining: integer (nullable = true)
 |-- totalTravelDistance: integer (nullable = true)
 |-- segmentsDepartureTimeEpochSeconds: string (nullable = true)
 |-- segmentsDepartureTimeRaw: string (nullable = true)
 |-- segmentsArrivalTimeEpochSeconds: string (nullable = true)
 |-- segmentsArrivalTimeRaw: string (nullable = true)
 |-- segmentsArrivalAirportCode: string (nullable = true)
 |-- segmentsDepartureAirportCode: string (nullable = true)
 |-- segmentsAirlineNa

In [13]:
# Persist the cleaned data as Parquet under /cleaned in the project bucket.
url = "gs://my-bigdata-project-mp/cleaned"

# "overwrite" replaces whatever is already stored at this path.
clean_df.write.mode("overwrite").parquet(url)

24/11/16 02:07:58 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                