# Name: Robert
# Student Number: K23050945

## Comparing SQL and NoSQL For HFT Data
SQL and NoSQL are two different methods of implementing databases that are used to store datasets based on the structure of such data. SQL (such as MySQL) is a relational database that assures Atomicity, Consistency, Isolation, and Durability (ACID) principles, which are principles that enforce the reliability of data. A great use case of this would be on banking transactions, where strictness and protocol are important in ensuring transaction conditions are met and compliant. 

On the other hand, NoSQL (such as MongoDB, which is a document-type NoSQL) is a database that relaxes the ACID principles in order to optimise for big data (semi-structured and unstructured data arriving in high volume and velocity), using flexible schemas that offer some structure to these types of data. Also, NoSQL conforms to the Basically Available, Soft state, Eventually consistent (BASE) principles to ensure that data is eventually reliable as it prioritises scalability and availability. 


Therefore, great use cases of NoSQL would fall under real-time analytics of financial feed or market data, as they arrive too quickly and in such large volumes for a relational database that wants to ensure the atomicity (during a transaction, an update might fail in one side, whilst the other updates might be successful, causing a “partial transaction”), consistency (as when checking for the condition of a certain data, it might evaluate to true even if a very quick data evaluates it to false and it wasnt detected on time), and isolation (two transactions purchasing one item could occur at the same time, due to data inconsistencies) principles.


As High-Frequency Trading (HFT) relies on real-time analysis and fast order executions, speed and flexibility would be the most important criteria to focus on. Therefore, a system that can handle a lot of tick market data at fast rates, making NoSQL databases (like MongoDB) the best candidate, as a relational database would not be able to scale as efficiently.

## Initial Data Analysis
This is just to play around with the data and understand it more

In [31]:
import numpy as np
import pandas as pd
import dask.dataframe as dd
import dask.array as da
import dask.bag as db


order_detail_names = [
    'OrderCode',
    'MarketSegmentCode',
    'MarketSectorCode',
    'TICode',
    'CountryOfRegister',
    'CurrencyCode',
    'ParticipantCode',
    'BuySelllnd',
    'MarketMechanismGroup',
    'MarketMechanismType',
    'Price',
    'AggregateSize',
    'SingleFilllnd',
    'BroadcastUpdateAction',
    'Date',
    'Time',
    'MessageSequenceNumber'
]

order_detail_schema = {
    'OrderCode':"string[pyarrow]",
    'MarketSegmentCode':"string[pyarrow]",
    'MarketSectorCode':"string[pyarrow]",
    'TICode':"string[pyarrow]",
    'CountryOfRegister':"string[pyarrow]",
    'CurrencyCode':"string[pyarrow]",
    'ParticipantCode':"string[pyarrow]",
    'BuySelllnd':"string[pyarrow]",
    'MarketMechanismGroup':"string[pyarrow]",
    'MarketMechanismType':"string[pyarrow]",
    'Price': "float64",
    'AggregateSize': "int64",
    'SingleFilllnd':"string[pyarrow]",
    'BroadcastUpdateAction':"string[pyarrow]",
    'Date':"string[pyarrow]",
    'Time':"string[pyarrow]",
    'MessageSequenceNumber': "int64"
}

order_history_names = [
    'OrderCode',
    'OrderActionType',
    'MatchingOrderCode',
    'TradeSize',
    'TradeCode',
    'TICode',
    'CurrencyCode',
    'CountryOfRegister',
    'MarketSegmentCode',
    'AggregateSize',
    'BuySellInd',
    'MarketMechanismType',
    'MessageSequenceNumber',
    'Date',
    'Time'
]

order_history_schema = {
    'OrderCode':"string[pyarrow]",
    'OrderActionType':"string[pyarrow]",
    'MatchingOrderCode':"string[pyarrow]",
    'TradeSize': "int64",
    'TradeCode':"string[pyarrow]",
    'TICode':"string[pyarrow]",
    'CurrencyCode':"string[pyarrow]",
    'CountryOfRegister':"string[pyarrow]",
    'MarketSegmentCode':"string[pyarrow]",
    'AggregateSize': "int64",
    'BuySellInd':"string[pyarrow]",
    'MarketMechanismType':"string[pyarrow]",
    'MessageSequenceNumber': "int64",
    'Date':"string[pyarrow]",
    'Time':"string[pyarrow]"
}

trade_reports_names = [
    'MessageSequenceNumber',
    'TICode',
    'MarketSegmentCode',
    'CountryOfRegister',
    'CurrencyCode',
    'TradeCode',
    'TradePrice',
    'TradeSize',
    'TradeDate',
    'TradeTime',
    'BroadcastUpdateAction',
    'TradeTypelnd',
    'TradeTimelnd',
    'BargainConditions',
    'ConvertedPricelnd',
    'PublicationDate',
    'PublicationTime'
]

trade_reports_schema = {
    'MessageSequenceNumber': "int64",
    'TICode':"string[pyarrow]",
    'MarketSegmentCode':"string[pyarrow]",
    'CountryOfRegister':"string[pyarrow]",
    'CurrencyCode':"string[pyarrow]",
    'TradeCode':"string[pyarrow]",
    'TradePrice': "float64",
    'TradeSize': "int64",
    'TradeDate':"string[pyarrow]",
    'TradeTime':"string[pyarrow]",
    'BroadcastUpdateAction':"string[pyarrow]",
    'TradeTypelnd':"string[pyarrow]",
    'TradeTimelnd':"string[pyarrow]",
    'BargainConditions':"string[pyarrow]",
    'ConvertedPricelnd':"string[pyarrow]",
    'PublicationDate':"string[pyarrow]",
    'PublicationTime':"string[pyarrow]"
}



order_detail_df = dd.read_csv('/Users/robertwillfindyou/Downloads/allGlaxoOrderDetail.CSV.gz', header=None, blocksize=None)
order_history_df = dd.read_csv('/Users/robertwillfindyou/Downloads/allGlaxoOrderHistory.CSV.gz', header=None, blocksize=None)
trade_reports_df = dd.read_csv('/Users/robertwillfindyou/Downloads/allGlaxoTradeReport.CSV.gz', header=None, blocksize=None)


order_detail_df.columns = order_detail_names
order_history_df.columns = order_history_names
trade_reports_df.columns = trade_reports_names

order_detail_df.head()



Unnamed: 0,OrderCode,MarketSegmentCode,MarketSectorCode,TICode,CountryOfRegister,CurrencyCode,ParticipantCode,BuySelllnd,MarketMechanismGroup,MarketMechanismType,Price,AggregateSize,SingleFilllnd,BroadcastUpdateAction,Date,Time,MessageSequenceNumber
0,709ENVUN07,SET1,FE10,GB0009252882,GB,GBX,,S,O,LO,1510.0,173,N,F,28022007,07:51:15,3717
1,208ATNHG07,SET1,FE10,GB0009252882,GB,GBX,,S,O,LO,1550.0,1800,N,F,18012007,15:48:21,654681
2,006D95WX07,SET1,FE10,GB0009252882,GB,GBX,,S,O,LO,1485.0,700,N,F,28022007,07:52:53,4338
3,006D94UH07,SET1,FE10,GB0009252882,GB,GBX,,B,O,LO,1300.0,230,N,F,28022007,07:51:31,3849
4,709FJNIR07,SET1,FE10,GB0009252882,GB,GBX,,S,O,LO,1449.0,1100,N,F,28022007,16:30:54,1309558


In [32]:
# Checking for Null values. We see that the Participant code mostly contains null rows
print(order_detail_df.isnull().sum().compute())
print()
print(order_detail_df.index.size.compute())
print()

# Aggregating categorical attributes/columns to check the data for an outlier value on the order detail dataframe
for col in order_detail_df.columns:
    if col=="OrderCode" or col=="Price" or col=="AggregateSize" or col=="Date" or col=="Time" or col=="MessageSequenceNumber":
        continue
    print(order_detail_df.groupby(col).AggregateSize.sum().compute())
    print()

# Checking quantitative attributes/columns to check for outliers on the order detail dataframe
print(order_detail_df.Price.describe().compute())
print()
print(order_detail_df.AggregateSize.describe().compute())

OrderCode                     0
MarketSegmentCode             0
MarketSectorCode              0
TICode                        0
CountryOfRegister             0
CurrencyCode                  0
ParticipantCode          274322
BuySelllnd                    0
MarketMechanismGroup          0
MarketMechanismType           0
Price                         0
AggregateSize                 0
SingleFilllnd                 0
BroadcastUpdateAction         0
Date                          0
Time                          0
MessageSequenceNumber         0
dtype: int64

274322

MarketSegmentCode
SET1    1716837258
Name: AggregateSize, dtype: int64

MarketSectorCode
FE10    1716837258
Name: AggregateSize, dtype: int64

TICode
GB0009252882    1716837258
Name: AggregateSize, dtype: int64

CountryOfRegister
GB    1716837258
Name: AggregateSize, dtype: int64

CurrencyCode
GBX    1716837258
Name: AggregateSize, dtype: int64

Series([], Name: AggregateSize, dtype: int64)

BuySelllnd
B    849475247
S    86736201

### Order History

In [33]:
# Checking for Null values
print(order_history_df.isnull().sum().compute())
print()
print(order_history_df.index.size.compute())
print()

# Aggregating categorical attributes/columns to check the data for an outlier value on the order order history dataframe
for col in order_history_df.columns:
    if col=="OrderCode" or col=="TradeSize" or col=="AggregateSize" or col=="Date" or col=="Time" or col=="MessageSequenceNumber" or col=="MatchingOrderCode" or col=="TradeCode":
        continue
    print(order_history_df.groupby(col).AggregateSize.sum().compute())
    print()

print(order_history_df.TradeSize.describe().compute())
print()
print(order_history_df.AggregateSize.describe().compute())

OrderCode                     0
OrderActionType               0
MatchingOrderCode        205856
TradeSize                     0
TradeCode                205856
TICode                        0
CurrencyCode                  0
CountryOfRegister             0
MarketSegmentCode             0
AggregateSize                 0
BuySellInd                    0
MarketMechanismType           0
MessageSequenceNumber         0
Date                          0
Time                          0
dtype: int64

322208

OrderActionType
D    1276226904
E      50794072
M             0
P     717723102
Name: AggregateSize, dtype: int64

TICode
GB0009252882    2044744078
Name: AggregateSize, dtype: int64

CurrencyCode
GB    2044744078
Name: AggregateSize, dtype: int64

CountryOfRegister
GBX    2044744078
Name: AggregateSize, dtype: int64

MarketSegmentCode
SET1    2044744078
Name: AggregateSize, dtype: int64

BuySellInd
B    1015664768
S    1029079310
Name: AggregateSize, dtype: int64

MarketMechanismType
LO    19

### Trade Reports

In [34]:
# Checking for Null values
print(trade_reports_df.isnull().sum().compute())
print()
print(trade_reports_df.index.size.compute())
print()

# Aggregating categorical attributes/columns to check the data for an outlier value on the trade reports dataframe
for col in trade_reports_df.columns:
    if col=="MessageSequenceNumber" or col=="TradeCode" or col=="TradePrice" or col=="TradeSize" or col=="TradeDate" or col=="TradeTime" or col=="PublicationDate" or col=="PublicationTime":
        continue
    print(trade_reports_df.groupby(col).TradeSize.sum().compute())
    print()

print(trade_reports_df.TradePrice.describe().compute())
print()
print(trade_reports_df.TradeSize.describe().compute())

MessageSequenceNumber    0
TICode                   0
MarketSegmentCode        0
CountryOfRegister        0
CurrencyCode             0
TradeCode                0
TradePrice               0
TradeSize                0
TradeDate                0
TradeTime                0
BroadcastUpdateAction    0
TradeTypelnd             0
TradeTimelnd             0
BargainConditions        0
ConvertedPricelnd        0
PublicationDate          0
PublicationTime          0
dtype: int64

130136

TICode
GB0009252882    723151317
Name: TradeSize, dtype: int64

MarketSegmentCode
SET1    723151317
Name: TradeSize, dtype: int64

CountryOfRegister
GB    723151317
Name: TradeSize, dtype: int64

CurrencyCode
GBX    723151317
Name: TradeSize, dtype: int64

BroadcastUpdateAction
A    334732020
D     43163425
E    345255872
Name: TradeSize, dtype: int64

TradeTypelnd
AT    300189396
N      20452523
O     300804911
P       7643597
SW      8111400
UT     45066476
VW     27937128
WT      9215528
X       3730358
Name: T

## Data Preprocessing
This is the actual preprocessing process that would clean the data quickly and efficiently using Dask's scalar methods and lazy computation.
Each csv is first imported and conformed to a schema containg name columns and their types. Then, the Date and Time columns are merged into one where the Date column is then transformed into a datetime type column. Finally each column is sorted to their respective message sequencing as stated in the lse description file. The Order Detail preprocessing has an extra step where the ordercode drops duplicates if they appear as it is only meant to have unique values.

In [None]:
order_detail_df = dd.read_csv('/Users/robertwillfindyou/Downloads/allGlaxoOrderDetail.CSV.gz', names=order_detail_names, dtype=order_detail_schema,header=None, blocksize=None)
order_detail_df.Date = order_detail_df.Date + order_detail_df.Time
order_detail_df.Date = order_detail_df.Date.map_partitions(dd.to_datetime, format='%d%m%Y%X', meta=('Date', 'datetime64[ns]'))
order_detail_df=order_detail_df.drop(columns='Time')
order_detail_df = order_detail_df.fillna(value={'OrderCode': "",
    'MarketSegmentCode': "",
    'MarketSectorCode': "",
    'TICode': "",
    'CountryOfRegister': "",
    'CurrencyCode': "",
    'ParticipantCode': "",
    'BuySelllnd': "",
    'MarketMechanismGroup': "",
    'MarketMechanismType': "",
    'Price': -1,
    'AggregateSize': -1,
    'SingleFilllnd': "",
    'BroadcastUpdateAction': "",
    'Date': "",
    'Time': "",
    'MessageSequenceNumber': ""})
order_detail_df = order_detail_df.drop_duplicates(subset=['OrderCode'])
order_detail_df = order_detail_df.sort_values(
    by=["Date", "TICode", "MessageSequenceNumber", "OrderCode"],
    ascending=[True, True, True, True]
)



order_history_df = dd.read_csv('/Users/robertwillfindyou/Downloads/allGlaxoOrderHistory.CSV.gz', names=order_history_names, dtype=order_history_schema,header=None, blocksize=None)
order_history_df.Date = order_history_df.Date + order_history_df.Time
order_history_df.Date = order_history_df.Date.map_partitions(dd.to_datetime, format='%d%m%Y%X', meta=('Date', 'datetime64[ns]'))
order_history_df=order_history_df.drop(columns='Time')
order_history_df = order_history_df.fillna(value={
    'OrderCode':"",
    'OrderActionType':"",
    'MatchingOrderCode':"",
    'TradeSize': -1,
    'TradeCode':"",
    'TICode':"",
    'CurrencyCode':"",
    'CountryOfRegister':"",
    'MarketSegmentCode':"",
    'AggregateSize': -1,
    'BuySellInd':"",
    'MarketMechanismType':"",
    'MessageSequenceNumber': -1,
    'Date':"",
    "Time":""})
order_history_df = order_history_df.sort_values(
    by=["Date", "TICode", "MessageSequenceNumber", "OrderCode", "AggregateSize"],
    ascending=[True, True, True, True, False]
)


trade_reports_df = dd.read_csv('/Users/robertwillfindyou/Downloads/allGlaxoTradeReport.CSV.gz', names=trade_reports_names, dtype=trade_reports_schema, header=None, blocksize=None)
trade_reports_df.TradeDate = trade_reports_df.TradeDate + trade_reports_df.TradeTime
trade_reports_df.PublicationDate = trade_reports_df.PublicationDate + trade_reports_df.PublicationTime
trade_reports_df.TradeDate = trade_reports_df.TradeDate.map_partitions(dd.to_datetime, format='%d%m%Y%X', meta=('TradeDate', 'datetime64[ns]'))
trade_reports_df.PublicationDate = trade_reports_df.PublicationDate.map_partitions(dd.to_datetime, format='%d%m%Y%X', meta=('PublicationDate', 'datetime64[ns]'))
trade_reports_df = trade_reports_df.drop(columns=['TradeTime','PublicationTime'])
trade_reports_df = trade_reports_df.fillna(value={
    'MessageSequenceNumber': "",
    'TICode':"",
    'MarketSegmentCode':"",
    'CountryOfRegister':"",
    'CurrencyCode':"",
    'TradeCode':"",
    'TradePrice': -1,
    'TradeSize': -1,
    'TradeDate': "",
    'TradeTime': "",
    'BroadcastUpdateAction':"",
    'TradeTypelnd':"",
    'TradeTimelnd':"",
    'BargainConditions':"",
    'ConvertedPricelnd':"",
    "PublicationDate":"",
    "PublicationTime":""})
trade_reports_df = trade_reports_df.sort_values(
    by=["TradeDate", "PublicationDate", "TICode", "MessageSequenceNumber", "TradeCode"],
    ascending=[True, True, True, True, True]
)


## Connecting To MongoDB And Uploading The LSE Data

In [None]:

from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
import certifi
from dask_mongo import to_mongo

uri = "mongodb+srv://ronuma1234:wo9V6CO0cFEpHv3x@cluster0.ty23e.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"

# Create a new client and connect to the server
client = MongoClient(uri, server_api=ServerApi('1'), tlsCAFile=certifi.where())

# Send a ping to confirm a successful connection
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)


mongo_db = client["lse_data_database"]

# The lines of code below uploads each dataframe to their respective collections

collection = mongo_db['order_detail_df']
partitions = order_detail_df.map_partitions(lambda df: df.to_dict(orient='records')).compute()

records = [record for partition in partitions for record in partition]
if records:  
    collection.insert_many(records)

collection = mongo_db['order_history_df']
partitions = order_history_df.map_partitions(lambda df: df.to_dict(orient='records')).compute()

records = [record for partition in partitions for record in partition]
if records: 
    collection.insert_many(records)


collection = mongo_db['trade_reports_df']
partitions = trade_reports_df.map_partitions(lambda df: df.to_dict(orient='records')).compute()

records = [record for partition in partitions for record in partition]
if records:  # Ensure there is data to insert
    collection.insert_many(records)

Pinged your deployment. You successfully connected to MongoDB!


## Connecting to MongoDB And Querying The Data

In [None]:

from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
import certifi
from dask_mongo import to_mongo

uri = "mongodb+srv://ronuma1234:wo9V6CO0cFEpHv3x@cluster0.ty23e.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"

# Create a new client and connect to the server
client = MongoClient(uri, server_api=ServerApi('1'), tlsCAFile=certifi.where())

# Send a ping to confirm a successful connection
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
    print()
except Exception as e:
    print(e)


# Gets the LSE database
mongo_db = client["lse_data_database"]

# Gets the order details collection
collection = mongo_db['order_details_df']

# Calculates the cancelled orders by counting records/documents with the "D" tagged on the BroadcastUpdateAction field of the record
print(f"Cancelled orders: {collection.count_documents({"BroadcastUpdateAction": "D"})}")

# Gets the order details collection
collection = mongo_db['order_history_df']

# Calculates the deleted orders by running an aggregate function that first sorts by date, then groups the ordercode on based on the first (latest)
#orderactiontype then filters them based on the once that have "D" and counts them
result = list(collection.aggregate([
    {"$sort": {"Date": -1}},  # Sort by latest Date & Time
    {"$group": {"_id": "$OrderCode", "last_action": {"$first": "$OrderActionType"}}},  # Get latest action per OrderCode
    {"$match": {"last_action": "D"}},  # Keep only those where the last action was "D"
    {"$count": "deleted_orders"}  # Count the number of such orders
]))

if result:
    print(f"Deleted Orders: {result[0]['deleted_orders']}")

Pinged your deployment. You successfully connected to MongoDB!

Cancelled orders: 0
Deleted Orders: 201374
