In [1]:
import json

import numpy as np
import pandas as pd
import seaborn as sns

# NOTE(review): the wildcard import below pulls every public name from
# pyspark.sql.functions into the notebook namespace; prefer the explicit import
# that follows it when adding new code.
from pyspark.sql.functions import *
from pyspark.sql.functions import col, isnan, isnull, when, count, udf, datediff
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler

In [2]:
# Load the pipeline settings (bucket name and object paths) from the local JSON file.
config_file_path = 'config.json'

# JSON is UTF-8 by specification; be explicit instead of relying on the locale default.
with open(config_file_path, 'r', encoding='utf-8') as config_file:
    config = json.load(config_file)

# Spark log level: only show errors in the notebook output.
# NOTE(review): `sc` and `spark` are not defined anywhere in this notebook --
# presumably they are injected by the PySpark kernel; confirm before running elsewhere.
sc.setLogLevel("ERROR")

# Build the GCS URI of the cleaning-stage output and read it as Parquet.
bucket = config["bucket"]
filename = config["cleaningoutput"]
file_path = 'gs://' + bucket + '/' + filename

sdf = spark.read.parquet(file_path)

sdf.printSchema()

                                                                                

root
 |-- searchDate: date (nullable = true)
 |-- flightDate: date (nullable = true)
 |-- startingAirport: string (nullable = true)
 |-- destinationAirport: string (nullable = true)
 |-- travelDuration: string (nullable = true)
 |-- isBasicEconomy: boolean (nullable = true)
 |-- isRefundable: boolean (nullable = true)
 |-- isNonStop: boolean (nullable = true)
 |-- totalFare: double (nullable = true)
 |-- seatsRemaining: integer (nullable = true)
 |-- totalTravelDistance: integer (nullable = true)
 |-- segmentsArrivalAirportCode: string (nullable = true)
 |-- segmentsDepartureAirportCode: string (nullable = true)
 |-- segmentsAirlineCode: string (nullable = true)
 |-- segmentsCabinCode: string (nullable = true)



In [3]:
## removed outliers as suggested by feedback
# Keep only rows whose totalFare lies strictly within OUTLIER_SIGMA sample standard
# deviations of the mean fare.
OUTLIER_SIGMA = 3

# Both statistics are computed in a single aggregation, so the dataset is scanned
# once here instead of once per statistic.
fare_stats = sdf.selectExpr(
    "stddev(totalFare) AS std_val",
    "mean(totalFare) AS mean_val",
).first()
std_val = fare_stats["std_val"]
mean_val = fare_stats["mean_val"]

# The aggregates come back as None when they cannot be computed (e.g. no non-null
# fares); fail with a clear message instead of a TypeError in the arithmetic below.
if std_val is None or mean_val is None:
    raise ValueError(
        "Cannot compute outlier bounds: mean/stddev of 'totalFare' is undefined "
        "(too few non-null rows)."
    )

hi_bound = mean_val + (OUTLIER_SIGMA * std_val)
low_bound = mean_val - (OUTLIER_SIGMA * std_val)

# Strict inequalities on both sides, exactly as before.
sdf = sdf.where((sdf['totalFare'] < hi_bound) & (sdf['totalFare'] > low_bound))

                                                                                

In [4]:
# leadTime = number of days from the search date to the flight date
# (datediff takes the end date first, then the start date).
lead_time_days = datediff(col("flightDate"), col("searchDate"))
sdf = sdf.withColumn("leadTime", lead_time_days)

# Preview the frame with the new column.
sdf.show()


                                                                                

+----------+----------+---------------+------------------+--------------+--------------+------------+---------+---------+--------------+-------------------+--------------------------+----------------------------+-------------------+-------------------+--------+
|searchDate|flightDate|startingAirport|destinationAirport|travelDuration|isBasicEconomy|isRefundable|isNonStop|totalFare|seatsRemaining|totalTravelDistance|segmentsArrivalAirportCode|segmentsDepartureAirportCode|segmentsAirlineCode|  segmentsCabinCode|leadTime|
+----------+----------+---------------+------------------+--------------+--------------+------------+---------+---------+--------------+-------------------+--------------------------+----------------------------+-------------------+-------------------+--------+
|2022-05-06|2022-05-20|            SFO|               CLT|      PT10H50M|         false|       false|    false|   550.11|             7|               2510|             LAX||ATL||CLT|               SFO||LAX||ATL|  

In [5]:
# Cast the integer columns to double, as suggested in feedback.
# withColumn with an existing name replaces that column in place.
for numeric_col in ("seatsRemaining", "totalTravelDistance"):
    sdf = sdf.withColumn(numeric_col, col(numeric_col).cast("double"))


In [6]:
# Pack the two numeric columns into a single vector column; despite its name,
# 'scaledvector' holds the raw values and is only the scaler's input.
numeric_cols = ['seatsRemaining', 'totalTravelDistance']
num_assembler = VectorAssembler(inputCols=numeric_cols, outputCol='scaledvector')
sdf = num_assembler.transform(sdf)

# Fit a MinMaxScaler (default settings) on that vector and write the rescaled
# values to 'scalednums'.
scalednum_scaler = MinMaxScaler(inputCol='scaledvector', outputCol='scalednums')
scalednum_model = scalednum_scaler.fit(sdf)
sdf = scalednum_model.transform(sdf)

                                                                                

In [7]:
# Inspect the schema after adding leadTime and the scaled numeric vector columns.
sdf.printSchema()

root
 |-- searchDate: date (nullable = true)
 |-- flightDate: date (nullable = true)
 |-- startingAirport: string (nullable = true)
 |-- destinationAirport: string (nullable = true)
 |-- travelDuration: string (nullable = true)
 |-- isBasicEconomy: boolean (nullable = true)
 |-- isRefundable: boolean (nullable = true)
 |-- isNonStop: boolean (nullable = true)
 |-- totalFare: double (nullable = true)
 |-- seatsRemaining: double (nullable = true)
 |-- totalTravelDistance: double (nullable = true)
 |-- segmentsArrivalAirportCode: string (nullable = true)
 |-- segmentsDepartureAirportCode: string (nullable = true)
 |-- segmentsAirlineCode: string (nullable = true)
 |-- segmentsCabinCode: string (nullable = true)
 |-- leadTime: integer (nullable = true)
 |-- scaledvector: vector (nullable = true)
 |-- scalednums: vector (nullable = true)



In [8]:
# Turn each boolean flag into an integer column named '<flag>_binary':
# 1 where the flag is True, 0 for everything else.
for flag_col in ("isRefundable", "isNonStop", "isBasicEconomy"):
    as_binary = when(col(flag_col) == True, 1).otherwise(0)
    sdf = sdf.withColumn(flag_col + "_binary", as_binary)

In [9]:
# Inspect the schema after adding the three *_binary flag columns.
sdf.printSchema()

root
 |-- searchDate: date (nullable = true)
 |-- flightDate: date (nullable = true)
 |-- startingAirport: string (nullable = true)
 |-- destinationAirport: string (nullable = true)
 |-- travelDuration: string (nullable = true)
 |-- isBasicEconomy: boolean (nullable = true)
 |-- isRefundable: boolean (nullable = true)
 |-- isNonStop: boolean (nullable = true)
 |-- totalFare: double (nullable = true)
 |-- seatsRemaining: double (nullable = true)
 |-- totalTravelDistance: double (nullable = true)
 |-- segmentsArrivalAirportCode: string (nullable = true)
 |-- segmentsDepartureAirportCode: string (nullable = true)
 |-- segmentsAirlineCode: string (nullable = true)
 |-- segmentsCabinCode: string (nullable = true)
 |-- leadTime: integer (nullable = true)
 |-- scaledvector: vector (nullable = true)
 |-- scalednums: vector (nullable = true)
 |-- isRefundable_binary: integer (nullable = false)
 |-- isNonStop_binary: integer (nullable = false)
 |-- isBasicEconomy_binary: integer (nullable = fals

In [10]:
# String columns that get mapped to numeric category indexes ('<name>Index').
# NOTE(review): travelDuration is a duration string (e.g. 'PT10H50M') but is treated
# as a categorical here; it likely accounts for most of the 1623 feature slots seen
# in the output below. Consider parsing it to minutes instead -- left unchanged here.
categorical_cols = ["segmentsCabinCode", "startingAirport", "destinationAirport", "travelDuration"]
index_cols = [name + "Index" for name in categorical_cols]

indexer = StringIndexer(inputCols=categorical_cols, outputCols=index_cols, handleInvalid="keep")
indexer_model = indexer.fit(sdf)
indexed_sdf = indexer_model.transform(sdf)

# One-hot encode the four category indexes plus the three 0/1 flag columns.
# dropLast=False keeps one vector slot per category.
binary_cols = ["isBasicEconomy_binary", "isRefundable_binary", "isNonStop_binary"]
category_vector_cols = [name + "Vector" for name in categorical_cols]
flag_vector_cols = ["isBasicEconomyVector", "isRefundableVector", "isNonStopVector"]

encoder = OneHotEncoder(
    inputCols=index_cols + binary_cols,
    outputCols=category_vector_cols + flag_vector_cols,
    dropLast=False,
)
encoder_model = encoder.fit(indexed_sdf)
encoded_sdf = encoder_model.transform(indexed_sdf)

# Assemble the final 'features' vector. The order of inputCols fixes the feature
# positions: leadTime, scaled numerics, flag vectors, then categorical vectors.
assembler = VectorAssembler(
    inputCols=["leadTime", "scalednums"] + flag_vector_cols + category_vector_cols,
    outputCol="features",
)

assembled_sdf = assembler.transform(encoded_sdf)
assembled_sdf.select("features").show(truncate=False)

                                                                                

+----------------------------------------------------------------------------------------------------------+
|features                                                                                                  |
+----------------------------------------------------------------------------------------------------------+
|(1623,[0,1,2,3,5,7,11,53,73,526],[14.0,0.7000000000000001,0.3379868770068407,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|(1623,[0,1,2,3,5,7,9,53,73,831],[14.0,0.7000000000000001,0.46586625715482344,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|(1623,[0,1,2,3,5,7,9,53,73,982],[14.0,0.7000000000000001,0.46586625715482344,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|(1623,[0,1,2,3,5,7,9,53,73,366],[14.0,0.2,0.3445483735864861,1.0,1.0,1.0,1.0,1.0,1.0,1.0])                |
|(1623,[0,1,2,3,5,7,9,53,73,810],[14.0,0.9,0.3445483735864861,1.0,1.0,1.0,1.0,1.0,1.0,1.0])                |
|(1623,[0,1,2,3,5,7,9,53,73,885],[14.0,0.4,0.3445483735864861,1.0,1.0,1.0,1.0,1.0,1.0,1.0])                |
|(1623,[0,1,2,3,5,7

                                                                                

In [11]:
# Inspect the schema of the fully encoded and assembled frame before writing it out.
assembled_sdf.printSchema()

root
 |-- searchDate: date (nullable = true)
 |-- flightDate: date (nullable = true)
 |-- startingAirport: string (nullable = true)
 |-- destinationAirport: string (nullable = true)
 |-- travelDuration: string (nullable = true)
 |-- isBasicEconomy: boolean (nullable = true)
 |-- isRefundable: boolean (nullable = true)
 |-- isNonStop: boolean (nullable = true)
 |-- totalFare: double (nullable = true)
 |-- seatsRemaining: double (nullable = true)
 |-- totalTravelDistance: double (nullable = true)
 |-- segmentsArrivalAirportCode: string (nullable = true)
 |-- segmentsDepartureAirportCode: string (nullable = true)
 |-- segmentsAirlineCode: string (nullable = true)
 |-- segmentsCabinCode: string (nullable = true)
 |-- leadTime: integer (nullable = true)
 |-- scaledvector: vector (nullable = true)
 |-- scalednums: vector (nullable = true)
 |-- isRefundable_binary: integer (nullable = false)
 |-- isNonStop_binary: integer (nullable = false)
 |-- isBasicEconomy_binary: integer (nullable = fals

In [12]:
# Persist the engineered features as Parquet in the bucket.
output = config["featureengineeringoutput"]

# NOTE(review): the destination is the cleaning-stage object path (`filename`)
# with the feature-engineering output name appended directly, with no separator.
# Confirm this is intended rather than 'gs://<bucket>/<output>'; the path is left
# unchanged here so existing downstream readers keep working.
output_file_path = 'gs://' + bucket + '/' + filename + output

# The default save mode raises if the destination already exists, which makes the
# notebook fail on every re-run; overwrite so Restart & Run All is repeatable.
assembled_sdf.write.mode("overwrite").parquet(output_file_path)

24/11/20 18:42:12 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                