From db5ae2e526da10b3b8f0c97feee1c64f2e0e2808 Mon Sep 17 00:00:00 2001
From: Wilsin Gosti
Date: Thu, 6 Jun 2024 18:15:49 -0700
Subject: [PATCH] #tf-data Remove buffer size override for different file
 systems.

The override gave users no way to opt out of it. Users who would like
different buffer sizes should simply specify them directly on the transform.

PiperOrigin-RevId: 641083936
---
 tensorflow/core/kernels/data/BUILD            |  1 +
 .../core/kernels/data/tf_record_dataset_op.cc | 49 +++++++++++++------
 tensorflow/python/data/ops/readers.py         |  5 +-
 3 files changed, 38 insertions(+), 17 deletions(-)

diff --git a/tensorflow/core/kernels/data/BUILD b/tensorflow/core/kernels/data/BUILD
index 1d5071da2c0d84..c02984a37abdbf 100644
--- a/tensorflow/core/kernels/data/BUILD
+++ b/tensorflow/core/kernels/data/BUILD
@@ -1455,6 +1455,7 @@ tf_kernel_library(
         "//tensorflow/core:lib_internal",
         "//tensorflow/core/data:name_utils",
         "//tensorflow/core/data:utils",
+        "@local_tsl//tsl/platform:logging",
     ],
 )
 
diff --git a/tensorflow/core/kernels/data/tf_record_dataset_op.cc b/tensorflow/core/kernels/data/tf_record_dataset_op.cc
index 65d844c1290d12..33a3ce97d91e9d 100644
--- a/tensorflow/core/kernels/data/tf_record_dataset_op.cc
+++ b/tensorflow/core/kernels/data/tf_record_dataset_op.cc
@@ -14,6 +14,8 @@ limitations under the License.
 ==============================================================================*/
 #include "tensorflow/core/kernels/data/tf_record_dataset_op.h"
 
+#include <cstdint>
+
 #include "tensorflow/core/data/name_utils.h"
 #include "tensorflow/core/data/utils.h"
 #include "tensorflow/core/framework/metrics.h"
@@ -25,6 +27,7 @@ limitations under the License.
 #include "tensorflow/core/lib/io/record_reader.h"
 #include "tensorflow/core/lib/io/zlib_compression_options.h"
 #include "tensorflow/core/lib/io/zlib_inputstream.h"
+#include "tensorflow/core/platform/logging.h"
 
 namespace tensorflow {
 namespace data {
@@ -43,6 +46,8 @@ constexpr char kCurrentFileIndex[] = "current_file_index";
 constexpr char kOffset[] = "offset";
 constexpr char kGcsFsPrefix[] = "gs://";
 constexpr char kS3FsPrefix[] = "s3://";
+constexpr int64_t kUnspecifiedBufferSize = -1;
+constexpr int64_t kDefaultBufferSize = 256LL << 10;  // 256KB
 constexpr int64_t kCloudTpuBlockSize = 127LL << 20;  // 127MB.
 constexpr int64_t kS3BlockSize = kCloudTpuBlockSize;
 
@@ -323,10 +328,11 @@ void TFRecordDatasetOp::MakeDataset(OpKernelContext* ctx,
   OP_REQUIRES_OK(ctx, ParseScalarArgument<tstring>(ctx, kCompressionType,
                                                    &compression_type));
 
-  int64_t buffer_size = -1;
+  int64_t buffer_size = kUnspecifiedBufferSize;
   OP_REQUIRES_OK(ctx,
                  ParseScalarArgument<int64_t>(ctx, kBufferSize, &buffer_size));
-  OP_REQUIRES(ctx, buffer_size >= 0,
+  OP_REQUIRES(ctx,
+              (buffer_size == kUnspecifiedBufferSize) || (buffer_size >= 0),
               errors::InvalidArgument(
                   "`buffer_size` must be >= 0 (0 == no buffering)"));
 
@@ -346,20 +352,31 @@ void TFRecordDatasetOp::MakeDataset(OpKernelContext* ctx,
     }
   }
 
-  if (is_gcs_fs && is_cloud_tpu_gcs_fs() && buffer_size < kCloudTpuBlockSize) {
-    LOG(WARNING) << "User buffer size is too small for reading Cloud TPU "
-                 << "TFRecords stored in GCS. Overriding " << buffer_size
-                 << " to the minimum recommended buffer_size = "
-                 << kCloudTpuBlockSize;
-    buffer_size = kCloudTpuBlockSize;
-  }
-
-  if (is_s3_fs && buffer_size < kS3BlockSize) {
-    LOG(WARNING) << "User buffer size is too small for reading "
-                 << "TFRecords stored in S3. Overriding " << buffer_size
-                 << " to the minimum recommended buffer_size = "
-                 << kS3BlockSize;
-    buffer_size = kS3BlockSize;
+  if (buffer_size == kUnspecifiedBufferSize) {
+    if (is_gcs_fs && is_cloud_tpu_gcs_fs() &&
+        buffer_size < kCloudTpuBlockSize) {
+      LOG_FIRST_N(WARNING, 1)
+          << "User buffer size is too small for reading Cloud TPU "
+          << "TFRecords stored in GCS. Overriding " << buffer_size
+          << " to the minimum recommended buffer_size = " << kCloudTpuBlockSize;
+      buffer_size = kCloudTpuBlockSize;
+    } else if (is_s3_fs && buffer_size < kS3BlockSize) {
+      LOG_FIRST_N(WARNING, 1)
+          << "User buffer size is too small for reading "
+          << "TFRecords stored in S3. Overriding " << buffer_size
+          << " to the minimum recommended buffer_size = " << kS3BlockSize;
+      buffer_size = kS3BlockSize;
+    } else {
+      LOG_FIRST_N(INFO, 1)
+          << "TFRecordDataset `buffer_size` is unspecified, default to "
+          << kDefaultBufferSize;
+      buffer_size = kDefaultBufferSize;
+    }
+  } else {
+    LOG_FIRST_N(INFO, 1)
+        << "The default buffer size is " << kDefaultBufferSize
+        << ", which is overridden by the user specified `buffer_size` of "
+        << buffer_size;
   }
 
   *output = new Dataset(ctx, std::move(filenames), compression_type,
diff --git a/tensorflow/python/data/ops/readers.py b/tensorflow/python/data/ops/readers.py
index 566abb7b66eceb..202fe945534490 100644
--- a/tensorflow/python/data/ops/readers.py
+++ b/tensorflow/python/data/ops/readers.py
@@ -34,6 +34,9 @@
 from tensorflow.python.util.tf_export import tf_export
 
 _DEFAULT_READER_BUFFER_SIZE_BYTES = 256 * 1024  # 256 KB
+# The default TFRecordDataset buffer size is set to -1. The actual default is
+# set in the kernel when this value is detected.
+_DEFAULT_TF_RECORD_BUFFER_SIZE_BYTES = -1
 
 
 def _normalise_fspath(path):
@@ -306,7 +309,7 @@ def __init__(self,
     self._buffer_size = convert.optional_param_to_tensor(
         "buffer_size",
         buffer_size,
-        argument_default=_DEFAULT_READER_BUFFER_SIZE_BYTES)
+        argument_default=_DEFAULT_TF_RECORD_BUFFER_SIZE_BYTES)
     self._name = name
     variant_tensor = gen_dataset_ops.tf_record_dataset(