# Spark

In [2]:
import findspark
findspark.init()

import pyspark

### Examining the SparkContext

* `SparkContext` (aka Spark context) is the heart of a Spark application
* You could also assume that a SparkContext instance is a Spark application
* Spark context sets up internal services and establishes a connection to a Spark execution environment
* Once a SparkContext is created you can use it to create RDDs, accumulators and broadcast variables, access Spark services and run jobs (until SparkContext is stopped)
* A Spark context is essentially a client of Spark’s execution environment and acts as the master of your Spark application

In [3]:
sc = pyspark.SparkContext()
print(sc)
print(sc.version)

<pyspark.context.SparkContext object at 0x1067174e0>
2.1.1


### Creating a SparkSession

* What if we're not sure a Spark Session exists?
* Using the following method returns an existing Spark Session if there's one already in the environment, else it creates a new one if necessary

In [4]:
from pyspark.sql import SparkSession

# Create my_spark (a spark session, creates a new one or retrieves current)
spark = SparkSession.builder.getOrCreate()

print(spark)

<pyspark.sql.session.SparkSession object at 0x105d765c0>


### Viewing the Catalog of Tables

* A list of all the data inside the cluster

In [5]:
# This will be empty for now
spark.catalog.listTables()

[]

### Putting data into a Spark Cluster

* `.createDataFrame()` method takes a dataframe and returns a Spark dataframe
* The output of this method is stored locally (not in the SparkSession catalog); this means we can use all the Spark DataFrame methods on it, but cannot access the data in other contexts
* To use the `.sql()` method, we'd have to save the data as a temporary table
* We can do this using the `.createTempView()` Spark DataFrame method
* This method registers the dataframe as a table in the catalog, but as this table is temporary, it can only be accessed from the specific `SparkSession` used to create the Spark DataFrame
* There is also the `.createOrReplaceTempView()` method which safely creates a new temp table if nothing was there before, or updates an existing one

In [6]:
import pandas as pd
import numpy as np

pd_temp = pd.DataFrame(np.random.random(10))

# Create spark_temp from pd_temp
spark_temp = spark.createDataFrame(pd_temp)

# Examine the tables in the catalog
print(spark.catalog.listTables())

# Add spark_temp to the catalog
spark_temp.createOrReplaceTempView("temp")

# Examine the tables in the catalog again
print(spark.catalog.listTables())

[]
[Table(name='temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


### Dropping the middle man

* Why deal with pandas at all? Can we directly read in data? Yes
* `SparkSession` has a `.read` attribute which has several methods for reading different data sources into Spark DataFrames

In [8]:
file_path = "airline_data/train_df.csv"

airports = spark.read.csv(file_path, header=True)

airports.show(1)

+----+-----+------------+-----------+-------+------+------+----+--------+---------+--------+---------+---------+-----------------+--------+--------+
|YEAR|MONTH|DAY_OF_MONTH|DAY_OF_WEEK|CARRIER|FL_NUM|ORIGIN|DEST|DEP_TIME|DEP_DELAY|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|AIR_TIME|DISTANCE|
+----+-----+------------+-----------+-------+------+------+----+--------+---------+--------+---------+---------+-----------------+--------+--------+
|2015|    1|           1|          4|     AA|     1|   JFK| LAX|     855|       -5|    1237|        7|        0|             null|     378|    2475|
+----+-----+------------+-----------+-------+------+------+----+--------+---------+--------+---------+---------+-----------------+--------+--------+
only showing top 1 row



In [10]:
# Add spark_temp to the catalog
airports.createOrReplaceTempView("airports")

# Examine the tables in the catalog again
print(spark.catalog.listTables())

# OR maybe this?

# from pyspark.sql import HiveContext
# sqlContext = HiveContext(sc)

# flights.write.format("orc").saveAsTable("flights")

[Table(name='airports', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


## Using DataFrames & SQL

* Spark's core data structure is the Resilient Distributed Dataset (RDD). This is a low level object that lets Spark work its magic by splitting data across multiple nodes in the cluster.

* RDDs are hard to work with directly; we'll use the Spark DataFrame abstraction built on top of RDDs.

In [13]:
query = "FROM airports SELECT * LIMIT 2"

airports10 = spark.sql(query)

airports10.show()

+----+-----+------------+-----------+-------+------+------+----+--------+---------+--------+---------+---------+-----------------+--------+--------+
|YEAR|MONTH|DAY_OF_MONTH|DAY_OF_WEEK|CARRIER|FL_NUM|ORIGIN|DEST|DEP_TIME|DEP_DELAY|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|AIR_TIME|DISTANCE|
+----+-----+------------+-----------+-------+------+------+----+--------+---------+--------+---------+---------+-----------------+--------+--------+
|2015|    1|           1|          4|     AA|     1|   JFK| LAX|     855|       -5|    1237|        7|        0|             null|     378|    2475|
|2015|    1|           2|          5|     AA|     1|   JFK| LAX|     850|      -10|    1211|      -19|        0|             null|     357|    2475|
+----+-----+------------+-----------+-------+------+------+----+--------+---------+--------+---------+---------+-----------------+--------+--------+



In [14]:
query = """
        SELECT origin, dest, COUNT(*) AS N FROM airports 
        GROUP BY origin, dest
        """

flight_counts = spark.sql(query)
pd_counts = flight_counts.toPandas()
pd_counts.head()

Unnamed: 0,origin,dest,N
0,ORD,PDX,786
1,SNA,PHX,276
2,ATL,GSP,914
3,EWR,STT,181
4,CLE,SJU,30


___
# Manipulating Data

## Creating Columns

In [16]:
# Create a new column which shows the duration in hours for each flight
flights = spark.table("airports")
flights = flights.withColumn('duration_hrs', flights.AIR_TIME/60)
flights.show(1)

+----+-----+------------+-----------+-------+------+------+----+--------+---------+--------+---------+---------+-----------------+--------+--------+------------+
|YEAR|MONTH|DAY_OF_MONTH|DAY_OF_WEEK|CARRIER|FL_NUM|ORIGIN|DEST|DEP_TIME|DEP_DELAY|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|AIR_TIME|DISTANCE|duration_hrs|
+----+-----+------------+-----------+-------+------+------+----+--------+---------+--------+---------+---------+-----------------+--------+--------+------------+
|2015|    1|           1|          4|     AA|     1|   JFK| LAX|     855|       -5|    1237|        7|        0|             null|     378|    2475|         6.3|
+----+-----+------------+-----------+-------+------+------+----+--------+---------+--------+---------+---------+-----------------+--------+--------+------------+
only showing top 1 row



## Filtering Data

* The `.filter()` method is Spark's counterpart of SQL's `WHERE` clause

In [17]:
long_flights1 = flights.filter("DISTANCE > 1000")
long_flights2 = flights.filter(flights.DISTANCE > 1000)

long_flights1.show(1) == long_flights2.show(1)

+----+-----+------------+-----------+-------+------+------+----+--------+---------+--------+---------+---------+-----------------+--------+--------+------------+
|YEAR|MONTH|DAY_OF_MONTH|DAY_OF_WEEK|CARRIER|FL_NUM|ORIGIN|DEST|DEP_TIME|DEP_DELAY|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|AIR_TIME|DISTANCE|duration_hrs|
+----+-----+------------+-----------+-------+------+------+----+--------+---------+--------+---------+---------+-----------------+--------+--------+------------+
|2015|    1|           1|          4|     AA|     1|   JFK| LAX|     855|       -5|    1237|        7|        0|             null|     378|    2475|         6.3|
+----+-----+------------+-----------+-------+------+------+----+--------+---------+--------+---------+---------+-----------------+--------+--------+------------+
only showing top 1 row

+----+-----+------------+-----------+-------+------+------+----+--------+---------+--------+---------+---------+-----------------+--------+--------+------------+
|YEA

True

## Selecting I

* Spark's variant of SQL's `SELECT` is the `.select()` method
* The difference between `.select()` and `.withColumn()` is that the former returns only the columns we specifiy, whereas the latter returns all the columns of the DataFrame in addition to the one we defined

In [19]:
selected1 = flights.select("FL_NUM", "ORIGIN", "DEST")
temp = flights.select(flights.ORIGIN, flights.DEST, flights.CARRIER)

filterA = flights.ORIGIN == 'SEA'
filterB = flights.DEST == 'LAX'

selected2 = temp.filter(filterA).filter(filterB)
selected2.show(2)

+------+----+-------+
|ORIGIN|DEST|CARRIER|
+------+----+-------+
|   SEA| LAX|     DL|
|   SEA| LAX|     DL|
+------+----+-------+
only showing top 2 rows



## Selecting II

* Similar to SQL, we can use `.select()` to perform column-wise operations
* When selecting using the `df.colName` notation, we can perform any column operations and the `.select()` method will return the transformed column
* We can also use the `.alias()` method to rename a column we're selecting
* The equivalent Spark DataFrame method `.selectExpr()` takes SQL expressions as a string with the `as` keyword being equivalent to the `.alias()` method

In [29]:
avg_speed = (flights.DISTANCE/(flights.AIR_TIME/60)).alias("avg_speed")
speed1 = flights.select("ORIGIN", "DEST", "FL_NUM", avg_speed)

speed2 = flights.selectExpr("ORIGIN", "DEST", "FL_NUM",
                            "DISTANCE/(AIR_TIME/60) as avg_speed")
speed1.show(1) == speed2.show(1)

+------+----+------+-----------------+
|ORIGIN|DEST|FL_NUM|        avg_speed|
+------+----+------+-----------------+
|   JFK| LAX|     1|392.8571428571429|
+------+----+------+-----------------+
only showing top 1 row

+------+----+------+-----------------+
|ORIGIN|DEST|FL_NUM|        avg_speed|
+------+----+------+-----------------+
|   JFK| LAX|     1|392.8571428571429|
+------+----+------+-----------------+
only showing top 1 row



True

## Aggregating I

* The common aggregation methods `.min()`, `.max()`, and `.count()` are `GroupedData` methods, and are created by calling the `.groupBy()` DataFrame method

In [44]:
flights.select(flights.AIR_TIME.cast("float"))

DataFrame[AIR_TIME: float]

In [51]:
flights_num = flights.select(flights.DISTANCE.cast("float"), 
                             flights.ORIGIN, 
                             flights.CARRIER,
                             flights.AIR_TIME.cast("float"))

In [50]:
flights_num.filter(flights.ORIGIN=='LAX').groupBy().min("DISTANCE").show()
flights_num.filter(flights.ORIGIN=='SEA').groupBy().max("AIR_TIME").show()

+-------------+
|min(DISTANCE)|
+-------------+
|        236.0|
+-------------+

+-------------+
|max(AIR_TIME)|
+-------------+
|        412.0|
+-------------+



## Aggregating II

In [54]:
flights_num.filter(flights_num.CARRIER == "DL")\
           .filter(flights_num.ORIGIN == "SEA")\
           .groupBy()\
           .avg("AIR_TIME").show()
    
    
flights_num.withColumn("duration_hrs", flights_num.AIR_TIME/60)\
           .groupBy()\
           .sum("duration_hrs").show()

+----------------+
|   avg(AIR_TIME)|
+----------------+
|196.135770609319|
+----------------+

+------------------+
| sum(duration_hrs)|
+------------------+
|2359307.1666666847|
+------------------+



## Grouping & Aggregating I

In [55]:
by_plane = flights.groupBy("FL_NUM")
by_plane.count().show()

+------+-----+
|FL_NUM|count|
+------+-----+
|   296|  496|
|  1090|  744|
|  1159|  970|
|  1572|  758|
|  1512|  478|
|   829|  295|
|  1436|  598|
|  2069|  402|
|  2088|  232|
|  2136|  405|
|  2294|  187|
|   467|  278|
|   691|  325|
|  2162|  208|
|   675|  281|
|   125|  360|
|  1372|  395|
|  2275|  385|
|  2464|  595|
|   800|   49|
+------+-----+
only showing top 20 rows



In [57]:
by_origin = flights_num.groupBy("ORIGIN")
by_origin.avg("AIR_TIME").show()

+------+------------------+
|ORIGIN|     avg(AIR_TIME)|
+------+------------------+
|   MSY|102.44792872020366|
|   GEG|110.44017094017094|
|   SNA|160.89759815906802|
|   GRB| 51.48905109489051|
|   GRR| 69.30209481808159|
|   GSO|53.421578421578424|
|   PVD|122.09344660194175|
|   MYR|54.166666666666664|
|   OAK| 83.61252115059222|
|   MSN|62.285419532324624|
|   FAR| 38.67024128686327|
|   DCA|127.56614850890654|
|   LEX|58.769673704414586|
|   ORF|101.29183673469387|
|   EVV| 54.18181818181818|
|   CRW| 58.55080213903744|
|   SAV|40.853939045428405|
|   TRI|  42.3859649122807|
|   CMH| 108.8392026578073|
|   CAK| 79.54024390243903|
+------+------------------+
only showing top 20 rows



## Grouping & Aggregating II

* In addition the the `GroupedData` methods, there is also `.agg()`, which lets us pass an aggregate column expression that uses any of the aggregate functions from the `pyspark.sql.functions` submodule

* This submodule contains many useful functions for computing things like standard deviation, etc

In [63]:
flights_num = flights.select(flights.DISTANCE.cast("float"), 
                             flights.ORIGIN, 
                             flights.CARRIER,
                             flights.MONTH,
                             flights.DEST,
                             flights.AIR_TIME.cast("float"),
                             flights.DEP_DELAY.cast("float"))

In [65]:
import pyspark.sql.functions as F

by_month_dest = flights_num.groupBy("MONTH", "DEST")
by_month_dest.avg("DEP_DELAY").show()
by_month_dest.agg(F.stddev("DEP_DELAY")).show()

+-----+----+-------------------+
|MONTH|DEST|     avg(DEP_DELAY)|
+-----+----+-------------------+
|    1| BNA|  7.274881516587678|
|    1| PNS|  5.964102564102564|
|    2| CAE|   8.24731182795699|
|    2| LIT|  8.904109589041095|
|    3| BDL|  9.492374727668846|
|    6| DLH|-0.2564102564102564|
|    1| PDX|  12.58909090909091|
|    1| ABE|-0.6086956521739131|
|    5| PIT| 12.168758716875871|
|    7| BDL|  6.613526570048309|
|    1| AVP|               16.5|
|    2| TUS|  9.970297029702971|
|    5| LAX| 10.368102288021534|
|    7| STT| 17.967441860465115|
|    5| EWR| 14.490764063811923|
|    1| MFE| 10.473333333333333|
|    1| MDT|   9.61344537815126|
|    3| PBI| 10.913649025069638|
|    4| JFK| 15.184190902311707|
|    4| MSY|  9.760371959942775|
+-----+----+-------------------+
only showing top 20 rows

+-----+----+----------------------+
|MONTH|DEST|stddev_samp(DEP_DELAY)|
+-----+----+----------------------+
|    1| BNA|    27.874270456937698|
|    1| PNS|     38.02893287563469|
| 

## Joining

In [66]:
# No airports table, but this is how we'd do it
# flights_with_aiports = flights.join(airports, on="dest", how="leftouter")

---
# Machine Learning Pipelines

* At the core of the `pyspark.ml` module are the `Transformer` and `Estimator` classes
* `Transformer` classes have a `.transform()` method that takes a DataFrame and returns a new DataFrame (usually the original with a new column appended)
    * For example, we might use the class `Bucketizer` to create discrete bins from a continuous feature, or the class `PCA` to reduce the dimensionality of our dataset using principal component analysis
* `Estimator` classes all implement a `.fit()` method
* These methods take a DataFrame and return a model object
    * For example, we can use `StringIndexerModel` for including categorical data saved as strings in our models, or a `RandomForestModel` that uses the RF algorithm for classification or regression

In [75]:
planes = spark.read.csv("plane-data.csv", header=True)
planes.show(1)

flights = spark.read.csv("DelayedFlights.csv", header=True)
flights.show(1)

+-------+----+------------+----------+-----+------+-------------+-----------+----+
|tailnum|type|manufacturer|issue_date|model|status|aircraft_type|engine_type|year|
+-------+----+------------+----------+-----+------+-------------+-----------+----+
| N050AA|null|        null|      null| null|  null|         null|       null|null|
+-------+----+------------+----------+-----+------+-------------+-----------+----+
only showing top 1 row

+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|_c0|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|Weat

In [76]:
planes = planes.withColumnRenamed("year", "plane_year")
planes = planes.withColumnRenamed("tailnum", "TailNum")
model_data = flights.join(planes, on="TailNum", how="leftouter")

In [77]:
model_data.show(2)

+-------+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+-----------+------------+----------+-------+------+--------------------+-----------+----------+
|TailNum|_c0|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|       type|manufacturer|issue_date|  model|status|       aircraft_type|engine_type|plane_year|
+-------+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-----------------+--------------+-------+--------+--------+------+----+--------+------+----

#### Converting String to Int/Float

In [78]:
model_data = model_data\
            .withColumn("ArrDelay", \
                        model_data.ArrDelay.cast("integer"))
model_data = model_data\
            .withColumn("AirTime", \
                        model_data.ArrTime.cast("integer"))
model_data = model_data\
            .withColumn("Month", \
                        model_data.Month.cast("integer"))
model_data = model_data\
            .withColumn("plane_year", \
                        model_data.plane_year.cast("integer"))

#### Computing the Age of a Plane

In [82]:
model_data = model_data\
            .withColumn("plane_age", 
                        model_data.Year - model_data.plane_year)

#### Creating the Label (Is the plane late?)

In [83]:
model_data = model_data\
            .withColumn("is_late", model_data.ArrDelay > 0)
    
model_data = model_data\
            .withColumn("label", \
                       model_data.is_late.cast("integer"))

#### Removing NULL values

In [84]:
remove_nulls = """ArrDelay is not null and DepDelay is not null
                  and AirTime is not null and plane_year is not null
               """
model_data = model_data.filter(remove_nulls)

## Strings and Factors

In [86]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

In [95]:
carr_indexer = StringIndexer(inputCol="UniqueCarrier", 
                             outputCol="carrier_index")

carr_encoder = OneHotEncoder(inputCol="carrier_index",
                             outputCol="carrier_fact")

In [96]:
dest_indexer = StringIndexer(inputCol="Dest",
                            outputCol="dest_index")

dest_encoder = OneHotEncoder(inputCol="dest_index",
                            outputCol="dest_fact")

### Assemble a Vector

* Combine all of the columns containing our features into a single column

In [97]:
from pyspark.ml.feature import VectorAssembler

In [98]:
vec_assembler = VectorAssembler(inputCols=["Month", "AirTime",
                                           "carrier_fact", "dest_fact",
                                           "plane_age"],
                                outputCol="features")

### Create the Pipeline

* `Pipeline` is a class in `pyspark.ml` which combines all the estimators and transformers we've already created

In [99]:
from pyspark.ml import Pipeline

In [100]:
flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder,
                                carr_indexer, carr_encoder,
                                vec_assembler])

### Transform the data

In [101]:
piped_data = flights_pipe.fit(model_data).transform(model_data)

### Split the Data

In [103]:
training, test = piped_data.randomSplit([0.6, 0.4])

### Create the modeler

In [104]:
from pyspark.ml.classification import LogisticRegression

In [105]:
lr = LogisticRegression()

### Create the Evaluator

In [106]:
import pyspark.ml.evaluation as evals

evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")

### Make a Grid

In [107]:
import pyspark.ml.tuning as tune
import numpy as np

grid = tune.ParamGridBuilder()

grid = grid.addGrid(lr.regParam, np.arange(0., 0.1, 0.01))
grid = grid.addGrid(lr.elasticNetParam, [0, 1])

grid = grid.build()

### Make the Validator

In [108]:
cv = tune.CrossValidator(estimator=lr,
                         estimatorParamMaps=grid,
                         evaluator=evaluator)

### Fit the Model

In [109]:
models = cv.fit(training)
best_lr = models.bestModel
print(best_lr)

LogisticRegression_4433bd993f0bdbe9c50b


### Evaluate the Model

In [110]:
test_results = best_lr.transform(test)
print(evaluator.evaluate(test_results))

0.6103597602744483
