Skip to content

Commit

Permalink
Merge branch 'tickets/DM-25826'
Browse files Browse the repository at this point in the history
  • Loading branch information
ebellm committed Jul 10, 2020
2 parents 602a49a + 7bb6f1b commit e2bc8cb
Show file tree
Hide file tree
Showing 38 changed files with 176 additions and 2,534 deletions.
14 changes: 9 additions & 5 deletions python/lsst/alert/packet/bin/validateAvroRoundTrip.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@

import lsst.alert.packet

# The default filename of the root of the alert schema.
SCHEMA_FILENAME = "lsst.alert.avsc"

# The default filename of per-schema sample alert data.
SAMPLE_FILENAME = "alert.json"


def schema_filename(major_version, minor_version):
    """Build the file name of the root alert schema for a schema version.

    Parameters
    ----------
    major_version : `str` or `int`
        Major version of the schema; interpolated into the name as-is.
    minor_version : `str` or `int`
        Minor version of the schema; interpolated into the name as-is.

    Returns
    -------
    filename : `str`
        File name of the form ``lsst.v<major>_<minor>.alert.avsc``.
    """
    version_tag = f"v{major_version}_{minor_version}"
    return f"lsst.{version_tag}.alert.avsc"


def check_file_round_trip(baseline, received_data):
"""Assert that the contents of baseline is equal to received_data.
Expand Down Expand Up @@ -81,8 +83,10 @@ def main():
schema_major, schema_minor = args.schema_version.split(".")
schema_root = lsst.alert.packet.get_schema_path(schema_major, schema_minor)

alert_schema = lsst.alert.packet.Schema.from_file(os.path.join(schema_root,
SCHEMA_FILENAME))
alert_schema = lsst.alert.packet.Schema.from_file(
os.path.join(schema_root,
schema_filename(schema_major, schema_minor)),
)
if args.input_data:
input_data = args.input_data
else:
Expand Down
22 changes: 19 additions & 3 deletions python/lsst/alert/packet/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

"""Routines for loading data from files.
"""

import itertools
import os.path

import fastavro
Expand Down Expand Up @@ -68,5 +68,21 @@ def retrieve_alerts(fp, reader_schema=None):
except Exception as e:
raise RuntimeError(f"failed to find alert data in "
f"{fp.name if hasattr(fp, 'name') else 'stream'}") from e
records = [rec for rec in reader]
return Schema(reader.writer_schema), records

# Peek at one record so that reader.writer_schema is populated, since it
# gets loaded lazily. If you don't do this, then reader.writer_schema would
# be None, which means Schema(reader.writer_schema) would get an empty
# value.
#
# It would be simpler to do something like 'records = list(reader)', but
# that would require loading all the records into memory in one gulp. Since
# alert files can be huge, even terabytes, the extra complexity here is
# worth it.
try:
first_record = next(reader)
records = itertools.chain([first_record], reader)
except StopIteration:
# The file has zero records in it. It might still have a schema, though.
records = []
writer_schema = Schema(reader.writer_schema)
return writer_schema, records
43 changes: 26 additions & 17 deletions python/lsst/alert/packet/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@

import fastavro

__all__ = ["get_schema_root", "get_latest_schema_version", "get_schema_path", "Schema"]
__all__ = ["get_schema_root", "get_latest_schema_version", "get_schema_path",
"Schema", "get_path_to_latest_schema"]


def get_schema_root():
    """Locate the top-level directory in which schemas are stored.

    Returns
    -------
    root : `str`
        Filesystem path of the ``schema`` resource, resolved relative to
        this module by `pkg_resources.resource_filename`.
    """
    schema_root = pkg_resources.resource_filename(__name__, "schema")
    return schema_root


def get_latest_schema_version():
"""Get the latest schema version.
Expand All @@ -52,6 +55,7 @@ def get_latest_schema_version():
major, minor = clean.split(b".", 1)
return int(major), int(minor)


def get_schema_path(major, minor):
"""Get the path to a package resource directory housing alert schema
definitions.
Expand All @@ -70,12 +74,26 @@ def get_schema_path(major, minor):
"""

# Note that posixpath is right here, not os.path, since pkg_resources always
# uses slash-delimited paths, even on Windows.
# Note that posixpath is right here, not os.path, since pkg_resources
# always uses slash-delimited paths, even on Windows.
path = posixpath.join("schema", str(major), str(minor))
return pkg_resources.resource_filename(__name__, path)


def get_path_to_latest_schema():
    """Get the path to the primary schema file for the latest schema.

    Returns
    -------
    path : `str`
        Path to the latest primary schema file.
    """
    # Imported locally so this function does not rely on the module's
    # top-level imports.
    import os.path

    major, minor = get_latest_schema_version()
    schema_path = get_schema_path(major, minor)
    # schema_path is a filesystem path (it comes from
    # pkg_resources.resource_filename), not a slash-delimited resource name,
    # so it is joined with os.path rather than posixpath.
    return os.path.join(schema_path, f"lsst.v{major}_{minor}.alert.avsc")


def resolve_schema_definition(to_resolve, seen_names=None):
"""Fully resolve complex types within a schema definition.
Expand Down Expand Up @@ -143,6 +161,7 @@ def resolve_schema_definition(to_resolve, seen_names=None):

return output


class Schema(object):
"""An Avro schema.
Expand Down Expand Up @@ -172,7 +191,6 @@ class Schema(object):
"""
def __init__(self, schema_definition):
self.definition = resolve_schema_definition(schema_definition)
fastavro.schema._schema.SCHEMA_DEFS.clear()

def serialize(self, record):
"""Create an Avro representation of data following this schema.
Expand Down Expand Up @@ -220,19 +238,9 @@ def validate(self, record):
-------
valid : `bool`
Whether or not the data complies with the schema.
Notes
-----
Validating against the schema requires that the fastavro cache
(``SCHEMA_DEFS``) be populated, but that can only be the case for one
version of the schema at once. Hence we populate, check for validity,
and flush the cache.
"""
fastavro.parse_schema(self.definition)
try:
return fastavro.validate(record, self.definition)
finally:
fastavro.schema._schema.SCHEMA_DEFS.clear()
return fastavro.validate(record, self.definition)

def store_alerts(self, fp, records):
"""Store alert packets to the given I/O stream.
Expand Down Expand Up @@ -267,7 +275,8 @@ def retrieve_alerts(self, fp):
Alert records.
"""
from .io import retrieve_alerts
return retrieve_alerts(fp, reader_schema=self)
schema, records = retrieve_alerts(fp, reader_schema=self)
return schema, records

def __eq__(self, other):
"""Compare schemas for equality.
Expand All @@ -278,7 +287,7 @@ def __eq__(self, other):
return self.definition == other.definition

@classmethod
def from_file(cls, filename=None, root_name="lsst.alert"):
def from_file(cls, filename=None, root_name="lsst.v3_0.alert"):
"""Instantiate a `Schema` by reading its definition from the filesystem.
Parameters
Expand Down
22 changes: 0 additions & 22 deletions python/lsst/alert/packet/schema/1/0/lsst.alert.avsc

This file was deleted.

9 changes: 0 additions & 9 deletions python/lsst/alert/packet/schema/1/0/lsst.alert.cutout.avsc

This file was deleted.

159 changes: 0 additions & 159 deletions python/lsst/alert/packet/schema/1/0/lsst.alert.diaSource.avsc

This file was deleted.

0 comments on commit e2bc8cb

Please sign in to comment.