In [1]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('minadz').getOrCreate()

In [2]:
data = spark.read.csv('yellow_tripdata_2021-01.csv', inferSchema=True, header=True)

In [3]:
data.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [4]:
data.summary

<bound method DataFrame.summary of DataFrame[VendorID: int, tpep_pickup_datetime: string, tpep_dropoff_datetime: string, passenger_count: int, trip_distance: double, RatecodeID: int, store_and_fwd_flag: string, PULocationID: int, DOLocationID: int, payment_type: int, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double]>

In [5]:
data = data.drop("VendorID", "PULocationID", "RatecodeID", "DOLocationID" )
# could also check with dummy variables

In [6]:
from pyspark.ml.feature import StringIndexer
# values Y and N
stringIndexer = StringIndexer(inputCol="store_and_fwd_flag", outputCol="store_and_fwd_flag_idx",
                              stringOrderType="alphabetAsc", handleInvalid="keep")
model = stringIndexer.fit(data)
data = model.transform(data)

### find missing values

In [7]:
from pyspark.sql.functions import isnull, when, count, col

nacounts = data.select([count(when(isnull(c), c)).alias(c) for c in data.columns]).toPandas()
nacounts

Unnamed: 0,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,store_and_fwd_flag,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,store_and_fwd_flag_idx
0,0,0,98352,0,98352,98352,0,0,0,0,0,0,0,0,0


In [8]:
from pyspark.ml.feature import Imputer
imputer = Imputer(strategy='median', inputCols=['passenger_count'], outputCols=['passenger_count_imp'])
imputer_model = imputer.fit(data)
data = imputer_model.transform(data)

In [9]:
imputer = Imputer(strategy='mode', inputCols=['store_and_fwd_flag_idx', ], outputCols=['store_and_fwd_flag_imp'])
imputer_model = imputer.fit(data)
data = imputer_model.transform(data)

In [12]:
imputer = Imputer(strategy='mode', inputCols=['payment_type', ], outputCols=['payment_type_imp'])
imputer_model = imputer.fit(data)
data = imputer_model.transform(data)

In [14]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = ['passenger_count_imp', 'trip_distance', 'store_and_fwd_flag_idx', 'payment_type_imp', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'congestion_surcharge'], outputCol = 'features')
vtripdata_df = vectorAssembler.transform(data)
vtripdata_df = vtripdata_df.select(['features', 'total_amount'])

In [15]:
vtripdata_df.show()

+--------------------+------------+
|            features|total_amount|
+--------------------+------------+
|[1.0,2.1,0.0,2.0,...|        11.8|
|[1.0,0.2,0.0,2.0,...|         4.3|
|[1.0,14.7,0.0,1.0...|       51.95|
|[0.0,10.6,0.0,1.0...|       36.35|
|[1.0,4.94,0.0,1.0...|       24.36|
|[1.0,1.6,0.0,1.0,...|       14.15|
|[1.0,4.1,0.0,2.0,...|        17.3|
|[1.0,5.7,0.0,2.0,...|        21.8|
|[1.0,9.1,0.0,4.0,...|        28.8|
|[2.0,2.7,0.0,1.0,...|       18.95|
|[3.0,6.11,0.0,1.0...|        24.3|
|[2.0,1.21,0.0,1.0...|       10.79|
|[2.0,7.4,0.0,2.0,...|       33.92|
|[5.0,1.7,0.0,1.0,...|       14.16|
|[5.0,0.81,0.0,2.0...|         8.3|
|[1.0,1.01,0.0,1.0...|        10.3|
|[1.0,0.73,0.0,1.0...|       12.09|
|[1.0,1.17,0.0,1.0...|       12.36|
|[1.0,0.78,0.0,1.0...|        9.96|
|[2.0,1.66,0.0,2.0...|        12.3|
+--------------------+------------+
only showing top 20 rows



In [16]:
train_df, test_df = vtripdata_df.randomSplit([0.7, 0.3])

In [17]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = 'features', labelCol='total_amount', maxIter=20, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [0.0,0.0,0.0,0.0,0.9827316381065105,0.05754258109856909,0.0,0.9590415329081708,0.9001620556163971,1.7305578221402393,0.5588629007926905]
Intercept: 1.7710034249551878


In [28]:
predictions = lr_model.transform(test_df)
predictions.show(5)

+--------------------+------------+------------------+
|            features|total_amount|        prediction|
+--------------------+------------+------------------+
|(11,[0,1,2,3,4,9]...|        62.8| 63.71089815325417|
|(11,[0,1,2,3,4,9]...|       100.3|100.56333458224832|
|(11,[0,1,2,3,9],[...|         0.3|2.2901707715972597|
|(11,[0,1,3],[1.0,...|         0.0|1.7710034249551878|
|(11,[0,1,3],[1.0,...|         0.0|1.7710034249551878|
+--------------------+------------+------------------+
only showing top 5 rows



In [29]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    labelCol="total_amount", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 0.712911


In [30]:
from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(featuresCol = 'features', labelCol='total_amount')
dt_model = dt.fit(train_df)

In [31]:
predictions = dt_model.transform(test_df)
predictions.show(5)

+--------------------+------------+-------------------+
|            features|total_amount|         prediction|
+--------------------+------------+-------------------+
|(11,[0,1,2,3,4,9]...|        62.8|  52.57105177372951|
|(11,[0,1,2,3,4,9]...|       100.3| 117.76367886178866|
|(11,[0,1,2,3,9],[...|         0.3|  8.851457709186821|
|(11,[0,1,3],[1.0,...|         0.0|-10.145543893129787|
|(11,[0,1,3],[1.0,...|         0.0|-10.145543893129787|
+--------------------+------------+-------------------+
only showing top 5 rows



In [32]:
evaluator = RegressionEvaluator(
    labelCol="total_amount", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 4.95571


In [36]:
from pyspark.ml.feature import MinMaxScaler

#We scale features to be between 0 and 1 to prevent the exploding gradient problem

featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(vtripdata_df)

In [39]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import FMRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Train a FM model.
fm = FMRegressor(featuresCol="scaledFeatures", labelCol="total_amount", stepSize=0.001)

# Create a Pipeline.
pipeline = Pipeline(stages=[featureScaler, fm])

# Train model.
model = pipeline.fit(train_df)

# Make predictions.
predictions = model.transform(test_df)

# Select example rows to display.
predictions.select("prediction", "total_amount", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="total_amount", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

+------------------+------------+--------------------+
|        prediction|total_amount|            features|
+------------------+------------+--------------------+
|1.2424141894171044|        62.8|(11,[0,1,2,3,4,9]...|
| 1.469689181808229|       100.3|(11,[0,1,2,3,4,9]...|
|1.4285877545808996|         0.3|(11,[0,1,2,3,9],[...|
|0.7443631771994863|         0.0|(11,[0,1,3],[1.0,...|
| 0.744363369941704|         0.0|(11,[0,1,3],[1.0,...|
+------------------+------------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 20.7713
