# SI 330 Final Project
### Nicholas Ketchum, SI 330, Winter 2021

This project creates a visualization of, and computes the correlation between, unemployment insurance claims recorded by the Federal Reserve and COVID-19 cases in the United States. It uses a public API and a public dataset to gather information, clean it, and store it in a database. It then makes calculations and plots a visualization using dataframes. The motivation is to see how reported cases and employment were affected, and what the correlation is between new cases and initial unemployment claims, over a period of roughly one year.

**1. Establish a database connection**

Using the sql extension and the sqlalchemy library, we connect to the default database configuration provided by Jupyter through MichiganMads.org. This database stores the data pulled from the API and the CSV source so that it can be queried later.

In [None]:
%%capture
# Establish a database connection.
%load_ext sql
# The "postgresql://" scheme is used because SQLAlchemy 1.4+ no longer accepts "postgres://".
%sql postgresql://jovyan:si330studentuser@localhost:5432/si330
import sqlalchemy
engine = sqlalchemy.create_engine('postgresql://jovyan:si330studentuser@localhost:5432/si330')

**2. Import libraries and set data source variables**

First, we import all the other libraries we'll need in one spot, which makes it easier to keep track of what we've imported. On this platform, pandas requires an import of register_matplotlib_converters to properly plot the datetime-indexed data.

The cell then calls register_matplotlib_converters() to register the pandas date converters with matplotlib, and defines my Fed API key, for which I had to register, and the Fed API request settings, which I like keeping in one spot for clarity.

In [None]:
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import numpy as np
import pandas as pd
from pandas.plotting import register_matplotlib_converters
import requests

# Calling the "register_matplotlib_converters" function registers pandas'
# datetime converters with matplotlib, which plotting on this platform requires.
register_matplotlib_converters()

# Store the Fed API key and request settings in one spot for more flexibility.
FED_API_KEY = "a964732354e30d669470642ff6b45f4c"

fed_api_settings = {
    'series_id': 'ICSA',
    'file_type': 'json',
    'sort_order': 'desc',
    'observation_start': "2020-01-07", # Trying to match the static end/start dates
    'observation_end': "2021-03-07",   # provided by the Covid tracker.
    'limit': '1300'
}
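
As a hedged aside, the key does not have to live in the notebook itself. The sketch below reads it from an environment variable instead; the variable name `FRED_API_KEY` and the helper `load_fed_api_key` are hypothetical names chosen for illustration, not part of this project.

```python
import os

def load_fed_api_key(env_var="FRED_API_KEY", default=None):
    """Return the key stored in the environment, or `default` if the variable is unset."""
    return os.environ.get(env_var, default)

# Stand-in for exporting the variable in a shell before starting Jupyter.
os.environ["FRED_API_KEY"] = "example-key"

example_key = load_fed_api_key()
print(example_key)
```

With this approach the notebook can be shared without exposing the key, at the cost of one extra setup step for whoever runs it.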

**3. Retrieve Fed data from a public API using JSON**

This function takes the predefined settings and API key and requests roughly one year's worth of data, aligned with the first year of the COVID-19 outbreak. It reads the JSON response into a dataframe, then builds a new dataframe in which each date string is converted to a datetime and shifted forward by one day. The shift aligns these dates with the dates in the COVID-19 data set, which allows for joins.

The additional day is immaterial because these are weekly measurements. Because we're looking at overall trends in rough form, we're not concerned if data is off by one day; we're looking at chart shapes more generally.

Finally, the dataframe's index is set using the date column, with columns named for clarity and convenience, before the dataframe is returned to the caller.

In [None]:
# Get weekly numbers from a public API.
def getFedData(settings):
    # Build the query string from the settings (URL parameters, not HTTP headers).
    headers = (
        "?series_id=" + settings['series_id']
        + "&api_key=" + FED_API_KEY
        + "&file_type=" + settings['file_type']
        + "&sort_order=" + settings['sort_order']
        + "&observation_start=" + settings['observation_start']
        + "&observation_end=" + settings['observation_end']
        + "&limit=" + str(settings['limit'])
    )
    url = "https://api.stlouisfed.org/fred/series/observations" + headers
    df = pd.read_json(url)
    ndf = pd.DataFrame()
    for group, row in df.iterrows():
        # Covid data is one day behind Fed data.
        # To make our lives easier, let's add one day to Fed data so we can match records.
        # A one-day delta is immaterial for our overall measurement and analysis.
        ndf.loc[group, 'date'] = pd.to_datetime(df.iloc[group]['observations']['date']) + pd.Timedelta(days=1)
        ndf.loc[group, 'total_new_claims'] = int(df.iloc[group]['observations']['value'])
    ndf.set_index('date', inplace=True)
    return ndf
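
The query string above is assembled by hand. A minimal sketch of the same idea using the standard library's `urlencode`, which also escapes any special characters in the values, might look like this. The helper name `build_fred_url` and the placeholder key are assumptions made for illustration.

```python
from urllib.parse import urlencode

FRED_OBSERVATIONS_URL = "https://api.stlouisfed.org/fred/series/observations"

def build_fred_url(settings, api_key):
    """Return the request URL with every setting encoded as a query parameter."""
    params = dict(settings)        # copy so the caller's dict is not modified
    params["api_key"] = api_key
    return FRED_OBSERVATIONS_URL + "?" + urlencode(params)

# A trimmed-down settings dict and a placeholder key, for demonstration only.
example_settings = {"series_id": "ICSA", "file_type": "json", "limit": "1300"}
example_url = build_fred_url(example_settings, "YOUR_API_KEY")
print(example_url)
```

Because the function loops over whatever the settings dict contains, adding a new request parameter only requires a new dictionary entry.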

**4. Store Fed data into the database**

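This function creates a table to store Fed data. It drops any pre-existing table and creates a new one with two columns: date (timestamp) and claims (integer). Then the data is loaded from a dataframe using the .to_sql() method. Because .to_sql() is called with if_exists='replace', pandas drops and recreates the table from the dataframe, so the columns actually stored are date and total_new_claims.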

In [None]:
# Store fed data in a db table.
def storeFedData(df, engine):
    %sql DROP TABLE IF EXISTS fed_data;
    %sql CREATE TABLE fed_data(date timestamp, claims integer, PRIMARY KEY(date))
    df.to_sql('fed_data', engine, if_exists='replace')
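
To make the effect of if_exists='replace' concrete, here is a small sketch. It assumes an in-memory SQLite database as a stand-in for the PostgreSQL instance, and the claim counts are made-up illustrative values, not real Fed figures.

```python
import sqlite3
import pandas as pd

# In-memory SQLite stands in for the course's PostgreSQL database (an assumption).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fed_data(date timestamp, claims integer, PRIMARY KEY(date))")

# Illustrative values only.
df = pd.DataFrame(
    {"total_new_claims": [100, 120]},
    index=pd.to_datetime(["2021-02-28", "2021-03-07"]).rename("date"),
)

# 'replace' drops the hand-written table and recreates it from the dataframe.
df.to_sql("fed_data", conn, if_exists="replace")

# Column names of the table as it exists after the call.
columns = [row[1] for row in conn.execute("PRAGMA table_info(fed_data)")]
row_count = conn.execute("SELECT COUNT(*) FROM fed_data").fetchone()[0]
print(columns, row_count)
```

Passing if_exists='append' instead would insert into the existing table, which would then require the dataframe's column names to match the table's.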

**5. Pull Fed data from the database**

This function simply selects all records from the fed_data table, ordered by date, and returns them as a SQL result set.

In [None]:
# Select fed data from db table.
def selectFedData(engine):
    results = %sql SELECT * FROM fed_data ORDER BY date ASC;
    return results

**6. Retrieve COVID-19 data from a public CSV file**

This function requests a public CSV file listed at covidtracking.com. It uses the pandas read_csv method to convert the CSV data into a dataframe. Then, a new dataframe is created which stores a reformatted date and converts it into a pandas datetime.

NaNs are replaced with zeros to keep the dataframe the same shape as the Fed dataframe (an identical row count). The positive_cases column is then cast to integers by passing int to apply(), and the date column is set as the dataframe index.

Finally, the dataframe is resampled to a weekly interval, summing up the new cases from each day that week. Then the final, cleaned dataframe is returned to the original calling function.

In [None]:
# Get cases from a public csv.
def getCovidData():
    url = 'https://api.covidtracking.com/v1/us/daily.csv'
    # Entire dataframe with everything.
    df = pd.read_csv(url)
    # Empty dataframe will store what we need.
    ndf = pd.DataFrame()
    for group, row in df.iterrows():
        # Reformat the YYYYMMDD date as YYYY-MM-DD, similar to Fed data.
        date = row['date']
        date = str(date)
        year = date[0:4]
        month = date[4:6]
        day = date[6:8]
        date = year + '-' + month + '-' + day
        # Make date into a datetime type.
        ndf.loc[group, 'date'] = pd.to_datetime(date)
        positive_cases = row['positiveIncrease']
        ndf.loc[group, 'positive_cases'] = positive_cases
    # Replace empty (NaN) records with zeros.
    ndf = ndf.replace(np.nan, 0)
    # Convert positive case column type into an int.
    ndf['positive_cases'] = ndf['positive_cases'].apply(int)
    # Index via datetime.
    ndf.set_index('date', inplace=True)
    # Resample as a weekly sum of new cases.
    ndf = ndf.resample('W').sum()
    return ndf
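
The row-by-row loop above can also be written with vectorized pandas calls. The following is a sketch under stated assumptions: the input frame is hand-built with made-up values rather than downloaded, and only the two columns the function uses are included.

```python
import pandas as pd

# Illustrative values only, not real Covid Tracking Project figures.
sample = pd.DataFrame({
    "date": [20210301, 20210302, 20210307, 20210308],
    "positiveIncrease": [10, None, 5, 7],
})

cleaned = pd.DataFrame({
    # Parse the YYYYMMDD integers directly instead of slicing strings.
    "date": pd.to_datetime(sample["date"].astype(str), format="%Y%m%d"),
    # Missing counts become zero before the cast to int.
    "positive_cases": sample["positiveIncrease"].fillna(0).astype(int),
}).set_index("date")

# 'W' bins end on Sunday, so each weekly total is labelled with that Sunday's date.
weekly = cleaned.resample("W").sum()
print(weekly)
```

The Sunday labels are also why the Fed dates are shifted by one day earlier in the notebook, assuming the Fed observations are dated on Saturdays: a Saturday plus one day lands on the same label as the weekly COVID-19 bin.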

**7. Store COVID-19 data into the database**

This function creates a table to store COVID-19 data. It drops any pre-existing table and creates a new one with two columns: date (timestamp) and positive_cases (integer). Then the data is loaded from a dataframe using the .to_sql() method.

In [None]:
# Store Covid data in a db table.
def storeCovidData(df, engine):
    %sql DROP TABLE IF EXISTS covid_data;
    %sql CREATE TABLE covid_data(date timestamp, positive_cases integer, PRIMARY KEY(date))
    df.to_sql('covid_data', engine, if_exists='replace')

**8. Pull COVID-19 data from the database**

This function simply selects all records from the covid_data table, ordered by date, and returns them as a SQL result set.

In [None]:
# Select Covid data from db table.
def selectCovidData(engine):
    results = %sql SELECT * FROM covid_data ORDER BY date ASC;
    return results

**9. Select everything stored in the database**

This function joins the fed_data and covid_data tables on the date field. Each resulting record contains a date, the number of new unemployment claims in that week, and the number of new COVID-19 cases identified in the same week. The data is returned as a SQL result set.

In [None]:
# Select final records.
def selectAllData(engine):
    # Covid data was resampled from daily to weekly, so the inner join keeps the weeks present in both tables.
    results = %sql SELECT fed_data.date AS claim_date, fed_data.total_new_claims, covid_data.date AS covid_date, covid_data.positive_cases AS new_positive_cases FROM fed_data INNER JOIN covid_data ON fed_data.date = covid_data.date;
    return results
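
As a small illustration of what the inner join keeps and drops, the sketch below uses the standard library's sqlite3 module with an in-memory database in place of PostgreSQL. The table contents are made-up values chosen so that only two of the dates overlap.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fed_data(date TEXT PRIMARY KEY, total_new_claims INTEGER)")
conn.execute("CREATE TABLE covid_data(date TEXT PRIMARY KEY, positive_cases INTEGER)")

# Illustrative values only: 2021-02-21 and 2021-03-14 each appear in one table.
conn.executemany(
    "INSERT INTO fed_data VALUES (?, ?)",
    [("2021-02-21", 100), ("2021-02-28", 120), ("2021-03-07", 90)],
)
conn.executemany(
    "INSERT INTO covid_data VALUES (?, ?)",
    [("2021-02-28", 15), ("2021-03-07", 7), ("2021-03-14", 3)],
)

# Same join shape as selectAllData(): rows survive only when both tables share the date.
rows = conn.execute(
    """
    SELECT fed_data.date, fed_data.total_new_claims, covid_data.positive_cases
    FROM fed_data
    INNER JOIN covid_data ON fed_data.date = covid_data.date
    ORDER BY fed_data.date
    """
).fetchall()
print(rows)
```

Dates that exist in only one table drop out silently, which is one way the joined result can end up with fewer rows than either source table.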

**10. Create two dataframes, one for each dataset, from the returned database query**

This function takes the SQL result set returned by the function above, converts it into a "master" dataframe, and renames the columns from integer indexes to readable labels. Then two new dataframes are created from the master dataframe, each indexed by its date column: one for unemployment (Fed) and one for COVID-19. Both dataframes are returned to the caller as a tuple.


Although creating a "master" dataframe could have been accomplished without a database, one was included anyway to demonstrate the concepts.

In [None]:
def getFinalDf(all_sql_results):
    # Put it all into a dataframe.
    all_results = pd.DataFrame(all_sql_results)
    all_results = all_results.rename(columns={0: "claim_date", 1: "total_new_claims", 2: "covid_date", 3: "new_positive_cases"})
    fedResults = all_results[['claim_date', 'total_new_claims']]
    fedResults = fedResults.set_index('claim_date')
    covidResults = all_results[['covid_date', 'new_positive_cases']]
    covidResults = covidResults.set_index('covid_date')
    return (fedResults, covidResults)
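
For comparison, here is a hedged sketch of the database-free route mentioned above, joining the two weekly dataframes directly in pandas. The frames are hand-built with made-up values, and the variable names simply mirror those used in this notebook.

```python
import pandas as pd

# Illustrative values only; the two frames overlap on two of their three dates.
fed = pd.DataFrame(
    {"total_new_claims": [100, 120, 90]},
    index=pd.to_datetime(["2021-02-21", "2021-02-28", "2021-03-07"]).rename("date"),
)
covid = pd.DataFrame(
    {"new_positive_cases": [15, 7, 3]},
    index=pd.to_datetime(["2021-02-28", "2021-03-07", "2021-03-14"]).rename("date"),
)

# An inner join on the shared date index plays the role of the SQL INNER JOIN.
master = fed.join(covid, how="inner")

# Split the master frame back into one frame per data set.
fedResults = master[["total_new_claims"]]
covidResults = master[["new_positive_cases"]]
print(master)
```

Here the columns already carry readable names, so no renaming from integer positions is needed.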

**11. Combine get/store/select data operations**

This function grabs the Fed unemployment data and the COVID-19 data from their sources using their dedicated functions and stores both in the database. It then calls selectAllData(), which queries and joins records from the fed_data and covid_data tables, and keeps the result in all_sql_results. Finally, it passes that result to getFinalDf() and returns a tuple containing fedResults and covidResults.

In [None]:
def getStoreSelectData(engine):
    # Get Fed data, store it into a db, and select contents.
    fedData = getFedData(fed_api_settings)
    storeFedData(fedData, engine)
    
    # Get Covid data, store it into a db, and select contents.
    covidData = getCovidData()
    storeCovidData(covidData, engine)
    
    # Get clean and relevant records from db.
    all_sql_results = selectAllData(engine)

    # Build both dataframes with a single call and unpack the tuple.
    fedResults, covidResults = getFinalDf(all_sql_results)
    
    return (fedResults, covidResults)

**12. Visualize the data**

This function grabs the final Fed/unemployment and COVID-19 data from the functions above and sets up a figure with two plots, each displaying two time series. The first plot displays the raw weekly totals of both initial unemployment claims reported by the Federal Reserve and new COVID-19 cases. The second plot displays the percentage change of each series from the previous week. The period covered runs from January 2020 to March 2021.

In [None]:
def visualize(engine):
    # Fetch and store the data once, then unpack both dataframes from the returned tuple.
    fedResults, covidResults = getStoreSelectData(engine)
        
    fig = plt.figure(figsize=(20,10))
    
    blue = mpatches.Patch(color='#5594c4', label='Initial Unemployment Claims')
    red = mpatches.Patch(color='#ff9c46', label='New cases')
    
    plt.legend(handles=[red, blue], title='Legend', bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.xlabel('Date')
    plt.ylabel('Amount')
    
    ax1 = fig.add_subplot(1, 2, 1)
    ax1.plot(fedResults)
    ax1.plot(covidResults)
    
    ax2 = fig.add_subplot(1, 2, 2)
    
    fedResults['percent_change'] = fedResults['total_new_claims'].pct_change()
    fedResults = fedResults.replace([np.inf, -np.inf], np.nan)
    fedResults = fedResults.dropna()
    fedResults = fedResults[['percent_change']]
    
    covidResults['percent_change'] = covidResults['new_positive_cases'].pct_change()
    covidResults = covidResults.replace([np.inf, -np.inf], np.nan)
    covidResults = covidResults.dropna()
    covidResults = covidResults[['percent_change']]

    ax2.plot(fedResults)
    ax2.plot(covidResults)
    
    df = pd.DataFrame()
    df['fed'] = fedResults['percent_change'][:-1]
    df['covid'] = covidResults['percent_change']
    
    plt.show()
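
The percent-change cleanup used for the second plot can be seen in isolation in the sketch below. The weekly counts are made-up values chosen so that one week follows a zero, which is the case that produces an infinite change.

```python
import numpy as np
import pandas as pd

# Illustrative weekly counts only; the leading zero forces a division by zero.
weekly_cases = pd.Series([0, 4, 6, 3], name="new_positive_cases")

# Change relative to the previous week: the first entry has no predecessor (NaN),
# and the jump from 0 to 4 is infinite.
pct = weekly_cases.pct_change()

# Treat infinities as missing, then drop every missing entry.
cleaned = pct.replace([np.inf, -np.inf], np.nan).dropna()
print(cleaned)
```

Only the last two weeks survive the cleanup, so a series that starts with zero-case weeks ends up shorter after this step than the series it is compared with.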

**13. Execute the visualization**

This is just a function call that runs everything above (but nothing in the cells that follow): it fetches and stores the data, sets labels and colors, and finally displays the plots.

In [None]:
visualize(engine)

**14. Display correlation**

This function computes the correlation between the weekly percent changes in initial unemployment claims and in new positive COVID-19 cases. The closer the correlation is to zero, the weaker the relationship. Positive values indicate a positive correlation, where both variables tend to increase together. Negative values indicate a negative correlation, where the variables tend to move in opposite directions.

In [None]:
def getCorrelation():
    all_sql_results = selectAllData(engine)

    fedResults = getFinalDf(all_sql_results)[0]
    fedResults['percent_change'] = fedResults['total_new_claims'].pct_change()
    fedResults = fedResults.replace([np.inf, -np.inf], np.nan)
    fedResults = fedResults.dropna()
    fedResults = fedResults[['percent_change']]

    covidResults = getFinalDf(all_sql_results)[1]
    covidResults['percent_change'] = covidResults['new_positive_cases'].pct_change()
    covidResults = covidResults.replace([np.inf, -np.inf], np.nan)
    covidResults = covidResults.dropna()
    covidResults = covidResults[['percent_change']]

    df = pd.DataFrame()
    df['fed'] = fedResults['percent_change'][:-1]
    df['covid'] = covidResults['percent_change']

    correlation = df['fed'].corr(df['covid'])
    return correlation
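
As a sanity check on what .corr() returns, the sketch below computes the Pearson correlation coefficient by hand and compares it with the pandas result. The two series are made-up percent changes, not values from this project.

```python
import numpy as np
import pandas as pd

# Illustrative percent changes only.
fed = pd.Series([0.10, -0.20, 0.05, 0.30])
covid = pd.Series([0.20, -0.10, 0.00, 0.40])

# pandas computes the Pearson coefficient by default.
r_pandas = fed.corr(covid)

# The same coefficient by hand: the sum of products of the deviations from the mean,
# divided by the square root of the product of the summed squared deviations.
x = fed - fed.mean()
y = covid - covid.mean()
r_manual = (x * y).sum() / np.sqrt((x ** 2).sum() * (y ** 2).sum())

print(r_pandas, r_manual)
```

Both numbers agree up to floating-point rounding and always fall between -1 and 1.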

**15. Functional testing**

This cell loads some data and then runs simple tests of the functions, most of which check record counts. The database tests assume the tables are already populated with the expected data for the hard-coded time range.

An extra record exists until the selection phase, where a record containing a NaN value is dropped.

In [None]:
fedData = getFedData(fed_api_settings)
covidData = getCovidData()
all_sql_results = selectAllData(engine)

def test_getFedData():
    assert len(getFedData(fed_api_settings)) == 61

def test_storeFedData(fedData, engine):
    results = %sql SELECT * FROM fed_data ORDER BY date ASC
    assert len(results) == 61

def test_selectFedData(engine):
    assert len(selectFedData(engine)) == 61
    
def test_getCovidData():
    assert len(getCovidData()) == 61

def test_storeCovidData(covidData, engine):
    results = %sql SELECT * FROM covid_data ORDER BY date ASC
    assert len(results) == 60

def test_selectCovidData(engine):
    assert len(selectCovidData(engine)) == 60

def test_selectAllData(engine):
    assert len(selectAllData(engine)) == 60

def test_getFinalDf(all_sql_results):
    assert len(getFinalDf(all_sql_results)) == 2

def test_getStoreSelectData(engine):
    assert len(getStoreSelectData(engine)) == 2

def test_getCorrelation():
    assert isinstance(getCorrelation(), np.float64)

test_getFedData()
test_storeFedData(fedData, engine)
test_selectFedData(engine)
test_storeCovidData(covidData, engine)
test_selectCovidData(engine)
test_selectAllData(engine)
test_getFinalDf(all_sql_results)
test_getStoreSelectData(engine)
test_getCorrelation()