In [0]:
%python
%pip install openpyxl

In [0]:
# PySpark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    col, when, datediff, date_add, lit, row_number, abs as abs_,
)
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import DenseVector, SparseVector

# Data Handling & ML (Python)
import pandas as pd
import numpy as np
from sklearn.neighbors import NearestNeighbors

# StatsModels
import statsmodels.formula.api as smf
from statsmodels.stats.outliers_influence import variance_inflation_factor, OLSInfluence
from statsmodels.tools.tools import add_constant
from statsmodels.graphics.gofplots import qqplot
from statsmodels.stats.diagnostic import het_breuschpagan

# Plotting
import matplotlib.pyplot as plt
import seaborn as sns


In [0]:
spark = SparkSession.builder.appName("PropensityScoreMatching").getOrCreate()

In [0]:
def filter_data_function_M(df, intervention_group, control_group, recovery):
    # Convert max follow-up weeks to days
    
    # Filter and add time periods based on individual waiting time
    df_filtered = (
        df.filter((col("group") == intervention_group) | (col("group") == control_group))
        .withColumn("start_of_reference_period", when(col("epp_rtt_end_date") < date_add(col("epp_rtt_start_date"),85), col("epp_rtt_end_date")).otherwise(date_add(col("epp_rtt_start_date"), 85)))
        .withColumn("end_of_reference_period", date_add(col("start_of_reference_period"), 600))  # Define recovery period if needed
        .filter((col("days") >= col("epp_rtt_start_date"))  & (col("days")<=col("end_of_reference_period")))
        .withColumn(
            "time_period",
            when(col("days") <= col("start_of_reference_period"), 0)  # Waiting period
            .otherwise(1)  # Post-recovery period
        )
    )

    return df_filtered

In [0]:
def group_data_function(df_filtered, intervention_group, control_group, colum_hc, col_list):
    # Filter out those who do not have full 1 year follow-up
    df_filtered = df_filtered.filter(datediff(col("end_of_reference_period"), col("start_of_reference_period")) >= 600)
    
    # Group and aggregate
    df_grouped_by_time_period = (
        df_filtered.groupBy("epp_pid", "epp_rtt_start_date", "epp_tfc", "time_period", "group", *col_list)
        .agg(F.sum(colum_hc).alias('total_hc_use'))
        .withColumn(
            "treated",
            when(col("group") == control_group, 0).when(col("group") == intervention_group, 1)
        )
    )

    df_grouped = df_grouped_by_time_period.withColumn(
        'total_hc_use', 
        when(col('total_hc_use').isNull(), 0).otherwise(col('total_hc_use'))
    ).withColumn(
        'avg_weekly_use', 
        when(col("time_period") == 1, (F.col('total_hc_use') / 600 * 7).cast('double'))
        .otherwise((F.col('total_hc_use') / 85 *7).cast('double'))
    )

    return df_grouped

In [0]:
gb_data = simulate_waiting_list_data()
gb_data = gb_data.withColumn("wt", datediff(col("epp_rtt_end_date"), col("epp_rtt_start_date")))

waiting_list_df = gb_data.withColumn(
    "group",
    when((col("wt") <= 84), "<= 12 weeks")
    .when((col("wt") > 84) & (col("wt") <= 126), "<= 18 weeks")
    .otherwise("> 18 weeks")
)
waiting_list_df = waiting_list_df.filter((col("wt") > 84) )
#display(waiting_list_df.groupby("group").count())

waiting_list_df = waiting_list_df.filter(col("epp_referral_priority")!='cancer' )




In [0]:
inter_group="> 18 weeks"
df_30_weeks_filtered = filter_data_function_M(
    df=waiting_list_df, 
    intervention_group=inter_group, 
    control_group="<= 18 weeks",
    recovery=28  # Default recovery period
)

In [0]:
display(df_30_weeks_filtered)

In [0]:

columns = ["ndl_age_band", "ndl_imd_quantile", "ndl_ethnicity", "ndl_ltc", "Sex", "Frailty_level"]

# Replace null values with "unknown" in the specified columns
for column in columns:
    df_30_weeks_filtered = df_30_weeks_filtered.withColumn(column, when(col(column).isNull(), "unknown").otherwise(col(column)))

df_weeks_grouped = group_data_function(
    df_filtered=df_30_weeks_filtered,
    intervention_group=inter_group,
    control_group="<= 18 weeks", 
    colum_hc="gp_healthcare_use",
    col_list=columns
)
display(df_weeks_grouped)

In [0]:

categorical_cols = ["ndl_age_band", "ndl_imd_quantile", "ndl_ethnicity", "ndl_ltc", "Sex", "Frailty_level"]

# Index and encode categorical columns
indexers = [StringIndexer(inputCol=col, outputCol=col + "_index").fit(df_weeks_grouped) for col in categorical_cols]
pipeline = Pipeline(stages=indexers)
df_indexed = pipeline.fit(df_weeks_grouped).transform(df_weeks_grouped)

# Assemble features for logistic regression
feature_cols = [col + "_index" for col in categorical_cols]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_assembled = assembler.transform(df_indexed).select("features", "treated", "epp_pid", "epp_rtt_start_date", "epp_tfc" ,"group")

In [0]:
# Fit logistic regression to predict treatment probability
lr = LogisticRegression(featuresCol="features", labelCol="treated", probabilityCol="propensity_score")
lr_model = lr.fit(df_assembled)

# Get propensity scores
df_matched = lr_model.transform(df_assembled).select("epp_pid", "epp_rtt_start_date", "epp_tfc" ,"group", "propensity_score")

In [0]:
# Split into treatment and control groups
treated_df = df_matched.filter(col("group") == "> 18 weeks")
control_df = df_matched.filter(col("group") == "<= 18 weeks")

# UDF to extract the first value from a vector
def extract_first_element(vector):
    if isinstance(vector, DenseVector) or isinstance(vector, SparseVector):
        return float(vector[0])  # Get the first element
    return None

# Register UDF to convert vector to double
extract_first_element_udf = udf(extract_first_element, DoubleType())

# Apply UDF to extract numeric scores
treated_df = treated_df.withColumn("propensity_score_num", extract_first_element_udf(col("propensity_score")))
control_df = control_df.withColumn("control_score_num", extract_first_element_udf(col("propensity_score")))

# Rename control columns to avoid conflicts in crossJoin
control_df = control_df.withColumnRenamed("epp_pid", "control_pid") \
    .withColumnRenamed("control_score_num", "control_score_num")

# Cross join and calculate absolute score difference
#matched_df = treated_df.crossJoin(control_df) \
#    .withColumn("score_diff", abs(col("propensity_score_num") - col("control_score_num")))

matched_df = control_df.crossJoin(treated_df) \
   .withColumn("score_diff", abs(col("control_score_num") -col("propensity_score_num") ))

# Rank by the smallest score difference and keep the best match
window_spec = Window.partitionBy("control_pid").orderBy("score_diff")
matched_cohort = matched_df.withColumn("rank", row_number().over(window_spec)) \
                           .filter(col("rank") == 1) \
                           .select("epp_pid", "control_pid", "score_diff")


In [0]:
display(matched_cohort)

In [0]:
matched_epp_ids = matched_cohort.select("epp_pid").rdd.flatMap(lambda x: x).collect()
matched_control_ids = matched_cohort.select("control_pid").rdd.flatMap(lambda x: x).collect()

waiting_list_df_matched = waiting_list_df.filter(df_weeks_grouped.epp_pid.isin(matched_epp_ids + matched_control_ids))
display(waiting_list_df_matched)

In [0]:
display(
    waiting_list_df_matched.groupBy(
        "epp_pid", 
        "epp_tfc", 
        "epp_rtt_start_date", 
        "group"
    ).count().groupBy("group").count()
)


display(
    waiting_list_df.groupBy(
        "epp_pid", 
        "epp_tfc", 
        "epp_rtt_start_date", 
        "group"
    ).count().groupBy("group").count()
)

In [0]:
columns2 = ["ndl_ethnicity", "ndl_ltc", "Sex", "Frailty_level", "ndl_imd_quantile"] 
#columns2 = ["ndl_ethnicity",  "Sex",  "ndl_imd_quantile"] 

activity_variable = [
    "gp_healthcare_use",  
    "ae_healthcare_use", "nel_healthcare_use", #"el_healthcare_use", "op_healthcare_use", 
    "antib_pres_count", "antipres_pres_count", "pain_pres_count", 
]

all_estimates = []

for activity in activity_variable:

    df_30_weeks_filtered = filter_data_function_M(df=waiting_list_df, intervention_group='> 18 weeks', control_group='<= 18 weeks', recovery=28)

    df_grouped = group_data_function(
        df_filtered=df_30_weeks_filtered,
        intervention_group="> 18 weeks",
        control_group="<= 18 weeks", 
        colum_hc=activity,  
        col_list=columns2
    )
    display(df_grouped.groupBy("group").count())

    base_formula = f"avg_weekly_use ~ C(group) * C(time_period)"

# Add interaction terms between waiting time (group) and frailty/LTC
    full_formula = base_formula + " + C(group)*C(Frailty_level) +  C(group)*C(ndl_ltc)+ C(group)*C(ndl_ethnicity)"

# If you have additional variables in `columns2` that you want to add as controls:
    #full_formula = full_formula + " + " + " + ".join(columns2)

    did_model = smf.ols(formula=full_formula, data=df_grouped.toPandas()).fit()
    print(did_model.summary())

    estimates = did_model.params
    se = did_model.bse

    for term in estimates.index:
        all_estimates.append({
            'Activity': activity,
            'Term': term,
            'Estimate': estimates[term],
            'Standard Error': se[term],
            'p-value': did_model.pvalues[term]     
         })

In [0]:
results_df = pd.DataFrame(all_estimates)
results_df.to_excel('../Files/OLS_hyster_all.xlsx', index=False)

In [0]:
# VIF Calculation Function
def calculate_vif(df):
    X = add_constant(df)
    vif_data = pd.DataFrame()
    vif_data['Variable'] = X.columns
    vif_data['VIF'] = [variance_inflation_factor(X.values, i) for i in range(X.shape[1])]
    return vif_data

# Function to perform model diagnostics
def perform_diagnostics(model, df):
    # Residuals
    residuals = model.resid
    
    # Residual plot
    plt.figure(figsize=(8, 6))
    sns.residplot(x=model.fittedvalues, y=residuals, lowess=True, line_kws={'color': 'red'})
    plt.title('Residuals vs Fitted')
    plt.xlabel('Fitted Values')
    plt.ylabel('Residuals')
    plt.show()

    # Q-Q plot for Normality of Residuals
    qqplot(residuals, line='45')
    plt.title('Q-Q Plot')
    plt.show()

    # Breusch-Pagan Test for Heteroscedasticity
    _, pval, _, _ = het_breuschpagan(residuals, model.model.exog)
    print(f'Breusch-Pagan test p-value: {pval}')
    if pval < 0.05:
        print("Heteroscedasticity detected (p-value < 0.05)")
    else:
        print("No heteroscedasticity detected")

    # Cook's Distance for influence/outliers
    influence = OLSInfluence(model)
    cooks_d = influence.cooks_distance[0]
    
    # Plot Cook's Distance
    plt.figure(figsize=(8, 6))
    plt.stem(np.arange(len(cooks_d)), cooks_d, markerfmt=",")
    plt.title('Cook\'s Distance')
    plt.xlabel('Observation Index')
    plt.ylabel('Cook\'s Distance')
    plt.show()

    # Check for any high influence points (typically, Cook’s distance > 4/n)
    influence_points = np.where(cooks_d > 4 / len(df))[0]
    if len(influence_points) > 0:
        print(f"Influential points: {influence_points}")
    else:
        print("No influential points detected")

# Main loop for each activity
all_estimates = []
columns2 = ["ndl_ethnicity", "ndl_ltc", "Sex", "Frailty_level", "ndl_imd_quantile"]

activity_variable = [
    "gp_healthcare_use",  
    "ae_healthcare_use", "nel_healthcare_use", 
    "antib_pres_count", "antipres_pres_count", "pain_pres_count"
]

for activity in activity_variable:
    df_30_weeks_filtered = filter_data_function_M(df=waiting_list_df_matched, intervention_group='> 18 weeks', control_group='<= 18 weeks', recovery=28)

    df_grouped = group_data_function(
        df_filtered=df_30_weeks_filtered,
        intervention_group="> 18 weeks",
        control_group="<= 18 weeks", 
        colum_hc=activity,  
        col_list=columns2
    )

    
    df_grouped_pd = df_grouped.toPandas()

  # Build the formula without highly collinear variables
    base_formula = f"avg_weekly_use ~ C(group) * C(time_period)"
    full_formula = base_formula + " + C(group)*C(Frailty_level) + C(group)*C(ndl_ltc)+ C(group)*C(ndl_ethnicity)"

    did_model = smf.ols(formula=full_formula, data=df_grouped_pd).fit(cov_type='HC3')
    
    # Model Diagnostics
    perform_diagnostics(did_model, df_grouped_pd)

    # Filter out influential points based on Cook's distance
    influence = OLSInfluence(did_model)
    cooks_d = influence.cooks_distance[0]
    high_influence = np.where(cooks_d > 4 / len(df_grouped_pd))[0]
    df_grouped_pd_filtered = df_grouped_pd.drop(index=high_influence)

    did_model_filtered = smf.ols(formula=full_formula, data=df_grouped_pd_filtered).fit(cov_type='HC3')
    print(did_model_filtered.summary())

    perform_diagnostics(did_model_filtered, df_grouped_pd_filtered)
    display(df_grouped_pd_filtered.groupby("group"))



    # Store the results
    estimates = did_model_filtered.params
    se = did_model.bse
    for term in estimates.index:
        all_estimates.append({
            'Activity': activity,
            'Term': term,
            'Estimate': estimates[term],
            'Standard Error': se[term],
            'p-value': did_model.pvalues[term]     
        })

# Convert all estimates into a DataFrame for easy analysis
all_estimates_df = pd.DataFrame(all_estimates)

# Display the results
print(all_estimates_df)

In [0]:

results_df = pd.DataFrame(all_estimates)

# Plot the estimates with error bars
fig, axes = plt.subplots(len(activity_variable), 1, figsize=(10, 5 * len(activity_variable)), sharex=True)

for i, activity in enumerate(activity_variable):
    activity_df = results_df[results_df['Activity'] == activity]
    axes[i].errorbar(activity_df['Estimate'], activity_df['Term'], xerr=activity_df['Standard Error'], fmt='o', markersize=4)
    axes[i].axvline(0, color='black', linestyle='--', linewidth=0.8)  # Add vertical line at x=0
    axes[i].set_title(f'Estimates for {activity}', fontsize=14)
    axes[i].set_xlabel('Estimate', fontsize=14)
    axes[i].tick_params(axis='y', labelsize=8)

plt.suptitle('Estimates of Healthcare Use with Standard Errors for Different Activities', fontsize=16)
plt.subplots_adjust(bottom=0.05, left=0.1, right=0.9, top=0.95, hspace=0.4)
plt.show()

In [0]:
filtered_results_df = results_df[results_df['Activity'].str.contains('gp')]
display(filtered_results_df.to_string())

In [0]:
results_df.to_excel('../Files/OLS_hyster_matched.xlsx', index=False)

In [0]:
display(filtered_results_df)