## Load Modules

In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
                    .master('local[*]') \
                    .config("spark.driver.bindAddress", "127.0.0.1") \
                    .appName('App') \
                    .getOrCreate()

24/05/22 16:31:26 WARN Utils: Your hostname, Juliens-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.0.0.2 instead (on interface en0)
24/05/22 16:31:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/22 16:31:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
DATA_DIR = "/Users/julien/Documents/EPITA/S2/Spark/data/NYC"

In [3]:
df = spark.read.format("parquet").load(DATA_DIR)

In [4]:
df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2021-12-01 00:19:51|  2021-12-01 00:37:01|            1.0|          9.3|       1.0|                 N|         138|         141|           1|       26.5| 4.25|    0.5|       7.

## Trip Analysis

1. Average Duration and Distance of rides

In [5]:
# Duration
from pyspark.sql.functions import month, avg, unix_timestamp, dayofmonth, hour

def analyze(spark, format="parquet", gcs_input_path=None, gcs_output_path=None):
    df = spark.read.format(format).load(gcs_input_path)

    df_enriched = df.withColumn(
        "duration_time_in_minutes",
        (
            unix_timestamp(df["tpep_dropoff_datetime"])
            - unix_timestamp(df["tpep_pickup_datetime"])
        )
        / 60,
    )

    # Performs basic analysis of dataset
    df_month = df_enriched.groupBy(
        month("tpep_pickup_datetime").alias("month")).agg(
        avg("duration_time_in_minutes").alias("average_trip_time_in_minutes")
    ).orderBy("month", ascending=True) \
    
    df_month.repartition(1) \
        .write \
        .mode("overwrite") \
        .format("csv") \
        .option("header", "true") \
        .save(f"{gcs_output_path}/month_analysis")

    df_day = df_enriched.groupBy(
        dayofmonth("tpep_pickup_datetime").alias("dayofmonth")).agg(
        avg("duration_time_in_minutes").alias("average_trip_time_in_minutes")
    ).orderBy("dayofmonth", ascending=True)

    df_day.repartition(1) \
        .write \
        .mode("overwrite") \
        .format("csv") \
        .option("header", "true") \
        .save(f"{gcs_output_path}/day_analysis")

    df_hour = df_enriched.groupBy(
        hour("tpep_pickup_datetime").alias("hour")).agg(
        avg("duration_time_in_minutes").alias("average_trip_time_in_minutes")
    ).orderBy("hour", ascending=True)

    df_hour.repartition(1) \
        .write \
        .mode("overwrite") \
        .format("csv") \
        .option("header", "true") \
        .save(f"{gcs_output_path}/hour_analysis")

In [6]:
df["trip_distance"]

Column<'trip_distance'>

In [7]:
# Trip distance

def analyze(spark, format="parquet", gcs_input_path=None, gcs_output_path=None):
    df = spark.read.format(format).load(gcs_input_path)

    df_enriched = df.withColumn(
        "distance_in_miles",
        (
            df["trip_distance"])
        )

    # Performs basic analysis of dataset
    df_month = df_enriched.groupBy(
        month("tpep_pickup_datetime").alias("month")).agg(
        avg("trip_distance").alias("average_trip_distance_in_miles")
    ).orderBy("month", ascending=True) \
    
    df_month.repartition(1) \
        .write \
        .mode("overwrite") \
        .format("csv") \
        .option("header", "true") \
        .save(f"{gcs_output_path}/month_analysis")

    df_day = df_enriched.groupBy(
        dayofmonth("tpep_pickup_datetime").alias("dayofmonth")).agg(
        avg("trip_distance").alias("average_trip_distance_in_miles")
    ).orderBy("dayofmonth", ascending=True)

    df_day.repartition(1) \
        .write \
        .mode("overwrite") \
        .format("csv") \
        .option("header", "true") \
        .save(f"{gcs_output_path}/day_analysis")

    df_hour = df_enriched.groupBy(
        hour("tpep_pickup_datetime").alias("hour")).agg(
        avg("trip_distance").alias("average_trip_distance_in_miles")
    ).orderBy("hour", ascending=True)

    df_hour.repartition(1) \
        .write \
        .mode("overwrite") \
        .format("csv") \
        .option("header", "true") \
        .save(f"{gcs_output_path}/hour_analysis")

2. Popular locations

In [8]:
df.show(2)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2021-12-01 00:19:51|  2021-12-01 00:37:01|            1.0|          9.3|       1.0|                 N|         138|         141|           1|       26.5| 4.25|    0.5|       7.

In [9]:
df.createOrReplaceTempView("NYC_data")
spark.sql("select PULocationID, COUNT(*) as pickup_count from NYC_data GROUP BY 1 ORDER BY 2 DESC LIMIT 5").show()

+------------+------------+
|PULocationID|pickup_count|
+------------+------------+
|         237|     1553554|
|         236|     1424614|
|         161|     1091329|
|         132|     1025063|
|         186|     1019650|
+------------+------------+



In [10]:
spark.sql("select DOLocationID, COUNT(*) as drop_count from NYC_data GROUP BY 1 ORDER BY 2 DESC LIMIT 5").show()

+------------+----------+
|DOLocationID|drop_count|
+------------+----------+
|         236|   1434919|
|         237|   1356518|
|         161|   1001077|
|         170|    920433|
|         141|    902052|
+------------+----------+



                                                                                

In [11]:
from pyspark.sql.functions import col, udf, count, expr
from pyspark.sql.types import StringType

df.groupBy("DOLocationID") \
    .agg(count("*").alias("drop_count")) \
    .orderBy(("drop_count"), ascending = False) \
    .limit(5) \
    .show()

+------------+----------+
|DOLocationID|drop_count|
+------------+----------+
|         236|   1434919|
|         237|   1356518|
|         161|   1001077|
|         170|    920433|
|         141|    902052|
+------------+----------+



In [12]:
df_pickups = df.groupBy("PULocationID") \
    .agg(count("*").alias("drop_count")) \
    .orderBy(("drop_count"), ascending = False) \
    .limit(5)

df_pickups.show()

+------------+----------+
|PULocationID|drop_count|
+------------+----------+
|         237|   1553554|
|         236|   1424614|
|         161|   1091329|
|         132|   1025063|
|         186|   1019650|
+------------+----------+



In [13]:
# Top pickups and drop locations

def analyze(spark, format="parquet", gcs_input_path=None, gcs_output_path=None):
    df = spark.read.format(format).load(gcs_input_path)
    
    df_pickups = df.groupBy("PULocationID") \
    .agg(count("*").alias("drop_count")) \
    .orderBy(("drop_count"), ascending = False) \
    .show(5)

    df_pickups.repartition(1) \
        .write \
        .mode("overwrite") \
        .format("csv") \
        .option("header", "true") \
        .save(f"{gcs_output_path}/top_pickup_locations")

    df_drops = df.groupBy("DOLocationID") \
    .agg(count("*").alias("drop_count")) \
    .orderBy(("drop_count"), ascending = False) \
    .show(5)

    df_drops.repartition(1) \
        .write \
        .mode("overwrite") \
        .format("csv") \
        .option("header", "true") \
        .save(f"{gcs_output_path}/top_drops_locations")

## Tip Analysis

Tip percentage by trip:
- Tip by locations
- correlation between distance and tip

In [14]:
df.show(2)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2021-12-01 00:19:51|  2021-12-01 00:37:01|            1.0|          9.3|       1.0|                 N|         138|         141|           1|       26.5| 4.25|    0.5|       7.

In [15]:
# tip percentage by drop location

df = df.withColumn("tip_percentage", col("tip_amount") / col("total_amount"))

df_avg_tip = df.groupBy("DOLocationID") \
    .agg(avg("tip_percentage").alias("average_tip_percentage")) \
    .orderBy("average_tip_percentage", ascending=False)

# Show the results
df_avg_tip.show(5)


+------------+----------------------+
|DOLocationID|average_tip_percentage|
+------------+----------------------+
|          52|   0.13507571291857495|
|          40|    0.1349902504372998|
|         255|   0.13294293712786734|
|         138|   0.13097318414425485|
|         249|    0.1305343050804773|
+------------+----------------------+
only showing top 5 rows



                                                                                

In [16]:
# tip percentage by pickup location

df_avg_tip = df.groupBy("PULocationID") \
    .agg(avg("tip_percentage").alias("average_tip_percentage")) \
    .orderBy("average_tip_percentage", ascending=False)

# Show the results
df_avg_tip.show(5)

+------------+----------------------+
|PULocationID|average_tip_percentage|
+------------+----------------------+
|         255|   0.13022838297171588|
|         249|   0.12950151760556006|
|         158|   0.12867509846430827|
|         199|   0.12681086193114427|
|         125|   0.12679905291940308|
+------------+----------------------+
only showing top 5 rows



                                                                                

In [17]:
df.groupBy("trip_distance") \
    .agg(avg("tip_percentage").alias("average_tip_percentage")) \
    .orderBy("average_tip_percentage", ascending=False) \
    .show(5)

+-------------+----------------------+
|trip_distance|average_tip_percentage|
+-------------+----------------------+
|        59.28|     0.683073832245103|
|     29707.75|    0.5571030640668524|
|      6206.38|    0.5141388174807198|
|        78.29|                   0.5|
|        427.7|   0.49748734950270457|
+-------------+----------------------+
only showing top 5 rows



                                                                                

Tips by time
- time of day
- day
- week
- year

In [18]:
from pyspark.sql.functions import month, hour, avg, dayofmonth, dayofweek, dayofyear, avg, col, expr

df.groupBy(hour("tpep_pickup_datetime").alias("hour")) \
    .agg(avg("tip_amount").alias("average_tip_amount")) \
    .orderBy("hour", ascending=[False, True]) \
    .show(5)

+----+------------------+
|hour|average_tip_amount|
+----+------------------+
|  23|2.5983358158722796|
|  22|2.5913331085310247|
|  21|2.5033740848759414|
|  20| 2.401028250796292|
|  19|2.3693319152925376|
+----+------------------+
only showing top 5 rows



In [19]:
df.groupBy(dayofweek("tpep_pickup_datetime").alias("day")) \
    .agg(avg("tip_amount").alias("average_tip_amount")) \
    .orderBy(("day"), ascending = False) \
    .limit(5) \
    .show()

+---+------------------+
|day|average_tip_amount|
+---+------------------+
|  7|2.3031583344450977|
|  6|2.3545071417337415|
|  5|2.3668346014951807|
|  4| 2.301123499816944|
|  3|2.2775269698276484|
+---+------------------+



                                                                                

In [20]:
# time of day
from pyspark.sql.functions import month, hour, avg, dayofmonth, dayofweek, dayofyear, year

df_hour = df.groupBy(hour("tpep_pickup_datetime").alias("hour")) \
    .agg(avg("tip_amount").alias("average_tip_amount")) \
    .orderBy("hour", ascending=[False, True])

In [21]:
# day
df_day = df.groupBy(dayofweek("tpep_pickup_datetime").alias("day")) \
    .agg(avg("tip_amount").alias("average_tip_amount")) \
    .orderBy(("day"), ascending = False)

In [22]:
# month
df_month = df.groupBy(month("tpep_pickup_datetime").alias("month")) \
    .agg(avg("tip_amount").alias("average_tip_amount")) \
    .orderBy(("month"), ascending = False)

In [23]:
# year
df_year = df.groupBy(year("tpep_pickup_datetime").alias("year")) \
    .agg(avg("tip_amount").alias("average_tip_amount")) \
    .orderBy(("year"), ascending = False) \
    .where("year < 2025 ")

Payment type

In [24]:
df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|     tip_percentage|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------------------+
|       1| 2021-12-01 00:19:51|  2021-12-01 00:37:01|            1.0|          9.3|       1.0|                 N|         138| 

In [25]:
df.groupBy("payment_type") \
    .agg(avg("tip_amount").alias("average_tip")) \
    .orderBy("average_tip", ascending=False) \
    .show(5)

+------------+--------------------+
|payment_type|         average_tip|
+------------+--------------------+
|           1|  3.0755510306720333|
|           0|  2.1700068168216124|
|           4|0.022958282745690756|
|           2|4.108590704647676E-4|
|           5|                 0.0|
+------------+--------------------+
only showing top 5 rows



## Fare Analysis

- Avg Fare by pickup and drop location
- Avg fare by passenger count
- Fare amount and distance

In [26]:
df.show(2)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|     tip_percentage|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------------------+
|       1| 2021-12-01 00:19:51|  2021-12-01 00:37:01|            1.0|          9.3|       1.0|                 N|         138| 

In [27]:
# pick up
df.groupBy("PULocationID") \
    .agg(avg("fare_amount").alias("average_fare_amount")) \
    .orderBy(("average_fare_amount"), ascending = False) \
    .show(5)

+------------+-------------------+
|PULocationID|average_fare_amount|
+------------+-------------------+
|          44|  99.34643231114435|
|          84|  90.55050847457628|
|         110|               84.5|
|         204|  83.64895833333334|
|          99|  82.19200000000001|
+------------+-------------------+
only showing top 5 rows



In [28]:
# drop
df.groupBy("DOLocationID") \
    .agg(avg("fare_amount").alias("average_fare_amount")) \
    .orderBy(("average_fare_amount"), ascending = False) \
    .show(5)

+------------+-------------------+
|DOLocationID|average_fare_amount|
+------------+-------------------+
|          44|  93.73268292682927|
|          84|  81.05751937984495|
|         204|  78.87132450331126|
|           5|  75.44233918128654|
|         265|  72.63122658480091|
+------------+-------------------+
only showing top 5 rows



In [29]:
# passenger
df.groupBy("passenger_count") \
    .agg(avg("fare_amount").alias("average_fare_amount")) \
    .orderBy(("average_fare_amount"), ascending = False) \
    .where("passenger_count is not null") \
    .show(5)

+---------------+-------------------+
|passenger_count|average_fare_amount|
+---------------+-------------------+
|            9.0|              61.35|
|            7.0|  52.91679487179488|
|            8.0|  49.14408163265307|
|            4.0| 14.284687986716266|
|            2.0|  13.77639929394148|
+---------------+-------------------+
only showing top 5 rows



                                                                                

In [30]:
# trip distance
df.groupBy("trip_distance") \
    .agg(avg("fare_amount").alias("average_fare_amount")) \
    .orderBy(("average_fare_amount"), ascending = False) \
    .show(20)

+-------------+-------------------+
|trip_distance|average_fare_amount|
+-------------+-------------------+
|       964.27|             2413.0|
|       821.54|             2056.0|
|       709.88|             1217.0|
|        427.7|             1128.5|
|       243.33|             1043.5|
|        633.8|             1025.0|
|       344.88|              864.5|
|        323.0|              823.0|
|        271.4|              808.5|
|       207.13|              800.0|
|       215.95|              800.0|
|       165.99|              790.0|
|       153.84|              749.5|
|        260.5|              722.0|
|        282.1|              716.0|
|        267.7|              708.0|
|       110.17|              701.0|
|        270.2|              688.5|
|       258.98|              653.0|
|       247.37|              620.5|
+-------------+-------------------+
only showing top 20 rows



                                                                                

## 5 Demand Prediction

In [31]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, hour, dayofweek, count, year, month, dayofmonth
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
import os

spark = SparkSession.builder.appName("NYC Taxi Data Analysis").getOrCreate()

folder_path = DATA_DIR  # Placeholder path
parquet_files = [os.path.join(folder_path, file) for file in os.listdir(folder_path) if file.endswith('.parquet')]

df = spark.read.parquet(*parquet_files)
df = df.withColumn("hour_of_day", hour("tpep_pickup_datetime"))
df = df.withColumn("day_of_week", dayofweek("tpep_pickup_datetime"))
df = df.withColumn("day_of_month", dayofmonth("tpep_pickup_datetime"))
df = df.withColumn("month", month("tpep_pickup_datetime"))
df = df.withColumn("year", year("tpep_pickup_datetime"))
df_grouped = df.groupBy("hour_of_day", "day_of_week", "day_of_month", "month", "year").agg(count("*").alias("num_pickups"))

feature_columns = ["hour_of_day", "day_of_week", "day_of_month", "month", "year"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

lr = LinearRegression(featuresCol="features", labelCol="num_pickups")
pipeline = Pipeline(stages=[assembler, lr])
(trainingData, testData) = df_grouped.randomSplit([0.7, 0.3])

model = pipeline.fit(trainingData)
predictions = model.transform(testData)
results = predictions.select("prediction", "num_pickups", "features")

evaluator = RegressionEvaluator(labelCol="num_pickups", predictionCol="prediction", metricName="mse")
mse = evaluator.evaluate(predictions)
print("Mean Squared Error (MSE) on test data =", mse)

evaluator = evaluator.setMetricName("rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data =", rmse)

#comments
# This code snippet performs the following actions:
# - It initializes a Spark session and loads the dataset.
# - It extracts the hour of the day and the day of the week from the pickup datetime.
# - It aggregates the data to count the number of pickups per hour and day of the week.
# - It prepares the features for the model using `VectorAssembler`.
# - It initializes and trains a linear regression model using a pipeline.
# - It makes predictions on the test data and displays 5 example rows of predictions.
# - It evaluates the model by calculating the MSE and RMSE on the test data and prints these values.

24/05/22 16:31:38 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
24/05/22 16:31:40 WARN Instrumentation: [b0afd75d] regParam is zero, which might cause numerical instability and overfitting.
24/05/22 16:31:41 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/05/22 16:31:41 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
24/05/22 16:31:41 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

Mean Squared Error (MSE) on test data = 2923598.772192168
Root Mean Squared Error (RMSE) on test data = 1709.8534358804466


In [32]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, hour, dayofweek, count, year, month, dayofmonth
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
import os

# Initialize Spark session
spark = SparkSession.builder.appName("NYC Taxi Data Analysis").getOrCreate()

# Define folder path and read parquet files
folder_path = DATA_DIR # Placeholder path
parquet_files = [os.path.join(folder_path, file) for file in os.listdir(folder_path) if file.endswith('.parquet')]

df = spark.read.parquet(*parquet_files)

# Add time-related columns
df = df.withColumn("hour_of_day", hour("tpep_pickup_datetime"))
df = df.withColumn("day_of_week", dayofweek("tpep_pickup_datetime"))
df = df.withColumn("day_of_month", dayofmonth("tpep_pickup_datetime"))
df = df.withColumn("month", month("tpep_pickup_datetime"))
df = df.withColumn("year", year("tpep_pickup_datetime"))

# Aggregate data
df_grouped = df.groupBy("hour_of_day", "day_of_week", "day_of_month", "month", "year").agg(count("*").alias("num_pickups"))

# Prepare features for the model
feature_columns = ["hour_of_day", "day_of_week", "day_of_month", "month", "year"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Initialize and configure Random Forest regressor
rf = RandomForestRegressor(featuresCol="features", labelCol="num_pickups", numTrees=100)

# Create a pipeline
pipeline = Pipeline(stages=[assembler, rf])

# Split data into training and test sets
(trainingData, testData) = df_grouped.randomSplit([0.7, 0.3])

# Train the model
model = pipeline.fit(trainingData)

# Make predictions
predictions = model.transform(testData)
predictions.select("prediction", "num_pickups", "features").show(5)

# Evaluate the model
evaluator = RegressionEvaluator(labelCol="num_pickups", predictionCol="prediction", metricName="mse")
mse = evaluator.evaluate(predictions)
print("Mean Squared Error (MSE) on test data =", mse)

evaluator = evaluator.setMetricName("rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data =", rmse)


                                                                                

+------------------+-----------+--------------------+
|        prediction|num_pickups|            features|
+------------------+-----------+--------------------+
|2146.6412291762754|       3669|[0.0,1.0,6.0,6.0,...|
|2456.1554286690625|       5103|[0.0,1.0,10.0,10....|
|  2447.44968864824|       4895|[0.0,1.0,17.0,10....|
|  2489.31817780362|       3413|[0.0,1.0,19.0,12....|
| 2468.257706044099|       5547|[0.0,1.0,21.0,11....|
+------------------+-----------+--------------------+
only showing top 5 rows



                                                                                

Mean Squared Error (MSE) on test data = 966031.1114618272
Root Mean Squared Error (RMSE) on test data = 982.8688170156927


                                                                                

In [33]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, hour, dayofweek, count, year, month, dayofmonth, lag
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
import os

# Initialize Spark session
spark = SparkSession.builder.appName("NYC Taxi Data Analysis").getOrCreate()

# Define folder path and read parquet files
folder_path = DATA_DIR # Placeholder path
parquet_files = [os.path.join(folder_path, file) for file in os.listdir(folder_path) if file.endswith('.parquet')]

df = spark.read.parquet(*parquet_files)

# Add time-related columns
df = df.withColumn("hour_of_day", hour("tpep_pickup_datetime"))
df = df.withColumn("day_of_week", dayofweek("tpep_pickup_datetime"))
df = df.withColumn("day_of_month", dayofmonth("tpep_pickup_datetime"))
df = df.withColumn("month", month("tpep_pickup_datetime"))
df = df.withColumn("year", year("tpep_pickup_datetime"))

# Aggregate data
df_grouped = df.groupBy("tpep_pickup_datetime", "hour_of_day", "day_of_week", "day_of_month", "month", "year").agg(count("*").alias("num_pickups"))

# Create lag features
windowSpec = Window.orderBy("tpep_pickup_datetime")
df_grouped = df_grouped.withColumn("lag_1", lag("num_pickups", 1).over(windowSpec))
df_grouped = df_grouped.withColumn("lag_2", lag("num_pickups", 2).over(windowSpec))

# Drop null values created by lag
df_grouped = df_grouped.na.drop()

# Prepare features for the model
feature_columns = ["hour_of_day", "day_of_week", "day_of_month", "month", "year", "lag_1", "lag_2"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Initialize and configure Random Forest regressor
rf = RandomForestRegressor(featuresCol="features", labelCol="num_pickups", numTrees=100)

# Create a pipeline
pipeline = Pipeline(stages=[assembler, rf])

# Ensure time-based split for training and test sets
# For example, use data from 2019-2020 for training and 2021 for testing
train_df = df_grouped.filter(col("year") < 2021)
test_df = df_grouped.filter(col("year") == 2021)

# Train the model
model = pipeline.fit(train_df)

# Make predictions
predictions = model.transform(test_df)
predictions.select("prediction", "num_pickups", "features").show(5)

# Evaluate the model
evaluator = RegressionEvaluator(labelCol="num_pickups", predictionCol="prediction", metricName="mse")
mse = evaluator.evaluate(predictions)
print("Mean Squared Error (MSE) on test data =", mse)

evaluator = evaluator.setMetricName("rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data =", rmse)


24/05/22 16:31:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 16:31:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 16:31:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 16:31:48 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:31:48 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:31:48 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:31:48 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:31:48 WARN RowBasedKeyVal

+------------------+-----------+--------------------+
|        prediction|num_pickups|            features|
+------------------+-----------+--------------------+
|1.0630748484046644|          1|[0.0,6.0,1.0,1.0,...|
|1.0630748484046644|          1|[0.0,6.0,1.0,1.0,...|
|1.0630748484046644|          1|[0.0,6.0,1.0,1.0,...|
|1.0630748484046644|          1|[0.0,6.0,1.0,1.0,...|
|1.0630748484046644|          1|[0.0,6.0,1.0,1.0,...|
+------------------+-----------+--------------------+
only showing top 5 rows



24/05/22 16:32:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:32:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:32:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:32:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:32:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:32:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:32:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:32:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:32:31 WARN RowBasedKeyValueBatch: Calling spill() on

Mean Squared Error (MSE) on test data = 1.76210127791547


24/05/22 16:33:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:33:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:33:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:33:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:33:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:33:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:33:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:33:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:33:23 WARN RowBasedKeyValueBatch: Calling spill() on

Root Mean Squared Error (RMSE) on test data = 1.3274416288166762


                                                                                

In [34]:
df = spark.read.parquet(*parquet_files)

# Add time-related columns
df = df.withColumn("hour_of_day", hour("tpep_pickup_datetime"))
df = df.withColumn("day_of_week", dayofweek("tpep_pickup_datetime"))
df = df.withColumn("day_of_month", dayofmonth("tpep_pickup_datetime"))
df = df.withColumn("month", month("tpep_pickup_datetime"))
df = df.withColumn("year", year("tpep_pickup_datetime"))

# Aggregate data
df_grouped = df.groupBy("tpep_pickup_datetime", "hour_of_day", "day_of_week", "day_of_month", "month", "year").agg(count("*").alias("num_pickups"))

df_grouped.show()

24/05/22 16:37:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:37:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:37:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:37:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:37:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:37:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:37:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:37:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:37:19 WARN RowBasedKeyValueBatch: Calling spill() on

+--------------------+-----------+-----------+------------+-----+----+-----------+
|tpep_pickup_datetime|hour_of_day|day_of_week|day_of_month|month|year|num_pickups|
+--------------------+-----------+-----------+------------+-----+----+-----------+
| 2021-12-01 00:41:51|          0|          4|           1|   12|2021|          1|
| 2021-12-01 00:39:27|          0|          4|           1|   12|2021|          1|
| 2021-11-30 23:57:51|         23|          3|          30|   11|2021|          2|
| 2021-12-01 00:47:27|          0|          4|           1|   12|2021|          1|
| 2021-12-01 00:34:56|          0|          4|           1|   12|2021|          1|
| 2021-12-01 00:12:54|          0|          4|           1|   12|2021|          1|
| 2021-12-01 01:31:43|          1|          4|           1|   12|2021|          1|
| 2021-12-01 01:56:26|          1|          4|           1|   12|2021|          1|
| 2021-12-01 01:08:14|          1|          4|           1|   12|2021|          1|
| 20

                                                                                

In [35]:
# Create lag features
windowSpec = Window.orderBy("tpep_pickup_datetime")
df_grouped = df_grouped.withColumn("lag_1", lag("num_pickups", 1).over(windowSpec))
df_grouped = df_grouped.withColumn("lag_2", lag("num_pickups", 2).over(windowSpec))

df_grouped.show()

24/05/22 16:37:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 16:37:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 16:37:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 16:37:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:37:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:37:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:37:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/22 16:37:33 WARN RowBasedKeyVal

+--------------------+-----------+-----------+------------+-----+----+-----------+-----+-----+
|tpep_pickup_datetime|hour_of_day|day_of_week|day_of_month|month|year|num_pickups|lag_1|lag_2|
+--------------------+-----------+-----------+------------+-----+----+-----------+-----+-----+
| 2002-12-31 23:07:20|         23|          3|          31|   12|2002|          1| null| null|
| 2003-01-01 00:26:16|          0|          4|           1|    1|2003|          1|    1| null|
| 2003-01-01 00:43:37|          0|          4|           1|    1|2003|          1|    1|    1|
| 2003-01-01 01:21:25|          1|          4|           1|    1|2003|          1|    1|    1|
| 2003-01-01 01:32:47|          1|          4|           1|    1|2003|          1|    1|    1|
| 2003-01-03 12:54:36|         12|          6|           3|    1|2003|          1|    1|    1|
| 2003-01-05 07:23:34|          7|          1|           5|    1|2003|          1|    1|    1|
| 2003-01-05 23:37:05|         23|          1|    

                                                                                