## Creating a SparkSession

In [1]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Mlearning").getOrCreate()

# Print my_spark
print(spark)

<pyspark.sql.session.SparkSession object at 0x7fe88c76e860>


In [2]:
flights = spark.read.csv('Data_Camp/flights_small.csv',header=True,inferSchema=True)

In [3]:
flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



In [4]:
# Register the DataFrame as a SQL temporary view
flights.createOrReplaceTempView("flights")

In [5]:
planes = spark.read.csv('Data_Camp/planes.csv',header=True,inferSchema=True)

In [6]:
planes.printSchema()

root
 |-- tailnum: string (nullable = true)
 |-- year: string (nullable = true)
 |-- type: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- engines: integer (nullable = true)
 |-- seats: integer (nullable = true)
 |-- speed: string (nullable = true)
 |-- engine: string (nullable = true)



## Join the DataFrames

In [7]:
# Rename year column
planes = planes.withColumnRenamed('year','plane_year')

# Join the DataFrames
model_data = flights.join(planes, on='tailnum', how="leftouter")

In [8]:
model_data.printSchema()

root
 |-- tailnum: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- plane_year: string (nullable = true)
 |-- type: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- engines: integer (nullable = true)
 |-- seats: integer (nullable = true)
 |-- speed: string (nullable = true)
 |-- engine: string (nullable = true)



## String to integer

In [9]:
# Cast the columns to integers
model_data = model_data.withColumn("arr_delay", model_data.arr_delay.cast("integer"))
model_data = model_data.withColumn("air_time", model_data.air_time.cast("integer"))
#model_data = model_data.withColumn("month", model_data.month.cast("integer"))
model_data = model_data.withColumn("plane_year", model_data.plane_year.cast("integer"))

In [10]:
model_data.printSchema()

root
 |-- tailnum: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- plane_year: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- engines: integer (nullable = true)
 |-- seats: integer (nullable = true)
 |-- speed: string (nullable = true)
 |-- engine: string (nullable = true)



## Create a new column

In [11]:
# Create the column plane_age
model_data = model_data.withColumn("plane_age", model_data.year-model_data.plane_year)

## Making a Boolean/Outcome Variable

In [13]:
# Create is_late
model_data = model_data.withColumn("is_late", model_data.arr_delay > 0)

# Convert to an integer
model_data = model_data.withColumn("label", model_data.is_late.cast("integer"))

# Remove missing values
model_data = model_data.filter("arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL and plane_year is not NULL")

In [14]:
model_data.printSchema()

root
 |-- tailnum: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- plane_year: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- engines: integer (nullable = true)
 |-- seats: integer (nullable = true)
 |-- speed: string (nullable = true)
 |-- engine: string (nullable = true)
 |-- plane_age: integer (nullable = true)
 |-- is_late: boole

In [27]:
model_data = model_data.select('year','month','day', 'arr_delay', 'carrier', 'flight', 'dest', 'air_time','distance', 'engines', 'seats', 'plane_age', 'is_late', 'label')

In [38]:
model_data_1 = model_data.select('year','month','day', 'arr_delay','flight', 'air_time','distance', 'engines', 'seats', 'plane_age', 'is_late', 'label')

In [28]:
model_data.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- engines: integer (nullable = true)
 |-- seats: integer (nullable = true)
 |-- plane_age: integer (nullable = true)
 |-- is_late: boolean (nullable = true)
 |-- label: integer (nullable = true)



## One-Hot Encoding: Carrier column

In [29]:
# Import StringIndexer, OneHotEncoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder

In [35]:
# Create a StringIndexer
carr_indexer = StringIndexer(inputCol="carrier", outputCol="carrier_index")

# Create a OneHotEncoder
carr_encoder = OneHotEncoder(inputCol="carrier_index", outputCol="carrier_fact")

## One-Hot Encoding: Dest column

In [36]:
# Create a StringIndexer
dest_indexer = StringIndexer(inputCol="dest", outputCol="dest_index")

# Create a OneHotEncoder
dest_encoder = OneHotEncoder(inputCol="dest_index", outputCol="dest_fact")

## Assemble a vector

In [41]:
# Import VectorAssembler
from pyspark.ml.feature import VectorAssembler

In [42]:
# Make a VectorAssembler
vec_assembler = VectorAssembler(inputCols=["month", "air_time", "carrier_fact", "dest_fact", "plane_age"], outputCol="features")

## Create the pipeline

In [43]:
# Import Pipeline
from pyspark.ml import Pipeline

In [44]:
# Make the pipeline
flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])

## Transform the data

In [45]:
# Fit and transform the data
piped_data = flights_pipe.fit(model_data).transform(model_data)

## Split the data

In [54]:
# Split the data into training and test sets
training, test = piped_data.randomSplit([.6, .4])

## Create the modeler

In [55]:
# Import LogisticRegression
from pyspark.ml.classification import LogisticRegression

In [56]:
# Create a LogisticRegression Estimator
lr = LogisticRegression()

## Create the evaluator

In [57]:
# Import the evaluation submodule
import pyspark.ml.evaluation as evals

In [58]:
# Create a BinaryClassificationEvaluator
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")

## Make a grid

In [59]:
# Import the tuning submodule
import pyspark.ml.tuning as tune

# Import numpy
import numpy as np

In [60]:
# Create the parameter grid
grid = tune.ParamGridBuilder()

# Add the hyperparameter
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0, 1])

# Build the grid
grid = grid.build()

## Make the validator

In [61]:
# Create the CrossValidator
cv = tune.CrossValidator(estimator=lr,
               estimatorParamMaps=grid,
               evaluator=evaluator
               )

## Fit the model(s)

In [63]:
# Fit cross validation models
models = cv.fit(training)

# Extract the best model
best_lr = models.bestModel

In [64]:
# Print best_lr
print(best_lr)

LogisticRegression_4955acdd710bdf71b745


## Evaluate the model

In [65]:
# Use the model to predict the test set
test_results = best_lr.transform(test)

# Evaluate the predictions
print(evaluator.evaluate(test_results))

0.7078678324074312
