In [None]:
!pip install pyspark

In [None]:
import pyspark as sp

# Reuse the active SparkContext if there is one, otherwise start a new one,
# then show the context and the Spark version it runs (one per line).
sc = sp.SparkContext.getOrCreate()
print(sc, sc.version, sep='\n')

In [None]:
# SparkSession is the entry point for the DataFrame / SQL API.
from pyspark.sql import SparkSession

# Reuse the active session if there is one; otherwise create it.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

print(spark)

In [None]:
import pandas as pd
import numpy as np

# Ten random floats in a pandas frame, lifted into a Spark DataFrame.
pd_temp = pd.DataFrame(np.random.random(10))
spark_temp = spark.createDataFrame(pd_temp)

# Catalog contents before registering anything.
print(spark.catalog.listTables())

# Register spark_temp as the temporary view "temp" so SQL can query it.
spark_temp.createOrReplaceTempView("temp")

# Catalog contents after registration; "temp" should now be listed.
print(spark.catalog.listTables())

In [None]:
# Load the airports CSV (path relative to the notebook's working directory).
# The header row supplies column names; with no schema given, every column is read as a string.
file_path = 'Downloads/Airpots Project/airports.csv'
airports = spark.read.option('header', True).csv(file_path)
airports.show()

In [None]:
# Check the type of the object returned by spark.read.csv (expected: a pyspark DataFrame).
type(airports)

In [None]:
# List the databases known to the Spark catalog.
spark.catalog.listDatabases()

In [None]:
# List the tables and temporary views currently registered in the Spark catalog.
spark.catalog.listTables()

In [None]:
# Load the flights sample; column names come from the header row, values are strings.
flights = spark.read.option('header', True).csv('Downloads/Airpots Project/flights_small.csv')
flights.show()

In [None]:
# Register flights as the temporary SQL view "flights".
# createOrReplaceTempView returns None, so its result is not assigned to anything.
flights.createOrReplaceTempView('flights')
spark.catalog.listTables()

In [None]:
# Read the "flights" temp view back from the catalog as a DataFrame.
flights_df = spark.table('flights')
# show() prints the rows itself and returns None, so it is not wrapped in print().
flights_df.show()

In [None]:
# Add duration_hrs: air_time converted to hours (air_time / 60).
flights = flights.withColumn('duration_hrs', flights['air_time'] / 60)
flights.show()

In [None]:
# Summary statistics for every column. Most columns are still strings here
# (duration_hrs is the exception), so their min/max are string comparisons.
flights.describe().show()

In [None]:
# Flights with distance > 1000, expressed as a SQL predicate string.
long_flights1 = flights.where('distance > 1000')
long_flights1.show()

In [None]:
# The same distance > 1000 filter, expressed as a boolean Column.
long_flights2 = flights.where(flights['distance'] > 1000)
long_flights2.show()

In [None]:
# Select columns by name (passed as a list of strings).
selected_1 = flights.select(['tailnum', 'origin', 'dest'])

In [None]:
# Select columns via Column objects (df['col_name'] access).
temp = flights.select(flights['origin'], flights['dest'], flights['carrier'])

In [None]:
# Two boolean conditions that together keep SEA -> PDX flights:
# FilterA matches origin SEA, FilterB matches destination PDX.
FilterA = flights['origin'] == 'SEA'
FilterB = flights['dest'] == 'PDX'

In [None]:
# Apply both conditions at once (equivalent to filtering by FilterA, then FilterB).
selected_2 = temp.filter(FilterA & FilterB)
selected_2.show()

In [None]:
# Per-flight average speed: distance divided by air_time in hours,
# exposed under the column name "avg_speed" via .alias().
avg_speed = (flights['distance'] / (flights['air_time'] / 60)).alias("avg_speed")
speed_1 = flights.select(['origin', 'dest', 'tailnum', avg_speed])

In [None]:
# The same avg_speed table built with SQL expression strings via .selectExpr().
speed_2 = flights.selectExpr(['origin', 'dest', 'tailnum', 'distance/(air_time/60) as avg_speed'])
speed_2.show()

In [None]:
# Inspect column types. printSchema() reports each column's actual type; the
# repr of describe() always lists every column as string (its summary output).
flights.printSchema()

In [None]:
# air_time and distance were read as strings, so min()/max() would compare them
# as text; cast both to float before summarising them numerically.
flights = (
    flights
    .withColumn('distance', flights.distance.cast('float'))
    .withColumn('air_time', flights.air_time.cast('float'))
)

flights.describe('air_time', 'distance').show()

In [None]:
# Shortest distance among flights departing PDX (result column: min(distance)).
flights.filter(flights.origin == 'PDX').agg({'distance': 'min'}).show()

In [None]:
# Longest air_time among flights departing SEA (result column: max(air_time)).
flights.filter(flights.origin == 'SEA').agg({'air_time': 'max'}).show()

In [None]:
# Mean air_time of Delta (carrier 'DL') flights departing SEA.
(
    flights
    .filter((flights.carrier == 'DL') & (flights.origin == 'SEA'))
    .agg({'air_time': 'avg'})
    .show()
)

In [None]:
# Total hours in the air across all flights: sum of air_time / 60.
(
    flights
    .withColumn('duration_hrs', flights.air_time / 60)
    .agg({'duration_hrs': 'sum'})
    .show()
)

In [None]:
# Group flights by aircraft tail number.
by_plane = flights.groupBy(flights.tailnum)

In [None]:
# Number of flights per tailnum: count() with no arguments counts the rows in each group.
by_plane.count().show()

In [None]:
# Group flights by departure airport.
by_origin = flights.groupBy(flights.origin)

In [None]:
# Mean air_time per origin airport (result column: avg(air_time)).
by_origin.agg({'air_time': 'avg'}).show()

In [None]:
# Column-expression helpers, conventionally aliased as F.
import pyspark.sql.functions as F

# dep_delay was read as a string; make it numeric so it can be averaged.
flights = flights.withColumn('dep_delay', F.col('dep_delay').cast('float'))

# Group by each (month, dest) pair.
by_month_dest = flights.groupBy(['month', 'dest'])

In [None]:
# Mean dep_delay for each (month, dest) group (result column: avg(dep_delay)).
by_month_dest.agg({'dep_delay': 'avg'}).show()

In [None]:
# Preview the airports table before renaming its faa column.
airports.show()

In [None]:
# Rename faa to dest so airports can be joined to flights on the dest column.
airports = airports.withColumnRenamed(existing='faa', new='dest')

In [None]:
# Left outer join on dest: every flight is kept, and the airport columns are
# null where dest has no matching airport row.
flights_with_airports = flights.join(airports, 'dest', 'leftouter')
flights_with_airports.show()

In [None]:
# Load the planes CSV; header row gives column names, all values are strings.
planes = spark.read.option('header', True).csv('Downloads/Airpots Project/planes.csv')
planes.show()

In [None]:
# planes and flights both have a `year` column; rename the planes one to
# plane_year so the join below does not produce two `year` columns.
planes = planes.withColumnRenamed(existing='year', new='plane_year')

In [None]:
# Attach plane attributes to each flight by tailnum (left outer: unmatched flights are kept).
model_data = flights.join(planes, 'tailnum', 'leftouter')
model_data.show()

In [None]:
# Inspect column types before casting. printSchema() shows each column's real
# type; the repr of describe() always reports every column as string.
model_data.printSchema()

In [None]:
# Cast the model's numeric inputs to integer (arr_delay, month and plane_year
# came from the CSVs as strings; air_time is truncated from float).
model_data = (
    model_data
    .withColumn('arr_delay', model_data.arr_delay.cast('integer'))
    .withColumn('air_time', model_data.air_time.cast('integer'))
    .withColumn('month', model_data.month.cast('integer'))
    .withColumn('plane_year', model_data.plane_year.cast('integer'))
)

In [None]:
# Summary statistics of the freshly cast numeric columns.
model_data.describe(['arr_delay', 'air_time', 'month', 'plane_year']).show()

In [None]:
# plane_age = flight year minus plane_year (the planes table's `year` column).
# The flights `year` column is never cast and is still a CSV string, so cast it
# explicitly instead of relying on Spark's implicit string-to-number coercion.
model_data = model_data.withColumn('plane_age', model_data.year.cast('integer') - model_data.plane_year)

In [None]:
# Target: is_late is True when arr_delay > 0; label is the same flag as an
# integer (0/1) for the classifier.
model_data = model_data.withColumn('is_late', model_data.arr_delay > 0)

model_data = model_data.withColumn('label', model_data.is_late.cast('integer'))

# Drop rows missing any value the model needs. DataFrame.filter returns a new
# DataFrame, so the result must be assigned back; otherwise null rows
# (including rows with a null label) would remain in model_data.
model_data = model_data.filter("arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL and plane_year is not NULL")

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

In [None]:
# carrier -> carrier_index (StringIndexer) -> one-hot vector carr_fact (OneHotEncoder).
carr_indexer = StringIndexer(outputCol='carrier_index', inputCol='carrier')
carr_encoder = OneHotEncoder(outputCol='carr_fact', inputCol='carrier_index')

In [None]:
# dest -> dest_index (StringIndexer) -> one-hot vector dest_fact (OneHotEncoder),
# mirroring the carrier encoding.
dest_indexer = StringIndexer(outputCol='dest_index', inputCol='dest')
dest_encoder = OneHotEncoder(outputCol='dest_fact', inputCol='dest_index')

In [None]:
# Assemble the feature columns into a single vector column
from pyspark.ml.feature import  VectorAssembler

In [None]:
# Combine the model inputs into one `features` vector column.
# handleInvalid="skip" drops rows with null/invalid inputs instead of raising.
vec_assembler = VectorAssembler(
    inputCols=['month', 'air_time', 'carr_fact', 'dest_fact', 'plane_age'],
    outputCol='features',
    handleInvalid="skip",
)

In [None]:
# #### Create the pipeline
# A `Pipeline` (from the `pyspark.ml` module) chains the Estimators and
# Transformers defined earlier into one object that is fitted and applied in stage order.
from pyspark.ml import Pipeline

flights_pipe = Pipeline(
    stages=[
        dest_indexer, dest_encoder,   # dest -> dest_index -> dest_fact
        carr_indexer, carr_encoder,   # carrier -> carrier_index -> carr_fact
        vec_assembler,                # inputs -> features
    ]
)

In [None]:
# Fit every pipeline stage on model_data, then transform model_data with the fitted stages.
piped_data = (
    flights_pipe
    .fit(model_data)
    .transform(model_data)
)

In [None]:
# Preview the pipeline output, including the index, one-hot and features columns.
piped_data.show()

In [None]:
# 60/40 train/test split. The fixed seed makes the split (and everything
# downstream of it) repeatable across Restart & Run All, given the same input partitioning.
training, test = piped_data.randomSplit([.6, .4], seed=42)

In [None]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression reading the pipeline's `features` vector and the 0/1
# `label` column (both are the estimator's default column names, spelled out).
lr = LogisticRegression(featuresCol='features', labelCol='label')

In [None]:
# #### Create the evaluator
# Cross-validation needs a metric for comparing candidate models. This is a
# binary classifier, so use `BinaryClassificationEvaluator` from
# `pyspark.ml.evaluation`, scored by the area under the ROC curve: a single
# number that summarizes the trade-off between false positives and false negatives.
import pyspark.ml.evaluation as evals

evaluator = evals.BinaryClassificationEvaluator(
    rawPredictionCol='rawPrediction',  # default, shown explicitly
    labelCol='label',                  # default, shown explicitly
    metricName='areaUnderROC',
)

In [None]:
# Hyperparameter search space for lr: regParam in 0.00..0.09 (step 0.01) crossed
# with elasticNetParam in {0, 1}, i.e. 20 parameter combinations.
import pyspark.ml.tuning as tune

grid = (
    tune.ParamGridBuilder()
    .addGrid(lr.regParam, np.arange(0, .1, .01))
    .addGrid(lr.elasticNetParam, [0, 1])
    .build()
)

In [None]:
# k-fold cross-validation (numFolds defaults to 3) of lr over every grid point,
# scored by the AUC evaluator. The fixed seed pins the random fold assignment
# so model selection is reproducible from run to run.
cv = tune.CrossValidator(
    estimator=lr,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    seed=42,
)

In [None]:
# Fit lr for every parameter combination in grid on each fold; this is the
# expensive step of the notebook.
models = cv.fit(training)

In [None]:
# The model built with the best-scoring parameter combination from cross-validation.
best_lr = models.bestModel

In [None]:
# Score the held-out test set with the selected model, then report its
# area under the ROC curve.
test_results = best_lr.transform(test)
print(evaluator.evaluate(test_results))