In [34]:
import asyncio
import json
import random
import urllib
import urllib.request
import warnings
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from astropy.time import Time

import lsst.alert.packet as packet
from lsst.summit.utils.efdUtils import makeEfdClient, getEfdData

from alertParserUtilities_two import parseAlertContents

In [35]:
async def getDataFrame(client, starts, ends, topic, verbose=True, fields=None):
    """
    Query one topic over several time ranges and stack the results.

    Args:
        client: Object exposing an awaitable
            ``select_time_series(topic, fields, start, end)`` method.

        starts (iterable): Start of each time range. Every value is passed
            to ``astropy.time.Time`` before being handed to the client.

        ends (iterable): End of each time range, paired with ``starts``
            through ``zip`` (extra entries on the longer side are ignored).

        topic (str): Topic name forwarded to the client.

        verbose (bool, optional): Print a progress message around every
            query. Defaults to True.

        fields (optional): Fields forwarded to the client. ``None``
            (the default) requests ``"*"``.

    Returns:
        pandas.DataFrame: The per-range results concatenated row-wise with
        their original index preserved. Empty when no range was supplied.
    """
    # `is None` rather than `!= None`: array-like `fields` would otherwise
    # produce an element-wise comparison instead of a single boolean.
    query_fields = "*" if fields is None else fields

    # Collect the chunks and concatenate once at the end; concatenating onto
    # a growing frame inside the loop is quadratic, and seeding it with an
    # empty DataFrame relies on deprecated pandas behaviour.
    frames = []
    for start, end in zip(starts, ends):
        if verbose:
            print(
                "Starting query for time range {} - {}".format(start, end),
                end=" . . . ",
            )

        frames.append(
            await client.select_time_series(
                topic, query_fields, Time(start), Time(end)
            )
        )

        if verbose:
            print("Finished")

    if not frames:
        return pd.DataFrame()

    return pd.concat(frames, ignore_index=False)

async def query_bump_logs_in_chunks(
    start_date,
    end_date,
    client_name="",
    chunk_size_days=3,
    topic_name="lsst.sal.MTM1M3.logevent_logMessage",
    fields=["message"],
):
    """
    Queries the log messages related to bump tests from the EFD in chunks.

    Args:
        start_date (str): Start date of the query in ISO format (YYYY-MM-DD).

        end_date (str): End date of the query in ISO format (YYYY-MM-DD).

        client_name (str, optional): Name of the EFD client. Defaults to "".

        chunk_size_days (int, optional): Number of days per chunk. Must be
            positive. Defaults to 3.

        topic_name (str, optional): SAL topic name to be queried by the client. Defaults to lsst.sal.MTM1M3.logevent_logMessage.

        fields (list[str], optional): Fields to be queried by the client. Defaults to ["message"].
            The list is only read, never modified.

    Returns:
        pandas.DataFrame: Concatenated DataFrame containing the queried log messages.
        Chunks whose query failed are reported on stdout and left out.

    Raises:
        ValueError: If ``chunk_size_days`` is not positive (the chunk window
            would never move forward).
    """
    # Validate the inputs before creating a client.
    start = datetime.fromisoformat(start_date)
    end = datetime.fromisoformat(end_date)
    if chunk_size_days <= 0:
        raise ValueError(
            f"chunk_size_days must be positive, got {chunk_size_days!r}"
        )
    step = timedelta(days=chunk_size_days)

    client = makeClient(client_name)

    # Gather the successful chunks and concatenate once at the end.
    chunks = []
    current_start = start
    while current_start < end:
        current_end = min(current_start + step, end)
        try:
            chunk_data = await client.select_time_series(
                topic_name=topic_name,
                fields=fields,
                start=Time(current_start.isoformat(), format="isot", scale="utc"),
                end=Time(current_end.isoformat(), format="isot", scale="utc"),
            )
        except Exception as e:
            # Best effort: report the failed chunk and carry on with the next.
            print(
                f"Error querying data from {current_start.isoformat()} to {current_end.isoformat()}: {e}"
            )
        else:
            chunks.append(chunk_data)

        # Always advance, including after a failure; otherwise the same
        # failing chunk would be retried forever.
        current_start = current_end

    if not chunks:
        return pd.DataFrame()

    return pd.concat(chunks, ignore_index=False)

def makeClient(client_name):
    """
    Build an EFD client for the requested instance name.

    Args:
        client_name (str): One of "summit_efd", "usdf_efd" or "idf_efd".
            Any other value yields the default client.

    Returns:
        Whatever ``makeEfdClient`` returns: called with the name for a
        recognised ``client_name`` and with no arguments otherwise.
    """
    # NOTE(review): confirm that the first positional parameter of
    # makeEfdClient is the EFD instance name; if it is a flag instead, the
    # name passed here would not select the instance.
    for known_name in ("summit_efd", "usdf_efd", "idf_efd"):
        if client_name == known_name:
            return makeEfdClient(known_name)

    # An empty name is the documented way to request the default client; any
    # other unrecognised value is more likely a typo or an unsupported
    # instance, so say so instead of silently substituting the default.
    if client_name:
        warnings.warn(
            f"Unrecognised EFD client name {client_name!r}; "
            "falling back to the default client.",
            stacklevel=2,
        )
    return makeEfdClient()  # Default client

In [36]:
def parseAlertContents(dictionary, ra, dec, time, obs_reason, target_name):
    """
    Fill an alert dictionary with position, timing and scheduler metadata.

    The dictionary is modified in place and also returned. A fresh source ID
    from ``generateSourceID`` is stored both at the top level and inside
    ``dictionary["diaSource"]``.

    Args:
        dictionary (dict): Alert dictionary; must already contain a
            "diaSource" mapping.
        ra: Value stored as ``diaSource["ra"]``.
        dec: Value stored as ``diaSource["dec"]``.
        time: Value stored as both ``diaSource["midpointMJDTai"]`` and
            ``diaSource["timeProcessedMjdTai"]``.
        obs_reason: Value stored as ``observation_reason``.
        target_name: Value stored as ``target_name``.

    Returns:
        dict: The same ``dictionary`` object that was passed in.
    """
    diaSource = dictionary["diaSource"]
    diaSource["ra"] = ra
    diaSource["dec"] = dec
    dictionary["observation_reason"] = obs_reason
    dictionary["target_name"] = target_name

    # NOTE(review): this key is spelled "midpointMJDTai", whereas the
    # simulated v9.0 alert shown in this notebook uses "midpointMjdTai".
    # The simulation cell reads this spelling, so it is kept as is.
    diaSource["midpointMJDTai"] = time
    diaSource["timeProcessedMjdTai"] = time

    sourceID = generateSourceID()
    dictionary["diaSourceId"] = sourceID
    diaSource["diaSourceId"] = sourceID

    return dictionary

def generateSourceID():
    """Draw a random non-negative 64-bit integer for use as a source ID."""
    # Exclusive upper bound: one past the largest signed 64-bit value.
    exclusiveUpperBound = 1 << 63
    return np.random.randint(exclusiveUpperBound, dtype=np.int64)

def getRA(inputList,num=1000):
    """
    Draw ``num`` simulated RA values from the RASTART/RAEND header entries.

    The two header values are averaged, and each returned value is that
    average plus an offset drawn (with replacement, via ``random.choices``)
    from ``num`` evenly spaced steps between 0 and 1.75.

    NOTE(review): the offsets are never negative, so the samples sit at or
    above the averaged value rather than around it, and no wrap-around is
    applied - confirm this is intended.

    Returns:
        numpy.ndarray: ``num`` sampled values.
    """
    startRA = float(getCoordFromKeyword("RASTART", inputList))
    endRA = float(getCoordFromKeyword("RAEND", inputList))

    # Take the average of the two for now
    centerRA = np.mean([startRA, endRA])

    unitGrid = np.linspace(0, 1, num=num)
    offsets = np.array(random.choices(unitGrid, k=num)) * 1.75

    return offsets + centerRA
    

def getDec(inputList,num=1000):
    """
    Draw ``num`` simulated Dec values from the DECSTART/DECEND header entries.

    The two header values are averaged, and each returned value is that
    average plus an offset drawn (with replacement, via ``random.choices``)
    from ``num`` evenly spaced steps between 0 and 1.75.

    NOTE(review): the offsets are never negative, so the samples sit at or
    above the averaged value rather than around it, and the result is not
    clipped to any range - confirm this is intended.

    Returns:
        numpy.ndarray: ``num`` sampled values.
    """
    startDec = float(getCoordFromKeyword("DECSTART", inputList))
    endDec = float(getCoordFromKeyword("DECEND", inputList))

    # Take the average of the two for now
    centerDec = np.mean([startDec, endDec])

    unitGrid = np.linspace(0, 1, num=num)
    offsets = np.array(random.choices(unitGrid, k=num)) * 1.75

    return offsets + centerDec

def getTime(inputList):
    """Return the value stored under the MJD-OBS header keyword as a float."""
    return float(getCoordFromKeyword("MJD-OBS", inputList))

def getObsReason(inputList):
    """
    Build a comma-separated string from the REASON header entry.

    The line following the REASON keyword is split on single spaces by
    ``getCoordFromKeyword``; the tokens from position 3 onwards are joined
    with ", ". (Presumably the first three tokens are indentation and the
    value label - confirm against the header layout.)

    Returns:
        str: The joined tokens, or "" when there are three tokens or fewer.
    """
    obsReasons = getCoordFromKeyword("REASON",inputList,returnAll=True)

    # join() places the separator by position. The previous loop compared each
    # token's value with the last token, so a repeated token lost its ", ".
    return ", ".join(obsReasons[3:])

def getTargetName(inputList):
    """
    Build a comma-separated string from the COMMENT header entry.

    The line following the COMMENT keyword is split on single spaces by
    ``getCoordFromKeyword``; the tokens from position 3 onwards are joined
    with ", ". (Presumably the first three tokens are indentation and the
    value label - confirm against the header layout.)

    Returns:
        str: The joined tokens, or "" when there are three tokens or fewer.
    """
    targetTokens = getCoordFromKeyword("COMMENT",inputList,returnAll=True)

    # join() places the separator by position. The previous loop compared each
    # token's value with the last token, so a repeated token lost its ", ".
    return ", ".join(targetTokens[3:])

def getCoordFromKeyword(keyword,inputList,index=-1,returnAll=False):
    """
    Look up the line that follows the first "keyword: <keyword>" line.

    Args:
        keyword (str): Keyword name; the first line containing the text
            "keyword: <keyword>" (substring match) is selected.
        inputList (sequence of str): Lines to search. A numpy array or a
            plain list both work.
        index (int, optional): Which space-separated token of the following
            line to return. Defaults to -1 (the last token).
        returnAll (bool, optional): Return every token instead of one.
            Defaults to False.

    Returns:
        list[str] | str: All tokens of the following line when ``returnAll``
        is true, otherwise the token at ``index``.

    Raises:
        IndexError: If no line contains the keyword, if the keyword is on the
            last line, or if ``index`` is out of range for the tokens.
    """
    needle = "keyword: {}".format(keyword)

    # Single pass: stop at the first line mentioning the keyword.
    for position, line in enumerate(inputList):
        if needle in line:
            break
    else:
        raise IndexError("no line contains {!r}".format(needle))

    valuePosition = position + 1
    if valuePosition >= len(inputList):
        raise IndexError("no line follows {!r}".format(needle))

    tokens = inputList[valuePosition].split(" ")
    return tokens if returnAll else tokens[index]

In [50]:
client = makeClient("base_efd",)
start = Time("2025-08-22T20:30:16.20")
end = Time("2025-10-29T17:20")

topic = "lsst.sal.MTHeaderService.logevent_largeFileObjectAvailable"
result = await getDataFrame(client,[np.datetime64(start.to_datetime())],[np.datetime64(end.to_datetime())],topic)
result = result.sort_index()

# yml = urllib.request.urlopen(result.iloc[-1]['url']).read().decode()
# inputList = np.array(yml.split("\n"))

Starting query for time range 2025-08-22T20:30:16.200000 - 2025-10-29T17:20:00.000000 . . . Finished


In [51]:
# Show the first and last rows of the index-sorted query result.
(result.iloc[0], result.iloc[-1])

(byteSize                                                         339376
 checkSum                               dc6f9a72088626336078104a45367224
 generator                                               MTHeaderService
 id                                                 MC_O_20251021_000001
 mimeType                                                           YAML
 private_efdStamp                                      1761056853.962062
 private_identity                                        MTHeaderService
 private_kafkaStamp                                    1761056890.962062
 private_origin                                                       12
 private_rcvStamp                                                      0
 private_revCode                                                3d4c0d90
 private_seqNum                                                       47
 private_sndStamp                                      1761056890.962062
 url                   https://s3.ls.lsst.org/rubin

In [38]:
# Number of alerts to simulate.
# Disabled alternative: a random count of 10k +/- 2.5k alerts. The expression
# yields a float, so it is wrapped in int() because the value is used as a
# sample count further down.
# number = int(7500 + np.random.random() * 5000)
number = 10_000  # This is a uniform 10k

In [52]:
# Load version 9.0 of the alert schema from the "lsst.v9_0.alert" schema root.
schema = (
    packet.SchemaRegistry
    .from_filesystem(schema_root="lsst.v9_0.alert")
    .get_by_version("9.0")
)

In [54]:
# Display the schema definition: the nested mapping of record and field
# declarations that the simulation cell passes to simulate_alert.
schema.definition

{'type': 'record',
 'doc': 'Rubin Avro alert schema v9.0',
 'name': 'lsst.v9_0.alert',
 'fields': [{'doc': 'Identifier of the triggering DiaSource',
   'name': 'diaSourceId',
   'type': 'long'},
  {'doc': 'Scheduler reason for the image containing this diaSource (RTN-097).',
   'default': None,
   'name': 'observation_reason',
   'type': ['null', 'string']},
  {'doc': 'Scheduler target for the image containing this diaSource (RTN-097).',
   'default': None,
   'name': 'target_name',
   'type': ['null', 'string']},
  {'name': 'diaSource',
   'type': {'type': 'record',
    'name': 'lsst.v9_0.diaSource',
    'fields': [{'doc': 'Unique identifier of this DiaSource.',
      'name': 'diaSourceId',
      'type': 'long'},
     {'doc': 'Id of the visit where this diaSource was measured.',
      'name': 'visit',
      'type': 'long'},
     {'doc': 'Id of the detector where this diaSource was measured. Datatype short instead of byte because of DB concerns about unsigned bytes.',
      'name': 'de

In [41]:
%%time 
schema = packet.SchemaRegistry.from_filesystem(schema_root="lsst.v9_0.alert").get_by_version("9.0")

myDictionary = {"diaSource":{},"diaObject":{}}
ras = getRA(inputList,num=number)
decs = getDec(inputList,num=number)
midpoint_mjdtai = getTime(inputList)
obs_reason = getObsReason(inputList)
target_name = getTargetName(inputList)
bigResult = []
for k in np.arange(number):
    resultDictionary = parseAlertContents(myDictionary, ras[k], decs[k], midpoint_mjdtai, obs_reason, target_name)

    resultDictionary['diaSource']['time_processed'] = float(Time(resultDictionary["diaSource"]["midpointMJDTai"],format='mjd').value+0.003)
    resultDictionary['diaObject']['validityStart'] = float(Time(resultDictionary["diaSource"]["midpointMJDTai"],format='mjd').value)
    resultDictionary['diaSource']['midpointMjdTai'] = float(Time(resultDictionary["diaSource"]["midpointMJDTai"],format='mjd').value)

    result = packet.simulate.simulate_alert(schema.definition)

    for k in resultDictionary.keys():
        if type(result[k])==dict:
            if result[k]==None:
                result[k]={}
            for k1 in resultDictionary[k].keys():
                result[k][k1] = resultDictionary[k][k1]
        else:
            result[k] = resultDictionary[k]

    bigResult.append(result)
    
with open("./fakeAlert.avro", "wb") as f:
    schema.store_alerts(f, bigResult)

CPU times: user 5.6 s, sys: 0 ns, total: 5.6 s
Wall time: 5.7 s


In [42]:
# Display the last alert built by the simulation loop (`result` still holds
# the final loop iteration's packet).
result

{'diaSourceId': np.int64(1642671371977578505),
 'observation_reason': 'null',
 'target_name': "''",
 'diaSource': {'diaSourceId': np.int64(1642671371977578505),
  'visit': 8934913513110285479,
  'detector': -1247575987,
  'diaObjectId': None,
  'ssObjectId': None,
  'parentDiaSourceId': None,
  'midpointMjdTai': 60961.981981116114,
  'ra': np.float64(306.22622070625454),
  'raErr': None,
  'dec': np.float64(-20.01870172081949),
  'decErr': None,
  'ra_dec_Cov': None,
  'x': 0.12992256879806519,
  'xErr': None,
  'y': 0.4828704297542572,
  'yErr': None,
  'centroid_flag': None,
  'apFlux': None,
  'apFluxErr': None,
  'apFlux_flag': None,
  'apFlux_flag_apertureTruncated': None,
  'isNegative': None,
  'snr': None,
  'psfFlux': None,
  'psfFluxErr': None,
  'psfLnL': None,
  'psfChi2': None,
  'psfNdata': None,
  'psfFlux_flag': None,
  'psfFlux_flag_edge': None,
  'psfFlux_flag_noGoodPixels': None,
  'trailFlux': None,
  'trailFluxErr': None,
  'trailRa': None,
  'trailRaErr': None,
  