In [71]:
!ls

ETL.ipynb           Untitled.ipynb      create_tables.ipynb


In [63]:
import configparser
import psycopg2

In [70]:
!pwd 

/Users/margaret/OneDrive/Documents/projects/option_price_collection/jupyter_notebook_test


In [64]:
# read config file
config = configparser.ConfigParser()
config.read('../config.cfg')

['../config.cfg']

In [65]:
# STAGING TABLES
staging_options_copy = ("""
COPY staging_option FROM '{}'
CREDENTIALS 'aws_iam_role={}'
delimiter ',' gzip
ignoreheader 1
region 'us-east-2'
""").format(config.get('S3', 'OPTION_DATA'), config.get('IAM_ROLE', 'ARN'))

copy_table_queries = [staging_options_copy]

In [66]:
tickers_table_insert = ("""INSERT INTO dim_tickers 
                                (ticker, company, exchange_nm)
                            SELECT distinct Ticker as ticker, 
                                   Company as company,
                                   Exchange as exchange_nm
                            FROM staging_option
""")

contracts_table_insert = ("""INSERT INTO dim_contracts 
                                (contract_symbol, maturity_date, strike, option_type, ticker_key, 
                                    currency, contract_size)
                             SELECT contractSymbol, 
                                    TO_DATE(contractExpiryDate,'YYYYMMDD'),
                                    strike,
                                    OptionType,
                                    ticker_key,
                                    currency,
                                    contractSize
                             FROM staging_option, dim_tickers
                             WHERE staging_option.ticker = dim_tickers.ticker
""")


options_transation_insert = ("""INSERT INTO fact_options_transation
                                    (contract_key, ticker_key, time, price, bid, ask, change, percent_change,
                                     volume, open_interest, implied_vol, in_the_money)
                                SELECT  contract_key,
                                        dim_tickers.ticker_key, 
                                        lastTradeDate,
                                        lastPrice,
                                        bid,
                                        ask,
                                        change,
                                        percentChange,
                                        volume::INTEGER, 
                                        openInterest::INTEGER,
                                        impliedVolatility,
                                        inTheMoney
                                    FROM staging_option, dim_contracts, dim_tickers
                                    WHERE staging_option.Ticker = dim_tickers.Ticker
                                        AND staging_option.contractSymbol = dim_contracts.contract_symbol
""")
insert_table_queries = [tickers_table_insert, contracts_table_insert, options_transation_insert]

In [67]:
def load_staging_tables(cur, conn):
    """
    This function copy data from S3 to staing tables.
    
    input:
    cur - database cursor variable
    conn - database connection object
    
    return - None
    """
    for query in copy_table_queries:
        cur.execute(query)
        conn.commit()
        print("Staging table loading completed {}.".format(query))


def insert_tables(cur, conn):
    """
    This function insert data into tables from staging tables.
    
    input:
    cur - database cursor variable
    conn - database connection object
    
    return - None
    """
    for query in insert_table_queries:
        cur.execute(query)
        conn.commit()

In [68]:
%load_ext sql

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


In [27]:
DWH_DB_USER = config.get('CLUSTER', 'DB_USER')
DWH_DB_PASSWORD = config.get('CLUSTER', 'DB_PASSWORD')
DWH_ENDPOINT = config.get('CLUSTER', 'HOST')
DWH_PORT = config.get('CLUSTER', 'DB_PORT')
DWH_DB = config.get('CLUSTER', 'DB_NAME')

conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
print(conn_string)
%sql $conn_string

postgresql://awsuser:wydfin-hydxEg-0fyczy@redshift-cluster-1.cq00m0qwikth.us-east-2.redshift.amazonaws.com:5439/dev


In [69]:
# connect to AWS redshift cluster
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()

# copy data from S3 to staging tables in redshift cluster
load_staging_tables(cur, conn)
# transform data from staging tables into fact & dimension tables
insert_tables(cur, conn)

conn.close()

Staging table loading completed 
COPY staging_option FROM 's3://option-price-collection37'
CREDENTIALS 'aws_iam_role=arn:aws:iam::736387989270:role/myRedshiftRole'
delimiter ',' gzip
ignoreheader 1
region 'us-east-2'
.
