In [4]:
import datetime
import base64
import os
import io
from typing import Tuple
import fastavro
from flask import Flask, request
from google.cloud import logging
from broker_utils import data_utils, gcp_utils, schema_maps
import yaml


# Input locations.
# Each default below is the original author's local path. To run the notebook
# on another machine, either edit the default or set the matching environment
# variable before starting the kernel; no other code needs to change.

# Path to a single, unzipped alert (file extension should be .avro).
f = os.environ.get(
    "ELASTICC_ALERT_PATH",
    "/Users/hndz/Downloads/LSST_ALERTS_BATCH/ALERTS/NITE59581/alert_mjd59582.0513_obj1130_src2260000.avro",
)
# Path to the broker's ELAsTiCC schema map (YAML).
fin = os.environ.get(
    "ELASTICC_SCHEMA_MAP_PATH",
    "/Users/hndz/Pitt-Google-Broker/broker/broker_utils/schema_maps/elasticc.yaml",
)
# Path to the Avro schema used for the outgoing message; the default is
# resolved relative to the current working directory.
schema_out_path = os.environ.get(
    "ELASTICC_SCHEMA_OUT_PATH",
    "elasticc.v0_9.brokerClassfication.avsc",
)

# safe_load is used so the YAML file cannot construct arbitrary Python objects.
with open(fin, "rt") as f_schema:
    schema_map = yaml.safe_load(f_schema)

# load the alert
elasticc_alert = data_utils.decode_alert(f)

# for the SuperNNova results, you can use this dummy data:
supernnova_results = {
    "diaObjectId": 1130,
    "diaSourceId": 2260000,
    "prob_class0": 0.201,
    "prob_class1": 0.799,
    "predicted_class": 1,
}

# Inputs for _create_elasticc_msg: the alert plus classifier results, and the
# message attributes. Both timestamps are stubbed with the current UTC time.
alert_dict = {"alert": elasticc_alert, "SuperNNova": supernnova_results}
now = datetime.datetime.now(datetime.timezone.utc)
attrs = {"kafka.timestamp": now, "ingest_time": now}

# Schema that the outgoing message is serialized against.
schema_out = fastavro.schema.load_schema(schema_out_path)

In [5]:
def _create_elasticc_msg(alert_dict, attrs):
    """Assemble an ELAsTiCC broker-classifications message and Avro-serialize it.

    Schema reference:
    https://github.com/LSSTDESC/plasticc_alerts/blob/main/Examples/plasticc_schema

    Args:
        alert_dict: Mapping with two entries that are read here:
            "alert" -- the original ELAsTiCC alert as a dict, and
            "SuperNNova" -- the classifier results (only "prob_class0" is used).
        attrs: Mapping that provides "kafka.timestamp" and "ingest_time".

    Returns:
        Whatever `_dict_to_avro` returns for the assembled message.

    Note:
        Reads the module-level names `schema_map` (for the "source" and
        "sourceId" keys) and `schema_out` (the output Avro schema).
    """
    source_alert = alert_dict["alert"]
    snn_results = alert_dict["SuperNNova"]

    publish_timestamp = attrs["kafka.timestamp"]
    ingest_timestamp = attrs["ingest_time"]  # Troy: attach this in first module
    broker_version = "v0.6"

    # One entry per reported class. classIds are listed here:
    # https://docs.google.com/presentation/d/1FwOdELG-XgdNtySeIjF62bDRVU5EsCToi2Svo_kXA50/edit#slide=id.ge52201f94a_0_12
    snn_classification = dict(
        classifierName="SuperNNova_v1.3",  # Troy: pin version in classify_snn
        classifierParams="",  # intentionally blank for now
        classId=111120,
        probability=snn_results["prob_class0"],
    )

    # Look up the source record first, then the source id inside it; both key
    # names come from the schema map.
    alert_id = source_alert["alertId"]
    source_record = source_alert[schema_map["source"]]
    dia_source_id = source_record[schema_map["sourceId"]]

    # Field names follow the broker classification schema:
    # https://github.com/LSSTDESC/plasticc_alerts/blob/main/Examples/plasticc_schema/elasticc.v0_9.brokerClassification.avsc
    outgoing = dict(
        alertId=alert_id,
        diaSourceId=dia_source_id,
        elasticcPublishTimestamp=publish_timestamp,
        brokerIngestTimestamp=ingest_timestamp,
        brokerName="Pitt-Google Broker",
        brokerVersion=broker_version,
        classifications=[snn_classification],
    )

    return _dict_to_avro(outgoing, schema_out)

def _dict_to_avro(msg: dict, schema: dict):
    """Serialize a single dictionary to Avro and return the encoded bytes.

    Args:
        msg: The record to write.
        schema: The Avro schema handed to `fastavro.writer`.

    Returns:
        The full contents of the in-memory buffer after `fastavro.writer` has
        written `msg` as the only record.
    """
    with io.BytesIO() as buffer:
        fastavro.writer(buffer, schema, [msg])
        # getvalue() returns the whole buffer irrespective of stream position.
        return buffer.getvalue()

In [6]:
# Build the message from the sample alert and dummy classifier results.
# As the cell's last expression, the serialized bytes are shown as the output.
_create_elasticc_msg(alert_dict, attrs)

b'Obj\x01\x04\x14avro.codec\x08null\x16avro.schema\xaa\x14{"type": "record", "doc": "sample avro alert schema v4.1", "name": "elasticc.v0_9.brokerClassfication", "fields": [{"doc": "unique alert identifer", "name": "alertId", "type": "long"}, {"doc": "id of source that triggered this classification", "name": "diaSourceId", "type": "long"}, {"doc": "timestamp from originating ELAsTiCC alert", "name": "elasticcPublishTimestamp", "type": {"logicalType": "timestamp-micros", "type": "long"}}, {"doc": "timestamp of broker ingestion of ELAsTiCC alert", "name": "brokerIngestTimestamp", "type": ["null", {"logicalType": "timestamp-micros", "type": "long"}]}, {"doc": "Name of broker (never changes)", "name": "brokerName", "type": "string"}, {"doc": "Version/Release of broker\'s software", "name": "brokerVersion", "type": "string"}, {"name": "classifications", "type": {"type": "array", "items": {"type": "record", "name": "elasticc.v0_9.classificationDict", "fields": [{"doc": "Name of classifier br