In [190]:
# Required packages & Install Dependencies
!pip install pyspark==3.5.0 prophet==1.1.5 pandas==2.2.2 numpy==1.25.2 scikit-learn==1.4.1.post1



In [191]:
# Import libraries and Initiate Spark Session

import os
import random
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, to_timestamp, to_date, count, when, lit, date_trunc
)

# Spark Session Initialization
# Runs Spark in-process ("local[*]" = all available cores). getOrCreate()
# hands back the already-running session if this cell is executed again; in
# that case settings such as the driver memory are not re-applied.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("RobotDelivery")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)




In [192]:
# Data Ingestion
#
# Loads the combined Citi Bike trip export and the holiday calendar from
# Google Drive into Spark DataFrames.

# Mount Drive before reading from it. The standalone mount cell comes after
# this one, so on a fresh kernel (Restart & Run All) the reads below would
# otherwise fail. Mounting an already-mounted Drive only prints a notice.
from google.colab import drive
drive.mount('/content/drive')

BONIAL_DIR = "/content/drive/MyDrive/bonial"

columns = [
    "ride_id", "rideable_type", "started_at", "ended_at", "start_station_name",
    "start_station_id", "end_station_name", "end_station_id",
    "start_lat", "start_lng", "end_lat", "end_lng", "member_type"
]

# Columns are named positionally from `columns` above.
# NOTE(review): header=False assumes the combined file has no header row. If
# it has one (or several, from concatenating monthly files), those rows are
# ingested as data and inferSchema would likely type every column as string.
# Confirm against the file.
citibike_df = spark.read.csv(
    f"{BONIAL_DIR}/dataset/202502-citibike-tripdata_combined.csv",
    header=False,
    inferSchema=True,
)
citibike_df = citibike_df.toDF(*columns)

holiday_df = spark.read.csv(f"{BONIAL_DIR}/us_holidays_2023.csv", header=True, inferSchema=True)


In [193]:
# Mount Google Drive (Colab only) so the /content/drive/... paths used for
# inputs and outputs are readable and writable.
# NOTE(review): the data ingestion reads from /content/drive, so on a fresh
# kernel this mount must happen before any cell that reads from Drive.
# Re-running it while Drive is already mounted only prints a notice.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [194]:
# Data Quality Validations check and Logging.
#
# Parses the raw timestamps, derives the trip duration, evaluates every data
# quality check, keeps only trips whose duration is in range, writes them out,
# and saves a one-row-per-check report.
#
# All metrics are gathered in a single aggregation pass. A separate .count()
# per check re-reads the entire (uncached) source CSV every time.
#
# NOTE(review): this cell reassigns citibike_df to the filtered data, so
# re-running it without re-running the ingestion cell reports on rows that
# were already filtered.

from pyspark.sql.functions import col, to_timestamp, count, when
import pandas as pd
import os

output_data_path = "/content/drive/MyDrive/bonial/dataset/outputFiles/validated_data"
dq_report_path = "/content/drive/MyDrive/bonial/dataset/outputFiles/data_quality_report.csv"

os.makedirs(output_data_path, exist_ok=True)

MAX_TRIP_SECONDS = 86400  # upper bound of the accepted trip duration (24 h)
null_check_columns = ["started_at", "ended_at"]
timestamp_columns = ["starttime", "endtime"]
important_columns = ["start_station_name", "end_station_name", "ride_id"]


def _dq_row(check, total, failed):
    """Build one DQ report row; the check passes only when no record failed."""
    return {
        "Check": check,
        "Total Records": total,
        "Failed Records": failed,
        "Passed Records": total - failed,
        "Status": "PASS" if failed == 0 else "FAIL",
    }


# Parsed timestamps (NULL when missing/unparseable) and duration in whole seconds.
citibike_df = (
    citibike_df
    .withColumn("starttime", to_timestamp("started_at"))
    .withColumn("endtime", to_timestamp("ended_at"))
    .withColumn("tripduration", col("endtime").cast("long") - col("starttime").cast("long"))
)

# Durations are whole seconds, so "> 0" means at least 1 s. A NULL duration
# (either timestamp missing) makes the condition NULL, i.e. the row fails.
valid_duration_cond = (col("tripduration") > 0) & (col("tripduration") <= MAX_TRIP_SECONDS)

# count(when(cond, True)) counts rows where cond is true: when() yields NULL
# otherwise and count() skips NULLs. One job computes every metric.
metrics = citibike_df.agg(
    count("*").alias("total"),
    count(when(valid_duration_cond, True)).alias("valid_duration"),
    *[
        count(when(col(c).isNull(), True)).alias(f"null__{c}")
        for c in null_check_columns + timestamp_columns + important_columns
    ],
).first()

total_count = metrics["total"]
valid_duration_count = metrics["valid_duration"]

dq_report = (
    [_dq_row(f"Null Check - {c}", total_count, metrics[f"null__{c}"])
     for c in null_check_columns]
    + [_dq_row(f"Timestamp Conversion - {c}", total_count, metrics[f"null__{c}"])
       for c in timestamp_columns]
    + [_dq_row(f"Trip Duration Range (1s to {MAX_TRIP_SECONDS}s)",
               total_count, total_count - valid_duration_count)]
    + [_dq_row(f"Completeness - {c}", total_count, metrics[f"null__{c}"])
       for c in important_columns]
)

# Final Valid Records After DQ (only the duration check filters rows; the
# other checks are reported but not enforced).
valid_duration_df = citibike_df.filter(valid_duration_cond)
citibike_df = valid_duration_df
citibike_df.write.mode("overwrite").option("header", True).csv(output_data_path)

# Save DQ Report
pd.DataFrame(dq_report).to_csv(dq_report_path, index=False)



In [195]:
# Holiday Trips
#
# Tags every validated trip with is_holiday by left-joining its start date
# against the holiday calendar. Saves the enriched trips (CSV + Parquet) and
# prints total vs. holiday trip counts.
from pyspark.sql.functions import to_date, col, count, when

holiday_df = holiday_df.withColumn("holiday_date", to_date(col("HolidayDate"), "dd-MM-yyyy"))

holiday_df.select("HolidayDate", "holiday_date").show(5)

# NOTE(review): the calendar file is named us_holidays_2023.csv, but its dates
# carry the year 2025. The sampled dates (16-01, 20-02, 29-05) are the 2023
# observances of MLK Day, Presidents' Day and Memorial Day; in 2025 those fall
# on 20-01, 17-02 and 26-05. Verify the calendar before trusting is_holiday.

# The join key must be unique. If a date appears twice in the calendar, a left
# join would emit every trip on that date twice, inflating all downstream
# counts, receipts and trip logs.
holiday_lookup_df = holiday_df.dropDuplicates(["holiday_date"])

citibike_df = citibike_df.withColumn("trip_date", to_date(col("started_at")))

# joined_df feeds two writes and the counts below, plus several later cells.
# Cache it so it is not recomputed from the source CSV for every action.
joined_df = (
    citibike_df.join(
        holiday_lookup_df,
        citibike_df.trip_date == holiday_lookup_df.holiday_date,
        "left",
    )
    .withColumn("is_holiday", col("HolidayName").isNotNull())
    .cache()
)

# Save to CSV and Parquet
output_path = "/content/drive/MyDrive/bonial/dataset/outputFiles/holiday_trip_data"
joined_df.write.mode("overwrite").option("header", True).csv(f"{output_path}/csv")
joined_df.write.mode("overwrite").parquet(f"{output_path}/parquet")

# Both counts in one pass; is_holiday is never NULL (it is an isNotNull test).
trip_counts = joined_df.agg(
    count("*").alias("total"),
    count(when(col("is_holiday"), True)).alias("holiday"),
).first()
total_trips = trip_counts["total"]
holiday_trips = trip_counts["holiday"]

print(f"Total trips: {total_trips}")
print(f"Holiday trips found: {holiday_trips}")


+-----------+------------+
|HolidayDate|holiday_date|
+-----------+------------+
| 01-01-2025|  2025-01-01|
| 16-01-2025|  2025-01-16|
| 20-02-2025|  2025-02-20|
| 29-05-2025|  2025-05-29|
| 19-06-2025|  2025-06-19|
+-----------+------------+
only showing top 5 rows

Total trips: 2030942
Holiday trips found: 58366


In [196]:
# Demand Forecasting for the upcoming trips
#
# Fits Prophet on hourly trip counts (ds = hour, y = trips started in that
# hour) and forecasts FORECAST_HOURS hours past the last observed hour.
# forecast_output holds the in-sample fit for every historical hour, followed
# by the future hours.

import os
from prophet import Prophet
from pyspark.sql.functions import date_trunc, count

FORECAST_HOURS = 24
# Prophet.fit raises on fewer than two rows, so "non-empty" is not enough.
MIN_POINTS_TO_FIT = 2

# Step 1: Aggregate trips per hour
hourly_df = joined_df.groupBy(
    date_trunc("hour", "starttime").alias("ds")
).agg(count("*").alias("y"))

# Step 2: Convert to pandas (one row per hour, so small), in time order
hourly_pd = hourly_df.orderBy("ds").toPandas()

# Step 3: Run Forecast only if Prophet has enough points to fit
if len(hourly_pd) >= MIN_POINTS_TO_FIT:
    model = Prophet()
    model.fit(hourly_pd)
    future = model.make_future_dataframe(periods=FORECAST_HOURS, freq="h")
    forecast = model.predict(future)
    # NOTE(review): nothing in the model bounds yhat at 0, so quiet hours can
    # come out negative; clip if consumers expect trip counts.
    forecast_output = forecast[['ds', 'yhat']]

    # Step 4: Save
    forecast_output_path = "/content/drive/MyDrive/bonial/dataset/outputFiles/forecast_data"
    os.makedirs(forecast_output_path, exist_ok=True)

    forecast_output.to_csv(f"{forecast_output_path}/forecast_output.csv", index=False)
    forecast_output.to_parquet(f"{forecast_output_path}/forecast_output.parquet", index=False)

    print("Forecast saved.")
else:
    print(
        f"Not enough data for forecasting: {len(hourly_pd)} hourly point(s), "
        f"need at least {MIN_POINTS_TO_FIT}."
    )





INFO:prophet:Disabling yearly seasonality. Run prophet with yearly_seasonality=True to override this.
DEBUG:cmdstanpy:input tempfile: /tmp/tmpylid9w0o/161d8lg8.json
DEBUG:cmdstanpy:input tempfile: /tmp/tmpylid9w0o/cpl0ptm9.json
DEBUG:cmdstanpy:idx 0
DEBUG:cmdstanpy:running CmdStan, num_threads: None
DEBUG:cmdstanpy:CmdStan args: ['/usr/local/lib/python3.11/dist-packages/prophet/stan_model/prophet_model.bin', 'random', 'seed=22362', 'data', 'file=/tmp/tmpylid9w0o/161d8lg8.json', 'init=/tmp/tmpylid9w0o/cpl0ptm9.json', 'output', 'file=/tmp/tmpylid9w0o/prophet_model8ce_wqnb/prophet_model-20250321151550.csv', 'method=optimize', 'algorithm=lbfgs', 'iter=10000']
15:15:50 - cmdstanpy - INFO - Chain [1] start processing
INFO:cmdstanpy:Chain [1] start processing
15:15:50 - cmdstanpy - INFO - Chain [1] done processing
INFO:cmdstanpy:Chain [1] done processing


Forecast saved.


In [197]:
# Simulate robot data
#
# Builds a reproducible fleet of simulated robots. Each robot gets a random
# charge level in [10, 99] and a random location area_1..area_5. The fleet is
# then adjusted so at least one robot sits in area_1 with a charge level in
# [51, 100]. The result is saved as CSV and Parquet.

import numpy as np
import pandas as pd
import random

random.seed(42)
np.random.seed(42)

fleet_size = 10
target_area = "area_1"

robot_pd = pd.DataFrame({
    "robot_id": range(fleet_size),
    "charge_level": np.random.randint(10, 100, size=fleet_size),
    "location": ["area_%d" % random.randint(1, 5) for _ in range(fleet_size)],
})

# If no robot landed in the target area, move a randomly chosen one there.
if not (robot_pd["location"] == target_area).any():
    robot_pd.loc[random.randint(0, fleet_size - 1), "location"] = target_area

# The target area is now guaranteed to be non-empty, so pick one of its robots
# and give it enough charge. The charge is drawn before the robot is chosen,
# which keeps the same sequence of `random` calls for a given seed.
area_1_indices = robot_pd.index[robot_pd["location"] == target_area].tolist()
new_charge = random.randint(51, 100)
robot_pd.loc[random.choice(area_1_indices), "charge_level"] = new_charge

robot_df = spark.createDataFrame(robot_pd)

# Save
output_path = "/content/drive/MyDrive/bonial/dataset/outputFiles/robot_data"
os.makedirs(output_path, exist_ok=True)
robot_df.write.mode("overwrite").option("header", True).csv(f"{output_path}/csv")
robot_df.write.mode("overwrite").parquet(f"{output_path}/parquet")

print(f" Simulated Robot Data Saved with {robot_df.count()} records.")



 Simulated Robot Data Saved with 10 records.


In [198]:
# Receipts Generation
#
# Projects each trip in joined_df onto a delivery-receipt layout (id, time,
# origin, destination). It adds a `receipt` column that currently just repeats
# the delivery id, then saves the receipts as CSV and Parquet.
from pyspark.sql.functions import col
import os

receipts_df = joined_df.select(
    col("ride_id").alias("delivery_id"),
    col("starttime").alias("delivery_time"),
    col("start_station_id").alias("from_location"),
    col("end_station_id").alias("to_location"),
)
receipts_df = receipts_df.withColumn("receipt", receipts_df["delivery_id"])

output_path = "/content/drive/MyDrive/bonial/dataset/outputFiles/receipts"
os.makedirs(output_path, exist_ok=True)

receipts_df.write.mode("overwrite").option("header", True).csv(f"{output_path}/csv")
receipts_df.write.mode("overwrite").parquet(f"{output_path}/parquet")

print(f" Receipts Generated: {receipts_df.count()} records.")



 Receipts Generated: 2030942 records.


In [199]:

# Inter-City Trip logs
#
# Keeps the trips whose start and end station IDs differ. In SQL, `<>` yields
# NULL when either ID is NULL, so trips with a missing station ID are excluded
# rather than counted as inter-city.
inter_city_df = joined_df.where("start_station_id <> end_station_id")

output_path = "/content/drive/MyDrive/bonial/dataset/outputFiles/inter_city_trips"
os.makedirs(output_path, exist_ok=True)

inter_city_df.write.mode("overwrite").option("header", True).csv(f"{output_path}/csv")
inter_city_df.write.mode("overwrite").parquet(f"{output_path}/parquet")

print(f" Inter-City Trips Saved: {inter_city_df.count()} records.")



 Inter-City Trips Saved: 1998691 records.


In [200]:
# Robots availability for pickup - sufficiently charged, optionally in one area
#
# A robot is available when its charge_level is strictly above
# MIN_PICKUP_CHARGE. Set PICKUP_AREA (e.g. "area_1") to restrict the result to
# robots in that location. The default None keeps robots from every area,
# which is the behavior before these settings existed.

MIN_PICKUP_CHARGE = 50
PICKUP_AREA = None

available_df = robot_df.filter(col("charge_level") > MIN_PICKUP_CHARGE)
if PICKUP_AREA is not None:
    available_df = available_df.filter(col("location") == PICKUP_AREA)

output_path = "/content/drive/MyDrive/bonial/dataset/outputFiles/available_robots"
os.makedirs(output_path, exist_ok=True)

available_df.write.mode("overwrite").option("header", True).csv(f"{output_path}/csv")
available_df.write.mode("overwrite").parquet(f"{output_path}/parquet")

print(f" Available Robots Saved: {available_df.count()} records.")


 Available Robots Saved: 8 records.
