From d4e2f6cd910d411c6b157445cb11f252d220460a Mon Sep 17 00:00:00 2001
From: lucylq
Date: Mon, 10 Mar 2025 21:49:21 -0700
Subject: [PATCH] Serialize NamedDataStoreOutput into PTD.

Update PTD serialization to account for blobs from the NamedDataStoreOutput.

Something we can do in the future is to consolidate tensors (that go through
the emitter) and blobs (that come from the NamedDataStore).

Usage sketches of the new `DataEntry` / `key_to_data` plumbing follow the
diff below.

Differential Revision: [D70939807](https://our.internmc.facebook.com/intern/diff/D70939807/)

[ghstack-poisoned]
---
 exir/_serialize/_serialize.py                | 70 +++++++++++-------
 exir/_serialize/data_serializer.py           | 17 +++++
 extension/flat_tensor/serialize/serialize.py | 74 +++++++++++++++++---
 extension/flat_tensor/test/test_serialize.py | 67 +++++++++++++-----
 4 files changed, 174 insertions(+), 54 deletions(-)

diff --git a/exir/_serialize/_serialize.py b/exir/_serialize/_serialize.py
index 6351875e113..c4e1579cece 100644
--- a/exir/_serialize/_serialize.py
+++ b/exir/_serialize/_serialize.py
@@ -6,13 +6,14 @@
 # pyre-strict
 
-from typing import Dict, Optional, Tuple
+from typing import Dict, Optional, Set, Tuple
 
 from executorch.exir._serialize import _serialize_pte_binary
 from executorch.exir._serialize._cord import Cord
 from executorch.exir._serialize._named_data_store import NamedDataStoreOutput
 
 from executorch.exir._serialize.data_serializer import (
+    DataEntry,
     DataPayload,
     DataSerializer,
     TensorEntry,
@@ -74,39 +75,54 @@ def serialize_for_executorch(
                 tensor.extra_tensor_info.fully_qualified_name
             ] = TensorLayout(tensor.scalar_type, tensor.sizes, tensor.dim_order)
 
+    if len(fqn_to_tensor_layout) == 0 and (
+        named_data is None or len(named_data.external_data) == 0
+    ):
+        return pte, ptd_files
+
+    all_external_files: Set[str] = set()
+    if named_data is not None and len(named_data.external_data) > 0:
+        assert (
+            len(named_data.buffers) > 0
+        ), "External data exists, but there are no buffers provided."
+        all_external_files = set(named_data.external_data.keys())
+
     if len(fqn_to_tensor_layout) > 0:
         # emitter_output.external_constant_map contains the mapping from
         # {file: {fqn: index into external_constant_buffer}}
         # Contains the locations of the tensor buffers, and must be non-empty
         # if there are external tensors to serialize.
-        assert emitter_output.external_constant_map is not None
-        for (
-            filename,
-            fqn_to_index,
-        ) in (
-            # pyre-ignore Undefined attribute [16]: Optional type has no attribute `items`.
-            emitter_output.external_constant_map.items()
-        ):
-            # Create a TensorEntry for each external tensor.
-            fqn_to_tensor_entry: Dict[str, TensorEntry] = {}
-            for fqn, index in fqn_to_index.items():
-                assert fqn in fqn_to_tensor_layout
-                fqn_to_tensor_entry[fqn] = TensorEntry(
-                    buffer_index=index,
-                    layout=fqn_to_tensor_layout[fqn],
-                )
-
-            ptd_files[filename] = data_serializer.serialize(
-                DataPayload(
-                    buffers=emitter_output.external_constant_buffer,
-                    fqn_to_tensor=fqn_to_tensor_entry,
-                )
+        assert (
+            emitter_output.external_constant_map is not None
+        ), "External tensors exist, but no external constant map was provided."
+        all_external_files = all_external_files | set(
+            emitter_output.external_constant_map.keys()
+        )
+
+    for filename in all_external_files:
+        fqn_to_tensor_entry: Dict[str, TensorEntry] = {}
+        fqn_to_index = emitter_output.external_constant_map.get(filename, {})
+        # Create a TensorEntry for each external tensor.
+        for fqn, index in fqn_to_index.items():
+            assert fqn in fqn_to_tensor_layout
+            fqn_to_tensor_entry[fqn] = TensorEntry(
+                buffer_index=index,
+                layout=fqn_to_tensor_layout[fqn],
             )
 
-    if named_data is None or len(named_data.external_data) == 0:
-        return pte, ptd_files
+        # Extract external data.
+        key_to_data: Dict[str, DataEntry] = {}
+        key_to_buffer_index = named_data.external_data.get(filename, {})
+        for key, index in key_to_buffer_index.items():
+            key_to_data[key] = DataEntry(index, named_data.buffers[index].alignment)
 
-    if len(named_data.buffers) == 0:
-        raise RuntimeError("External data exists, but there are no buffers provided.")
+        # Serialize into PTD file.
+        ptd_files[filename] = data_serializer.serialize(
+            DataPayload(
+                buffers=emitter_output.external_constant_buffer,
+                fqn_to_tensor=fqn_to_tensor_entry,
+                key_to_data=key_to_data,
+            )
+        )
 
     return pte, ptd_files
diff --git a/exir/_serialize/data_serializer.py b/exir/_serialize/data_serializer.py
index 815038de748..2cc36d26592 100644
--- a/exir/_serialize/data_serializer.py
+++ b/exir/_serialize/data_serializer.py
@@ -38,6 +38,21 @@ class TensorEntry:
     layout: TensorLayout
 
 
+@dataclass
+class DataEntry:
+    """Represents a single blob in `DataPayload`, specifying its location
+    and metadata.
+
+    Attributes:
+        buffer_index: The index inside `DataPayload.buffers` that this
+            DataEntry refers to.
+        alignment: The alignment of the data.
+    """
+
+    buffer_index: int
+    alignment: int
+
+
 @dataclass
 class DataPayload:
     """Contains the data and metadata required for serialization.
@@ -49,10 +64,12 @@ class DataPayload:
     Attributes:
         buffers: a sequence of tensor buffers.
        fqn_to_tensor: a map from fully qualified names to serializable tensors.
+        key_to_data: a map from unique keys to serializable opaque data.
     """
 
     buffers: Sequence[bytes]
     fqn_to_tensor: Dict[str, TensorEntry]
+    key_to_data: Dict[str, DataEntry]
 
 
 class DataSerializer(ABC):
diff --git a/extension/flat_tensor/serialize/serialize.py b/extension/flat_tensor/serialize/serialize.py
index 3428fe49117..a5fec22cf3e 100644
--- a/extension/flat_tensor/serialize/serialize.py
+++ b/extension/flat_tensor/serialize/serialize.py
@@ -7,6 +7,7 @@
 # pyre-strict
 
 import json
+import math
 import os
 import tempfile
 from dataclasses import dataclass
@@ -19,6 +20,7 @@
 from executorch.exir._serialize._flatbuffer import _flatc_compile, _flatc_decompile
 from executorch.exir._serialize._program import _insert_flatbuffer_header
 from executorch.exir._serialize.data_serializer import (
+    DataEntry,
     DataPayload,
     DataSerializer,
     TensorEntry,
@@ -29,6 +31,7 @@
 from executorch.extension.flat_tensor.serialize.flat_tensor_schema import (
     DataSegment,
     FlatTensor,
+    NamedData,
     TensorMetadata,
 )
@@ -202,6 +205,24 @@ def to_bytes(self) -> bytes:
         return data
 
 
+@dataclass
+class AlignedData:
+    """
+    Holds data that should be aligned, for serialization.
+
+    Attributes:
+        data: The data to serialize, as a cord.
+        alignment: The alignment required for the data.
+    """
+
+    data: Cord
+    alignment: int
+
+    def __init__(self, data: Cord, alignment: Optional[int] = None) -> None:
+        self.data = data
+        self.alignment = alignment or 1
+
+
 def _get_extended_header(flat_tensor_data: bytes) -> Optional[FlatTensorHeader]:
     """Returns the extended header of the flat_tensor data, if present and valid."""
     try:
@@ -216,7 +237,7 @@ def _get_extended_header(flat_tensor_data: bytes) -> Optional[FlatTensorHeader]:
 def _extract_tensors(
     fqn_to_tensor: Dict[str, TensorEntry],
     buffers: Sequence[bytes],
-    segments: List[Cord],
+    segments: List[AlignedData],
     tensor_alignment: int,
 ) -> List[TensorMetadata]:
     """Places tensors into a single segment, aligned to tensor_alignment within
@@ -265,10 +286,43 @@ def _extract_tensors(
                 offset=offset,
             )
         )
-    segments.append(tensor_data)
+    segments.append(AlignedData(tensor_data))
     return tensors
 
 
+def _extract_named_data(
+    key_to_data: Dict[str, DataEntry],
+    buffers: Sequence[bytes],
+    segments: List[AlignedData],
+) -> List[NamedData]:
+    """Places named data into segments and records the alignment for each.
+
+    Args:
+        key_to_data: A map from keys to opaque data entries.
+        buffers: A sequence of buffers holding opaque blob data.
+        segments: A list of segments to append data to. Modified in-place.
+
+    Returns:
+        A list of NamedData entries mapping keys to the segments that hold
+        their blobs.
+    """
+
+    # Map from buffer_idx to segment_idx.
+    segment_index_map: Dict[int, int] = {}
+
+    named_data: List[NamedData] = []
+    for key, data_entry in key_to_data.items():
+        buffer_idx = data_entry.buffer_index
+        segment_index = segment_index_map.get(buffer_idx, None)
+        if segment_index is None:
+            segment_index = len(segments)
+            segment_index_map[buffer_idx] = segment_index
+            segments.append(
+                AlignedData(Cord(buffers[buffer_idx]), data_entry.alignment)
+            )
+        named_data.append(NamedData(key=key, segment_index=segment_index))
+    return named_data
+
+
 class FlatTensorSerializer(DataSerializer):
     """A concrete implementation of the DataSerializer interface that
     serializes and deserializes data to/from the FlatTensor format.
@@ -289,13 +343,14 @@ def serialize(
     ) -> Cord:
         """Serializes a list of tensors and named data into a blob."""
 
-        segments: List[Cord] = []
+        segments: List[AlignedData] = []
         tensors = _extract_tensors(
             data.fqn_to_tensor,
             data.buffers,
             segments,
             self.config.tensor_alignment,
        )
+        named_data = _extract_named_data(data.key_to_data, data.buffers, segments)
 
         data_segments: List[DataSegment] = []
         segment_data = Cord()
@@ -305,19 +360,18 @@ def serialize(
                 if data_segments
                 else 0
             )
+            alignment = math.lcm(self.config.segment_alignment, segment.alignment)
             data_segments.append(
                 DataSegment(
-                    offset=aligned_size(prev_end, self.config.segment_alignment),
-                    size=len(segment),
+                    offset=aligned_size(prev_end, alignment),
+                    size=len(segment.data),
                 )
             )
             # Pad segment_data to segment alignment.
-            segment_pad_length = padding_required(
-                len(segment_data), self.config.segment_alignment
-            )
+            segment_pad_length = padding_required(len(segment_data), alignment)
             if segment_pad_length > 0:
                 segment_data.append(b"\x00" * segment_pad_length)
-            segment_data.append(segment)
+            segment_data.append(segment.data)
 
         # Create FlatTensor, which describes of the contents of the file and
         # points to all the data segments. It will be serialized to flatbuffer.
@@ -326,7 +380,7 @@ def serialize(
             tensor_alignment=self.config.tensor_alignment,
             tensors=tensors,
             segments=data_segments,
-            named_data=[],
+            named_data=named_data,
         )
 
         flatbuffer_payload = _serialize_to_flatbuffer(flat_tensor)
diff --git a/extension/flat_tensor/test/test_serialize.py b/extension/flat_tensor/test/test_serialize.py
index d32eac1a72c..570fe9ae97f 100644
--- a/extension/flat_tensor/test/test_serialize.py
+++ b/extension/flat_tensor/test/test_serialize.py
@@ -6,11 +6,13 @@
 # pyre-strict
 
+import math
 import unittest
 
 from typing import List
 
 from executorch.exir._serialize.data_serializer import (
+    DataEntry,
     DataPayload,
     DataSerializer,
     TensorEntry,
@@ -30,7 +32,7 @@
 )
 
 # Test artifacts.
-TEST_TENSOR_BUFFER: List[bytes] = [b"\x11" * 4, b"\x22" * 32]
+TEST_BUFFER: List[bytes] = [b"\x11" * 4, b"\x22" * 32, b"\x33" * 17]
 TEST_TENSOR_MAP = {
     "fqn1": TensorEntry(
         buffer_index=0,
@@ -57,9 +59,18 @@
         ),
     ),
 }
+
+TEST_DATA_ENTRY = {
+    "key0": DataEntry(
+        buffer_index=2,
+        alignment=64,
+    )
+}
+
 TEST_DATA_PAYLOAD = DataPayload(
-    buffers=TEST_TENSOR_BUFFER,
+    buffers=TEST_BUFFER,
     fqn_to_tensor=TEST_TENSOR_MAP,
+    key_to_data=TEST_DATA_ENTRY,
 )
@@ -99,10 +110,15 @@ def test_serialize(self) -> None:
         )
         self.assertTrue(header.segment_base_offset, expected_segment_base_offset)
 
-        # TEST_TENSOR_BUFFER is aligned to config.segment_alignment.
-        expected_segment_data_size = aligned_size(
-            sum(len(buffer) for buffer in TEST_TENSOR_BUFFER), config.segment_alignment
+        # TEST_BUFFER is aligned to config.segment_alignment.
+        tensor1_size = aligned_size(len(TEST_BUFFER[0]), config.tensor_alignment)
+        tensor2_size = aligned_size(len(TEST_BUFFER[1]), config.tensor_alignment)
+        tensor_segment_size = aligned_size(
+            tensor1_size + tensor2_size,
+            math.lcm(config.segment_alignment, TEST_DATA_ENTRY["key0"].alignment),
         )
+        data_segment_size = len(TEST_BUFFER[2])
+        expected_segment_data_size = tensor_segment_size + data_segment_size
         self.assertEqual(header.segment_data_size, expected_segment_data_size)
 
         # Confirm the flatbuffer magic is present.
@@ -138,10 +154,23 @@ def test_serialize(self) -> None:
         self.assertEqual(tensors[2].segment_index, 0)
         self.assertEqual(tensors[2].offset, config.tensor_alignment)
 
+        named_data = flat_tensor.named_data
+        self.assertEqual(len(named_data), 1)
+        self.assertEqual(named_data[0].key, "key0")
+        self.assertEqual(named_data[0].segment_index, 1)
+
         segments = flat_tensor.segments
-        self.assertEqual(len(segments), 1)
+        self.assertEqual(len(segments), 2)
         self.assertEqual(segments[0].offset, 0)
         self.assertEqual(segments[0].size, config.tensor_alignment * 3)
+        self.assertEqual(
+            segments[1].offset,
+            aligned_size(
+                config.tensor_alignment * 3,
+                math.lcm(config.segment_alignment, TEST_DATA_ENTRY["key0"].alignment),
+            ),
+        )
+        self.assertEqual(segments[1].size, len(TEST_BUFFER[2]))
 
         # Length of serialized_data matches segment_base_offset + segment_data_size.
         self.assertEqual(
@@ -149,35 +178,39 @@ def test_serialize(self) -> None:
         )
         self.assertTrue(segments[0].size <= header.segment_data_size)
 
-        # Check the contents of the segment. Expecting two tensors from
-        # TEST_TENSOR_BUFFER = [b"\x11" * 4, b"\x22" * 32]
+        # Check the contents of the segment. Expecting two tensors and one blob
+        # from TEST_BUFFER = [b"\x11" * 4, b"\x22" * 32, b"\x33" * 17]
         segment_data = serialized_data[
-            header.segment_base_offset : header.segment_base_offset + segments[0].size
+            header.segment_base_offset : header.segment_base_offset
+            + header.segment_data_size
         ]
 
         # Tensor: b"\x11" * 4
         t0_start = 0
-        t0_len = len(TEST_TENSOR_BUFFER[0])
+        t0_len = len(TEST_BUFFER[0])
         t0_end = t0_start + aligned_size(t0_len, config.tensor_alignment)
-        self.assertEqual(
-            segment_data[t0_start : t0_start + t0_len], TEST_TENSOR_BUFFER[0]
-        )
+        self.assertEqual(segment_data[t0_start : t0_start + t0_len], TEST_BUFFER[0])
         padding = b"\x00" * (t0_end - t0_len)
         self.assertEqual(segment_data[t0_start + t0_len : t0_end], padding)
 
         # Tensor: b"\x22" * 32
         t1_start = t0_end
-        t1_len = len(TEST_TENSOR_BUFFER[1])
+        t1_len = len(TEST_BUFFER[1])
         t1_end = t1_start + aligned_size(t1_len, config.tensor_alignment)
         self.assertEqual(
             segment_data[t1_start : t1_start + t1_len],
-            TEST_TENSOR_BUFFER[1],
+            TEST_BUFFER[1],
         )
         padding = b"\x00" * (t1_end - (t1_len + t1_start))
-        self.assertEqual(segment_data[t1_start + t1_len : t1_start + t1_end], padding)
+        self.assertEqual(segment_data[t1_start + t1_len : t1_end], padding)
 
         # Check length of the segment is expected.
         self.assertEqual(
             segments[0].size, aligned_size(t1_end, config.segment_alignment)
         )
-        self.assertEqual(segments[0].size, header.segment_data_size)
+
+        # Named data: b"\x33" * 17
+        self.assertEqual(
+            segment_data[segments[1].offset : segments[1].offset + len(TEST_BUFFER[2])],
+            TEST_BUFFER[2],
+        )
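
The behavioural core of the `_serialize.py` change is that PTD files are now keyed by the union of the emitter's `external_constant_map` and `NamedDataStoreOutput.external_data`, so a file that only carries blobs still gets serialized. A toy illustration of that merge using plain dicts; the file names and indices below are invented for illustration:

```python
# A PTD file is produced for every filename mentioned by either map;
# a blob-only file simply gets an empty fqn_to_index map.
external_constant_map = {"model.ptd": {"linear.weight": 0}}
external_data = {"model.ptd": {"key0": 1}, "backend.ptd": {"key1": 2}}

all_external_files = set(external_constant_map) | set(external_data)
for filename in sorted(all_external_files):
    fqn_to_index = external_constant_map.get(filename, {})
    key_to_index = external_data.get(filename, {})
    print(filename, fqn_to_index, key_to_index)
# backend.ptd {} {'key1': 2}
# model.ptd {'linear.weight': 0} {'key0': 1}
```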
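A minimal sketch of what a caller hands to the serializer now that `DataPayload` carries `key_to_data`. The no-argument `FlatTensorSerializer()` constructor is an assumption (the diff only shows `self.config.tensor_alignment` / `self.config.segment_alignment` being read), and the keys and blob contents are invented for illustration:

```python
from executorch.exir._serialize.data_serializer import DataEntry, DataPayload
from executorch.extension.flat_tensor.serialize.serialize import FlatTensorSerializer

# Opaque backend blobs, e.g. collected by a NamedDataStore.
buffers = [b"\xab" * 128, b"\xcd" * 13]

# Each DataEntry names the buffer it points at and the alignment it needs.
key_to_data = {
    "delegate_blob": DataEntry(buffer_index=0, alignment=16),
    "tokenizer_blob": DataEntry(buffer_index=1, alignment=64),
}

payload = DataPayload(
    buffers=buffers,
    fqn_to_tensor={},  # no canonical tensors in this sketch
    key_to_data=key_to_data,
)

# Assumption: the serializer can be built with its default config.
serializer = FlatTensorSerializer()
flat_tensor_cord = serializer.serialize(payload)  # returns a Cord
```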
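Segment offsets are now padded to `lcm(segment_alignment, blob_alignment)` rather than to `segment_alignment` alone. A self-contained rework of that arithmetic, using local stand-ins for the `aligned_size` / `padding_required` helpers (their real definitions are not part of this diff) and assumed alignment values of 16:

```python
import math

# Local stand-ins for the helpers imported by serialize.py; the bodies
# here are assumptions that just round up to a multiple of `alignment`.
def aligned_size(size: int, alignment: int) -> int:
    return -(-size // alignment) * alignment

def padding_required(offset: int, alignment: int) -> int:
    return aligned_size(offset, alignment) - offset

segment_alignment = 16              # assumed config value
tensor_segment_len = 48             # two tensors padded to tensor_alignment
blob_len, blob_alignment = 17, 64   # mirrors TEST_BUFFER[2] / TEST_DATA_ENTRY

# serialize() aligns each segment to lcm(segment_alignment, segment.alignment).
alignment = math.lcm(segment_alignment, blob_alignment)
print(alignment)                                        # 64
print(aligned_size(tensor_segment_len, alignment))      # blob offset: 64
print(padding_required(tensor_segment_len, alignment))  # 16 zero bytes of padding
```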
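The `segment_index_map` in `_extract_named_data` de-duplicates storage: keys that reference the same buffer index share one segment but still get their own `NamedData` entry. A sketch that exercises the private helper directly (illustrative only; real callers go through `FlatTensorSerializer.serialize`, and the keys below are made up):

```python
from typing import List

from executorch.exir._serialize.data_serializer import DataEntry
from executorch.extension.flat_tensor.serialize.serialize import (
    AlignedData,
    _extract_named_data,
)

buffers = [b"\x33" * 17]
key_to_data = {
    "alias_a": DataEntry(buffer_index=0, alignment=64),
    "alias_b": DataEntry(buffer_index=0, alignment=64),
}

segments: List[AlignedData] = []
named = _extract_named_data(key_to_data, buffers, segments)

assert len(named) == 2     # one NamedData per key
assert len(segments) == 1  # the shared buffer is stored once
assert {n.segment_index for n in named} == {0}
```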