In [2]:
import os
import time
import math
from datetime import datetime
from typing import Dict, Optional
import pickle
import blpapi
from copy import deepcopy
import flatbuffers
from BlpConn.Message import HeadlineCalendarEvent
from BlpConn.Message import HeadlineEconomicEvent
from BlpConn.Message.EventType import EventType
from defs import *
from dotenv import load_dotenv
load_dotenv()

True

In [3]:
service = "//blp/economic-data"

In [4]:
client_cert_path = os.environ["CLIENT_CERTIFICATE"]
client_cert_password = os.environ["PASSWORD"]
root_cert_path = os.environ["ROOT_CERTIFICATE"]

In [5]:
tls_options = blpapi.TlsOptions.createFromFiles(
    client_cert_path,
    client_cert_password,
    root_cert_path
)

In [6]:
host_primary = os.environ["PRIMARY_HOST"]
host_secondary = os.environ["SECONDARY_HOST"]
port = int(os.environ["PORT"])

In [7]:
session_options = blpapi.SessionOptions()
session_options.setServerAddress(host_primary, port, 0)
session_options.setServerAddress(host_secondary, port, 0)
session_options.setTlsOptions(tls_options)
session_options.setAutoRestartOnDisconnection(True)
session_options.setNumStartAttempts(3)
session_options.setDefaultSubscriptionService(service)

In [8]:
session = blpapi.Session(session_options)
session.start()

True

In [10]:
session.openService(service)

True

In [11]:
sub = blpapi.SubscriptionList()
sub.add("//blp/economic-data/headline-actuals/ticker/CATBTOTB Index")
# sub.add("//blp/economic-data/headline-surveys/ticker/CATBTOTB Index")
sub.add("//blp/economic-data/release-calendar/ticker/INJCJC Index")
corr_id = session.subscribe(sub)

In [20]:
event = session.nextEvent()
messages = [msg for msg in event]
print(messages[0])

CID: {[ valueType=AUTOGEN classId=0 value=3 ]}
Heartbeat = {
}



In [21]:
print(messages[0])

CID: {[ valueType=AUTOGEN classId=0 value=3 ]}
Heartbeat = {
}



In [None]:
def getStringValue(elem: blpapi.Element, name: str) -> str:
    if not elem.hasElement(name):
        return ""
    this = elem.getElement(name)
    return this.getValueAsString()

In [24]:
def getReleaseDT(elem: blpapi.Element, name: str):
    if not elem.hasElement(name):
        return None, None
    sub_elem = elem.getElement(name)
    choice = sub_elem.getChoice()
    if choice.name() == "DATETIME":
        start = choice.getValueAsDatetime()
        return start, start
    elif choice.name() == "DATE":
        start = choice.getValueAsDatetime()
        return start, start
    elif choice.name() == "DATARANGE":
        start = choice.getElement("START").getValueAsDatetime()
        end = choice.getElement("END").getValueAsDatetime()
        return start, end
    return None, None
    


In [25]:
def getFloatFromString(elem: blpapi.Element, name: str) -> float:
    s = getStringValue(elem, name)
    try:
        return float(s)
    except ValueError:
        return math.nan

def getIntFromString(elem: blpapi.Element, name: str) -> int:
    s = getStringValue(elem, name)
    try:
        return int(s)
    except ValueError:
        return 0

def getValue(elem: blpapi.Element, name: str) -> Value:
    if not elem.hasElement(name):
        return None
    sub_elem = elem.getElement(name)
    choice = sub_elem.getChoice()
    value = Value()
    if choice.name() == "SINGLE":
        value.number = 1
        s = choice.getValueAsString()
        try:
            value.value = float(s)
        except ValueError:
            value.value = math.nan
        return value
    elif choice.name() == "DISTRIBUTION":
        value.number = getIntFromString(choice, "NUMBER")
        value.average = getFloatFromString(choice, "AVERAGE")
        value.low = getFloatFromString(choice, "LOW")
        value.high = getFloatFromString(choice, "HIGH")
        value.median = getFloatFromString(choice, "MEDIAN")
        value.standard_deviation = getFloatFromString(choice, "STANDARD_DEVIATION")
        return value
    return None



In [26]:
elem = messages[0].asElement()
print(elem)

EconomicEvent = {
    HeadlineCalendarEvent = {
        ID_BB_GLOBAL = "BBG002SBQKM3"
        PARSEKYABLE_DES = "INJCJC Index"
        DESCRIPTION = "US Initial Jobless Claims SA"
        EVENT_TYPE = CALENDAR
        EVENT_SUBTYPE = INITPAINT
        EVENT_ID = 2151207
        OBSERVATION_PERIOD = "May 3"
        ECO_RELEASE_DT = {
            DATETIME = 2025-05-08T12:30:00.000+00:00
        }
        RELEASE_STATUS = SCHEDULED
    }
}



In [27]:
def parseHeadlineEconomicEvent(elem: blpapi.Element) -> HeadlineEconomicEvent:
    message = HeadlineEconomicEvent(
        id_bb_global=getStringValue(elem, "ID_BB_GLOBAL"),
        parsekyable_des=getStringValue(elem, "PARSEKYABLE_DES")
    )
    message.description = getStringValue(elem, "DESCRIPTION")
    message.event_type = getStringValue(elem, "EVENT_TYPE")
    message.event_subtype = getStringValue(elem, "EVENT_SUBTYPE")
    message.event_id = getStringValue(elem, "EVENT_ID")
    message.observation_period = getStringValue(elem, "OBSERVATION_PERIOD")
    message.release_start_dt, message.release_end_dt = getReleaseDT(elem, "ECO_RELEASE_DT")
    message.value = getValue(elem, "VALUE")
    message.prior_value = getValue(elem, "PRIOR_VALUE")
    message.prior_economic_release_start_dt, message.prior_economic_release_end_dt = getReleaseDT(elem, "PRIOR_ECONOMIC_RELEASE_DT")
    message.prior_observation_period = getStringValue(elem, "PRIOR_OBSERVATION_PERIOD")
    message.prior_event_id = getStringValue(elem, "PRIOR_EVENT_ID")
    return message

In [28]:
message = parseHeadlineEconomicEvent(elem.getElement("HeadlineEconomicEvent"))
print(message)

InvalidArgumentException: Choice sub-element not found for name 'HeadlineEconomicEvent'. (0x00020002)

In [34]:
def parseHeadlineCalendarEvent(elem: blpapi.Element) -> HeadlineCalendarEvent:
    message = HeadlineCalendarEvent(
        id_bb_global=getStringValue(elem, "ID_BB_GLOBAL"),
        parsekyable_des=getStringValue(elem, "PARSEKYABLE_DES")
    )
    message.description = getStringValue(elem, "DESCRIPTION")
    message.event_type = getStringValue(elem, "EVENT_TYPE")
    message.event_subtype = getStringValue(elem, "EVENT_SUBTYPE")
    message.event_id = getStringValue(elem, "EVENT_ID")
    message.observation_period = getStringValue(elem, "OBSERVATION_PERIOD")
    message.release_start_dt, message.release_end_dt = getReleaseDT(elem, "ECO_RELEASE_DT")
    message.release_status = getStringValue(elem, "RELEASE_STATUS")
    return message

In [35]:
message = parseHeadlineCalendarEvent(elem.getElement("HeadlineCalendarEvent"))
print(message)

HeadlineCalendarEvent(id_bb_global='BBG002SBQKM3', parsekyable_des='INJCJC Index', description='US Initial Jobless Claims SA', event_type='CALENDAR', event_subtype='INITPAINT', event_id='2151207', observation_period='May 3', release_start_dt=datetime.datetime(2025, 5, 8, 12, 30, tzinfo=<blpapi.datetime.FixedOffset object at 0x7f4f12cd8390>), release_end_dt=datetime.datetime(2025, 5, 8, 12, 30, tzinfo=<blpapi.datetime.FixedOffset object at 0x7f4f12cd8390>))


In [None]:
def headline_calendar_event_to_flatbuffers(builder, event: HeadlineCalendarEvent):
    id_bb_global = builder.CreateString(event.id_bb_global or "")
    parsekyable_des = builder.CreateString(event.parsekyable_des or "")
    description = builder.CreateString(event.description or "")
    event_type = builder.CreateString(event.event_type or "")
    event_subtype = builder.CreateString(event.event_subtype or "")
    event_id = builder.CreateString(event.event_id or "")
    observation_period = builder.CreateString(event.observation_period or "")

    # Convert datetime to timestamp if available
    release_start_dt = int(event.release_start_dt.timestamp()) if event.release_start_dt else 0
    release_end_dt = int(event.release_end_dt.timestamp()) if event.release_end_dt else 0

    # Start building the FlatBuffers object
    HeadlineCalendarEvent.Start(builder)
    HeadlineCalendarEvent.AddIdBbGlobal(builder, id_bb_global)
    HeadlineCalendarEvent.AddParsekyableDes(builder, parsekyable_des)
    HeadlineCalendarEvent.AddDescription(builder, description)
    HeadlineCalendarEvent.AddEventType(builder, event_type)
    HeadlineCalendarEvent.AddEventSubtype(builder, event_subtype)
    HeadlineCalendarEvent.AddEventId(builder, event_id)
    HeadlineCalendarEvent.AddObservationPeriod(builder, observation_period)
    HeadlineCalendarEvent.AddReleaseStartDt(builder, release_start_dt)
    HeadlineCalendarEvent.AddReleaseEndDt(builder, release_end_dt)

    # Finish the FlatBuffers object
    return HeadlineCalendarEvent.End(builder)

In [None]:
builder = flatbuffers.Builder(0)
event = HeadlineCalendarEvent(
    id_bb_global="12345",
    parsekyable_des="Test Description",
    description="Test Event",
    event_type="Test Type",
    event_subtype="Test Subtype",
    event_id="Test ID",
    observation_period="Test Period",
    release_start_dt=datetime.now(),
    release_end_dt=datetime.now()
)
flatbuffers_event = headline_calendar_event_to_flatbuffers(builder, event)
builder.Finish(flatbuffers_event)