diff --git a/caffe2/core/blob_serialization.cc b/caffe2/core/blob_serialization.cc index ccfcdea18cb4..fafd8a11ff3e 100644 --- a/caffe2/core/blob_serialization.cc +++ b/caffe2/core/blob_serialization.cc @@ -10,6 +10,9 @@ #include "caffe2/core/blob.h" #include "caffe2/core/common.h" #include "caffe2/utils/proto_utils.h" +#ifdef USE_FBGEMM +#include "fbgemm/FbgemmConvert.h" +#endif C10_DEFINE_int( caffe2_tensor_chunk_size, @@ -388,7 +391,75 @@ void SerializeTensorData(const SerializeParams& params) { params.tensor_proto); } +#ifdef USE_FBGEMM +namespace { +// Unfortunately we can't include folly/lang/Bits.h here, +// so provide our own byte-swapping code. +fbgemm::bfloat16 ByteSwap(fbgemm::bfloat16 n) { +#ifdef _MSC_VER + return _byteswap_ushort(n); +#else + return __builtin_bswap16(n); +#endif +} + +void ByteSwapArray( + const fbgemm::bfloat16* src, + fbgemm::bfloat16* dest, + size_t num_elements) { + // Note that we support src and dest pointing to the same location. + // We currently only use this function on big-endian machines, so it isn't + // worth trying to build a fancier SIMD version. + for (size_t n = 0; n < num_elements; ++n) { + dest[n] = ByteSwap(src[n]); + } +} +} // namespace +#endif // USE_FBGEMM + void SerializeTensorData(const SerializeParams<float>& params) { + // The FLOAT_BFLOAT16 option requests doing a conversion to bfloat16. This + // reduces the serialized data size at the cost of some lost precision. + // We currently only support doing this when compiled with fbgemm. 
+#ifdef USE_FBGEMM + if (params.options.float_format() == + BlobSerializationOptions_FloatFormat_FLOAT_BFLOAT16) { + std::unique_ptr<float[]> tmp_buffer; + const float* src; + if (params.context.device() == CPU) { + src = params.input.data(); + } else { + tmp_buffer.reset(new float[params.input.size()]); + params.context.CopyToCPU( + params.input.size(), params.input.data(), tmp_buffer.get()); + src = tmp_buffer.get(); + } + + params.SetDataFormat(TensorProto_SerializationFormat_FMT_BFLOAT16); + // TODO: it would be nice if we could use + // folly::resizeWithoutInitialization() here + params.tensor_proto.mutable_raw_data()->resize( + params.input.size() * sizeof(fbgemm::bfloat16)); + + Range<fbgemm::bfloat16> dest( + reinterpret_cast<fbgemm::bfloat16*>( + &(*params.tensor_proto.mutable_raw_data())[0]), + params.input.size()); + + fbgemm::FloatToBfloat16_simd(src, dest.data(), params.input.size()); + + // Note: technically a platform can have different integer from floating + // point endianness, and we ideally should check floating point endianness + // here. However, the fbgemm code doesn't appear to make this distinction, + // and at least in the Bfloat16ToFloat_ref() code it appears to assume that + // floating point and integer endianness are the same. + if (!kIsLittleEndian) { + ByteSwapArray(dest.data(), dest.data(), dest.size()); + } + return; + } +#endif + + params.SetDataFormat(TensorProto_SerializationFormat_FMT_PROTOBUF); params.CopyToRepeatedField(params.tensor_proto.mutable_float_data()); } @@ -792,6 +863,48 @@ DESERIALIZE_IMPL(float, FMT_PROTOBUF) { params.CopyFromRepeatedField(params.tensor_proto.float_data()); } +DESERIALIZE_IMPL(float, FMT_BFLOAT16) { +#ifdef USE_FBGEMM + CAFFE_ENFORCE_EQ( + params.dest.size() * sizeof(fbgemm::bfloat16), + params.tensor_proto.raw_data().size(), + "incorrect data size in serialized bfloat16 data"); + auto raw_src = reinterpret_cast<const fbgemm::bfloat16*>( + params.tensor_proto.raw_data().data()); + + // If we are on a big-endian machine, byte-swap the serialized data. 
+ const fbgemm::bfloat16* src; + std::unique_ptr<fbgemm::bfloat16[]> bswap_buffer; + if (kIsLittleEndian) { + src = raw_src; + } else { + bswap_buffer.reset(new fbgemm::bfloat16[params.dest.size()]); + ByteSwapArray(raw_src, bswap_buffer.get(), params.dest.size()); + src = bswap_buffer.get(); + } + + // If we are on a non-CPU device, we need an intermediate CPU buffer for the + // bfloat16 to float conversion. + std::unique_ptr<float[]> tmp_buffer; + float* dest; + if (params.context.device() == CPU) { + dest = params.dest.data(); + } else { + tmp_buffer.reset(new float[params.dest.size()]); + dest = tmp_buffer.get(); + } + + fbgemm::Bfloat16ToFloat_simd(src, dest, params.dest.size()); + if (params.context.device() != CPU) { + params.context.CopyFromCPU(params.dest.size(), dest, params.dest.data()); + } +#else + // We cannot load serialized bfloat16 data without fbgemm. + CAFFE_ENFORCE( + false, "cannot perform bfloat16 to float conversion without fbgemm"); +#endif +} + DESERIALIZE_IMPL(double, FMT_PROTOBUF) { params.CopyFromRepeatedField(params.tensor_proto.double_data()); } @@ -825,6 +938,7 @@ void DeserializeTensorBody( DeserializeParams<T> params(dest, tensor_proto, context); switch (format) { DESERIALIZE_FORMAT_CASE(FMT_PROTOBUF); + DESERIALIZE_FORMAT_CASE(FMT_BFLOAT16); } // This can happen if the blob was serialized by a newer version of the code diff --git a/caffe2/proto/caffe2.proto b/caffe2/proto/caffe2.proto index 26f85e4d4ea4..6e055778578a 100644 --- a/caffe2/proto/caffe2.proto +++ b/caffe2/proto/caffe2.proto @@ -49,6 +49,8 @@ message TensorProto { // the protobuf typed fields, although in some cases raw little endian data // is stored in the byte_data field instead. FMT_PROTOBUF = 0; + // bfloat16 data stored in the raw_data field. + FMT_BFLOAT16 = 1; } // data_format is a SerializationFormat enum value. 
// However, we intentionally store it as an integer value so we can @@ -504,6 +506,19 @@ message BlobSerializationOptions { // - a chunk size of -1 means to disable chunking, and serialize the blob in // a single chunk. optional int64 chunk_size = 2; + + enum FloatFormat { + // Use the current default serialization format, as chosen by the + // current version of the code. (At the time of writing this is PROTOBUF) + FLOAT_DEFAULT = 0; + // Store the data in the TensorProto's float_data field + FLOAT_PROTOBUF = 1; + // Serialize float values as bfloat16. Note that this conversion is lossy. + FLOAT_BFLOAT16 = 2; + } + + // Settings for how to serialize tensors containing float values + optional FloatFormat float_format = 3; } message SerializationOptions { diff --git a/caffe2/proto/caffe2_pb2.pyi b/caffe2/proto/caffe2_pb2.pyi index b16ad15278dd..98e9ae713465 100644 --- a/caffe2/proto/caffe2_pb2.pyi +++ b/caffe2/proto/caffe2_pb2.pyi @@ -80,9 +80,11 @@ class TensorProto(google.protobuf.message.Message): class _SerializationFormat(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[SerializationFormat.V], builtins.type): DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor = ... FMT_PROTOBUF = TensorProto.SerializationFormat.V(0) + FMT_BFLOAT16 = TensorProto.SerializationFormat.V(1) class SerializationFormat(metaclass=_SerializationFormat): V = typing.NewType('V', builtins.int) FMT_PROTOBUF = TensorProto.SerializationFormat.V(0) + FMT_BFLOAT16 = TensorProto.SerializationFormat.V(1) class Segment(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor = ... @@ -708,18 +710,32 @@ global___DBReaderProto = DBReaderProto class BlobSerializationOptions(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor = ... + class _FloatFormat(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[FloatFormat.V], builtins.type): + DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor = ... 
+ FLOAT_DEFAULT = BlobSerializationOptions.FloatFormat.V(0) + FLOAT_PROTOBUF = BlobSerializationOptions.FloatFormat.V(1) + FLOAT_BFLOAT16 = BlobSerializationOptions.FloatFormat.V(2) + class FloatFormat(metaclass=_FloatFormat): + V = typing.NewType('V', builtins.int) + FLOAT_DEFAULT = BlobSerializationOptions.FloatFormat.V(0) + FLOAT_PROTOBUF = BlobSerializationOptions.FloatFormat.V(1) + FLOAT_BFLOAT16 = BlobSerializationOptions.FloatFormat.V(2) + BLOB_NAME_REGEX_FIELD_NUMBER: builtins.int CHUNK_SIZE_FIELD_NUMBER: builtins.int + FLOAT_FORMAT_FIELD_NUMBER: builtins.int blob_name_regex: typing.Text = ... chunk_size: builtins.int = ... + float_format: global___BlobSerializationOptions.FloatFormat.V = ... def __init__(self, *, blob_name_regex : typing.Optional[typing.Text] = ..., chunk_size : typing.Optional[builtins.int] = ..., + float_format : typing.Optional[global___BlobSerializationOptions.FloatFormat.V] = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal[u"blob_name_regex",b"blob_name_regex",u"chunk_size",b"chunk_size"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal[u"blob_name_regex",b"blob_name_regex",u"chunk_size",b"chunk_size"]) -> None: ... + def HasField(self, field_name: typing_extensions.Literal[u"blob_name_regex",b"blob_name_regex",u"chunk_size",b"chunk_size",u"float_format",b"float_format"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal[u"blob_name_regex",b"blob_name_regex",u"chunk_size",b"chunk_size",u"float_format",b"float_format"]) -> None: ... 
global___BlobSerializationOptions = BlobSerializationOptions class SerializationOptions(google.protobuf.message.Message): diff --git a/caffe2/python/operator_test/load_save_test.py b/caffe2/python/operator_test/load_save_test.py index 0c575acdc38f..6ce883003b58 100644 --- a/caffe2/python/operator_test/load_save_test.py +++ b/caffe2/python/operator_test/load_save_test.py @@ -461,9 +461,9 @@ def float_array(dtype: Type[np.floating], size: int) -> np.ndarray: return blobs - def load_and_check_blobs( + def load_blobs( self, - blobs: List[Tuple[str, np.ndarray]], + blob_names: List[str], dbs: List[str], db_type: Optional[str] = None ) -> None: @@ -472,13 +472,21 @@ def load_and_check_blobs( load_op = core.CreateOperator( "Load", [], - [name for name, data in blobs], + blob_names, absolute_path=1, dbs=dbs, db_type=db_type or self._db_type, ) self.assertTrue(workspace.RunOperatorOnce(load_op)) - self.assertEqual(len(workspace.Blobs()), len(blobs)) + self.assertEqual(len(workspace.Blobs()), len(blob_names)) + + def load_and_check_blobs( + self, + blobs: List[Tuple[str, np.ndarray]], + dbs: List[str], + db_type: Optional[str] = None + ) -> None: + self.load_blobs([name for name, data in blobs], dbs, db_type) for name, data in blobs: np.testing.assert_array_equal(workspace.FetchBlob(name), data) @@ -636,5 +644,55 @@ def testSaveWithOptions(self) -> None: ) + def testSaveFloatToBfloat16(self) -> None: + tmp_folder = self.make_tempdir() + tmp_file = str(tmp_folder / "save.output") + + # Create 2 blobs with the same float data + float_data = np.random.random_sample(4000).astype(np.float32) + workspace.FeedBlob("float1", float_data) + workspace.FeedBlob("float2", float_data) + blob_names = ["float1", "float2"] + + # Serialize the data, using bfloat16 serialization for one of the blobs + save_op = core.CreateOperator( + "Save", + blob_names, + [], + absolute_path=1, + db=tmp_file, + db_type=self._db_type, + options=caffe2_pb2.SerializationOptions( + options=[ + 
caffe2_pb2.BlobSerializationOptions( + blob_name_regex="float1", + float_format=caffe2_pb2.BlobSerializationOptions.FLOAT_BFLOAT16, + ), + ], + ), + ) + self.assertTrue(workspace.RunOperatorOnce(save_op)) + + # As long as fbgemm was available for us to perform bfloat16 conversion, + # the serialized data for float1 should be almost half the size of float2 + if workspace.has_fbgemm: + blob_chunks = self._read_chunk_info(Path(tmp_file)) + self.assertEqual(len(blob_chunks["float1"]), 1, blob_chunks["float1"]) + self.assertEqual(len(blob_chunks["float2"]), 1, blob_chunks["float2"]) + self.assertLess( + blob_chunks["float1"][0].value_size, + 0.6 * blob_chunks["float2"][0].value_size + ) + + self.load_blobs(blob_names, [tmp_file]) + + # float2 should be exactly the same as the input data + np.testing.assert_array_equal(workspace.FetchBlob("float2"), float_data) + # float1 should be close-ish to the input data + np.testing.assert_array_almost_equal( + workspace.FetchBlob("float1"), float_data, decimal=2 + ) + + if __name__ == '__main__': unittest.main()