In [1]:
import pandas as pd
import numpy as np
import requests as req
from sqlalchemy import create_engine
from sqlalchemy.sql import text

In [2]:
%run ./configuration.ipynb

In [3]:
# Shared SQLAlchemy engine used by every cell below (disposed in the last cell).
# CONF is not defined in this notebook; presumably it comes from the
# configuration.ipynb loaded by the %run cell above — confirm there.
engine = create_engine(CONF.DB_CONNECTION_STR)

In [4]:
# def get_latest_symbol():
#     try:
#         db_con = engine.connect()
#         chkpnt_data = pd.read_sql(f"SELECT symbol, update_at FROM etl_latest_delisted_symbol", con=db_con)
#         return chkpnt_data
#     except Exception as e:
#         print(e)
#         return None
#     finally:
#         try:
#             db_con.close()
#         except Exception as e:
#             print(e)
            

In [5]:
def get_company_delisted(timeout=30):
    """Download every page of FMP's delisted-companies endpoint as one DataFrame.

    Pages are requested starting at ``page=0`` until the API returns an empty
    list. The combined frame is then normalised:

    * ``symbol`` keeps only the text before the first space,
    * ``ipoDate`` and ``delistedDate`` are parsed with ``pd.to_datetime``,
    * columns are renamed with ``table_mapping["stg.tmp_dim_delisted_companies"]``.

    Args:
        timeout: seconds to wait for each HTTP request, so a stalled
            connection cannot hang the notebook indefinitely.

    Returns:
        pandas.DataFrame with one row per record returned by the API.

    Raises:
        ValueError: if a page is not a JSON list (e.g. an
            ``{"Error Message": ...}`` object), or if no rows were returned.
    """
    # TODO: the incremental "stop at latest known symbol" checkpoint
    # (see the commented-out get_latest_symbol cell) is currently disabled.
    url = "https://financialmodelingprep.com/api/v3/delisted-companies"
    pages = []
    page = 0

    while True:
        response = req.get(url, params={"page": page, "apikey": CONF.API_KEY}, timeout=timeout)
        payload = response.json()  # parse the body once and reuse it

        if not isinstance(payload, list):
            # Errors come back as a JSON object rather than a page of rows.
            message = payload.get("Error Message", payload) if isinstance(payload, dict) else payload
            raise ValueError(f"Unexpected response for page {page}: {message}")
        if not payload:
            # An empty page marks the end of pagination.
            break

        pages.append(pd.DataFrame(payload))
        page += 1

    if not pages:
        raise ValueError("The delisted-companies API returned no data.")

    return_df = pd.concat(pages, ignore_index=True)
    return_df["symbol"] = return_df["symbol"].str.split(" ").str[0]
    return_df["ipoDate"] = pd.to_datetime(return_df["ipoDate"])
    return_df["delistedDate"] = pd.to_datetime(return_df["delistedDate"])

    return return_df.rename(columns=table_mapping["stg.tmp_dim_delisted_companies"])
    

In [6]:
def fetch_divident_data(symbol, company_id, timeout=30):
    """Fetch the dividend history of one symbol from FMP.

    Args:
        symbol: ticker placed in the request URL path.
        company_id: value stamped into a new ``company_id`` column on every row.
        timeout: seconds to wait for the HTTP request.

    Returns:
        * a DataFrame built from the payload's ``"historical"`` list, with a
          ``company_id`` column and blank/whitespace-only strings replaced by NaN;
        * an empty list ``[]`` when the API answers with an empty JSON body;
        * ``None`` when anything fails (request error, non-JSON body, an
          ``"Error Message"`` response, or a payload without ``"historical"``).
          The error is printed; ``get_divident_data`` treats ``None`` as
          "stop fetching".
    """
    try:
        response = req.get(
            f"https://financialmodelingprep.com/api/v3/historical-price-full/stock_dividend/{symbol}",
            params={"apikey": CONF.API_KEY},
            timeout=timeout,
        )
        payload = response.json()  # parse the body once and reuse it

        if not payload:
            return []

        try:
            historical = payload["historical"]
        except (KeyError, TypeError):
            # Error responses carry an "Error Message" instead of "historical".
            message = payload.get("Error Message") if isinstance(payload, dict) else None
            raise ValueError(message or f"Unexpected response for {symbol}: {payload!r}")

        pd_data = pd.DataFrame(historical)
        pd_data["company_id"] = company_id
        # Blank / whitespace-only strings become missing values.
        return pd_data.replace(r'^\s*$', np.nan, regex=True)

    except Exception as e:
        print(e)
        return None

In [7]:
def get_divident_data(company_listed):
    """Collect dividend history for each company and normalise it for staging.

    Args:
        company_listed: DataFrame with at least ``symbol`` and ``id`` columns;
            each row is passed to ``fetch_divident_data(symbol, id)``.

    Returns:
        One concatenated DataFrame with ``date``, ``label``, ``recordDate``,
        ``paymentDate`` and ``declarationDate`` parsed as datetimes, and
        columns renamed via ``table_mapping["stg.tmp_fct_stock_dividend"]``.

    Raises:
        ValueError: if no company yielded any dividend rows.

    Fetching stops at the first company for which ``fetch_divident_data``
    returns ``None`` (request failure / API limit); rows collected before that
    point are still returned.
    """
    history_divident_data = []

    for symbol, company_id in zip(company_listed["symbol"], company_listed["id"]):
        co_divident = fetch_divident_data(symbol, company_id)

        if co_divident is None:
            print("Unable to connect to API or Request reached limit.")
            break
        if len(co_divident) > 0:
            history_divident_data.append(co_divident)

    if not history_divident_data:
        raise ValueError("No dividend data was fetched for the selected companies.")

    return_df = pd.concat(history_divident_data, ignore_index=True)

    # Each date column is parsed from its own values; declarationDate used to
    # be filled from the label column by mistake.
    for column in ("date", "label", "recordDate", "paymentDate", "declarationDate"):
        return_df[column] = pd.to_datetime(return_df[column])

    return return_df.rename(columns=table_mapping["stg.tmp_fct_stock_dividend"])

In [8]:
def run_delisted_comp_integration():
    """Refresh ``public.dim_delisted_companies`` from the FMP delisted-companies feed.

    All steps run in one transaction (``engine.begin()``), so a failure rolls
    everything back:

    1. Replace ``stg.tmp_dim_delisted_companies`` with the API data.
    2. Mark active rows (``relisted_date is null``) whose symbol is no longer
       in the feed as relisted today.
    3. Update attributes of active rows whose values changed in the feed.
    4. Insert feed symbols that have no active row yet.

    Steps 2-4 are skipped when the feed is empty, so an empty download cannot
    mark every company as relisted.
    """
    print("Running - Feed data from API")
    delisted_comp = get_company_delisted()

    statements = (
        # Only active rows are marked; previously the date of already-relisted
        # rows was bumped to today on every run. NOT EXISTS is used instead of
        # NOT IN so a NULL tmp.symbol cannot suppress every match.
        """update public.dim_delisted_companies
              set relisted_date = current_date
            where relisted_date is null
              and not exists (
                  select 1
                    from stg.tmp_dim_delisted_companies tmp
                   where tmp.symbol = dim_delisted_companies.symbol
              )""",
        # IS DISTINCT FROM also detects changes to/from NULL, which != misses.
        """update public.dim_delisted_companies
              set company_name=tmp.company_name,
                  exchange=tmp.exchange,
                  ipodate=tmp.ipodate,
                  delisted_date=tmp.delisted_date,
                  updated_ts=current_timestamp
             from stg.tmp_dim_delisted_companies tmp
            where dim_delisted_companies.symbol=tmp.symbol
              and dim_delisted_companies.relisted_date is null
              and (dim_delisted_companies.company_name is distinct from tmp.company_name or
                   dim_delisted_companies.exchange is distinct from tmp.exchange or
                   dim_delisted_companies.ipodate is distinct from tmp.ipodate or
                   dim_delisted_companies.delisted_date is distinct from tmp.delisted_date)""",
        # Insert only symbols without an active row; the old filter re-inserted
        # every still-delisted company on each run.
        """insert into public.dim_delisted_companies (symbol, company_name, exchange, ipodate, delisted_date, created_ts)
           select tmp.symbol, tmp.company_name, tmp.exchange, tmp.ipodate, tmp.delisted_date, current_timestamp
             from stg.tmp_dim_delisted_companies tmp
            where not exists (
                  select 1
                    from public.dim_delisted_companies dim
                   where dim.symbol = tmp.symbol
                     and dim.relisted_date is null
            )""",
    )

    with engine.begin() as connection:
        print("Running - Overwrite dataframe into database temp table.")
        delisted_comp.to_sql("tmp_dim_delisted_companies", con=connection, schema="stg", if_exists="replace", index=False)

        # to_sql may return None (older pandas / some drivers), so decide on the
        # frame itself instead of comparing the returned row count.
        if len(delisted_comp) > 0:
            for sqls in statements:
                print("Running - ", sqls)
                result = connection.execute(text(sqls))
                print("Rows affected:", result.rowcount)

    print("Integrated dim_delisted_companies completed.")
                

In [9]:
def run_history_dividend_integration():
    """Load dividend history for delisted companies into ``public.fct_stock_dividend``.

    1. Select companies whose dividend history was never fetched or was
       fetched more than 30 days ago.
    2. Fetch their dividends from the API via ``get_divident_data``.
    3. In one transaction:
       * replace ``stg.tmp_fct_stock_dividend`` with the fetched rows,
       * update existing facts whose values changed,
       * insert new facts,
       * stamp ``history_div_fetched_date`` on the companies that returned rows.

    NOTE(review): companies that return no dividend rows are never stamped,
    so they are re-selected (and re-requested) on every run.
    """
    select_sql = """select id, symbol from dim_delisted_companies
                     where history_div_fetched_date is null
                        or history_div_fetched_date < (current_date - interval '30 day')::date"""

    # Short-lived connection for the read only. The API loop below can take a
    # long time and should not hold a DB connection open. On SQLAlchemy 2.x,
    # the read would also leave an autobegun transaction that makes a later
    # connection.begin() fail.
    with engine.connect() as connection:
        print("Running - Fetch delisted company list from database with -", select_sql)
        selected_delisted_company = pd.read_sql(select_sql, con=connection)

    print("Running - Get historical dividend data")
    div_df = get_divident_data(selected_delisted_company)

    # IS DISTINCT FROM also detects changes to/from NULL, which != misses.
    update_sql = """update public.fct_stock_dividend
                       set label_date=tmp.label_date,
                           adj_dividend=tmp.adj_dividend,
                           dividend=tmp.dividend,
                           record_date=tmp.record_date,
                           payment_date=tmp.payment_date,
                           declaration_date=tmp.declaration_date,
                           update_ts=current_timestamp
                      from stg.tmp_fct_stock_dividend tmp
                     where fct_stock_dividend.company_id=tmp.company_id
                       and fct_stock_dividend.create_date=tmp.create_date
                       and (
                           fct_stock_dividend.label_date is distinct from tmp.label_date or
                           fct_stock_dividend.adj_dividend is distinct from tmp.adj_dividend or
                           fct_stock_dividend.dividend is distinct from tmp.dividend or
                           fct_stock_dividend.record_date is distinct from tmp.record_date or
                           fct_stock_dividend.payment_date is distinct from tmp.payment_date or
                           fct_stock_dividend.declaration_date is distinct from tmp.declaration_date
                       )"""

    insert_sql = """INSERT INTO public.fct_stock_dividend(company_id, create_date, label_date, adj_dividend, dividend, record_date, payment_date, declaration_date, create_ts)
                       select tmp.company_id, tmp.create_date, tmp.label_date, tmp.adj_dividend, tmp.dividend, tmp.record_date, tmp.payment_date, tmp.declaration_date, current_timestamp
                         from stg.tmp_fct_stock_dividend tmp
                    left join public.fct_stock_dividend fsd
                           on (tmp.company_id=fsd.company_id and tmp.create_date=fsd.create_date)
                        where fsd.id is null"""

    stamp_sql = """update public.dim_delisted_companies
                      set history_div_fetched_date = current_date
                    where id in (select distinct company_id from stg.tmp_fct_stock_dividend)"""

    with engine.begin() as connection:
        print("Running - Overwrite dataframe into database temp table")
        # schema="stg" is required: every statement below reads stg.tmp_fct_stock_dividend.
        div_df.to_sql("tmp_fct_stock_dividend", con=connection, schema="stg", if_exists="replace", index=False)

        for sqls in (update_sql, insert_sql, stamp_sql):
            print("Running -", sqls)
            result = connection.execute(text(sqls))
            print("Rows affected:", result.rowcount)

    print("Integrated fct_stock_dividend completed.")
        

In [10]:
# run_delisted_comp_integration()

In [None]:
# Refresh dividend history for delisted companies whose dividends were never
# fetched or were last fetched more than 30 days ago.
run_history_dividend_integration()

Running - Fetch delisted company list from database with - select id, symbol from dim_delisted_companies 
                   where history_div_fetched_date is null 
                      or history_div_fetched_date < (current_date - interval '30 day')::date
Running - Get historical dividend data


In [None]:
# Release the engine's pooled database connections once the integration cells have run.
engine.dispose()