// NOTE: web-viewer residue, not part of the header: "353 lines (282 sloc) 13.8 KB"
/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include <string>
#include <utility>
#include <vector>
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/platform/cloud/auth_provider.h"
#include "tensorflow/core/platform/cloud/compute_engine_metadata_client.h"
#include "tensorflow/core/platform/cloud/compute_engine_zone_provider.h"
#include "tensorflow/core/platform/cloud/expiring_lru_cache.h"
#include "tensorflow/core/platform/cloud/file_block_cache.h"
#include "tensorflow/core/platform/cloud/gcs_dns_cache.h"
#include "tensorflow/core/platform/cloud/gcs_throttle.h"
#include "tensorflow/core/platform/cloud/http_request.h"
#include "tensorflow/core/platform/cloud/retrying_file_system.h"
#include "tensorflow/core/platform/file_system.h"
namespace tensorflow {
class GcsFileSystem;
/// GcsStatsInterface allows for instrumentation of the GCS file system.
///
/// GcsStatsInterface and its subclasses must be safe to use from multiple
/// threads concurrently.
///
/// WARNING! This is an experimental interface that may change or go away at any
/// time.
class GcsStatsInterface {
 public:
  /// Configure is called by the GcsFileSystem to provide instrumentation hooks.
  ///
  /// Note: Configure can be called multiple times (e.g. if the block cache is
  /// re-initialized).
  virtual void Configure(GcsFileSystem* fs, GcsThrottle* throttle,
                         const FileBlockCache* block_cache) = 0;

  /// RecordBlockLoadRequest is called to record a block load request is about
  /// to be made.
  virtual void RecordBlockLoadRequest(const string& file, size_t offset) = 0;

  /// RecordBlockRetrieved is called once a block within the file has been
  /// retrieved.
  virtual void RecordBlockRetrieved(const string& file, size_t offset,
                                    size_t bytes_transferred) = 0;

  /// RecordStatObjectRequest is called once a statting object request over GCS
  /// is about to be made.
  virtual void RecordStatObjectRequest() = 0;

  /// HttpStats is called to optionally provide a RequestStats listener
  /// to be annotated on every HTTP request made to the GCS API.
  ///
  /// HttpStats() may return nullptr.
  virtual HttpRequest::RequestStats* HttpStats() = 0;

  // Virtual so that implementations can be destroyed through a
  // GcsStatsInterface pointer.
  virtual ~GcsStatsInterface() = default;
};
/// Google Cloud Storage implementation of a file system.
///
/// The clients should use RetryingGcsFileSystem defined below,
/// which adds retry logic to GCS operations.
class GcsFileSystem : public FileSystem {
 public:
  // Forward declaration; the full definition appears further down in this
  // class, after the accessors.
  struct TimeoutConfig;

  // Main constructor used (via RetryingFileSystem) throughout Tensorflow
  GcsFileSystem();

  // Used mostly for unit testing or use cases which need to customize the
  // filesystem from defaults
  //
  // NOTE(review): `block_size`, `max_bytes` and `max_staleness` presumably
  // configure the file block cache (compare the accessors of the same names),
  // and the `stat_cache_*` / `matching_paths_cache_*` arguments the
  // corresponding expiring caches -- confirm against the implementation.
  // `additional_header` is presumably optional (the header accessors handle a
  // null header) and presumably becomes owned by this object (it is stored in
  // a std::unique_ptr member) -- confirm.
  GcsFileSystem(std::unique_ptr<AuthProvider> auth_provider,
                std::unique_ptr<HttpRequest::Factory> http_request_factory,
                std::unique_ptr<ZoneProvider> zone_provider, size_t block_size,
                size_t max_bytes, uint64 max_staleness,
                uint64 stat_cache_max_age, size_t stat_cache_max_entries,
                uint64 matching_paths_cache_max_age,
                size_t matching_paths_cache_max_entries,
                int64 initial_retry_delay_usec, TimeoutConfig timeouts,
                const std::unordered_set<string>& allowed_locations,
                std::pair<const string, const string>* additional_header);
// ---------------------------------------------------------------------------
// Overrides of virtual methods inherited from the base class. Only the
// declarations are visible here; the definitions live out of line.
// Every method reports success or failure through the returned Status, and
// pointer parameters are out-parameters filled in by the call.
// NOTE(review): the exact contract of each method is presumably the one
// documented on FileSystem -- confirm there.
// ---------------------------------------------------------------------------

// File creation / opening. `*result` receives the newly created file object.
Status NewRandomAccessFile(
const string& filename,
std::unique_ptr<RandomAccessFile>* result) override;
Status NewWritableFile(const string& fname,
std::unique_ptr<WritableFile>* result) override;
Status NewAppendableFile(const string& fname,
std::unique_ptr<WritableFile>* result) override;
Status NewReadOnlyMemoryRegionFromFile(
const string& filename,
std::unique_ptr<ReadOnlyMemoryRegion>* result) override;
// Existence and metadata queries.
Status FileExists(const string& fname) override;
Status Stat(const string& fname, FileStatistics* stat) override;
// Directory listing and pattern matching; matches are written to the vector.
Status GetChildren(const string& dir, std::vector<string>* result) override;
Status GetMatchingPaths(const string& pattern,
std::vector<string>* results) override;
// Mutating operations on files and directories.
Status DeleteFile(const string& fname) override;
Status CreateDir(const string& dirname) override;
Status DeleteDir(const string& dirname) override;
Status GetFileSize(const string& fname, uint64* file_size) override;
Status RenameFile(const string& src, const string& target) override;
Status IsDirectory(const string& fname) override;
// NOTE(review): `undeleted_files` / `undeleted_dirs` presumably receive the
// number of entries that could not be removed -- confirm.
Status DeleteRecursively(const string& dirname, int64* undeleted_files,
int64* undeleted_dirs) override;
// NOTE(review): presumably drops the block, stat and matching-paths caches
// held by this object -- confirm in the implementation.
void FlushCaches() override;
/// Set an object to collect runtime statistics from the GcsFileSystem.
/// The pointer is stored in `stats_`, which is marked "Not owned", so the
/// caller keeps ownership of `stats`.
void SetStats(GcsStatsInterface* stats);
/// These accessors are mainly for testing purposes, to verify that the
/// environment variables that control these parameters are handled correctly.
size_t block_size() {
tf_shared_lock l(block_cache_lock_);
return file_block_cache_->block_size();
size_t max_bytes() {
tf_shared_lock l(block_cache_lock_);
return file_block_cache_->max_bytes();
uint64 max_staleness() {
tf_shared_lock l(block_cache_lock_);
return file_block_cache_->max_staleness();
TimeoutConfig timeouts() const { return timeouts_; }
std::unordered_set<string> allowed_locations() const {
return allowed_locations_;
string additional_header_name() const {
return additional_header_ ? additional_header_->first : "";
string additional_header_value() const {
return additional_header_ ? additional_header_->second : "";
uint64 stat_cache_max_age() const { return stat_cache_->max_age(); }
size_t stat_cache_max_entries() const { return stat_cache_->max_entries(); }
uint64 matching_paths_cache_max_age() const {
return matching_paths_cache_->max_age();
size_t matching_paths_cache_max_entries() const {
return matching_paths_cache_->max_entries();
/// Structure containing the information for timeouts related to accessing the
/// GCS APIs.
///
/// All values are in seconds.
struct TimeoutConfig {
  // The request connection timeout. If a connection cannot be established
  // within `connect` seconds, abort the request.
  uint32 connect = 120;  // 2 minutes

  // The request idle timeout. If a request has seen no activity in `idle`
  // seconds, abort the request.
  uint32 idle = 60;  // 1 minute

  // The maximum total time a metadata request can take. If a request has not
  // completed within `metadata` seconds, the request is aborted.
  uint32 metadata = 3600;  // 1 hour

  // The maximum total time a block read request can take. If a request has
  // not completed within `read` seconds, the request is aborted.
  uint32 read = 3600;  // 1 hour

  // The maximum total time an upload request can take. If a request has not
  // completed within `write` seconds, the request is aborted.
  uint32 write = 3600;  // 1 hour

  /// Leaves every field at its in-class default above.
  TimeoutConfig() {}

  /// Sets every timeout explicitly. Each argument is stored in the field of
  /// the same name; all five must be initialised here, otherwise the caller's
  /// value would be silently replaced by the in-class default.
  TimeoutConfig(uint32 connect, uint32 idle, uint32 metadata, uint32 read,
                uint32 write)
      : connect(connect),
        idle(idle),
        metadata(metadata),
        read(read),
        write(write) {}
};
/// \brief Creates an HTTP request and returns it through `request`.
///
/// NOTE(review): presumably the request is built by http_request_factory_ and
/// pre-configured (auth, additional header, stats) -- confirm in the
/// implementation, which is not visible here.
Status CreateHttpRequest(std::unique_ptr<HttpRequest>* request);
/// \brief Sets a new AuthProvider on the GCS FileSystem.
///
/// The new auth provider will be used for all subsequent requests.
void SetAuthProvider(std::unique_ptr<AuthProvider> auth_provider);
/// \brief Resets the block cache and re-instantiates it with the new values.
///
/// This method can be used to clear the existing block cache and/or to
/// re-configure the block cache for different values.
///
/// Note: the existing block cache is not cleaned up until all existing files
/// have been closed.
void ResetFileBlockCache(size_t block_size_bytes, size_t max_bytes,
uint64 max_staleness_secs);
// GCS file statistics.
struct GcsFileStat {
  // Generic file statistics.
  FileStatistics base;
  // NOTE(review): presumably the GCS object generation -- confirm where this
  // field is populated. Defaults to 0.
  int64 generation_number = 0;
};
/// \brief Checks if the bucket exists. Returns OK if the check succeeded.
///
/// 'result' is set if the function returns OK. 'result' cannot be nullptr.
Status BucketExists(const string& bucket, bool* result);
/// \brief Retrieves the GCS bucket location. Returns OK if the location was
/// retrieved.
///
/// Given a string bucket the GCS bucket metadata API will be called and the
/// location string filled with the location of the bucket.
///
/// This requires the bucket metadata permission.
/// Repeated calls for the same bucket are cached so this function can be
/// called frequently without causing an extra API call.
Status GetBucketLocation(const string& bucket, string* location);
/// \brief Checks if the GCS bucket's location is allowed with the current
/// constraint configuration.
Status CheckBucketLocationConstraint(const string& bucket);
/// \brief Given the input bucket `bucket`, fills `result_buffer` with the
/// results of the metadata. Returns OK if the API call succeeds without
/// error.
Status GetBucketMetadata(const string& bucket,
std::vector<char>* result_buffer);
/// \brief Checks if the object exists. Returns OK if the check succeeded.
///
/// 'result' is set if the function returns OK. 'result' cannot be nullptr.
Status ObjectExists(const string& fname, const string& bucket,
const string& object, bool* result);
/// \brief Checks if the folder exists. Returns OK if the check succeeded.
///
/// 'result' is set if the function returns OK. 'result' cannot be nullptr.
Status FolderExists(const string& dirname, bool* result);
/// \brief Internal version of GetChildren with more knobs.
///
/// If 'recursively' is true, returns all objects in all subfolders.
/// Otherwise only returns the immediate children in the directory.
///
/// If 'include_self_directory_marker' is true and there is a GCS directory
/// marker at the path 'dir', GetChildrenBounded will return an empty string
/// as one of the children that represents this marker.
///
/// NOTE(review): 'max_results' presumably caps the number of entries
/// returned -- confirm in the implementation.
Status GetChildrenBounded(const string& dir, uint64 max_results,
std::vector<string>* result, bool recursively,
bool include_self_directory_marker);
/// Retrieves file statistics assuming fname points to a GCS object. The data
/// may be read from cache or from GCS directly.
Status StatForObject(const string& fname, const string& bucket,
const string& object, GcsFileStat* stat);
/// Retrieves file statistics of file fname directly from GCS.
Status UncachedStatForObject(const string& fname, const string& bucket,
const string& object, GcsFileStat* stat);
/// NOTE(review): presumably renames a single GCS object from `src` to
/// `target` (as opposed to RenameFile, the public entry point) -- confirm.
Status RenameObject(const string& src, const string& target);
/// Builds a new FileBlockCache from the given settings and returns ownership
/// of it to the caller.
std::unique_ptr<FileBlockCache> MakeFileBlockCache(size_t block_size,
size_t max_bytes,
uint64 max_staleness);
/// Loads file contents from GCS for a given filename, offset, and length.
///
/// NOTE(review): `n` is presumably the number of bytes requested, `buffer`
/// the destination, and `bytes_transferred` the count actually read --
/// confirm in the implementation.
Status LoadBufferFromGCS(const string& filename, size_t offset, size_t n,
char* buffer, size_t* bytes_transferred);
// Clear all the caches related to the file with name `fname`.
void ClearFileCaches(const string& fname);
mutex mu_;
std::unique_ptr<AuthProvider> auth_provider_ GUARDED_BY(mu_);
std::shared_ptr<HttpRequest::Factory> http_request_factory_;
std::unique_ptr<ZoneProvider> zone_provider_;
// block_cache_lock_ protects the file_block_cache_ pointer (Note that
// FileBlockCache instances are themselves threadsafe).
mutex block_cache_lock_;
std::unique_ptr<FileBlockCache> file_block_cache_
std::shared_ptr<ComputeEngineMetadataClient> compute_engine_metadata_client_;
std::unique_ptr<GcsDnsCache> dns_cache_;
GcsThrottle throttle_;
using StatCache = ExpiringLRUCache<GcsFileStat>;
std::unique_ptr<StatCache> stat_cache_;
using MatchingPathsCache = ExpiringLRUCache<std::vector<string>>;
std::unique_ptr<MatchingPathsCache> matching_paths_cache_;
using BucketLocationCache = ExpiringLRUCache<string>;
std::unique_ptr<BucketLocationCache> bucket_location_cache_;
std::unordered_set<string> allowed_locations_;
TimeoutConfig timeouts_;
GcsStatsInterface* stats_ = nullptr; // Not owned.
/// The initial delay for exponential backoffs when retrying failed calls.
const int64 initial_retry_delay_usec_ = 1000000L;
// Additional header material to be transmitted with all GCS requests
std::unique_ptr<std::pair<const string, const string>> additional_header_;
/// Google Cloud Storage implementation of a file system with retry on failures.
class RetryingGcsFileSystem : public RetryingFileSystem<GcsFileSystem> {
 public:
  /// Wraps a default-constructed GcsFileSystem; ownership of it is handed to
  /// the RetryingFileSystem base.
  RetryingGcsFileSystem()
      : RetryingFileSystem(std::unique_ptr<GcsFileSystem>(new GcsFileSystem)) {}
};
} // namespace tensorflow