In [3]:
!pip install confluent_kafka fastavro



In [4]:
import datetime
import json
import io
import random
import settings_alerce
from urllib.request import urlopen

import fastavro
from confluent_kafka import Consumer, KafkaError

In [5]:
def load_json_from_web(url):
  """Fetch a URL and parse the response body as JSON.

  Args:
    url: Address handed to ``urllib.request.urlopen``.

  Returns:
    The decoded JSON document, i.e. whatever ``json.loads`` produces
    for the response body.
  """
  # The context manager closes the response as soon as the body has been
  # read, so the underlying connection is not left open.
  with urlopen(url) as response:
    return json.loads(response.read())

# Light-curve classifier messages are schemaless Avro, so the schema has to
# be fetched up front; it is used later in this notebook to decode them.
schema_url = "https://raw.githubusercontent.com/alercebroker/pipeline/main/schemas/lc_classification_step/output_ztf.avsc"
schema = load_json_from_web(schema_url)

In [6]:
def handle(record):
    """Pretty-print a decoded record as JSON indented by two spaces.

    Note: a later cell defines a three-argument ``handle``; once that cell
    has run, it replaces this definition.
    """
    rendered = json.dumps(record, indent=2)
    print(rendered)

In [7]:
def handle(type, objectId, probabilities):
    """Summarise one classification and report its most probable class.

    Args:
        type: Label of the originating classifier (callers in this notebook
            pass 'stamp' or 'lc'). It is not used by the body.
        objectId: Identifier of the classified object.
        probabilities: Mapping of class name to probability.

    Returns:
        A dict with keys 'objectId', 'classdict' (the probabilities rounded
        to three decimals) and 'classification' (the class with the highest
        strictly positive probability, or None when there is none).
    """
    classdict = {}
    # Start from None so that an empty mapping, or one without any positive
    # probability, no longer triggers an UnboundLocalError further down.
    classification = None
    maxprob = 0
    for name, prob in probabilities.items():
        classdict[name] = float('%.3f' % prob)
        # Strict comparison: on ties the first class encountered wins.
        if prob > maxprob:
            classification = name
            maxprob = prob

    r = {
        'objectId': objectId,
        'classdict': classdict,
        'classification': classification,
    }
    if classification is None:
        print(f'{objectId} has no classification')
    else:
        print(f'{objectId} is a {classification}')
    # Hand the summary back instead of discarding it; callers that ignore
    # the return value are unaffected.
    return r

In [8]:
def handle_deserealized_record(raw_message, topic):
  """Decode one Kafka message according to its topic and pass it to ``handle``.

  Args:
    raw_message: Message object whose ``value()`` returns the Avro payload.
    topic: Topic name; the decoding path is chosen by substring match.

  Returns:
    1 when a record was decoded and handled.

  Raises:
    ValueError: If the topic matches neither known classifier. ValueError
      is a subclass of Exception, so existing ``except Exception`` callers
      still catch it.
  """
  bytes_io = io.BytesIO(raw_message.value())

  # Stamp classifier is a normal Avro file. Read as usual.
  # The reader is an iterator; the single record is taken from it.
  if "stamp_classifier" in topic:
    record = next(fastavro.reader(bytes_io))
    handle('stamp', record['objectId'], record['probabilities'])
    return 1

  # LC classifier is schemaless Avro. Give the schema to read it.
  # The schemaless reader returns the record dict directly.
  if "lc_classifier_ztf" in topic:
    record = fastavro.schemaless_reader(bytes_io, schema)
    handle('lc', record["oid"], record['lc_classification']['probabilities'])
    return 1

  # No statement may follow the raise: the former trailing `return 0`
  # could never execute.
  raise ValueError(f"No schema loaded for topic {topic}")

In [9]:
def connect():
    """Create and return a Kafka ``Consumer`` for the ALeRCE stream.

    Broker address, consumer group and password are read from the
    ``settings_alerce`` module; the SASL user name is fixed in code.
    """
    # Where to connect and which consumer group to join.
    endpoint = {
        'bootstrap.servers': settings_alerce.ALERCE_KAFKA,
        'group.id'         : settings_alerce.ALERCE_GROUP_ID,
    }
    # Authentication over SASL/SSL.
    credentials = {
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism'   : 'SCRAM-SHA-512',
        'sasl.username'    : 'lasair',
        'sasl.password'    : settings_alerce.ALERCE_PASSWORD,
    }
    kafka_settings = {**endpoint, **credentials, 'auto.offset.reset': 'earliest'}
    return Consumer(kafka_settings)

In [10]:
def print_topics(streamReader):
    """Print the names of all topics the given consumer reports, sorted."""
    metadata = streamReader.list_topics()
    topic_names = sorted(metadata.topics.keys())
    print('Topics are ', topic_names)

In [11]:
def consume(streamReader, topic, handle, max_alerts=5, poll_timeout=20):
    """Subscribe to a topic and process messages until a limit is reached.

    The loop ends when ``max_alerts`` records have been handled, when a poll
    returns nothing within ``poll_timeout``, or when a polled message
    reports an error. A final count is always printed.

    Args:
        streamReader: Consumer object providing ``subscribe`` and ``poll``.
        topic: Topic to subscribe to; also forwarded to the decoder.
        handle: Kept for interface compatibility. The loop does not call it
            directly; decoding and handling go through
            ``handle_deserealized_record``.
        max_alerts: Maximum number of alerts to handle (default 5).
        poll_timeout: Timeout passed to each ``poll`` call (default 20).
    """
    streamReader.subscribe([topic])
    nalert = 0
    while nalert < max_alerts:
        msg = streamReader.poll(timeout=poll_timeout)
        if msg is None:
            # Nothing arrived within the timeout: stop.
            break
        error = msg.error()
        if error is not None:
            # An error/event message is not a proper alert, so it must not
            # be handed to the Avro decoder. Report it and stop.
            print(f'Consumer error: {error}')
            break
        nalert += handle_deserealized_record(msg, topic)
    print(f'Handled {nalert} alerts')

Set up topics for LC and stamp classifiers for today

In [12]:
# default is today: the date stamp is appended to each classifier's topic prefix
date = datetime.datetime.now().strftime('%Y%m%d')
topics = []
for prefix in ('lc_classifier_ztf_', 'stamp_classifier_'):
    topic = prefix + date
    topics.append(topic)
print(topics)

['lc_classifier_ztf_20240918', 'stamp_classifier_20240918']


## Main program

In [13]:
# Consume each topic with its own freshly created consumer.
for topic in topics:
    streamReader = connect()
    print('Consuming topic: ' + topic)
    try:
        consume(streamReader, topic, handle)
    finally:
        # Always release the consumer, even if decoding or handling raises.
        streamReader.close()

Consuming topic: lc_classifier_ztf_20240918
ZTF18adqxgcz is a CEP
ZTF18abmdqqt is a RRL
ZTF18aakgqto is a RRL
ZTF18adqwunq is a RRL
ZTF18aatlscv is a Periodic-Other
Handled 5 alerts
Consuming topic: stamp_classifier_20240918
ZTF19acfvenh is a VS
ZTF18abrwpob is a VS
ZTF19abijmtz is a VS
ZTF18abefyzn is a VS
ZTF18abbwrfd is a VS
Handled 5 alerts
