### grp

# Course: _Introduction to PySpark_:
1.  intro
2.  manipulate data
3.  ml pipelines
4.  model tuning

https://spark.apache.org/docs/latest/api/python/index.html

## _1. Getting to know PySpark_:
-  **Architecture**:
    -  big data is processed by distributed computing across a cluster of machines (i.e. nodes)
    -  _master_ => manages splitting up the data and the computations
    -  _slaves_ (workers) => perform the computations and send results back to the _master_
-  **RDDs**:
    -  _resilient distributed dataset_
    -  unstructured, low-level objects
    -  core data structure, designed to distribute data in parallel across the cluster
-  **DataFrames**:
    -  structured, high-level objects
    -  similar to a SQL table (_rows & columns_)
-  **Catalog**:
    -  ```spark.catalog``` lists the tables and views registered in the session (backed by the _Hive Metastore_ when Hive support is enabled)
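The master/worker split described above can be mimicked in plain Python, without Spark, as a rough mental model. The hypothetical `run_job` function plays the master: it splits a list into partitions, hands each one to a thread standing in for a worker, and combines the partial results. Real Spark ships tasks to executor processes on other machines and also handles scheduling and failures, none of which is modelled here.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Master side: split the data into num_partitions nearly equal chunks."""
    size, remainder = divmod(len(data), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < remainder else 0)
        partitions.append(data[start:end])
        start = end
    return partitions


def worker_task(partition):
    """Worker side: compute a partial result (sum of squares) for one chunk."""
    return sum(x * x for x in partition)


def run_job(data, num_workers=4):
    """Master: distribute the chunks, then combine the workers' partial results."""
    partitions = split_into_partitions(data, num_workers)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partial_results = list(pool.map(worker_task, partitions))
    return sum(partial_results)


print(split_into_partitions(list(range(10)), 4))  # [[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]]
print(run_job(list(range(10))))  # 285
```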

#### SparkContext

In [1]:
print(sc)
print(sc.version)

<SparkContext master=local[8] appName=PySparkShell>
2.3.2


#### SparkSession

In [2]:
from pyspark.sql import SparkSession

my_spark = SparkSession.builder.getOrCreate() # entry point; returns the running session if one exists (in the PySpark shell, the same session as spark)
print(my_spark)
print(my_spark.version)

<pyspark.sql.session.SparkSession object at 0x11252cf28>
2.3.2


#### view tables:
-  ```createOrReplaceTempView()```

In [3]:
path = '/Users/grp/Documents/BIGDATA/DATACAMP/otherCourses/introtopyspark/flights_small.csv'
flights_csv_df = spark.read.option("header", True).csv(path)
flights_csv_df.printSchema()

root
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
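Every column above is `string` because only the `header` option was set. The CSV reader reads every column as a string unless `inferSchema` is enabled (e.g. `spark.read.csv(path, header=True, inferSchema=True)`), which is why later cells cast columns explicitly. The plain-Python sketch below shows, in much simplified form, the kind of per-column decision that type inference makes. `infer_type` and `infer_schema` are hypothetical helpers, not Spark's algorithm.

```python
import csv
import io


def infer_type(values):
    """Guess 'int', 'double' or 'string' for a column of raw CSV strings.

    Simplified sketch: empty strings are treated as nulls and skipped.
    """
    inferred = "int"
    for v in values:
        if v == "":
            continue
        if inferred == "int":
            try:
                int(v)
                continue
            except ValueError:
                inferred = "double"  # widen, then re-check this value as a float
        try:
            float(v)
        except ValueError:
            return "string"  # one non-numeric value makes the whole column a string
    return inferred


def infer_schema(csv_text):
    """Map each header name to the type inferred from its column."""
    rows = list(csv.reader(io.StringIO(csv_text)))
    header, body = rows[0], rows[1:]
    return {name: infer_type([row[i] for row in body]) for i, name in enumerate(header)}


sample = "year,carrier,dep_delay,duration_hrs\n2014,VX,-7,2.2\n2014,AS,5,6.0\n"
print(infer_schema(sample))
# {'year': 'int', 'carrier': 'string', 'dep_delay': 'int', 'duration_hrs': 'double'}
```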



In [4]:
flights_csv_df.createOrReplaceTempView("flights_sql")

In [5]:
print(spark.catalog.listTables())

[Table(name='flights_sql', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


#### sql query:
-  ```spark.sql()```

In [6]:
query = "FROM flights_sql SELECT * LIMIT 10"

flights10 = spark.sql(query)

flights10.show(truncate=True)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

#### convert spark df to pandas df:
-  collects every row into the driver's memory, so only use it on results small enough to fit there
-  ```toPandas()```

In [7]:
import pandas as pd

query = "SELECT origin, dest, COUNT(*) as N FROM flights_sql GROUP BY origin, dest"

flight_counts = spark.sql(query)
flight_counts.show(5) # show() prints the table itself and returns None

pd_counts = flight_counts.toPandas()
print(pd_counts.head())

+------+----+---+
|origin|dest|  N|
+------+----+---+
|   SEA| RNO|  8|
|   SEA| DTW| 98|
|   SEA| CLE|  2|
|   SEA| LAX|450|
|   PDX| SEA|144|
+------+----+---+
only showing top 5 rows

  origin dest    N
0    SEA  RNO    8
1    SEA  DTW   98
2    SEA  CLE    2
3    SEA  LAX  450
4    PDX  SEA  144


#### convert pandas df to spark df:
-  ```spark.createDataFrame()```

In [8]:
import numpy as np

pd_temp = pd.DataFrame(np.random.random(10))

spark_temp = spark.createDataFrame(pd_temp)
print(spark.catalog.listTables()) # before registering the view
print("="*10)
spark_temp.createOrReplaceTempView("temp")
print(spark.catalog.listTables()) # confirm temp table is available in spark catalog

[Table(name='flights_sql', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
[Table(name='flights_sql', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


#### read csv:
-  ```spark.read.csv()```

In [9]:
path = '/Users/grp/Documents/BIGDATA/DATACAMP/otherCourses/introtopyspark/airports.csv'
airports_csv_df = spark.read.option("header", True).csv(path)
airports_csv_df.printSchema()

root
 |-- faa: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lon: string (nullable = true)
 |-- alt: string (nullable = true)
 |-- tz: string (nullable = true)
 |-- dst: string (nullable = true)



In [10]:
airports_csv_df.show()

+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
|0W3|Harford County Ai...

## _2. Manipulating Data_:
-  ```pyspark.sql``` module

#### create new columns:
-  ```withColumn()```
-  ```withColumnRenamed()```

In [11]:
flights_sql_df = spark.table("flights_sql")

for i in flights_sql_df.take(3): print(i)

print("="*10)

flights_df = flights_sql_df.withColumn("duration_hrs", flights_sql_df.air_time/60)
flights_df.select("duration_hrs").show(5)

print("="*10)

flights_df.printSchema()

Row(year='2014', month='12', day='8', dep_time='658', dep_delay='-7', arr_time='935', arr_delay='-5', carrier='VX', tailnum='N846VA', flight='1780', origin='SEA', dest='LAX', air_time='132', distance='954', hour='6', minute='58')
Row(year='2014', month='1', day='22', dep_time='1040', dep_delay='5', arr_time='1505', arr_delay='5', carrier='AS', tailnum='N559AS', flight='851', origin='SEA', dest='HNL', air_time='360', distance='2677', hour='10', minute='40')
Row(year='2014', month='3', day='9', dep_time='1443', dep_delay='-2', arr_time='1652', arr_delay='2', carrier='VX', tailnum='N847VA', flight='755', origin='SEA', dest='SFO', air_time='111', distance='679', hour='14', minute='43')
+------------------+
|      duration_hrs|
+------------------+
|               2.2|
|               6.0|
|              1.85|
|1.3833333333333333|
|2.1166666666666667|
+------------------+
only showing top 5 rows

root
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string 

#### filter data:
-  ```filter()```

In [12]:
long_flights1 = flights_df.filter("distance > 1000")
long_flights2 = flights_df.filter(flights_df.distance > 1000)

for i in long_flights1.take(3): print(i)
print("="*10)
for i in long_flights2.take(3): print(i)

Row(year='2014', month='1', day='22', dep_time='1040', dep_delay='5', arr_time='1505', arr_delay='5', carrier='AS', tailnum='N559AS', flight='851', origin='SEA', dest='HNL', air_time='360', distance='2677', hour='10', minute='40', duration_hrs=6.0)
Row(year='2014', month='4', day='19', dep_time='1236', dep_delay='-4', arr_time='1508', arr_delay='-7', carrier='AS', tailnum='N309AS', flight='490', origin='SEA', dest='SAN', air_time='135', distance='1050', hour='12', minute='36', duration_hrs=2.25)
Row(year='2014', month='11', day='19', dep_time='1812', dep_delay='-3', arr_time='2352', arr_delay='-4', carrier='AS', tailnum='N564AS', flight='26', origin='SEA', dest='ORD', air_time='198', distance='1721', hour='18', minute='12', duration_hrs=3.3)
Row(year='2014', month='1', day='22', dep_time='1040', dep_delay='5', arr_time='1505', arr_delay='5', carrier='AS', tailnum='N559AS', flight='851', origin='SEA', dest='HNL', air_time='360', distance='2677', hour='10', minute='40', duration_hrs=6.0)

#### select data:
-  ```select()```
-  ```selectExpr()```
-  ```cast()```

In [13]:
selected1 = flights_df.select("tailnum", "origin", "dest")
temp = flights_df.select(flights_df.origin, flights_df.dest, flights_df.carrier)

filterA = flights_df.origin == "SEA"
filterB = flights_df.dest == "PDX"

selected2 = temp.filter(filterA).filter(filterB)
selected2.show()

+------+----+-------+
|origin|dest|carrier|
+------+----+-------+
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     AS|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     AS|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
+------+----+-------+
only showing top 20 rows



In [14]:
avg_speed = (flights_df.distance/(flights_df.air_time/60)).alias("avg_speed") # calc col

speed1 = flights_df.select("origin", "dest", "tailnum", avg_speed)
speed2 = flights_df.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")

speed1.show()
speed2.show()

+------+----+-------+------------------+
|origin|dest|tailnum|         avg_speed|
+------+----+-------+------------------+
|   SEA| LAX| N846VA| 433.6363636363636|
|   SEA| HNL| N559AS| 446.1666666666667|
|   SEA| SFO| N847VA|367.02702702702703|
|   PDX| SJC| N360SW| 411.3253012048193|
|   SEA| BUR| N612AS| 442.6771653543307|
|   PDX| DEN| N646SW|491.40495867768595|
|   PDX| OAK| N422WN|             362.0|
|   SEA| SFO| N361VA| 415.7142857142857|
|   SEA| SAN| N309AS| 466.6666666666667|
|   SEA| ORD| N564AS| 521.5151515151515|
|   SEA| LAX| N323AS| 440.3076923076923|
|   SEA| PHX| N305AS|431.29870129870125|
|   SEA| LAS| N433AS| 409.6062992125984|
|   SEA| ANC| N765AS|474.75409836065575|
|   SEA| SFO| N713AS| 315.8139534883721|
|   PDX| SFO| N27205| 366.6666666666667|
|   SEA| SMF| N626AS|477.63157894736844|
|   SEA| MDW| N8634A|481.38888888888886|
|   SEA| BOS| N597AS| 516.4137931034483|
|   PDX| BUR| N215AG| 441.6216216216216|
+------+----+-------+------------------+
only showing top

#### aggregation:
-  ```groupBy()```
-  ```min()```
-  ```max()```
-  ```avg()```
-  ```sum()```
-  ```agg()``` (e.g. with ```F.stddev()``` from ```pyspark.sql.functions```)

In [15]:
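from pyspark.sql.functions import col # .min()/.max() below are GroupedData methods, so importing min/max (shadowing Python's built-ins) is unnecessary

# filter first, then select: the filter references origin, which the select drops
flights_df\
.filter(flights_df.origin == "PDX")\
.select(col("distance").cast("int"))\
.groupBy()\
.min("distance")\
.show()

flights_df\
.filter(flights_df.origin == "SEA")\
.select(col("air_time").cast("int"))\
.groupBy()\
.max("air_time")\
.show()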

+-------------+
|min(distance)|
+-------------+
|          106|
+-------------+

+-------------+
|max(air_time)|
+-------------+
|          409|
+-------------+



In [16]:
flights_df\
.filter(flights_df.carrier == "DL")\
.filter(flights_df.origin == "SEA")\
.select(col("air_time").cast("int"))\
.groupBy()\
.avg("air_time")\
.show()

# total hours in the air (duration_hrs was already added in In [11])
flights_df\
.groupBy()\
.sum("duration_hrs")\
.show()

+------------------+
|     avg(air_time)|
+------------------+
|188.20689655172413|
+------------------+

+------------------+
| sum(duration_hrs)|
+------------------+
|25289.600000000126|
+------------------+



In [17]:
by_plane = flights_df.groupBy("tailnum")

# number of flights each plane made
by_plane.count().show()

# add air_time cast to int as a new column, then group by origin
by_origin = flights_df\
.select("*", col("air_time").cast("int").alias("air_time_int"))\
.groupBy("origin")

# average air time (in minutes) of flights from PDX and SEA
by_origin\
.avg("air_time_int")\
.show()

+-------+-----+
|tailnum|count|
+-------+-----+
| N442AS|   38|
| N102UW|    2|
| N36472|    4|
| N38451|    4|
| N73283|    4|
| N513UA|    2|
| N954WN|    5|
| N388DA|    3|
| N567AA|    1|
| N516UA|    2|
| N927DN|    1|
| N8322X|    1|
| N466SW|    1|
|  N6700|    1|
| N607AS|   45|
| N622SW|    4|
| N584AS|   31|
| N914WN|    4|
| N654AW|    2|
| N336NW|    1|
+-------+-----+
only showing top 20 rows

+------+------------------+
|origin| avg(air_time_int)|
+------+------------------+
|   SEA| 160.4361496051259|
|   PDX|137.11543248288737|
+------+------------------+
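The `cast("int")` before averaging matters for missing values. In Spark 2.3, as used here, casting a string that is not a valid integer (a placeholder such as `NA`, for example) yields null, and aggregates like `avg()` skip nulls. The plain-Python sketch below imitates that behaviour with hypothetical helpers `to_int_or_none` and `group_avg`. It only parses plain integer strings, whereas Spark's cast rules cover more cases.

```python
def to_int_or_none(s):
    """Lenient string -> int conversion: unparseable values become None (null)."""
    try:
        return int(s)
    except (TypeError, ValueError):
        return None


def group_avg(rows, key, value_col):
    """Average value_col per key, skipping nulls the way SQL's AVG does."""
    totals = {}  # key -> [running sum, non-null count], in first-seen order
    for row in rows:
        total = totals.setdefault(row[key], [0.0, 0])
        value = to_int_or_none(row[value_col])
        if value is None:
            continue  # a null adds nothing to the sum or the count
        total[0] += value
        total[1] += 1
    # a group whose values are all null averages to None (null)
    return {k: (s / n if n else None) for k, (s, n) in totals.items()}


# toy rows for illustration; "NA" stands for an unparseable placeholder
rows = [
    {"origin": "SEA", "air_time": "132"},
    {"origin": "SEA", "air_time": "NA"},
    {"origin": "PDX", "air_time": "83"},
    {"origin": "SEA", "air_time": "360"},
    {"origin": "OTH", "air_time": "NA"},
]
print(group_avg(rows, "origin", "air_time"))  # {'SEA': 246.0, 'PDX': 83.0, 'OTH': None}
```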



In [18]:
import pyspark.sql.functions as F

# add dep_delay cast to int as a new column, then group by month and dest
by_month_dest = flights_df\
.withColumn("dep_delay_int", col("dep_delay").cast("int"))\
.groupBy("month", "dest")

by_month_dest.avg("dep_delay_int").show()

# sample standard deviation (F.stddev is an alias for stddev_samp, as the output header shows)
by_month_dest.agg(F.stddev("dep_delay_int")).show()

+-----+----+--------------------+
|month|dest|  avg(dep_delay_int)|
+-----+----+--------------------+
|   11| TUS| -2.3333333333333335|
|   11| ANC|   7.529411764705882|
|    1| BUR|               -1.45|
|    1| PDX| -5.6923076923076925|
|    6| SBA|                -2.5|
|    5| LAX|-0.15789473684210525|
|   10| DTW|                 2.6|
|    6| SIT|                -1.0|
|   10| DFW|  18.176470588235293|
|    3| FAI|                -2.2|
|   10| SEA|                -0.8|
|    2| TUS| -0.6666666666666666|
|   12| OGG|  25.181818181818183|
|    9| DFW|   4.066666666666666|
|    5| EWR|               14.25|
|    3| RDM|                -6.2|
|    8| DCA|                 2.6|
|    7| ATL|   4.675675675675675|
|    4| JFK| 0.07142857142857142|
|   10| SNA| -1.1333333333333333|
+-----+----+--------------------+
only showing top 20 rows

+-----+----+--------------------------+
|month|dest|stddev_samp(dep_delay_int)|
+-----+----+--------------------------+
|   11| TUS|        3.0550504633038935

#### join

In [19]:
airports_csv_df.show()

# rename faa to dest so it matches the join key in flights_df
airports_df = airports_csv_df.withColumnRenamed("faa", "dest")

# Join the DataFrames
flights_with_airports = flights_df.join(airports_df, on="dest", how="left_outer")

# Examine the data again
flights_with_airports.printSchema()

+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
|0W3|Harford County Ai...

## _3. Getting Started with Machine Learning Pipelines_:
-  ```pyspark.ml``` module:
    -  _Transformer_ => ```.transform()``` returns a new DataFrame, typically with an appended column
    -  _Estimator_ => ```.fit()``` returns a model object, which is itself a _Transformer_
    -  model inputs must be _numeric_ (e.g. integers, doubles)
    -  ```Pipeline()``` chains stages (transformers & estimators) together
-  ```pyspark.ml.feature``` submodule:
    -  handling categorical variables:
        -  ```StringIndexer()``` maps each distinct category string to a numeric index (most frequent category first, by default)
        -  ```OneHotEncoder()``` encodes those indices as one-hot vectors of 0s and 1s (by default the last category is dropped and encoded as all zeros)
    -  combining features:
        -  ```VectorAssembler()``` assembles multiple numeric (and vector) columns into a single feature-vector column
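Here is a plain-Python sketch of what the two categorical steps compute, under simplifying assumptions. `fit_string_indexer` and `one_hot` are hypothetical helpers. Indices are ordered by descending frequency, which is StringIndexer's default ordering, and ties are broken alphabetically here for determinism. `one_hot` drops the last category by default, mirroring `OneHotEncoder`'s `dropLast=True`. Spark stores the result as a sparse vector; a dense list is used here for readability.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each distinct string to a float index, most frequent first.

    Ties are broken alphabetically here (an assumption made for determinism).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {s: float(i) for i, s in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Encode a category index as a dense 0/1 list.

    With drop_last=True the vector has num_categories - 1 slots and the last
    category is represented by all zeros.
    """
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    i = int(index)
    if i < size:
        vector[i] = 1.0
    return vector


carriers = ["AS", "AS", "WN", "VX", "AS", "WN"]  # toy column values
index = fit_string_indexer(carriers)
print(index)  # {'AS': 0.0, 'WN': 1.0, 'VX': 2.0}
print([one_hot(index[c], len(index)) for c in carriers])
```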

#### df join

In [20]:
path = '/Users/grp/Documents/BIGDATA/DATACAMP/otherCourses/introtopyspark/planes.csv'
planes_csv_df = spark.read.option("header", True).csv(path)
planes_csv_df.printSchema()

root
 |-- tailnum: string (nullable = true)
 |-- year: string (nullable = true)
 |-- type: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- engines: string (nullable = true)
 |-- seats: string (nullable = true)
 |-- speed: string (nullable = true)
 |-- engine: string (nullable = true)



In [21]:
planes_df = planes_csv_df.withColumnRenamed("year", "plane_year") # avoid a clash with the flights' year column
model_data = flights_df.join(planes_df, on="tailnum", how="left_outer")

#### cast cols to int

In [22]:
model_data = model_data.withColumn("arr_delay", model_data.arr_delay.cast("integer"))
model_data = model_data.withColumn("air_time", model_data.air_time.cast("integer"))
model_data = model_data.withColumn("month", model_data.month.cast("integer"))
model_data = model_data.withColumn("plane_year", model_data.plane_year.cast("integer"))

#### create new col

In [23]:
model_data = model_data.withColumn("plane_age", model_data.year - model_data.plane_year)
model_data.printSchema()

root
 |-- tailnum: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: string (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- duration_hrs: double (nullable = true)
 |-- plane_year: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- engines: string (nullable = true)
 |-- seats: string (nullable = true)
 |-- speed: string (nullable = true)
 |-- engine: string (nullable = true)
 |-- plane_age: double 

#### create boolean target var

In [24]:
model_data.select("arr_delay").show(5)

+---------+
|arr_delay|
+---------+
|       -5|
|        5|
|        2|
|       34|
|        1|
+---------+
only showing top 5 rows



In [25]:
model_data = model_data.withColumn("is_late", model_data.arr_delay > 0) # boolean true/false
model_data = model_data.withColumn("label", model_data.is_late.cast("integer"))
model_data = model_data.filter(
    "arr_delay is not NULL and dep_delay is not NULL "
    "and air_time is not NULL and plane_year is not NULL"
)

In [26]:
model_data.select("is_late").show(5)
model_data.select("label").show(5)

+-------+
|is_late|
+-------+
|  false|
|   true|
|   true|
|   true|
|   true|
+-------+
only showing top 5 rows

+-----+
|label|
+-----+
|    0|
|    1|
|    1|
|    1|
|    1|
+-----+
only showing top 5 rows



#### one-hot encoding (ohe)

In [27]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

carr_indexer = StringIndexer(inputCol="carrier", outputCol="carrier_index")
carr_encoder = OneHotEncoder(inputCol="carrier_index", outputCol="carrier_fact")

dest_indexer = StringIndexer(inputCol="dest", outputCol="dest_index")
dest_encoder = OneHotEncoder(inputCol="dest_index", outputCol="dest_fact")

#### assemble vector

In [28]:
from pyspark.ml.feature import VectorAssembler

vec_assembler = VectorAssembler(inputCols=["month",
                                           "air_time",
                                           "carrier_fact",
                                           "dest_fact",
                                           "plane_age"],
                                outputCol="features")
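Conceptually, `VectorAssembler` concatenates its input columns into one flat vector, in the order listed in `inputCols`. Numeric columns contribute one entry each, and vector columns (like the one-hot outputs) contribute all of their entries. The hypothetical `assemble` function below sketches that for a single row in plain Python. It raises on a null input. Spark's assembler also rejects nulls by default, so it helps that rows with missing values were filtered out earlier.

```python
def assemble(row, input_cols):
    """Flatten the named columns of one row into a single list of floats."""
    features = []
    for name in input_cols:
        value = row[name]
        if value is None:
            raise ValueError(f"column {name!r} is null; cannot assemble")
        if isinstance(value, (list, tuple)):
            features.extend(float(x) for x in value)  # vector column: all entries
        else:
            features.append(float(value))  # numeric column: one entry
    return features


# toy row: the one-hot vectors are made-up values for illustration
row = {"month": 1, "air_time": 132, "carrier_fact": [1.0, 0.0],
       "dest_fact": [0.0, 1.0, 0.0], "plane_age": 12.0}
cols = ["month", "air_time", "carrier_fact", "dest_fact", "plane_age"]
print(assemble(row, cols))  # [1.0, 132.0, 1.0, 0.0, 0.0, 1.0, 0.0, 12.0]
```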

#### create pipeline

In [29]:
from pyspark.ml import Pipeline

flights_pipe = Pipeline(stages=[dest_indexer,
                                dest_encoder,
                                carr_indexer,
                                carr_encoder,
                                vec_assembler])

In [30]:
flights_pipe.getStages()

[StringIndexer_4e9a8130caa1b4ee143a,
 OneHotEncoder_4487883cf664fea1b0ba,
 StringIndexer_44ff81ee5447e0f897b6,
 OneHotEncoder_4091852421518b8f2ad3,
 VectorAssembler_498db05d0a4f892a173d]
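`Pipeline.fit()` walks the stages in order. An estimator stage is fit on the data as transformed so far, producing a model (a transformer), and each stage's output feeds the next stage. The result is a `PipelineModel` whose `transform()` replays the fitted stages. The plain-Python sketch below mirrors that contract with hypothetical classes. `Indexer` is a toy stand-in for `StringIndexer` that assigns indices alphabetically. This is a simplification, not Spark's implementation.

```python
class Transformer:
    def transform(self, rows):
        raise NotImplementedError


class Estimator:
    def fit(self, rows):
        raise NotImplementedError


class IndexerModel(Transformer):
    """Fitted toy indexer: appends an index column, leaving the input rows untouched."""

    def __init__(self, input_col, output_col, mapping):
        self.input_col, self.output_col, self.mapping = input_col, output_col, mapping

    def transform(self, rows):
        return [{**r, self.output_col: self.mapping[r[self.input_col]]} for r in rows]


class Indexer(Estimator):
    """Toy estimator: learns an alphabetical string -> index mapping."""

    def __init__(self, input_col, output_col):
        self.input_col, self.output_col = input_col, output_col

    def fit(self, rows):
        distinct = sorted({r[self.input_col] for r in rows})
        mapping = {v: float(i) for i, v in enumerate(distinct)}
        return IndexerModel(self.input_col, self.output_col, mapping)


class PipelineModel(Transformer):
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class Pipeline(Estimator):
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if isinstance(stage, Estimator):
                stage = stage.fit(rows)  # estimator -> fitted model (a transformer)
            rows = stage.transform(rows)  # later stages see this stage's output
            fitted.append(stage)
        return PipelineModel(fitted)


rows = [{"dest": "SFO", "carrier": "AS"}, {"dest": "LAX", "carrier": "VX"},
        {"dest": "SFO", "carrier": "VX"}]
pipe = Pipeline([Indexer("dest", "dest_index"), Indexer("carrier", "carrier_index")])
model = pipe.fit(rows)
for r in model.transform(rows):
    print(r)
```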

#### fit feature engineering model

In [31]:
piped_data = flights_pipe.fit(model_data).transform(model_data)
piped_data.printSchema()
for i in piped_data.take(3): print(i)

root
 |-- tailnum: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: string (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- duration_hrs: double (nullable = true)
 |-- plane_year: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- engines: string (nullable = true)
 |-- seats: string (nullable = true)
 |-- speed: string (nullable = true)
 |-- engine: string (nullable = true)
 |-- plane_age: double 

#### split data

In [32]:
training, test = piped_data.randomSplit([.75, .25])
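`randomSplit()` does not cut the data at an exact 75/25 boundary. Each row is assigned to a split at random according to the normalized weights, so split sizes are only approximately proportional. They also change between runs unless a seed is passed (e.g. `randomSplit([.75, .25], seed=...)`). Below is a plain-Python sketch of that idea, using a hypothetical `random_split` helper. Spark samples each partition separately, so the details differ.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one split at random, in proportion to the weights."""
    total = float(sum(weights))
    bounds, cumulative = [], 0.0
    for w in weights:
        cumulative += w / total  # normalize the weights, then take running totals
        bounds.append(cumulative)
    rng = random.Random(seed)  # same seed -> same assignment
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            if u < bound:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point rounding in the last bound
    return splits


training, test = random_split(list(range(1000)), [0.75, 0.25], seed=42)
print(len(training), len(test))  # roughly 750 / 250, not exactly
```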

## _4. Model Tuning and Selection_:
-  hyperparameter grids:
    -  ```pyspark.ml.tuning``` (```ParamGridBuilder()```)
-  k-fold cross validation:
    -  ```pyspark.ml.tuning``` (```CrossValidator()```)
-  model evaluation:
    -  ```pyspark.ml.evaluation```

#### create learning algorithm

In [33]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression()

#### evaluator:
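-  terms:
    -  ROC (**receiver operating characteristic**) curve: true positive rate vs. false positive rate across all classification thresholds
    -  AUC (**area under the curve**)
-  ```BinaryClassificationEvaluator()```:
    -  calculates the area under the ROC curve (```metricName="areaUnderROC"```, the default)
    -  this metric combines the two kinds of errors a binary classifier can make (false positives and false negatives) into a single number; 0.5 is no better than random guessing and 1.0 is a perfect ranking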
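The area under the ROC curve also has a ranking interpretation. It is the probability that a randomly chosen positive example gets a higher score than a randomly chosen negative one, with ties counting as half. The plain-Python sketch below computes it that way with a hypothetical `roc_auc` helper. It is O(positives × negatives), so it only suits small inputs. Spark computes the metric from the ROC curve itself (from the `rawPrediction` column by default), which in principle gives the same value.

The example uses the ten test rows printed at the end of this notebook. Ten rows are far too few to be representative, so the result does not match the 0.70 reported for the whole test set.

```python
def roc_auc(labels, scores):
    """Area under the ROC curve via its ranking interpretation.

    Fraction of (positive, negative) pairs in which the positive example gets
    the higher score; ties count as half a win.
    """
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# labels and P(label = 1) from the ten test rows printed at the end of this notebook
labels = [1, 0, 0, 0, 1, 0, 0, 1, 1, 0]
scores = [0.5833324974278111, 0.4457823007516033, 0.3306123166991287,
          0.344116599528307, 0.39923490303913184, 0.5788201423850743,
          0.18291747194089955, 0.6689346634958662, 0.6821160888874325,
          0.1773113302102664]
print(roc_auc(labels, scores))  # 0.9166666666666666 (22 of 24 pairs)
```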

In [34]:
import pyspark.ml.evaluation as evals

evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")

#### grid search:
-  ```ParamGridBuilder()```

In [35]:
import pyspark.ml.tuning as tune

grid = tune.ParamGridBuilder()

grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0, 1])

grid = grid.build()
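`ParamGridBuilder.build()` returns every combination of the values passed to `addGrid()`, i.e. their Cartesian product. This grid therefore holds 10 `regParam` values × 2 `elasticNetParam` values = 20 parameter maps. Below is a plain-Python sketch using a hypothetical `build_grid` helper. Plain dicts stand in for Spark's param maps, and the ordering may differ from Spark's.

```python
import itertools


def build_grid(param_values):
    """Return one dict per combination of the candidate values (Cartesian product)."""
    names = list(param_values)
    return [dict(zip(names, combo))
            for combo in itertools.product(*(param_values[n] for n in names))]


reg_params = [round(0.01 * i, 2) for i in range(10)]  # 0.0, 0.01, ..., 0.09 like np.arange(0, .1, .01)
grid_sketch = build_grid({"regParam": reg_params, "elasticNetParam": [0, 1]})
print(len(grid_sketch))  # 20
print(grid_sketch[:3])
```

CrossValidator uses 3 folds by default. Cross-validating this grid therefore means 20 × 3 = 60 model fits, plus one final refit of the best parameters on all of the training data.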

#### cv:
"_CV splits the training data into a few different subsets (folds). Once the data is split up, one of the subsets is set aside, and the model is fit to the remaining subsets. Then the error is measured against the held out fold. This is repeated for each of the subsets, so every subset of data is held out and used as a test (validation) set exactly once. Then the error on each of the folds is averaged. This is called the cross validation error of the model, and is a good estimate of the actual error on the held out (test) set._"

In [36]:
cv = tune.CrossValidator(estimator=lr,
                         estimatorParamMaps=grid,
                         evaluator=evaluator)
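The cross-validation procedure quoted above can be sketched in plain Python. `k_fold_indices` and `cross_validate` are hypothetical helpers. Folds are contiguous here for simplicity, whereas Spark's `CrossValidator` assigns rows to folds randomly and uses `numFolds=3` by default. The toy "model" just predicts the training mean and is scored by mean squared error.

```python
def k_fold_indices(n, k):
    """Split indices 0..n-1 into k contiguous folds of nearly equal size."""
    sizes = [n // k + (1 if i < n % k else 0) for i in range(k)]
    folds, start = [], 0
    for size in sizes:
        folds.append(list(range(start, start + size)))
        start += size
    return folds


def cross_validate(data, k, fit, evaluate):
    """Hold out each fold once, fit on the rest, and average the fold errors."""
    errors = []
    for held_out in k_fold_indices(len(data), k):
        held_out_set = set(held_out)
        train = [x for i, x in enumerate(data) if i not in held_out_set]
        validation = [data[i] for i in held_out]
        model = fit(train)
        errors.append(evaluate(model, validation))
    return sum(errors) / len(errors)


def fit_mean(train):
    """Toy 'model': always predict the mean of the training values."""
    return sum(train) / len(train)


def mse(model, validation):
    """Mean squared error of the constant prediction on the held-out fold."""
    return sum((x - model) ** 2 for x in validation) / len(validation)


data = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
print(k_fold_indices(len(data), 3))  # [[0, 1], [2, 3], [4, 5]]
print(cross_validate(data, 3, fit_mean, mse))  # 6.25
```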

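#### fit learning algorithm

In [37]:
# cross validation fits one model per fold for every parameter map in the grid, so it is slow; it is left commented out here
# models = cv.fit(training)
# best_lr = models.bestModel

# instead, fit a single LogisticRegression with its default parameters
best_lr = lr.fit(training)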
print(best_lr)

LogisticRegression_4364b33b7373c2acdaec


#### test set

In [38]:
test_results = best_lr.transform(test)
print(evaluator.evaluate(test_results))

0.7001747051850052


In [39]:
test_results.select("label", "prediction", "probability").show(10, truncate=False)

+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|1    |1.0       |[0.41666750257218893,0.5833324974278111]|
|0    |0.0       |[0.5542176992483967,0.4457823007516033] |
|0    |0.0       |[0.6693876833008713,0.3306123166991287] |
|0    |0.0       |[0.655883400471693,0.344116599528307]   |
|1    |0.0       |[0.6007650969608681,0.39923490303913184]|
|0    |1.0       |[0.42117985761492577,0.5788201423850743]|
|0    |0.0       |[0.8170825280591004,0.18291747194089955]|
|1    |1.0       |[0.3310653365041338,0.6689346634958662] |
|1    |1.0       |[0.31788391111256753,0.6821160888874325]|
|0    |0.0       |[0.8226886697897335,0.1773113302102664] |
+-----+----------+----------------------------------------+
only showing top 10 rows
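For binary `LogisticRegression`, `probability` holds [P(label = 0), P(label = 1)], where P(label = 1) is the sigmoid of the model's linear margin. `prediction` is 1.0 when P(label = 1) is above the default `threshold` of 0.5. The plain-Python sketch below states that rule with hypothetical helper names and checks it against the ten rows printed above. The 0.8 accuracy it reports applies to those ten rows only, not to the whole test set.

```python
import math


def probability_from_margin(margin):
    """Sigmoid of the linear margin (w . x + b), returned as [P(0), P(1)]."""
    p1 = 1.0 / (1.0 + math.exp(-margin))
    return [1.0 - p1, p1]


def predict(probability, threshold=0.5):
    """Return 1.0 when P(label = 1) is above the threshold, else 0.0."""
    return 1.0 if probability[1] > threshold else 0.0


# (label, prediction, probability) copied from the ten rows shown above
rows = [
    (1, 1.0, [0.41666750257218893, 0.5833324974278111]),
    (0, 0.0, [0.5542176992483967, 0.4457823007516033]),
    (0, 0.0, [0.6693876833008713, 0.3306123166991287]),
    (0, 0.0, [0.655883400471693, 0.344116599528307]),
    (1, 0.0, [0.6007650969608681, 0.39923490303913184]),
    (0, 1.0, [0.42117985761492577, 0.5788201423850743]),
    (0, 0.0, [0.8170825280591004, 0.18291747194089955]),
    (1, 1.0, [0.3310653365041338, 0.6689346634958662]),
    (1, 1.0, [0.31788391111256753, 0.6821160888874325]),
    (0, 0.0, [0.8226886697897335, 0.1773113302102664]),
]
print(all(predict(prob) == pred for _, pred, prob in rows))  # True
print(sum(label == pred for label, pred, _ in rows) / len(rows))  # 0.8 on these ten rows only
```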


