## **multi-thread**

"multi-thread" is a notebook for pulling in, storing, and analyzing **trading data from Alpaca** in a parallel fashion.

The code uses the Python library **"websocket-client"** for streaming data, and the standard-library module **"_thread"** to run functions concurrently.

Core Functions: data streaming, raw storage, active table, trade decisioning

Edit: trade decisioning will be focused on capturing the time latency within the application
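
As a rough sketch of what capturing latency could look like, the helper below compares a trade's own timestamp with the moment the application handles it. It assumes the trade's `t` field is a Unix epoch in nanoseconds, which should be checked against the Alpaca stream documentation; `latency_ms` is a hypothetical name and not part of the notebook.

```python
import time

def latency_ms(trade_ts_ns, now_ns=None):
    """Milliseconds between a trade's timestamp and 'now'.

    Assumption: trade_ts_ns is a Unix epoch in nanoseconds.
    """
    if now_ns is None:
        # time.time_ns() returns the current epoch time in nanoseconds
        now_ns = time.time_ns()
    return (now_ns - trade_ts_ns) / 1_000_000

# fixed values so the result is reproducible:
# a trade stamped 250 ms before the moment it is handled
sent = 1_600_000_000_000_000_000
handled = sent + 250_000_000
print(latency_ms(sent, handled))  # 250.0
```

Calling the helper at each milestone (stream, storage, active table) would give one latency figure per stage for the same trade id.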

## **key learnings**
Communication between functions running under **'_thread'**, when done through shared lists, is extremely limiting. But, EVERYTHING RUNS

Couple that with the need for infinite loops, and it created conflicts between the logical if/then statements in the code. Next to explore: multi-processing, the 'threading' module, and Queue, any of which could solve the issues experienced.
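
One way the move from `_thread` to `threading` might look, as a minimal sketch rather than the notebook's actual code: each loop checks a shared `threading.Event` instead of running an unconditional `while True`, and the main thread waits with `join()`. The names `worker`, `stop`, and `log` are made up for this illustration.

```python
import threading
import time

def worker(name, stop, log):
    # loop until asked to stop, instead of an unconditional `while True`
    while not stop.is_set():
        log.append(name)
        # wait() acts as a sleep that returns early once stop is set
        stop.wait(0.01)

stop = threading.Event()
log = []
threads = [threading.Thread(target=worker, args=(n, stop, log))
           for n in ("stream", "storage")]
for t in threads:
    t.start()

time.sleep(0.05)   # let the workers run briefly
stop.set()         # signal every loop to finish
for t in threads:
    t.join()       # block until each thread has actually exited
```

Because every loop has an exit condition, the threads can be shut down cleanly, which is hard to do with loops that never end.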

Queue is the best practice for communication between threads, not lists :/. The next iteration starts here.
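
A minimal sketch of what that could look like, assuming the standard-library `queue.Queue` and one queue per consuming function so that both consumers see every message. The function and variable names (`producer`, `consumer`, `storage_q`, `active_q`) are hypothetical, and the file names stand in for the staged websocket messages.

```python
import queue
import threading

def producer(messages, outboxes):
    # hand every message to each consumer's own queue (fan-out)
    for message in messages:
        for outbox in outboxes:
            outbox.put(message)
    # a None sentinel tells each consumer that no more messages are coming
    for outbox in outboxes:
        outbox.put(None)

def consumer(inbox, results):
    while True:
        message = inbox.get()   # blocks until something arrives, no busy loop
        if message is None:
            break
        results.append(message)

storage_q, active_q = queue.Queue(), queue.Queue()
stored, checked = [], []

workers = [
    threading.Thread(target=consumer, args=(storage_q, stored)),
    threading.Thread(target=consumer, args=(active_q, checked)),
]
for w in workers:
    w.start()

producer(["trade-1.json", "trade-2.json"], [storage_q, active_q])
for w in workers:
    w.join()

print(stored)   # ['trade-1.json', 'trade-2.json']
print(checked)  # ['trade-1.json', 'trade-2.json']
```

Since `get()` removes an item atomically, neither consumer needs to peek at a shared index or guard against the other one changing the list underneath it. Deciding when a staged file can be deleted would still need some coordination.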


In [1]:
import websocket, json
import pandas as pd
import datetime
import os
import _thread

In [2]:
"""
helper functions
"""
def delete_file(file_name):
    #there are a bunch of these 'try's around, basically, the list system to communicate between threads is just
    #as terrible as it was made out to be :/ time to figure out queue!
    try:
        os.remove(os.getcwd()+'/data_staging/'+file_name)
        print("Deleted "+file_name)
    except FileNotFoundError:
        #the other thread already deleted the file, nothing left to do
        pass
        

In [3]:
"""
Data Streaming
This piece of the notebook covers the code for streaming data from Alpaca using their API into a temporary location.

Currently, only a list of securities is being passed to the function.  But, that could expand to include different directories for the data 
stream, or different authentication details (which are currently hardcoded :/)


TODO:
We'll be using lists and queue to communicate between threads.  We need a line or two here to add the new file name to a globally shared list.
"""

#securities = ["T.TSLA"]
def trade_stream(securities, global_list): #websocket alpaca trade data stream
    """
    The websocket listening is based on the tutorial found here:
    https://www.youtube.com/watch?v=fIzm57idu3Y&t=695s
    """

    #opening function
    def on_open(ws):
        print("opened")
        #dictionaries with "action" and "data" seem to be the format for the websocket
        #authentication data
        auth_data = {"action": "authenticate",
                     "data":{"key_id":'AKSRDKT347RTSHBQOR3E', 
                               "secret_key":'YaUElSsJlg1cJQusGO0IxxudsqsRmkZQAFJ1WkLU'}}

        #tells the websocket that we're a legitimate subscriber to its data
        ws.send(json.dumps(auth_data))
        #tells the websocket what data we are looking for specifically
        listen_message = {"action":"listen", "data":{"streams":securities}} #securities used to be ["T.TSLA"]
        #actions the listening
        ws.send(json.dumps(listen_message))


    def on_message(ws, message):
        #Alert
        print("Received Message {}".format(datetime.datetime.now()))
        #need a timestamp for the message data
        time = str(datetime.datetime.now())
        #convert the message to dict format
        #json.loads parses JSON (including true/false/null) and, unlike eval, never executes the message
        message_dict = json.loads(message)
        #save the file down into the staging area
        with open(os.getcwd()+'/data_staging/'+time+'.json', 'w') as json_file:
            json.dump(message_dict, json_file)
        print(message)

        #publish the file name only after the file is fully written
        global_list.append(time+'.json')

    #where should the websocket be listening
    socket = 'wss://data.alpaca.markets/stream'
    #create the websocket app
    ws = websocket.WebSocketApp(socket,
                              on_open=on_open,
                              on_message=on_message)
    #start the application
    ws.run_forever()

In [4]:
"""
Raw Storage
This piece of code listens to the shared list of incoming websocket message file names.  It then reads and writes those files into a "cold 
storage" location and appends (deduplicates and blends) the data for an analysis table.
"""
def store_data(global_list, raw_list, active_list):
  
    def read_file(file_name):
        #reading is wrapped with 'try' because the other thread can delete a staged file while its name
        #is still at global_list[0]; a file is only deleted once both threads have processed it,
        #so a missing file does not mean data loss and the caller can simply skip it
        try:
            #standard json read
            with open(os.getcwd()+'/data_staging/'+file_name) as f:
                return json.load(f)
        except FileNotFoundError:
            return None

    def write_file(file_name, data):
        #standard json write
        with open(os.getcwd()+'/cold_storage/'+file_name, 'w') as json_file:
            json.dump(data, json_file)

    def core_table(data): 
        #check new data against existing analysis table and remove duplicates
        #read in existing data
        table = pd.read_csv(os.getcwd()+"/data.csv")
        #define the new records
        new_records = pd.DataFrame.from_dict(data['data'])
        #append the new records (DataFrame.append was removed in pandas 2.0, so use pd.concat)
        table = pd.concat([table, new_records])
        #drop the duplicate trades
        table = table.drop_duplicates(subset=['i'])
        #write the file down back into the analysis table
        table.to_csv(os.getcwd()+"/data.csv", index=False)
        print("Table Updated")
    
    #start an infinite loop to house the calling of the above functions
    while True:
        #check the global_list, it houses files in staging; look again if it is empty
        try:
            file_name = global_list[0]
        except IndexError:
            continue
        #read in the file; None means the other thread has already deleted it
        data = read_file(file_name)
        if data is None:
            continue
        #store each file only once; raw_list marks files this function has already processed
        if file_name not in raw_list:
            trade_decision('Storage', data)
            #write it down to storage
            write_file(file_name, data)
            #if the data meets requirements, below, which mean the data is a Trade
            if ((data['stream'] != "authorization") and (data['stream'] != "listening")) :
                core_table(data)
            #mark that the file has been processed by this function
            raw_list.append(file_name)
        #check if the file name is in a list which indicates that it has been checked by the other function
        if file_name in active_list:
            #delete_file tolerates a file that is already gone
            delete_file(file_name)
            trade_decision('Storage Deleted', data)
            #remove the file from the other lists; the other thread may have removed it first
            try:
                active_list.remove(file_name)
                global_list.remove(file_name)
            except ValueError:
                pass
            print("Deleted file from Storage function")



In [5]:
"""
Active Table
This function checks the newest arriving data against the most current closing price data by symbol
"""
def active_data(global_list, raw_list, active_list):
  
    def read_file(file_name):
        #standard json read
        with open(os.getcwd()+'/data_staging/'+file_name) as f:
            data = json.load(f)
        return data
    #this checks the data in staging against the active data table
    def cross_check(data):
        #read the active table
        active_data = pd.read_csv(os.getcwd()+'/active_data.csv')
        #make sure the data isn't one of the websocket messages that isn't a trade
        if ((data['stream'] != "authorization") and (data['stream'] != "listening")) :
            #make sure the data has a stock symbol that is within the active table
            if data['data']['T'] in active_data['T'].unique():
                
                #where the active table's stock symbol matches the data, update the price and time
                #if the timestamp in the data is later than the one in the table
                #this effectively creates a most recent price for the symbol
                match = active_data['T'] == data['data']['T']
                if int(active_data.loc[match].reset_index(drop=True)['t'][0]) <  int(data['data']['t']):
                    #assign through a single .loc call; chained indexing may write to a copy
                    active_data.loc[match, 'p'] = data['data']['p']
                    active_data.loc[match, 't'] = data['data']['t']
                    print("Active Data Update")
                active_data.to_csv(os.getcwd()+"/active_data.csv", index=False)
                

    while True:
        #check the global_list, it houses files in staging
        if len(global_list) > 0:
            #the other thread can delete a staged file, or drop its name from the shared lists,
            #while this thread is still working on global_list[0]
            #a file is only deleted after both threads have processed it, so this does not create data loss
            #this strategy is different than storage, where the try was in the read function
            #wrapping the whole step here acts as one error handler for every call below
            try:
                #create a file_name variable so every step below refers to the same file
                file_name = global_list[0]
                #read in the file and cross_check the data against the active table
                data = read_file(file_name)
                #process each file only once; active_list marks files this function has already processed
                if file_name not in active_list:
                    cross_check(data)
                    trade_decision('Active', data)
                    active_list.append(file_name)
                if file_name in raw_list:
                    #delete_file tolerates a file that is already gone
                    delete_file(file_name)
                    trade_decision('Active Deleted', data)
                    #remove the file from the other lists
                    raw_list.remove(file_name)
                    global_list.remove(file_name)
                    print("Deleted file from Active function")
            except (FileNotFoundError, IndexError, ValueError):
                #FileNotFoundError: the file is already deleted
                #IndexError/ValueError: the name is already gone from a shared list
                pass


In [6]:
"""
Trade Decisioning

In the next iteration this will be explored more heavily. :)

Here, this is merely a helper function to write down useful snippets of information that are timestamped throughout
the other functions.  This will allow for diagnostics down the road without any threat to the underlying logs for 
each action/milestone within the code itself

"""
def trade_decision(function_name, data):
    if ((data['stream'] != "authorization") and (data['stream'] != "listening")) :
        #create miniature dictionary of critical data
        info_dict = {'function':function_name, #this allows for a lot of customization
                        'T':data['data']['T'], #the stock symbol
                        'i':data['data']['i'], #the trade id so we can trace a single trade through the system
                        't':data['data']['t'], #the trade's timestamp, so we can compare latency from start to any point
                        'tT':str(datetime.datetime.now())} #the timestamp of the function being called

        #standard json write
        with open(os.getcwd()+'/trade_latency/'+str(data['data']['i'])+'_'+function_name+'.json', 'w') as json_file:
            json.dump(info_dict, json_file)

In [7]:
#create two empty lists that communicate across the two data blending functions... active data and storage
raw_list = []
active_list = []
#load the global list with any existing files currently in staging
global_list = os.listdir(os.getcwd()+'/data_staging/')
#grabbed the top thirty securities from: https://www.tradingview.com/markets/stocks-usa/market-movers-active/
#this list tells the websocket which securities' trades to listen for
securities = ['T.SNDL','T.NAKD','T.PLTR','T.NIO','T.FCEL',\
              'T.WORX','T.AAL','T.NAK','T.GE','T.ACB',\
              'T.IDEX','T.WORK','T.CCL','T.AAPL','T.SRNE',\
              'T.NNDM','T.MRNA','T.TSLA','T.APXT','T.ITUB',\
              'T.TTNP','T.RIG','T.RAIL','T.VALE','T.GNUS',\
              'T.TLRY','T.ZOM','T.SPY','T.FSR','T.PFE']



In [None]:
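import time

try:
    _thread.start_new_thread(trade_stream, (securities, global_list,))
    _thread.start_new_thread(active_data, (global_list, raw_list, active_list, ))
    _thread.start_new_thread(store_data,  (global_list, raw_list, active_list, ))
except Exception as error:
    print('Unable to start threads...', error)

#the main thread has to stay alive for the worker threads to keep running
#sleeping instead of 'pass' avoids burning CPU that the workers could use
while True:
    time.sleep(1)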
