In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 316.9/316.9 MB 2.3 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=50a8439e19a568603099d7c44e5e542d3f42ecf73852cb8b8a5da8978a0b938d
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
# Mount Google Drive (Colab only); every model/data/output path configured below
# lives under /gdrive/My Drive.
from google.colab import drive
drive.mount('/gdrive')

Mounted at /gdrive


In [13]:
import json
import numpy as np
import pickle
import pandas as pd
from datetime import datetime
import pytz

In [20]:
# Input artifacts (Google Drive is mounted at /gdrive by the cell above).
model_file_path = "/gdrive/My Drive/trip_model.pkl"
scaler_file_path = "/gdrive/My Drive/scaler_model.pkl"
data_file_path = "/gdrive/My Drive/station_data.json"

# Outputs.
# NOTE(review): predictions are written back over the input snapshot (same path
# as data_file_path), so every runPrediction() call rewrites the file it just
# read. Confirm this is intended.
output_file_path_pred = data_file_path
output_file_path_spark = "/gdrive/My Drive/output_priority.json"

In [15]:
# Shared US/Eastern timezone used by the scheduling cells below.
eastern = pytz.timezone('US/Eastern')


def get_current_datetime():
    """Return the current US/Eastern wall-clock time as 'YYYY-MM-DD HH:MM:SS'.

    NOTE(review): not called anywhere else in this notebook as shown.
    """
    tz_eastern = pytz.timezone('US/Eastern')
    now_local = datetime.now(tz_eastern)
    return now_local.strftime("%Y-%m-%d %H:%M:%S")

In [8]:
def load_model(file_path):
    """Deserialize and return the pickled model stored at ``file_path``.

    SECURITY: ``pickle.load`` can execute arbitrary code; only load files from
    a trusted location.
    """
    with open(file_path, mode='rb') as handle:
        model = pickle.load(handle)
    return model

def load_scaler(file_path):
    """Deserialize and return the pickled feature scaler stored at ``file_path``.

    SECURITY: unpickling can execute arbitrary code; only load trusted files.
    """
    with open(file_path, 'rb') as fh:
        payload = fh.read()
    return pickle.loads(payload)

def load_station_data(file_path):
    """Parse the JSON document at ``file_path`` and return the decoded value."""
    with open(file_path, mode='r') as source:
        text = source.read()
    return json.loads(text)

def scale_features(data_point, scaler):
    """Scale a single feature vector with a fitted scaler.

    ``data_point`` is wrapped in a one-row batch for ``scaler.transform`` and
    the single transformed row is returned.
    NOTE(review): unused in this notebook; add_predictions_to_data scales in bulk.
    """
    one_row_batch = [data_point]
    transformed = scaler.transform(one_row_batch)
    return transformed[0]

def predict_traffic(model, scaled_data_point):
    """Return the model's prediction for one already-scaled feature vector.

    NOTE(review): unused in this notebook; add_predictions_to_data predicts in bulk.
    """
    predictions = model.predict([scaled_data_point])
    return predictions[0]


def add_predictions_to_data(station_data, model, scaler):
    """Attach a ``predicted_traffic`` value to every station record.

    Args:
        station_data: list of station dicts. Each must contain the keys
            ``day_of_week``, ``hour`` and ``local_id``, which are the model inputs.
        model: fitted estimator exposing ``predict(X)``.
        scaler: fitted transformer exposing ``transform(X)``. It receives a
            DataFrame with columns ``day``, ``hour``, ``station_id`` in that
            order (presumably the names it was fitted with; confirm).

    Returns:
        A new list of dicts holding the original fields plus ``predicted_traffic``.
        An empty input returns ``[]`` without calling the scaler or model.
        Previously it raised KeyError, because ``pd.DataFrame([])`` has no columns.
    """
    if not station_data:
        return []

    df = pd.DataFrame(station_data)

    # Select the model inputs and rename them to the scaler's expected feature names.
    features = df[['day_of_week', 'hour', 'local_id']].rename(
        columns={'day_of_week': 'day', 'local_id': 'station_id'}
    )

    df['predicted_traffic'] = model.predict(scaler.transform(features))

    return df.to_dict('records')


def save_data_with_predictions(station_data, output_file_path):
    """Serialize ``station_data`` as JSON to ``output_file_path``, atomically.

    The payload is first written to ``<output_file_path>.tmp`` and then moved
    over the target with ``os.replace``. In this notebook the target is both
    the prediction input (``data_file_path``) and the file the Spark loop reads
    every ``interval_seconds``. Writing in place could expose a half-written
    file to that reader. It could also destroy the previous contents if
    ``json.dump`` failed midway, e.g. on a non-serializable value.

    Args:
        station_data: JSON-serializable object (here, a list of station dicts).
        output_file_path: destination path (str or PathLike).

    Raises:
        TypeError/ValueError from ``json.dump``, OSError from the filesystem.
        In those cases the existing target file is left untouched.
    """
    import os

    tmp_path = os.fspath(output_file_path) + '.tmp'
    try:
        with open(tmp_path, 'w') as tmp_file:
            json.dump(station_data, tmp_file)
        # Same directory, so this is a rename: readers see the old or new file, never a mix.
        os.replace(tmp_path, output_file_path)
    except BaseException:
        # Best-effort cleanup of the partial temp file; re-raise the original error.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise

In [9]:
def runPrediction():
    """Run one prediction pass over the station snapshot.

    Loads the pickled model and scaler, reads station records from
    ``data_file_path``, attaches ``predicted_traffic`` to each record and
    writes the result to ``output_file_path_pred``. In this notebook that is
    the same path as ``data_file_path``, so the input file is rewritten.
    """
    rf_model, fitted_scaler = load_model(model_file_path), load_scaler(scaler_file_path)
    stations = load_station_data(data_file_path)
    save_data_with_predictions(
        add_predictions_to_data(stations, rf_model, fitted_scaler),
        output_file_path_pred,
    )

In [10]:
# Produce an initial set of predictions. The polling loop below only re-runs
# runPrediction() when the US/Eastern hour changes.
runPrediction()

In [11]:
from pyspark.sql import SparkSession
import time

In [12]:
def process_batch():
    """Rank stations by maintenance need and write the ranking as JSON.

    Uses the global ``spark`` session to read the multi-line JSON snapshot at
    the global ``json_file_path``. It then computes::

        maintenance_priority = (num_docks_disabled + num_vehicles_disabled)
                               / (num_docks_available + num_vehicles_available)

    Stations are sorted by this value in descending order. The result is
    registered as the temp view ``station`` and written to
    ``output_file_path_spark`` (``mode="overwrite"``; Spark writes a directory
    of part files at that path).

    A station with nothing available (zero denominator) gets a null priority
    and sorts last. This is spelled out with ``F.when`` rather than relying on
    Spark's non-ANSI division returning null. With
    ``spark.sql.ansi.enabled=true`` (the default from Spark 4.0), the bare
    division raises a divide-by-zero error and aborts the whole batch.
    """
    from pyspark.sql import functions as F

    df = spark.read.option("multiline", "true").json(json_file_path)

    disabled = df["num_docks_disabled"] + df["num_vehicles_disabled"]
    available = df["num_docks_available"] + df["num_vehicles_available"]
    df = df.withColumn(
        "maintenance_priority",
        F.when(available != 0, disabled / available),  # otherwise -> null
    )

    # Highest priority first; nulls (nothing available) last, as with the default desc ordering.
    df = df.orderBy(F.col("maintenance_priority").desc_nulls_last())

    df.createOrReplaceTempView("station")

    # No cache()/unpersist() needed: the DataFrame is never persisted.
    df.write.json(output_file_path_spark, mode="overwrite")

In [18]:
# Baseline US/Eastern hour for the polling loop below, which calls
# runPrediction() whenever the current hour differs from this value.
current_hour_global = datetime.now(tz=eastern).hour
print(current_hour_global)

0


In [21]:
# Create (or reuse) the Spark session. process_batch() and the `finally`
# clause below both use the global `spark`. With this line commented out, the
# cell only ran because a session survived from earlier kernel state; on a
# fresh kernel it fails with NameError.
spark = SparkSession.builder.appName("BatchApp").getOrCreate()

# process_batch() reads the file that runPrediction() writes.
json_file_path = output_file_path_pred

# Pause (seconds) between the end of one iteration and the start of the next.
interval_seconds = 15

try:
    # Run until interrupted (Ctrl+C / "Interrupt kernel").
    while True:
        iteration_start_time = time.time()

        # Refresh predictions once per US/Eastern hour; current_hour_global is
        # initialised by the cell above.
        current_datetime = datetime.now(eastern)
        if current_datetime.hour != current_hour_global:
            runPrediction()
            current_hour_global = current_datetime.hour

        # Recompute and write the maintenance-priority ranking.
        process_batch()

        iteration_end_time = time.time()
        iteration_elapsed_time = iteration_end_time - iteration_start_time
        print(f"Time taken for iteration: {iteration_elapsed_time:.2f} seconds")

        time.sleep(interval_seconds)

except KeyboardInterrupt:
    # Normal way to stop the loop.
    print("Stopping the application")

finally:
    # Release the Spark session, including when another error ends the loop.
    spark.stop()


Time taken for iteration: 12.18 seconds
Time taken for iteration: 3.10 seconds
Time taken for iteration: 2.35 seconds
Time taken for iteration: 1.50 seconds
Time taken for iteration: 1.29 seconds
Time taken for iteration: 1.37 seconds
Time taken for iteration: 1.17 seconds
Time taken for iteration: 1.92 seconds
Time taken for iteration: 2.32 seconds
Time taken for iteration: 1.93 seconds
Stopping the application
