Merge branch 'tickets/DM-30702'
natelust committed Jun 16, 2021
2 parents 4896892 + 9f5fba9 commit 171cb58
Showing 5 changed files with 154 additions and 53 deletions.
1 change: 1 addition & 0 deletions doc/changes/DM-30702.other.rst
@@ -0,0 +1 @@
Added metadata to QuantumGraphs. This changed the on-disk save format, but it remains backwards compatible with graphs saved with previous versions of the QuantumGraph code.
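
A minimal usage sketch of the new metadata support, based on the constructor parameter and read-only property added in this commit. The empty quanta mapping and the metadata keys are illustrative, not part of the API:

from lsst.pipe.base import QuantumGraph

# Illustrative only: real code passes a Mapping[TaskDef, Set[Quantum]]
quanta = {}

# Attach JSON-serializable metadata when building the graph
qgraph = QuantumGraph(quanta, metadata={"user": "jdoe", "comment": "nightly run"})

# The metadata survives a save/load round trip and is exposed read-only
assert qgraph.metadata["user"] == "jdoe"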
93 changes: 70 additions & 23 deletions python/lsst/pipe/base/graph/_loadHelpers.py
@@ -20,7 +20,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations

__all__ = ("LoadHelper")
__all__ = ("LoadHelper", )

from lsst.daf.butler import ButlerURI, Quantum
from lsst.daf.butler.core._butlerUri.s3 import ButlerS3URI
@@ -32,12 +32,13 @@
from dataclasses import dataclass
import functools
import io
import json
import lzma
import pickle
import struct

from collections import defaultdict, UserDict
from typing import (Optional, Iterable, DefaultDict, Set, Dict, TYPE_CHECKING, Type, Union)
from typing import (Optional, Iterable, DefaultDict, Set, Dict, TYPE_CHECKING, Tuple, Type, Union)

if TYPE_CHECKING:
from . import QuantumGraph
@@ -107,58 +108,97 @@ class DefaultLoadHelper:
def __init__(self, uriObject: Union[ButlerURI, io.IO[bytes]]):
self.uriObject = uriObject

preambleSize, taskDefSize, nodeSize = self._readSizes()
# infoSize will either be a tuple of length 2 (version 1), which
# contains the lengths of 2 independent pickles, or a tuple of
# length 1 (version 2), which contains the total length of the entire
# header information (minus the magic bytes and version bytes)
preambleSize, infoSize = self._readSizes()

# Record the total header size
self.headerSize = preambleSize + taskDefSize + nodeSize
if self.save_version == 1:
self.headerSize = preambleSize + infoSize[0] + infoSize[1]
elif self.save_version == 2:
self.headerSize = preambleSize + infoSize[0]
else:
raise ValueError(f"Unable to load QuantumGraph with version {self.save_version}, "
"please try a newer version of the code.")

self._readByteMappings(preambleSize, self.headerSize, taskDefSize)
self._readByteMappings(preambleSize, self.headerSize, infoSize)

def _readSizes(self):
def _readSizes(self) -> Tuple[int, Tuple[int, ...]]:
# need to import here to avoid cyclic imports
from .graph import STRUCT_FMT_STRING, MAGIC_BYTES
from .graph import STRUCT_FMT_BASE, MAGIC_BYTES, STRUCT_FMT_STRING, SAVE_VERSION
# Read the first few bytes, which correspond to the magic identifier
# bytes and the 2 byte version number; the sizes of the byte maps
# follow and depend on the save version
magicSize = len(MAGIC_BYTES)
fmt = STRUCT_FMT_STRING
fmtSize = struct.calcsize(fmt)

# read in just the fmt base to determine the save version
fmtSize = struct.calcsize(STRUCT_FMT_BASE)
preambleSize = magicSize + fmtSize

headerBytes = self._readBytes(0, preambleSize)
magic = headerBytes[:magicSize]
sizeBytes = headerBytes[magicSize:]
versionBytes = headerBytes[magicSize:]

if magic != MAGIC_BYTES:
raise ValueError("This file does not appear to be a quantum graph save got magic bytes "
f"{magic}, expected {MAGIC_BYTES}")

# Turn the encoded bytes back into a python int object
save_version, taskDefSize, nodeSize = struct.unpack('>HQQ', sizeBytes)
save_version, = struct.unpack(STRUCT_FMT_BASE, versionBytes)

if save_version > SAVE_VERSION:
raise RuntimeError(f"The version of this save file is {save_version}, but this version of"
f"Quantum Graph software only knows how to read up to version {SAVE_VERSION}")

# read in the next bits
fmtString = STRUCT_FMT_STRING[save_version]
infoSize = struct.calcsize(fmtString)
infoBytes = self._readBytes(preambleSize, preambleSize+infoSize)
infoUnpack = struct.unpack(fmtString, infoBytes)

preambleSize += infoSize

# Store the save version, so future read code can make use of any
# format changes to the save protocol
self.save_version = save_version

return preambleSize, taskDefSize, nodeSize
return preambleSize, infoUnpack

def _readByteMappings(self, preambleSize, headerSize, taskDefSize):
def _readByteMappings(self, preambleSize: int, headerSize: int, infoSize: Tuple[int, ...]) -> None:
# Take the header size explicitly so subclasses can modify it before
# this method is called

# read the taskDef and node bytes, skipping the size bytes
headerMaps = self._readBytes(preambleSize, headerSize)

# read the map of taskDef bytes back in skipping the size bytes
self.taskDefMap = pickle.loads(headerMaps[:taskDefSize])
if self.save_version == 1:
taskDefSize, _ = infoSize

# read the map of taskDef bytes back in skipping the size bytes
self.taskDefMap = pickle.loads(headerMaps[:taskDefSize])

# read back in the graph id
self._buildId = self.taskDefMap['__GraphBuildID']
# read back in the graph id
self._buildId = self.taskDefMap['__GraphBuildID']

# read the map of the node objects back in skipping bytes
# corresponding to the taskDef byte map
self.map = pickle.loads(headerMaps[taskDefSize:])
# read the map of the node objects back in skipping bytes
# corresponding to the taskDef byte map
self.map = pickle.loads(headerMaps[taskDefSize:])

# There is no metadata for old versions
self.metadata = None
elif self.save_version == 2:
uncompressedHeaderMap = lzma.decompress(headerMaps)
header = json.loads(uncompressedHeaderMap)
self.taskDefMap = header['TaskDefs']
self._buildId = header['GraphBuildID']
self.map = dict(header['Nodes'])
self.metadata = header['Metadata']
else:
raise ValueError(f"Unable to load QuantumGraph with version {self.save_version}, "
"please try a newer version of the code.")

def load(self, nodes: Optional[Iterable[int]] = None, graphID: Optional[str] = None) -> QuantumGraph:
"""Loads in the specified nodes from the graph
@@ -217,7 +257,10 @@ def load(self, nodes: Optional[Iterable[int]] = None, graphID: Optional[str] = N
# loop over the nodes specified above
for node in nodes:
# Get the bytes to read from the map
start, stop = self.map[node]
if self.save_version == 1:
start, stop = self.map[node]
else:
start, stop = self.map[node]['bytes']
start += self.headerSize
stop += self.headerSize

@@ -233,7 +276,10 @@ def load(self, nodes: Optional[Iterable[int]] = None, graphID: Optional[str] = N
nodeTask = qNode.taskDef
if nodeTask not in loadedTaskDef:
# Get the byte ranges corresponding to this taskDef
start, stop = self.taskDefMap[nodeTask]
if self.save_version == 1:
start, stop = self.taskDefMap[nodeTask]
else:
start, stop = self.taskDefMap[nodeTask]['bytes']
start += self.headerSize
stop += self.headerSize

@@ -253,7 +299,8 @@ def load(self, nodes: Optional[Iterable[int]] = None, graphID: Optional[str] = N
# construct an empty new QuantumGraph object, and run the associated
# creation method with the un-persisted data
qGraph = object.__new__(QuantumGraph)
qGraph._buildGraphs(quanta, _quantumToNodeId=quantumToNodeId, _buildId=self._buildId)
qGraph._buildGraphs(quanta, _quantumToNodeId=quantumToNodeId, _buildId=self._buildId,
metadata=self.metadata)
return qGraph

def _readBytes(self, start: int, stop: int) -> bytes:
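To make the two-stage header read concrete, the following standalone sketch mirrors what _readSizes does with the constants from graph.py. The MAGIC_BYTES value is a stand-in, since the real bytes are defined in that module:

import struct

MAGIC_BYTES = b"MAGIC"  # stand-in; the real value lives in graph.py
STRUCT_FMT_BASE = '>H'  # unsigned short holding the save format version
STRUCT_FMT_STRING = {1: '>QQ', 2: '>Q'}

def read_preamble(data: bytes):
    # Verify the magic bytes that identify a quantum graph save
    magicSize = len(MAGIC_BYTES)
    if data[:magicSize] != MAGIC_BYTES:
        raise ValueError("Not a quantum graph save")
    # Read the version short, then dispatch on it for the size fields
    baseSize = struct.calcsize(STRUCT_FMT_BASE)
    save_version, = struct.unpack(STRUCT_FMT_BASE,
                                  data[magicSize:magicSize + baseSize])
    fmt = STRUCT_FMT_STRING[save_version]
    start = magicSize + baseSize
    infoUnpack = struct.unpack(fmt, data[start:start + struct.calcsize(fmt)])
    # version 1 -> (taskDefSize, nodeSize); version 2 -> (headerSize,)
    return save_version, infoUnpack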
86 changes: 62 additions & 24 deletions python/lsst/pipe/base/graph/graph.py
@@ -27,6 +27,7 @@

from itertools import chain, count
import io
import json
import networkx as nx
from networkx.drawing.nx_agraph import write_dot
import os
@@ -35,8 +36,9 @@
import copy
import struct
import time
from typing import (DefaultDict, Dict, FrozenSet, Iterable, List, Mapping, Set, Generator, Optional, Tuple,
Union, TypeVar)
from types import MappingProxyType
from typing import (Any, DefaultDict, Dict, FrozenSet, Iterable, List, Mapping, Set, Generator, Optional,
Tuple, Union, TypeVar)

from ..connections import iterConnections
from ..pipeline import TaskDef
@@ -51,12 +53,24 @@

# modify this constant any time the on disk representation of the save file
# changes, and update the load helpers to behave properly for each version.
SAVE_VERSION = 1
SAVE_VERSION = 2

# String used to describe the format for the preamble bytes in a file save
# This marks a Big endian encoded format with an unsigned short, an unsigned
# Strings used to describe the format for the preamble bytes in a file save
# The base is a big endian encoded unsigned short that is used to hold the
# file format version. This allows reading the version bytes to determine
# which loading code should be used for the rest of the file
STRUCT_FMT_BASE = '>H'
#
# Version 1
# A big endian encoded format with two unsigned long longs in the byte
# stream (the leading unsigned short is the shared version base above)
STRUCT_FMT_STRING = '>HQQ'
# Version 2
# A big endian encoded format with a single unsigned long long used to
# indicate the total length of the entire header
STRUCT_FMT_STRING = {
1: '>QQ',
2: '>Q'
}


# magic bytes that help determine this is a graph save
@@ -81,18 +95,24 @@ class QuantumGraph:
quanta : Mapping of `TaskDef` to sets of `Quantum`
This maps tasks (and their configs) to the sets of data they are to
process.
metadata : Optional Mapping of `str` to primitives
Optional extra data to carry with the graph.
Entries in this mapping should be JSON-serializable.
"""
def __init__(self, quanta: Mapping[TaskDef, Set[Quantum]]):
self._buildGraphs(quanta)
def __init__(self, quanta: Mapping[TaskDef, Set[Quantum]],
metadata: Optional[Mapping[str, Any]] = None):
self._buildGraphs(quanta, metadata=metadata)

def _buildGraphs(self,
quanta: Mapping[TaskDef, Set[Quantum]],
*,
_quantumToNodeId: Optional[Mapping[Quantum, NodeId]] = None,
_buildId: Optional[BuildId] = None):
_buildId: Optional[BuildId] = None,
metadata: Optional[Mapping[str, Any]] = None):
"""Builds the graph that is used to store the relation between tasks,
and the graph that holds the relations between quanta
"""
self._metadata = metadata
self._quanta = quanta
self._buildId = _buildId if _buildId is not None else BuildId(f"{time.time()}-{os.getpid()}")
# Data structures used to identify relations between components;
@@ -600,6 +620,14 @@ def saveUri(self, uri):
raise TypeError(f"Can currently only save a graph in qgraph format not {uri}")
butlerUri.write(buffer) # type: ignore # Ignore because bytearray is safe to use in place of bytes

@property
def metadata(self) -> Optional[MappingProxyType[str, Any]]:
"""
"""
if self._metadata is None:
return None
return MappingProxyType(self._metadata)

@classmethod
def loadUri(cls, uri: Union[ButlerURI, str], universe: DimensionUniverse,
nodes: Optional[Iterable[int]] = None,
@@ -684,10 +712,20 @@ def save(self, file: io.IO[bytes]):
def _buildSaveObject(self) -> bytearray:
# make some containers
pickleData = deque()
nodeMap = {}
# node map is a list because json does not accept mapping keys that
# are not strings, so we store a list of key, value pairs that will
# be converted to a mapping on load
nodeMap = []
taskDefMap = {}
headerData = {}
protocol = 3

# Store the QuantumGraph BuildId; this will allow validating BuildIds
# at load time, prior to loading any QuantumNodes. Name chosen to make
# conflicts unlikely.
headerData['GraphBuildID'] = self.graphID
headerData['Metadata'] = self._metadata

# counter for the number of bytes processed thus far
count = 0
# serialize out the task Defs recording the start and end bytes of each
@@ -696,15 +734,11 @@ def _buildSaveObject(self) -> bytearray:
# compressing has very little impact on saving or load time, but
# a large impact on on-disk size, so it is worth doing
dump = lzma.compress(pickle.dumps(taskDef, protocol=protocol))
taskDefMap[taskDef.label] = (count, count+len(dump))
taskDefMap[taskDef.label] = {"bytes": (count, count+len(dump))}
count += len(dump)
pickleData.append(dump)

# Store the QauntumGraph BuildId along side the TaskDefs for
# convenance. This will allow validating BuildIds at load time, prior
# to loading any QuantumNodes. Name chosen for unlikely conflicts with
# labels as this is python standard for private.
taskDefMap['__GraphBuildID'] = self.graphID
headerData['TaskDefs'] = taskDefMap

# serialize the nodes, recording the start and end bytes of each node
for node in self:
@@ -725,18 +759,22 @@
# a large impact on on-disk size, so it is worth doing
dump = lzma.compress(pickle.dumps(node, protocol=protocol))
pickleData.append(dump)
nodeMap[node.nodeId.number] = (count, count+len(dump))
nodeMap.append((int(node.nodeId.number), {"bytes": (count, count+len(dump))}))
count += len(dump)

# pickle the taskDef byte map
taskDef_pickle = pickle.dumps(taskDefMap, protocol=protocol)
# need to serialize this as a series of key, value tuples because of
# a limitation in JSON: it cannot use anything but strings as keys
headerData['Nodes'] = nodeMap

# pickle the node byte map
map_pickle = pickle.dumps(nodeMap, protocol=protocol)
# dump the headerData to json
header_encode = lzma.compress(json.dumps(headerData).encode())

# record the length of the compressed header as an unsigned long long
map_lengths = struct.pack(STRUCT_FMT_STRING, SAVE_VERSION, len(taskDef_pickle), len(map_pickle))
save_bytes = struct.pack(STRUCT_FMT_BASE, SAVE_VERSION)

fmt_string = STRUCT_FMT_STRING[SAVE_VERSION]
map_lengths = struct.pack(fmt_string, len(header_encode))

# write each component of the save out in a deterministic order
# buffer = io.BytesIO()
@@ -745,9 +783,9 @@
# buffer.write(map_pickle)
buffer = bytearray()
buffer.extend(MAGIC_BYTES)
buffer.extend(save_bytes)
buffer.extend(map_lengths)
buffer.extend(taskDef_pickle)
buffer.extend(map_pickle)
buffer.extend(header_encode)
# Iterate over the length of pickleData, and for each element pop the
# leftmost element off the deque and write it out. This is to save
# memory, as the memory is added to the buffer object, it is removed
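Putting the version 2 layout together, this simplified sketch shows the byte order that _buildSaveObject writes out. The header values are illustrative and MAGIC_BYTES is again a stand-in:

import json
import lzma
import struct

MAGIC_BYTES = b"MAGIC"  # stand-in for the real identifier bytes
SAVE_VERSION = 2
STRUCT_FMT_BASE = '>H'
STRUCT_FMT_STRING = {1: '>QQ', 2: '>Q'}

# Illustrative header; the byte ranges would point into the pickle payload
headerData = {
    "GraphBuildID": "1623859200.0-1234",
    "Metadata": {"user": "jdoe"},
    "TaskDefs": {"isr": {"bytes": (0, 100)}},
    "Nodes": [(0, {"bytes": (100, 200)})],
}
header_encode = lzma.compress(json.dumps(headerData).encode())

buffer = bytearray()
buffer.extend(MAGIC_BYTES)                                  # magic identifier
buffer.extend(struct.pack(STRUCT_FMT_BASE, SAVE_VERSION))   # 2 byte version
buffer.extend(struct.pack(STRUCT_FMT_STRING[SAVE_VERSION],  # 8 byte header length
                          len(header_encode)))
buffer.extend(header_encode)                                # lzma-compressed JSON header
# ...followed by the lzma-compressed pickle of each TaskDef and node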
