# IMPORT AND SET UP

In [1]:
#Utility
import urllib
import os
import datetime
import logging
from io import BytesIO
import time
import pytz

#Data Science
import pandas as pd
import numpy as np

#Google API
from google.cloud import bigquery
from google.cloud import storage # Imports the Google Cloud storage library

In [2]:
# Execution target switch read by the client set-up cell: 'cloud' starts a Spark
# session; any other value takes the local-credentials branch.
deployment = 'local' #local or cloud

In [3]:
# Prepare the execution environment, then build the Google API clients once.
if deployment == 'cloud':
    # pyspark is only imported for cloud deployments.
    from pyspark.sql import SparkSession

    # Start the Spark session with the BigQuery connector on the classpath.
    spark = (
        SparkSession.builder
        .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.17.0")
        .master('yarn')
        .appName('spark-bigquery-ryder')
        .getOrCreate()
    )
else:
    # LOCAL ONLY - do not copy to PySpark: point the Google client libraries at
    # the service-account key file. A single assignment covers both clients
    # because they read the same environment variable.
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "covid-jul25-**************.json"

# Both branches need the same two clients, so they are instantiated here once.
bigquery_client = bigquery.Client()
storage_client = storage.Client()

In [4]:
# Working environment: GCP project, compute location and storage bucket.
PROJECT_ID = 'covid-jul25'
REGION = 'us-west3'
ZONE = REGION + '-a'
BUCKET = 'us-west3-{BUCKET_NAME}'
BUCKET_LINK = 'gs://' + BUCKET

# UDF

In [5]:
def arima_forecast(case_type,horizon,confidence_level,sorted_statedict,
                   project_id='covid-jul25',dataset='arimamodels'):
    '''Build a BigQuery script that refreshes the combined forecast table.

    The script empties `<project_id>.<dataset>.<case_type>_US_forecast` and
    re-fills it with the UNION ALL of ML.FORECAST over one model per state,
    where each model is named `<project_id>.<dataset>.<case_type>_<state abbr>`.

    Args:
    case_type(STRING): 'confirmed' or 'deaths'
    horizon(INT): length of forecast with automatic frequency provided by ARIMA model
    confidence_level(FLOAT): confidence level of forecast
    sorted_statedict(List of tuples): items are (state abbr, state); only the
        first element of each item is used
    project_id(STRING): project holding the models (defaults to the value that
        was previously hard-coded)
    dataset(STRING): dataset holding the models and the forecast table

    Return:
    sql(STRING): sql script to be run in BigQuery; an empty string when
        sorted_statedict is empty
    '''
    prefix = f'{project_id}.{dataset}.{case_type}'
    # One SELECT per state model; the text is the same for every state.
    selects = [
        f'''SELECT * FROM ML.FORECAST(MODEL `{prefix}_{entry[0]}`, STRUCT({horizon} AS horizon, {confidence_level} AS confidence_level))'''
        for entry in sorted_statedict
    ]
    if not selects:
        # No states: keep returning an empty script.
        return ''
    target = f'`{prefix}_US_forecast`'
    header = f'DELETE FROM {target} WHERE True; INSERT INTO {target} '
    return header + ' UNION ALL '.join(selects)

# IMPORT STATE DATA FROM BIGQUERY

In [6]:
# Territories left out of the state lists, kept as (name, abbreviation) pairs
# so the two derived lists cannot drift apart.
_EXCLUDED_TERRITORIES = (
    ('American Samoa', 'AS'),
    ('United States Virgin Islands', 'VI'),
    ('Commonwealth of the Northern Mariana Islands', 'MP'),
    ('Guam', 'GU'),
    ('Puerto Rico', 'PR'),
)
excludestate = [pair[0] for pair in _EXCLUDED_TERRITORIES]
excludestateabbr = [pair[1] for pair in _EXCLUDED_TERRITORIES]

In [7]:
# Load the whole countyarea table into a DataFrame. The bracketed table
# reference is legacy-SQL syntax, hence dialect='legacy'.
# NOTE(review): newer pandas releases reportedly deprecate pd.read_gbq in favour
# of pandas_gbq.read_gbq - confirm against the pinned pandas version.
sql = """
    SELECT *
    FROM [covid-jul25.usprojections.countyarea]
"""
countyarea = pd.read_gbq(sql, dialect='legacy')

Downloading: 100%|█████████████████████████████████████████████████████████████| 3142/3142 [00:00<00:00, 4389.40rows/s]


In [8]:
#State abbrev list: one row per distinct (state_name, state_abbreviation) pair
stateabbrlist = countyarea.loc[:,['state_name','state_abbreviation']].drop_duplicates()

# Drop excluded rows with ONE combined mask. Filtering names and abbreviations
# separately and zipping the results afterwards silently misaligns every later
# pair as soon as one exclusion matches by name but not by abbreviation (or
# the other way round).
is_excluded = (
    stateabbrlist['state_name'].isin(excludestate)
    | stateabbrlist['state_abbreviation'].isin(excludestateabbr)
)
kept_states = stateabbrlist[~is_excluded]
statelist = kept_states['state_name'].to_list()
stateabb = kept_states['state_abbreviation'].to_list()

# abbreviation -> name, and name -> abbreviation
statedict = dict(zip(stateabb, statelist))
rstatedict = dict(zip(statelist,stateabb))

In [9]:
# Build the ordered list of (state abbreviation, state name) tuples, sorted by
# abbreviation, that the model-building loops and arima_forecast iterate over
# (used for both 'confirmed' and 'deaths').
dictionary_items = statedict.items()
sorted_statedict = sorted(dictionary_items)

# ARIMA

## Put cases data in format for ARIMA

In [10]:
#Reformat the cases data
# Recreate usprojections.arimaformat in long format: every temp_cases row
# appears twice, once with value = deaths / case_type = 'deaths' and once with
# value = confirmed / case_type = 'confirmed'. SELECT * keeps all original
# temp_cases columns alongside the two new ones.
query_job = bigquery_client.query(
    """
    DROP TABLE IF EXISTS `covid-jul25.usprojections.arimaformat`;
    CREATE TABLE `covid-jul25.usprojections.arimaformat` AS
    (SELECT *, deaths as value, 'deaths' as case_type 
    FROM `covid-jul25.usprojections.temp_cases`
    UNION ALL
    SELECT *, confirmed as value, 'confirmed' as case_type 
    FROM `covid-jul25.usprojections.temp_cases`)
    """)
results = query_job.result()  # Waits for job to complete.

## ARIMA for CONFIRMED Cases

In [11]:
# Train one BigQuery ML ARIMA model per state for confirmed cases. Each model
# holds a separate time series per `statecounty` value of that state.
case_type = 'confirmed'
for state_abbr, state_name in sorted_statedict:
    create_model_sql = f"""
    CREATE OR REPLACE MODEL `covid-jul25.arimamodels.{case_type}_{state_abbr}`
    OPTIONS
    (model_type = 'ARIMA',
    time_series_timestamp_col = 'date1',
    time_series_data_col = 'value',
    time_series_id_col = 'statecounty'
    ) AS
    SELECT date1, value, statecounty
    FROM
    `covid-jul25.usprojections.arimaformat`
    WHERE state = '{state_name}'
    AND case_type = '{case_type}'
    """
    query_job = bigquery_client.query(create_model_sql)
    results = query_job.result()  # Block until this state's model is trained.
    print(f"""Complete ARIMA model for {case_type} projections for {state_name}""")

Complete ARIMA model for confirmed projections for Alaska
Complete ARIMA model for confirmed projections for Alabama
Complete ARIMA model for confirmed projections for Arkansas
Complete ARIMA model for confirmed projections for Arizona
Complete ARIMA model for confirmed projections for California
Complete ARIMA model for confirmed projections for Colorado
Complete ARIMA model for confirmed projections for Connecticut
Complete ARIMA model for confirmed projections for District of Columbia
Complete ARIMA model for confirmed projections for Delaware
Complete ARIMA model for confirmed projections for Florida
Complete ARIMA model for confirmed projections for Georgia
Complete ARIMA model for confirmed projections for Hawaii
Complete ARIMA model for confirmed projections for Iowa
Complete ARIMA model for confirmed projections for Idaho
Complete ARIMA model for confirmed projections for Illinois
Complete ARIMA model for confirmed projections for Indiana
Complete ARIMA model for confirmed proj

In [12]:
# Materialise ML.EVALUATE output of every per-state confirmed model into its
# own `<case_type>_<abbr>_tbl` table so the tables can be combined afterwards.
case_type = 'confirmed'
for state_abbr, _state_name in sorted_statedict:
    model_path = f"covid-jul25.arimamodels.{case_type}_{state_abbr}"
    evaluate_sql = f"""
    DROP TABLE IF EXISTS `{model_path}_tbl`;
    CREATE TABLE `{model_path}_tbl` AS
    SELECT * FROM ML.EVALUATE(MODEL `{model_path}`);
    """
    query_job = bigquery_client.query(evaluate_sql)
    results = query_job.result()  # Block until the table exists.
print(f"""Complete ARIMA parameters table for {case_type} projections for all states.""")

Complete ARIMA parameters table for confirmed projections for all states.


### ARIMA PARAMETERS for county level for CONFIRMED

In [13]:
#Combine all per-state parameter tables into `confirmed_US`
# The wildcard `confirmed_*` matches more than the per-state `_tbl` tables: it
# also matches `confirmed_US` itself and `confirmed_US_forecast`, whose rows
# carry a non-null statecounty and would be inserted as all-NULL parameter
# rows. Restricting on _TABLE_SUFFIX keeps only the ML.EVALUATE tables.
case_type = 'confirmed'
query_job = bigquery_client.query(
    f"""
    -- DROP TABLE `covid-jul25.arimamodels.{case_type}_US`;
    -- CREATE TABLE `covid-jul25.arimamodels.{case_type}_US` AS
    DELETE FROM `covid-jul25.arimamodels.{case_type}_US` WHERE True;
    INSERT INTO `covid-jul25.arimamodels.{case_type}_US`
    SELECT * FROM
    `covid-jul25.arimamodels.{case_type}_*`
    WHERE ENDS_WITH(_TABLE_SUFFIX, '_tbl')
    AND statecounty is not null;
    """)
results = query_job.result()  # Waits for job to complete.

### ARIMA FORECASTS for county level for CONFIRMED 

In [14]:
# Refresh `confirmed_US_forecast` using the script built by arima_forecast:
# horizon of 30 forecast steps and a 0.9 confidence level, for every state in
# sorted_statedict.
query_job = bigquery_client.query(arima_forecast('confirmed',30,0.9,sorted_statedict))
results = query_job.result()  # Waits for job to complete.

## ARIMA for DEATHS Cases

In [15]:
# Train one BigQuery ML ARIMA model per state for deaths. Each model holds a
# separate time series per `statecounty` value of that state.
case_type = 'deaths'
for state_abbr, state_name in sorted_statedict:
    create_model_sql = f"""
    CREATE OR REPLACE MODEL `covid-jul25.arimamodels.{case_type}_{state_abbr}`
    OPTIONS
    (model_type = 'ARIMA',
    time_series_timestamp_col = 'date1',
    time_series_data_col = 'value',
    time_series_id_col = 'statecounty'
    ) AS
    SELECT date1, value, statecounty
    FROM
    `covid-jul25.usprojections.arimaformat`
    WHERE state = '{state_name}'
    AND case_type = '{case_type}'
    """
    query_job = bigquery_client.query(create_model_sql)
    results = query_job.result()  # Block until this state's model is trained.
    print(f"""Complete ARIMA model for {case_type} projections for {state_name}""")

Complete ARIMA model for deaths projections for Alaska
Complete ARIMA model for deaths projections for Alabama
Complete ARIMA model for deaths projections for Arkansas
Complete ARIMA model for deaths projections for Arizona
Complete ARIMA model for deaths projections for California
Complete ARIMA model for deaths projections for Colorado
Complete ARIMA model for deaths projections for Connecticut
Complete ARIMA model for deaths projections for District of Columbia
Complete ARIMA model for deaths projections for Delaware
Complete ARIMA model for deaths projections for Florida
Complete ARIMA model for deaths projections for Georgia
Complete ARIMA model for deaths projections for Hawaii
Complete ARIMA model for deaths projections for Iowa
Complete ARIMA model for deaths projections for Idaho
Complete ARIMA model for deaths projections for Illinois
Complete ARIMA model for deaths projections for Indiana
Complete ARIMA model for deaths projections for Kansas
Complete ARIMA model for deaths 

In [17]:
# Materialise ML.EVALUATE output of every per-state deaths model into its own
# `<case_type>_<abbr>_tbl` table so the tables can be combined afterwards.
case_type = 'deaths'
for state_abbr, _state_name in sorted_statedict:
    model_path = f"covid-jul25.arimamodels.{case_type}_{state_abbr}"
    evaluate_sql = f"""
    DROP TABLE IF EXISTS `{model_path}_tbl`;
    CREATE TABLE `{model_path}_tbl` AS
    SELECT * FROM ML.EVALUATE(MODEL `{model_path}`);
    """
    query_job = bigquery_client.query(evaluate_sql)
    results = query_job.result()  # Block until the table exists.
print(f"""Complete ARIMA parameters table for {case_type} projections for all states.""")

Complete ARIMA parameters table for deaths projections for all states.


### ARIMA PARAMETERS for county level for DEATHS

In [18]:
#Combine all per-state parameter tables into `deaths_US`
# The wildcard `deaths_*` matches more than the per-state `_tbl` tables: it
# also matches `deaths_US` itself and `deaths_US_forecast`, whose rows carry a
# non-null statecounty and would be inserted as all-NULL parameter rows.
# Restricting on _TABLE_SUFFIX keeps only the ML.EVALUATE tables.
case_type = 'deaths'
query_job = bigquery_client.query(
    f"""
    -- DROP TABLE `covid-jul25.arimamodels.{case_type}_US`;
    -- CREATE TABLE `covid-jul25.arimamodels.{case_type}_US` AS
    DELETE FROM `covid-jul25.arimamodels.{case_type}_US` WHERE True;
    INSERT INTO `covid-jul25.arimamodels.{case_type}_US`
    SELECT * FROM
    `covid-jul25.arimamodels.{case_type}_*`
    WHERE ENDS_WITH(_TABLE_SUFFIX, '_tbl')
    AND statecounty is not null;
    """)
results = query_job.result()  # Waits for job to complete.

### ARIMA FORECASTS for county level for DEATHS 

In [19]:
# Refresh `deaths_US_forecast` using the script built by arima_forecast:
# horizon of 30 forecast steps and a 0.9 confidence level, for every state in
# sorted_statedict.
query_job = bigquery_client.query(arima_forecast('deaths',30,0.9,sorted_statedict))
results = query_job.result()  # Waits for job to complete.

# UPDATE STATIC STATS FOR ARIMA NEXT 7 DAYS

In [20]:
# Pull the full combined forecast tables into pandas (legacy-SQL bracket
# syntax, hence dialect='legacy'). `sql` is reused, so after this cell it holds
# the deaths query.
sql = """SELECT * FROM [covid-jul25.arimamodels.confirmed_US_forecast]"""
rawconfirmeddf = pd.read_gbq(sql, dialect='legacy')

sql = """SELECT * FROM [covid-jul25.arimamodels.deaths_US_forecast]"""
rawdeathsdf = pd.read_gbq(sql, dialect='legacy')

Downloading: 100%|███████████████████████████████████████████████████████████| 97620/97620 [00:11<00:00, 8474.72rows/s]
Downloading: 100%|██████████████████████████████████████████████████████████| 97620/97620 [00:09<00:00, 10345.50rows/s]


In [21]:
# Target timestamp for the 7-day-ahead statistics: midnight UTC, seven days
# after today's date.
# NOTE(review): date.today() is the machine's local date, not the UTC date -
# confirm this is the intended reference day.
today = datetime.date.today()
sd = datetime.datetime.combine(
    today + datetime.timedelta(days=7),  #7-day forecast
    datetime.time.min,
    tzinfo=datetime.timezone.utc,
)

## CONFIRMED cases next 7 days

In [22]:
# Keep only forecasts for counties listed in countyarea, restricted to the
# target timestamp `sd`, and only the two columns needed downstream.
# A single .loc selection avoids copying the whole raw frame first, and the
# explicit .copy() makes the result an independent frame, so adding columns
# to it later does not trigger pandas' SettingWithCopyWarning.
keep_rows = (
    rawconfirmeddf['statecounty'].isin(countyarea['statecounty'])
    & (rawconfirmeddf['forecast_timestamp'] == sd)
)
confirmeddf = rawconfirmeddf.loc[keep_rows, ['statecounty','forecast_value']].copy()

In [23]:
# Derive the state name as the text before the first '-' in statecounty.
# The vectorised .str accessor also works on an empty frame, whereas the
# previous list -> DataFrame round trip raised KeyError: 0 when no forecast row
# matched the target date. assign() returns a new frame instead of mutating
# the existing one in place, so re-running the cell is harmless.
confirmeddf = confirmeddf.assign(
    state=confirmeddf['statecounty'].str.split('-', n=1).str[0]
)

In [24]:
# Map each state name to its abbreviation and build the 'US-XX' region code.
region_codes = confirmeddf['state'].map(rstatedict)

# Series.map yields NaN for names missing from rstatedict; keep failing with a
# KeyError (as the per-row dict lookup did) but report every missing name.
unmapped_states = confirmeddf.loc[region_codes.isna(), 'state'].unique()
if len(unmapped_states) > 0:
    raise KeyError(f"No state abbreviation known for: {list(unmapped_states)}")

# Rename by column name rather than by position, then fix the column order
# expected by the BigQuery staging table.
statefc_confirmed = (
    confirmeddf
    .assign(region='US-' + region_codes)
    .drop(columns=['state'])
    .rename(columns={'forecast_value': 'confirmed_forecast'})
    [['statecounty','confirmed_forecast','region']]
)

## DEATHS cases next 7 days

In [25]:
# Keep only forecasts for counties listed in countyarea, restricted to the
# target timestamp `sd`, and only the two columns needed downstream.
# A single .loc selection avoids copying the whole raw frame first, and the
# explicit .copy() makes the result an independent frame, so adding columns
# to it later does not trigger pandas' SettingWithCopyWarning.
keep_rows = (
    rawdeathsdf['statecounty'].isin(countyarea['statecounty'])
    & (rawdeathsdf['forecast_timestamp'] == sd)
)
deathsdf = rawdeathsdf.loc[keep_rows, ['statecounty','forecast_value']].copy()

In [26]:
# Derive the state name as the text before the first '-' in statecounty.
# The vectorised .str accessor also works on an empty frame, whereas the
# previous list -> DataFrame round trip raised KeyError: 0 when no forecast row
# matched the target date. assign() returns a new frame instead of mutating
# the existing one in place, so re-running the cell is harmless.
deathsdf = deathsdf.assign(
    state=deathsdf['statecounty'].str.split('-', n=1).str[0]
)

In [27]:
# Map each state name to its abbreviation and build the 'US-XX' region code.
region_codes = deathsdf['state'].map(rstatedict)

# Series.map yields NaN for names missing from rstatedict; keep failing with a
# KeyError (as the per-row dict lookup did) but report every missing name.
unmapped_states = deathsdf.loc[region_codes.isna(), 'state'].unique()
if len(unmapped_states) > 0:
    raise KeyError(f"No state abbreviation known for: {list(unmapped_states)}")

# Rename by column name rather than by position, then fix the column order
# expected by the BigQuery staging table.
statefc_deaths = (
    deathsdf
    .assign(region='US-' + region_codes)
    .drop(columns=['state'])
    .rename(columns={'forecast_value': 'deaths_forecast'})
    [['statecounty','deaths_forecast','region']]
)

## Write to BigQuery and Update

In [28]:
# Upload both 7-day forecast frames to BigQuery staging tables, replacing any
# existing table of the same name (if_exists='replace').
# NOTE(review): no project_id is passed, so the target project presumably comes
# from the active credentials/environment - confirm.
statefc_confirmed.to_gbq('usprojections.arima_confirmed_statecounty',if_exists='replace')
statefc_deaths.to_gbq('usprojections.arima_deaths_statecounty',if_exists='replace')

1it [00:03,  3.38s/it]
1it [00:06,  6.98s/it]


In [29]:
# Copy the staged forecasts into latest_cases, matching rows on statecounty:
# first arima_confirmed_forecast from the confirmed staging table, then
# arima_deaths_forecast from the deaths staging table. Rows of latest_cases
# without a matching statecounty are not touched by the UPDATE ... FROM join.
query_job = bigquery_client.query(
    """
    UPDATE `covid-jul25.usprojections.latest_cases` as M
    SET M.arima_confirmed_forecast = S.confirmed_forecast
    FROM `covid-jul25.usprojections.arima_confirmed_statecounty` as S
    WHERE M.statecounty = S.statecounty;
    
    UPDATE `covid-jul25.usprojections.latest_cases` as M
    SET M.arima_deaths_forecast = S.deaths_forecast
    FROM `covid-jul25.usprojections.arima_deaths_statecounty` as S
    WHERE M.statecounty = S.statecounty;
    """)
results = query_job.result()  # Waits for job to complete.

# APPEND PARAMETERS

In [30]:
# Rebuild arima_all_parameter from scratch: all rows of confirmed_US and
# deaths_US, each left-joined to countyarea on statecounty to pick up the state
# name. The boolean `type` column marks the source: true = confirmed,
# false = deaths. Rows whose non_seasonal_p is NULL are dropped.
query_job = bigquery_client.query(
    """
    DELETE FROM `covid-jul25.usprojections.arima_all_parameter` WHERE True;
    INSERT INTO `covid-jul25.usprojections.arima_all_parameter` 
    SELECT M.*, S.state_name as State, true as type FROM `covid-jul25.arimamodels.confirmed_US` as M
    LEFT JOIN `covid-jul25.usprojections.countyarea` as S
    ON M.statecounty = S.statecounty
    WHERE non_seasonal_p is not null

    UNION ALL

    SELECT M.*, S.state_name as State, false as type FROM `covid-jul25.arimamodels.deaths_US` as M
    LEFT JOIN `covid-jul25.usprojections.countyarea` as S
    ON M.statecounty = S.statecounty
    WHERE non_seasonal_p is not null
    """)
results = query_job.result()  # Waits for job to complete.

# APPEND FORECAST TO END OF RESULTS

## ALL cases (forecast next 30 days)

In [31]:
# Rebuild arima_all_forecast from scratch. The inner query stacks the confirmed
# and deaths forecast tables, each left-joined to latest_cases on statecounty
# for state, lat_long, region and the latest actual count (exposed as
# actual_latest); `type` is true for confirmed and false for deaths.
# The outer RIGHT JOIN to countyarea adds county_name and keeps every
# countyarea row, so counties without any forecast appear with NULL forecast
# columns.
query_job = bigquery_client.query(
    """
    -- DROP TABLE IF EXISTS `covid-jul25.usprojections.arima_all_forecast`;
    -- CREATE TABLE `covid-jul25.usprojections.arima_all_forecast` AS
    DELETE FROM `covid-jul25.usprojections.arima_all_forecast` WHERE True;
    INSERT INTO `covid-jul25.usprojections.arima_all_forecast` 
    SELECT A.*, B.county_name FROM
    (SELECT M.*, S.state as State, S.lat_long, S.confirmed as actual_latest, S.region, true as type FROM `covid-jul25.arimamodels.confirmed_US_forecast` as M
    LEFT JOIN `covid-jul25.usprojections.latest_cases` as S
    ON M.statecounty = S.statecounty

    UNION ALL

    SELECT M.*, S.state as State, S.lat_long, S.deaths as actual_latest, S.region, false as type FROM `covid-jul25.arimamodels.deaths_US_forecast` as M
    LEFT JOIN `covid-jul25.usprojections.latest_cases` as S
    ON M.statecounty = S.statecounty) as A

    RIGHT JOIN `covid-jul25.usprojections.countyarea` as B
    ON A.statecounty = B.statecounty
    """)
results = query_job.result()  # Waits for job to complete.

## Store the results for next 30 in a temp table

In [52]:
# Recreate arima_all_result_temp with one row per (statecounty, date): the
# confirmed rows of arima_all_forecast (type = True) left-joined to the deaths
# rows (type = False) on statecounty and date.
# Note that the `confirmed` and `deaths` columns are filled from actual_latest,
# not from forecast_value.
# NOTE(review): confirm that carrying the latest actual count (rather than the
# forecast value) onto the forecast dates is intended.
query_job = bigquery_client.query(
    """
    DROP TABLE IF EXISTS `covid-jul25.usprojections.arima_all_result_temp`;
    CREATE TABLE `covid-jul25.usprojections.arima_all_result_temp` AS
    SELECT M.*, S.deaths FROM
    (SELECT State as state, county_name as county, statecounty, lat_long, actual_latest as confirmed, region, forecast_timestamp as date
    FROM `covid-jul25.usprojections.arima_all_forecast`
    WHERE type = True) as M
    LEFT JOIN
    (SELECT State as state, county_name as county, statecounty, lat_long, actual_latest as deaths, region, forecast_timestamp as date
    FROM `covid-jul25.usprojections.arima_all_forecast`
    WHERE type = False) as S
    ON M.statecounty = S.statecounty
    AND M.date = S.date
    """)
results = query_job.result()  # Waits for job to complete.

In [53]:
# Read the freshly built arima_all_result_temp table into pandas (legacy-SQL
# bracket syntax, hence dialect='legacy').
# Disabled alternative that read arima_all_forecast directly:
# sql = """SELECT * FROM [covid-jul25.usprojections.arima_all_forecast]"""
sql = """SELECT * FROM [covid-jul25.usprojections.arima_all_result_temp]"""
allforecast = pd.read_gbq(sql, dialect='legacy')

Downloading: 100%|██████████████████████████████████████████████████████████| 94590/94590 [00:08<00:00, 10679.91rows/s]


In [54]:
# Keep only rows whose statecounty appears in countyarea. The frame is
# reassigned to itself, so re-running this cell simply filters the already
# filtered frame again.
allforecast = allforecast[allforecast['statecounty'].isin(list(countyarea['statecounty']))]
# Disabled positional rename, kept for reference:
# allforecast.columns = ['statecounty','date','confirmed','deaths']

# Reference column list (not executed) - presumably the columns of
# arima_all_forecast; verify against the table schema:
# ['statecounty',
#  'forecast_timestamp',
#  'forecast_value',
#  'standard_error',
#  'confidence_level',
#  'prediction_interval_lower_bound',
#  'prediction_interval_upper_bound',
#  'confidence_interval_lower_bound',
#  'confidence_interval_upper_bound',
#  'State',
#  'lat_long',
#  'actual_latest',
#  'region',
#  'type',
#  'county_name']

In [None]:
# #Get state names using split
# state = list(map(lambda x:x.split('-',1),list(allforecast['statecounty'])))
# allforecast['state']=list(pd.DataFrame(state)[0])
# allforecast['county']=list(pd.DataFrame(state)[1])

# #Get state abbr
# stateabbr = list(map(lambda x:rstatedict[x],list(allforecast['state'])))
# allforecast['region'] = list(pd.DataFrame(stateabbr)[0])
# allforecast['region'] = 'US-'+allforecast['region']

## Appending to latest results

In [38]:
# Read the whole final_cases table into pandas (legacy-SQL bracket syntax,
# hence dialect='legacy'); the captured output below shows this is a large
# download.
sql = """SELECT * FROM [covid-jul25.usprojections.final_cases]"""
finalcasesdf = pd.read_gbq(sql, dialect='legacy')

Downloading: 100%|█████████████████████████████████████████████████████████| 716148/716148 [01:13<00:00, 9786.64rows/s]


In [57]:
# Stack the historical cases and the forecast rows (final_cases rows first).
# sort=False leaves the column order as encountered instead of sorting columns
# that are not aligned between the two frames.
alldf = pd.concat([finalcasesdf,allforecast],sort=False)
#Write to temp table
# if_exists='replace' OVERWRITES arima_all_result_temp, the same table that
# `allforecast` was read from, so the forecast-only content of that table is
# lost once this cell has run.
alldf.to_gbq('usprojections.arima_all_result_temp',if_exists='replace')

1it [04:31, 271.27s/it]


In [None]:
# Rebuild arima_all_result from arima_all_result_temp. `duration` is the row
# number within each statecounty ordered by ascending date (starting at 1, so
# the divisions below never divide by zero); the two velocity columns are
# confirmed and deaths divided by that duration.
query_job = bigquery_client.query(
    """
    -- DROP TABLE IF EXISTS `covid-jul25.usprojections.arima_all_result`;
    -- CREATE TABLE `covid-jul25.usprojections.arima_all_result` AS
    DELETE FROM `covid-jul25.usprojections.arima_all_result` WHERE True;
    INSERT INTO `covid-jul25.usprojections.arima_all_result`
    SELECT *,confirmed/duration as confirmed_velocity, deaths/duration as deaths_velocity FROM
    (SELECT *, row_number() OVER(PARTITION BY statecounty ORDER BY date ASC) as duration
    FROM `covid-jul25.usprojections.arima_all_result_temp`)
    """)
results = query_job.result()  # Waits for job to complete.