In [217]:
#REF: https://alpaca.markets/docs/python-sdk/market_data.html#market-data
#!pip3 install alpaca-py

In [3]:
from alpaca.data import StockHistoricalDataClient
from alpaca.data.requests import StockLatestQuoteRequest
from alpaca.data.requests import StockBarsRequest
from alpaca.data.timeframe import TimeFrame
from datetime import datetime
from datetime import date
from datetime import time
from datetime import timedelta
from termcolor import colored
from psycopg.rows import dict_row
from psycopg.types.json import Json
import alpaca.trading
import alpaca
import pytz
import config
import psycopg
import threading
import random

In [None]:
#Init
alpaca_key = 'PKM3CR0WHR7P6VWAKPE3'
alpaca_secret = 'AAOm7WiKTZuIUODJZh3GPX4IctCeWcQOjc4NublA'
selected_exchanges = ['AssetExchange.NASDAQ', 'AssetExchange.NYSE']
newYorkTz = pytz.timezone("America/New_York") 
#stock_client = StockHistoricalDataClient(alpaca_key, alpaca_secret)

In [4]:
def pgDictToConn(secretDict):
    pgStrs = []
    for key in secretDict:
        pgStrs.append('{}={}'.format(key, secretDict[key]))
    return ' '.join(pgStrs)
pgConnStr = pgDictToConn(config.pgSecrets)

In [19]:
def getAllActiveSymbols(selected_exchanges):
    #AssetExchange.NASDAQ and AssetExchange.NYSE
    trade_client = alpaca.trading.TradingClient(alpaca_key, alpaca_secret)
    assets = trade_client.get_all_assets()
    symbols = [asset.symbol for asset in assets 
           if str(asset.exchange) in selected_exchanges and str(asset.status) != 'AssetStatus.INACTIVE']
    return symbols

In [20]:
def getMinuteDataForStock(symbol, date_from, date_to):
    data = []
    current_date = date_from
    while current_date <= date_to:
        open_time = datetime.combine(current_date, time(hour=9, minute=30, tzinfo=newYorkTz))
        close_time = datetime.combine(current_date, time(hour=16,minute=0,tzinfo=newYorkTz))
        request_params = StockBarsRequest(symbol_or_symbols=[symbol], 
                                      timeframe=TimeFrame.Minute, 
                                      start=open_time,
                                      end=close_time)
        stock_client = StockHistoricalDataClient(alpaca_key, alpaca_secret)
        result = stock_client.get_stock_bars(request_params)
        if symbol in result.data:
            entries_list = []
            for entry in result.data[symbol]:
                entry_dict = entry.__dict__
                entry_dict['date'] = current_date
                entries_list.append(entry_dict)
            data = data + entries_list
        current_date = current_date + timedelta(1)
    return data 

In [21]:
def threadedGetMinuteDataForStock(symbol, date_from, date_to, result = [], job_id=1, thread_size=50):
    num_days =  (date_to - date_from).days + 1
    if num_days <= thread_size:
        success = False
        while not success:
            try:
                result.extend(getMinuteDataForStock(symbol, date_from, date_to))
                print(colored('Completed for job id {} for {}'.format(job_id, symbol), 'red'))
                success = True
            except:
                time.sleep(random.randint(1, 5))
            
    else:
        split_point = int(num_days/2) - 1
        date_from1 = date_from
        date_to1 = date_from1 +  timedelta(split_point)
        date_from2 = date_to1 + timedelta(1)
        date_to2 = date_to
        result1 = []
        result2 = []
        t1 = threading.Thread(target=threadedGetMinuteDataForStock, name='t{}'.format(job_id*2), 
                              args=(symbol, date_from1, date_to1, result1, job_id*2))
        t2 = threading.Thread(target=threadedGetMinuteDataForStock, name='t{}'.format(job_id*2+1), 
                              args=(symbol, date_from2, date_to2, result2, job_id*2+1))
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        result.extend(result1)
        result.extend(result2)
        print(colored('Completed for job id {} for {}'.format(job_id, symbol), 'red'))

In [405]:
def updateStockEntriesToDB(entries):
    with psycopg.connect(pgConnStr) as conn:
        with conn.cursor() as cur:
            stmt = '''INSERT INTO alpaca_minute (symbol, date, datetime, 
                open, close, high, low, trade_count, vol, vwap)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT(symbol, datetime) DO NOTHING'''
            for entry in entries: 
                data = (entry['symbol'], entry['date'], entry['timestamp'], 
                        entry['open'], entry['close'], entry['high'], entry['low'],
                        entry['trade_count'], entry['volume'], entry['vwap'])
                cur.execute(stmt, data, prepare=True)
            conn.commit()

In [13]:
#https://www.psycopg.org/psycopg3/docs/basic/copy.html
#https://stackoverflow.com/questions/48019381/how-postgresql-copy-to-stdin-with-csv-do-on-conflic-do-update
def updateSingleStockEntriesToDB(symbol, entries):
    with psycopg.connect(pgConnStr) as conn:
        with conn.cursor() as cur:
            tmp_table = 'tmp_{}'.format(symbol.replace('.','_'))
            stmt = '''CREATE TEMP TABLE {} 
                (LIKE alpaca_minute INCLUDING DEFAULTS)
                ON COMMIT DROP'''.format(tmp_table)
            cur.execute(stmt)
                
            with cur.copy('''COPY {} (symbol, date, datetime, 
                open, close, high, low, trade_count, vol, vwap)
                FROM STDIN'''.format(tmp_table)) as copy:
                for entry in entries: 
                    entry_tuple = (entry['symbol'], entry['date'], entry['timestamp'], 
                            entry['open'], entry['close'], entry['high'], entry['low'],
                            entry['trade_count'], entry['volume'], entry['vwap'])
                    copy.write_row(entry_tuple)
                    
            stmt = '''INSERT INTO alpaca_minute (symbol, date, datetime, 
                open, close, high, low, trade_count, vol, vwap)
                SELECT symbol, date, datetime, 
                open, close, high, low, trade_count, vol, vwap
                FROM {}
                ON CONFLICT(symbol, datetime) DO NOTHING'''.format(tmp_table)
            cur.execute(stmt)
            conn.commit()    

In [None]:
# Test Script
# --------------
# to_date = date(year=2023, month=3, day=13)
# from_date = to_date + timedelta (-179)
# AAPL_data = []
# threadedGetMinuteDataForStock('AAPL', from_date, to_date, AAPL_data)
# print(len(AAPL_data))
# updateSingleStockEntriesToDB('AAPL', AAPL_data)

In [16]:
symbols_done = ['AAPL', 'CVV', 'DTOCU', 'GMFI', 'GMFIU', 'GMGI', 'GMRE', 'GMVD', 'GNE', 'GNE.PRA', 'GNFT', 'GNLN', 'GNL.PRA', 'GNL.PRB', 'GNPX', 'GNRC', 'GNSS', 'GNT', 'GNTA', 'GNT.PRA', 'GNTX', 'GNTY', 'GNUS', 'GNW', 'GPACU', 'GPK', 'GPMT', 'GPRO', 'GPS', 'GREE', 'GREEL', 'GRNQ', 'GRNR', 'GRNT', 'GRPH', 'GRPN', 'GRTX', 'GSBC', 'GSBD', 'GSD', 'GSDWU', 'GSHD', 'GSIT', 'GSK', 'GSM', 'GSMG', 'GS.PRC', 'GS.PRD', 'GS.PRJ', 'GS.PRK', 'GSQB', 'GSQB.U', 'GSRM', 'GSRMR', 'GSRMU', 'GSUN', 'GT', 'GTAC', 'GTACU', 'GTBP', 'GTEC', 'GTES', 'GTH', 'GTLB', 'GTLS', 'GTLS.PRB', 'GTN', 'GTN.A', 'GTR', 'GTX', 'GTXAP', 'GURE', 'GUT', 'GUT.PRC', 'GVCI', 'GVCIU', 'NGS', 'NGVC', 'NGVT', 'NH', 'NICE', 'REFR', 'REG', 'RELX', 'SF', 'SF.PRB', 'SF.PRC', 'SF.PRD', 'STKS']

In [22]:
symbols = getAllActiveSymbols(selected_exchanges)

In [None]:
to_date = date(year=2023, month=3, day=13)
from_date = to_date + timedelta (-179)
for symbol in symbols:
    if symbol not in symbols_done:
        data = []
        threadedGetMinuteDataForStock(symbol, from_date, to_date, data)
        updateSingleStockEntriesToDB(symbol, data)
        symbols_done.append(symbol)
        print('Inserted {} entries for {}'.format(len(data), symbol))
        

Completed for job id 5 for QSI
Completed for job id 7 for QSI
Completed for job id 4 for QSI
Completed for job id 2 for QSI
Completed for job id 6 for QSI
Completed for job id 3 for QSI
Completed for job id 1 for QSI
Inserted 33751 entries for QSI
Completed for job id 4 for QSR
Completed for job id 6 for QSR
Completed for job id 7 for QSR
Completed for job id 3 for QSR
Completed for job id 5 for QSR
Completed for job id 2 for QSR
Completed for job id 1 for QSR
Inserted 44581 entries for QSR
Completed for job id 6 for WMC
Completed for job id 4 for WMC
Completed for job id 7 for WMC
Completed for job id 3 for WMC
Completed for job id 5 for WMC
Completed for job id 2 for WMC
Completed for job id 1 for WMC
Inserted 5181 entries for WMC
Completed for job id 4 for WMG
Completed for job id 5 for WMG
Completed for job id 2 for WMG
Completed for job id 7 for WMG
Completed for job id 6 for WMG
Completed for job id 3 for WMG
Completed for job id 1 for WMG
Inserted 42736 entries for WMG
Completed