In [1]:
from pyspark import SparkContext, SparkConf

In [4]:
conf = SparkConf().setAppName("appTest").setMaster("local")
sc = SparkContext(conf=conf)

Spark runs on a cluster made up of a master and several workers. The master splits the data into pieces and sends them to the workers, which process them and send the results back.
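
The master/worker idea can be sketched in plain Python without Spark. This is only a toy illustration: `split_into_partitions` and `worker_sum` are hypothetical helpers, and everything runs in a single process rather than on a cluster.

```python
# Toy illustration only: plain Python standing in for a cluster.
def split_into_partitions(data, num_partitions):
    """Split a list into roughly equal contiguous chunks."""
    size, remainder = divmod(len(data), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < remainder else 0)
        partitions.append(data[start:end])
        start = end
    return partitions

def worker_sum(partition):
    """The work each hypothetical worker does on its own piece."""
    return sum(partition)

data = list(range(1, 11))
partitions = split_into_partitions(data, 3)          # the "master" hands out pieces
partial_sums = [worker_sum(p) for p in partitions]   # each "worker" computes locally
total = sum(partial_sums)                            # the partial results are combined
print(partitions, partial_sums, total)
```

The final result is the same as summing the whole list at once; only the place where the work happens changes.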

In [5]:
#Verify spark Context
print(sc)
print(sc.version)

<SparkContext master=local appName=appTest>
2.4.4


Spark's core data structure is the Resilient Distributed Dataset (RDD), which lets Spark split data across multiple nodes in the cluster.<br>
To start working with Spark DataFrames, you first have to create a SparkSession object from your SparkContext.


In [10]:
from pyspark.sql import SparkSession

In [11]:
spark_session= SparkSession.builder.getOrCreate()
print(spark_session)

<pyspark.sql.session.SparkSession object at 0x114bb2650>


In [13]:
#Print tables in the catalog
print(spark_session.catalog.listTables())

[]


In [26]:
df = spark_session.read.format('csv').options(header='true', inferSchema='true').load('flights_small.csv')

In [27]:
df.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

In [28]:
df.createOrReplaceTempView("flights")

In [34]:
sqlDF = spark_session.sql("SELECT * FROM flights limit 10")
sqlDF.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

In [35]:
query = "SELECT origin, dest, COUNT(*) AS N FROM flights GROUP BY origin, dest"
flights_counts = spark_session.sql(query)
pd_counts = flights_counts.toPandas()
print(pd_counts.head())

  origin dest    N
0    SEA  RNO    8
1    SEA  DTW   98
2    SEA  CLE    2
3    SEA  LAX  450
4    PDX  SEA  144


In [36]:
spark_temp = spark_session.createDataFrame(pd_counts)
print(spark_session.catalog.listTables())

[Table(name=u'flights', database=None, description=None, tableType=u'TEMPORARY', isTemporary=True)]


In [37]:
spark_temp.createOrReplaceTempView('flights2')
print(spark_session.catalog.listTables())

[Table(name=u'flights', database=None, description=None, tableType=u'TEMPORARY', isTemporary=True), Table(name=u'flights2', database=None, description=None, tableType=u'TEMPORARY', isTemporary=True)]


In [38]:
airports = spark_session.read.csv("airports.csv",header=True)
airports.show()  # .show() prints the table itself and returns None, so print() is not needed

+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
|0W3|Harford County Ai...

**Column-wise operations.**<br>
Working with Spark DataFrames differs from working with pandas because a Spark DataFrame is immutable. It can't be changed, so columns can't be updated in place.<br>
Thus, all these methods return a new DataFrame. To overwrite the original DataFrame you must reassign the returned DataFrame, like so:<br>
**df = df.withColumn("newCol", df.oldCol + 1)**<br>
The above code creates a DataFrame with the same columns as df plus a new column, newCol, where every entry is equal to the corresponding entry from oldCol, plus one.
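
The "return a new object instead of changing the old one" behaviour can be seen in a small plain-Python analogy. This is not PySpark: the hypothetical `with_column` helper below only imitates the idea, using a tuple of dicts in place of a DataFrame.

```python
# Analogy in plain Python, not PySpark: rows are dicts, the "DataFrame" is a tuple of rows.
def with_column(rows, name, func):
    """Return a new table with an extra column; the input is left untouched."""
    return tuple({**row, name: func(row)} for row in rows)

df = ({"oldCol": 1}, {"oldCol": 2})
df_new = with_column(df, "newCol", lambda row: row["oldCol"] + 1)

print(df)      # the original rows still have only oldCol
print(df_new)  # the new rows have both oldCol and newCol
```

Reassigning `df = with_column(df, ...)` would play the same role as `df = df.withColumn(...)`: the name now points at the new table.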

In [39]:
flights = spark_session.table("flights")
flights.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

In [45]:
# Add a new column, duration_hrs, that converts air_time from minutes to hours
flights = flights.withColumn("duration_hrs",flights.air_time/60)

In [56]:
flights.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|               2.2|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|               6.0|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|              1.85|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|1.3833333333333333|
|2014|    3|  9|     754|  

In [49]:
long_flights = flights.filter("distance > 1000")
long_flights2 = flights.filter(flights.distance > 1000)
# Both filters give the same rows; .show() prints the table itself
long_flights.show()
long_flights2.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|               6.0|
|2014|    4| 19|    1236|       -4|    1508|       -7|     AS| N309AS|   490|   SEA| SAN|     135|    1050|  12|    36|              2.25|
|2014|   11| 19|    1812|       -3|    2352|       -4|     AS| N564AS|    26|   SEA| ORD|     198|    1721|  18|    12|               3.3|
|2014|    8|  3|    1120|        0|    1415|        2|     AS| N305AS|   656|   SEA| PHX|     154|    1107|  11|    20| 2.566666666666667|
|2014|   11| 12|    2346|  

The difference between the .select() and .withColumn() methods is that .select() returns only the columns you specify,
while .withColumn() returns all the columns of the DataFrame in addition to the one you defined.
So it's often a good idea to drop columns you don't need at the beginning of an operation.

In [60]:
selected1 = flights.select("tailnum","origin","dest")
temp = flights.select(flights.origin,flights.dest,flights.carrier)
filterA = flights.origin == "SEA"
filterB = flights.dest == "PDX"
selected2 = temp.filter(filterA).filter(filterB)

In [61]:
selected1.show()

+-------+------+----+
|tailnum|origin|dest|
+-------+------+----+
| N846VA|   SEA| LAX|
| N559AS|   SEA| HNL|
| N847VA|   SEA| SFO|
| N360SW|   PDX| SJC|
| N612AS|   SEA| BUR|
| N646SW|   PDX| DEN|
| N422WN|   PDX| OAK|
| N361VA|   SEA| SFO|
| N309AS|   SEA| SAN|
| N564AS|   SEA| ORD|
| N323AS|   SEA| LAX|
| N305AS|   SEA| PHX|
| N433AS|   SEA| LAS|
| N765AS|   SEA| ANC|
| N713AS|   SEA| SFO|
| N27205|   PDX| SFO|
| N626AS|   SEA| SMF|
| N8634A|   SEA| MDW|
| N597AS|   SEA| BOS|
| N215AG|   PDX| BUR|
+-------+------+----+
only showing top 20 rows



In [70]:
avg_speed = (flights.distance/(flights.air_time/60)).alias("avg_speed")
speed1 = flights.select("origin","dest","tailnum",avg_speed)
speed2 = flights.selectExpr("origin","dest","tailnum","distance/(air_time/60) as avg_speed")

In [72]:
flights.filter("origin == 'PDX'").groupBy().min("distance").show()

+-------------+
|min(distance)|
+-------------+
|          106|
+-------------+



In [78]:
flights.filter("origin == 'SEA'").groupBy().max("distance").show()

+-------------+
|max(distance)|
+-------------+
|         2724|
+-------------+



In [84]:
flights.filter("carrier =='DL'").filter("origin=='SEA'").groupby().avg("duration_hrs").show()
flights.withColumn("duration_hrs",flights.air_time/60).groupBy().sum("duration_hrs").show()

+-----------------+
|avg(duration_hrs)|
+-----------------+
|3.136781609195401|
+-----------------+

+------------------+
| sum(duration_hrs)|
+------------------+
|25289.600000000126|
+------------------+



In [86]:
by_plane = flights.groupBy("tailnum")
by_plane.count().show()
by_origin = flights.groupBy("origin")
by_origin.avg("duration_hrs").show()

+-------+-----+
|tailnum|count|
+-------+-----+
| N442AS|   38|
| N102UW|    2|
| N36472|    4|
| N38451|    4|
| N73283|    4|
| N513UA|    2|
| N954WN|    5|
| N388DA|    3|
| N567AA|    1|
| N516UA|    2|
| N927DN|    1|
| N8322X|    1|
| N466SW|    1|
|  N6700|    1|
| N607AS|   45|
| N622SW|    4|
| N584AS|   31|
| N914WN|    4|
| N654AW|    2|
| N336NW|    1|
+-------+-----+
only showing top 20 rows

+------+------------------+
|origin| avg(duration_hrs)|
+------+------------------+
|   SEA| 2.673935826752105|
|   PDX|2.2852572080481233|
+------+------------------+



In [87]:
import pyspark.sql.functions as F

In [99]:
by_month_dest = flights.groupBy("month","dest")
by_month_dest.avg("duration_hrs").show()

+-----+----+-------------------+
|month|dest|  avg(duration_hrs)|
+-----+----+-------------------+
|    4| PHX|  2.262222222222222|
|    1| RDM|0.45625000000000004|
|    5| ONT|  1.996296296296296|
|    7| OMA|                2.7|
|    8| MDW| 3.6358333333333333|
|    6| DEN| 2.1031446540880503|
|    5| IAD|  4.444444444444445|
|   12| COS|  2.341666666666667|
|   11| ANC| 3.2151960784313722|
|    5| AUS| 3.5875000000000004|
|    5| COS| 2.2666666666666666|
|    2| PSP|               2.15|
|    4| ORD| 3.4207317073170738|
|   10| DFW|   3.23030303030303|
|   10| DCA|               4.35|
|    8| JNU| 2.1312499999999996|
|   11| KOA|               6.12|
|   10| OMA|  2.716666666666667|
|    6| ONT| 1.9166666666666667|
|    3| MSP| 2.7383333333333333|
+-----+----+-------------------+
only showing top 20 rows



In [100]:
by_month_dest.agg(F.stddev("dep_delay")).show()

+-----+----+----------------------+
|month|dest|stddev_samp(dep_delay)|
+-----+----+----------------------+
|    4| PHX|    15.003380033491737|
|    1| RDM|     8.830749846821778|
|    5| ONT|    18.895178691342874|
|    7| OMA|    2.1213203435596424|
|    8| MDW|    14.467659032985843|
|    6| DEN|    13.536905534420026|
|    5| IAD|    3.8078865529319543|
|   12| COS|    1.4142135623730951|
|   11| ANC|    18.604716401245316|
|    5| AUS|     4.031128874149275|
|    5| COS|     33.38163167571851|
|    2| PSP|     4.878524367060187|
|    4| ORD|    11.593882803741764|
|   10| DFW|     45.53019017606675|
|   10| DCA|    0.7071067811865476|
|    8| JNU|     40.79368823727514|
|   11| KOA|    1.8708286933869707|
|   10| OMA|    5.8594652770823155|
|    6| ONT|     25.98316762829351|
|    3| MSP|    21.556779370817555|
+-----+----+----------------------+
only showing top 20 rows



In [101]:
airports.show()

+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
|0W3|Harford County Ai...

In [102]:
airports = airports.withColumnRenamed("faa","dest")
flights_with_airports = flights.join(airports,on="dest",how='leftouter')
flights_with_airports.show()

+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------------+--------------------+---------+-----------+----+---+---+
|dest|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|air_time|distance|hour|minute|      duration_hrs|                name|      lat|        lon| alt| tz|dst|
+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------------+--------------------+---------+-----------+----+---+---+
| LAX|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA|     132|     954|   6|    58|               2.2|    Los Angeles Intl|33.942536|-118.408075| 126| -8|  A|
| HNL|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA|     360|    2677|  10|    40|               6.0|       Honolulu Intl|21.318681|-157.922428|  13|-10|  N|
| SFO|2014|    3|  9|    

***Step-by-step machine learning PIPELINE, from data intake to model evaluation***
Transformer classes have a .transform() method that takes a DataFrame and returns a new DataFrame, usually the original one with a new column appended.
For example, the class Bucketizer creates discrete bins from a continuous feature, and the class PCA reduces the dimensionality
of your dataset using principal component analysis.

Estimator classes all implement a .fit() method. It also takes a DataFrame, but instead of returning another DataFrame it returns a model object.
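
The fit()/transform() contract can be illustrated with two toy classes in plain Python. `MeanCenter` and `MeanCenterModel` are hypothetical names invented for this sketch; they are not part of pyspark.ml and operate on a list rather than a DataFrame.

```python
# Hypothetical toy classes that mimic the fit()/transform() contract.
class MeanCenterModel:
    """Plays the role of a Transformer: it holds what fit() learned."""
    def __init__(self, mean):
        self.mean = mean

    def transform(self, values):
        # Returns new data and leaves the input untouched.
        return [v - self.mean for v in values]

class MeanCenter:
    """Plays the role of an Estimator: fit() returns a model, not data."""
    def fit(self, values):
        return MeanCenterModel(sum(values) / len(values))

air_time = [132, 360, 111]
model = MeanCenter().fit(air_time)     # learn something from the data
centered = model.transform(air_time)   # apply what was learned
print(model.mean, centered)
```

The split matters because the model returned by fit() can later be applied to data it has never seen, such as a test set.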



In [104]:
planes = spark_session.read.csv("planes.csv",header=True)
planes.show()

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N105UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N107US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N108UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N109UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N110UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA

In [105]:
planes = planes.withColumnRenamed("year","plane_year")
model_data = flights.join(planes,on="tailnum",how="leftouter")

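Spark's machine learning routines only handle numeric data. All columns in your DataFrame must be either integers or decimals (doubles, in Spark terms).
To convert a column, use the .cast() method together with .withColumn(): .cast() works on columns, while .withColumn() works on DataFrames.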
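
One thing to keep in mind is what happens to values that can't be converted, such as the `NA` strings visible in the planes data. To my understanding, in Spark's default (non-ANSI) mode a cast of an unparseable string yields a null instead of an error, which is why filtering out nulls after casting is useful. The plain-Python sketch below imitates that behaviour with a hypothetical `cast_to_int` helper; it is an analogy, not Spark's implementation.

```python
def cast_to_int(value):
    """Mimic a lenient string-to-integer cast: unparseable input becomes None."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None

plane_year = ["1998", "1999", "NA", None]
casted = [cast_to_int(v) for v in plane_year]
print(casted)

# Dropping the missing values mirrors an "is not NULL" filter.
clean = [v for v in casted if v is not None]
print(clean)
```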

In [110]:
model_data = model_data.withColumn("arr_delay",model_data.arr_delay.cast("integer"))

In [111]:
model_data = model_data.withColumn("air_time",model_data.air_time.cast("integer"))
model_data = model_data.withColumn("month",model_data.month.cast("integer"))
model_data = model_data.withColumn("plane_year",model_data.plane_year.cast("integer"))

In [115]:
model_data = model_data.withColumn("plane_age",model_data.year - model_data.plane_year)

In [116]:
model_data = model_data.withColumn("is_late",model_data.arr_delay>0)

In [117]:
model_data = model_data.withColumn("label",model_data.is_late.cast("integer"))

In [118]:
model_data = model_data.filter("""arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL
                               and plane_year is not null""")

One-hot vectors (one-hot encoding) with PySpark.<br>
1. First, create a StringIndexer. Members of this class are Estimators that map each unique string to a number.
2. The Estimator then returns a Transformer that takes a DataFrame, attaches the mapping to it as metadata, and returns a new DataFrame.<br>
3. Encode this numeric column as a one-hot vector using a OneHotEncoder.

All you have to do is create a StringIndexer and a OneHotEncoder, and the Pipeline will take care of the rest.
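
The indexing and encoding steps can be sketched in plain Python to show what the resulting vectors look like. The helpers `fit_string_index` and `one_hot` are hypothetical. The sketch assumes the defaults as I understand them: StringIndexer gives index 0 to the most frequent string, and OneHotEncoder drops the last category so that it is represented by an all-zero vector. The alphabetical tie-break is an assumption made to keep the example deterministic.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {s: i for i, s in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a 0/1 vector; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if i == index else 0.0 for i in range(size)]

carriers = ["AS", "VX", "AS", "WN", "AS", "VX"]
mapping = fit_string_index(carriers)                      # the "fit" step
encoded = [one_hot(mapping[c], len(mapping)) for c in carriers]  # the "transform" step
print(mapping)
print(encoded)
```

Spark stores these vectors in a sparse format, but the information is the same: one position per category, with at most a single 1.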

In [121]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

carr_indexer = StringIndexer(inputCol="carrier",outputCol="carrier_index")
carr_encoder = OneHotEncoder(inputCol="carrier_index",outputCol="carrier_fact")

In [124]:
dest_indexer = StringIndexer(inputCol="dest",outputCol="dest_index")
dest_encoder = OneHotEncoder(inputCol="dest_index",outputCol="dest_fact")

The last step in the Pipeline is to combine all of the columns containing our features into a single column. This has to be done before modeling can take place because every Spark modeling routine expects the data to be in this form. For this, pyspark.ml.feature includes the VectorAssembler class, a Transformer that takes all of the columns you specify and combines them into a new vector column.
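
What "combining columns into one vector" means can be shown for a single row in plain Python. The `assemble` function and the sample row are made up for illustration; scalar columns contribute one number each and vector columns (such as a one-hot encoding) are flattened in place.

```python
def assemble(row, input_cols):
    """Flatten the chosen columns of one row into a single list of floats."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)  # vector column
        else:
            features.append(float(value))             # scalar column
    return features

# Hypothetical row: carrier_fact stands in for a one-hot encoded column.
row = {"month": 12, "air_time": 132, "carrier_fact": [0.0, 1.0], "plane_age": 8}
features = assemble(row, ["month", "air_time", "carrier_fact", "plane_age"])
print(features)
```

The order of the input columns determines the order of the values in the final vector.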

In [126]:
from pyspark.ml.feature import VectorAssembler
vec_assembler = VectorAssembler(inputCols=['month','air_time','carrier_fact','dest_fact','plane_age'],outputCol="features")

***SO WE ARE READY TO CREATE A PIPELINE***
Pipeline is a class in the pyspark.ml module that combines all the Estimators and Transformers that you've already
created. This lets you reuse the same modeling process over and over again by wrapping it up in one simple object.

In [153]:
from pyspark.ml import Pipeline

In [154]:
flights_pipe = Pipeline(stages=[dest_indexer,dest_encoder,carr_indexer,carr_encoder,vec_assembler])

Test and Train
After you've cleaned your data and gotten it ready for modeling, one of the most important steps is to split the data into a test set and a train set. After that, don't touch your test data until you think you have a good model!
In Spark it's important to make sure you split the data after all the transformations. This is because operations
like StringIndexer don't always produce the same index even when given the same list of strings.
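
A random split assigns each row to one of the sets by chance, so the resulting sizes only approximate the requested proportions. The plain-Python sketch below shows the idea with a hypothetical `random_split` helper; it is not how Spark implements `randomSplit`, but it illustrates why fixing a seed (which `randomSplit` also accepts as an optional argument) makes the split repeatable.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random() * total
        cumulative = 0.0
        for i, weight in enumerate(weights):
            cumulative += weight
            if draw < cumulative:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point edge cases
    return splits

rows = list(range(1000))
training, test = random_split(rows, [0.6, 0.4], seed=42)
print(len(training), len(test))  # roughly 600 and 400, not exactly
```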

***Pass your data through the Pipeline you created!***

In [156]:
piped_data = flights_pipe.fit(model_data).transform(model_data)

In [157]:
training,test = piped_data.randomSplit([.6,.4])

***Tuning hyperparameters can improve model performance***

In [158]:
from pyspark.ml.classification import LogisticRegression

In [159]:
lr = LogisticRegression()

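Cross validation: k-fold cross validation is a method for estimating a model's performance on unseen data, such as the test dataset.
It splits the training data into k partitions (folds); PySpark's default number of folds is 3. Each fold is held out once while the model is fit on the remaining folds, and the evaluation metric is averaged over the folds. You will use the cross validation error to compare all the different models so you can choose the best one, and also to choose the hyperparameters by creating a grid of the possible pairs of values for the two hyperparameters.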
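
The way k-fold cross validation divides the data can be sketched with row indices alone. The generator `k_fold_indices` below is a hypothetical helper that uses contiguous folds to keep the output easy to read; Spark's CrossValidator assigns rows to folds randomly.

```python
def k_fold_indices(num_rows, k=3):
    """Yield (train_indices, validation_indices) for each of the k folds."""
    indices = list(range(num_rows))
    fold_sizes = [num_rows // k + (1 if i < num_rows % k else 0) for i in range(k)]
    start = 0
    for size in fold_sizes:
        validation = indices[start:start + size]
        train = indices[:start] + indices[start + size:]
        yield train, validation
        start += size

folds = list(k_fold_indices(7, k=3))
for train, validation in folds:
    print("train:", train, "validation:", validation)
```

Every row lands in a validation fold exactly once, so each candidate model is scored on data it was not fit on.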

In [160]:
import pyspark.ml.evaluation as evals

Because your model is a binary classification model, you will be using the BinaryClassificationEvaluator
from the pyspark.ml.evaluation module.
This evaluator calculates the area under the ROC curve (AUC). This is a metric that combines the two kinds of errors a binary
classifier can make (false positives and false negatives) into a single number.
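
One way to build intuition for this metric: the area under the ROC curve equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The sketch below computes it that way in plain Python. The `area_under_roc` function and the sample scores are made up for illustration, and the pairwise loop is far too slow for real data; it is not how Spark computes the metric.

```python
def area_under_roc(labels, scores):
    """AUC as the share of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
auc = area_under_roc(labels, scores)
print(auc)  # 3 of the 4 positive/negative pairs are ordered correctly
```

A value of 0.5 corresponds to random guessing and 1.0 to a perfect ranking.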

In [164]:
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")

Make a grid to select the best hyperparameters. The submodule pyspark.ml.tuning includes a class called ParamGridBuilder
that does just that.
You will need to use the .addGrid() and .build() methods to create a grid that you can use for cross validation.
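
A hyperparameter grid is simply every combination of the candidate values. The plain-Python sketch below builds one as a list of dicts, using ten values for regParam (0.00 to 0.09) and two for elasticNetParam as an assumed example; the dict layout is only for illustration and is not the structure ParamGridBuilder returns.

```python
from itertools import product

# Assumed candidate values, written without NumPy.
reg_params = [i / 100 for i in range(10)]   # 0.0, 0.01, ..., 0.09
elastic_net_params = [0, 1]

# Every pairing of the two hyperparameters, one dict per candidate model.
param_grid = [
    {"regParam": reg, "elasticNetParam": enet}
    for reg, enet in product(reg_params, elastic_net_params)
]
print(len(param_grid))
print(param_grid[:3])
```

Since each of the 20 candidates is fit once per fold, the cost of cross validation grows quickly with the size of the grid.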


In [167]:
import pyspark.ml.tuning as tune
import numpy as np
grid = tune.ParamGridBuilder()
grid = grid.addGrid(lr.regParam,np.arange(0,.1,.01)) # np.arange creates an array from 0 up to (but not including) .1 in steps of .01
grid = grid.addGrid(lr.elasticNetParam,[0,1])
grid = grid.build()

In [170]:
##Create a cross validator
cv = tune.CrossValidator(estimator=lr,
                         estimatorParamMaps=grid,
                         evaluator=evaluator)

In [171]:
#fit cross validation models
models = cv.fit(training)
#Extract the best model
best_lr = models.bestModel

In [172]:
#best_lr = lr.fit()
print(best_lr)

LogisticRegressionModel: uid = LogisticRegression_b72608d2ba7f, numClasses = 2, numFeatures = 81


In [174]:
best_lr2 = lr.fit(training)
print(best_lr2)

LogisticRegressionModel: uid = LogisticRegression_b72608d2ba7f, numClasses = 2, numFeatures = 81


In [175]:
# The closer the AUC is to 1, the better the model
test_result = best_lr2.transform(test)

In [176]:
evaluator.evaluate(test_result)

0.6805108284777475

Next steps: learn how to create large-scale Spark clusters and how to manage and submit jobs, so that you can use
models in the real world.