In [1]:
from pyspark.sql import SparkSession

# This is the fix!
# "local[*]" tells Spark to use all available CPU cores in this one container
# as its "cluster".
spark = SparkSession.builder \
    .appName("BDA_Final_Project") \
    .master("local[*]") \
    .getOrCreate()

# If this works, it will print your new Spark session object
print("Spark Session created successfully in local mode!")
print(spark)

Spark Session created successfully in local mode!
<pyspark.sql.session.SparkSession object at 0x736b45747ad0>


In [2]:
# Download the January 2024 yellow taxi data file
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet

# Check if the file is there. You should see it in the output.
!ls -lh

--2025-10-29 19:33:54--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 3.164.82.112, 3.164.82.160, 3.164.82.40, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|3.164.82.112|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 49961641 (48M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-01.parquet.1’


2025-10-29 19:34:12 (2.73 MB/s) - ‘yellow_tripdata_2024-01.parquet.1’ saved [49961641/49961641]

total 96M
-rw-r--r-- 1 jovyan users  12K Oct 29 19:33 Untitled.ipynb
drwsrwsr-x 2 jovyan users 4.0K Oct 20  2023 work
-rw-r--r-- 1 jovyan users  48M Mar 21  2024 yellow_tripdata_2024-01.parquet
-rw-r--r-- 1 jovyan users  48M Mar 21  2024 yellow_tripdata_2024-01.parquet.1


In [3]:
# --- Import all libraries (with new functions) ---
from pyspark.sql import SparkSession
from pyspark.sql.functions import hour, col, dayofweek, month, when
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# --- 1. Start Spark Session ---
# Use .getOrCreate() to get the existing session or make a new one
spark = SparkSession.builder \
    .appName("BDA_Final_Project_V2") \
    .master("local[*]") \
    .getOrCreate()

print("--- 1. Spark Session V2 Ready ---")
print(spark)

# --- 2. Load Data ---
# (Assumes 'yellow_tripdata_2024-01.parquet' is in your folder)
filepath = "yellow_tripdata_2024-01.parquet"
df = spark.read.parquet(filepath)
print("--- 2. Data Loaded Successfully ---")

# --- 3. UPGRADED Feature Engineering ---
print("--- 3. Upgraded Feature Engineering... ---")
# Clean the data
df = df.dropna(subset=['tpep_pickup_datetime', 'PULocationID'])

# Original feature
df = df.withColumn("hour", hour(col("tpep_pickup_datetime")))

# --- NEW FEATURES ---
# Get the month (1-12)
df = df.withColumn("month", month(col("tpep_pickup_datetime")))

# Get the day of the week (1=Sunday, 2=Monday, ..., 7=Saturday)
df = df.withColumn("day_of_week", dayofweek(col("tpep_pickup_datetime")))

# Create an 'is_weekend' feature (1 for weekend, 0 for weekday)
df = df.withColumn("is_weekend", when((col("day_of_week") == 1) | (col("day_of_week") == 7), 1).otherwise(0))

print("New features added:")
df.select("tpep_pickup_datetime", "hour", "month", "day_of_week", "is_weekend").show(5)

# --- 4. UPGRADED Aggregation ---
print("--- 4. Upgraded Aggregation... ---")
# We now group by ALL our features
demand_df = df.groupBy("PULocationID", "month", "day_of_week", "hour", "is_weekend").count()
demand_df = demand_df.withColumnRenamed("count", "trip_count")
print("New aggregated data:")
demand_df.show(5)

# --- 5. UPGRADED ML Preparation ---
print("--- 5. Upgraded ML Prep... ---")
# This is our new, smarter feature list
feature_cols = ["PULocationID", "month", "day_of_week", "hour", "is_weekend"]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
features_df = assembler.transform(demand_df)

(training_data, test_data) = features_df.randomSplit([0.8, 0.2], seed=42)
print("--- 6. Data Split for Training/Testing ---")

# --- 7. Train the V2 Model ---
print("--- 7. Starting Model Training... (This will take a few minutes) ---")
rf = RandomForestRegressor(labelCol="trip_count", featuresCol="features")
model_v2 = rf.fit(training_data)
print("--- Model V2 Trained Successfully! ---")

# --- 8. Make & Evaluate V2 Predictions ---
print("--- 8. Making V2 Predictions... ---")
predictions = model_v2.transform(test_data)
predictions.select("features", "trip_count", "prediction").show(10)

evaluator = RegressionEvaluator(labelCol="trip_count", predictionCol="prediction", metricName="rmse")
rmse_v2 = evaluator.evaluate(predictions)

print("---  FINAL PROJECT V2 COMPLETE  ---")
print(f"OLD RMSE (V1): 1393.87")
print(f"NEW RMSE (V2): {rmse_v2}")
print(f"This means, on average, our NEW model's prediction is off by about {int(rmse_v2)} trips.")

--- 1. Spark Session V2 Ready ---
<pyspark.sql.session.SparkSession object at 0x736b45747ad0>
--- 2. Data Loaded Successfully ---
--- 3. Upgraded Feature Engineering... ---
New features added:
+--------------------+----+-----+-----------+----------+
|tpep_pickup_datetime|hour|month|day_of_week|is_weekend|
+--------------------+----+-----+-----------+----------+
| 2024-01-01 00:57:55|   0|    1|          2|         0|
| 2024-01-01 00:03:00|   0|    1|          2|         0|
| 2024-01-01 00:17:06|   0|    1|          2|         0|
| 2024-01-01 00:36:38|   0|    1|          2|         0|
| 2024-01-01 00:46:51|   0|    1|          2|         0|
+--------------------+----+-----+-----------+----------+
only showing top 5 rows

--- 4. Upgraded Aggregation... ---
New aggregated data:
+------------+-----+-----------+----+----------+----------+
|PULocationID|month|day_of_week|hour|is_weekend|trip_count|
+------------+-----+-----------+----+----------+----------+
|          75|    1|          2| 

In [4]:
# --- 9. Save Final Results for Visualization ---
print("--- 9. Saving final predictions to a CSV file... ---")

# Let's see all our columns
predictions.printSchema()

# We need the location, hour, day, and the prediction.
# Let's save the predictions for a specific day to make it easy to visualize.
# (e.g., Tuesday, January 16th, 2024 = day_of_week 3)
heatmap_df = predictions.filter( (col("month") == 1) & (col("day_of_week") == 3) ) \
                        .select("PULocationID", "hour", "prediction")

# The 'predictions' DataFrame is big. Let's make it a small, single CSV.
# .coalesce(1) forces Spark to save it as one file.
# .repartition(1) is another way to do this.
heatmap_df.repartition(1).write.format("csv") \
          .option("header", "true") \
          .mode("overwrite") \
          .save("heatmap_predictions")

print("---  File Saved!  ---")
print("A new folder named 'heatmap_predictions' has been created in your Jupyter 'work' directory.")
print("Inside, you will find a .csv file you can download and open in Tableau or Power BI.")

--- 9. Saving final predictions to a CSV file... ---
root
 |-- PULocationID: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- is_weekend: integer (nullable = false)
 |-- trip_count: long (nullable = false)
 |-- features: vector (nullable = true)
 |-- prediction: double (nullable = false)

---  File Saved!  ---
A new folder named 'heatmap_predictions' has been created in your Jupyter 'work' directory.
Inside, you will find a .csv file you can download and open in Tableau or Power BI.


In [5]:
# --- STEP 1: Import the 'datetime' library ---
# This lets us get the *current* time
from datetime import datetime
from pyspark.sql.functions import lit

# --- STEP 2: Define your inputs ---
# --- (You or your professor can change these!) ---
test_location = 142  # (e.g., 142 = Grand Central)
test_hour = 17       # (e.g., 5 PM)
test_month = 1       # (January)
test_day_of_week = 3 # (1=Sun, 2=Mon, 3=Tues)
test_is_weekend = 0  # (0=No)

# --- You can also use the CURRENT time! ---
# now = datetime.now()
# test_location = 142
# test_hour = now.hour
# test_month = now.month
# test_day_of_week = (now.weekday() + 2) % 7 + 1 # (Converts Mon=0 to Sun=1)
# test_is_weekend = 1 if test_day_of_week in (1, 7) else 0


# --- STEP 3: Create a 1-row DataFrame with these inputs ---
# We make a list of one "row"
my_data = [(test_location, test_month, test_day_of_week, test_hour, test_is_weekend)]

# We define the column names to match our model
my_schema = ["PULocationID", "month", "day_of_week", "hour", "is_weekend"]

# Create the 1-row Spark DataFrame
test_df = spark.createDataFrame(my_data, schema=my_schema)

print("--- 1. Making a prediction for this single input: ---")
test_df.show()

# --- STEP 4: Use the V2 Assembler and Model ---
# (This assumes your V2 assembler is named 'assembler')
final_test_df = assembler.transform(test_df)
prediction = model_v2.transform(final_test_df)

print("--- 2. Prediction Generated: ---")
prediction.select("features", "prediction").show()

# --- STEP 5: Get the final number and print it! ---
final_number = prediction.collect()[0]["prediction"]

print("---  FINAL PREDICTION  ---")
print(f"For Location ID {test_location} at hour {test_hour},")
print(f"the model predicts a demand of: {int(final_number)} trips.")

--- 1. Making a prediction for this single input: ---
+------------+-----+-----------+----+----------+
|PULocationID|month|day_of_week|hour|is_weekend|
+------------+-----+-----------+----+----------+
|         142|    1|          3|  17|         0|
+------------+-----+-----------+----+----------+

--- 2. Prediction Generated: ---
+--------------------+-----------------+
|            features|       prediction|
+--------------------+-----------------+
|[142.0,1.0,3.0,17...|388.2318295717943|
+--------------------+-----------------+

---  FINAL PREDICTION  ---
For Location ID 142 at hour 17,
the model predicts a demand of: 388 trips.


In [None]:
print("---  FINAL PREDICTION  ---")
print(f"For Location ID {test_location} at hour {test_hour},")
print(f"the model predicts a demand of: {int(final_number)} trips.")