### Useful links
#### MLLib docs:
https://spark.apache.org/docs/2.3.2/ml-guide.html

#### Check that we have an active SparkSession

In [1]:
spark

#### Most important facts about Spark MLLib

* works only with numeric data
* models accept the training data and the labels as two columns. The label is an ordinary numeric column, but the training data has to be a single column holding a vector of the variables you want to use. Below you will learn how to build such vectors
* the API is very similar to scikit-learn, with .fit and .transform methods
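
The .fit / .transform pattern from the last bullet can be sketched in plain Python. This is only an illustration of the pattern, not Spark code: `MeanCenterer` and `MeanCentererModel` are hypothetical names invented for this example. One difference from scikit-learn is worth keeping in mind: in Spark ML, `fit` returns a new model object that does the transforming, instead of returning the estimator itself.

```python
class MeanCentererModel:
    """Result of fitting: holds the learned state and transforms data."""

    def __init__(self, mean):
        self.mean = mean

    def transform(self, values):
        # Apply the learned parameter to any data, seen or unseen
        return [v - self.mean for v in values]


class MeanCenterer:
    """Estimator (hypothetical): learns a parameter and returns a model."""

    def fit(self, values):
        return MeanCentererModel(sum(values) / len(values))


model = MeanCenterer().fit([1.0, 2.0, 3.0])
centered = model.transform([1.0, 2.0, 3.0])
print(centered)  # [-1.0, 0.0, 1.0]
```

StringIndexer and LogisticRegression later in this notebook follow the same shape: call fit on the training data, then call transform on the returned model.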

#### For a sample run we will load the original data. Once you have seen how it works, I would like you to load the data prepared in the previous step and build a model on it

In [2]:
taxi = spark.sql("""SELECT taxi_id,
                        trip_start_timestamp,
                        trip_end_timestamp,
                        trip_seconds,
                        trip_miles,
                        pickup_census_tract,
                        dropoff_census_tract,
                        pickup_community_area,
                        dropoff_community_area,
                        fare,
                        tips,
                        tolls,
                        extras,
                        trip_total,
                        company,
                        IF(payment_type='Credit Card',1,0) target
                    FROM
                        tomek.taxi_cleaned
                    WHERE
                        yyyymm BETWEEN 201601 AND 201612
                    """)

In [3]:
# keep only the columns we need and show the first 4 rows for readability
taxi = taxi.select('target','company','fare','trip_seconds','pickup_community_area','dropoff_community_area')
taxi.show(4)

+------+--------------------+------+------------+---------------------+----------------------+
|target|             company|  fare|trip_seconds|pickup_community_area|dropoff_community_area|
+------+--------------------+------+------------+---------------------+----------------------+
|     0|                    |2925.0|        1020|                   32|                    56|
|     1|Taxi Affiliation ...|1850.0|        1860|                   33|                     8|
|     1|                    |1250.0|         960|                   33|                    28|
|     0|Dispatch Taxi Aff...| 825.0|         720|                    8|                     8|
+------+--------------------+------+------------+---------------------+----------------------+
only showing top 4 rows



#### Basic statistics in MLLib

Check that all numeric features are stored as numeric types, because Spark MLLib accepts only numeric data

Hint: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.printSchema

In [4]:
taxi.printSchema()

root
 |-- target: integer (nullable = false)
 |-- company: string (nullable = true)
 |-- fare: float (nullable = true)
 |-- trip_seconds: long (nullable = true)
 |-- pickup_community_area: long (nullable = true)
 |-- dropoff_community_area: long (nullable = true)



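Looks like there is nothing to convert: all numeric features already have numeric types. The company column is a string and is handled later with StringIndexer.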

#### Get rid of null values

As the counts below show, some columns contain null values. We have to get rid of them, because some Spark methods do not accept null values.

In [5]:
from pyspark.sql.functions import col

In [6]:
taxi.filter(col("fare").isNull()).count()

0

In [7]:
taxi.filter(col("pickup_community_area").isNull()).count()

1447175

In [8]:
taxi.filter(col("dropoff_community_area").isNull()).count()

1716431

In [9]:
taxi.count()

15061895

In [10]:
taxi.printSchema()

root
 |-- target: integer (nullable = false)
 |-- company: string (nullable = true)
 |-- fare: float (nullable = true)
 |-- trip_seconds: long (nullable = true)
 |-- pickup_community_area: long (nullable = true)
 |-- dropoff_community_area: long (nullable = true)



In [11]:
taxi = taxi.na.fill(-99)

In [12]:
taxi.filter(col("pickup_community_area").isNull()).count()

0

In [13]:
taxi.filter(col("dropoff_community_area").isNull()).count()

0
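
What `na.fill(-99)` does can be sketched in plain Python. With a numeric fill value, only numeric columns are filled, so a null in a string column such as company would stay null. The helper `fill_numeric_nulls` and the sample rows are hypothetical and only illustrate the behaviour; this is not Spark's implementation.

```python
def fill_numeric_nulls(rows, numeric_cols, value):
    """Replace None with `value` in numeric columns only; other columns are untouched."""
    return [
        {k: (value if k in numeric_cols and v is None else v) for k, v in row.items()}
        for row in rows
    ]


# Illustrative rows, not taken from the real table
rows = [
    {"company": None, "fare": 825.0, "pickup_community_area": None},
    {"company": "Top Cab Affiliation", "fare": 1250.0, "pickup_community_area": 8},
]
filled = fill_numeric_nulls(rows, {"fare", "pickup_community_area"}, -99)
print(filled[0])
```

Keep in mind that -99 is only a placeholder: a linear model will treat it as a real number, so for a serious model a dedicated "missing" indicator may be a better choice.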

#### Transformation of columns

##### Vector assembler

VectorAssembler builds a column of vectors out of the variables you select. Most MLLib algorithms accept only such input

In [14]:
from pyspark.ml.feature import VectorAssembler

In [15]:
assembler = VectorAssembler(
    inputCols=['fare','trip_seconds','pickup_community_area','dropoff_community_area'],
    outputCol="numeric_features")

In [16]:
assembler\
    .transform(taxi)\
    .select('fare',
            'trip_seconds',
            'pickup_community_area',
            'dropoff_community_area',
            'numeric_features')\
    .show(10)

+------+------------+---------------------+----------------------+--------------------+
|  fare|trip_seconds|pickup_community_area|dropoff_community_area|    numeric_features|
+------+------------+---------------------+----------------------+--------------------+
| 800.0|         420|                   24|                     8|[800.0,420.0,24.0...|
|1750.0|        1080|                    8|                     6|[1750.0,1080.0,8....|
| 800.0|         660|                    8|                    32|[800.0,660.0,8.0,...|
| 725.0|         420|                   24|                     8|[725.0,420.0,24.0...|
| 675.0|         360|                    8|                    24|[675.0,360.0,8.0,...|
| 525.0|         240|                   32|                    32|[525.0,240.0,32.0...|
|1350.0|         921|                   24|                     8|[1350.0,921.0,24....|
| 875.0|         660|                    8|                    24|[875.0,660.0,8.0,...|
|4775.0|        4440|           

Please remember that transform returns a new DataFrame, so to keep the result you have to assign it back to the original df

In [17]:
taxi = assembler.transform(taxi)
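
Conceptually, the assembler just concatenates the selected columns of each row into one vector of doubles. A minimal plain-Python sketch of that idea follows; `assemble` is a hypothetical helper, not part of Spark.

```python
def assemble(row, input_cols):
    """Concatenate the listed columns of one row into a single list of floats."""
    return [float(row[c]) for c in input_cols]


# Values of the first row shown in the output above
row = {"fare": 800.0, "trip_seconds": 420,
       "pickup_community_area": 24, "dropoff_community_area": 8}
cols = ["fare", "trip_seconds", "pickup_community_area", "dropoff_community_area"]
numeric_features = assemble(row, cols)
print(numeric_features)  # [800.0, 420.0, 24.0, 8.0]
```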

##### Binarizer

Binarization is the process of thresholding numerical features to binary (0/1) features.

In [18]:
from pyspark.ml.feature import Binarizer

In [19]:
from pyspark.sql.functions import avg

Let's add a column indicating whether the trip time is higher than average

In [20]:
# let's calculate average trip time
taxi.select(avg(col("trip_seconds"))).show()

+-----------------+
|avg(trip_seconds)|
+-----------------+
|863.3475186887174|
+-----------------+



In [21]:
# and now extract the value as a Python float
taxi.select(avg(col("trip_seconds"))).collect()[0][0]

863.3475186887174

In [23]:
taxi = taxi.withColumn("trip_seconds_double",col("trip_seconds").cast("double"))

In [24]:
binarizer = Binarizer(threshold=taxi.select(avg(col("trip_seconds"))).collect()[0][0],
                      inputCol="trip_seconds_double",
                      outputCol="binarized_features")

In [25]:
binarizer.transform(taxi).select("trip_seconds","binarized_features").show(10)

+------------+------------------+
|trip_seconds|binarized_features|
+------------+------------------+
|         420|               0.0|
|         180|               0.0|
|         720|               0.0|
|         660|               0.0|
|         360|               0.0|
|         660|               0.0|
|         360|               0.0|
|         300|               0.0|
|        2460|               1.0|
|         180|               0.0|
+------------+------------------+
only showing top 10 rows



And again save results!

In [26]:
taxi = binarizer.transform(taxi)

You can also binarize a whole vector at once. Here we will use the features vector created in the previous step.

In [27]:
binarizer = Binarizer(threshold=0.5, inputCol="numeric_features", outputCol="binarized_features_2")

In [28]:
binarizer.transform(taxi).select("numeric_features","binarized_features_2").show(10)

+--------------------+--------------------+
|    numeric_features|binarized_features_2|
+--------------------+--------------------+
|[3325.0,1740.0,-9...|   [1.0,1.0,0.0,0.0]|
|[4475.0,2340.0,-9...|   [1.0,1.0,0.0,0.0]|
|[1800.0,960.0,-99...|   [1.0,1.0,0.0,0.0]|
|[650.0,540.0,-99....|   [1.0,1.0,0.0,0.0]|
|[1575.0,780.0,-99...|   [1.0,1.0,0.0,0.0]|
|[4475.0,1560.0,-9...|   [1.0,1.0,0.0,0.0]|
|[765.0,540.0,8.0,...|   [1.0,1.0,1.0,1.0]|
|[425.0,180.0,8.0,...|   [1.0,1.0,1.0,1.0]|
|[1005.0,600.0,-99...|   [1.0,1.0,0.0,0.0]|
|[1125.0,1260.0,-9...|   [1.0,1.0,0.0,0.0]|
+--------------------+--------------------+
only showing top 10 rows



This time we will not save the result: a single threshold of 0.5 applied to fares, trip times and area codes does not produce meaningful features
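
The thresholding rule itself is simple: values strictly greater than the threshold become 1.0, everything else becomes 0.0. A plain-Python sketch, with `binarize` as a hypothetical helper and an illustrative vector modelled on the first row above:

```python
def binarize(value, threshold):
    """Return 1.0 if value is strictly greater than threshold, else 0.0."""
    return 1.0 if value > threshold else 0.0


avg_trip_seconds = 863.3475186887174  # average computed earlier in the notebook
flags = [binarize(v, avg_trip_seconds) for v in [420, 2460]]
print(flags)  # [0.0, 1.0]

# Element-wise on a vector with threshold 0.5: the -99 placeholders map to 0.0
vector = [3325.0, 1740.0, -99.0, -99.0]  # illustrative values
binarized_vector = [binarize(v, 0.5) for v in vector]
print(binarized_vector)  # [1.0, 1.0, 0.0, 0.0]
```

This also explains the output above: the only zeros in binarized_features_2 come from the -99 placeholders that replaced the nulls.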

##### StringIndexer

MLLib accepts only numeric variables as input, so we have to convert any categorical columns to numeric ones. We will use StringIndexer for this.

StringIndexer encodes a string column of labels to a column of label indices. The indices are in [0, numLabels), ordered by label frequencies, so the most frequent label gets index 0. The unseen labels will be put at index numLabels if user chooses to keep them. If the input column is numeric, we cast it to string and index the string values. When downstream pipeline components such as Estimator or Transformer make use of this string-indexed label, you must set the input column of the component to this string-indexed column name. In many cases, you can set the input column with setInputCol.

In [29]:
from pyspark.ml.feature import StringIndexer

Again check for strings

In [30]:
taxi.printSchema()

root
 |-- target: integer (nullable = false)
 |-- company: string (nullable = true)
 |-- fare: float (nullable = false)
 |-- trip_seconds: long (nullable = true)
 |-- pickup_community_area: long (nullable = true)
 |-- dropoff_community_area: long (nullable = true)
 |-- numeric_features: vector (nullable = true)
 |-- trip_seconds_double: double (nullable = true)
 |-- binarized_features: double (nullable = true)



In [31]:
taxi.select('company').show(4)

+--------------------+
|             company|
+--------------------+
|                    |
|Taxi Affiliation ...|
|Taxi Affiliation ...|
|                    |
+--------------------+
only showing top 4 rows



Check whether we have empty strings. If so, we have to replace them with some explicit value, as it is needed for further processing. Some string methods in Spark do not accept empty strings!

In [32]:
taxi.groupBy("company").count().sort(col("count").desc()).show()

+--------------------+-------+
|             company|  count|
+--------------------+-------+
|                    |7306268|
|Taxi Affiliation ...|3538219|
|Dispatch Taxi Aff...|1465481|
|Choice Taxi Assoc...| 975823|
|Northwest Managem...| 439672|
|Blue Ribbon Taxi ...| 337064|
|KOAM Taxi Associa...| 317938|
| Top Cab Affiliation| 286144|
|Chicago Medallion...| 201350|
|Chicago Medallion...|  72284|
|6743 - 78771 Luha...|   6214|
|        5129 - 87128|   5619|
|0118 - 42111 Godf...|   4982|
|3141 - 87803 Zip Cab|   4912|
|1085 - 72312 N an...|   4701|
|3011 - 66308 JBL ...|   4475|
|2092 - 61288 Sbei...|   4428|
|6574 - Babylon Ex...|   4253|
|5724 - 75306 KYVI...|   4065|
|3152 - 97284 Crys...|   3750|
+--------------------+-------+
only showing top 20 rows



The empty string is the most popular value. Let's replace it with something explicit.

In [33]:
from pyspark.sql.functions import when

In [34]:
taxi = taxi.withColumn("company",when(col("company")=="","empty").otherwise(col("company")))

In [35]:
taxi.groupBy("company").count().sort(col("count").desc()).show()

+--------------------+-------+
|             company|  count|
+--------------------+-------+
|               empty|7306268|
|Taxi Affiliation ...|3538219|
|Dispatch Taxi Aff...|1465481|
|Choice Taxi Assoc...| 975823|
|Northwest Managem...| 439672|
|Blue Ribbon Taxi ...| 337064|
|KOAM Taxi Associa...| 317938|
| Top Cab Affiliation| 286144|
|Chicago Medallion...| 201350|
|Chicago Medallion...|  72284|
|6743 - 78771 Luha...|   6214|
|        5129 - 87128|   5619|
|0118 - 42111 Godf...|   4982|
|3141 - 87803 Zip Cab|   4912|
|1085 - 72312 N an...|   4701|
|3011 - 66308 JBL ...|   4475|
|2092 - 61288 Sbei...|   4428|
|6574 - Babylon Ex...|   4253|
|5724 - 75306 KYVI...|   4065|
|3152 - 97284 Crys...|   3750|
+--------------------+-------+
only showing top 20 rows



In [36]:
from pyspark.ml.feature import StringIndexer

In [37]:
string_indexer = StringIndexer(inputCol="company",outputCol="company_indexed")
taxi = string_indexer.fit(taxi).transform(taxi)

Label 0 is assigned to the most popular value, label 1 to the next one, and so on.

Additionally, there are three strategies regarding how StringIndexer will handle unseen labels when you have fit a StringIndexer on one dataset and then use it to transform another:
* throw an exception (which is the default)
* skip the row containing the unseen label entirely
* put unseen labels in a special additional bucket, at index numLabels
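
The frequency ordering and the three strategies can be sketched in plain Python. This is a simplified illustration, not Spark's implementation: both functions are hypothetical, the `handle_invalid` values mirror the "error", "skip" and "keep" options of the handleInvalid parameter, and breaking ties by label name is an assumption made only to keep the example deterministic.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each label to an index, most frequent label first."""
    counts = Counter(values)
    # Tie-breaking by label name is an assumption of this sketch
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def transform_string_indexer(mapping, values, handle_invalid="error"):
    """Apply the mapping; `handle_invalid` selects one of the three strategies."""
    out = []
    for v in values:
        if v in mapping:
            out.append(mapping[v])
        elif handle_invalid == "skip":
            continue  # drop the row with the unseen label
        elif handle_invalid == "keep":
            out.append(float(len(mapping)))  # extra bucket at index numLabels
        else:
            raise ValueError("Unseen label: %s" % v)
    return out


mapping = fit_string_indexer(["empty", "empty", "empty", "Top Cab", "Top Cab", "KOAM"])
print(mapping)  # {'empty': 0.0, 'Top Cab': 1.0, 'KOAM': 2.0}
kept = transform_string_indexer(mapping, ["KOAM", "Unknown"], handle_invalid="keep")
skipped = transform_string_indexer(mapping, ["KOAM", "Unknown"], handle_invalid="skip")
print(kept, skipped)  # [2.0, 3.0] [2.0]
```

The choice matters mostly when the indexer is fitted on training data and later applied to new data that may contain companies it has never seen.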

##### OneHotEncoder

Some algorithms, such as logistic regression, expect continuous features, so we have to encode our categorical features.

One-hot encoding maps a categorical feature, represented as a label index, to a binary vector with at most a single one-value indicating the presence of a specific feature value from among the set of all feature values. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features. For string type input data, it is common to encode categorical features using StringIndexer first.

In [38]:
from pyspark.ml.feature import OneHotEncoderEstimator

In [39]:
# Modify this to transform the column(s) you have used. It can transform multiple columns: just put them in the lists.
encoder = OneHotEncoderEstimator(inputCols=["company_indexed"],
                                 outputCols=["company_indexed_vec"],
                                 handleInvalid='keep')

In [40]:
encoder.fit(taxi).transform(taxi).select("company_indexed","company_indexed_vec").show(10)

+---------------+-------------------+
|company_indexed|company_indexed_vec|
+---------------+-------------------+
|            0.0|     (56,[0],[1.0])|
|            0.0|     (56,[0],[1.0])|
|            0.0|     (56,[0],[1.0])|
|            3.0|     (56,[3],[1.0])|
|            3.0|     (56,[3],[1.0])|
|            4.0|     (56,[4],[1.0])|
|            0.0|     (56,[0],[1.0])|
|            0.0|     (56,[0],[1.0])|
|            2.0|     (56,[2],[1.0])|
|            0.0|     (56,[0],[1.0])|
+---------------+-------------------+
only showing top 10 rows



In [41]:
taxi = encoder.fit(taxi).transform(taxi)
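
The sparse notation in the output above, for example (56,[3],[1.0]), reads as (vector size, positions of non-zero entries, their values). A plain-Python sketch of one-hot encoding into that notation follows. `one_hot` is a hypothetical helper, and its `drop_last` flag imitates the encoder's dropLast parameter (on by default), under which the last category is encoded as an all-zeros vector; that is why the description says "at most" a single one-value.

```python
def one_hot(index, num_categories, drop_last=True):
    """Encode a category index as (size, indices, values)."""
    i = int(index)
    if not 0 <= i < num_categories:
        raise ValueError("index out of range: %r" % index)
    size = num_categories - 1 if drop_last else num_categories
    if i < size:
        return (size, [i], [1.0])
    return (size, [], [])  # the dropped last category becomes all zeros


print(one_hot(3.0, 5))                   # (4, [3], [1.0])
print(one_hot(4.0, 5))                   # (4, [], [])
print(one_hot(4.0, 5, drop_last=False))  # (5, [4], [1.0])
```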

#### Now we will gather all features, as MLLib accepts only a single feature vector column

In [42]:
taxi.printSchema()

root
 |-- target: integer (nullable = false)
 |-- company: string (nullable = true)
 |-- fare: float (nullable = false)
 |-- trip_seconds: long (nullable = true)
 |-- pickup_community_area: long (nullable = true)
 |-- dropoff_community_area: long (nullable = true)
 |-- numeric_features: vector (nullable = true)
 |-- trip_seconds_double: double (nullable = true)
 |-- binarized_features: double (nullable = true)
 |-- company_indexed: double (nullable = false)
 |-- company_indexed_vec: vector (nullable = true)



In [43]:
feature_assembler = VectorAssembler(inputCols=["numeric_features",
                                               "binarized_features",
                                               "company_indexed_vec"],
                                    outputCol="training_features")

In [44]:
feature_assembler.transform(taxi).select("training_features").show()

+--------------------+
|   training_features|
+--------------------+
|(61,[0,1,2,3,5],[...|
|(61,[0,1,2,3,4,5]...|
|(61,[0,1,2,3,5],[...|
|(61,[0,1,2,3,8],[...|
|(61,[0,1,2,3,8],[...|
|(61,[0,1,2,3,9],[...|
|(61,[0,1,2,3,4,5]...|
|(61,[0,1,2,3,5],[...|
|(61,[0,1,2,3,4,7]...|
|(61,[0,1,2,3,5],[...|
|(61,[0,1,2,3,7],[...|
|(61,[0,1,2,3,4,5]...|
|(61,[0,1,2,3,5],[...|
|(61,[0,1,2,3,4,5]...|
|(61,[0,1,2,3,9],[...|
|(61,[0,1,2,3,4,6]...|
|(61,[0,1,2,3,5],[...|
|(61,[0,1,2,3,5],[...|
|(61,[0,1,2,3,9],[...|
|(61,[0,1,2,3,4,6]...|
+--------------------+
only showing top 20 rows



In [45]:
taxi = feature_assembler.transform(taxi)
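
To read rows such as (61,[0,1,2,3,5],[...]) it helps to expand them into a dense vector. Given the order of inputCols, positions 0 to 3 hold the numeric features, position 4 the binarized trip-time flag, and the remaining 56 positions the one-hot encoded company (4 + 1 + 56 = 61). A small sketch with a hypothetical `to_dense` helper and a shortened, illustrative vector of size 7:

```python
def to_dense(size, indices, values):
    """Expand (size, indices, values) into a dense list, zeros elsewhere."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


# Shortened illustrative example: 4 numeric values, flag = 0, company index 0
dense = to_dense(7, [0, 1, 2, 3, 5], [800.0, 420.0, 24.0, 8.0, 1.0])
print(dense)  # [800.0, 420.0, 24.0, 8.0, 0.0, 1.0, 0.0]
```

A row whose index list skips position 4 is therefore a trip that was not longer than average, and the last index tells you which company slot is set.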

#### Let's prepare the label now

In [46]:
taxi = taxi \
    .withColumn("target",taxi.target.cast("double"))

#### And train the first model now!

In [47]:
from pyspark.ml.classification import LogisticRegression

In [48]:
(trainingData, testData) = taxi.randomSplit([0.7, 0.3],seed=1254129345)

In [51]:
lr = LogisticRegression(labelCol="target", featuresCol="training_features")

In [52]:
model = lr.fit(trainingData)

In [53]:
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))

Coefficients: [0.0002601895981080373,-2.6717119402180077e-05,0.0020725557511046323,-0.000684387006512551,0.07664988469450165,-0.3004405781570859,-0.2668672699268945,-0.06769330257006727,0.052594491093831915,-0.07810495796184856,-0.17201551578280883,-0.22616775645757659,-0.36103002528321143,-0.3457422934426323,-0.35833254539856474,-1.4136468195545173,0.1596929872001298,-0.7687137542437171,-0.3323240351863917,-0.31625049924330106,0.09046467491945548,-5.071261369160556,-0.019706944889140373,-0.06427560196830057,-0.5100102592724889,-0.20474427863371567,-0.29361702695327685,-0.1619791640384127,0.12301203595031768,-1.1028813448242347,-0.17752059354614438,-0.12774403917696137,-0.40254338553778385,-0.587488800705374,-0.20753290524457177,-0.7446846339243893,0.010012199177659581,-0.37937009242837655,-0.03912486010517421,-0.3506676257877196,0.040918413295948274,-1.110641714688644,-0.5724476269424014,-0.47597134685117953,-0.16273944826372905,-1.5290306434422898,0.0735526982548517,0.113663316092060

In [54]:
predictions = model.transform(testData)

In [55]:
predictions.select("prediction", "target", "training_features").show(5)

+----------+------+--------------------+
|prediction|target|   training_features|
+----------+------+--------------------+
|       0.0|   0.0|(61,[0,1,2,3,17],...|
|       0.0|   0.0|(61,[0,1,2,3,17],...|
|       0.0|   0.0|(61,[0,1,2,3,17],...|
|       0.0|   0.0|(61,[0,1,2,3,17],...|
|       0.0|   0.0|(61,[0,1,2,3,17],...|
+----------+------+--------------------+
only showing top 5 rows



#### How good are we?

In [56]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [57]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="target", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

Test Error = 0.408599 


#### Let's check some trivial solutions

In [58]:
taxi.groupBy("target").count().show()

+------+-------+
|target|  count|
+------+-------+
|   0.0|8099718|
|   1.0|6962177|
+------+-------+



In [59]:
from pyspark.sql.functions import lit

In [60]:
predictions = predictions.withColumn("all_zeros",lit(0.0))

In [61]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="target", predictionCol="all_zeros", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

Test Error = 0.462161 
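
The same baseline can be estimated by hand from the class counts printed by the groupBy above. The sketch below uses the counts of the full dataset rather than the test split, so it only approximates the 0.462161 measured above.

```python
counts = {0.0: 8099718, 1.0: 6962177}  # class counts from the groupBy above
total = sum(counts.values())
majority_label = max(counts, key=counts.get)

# Error of a "model" that always predicts the most frequent class
baseline_error = 1.0 - counts[majority_label] / total
print("Majority label: %s, baseline error: %.4f" % (majority_label, baseline_error))
```

Always predicting 0 is wrong for about 46.2% of trips, while the logistic regression is wrong for about 40.9%, so the model beats the trivial solution by roughly 5 percentage points.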


### Let's try to improve our results with cross-validation and parameter tuning

In [62]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [63]:
paramGrid = ParamGridBuilder() \
    .addGrid(lr.elasticNetParam, [0.0, 1.0]) \
    .addGrid(lr.regParam, [0.0, 0.1]) \
    .build()

In [64]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="target", predictionCol="prediction", metricName="accuracy")

In [65]:
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=2)  # use 3+ folds in practice

In [66]:
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(trainingData)
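
It is worth knowing how much work this call does. The grid is the Cartesian product of the listed values, and every combination is trained once per fold. A plain-Python sketch of that bookkeeping (the variable names are illustrative, not Spark objects):

```python
from itertools import product

grid = {"elasticNetParam": [0.0, 1.0], "regParam": [0.0, 0.1]}
names = list(grid)

# Every combination of the listed values, one dict per candidate model
param_maps = [dict(zip(names, combo)) for combo in product(*(grid[n] for n in names))]

num_folds = 2
fits_during_search = len(param_maps) * num_folds
print(len(param_maps), fits_during_search)  # 4 8
```

After the search, the best combination is fitted once more on the whole training set. Note also that with regParam equal to 0.0 there is no regularization at all, so elasticNetParam has no effect and two of the four candidates are effectively the same model.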

In [67]:
predictions = cvModel.transform(testData)

In [68]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="target", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

Test Error = 0.40878 


## Now I would like you to work with the data prepared in the previous notebook and train your own model