In [1]:
from pyspark.sql import SparkSession

In [2]:
# Create a SparkSession instance (an entry point to all Spark functions)
spark = SparkSession.builder.appName("MYAPP").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/28 17:49:56 INFO SparkEnv: Registering MapOutputTracker
25/04/28 17:49:56 INFO SparkEnv: Registering BlockManagerMaster
25/04/28 17:49:56 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/04/28 17:49:56 INFO SparkEnv: Registering OutputCommitCoordinator


In [3]:
df = spark.read.csv('gs://dataproc-staging-us-central1-740540545323-gfe3jrkb/data/2019-01-h1.csv', header=True,
    inferSchema=True)
df_taxi = df.select("passenger_count", "pulocationid", "dolocationid", "total_amount")
df_taxi.show(10)

                                                                                

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            1.0|       151.0|       239.0|        9.95|
|            1.0|       239.0|       246.0|        16.3|
|            3.0|       236.0|       236.0|         5.8|
|            5.0|       193.0|       193.0|        7.55|
|            5.0|       193.0|       193.0|       55.55|
|            5.0|       193.0|       193.0|       13.31|
|            5.0|       193.0|       193.0|       55.55|
|            1.0|       163.0|       229.0|        9.05|
|            1.0|       229.0|         7.0|        18.5|
|            2.0|       141.0|       234.0|        13.0|
+---------------+------------+------------+------------+
only showing top 10 rows



In [4]:
trainDF, testDF = df_taxi.randomSplit([.8, .2])

In [5]:
from pyspark.ml.feature import VectorAssembler

In [6]:
vecAss = VectorAssembler(inputCols = ['total_amount'], outputCol = "features")
vecTrainDF = vecAss.transform(trainDF)
vecTrainDF.show(3)

[Stage 3:>                                                          (0 + 1) / 1]

+---------------+------------+------------+------------+--------+
|passenger_count|pulocationid|dolocationid|total_amount|features|
+---------------+------------+------------+------------+--------+
|            0.0|         1.0|         1.0|        90.0|  [90.0]|
|            0.0|         1.0|         1.0|      101.39|[101.39]|
|            0.0|         1.0|         1.0|      116.75|[116.75]|
+---------------+------------+------------+------------+--------+
only showing top 3 rows



                                                                                

In [7]:
from pyspark.ml.regression import LinearRegression

In [8]:
lr = LinearRegression(featuresCol="features", labelCol="total_amount")

In [9]:
from pyspark.ml import Pipeline

In [10]:
pipeline = Pipeline(stages = [vecAss, lr])
pipelineModel = pipeline.fit(trainDF) # ML tranformer DF --> DF + prediction

25/04/28 18:04:29 WARN Instrumentation: [229c0a3d] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

In [11]:
predDF = pipelineModel.transform(testDF)
predDF.show(10)

[Stage 6:>                                                          (0 + 1) / 1]

+---------------+------------+------------+------------+--------+------------------+
|passenger_count|pulocationid|dolocationid|total_amount|features|        prediction|
+---------------+------------+------------+------------+--------+------------------+
|            0.0|         4.0|         4.0|         4.8|   [4.8]| 4.799999999999999|
|            0.0|         4.0|        33.0|       17.75| [17.75]|17.749999999999996|
|            0.0|         4.0|        80.0|       15.95| [15.95]|15.949999999999996|
|            0.0|         4.0|       107.0|       10.55| [10.55]|10.549999999999999|
|            0.0|         4.0|       144.0|        9.45|  [9.45]| 9.449999999999998|
|            0.0|         7.0|         7.0|         7.8|   [7.8]| 7.799999999999998|
|            0.0|         7.0|       107.0|        18.8|  [18.8]|18.799999999999997|
|            0.0|         7.0|       112.0|        16.8|  [16.8]|16.799999999999997|
|            0.0|         7.0|       145.0|         6.3|   [6.3]|

25/04/28 18:06:04 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /10.128.0.3:35570 is closed
25/04/28 18:06:04 WARN BlockManagerMasterEndpoint: Error trying to remove broadcast 13 from block manager BlockManagerId(5, cluster-bdc2-w-0.c.perceptive-lamp-449119-t0.internal, 45613, None)
java.io.IOException: Connection from /10.128.0.3:35570 closed
	at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147) ~[spark-network-common_2.12-3.5.3.jar:3.5.3]
	at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117) ~[spark-network-common_2.12-3.5.3.jar:3.5.3]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.10

In [12]:
from pyspark.ml.evaluation import RegressionEvaluator
evalr = RegressionEvaluator(predictionCol = 'prediction',
                          labelCol = 'total_amount', 
                          metricName = 'rmse')
rmse = evalr.evaluate(predDF)
print(rmse)



1.054023653005258e-14


                                                                                