Skip to content

Commit

Permalink
#tf-data Remove buffer size override for different file systems. This…
Browse files Browse the repository at this point in the history
… introduced an override that users have no way of avoiding. Users who would like to have different buffer sizes should simply specify them directly with the transform.

PiperOrigin-RevId: 641083936
  • Loading branch information
wilsingosti authored and tensorflower-gardener committed Jun 11, 2024
1 parent 8234a8d commit db5ae2e
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 17 deletions.
1 change: 1 addition & 0 deletions tensorflow/core/kernels/data/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1455,6 +1455,7 @@ tf_kernel_library(
"//tensorflow/core:lib_internal",
"//tensorflow/core/data:name_utils",
"//tensorflow/core/data:utils",
"@local_tsl//tsl/platform:logging",
],
)

Expand Down
49 changes: 33 additions & 16 deletions tensorflow/core/kernels/data/tf_record_dataset_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ limitations under the License.
==============================================================================*/
#include "tensorflow/core/kernels/data/tf_record_dataset_op.h"

#include <cstdint>

#include "tensorflow/core/data/name_utils.h"
#include "tensorflow/core/data/utils.h"
#include "tensorflow/core/framework/metrics.h"
Expand All @@ -25,6 +27,7 @@ limitations under the License.
#include "tensorflow/core/lib/io/record_reader.h"
#include "tensorflow/core/lib/io/zlib_compression_options.h"
#include "tensorflow/core/lib/io/zlib_inputstream.h"
#include "tensorflow/core/platform/logging.h"

namespace tensorflow {
namespace data {
Expand All @@ -43,6 +46,8 @@ constexpr char kCurrentFileIndex[] = "current_file_index";
constexpr char kOffset[] = "offset";
constexpr char kGcsFsPrefix[] = "gs://";
constexpr char kS3FsPrefix[] = "s3://";
constexpr int64_t kUnspecifiedBufferSize = -1;
constexpr int64_t kDefaultBufferSize = 256LL << 10; // 256KB
constexpr int64_t kCloudTpuBlockSize = 127LL << 20; // 127MB.
constexpr int64_t kS3BlockSize = kCloudTpuBlockSize;

Expand Down Expand Up @@ -323,10 +328,11 @@ void TFRecordDatasetOp::MakeDataset(OpKernelContext* ctx,
OP_REQUIRES_OK(ctx, ParseScalarArgument<tstring>(ctx, kCompressionType,
&compression_type));

int64_t buffer_size = -1;
int64_t buffer_size = kUnspecifiedBufferSize;
OP_REQUIRES_OK(ctx,
ParseScalarArgument<int64_t>(ctx, kBufferSize, &buffer_size));
OP_REQUIRES(ctx, buffer_size >= 0,
OP_REQUIRES(ctx,
(buffer_size == kUnspecifiedBufferSize) || (buffer_size >= 0),
errors::InvalidArgument(
"`buffer_size` must be >= 0 (0 == no buffering)"));

Expand All @@ -346,20 +352,31 @@ void TFRecordDatasetOp::MakeDataset(OpKernelContext* ctx,
}
}

if (is_gcs_fs && is_cloud_tpu_gcs_fs() && buffer_size < kCloudTpuBlockSize) {
LOG(WARNING) << "User buffer size is too small for reading Cloud TPU "
<< "TFRecords stored in GCS. Overriding " << buffer_size
<< " to the minimum recommended buffer_size = "
<< kCloudTpuBlockSize;
buffer_size = kCloudTpuBlockSize;
}

if (is_s3_fs && buffer_size < kS3BlockSize) {
LOG(WARNING) << "User buffer size is too small for reading "
<< "TFRecords stored in S3. Overriding " << buffer_size
<< " to the minimum recommended buffer_size = "
<< kS3BlockSize;
buffer_size = kS3BlockSize;
if (buffer_size == kUnspecifiedBufferSize) {
if (is_gcs_fs && is_cloud_tpu_gcs_fs() &&
buffer_size < kCloudTpuBlockSize) {
LOG_FIRST_N(WARNING, 1)
<< "User buffer size is too small for reading Cloud TPU "
<< "TFRecords stored in GCS. Overriding " << buffer_size
<< " to the minimum recommended buffer_size = " << kCloudTpuBlockSize;
buffer_size = kCloudTpuBlockSize;
} else if (is_s3_fs && buffer_size < kS3BlockSize) {
LOG_FIRST_N(WARNING, 1)
<< "User buffer size is too small for reading "
<< "TFRecords stored in S3. Overriding " << buffer_size
<< " to the minimum recommended buffer_size = " << kS3BlockSize;
buffer_size = kS3BlockSize;
} else {
LOG_FIRST_N(INFO, 1)
<< "TFRecordDataset `buffer_size` is unspecified, default to "
<< kDefaultBufferSize;
buffer_size = kDefaultBufferSize;
}
} else {
LOG_FIRST_N(INFO, 1)
<< "The default buffer size is " << kDefaultBufferSize
<< ", which is overridden by the user specified `buffer_size` of "
<< buffer_size;
}

*output = new Dataset(ctx, std::move(filenames), compression_type,
Expand Down
5 changes: 4 additions & 1 deletion tensorflow/python/data/ops/readers.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
from tensorflow.python.util.tf_export import tf_export

_DEFAULT_READER_BUFFER_SIZE_BYTES = 256 * 1024 # 256 KB
# The default TFRecordDataset buffer size is set to -1. The actual default is
# set in the kernel when this value is detected.
_DEFAULT_TF_RECORD_BUFFER_SIZE_BYTES = -1


def _normalise_fspath(path):
Expand Down Expand Up @@ -306,7 +309,7 @@ def __init__(self,
self._buffer_size = convert.optional_param_to_tensor(
"buffer_size",
buffer_size,
argument_default=_DEFAULT_READER_BUFFER_SIZE_BYTES)
argument_default=_DEFAULT_TF_RECORD_BUFFER_SIZE_BYTES)
self._name = name

variant_tensor = gen_dataset_ops.tf_record_dataset(
Expand Down

0 comments on commit db5ae2e

Please sign in to comment.