In [73]:
import os
import os.path
import shutil
from datetime import datetime, timedelta, time
from os import path, listdir
from os.path import isfile, join

import finnhub
import pandas as pd
from sqlalchemy import MetaData, create_engine, Table, text
from sqlalchemy.engine import URL
from sqlalchemy.exc import IntegrityError, SQLAlchemyError

In [74]:
# Directory holding the per-symbol CSV downloads: files are written here after retrieval
# and listed from here again for the DB insert. The path is relative, so it resolves
# against the kernel's current working directory.
BASE_DIR = '../../../../workspace/HelloPython/HistoricalMarketData/ByDateRange'

In [75]:
# Build the Finnhub client from an API key held in the environment, so the key never
# lives in the notebook itself.
# print(os.environ.get('SHELL')) # Check shell to set the right profile for keys
api_key = os.environ.get('FINNHUB_API_KEY')
if not api_key:  # Unset *or* empty: an exported-but-blank variable is just as unusable
    raise ValueError("Finn Hub API key not found. Please set the FINNHUB_API_KEY environment variable.")
finnhub_client = finnhub.Client(api_key=api_key)

In [76]:
# Database connection settings come from the environment: DB (schema name), DB_USER, DB_PWD.
try:
    DB = os.environ["DB"]
    DB_USER = os.environ["DB_USER"]
    DB_PWD = os.environ["DB_PWD"]
except KeyError as err:
    # Report the variable that is actually missing (DB is required as well) and keep
    # the original KeyError chained for debugging.
    raise Exception(
        f"Required environment variable {err.args[0]} not set "
        "(DB, DB_USER and DB_PWD are all required)"
    ) from err

# Let SQLAlchemy assemble the URL so that characters such as '@', ':' or '/' in the
# credentials are escaped instead of corrupting it. DB_URL remains a plain string.
DB_URL = URL.create(
    'mysql+mysqlconnector',
    username=DB_USER,
    password=DB_PWD,
    host='localhost',
    database=DB,
).render_as_string(hide_password=False)
ENGINE = create_engine(DB_URL)

In [77]:
def generate_file_path(symbol, start, end):
    """Build the CSV path used to store one symbol's data for a date range.

    The file name is ``<symbol in lower case>_<start as YYYYMMDD>_<end as YYYYMMDD>.csv``
    and it is placed directly under ``BASE_DIR``. The path is only computed: nothing is
    looked up or created on disk.

    Args:
        symbol: Ticker symbol (any case).
        start: First day of the range; must support ``strftime``.
        end: Last day of the range; must support ``strftime``.

    Returns:
        str: The joined path. It is never ``None``.
    """
    file_name = '_'.join([symbol.lower(), start.strftime('%Y%m%d'), end.strftime('%Y%m%d')]) + '.csv'
    return os.path.join(BASE_DIR, file_name)

In [78]:
def generate_map_for_insert(dfrm, symbol):
    """Convert a date-indexed price frame into a list of row dicts for a bulk insert.

    Args:
        dfrm: DataFrame indexed by date. Every column value must be convertible with
            ``float()``. The frame passed in is not modified.
        symbol: Ticker symbol; stored upper-cased under the ``'symbol'`` key.

    Returns:
        list[dict]: One dict per distinct index value, in ascending index order. Each
        dict holds every column of ``dfrm`` as a Python ``float`` plus ``'symbol'`` and
        ``'date'`` (the index value).
    """
    # Some files carry two lines for the same date. A label lookup on such a date yields
    # a Series rather than a scalar, which cannot be converted to float, so only the
    # last line per date is kept.
    # NOTE(review): keeping the *last* occurrence is an assumption - confirm it is the
    # authoritative one.
    duplicated = dfrm.index.duplicated(keep='last')
    if duplicated.any():
        print('Dropping {} duplicate date row(s) for symbol: {}'.format(int(duplicated.sum()), symbol.upper()))
        dfrm = dfrm[~duplicated]

    # sort_index returns a new frame; the result has to be used for the order to apply.
    ordered = dfrm.sort_index(ascending=True)
    column_names = list(ordered.columns)
    symbol_upper = symbol.upper()

    data = []
    # One pass over plain tuples instead of a label lookup per cell.
    for date, *values in ordered.itertuples(index=True, name=None):
        row = {column: float(value) for column, value in zip(column_names, values)}
        row['symbol'] = symbol_upper  # Needs symbol as pk
        row['date'] = date  # Needs date as pk
        data.append(row)
    return data

In [79]:
def insert_symbol_data(symbol, file_path):
    """Load one downloaded CSV and bulk-insert its rows into ``equities_historic_data``.

    The CSV must contain a ``date`` column; it is parsed to datetimes and used as the
    index before the rows are converted with ``generate_map_for_insert``. All rows are
    written in a single transaction, so a failure leaves the table unchanged.

    Args:
        symbol: Ticker symbol the file belongs to.
        file_path: Path of the CSV file to load.

    Returns:
        bool: True only when rows were inserted and the transaction committed. False when
        the file is missing, empty, undecodable, yields no rows, or the insert fails.
        Those failures are printed rather than raised.
    """
    data_inserted = False
    metadata = MetaData()
    # Reflect the table
    table_historic_data = Table("equities_historic_data", metadata, autoload_with=ENGINE)

    try:
        print(f'Retrieving historic data from file path: {file_path}')
        dfrm_new_read_from_csv = pd.read_csv(file_path)
        dfrm_new_read_from_csv['date'] = pd.to_datetime(dfrm_new_read_from_csv['date'])
        # set_index/sort_index return new frames, so the result is re-assigned.
        dfrm_new_read_from_csv = dfrm_new_read_from_csv.set_index('date').sort_index(ascending=True)

        data = generate_map_for_insert(dfrm_new_read_from_csv, symbol)
        if data:
            try:
                # begin() commits when the block exits cleanly and rolls back on error.
                with ENGINE.begin() as conn:
                    conn.execute(table_historic_data.insert(), data)
                print('Inserted data for symbol: {}'.format(symbol.upper()))
                data_inserted = True
            except IntegrityError as err:
                # Must be listed before SQLAlchemyError: IntegrityError is one of its
                # subclasses and would otherwise never reach this handler.
                print('Caught IntegrityError. A row with primary key already exists for symbol: {}'.format(symbol))
                print('{}'.format(err))
            except SQLAlchemyError as e:
                print(f"Error inserting values: {e}")
        else:
            print('No new datasets for symbol: {}'.format(symbol))

    except pd.errors.EmptyDataError:
        print(f'Empty records for symbol: {symbol.upper()}. Skipping insert')
    except FileNotFoundError as err:
        print('FileNotFoundError:{}'.format(err))
    except IndexError as err:
        print('Caught IndexError (likely no records for insert!) while processing symbol: {}'.format(symbol))
        print('{}'.format(err))
    except UnicodeDecodeError as err:
        print('Caught UnicodeDecodeError while processing file: {}'.format(file_path))
        print('{}'.format(err))

    return data_inserted

In [80]:
def insert_daily_mkt_data():
    """Insert every downloaded CSV in ``BASE_DIR`` into the DB and archive the successes.

    The symbol is taken from the part of the file name before the first underscore. Each
    file is handed to ``insert_symbol_data``; when that reports success the file is moved
    into ``BASE_DIR/Archive``. Files whose insert did not succeed stay where they are.

    Returns:
        None
    """
    archive_dir = os.path.join(BASE_DIR, 'Archive')
    # Only regular .csv files are data files; anything else in the folder is left alone.
    # Sorted so repeated runs process files in the same order.
    file_names = sorted(
        name for name in listdir(BASE_DIR)
        if isfile(join(BASE_DIR, name)) and name.lower().endswith('.csv')
    )
    for file_name in file_names:
        symbol = file_name.split('_')[0]
        if not symbol:
            continue
        source_path = os.path.join(BASE_DIR, file_name)
        if insert_symbol_data(symbol, source_path):  # Move files to archive folder if DB insert succeeded
            # Without an existing directory shutil.move would rename the CSV to a *file*
            # called 'Archive', and later moves would overwrite it.
            os.makedirs(archive_dir, exist_ok=True)
            shutil.move(source_path, archive_dir)

In [81]:
def retrieve_symbol_data_finnhub(symbol, dt_start, dt_end):
    """Download daily candles for one symbol from Finnhub and save them as a CSV.

    The target path comes from ``generate_file_path``. If that file already exists the
    download is skipped. Otherwise ``finnhub_client.stock_candles`` is queried at daily
    ('D') resolution between the two timestamps and the candles are written with a
    ``date`` index and ``open``/``high``/``low``/``close``/``volume`` columns.

    Args:
        symbol: Ticker symbol to request.
        dt_start: ``datetime`` marking the start of the requested range.
        dt_end: ``datetime`` marking the end of the requested range.

    Returns:
        The CSV path for this symbol and range. It is returned whether or not a file was
        written (no data, or an error that was printed), so callers must not assume the
        file exists.
    """
    file_path = generate_file_path(symbol, dt_start, dt_end)
    if path.exists(file_path):
        print('Records already retrieved for symbol:{} under path:{}. Skipping'.format(symbol, file_path))
        return file_path

    try:
        print(
            f'Retrieving data for symbol: {symbol} for period {datetime.strftime(dt_start, "%Y-%m-%d")} through {datetime.strftime(dt_end, "%Y-%m-%d")}')
        res = finnhub_client.stock_candles(symbol, 'D', int(dt_start.timestamp()), int(dt_end.timestamp()))

        # Check the response status explicitly. Without it a "no data" reply is only
        # handled by an incidental error, and an empty reply would leave a header-only
        # CSV behind that blocks any later download for the same range.
        if not res or res.get('s') != 'ok' or not res.get('t'):
            print('No new records received for symbol:{}'.format(symbol))
            return file_path

        dfrm_candles = pd.DataFrame.from_dict(res)
        # NOTE(review): timestamps are converted in the machine's local timezone and then
        # shifted forward by one day; presumably this compensates for the local offset of
        # Finnhub's candle timestamps - confirm before running in another timezone.
        dfrm_candles['date'] = [(datetime.strftime(datetime.fromtimestamp(ts) + timedelta(days=1), '%m/%d/%Y'))
                                for ts in dfrm_candles['t']]
        dfrm_candles = (
            dfrm_candles
            .set_index('t', drop=True)
            .sort_index(ascending=True)  # Oldest candle first
            .set_index('date')           # Replaces (and discards) the raw timestamp index
            .drop('s', axis=1)           # Status flag is not part of the data
            .rename(columns={'o': 'open',
                             'h': 'high',
                             'l': 'low',
                             'c': 'close',
                             'v': 'volume'})
        )
        dfrm_candles.to_csv(file_path, sep=',')
        print('Generated records for symbol {} in file: {}'.format(symbol, file_path))
    except Exception as e:
        # Deliberately best-effort: one failing symbol must not stop a batch run.
        print(e)
        print('No records received for symbol:{}'.format(symbol))
    return file_path

In [82]:
def retrieve_sql_date_for_last_entry(symbol):
    """Return the most recent ``date`` stored in ``equities_historic_data`` for a symbol.

    Args:
        symbol: Ticker symbol to look up (matched with SQL ``like``).

    Returns:
        The ``date`` value of the newest row for the symbol, exactly as the DB driver
        returns it, or ``None`` when the table holds no rows for the symbol.
    """
    # The symbol is passed as a bound parameter instead of being spliced into the SQL.
    query = text('select date from equities_historic_data '
                 'where symbol like :symbol order by date desc limit 1')
    with ENGINE.connect() as conn:
        # Fetch the row while the connection is still open.
        latest = conn.execute(query, {'symbol': symbol}).mappings().first()

    if latest is None:  # Nothing cataloged yet for this symbol
        print(f"No records in DB for '{symbol}'.")
        return None
    return latest['date']

In [83]:
def retrieve_daily_mkt_data(symbol, dt_start=None, dt_end=None):
    """Download the daily candles missing from the catalog for one symbol.

    The range always starts at the last date already cataloged for the symbol, or at the
    2020-01-01 baseline when nothing is cataloged, so that no gap can appear in the data.

    Args:
        symbol: Ticker symbol to retrieve.
        dt_start: Accepted for backward compatibility but NOT used: the start date is
            always derived from the catalog as described above.
        dt_end: Last day to retrieve, as a ``date`` or ``datetime``. Defaults to yesterday.

    Returns:
        ``None`` when the catalog already reaches yesterday. Otherwise a list holding the
        CSV path returned by ``retrieve_symbol_data_finnhub``; the list is empty when the
        end date is not later than the start date.
    """
    def _as_date(value):
        # Reduce datetimes to dates: comparing a datetime with a date raises TypeError.
        return value.date() if isinstance(value, datetime) else value

    file_paths = list()
    yesterday = (datetime.today() - timedelta(days=1)).date()
    baseline_date = datetime(2020, 1, 1).date()  # For new symbol, use some baseline date beyond which to get all data

    dt_last_catalog_entry = _as_date(retrieve_sql_date_for_last_entry(symbol))
    print('{}: Date for last cataloged entry - {}'.format(symbol.upper(), dt_last_catalog_entry))
    if dt_last_catalog_entry is None:
        print(f"Using baseline date of {baseline_date} to retrieve data for {symbol.upper()}")
        dt_last_catalog_entry = baseline_date
    elif dt_last_catalog_entry >= yesterday:
        print(f"Data already in catalog through {yesterday} for {symbol.upper()}. Skipping")
        return None

    # Set start date to last cataloged entry (fallback to baseline). This ensures no gaps in data
    dt_start = dt_last_catalog_entry
    print(f'Start date for retrieval set to: {dt_start}')

    dt_end = yesterday if dt_end is None else _as_date(dt_end)

    if dt_end > dt_start:  # Last date must be later than start date
        file_path = retrieve_symbol_data_finnhub(
            symbol,
            datetime.combine(dt_start, time.min),
            datetime.combine(dt_end, time.min),
        )
        if file_path is not None:
            file_paths.append(file_path)
    else:
        print('End Date {} same or earlier than Start Date {} for symbol: {}. '
              'Skipping retrieval'.format(dt_end, dt_start, symbol))

    return file_paths  # Preserves option to use list of paths for DB insert

In [84]:
# Driver: choose the symbols, then download missing candles and/or load the CSVs into the DB.
if __name__ == "__main__":
    RETRIEVAL = True  # Controls retrieval from Finnhub
    INSERTION = False  # Controls insert into local DB
    QUERY = True  # Controls whether data for entire SnP500 or a selected subset needs to be retrieved
    date_start = None
    date_end = None

    if QUERY:
        query = 'select distinct symbol from industrybackground where SnP500 like 1'
        # query = 'select distinct symbol from industrybackground where SnP500 like 1 and symbol not in (select distinct symbol from equities_historic_data where date like "2019-12-20")'

        with ENGINE.connect() as connection:
            # Read the rows while the connection is still open; an empty result simply
            # yields an empty list.
            tickers = [row['symbol'] for row in connection.execute(text(query)).mappings()]
        # Add some other symbols that I usually track
        additional_symbols = ['TWTR', 'TSLA', 'BIDU', 'BABA', 'ROKU', 'NKLA', 'AMTD', 'TFC', 'BX', 'KKR', 'APO', 'ARES',
                              'CG']
        tickers.extend(additional_symbols)
    else:
        # Alternative watch lists kept for reference; only the active assignment below is used.
        # tickers = ['BAC', 'JPM', 'C', 'MS', 'GS', 'WFC', 'FB', 'MSFT', 'GOOGL', 'NFLX', 'AAPL', 'AMZN', 'TSLA', 'MRK', 'PFE', 'NKE', 'INTC', 'NVDA', 'ADM', 'TSM', 'MU', 'QCOM']
        # tickers.append(['RE', 'ACGL', 'AXS', 'CB', 'THG', 'PGR', 'RNR', 'SIGI', 'TRV', 'WRB'])
        # tickers = ['BAC', 'JPM', 'C', 'MS', 'GS', 'WFC', 'GOOGL', 'SMCI', 'NVDA']
        # tickers = ["FSLR", "VRT", "COIN", "MRVL", "CRWD", "AVGO", "DDOG", "SMCI", "GOOGL", "AMZN", "SHAK", "APO", "DJT",
        #            "FCX", "LLY", "META", "AVGO"]
        # tickers = ["PFE", "MRK", "JNJ", "REGN", "NVO", "AAPL", "LLY", "AVGO", "FCX", "SBUX", "FSLR", "NKE"]
        tickers = ["INTC"]

    if RETRIEVAL:
        # # To be used if manual retrieval for a particular date range is required for certain symbol(s)
        # date_start = datetime(2000, 8, 1) # Starting date
        # date_start = datetime.combine(date_start, datetime.min.time())
        # date_end = datetime(2025, 12, 31) # End date
        # date_end = datetime.combine(date_end, datetime.min.time())
        # ticker = 'GOOG'
        # tickers = [ticker]
        # NOTE: retrieve_daily_mkt_data derives its start date from the catalog, so this
        # value does not determine the range that is actually requested.
        date_start = datetime(2000, 8, 1)  # Starting date
        # De-duplicate, and sort so every run walks the symbols in the same order.
        for ticker in sorted(set(tickers)):
            retrieve_daily_mkt_data(ticker, date_start, date_end)
    if INSERTION:
        insert_daily_mkt_data()



UNP: Date for last cataloged entry - 2020-08-13
Start date for retrieval set to: 2020-08-13
Records already retrieved for symbol:UNP under path:../../../../workspace/HelloPython/HistoricalMarketData/ByDateRange/unp_20200813_20250430.csv. Skipping
KSS: Date for last cataloged entry - 2020-08-13
Start date for retrieval set to: 2020-08-13
Records already retrieved for symbol:KSS under path:../../../../workspace/HelloPython/HistoricalMarketData/ByDateRange/kss_20200813_20250430.csv. Skipping
CTSH: Date for last cataloged entry - 2020-08-13
Start date for retrieval set to: 2020-08-13
Records already retrieved for symbol:CTSH under path:../../../../workspace/HelloPython/HistoricalMarketData/ByDateRange/ctsh_20200813_20250430.csv. Skipping
M: Date for last cataloged entry - 2020-08-13
Start date for retrieval set to: 2020-08-13
Records already retrieved for symbol:M under path:../../../../workspace/HelloPython/HistoricalMarketData/ByDateRange/m_20200813_20250430.csv. Skipping
LYV: Date for l