In [1]:
import numpy as np 
import pandas as pd
import json 
from typing import List
from datetime import datetime


import snowflake
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import sproc, col, lit, count

In [2]:
#Build session object: 
accountname = 'xe85544.east-us-2.azure'
username = 'kx'
password = 'Snowflake1'

connection_parameters = {
    "account": accountname,
    "user": username,
    "password": password,
    "role": "ACCOUNTADMIN",
    "database": "KX",
    "schema": "BRUNO",
    "warehouse": "KX"
}

session = Session.builder.configs(connection_parameters).create()

In [3]:
trades_df = session.table('trades')
quotes_df = session.table('quotes')

trades_agg = trades_df.select(col('"Symbol"')).group_by('"Symbol"')\
            .agg(count('"Symbol"').alias('Record_Count')).to_pandas()

quotes_agg = quotes_df.select(col('"Symbol"')).group_by('"Symbol"')\
            .agg(count('"Symbol"').alias('Record_Count')).to_pandas()

records_df = trades_agg.merge(quotes_agg, how = 'inner', left_on = 'Symbol', right_on = 'Symbol')

records_df['SUM'] = records_df['RECORD_COUNT_x'] + records_df['RECORD_COUNT_y']
records_df.sort_values(by='SUM', inplace=True)
records_df.head(5)

Unnamed: 0,Symbol,RECORD_COUNT_x,RECORD_COUNT_y,SUM
1448,CHNG,1,2,3
7221,MTEST,2,6,8
1506,RSXJ,7,6,13
860,RSX,8,6,14
7600,PTEST,7,10,17


### Creating the Ticker List: 

In this section, we will output a list of lists that contain tickers that will be fed into a Sproc. 

The user input will be the number of tickers that they want in a bucket - if n = 1, only one ticker will be used, n = len(df), will be equivalent to the current solution that is IO bound. 

In [4]:
def group_stocks(df, ROW_MAX):
    groups = []
    group_total = 0
    group_symbols = set()
    for i, row in df.iterrows():
        symbol, value = row['Symbol'], row['SUM']
        if group_total + value <= ROW_MAX:
            group_total += value
            group_symbols.add(symbol)
        else:
            groups.append(list(group_symbols))
            group_total = value
            group_symbols = {symbol}
    groups.append(list(group_symbols))
    return groups

In [5]:
ticker_buckets = group_stocks(records_df, 10_000_000)

In [6]:
len(ticker_buckets)

173

### Create and run Sproc using multi-cluster warehouses: 

Steps: 
1. create a list of list of tickers to pass into a sproc
2. create an SPROC that has an input list as an argument, that contains the respective tickers to filter by, and calls the main logic

In [7]:
license = 'Y8NTo4WkoyVFpNkz0oikQnrkIJbISSogiCwXxiUY/idrTFbSW9NHlidzlg1GDZYsoyeWTCzSvfkOliez/uOGvRem6fmbR9JkDEaTVpZjvVs7NltmlqSmGFvtNg2WNsuINhgs2UcXR/5Gh5vLLKMH/Nnx/DOIMyayRmMGJvEGU4i5hrJG'

In [8]:
def business_logic(t, q):
    from datetime import timedelta

    q['mid'] = (q['ask_price'] + q['bid_price']) / 2
    qq = q[['symbol', 'time', 'mid']]

    tt = t[['symbol', 'time']].copy()
    tt['time'] += timedelta(seconds=1)
    t['tp1s'] = pd.merge_asof(tt, qq, on='time', by='symbol')['mid']

    tt = t[['symbol', 'time']].copy()
    tt['time'] += timedelta(seconds=5)
    t['tp5s'] = pd.merge_asof(tt, qq, on='time', by='symbol')['mid']

    tt = t[['symbol', 'time']].copy()
    tt['time'] += timedelta(seconds=10)
    t['tp10s'] = pd.merge_asof(tt, qq, on='time', by='symbol')['mid']

    tt = t[['symbol', 'time']].copy()
    tt['time'] += timedelta(minutes=1)
    t['tp1m'] = pd.merge_asof(tt, qq, on='time', by='symbol')['mid']

    tt = t[['symbol', 'time']].copy()
    tt['time'] += timedelta(minutes=5)
    t['tp5m'] = pd.merge_asof(tt, qq, on='time', by='symbol')['mid']

    tt = t[['symbol', 'time']].copy()
    tt['time'] += timedelta(minutes=10)
    t['tp10m'] = pd.merge_asof(tt, qq, on='time', by='symbol')['mid']

    tt = t[['symbol', 'time']].copy()
    tt['time'] += timedelta(minutes=30)
    t['tp30m'] = pd.merge_asof(tt, qq, on='time', by='symbol')['mid']


    tt = t[['symbol', 'time']].copy()
    tt['time'] -= timedelta(seconds=1)
    t['tm1s'] = pd.merge_asof(tt, qq, on='time', by='symbol')['mid']

    tt = t[['symbol', 'time']].copy()
    tt['time'] -= timedelta(seconds=5)
    t['tm5s'] = pd.merge_asof(tt, qq, on='time', by='symbol')['mid']

    tt = t[['symbol', 'time']].copy()
    tt['time'] -= timedelta(seconds=10)
    t['tm10s'] = pd.merge_asof(tt, qq, on='time', by='symbol')['mid']

    tt = t[['symbol', 'time']].copy()
    tt['time'] -= timedelta(minutes=1)
    t['tm1m'] = pd.merge_asof(tt, qq, on='time', by='symbol')['mid']

    tt = t[['symbol', 'time']].copy()
    tt['time'] -= timedelta(minutes=5)
    t['tm5m'] = pd.merge_asof(tt, qq, on='time', by='symbol')['mid']

    tt = t[['symbol', 'time']].copy()
    tt['time'] -= timedelta(minutes=10)
    t['tm10m'] = pd.merge_asof(tt, qq, on='time', by='symbol')['mid']

    tt = t[['symbol', 'time']].copy()
    tt['time'] -= timedelta(minutes=30)
    t['tm30m'] = pd.merge_asof(tt, qq, on='time', by='symbol')['mid']

    t['time'] = t['time'].astype('uint64')

    return t

In [9]:
%%time

@sproc(packages=['snowflake-snowpark-python', 'numpy', 'pandas', 'pyarrow'],
       name='harsh_test_sproc2', 
       is_permanent = True, 
       stage_location= '@KX', 
       replace = True)
def harsh_test_sproc2(session: Session, tickers: List[str]) -> None:
    """Takes in a list of tickers that we will be filtering on"""
 
    select_columns = ['"TTime"', '"Symbol"', '"Trade Volume"', '"Trade Price"']
    trades = session.table('trades').select(select_columns)\
                    .filter(col('"Symbol"').isin(tickers))\
                    .order_by('"TTime"').to_pandas()                 # 1
    trades.columns = ['time', 'symbol', 'volume', 'price']

    select_columns = ['"TTime"', '"Symbol"', '"Bid_Size"', '"Bid_Price"', '"Offer_Size"', '"Offer_Price"']
    quotes = session.table('quotes').select(select_columns)\
                    .filter(col('"Symbol"').isin(tickers))\
                    .order_by('"TTime"').to_pandas()                 # 2
    quotes.columns = ['time', 'symbol', 'ask_volume', 'ask_price', 'bid_volume', 'bid_price']
    
    output = business_logic(trades, quotes)

    session.write_pandas(output, 'HARSH_OUTPUT', overwrite=False, auto_create_table=True, chunk_size=1000000, compression='snappy')        # 4
    
    return None

CPU times: user 63.8 ms, sys: 14.6 ms, total: 78.3 ms
Wall time: 37.3 s


In [10]:
import time
from joblib import Parallel, delayed
start = datetime.now()

#instantiate queries
query_ids = []

def execute_sproc(job):
    
    conn = snowflake.connector.connect(
        account="xe85544.east-us-2.azure",
        user="kx",
        password = 'Snowflake1',
        role="ACCOUNTADMIN",  # optional
        warehouse="HARSH_TEST2",  # medium snowpark-optimized
        database="KX",
        schema="BRUNO",
    )
    cur = conn.cursor()
    cur.execute_async(f'CALL HARSH_TEST_SPROC2({job})')
    return cur.sfqid

results = Parallel(n_jobs=-1)(delayed(execute_sproc)(job) for job in ticker_buckets)

query_ids.extend(results)

end = datetime.now()
print(end-start)

0:00:11.900718


In [11]:
conn = snowflake.connector.connect(
    account="xe85544.east-us-2.azure",
    user="kx",
    password = 'Snowflake1',
    role="ACCOUNTADMIN",  # optional
    warehouse="HARSH_TEST2",  # medium snowpark-optimized
    database="KX",
    schema="BRUNO",
)
cur = conn.cursor()


t = cur.execute(f'''
SELECT
    MAX(END_TIME) AS end_time
FROM
    SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE QUERY_ID in {tuple(query_ids)}
''').fetch_pandas_all()

In [12]:
t['END_TIME'].values[0] - pd.Timestamp(start) - pd.Timedelta(hours=5) #timezone adjust

NaT

In [13]:
query_ids

['01ac1011-0a04-7c85-0003-0de2003a8c96',
 '01ac1011-0a04-7cd3-0003-0de2003aa0ba',
 '01ac1011-0a04-7c85-0003-0de2003a8c9a',
 '01ac1011-0a04-7cd3-0003-0de2003aa0ae',
 '01ac1011-0a04-7c85-0003-0de2003a8ca2',
 '01ac1011-0a04-7c85-0003-0de2003a8c9e',
 '01ac1011-0a04-7cd3-0003-0de2003aa0b6',
 '01ac1011-0a04-7cd3-0003-0de2003aa0b2',
 '01ac1011-0a04-7cd3-0003-0de2003aa0be',
 '01ac1011-0a04-7cd3-0003-0de2003aa0c2',
 '01ac1011-0a04-7cd3-0003-0de2003aa0c6',
 '01ac1011-0a04-7cd3-0003-0de2003aa0ce',
 '01ac1011-0a04-7cd3-0003-0de2003aa0ca',
 '01ac1011-0a04-7c85-0003-0de2003a8ca6',
 '01ac1011-0a04-7cd3-0003-0de2003aa0d2',
 '01ac1011-0a04-7cd3-0003-0de2003aa0d6',
 '01ac1011-0a04-7c85-0003-0de2003a8caa',
 '01ac1011-0a04-7cd3-0003-0de2003aa0da',
 '01ac1011-0a04-7c85-0003-0de2003a8cae',
 '01ac1011-0a04-7cd3-0003-0de2003aa0de',
 '01ac1011-0a04-7cd3-0003-0de2003aa0e2',
 '01ac1011-0a04-7cd3-0003-0de2003aa0e6',
 '01ac1011-0a04-7c85-0003-0de2003a8cb6',
 '01ac1011-0a04-7cd3-0003-0de2003aa0ea',
 '01ac1011-0a04-