# Covid Forecasting - Data Collection and Cleaning

## Import Libraries and Initialize Constants

In [None]:
from dotenv import load_dotenv
import os
import pandas as pd
import requests
import time

# Load variables from a local .env file into the environment (USER_AGENT is read from it below)
load_dotenv()

# --- Delphi Epidata endpoints ---
BASE_URL_COVIDCAST = "https://api.covidcast.cmu.edu/epidata/api.php"  # CovidCast signals
BASE_URL_HOSPITALIZATION = (
    "https://api.delphi.cmu.edu/epidata/covid_hosp_state_timeseries/"  # Covid hospitalization time series
)

# Header sent with every request; the user agent can be overridden via USER_AGENT.
HEADER = {"user-agent": os.getenv("USER_AGENT", "CovidCasesExplorer/1.0")}

# --- Geography ---
# Full set: 50 states + DC. Kept for reference; be careful of usage limits before enabling.
# STATES_COVIDCAST = [
#     "al","ak","az","ar","ca","co","ct","de","fl","ga",
#     "hi","id","il","in","ia","ks","ky","la","me","md",
#     "ma","mi","mn","ms","mo","mt","ne","nv","nh","nj",
#     "nm","ny","nc","nd","oh","ok","or","pa","ri","sc",
#     "sd","tn","tx","ut","vt","va","wa","wv","wi","wy","dc"
# ]

# Active set: 3 states + DC (I-95 Corridor), lower case for the CovidCast endpoint.
STATES_COVIDCAST = ["pa", "md", "va", "dc"]

# The hospitalization API needs the same codes in upper case.
STATES_HOSPITALIZATION = list(map(str.upper, STATES_COVIDCAST))

# --- Date range requested from both endpoints (YYYYMMDD) ---
START_DATE, END_DATE = "20200101", "20251231"

# --- Data quality ---
# A feature whose share of NA's reaches this fraction is considered not useful.
NA_THRESHOLD = 0.9

## Create Data Dictionary

In [None]:
# Reference table: CovidCast feature name ("<data_source>_<signal>") -> plain-English definition.
covid_cast_data_dictionary = {
    "fb-survey_smoothed_wwearing_mask_7d": (
        "Estimated percentage of people who wore a mask for most or all of the "
        "time while in public in the past 7 days; those not in public in the "
        "past 7 days are not counted."
    ),
    "fb-survey_smoothed_wcovid_vaccinated_appointment_or_accept": (
        "Estimated percentage of respondents who either have already received "
        "a COVID vaccine or have an appointment to get a COVID vaccine or would "
        "definitely or probably choose to get vaccinated, if a vaccine were "
        "offered to them today."
    ),
    "google-symptoms_sum_anosmia_ageusia_smoothed_search": (
        "The sum of Google search volume for anosmia and ageusia related "
        "searches, in an arbitrary units that are normalized for overall search "
        "users, smoothed by 7-day average"
    ),
    "fb-survey_smoothed_wcli": (
        "Estimated percentage of people with COVID-like illness"
    ),
    "fb-survey_smoothed_whh_cmnty_cli": (
        "Estimated percentage of people reporting illness in their local "
        "community, as described below, including their household"
    ),
    "doctor-visits_smoothed_adj_cli": (
        "Estimated percentage of outpatient doctor visits primarily about "
        "COVID-related symptoms, based on data from health system partners, "
        "smoothed in time using a Gaussian linear smoother"
    ),
    "jhu-csse_confirmed_incidence_num": (
        "Number of new confirmed COVID-19 cases, daily"
    ),
    "hhs_confirmed_admissions_covid_1d_prop_7dav": (
        "Sum of Adult + Pediatric. Confirmed admissions only."
    ),
    "jhu-csse_deaths_7dav_incidence_prop": (
        "Number of new confirmed deaths due to COVID-19 per 100,000 population, daily"
    ),
    "chng_smoothed_adj_outpatient_covid": (
        "Estimated percentage of outpatient doctor visits with confirmed "
        "COVID-19, based on Change Healthcare claims data that has been "
        "de-identified in accordance with HIPAA privacy regulations, smoothed "
        "in time using a Gaussian linear smoother"
    ),
    "chng_7dav_inpatient_covid": (
        "Ratio of inpatient hospitalizations associated with COVID-19, based on "
        "Change Healthcare claims data that has been de-identified in accordance "
        "with HIPAA privacy regulations, smoothed in time with a trailing 7 day "
        "average. This historical signal is no longer updated."
    ),
}

In [None]:
# Do not truncate cell contents, so the long definitions are shown in full
pd.set_option("display.max_colwidth", None)

# One row per metric, built from the (metric, definition) pairs of the data dictionary
covid_cast_definitions_df = pd.DataFrame(
    list(covid_cast_data_dictionary.items()),
    columns=["Metric", "Definition"],
)

covid_cast_definitions_df

## Define Functions for API Calls

In [None]:

# Functions for CovidCast Endpoint

def fetch_signal_state_covidcast(data_source, signal, state):
    """Fetch one CovidCast signal for a single state.

    Requests daily, state-level values of ``data_source``/``signal`` from
    BASE_URL_COVIDCAST for the range START_DATE to END_DATE.

    Args:
        data_source: CovidCast data source name (e.g. "fb-survey").
        signal: Signal name within that data source.
        state: State code sent to the API as ``geo_value``.

    Returns:
        A DataFrame with the columns ``date``, ``state`` (upper-cased) and
        ``"{data_source}_{signal}"``, or ``None`` when the HTTP status is not
        200, the response's ``result`` field is not 1, or no rows came back.
    """
    query = {
        "source": "covidcast",
        "data_source": data_source,
        "signal": signal,
        "time_type": "day",
        "geo_type": "state",
        "geo_value": state,
        "time_values": f"{START_DATE}-{END_DATE}",
    }
    response = requests.get(BASE_URL_COVIDCAST, params=query, headers=HEADER, timeout=30)

    if response.status_code != 200:
        print(f"HTTP {response.status_code} for {state}")
        return None

    payload = response.json()
    if payload.get("result") != 1:
        print(f"API error {state}:", payload.get("message"))
        return None

    rows = pd.DataFrame(payload["epidata"])
    if rows.empty:
        return None

    # Keep only the needed columns; "date"/"state" match the Hospitalization data
    tidy = rows[["time_value", "geo_value", "value"]].rename(
        columns={
            "time_value": "date",
            "geo_value": "state",
            "value": f"{data_source}_{signal}",
        }
    )

    # Upper case to match the state codes in the Hospitalization data
    tidy["state"] = tidy["state"].str.upper()
    return tidy


def fetch_all_states_covidcast(source, signal):
    """Fetch one CovidCast signal for every state in STATES_COVIDCAST.

    States are requested one at a time with a short pause after each request.
    A state that errors or returns no data is reported and skipped.

    Args:
        source: CovidCast data source name.
        signal: Signal name within that data source.

    Returns:
        The per-state frames concatenated with a fresh index, or an empty
        DataFrame when no state returned data.
    """
    print(f"Fetching '{signal}' from '{source}' for {len(STATES_COVIDCAST)} states...")

    frames = []
    for state in STATES_COVIDCAST:  # lowercase for API
        try:
            state_df = fetch_signal_state_covidcast(source, signal, state)
            if state_df is None or state_df.empty:
                print(f"  ⚠ No data for state: {state}")
            else:
                frames.append(state_df)
        except Exception as e:
            print(f"  ❌ Error fetching state {state}: {e}")

        time.sleep(0.5)  # avoid 429 rate-limit errors

    if not frames:
        print("⚠ No data fetched from any state. Returning empty DataFrame.")
        return pd.DataFrame()

    combined = pd.concat(frames, ignore_index=True)
    print(f"✅ Successfully fetched data for {len(frames)} states.")
    return combined


In [None]:

# Functions for Hospitalization Endpoint

def fetch_signal_state_hospitalization(state):
    """Fetch the hospitalization time series for a single state.

    Requests BASE_URL_HOSPITALIZATION for ``state`` over the range
    START_DATE to END_DATE.

    Args:
        state: State code sent to the API as ``states``.

    Returns:
        A DataFrame built from the response's ``epidata`` records, with the
        columns exactly as returned by the API, or ``None`` when the HTTP
        status is not 200, the response's ``result`` field is not 1, or no
        rows came back.
    """
    query = {
        "states": state,
        "dates": f"{START_DATE}-{END_DATE}",
    }
    response = requests.get(BASE_URL_HOSPITALIZATION, params=query, headers=HEADER, timeout=30)

    if response.status_code != 200:
        print(f"HTTP {response.status_code} for {state}")
        return None

    payload = response.json()
    if payload.get("result") != 1:
        print(f"API error {state}:", payload.get("message"))
        return None

    rows = pd.DataFrame(payload["epidata"])
    return None if rows.empty else rows


def fetch_all_states_covidcast_hospitalization():
    """Fetch hospitalization data for every state in STATES_HOSPITALIZATION.

    States are requested one at a time with a short pause after each request.
    A state that errors or returns no data is reported and skipped.

    Returns:
        The per-state frames concatenated with a fresh index, or an empty
        DataFrame when no state returned data.
    """
    print(f"Fetching Hospitalization data from covid_hosp_state_timeseries for {len(STATES_HOSPITALIZATION)} states...")

    frames = []
    for state in STATES_HOSPITALIZATION:  # upper-case codes for this endpoint
        try:
            state_df = fetch_signal_state_hospitalization(state)
            if state_df is None or state_df.empty:
                print(f"  ⚠ No data for state: {state}")
            else:
                frames.append(state_df)
        except Exception as e:
            # Best effort: one failing state must not abort the whole collection
            print(f"  ❌ Error fetching state {state}: {e}")

        time.sleep(0.5)  # avoid 429 rate-limit errors

    if not frames:
        print("⚠ No data fetched from any state. Returning empty DataFrame.")
        return pd.DataFrame()

    combined = pd.concat(frames, ignore_index=True)
    print(f"✅ Successfully fetched data for {len(frames)} states.")
    return combined


## Data Collection

In [None]:
# CovidCast: one frame per (source, signal) pair, each covering every state in STATES_COVIDCAST

# Survey and search based signals
masks = fetch_all_states_covidcast(source="fb-survey", signal="smoothed_wwearing_mask_7d")
vaccine_acceptance = fetch_all_states_covidcast(
    source="fb-survey",
    signal="smoothed_wcovid_vaccinated_appointment_or_accept",
)
searches = fetch_all_states_covidcast(
    source="google-symptoms",
    signal="sum_anosmia_ageusia_smoothed_search",
)
symptoms = fetch_all_states_covidcast(source="fb-survey", signal="smoothed_wcli")
community_symptoms = fetch_all_states_covidcast(source="fb-survey", signal="smoothed_whh_cmnty_cli")

# Doctor visits, cases, admissions and deaths
doctor_visits = fetch_all_states_covidcast(source="doctor-visits", signal="smoothed_adj_cli")
cases = fetch_all_states_covidcast(source="jhu-csse", signal="confirmed_incidence_num")
admissions = fetch_all_states_covidcast(
    source="hhs",
    signal="confirmed_admissions_covid_1d_prop_7dav",
)
deaths = fetch_all_states_covidcast(source="jhu-csse", signal="deaths_7dav_incidence_prop")

# Change Healthcare claims based signals
outpatient = fetch_all_states_covidcast(source="chng", signal="smoothed_adj_outpatient_covid")
inpatient = fetch_all_states_covidcast(source="chng", signal="7dav_inpatient_covid")

In [None]:
# Hospitalization: one combined frame across every state in STATES_HOSPITALIZATION
# (an empty DataFrame if no state returned data)
hospitalization = fetch_all_states_covidcast_hospitalization()

## Data Cleaning

In [None]:
# ----------------------------
# MERGE
# ----------------------------

features = [
    masks,
    vaccine_acceptance,
    searches,
    symptoms,
    community_symptoms,
    doctor_visits,
    cases,
    admissions,
    deaths,
    outpatient,
    inpatient,
    hospitalization
]

# The fetch helpers return an empty, column-less DataFrame when no state
# returned data. Such a frame has no "date"/"state" columns and would make
# merge() raise KeyError, so only non-empty frames take part in the merge.
non_empty_features = [f for f in features if not f.empty]
if not non_empty_features:
    raise ValueError("No data was fetched for any feature; nothing to merge.")

# Outer join keeps every (date, state) pair seen in any of the sources
df = non_empty_features[0]

for f in non_empty_features[1:]:
    df = df.merge(f, on=["date", "state"], how="outer")

# Dates are parsed as YYYYMMDD
df["date"] = pd.to_datetime(df["date"], format="%Y%m%d")

# Drop columns with too many NA's. errors="ignore" keeps the cell runnable when
# these columns are absent (e.g. the hospitalization fetch came back empty).
df = df.drop(
    columns=[
        "geocoded_state",
        "total_patients_hospitalized_confirmed_influenza_covid",
        "total_patients_hospitalized_confirmed_influenza_covid_coverage",
    ],
    errors="ignore",
)

df = df.sort_values(["state", "date"])

## Feature Engineering

In [None]:
# ----------------------------
# FEATURE ENGINEERING
# ----------------------------

TARGET = "jhu-csse_confirmed_incidence_num"


def _seven_row_mean(series):
    """Mean over a trailing window of 7 rows (NaN while the window has fewer than 7 valid values)."""
    return series.rolling(7).mean()


# Rolling mean of the target, computed separately within each state
cases_by_state = df.groupby("state")[TARGET]
df["cases_roll_7"] = cases_by_state.transform(_seven_row_mean)

# Seasonality: calendar components of the observation date
calendar = df["date"].dt
df["day_of_year"] = calendar.dayofyear
df["day_of_week"] = calendar.dayofweek
df["month_of_year"] = calendar.month
df["quarter_of_year"] = calendar.quarter

print("Final shape:", df.shape)

## Quick EDA (Majority Will Be Done in Tableau)

In [None]:
# Column count of the final frame (every column is counted, including date and state)
print("Number of Features:", df.shape[1])

In [None]:
# Column names in sorted order, for a quick scan of the available features
df.columns.sort_values().values

In [None]:
# Earliest and latest dates covered by the data. Aggregating the "date" column
# directly avoids computing statistics for every column of df, and does not
# rely on DataFrame.describe() including datetime columns, which is not the
# default on every pandas version.
summary = df["date"].agg(["min", "max"]).dt.date

summary[['min','max']]

In [None]:
# Preview the first 10 rows of the frame
df.head(10)

In [None]:
# Count missing values per column and keep the columns at or above the NA threshold
nas = df.isna().sum()
too_many_nas = nas[nas >= NA_THRESHOLD*len(df)]

# Name the offending features (with their NA counts) so a failure is actionable
assert too_many_nas.empty, (
    f"Too many NA's (>= {NA_THRESHOLD:.0%} of {len(df)} rows) for these features to be useful: "
    f"{ {column: int(count) for column, count in too_many_nas.items()} }"
)

## Save Data Locally

In [None]:
# File name carries the alphabetically first and last state codes plus the date range
states_as_text = min(STATES_COVIDCAST) + "_" + max(STATES_COVIDCAST)
output_path = f"data/covid_features_{states_as_text}_{START_DATE}_{END_DATE}.csv"

# to_csv does not create missing directories, so make sure "data/" exists first
os.makedirs("data", exist_ok=True)
df.to_csv(output_path, index=False)