# Dataproc Machine Learning Demo

In this notebook will demonstrate building a machine learning model using PySpark on Dataproc.
Will build a model to predict taxi fares based on a public dataset covering NYC taxi records.

## Import libraries and initialise spark

In [81]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.linalg import DenseVector
from pyspark.sql.functions import isnan, when, count, col
import datetime as dt

# Initialise spark session
spark = SparkSession.builder.master('yarn').appName('spark-test').getOrCreate()

# Define temporary bucket for BQ connector
bucket = "pyspark_temp_20201006_eu"
spark.conf.set('temporaryGcsBucket', bucket)

## Load data

In [82]:
# Load data from BigQuery.
df = spark.read.format('bigquery').option('table', 'sap-ds-demo:big_data_demo_ew2.ny_taxi_1k').load()
df.createOrReplaceTempView('taxi')

## Explore data

In [68]:
df.limit(10).toPandas()

Unnamed: 0,vendor_id,journey_length,year,month,dayofweek,pu_hour,do_hour,passenger_count,trip_distance,pickup_longitude,rate_code,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,total_amount,pu_m_center,do_m_center,pu_m_jfk,do_m_jfk
0,CMT,3,2009,1,7,0,0,1,0.7,-73.998067,40.682297,,-73.992187,40.690419,Cash,5.0,7543.506,6558.198,7543.506,6558.198
1,CMT,4,2009,1,6,0,0,1,0.7,-73.994757,40.728271,,-73.987781,40.725424,Cash,5.0,5021.685,4464.615,5021.685,4464.615
2,CMT,2,2009,1,1,0,0,1,1.0,-73.952712,40.776503,,-73.962566,40.763019,Cash,5.0,5311.012,4276.116,5311.012,4276.116
3,CMT,4,2009,1,6,0,0,1,0.7,-73.983303,40.761363,,-73.97145,40.756312,Cash,5.0,5299.641,4180.02,5299.641,4180.02
4,CMT,3,2009,1,5,0,0,1,1.0,-73.968815,40.760197,,-73.958751,40.772449,Cash,5.0,4338.54,5056.222,4338.54,5056.222
5,CMT,4,2009,1,4,0,0,2,0.8,-73.92557,40.762173,,-73.938154,40.764812,Cash,5.0,3602.989,3810.998,3602.989,3810.998
6,CMT,4,2009,1,6,0,0,2,0.7,-73.990919,40.739789,,-73.987641,40.73826,Cash,5.0,4800.886,4496.195,4800.886,4496.195
7,CMT,3,2009,1,6,0,0,1,1.0,-73.988933,40.764411,,-73.982519,40.775709,Cash,5.0,5880.781,6403.661,5880.781,6403.661
8,CMT,3,2009,1,4,0,0,1,0.5,-73.985929,40.743614,,-73.988725,40.740323,Credit,5.75,4508.732,4633.881,4508.732,4633.881
9,CMT,6,2009,1,3,0,0,1,1.7,0.0,0.0,,0.0,0.0,Cash,7.0,8661586.0,8661586.0,8661586.0,8661586.0


In [69]:
print("Data length: {}".format(df.count()))

Data length: 1000


In [70]:
df.describe().toPandas()

Unnamed: 0,summary,vendor_id,journey_length,year,month,dayofweek,pu_hour,do_hour,passenger_count,trip_distance,...,rate_code,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,total_amount,pu_m_center,do_m_center,pu_m_jfk,do_m_jfk
0,count,1000,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,...,1000.0,1000.0,1000.0,1000.0,1000,1000.0,1000.0,1000.0,1000.0,1000.0
1,mean,,10.374,2009.0,6.386,4.248,0.0,0.148,1.409,2.849099999999994,...,39.97032397499996,0.0126874279123414,-72.56991482899986,39.97190185800007,,10.697440000000055,169850.98340087276,170012.31159386612,169850.98340087276,170012.31159386612
2,stddev,,7.298314147074608,0.0,3.4171158424438404,2.2292297919832014,0.0,0.3552776691859793,0.7511879147612022,2.800993529938039,...,5.5654661649360815,0.1119862572655945,10.104574412163986,5.565733010226149,,7.7881508035822815,1182377.957431879,1182356.3327869084,1182377.957431879,1182356.3327869084
3,min,CMT,0.0,2009.0,1.0,1.0,0.0,0.0,1.0,0.0,...,0.0,,-74.038249,0.0,Cash,2.5,1666.118136647689,588.0298057704706,1666.118136647689,588.0298057704706
4,max,CMT,54.0,2009.0,12.0,7.0,0.0,1.0,4.0,20.5,...,40.849635,1.0,0.0,40.955138,No Charge,71.94,8661585.636265686,8661585.636265686,8661585.636265686,8661585.636265686


## Prepare model data

In [83]:
model_data = df.select("total_amount", "journey_length", "year", "month", 
                         "dayofweek", "pu_hour", "do_hour", 
                         "passenger_count", "trip_distance", 
                         "pu_m_center", 
                         "do_m_center", "pu_m_jfk", "do_m_jfk")
model_data.toPandas()

Unnamed: 0,total_amount,journey_length,year,month,dayofweek,pu_hour,do_hour,passenger_count,trip_distance,pu_m_center,do_m_center,pu_m_jfk,do_m_jfk
0,5.0,3,2009,1,7,0,0,1,0.7,7543.506091,6558.197960,7543.506091,6558.197960
1,5.0,4,2009,1,6,0,0,1,0.7,5021.685125,4464.615259,5021.685125,4464.615259
2,5.0,2,2009,1,1,0,0,1,1.0,5311.012485,4276.116166,5311.012485,4276.116166
3,5.0,4,2009,1,6,0,0,1,0.7,5299.640950,4180.020199,5299.640950,4180.020199
4,5.0,3,2009,1,5,0,0,1,1.0,4338.539500,5056.222290,4338.539500,5056.222290
...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,8.7,6,2009,12,1,0,0,3,2.5,5981.473283,4873.163178,5981.473283,4873.163178
996,10.7,10,2009,12,3,0,1,1,2.7,7135.936496,5736.789152,7135.936496,5736.789152
997,10.7,9,2009,12,6,0,0,1,3.5,4243.779268,4854.155165,4243.779268,4854.155165
998,12.7,12,2009,12,6,0,0,2,4.1,4555.891339,4555.891339,4555.891339,4555.891339


In [72]:
model_data_map = model_data.rdd.map(lambda x: (x[0], DenseVector(x[1:])))    
train_test = spark.createDataFrame(model_data_map, ["label", "features"])
train_test.toPandas()


Unnamed: 0,label,features
0,5.0,"[3.0, 2009.0, 1.0, 7.0, 0.0, 0.0, 1.0, 0.7, 75..."
1,5.0,"[4.0, 2009.0, 1.0, 6.0, 0.0, 0.0, 1.0, 0.7, 50..."
2,5.0,"[2.0, 2009.0, 1.0, 1.0, 0.0, 0.0, 1.0, 1.0, 53..."
3,5.0,"[4.0, 2009.0, 1.0, 6.0, 0.0, 0.0, 1.0, 0.7, 52..."
4,5.0,"[3.0, 2009.0, 1.0, 5.0, 0.0, 0.0, 1.0, 1.0, 43..."
...,...,...
995,8.7,"[6.0, 2009.0, 12.0, 1.0, 0.0, 0.0, 3.0, 2.5, 5..."
996,10.7,"[10.0, 2009.0, 12.0, 3.0, 0.0, 1.0, 1.0, 2.7, ..."
997,10.7,"[9.0, 2009.0, 12.0, 6.0, 0.0, 0.0, 1.0, 3.5, 4..."
998,12.7,"[12.0, 2009.0, 12.0, 6.0, 0.0, 0.0, 2.0, 4.1, ..."


In [73]:
train, test = train_test.randomSplit([.8,.2],seed=1234)

## Fit model

In [74]:
rf = RandomForestRegressor(featuresCol="features")
model = rf.fit(train)

## Make predictions for test data

In [75]:
predictions = model.transform(test)

## Evaluate test predictions

In [76]:
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
    
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
    
print("RMSE: {:.2f}, R2 {:.2f}".format(rmse, r2))

RMSE: 3.69, R2 0.73


## Load the data from GCS

In [79]:
taxi_1m = spark.read.load('gs://pyspark_temp_20201006_eu/ny_taxi_1m',
                    format='com.databricks.spark.csv', 
                    header='true', 
                    inferSchema='true').cache()

In [80]:
taxi_1m.toPandas()

Unnamed: 0,vendor_id,journey_length,year,month,dayofweek,pu_hour,do_hour,passenger_count,trip_distance,pickup_longitude,rate_code,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,total_amount,pu_m_center,do_m_center,pu_m_jfk,do_m_jfk
0,1,24,2015,3,2,0,0,1,17.3,-73.789612,40.646900,N,-73.985168,40.751045,1,72.66,1.540813e+04,4780.791020,1.540813e+04,4780.791020
1,1,5,2015,2,5,0,0,2,1.1,-73.975731,40.744705,N,-73.968338,40.756252,1,7.30,3.754140e+03,3987.976902,3.754140e+03,3987.976902
2,1,10,2016,2,7,0,0,1,3.6,0.000000,0.000000,N,-74.009850,40.705559,1,17.15,8.661586e+06,6877.169890,8.661586e+06,6877.169890
3,1,7,2015,6,4,0,0,1,4.1,-73.974754,40.742111,N,-74.006172,40.706516,1,17.15,3.566295e+03,6550.707277,3.566295e+03,6550.707277
4,1,34,2015,8,1,0,0,1,13.3,-74.004074,40.713142,N,-73.876450,40.841442,1,46.80,6.117242e+03,13280.872906,6.117242e+03,13280.872906
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
999995,CMT,1239,2009,7,6,23,20,1,15.3,-73.972213,40.786376,0,-73.934621,40.724848,No Charge,2.50,6.938869e+03,642.839580,6.938869e+03,642.839580
999996,CMT,0,2009,5,5,23,23,1,0.0,-74.006290,40.739652,0,-74.006036,40.739841,No Charge,2.50,6.070091e+03,6052.507183,6.070091e+03,6052.507183
999997,CMT,7,2009,3,7,23,23,1,2.2,-73.994046,40.751259,0,-73.981915,40.778080,No Charge,8.20,5.460385e+03,6581.597964,5.460385e+03,6581.597964
999998,CMT,22,2009,2,6,23,0,4,7.8,-74.003080,40.749102,0,-73.931683,40.802867,No Charge,21.00,6.074007e+03,8040.213007,6.074007e+03,8040.213007


In [65]:
spark.sparkContext.getConf().getAll()

[('spark.eventLog.enabled', 'true'),
 ('spark.dynamicAllocation.minExecutors', '1'),
 ('spark.app.name', 'spark-test'),
 ('spark.ui.proxyBase', '/proxy/application_1602494989817_0006'),
 ('spark.driver.port', '44713'),
 ('spark.driver.host', 'spark-jupyter-m.c.sap-ds-demo.internal'),
 ('spark.yarn.am.memory', '640m'),
 ('spark.yarn.historyServer.address', 'spark-jupyter-m:18080'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS',
  'spark-jupyter-m'),
 ('spark.executorEnv.PYTHONPATH',
  '{{PWD}}/pyspark.zip<CPS>{{PWD}}/py4j-0.10.7-src.zip'),
 ('spark.executor.memory', '5586m'),
 ('spark.executor.instances', '2'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.filters',
  'org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter'),
 ('spark.app.id', 'application_1602494989817_0006'),
 ('spark.driver.maxResultSize', '1920m'),
 ('spark.shuffle.service.enabled', 'true'),
 ('spark.scheduler.mode', '