In [None]:
import sys
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.axes
import numpy as np
from pathlib import Path

# Add project root to Python path
project_root = Path().resolve().parent
sys.path.insert(0, str(project_root))

import src.config as config

In [None]:
import pandas as pd
import src.loadProcessed as loadp

counts: pd.DataFrame = loadp.load_selected_count()
locations: pd.DataFrame = loadp.load_processed_locations()

## Compute Model

In [None]:
import statsmodels.api as sm
import statsmodels.formula.api as smf
import patsy.highlevel as phl
import math

# We want to log transform since counts have a power relation
log_data = counts.copy()
log_data['daily_count'] = log_data['daily_count'].apply(np.log)
log_data['is_weekend'] = log_data['day'].isin(['Saturday', 'Sunday'])

model_3 = smf.mixedlm(
    'daily_count ~ day*year',     # Defines the response and fixed effects
    log_data,                       
    groups=log_data['sensor_id'],  # Defines how to cluster the data
    re_formula='~ is_weekend*year'            # Random effect that differs across the clusters
)
result_3 = model_3.fit(reml=True)

print(result_3.summary())

## Extract Parameters

In [None]:
# NumPy array maybe overkill but it's nice to have set functions that maintain order
days = np.array(['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'])
day_levels = np.setdiff1d(days, ['Friday'], assume_unique=True)

residual_variance = result_3.scale

# --- Fixed Covariance Matrix ---
fe_label_order = (
    ['Intercept', 'year[T.2025]']
    + [f'day[T.{d}]' for d in day_levels]
    + [f'day[T.{d}]:year[T.2025]' for d in day_levels]
)
fixed_cov_matrix = result_3.cov_params().loc[fe_label_order, fe_label_order]

fe_rename_map = {'Intercept': 'intercept', 'year[T.2025]': 'year'}
fe_rename_map.update({f'day[T.{d}]': f'day_{d.lower()}' for d in day_levels})
fe_rename_map.update({f'day[T.{d}]:year[T.2025]': f'interaction_{d.lower()}' for d in day_levels})

fixed_cov_matrix = fixed_cov_matrix.rename(index=fe_rename_map, columns=fe_rename_map)

# -- Fixed Params Vector (Beta) --
fixed_params = result_3.params.loc[fe_label_order].rename(index=fe_rename_map)


# --- Random Covariance Matrix ---
random_cov_matrix = result_3.cov_re.iloc[:4, :4]

# Reorder and rename the covariance matrix
re_label_order = ['Group', 'year[T.2025]', 'is_weekend[T.True]', 'is_weekend[T.True]:year[T.2025]']
re_rename_labels = ['sensor', 'year', 'weekend', 'interaction']

re_rename_map = dict(zip(re_label_order, re_rename_labels))

random_cov_matrix = random_cov_matrix.loc[re_label_order, re_label_order]

# Rename rows and columns
random_cov_matrix = random_cov_matrix.rename(index=re_rename_map, columns=re_rename_map)

# --- Random Params Vector (b) ---
random_params = (
    pd.DataFrame.from_dict(result_3.random_effects, orient='index')
    .reindex(columns=re_label_order)
    .rename(columns=re_rename_map)
)

In [None]:
# Extract Fixed Effects
log_fixed_year = result_3.params['year[T.2025]']  # beta_1
log_fixed_intercept = result_3.params['Intercept']  # beta_0

log_fixed_day_effects = {d: result_3.params[f'day[T.{d}]'] for d in day_levels}

# Extract Random Effects
log_random_effects = pd.DataFrame.from_dict(
    result_3.random_effects,
    orient='index'
).rename(
    columns={
        'Group': 'b0_intercept',
        'year[T.2025]': 'b1_year',
        'is_weekend[T.True]': 'b2_weekend',
        'is_weekend[T.True]:year[T.2025]': 'b3_interaction'
    }
)

In [None]:
def random_design_vector(intercept: bool, year: bool, weekend: bool):
    return np.array(
        [int(intercept), int(year), int(weekend), int(year and weekend)]
    )

def fixed_design_vector(intercept: bool, year: bool, day: str):
    # Create a fixed effects design vector
    day_vector = np.array([0] * len(day_levels))

    # Leave as all zeros if day is Friday
    if day != 'Friday':
        # Find the corresponding index to the specified day
        day_vector[np.where(day_levels == day)[0][0]] = 1

    return np.concatenate(([int(intercept), int(year)], day_vector, day_vector * int(year)))

def random_effect_sensor_cov(sensor_id):
    cov_mat = result_3.random_effects_cov[sensor_id]

    cov_mat = cov_mat.loc[re_label_order, re_label_order]

    # Rename rows and columns
    cov_mat = cov_mat.rename(index=re_rename_map, columns=re_rename_map)
    return cov_mat
    

## Calculating Population Wide Statistics

Where $\boldsymbol{x}_j^T$, $\boldsymbol{z}_j^T$ are rows corresponding to the $j$ in their respective design matrices, $\boldsymbol{b}_j \sim N(\bold 0, G)$, $\varepsilon_j \sim N(\bold 0, \sigma^2)$
$$\log y_{j} = \boldsymbol{x}_j^T \boldsymbol{\beta} + \boldsymbol{z}_j^T \boldsymbol{b}_j + \varepsilon_j$$
$$\log E[y_j] = \boldsymbol{x}_j^T \boldsymbol{\beta} + \frac 1 2 (\boldsymbol{z}_j^T G \boldsymbol{z}_j + \sigma ^ 2)$$
$$\text{Var}\ \log E[y_j] \approx \text{Var}\ \boldsymbol{x}_j^T \hat{\boldsymbol{\beta}} = \boldsymbol{x}_j^T \text{Var}(\hat{\boldsymbol{\beta}}) \boldsymbol{x}_j$$

In [None]:
population_wide_stats = pd.DataFrame(
    index = ['estimate', 'lower', 'upper']
)

### Estimates of population counts for each day across years

In [None]:
for day in days:
    for year in ['2019', '2025']:
        is_weekend = day in ['Saturday', 'Sunday']
        t_t = fixed_design_vector(True, year == '2025', day)
        z_t = random_design_vector(True, year == '2025', is_weekend)

        # Calculate Estimate
        log_estimate = (
            t_t @ fixed_params + (z_t @ random_cov_matrix @ z_t.transpose() + residual_variance)/2
        )
        population_wide_stats.loc['estimate', f'{day}_expected_{year}'] = np.exp(log_estimate)
        
        # Calculate Confidence interval
        std_error = np.sqrt(t_t @ fixed_cov_matrix @ t_t.transpose())
        
        population_wide_stats.loc['lower', f'{day}_expected_{year}'] = np.exp(log_estimate - 1.96 * std_error)
        population_wide_stats.loc['upper', f'{day}_expected_{year}'] = np.exp(log_estimate + 1.96 * std_error)

In [None]:
for day in days:
    x_t25 = fixed_design_vector(True, True, day)
    x_t19 = fixed_design_vector(True, False, day)

    z_t25 = random_design_vector(True, True, day in ['Saturday', 'Sunday'])
    z_t19 = random_design_vector(True, False, day in ['Saturday', 'Sunday'])

    log_ratio_estimate = (x_t25 - x_t19) @ fixed_params + (z_t25 @ random_cov_matrix @ z_t25.transpose() - z_t19 @ random_cov_matrix @ z_t19.transpose())/2 

    std_error = np.sqrt((x_t25 - x_t19) @ fixed_cov_matrix @ (x_t25 - x_t19).transpose())

    population_wide_stats.loc['estimate', f'{day}_percentage_change'] = (np.exp(log_ratio_estimate) - 1) * 100
    population_wide_stats.loc['lower', f'{day}_percentage_change'] = (np.exp(log_ratio_estimate - 1.96 * std_error) - 1) * 100
    population_wide_stats.loc['upper', f'{day}_percentage_change'] = (np.exp(log_ratio_estimate + 1.96 * std_error) - 1) * 100




### Estimates of average population counts across years
Possible extenstion of this would be calculating the confidence intervals, however since it is a non-linear transforrmation of our estimators I would need to research methods for this. From a brief look I can see the delta method and bootstrapping may be potential avenues for improvement but for the scope of this project I will leave this as a point estimate.

In [None]:
for year in ['2019', '2025']:
    columns = [f'{day}_expected_{year}' for day in days]

    weekly_sum = population_wide_stats.loc['estimate', columns].sum() # type: ignore
    population_wide_stats.loc['estimate', f'mean_expected_{year}'] = weekly_sum / 7

In [None]:
population_wide_stats.to_parquet(config.PROCESSED_DATA_DIR / 'population_stats.parquet')
population_wide_stats.to_csv(config.PROCESSED_DATA_DIR / 'population_stats.csv')

## Location Specific Estimates

Where $\boldsymbol{x}_j^T$, $\boldsymbol{z}_j^T$ are rows corresponding to the $j$ in their respective design matrices, $\boldsymbol{b}_j \sim N(\bold 0, G)$, $\varepsilon_j \sim N(\bold 0, \sigma^2)$
$$\log y_{j} = \boldsymbol{x}_j^T \boldsymbol{\beta} + \boldsymbol{z}_j^T \boldsymbol{b}_j + \varepsilon_j$$
$$\log E[y_j | \boldsymbol{b}_j] = \boldsymbol{x}_j^T \boldsymbol{\beta} + \boldsymbol{z}_j^T \boldsymbol{b}_j + \frac {\sigma ^ 2} 2$$

In [None]:
location_specific_stats = pd.DataFrame(
    index = random_params.index
)

### Absolute Counts

In [None]:
for sensor_id, sensor_b in random_params.iterrows():
    for year in ['2019', '2025']:
        for is_weekend in [False, True]:       
            if is_weekend:     
                average_over_days = np.array(['Saturday', 'Sunday'])
            else:
                average_over_days = np.setdiff1d(days, ['Saturday', 'Sunday'])

            z_t = random_design_vector(True, year == '2025', is_weekend)

            X_t = np.array([fixed_design_vector(True, year == '2025', d) for d in average_over_days])

            count_estimate = (
                np.exp(z_t @ sensor_b + residual_variance/2) * 1/len(average_over_days) * (np.exp(X_t @ fixed_params)).sum()
            )

            location_specific_stats.loc[sensor_id, f'{'wknd' if is_weekend else 'wkdy'}_count_{year}_estimate'] = count_estimate

for year in ['2019', '2025']:
    location_specific_stats[f'count_{year}_estimate'] = 2/7 * location_specific_stats[f'wknd_count_{year}_estimate'] + 5/7 * location_specific_stats[f'wkdy_count_{year}_estimate'] 

## Fractional Change
Since the weekend random effect requires a sum over the day fixed effects, the ratio of these sums does not cancel as nicely as usual under a log transformation. For this reason we are taking the ratio of absolute counts despite the added variability this induces.

In [None]:
location_specific_stats['percent_year_change'] = (location_specific_stats['count_2025_estimate'] / location_specific_stats['count_2019_estimate'] - 1) * 100
location_specific_stats['wknd_percent_year_change'] = (location_specific_stats['wknd_count_2025_estimate'] / location_specific_stats['wknd_count_2019_estimate'] - 1) * 100
location_specific_stats['wkdy_percent_year_change'] = (location_specific_stats['wkdy_count_2025_estimate'] / location_specific_stats['wkdy_count_2019_estimate'] - 1) * 100

In [None]:
location_specific_stats.to_parquet(config.PROCESSED_DATA_DIR / 'location_specific_stats.parquet')
location_specific_stats.to_csv(config.PROCESSED_DATA_DIR / 'location_specific_stats.csv')