Skip to content

Commit

Permalink
Use lsst-resources for URI manipulation and add from_uri() method
Browse files Browse the repository at this point in the history
The from_uri() method is complicated by fastavro requiring that
all referenced schemas be available locally.
  • Loading branch information
timj committed Aug 30, 2023
1 parent d2b4c6b commit c0f9b6b
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 17 deletions.
81 changes: 64 additions & 17 deletions python/lsst/alert/packet/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
"""Routines for working with Avro schemas.
"""

from __future__ import annotations

import io
import os.path
import tempfile
from importlib import resources
from pathlib import PurePath
from lsst.resources import ResourcePath

import fastavro

Expand All @@ -42,13 +45,17 @@ def _get_ref(*args):
return resources.files("lsst.alert.packet").joinpath(*args)


def _get_uri_str(*args):
"""Return the package resource in the form of a URI.
def _get_dir_uri(*args: str) -> ResourcePath:
"""Return the package resource associated with the given directory
components as a URI.
This URI is suitable for use by `lsst.resources.ResourcePath`
and uses the ``resource`` URI scheme.
Returns
-------
uri : `lsst.resources.ResourcePath`
The URI derived from the supplied paths.
"""
return "resource://lsst.alert.packet/" + "/".join(args)
return ResourcePath("resource://lsst.alert.packet/" + "/".join(args),
forceDirectory=True)


def get_schema_root():
Expand All @@ -59,11 +66,10 @@ def get_schema_root():
return _get_ref("schema")


def get_schema_root_uri():
def get_schema_root_uri() -> ResourcePath:
"""Return the ``resource`` URI corresponding to the location where
schemas are stored."""
# Add trailing / to indicate that we know this is a directory.
return _get_uri_str("schema") + "/"
return _get_dir_uri("schema")


def get_latest_schema_version():
Expand Down Expand Up @@ -104,7 +110,7 @@ def get_schema_path(major, minor):
return _get_ref("schema", str(major), str(minor)).as_posix()


def get_schema_uri(major, minor):
def get_schema_uri(major: int, minor: int) -> ResourcePath:
"""Get the URI to a package resource directory housing alert schema
definitions.
Expand All @@ -117,12 +123,10 @@ def get_schema_uri(major, minor):
Returns
-------
uri : `str`
uri : `lsst.resources.ResourcePath`
``resource`` URI to the directory containing the schemas.
"""
# Add trailing / to indicate that we know this URI refers
# to a directory.
return _get_uri_str("schema", str(major), str(minor)) + "/"
return _get_dir_uri("schema", str(major), str(minor))


def get_path_to_latest_schema():
Expand All @@ -139,17 +143,17 @@ def get_path_to_latest_schema():
return (schema_path / f"lsst.v{major}_{minor}.alert.avsc").as_posix()


def get_uri_to_latest_schema():
def get_uri_to_latest_schema() -> ResourcePath:
"""Get the URI to to the primary file for the latest schema.
Returns
-------
uri : `str`
uri : `lsst.resources.ResourcePath`
The ``resource`` URI to the latest schema.
"""
major, minor = get_latest_schema_version()
schema_uri = get_schema_uri(major, minor)
return schema_uri + f"lsst.v{major}_{minor}.alert.avsc"
return schema_uri.join(f"lsst.v{major}_{minor}.alert.avsc")


def resolve_schema_definition(to_resolve, seen_names=None):
Expand Down Expand Up @@ -350,6 +354,49 @@ def __eq__(self, other):
"""
return self.definition == other.definition

@classmethod
def from_uri(cls, base_uri: str | ResourcePath) -> Schema:
"""Instantiate a `Schema` by reading its definition from a URI.
Parameters
----------
base_uri : `str` or `lsst.resources.ResourcePath`
URI to the base schema as either a `~lsst.resources.ResourcePath`
or a string that can be converted to one.
"""
uri = ResourcePath(base_uri)

if uri.isLocal:
return cls.from_file(uri.ospath)

# fastavro requires that the schema file is local and that all the
# referenced schema files are also local. This means that for a remote
# URI all related schema files must be downloaded. Additionally they
# must all have the original names and not temporary names.

# Special case resource URIs. If the package is installed in expanded
# form the local file will have the original name, else if the package
# is still in a wheel it will have a temporary name.
if uri.scheme == "resource":
with uri.as_local() as local_file:
if local_file.basename() == uri.basename():
# Likely already a local file.
return cls.from_file(local_file.ospath)

# This URI is a remote resource (eg S3) or a package resource in a
# wheel. Need to scan the directory and download all .avsc files.
uri_dir = uri.dirname()

with tempfile.TemporaryDirectory() as tmpdir:
tempdir_uri = ResourcePath(tmpdir, forceDirectory=True)
for file in ResourcePath.findFileResources([uri_dir],
file_filter=f"\\{uri.getExtension()}$"):
target = tempdir_uri.join(file.basename())
print(f"Transferring from {file} to {target}")
target.transfer_from(file, transfer="copy")

return cls.from_file(tempdir_uri.join(uri.basename()).ospath)

@classmethod
def from_file(cls, filename=None):
"""Instantiate a `Schema` by reading its definition from the
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ install_requires =
fastavro
numpy
requests
lsst-resources
packages =
lsst.alert.packet
lsst.alert.packet.bin
Expand Down

0 comments on commit c0f9b6b

Please sign in to comment.