# Data Ingestion (Main)
Uses the yfinance library. If yfinance becomes deprecated or unavailable, the alternative Tiingo approach (using `TIINGO_API_KEY`) can be used instead.

### Imports

In [103]:
import requests
import os
from os.path import join, dirname
from dotenv import load_dotenv, find_dotenv
import pandas as pd
import datetime
import json
import psycopg2
import yfinance as yf

### Retrieve env variables

In [None]:
# Load the nearest .env file (if any) into the process environment, then read the
# settings this notebook needs. Variables that are not defined resolve to None.
load_dotenv(find_dotenv())

TIINGO_API_KEY, PG_HOST, PG_PORT, PG_DATABASE, PG_USER, PG_PASSWORD = (
    os.environ.get(env_name)
    for env_name in ("TIINGO_API_KEY", "PG_HOST", "PG_PORT", "PG_DATABASE", "PG_USER", "PG_PASSWORD")
)

# YFINANCE
Download an up-to-date list of ASX tickers from the following link:
https://www.marketindex.com.au/data-downloads

Save it in this directory in place of `30-june-2023.xlsx` (or update the filename read in the *Process ASX data* cell).

### Test library endpoints

In [91]:
from IPython.display import display  # provided by Jupyter; imported explicitly for clarity

msft = yf.Ticker("MSFT")

# Quote / company profile metadata.
display(msft.info)

# Historical market data. `history_metadata` is only populated after history() has been called.
hist = msft.history(period="1mo")
display(hist)
display(msft.history_metadata)

# Share count since the start of 2022.
display(msft.get_shares_full(start="2022-01-01", end=None))

# Zero-argument Ticker properties to smoke-test, in the order originally explored.
# A bare expression in a cell is only rendered when it is the cell's last line, so each
# value is displayed explicitly. A failing endpoint is reported and skipped instead of
# aborting the cell, since endpoint availability differs between yfinance versions/tickers.
MSFT_ENDPOINTS = [
    # actions (dividends, splits, capital gains)
    "actions",
    "dividends",
    "splits",
    "capital_gains",  # only for mutual funds & etfs
    # financials
    "calendar",
    "sec_filings",
    "income_stmt",  # see `Ticker.get_income_stmt()` for more options
    "quarterly_income_stmt",
    "balance_sheet",
    "quarterly_balance_sheet",
    "cashflow",
    "quarterly_cashflow",
    # holders
    "major_holders",
    "institutional_holders",
    "mutualfund_holders",
    "insider_transactions",
    "insider_purchases",
    "insider_roster_holders",
    "sustainability",
    # recommendations
    "recommendations",
    "recommendations_summary",
    "upgrades_downgrades",
    # analysts data
    "analyst_price_targets",
    "earnings_estimate",
    "revenue_estimate",
    "earnings_history",
    "eps_trend",
    "eps_revisions",
    "growth_estimates",
    # future and historic earnings dates (next 4 / last 8 quarters by default;
    # use msft.get_earnings_dates(limit=XX) for more)
    "earnings_dates",
    # ISIN (International Securities Identification Number) - *experimental*
    "isin",
    # options expirations
    "options",
    # news
    "news",
]

for endpoint in MSFT_ENDPOINTS:
    print(f"--- {endpoint} ---")
    try:
        display(getattr(msft, endpoint))
    except Exception as exc:
        print(f"{endpoint} unavailable: {exc!r}")

# get option chain for specific expiration
# opt = msft.option_chain('YYYY-MM-DD')
# data available via: opt.calls, opt.puts
# data available via: opt.calls, opt.puts

### Connect Database
Uses the psycopg2 library to access PostgreSQL ([psycopg2 Documentation](https://www.psycopg.org/docs/))

In [133]:
# One shared connection for the whole notebook. With autocommit on, every statement is
# committed as soon as it executes, so a single failed INSERT does not abort the
# statements that follow it.
connection = psycopg2.connect(
    host=PG_HOST,
    port=PG_PORT,
    database=PG_DATABASE,
    user=PG_USER,
    password=PG_PASSWORD,
)
connection.autocommit = True

# Default cursor; later cells close and recreate it as needed.
cur = connection.cursor()

### [ Optional ] Drop Tables

Only run the cell below if a major error occurs during ingestion: it permanently removes the tables and all of their contents. The drop must be explicitly enabled in the cell before it does anything.

In [None]:
# Safety switch: set to True deliberately before running. When True this permanently
# deletes every table listed below together with all of its rows.
DROP_TABLES = False

# Dependent tables first. CASCADE also removes foreign-key constraints that point at a
# dropped table, so the order is a belt-and-braces measure. Names are hard-coded constants.
TABLES_TO_DROP = [
    "portfolio_security",
    "portfolio",
    "users",
    "daily_price",
    "security",
    "type",
    "exchange",
]

connection.rollback()
cur.close()
cur = connection.cursor()

if DROP_TABLES:
    for table_name in TABLES_TO_DROP:
        # IF EXISTS so a partially created schema can still be cleaned up.
        cur.execute(f"DROP TABLE IF EXISTS {table_name} CASCADE;")

connection.commit()
cur.close()

### Create Database Tables

In [143]:
connection.rollback()
cur.close()
cur = connection.cursor()

# Schema notes:
# - IF NOT EXISTS lets this cell be re-run after a partial failure. Existing tables are
#   left unchanged, so drop them first to pick up schema changes.
# - Prices use NUMERIC(12,4). ASX prices below 10 cents move in 0.1-cent steps, which
#   NUMERIC(6,2) rounded away; it also capped prices at 9,999.99.
# - security_name / sector / industry are widened because a value longer than the
#   column limit makes the whole security INSERT fail, and the security is skipped.

cur.execute("""
CREATE TABLE IF NOT EXISTS exchange
(
    exchange_id SERIAL PRIMARY KEY,
    exchange_name VARCHAR(50) NOT NULL,
    exchange_full_name VARCHAR(50),
    exchange_currency CHAR(3),
    exchange_created_date DATE DEFAULT CURRENT_DATE,
    exchange_last_updated DATE DEFAULT CURRENT_DATE
);
""")

cur.execute("""
CREATE TABLE IF NOT EXISTS type
(
    type_id SERIAL PRIMARY KEY,
    type_name VARCHAR(25) NOT NULL
);
""")

cur.execute("""
CREATE TABLE IF NOT EXISTS security
(
    security_id SERIAL PRIMARY KEY,
    exchange_id INT NOT NULL,
    security_ticker VARCHAR(10) NOT NULL,
    type_id INT DEFAULT 1,
    security_name VARCHAR(255) NOT NULL,
    security_sector VARCHAR(100),
    security_industry VARCHAR(100),
    security_created_date DATE DEFAULT CURRENT_DATE,
    security_last_updated DATE DEFAULT CURRENT_DATE,
    CONSTRAINT exchange_security_fk
      FOREIGN KEY(exchange_id)
      REFERENCES exchange(exchange_id),
    CONSTRAINT type_security_fk
      FOREIGN KEY(type_id)
      REFERENCES type(type_id)
);
""")

cur.execute("""
CREATE TABLE IF NOT EXISTS daily_price
(
    dp_id BIGSERIAL PRIMARY KEY,
    security_id INT NOT NULL,
    dp_date DATE NOT NULL,
    dp_created_date DATE DEFAULT CURRENT_DATE,
    dp_last_updated DATE DEFAULT CURRENT_DATE,
    dp_open NUMERIC(12,4),
    dp_high NUMERIC(12,4),
    dp_low NUMERIC(12,4),
    dp_close NUMERIC(12,4) NOT NULL,
    dp_adj_close NUMERIC(12,4),
    dp_volume NUMERIC(20),
    CONSTRAINT security_dp_fk
      FOREIGN KEY(security_id)
      REFERENCES security(security_id),
    UNIQUE(dp_date, security_id)
);
""")

cur.execute("""
CREATE TABLE IF NOT EXISTS users
(
    user_id SERIAL PRIMARY KEY,
    user_username VARCHAR(50),
    user_email VARCHAR(50)
);
""")

cur.execute("""
CREATE TABLE IF NOT EXISTS portfolio
(
    portfolio_id SERIAL PRIMARY KEY,
    user_id INT NOT NULL,
    portfolio_name VARCHAR(50) NOT NULL,
    CONSTRAINT user_portfolio_fk
      FOREIGN KEY(user_id)
      REFERENCES users(user_id)
);
""")

cur.execute("""
CREATE TABLE IF NOT EXISTS portfolio_security
(
    portfolio_id INT NOT NULL,
    security_id INT NOT NULL,
    portfolio_security_volume INT NOT NULL,
    PRIMARY KEY (portfolio_id, security_id),
    CONSTRAINT port_portsec_fk
      FOREIGN KEY(portfolio_id)
      REFERENCES portfolio(portfolio_id),
    CONSTRAINT sec_portsec_fk
      FOREIGN KEY(security_id)
      REFERENCES security(security_id)
);
""")

connection.commit()
cur.close()

### Insert preliminary data
All security types and all exchanges must be hard-coded here before any securities are ingested.

In [144]:
connection.rollback()
cur.close()
cur = connection.cursor()

# Reference rows. IDs are explicit because security.type_id defaults to 1, which must be EQUITY.
SECURITY_TYPES = [
    (1, "EQUITY"),
    (2, "ETF"),
    (3, "MUTUAL FUND"),
]
EXCHANGES = [
    (1, "ASX", "Australian Stock Exchange", "AUD"),
]

# ON CONFLICT DO NOTHING makes the cell safe to re-run without duplicate-key errors.
cur.executemany(
    """
    INSERT INTO type (type_id, type_name)
    VALUES (%s, %s)
    ON CONFLICT (type_id) DO NOTHING
    """,
    SECURITY_TYPES,
)
cur.executemany(
    """
    INSERT INTO exchange (exchange_id, exchange_name, exchange_full_name, exchange_currency)
    VALUES (%s, %s, %s, %s)
    ON CONFLICT (exchange_id) DO NOTHING
    """,
    EXCHANGES,
)

# Supplying explicit ids does not advance the SERIAL sequences, so a later INSERT that
# relies on the default id would collide with id 1. Move each sequence to the current max.
# Table/column names are hard-coded constants.
for table_name, id_column in (("type", "type_id"), ("exchange", "exchange_id")):
    cur.execute(
        f"SELECT setval(pg_get_serial_sequence('{table_name}', '{id_column}'), "
        f"(SELECT MAX({id_column}) FROM {table_name}))"
    )

connection.commit()
cur.close()


### SQL statements for processing web-scraped data
These parameterised templates are reused for every security and daily price inserted, so ingestion scales to any number of tickers.

In [145]:
# Parameterised INSERT templates used by the ingestion loop; psycopg2 fills each %s.

# Insert one security, resolving the exchange and type names to their ids, and return the
# new security_id.
# Parameters: (exchange_name, ticker, type_name, security_name, sector, industry)
# Type names are compared case-insensitively with spaces removed: yfinance reports
# quoteType values such as 'MUTUALFUND' while the type table stores 'MUTUAL FUND'. An
# unmatched type would otherwise be stored as NULL.
insert_security_sql = """
INSERT INTO security
            (
                exchange_id,
                security_ticker,
                type_id,
                security_name,
                security_sector,
                security_industry
            )
        VALUES
            (
                (
                    SELECT exchange_id FROM exchange e WHERE UPPER(e.exchange_name) = UPPER(%s)
                ),
                %s,
                (
                    SELECT type_id FROM type t
                    WHERE UPPER(REPLACE(t.type_name, ' ', '')) = UPPER(REPLACE(%s, ' ', ''))
                ),
                %s,
                %s,
                %s
            )
        RETURNING security_id
"""

# Insert one daily price row.
# Parameters: (security_id, date, open, high, low, close, volume)
# A (date, security) pair that already exists is skipped instead of raising, so
# re-running ingestion does not error on rows that are already stored.
insert_daily_price_sql = """
INSERT INTO daily_price
        (
            security_id,
            dp_date,
            dp_open,
            dp_high,
            dp_low,
            dp_close,
            dp_volume
        )
    VALUES
        (
            %s,
            %s,
            %s,
            %s,
            %s,
            %s,
            %s
        )
    ON CONFLICT (dp_date, security_id) DO NOTHING
"""

### Process ASX data
Uses data retrieved through the yfinance Python module.
The list of relevant tickers comes from https://www.marketindex.com.au/data-downloads

A similar process is needed for any other exchange that should be ingested.

In [None]:

connection.rollback()
cur.close()
cur = connection.cursor()

# All active ASX tickers, downloaded from https://www.marketindex.com.au/data-downloads
ASX_TICKERS_FILE = "30-june-2023.xlsx"
tick_df = pd.read_excel(ASX_TICKERS_FILE)


def _to_db(value):
    """Return None for pandas missing values (pd.NA / NaN / NaT) so psycopg2 writes SQL NULL."""
    return None if pd.isna(value) else value


# Problems are recorded instead of silently discarded; inspect these lists after the run.
skipped_tickers = []  # (ticker, stage, error repr) for securities that were not inserted
failed_prices = []    # (ticker, date, error repr) for daily price rows that were not inserted

for _, row in tick_df.iterrows():

    # Retrieve the information and daily price history for the ticker.
    try:
        ticker = f"{row['ASX Code'].strip()}.AX"
        security = yf.Ticker(ticker)
        hist = security.history(period='max', interval='1d')
    except Exception as e:
        # A listed ticker that cannot be retrieved from yfinance is skipped.
        skipped_tickers.append((row.get('ASX Code'), 'fetch', repr(e)))
        continue

    # Insert the security and retrieve its newly generated ID.
    try:
        info = security.info  # read once and reuse for every field below
        security_data = (
            info.get('exchange', 'ASX'),
            info.get('symbol', 'N/A'),
            info.get('quoteType', 'Equity'),
            info.get('longName', info.get('shortName', 'N/A')),
            info.get('sector', 'N/A'),
            info.get('industry', 'N/A'),
        )
        cur.execute(insert_security_sql, security_data)
        security_id = cur.fetchone()[0]
    except Exception as e:
        # A security that cannot be inserted is skipped together with its prices.
        skipped_tickers.append((ticker, 'security insert', repr(e)))
        continue

    # Insert each daily price from the history dataframe.
    hist = hist.convert_dtypes()
    for price_ts, price in hist.iterrows():
        try:
            dp_data = (
                security_id,
                # Send the calendar date, not the Timestamp. psycopg2 adapts a tz-aware
                # datetime as timestamptz, and PostgreSQL casts that to DATE in the session
                # time zone, which shifts the day when that zone differs from the exchange's.
                price_ts.date(),
                _to_db(price['Open']),
                _to_db(price['High']),
                _to_db(price['Low']),
                _to_db(price['Close']),
                _to_db(price['Volume']),
            )
            cur.execute(insert_daily_price_sql, dp_data)
        except Exception as e:
            # A bad price row is recorded and processing moves on to the next day's price.
            failed_prices.append((ticker, price_ts, repr(e)))

    # With connection.autocommit = True every statement is already committed and this is a
    # no-op. It only delimits a per-security transaction if autocommit is turned off; note
    # that without autocommit a failed INSERT aborts the rest of that transaction.
    connection.commit()

print(f"Skipped {len(skipped_tickers)} tickers; {len(failed_prices)} daily price rows failed.")


### Close DB connection

In [132]:
# Release the cursor first, then the connection that owns it.
for db_handle in (cur, connection):
    db_handle.close()