<img src="figures/dirac.png" width="700" align="left">

In [2]:
%load_ext autoreload
%autoreload 1
%aimport genesis.streaming

# Streaming Alerts (with Genesis)

(notebook by [@mjuric](http://github.com/mjuric))

## What is Alert Streaming and When to Use It



Classic survey science is not performed in real time: the analysis usually lags data collection and processing. Typically, a researcher waits (days to weeks) for a reasonably sized batch of new data to accumulate and then analyzes it, frequently by hand.

This does not work well when the object of interest may change on short timescales and needs rapid follow-up. Examples include a short-timescale transient or a potentially hazardous asteroid undergoing an Earth flyby. In these cases, the researcher would prefer to analyze the data as they come in, with minimal latency between data collection and knowing whether a particular datum is interesting.

Alert streaming solves this: near real-time transmission of alerts about (and measurements of) objects whose properties have changed. The key differences between the "classic" and streaming-driven approaches are:
1. Response time on the order of seconds
1. Fully automated, machine-driven analysis

## Streaming Challenges

* Minimal latency
* Robustness to transmission issues
* Ease of use

## Demo setup: (Kafka Cluster + JupyterHub on Kubernetes) on Digital Ocean

The demo runs on:
* Kafka broker cluster: 8 machines, each with 6 cores and 16 GB of RAM ("standard" Droplet type)
* JupyterHub: a 4-core machine with 8 GB of RAM per user ("standard" Droplet type)

Within the cluster, we've set up:
1. A small static topic with only 100 alerts ("small")
2. A medium-sized topic with ~10,000 alerts ("medium")
3. A topic with continuously injected alerts at LSST scale ("lsst")

## Genesis Broker Access Library

Kafka has performant Python client libraries that roughly follow the API and structure of the native (Java) client libraries.

Included with this demo is an early version of `genesis.streaming`, a client library for robust and scalable access to alert streams. Genesis largely abstracts away both the underlying transport protocol and alert serialization: to the user, alerts are simple Python `dict`s, delivered through familiar `generator`s.
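To make the idea of "alerts as `dict`s delivered through generators" concrete, here is a minimal sketch of such a wrapper. It is not Genesis's implementation: `decode_stream` is a hypothetical helper, the raw messages are assumed to arrive as `bytes` from any iterable, and JSON stands in for the Avro encoding used by ZTF alerts so the example runs with the standard library alone.

```python
import json
from typing import Any, Dict, Iterable, Iterator, Tuple

def decode_stream(messages: Iterable[bytes]) -> Iterator[Tuple[int, Dict[str, Any]]]:
    """Yield (index, alert) pairs, turning raw message bytes into plain dicts.

    Hypothetical helper: JSON stands in for Avro; a real client would decode
    each message with the alert schema instead.
    """
    for idx, raw in enumerate(messages):
        yield idx, json.loads(raw.decode("utf-8"))

# Usage: the consumer only ever sees dicts, never bytes or transport details.
raw_messages = [b'{"candid": 1}', b'{"candid": 2}']
for idx, alert in decode_stream(raw_messages):
    print(idx, alert["candid"])
```

A real client would presumably also hide broker connections, reconnects, and offset bookkeeping behind the same loop, which is what lets the user-facing code stay this short.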

In [3]:
import genesis.streaming as gs

## Simple Streaming

If you execute the cell below, it will hang forever (click the stop ◾️ button in Jupyter to interrupt it).

In [4]:
with gs.open("kafka://broker0.do.alerts.wtf/small") as stream:
    for idx, alert in stream:
        print("Candidate ID:", alert['candid'])

Aborted (CTRL-C).


What happened? Genesis (actually, Kafka) remembers the last alert you received from each topic (the ***offset*** of the last received alert) and will only send you newer alerts. This is the desired behavior: if you weren't connected when the night started (or got temporarily disconnected), you may want to catch up.

But what if this is the first time you've connected to the stream? In that case, Genesis defaults to waiting for new alerts and does not send anything it already has. This is a safe default: if you first connect to the LSST stream one year into operations, you don't want to be sent a year's worth of alerts!

In our case, since I'm not injecting any new alerts, it will wait indefinitely...
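The decision described above can be written as a small function. This is a sketch under stated assumptions: `resolve_start_offset` and the `committed` dictionary are hypothetical stand-ins for state the broker keeps, and offset 0 stands in for the oldest alert still retained. Plain Kafka consumers expose the same choice through the `auto.offset.reset` setting (`earliest` or `latest`), which only applies when no committed offset exists; that Genesis's `start_at` maps onto it is an assumption.

```python
from typing import Dict, Optional

def resolve_start_offset(committed: Dict[str, int], consumer_id: Optional[str],
                         start_at: str, log_end: int) -> int:
    """Return the offset a consumer should start reading from (hypothetical helper).

    committed: offsets previously committed, keyed by consumer ID
    log_end:   offset one past the newest alert currently in the topic
    """
    # A previously committed offset always takes precedence.
    if consumer_id is not None and consumer_id in committed:
        return committed[consumer_id]
    # Otherwise fall back to the reset policy; 0 stands in for the oldest retained alert.
    if start_at == "earliest":
        return 0
    if start_at == "latest":
        return log_end
    raise ValueError(f"unknown start_at: {start_at!r}")

committed = {"ALICE": 40}
print(resolve_start_offset(committed, None, "latest", log_end=100))       # 100: wait for new alerts
print(resolve_start_offset(committed, None, "earliest", log_end=100))     # 0: replay everything
print(resolve_start_offset(committed, "ALICE", "earliest", log_end=100))  # 40: resume where ALICE left off
```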

Let's change this default and have it send us everything it has. We'll also turn on a progress bar, and tell it to stop if no alert arrives within 10 seconds.

In [4]:
with gs.open("kafka://broker0.do.alerts.wtf/small", start_at="earliest") as stream:
    for idx, alert in stream(timeout=10, progress=True):
        print("[%d] Candidate ID: %s" % (idx, alert['candid']))


[0] Candidate ID: 619124525415015014
[1] Candidate ID: 619125121015010000
[2] Candidate ID: 619125121815010002
[3] Candidate ID: 619124521415015004
[4] Candidate ID: 619124522715010001
[5] Candidate ID: 619125122915015004
[6] Candidate ID: 619124526115015000
[7] Candidate ID: 619124523115015003
[8] Candidate ID: 619124523515015008
[9] Candidate ID: 619124525315010008
[10] Candidate ID: 619125121115015004
[11] Candidate ID: 619124522015015001
[12] Candidate ID: 619125123115010002
[13] Candidate ID: 619124523515010007
[14] Candidate ID: 619124523915010008
[15] Candidate ID: 619124524415015004
[16] Candidate ID: 619124524815010002
[17] Candidate ID: 619124525515010000
[18] Candidate ID: 619124525915010002
[19] Candidate ID: 619125122115010004
[20] Candidate ID: 619124524215010008
[21] Candidate ID: 619124525615010012
[22] Candidate ID: 619124523615010000
[23] Candidate ID: 619125124115010002
[24] Candidate ID: 619125124915010008
[25] Candidate ID: 619125121115015003
[26] Candidate ID: 619

You may notice a slight pause before the streaming starts: this is Kafka establishing connections to the broker cluster. Once the connections are established, the alerts start streaming quickly.
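The `timeout=10` argument above ends iteration once the stream has been quiet for 10 seconds. One way such an inactivity timeout can be built is sketched below with the standard library's `queue.Queue`; the `consume` helper and the producer thread are hypothetical stand-ins for Genesis's internals and the broker.

```python
import queue
import threading
import time
from typing import Iterator, Tuple

def consume(q: "queue.Queue[dict]", timeout: float) -> Iterator[Tuple[int, dict]]:
    """Yield (index, alert) pairs until no alert arrives for `timeout` seconds."""
    idx = 0
    while True:
        try:
            alert = q.get(timeout=timeout)   # blocks for at most `timeout` seconds
        except queue.Empty:
            return                           # stream went quiet: stop iterating
        yield idx, alert
        idx += 1

# Usage: a producer thread delivers three alerts, then goes silent.
q: "queue.Queue[dict]" = queue.Queue()

def producer() -> None:
    for candid in (1, 2, 3):
        q.put({"candid": candid})
        time.sleep(0.05)

threading.Thread(target=producer, daemon=True).start()
received = [alert["candid"] for _, alert in consume(q, timeout=0.5)]
print(received)   # [1, 2, 3]
```

Because the timeout restarts after every alert, a slow but steady stream keeps the loop alive; only a gap longer than `timeout` ends it.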

Rather than printing the alerts, let's just store their IDs.

In [5]:
with gs.open("kafka://broker0.do.alerts.wtf/small", start_at="earliest") as stream:
    alerts = [ alert['candid'] for _, alert in stream(timeout=10, progress=True) ]

alerts.sort()

print(f"Read {len(alerts)} alerts.")
print("First few candidate IDs:", alerts[:3])



Read 100 alerts.
First few candidate IDs: [619124520615010001, 619124520615015004, 619124520915010000]


### Remembering "offsets"

Kafka can remember the `offset` of the last transmitted alert; next time you connect, it will start streaming from that offset.

For this to work, we need to connect to the stream with a "consumer ID", a name uniquely identifying you. Kafka associates the offset with the consumer ID; the next time the same consumer ID connects, streaming continues from the offset associated with it.

Let's generate a random consumer ID for your demo session, and add it to the stream URL:

In [6]:
import random
import string

# Random 8-character consumer ID (named so it doesn't shadow the built-in `id`)
consumer_id = ''.join(random.choices(string.ascii_uppercase + string.digits, k=8))
broker_url = f"kafka://{consumer_id}@broker0.do.alerts.wtf/small"

print("Consumer ID:", consumer_id)
print("Broker URL: ", broker_url)

Consumer ID: R1HSBZU4
Broker URL:  kafka://R1HSBZU4@broker0.do.alerts.wtf/small


Let's read in a few alerts:

In [7]:
with gs.open(broker_url, start_at="earliest") as stream:
    alerts1 = [ alert['candid'] for _, alert in stream(limit=25, progress=True) ]
    
    # ... now do some work with the alert ...
    
    # stream.commit()

alerts1.sort()

print(f"Read {len(alerts1)} alerts.")
print("First few candidate IDs:", alerts1[:3])



Read 25 alerts.
First few candidate IDs: [619124520915010002, 619124521415015004, 619124521915015005]


Let's read in the rest:

In [8]:
with gs.open(broker_url, start_at="earliest") as stream:
    alerts2 = [ alert['candid'] for _, alert in stream(timeout=10, progress=True) ]

    # ... now do some work with the alert ...
    
    # stream.commit()

alerts2.sort()

print(f"Read {len(alerts2)} alerts.")
print("First few candidate IDs:", alerts2[:3])



Read 100 alerts.
First few candidate IDs: [619124520615010001, 619124520615015004, 619124520915010000]


Why didn't it continue from where we left off? Because the broker must be _explicitly_ told to commit the offset. It doesn't do so automatically, to prevent data loss.

To illustrate: if the offset were committed as soon as an alert was returned to you, and the code in the _"... now do some work with the alert ..."_ section above crashed before acting on the alerts, those alerts would be skipped the next time you connect to the broker.

This is why you must explicitly call `stream.commit()` once you're certain the received alerts were successfully processed.

Now go back up, uncomment the `stream.commit()` lines, and re-execute these two cells.

Finally, let's verify nothing was lost:

In [9]:
set(alerts) == set(alerts1 + alerts2)

True
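The commit-after-processing pattern can be illustrated with a toy, in-memory broker. `ToyBroker` is hypothetical and much simpler than Kafka: it commits after every alert (a real client would typically commit in batches, as the `stream.commit()` calls above do) and keeps offsets in a plain dictionary keyed by consumer ID.

```python
from typing import Callable, Dict, List

class ToyBroker:
    """Hypothetical in-memory topic plus per-consumer committed offsets."""

    def __init__(self, alerts: List[dict]) -> None:
        self.alerts = alerts
        self.committed: Dict[str, int] = {}

    def session(self, consumer_id: str, process: Callable[[dict], None]) -> None:
        offset = self.committed.get(consumer_id, 0)    # resume, or start from the beginning
        for alert in self.alerts[offset:]:
            process(alert)                             # may raise before we commit
            offset += 1
            self.committed[consumer_id] = offset       # commit only after success

broker = ToyBroker([{"candid": c} for c in range(5)])
processed: List[int] = []
crash_armed = [True]

def process(alert: dict) -> None:
    if alert["candid"] == 3 and crash_armed[0]:
        crash_armed[0] = False
        raise RuntimeError("simulated crash")
    processed.append(alert["candid"])

try:
    broker.session("R1HSBZU4", process)
except RuntimeError:
    pass                                 # crashed before alert 3 was committed
broker.session("R1HSBZU4", process)      # reconnect with the same consumer ID
print(processed)   # [0, 1, 2, 3, 4]: nothing skipped
```

If `process` had succeeded but the crash came just before the commit line, alert 3 would be delivered a second time on reconnect: the "at least once" behavior discussed below. Committing before calling `process` would instead skip alert 3 entirely.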

### About Kafka delivery guarantees

A couple of warnings about Kafka's delivery guarantees:
* **Kafka does not guarantee the order in which you'll receive alerts**: ordering is preserved only within a single partition of a topic, so alerts from different partitions may arrive interleaved and out of order.
* **Kafka (typically) guarantees "at least once" delivery**: you may receive some alerts more than once (after a crash, a network interruption, or a similar exceptional situation). Your code should guard against this. Exactly-once semantics are difficult, but possible (and coming in the future).

![Distributed computing problems](https://cdn.confluent.io/wp-content/uploads/image2.png)

-- [Mathias Verraes](https://www.linkedin.com/in/mathiasverraes/)
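Since duplicates are possible, a consumer can drop repeats itself. A minimal sketch, assuming `candid` uniquely identifies an alert (it is the candidate ID printed throughout this notebook); `deduplicate` is a hypothetical helper that remembers only the most recent IDs, so memory stays bounded over a long survey.

```python
from collections import OrderedDict
from typing import Iterable, Iterator, Tuple

def deduplicate(stream: Iterable[Tuple[int, dict]], max_remembered: int = 100_000
                ) -> Iterator[Tuple[int, dict]]:
    """Drop alerts whose 'candid' is among the last `max_remembered` IDs seen."""
    seen: "OrderedDict[int, None]" = OrderedDict()
    for idx, alert in stream:
        key = alert["candid"]
        if key in seen:
            seen.move_to_end(key)        # refresh the ID so it is remembered longer
            continue
        seen[key] = None
        if len(seen) > max_remembered:
            seen.popitem(last=False)     # forget the oldest ID to bound memory
        yield idx, alert

redelivered = [(0, {"candid": 11}), (1, {"candid": 12}), (2, {"candid": 11})]
print([a["candid"] for _, a in deduplicate(redelivered)])   # [11, 12]
```

An alternative is to make processing idempotent (e.g., writing results keyed by `candid`), so that a repeat does no harm.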

## Filtering the stream

Now let's filter the stream for objects of interest. Say we're only interested in asteroids, and wish to ignore the rest.

We'll write a filter function that checks whether the alert candidate has the "Nearby Solar System object Name" field (`ssnamenr`) set to something other than "null":

In [10]:
def filter_asteroids(alert):
    if alert['candidate']['ssnamenr'] != 'null':
        return alert
    else:
        return None

In [11]:
with gs.open("kafka://broker0.do.alerts.wtf/medium", start_at="earliest") as stream:
    for idx, alert in stream(limit=10, timeout=10, progress=True, filter=filter_asteroids):
        print(f"[{idx}] Candidate ID: {alert['candid']} {alert['candidate']['ssnamenr']}")


[39] Candidate ID: 619144901515015003 82382
[212] Candidate ID: 619144900715015062 7022
[284] Candidate ID: 619145351615015000 11395
[978] Candidate ID: 619144903315015016 34486
[1034] Candidate ID: 619144901215015053 1868
[1172] Candidate ID: 619145351915015006 8168
[1290] Candidate ID: 619145355515015022 152974
[1562] Candidate ID: 619145354515015000 117572
[1676] Candidate ID: 619144902415015006 95923
[1882] Candidate ID: 619144902915015022 169988
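The printed indices run up to 1882 even though only ten alerts were printed, which suggests that `idx` is the alert's position in the stream while `limit` counts only alerts that pass the filter. The sketch below reproduces that behavior in plain Python; `filtered` is a hypothetical helper, not Genesis's code, and the alerts are synthetic.

```python
from typing import Callable, Iterable, Iterator, Optional, Tuple

def filtered(alerts: Iterable[dict], keep: Callable[[dict], Optional[dict]],
             limit: Optional[int] = None) -> Iterator[Tuple[int, dict]]:
    """Yield (position in stream, alert) for alerts the filter keeps, at most `limit` of them."""
    kept = 0
    for idx, alert in enumerate(alerts):
        if limit is not None and kept >= limit:
            return
        result = keep(alert)
        if result is not None:
            kept += 1
            yield idx, result

def filter_asteroids(alert):
    # Same logic as the notebook's filter: keep alerts with a known Solar System object.
    return alert if alert["candidate"]["ssnamenr"] != "null" else None

# Synthetic alerts: every third one has a Solar System object name.
alerts = [{"candid": i, "candidate": {"ssnamenr": "1868" if i % 3 == 0 else "null"}}
          for i in range(10)]
print([idx for idx, _ in filtered(alerts, filter_asteroids, limit=2)])   # [0, 3]
```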



By looking at the [fields available in the alert packet](https://zwickytransientfacility.github.io/ztf-avro-alert/schema.html), you can construct arbitrarily complex filters.
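Because a filter is just a function that returns the alert or `None`, several simple filters can be chained into a more complex one. A minimal sketch: `chain` and `is_bright` are hypothetical helpers, the 18th-magnitude cut is arbitrary, and `magpsf` is the PSF-fit magnitude field of the ZTF candidate record. Since the combined function has the same signature, it should presumably work as a `filter=` argument as well.

```python
from typing import Callable, Optional

Filter = Callable[[dict], Optional[dict]]

def chain(*filters: Filter) -> Filter:
    """Hypothetical helper: the alert must pass each filter in turn."""
    def combined(alert: dict) -> Optional[dict]:
        for f in filters:
            alert = f(alert)
            if alert is None:        # short-circuit on the first rejection
                return None
        return alert
    return combined

def is_asteroid(alert: dict) -> Optional[dict]:
    return alert if alert["candidate"]["ssnamenr"] != "null" else None

def is_bright(alert: dict, max_mag: float = 18.0) -> Optional[dict]:
    # Illustrative cut on the PSF-fit magnitude; smaller magnitude = brighter.
    return alert if alert["candidate"]["magpsf"] < max_mag else None

bright_asteroids = chain(is_asteroid, is_bright)
alert = {"candid": 1, "candidate": {"ssnamenr": "7022", "magpsf": 17.2}}
print(bright_asteroids(alert) is alert)   # True
```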

Here's one that checks whether an object may be a transient:

In [12]:
import pandas as pd
import numpy as np
import astropy.units as u

def is_transient(alert):
    # Filter by E. C. Bellm (@ebellm on GitHub)

    # if only a single discovery, bail out -- we wait for at least two
    # before triggering
    if alert['prv_candidates'] is None:
        return
    
    dflc = pd.DataFrame( [ alert['candidate'] ] + alert['prv_candidates'])
    candidate = dflc.loc[0]

    # positive subtraction?
    is_positive_sub = candidate['isdiffpos'] == 't'
    
    # no nearby source
    if (candidate['distpsnr1'] is None) or (candidate['distpsnr1'] > 1.5):
        no_pointsource_counterpart = True
    else:
        # nearby source, but it's a galaxy?
        if candidate['sgscore1'] < 0.5:
            no_pointsource_counterpart = True
        else:
            no_pointsource_counterpart = False
            
    where_detected = (dflc['isdiffpos'] == 't') # nondetections will be None
    if np.sum(where_detected) >= 2:
        detection_times = dflc.loc[where_detected,'jd'].values
        dt = np.diff(detection_times)
        not_moving = np.max(dt) >= (30*u.minute).to(u.day).value
    else:
        not_moving = False
    
    no_ssobject = (candidate['ssdistnr'] is None) or (candidate['ssdistnr'] < 0) or (candidate['ssdistnr'] > 5)
    
    if is_positive_sub and no_pointsource_counterpart and not_moving and no_ssobject:
        return alert

    return None

In [13]:
with gs.open("kafka://broker0.do.alerts.wtf/medium", start_at="earliest") as stream:
    for idx, alert in stream(limit=10, timeout=10, progress=True, filter=is_transient):
        print(f"[{idx}] Candidate ID: {alert['candid']}")


[161] Candidate ID: 619125121815015000
[218] Candidate ID: 619126252815015003
[224] Candidate ID: 619125796115015003
[251] Candidate ID: 619144900715015065
[274] Candidate ID: 619126713315015008
[284] Candidate ID: 619126712515015002
[487] Candidate ID: 619144900515015043
[634] Candidate ID: 619133751515015001
[635] Candidate ID: 619133751515015044
[688] Candidate ID: 619144901815015015



The filter in the above example was (intentionally) written to be slow. At this processing rate, it may not be able to keep up with the full LSST alert stream.

Fortunately, Genesis can parallelize execution over multiple cores using Python's `multiprocessing.Pool`:

In [14]:
from multiprocessing import Pool

with Pool(4) as workers:
    with gs.open("kafka://broker0.do.alerts.wtf/medium", start_at="earliest") as stream:
        for idx, alert in stream(pool=workers, limit=10, timeout=10, progress=True, filter=is_transient):
            print(f"[{idx}] Candidate ID: {alert['candid']}")


[253] Candidate ID: 619125121815015000
[383] Candidate ID: 619144900715015065
[463] Candidate ID: 619144900515015043
[498] Candidate ID: 619126252815015003
[504] Candidate ID: 619125796115015003
[681] Candidate ID: 619126713315015008
[691] Candidate ID: 619126712515015002
[782] Candidate ID: 619144901815015015
[1024] Candidate ID: 619133751515015001
[1025] Candidate ID: 619133751515015044
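The parallel run returned the same ten candidates as the serial one, but in a different order and with different indices. That is consistent with alerts being farmed out to workers and results collected as they finish. The sketch below shows that pattern with the standard library; it swaps in `multiprocessing.pool.ThreadPool` (same interface as `Pool`, but backed by threads) so it runs anywhere, and `slow_filter` is a hypothetical stand-in for `is_transient`. It does not show how Genesis distributes work internally.

```python
from multiprocessing.pool import ThreadPool
from typing import List, Optional

def slow_filter(alert: dict) -> Optional[dict]:
    # Hypothetical stand-in for an expensive filter: keep even candidate IDs.
    return alert if alert["candid"] % 2 == 0 else None

alerts = [{"candid": c} for c in range(20)]

with ThreadPool(4) as workers:
    # imap_unordered yields results as workers finish them, so order is not guaranteed;
    # chunksize hands several alerts to a worker at once to cut scheduling overhead.
    results = workers.imap_unordered(slow_filter, alerts, chunksize=5)
    kept: List[int] = sorted(a["candid"] for a in results if a is not None)

print(kept)   # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

For a CPU-bound Python filter, threads are limited by the GIL, so real speedups need the process-based `Pool`, whose filter function must be defined at module level so it can be pickled.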



## Robustness

Kafka is a distributed system robust to component failures.

The cluster of Kafka brokers in our demo setup has a "replication factor" of 2, meaning each alert is stored on two brokers. One broker may therefore fail without data loss; clients transparently switch to receiving data from the other replica. Let's demonstrate this!
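Why a single failure is survivable can be checked with a toy placement model. The round-robin assignment, the 16 partitions, and the helper names are assumptions for illustration (Kafka's real replica assignment differs); the point is that with a replication factor of r, every partition survives any r - 1 broker failures.

```python
from typing import Dict, List, Set

def place_replicas(n_partitions: int, n_brokers: int, replication: int) -> Dict[int, List[int]]:
    """Hypothetical scheme: put each partition on `replication` consecutive brokers."""
    return {p: [(p + r) % n_brokers for r in range(replication)]
            for p in range(n_partitions)}

def all_available(placement: Dict[int, List[int]], failed: Set[int]) -> bool:
    """True if every partition still has at least one replica on a live broker."""
    return all(any(b not in failed for b in replicas) for replicas in placement.values())

placement = place_replicas(n_partitions=16, n_brokers=8, replication=2)
print(all_available(placement, failed={3}))      # True: a single failure is tolerated
print(all_available(placement, failed={3, 4}))   # False: partitions 3 and 11 live only on brokers 3 and 4
```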

When I give a signal, please start the cell below. It will start downloading alerts from a topic with 10,000 alerts. As it's running, I will shut down one of the brokers in the cluster; your client should still download the full 10,000 alerts.

In [20]:
with gs.open("kafka://broker0.do.alerts.wtf/medium", start_at="earliest") as stream:
    for idx, alert in stream(timeout=10, progress=True):
        # we'll do nothing -- just show the progress bar.
        pass





## Scalability

And now for the main event: let's see if we can stream and filter alerts at the full LSST rate!

I have set up a script that injects 10,000 LSST-sized ZTF alerts every 40 seconds (average LSST rate).
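A quick back-of-the-envelope check of what this rate demands, assuming the four-worker pool used below and ignoring network and deserialization overhead:

```python
alerts_per_visit = 10_000
seconds_per_visit = 40
workers = 4   # assumed to match the 4-worker pool in the cell below

rate = alerts_per_visit / seconds_per_visit   # alerts arriving per second
budget_ms = workers / rate * 1000             # time each worker may spend per alert
print(f"{rate:.0f} alerts/s; each worker must finish an alert in under {budget_ms:.0f} ms")
# 250 alerts/s; each worker must finish an alert in under 16 ms
```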

When I give the signal, please execute the cell below to start consuming from this stream. We will observe how many simultaneous users we can have before the system fails to keep up!

In [4]:
with gs.Pool(4) as workers:
    with gs.open("kafka://broker0.do.alerts.wtf/lsst") as stream:
        for idx, alert in stream(pool=workers, progress=True):
            # we'll do nothing -- just show the progress bar.
            pass


Aborted (CTRL-C).


## Acknowledgements

This work has been supported by:

![Supported By](figures/foundation-logos.png)