In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q http://apache.osuosl.org/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz
!wget -q https://www-eu.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
!tar xf spark-2.4.0-bin-hadoop2.7.tgz
# !tar xf https://www-eu.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
# Set environmental variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
#os.environ["SPARK_HOME"] = "/content/spark-2.2.1-bin-hadoop2.7"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('NYC').getOrCreate()

In [0]:
model = spark.read.csv("model_subset.csv", header=True, inferSchema=True)

In [0]:
model.printSchema()

root
 |-- hour: integer (nullable = true)
 |-- date_yellow: integer (nullable = true)
 |-- day_number: integer (nullable = true)
 |-- day: string (nullable = true)
 |-- is_weekend: integer (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- duration: double (nullable = true)
 |-- speed: double (nullable = true)



In [0]:
model.show(5)

+----+-----------+----------+---+----------+--------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+------------------+------------------+
|hour|date_yellow|day_number|day|is_weekend|VendorID|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|          duration|             speed|
+----+-----------+----------+---+----------+--------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+------------------+------------------+
|   0|          6|         7|Sun|         1|       1|              1|          0.7|         1|                 N|          79|         113|           1|        5.0|  0.5|  

## Regression

In [0]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = ['trip_distance'], outputCol = 'features')
regression_df = vectorAssembler.transform(model)
regression_df = regression_df.select(['features', 'fare_amount'])
regression_df.show(3)

+--------+-----------+
|features|fare_amount|
+--------+-----------+
|   [0.7]|        5.0|
|   [1.9]|        8.5|
|   [3.0]|       12.0|
+--------+-----------+
only showing top 3 rows



In [0]:
splits = regression_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [0]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='fare_amount', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)

In [0]:
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [2.8218350622100417]
Intercept: 4.511944780778228


In [0]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 2.006527
r2: 0.833571


In [0]:
train_df.describe().show()

+-------+-----------------+
|summary|      fare_amount|
+-------+-----------------+
|  count|           114232|
|   mean|10.00305518593739|
| stddev|4.918505346559324|
|    min|              3.0|
|    max|             28.5|
+-------+-----------------+



In [0]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","fare_amount","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="fare_amount",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+-----------------+-----------+--------+
|       prediction|fare_amount|features|
+-----------------+-----------+--------+
|4.540163131400328|        3.0|  [0.01]|
|4.540163131400328|        3.0|  [0.01]|
|4.540163131400328|        3.5|  [0.01]|
|4.540163131400328|       25.0|  [0.01]|
|4.568381482022429|        3.0|  [0.02]|
+-----------------+-----------+--------+
only showing top 5 rows

R Squared (R2) on test data = 0.835036


In [0]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 2.00731


AttributeError: ignored

## Dummy 

In [0]:
from pyspark.sql import functions as F

df = spark.createDataFrame([
    (1, "a"),
    (2, "b"),
    (3, "c"),
    (4, "a"),
    (5, "b")],
    ["ID", "Text"])    

df.groupBy("ID").pivot("Text").agg(F.lit(1)).na.fill(0).show()

+---+---+---+---+
| ID|  a|  b|  c|
+---+---+---+---+
|  5|  0|  1|  0|
|  1|  1|  0|  0|
|  3|  0|  0|  1|
|  2|  0|  1|  0|
|  4|  1|  0|  0|
+---+---+---+---+



In [0]:
p = model.select('payment_type')
p.groupBy('payment_type').count().show()

+------------+------+
|payment_type| count|
+------------+------+
|           1|113136|
|           3|   597|
|           4|   150|
|           2| 49573|
+------------+------+



In [0]:
yellow = model.select('trip_distance','payment_type','fare_amount')
from pyspark.sql.functions import monotonically_increasing_id

yellow = yellow.withColumn("id", monotonically_increasing_id())

In [0]:
yellow.show(5)

+-------------+------------+-----------+---+
|trip_distance|payment_type|fare_amount| id|
+-------------+------------+-----------+---+
|          0.7|           1|        5.0|  0|
|          1.9|           2|        8.5|  1|
|          3.0|           2|       12.0|  2|
|         2.88|           1|       11.0|  3|
|         6.77|           1|       25.0|  4|
+-------------+------------+-----------+---+
only showing top 5 rows



In [0]:
from pyspark.sql import functions as F
new_yellow = yellow.groupBy("id").pivot("payment_type").agg(F.lit(1)).na.fill(0)

In [0]:
new_yellow.show()

+----------+---+---+---+---+
|        id|  1|  2|  3|  4|
+----------+---+---+---+---+
|     17703|  1|  0|  0|  0|
|     23766|  1|  0|  0|  0|
|     35148|  1|  0|  0|  0|
|     41424|  1|  0|  0|  0|
|    102561|  1|  0|  0|  0|
|8589974015|  1|  0|  0|  0|
|     22165|  1|  0|  0|  0|
|     91628|  1|  0|  0|  0|
|     96729|  1|  0|  0|  0|
|     97216|  1|  0|  0|  0|
|8589940873|  1|  0|  0|  0|
|8589952622|  1|  0|  0|  0|
|8589993287|  1|  0|  0|  0|
|     34340|  0|  1|  0|  0|
|     55671|  1|  0|  0|  0|
|     98559|  0|  1|  0|  0|
|    101155|  1|  0|  0|  0|
|8589953403|  0|  1|  0|  0|
|8589955092|  1|  0|  0|  0|
|8589963501|  0|  0|  1|  0|
+----------+---+---+---+---+
only showing top 20 rows



In [0]:
#yellow.filter(yellow["id"] == 102561).show()

In [0]:
dummy_encoded_df = yellow.join(new_yellow, "id", "outer").drop("id")

In [0]:
dummy_encoded_df.show(5)

+-------------+------------+-----------+---+---+---+---+
|trip_distance|payment_type|fare_amount|  1|  2|  3|  4|
+-------------+------------+-----------+---+---+---+---+
|         2.54|           1|       13.5|  1|  0|  0|  0|
|         1.01|           1|        6.0|  1|  0|  0|  0|
|          1.9|           1|        9.0|  1|  0|  0|  0|
|         1.13|           1|        8.5|  1|  0|  0|  0|
|         1.29|           2|        6.5|  0|  1|  0|  0|
+-------------+------------+-----------+---+---+---+---+
only showing top 5 rows



In [0]:
dummy_encoded_df = dummy_encoded_df.drop('payment_type')

In [0]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = ['trip_distance', '1', '2', '3'], outputCol = 'features')
regression_df = vectorAssembler.transform(dummy_encoded_df)
regression_df = regression_df.select(['features', 'fare_amount'])
regression_df.show(3)

+------------------+-----------+
|          features|fare_amount|
+------------------+-----------+
|[2.54,1.0,0.0,0.0]|       13.5|
|[1.01,1.0,0.0,0.0]|        6.0|
| [1.9,1.0,0.0,0.0]|        9.0|
+------------------+-----------+
only showing top 3 rows



In [0]:
#vectorAssembler = VectorAssembler(inputCols = ['trip_distance', '1', '2', '3'], outputCol = 'features')
#regression_df = vectorAssembler.transform(dummy_encoded_df)
#regression_df.show(5)

In [0]:
splits = regression_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [0]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='fare_amount', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)

In [0]:
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [2.824493887017495,0.0,0.0,0.0]
Intercept: 4.508720954342777


In [0]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 2.004808
r2: 0.833768


In [0]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","fare_amount","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="fare_amount",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+-----------------+-----------+------------------+
|       prediction|fare_amount|          features|
+-----------------+-----------+------------------+
|5.356069120448025|        4.0|     (4,[0],[0.3])|
|5.073619731746276|        3.5| [0.2,0.0,1.0,0.0]|
|5.356069120448025|        4.0| [0.3,0.0,1.0,0.0]|
|5.525538753669075|        3.5|[0.36,0.0,1.0,0.0]|
|5.638518509149774|        3.5| [0.4,0.0,1.0,0.0]|
+-----------------+-----------+------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.834909


In [0]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 2.00939


## Dummy Variable with RateCode ID

In [0]:
from pyspark.sql import functions as F

df = spark.createDataFrame([
    (1, "a"),
    (2, "b"),
    (3, "c"),
    (4, "a"),
    (5, "b")],
    ["ID", "Text"])    

df.groupBy("ID").pivot("Text").agg(F.lit(1)).na.fill(0).show()

+---+---+---+---+
| ID|  a|  b|  c|
+---+---+---+---+
|  5|  0|  1|  0|
|  1|  1|  0|  0|
|  3|  0|  0|  1|
|  2|  0|  1|  0|
|  4|  1|  0|  0|
+---+---+---+---+



In [0]:
p = model.select('RateCodeID')
p.groupBy('RateCodeID').count().show()#Remove 3 for regression


+----------+------+
|RateCodeID| count|
+----------+------+
|         1|163408|
|         3|    14|
|         5|    19|
|         4|    15|
+----------+------+



In [0]:
yellow = model.select('trip_distance','payment_type','RateCodeID','fare_amount')
from pyspark.sql.functions import monotonically_increasing_id

yellow = yellow.withColumn("id", monotonically_increasing_id())

In [0]:
yellow.show(5)

+-------------+------------+----------+-----------+---+
|trip_distance|payment_type|RateCodeID|fare_amount| id|
+-------------+------------+----------+-----------+---+
|          0.7|           1|         1|        5.0|  0|
|          1.9|           2|         1|        8.5|  1|
|          3.0|           2|         1|       12.0|  2|
|         2.88|           1|         1|       11.0|  3|
|         6.77|           1|         1|       25.0|  4|
+-------------+------------+----------+-----------+---+
only showing top 5 rows



In [0]:
from pyspark.sql import functions as F
new_yellow = yellow.groupBy("id").pivot("payment_type").agg(F.lit(1)).na.fill(0)


In [0]:
new_yellow2 = yellow.groupBy("id").pivot("RateCodeID").agg(F.lit(1)).na.fill(0)

In [0]:
new_yellow2.show()

+----------+---+---+---+---+
|        id|  1|  3|  4|  5|
+----------+---+---+---+---+
|     10156|  1|  0|  0|  0|
|     17703|  1|  0|  0|  0|
|     23766|  1|  0|  0|  0|
|     35148|  1|  0|  0|  0|
|     41424|  1|  0|  0|  0|
|     47492|  1|  0|  0|  0|
|     71530|  1|  0|  0|  0|
|    102561|  1|  0|  0|  0|
|8589974015|  1|  0|  0|  0|
|8589992667|  1|  0|  0|  0|
|     22165|  1|  0|  0|  0|
|     91628|  1|  0|  0|  0|
|     96729|  1|  0|  0|  0|
|     97216|  1|  0|  0|  0|
|    101011|  1|  0|  0|  0|
|8589940873|  1|  0|  0|  0|
|8589952622|  1|  0|  0|  0|
|8589993287|  1|  0|  0|  0|
|     55671|  1|  0|  0|  0|
|    101155|  1|  0|  0|  0|
+----------+---+---+---+---+
only showing top 20 rows



In [0]:
new_yellow = new_yellow.withColumnRenamed('1','p1')
new_yellow =new_yellow.withColumnRenamed('2','p2')
new_yellow =new_yellow.withColumnRenamed('3','p3')
new_yellow =new_yellow.withColumnRenamed('4','p4')
new_yellow.show(5)

+------+---+---+---+---+
|    id| p1| p2| p3| p4|
+------+---+---+---+---+
| 17703|  1|  0|  0|  0|
| 23766|  1|  0|  0|  0|
| 35148|  1|  0|  0|  0|
| 41424|  1|  0|  0|  0|
|102561|  1|  0|  0|  0|
+------+---+---+---+---+
only showing top 5 rows



In [0]:
new_yellow2 = new_yellow2.withColumnRenamed('1','r1')
new_yellow2 =new_yellow2.withColumnRenamed('2','r2')
new_yellow2 =new_yellow2.withColumnRenamed('3','r3')
new_yellow2 =new_yellow2.withColumnRenamed('4','r4')
new_yellow2 =new_yellow2.withColumnRenamed('5','r5')

new_yellow2.show(5)

+-----+---+---+---+---+
|   id| r1| r3| r4| r5|
+-----+---+---+---+---+
|10156|  1|  0|  0|  0|
|17703|  1|  0|  0|  0|
|23766|  1|  0|  0|  0|
|35148|  1|  0|  0|  0|
|41424|  1|  0|  0|  0|
+-----+---+---+---+---+
only showing top 5 rows



In [0]:
dummy_encoded_df = yellow.join(new_yellow, "id", "outer")

In [0]:
#new_yellow2.show()

+----------+---+---+---+---+
|        id|  1|  3|  4|  5|
+----------+---+---+---+---+
|     10156|  1|  0|  0|  0|
|     17703|  1|  0|  0|  0|
|     23766|  1|  0|  0|  0|
|     35148|  1|  0|  0|  0|
|     41424|  1|  0|  0|  0|
|     47492|  1|  0|  0|  0|
|     71530|  1|  0|  0|  0|
|    102561|  1|  0|  0|  0|
|8589974015|  1|  0|  0|  0|
|8589992667|  1|  0|  0|  0|
|     22165|  1|  0|  0|  0|
|     91628|  1|  0|  0|  0|
|     96729|  1|  0|  0|  0|
|     97216|  1|  0|  0|  0|
|    101011|  1|  0|  0|  0|
|8589940873|  1|  0|  0|  0|
|8589952622|  1|  0|  0|  0|
|8589993287|  1|  0|  0|  0|
|     55671|  1|  0|  0|  0|
|    101155|  1|  0|  0|  0|
+----------+---+---+---+---+
only showing top 20 rows



In [0]:
dummy_encoded_df.show(5)

+----+-------------+------------+----------+-----------+---+---+---+---+
|  id|trip_distance|payment_type|RateCodeID|fare_amount| p1| p2| p3| p4|
+----+-------------+------------+----------+-----------+---+---+---+---+
|  26|         2.54|           1|         1|       13.5|  1|  0|  0|  0|
|  29|         1.01|           1|         1|        6.0|  1|  0|  0|  0|
| 474|          1.9|           1|         1|        9.0|  1|  0|  0|  0|
| 964|         1.13|           1|         1|        8.5|  1|  0|  0|  0|
|1677|         1.29|           2|         1|        6.5|  0|  1|  0|  0|
+----+-------------+------------+----------+-----------+---+---+---+---+
only showing top 5 rows



In [0]:
dummy_encoded_df = dummy_encoded_df.join(new_yellow2, "id", "outer").drop("id")

In [0]:
dummy_encoded_df.show()

+-------------+------------+----------+-----------+---+---+---+---+---+---+---+---+
|trip_distance|payment_type|RateCodeID|fare_amount| p1| p2| p3| p4| r1| r3| r4| r5|
+-------------+------------+----------+-----------+---+---+---+---+---+---+---+---+
|         2.54|           1|         1|       13.5|  1|  0|  0|  0|  1|  0|  0|  0|
|         1.01|           1|         1|        6.0|  1|  0|  0|  0|  1|  0|  0|  0|
|          1.9|           1|         1|        9.0|  1|  0|  0|  0|  1|  0|  0|  0|
|         1.13|           1|         1|        8.5|  1|  0|  0|  0|  1|  0|  0|  0|
|         1.29|           2|         1|        6.5|  0|  1|  0|  0|  1|  0|  0|  0|
|         2.22|           1|         1|        8.5|  1|  0|  0|  0|  1|  0|  0|  0|
|          1.6|           2|         1|        8.0|  0|  1|  0|  0|  1|  0|  0|  0|
|         3.15|           1|         1|       15.0|  1|  0|  0|  0|  1|  0|  0|  0|
|          0.4|           1|         1|        4.0|  1|  0|  0|  0|  1|  0| 

In [0]:
#dummy_encoded_df.columns

['trip_distance',
 'payment_type',
 'RateCodeID',
 'fare_amount',
 '1',
 '2',
 '3',
 '4',
 '1',
 '3',
 '4',
 '5']

In [0]:
dummy_encoded_df = dummy_encoded_df.drop('payment_type','RateCodeID')
dummy_encoded_df.show(5)

+-------------+-----------+---+---+---+---+---+---+---+---+
|trip_distance|fare_amount| p1| p2| p3| p4| r1| r3| r4| r5|
+-------------+-----------+---+---+---+---+---+---+---+---+
|         2.54|       13.5|  1|  0|  0|  0|  1|  0|  0|  0|
|         1.01|        6.0|  1|  0|  0|  0|  1|  0|  0|  0|
|          1.9|        9.0|  1|  0|  0|  0|  1|  0|  0|  0|
|         1.13|        8.5|  1|  0|  0|  0|  1|  0|  0|  0|
|         1.29|        6.5|  0|  1|  0|  0|  1|  0|  0|  0|
+-------------+-----------+---+---+---+---+---+---+---+---+
only showing top 5 rows



In [0]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = ['trip_distance', 'p1', 'p2', 'p3', 'r1','r4','r5'], outputCol = 'features')
regression_df = vectorAssembler.transform(dummy_encoded_df)
regression_df = regression_df.select(['features', 'fare_amount'])
regression_df.show(3)

+--------------------+-----------+
|            features|fare_amount|
+--------------------+-----------+
|(7,[0,1,4],[2.54,...|       13.5|
|(7,[0,1,4],[1.01,...|        6.0|
|(7,[0,1,4],[1.9,1...|        9.0|
+--------------------+-----------+
only showing top 3 rows



In [0]:
splits = regression_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [0]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='fare_amount', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)

In [0]:
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [2.822887930015665,0.0,0.0,0.0,0.0,0.0,0.0]
Intercept: 4.510973432935555


In [0]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 2.004769
r2: 0.833978


In [0]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","fare_amount","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="fare_amount",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+-----------------+-----------+--------------------+
|       prediction|fare_amount|            features|
+-----------------+-----------+--------------------+
|5.640128604941821|        4.0|(7,[0,1,4],[0.4,1...|
|5.640128604941821|        5.0|(7,[0,1,4],[0.4,1...|
|5.724815242842291|        4.0|(7,[0,1,4],[0.43,...|
|5.865959639343074|        4.5|(7,[0,1,4],[0.48,...|
|5.922417397943388|        6.5|(7,[0,1,4],[0.5,1...|
+-----------------+-----------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.83423


In [0]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 2.01063


In [0]:
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()

numIterations: 7
objectiveHistory: [0.49999999999999956, 0.4192677702921184, 0.12937221823075531, 0.12923896779938862, 0.1292287840339873, 0.12922871247985676, 0.12922871247804815]
+--------------------+
|           residuals|
+--------------------+
|  -1.349719984537435|
| -1.8296109326400973|
|  -2.357839811940254|
|  -2.357839811940254|
|  -2.414297570540568|
| -2.0272130877411945|
|  -1.555441967041351|
|  -1.640128604941821|
|  0.3598713950581791|
|-0.33773076004291713|
| -0.9224173979433878|
| -0.9224173979433878|
| 0.07758260205661216|
| -1.4506462772435444|
|-0.45064627724354445|
|-0.45064627724354445|
|  1.9364382055558291|
|  -1.120019553044484|
| -1.6764773116447973|
| -1.6764773116447973|
+--------------------+
only showing top 20 rows



## IsWeekend

In [0]:
p = model.select('is_weekend')
p.groupBy('is_weekend').count().show()

+----------+------+
|is_weekend| count|
+----------+------+
|         1| 44237|
|         0|119219|
+----------+------+



In [0]:
yellow = model.select('trip_distance','payment_type','RateCodeID','fare_amount','is_weekend')
from pyspark.sql.functions import monotonically_increasing_id

yellow = yellow.withColumn("id", monotonically_increasing_id())

In [0]:
yellow.show(5)

+-------------+------------+----------+-----------+----------+---+
|trip_distance|payment_type|RateCodeID|fare_amount|is_weekend| id|
+-------------+------------+----------+-----------+----------+---+
|          0.7|           1|         1|        5.0|         1|  0|
|          1.9|           2|         1|        8.5|         1|  1|
|          3.0|           2|         1|       12.0|         1|  2|
|         2.88|           1|         1|       11.0|         1|  3|
|         6.77|           1|         1|       25.0|         1|  4|
+-------------+------------+----------+-----------+----------+---+
only showing top 5 rows



In [0]:
from pyspark.sql import functions as F
new_yellow = yellow.groupBy("id").pivot("payment_type").agg(F.lit(1)).na.fill(0)


In [0]:
new_yellow2 = yellow.groupBy("id").pivot("RateCodeID").agg(F.lit(1)).na.fill(0)


In [0]:
new_yellow = new_yellow.withColumnRenamed('1','p1')
new_yellow =new_yellow.withColumnRenamed('2','p2')
new_yellow =new_yellow.withColumnRenamed('3','p3')
new_yellow =new_yellow.withColumnRenamed('4','p4')
new_yellow.show(5)

+------+---+---+---+---+
|    id| p1| p2| p3| p4|
+------+---+---+---+---+
| 17703|  1|  0|  0|  0|
| 23766|  1|  0|  0|  0|
| 35148|  1|  0|  0|  0|
| 41424|  1|  0|  0|  0|
|102561|  1|  0|  0|  0|
+------+---+---+---+---+
only showing top 5 rows



In [0]:
new_yellow2 = new_yellow2.withColumnRenamed('1','r1')
new_yellow2 =new_yellow2.withColumnRenamed('2','r2')
new_yellow2 =new_yellow2.withColumnRenamed('3','r3')
new_yellow2 =new_yellow2.withColumnRenamed('4','r4')
new_yellow2 =new_yellow2.withColumnRenamed('5','r5')

In [0]:
new_yellow2.show(5)

+-----+---+---+---+---+
|   id| r1| r3| r4| r5|
+-----+---+---+---+---+
|10156|  1|  0|  0|  0|
|17703|  1|  0|  0|  0|
|23766|  1|  0|  0|  0|
|35148|  1|  0|  0|  0|
|41424|  1|  0|  0|  0|
+-----+---+---+---+---+
only showing top 5 rows



In [0]:
dummy_encoded_df = yellow.join(new_yellow, "id", "outer")

In [0]:
new_yellow3 = yellow.groupBy("id").pivot("is_weekend").agg(F.lit(1)).na.fill(0)
new_yellow3.show(2)

+-----+---+---+
|   id|  0|  1|
+-----+---+---+
|23766|  0|  1|
|71530|  0|  1|
+-----+---+---+
only showing top 2 rows



In [0]:
new_yellow3 = new_yellow3.withColumnRenamed('0','w0')
new_yellow3 = new_yellow3.withColumnRenamed('1','w1')
new_yellow3.show(2)

+-----+---+---+
|   id| w0| w1|
+-----+---+---+
|23766|  0|  1|
|71530|  0|  1|
+-----+---+---+
only showing top 2 rows



In [0]:
dummy_encoded_df = yellow.join(new_yellow, "id", "outer")

In [0]:
dummy_encoded_df = dummy_encoded_df.join(new_yellow2, "id", "outer")

In [0]:
dummy_encoded_df = dummy_encoded_df.join(new_yellow3, "id", "outer").drop("id")

In [0]:
dummy_encoded_df.show(5)

+-------------+------------+----------+-----------+----------+---+---+---+---+---+---+---+---+---+---+
|trip_distance|payment_type|RateCodeID|fare_amount|is_weekend| p1| p2| p3| p4| r1| r3| r4| r5| w0| w1|
+-------------+------------+----------+-----------+----------+---+---+---+---+---+---+---+---+---+---+
|         2.54|           1|         1|       13.5|         1|  1|  0|  0|  0|  1|  0|  0|  0|  0|  1|
|         1.01|           1|         1|        6.0|         1|  1|  0|  0|  0|  1|  0|  0|  0|  0|  1|
|          1.9|           1|         1|        9.0|         0|  1|  0|  0|  0|  1|  0|  0|  0|  1|  0|
|         1.13|           1|         1|        8.5|         0|  1|  0|  0|  0|  1|  0|  0|  0|  1|  0|
|         1.29|           2|         1|        6.5|         0|  0|  1|  0|  0|  1|  0|  0|  0|  1|  0|
+-------------+------------+----------+-----------+----------+---+---+---+---+---+---+---+---+---+---+
only showing top 5 rows



In [0]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = ['trip_distance', 'p1', 'p2', 'p3', 'r1','r4','r5','w0'], outputCol = 'features')
regression_df = vectorAssembler.transform(dummy_encoded_df)
regression_df = regression_df.select(['features', 'fare_amount'])
regression_df.show(3)

+--------------------+-----------+
|            features|fare_amount|
+--------------------+-----------+
|(8,[0,1,4],[2.54,...|       13.5|
|(8,[0,1,4],[1.01,...|        6.0|
|(8,[0,1,4,7],[1.9...|        9.0|
+--------------------+-----------+
only showing top 3 rows



In [0]:
splits = regression_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [0]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='fare_amount', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)

In [0]:
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [2.825755595119003,0.0,0.0,0.0,0.0,0.0,0.0,0.06470733159976307]
Intercept: 4.456175995738902


In [0]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 1.998973
r2: 0.835485


In [0]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","fare_amount","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="fare_amount",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+-----------------+-----------+--------------------+
|       prediction|fare_amount|            features|
+-----------------+-----------+--------------------+
|6.264659576615064|        6.5|(8,[0,1,4],[0.64,...|
|6.462462468273394|        5.0|(8,[0,1,4],[0.71,...|
|6.490720024224585|        5.5|(8,[0,1,4],[0.72,...|
|6.688522915882914|        5.0|(8,[0,1,4],[0.79,...|
|6.716780471834104|        5.0|(8,[0,1,4],[0.8,1...|
+-----------------+-----------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.83306


In [0]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 2.00991


In [0]:
#Only IsWeekend
yellow = model.select('fare_amount','is_weekend','trip_distance')
from pyspark.sql.functions import monotonically_increasing_id

yellow = yellow.withColumn("id", monotonically_increasing_id())

In [0]:
model.groupby('is_weekend').count().show()

+----------+------+
|is_weekend| count|
+----------+------+
|         1| 44237|
|         0|119219|
+----------+------+



In [0]:
from pyspark.sql import functions as F
new_yellow = yellow.groupBy("id").pivot("is_weekend").agg(F.lit(1)).na.fill(0)

In [0]:
dummy_encoded_df = yellow.join(new_yellow, "id", "outer").drop("id")


In [0]:
dummy_encoded_df.show(5)

+-----------+----------+-------------+---+---+
|fare_amount|is_weekend|trip_distance|  0|  1|
+-----------+----------+-------------+---+---+
|       13.5|         1|         2.54|  0|  1|
|        6.0|         1|         1.01|  0|  1|
|        9.0|         0|          1.9|  1|  0|
|        8.5|         0|         1.13|  1|  0|
|        6.5|         0|         1.29|  1|  0|
+-----------+----------+-------------+---+---+
only showing top 5 rows



In [0]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = ['trip_distance', '0'], outputCol = 'features')
regression_df = vectorAssembler.transform(dummy_encoded_df)
regression_df = regression_df.select(['features', 'fare_amount'])
regression_df.show(3)

+----------+-----------+
|  features|fare_amount|
+----------+-----------+
|[2.54,0.0]|       13.5|
|[1.01,0.0]|        6.0|
| [1.9,1.0]|        9.0|
+----------+-----------+
only showing top 3 rows



In [0]:
splits = regression_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [0]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='fare_amount', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)

In [0]:
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [2.8256731796200487,0.07108565967297849]
Intercept: 4.453808454976943


In [0]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 2.004637
r2: 0.834432


In [0]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","fare_amount","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="fare_amount",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+-----------------+-----------+----------+
|       prediction|fare_amount|  features|
+-----------------+-----------+----------+
|4.807461432611926|        3.0| [0.1,1.0]|
|5.344339336739735|        3.5|[0.29,1.0]|
|5.372596068535936|        3.5| [0.3,1.0]|
|5.372596068535936|        4.0| [0.3,1.0]|
|5.372596068535936|        4.0| [0.3,1.0]|
+-----------------+-----------+----------+
only showing top 5 rows

R Squared (R2) on test data = 0.835712


In [0]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 1.99559


## Duration, Trip Distance, Is Weekend

In [0]:
yellow = model.select('trip_distance','duration','fare_amount','is_weekend')
from pyspark.sql.functions import monotonically_increasing_id

yellow = yellow.withColumn("id", monotonically_increasing_id())

In [0]:
from pyspark.sql import functions as F
new_yellow = yellow.groupBy("id").pivot("is_weekend").agg(F.lit(1)).na.fill(0)

In [0]:
dummy_encoded_df = yellow.join(new_yellow, "id", "outer").drop("id")
dummy_encoded_df.show(5)

+-------------+------------------+-----------+----------+---+---+
|trip_distance|          duration|fare_amount|is_weekend|  0|  1|
+-------------+------------------+-----------+----------+---+---+
|         2.54|19.233333333333334|       13.5|         1|  0|  1|
|         1.01| 4.866666666666666|        6.0|         1|  0|  1|
|          1.9|              8.95|        9.0|         0|  1|  0|
|         1.13|11.533333333333333|        8.5|         0|  1|  0|
|         1.29| 6.633333333333334|        6.5|         0|  1|  0|
+-------------+------------------+-----------+----------+---+---+
only showing top 5 rows



In [0]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = ['trip_distance','duration', '0'], outputCol = 'features')
regression_df = vectorAssembler.transform(dummy_encoded_df)
regression_df = regression_df.select(['features', 'fare_amount'])
regression_df.show(3)

+--------------------+-----------+
|            features|fare_amount|
+--------------------+-----------+
|[2.54,19.23333333...|       13.5|
|[1.01,4.866666666...|        6.0|
|      [1.9,8.95,1.0]|        9.0|
+--------------------+-----------+
only showing top 3 rows



In [0]:
splits = regression_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [0]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='fare_amount', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)

In [0]:
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [1.6029722272783702,0.3668493883508371,0.0]
Intercept: 2.557461173042077


In [0]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 0.574849
r2: 0.986389


In [0]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","fare_amount","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="fare_amount",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+-----------------+-----------+--------------------+
|       prediction|fare_amount|            features|
+-----------------+-----------+--------------------+
|4.247626668340876|        4.0|[0.2,3.7333333333...|
| 4.10915430887729|        4.0|     [0.27,3.05,1.0]|
|4.303983231035977|        4.0|      [0.3,3.45,1.0]|
|3.492289082374618|        3.0|     [0.32,1.15,1.0]|
|4.562931042285797|        4.0|     [0.37,3.85,0.0]|
+-----------------+-----------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.987555


In [0]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 0.549069


## Duration, trip_distance, is_weekend, day

In [0]:
yellow = model.select('trip_distance','duration','fare_amount','day')
from pyspark.sql.functions import monotonically_increasing_id

yellow = yellow.withColumn("id", monotonically_increasing_id())

In [0]:
p = model.select('day')
p.groupBy('day').count().show()

+---+-----+
|day|count|
+---+-----+
|Sun|21302|
|Mon|21606|
|Thu|25782|
|Sat|22935|
|Wed|24826|
|Fri|22879|
|Tue|24126|
+---+-----+



In [0]:
from pyspark.sql import functions as F
new_yellow = yellow.groupBy("id").pivot("day").agg(F.lit(1)).na.fill(0)

In [0]:
dummy_encoded_df = yellow.join(new_yellow, "id", "outer").drop("id")
dummy_encoded_df.show(5)

+-------------+------------------+-----------+---+---+---+---+---+---+---+---+
|trip_distance|          duration|fare_amount|day|Fri|Mon|Sat|Sun|Thu|Tue|Wed|
+-------------+------------------+-----------+---+---+---+---+---+---+---+---+
|         2.54|19.233333333333334|       13.5|Sun|  0|  0|  0|  1|  0|  0|  0|
|         1.01| 4.866666666666666|        6.0|Sat|  0|  0|  1|  0|  0|  0|  0|
|          1.9|              8.95|        9.0|Mon|  0|  1|  0|  0|  0|  0|  0|
|         1.13|11.533333333333333|        8.5|Wed|  0|  0|  0|  0|  0|  0|  1|
|         1.29| 6.633333333333334|        6.5|Wed|  0|  0|  0|  0|  0|  0|  1|
+-------------+------------------+-----------+---+---+---+---+---+---+---+---+
only showing top 5 rows



In [0]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = ['trip_distance','duration', 'Mon','Tue','Wed','Thu','Fri','Sat'], outputCol = 'features')
regression_df = vectorAssembler.transform(dummy_encoded_df)
regression_df = regression_df.select(['features', 'fare_amount'])
regression_df.show(3)

+--------------------+-----------+
|            features|fare_amount|
+--------------------+-----------+
|(8,[0,1],[2.54,19...|       13.5|
|(8,[0,1,7],[1.01,...|        6.0|
|(8,[0,1,2],[1.9,8...|        9.0|
+--------------------+-----------+
only showing top 3 rows



In [0]:
splits = regression_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [0]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='fare_amount', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)

In [0]:
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [1.6030026176477388,0.36687701602328493,0.0,0.0,0.0,0.0,0.0,0.0]
Intercept: 2.555608841338215


In [0]:
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [1.6030026176477388,0.36687701602328493,0.0,0.0,0.0,0.0,0.0,0.0]
Intercept: 2.555608841338215


In [0]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","fare_amount","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="fare_amount",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+-----------------+-----------+--------------------+
|       prediction|fare_amount|            features|
+-----------------+-----------+--------------------+
|3.985595472847373|        3.5|(8,[0,1],[0.4,2.15])|
|5.489791238542841|        5.5|(8,[0,1],[0.4,6.25])|
|5.365344947398743|        5.0| (8,[0,1],[0.7,4.6])|
|6.287491755057676|        6.0|(8,[0,1],[0.89,6....|
|5.496392345982926|        5.0|(8,[0,1],[0.9,4.0...|
+-----------------+-----------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.986494


In [0]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 0.572494


## duration, trip_distance, hour

In [0]:
yellow = model.select('trip_distance','duration','fare_amount','hour')
from pyspark.sql.functions import monotonically_increasing_id

yellow = yellow.withColumn("id", monotonically_increasing_id())

In [0]:
p = model.select('hour')
p.groupBy('hour').count().sort('count').show()

+----+-----+
|hour|count|
+----+-----+
|   4| 1282|
|   5| 1393|
|   3| 1722|
|   2| 2483|
|   1| 3462|
|   6| 3726|
|   0| 4910|
|   7| 6461|
|  23| 6981|
|   9| 7628|
|   8| 7724|
|  10| 7734|
|  16| 7758|
|  11| 8084|
|  12| 8315|
|  13| 8402|
|  15| 8524|
|  14| 8576|
|  22| 9013|
|  17| 9207|
+----+-----+
only showing top 20 rows



In [0]:
from pyspark.sql import functions as F
new_yellow = yellow.groupBy("id").pivot("hour").agg(F.lit(1)).na.fill(0)

In [0]:
dummy_encoded_df = yellow.join(new_yellow, "id", "outer").drop("id")
dummy_encoded_df.show(5)

+-------------+------------------+-----------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|trip_distance|          duration|fare_amount|hour|  0|  1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19| 20| 21| 22| 23|
+-------------+------------------+-----------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|         2.54|19.233333333333334|       13.5|   1|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|         1.01| 4.866666666666666|        6.0|   1|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|          1.9|              8.95|        9.0|  17|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|
|         1.13|11.533333333333333|        8.5|   8|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0| 

In [0]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = ['trip_distance','duration','0','1','2','3','5','6','7','8','9','10','11','12','13','14','15','16','17','18','19','20','21','22','23'], outputCol = 'features')
regression_df = vectorAssembler.transform(dummy_encoded_df)
regression_df = regression_df.select(['features', 'fare_amount'])
regression_df.show(3)

+--------------------+-----------+
|            features|fare_amount|
+--------------------+-----------+
|(25,[0,1,3],[2.54...|       13.5|
|(25,[0,1,3],[1.01...|        6.0|
|(25,[0,1,18],[1.9...|        9.0|
+--------------------+-----------+
only showing top 3 rows



In [0]:
splits = regression_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [0]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='fare_amount', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)

In [0]:
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [1.602377316945116,0.366984156121214,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]
Intercept: 2.5563861542910753


In [0]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","fare_amount","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="fare_amount",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+-----------------+-----------+--------------------+
|       prediction|fare_amount|            features|
+-----------------+-----------+--------------------+
|4.396503353799265|        4.0|(25,[0,1],[0.53,2...|
|6.141943447619548|        5.5|(25,[0,1],[1.02,5...|
|4.941214548113242|        4.5|(25,[0,1,2],[0.66...|
| 4.79270108304914|        4.5|(25,[0,1,2],[0.72...|
|7.270741049230413|        7.0|(25,[0,1,2],[1.4,...|
+-----------------+-----------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.986464


In [0]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 0.574432


# All Variables.

In [0]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = ['hour','date_yellow','is_weekend','VendorID','passenger_count','RatecodeID','PULocationID','DOLocationID','payment_type','duration','speed','trip_distance', 'payment_type'], outputCol = 'features')
regression_df = vectorAssembler.transform(model)
regression_df = regression_df.select(['features', 'fare_amount'])
regression_df.show(3)

+--------------------+-----------+
|            features|fare_amount|
+--------------------+-----------+
|[0.0,6.0,1.0,1.0,...|        5.0|
|[0.0,7.0,1.0,1.0,...|        8.5|
|[0.0,7.0,1.0,1.0,...|       12.0|
+--------------------+-----------+
only showing top 3 rows



In [0]:
splits = regression_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [0]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='fare_amount', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)

In [0]:
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.3670892869585571,0.0,1.6039488925042589,0.0]
Intercept: 2.55165392703481


In [0]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","fare_amount","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="fare_amount",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+-----------+--------------------+
|        prediction|fare_amount|            features|
+------------------+-----------+--------------------+
| 4.867458334403102|        4.5|[0.0,1.0,0.0,1.0,...|
| 4.043656998823838|        4.0|[0.0,1.0,0.0,1.0,...|
|10.109394499777316|       10.0|[0.0,1.0,0.0,2.0,...|
| 5.792291893413546|        5.5|[0.0,1.0,0.0,2.0,...|
|10.274750017943822|       10.5|[0.0,1.0,0.0,2.0,...|
+------------------+-----------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.986176


In [0]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 0.577133
