In [None]:
## Installation
%pip install fastapi uvicorn nest-asyncio   # API
%pip install numpy                          # Efficient data handling
%pip install pandas                         # Efficient date handling & aggregation
%pip install python-dotenv                  # .env => extracting hidden info
%pip install requests                       # Performing API calls
%pip install statsmodels                   # Optimized Parameter Generation

## Imports
from fastapi import FastAPI, Cookie, HTTPException
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from typing import Annotated
import uvicorn
import nest_asyncio
import numpy as np
import pandas as pd
from dotenv import load_dotenv
import os
import requests
from enum import Enum 

In [None]:
## Configuration
# Load variables from a local .env file (if present) into the process environment.
load_dotenv()

## Global Variables
# Base URL of the backend that serves the analytics endpoints queried below.
# NOTE(review): os.getenv returns None when BACKEND_URL is unset, which would turn the
# request URLs into "None/analytics/..." — confirm the variable is always provided.
BACKEND_URL = os.getenv("BACKEND_URL")

In [None]:
# ASGI application that the route decorators below attach to.
app = FastAPI()

# CORS: only the two local dev origins listed here may call this API from a browser.
# allow_credentials=True is needed so cross-origin requests can carry cookies
# (the forecast route reads the "connect.sid" cookie).
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173", "http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"]
)

In [None]:
# SARIMA Parameters
# Module-level model orders, passed to SARIMAX as order=(p, d, q) and
# seasonal_order=(P, D, Q, s) and reused by the hand-written forecasting code.
p = 1       # 1st Order Non-Seasonal AR Component
d = 1       # 1st Order Non-Seasonal I Component (number of lag-1 differences)
q = 1       # 1st Order Non-Seasonal MA Component
P = 1       # 1st Order Seasonal AR Component
D = 1       # 1st Order Seasonal I Component (number of lag-s differences)
Q = 1       # 1st Order Seasonal MA Component
# Seasonal period in observations.
# NOTE(review): 52 implies one observation per week (yearly cycle), but the preprocess_*
# helpers bucket by calendar day — confirm the series is weekly before it is modelled.
s = 52      # Weekly
 

In [None]:
## Preprocess Data
def preprocess_order_data(order_data):
    """Aggregate raw order records into per-day KPI tables.

    Args:
        order_data: Records accepted by ``pd.DataFrame`` that provide the
            ``order_time``, ``cost`` and ``is_first_order`` columns.

    Returns:
        dict with three DataFrames, each keyed by the UTC-midnight ``order_time``:
            ``revenue_df``              -> column ``revenue`` (sum of ``cost``)
            ``order_df``                -> column ``num_orders`` (count of ``cost``)
            ``first_time_consumer_df``  -> column ``num_first_time_orders``
                                           (count of ``cost`` where ``is_first_order``)
    """
    def to_utc_day(raw_timestamp):
        # Parse as UTC and drop the time-of-day so orders group by calendar day.
        return pd.to_datetime(raw_timestamp, utc=True).normalize()

    orders = pd.DataFrame(order_data)
    orders['order_time'] = orders['order_time'].apply(to_utc_day)

    # Only the rows flagged as a consumer's first order.
    first_orders = orders[orders['is_first_order']]

    daily_cost = orders.groupby('order_time', group_keys=True)['cost']
    daily_first_cost = first_orders.groupby('order_time', group_keys=True)['cost']

    daily_revenue = daily_cost.sum().reset_index().rename(columns={'cost': 'revenue'})
    daily_orders = daily_cost.count().reset_index().rename(columns={'cost': 'num_orders'})
    daily_first_orders = daily_first_cost.count().reset_index().rename(columns={'cost': 'num_first_time_orders'})

    return {
        'revenue_df': daily_revenue,
        'order_df': daily_orders,
        'first_time_consumer_df': daily_first_orders,
    }

def preprocess_visit_data(visit_data):
    """Count site visits per UTC calendar day.

    Args:
        visit_data: Records accepted by ``pd.DataFrame`` that provide the
            ``visit_time`` and ``restaurant_id`` columns.

    Returns:
        DataFrame with columns ``visit_time`` (UTC midnight of each day) and
        ``num_visits`` (number of non-null ``restaurant_id`` rows on that day).
    """
    def to_utc_day(raw_timestamp):
        # Parse as UTC and drop the time-of-day so visits group by calendar day.
        return pd.to_datetime(raw_timestamp, utc=True).normalize()

    visits = pd.DataFrame(visit_data)
    visits['visit_time'] = visits['visit_time'].apply(to_utc_day)

    visits_per_day = visits.groupby('visit_time', group_keys=True)['restaurant_id'].count()
    return visits_per_day.reset_index().rename(columns={'restaurant_id': 'num_visits'})

In [None]:
# Helper for retrieving data from DB
def get_restaurant_data_helper(path, cookies, timeout=10):
    """GET ``path`` with the given cookies and return the decoded JSON body.

    Args:
        path: Full URL to request.
        cookies: Cookie mapping forwarded with the request.
        timeout: Seconds to wait for the backend before giving up. Without a
            timeout ``requests`` waits indefinitely, which would hang the API call.

    Returns:
        The parsed JSON payload.

    Raises:
        HTTPException: 424 when the request cannot be completed, the backend
            answers with a non-OK status, or the body is not valid JSON.
    """
    failure = HTTPException(status_code=424, detail="Failed to pull restaurant data from DB")

    try:
        response = requests.get(path, cookies=cookies, timeout=timeout)
    except requests.RequestException as exc:
        # Connection errors / timeouts are dependency failures, not server bugs.
        raise failure from exc

    if not response.ok:
        raise failure

    try:
        return response.json()
    except ValueError as exc:
        # The backend answered OK but with a body that is not JSON.
        raise failure from exc

# TODO: Put data (num orders, total revenue, num first-time orders, num site visits) for a given restaurant into buckets
def get_restaurant_data(connect_sid, restaurant_id):
    """Fetch and preprocess the order and visit history of one restaurant.

    Args:
        connect_sid: Value of the ``connect.sid`` session cookie, forwarded to the backend.
        restaurant_id: Identifier interpolated into the analytics URLs.

    Returns:
        dict with the DataFrames ``revenue_df``, ``order_df``, ``visit_df`` and
        ``first_time_consumer_df``.

    Raises:
        HTTPException: Propagated from ``get_restaurant_data_helper`` when a
            backend request fails.
    """
    cookies = {
        "connect.sid": connect_sid
    }

    # Get data
    order_data = get_restaurant_data_helper(f"{BACKEND_URL}/analytics/orders/{restaurant_id}", cookies)
    visit_data = get_restaurant_data_helper(f"{BACKEND_URL}/analytics/visits/{restaurant_id}", cookies)

    # preprocess_order_data returns a dict; read the frames by key. Tuple-unpacking
    # a dict would bind the key strings instead of the DataFrames.
    order_frames = preprocess_order_data(order_data)
    visit_df = preprocess_visit_data(visit_data)

    return dict(
        revenue_df=order_frames['revenue_df'],
        order_df=order_frames['order_df'],
        visit_df=visit_df,
        first_time_consumer_df=order_frames['first_time_consumer_df'],
    )

In [None]:
# Apply differencing
# Credit: https://otexts.com/fpp2/stationarity.html
def differencing(arr, lag = 1, repeat = 1):
    """Return ``arr`` differenced at ``lag``, applied ``repeat`` times.

    Each pass computes ``x[t] - x[t - lag]`` and therefore shortens the series
    by ``lag`` elements (down to an empty array if the series is too short).

    Args:
        arr: 1-D sequence of numbers; it is copied, never modified.
        lag: Positive distance between the two observations being subtracted.
        repeat: Number of differencing passes; 0 returns a plain copy.

    Returns:
        numpy array holding the differenced series.

    Raises:
        ValueError: If ``lag`` is smaller than 1.
    """
    if lag < 1:
        raise ValueError("lag must be a positive integer")

    differenced_arr = np.copy(arr)
    for _ in range(repeat):
        # Pair every value with the one `lag` steps earlier: [:-lag] (not [:lag])
        # keeps both operands the same length and correctly aligned.
        differenced_arr = differenced_arr[lag:] - differenced_arr[:-lag]
    return differenced_arr

# Apply undifferencing
# Credit: https://stackoverflow.com/questions/72700812/how-to-inverse-differencing-on-future-forecasted-result
def revert_differencing(original_arr, differenced_arr, lag = 1, repeat = 1):
    """Undo lag-``lag`` differencing for values that continue ``original_arr``.

    ``differenced_arr`` is interpreted as the continuation of ``original_arr``
    after it has been differenced ``repeat`` times at ``lag``; the result is the
    same continuation expressed on the scale of ``original_arr``.

    Args:
        original_arr: History on the undifferenced scale (1-D sequence).
        differenced_arr: Differenced values that follow the history (1-D sequence).
        lag: Positive lag that was used for differencing.
        repeat: Number of differencing passes to undo; 0 returns a copy of
            ``differenced_arr``.

    Returns:
        numpy array with ``len(differenced_arr)`` restored values.

    Raises:
        ValueError: If ``lag`` is smaller than 1 or the history is too short to
            seed the reconstruction.
    """
    if lag < 1:
        raise ValueError("lag must be a positive integer")

    restored = np.array(differenced_arr)

    # levels[k] is the history differenced k times; undoing pass k needs the
    # last `lag` values of levels[k - 1] as starting points.
    levels = [np.asarray(original_arr)]
    for _ in range(repeat - 1):
        levels.append(levels[-1][lag:] - levels[-1][:-lag])

    for level in reversed(levels[:max(repeat, 0)]):
        if len(level) < lag:
            raise ValueError("not enough history to revert differencing")
        extended = np.concatenate([level[-lag:], restored])
        # y[t] = y[t - lag] + diff[t]: a running sum along each of the `lag`
        # interleaved sub-series (a single plain cumsum when lag == 1).
        for phase in range(lag):
            extended[phase::lag] = np.cumsum(extended[phase::lag])
        restored = extended[lag:]

    return restored

In [None]:
# Perform polynomial multiplication (essentially performing discrete convolution)
def polynomial_mul(poly_a, poly_b):
    """Multiply two polynomials given as coefficient sequences.

    Index ``k`` of each sequence holds the coefficient of the ``k``-th power, so
    the product is the discrete convolution of the two inputs.

    Args:
        poly_a: Coefficients of the first polynomial.
        poly_b: Coefficients of the second polynomial.

    Returns:
        Float numpy array of length ``len(poly_a) + len(poly_b) - 1``.
    """
    result = np.zeros(len(poly_a) + len(poly_b) - 1, dtype=float)
    for power_a, coeff_a in enumerate(poly_a):
        for power_b, coeff_b in enumerate(poly_b):
            # x**power_a * x**power_b contributes to x**(power_a + power_b).
            result[power_a + power_b] += coeff_a * coeff_b
    return result

# Using seasonal and non-seasonal coefficients, generates the polynomial upon application of both seasonal/non-seasonal for one of: {MA, AR}
def generate_seasonal_n_nonseasonal_poly(seasonal_coeff, nonseasonal_coeff, is_positive):
    """Build the combined lag polynomial of a seasonal and a non-seasonal component.

    The non-seasonal factor is ``1 ± c1*L ± c2*L**2 ...`` and the seasonal factor
    is ``1 ± C1*L**s ± C2*L**(2*s) ...``, where ``s`` is the module-level seasonal
    period. The two factors are multiplied with ``polynomial_mul``.

    Args:
        seasonal_coeff: Seasonal coefficients, ordered by seasonal lag (1*s, 2*s, ...).
        nonseasonal_coeff: Non-seasonal coefficients, ordered by lag (1, 2, ...).
        is_positive: True to add the coefficients to the leading 1, False to
            subtract them.

    Returns:
        numpy array of product coefficients; index ``k`` is the coefficient of
        ``L**k`` and index 0 is always 1.
    """
    sign = 1.0 if is_positive else -1.0

    # Nonseasonal Polynomial w/ Lag Coefficients
    nonseasonal_poly = np.zeros(len(nonseasonal_coeff) + 1)
    nonseasonal_poly[0] = 1
    nonseasonal_poly[1:] = sign * np.asarray(nonseasonal_coeff, dtype=float)

    # Seasonal Polynomial w/ Lag Coefficients
    seasonal_poly = np.zeros((len(seasonal_coeff) * s) + 1)
    seasonal_poly[0] = 1
    # The i-th seasonal coefficient belongs to lag s*i with i starting at 1, so
    # the constant term at index 0 is never overwritten and the loop runs once
    # per coefficient (not once per period step).
    for seasonal_lag, coeff in enumerate(seasonal_coeff, start=1):
        seasonal_poly[s * seasonal_lag] = sign * coeff

    # Generate Product Polynomial
    return polynomial_mul(nonseasonal_poly, seasonal_poly)

In [None]:
# Perform parameter generation from scratch
# NOTE: Implementing this from scratch would require using Hannan–Rissanen and Maximum Likelihood Estimation (MLE) with a Kalman filter and additional optimizers, which would
# add an extreme and highly unrealistic level of complexity (due to the very difficult math involved)
# NOTE: This ONLY uses model parameters - everything else is being done entirely from scratch

def get_sarima_params(data):
    """Fit a SARIMAX model to ``data`` and return its coefficients and residuals.

    The model uses the module-level orders ``(p, d, q)`` and ``(P, D, Q, s)``.

    Args:
        data: Observed series passed to ``SARIMAX`` as the endogenous variable.

    Returns:
        dict with the fitted ``phi1`` (AR), ``theta1`` (MA), ``Phi1`` (seasonal AR)
        and ``Theta1`` (seasonal MA) coefficients plus ``resid``, the residuals
        of the fit.

    Raises:
        KeyError: If the fitted model has no first-order AR/MA/seasonal terms,
            e.g. because one of the module-level orders was set to 0.
    """
    # Imported lazily so the heavy statsmodels import is only paid when fitting.
    from statsmodels.tsa.statespace.sarimax import SARIMAX
    model = SARIMAX(data, order=(p,d,q), seasonal_order=(P,D,Q,s))
    res = model.fit(disp=False)

    # Look the coefficients up by name instead of by position: res.params is a
    # label-indexed Series for pandas input, where integer indexing is deprecated,
    # and names stay correct even if the set of estimated parameters changes.
    estimates = dict(zip(model.param_names, np.asarray(res.params, dtype=float)))

    phi1   = estimates['ar.L1']
    theta1 = estimates['ma.L1']
    Phi1   = estimates[f'ar.S.L{s}']
    Theta1 = estimates[f'ma.S.L{s}']

    return dict(phi1=phi1, theta1=theta1, Phi1=Phi1, Theta1=Theta1, resid=res.resid)

In [None]:
def forecast_logic(data, num_forecast_weeks=1):
    """Forecast the next ``num_forecast_weeks`` values of ``data`` with SARIMA.

    The series is differenced (``d`` times at lag 1, then ``D`` times at lag ``s``),
    forecast on that scale from the ARMA recursion, and finally un-differenced.
    With ``a`` the combined AR polynomial and ``b`` the combined MA polynomial
    (both with leading 1), the one-step forecast of the differenced series ``w`` is

        w[t] = -sum(a[k] * w[t-k], k >= 1) + sum(b[k] * e[t-k], k >= 1)

    where future shocks ``e`` are replaced by their expectation, 0.

    Args:
        data: 1-D numeric series of past observations.
        num_forecast_weeks: Number of future steps to forecast.

    Returns:
        numpy array with ``num_forecast_weeks`` forecasts on the scale of ``data``.

    Raises:
        ValueError: If the differenced history or the residuals are shorter than
            the number of lags the polynomials need.
    """
    # Retrieve SARIMA parameters
    params = get_sarima_params(data)
    phi1, Phi1, theta1, Theta1 = params['phi1'], params['Phi1'], params['theta1'], params['Theta1']

    series = np.asarray(data, dtype=float)

    # Apply trend differencing
    differenced_trend = differencing(series, lag=1, repeat=d)
    # Apply seasonal differencing
    differenced_all = differencing(differenced_trend, lag=s, repeat=D)

    # Generate shocks
    shocks = np.asarray(params['resid'], dtype=float)

    # Generate AR & MA polynomials
    ar_poly = generate_seasonal_n_nonseasonal_poly([Phi1], [phi1], is_positive=False)
    ma_poly = generate_seasonal_n_nonseasonal_poly([Theta1], [theta1], is_positive=True)

    # Index 0 of each polynomial is the lag-0 term (always 1) and belongs to the
    # value being forecast, so only the lagged coefficients enter the recursion.
    ar_lag_coeffs = ar_poly[1:]
    ma_lag_coeffs = ma_poly[1:]
    if len(differenced_all) < len(ar_lag_coeffs) or len(shocks) < len(ma_lag_coeffs):
        raise ValueError("not enough history for the configured SARIMA orders")

    forecasts = []

    # Apply forecasting
    for _ in range(num_forecast_weeks):
        # Reverse the most recent values so position k lines up with lag k + 1.
        recent_values = differenced_all[len(differenced_all) - len(ar_lag_coeffs):][::-1]
        recent_shocks = shocks[len(shocks) - len(ma_lag_coeffs):][::-1]

        forecast = -np.dot(ar_lag_coeffs, recent_values) + np.dot(ma_lag_coeffs, recent_shocks)

        differenced_all = np.append(differenced_all, [forecast])
        shocks = np.append(shocks, [0.0]) # SARIMA takes expectation & assumes no future error
        forecasts.append(forecast)

    # Invert differencing. The seasonal step must be seeded with the series as
    # it was right before seasonal differencing (the trend-differenced series).
    forecast_data_inverted_seasonal = revert_differencing(differenced_trend, forecasts, lag=s, repeat=D)
    forecast_data_inverted_all = revert_differencing(series, forecast_data_inverted_seasonal, lag=1, repeat=d)

    return forecast_data_inverted_all

In [None]:
# NOTE: Given limited data, will only be used to forecast next week's KPI figures
@app.get("/forecast/{restaurant_id}")
def forecast(restaurant_id: int, connect_sid: Annotated[str | None, Cookie(alias="connect.sid")] = None):
    """Forecast the next period's KPIs for a restaurant.

    Args:
        restaurant_id: Restaurant whose analytics are forecast (path parameter).
        connect_sid: Session cookie ``connect.sid``, forwarded to the backend.

    Returns:
        JSONResponse with the float forecasts ``revenue``, ``orders``, ``visits``
        and ``first_time_consumers``.

    Raises:
        HTTPException: 401 when the session cookie is missing; 422 when a KPI
            cannot be forecast from the available history; errors raised by
            ``get_restaurant_data`` propagate unchanged.

    NOTE(review): the histories come from the per-day tables, while the model is
    configured for weekly seasonality — confirm the intended aggregation level.
    """
    # NOTE: FastAPI Cookie handling from here: https://fastapi.tiangolo.com/tutorial/cookie-params/#import-cookie
    if connect_sid is None:
        raise HTTPException(status_code=401, detail="Consumer session cookie not found")

    # Get data (a dict of DataFrames; read it by key rather than unpacking,
    # because unpacking a dict yields its keys)
    restaurant_data = get_restaurant_data(connect_sid, restaurant_id)

    # (response key, frame key in restaurant_data, value column of that frame)
    kpi_sources = [
        ("revenue", "revenue_df", "revenue"),
        ("orders", "order_df", "num_orders"),
        ("visits", "visit_df", "num_visits"),
        ("first_time_consumers", "first_time_consumer_df", "num_first_time_orders"),
    ]

    # Perform forecasting
    payload = {}
    for response_key, frame_key, value_column in kpi_sources:
        # The model needs the 1-D value series, not the whole (date, value) frame.
        history = restaurant_data[frame_key][value_column].to_numpy(dtype=float)
        try:
            prediction = forecast_logic(history, num_forecast_weeks=1)
        except ValueError as exc:
            raise HTTPException(status_code=422, detail=f"Unable to forecast {response_key} from the available history") from exc
        # numpy values are not JSON serialisable; convert to a plain float.
        payload[response_key] = float(np.asarray(prediction)[0])

    return JSONResponse(payload)

In [None]:
## Get FastAPI Server Online
# Runs only when this notebook/script is executed directly.
if __name__ == "__main__":
    # Patch asyncio so uvicorn can start inside an already-running event loop
    # (as is the case in a Jupyter kernel).
    nest_asyncio.apply()
    # Serve the app on localhost only, port 8080.
    uvicorn.run(app, host="127.0.0.1", port=8080)