## **multi-thread V2**

"multi-thread v2" is a notebook for drawing in and storing **trading data from Alpaca**

It differs from V1 in that the functionality is more narrowly defined to aid performance.

The code leverages the python libraries **"websocket-client"** for streaming data needs, and **"_thread"** to run concurrent functions. 

Core Functions: data_stream(), data_stage()

In [1]:
import websocket, json
import pandas as pd
import datetime
import time
import os
import _thread

In [None]:
"""
Data Streaming
This part of the notebook contains the code for streaming data from Alpaca, via their API, into a shared global data dictionary
"""

#securities = ["T.TSLA"]
def data_stream(securities, global_dict, key_id=None, secret_key=None): #websocket alpaca trade data stream
    """
    Listen to the Alpaca websocket and copy trade messages into global_dict.

    The websocket listening is based on the tutorial found here:
    https://www.youtube.com/watch?v=fIzm57idu3Y&t=695s

    securities  -- list of stream names to listen to, e.g. ["T.TSLA"]
    global_dict -- shared dictionary; every trade payload is stored under
                   the value of its "i" field
    key_id, secret_key -- API credentials; when left as None they are read
                   from the APCA_API_KEY_ID / APCA_API_SECRET_KEY
                   environment variables so that no secret lives in the
                   notebook itself

    Raises ValueError when no credentials can be found. Otherwise the call
    blocks inside ws.run_forever().
    """
    #resolve the credentials up front so a missing key fails loudly here
    #instead of inside a websocket callback
    if key_id is None:
        key_id = os.environ.get("APCA_API_KEY_ID")
    if secret_key is None:
        secret_key = os.environ.get("APCA_API_SECRET_KEY")
    if not key_id or not secret_key:
        raise ValueError("Alpaca credentials missing: pass key_id/secret_key "
                         "or set APCA_API_KEY_ID and APCA_API_SECRET_KEY")

    #opening function
    def on_open(ws):
        print("opened")
        #dictionaries with "action" and "data" seem to be the format for the websocket
        #authentication data
        auth_data = {"action": "authenticate",
                     "data": {"key_id": key_id,
                              "secret_key": secret_key}}

        #tells the websocket that we're a legitimate subscriber to its data
        ws.send(json.dumps(auth_data))
        #tells the websocket what data we are looking for specifically
        listen_message = {"action": "listen", "data": {"streams": securities}}
        #actions the listening
        ws.send(json.dumps(listen_message))

    def on_message(ws, message):
        #Alert
        print("Received Message {}".format(datetime.datetime.now()))
        print(message)
        #the message arrives as JSON text, so it has to be decoded before
        #its fields can be looked up
        try:
            payload = json.loads(message)
        except (TypeError, ValueError):
            return
        if not isinstance(payload, dict):
            return
        stream = payload.get("stream")
        data = payload.get("data")
        #only trade streams (names starting with "T") are stored; other
        #messages, e.g. the authorization reply, carry no trade id
        if isinstance(stream, str) and stream.startswith("T") \
                and isinstance(data, dict) and "i" in data:
            global_dict[data["i"]] = data

    #where should the websocket be listening
    socket = 'wss://data.alpaca.markets/stream'
    #create the websocket app
    ws = websocket.WebSocketApp(socket,
                                on_open=on_open,
                                on_message=on_message)
    #start the application
    ws.run_forever()

In [None]:
#the more I think about it, the more this is just a helper function that exists in case there is a data breakdown
def data_stage(global_dict, directory, interval=1, max_cycles=None):
    """
    Periodically drain global_dict into timestamped CSV files.

    global_dict -- shared dictionary of records; everything present at the
                   start of a cycle is removed from it and written out
    directory   -- base folder; files go to <directory>/data_staging/,
                   which is created when it does not exist yet
    interval    -- seconds to sleep before each cycle (default 1)
    max_cycles  -- stop after this many cycles; None (default) loops forever

    Cycles that find no records write no file.
    """
    staging_dir = os.path.join(directory, 'data_staging')
    os.makedirs(staging_dir, exist_ok=True)

    cycles = 0
    while max_cycles is None or cycles < max_cycles:
        #data comes in faster than a second at a time, so one file usually
        #holds several records
        time.sleep(interval)
        cycles += 1
        #files are named with a timestamp
        timestamp = str(datetime.datetime.now())
        #take a snapshot of the keys first: popping while iterating the
        #live key view raises RuntimeError, and popping exactly what gets
        #written means records added meanwhile wait for the next cycle
        #instead of being dropped
        batch = {}
        for key in list(global_dict):
            batch[key] = global_dict.pop(key)
        if not batch:
            continue
        table = pd.DataFrame.from_dict(batch, orient="index")
        #write everything down
        table.to_csv(os.path.join(staging_dir, 'data_table_'+timestamp+'.csv'), index=False)
        print("Data Staged"+' '+timestamp)

In [None]:
#this list tells the websocket which securities' trades to listen for;
#the top thirty securities were grabbed from: https://www.tradingview.com/markets/stocks-usa/market-movers-active/
securities = [
    'T.SNDL', 'T.NAKD', 'T.PLTR', 'T.NIO', 'T.FCEL',
    'T.WORX', 'T.AAL', 'T.NAK', 'T.GE', 'T.ACB',
    'T.IDEX', 'T.WORK', 'T.CCL', 'T.AAPL', 'T.SRNE',
    'T.NNDM', 'T.MRNA', 'T.TSLA', 'T.APXT', 'T.ITUB',
    'T.TTNP', 'T.RIG', 'T.RAIL', 'T.VALE', 'T.GNUS',
    'T.TLRY', 'T.ZOM', 'T.SPY', 'T.FSR', 'T.PFE',
]
#base folder handed to the staging function: the current working directory
directory = os.getcwd()
#dictionary shared between the streaming and the staging functions
global_dict = {}

In [None]:
#start the streaming and the staging functions, each on its own thread
try:
    _thread.start_new_thread(data_stream, (securities, global_dict,))
    _thread.start_new_thread(data_stage, (global_dict, directory, ))
except RuntimeError as err:
    #start_new_thread reports a failure to create a thread as RuntimeError
    print('Unable to start threads...')
    print(err)

#keep the main thread alive while the workers run; sleeping instead of
#spinning on "pass" leaves the CPU free for the worker threads
while True:
    time.sleep(1)


In [None]:
"""

{'stream': 'authorization',
    'data': {'action': 'authenticate', 'status': 'authorized'}}
 
 
{"stream": "T.ACB",
"data": {"ev": "T",
         "T": "ACB",
         "i": "62879147248441",
         "x": 17,
         "p": 11.16,
         "s": 30,
         "t": 1606758407220000000,
         "c": [37],
         "z": 1}}
"""