<p align="center">
<img src="img/lasair.png" alt="drawing" width="50"/>
</p>
<h1 align="center">  Kafka Tutorial: </h1>
<h2 align="center">  Consuming Public Alerts </h2>


---

## What you'll learn

This notebook will walk you through writing your own code to consume alerts from Lasair filters. You will learn:


* A refresher on the basics of Kafka
* How to set up a Kafka consumer with the `lasair` helper function
* What stream options are available for the Lasair filters
* How to navigate the data produced by these stream options. 


## Prerequisites


### **Have you installed the `lasair` client?**

You can do this through pip!

```bash
pip install lasair
```

### You also need the following Python libraries:
  - `json` (part of the Python standard library, nothing to install)
  - `pandas`
  - `matplotlib`

---

<h2 align="center">  0. Understanding the Basics of Kafka </h2>

**TL;DR: Kafka allows you to stream the alert data for your chosen Lasair Filters**

The Rubin Observatory sends us (Lasair) alerts continuously through the night in real time. 
Each alert is then placed, in order, into the queue of each **filter/topic** it is relevant to (e.g. only alerts coincident with the Magellanic Clouds, or only transients coincident with the nucleus of their host galaxy).

Then as a user we can listen to these alerts without ever having to deal with the rest of the Rubin stream.

Because Kafka creates a queue, **you don't have to be "listening" in real time to catch the alerts**. 
If you have a piece of software or a bot, you can have it run as a cron job once a day. 
Kafka will **remember your place in the queue** through the means of a `group_id` (see more below). 
If you change `group_id` you will start again at the top of the queue.
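
To make the role of `group_id` concrete, here is a toy, in-memory model of a topic queue. It is only a sketch of the idea (one stored position per group and topic). The `ToyBroker` class and its methods are hypothetical and are not part of Kafka or the `lasair` client.

```python
from collections import defaultdict


class ToyBroker:
    """Toy stand-in for a Kafka broker: one list of messages per topic."""

    def __init__(self):
        self.topics = defaultdict(list)   # topic -> messages, oldest first
        self.offsets = defaultdict(int)   # (group_id, topic) -> next index to read

    def publish(self, topic, message):
        self.topics[topic].append(message)

    def poll(self, group_id, topic):
        """Return the next unread message for this group, or None if caught up."""
        position = self.offsets[(group_id, topic)]
        queue = self.topics[topic]
        if position >= len(queue):
            return None
        self.offsets[(group_id, topic)] = position + 1
        return queue[position]


broker = ToyBroker()
for name in ["alert_1", "alert_2", "alert_3"]:
    broker.publish("my_filter", name)

# Monday's cron job reads two alerts with group_id "my_bot"
monday = [broker.poll("my_bot", "my_filter") for _ in range(2)]
# Tuesday's run uses the same group_id and resumes where Monday stopped
tuesday = broker.poll("my_bot", "my_filter")
# A brand-new group_id starts again from the oldest message still in the queue
fresh = broker.poll("another_bot", "my_filter")

print(monday, tuesday, fresh)
```

Because the position is stored per group and per topic, the same `group_id` can be reused on a different topic without skipping anything there.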


#### Q&A:

* _If I start at the beginning of the queue, does that mean I have to consume ALL the alerts Rubin has ever sent before I reach today's events?_

**No - topic queues are not infinitely big.** In Lasair they are emptied after **[TBC]** days. 

* _I am looking for alerts that are older than XX days old, what do I do?_

**If you are looking for historical activity, the live stream is probably not what you are looking for.**

Instead you can simply do a database query! (see `API_recipes.ipynb` and further [api/query documentation](https://lasair.readthedocs.io/en/main/core_functions/rest-api.html#--api-query-)).



<h2 align="center">  1. Setting-up your first consumer </h2>



In [2]:
import json
from pathlib import Path
from lasair import LasairError, lasair_consumer
import random  # This is only needed for the tutorial

### Config for the `lasair_consumer`

To connect to a kafka stream you need a few things:
* The socket (Host:Port) of the server (where is the server on the internet and how do I get into it?): `lasair-lsst-kafka_pub.lsst.ac.uk:9092`
* The endpoint (where do I send my requests on the server?): `https://api.lasair.lsst.ac.uk/api`
* The topic (corresponding to your filter of choice):  `lasair_83lasair_tutorial_basic_stream`
* The `group_id`, which keeps track of which alerts you've already seen and which ones you have yet to receive. 
* How many alerts (N) are we polling for?


In [6]:
#endpoint     = "https://api.lasair.lsst.ac.uk/api"
endpoint     = "https://api.lasair.lsst-dev.lsst.ac.uk/api"
#kafka_server = "lasair-lsst-kafka_pub.lsst.ac.uk:9092"
kafka_server = "lasair-lsst-dev-kafka_pub.lsst.ac.uk:9092"
#topic        = "lasair_83lvra_feeder_full"
topic        = "lasair_83lasair_tutorial_basic_stream"
group_id     = "tutorial"+str(int(random.random()*10000000000)) # CREATING RANDOM ID
N            = 3
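
The random `group_id` above is handy for a tutorial because every run starts from the top of the queue. Below is a small sketch of two alternatives using only the standard library: a `uuid`-based throwaway ID, and a fixed ID for a scheduled job that should resume where it left off. The names `make_throwaway_group_id` and `CRON_GROUP_ID` are illustrative and not part of the `lasair` client.

```python
import uuid


def make_throwaway_group_id(prefix="tutorial"):
    """Return a new, practically unique group_id (always starts at the top of the queue)."""
    return f"{prefix}-{uuid.uuid4().hex}"


# Fixed ID: reuse the same string on every run so Kafka remembers your place.
CRON_GROUP_ID = "my-daily-bot"

print(make_throwaway_group_id())
print(CRON_GROUP_ID)
```

Use the throwaway form while experimenting, and the fixed form once your script runs on a schedule.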

Now let's use the Lasair client to make our consumer

In [7]:
consumer = lasair_consumer(
    kafka_server,    
    group_id,        
    topic            
)

### Polling your first alerts

In [8]:
n = 0
while n < N: # whilst we have not read N messages
    # ask the lasair consumer to poll the NEXT MESSAGE IN THE QUEUE
    msg = consumer.poll(timeout=20)
    
    if msg is None:
        # No message arrived before the timeout: we have reached the end of the queue!
        break

    if msg.error():
        # If there is an error we want to raise an exception
        raise LasairError("Error while consuming message: {}".format(msg.error()))

    # If we have a message we need to read it into Json format
    jmsg = json.loads(msg.value())

    # Then we can write it out!
    print(json.dumps(jmsg, indent=2))
    n += 1
print('You have reached the end of the queue')

{
  "diaObjectId": 313958955284430880,
  "lastDiaSourceMjdTai": 61069.08298840221,
  "latestR": 0.993594,
  "nDiaSources": 19,
  "ra": 53.565957857006524,
  "decl": -26.875152515053284,
  "separationArcsec": 68.262
}
{
  "diaObjectId": 313871013127389233,
  "lastDiaSourceMjdTai": 61088.212443683144,
  "latestR": 0.924362,
  "nDiaSources": 49,
  "ra": 150.12143874401778,
  "decl": 1.1635964494757776,
  "separationArcsec": 0.744,
  "UTC": "2026-02-17 05:13:10"
}
{
  "diaObjectId": 313875416830444113,
  "lastDiaSourceMjdTai": 61088.212443683144,
  "latestR": 0.996161,
  "nDiaSources": 20,
  "ra": 149.60194865285905,
  "decl": 0.7280789506347216,
  "separationArcsec": 0.228,
  "UTC": "2026-02-17 05:13:10"
}
You have reached the end of the queue
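
If you end up writing this loop more than once, it can be wrapped in a small generator. The sketch below assumes only what the loop above uses: a consumer with a `poll(timeout=...)` method that returns messages with `error()` and `value()`. `iter_alerts`, `FakeMessage` and `FakeConsumer` are hypothetical names, and `RuntimeError` stands in for `LasairError` so that the example runs without the `lasair` client or a network connection.

```python
import json


def iter_alerts(consumer, max_alerts, timeout=20):
    """Yield up to `max_alerts` decoded alerts from any object with a `poll` method."""
    n = 0
    while n < max_alerts:
        msg = consumer.poll(timeout=timeout)
        if msg is None:
            # Nothing arrived within `timeout` seconds: stop polling
            return
        if msg.error():
            raise RuntimeError(f"Error while consuming message: {msg.error()}")
        yield json.loads(msg.value())
        n += 1


# --- Stand-ins so the sketch runs offline ---
class FakeMessage:
    def __init__(self, payload):
        self._payload = payload

    def error(self):
        return None

    def value(self):
        return json.dumps(self._payload).encode("utf-8")


class FakeConsumer:
    def __init__(self, payloads):
        self._messages = [FakeMessage(p) for p in payloads]

    def poll(self, timeout=None):
        return self._messages.pop(0) if self._messages else None


fake = FakeConsumer([{"diaObjectId": 1}, {"diaObjectId": 2}])
alerts = list(iter_alerts(fake, max_alerts=3))
print(alerts)
```

With a real consumer you would pass the object returned by `lasair_consumer` in place of `fake`.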


<h2 align="center">  2. Saving the data with the correct format </h2>


In a real-life setting you won't be printing large dictionaries to your notebook or terminal; you will want them in a `.json` file. 

Let's select an output directory for our data.

**NOTE: This tutorial points to the `/tmp` directory**, so the data will be cleared when you restart your system. Feel free to select a different location.

In [9]:
output_dir   = "/tmp/lasair_consumer_output" # this won't work on Windows
OUTPUT_PATH = Path(output_dir)
# Create the directory (and any missing parents); do nothing if it already exists.
OUTPUT_PATH.mkdir(exist_ok=True, parents=True)

Since we have already listened to our alerts, we have moved along in the queue! If we want the same alerts we printed above, we need a new `group_id`.

In [10]:
group_id     = "tutorial"+str(int(random.random()*10000000000)) # CREATING RANDOM ID

consumer = lasair_consumer(
    kafka_server,    
    group_id,        
    topic            
)

Now we poll and we dump each message in a file which will have the structure:

```
[
    {MESSAGE_ALERT1},
    {MESSAGE_ALERT2},
    ....
    {MESSAGE_ALERTN}
]
```

Each message contains fields and sub-dictionaries.

Now our consumer **is a little more involved** than it was above, because we need to make sure the brackets and commas are in the right place:

In [11]:
n = 0
first = True
# To ensure we don't leave our file open we work within a `with` scope
with open(OUTPUT_PATH / "message_BASIC.tmp.json", "w", encoding="utf-8") as f:
    # first we write the opening square bracket for the json list
    f.write("[\n")
    while n < N:
        msg = consumer.poll(timeout=20)
        if msg is None:
            break
        if msg.error():
            raise LasairError("Error while consuming message: {}".format(msg.error()))
        # 2. If we make it here it means we have messages. 
        raw = msg.value()
        # msg.value() may be bytes or str depending on client
        if isinstance(raw, bytes):
            raw = raw.decode("utf-8")

        # 3. Get the JSON data for our alert.
        result = json.loads(raw)
        
        # write comma before each object after the first
        if not first:
            f.write(",\n")
        first = False
        
        json.dump(result, f, indent=2, ensure_ascii=False)


        n += 1
    f.write("]\n")


Above we saved the data to a `.tmp.json` file, which we will now rename. This practice is called "saving files atomically", and it is a way to avoid overwriting a good file with corrupted data. If the while loop above breaks halfway through, we will be able to tell the good files from the bad ones. (Vim uses swap files for a similar reason.)


Once we are happy everything has run properly we can replace our tmp file name with its final name.

In [12]:
# Rename the temporary file to its final .json name
import os


os.replace(str(OUTPUT_PATH / "message_BASIC.tmp.json"), str(OUTPUT_PATH / "message_BASIC.json"))
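
For a handful of alerts, the write-then-rename pattern can be packaged into one helper. This is a sketch under the assumption that all alerts fit comfortably in memory as a list of dictionaries. `write_json_atomically` is a hypothetical name, and the demo writes to a throwaway temporary directory rather than `OUTPUT_PATH`.

```python
import json
import os
import tempfile
from pathlib import Path


def write_json_atomically(alerts, final_path):
    """Write `alerts` (a list of dicts) to `final_path` via a temporary file."""
    final_path = Path(final_path)
    tmp_path = final_path.with_name(final_path.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        # json.dump takes care of the brackets and commas of the list for us
        json.dump(alerts, f, indent=2, ensure_ascii=False)
    # Only reached if the dump succeeded; the rename replaces any older file
    os.replace(tmp_path, final_path)
    return final_path


demo_dir = Path(tempfile.mkdtemp())
demo_file = write_json_atomically(
    [{"diaObjectId": 1}, {"diaObjectId": 2}], demo_dir / "message_DEMO.json"
)
with open(demo_file, encoding="utf-8") as f:
    reloaded = json.load(f)
print(reloaded)
```

The trade-off is memory: the streaming loop above writes each alert as it arrives, which is the safer choice when messages are large or numerous.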

<h2 align="center"> 3. Reading a Basic Alert File </h2>

I am going to show you how to handle these with pandas since it already has excellent JSON support. 
This tutorial will not give you a "raw Python" solution, but you are welcome to suggest updated tutorials via a fork of the examples repository. 

In [14]:
import pandas as pd

Pandas already has a `read_json` function, which works quite well even for nested data structures (which we will need later).

In [15]:
dat = pd.read_json(OUTPUT_PATH/"message_BASIC.json") 

In [16]:
dat.head()

Unnamed: 0,diaObjectId,lastDiaSourceMjdTai,latestR,nDiaSources,ra,decl,separationArcsec,UTC
0,313958955284430880,61069.082988,0.993594,19,53.565958,-26.875153,68.262,
1,313871013127389233,61088.212444,0.924362,49,150.121439,1.163596,0.744,2026-02-17 05:13:10
2,313875416830444113,61088.212444,0.996161,20,149.601949,0.728079,0.228,2026-02-17 05:13:10
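
The `lastDiaSourceMjdTai` values are not very human-friendly. Assuming, as the column name suggests, that they are Modified Julian Dates on the TAI time scale, a minimal standard-library sketch of the conversion looks like this. `mjd_to_datetime` is an illustrative helper, not a `lasair` function, and the result stays on the TAI scale (TAI has been 37 seconds ahead of UTC since 2017).

```python
from datetime import datetime, timedelta

MJD_EPOCH = datetime(1858, 11, 17)  # MJD 0 is midnight on 17 November 1858


def mjd_to_datetime(mjd):
    """Convert a Modified Julian Date to a naive datetime on the same time scale."""
    return MJD_EPOCH + timedelta(days=mjd)


# Value taken from the second alert printed earlier in this notebook
last_detection = mjd_to_datetime(61088.212443683144)
print(last_detection.date())
```

For that alert the date comes out as 2026-02-17, the same date as in its `UTC` field.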




As you can see, the columns we have here are the same as those listed in the SQL `SELECT` query of the [Lasair Tutorial Basic Stream Filter](https://lasair-lsst-dev.lsst.ac.uk/filters/130/).


**WARNING: UPDATE LINK**



![basicfilter](img/basic_stream_filter.png)


---

### 3 types of stream: Basic, Lite Lightcurve, Full Packet

When creating your filter you can select from a few options:
* **Kafka stream**: Just the fields you selected during Filter creation
* **Lite lightcurve**: The fields you selected at filter creation + the lightcurve history 
* **Full Alert**: The fields you selected + the full alert packet. 


In the example above we've only looked at the most basic form of output. 
Now we are going to play with the lightcurve and full alert modes.


[Docs Reference: Alert Streams](https://lasair-lsst.readthedocs.io/en/main/core_functions/alert-streams.html#alert-streams)


---

<h2 align="center"> 5. Lite Lightcurve Alerts</h2>

To get the Lite Lightcurve data we have to **change our topic to point to the right filter**. 

In [17]:
topic = "lasair_83lasair_tutorial_lite_lightcurve"

We also need to recreate our consumer to point to the right topic. (Note that changing the `group_id` here is unnecessary: we have not read anything from this topic yet, so we start at the top of its queue.)

In [18]:
consumer = lasair_consumer(
    kafka_server,    
    group_id,        
    topic            
)

Poll a few alerts, save to file and get them in a data frame, just like before.

In [19]:
n = 0
first = True
# To ensure we don't leave our file open we work within a `with` scope
with open(OUTPUT_PATH / "message_LiteLC.tmp.json", "w", encoding="utf-8") as f:
    # first we write the opening square bracket for the json list
    f.write("[\n")
    while n < N:
        msg = consumer.poll(timeout=20)
        if msg is None:
            break
        if msg.error():
            raise LasairError("Error while consuming message: {}".format(msg.error()))
        # 2. If we make it here it means we have messages. 
        raw = msg.value()
        # msg.value() may be bytes or str depending on client
        if isinstance(raw, bytes):
            raw = raw.decode("utf-8")

        # 3. Get the JSON data for our alert.
        result = json.loads(raw)
        
        # write comma before each object after the first
        if not first:
            f.write(",\n")
        first = False
        
        json.dump(result, f, indent=2, ensure_ascii=False)


        n += 1
    f.write("]\n")

os.replace(str(OUTPUT_PATH / "message_LiteLC.tmp.json"), str(OUTPUT_PATH / "message_LiteLC.json"))

In [20]:
dat_llc = pd.read_json(OUTPUT_PATH/"message_LiteLC.json") 

In [21]:
dat_llc.head()

Unnamed: 0,diaObjectId,lastDiaSourceMjdTai,latestR,nDiaSources,ra,decl,separationArcsec,UTC,alert
0,313708285369778251,61069.071948,0.974207,7,7.968275,-45.168417,87.476,,
1,314003014503366706,61088.211979,0.957076,2,149.637643,2.846224,3.444,2026-02-17 05:13:11,{'diaObject': {'diaObjectId': 3140030145033667...
2,313862203195261039,61088.211979,0.941332,32,149.701174,2.675891,6.958,2026-02-17 05:13:11,{'diaObject': {'diaObjectId': 3138622031952610...


**[FINISH WHEN FILTER RAN WITH MORE ALERTS]**

In [24]:
dat_llc['alert'].iloc[1].keys()

dict_keys(['diaObject', 'diaSourcesList', 'diaForcedSourcesList', 'diaNondetectionLimitsList', 'ebv'])



<h2 align="center"> 6. Full Alert Packet Data </h2>

In [25]:
topic = "lasair_83lasair_tutorial_full_alert"
consumer = lasair_consumer(
    kafka_server,    
    group_id,        
    topic            
)
N = 3

In [27]:
n = 0
first = True
# To ensure we don't leave our file open we work within a `with` scope
with open(OUTPUT_PATH / "message_FULL.tmp.json", "w", encoding="utf-8") as f:
    # first we write the opening square bracket for the json list
    f.write("[\n")
    while n < N:
        msg = consumer.poll(timeout=20)
        if msg is None:
            break
        if msg.error():
            raise LasairError("Error while consuming message: {}".format(msg.error()))
        # 2. If we make it here it means we have messages. 
        raw = msg.value()
        # msg.value() may be bytes or str depending on client
        if isinstance(raw, bytes):
            raw = raw.decode("utf-8")

        # 3. Get the JSON data for our alert.
        result = json.loads(raw)
        
        # write comma before each object after the first
        if not first:
            f.write(",\n")
        first = False
        
        json.dump(result, f, indent=2, ensure_ascii=False)


        n += 1
    f.write("]\n")

os.replace(str(OUTPUT_PATH / "message_FULL.tmp.json"), str(OUTPUT_PATH / "message_FULL.json"))

In [28]:
dat_full= pd.read_json(OUTPUT_PATH/"message_FULL.json") 

In [29]:
dat_full.head()

Unnamed: 0,diaObjectId,lastDiaSourceMjdTai,latestR,nDiaSources,ra,decl,separationArcsec,UTC,alert
0,313853533151035396,61088.211979,0.932557,51,150.638398,2.578472,5.498,2026-02-17 05:13:12,{'diaObject': {'diaObjectId': 3138535331510353...
1,313941448576204866,61088.097446,0.934364,30,53.739286,-28.454923,9.64,2026-02-17 05:13:12,{'diaObject': {'diaObjectId': 3139414485762048...
2,313941448626012241,61088.097446,0.900823,27,53.550288,-26.82895,0.937,2026-02-17 05:13:12,{'diaObject': {'diaObjectId': 3139414486260122...


Here we have a funky column at the end named `alert`, which contains some heavy nested dictionaries: **THE FULL ALERT FROM RUBIN** (Except the image stamps - we will NEVER send you the stamps through the stream).

Let's see what's in there:

In [32]:
dat_full['alert'].iloc[0].keys()

dict_keys(['diaObject', 'diaSourcesList', 'diaForcedSourcesList', 'diaNondetectionLimitsList', 'ebv'])
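
Before diving into each key, it can help to get a quick overview of how much is inside a nested alert. The helper below is a small sketch. `summarise_alert` is a hypothetical name, and `example_alert` is a made-up miniature alert that only mimics the top-level keys shown above.

```python
def summarise_alert(alert):
    """Return {key: short description} for a nested alert dictionary."""
    summary = {}
    for key, value in alert.items():
        if isinstance(value, list):
            summary[key] = f"list of {len(value)} item(s)"
        elif isinstance(value, dict):
            summary[key] = f"dict with {len(value)} field(s)"
        else:
            summary[key] = f"{type(value).__name__}: {value!r}"
    return summary


# Tiny made-up alert with the same top-level keys as the output above
example_alert = {
    "diaObject": {"diaObjectId": 1, "ra": 150.0},
    "diaSourcesList": [{"diaSourceId": 10}, {"diaSourceId": 11}],
    "diaForcedSourcesList": [],
    "diaNondetectionLimitsList": [],
    "ebv": "0.02",
}
for key, description in summarise_alert(example_alert).items():
    print(f"{key}: {description}")
```

On real data you would call it as `summarise_alert(dat_full['alert'].iloc[0])`.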

We first have the contents of the `diaObject` table (for Schema see Rubin Alert Production [diaObject](https://sdm-schemas.lsst.io/apdb.html#DiaObject))

In [33]:
dat_full['alert'].iloc[0]['diaObject']

{'diaObjectId': 313853533151035396,
 'validityStartMjdTai': 61088.213659945,
 'ra': 150.63839800028137,
 'raErr': 2.6492096367292103e-05,
 'decErr': 2.78653697023401e-05,
 'ra_dec_Cov': -1.236832580797653e-10,
 'u_psfFluxMean': None,
 'u_psfFluxMeanErr': None,
 'u_psfFluxSigma': None,
 'u_psfFluxNdata': 0,
 'u_fpFluxMean': None,
 'u_fpFluxMeanErr': None,
 'g_psfFluxMean': None,
 'g_psfFluxMeanErr': None,
 'g_psfFluxSigma': None,
 'g_psfFluxNdata': 0,
 'g_fpFluxMean': None,
 'g_fpFluxMeanErr': None,
 'r_psfFluxMean': 1151.55517578125,
 'r_psfFluxMeanErr': 28.16971778869629,
 'r_psfFluxSigma': 132.42359924316406,
 'r_psfFluxNdata': 23,
 'r_fpFluxMean': None,
 'r_fpFluxMeanErr': None,
 'i_psfFluxMean': None,
 'i_psfFluxMeanErr': None,
 'i_psfFluxSigma': None,
 'i_psfFluxNdata': 0,
 'i_fpFluxMean': None,
 'i_fpFluxMeanErr': None,
 'z_psfFluxMean': None,
 'z_psfFluxMeanErr': None,
 'z_psfFluxSigma': None,
 'z_psfFluxNdata': 0,
 'z_fpFluxMean': None,
 'z_fpFluxMeanErr': None,
 'y_psfFluxMean



A dictionary like that is not very readable, so let's put it in pandas:

In [43]:
__dat  = pd.DataFrame(dat_full['alert'].iloc[0]['diaObject'], index=[0])
__dat.head()

Unnamed: 0,diaObjectId,validityStartMjdTai,ra,raErr,decErr,ra_dec_Cov,u_psfFluxMean,u_psfFluxMeanErr,u_psfFluxSigma,u_psfFluxNdata,...,z_psfFluxMaxSlope,z_psfFluxErrMean,y_psfFluxMin,y_psfFluxMax,y_psfFluxMaxSlope,y_psfFluxErrMean,firstDiaSourceMjdTai,lastDiaSourceMjdTai,nDiaSources,decl
0,313853533151035396,61088.21366,150.638398,2.6e-05,2.8e-05,-1.236833e-10,,,,0,...,,,,,,,,,51,2.578472


Then we have the contents of the diaSource table (99 rows) for EACH `diaSourceId` in the history. (see [Rubin Schema for diaSource](https://sdm-schemas.lsst.io/apdb.html#DiaSource)) 

In [37]:
# How many diaSources do we have here?
len(dat_full['alert'].iloc[0]['diaSourcesList'])

51

In [38]:
# First one as an example
dat_full['alert'].iloc[0]['diaSourcesList'][0]

{'diaSourceId': 313853533151035396,
 'visit': 2025121401028,
 'detector': 95,
 'diaObjectId': 313853533151035396,
 'ssObjectId': None,
 'parentDiaSourceId': 0,
 'midpointMjdTai': 61024.32082858987,
 'ra': 150.638329896706,
 'raErr': 2.6492096367292103e-05,
 'decErr': 2.78653697023401e-05,
 'ra_dec_Cov': -1.236832580797653e-10,
 'x': 2669.985595703125,
 'xErr': 0.444157928228378,
 'y': 82.73981475830078,
 'yErr': 0.530026137828826,
 'centroid_flag': False,
 'apFlux': 1492.971435546875,
 'apFluxErr': 578.626220703125,
 'apFlux_flag': False,
 'apFlux_flag_apertureTruncated': False,
 'isNegative': False,
 'snr': 5.279487609863281,
 'psfFlux': 1442.251708984375,
 'psfFluxErr': 267.03643798828125,
 'psfLnL': None,
 'psfChi2': 1639.921630859375,
 'psfNdata': 1681,
 'psfFlux_flag': False,
 'psfFlux_flag_edge': False,
 'psfFlux_flag_noGoodPixels': False,
 'trailFlux': 1324.7528076171875,
 'trailFluxErr': 41.209903717041016,
 'trailRa': 150.6383481175432,
 'trailRaErr': None,
 'trailDec': 2.5785

Let's put all our sources into one big table.


In [45]:

df_source_ls = []

# FOR EACH SOURCE
for i in range(len(dat_full['alert'].iloc[0]['diaSourcesList'])):
    # Turns the dictionary into a one row dataframe and append it to the list
    df_source_ls.append(pd.DataFrame(dat_full['alert'].iloc[0]['diaSourcesList'][i], 
                                     index=[i]) # We can also ensure the index makes sense
                                     )

# Then we concatenate our small dataframes into our final table!
# NOTE: Concatenating once at the end is more efficient than concatenating
# as we go along, because each concatenation copies the whole dataframe
# built so far, so every iteration of the loop gets slower.
df_sources = pd.concat(df_source_ls, ignore_index=True)
df_sources.head()



Unnamed: 0,diaSourceId,visit,detector,diaObjectId,ssObjectId,parentDiaSourceId,midpointMjdTai,ra,raErr,decErr,...,pixelFlags_suspect,pixelFlags_suspectCenter,pixelFlags_streak,pixelFlags_streakCenter,pixelFlags_injected,pixelFlags_injectedCenter,pixelFlags_injected_template,pixelFlags_injected_templateCenter,glint_trail,decl
0,313853533151035396,2025121401028,95,313853533151035396,,0,61024.320829,150.63833,2.6e-05,2.8e-05,...,False,False,False,False,False,False,False,False,False,2.578488
1,313853533826842654,2025121401033,104,313853533151035396,,0,61024.326482,150.638354,3.8e-05,3.8e-05,...,False,False,False,False,False,False,False,False,False,2.578477
2,313862203239825515,2025121600089,145,313853533151035396,,0,61026.213658,150.638375,8.2e-05,7.2e-05,...,False,False,False,False,False,False,False,False,False,2.578493
3,313871013157273643,2025121800192,145,313853533151035396,,0,61028.318308,150.638425,1.2e-05,1.6e-05,...,False,False,False,False,False,False,False,False,False,2.578468
4,313871013291491384,2025121800193,145,313853533151035396,,0,61028.318805,150.638408,1.6e-05,1.4e-05,...,False,False,False,False,False,False,False,False,False,2.578476
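
As an alternative to building one small dataframe per source, pandas can also turn a list of flat dictionaries into a table in a single call. The sketch below uses a few made-up records in place of the real `diaSourcesList`. The field names are taken from the output above, but the values are invented.

```python
import pandas as pd

# Made-up records standing in for alert['diaSourcesList']
sources = [
    {"diaSourceId": 10, "midpointMjdTai": 61024.32, "psfFlux": 1442.3},
    {"diaSourceId": 11, "midpointMjdTai": 61024.33, "psfFlux": 1398.7},
    {"diaSourceId": 12, "midpointMjdTai": 61026.21, "psfFlux": None},
]

# One call: each dictionary becomes a row, each key becomes a column
df_demo = pd.DataFrame(sources)
print(df_demo)
```

On the real data this would read `pd.DataFrame(dat_full['alert'].iloc[0]['diaSourcesList'])`.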


Then we have the **Forced Photometry** in `diaForcedSourcesList`, with one dictionary per forced source. See the [Rubin Schema for diaForcedSource](https://sdm-schemas.lsst.io/apdb.html#DiaForcedSource).

In [41]:
len(dat_full['alert'].iloc[0]['diaForcedSourcesList'])

1

In [40]:
dat_full['alert'].iloc[0]['diaForcedSourcesList'][0]

{'diaForcedSourceId': 170019716326228034,
 'diaObjectId': 313853533151035396,
 'ra': 150.6383980002814,
 'visit': 2026021600255,
 'detector': 142,
 'psfFlux': 1404.9306640625,
 'psfFluxErr': 142.97650146484375,
 'midpointMjdTai': 61088.211979366155,
 'scienceFlux': 4937.51806640625,
 'scienceFluxErr': 138.020263671875,
 'band': 'r',
 'timeProcessedMjdTai': 61088.21365986636,
 'timeWithdrawnMjdTai': None,
 'decl': 2.578472037566413}
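
Fluxes are often easier to think about as magnitudes. The sketch below assumes that `psfFlux` and `scienceFlux` are in nanojansky (check the Rubin schema linked above before relying on this), in which case the AB magnitude is `31.4 - 2.5 * log10(flux)`. `njy_to_ab_mag` is an illustrative helper and not part of the `lasair` client.

```python
import math

AB_ZEROPOINT_NJY = 31.4  # AB magnitude of a 1 nJy source


def njy_to_ab_mag(flux_njy):
    """Convert a flux in nanojansky to an AB magnitude (None if the flux is not positive)."""
    if flux_njy is None or flux_njy <= 0:
        # Difference fluxes can be negative; a magnitude is undefined there
        return None
    return AB_ZEROPOINT_NJY - 2.5 * math.log10(flux_njy)


# Values copied from the forced source printed above
forced = {"band": "r", "psfFlux": 1404.9306640625, "scienceFlux": 4937.51806640625}
print(forced["band"], njy_to_ab_mag(forced["psfFlux"]), njy_to_ab_mag(forced["scienceFlux"]))
```

Returning `None` for non-positive fluxes keeps the helper from raising a math error on negative difference fluxes.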

Finally we have the extinction, E(B-V). Note that it arrives as a string, so convert it with `float()` before doing any arithmetic.

In [46]:
dat_full['alert'].iloc[0]['ebv']

'0.019336736'

In [49]:
# Always empty at the minute [TBC]
dat_full['alert'].iloc[0]['diaNondetectionLimitsList']

[]

--- 


<h2 align="center"> What's Next </h2>


- If you want to make a consumer script for yourself, you can start with the fully functional and commented sample script under `scripts/kafka_consumer.py`. 

- If you want to get alert data that is not in a kafka queue, **the `API_recipes.ipynb` notebook is for you!**


<h2 align="center"> How to Get Help </h2>

If you have a question or need any help, you can find past questions or ask a new one on the **[Community Forum](https://community.lsst.org/c/support/support-lasair/55)**.


![communityforum](img/community_forum.png)
