In [54]:
import pandas as pd
import numpy as np
import pandas_market_calendars as mcal

# CONNECT to the database (Redshift, or a local PostgreSQL)

In [3]:
import dotenv
import os
import psycopg2

from os.path import expanduser
home = expanduser("~")

# Credentials are read from a dotenv-style file under ~/creds. The path is built
# with os.path.join so the separator is correct on every OS (the previous
# backslash-only form resolved to the right file on Windows alone).
# dotenv.load_dotenv(os.path.join(home, "creds", "local_postgres.txt"))
# schema = 'deliverable_2'
dotenv.load_dotenv(os.path.join(home, "creds", "pwd_redshift.txt"))

# Schema that every query in this notebook is qualified with
schema = 'juanmlacasa_coderhouse'

# Connection settings come from the environment variables loaded above
connection_params = dict(
        host=os.getenv('host')
        , dbname=os.getenv('dbname')
        , user=os.getenv('user')
        , password=os.getenv('password')
        , port=os.getenv('port')
)


try:
    conn = psycopg2.connect(
        **connection_params
    )
    print("Connected to Redshift successfully!")

except Exception as e:
    print("Unable to connect to Redshift.")
    print(e)
    # Every later cell needs `conn`; stop here instead of failing further down
    # with an unrelated NameError (or silently reusing a stale connection).
    raise

Connected to Redshift successfully!


# EXTRACT

In [4]:
# Parameters describing which price series to fetch and where it comes from
ticker_info = {
    'ticker_symbol': 'AAPL',
    'data_source': 'Yahoo Finance',
    'source_type': 'Platform',
    'interval': '1d',
    'start_date': '2023-01-15',
    'end_date': '2023-02-15',
}

In [5]:
import yfinance as yf

# Download the price history for the configured ticker, interval and date range
data = yf.download(
    ticker_info['ticker_symbol'],
    interval=ticker_info['interval'],
    start=ticker_info['start_date'],
    end=ticker_info['end_date'],
)

# Preview the first rows of the downloaded frame
data.head()

[*********************100%%**********************]  1 of 1 completed


Unnamed: 0_level_0,Open,High,Low,Close,Adj Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2023-01-17,134.830002,137.289993,134.130005,135.940002,135.362488,63646600
2023-01-18,136.820007,138.610001,135.029999,135.210007,134.63559,69672800
2023-01-19,134.080002,136.25,133.770004,135.270004,134.695343,58280400
2023-01-20,135.279999,138.020004,134.220001,137.869995,137.284286,80223600
2023-01-23,138.119995,143.320007,137.899994,141.110001,140.510529,81760300


# TRANSFORM

In [6]:
def get_or_create_record(conn, search_value, table_name, entity):
    '''
    Return the id of the row named `search_value`, creating that row if it is missing.

    The lookup runs against {schema}.{table_name}, comparing the {entity}_name
    column with `search_value` and reading the {entity}_id column. When no row
    matches, a row with that name is inserted, its id is read back with a
    second SELECT, and the transaction is committed.

    Parameters:
        conn: open database connection exposing cursor(), commit() and rollback().
        search_value: name to look up / insert. It is sent as a bound query
            parameter, so quotes or SQL fragments inside it are treated as data.
        table_name: table name without the schema prefix.
        entity: prefix of the `<entity>_id` / `<entity>_name` columns.

    `table_name`, `entity` and the module-level `schema` are interpolated into
    the SQL text (identifiers cannot be bound as parameters), so they must come
    from trusted code and never from user input.

    Returns:
        The {entity}_id value of the existing or newly created row.

    Raises:
        Whatever the database driver raises. The transaction is rolled back
        first so the connection stays usable for later statements.
    '''
    qualified_table = f"{schema}.{table_name}"
    select_sql = f"SELECT {entity}_id FROM {qualified_table} WHERE {entity}_name = %s;"
    insert_sql = f"INSERT INTO {qualified_table} ({entity}_name) VALUES (%s);"
    params = (search_value,)

    try:
        with conn.cursor() as cur:
            # --- Search for Existing Record ---
            cur.execute(select_sql, params)
            row = cur.fetchone()
            # Compare against None explicitly: an id of 0 is a valid, existing record
            if row is not None:
                return row[0]

            # --- Insert Record if Not Found ---
            cur.execute(insert_sql, params)
            # Read back the id generated for the new row
            cur.execute(select_sql, params)
            record_id = cur.fetchone()[0]

        # Commit the transaction to save the new record in the database
        conn.commit()
    except Exception:
        # Leave the connection out of the aborted-transaction state before propagating
        conn.rollback()
        raise

    # --- Return the Record ID ---
    return record_id


In [7]:
# Resolve (creating when absent) the lookup-table ids for the ticker, its data source and the source type
asset_id = get_or_create_record(
    conn, search_value=ticker_info['ticker_symbol'], table_name="assets", entity="asset"
)
data_source_id = get_or_create_record(
    conn, search_value=ticker_info['data_source'], table_name="data_sources", entity="source"
)
source_type_id = get_or_create_record(
    conn, search_value=ticker_info['source_type'], table_name="source_types", entity="type"
)
print(asset_id, source_type_id, data_source_id)

4 2 3


In [8]:
# Move the date index into a regular column, but only on the first run: once a
# 'ts' column exists the frame has already been reshaped, and resetting the index
# again would add a stray 'index' column that the staging INSERT would try to load.
if 'ts' not in data.columns:
    data = data.reset_index()

# Normalise the headers to snake_case (e.g. 'Adj Close' -> 'adj_close') and name
# the date column 'ts'
data = (
    data
    .rename(columns=lambda c: c.lower().replace(' ', '_'))
    .rename(columns={'date': 'ts'})
)

# Tag every row with the ids resolved from the lookup tables
data['asset_id'] = asset_id
data['source_id'] = data_source_id

In [20]:
from pandas.tseries.holiday import USFederalHolidayCalendar
from pandas.tseries.offsets import CustomBusinessDay

# Business-day offset that skips weekends and the US federal holidays
_federal_holidays = USFederalHolidayCalendar()
us_bd = CustomBusinessDay(calendar=_federal_holidays)


In [53]:
# import pandas_market_calendars as mcal

# Check for missing dates (excluding weekends and holidays)
nyse_schedule = mcal.get_calendar('NYSE').schedule(start_date=ticker_info['start_date'],
                                                   end_date=ticker_info['end_date'])
all_days = mcal.date_range(nyse_schedule, frequency='1D')

# yfinance documents `end` as exclusive, so only sessions strictly before end_date
# are expected in `data`. Filtering by date, instead of always dropping the last
# session, stays correct when end_date itself is not a trading day.
end_day = pd.Timestamp(ticker_info['end_date']).date()
all_days = all_days[np.array([day < end_day for day in all_days.date], dtype=bool)]
# pd.date_range(start=ticker_info['start_date'], end=ticker_info['end_date'], freq=us_bd)  # B denotes business days
missing_days = np.setdiff1d(all_days.date, data['ts'].dt.date)

if len(missing_days) > 0:
    print("Missing dates:", missing_days)
else:
    print("No missing dates!")

No missing dates!


In [14]:
# Keep only the first row per timestamp, then count any repeated 'ts' values left (expected: 0)
data = data.drop_duplicates(subset='ts')
data.duplicated(subset='ts').sum()

0

# LOAD

In [58]:
from psycopg2.extras import execute_values

def load_staging_postgresql(conn, table_name, dataframe):
    '''
    Replace the contents of the staging table with the rows of a Pandas DataFrame.

    All existing rows of {schema}.staging_{table_name} are deleted and the rows
    of `dataframe` are batch-inserted in their place, within one transaction.
    No upsert / conflict handling happens here: the staging table simply ends
    up holding exactly the rows of `dataframe`.

    Parameters:
        conn: open database connection exposing cursor(), commit() and rollback().
        table_name: target table name without the `staging_` prefix or schema.
        dataframe: frame to load; its column names are used (double-quoted)
            as the INSERT column list, so they must match the staging table.

    Reads the module-level `schema` variable.

    Raises:
        Whatever the database driver raises. The transaction is rolled back
        first, so the staging table keeps its previous contents and the
        connection stays usable.
    '''
    clean_staging = f'''
    DELETE FROM {schema}.staging_{table_name}
    '''
    # --- Convert DataFrame to List of Tuples ---
    # Convert each row of the DataFrame into a tuple and create a list of these tuples
    values = [tuple(x) for x in dataframe.to_numpy()]

    # --- Format Column Names ---
    # Construct a string of double-quoted column names separated by commas
    cols = '"'+'''", "'''.join(dataframe.columns)+'"'

    # --- Prepare SQL Queries ---
    # The single %s placeholder is expanded by execute_values into the row tuples
    insert_sql = f"INSERT INTO {schema}.staging_{table_name} ({cols}) VALUES %s"

    # --- Execute Transaction ---
    try:
        with conn.cursor() as curs:
            # Empty the staging table, then batch-insert the new rows
            curs.execute(clean_staging)
            execute_values(curs, insert_sql, values)
        conn.commit()
    except Exception:
        # Undo the DELETE and leave the connection out of the aborted-transaction state
        conn.rollback()
        raise

    print('Finished loading data into staging PostgreSQL.')


In [59]:
def merge_redshift(conn, table_name):
    '''
    Merge the staging table into its target table with a delete-then-insert.

    Rows of {schema}.{table_name} that share (asset_id, source_id, ts) with a
    row of {schema}.staging_{table_name} are deleted, then every staging row is
    inserted into the target. Both statements run in one transaction, so the
    target is either fully refreshed for those keys or left untouched.

    Parameters:
        conn: open database connection exposing cursor(), commit() and rollback().
        table_name: target table name without schema; the staging table is
            expected to be named `staging_<table_name>` in the same schema.

    The INSERT uses `SELECT *`, so the staging table's columns must line up
    with the target table's columns. Reads the module-level `schema` variable.

    Raises:
        Whatever the database driver raises. The transaction is rolled back
        first so the connection stays usable for later statements.
    '''

    # --- Prepare SQL Queries ---
    # DELETE removes target rows that are about to be replaced by staging rows
    delete_sql = f'''
    DELETE FROM {schema}.{table_name}
    USING {schema}.staging_{table_name} AS staging
    WHERE {schema}.{table_name}.asset_id = staging.asset_id AND
        {schema}.{table_name}.source_id = staging.source_id AND
        {schema}.{table_name}.ts = staging.ts;
    '''

    # INSERT copies every staging row into the target table
    insert_sql = f'''
    INSERT INTO {schema}.{table_name}
    SELECT * FROM {schema}.staging_{table_name};
    '''

    # --- Execute Transaction ---
    try:
        with conn.cursor() as cur:
            cur.execute(delete_sql)
            cur.execute(insert_sql)
        # Commit both statements together
        conn.commit()
    except Exception:
        # Undo a partial merge and leave the connection out of the aborted-transaction state
        conn.rollback()
        raise

    print('Finished loading data into PostgreSQL.')

In [60]:
# Refresh the staging table with the prepared OHLCV rows
load_staging_postgresql(conn, table_name='ohlcv_data', dataframe=data)

Finished loading data into PostgreSQL.


In [61]:
# Merge the staged rows into the target table
merge_redshift(conn, table_name='ohlcv_data')

Finished loading data into PostgreSQL.


In [31]:
# Close the database connection opened at the top of the notebook now that the load is done
conn.close()

In [30]:
# with conn.cursor() as curs:
#     curs.execute("ROLLBACK")
#     conn.commit()