In [1]:
import blpapi
from blpapi import AuthOptions, TlsOptions
from pymongo import MongoClient
from datetime import datetime, timedelta
import traceback


class BPIPE:
    def __init__(self, tickers: list[str], mongo_connection_string: str = "mongodb://192.168.10.153:27017/"):
        # Setup MongoDB connection
        client = MongoClient(mongo_connection_string)
        self.bpipe_db = client["BPIPE"][datetime.now().strftime("%Y%m%d")]
        self.qr_db = client["QR"][datetime.now().strftime("%Y%m%d")]

        # Define tickers
        self.tickers = tickers

        # Initialize session options
        self.options = blpapi.SessionOptions()
        self.options.setServerAddress("cloudpoint1.bloomberg.com", 8194, 0)
        self.options.setServerAddress("cloudpoint2.bloomberg.com", 8194, 1)
        self.options.setSessionIdentityOptions(AuthOptions.createWithApp("CHINASILVER:Trading"))

        # TLS key files for connecting to ZFP endpoint
        self.options.setTlsOptions(TlsOptions.createFromFiles(r"T:\Intern Folder\External Research\2023 Interns\BPIPE\certificates\65C7B614CC7BF90D81CDE1363A617D85.pk12", "123456", r"T:\Intern Folder\External Research\2023 Interns\BPIPE\certificates\rootCertificate.pk7"))

        self.options.setAutoRestartOnDisconnection(True)
        print(self.options)

        self.session = blpapi.Session(self.options, self.onEvent)
        self.session.start()
        self.session.openService("//blp/mktdepthdata")

    def subscribe(self):

        subList = blpapi.SubscriptionList()
        for ticker in self.tickers:
            print(f"Subscribing to {ticker}...")
            #subList.add("//blp/mktdepthdata/ticker/" + str(ticker), options="type=TOP", correlationId=blpapi.CorrelationId(ticker + ".type=TOP"))
            #subList.add("//blp/mktdepthdata/ticker/" + str(ticker), options="type=MBO", correlationId=blpapi.CorrelationId(ticker + ".type=MBO"))  # Need full book
            #subList.add("//blp/mktdepthdata/ticker/" + str(ticker), options="type=MBL", correlationId=blpapi.CorrelationId(ticker + ".type=MBL"))
            subList.add(ticker, fields = ["EVT_TRADE_TIME_RT", "EVT_TRADE_PRICE_RT", "EVT_TRADE_SIZE_RT", "EVT_TRADE_CONDITION_CODE_RT"], correlationId=blpapi.CorrelationId(ticker+'.type=QR'))
        self.session.subscribe(subList)

    def onEvent(self, event, session):

        for msg in event:
            if msg.messageType() == "MarketDepthUpdates":
                cid = msg.correlationId().value()
                msg = msg.toPy()
                msg["cid"] = cid
                self.bpipe_db.insert_one(msg)

            elif msg.messageType() == "MarketDataEvents":
                try:
                    cid = msg.correlationId().value()
                    msg = msg.toPy()
                    if "EVT_TRADE_TIME_RT" in msg:
                        msg["cid"] = cid
                        msg["EVT_TRADE_TIME_RT"] = datetime.combine(datetime.today(),msg["EVT_TRADE_TIME_RT"]) + timedelta(hours=8) # Convert datetime.time to datetime.datetime
                        self.qr_db.insert_one(msg)
                except Exception as e:
                    print(e)
                    print(msg)
                
            else:
                print(msg)
                


In [2]:
x = BPIPE(['9988 HK Equity'], "mongodb://192.168.10.153:27017/")
x.subscribe()

[
    defaultServices = "//blp/mktdata;//blp/refdata"
    defaultSubscriptionService = "//blp/mktdata"
    defaultTopicPrefix = "/ticker/"
    allowMultiCorrelatorsPerMsg = false
    connectTimeout = 5000
    clientMode = 0
    maxPendingRequests = 1024
    autoRestartOnDisconnection = true
    authenticationOptions = ""
    numStartAttempts = 3
    reconnectionInterval = 3000
    defaultKeepAliveInactivityTime = 20000
    defaultKeepAliveResponseTimeout = 5000
    serviceCheckTimeout = 120000
    serviceDownloadTimeout = 120000
    maxPendingEvents = 10000
    compat33x = false
    keepAliveEnabled = true
    serverAddress = [
        [
            host = "cloudpoint1.bloomberg.com"
            port = 8194
        ]
        [
            host = "cloudpoint2.bloomberg.com"
            port = 8194
        ]
    ]
    recordSubscriptionDataReceiveTimes = false
    flushPublishedEventsTimeout = 2000
    bandwidthSaveModeDisabled = false
    tlsOptionsImpl = [
        tlsHandshakeTimeoutMs

CID: {[ valueType=AUTOGEN classId=0 value=10 ]}
RequestId: cefc985d-99a4-0ab4-0000-b3ffa8c00500
ServiceOpened = {
    serviceName = "//blp/mktdata"
}

CID: {[ valueType=POINTER classId=0 value=00000209ADC9A970 ]}
RequestId: abfc255d-99ae-0ab4-0000-b3ffa8c00500
SubscriptionStarted = {
    exceptions[] = {
    }
    streamIds[] = {
        "1"
    }
    receivedFrom = {
        address = "cloudpoint1.bloomberg.com:8194"
    }
    reason = "Subscriber made a subscription"
}

CID: {[ valueType=POINTER classId=0 value=00000209ADC9A970 ]}
RequestId: 43fca55d-99b3-0ab4-0000-b3ffa8c00500
SubscriptionStreamsActivated = {
    streams[] = {
        streams = {
            id = "1"
            endpoint = {
                address = "cloudpoint1.bloomberg.com:8194"
            }
        }
    }
    reason = "Subscriber made a subscription"
}



In [5]:
import pandas as pd
cursor = x.qr_db.find({"cid": {"$regex": '9988 HK Equity'}})
pd.DataFrame(list(cursor)).drop(["_id"], axis=1)#.dropna(subset=["MKTDEPTH_EVENT_TYPE"])

Unnamed: 0,EVT_TRADE_PRICE_RT,EVT_TRADE_SIZE_RT,EVT_TRADE_CONDITION_CODE_RT,EVT_TRADE_TIME_RT,cid
0,83.40,500,,2023-07-05 09:56:19.449,9988 HK Equity.type=QR
1,83.40,300,,2023-07-05 09:56:20.532,9988 HK Equity.type=QR
2,83.40,100,,2023-07-05 09:56:20.532,9988 HK Equity.type=QR
3,83.40,200,,2023-07-05 09:56:20.532,9988 HK Equity.type=QR
4,83.45,100,,2023-07-05 09:56:20.626,9988 HK Equity.type=QR
...,...,...,...,...,...
161,83.35,400,,2023-07-05 09:58:02.520,9988 HK Equity.type=QR
162,83.40,800,X,2023-07-05 09:58:03.095,9988 HK Equity.type=QR
163,83.35,700,,2023-07-05 09:58:09.909,9988 HK Equity.type=QR
164,83.40,700,,2023-07-05 09:58:12.056,9988 HK Equity.type=QR
