In [465]:
import duckdb
import pandas as pd
from fredapi import Fred
import certifi
import os
from datetime import datetime, timedelta, date
from config import FRED_API_KEY
# Point SSL_CERT_FILE at the path returned by certifi.where()
# (presumably so HTTPS requests to FRED verify against certifi's CA bundle — confirm).
os.environ['SSL_CERT_FILE'] = certifi.where()
# FRED client used by the fetch helpers; the API key is imported from the local config module.
fred = Fred(api_key=FRED_API_KEY)
# Today's date as a 'YYYY-MM-DD' string (the fetch date).
# NOTE(review): fetch_fred_data computes its own local today_date rather than reading this one.
today_date = date.today().strftime('%Y-%m-%d')

In [466]:
# FRED series IDs, grouped by theme. Each group keeps its own module-level name
# because later cells refer to the groups through index_categories.

# Energy Indexes:
energy_indexes = [
    "DCOILWTICO", "GASREGW", "DPROPANEMBTX",
]
# CPI/PPI/Consumer Sentiment Indexes:
cpi_ppi_consumer_sentiment_indexes = [
    "CPIAUCSL", "PPIACO", "PCE", "CPILFESL", "UMCSENT", "PCEPILFE", "MICH",
]
# Employment Indexes:
employment_indexes = [
    "PAYEMS", "UNRATE", "CES0500000003", "CIVPART", "JTSJOL", "ICSA",
    "ADPWNUSNERSA", "U6RATE", "AWHNONAG",
]
# Federal Funds Indexes:
federal_funds_indexes = [
    "FEDFUNDS", "DFEDTAR", "IOER", "DISCOUNT", "DGS10", "SOFR", "PRIME",
    "TB3MS", "CPFF",
]
# Housing Indexes:
housing_indexes = [
    "CSUSHPINSA", "HSN1F", "PERMIT", "HOUST", "USSTHPI", "EXHOSLUSM495S",
    "MORTGAGE30US", "RRVRUSQ156N", "RHORUSQ156N", "HOSINVUSM495N",
]

# Category name -> list of series IDs (insertion order: energy first, housing last).
index_categories = dict(
    energy=energy_indexes,
    cpi_ppi_sentiment=cpi_ppi_consumer_sentiment_indexes,
    employment=employment_indexes,
    federal_funds=federal_funds_indexes,
    housing=housing_indexes,
)

# Master List of FRED Indexes: every category's IDs, flattened in category order.
series_ids = [sid for group in index_categories.values() for sid in group]

### Fetch Data from FRED (Federal Reserve Economic Data, Federal Reserve Bank of St. Louis)

In [467]:
def fetch_series_data(series_id):
    """
    Fetch the observations for a given FRED series ID.

    No date window is applied: ``fred.get_series`` is called with the series ID only.

    Parameters:
    - series_id (str): The FRED series ID.

    Returns:
    - pd.Series: The time series data, or an empty float64 Series if the
      request raised an exception (the error is printed, not re-raised).
    """
    try:
        return fred.get_series(series_id)
    except Exception as e:
        # Best-effort fetch: report the failure and return an empty Series so
        # the caller can skip this ID and carry on with the rest.
        print(f"Error fetching series data for {series_id}: {e}")
        # An explicit dtype keeps the result type stable; pd.Series([]) relies
        # on a pandas default that differs between versions.
        return pd.Series(dtype='float64')

def fetch_metadata(series_id):
    """
    Look up the descriptive information FRED holds for one series.

    Parameters:
    - series_id (str): The FRED series ID.

    Returns:
    - The value returned by ``fred.get_series_info`` (described as a dict in
      the original notes — TODO confirm the exact type against fredapi), or
      an empty dict when the lookup raised an exception. The error is printed
      rather than re-raised.
    """
    try:
        return fred.get_series_info(series_id)
    except Exception as err:
        print(f"Error fetching metadata for {series_id}: {err}")
    # Only reached after a failure: fall back to an empty mapping.
    return {}

def construct_dataframe(series_data, metadata, series_id, fetch_date):
    """
    Construct a DataFrame from series data and metadata.

    The series index becomes the ``date`` column and the values become the
    ``series_index`` column, whatever names the incoming Series or its index
    carry. The metadata fields are repeated on every row.

    Parameters:
    - series_data (pd.Series): The time series data.
    - metadata (dict-like): The metadata for the series; only ``.get`` is used,
      with '' as the fallback for missing keys.
    - series_id (str): The FRED series ID.
    - fetch_date (str): The date when the data was fetched.

    Returns:
    - pd.DataFrame: The constructed DataFrame with data and metadata, or an
      empty DataFrame if construction raised an exception (the error is printed).
    """
    try:
        # Rename explicitly instead of passing columns=[...] to the DataFrame
        # constructor: with a *named* Series that form selects a non-existent
        # column and silently produces NaN values.
        df = (
            pd.Series(series_data)
            .rename('series_index')
            .rename_axis('date')
            .reset_index()
        )
        df['name'] = metadata.get('title', '')
        df['seriesid'] = series_id
        df['source'] = f'https://fred.stlouisfed.org/series/{series_id}'
        df['units'] = metadata.get('units', '')
        df['seasonal_adjustment'] = metadata.get('seasonal_adjustment', '')
        df['frequency'] = metadata.get('frequency', '')
        df['fred_last_updated_date'] = metadata.get('last_updated', '')
        df['fetch_date'] = fetch_date
        return df
    except Exception as e:
        print(f"Error constructing dataframe for {series_id}: {e}")
        return pd.DataFrame()

def fetch_fred_data(series_ids):
    """
    Download observations and metadata for several FRED series.

    Series whose data fetch comes back empty are skipped, so the returned list
    can be shorter than ``series_ids``. Every frame is stamped with today's
    date as its fetch date.

    Parameters:
    - series_ids (list of str): The FRED series IDs.

    Returns:
    - list of pd.DataFrame: One DataFrame (data plus metadata columns) per
      series that returned data, in input order.
    """
    fetch_stamp = date.today().strftime('%Y-%m-%d')
    frames = []

    for sid in series_ids:
        observations = fetch_series_data(sid)
        if not observations.empty:
            # Metadata is only requested once we know there is data to attach it to.
            info = fetch_metadata(sid)
            frames.append(construct_dataframe(observations, info, sid, fetch_stamp))

    return frames


In [468]:
# Fetch data for every ID in the master list.
# fetch_fred_data skips series that return no data, so `dataframes` may hold fewer
# entries than `series_ids`.
dataframes = fetch_fred_data(series_ids)

### Connect to DuckDB and Load Each Series into Its Own Table

In [469]:
# Creates empty database and shows existing tables.
# The output below this cell is an empty listing, i.e. no tables exist yet at this point.
sql_query = ''' 
show tables
''' 

# The with-block closes the database connection automatically on exit.
with duckdb.connect('data/fred.db') as con:
    display(con.sql(sql_query).df())

Unnamed: 0,name


In [470]:
# Persist each fetched series as its own DuckDB table, named after the
# lowercase series ID. The with-block closes the connection even if a load fails.
with duckdb.connect('data/fred.db') as conn:
    for dataframe in dataframes:
        # fetch_fred_data skips series that returned no data, so `dataframes`
        # can be shorter than `series_ids`. Zipping the two lists would store
        # data under the wrong table name; read the ID from the frame itself.
        if dataframe.empty or 'seriesid' not in dataframe.columns:
            continue
        series_id = dataframe['seriesid'].iloc[0]

        # Convert series ID to lowercase for the table name
        table_name = series_id.lower()
        temp_name = table_name + "_temp"

        # Register the DataFrame as a virtual table
        conn.register(temp_name, dataframe)
        try:
            # Create or replace the physical table from the virtual table
            conn.execute(
                f'CREATE OR REPLACE TABLE "{table_name}" AS SELECT * FROM "{temp_name}"'
            )
        finally:
            # Always drop the virtual table, even when the CREATE fails
            conn.unregister(temp_name)

# Confirm what was loaded.
sql_query = ''' 
show tables
''' 

with duckdb.connect('data/fred.db') as con: #using with to automatically close the database
    display(con.sql(sql_query).df())

Unnamed: 0,name
0,adpwnusnersa
1,awhnonag
2,ces0500000003
3,civpart
4,cpff
5,cpiaucsl
6,cpilfesl
7,csushpinsa
8,dcoilwtico
9,dfedtar


### Create Bronze Layer of Medallion Architecture

In [471]:
def create_bronze_sql_files(series_ids, target_directory):
    """
    Write one bronze-layer .sql file per series ID.

    Files are written to <target_directory>/bronze/ and named
    bronze_<lowercase series id>.sql. Each holds a SELECT that renames the
    columns of the table called <lowercase series id>. A line is printed for
    every file written.

    Parameters:
    - series_ids (list of str): The FRED series IDs.
    - target_directory (str): Parent directory; its "bronze" subdirectory is
      created when missing.
    """
    out_dir = os.path.join(target_directory, "bronze")
    os.makedirs(out_dir, exist_ok=True)

    # Query text shared by all files; "{}" receives the lowercase series ID.
    # The final empty entry makes the file end with a newline.
    sql_template = "\n".join([
        "SELECT ",
        "date AS Date,",
        "series_index AS Series,",
        "name AS Name,",
        "units AS Units,",
        "seasonal_adjustment AS AdjSeas,",
        "frequency AS Freq,",
        "fred_last_updated_date AS LastUpdatedDate,",
        "fetch_date AS FetchDate",
        "FROM {}",
        "",
    ])

    for table in (sid.lower() for sid in series_ids):
        file_path = os.path.join(out_dir, f"bronze_{table}.sql")

        with open(file_path, 'w') as sql_file:
            sql_file.write(sql_template.format(table))

        print(f"SQL file created: {file_path}")

In [472]:
# Directory that receives the generated .sql files. The default is the original
# author's local path; set FRED_MODELS_DIR to run the notebook on another machine.
target_directory = os.environ.get(
    "FRED_MODELS_DIR",
    "/Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models",
)
create_bronze_sql_files(series_ids, target_directory)

SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/bronze/bronze_dcoilwtico.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/bronze/bronze_gasregw.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/bronze/bronze_dpropanembtx.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/bronze/bronze_cpiaucsl.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/bronze/bronze_ppiaco.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/bronze/bronze_pce.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/bronze/bronze_cpilfesl.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/bronze/bronze_umcsent.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economi

In [473]:
# List the tables again after writing the bronze .sql files.
# NOTE(review): the bronze_* tables in the output below are not created by any cell
# shown in this notebook — presumably the models were built externally (e.g. a dbt
# run) before this cell was executed; confirm and document that step.
sql_query = ''' 
show tables
''' 

with duckdb.connect('data/fred.db') as con: #using with to automatically close the database
    display(con.sql(sql_query).df())

Unnamed: 0,name
0,adpwnusnersa
1,awhnonag
2,bronze_adpwnusnersa
3,bronze_awhnonag
4,bronze_ces0500000003
...,...
71,tb3ms
72,u6rate
73,umcsent
74,unrate


### Create Silver Layer of Medallion Architecture (Union All)

In [474]:
def create_silver_sql_union_all_files(index_categories, target_directory):
    """
    Create one .sql file per category that UNION ALLs the category's bronze models.

    Files are written to <target_directory>/silver/unionall/ and named
    silver_<category>_unionall.sql. Every series contributes one SELECT over
    ``{{ref('bronze_<lowercase series id>')}}``. A line is printed for every
    file written.

    The generated SQL has no trailing whitespace. Earlier output carried the
    source indentation on every line, a side effect of backslash line
    continuations inside the string literal.

    Parameters:
    - index_categories (dict of list): A dictionary where each key is a category name and each value is a list of series IDs.
    - target_directory (str): The target directory to store .sql files.
    """
    # Ensure the target directory exists, create if not
    silver_directory = os.path.join(target_directory, "silver", "unionall")
    os.makedirs(silver_directory, exist_ok=True)

    # One SELECT per bronze model. The quadrupled braces come out of
    # str.format as the literal "{{" / "}}" that wrap ref() in the written file.
    select_template = "\n".join([
        "SELECT Date,",
        "Series,",
        "Name,",
        "Units,",
        "AdjSeas,",
        "Freq,",
        "LastUpdatedDate,",
        "FetchDate",
        "FROM {{{{ref('bronze_{table_name}')}}}}",
    ])

    # Iterate over each category and create SQL files
    for category_name, series_ids in index_categories.items():
        file_name = f"silver_{category_name}_unionall.sql"
        file_path = os.path.join(silver_directory, file_name)

        union_sql = "\nUNION ALL\n".join(
            select_template.format(table_name=series_id.lower())
            for series_id in series_ids
        )

        # Write the SQL query to a file
        with open(file_path, 'w') as sql_file:
            sql_file.write(union_sql)

        print(f"SQL file created: {file_path}")

In [475]:
# Directory that receives the generated .sql files. The default is the original
# author's local path; set FRED_MODELS_DIR to run the notebook on another machine.
target_directory = os.environ.get(
    "FRED_MODELS_DIR",
    "/Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models",
)
create_silver_sql_union_all_files(index_categories, target_directory)

SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/silver/unionall/silver_energy_unionall.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/silver/unionall/silver_cpi_ppi_sentiment_unionall.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/silver/unionall/silver_employment_unionall.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/silver/unionall/silver_federal_funds_unionall.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/silver/unionall/silver_housing_unionall.sql


In [476]:
# List the tables again after writing the union-all .sql files.
sql_query = ''' 
show tables
''' 

with duckdb.connect('data/fred.db') as con: #using with to automatically close the database
    display(con.sql(sql_query).df())

Unnamed: 0,name
0,adpwnusnersa
1,awhnonag
2,bronze_adpwnusnersa
3,bronze_awhnonag
4,bronze_ces0500000003
...,...
76,tb3ms
77,u6rate
78,umcsent
79,unrate


### Create Silver Layer of Medallion Architecture (AdjSeas - Boolean)

In [477]:
def create_silver_sql_binaryconversion_files(index_categories, target_directory):
    """
    Create .sql files for transforming the AdjSeas column in the 'silver' subdirectory.

    Files are written to <target_directory>/silver/binaryconversion/ and named
    silver_<category>_binaryconversion.sql. Each one selects from
    ``{{ref('silver_<category>_unionall')}}`` and maps AdjSeas to
    0 ('Not Seasonally Adjusted'), 1 ('Seasonally Adjusted') or NULL (anything else).

    The category name is used unchanged in both the file name and the ref().
    Lowercasing it only inside ref() would point a mixed-case category at a
    model name that does not match the silver_<category>_unionall file.

    Parameters:
    - index_categories (dict of list): A dictionary where each key is a category name.
    - target_directory (str): The target directory to store .sql files.
    """
    # Define the target subdirectory for these transformation SQL files
    silver_directory = os.path.join(target_directory, "silver", "binaryconversion")
    os.makedirs(silver_directory, exist_ok=True)

    # SQL template for transforming AdjSeas. The quadrupled braces come out of
    # str.format as the literal "{{" / "}}" that wrap ref() in the written file.
    sql_template = "\n".join([
        "SELECT",
        "Date,",
        "Series,",
        "Name,",
        "Units,",
        "CASE",
        "    WHEN AdjSeas = 'Not Seasonally Adjusted' THEN 0",
        "    WHEN AdjSeas = 'Seasonally Adjusted' THEN 1",
        "    ELSE NULL",
        "END AS AdjSeas,",
        "Freq,",
        "LastUpdatedDate,",
        "FetchDate",
        "FROM {{{{ref('silver_{category_name}_unionall')}}}}",
    ])

    # Only the category names are needed; iterating the dict yields its keys.
    for category_name in index_categories:
        file_name = f"silver_{category_name}_binaryconversion.sql"
        file_path = os.path.join(silver_directory, file_name)

        # Write the formatted SQL query to a file
        with open(file_path, 'w') as sql_file:
            sql_file.write(sql_template.format(category_name=category_name))

        print(f"SQL file created: {file_path}")

In [478]:
# Write one silver_<category>_binaryconversion.sql per category key.
# Reuses `target_directory` assigned in an earlier cell.
create_silver_sql_binaryconversion_files(index_categories, target_directory)

SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/silver/binaryconversion/silver_energy_binaryconversion.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/silver/binaryconversion/silver_cpi_ppi_sentiment_binaryconversion.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/silver/binaryconversion/silver_employment_binaryconversion.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/silver/binaryconversion/silver_federal_funds_binaryconversion.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/silver/binaryconversion/silver_housing_binaryconversion.sql


In [479]:
# List the tables again after writing the binary-conversion .sql files.
sql_query = ''' 
show tables
''' 

with duckdb.connect('data/fred.db') as con: #using with to automatically close the database
    display(con.sql(sql_query).df())

Unnamed: 0,name
0,adpwnusnersa
1,awhnonag
2,bronze_adpwnusnersa
3,bronze_awhnonag
4,bronze_ces0500000003
...,...
81,tb3ms
82,u6rate
83,umcsent
84,unrate


### Create Silver Layer of Medallion Architecture (Category Field Added)

In [480]:
def create_silver_sql_add_category_files(index_categories, target_directory):
    """
    Create .sql files for adding a Category column, referencing the binaryconversion files.

    Files are written to <target_directory>/silver/addcategory/ and named
    silver_<category>_addcategory.sql. Each one selects from
    ``{{ref('silver_<category>_binaryconversion')}}`` and adds the category
    name as a string literal column called Category.

    The category name is used unchanged in the file name, the literal and the
    ref(). Lowercasing it only inside ref() would point a mixed-case category
    at a model name that does not match the silver_<category>_binaryconversion file.

    Parameters:
    - index_categories (dict of list): A dictionary where each key is a category name.
    - target_directory (str): The target directory to store .sql files.
    """
    # Define the target subdirectory for these SQL files
    silver_directory = os.path.join(target_directory, "silver", "addcategory")
    os.makedirs(silver_directory, exist_ok=True)

    # SQL template for adding Category column. The quadrupled braces come out of
    # str.format as the literal "{{" / "}}" that wrap ref() in the written file.
    sql_template = "\n".join([
        "SELECT",
        "Date,",
        "Series,",
        "Name,",
        "Units,",
        "AdjSeas,",
        "Freq,",
        "LastUpdatedDate,",
        "'{category_name}' AS Category,",
        "FetchDate",
        "FROM {{{{ref('silver_{category_name}_binaryconversion')}}}}",
    ])

    # Only the category names are needed; iterating the dict yields its keys.
    for category_name in index_categories:
        file_name = f"silver_{category_name}_addcategory.sql"
        file_path = os.path.join(silver_directory, file_name)

        # Write the formatted SQL query to a file
        with open(file_path, 'w') as sql_file:
            sql_file.write(sql_template.format(category_name=category_name))

        print(f"SQL file created: {file_path}")

In [481]:
# Write one silver_<category>_addcategory.sql per category key.
# Reuses `target_directory` assigned in an earlier cell.
create_silver_sql_add_category_files(index_categories, target_directory)

SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/silver/addcategory/silver_energy_addcategory.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/silver/addcategory/silver_cpi_ppi_sentiment_addcategory.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/silver/addcategory/silver_employment_addcategory.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/silver/addcategory/silver_federal_funds_addcategory.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/silver/addcategory/silver_housing_addcategory.sql


In [482]:
# List the tables again after writing the add-category .sql files.
sql_query = ''' 
show tables
''' 

with duckdb.connect('data/fred.db') as con: #using with to automatically close the database
    display(con.sql(sql_query).df())

Unnamed: 0,name
0,adpwnusnersa
1,awhnonag
2,bronze_adpwnusnersa
3,bronze_awhnonag
4,bronze_ces0500000003
...,...
86,tb3ms
87,u6rate
88,umcsent
89,unrate


### Create Silver Layer of Medallion Architecture (Explicit Data Types Assigned)

In [483]:
def create_silver_sql_set_datatypes_files(index_categories, target_directory):
    """
    Create .sql files for setting data types, referencing the addcategory files.

    Files are written to <target_directory>/silver/setdatatypes/ and named
    silver_<category>_setdatatypes.sql. Each one selects from
    ``{{ref('silver_<category>_addcategory')}}`` and wraps every column in an
    explicit CAST.

    The Category column produced by the addcategory models is carried through.
    It was previously missing from the SELECT, so it was lost at this layer.
    The category name is used unchanged in both the file name and the ref(),
    so the reference matches the silver_<category>_addcategory file name.

    Parameters:
    - index_categories (dict of list): A dictionary where each key is a category name.
    - target_directory (str): The target directory to store .sql files.
    """
    # Define the target subdirectory for these SQL files
    silver_directory = os.path.join(target_directory, "silver", "setdatatypes")
    os.makedirs(silver_directory, exist_ok=True)

    # SQL template for casting data types. Category sits where the addcategory
    # models place it: after LastUpdatedDate and before FetchDate.
    # NOTE(review): DATETIME(7) / NVARCHAR(60) / BIT read like SQL Server type
    # names — confirm they mean what is intended on the DuckDB target (BIT in
    # particular).
    sql_template = "\n".join([
        "SELECT",
        "CAST(Date AS DATETIME(7)) AS Date,",
        "CAST(Series AS FLOAT) AS Series,",
        "CAST(Name AS NVARCHAR(60)) AS Name,",
        "CAST(Units AS NVARCHAR(60)) AS Units,",
        "CAST(AdjSeas AS BIT) AS AdjSeas,",
        "CAST(Freq AS NVARCHAR(60)) AS Freq,",
        "CAST(LastUpdatedDate AS DATETIME(7)) AS LastUpdatedDate,",
        "CAST(Category AS NVARCHAR(60)) AS Category,",
        "CAST(FetchDate AS DATETIME(7)) AS FetchDate",
        "FROM {{{{ref('silver_{category_name}_addcategory')}}}}",
    ])

    # Only the category names are needed; iterating the dict yields its keys.
    for category_name in index_categories:
        file_name = f"silver_{category_name}_setdatatypes.sql"
        file_path = os.path.join(silver_directory, file_name)

        # Write the formatted SQL query to a file
        with open(file_path, 'w') as sql_file:
            sql_file.write(sql_template.format(category_name=category_name))

        print(f"SQL file created: {file_path}")

In [484]:
# Write one silver_<category>_setdatatypes.sql per category key.
# Reuses `target_directory` assigned in an earlier cell.
create_silver_sql_set_datatypes_files(index_categories, target_directory)

SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/silver/setdatatypes/silver_energy_setdatatypes.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/silver/setdatatypes/silver_cpi_ppi_sentiment_setdatatypes.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/silver/setdatatypes/silver_employment_setdatatypes.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/silver/setdatatypes/silver_federal_funds_setdatatypes.sql
SQL file created: /Users/jeffreyblack/Projects/FREDDataProject/fred_economic_data/models/silver/setdatatypes/silver_housing_setdatatypes.sql


In [485]:
# List the tables again after writing the set-datatypes .sql files.
sql_query = ''' 
show tables
''' 

with duckdb.connect('data/fred.db') as con: #using with to automatically close the database
    display(con.sql(sql_query).df())

Unnamed: 0,name
0,adpwnusnersa
1,awhnonag
2,bronze_adpwnusnersa
3,bronze_awhnonag
4,bronze_ces0500000003
...,...
91,tb3ms
92,u6rate
93,umcsent
94,unrate


### Create Gold Layer of Medallion Architecture