In [1]:
import pandas as pd
import requests
import time

from etl_resources import sqlite_connection, get_api_key

In [2]:
def endpoints():
    '''
    Build the Alpha Vantage query URL for every macro indicator.

    Returns a dict mapping a table name ('cpi', 'federal_funds', 'gdp',
    'retail_sales', 'unemployment') to the full request URL, with the key
    returned by get_api_key() appended as the `apikey` query parameter.
    '''

    key = get_api_key()
    url_prefix = "https://www.alphavantage.co/query?function="

    # (table name, function name plus any extra query parameters);
    # the order here is the order of the returned dict.
    indicator_queries = (
        ('cpi', "CPI&interval=monthly"),
        ('federal_funds', "FEDERAL_FUNDS_RATE&interval=monthly"),
        ('gdp', "REAL_GDP&interval=quarterly"),
        ('retail_sales', "RETAIL_SALES"),
        ('unemployment', "UNEMPLOYMENT"),
    )

    return {
        name: f"{url_prefix}{query}&apikey={key}"
        for name, query in indicator_queries
    }

In [3]:
def macro_indicators(pause_seconds=20, timeout=30):

    '''
    This function uses the alphavantage api to pull macro indicators

    For every (table, url) pair returned by endpoints() the URL is fetched,
    the list under the response's "data" key is loaded into a DataFrame and
    written to the SQLite table of the same name (replacing any existing one).

    The run is best-effort: a table that cannot be fetched, parsed or stored
    is reported with a "Failed: <table>" line and the loop moves on.

    Parameters
    ----------
    pause_seconds : seconds to sleep before each request (default 20, the
        original hard-coded value; presumably there to respect the API's
        rate limit - confirm against your plan).
    timeout : seconds passed to requests.get so a stalled connection cannot
        hang the notebook forever.
    '''

    _endpoints = endpoints()

    for table, endpoint in _endpoints.items():
        try:
            time.sleep(pause_seconds)
            print(f"Parsing {table} data")
            response = requests.get(endpoint, timeout=timeout)
            # Surface 4xx/5xx responses instead of trying to parse an error page.
            response.raise_for_status()
            payload = response.json()

            # NOTE(review): the API appears to report errors / throttling as a
            # JSON body without "data" - confirm against the Alpha Vantage docs.
            if not isinstance(payload, dict) or "data" not in payload:
                found = list(payload) if isinstance(payload, dict) else type(payload).__name__
                print(f"Failed: {table} (no 'data' in response; got {found})")
                continue

            df = pd.DataFrame.from_dict(payload["data"])

            df.to_sql(name=table, if_exists='replace', index=False, con=sqlite_connection())

        except Exception as exc:
            # `Exception` (not a bare except) lets Ctrl-C / SystemExit stop the run.
            # Only the exception class is printed: requests' error messages embed
            # the request URL, and the URL contains the API key.
            print(f"Failed: {table} ({type(exc).__name__})")
 


In [4]:
def process_macro_datatypes():

    '''
    Re-type each raw indicator table and save it as "<table>_clean".

    For every table named by endpoints(), the 'date' column is parsed with
    pd.to_datetime and the 'value' column with pd.to_numeric. The result
    replaces any existing "<table>_clean" table; because index=False is not
    passed to to_sql, the DataFrame index is written as a column as well.
    '''

    for name in endpoints():

        print(f"Processing {name}")
        raw = pd.read_sql(f"select * from {name}", con=sqlite_connection())

        # assign() swaps the two columns in place, so column order is unchanged.
        typed = raw.assign(
            date=pd.to_datetime(raw['date']),
            value=pd.to_numeric(raw['value']),
        )

        typed.to_sql(f"{name}_clean", con=sqlite_connection(), if_exists='replace')
        

In [5]:
def process_timeseries_differences():

    '''
    Build a quarter-end table "<table>_qtr" for every macro indicator.

    For each table named by endpoints(), the query joins "<table>_clean" to
    the `calendar` table (columns date, quarter, year), keeps the row with the
    latest date in every (quarter, year) and orders the rows by year, quarter.

    The stored table has the columns
        date, percentchange, valuechange, value, quarter, year
    where percentchange / valuechange are the relative and absolute change of
    `value` against the previous quarter-end row (empty for the first row).
    Any existing "<table>_qtr" table is replaced.
    '''

    _endpoints = endpoints()

    for table in _endpoints:
        print(f"Calculating quarterly differences for {table}")
        qry = f'''
SELECT
   cc.*,
   x.quarter,
   x.year 
FROM
   {table}_clean cc 
   INNER JOIN
      (
         SELECT
            MAX(c.DATE) AS last_date,
            cr.quarter,
            cr.year 
         FROM
            {table}_clean c 
            LEFT JOIN
               calendar cr 
               ON c.DATE = cr.DATE 
         WHERE
            cr.quarter IS NOT NULL 
         GROUP BY
            cr.quarter,
            cr.YEAR
      )
      x 
      ON x.last_date = cc.DATE 
ORDER BY
   x.YEAR,
   x.quarter
'''
        base_df = pd.read_sql(qry, con=sqlite_connection())

        # Both derived columns come straight from `value`; the rows are already
        # in chronological order thanks to the ORDER BY above. Selecting the
        # wanted columns by name (rather than dropping unwanted ones) means an
        # extra or missing helper column such as 'index' cannot break this step.
        value = pd.to_numeric(base_df['value'])
        final_df = pd.DataFrame({
            'date': pd.to_datetime(base_df['date']),
            'percentchange': value.pct_change(),
            'valuechange': value.diff(),
            'value': value,
            'quarter': pd.to_numeric(base_df['quarter']),
            'year': pd.to_numeric(base_df['year']),
        })

        final_df.to_sql(f"{table}_qtr",if_exists='replace',con=sqlite_connection())
        

In [6]:
def main():
    '''
    Run the whole pipeline in order: download the raw indicator tables,
    re-type them into "<table>_clean", then derive the "<table>_qtr" tables.
    '''
    pipeline = (
        macro_indicators,
        process_macro_datatypes,
        process_timeseries_differences,
    )
    for stage in pipeline:
        stage()

In [7]:
# Run the full pipeline once: download, re-type, then quarter-end differences.
# The download stage pauses before every request, so this cell is slow.
main()

Parsing cpi data
Parsing federal_funds data
Parsing gdp data
Parsing retail_sales data
Parsing unemployment data
Processing cpi
Processing federal_funds
Processing gdp
Processing retail_sales
Processing unemployment
Calculating quarterly differences for cpi
Calculating quarterly differences for federal_funds
Calculating quarterly differences for gdp
Calculating quarterly differences for retail_sales
Calculating quarterly differences for unemployment
