# DWH load: oil price

## Requirements & configuration

In [311]:
# # Install required packages
# ! pip install yfinance --upgrade --no-cache-dir
# ! pip install psycopg2-binary
# ! pip install sqlalchemy

In [1]:
# # Import required packages
import os
import json
import pandas as pd
import psycopg2
from sqlalchemy import create_engine
from datetime import date

In [2]:
# DB configuration: db1 is the data lake (source), db2 the data warehouse (target).
# NOTE(review): credentials are committed in plain text. The DATALAKE_PASSWORD /
# DWH_PASSWORD environment variables override the hard-coded values so the
# secrets can be moved out of this file; if it has been shared, rotate them.
db1_config = {
    'host': 'datalake.cknmu1bvrxjg.us-east-1.rds.amazonaws.com',
    'port': '5432',
    'user': 'muser',
    'password': os.environ.get('DATALAKE_PASSWORD', 'datalake'),
    'dbname': 'datalake',
}

db2_config = {
    'host': 'datawarehouse.cbjds2ertxwh.us-east-1.rds.amazonaws.com',
    'port': '5432',
    'user': 'dw_admin',
    'password': os.environ.get('DWH_PASSWORD', 'tW7OsKSHr0Wby28Ue2es'),
    'dbname': 'datawarehouse',
}

from urllib.parse import quote


def build_pg_url(cfg):
    """Build a SQLAlchemy PostgreSQL URL from a config dict.

    User and password are percent-encoded (``safe=''``) so characters such as
    ``@``, ``/`` or ``:`` in a password cannot corrupt the URL. The configured
    port is included instead of relying on the driver default.
    """
    user = quote(cfg['user'], safe='')
    password = quote(cfg['password'], safe='')
    return f"postgresql://{user}:{password}@{cfg['host']}:{cfg['port']}/{cfg['dbname']}"


# Configure cnx_string for sqlalchemy
db1 = build_pg_url(db1_config)
db2 = build_pg_url(db2_config)

## Setup DB Connection

In [3]:
# Establish a psycopg2 connection to the data warehouse ('datawarehouse').
# Errors are reported and then re-raised: every later cell depends on
# dwh_conn / dwh_cursor, so continuing would only fail later with a NameError.
try:
    dwh_conn = psycopg2.connect(
        dbname=db2_config['dbname'],
        user=db2_config['user'],
        host=db2_config['host'],
        password=db2_config['password'],
        port=db2_config['port']
    )
except psycopg2.Error as e:
    print("Error: Could not make the connection to database 'datawarehouse'")
    print(e)
    raise

# Create cursor
try:
    dwh_cursor = dwh_conn.cursor()
except psycopg2.Error as e:
    print("Error: Could not get the cursor to the database 'datawarehouse'")
    print(e)
    raise

# Autocommit: statements run through dwh_cursor take effect immediately
dwh_conn.set_session(autocommit=True)

# Create SQLAlchemy engines for pandas (db1 = data lake, db2 = data warehouse)
db1_engine = create_engine(db1)
db2_engine = create_engine(db2)

## Create staging table oilprice_daily_stage in DWH

In [4]:
# Ensure the DWH staging table 'oilprice_daily_stage' exists.
# IF NOT EXISTS makes re-running this cell a no-op.
create_stage_sql = """
    CREATE TABLE IF NOT EXISTS oilprice_daily_stage (
        date DATE,
        close float,
        news_link text
        )
"""
dwh_cursor.execute(create_stage_sql)

## Transfer price and news data from the data lake to the DWH

In [41]:
# Query for price data (data lake)
sql1 = '''
     SELECT date, close
     FROM oilprice
     ORDER BY date ASC;
     '''

# Query for news data (data lake)
sql2 = '''
    SELECT ingested_at, news
    FROM news_oil
    ORDER BY ingested_at ASC;
    '''


def _first_news_link(news):
    """Return the 'link' of the first item in a news list, or None if it is empty/NULL."""
    if not news:
        return None
    return news[0]['link']


def _to_day(values):
    """Truncate date-like values (dates, naive or tz-aware timestamps) to
    calendar days as naive datetime64, so merge and reindex keys share one dtype."""
    return pd.to_datetime(pd.to_datetime(values).dt.date)


# Reuse the engines created with the connection instead of URL strings,
# which make pandas build a new engine per call.
df_oilprice = pd.read_sql(sql1, db1_engine)
df_news = pd.read_sql(sql2, db1_engine)

# Normalise price dates and keep one row per day
df_oilprice['date'] = _to_day(df_oilprice['date'])
df_oilprice = df_oilprice.drop_duplicates(subset='date', keep='last')

# Transform news data (extract news link, convert date column).
# Keep only the latest ingestion per day (rows are ordered by ingested_at):
# several rows on one date would duplicate price rows in the left merge and
# make the reindex below fail on duplicate labels.
df_news['news_link'] = df_news['news'].map(_first_news_link)
df_news['date'] = _to_day(df_news['ingested_at'])
df_news = (
    df_news.drop(columns=['ingested_at', 'news'])
    .drop_duplicates(subset='date', keep='last')
)

# Merge into single dataframe
df = pd.merge(df_oilprice, df_news, on='date', how='left')

# Extend data frame with missing (non-trading) days and
# forward fill the closing price and news link columns
date_range = pd.date_range(start=df['date'].min(), end=df['date'].max())
df = (
    df.set_index('date')
    .reindex(date_range)
    .ffill()
    .rename_axis('date')
    .reset_index()
)

# Store plain dates to match the DATE column declared for the staging table
df['date'] = df['date'].dt.date

# Refresh the staging table. TRUNCATE + append keeps the table definition
# created above; if_exists='replace' would drop it and let pandas recreate the
# columns with inferred types (e.g. TIMESTAMP instead of DATE).
dwh_cursor.execute('TRUNCATE TABLE oilprice_daily_stage')
df.to_sql('oilprice_daily_stage', db2_engine, index=False, if_exists='append')

## Check values

In [43]:
# Show the 10 most recent rows of the staging table
sql = '''
    SELECT *
    FROM oilprice_daily_stage
    ORDER BY date DESC
    LIMIT 10;
    '''
# Reuse db2_engine rather than the URL string, which would make pandas
# create a fresh engine (and connection pool) for this single query.
test = pd.read_sql(sql, db2_engine)
test

Unnamed: 0,date,close,news_link
0,2022-05-06,112.389999,https://finance.yahoo.com/video/stocks-nasdaq-...
1,2022-05-05,110.900002,https://finance.yahoo.com/video/stocks-nasdaq-...
2,2022-05-04,110.139999,https://finance.yahoo.com/video/stocks-mixed-f...
3,2022-05-03,104.970001,https://finance.yahoo.com/news/oil-something-f...
4,2022-05-02,109.360001,https://finance.yahoo.com/video/oil-earnings-e...
5,2022-05-01,109.339996,https://finance.yahoo.com/video/oil-giants-che...
6,2022-04-30,109.339996,https://finance.yahoo.com/video/oil-giants-che...
7,2022-04-29,109.339996,https://finance.yahoo.com/video/oil-giants-che...
8,2022-04-28,107.589996,https://finance.yahoo.com/video/market-strateg...
9,2022-04-27,105.32,https://finance.yahoo.com/video/oil-rebounds-z...


In [44]:
# Count the rows in the staging table
sql = '''
    SELECT COUNT(*) cnt
    FROM oilprice_daily_stage;
    '''
# Reuse db2_engine (see above); .iat reads the single scalar cell directly.
count = pd.read_sql(sql, db2_engine).iat[0, 0]
count

5395

## Close the connection

In [45]:
# Close the psycopg2 cursor and connection, then release the connection
# pools held by the SQLAlchemy engines (otherwise they stay open).
dwh_cursor.close()
dwh_conn.close()
db1_engine.dispose()
db2_engine.dispose()