In [1]:
# Fetch stock data for current trading day (i.e. April 1st, 2022 for simulation purposes)

import mysql.connector

# The 50 ticker symbols to analyse. Each symbol is later turned into a table
# name of the form `<symbol>_tick` when querying the database, and the position
# of a symbol in this list is the index used for its data in every later cell.
stocks = [
    "HEROMOTOCO",
    "TECHM",
    "POWERGRID",
    "ITC",
    "ONGC",
    "SBIN",
    "BRITANNIA",
    "COALINDIA",
    "NTPC",
    "SUNPHARMA",
    "TATASTEEL",
    "ADANIPORTS",
    "M&M",
    "HDFCLIFE",
    "RELIANCE",
    "BAJAJ-AUTO",
    "HINDUNILVR",
    "BPCL",
    "LT",
    "DRREDDY",
    "ICICIBANK",
    "KOTAKBANK",
    "BHARTIARTL",
    "CIPLA",
    "SBILIFE",
    "ASIANPAINT",
    "TATACONSUM",
    "MARUTI",
    "HCLTECH",
    "INDUSINDBK",
    "TCS",
    "TITAN",
    "JSWSTEEL",
    "HDFCBANK",
    "GRASIM",
    "INFY",
    "ULTRACEMCO",
    "HDFC",
    "EICHERMOT",
    "APOLLOHOSP",
    "WIPRO",
    "BAJAJFINSV",
    "NESTLEIND",
    "HINDALCO",
    "TATAMOTORS",
    "AXISBANK",
    "UPL",
    "SHREECEM",
    "DIVISLAB",
    "BAJFINANCE",
]

# result[k] holds every tick row fetched for stocks[k], ordered by timestamp.
# Rows come back as dicts keyed by column name because the cursor is created
# with dictionary=True.
# NOTE(review): the host and credentials are hard-coded in this cell; consider
# loading them from environment variables or an untracked config file.
result = []
mydb = mysql.connector.connect(host="www.septoid.com", user="ee451", password="test", database="market_data", charset="utf8")
try:
    mycursor = mydb.cursor(dictionary=True)
    for stock in stocks:
        # A table name cannot be bound as a query parameter, so it is spliced
        # into the statement; `stock` only ever comes from the fixed `stocks` list.
        mycursor.execute("SELECT timestamp, ltp, CAST(volume as CHAR) as volume, CAST(bidqty as CHAR) as bidqty, bid, ask, CAST(askqty as CHAR) as askqty FROM `" + stock + "_tick` WHERE timestamp BETWEEN '2022-04-01' AND '2022-04-02' ORDER BY timestamp ASC")
        result.append(mycursor.fetchall())
finally:
    # Always release the connection, even when one of the queries fails
    # part-way through the loop (previously the connection leaked in that case).
    mydb.close()

In [2]:
# Organize data for each stock for each 14 and 30 second window. There are exactly 22,500 windows between 9:15am and 3:30pm (i.e. 375 minutes x 60 seconds/minute).

import datetime

# Build, for every stock and for every second of the session, the list of ticks
# that fall inside the trailing 14-second and 30-second windows ending at that
# second. sma_14[k][n] / sma_30[k][n] are the windows of stock k at second n.
sma_14 = []
sma_30 = []
for ticks in result:
    short_windows = []
    long_windows = []
    scan_from = 0  # earliest tick position that can still be inside a window
    moment = datetime.datetime(2022, 4, 1, 9, 15, 0)
    while moment < datetime.datetime(2022, 4, 1, 15, 30, 0):
        recent_14 = []
        recent_30 = []
        first_hit = None  # position of the first tick found in the 30 s window
        for pos in range(scan_from, len(ticks)):
            tick = ticks[pos]
            age = moment - tick["timestamp"]
            if datetime.timedelta(seconds=0) <= age < datetime.timedelta(seconds=30):
                if age < datetime.timedelta(seconds=14):
                    recent_14.append(tick)
                recent_30.append(tick)
                if first_hit is None:
                    first_hit = pos
            elif tick["timestamp"] > moment:
                # Ticks are sorted by timestamp, so everything from here on
                # lies in the future relative to `moment`.
                break
        if first_hit is not None:
            # Older ticks can never re-enter a later window; skip them next time.
            scan_from = first_hit
        short_windows.append(recent_14)
        long_windows.append(recent_30)
        moment += datetime.timedelta(seconds=1)
    sma_14.append(short_windows)
    sma_30.append(long_windows)

In [3]:
# Function to calculate sma for 14 or 30 second window (depending on input)

def sma(data):
    """Return the simple moving average of the "ltp" values in ``data``.

    Args:
        data: A sized collection (e.g. a list) of mappings that each provide
            an "ltp" entry. May be empty or None.

    Returns:
        The arithmetic mean of the "ltp" values, or None when there is
        nothing to average.
    """
    # Guard clause: an empty (or missing) window has no average.
    if not data:
        return None
    # Use the builtin sum() rather than a local accumulator named `sum`,
    # which used to shadow the builtin inside this function.
    return sum(tick["ltp"] for tick in data) / len(data)

In [27]:
# Serial + Serial

# Baseline benchmark: one process, one thread. For every one-second step of the
# session, both averages (14 s and 30 s window) are computed for every stock,
# one after the other. The averages themselves are discarded; only the elapsed
# wall-clock time matters here.
start_time = datetime.datetime.now()
time = datetime.datetime(2022, 4, 1, 9, 15, 0)
# `index` selects the per-second window; it advances in lock-step with `time`.
index = 0
while (time < datetime.datetime(2022, 4, 1, 15, 30, 0)):
    for i in range(0, len(sma_14)):
        sma(sma_14[i][index])
        sma(sma_30[i][index])
    index = index + 1
    time = time + datetime.timedelta(seconds=1)
end_time = datetime.datetime.now()
# 22500 is the hard-coded number of one-second steps (375 minutes x 60), so the
# printed value is the average time spent per step across all stocks.
print("Average Serial + Serial Time: ", (end_time - start_time) / 22500)

Average Serial + Serial Time:  0:00:00.000580


In [6]:
# Serial + Parallel (2 threads)

import threading

# Benchmark: stocks are still visited one after another, but for each stock the
# 14 s and 30 s averages are computed in two freshly created threads that are
# started and joined before moving on to the next stock. Nothing collects the
# averages; only the elapsed wall-clock time matters here.
start_time = datetime.datetime.now()
time = datetime.datetime(2022, 4, 1, 9, 15, 0)
# `index` selects the per-second window; it advances in lock-step with `time`.
index = 0
while (time < datetime.datetime(2022, 4, 1, 15, 30, 0)):
    for i in range(0, len(sma_14)):
        # Two new Thread objects per stock per step.
        # NOTE(review): the cost of creating/starting/joining threads presumably
        # dominates the tiny amount of work in sma() - confirm with a profiler.
        thread1 = threading.Thread(target=sma, args=(sma_14[i][index], ))
        thread2 = threading.Thread(target=sma, args=(sma_30[i][index], ))
        thread1.start()
        thread2.start()
        thread1.join()
        thread2.join()
    index = index + 1
    time = time + datetime.timedelta(seconds=1)
end_time = datetime.datetime.now()
# 22500 is the hard-coded number of one-second steps (375 minutes x 60).
print("Average Serial + Parallel (2 threads) Time: ", (end_time - start_time) / 22500)

Average Serial + Parallel (2 threads) Time:  0:00:00.027385


In [5]:
# Serial + Parallel (100 threads)

# Benchmark: for every one-second step, one thread is created per (stock,
# window length) pair, i.e. 2 * len(sma_14) threads - 100 when all 50 symbols
# are loaded. All threads of a step are started first and then all are joined
# before the next step begins. Nothing collects the averages; only the elapsed
# wall-clock time matters here.
start_time = datetime.datetime.now()
time = datetime.datetime(2022, 4, 1, 9, 15, 0)
# `index` selects the per-second window; it advances in lock-step with `time`.
index = 0
while (time < datetime.datetime(2022, 4, 1, 15, 30, 0)):
    # A fresh batch of Thread objects is built for every step.
    threads = []
    for i in range(0, len(sma_14)):
        threads.append(threading.Thread(target=sma, args=(sma_14[i][index], )))
        threads.append(threading.Thread(target=sma, args=(sma_30[i][index], )))
    for i in threads:
        i.start()
    # Wait for the whole batch before advancing to the next second.
    for i in threads:
        i.join()
    index = index + 1
    time = time + datetime.timedelta(seconds=1)
end_time = datetime.datetime.now()
# 22500 is the hard-coded number of one-second steps (375 minutes x 60).
print("Average Serial + Parallel (100 threads) Time: ", (end_time - start_time) / 22500)

Average Serial + Parallel (100 threads) Time:  0:00:00.025730


In [4]:
# Function to call per process

def process(sma_14, sma_30):
    """Compute both moving averages for a subset of stocks over the whole session.

    Intended as the target of one worker process. For every one-second step
    between 09:15:00 and 15:30:00 on 2022-04-01 it calls ``sma`` on the
    14-second and 30-second window of each stock it was given. The averages
    are not returned or stored.

    Args:
        sma_14: Per-stock lists of 14-second windows, indexed [stock][second].
        sma_30: Per-stock lists of 30-second windows, indexed [stock][second].
    """
    session_open = datetime.datetime(2022, 4, 1, 9, 15, 0)
    session_close = datetime.datetime(2022, 4, 1, 15, 30, 0)
    # Number of whole one-second steps in [session_open, session_close).
    step_count = (session_close - session_open) // datetime.timedelta(seconds=1)
    for slot in range(step_count):
        for stock_no, short_windows in enumerate(sma_14):
            sma(short_windows[slot])
            sma(sma_30[stock_no][slot])

In [6]:
# Parallel (4 processes) + Serial [Round-robin Approach]

import multiprocessing

# Cyclic (round-robin) distribution over 4 worker processes: worker k receives
# stocks k, k + 4, k + 8, ... in their original order. Strided slices are used
# so the split works for any number of stocks instead of assuming exactly 50.
# Odd-numbered arrays hold 14-second windows, even-numbered arrays the matching
# 30-second windows (array1/array2 -> worker 1, array3/array4 -> worker 2, ...).
array1, array3, array5, array7 = (sma_14[offset::4] for offset in range(4))
array2, array4, array6, array8 = (sma_30[offset::4] for offset in range(4))

start_time = datetime.datetime.now()
workers = [
    multiprocessing.Process(target=process, args=(short_part, long_part))
    for short_part, long_part in (
        (array1, array2),
        (array3, array4),
        (array5, array6),
        (array7, array8),
    )
]
p1, p2, p3, p4 = workers
# Start every worker before joining any of them so they run concurrently.
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()
end_time = datetime.datetime.now()

# A worker that crashed returns from join() almost immediately, which would
# make the timing look excellent while measuring nothing; refuse to report it.
failed = [worker.name for worker in workers if worker.exitcode != 0]
if failed:
    raise RuntimeError("Worker process(es) exited abnormally: " + ", ".join(failed))

# 22500 is the hard-coded number of one-second steps (375 minutes x 60).
print("Average Parallel + Serial [Round-robin Approach] Time: ", (end_time - start_time) / 22500)

Average Parallel + Serial [Round-robin Approach] Time:  0:00:00.000145


In [8]:
# Fetch stock data for previous trading day (i.e. March 31st, 2022 for simulation purposes)

# result2[k] holds every tick row fetched for stocks[k] on the previous trading
# day, ordered by timestamp. Rows come back as dicts keyed by column name
# because the cursor is created with dictionary=True.
# NOTE(review): the host and credentials are hard-coded in this cell; consider
# loading them from environment variables or an untracked config file.
result2 = []
mydb = mysql.connector.connect(host="www.septoid.com", user="ee451", password="test", database="market_data", charset="utf8")
try:
    mycursor = mydb.cursor(dictionary=True)
    for stock in stocks:
        # A table name cannot be bound as a query parameter, so it is spliced
        # into the statement; `stock` only ever comes from the fixed `stocks` list.
        mycursor.execute("SELECT timestamp, ltp, CAST(volume as CHAR) as volume, CAST(bidqty as CHAR) as bidqty, bid, ask, CAST(askqty as CHAR) as askqty FROM `" + stock + "_tick` WHERE timestamp BETWEEN '2022-03-31' AND '2022-04-01' ORDER BY timestamp ASC")
        result2.append(mycursor.fetchall())
finally:
    # Always release the connection, even when one of the queries fails
    # part-way through the loop (previously the connection leaked in that case).
    mydb.close()

In [16]:
# Parallel (4 processes) + Serial [Combinatorial Approach]

# Get list of stocks sorted based on liquidity from previous trading day
# Rank the stocks by how many ticks they produced on the previous trading day
# (used as the liquidity measure). Each entry is (tick_count, stock_index) and
# the list is sorted ascending, so order[0] is the quietest stock.
order = sorted((len(ticks), position) for position, ticks in enumerate(result2))

# Multiway number partitioning driven by the historical ranking: in each of six
# rounds, every one of the 4 workers receives one stock from the quiet end of
# the ranking and its mirror image from the busy end (rank r paired with rank
# 49 - r), so the expected load per worker stays balanced.
groups_14 = [[], [], [], []]
groups_30 = [[], [], [], []]
for round_no in range(6):
    for lane in range(4):
        low_rank = (round_no * 4) + lane
        for rank in (low_rank, 49 - low_rank):
            stock_no = order[rank][1]
            groups_14[lane].append(sma_14[stock_no])
            groups_30[lane].append(sma_30[stock_no])
# The two middle ranks (24 and 25) are left over; give them to workers 1 and 2.
for lane, rank in ((0, 24), (1, 25)):
    stock_no = order[rank][1]
    groups_14[lane].append(sma_14[stock_no])
    groups_30[lane].append(sma_30[stock_no])

# Odd-numbered arrays hold 14-second windows, even-numbered arrays the matching
# 30-second windows (array1/array2 -> worker 1, array3/array4 -> worker 2, ...).
array1, array3, array5, array7 = groups_14
array2, array4, array6, array8 = groups_30

start_time = datetime.datetime.now()
p1, p2, p3, p4 = [
    multiprocessing.Process(target=process, args=(short_part, long_part,))
    for short_part, long_part in zip(groups_14, groups_30)
]
# Start every worker before joining any of them so they run concurrently.
for worker in (p1, p2, p3, p4):
    worker.start()
for worker in (p1, p2, p3, p4):
    worker.join()
end_time = datetime.datetime.now()
# 22500 is the hard-coded number of one-second steps (375 minutes x 60).
print("Average Parallel + Serial [Combinatorial Approach] Time: ", (end_time - start_time) / 22500)

Average Parallel + Serial [Combinatorial Approach] Time:  0:00:00.000124
