Skip to content

Commit

Permalink
Make some fixes for mypy
Browse files Browse the repository at this point in the history
The fundamental problem is that the new row for the
composite formatter has no file associated with it.
This means that the Location is None, so we have to
add a lot of None-checking for mypy to handle that condition.
It also means that we cannot construct a Formatter
(since constructing one requires a Location).

This leads to recording the formatter class, as well
as the formatter instance, in the dataclass we use for
collecting all the relevant information; the formatter
itself can now be optional.

It also leads to one of the Formatter methods becoming
a class method.
  • Loading branch information
timj committed Jun 25, 2020
1 parent 7b51d02 commit 605ff20
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 29 deletions.
3 changes: 2 additions & 1 deletion python/lsst/daf/butler/core/formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,8 @@ def segregateParameters(self, parameters: Optional[Dict[str, Any]] = None) -> Tu

return supported, unsupported

def selectResponsibleComponent(self, readComponent: str, fromComponents: Set[Optional[str]]) -> str:
@classmethod
def selectResponsibleComponent(cls, readComponent: str, fromComponents: Set[Optional[str]]) -> str:
"""Given a possible set of components to choose from, return the
component that should be used to calculate the requested read
component.
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/core/storedFileInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def __init__(self, formatter: FormatterParameter,
"""Fully-qualified name of Formatter. If a Formatter class or instance
is given the name will be extracted."""

path: str
path: Optional[str]
"""Path to dataset within Datastore."""

storageClass: StorageClass
Expand Down
86 changes: 60 additions & 26 deletions python/lsst/daf/butler/datastores/fileLikeDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ class DatastoreFileGetInformation:
a Datastore.
"""

location: Location
location: Optional[Location]
"""The location from which to read the dataset."""

formatter: Formatter
formatter: Optional[Formatter]
"""The `Formatter` to use to deserialize the dataset."""

info: StoredFileInfo
Expand All @@ -124,6 +124,9 @@ class DatastoreFileGetInformation:
readStorageClass: StorageClass
"""The `StorageClass` of the dataset being read."""

formatterClass: Type[Formatter]
"""The formatter class to be used."""


class FileLikeDatastore(GenericBaseDatastore):
"""Generic Datastore for file-based implementations.
Expand Down Expand Up @@ -310,7 +313,11 @@ def addStoredItemInfo(self, refs: Iterable[DatasetRef], infos: Iterable[StoredFi
# Docstring inherited from GenericBaseDatastore
records = []
foundNull = 0
nRefs = 0
for ref, info in zip(refs, infos):
# Count how many refs are in this iterable as we go
nRefs += 1

# Component should come from ref and fall back on info
component = ref.datasetType.component()
if component is None and info.component is not None:
Expand Down Expand Up @@ -338,10 +345,10 @@ def addStoredItemInfo(self, refs: Iterable[DatasetRef], infos: Iterable[StoredFi
checksum=info.checksum, file_size=info.file_size)
)

if foundNull > 1 or (len(refs) == 1 and foundNull > 0):
if foundNull > 1 or (nRefs == 1 and foundNull > 0):
raise RuntimeError(f"Internal error in datastore {self.name}"
" Component and path are None in too many records "
f"(nulls: {foundNull} / records: {len(refs)}).")
f"(nulls: {foundNull} / records: {nRefs}).")

self._table.insert(*records)

Expand Down Expand Up @@ -395,7 +402,8 @@ def removeStoredItemInfo(self, ref: DatasetIdRef) -> None:
# Docstring inherited from GenericBaseDatastore
self._table.delete(dataset_id=ref.id)

def _get_dataset_locations_info(self, ref: DatasetIdRef) -> List[Tuple[Location, StoredFileInfo]]:
def _get_dataset_locations_info(self, ref: DatasetIdRef) -> List[Tuple[Optional[Location],
StoredFileInfo]]:
r"""Find all the `Location`\ s of the requested dataset in the
`Datastore` and the associated stored file information.
Expand Down Expand Up @@ -494,16 +502,26 @@ def _prepare_for_get(self, ref: DatasetRef,
else:
readStorageClass = refStorageClass

formatter = getInstanceOf(storedFileInfo.formatter,
FileDescriptor(location, readStorageClass=readStorageClass,
storageClass=writeStorageClass, parameters=parameters),
ref.dataId)
if location is not None:
formatter = getInstanceOf(storedFileInfo.formatter,
FileDescriptor(location, readStorageClass=readStorageClass,
storageClass=writeStorageClass,
parameters=parameters),
ref.dataId)
formatterClass = type(formatter)

_, notFormatterParams = formatter.segregateParameters()

_, notFormatterParams = formatter.segregateParameters()
# Of the remaining parameters, extract the ones supported by
# this StorageClass (for components not all will be handled)
assemblerParams = readStorageClass.filterParameters(notFormatterParams)

else:
formatter = None
formatterClass = getClassOf(storedFileInfo.formatter)

# Of the remaining parameters, extract the ones supported by
# this StorageClass (for components not all will be handled)
assemblerParams = readStorageClass.filterParameters(notFormatterParams)
# No parameters for assembler with the composite formatter
assemblerParams = {}

# The ref itself could be a component if the dataset was
# disassembled by butler, or we disassembled in datastore and
Expand All @@ -516,7 +534,8 @@ def _prepare_for_get(self, ref: DatasetRef,
component = storedFileInfo.component if storedFileInfo.component else refComponent

fileGetInfo.append(DatastoreFileGetInformation(location, formatter, storedFileInfo,
assemblerParams, component, readStorageClass))
assemblerParams, component, readStorageClass,
formatterClass))

return fileGetInfo

Expand Down Expand Up @@ -765,7 +784,7 @@ def exists(self, ref: DatasetRef) -> bool:
if not fileLocations:
return False
for location, storedFileInfo in fileLocations:
if storedFileInfo.path is None:
if storedFileInfo.path is None or location is None:
# Composite formatter
continue
if not self._artifact_exists(location):
Expand Down Expand Up @@ -849,16 +868,21 @@ def predictLocation(thisRef: DatasetRef) -> Location:

if len(fileLocations) == 1:
# No disassembly so this is the primary URI
primary = ButlerURI(fileLocations[0][0].uri)
primaryLocation, _ = fileLocations[0]
if primaryLocation is None:
# mypy appeasement
raise RuntimeError(f"Unexpectedly encountered no location for dataset {ref}")
primary = ButlerURI(primaryLocation.uri)

else:
for location, storedFileInfo in fileLocations:
if storedFileInfo.path is None:
for disCompLocation, storedFileInfo in fileLocations:
if storedFileInfo.path is None or disCompLocation is None:
# composite formatter
continue
if storedFileInfo.component is None:
raise RuntimeError(f"Unexpectedly got no component name for a component at {location}")
components[storedFileInfo.component] = ButlerURI(location.uri)
raise RuntimeError("Unexpectedly got no component name for a component"
f" at {disCompLocation}")
components[storedFileInfo.component] = ButlerURI(disCompLocation.uri)

return primary, components

Expand Down Expand Up @@ -1004,7 +1028,7 @@ def get(self, ref: DatasetRef, parameters: Optional[Mapping[str, Any]] = None) -
if compositeInfo is None:
raise RuntimeError(f"Unable to retrieve read-only component '{refComponent}' since"
"no formatter was stored with the composite during disassembly.")
compositeFormatter = compositeInfo.formatter
compositeFormatter = compositeInfo.formatterClass

# Assume that every read-only component can be calculated by
# forwarding the request to a single read/write component.
Expand All @@ -1020,6 +1044,11 @@ def get(self, ref: DatasetRef, parameters: Optional[Mapping[str, Any]] = None) -

# Select the relevant component
rwInfo = allComponents[forwardedComponent]
if rwInfo.formatter is None:
# for mypy
raise RuntimeError(f"Unexpectedly got null formatter for component {forwardedComponent}")
if rwInfo.location is None:
raise RuntimeError(f"Unexpectedly got null file location for components {forwardedComponent}")

# For now assume that read parameters are validated against
# the real component and not the requested component
Expand All @@ -1044,13 +1073,14 @@ def get(self, ref: DatasetRef, parameters: Optional[Mapping[str, Any]] = None) -
# see the storage class of the read-only component and those
# parameters will have to be handled by the formatter on the
# forwarded storage class.
assemblerParams = {}
assemblerParams: Dict[str, Any] = {}

# Need to created a new info that specifies the read-only
# component and associated storage class
readInfo = DatastoreFileGetInformation(rwInfo.location, readFormatter,
rwInfo.info, assemblerParams,
refComponent, refStorageClass)
refComponent, refStorageClass,
type(readFormatter))

return self._read_artifact_into_memory(readInfo, ref, isComponent=True)

Expand Down Expand Up @@ -1085,6 +1115,9 @@ def get(self, ref: DatasetRef, parameters: Optional[Mapping[str, Any]] = None) -
if isDisassembled:
refStorageClass.validateParameters(parameters)
else:
if getInfo.formatter is None:
# for mypy
raise RuntimeError(f"Unexpected null formatter for ref {ref}")
# For an assembled composite this could be a read-only
# component derived from a real component. The validity
# of the parameters is not clear. For now validate against
Expand Down Expand Up @@ -1190,7 +1223,7 @@ def trash(self, ref: DatasetRef, ignore_errors: bool = True) -> None:
raise FileNotFoundError(err_msg)

for location, storedFileInfo in fileLocations:
if storedFileInfo.path is None:
if storedFileInfo.path is None or location is None:
# Composite formatter
continue
if not self._artifact_exists(location):
Expand Down Expand Up @@ -1239,7 +1272,7 @@ def emptyTrash(self, ignore_errors: bool = True) -> None:
raise FileNotFoundError(err_msg)

for location, storedFileInfo in fileLocations:
if storedFileInfo.path is None:
if storedFileInfo.path is None or location is None:
# Composite formatter
continue

Expand Down Expand Up @@ -1274,8 +1307,9 @@ def emptyTrash(self, ignore_errors: bool = True) -> None:
self.removeStoredItemInfo(ref)
except Exception as e:
if ignore_errors:
uris = [location.uri for location, _ in fileLocations if location is not None]
log.warning("Error removing dataset %s (%s) from internal registry of %s: %s",
ref.id, location.uri, self.name, e)
ref.id, ",".join(uris), self.name, e)
continue
else:
raise FileNotFoundError(err_msg)
Expand Down
12 changes: 12 additions & 0 deletions python/lsst/daf/butler/datastores/posixDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ def _read_artifact_into_memory(self, getInfo: DatastoreFileGetInformation,
ref: DatasetRef, isComponent: bool = False) -> Any:
location = getInfo.location

if location is None:
raise RuntimeError(f"Unexpectedly got null dataset location for ref {ref}")

# Too expensive to recalculate the checksum on fetch
# but we can check size and existence
if not os.path.exists(location.path):
Expand All @@ -140,6 +143,12 @@ def _read_artifact_into_memory(self, getInfo: DatastoreFileGetInformation,
storedFileInfo.file_size))

formatter = getInfo.formatter

# Consistency check for mypy
if formatter is None:
raise RuntimeError(f"Internal error in datastore {self.name}: Null formatter encountered"
f" for {ref}")

try:
result = formatter.read(component=getInfo.component if isComponent else None)
except Exception as e:
Expand Down Expand Up @@ -361,6 +370,9 @@ def export(self, refs: Iterable[DatasetRef], *,
if len(fileLocations) > 1:
raise NotImplementedError(f"Can not export disassembled datasets such as {ref}")
location, storedFileInfo = fileLocations[0]
if location is None:
# for mypy
raise RuntimeError(f"Unexpectedly got null location for {ref}")
if transfer is None:
# TODO: do we also need to return the readStorageClass somehow?
yield FileDataset(refs=[ref], path=location.pathInStore, formatter=storedFileInfo.formatter)
Expand Down
9 changes: 9 additions & 0 deletions python/lsst/daf/butler/datastores/s3Datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ def _read_artifact_into_memory(self, getInfo: DatastoreFileGetInformation,
ref: DatasetRef, isComponent: bool = False) -> Any:
location = getInfo.location

if location is None:
raise RuntimeError(f"Unexpectedly got null dataset location for ref {ref}")

# since we have to make a GET request to S3 anyhow (for download) we
# might as well use the HEADER metadata for size comparison instead.
# s3CheckFileExists would just duplicate GET/LIST charges in this case.
Expand Down Expand Up @@ -171,6 +174,12 @@ def _read_artifact_into_memory(self, getInfo: DatastoreFileGetInformation,
# tempfile (when formatter does not support to/from/Bytes). This is S3
# equivalent of PosixDatastore formatter.read try-except block.
formatter = getInfo.formatter

# Consistency check for mypy
if formatter is None:
raise RuntimeError(f"Internal error in datastore {self.name}: Null formatter encountered"
f" for {ref}")

try:
result = formatter.fromBytes(serializedDataset,
component=getInfo.component if isComponent else None)
Expand Down
3 changes: 2 additions & 1 deletion python/lsst/daf/butler/tests/testFormatters.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ def write(self, inMemoryDataset: Any) -> str:
yaml.dump(inMemoryDataset._asdict(), fd)
return fileDescriptor.location.pathInStore

def selectResponsibleComponent(self, readComponent: str, fromComponents: Set[Optional[str]]) -> str:
@classmethod
def selectResponsibleComponent(cls, readComponent: str, fromComponents: Set[Optional[str]]) -> str:
forwarderMap = {
"counter": "data",
}
Expand Down

0 comments on commit 605ff20

Please sign in to comment.