# Demand Prediction

- Use date and time data from pickups to develop features for a predictive model.
- Apply a regression model, such as linear regression, to predict the number of pickups in the next hour.


## Part 1: Local development

### Set up the environment

```python
from pyspark.sql import SparkSession

# Create or retrieve a Spark session
spark = SparkSession.builder.appName("Demand Prediction").getOrCreate()
```


### Step 1: Load the processed data

```python
TRAIN_PROCESSED = "../../data/processed/train_processed.parquet"

df = spark.read.parquet(TRAIN_PROCESSED)
```

```python
df.printSchema()
```

```
root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: integer (nullable = true)
 |-- pickup_dayofweek: integer (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- pickup_month: integer (nullable = true)
 |-- pickup_year: integer (nullable = true)
 |-- trip_distance_km: double (nullable = true)
```



### Step 2: Feature Engineering

Extract date and time components from the `pickup_datetime` column to use as features for the model.

```python
from pyspark.sql.functions import year, month, dayofmonth, hour, minute, dayofweek

# `df` is the processed DataFrame loaded in the previous step
df = df.withColumn("year", year("pickup_datetime"))
df = df.withColumn("month", month("pickup_datetime"))
df = df.withColumn("day", dayofmonth("pickup_datetime"))
df = df.withColumn("hour", hour("pickup_datetime"))
df = df.withColumn("minute", minute("pickup_datetime"))
# dayofweek() returns 1 = Sunday, 2 = Monday, ..., 7 = Saturday
df = df.withColumn("day_of_week", dayofweek("pickup_datetime"))

# Group by calendar date, hour, and day of the week to count pickups per hour
hourly_pickups = (
    df.groupBy("year", "month", "day", "hour", "day_of_week")
    .count()
    .withColumnRenamed("count", "num_pickups")
)
hourly_pickups.show()
```



```
+----+-----+---+----+-----------+-----------+
|year|month|day|hour|day_of_week|num_pickups|
+----+-----+---+----+-----------+-----------+
|2016|    3| 18|   7|          6|        393|
|2016|    6| 30|  14|          5|        353|
|2016|    1|  1|  14|          6|        343|
|2016|    3| 23|  14|          4|        376|
|2016|    3| 19|   7|          7|        157|
|2016|    6| 29|  23|          4|        394|
|2016|    5| 28|   3|          7|        162|
|2016|    2| 27|   1|          7|        378|
|2016|    4| 14|  15|          5|        459|
|2016|    5| 10|  12|          3|        401|
|2016|    2|  8|  11|          2|        327|
|2016|    2| 29|  10|          2|        375|
|2016|    1| 10|  17|          1|        375|
|2016|    5| 13|   8|          6|        461|
|2016|    2|  6|   3|          7|        217|
|2016|    2| 25|  11|          5|        385|
|2016|    5| 11|   7|          4|        373|
|2016|    5| 14|  15|          7|        452|
|2016|    2| 10|   0|          4| 
```
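The `groupBy` above turns individual trips into one row per calendar hour, with `day_of_week` following Spark's convention (1 = Sunday through 7 = Saturday). As a quick way to check that convention without a Spark session, here is a minimal pure-Python sketch of the same aggregation; the timestamps are hypothetical, and `spark_dayofweek` and `hourly_pickup_counts` are illustrative helpers, not part of the notebook:

```python
from collections import Counter
from datetime import datetime
from typing import Iterable


def spark_dayofweek(ts: datetime) -> int:
    """Mimic Spark's dayofweek(): 1 = Sunday, 2 = Monday, ..., 7 = Saturday."""
    # isoweekday() returns Monday = 1 ... Sunday = 7; shift so Sunday becomes 1.
    return ts.isoweekday() % 7 + 1


def hourly_pickup_counts(pickups: Iterable[datetime]) -> Counter:
    """Count pickups per (year, month, day, hour, day_of_week) bucket."""
    return Counter(
        (ts.year, ts.month, ts.day, ts.hour, spark_dayofweek(ts)) for ts in pickups
    )


# Hypothetical pickup timestamps (not taken from the dataset).
sample = [
    datetime(2016, 3, 18, 7, 5),
    datetime(2016, 3, 18, 7, 42),
    datetime(2016, 3, 18, 8, 1),
    datetime(2016, 3, 19, 7, 30),
]

for key, num_pickups in sorted(hourly_pickup_counts(sample).items()):
    print(key, num_pickups)
# (2016, 3, 18, 7, 6) 2
# (2016, 3, 18, 8, 6) 1
# (2016, 3, 19, 7, 7) 1
```

The keys have the same shape as the rows in the table above: 2016-03-18 is a Friday, hence `day_of_week = 6`, and 2016-03-19 is a Saturday, hence `7`.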

                                                                                

### Step 3: Data Preparation for the Model

Split the data into training and test sets so the model is evaluated on hours it did not see during training.

```python
# Split the data into training and test sets (80% training, 20% test)
train_data, test_data = hourly_pickups.randomSplit([0.8, 0.2], seed=1234)
```
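`randomSplit` assigns each hourly row to training or test at random, so test hours sit between training hours from the same days. If the goal is to forecast upcoming hours, a chronological split (train on earlier hours, test on later ones) is usually a stricter check. A minimal pure-Python sketch of that idea, using hypothetical `(timestamp, num_pickups)` tuples rather than the Spark DataFrame; `chronological_split` is an illustrative helper:

```python
from datetime import datetime


def chronological_split(rows, train_fraction=0.8):
    """Split (timestamp, num_pickups) rows by time instead of at random.

    The earliest `train_fraction` of the hours go to training and the
    remaining, later hours go to testing, so no test hour precedes a training hour.
    """
    ordered = sorted(rows, key=lambda row: row[0])
    cutoff = int(len(ordered) * train_fraction)
    return ordered[:cutoff], ordered[cutoff:]


# Hypothetical hourly counts for ten consecutive hours on 2016-01-01, shuffled.
rows = [(datetime(2016, 1, 1, h), 300 + h) for h in (4, 9, 0, 7, 2, 8, 1, 6, 3, 5)]
train, test = chronological_split(rows)

print(len(train), len(test))         # 8 2
print([count for _, count in test])  # [308, 309]
```

In Spark, the equivalent would be filtering `hourly_pickups` on a cutoff date rather than calling `randomSplit`; which cutoff to use is left open here.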


### Step 4: Define and Train the Regression Model

Use Spark MLlib to define and train a linear regression model.

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Assemble the calendar columns into a single feature vector
assembler = VectorAssembler(
    inputCols=["year", "month", "day", "hour", "day_of_week"],
    outputCol="features",
)

# Initialize the linear regression model
lr = LinearRegression(featuresCol="features", labelCol="num_pickups")

# Pipeline: assemble the feature vector, then fit linear regression
pipeline = Pipeline(stages=[assembler, lr])

# Fit the model on the training data
model = pipeline.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)
predictions.select("prediction", "num_pickups").show()
```


```
24/05/24 15:49:10 WARN Instrumentation: [50afee1d] regParam is zero, which might cause numerical instability and overfitting.
24/05/24 15:49:11 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/05/24 15:49:11 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
24/05/24 15:49:11 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
24/05/24 15:49:11 WARN Instrumentation: [50afee1d] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
```

```
+------------------+-----------+
|        prediction|num_pickups|
+------------------+-----------+
| 201.3859906427854|        566|
|246.72837122214892|        323|
|503.66852783854233|        269|
|195.22288232348671|        248|
|210.33700918327457|        199|
| 225.4511360430624|        143|
|240.56526290285024|        114|
|376.59240464094086|        361|
| 512.6195463790315|        396|
| 542.8478000986072|        417|
|  196.703042398763|        138|
|257.15954983791437|        136|
| 272.2736766977023|        213|
| 287.3878035574901|        281|
| 362.9584378564293|        348|
|408.30081843579285|        352|
|483.87145273473203|        180|
|145.19755350010078|        118|
|205.65406093925216|         50|
|326.56707581755495|        308|
+------------------+-----------+
only showing top 20 rows
```
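The training log reports `Cholesky solver failed due to singular covariance matrix` and falls back to a quasi-Newton solver. A likely cause, assuming the processed data covers 2016 only (every row in the sample output has `year = 2016`), is that the constant `year` column is just 2016 times the intercept column, so the `X^T X` matrix of the normal equations is singular. The pure-Python sketch below checks that claim on hypothetical rows with exact integer arithmetic instead of Spark; `gram_matrix` and `det3` are illustrative helpers:

```python
def gram_matrix(rows):
    """Return X^T X, where each row of X already starts with the intercept term 1."""
    n_cols = len(rows[0])
    return [
        [sum(row[i] * row[j] for row in rows) for j in range(n_cols)]
        for i in range(n_cols)
    ]


def det3(m):
    """Exact determinant of a 3x3 integer matrix (cofactor expansion, first row)."""
    return (
        m[0][0] * (m[1][1] * m[2][2] - m[1][2] * m[2][1])
        - m[0][1] * (m[1][0] * m[2][2] - m[1][2] * m[2][0])
        + m[0][2] * (m[1][0] * m[2][1] - m[1][1] * m[2][0])
    )


# Hypothetical (month, hour) pairs; the year is fixed at 2016 as in the sample output.
samples = [(3, 7), (6, 14), (1, 14), (3, 14)]

# Design rows [intercept, year, hour]: the year column equals 2016 * intercept.
with_year = [[1, 2016, hour] for _, hour in samples]
# Design rows [intercept, month, hour]: these three columns are linearly independent.
with_month = [[1, month, hour] for month, hour in samples]

print(det3(gram_matrix(with_year)))   # 0    -> singular, Cholesky cannot factorize it
print(det3(gram_matrix(with_month)))  # 1862 -> non-singular
```

If the real data behaves the same way, dropping `year` from the assembler's `inputCols`, or giving `LinearRegression` a small non-zero `regParam` (which also addresses the `regParam is zero` warning), would likely avoid the fallback. Separately, `hour` and `day_of_week` enter the model as plain numbers, so it can only fit a straight-line trend across them; one-hot encoding them, for example with `pyspark.ml.feature.OneHotEncoder`, is a common alternative.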



                                                                                

### Step 5: Evaluate the Model

Evaluate the model's performance using suitable metrics, such as RMSE (Root Mean Squared Error).

```python
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate the model with RMSE on the test set
evaluator = RegressionEvaluator(labelCol="num_pickups", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data =", rmse)
```



```
Root Mean Squared Error (RMSE) on test data = 113.66203923029369
```
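RMSE is measured in the same unit as the label, so an RMSE of about 114 means the hourly predictions are off by roughly 114 pickups in a root-mean-square sense. One way to judge whether that is good is to compare it with a naive baseline that always predicts the training mean. The sketch below writes out the RMSE formula that `RegressionEvaluator` reports for `metricName="rmse"`, plus that baseline, in plain Python; the numbers and the helper `mean_baseline_rmse` are hypothetical:

```python
import math


def rmse(actual, predicted):
    """Root mean squared error: sqrt(mean((y - y_hat) ** 2))."""
    if not actual or len(actual) != len(predicted):
        raise ValueError("actual and predicted must be non-empty and equally long")
    squared_errors = [(y - y_hat) ** 2 for y, y_hat in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


def mean_baseline_rmse(train_labels, test_labels):
    """RMSE of a naive model that always predicts the training-set mean."""
    train_mean = sum(train_labels) / len(train_labels)
    return rmse(test_labels, [train_mean] * len(test_labels))


# Hypothetical pickup counts, not taken from the dataset.
print(rmse([3, 5], [1, 5]))                             # 1.4142135623730951
print(mean_baseline_rmse([100, 200, 300], [150, 250]))  # 50.0
```

To apply the baseline to the real split, the `num_pickups` values of `train_data` and `test_data` would have to be collected to the driver; the hourly table has at most one row per hour, so it stays small.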


                                                                                

### Step 6: Save and Deploy the Model (Optional)

If the model performs satisfactorily, consider saving it for later use or deploying it for real-time predictions.

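```python
# Save the fitted pipeline model
LOCAL_MODEL_PATH = "../../results/models/taxi_demand_prediction_model"
# Cloud Storage location for the model (not used in this local run)
GCP_MODEL_PATH = "gs://spbd-nyc-taxi-bucket/results/models/taxi_demand_prediction_model"

# overwrite() lets the cell be re-run when the target directory already exists
model.write().overwrite().save(LOCAL_MODEL_PATH)
```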

```python
# Load the saved model (uncomment to use)
# from pyspark.ml import PipelineModel
# model = PipelineModel.load(LOCAL_MODEL_PATH)
```


                                                                                

```python
spark.stop()
```

## Part 2: Running on GCP

After refactoring the notebook code into a standalone script, `demand_prediction.py`, we upload it to Cloud Storage and run it on a Dataproc cluster in GCP.
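The refactored script itself is not shown here. One way to let a single script run both locally and on Dataproc is to read its paths from command-line arguments instead of hard-coding them. A minimal sketch with `argparse`; the flag names `--input` and `--model-output` are hypothetical, and only the default model path comes from this notebook:

```python
import argparse

# Cloud Storage model location used in this notebook (GCP_MODEL_PATH in Step 6).
DEFAULT_MODEL_PATH = "gs://spbd-nyc-taxi-bucket/results/models/taxi_demand_prediction_model"


def parse_args(argv=None):
    """Parse options for a hypothetical demand_prediction.py entry point."""
    parser = argparse.ArgumentParser(description="Train the taxi demand model.")
    parser.add_argument(
        "--input",
        required=True,
        help="Path to the processed training data in Parquet format (local or gs://).",
    )
    parser.add_argument(
        "--model-output",
        default=DEFAULT_MODEL_PATH,
        help="Where to save the fitted PipelineModel.",
    )
    # argv=None makes argparse read sys.argv; passing a list is handy for testing.
    return parser.parse_args(argv)


# Example: the local path from Part 1, keeping the default model location.
args = parse_args(["--input", "../../data/processed/train_processed.parquet"])
print(args.input)         # ../../data/processed/train_processed.parquet
print(args.model_output)  # gs://spbd-nyc-taxi-bucket/results/models/taxi_demand_prediction_model
```

With `gcloud dataproc jobs submit pyspark`, arguments placed after a bare `--` are passed through to the script, e.g. `... --cluster={cluster_name} --region={region} -- --input <gs:// path to the processed data>`.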

```python
# Set bucket name
bucket_name = "spbd-nyc-taxi-bucket"

# Upload the Python script
!gsutil cp ./demand_prediction.py gs://{bucket_name}/scripts/
```

```
Copying file://./demand_prediction.py [Content-Type=text/x-python]...
/ [1 files][  1.7 KiB/  1.7 KiB]
Operation completed over 1 objects/1.7 KiB.
```


```python
cluster_name = "spbd-nyc-taxi-cluster"
region = "europe-west9"
machine_type = "n2-standard-2"

!gcloud dataproc clusters create {cluster_name} \
    --region={region} \
    --zone={region}-a \
    --master-machine-type={machine_type} \
    --worker-machine-type={machine_type} \
    --num-workers=2 \
    --image-version=2.0-debian10 \
    --scopes=default
```


```
Waiting on operation [projects/epita-spbd-nyc-da/regions/europe-west9/operations/66fa500a-0ebf-319e-8531-e7b1655803cf].
Waiting for cluster creation operation...
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/epita-spbd-nyc-da/regions/europe-west9/clusters/spbd-nyc-taxi-cluster] Cluster placed in zone [europe-west9-a].
```


### Submit the Spark job

```python
!gcloud dataproc jobs submit pyspark \
    gs://{bucket_name}/scripts/demand_prediction.py \
    --cluster={cluster_name} \
    --region={region}
```

```
Job [53246a69e0a048bc8cd4ea7cc86c7e44] submitted.
Waiting for job output...
24/05/24 16:32:26 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
24/05/24 16:32:26 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
24/05/24 16:32:26 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
24/05/24 16:32:26 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
24/05/24 16:32:26 INFO org.sparkproject.jetty.util.log: Logging initialized @4056ms to org.sparkproject.jetty.util.log.Slf4jLog
24/05/24 16:32:26 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_412-b08
24/05/24 16:32:27 INFO org.sparkproject.jetty.server.Server: Started @4208ms
24/05/24 16:32:27 INFO org.sparkproject.jetty.server.AbstractConnector: Started ServerConnector@6dfa7a49{HTTP/1.1, (http/1.1)}{0.0.0.0:37025}
24/05/24 16:32:27 INFO org.apache.hadoop.yarn.client.RMPro
```

### Monitor the Job Execution

```python
!gcloud dataproc jobs list --cluster={cluster_name} --region={region}
```

```
JOB_ID                            TYPE     STATUS
53246a69e0a048bc8cd4ea7cc86c7e44  pyspark  DONE
04443586510448939f1aee29dcd0f55a  pyspark  ERROR
752860f713dd42be8816269ed104e55b  pyspark  ERROR
83c7c3998cbb421e96ebe7d478203abe  pyspark  ERROR
3e1e1f351bab4a47b197d41c31e4f666  pyspark  ERROR
b43ada9382d14a98b0041ca206fb2635  pyspark  ERROR
```


### (Optional) Delete the cluster

```python
!gcloud dataproc clusters delete {cluster_name} --region={region} --quiet
```

```
Waiting on operation [projects/epita-spbd-nyc-da/regions/europe-west9/operations/f500353b-43e2-3a3d-aad6-484ddad4771a].
Waiting for cluster deletion operation...done.
Deleted [https://dataproc.googleapis.com/v1/projects/epita-spbd-nyc-da/regions/europe-west9/clusters/spbd-nyc-taxi-cluster].
```
