Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-25826: lsst.alert.packet reader should iterate over alerts #21

Merged
merged 16 commits into from
Jul 10, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 9 additions & 5 deletions python/lsst/alert/packet/bin/validateAvroRoundTrip.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@

import lsst.alert.packet

# The default filename of the root of the alert schema.
SCHEMA_FILENAME = "lsst.alert.avsc"

# The default filename of per-schema sample alert data.
SAMPLE_FILENAME = "alert.json"


def schema_filename(major_version, minor_version):
    """Return the filename of the root alert schema for a schema version.

    Parameters
    ----------
    major_version : `int` or `str`
        Major version number of the schema.
    minor_version : `int` or `str`
        Minor version number of the schema.

    Returns
    -------
    filename : `str`
        Filename of the form ``lsst.v{major}_{minor}.alert.avsc``.
    """
    return f"lsst.v{major_version}_{minor_version}.alert.avsc"


def check_file_round_trip(baseline, received_data):
"""Assert that the contents of baseline is equal to received_data.

Expand Down Expand Up @@ -81,8 +83,10 @@ def main():
schema_major, schema_minor = args.schema_version.split(".")
schema_root = lsst.alert.packet.get_schema_path(schema_major, schema_minor)

alert_schema = lsst.alert.packet.Schema.from_file(os.path.join(schema_root,
SCHEMA_FILENAME))
alert_schema = lsst.alert.packet.Schema.from_file(
os.path.join(schema_root,
schema_filename(schema_major, schema_minor)),
)
if args.input_data:
input_data = args.input_data
else:
Expand Down
22 changes: 19 additions & 3 deletions python/lsst/alert/packet/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

"""Routines for loading data from files.
"""

import itertools
import os.path

import fastavro
Expand Down Expand Up @@ -68,5 +68,21 @@ def retrieve_alerts(fp, reader_schema=None):
except Exception as e:
raise RuntimeError(f"failed to find alert data in "
f"{fp.name if hasattr(fp, 'name') else 'stream'}") from e
records = [rec for rec in reader]
return Schema(reader.writer_schema), records

# Peek at one record so that reader.writer_schema is populated, since it
# gets loaded lazily. If you don't do this, then reader.writer_schema would
# be None, which means Schema(reader.writer_schema) would get an empty
# value.
#
# It would be simpler to do something like 'records = list(reader)', but
# that would require loading all the records into memory in one gulp. Since
# alert files can be huge, even terabytes, the extra complexity here is
# worth it.
try:
first_record = next(reader)
records = itertools.chain([first_record], reader)
except StopIteration:
# The file has zero records in it. It might still have a schema, though.
records = []
writer_schema = Schema(reader.writer_schema)
return writer_schema, records
18 changes: 4 additions & 14 deletions python/lsst/alert/packet/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ class Schema(object):
"""
def __init__(self, schema_definition):
self.definition = resolve_schema_definition(schema_definition)
fastavro.schema._schema.SCHEMA_DEFS.clear()

def serialize(self, record):
"""Create an Avro representation of data following this schema.
Expand Down Expand Up @@ -220,19 +219,9 @@ def validate(self, record):
-------
valid : `bool`
Whether or not the data complies with the schema.

Notes
-----
Validating against the schema requires that the fastavro cache
(``SCHEMA_DEFS``) be populated, but that can only be the case for one
version of the schema at once. Hence we populate, check for validity,
and flush the cache.
"""
fastavro.parse_schema(self.definition)
try:
return fastavro.validate(record, self.definition)
finally:
fastavro.schema._schema.SCHEMA_DEFS.clear()
return fastavro.validate(record, self.definition)

def store_alerts(self, fp, records):
"""Store alert packets to the given I/O stream.
Expand Down Expand Up @@ -267,7 +256,8 @@ def retrieve_alerts(self, fp):
Alert records.
"""
from .io import retrieve_alerts
return retrieve_alerts(fp, reader_schema=self)
schema, records = retrieve_alerts(fp, reader_schema=self)
return schema, records

def __eq__(self, other):
"""Compare schemas for equality.
Expand All @@ -278,7 +268,7 @@ def __eq__(self, other):
return self.definition == other.definition

@classmethod
def from_file(cls, filename=None, root_name="lsst.alert"):
def from_file(cls, filename=None, root_name="lsst.v3_0.alert"):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a little evil, and hardcodes a v3.0 expectation. That's okay while there's only one version available, though, I think.

"""Instantiate a `Schema` by reading its definition from the filesystem.

Parameters
Expand Down
22 changes: 0 additions & 22 deletions python/lsst/alert/packet/schema/1/0/lsst.alert.avsc

This file was deleted.

9 changes: 0 additions & 9 deletions python/lsst/alert/packet/schema/1/0/lsst.alert.cutout.avsc

This file was deleted.

159 changes: 0 additions & 159 deletions python/lsst/alert/packet/schema/1/0/lsst.alert.diaSource.avsc

This file was deleted.