Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support CopyFile with streaming #12658

Merged
merged 10 commits into from
Feb 8, 2018
37 changes: 37 additions & 0 deletions tensorflow/core/platform/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ limitations under the License.

namespace tensorflow {

// 128KB copy buffer
constexpr size_t kCopyFileBufferSize = 128 * 1024;

class FileSystemRegistryImpl : public FileSystemRegistry {
public:
Status Register(const string& scheme, Factory factory) override;
Expand Down Expand Up @@ -258,6 +261,17 @@ Status Env::RenameFile(const string& src, const string& target) {
return src_fs->RenameFile(src, target);
}

Status Env::CopyFile(const string& src, const string& target) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the logic for the same FS and different FS seem very similar... would it make sense to create a utility method that takes in FileSystem* src_fs and FileSystem* target_fs and does the copy. For the Filesystem::CopyFile method pass in this, this to that method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

FileSystem* src_fs;
FileSystem* target_fs;
TF_RETURN_IF_ERROR(GetFileSystemForFile(src, &src_fs));
TF_RETURN_IF_ERROR(GetFileSystemForFile(target, &target_fs));
if (src_fs == target_fs) {
return src_fs->CopyFile(src, target);
}
return FileSystemCopyFile(src_fs, src, target_fs, target);
}

string Env::GetExecutablePath() {
char exe_path[PATH_MAX] = {0};
#ifdef __APPLE__
Expand Down Expand Up @@ -361,6 +375,29 @@ Status WriteStringToFile(Env* env, const string& fname,
return s;
}

// Streams the contents of `src` (opened through `src_fs`) into `target`
// (opened through `target_fs`) in pieces of at most kCopyFileBufferSize bytes,
// so the whole file is never held in memory at once.
//
// A read that reports error::OUT_OF_RANGE is treated as the end of the input:
// whatever bytes came back with it are still appended, and the loop then
// stops. Any other read error, and any append error, is returned immediately.
// On success the returned status is that of closing the target file.
Status FileSystemCopyFile(FileSystem* src_fs, const string& src,
                          FileSystem* target_fs, const string& target) {
  std::unique_ptr<RandomAccessFile> reader;
  TF_RETURN_IF_ERROR(src_fs->NewRandomAccessFile(src, &reader));

  std::unique_ptr<WritableFile> writer;
  TF_RETURN_IF_ERROR(target_fs->NewWritableFile(target, &writer));

  std::unique_ptr<char[]> buffer(new char[kCopyFileBufferSize]);
  uint64 position = 0;
  for (;;) {
    StringPiece piece;
    const Status read_status =
        reader->Read(position, kCopyFileBufferSize, &piece, buffer.get());
    const bool reached_end = read_status.code() == error::OUT_OF_RANGE;
    if (!read_status.ok() && !reached_end) {
      return read_status;
    }
    // Flush what was read, including the final (possibly short) piece.
    TF_RETURN_IF_ERROR(writer->Append(piece));
    position += piece.size();
    if (reached_end) {
      break;
    }
  }
  return writer->Close();
}

// A ZeroCopyInputStream on a RandomAccessFile.
namespace {
class FileStream : public ::tensorflow::protobuf::io::ZeroCopyInputStream {
Expand Down
11 changes: 10 additions & 1 deletion tensorflow/core/platform/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ class Env {
/// replaced.
Status RenameFile(const string& src, const string& target);

/// \brief Copy the src to target.
Status CopyFile(const string& src, const string& target);

/// \brief Returns the absolute path of the current executable. It resolves
/// symlinks if there are any.
string GetExecutablePath();
Expand Down Expand Up @@ -279,7 +282,7 @@ class Env {
// "version" should be the version of the library or NULL
// returns the name that LoadLibrary() can use
virtual string FormatLibraryFileName(const string& name,
const string& version) = 0;
const string& version) = 0;

private:
// Returns a possible list of local temporary directories.
Expand Down Expand Up @@ -346,6 +349,7 @@ class EnvWrapper : public Env {
const string& version) override {
return target_->FormatLibraryFileName(name, version);
}

private:
Env* target_;
};
Expand Down Expand Up @@ -373,6 +377,11 @@ struct ThreadOptions {
size_t guard_size = 0; // 0: use system default value
};

/// A utility routine: copy contents of `src` in file system `src_fs`
/// to `target` in file system `target_fs`.
Status FileSystemCopyFile(FileSystem* src_fs, const string& src,
FileSystem* target_fs, const string& target);

/// A utility routine: reads contents of named file into `*data`
Status ReadFileToString(Env* env, const string& fname, string* data);

Expand Down
4 changes: 4 additions & 0 deletions tensorflow/core/platform/file_system.cc
Original file line number Diff line number Diff line change
Expand Up @@ -262,4 +262,8 @@ Status FileSystem::RecursivelyCreateDir(const string& dirname) {
return Status::OK();
}

// Base-class CopyFile: hands the work to FileSystemCopyFile, passing this
// file system as both the source and the destination side of the copy.
Status FileSystem::CopyFile(const string& src, const string& target) {
  FileSystem* const fs = this;
  return FileSystemCopyFile(fs, src, fs, target);
}

} // namespace tensorflow
3 changes: 3 additions & 0 deletions tensorflow/core/platform/file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ class FileSystem {
/// \brief Overwrites the target if it exists.
virtual Status RenameFile(const string& src, const string& target) = 0;

/// \brief Copy the src to target.
virtual Status CopyFile(const string& src, const string& target);

/// \brief Translate an URI to a filename for the FileSystem implementation.
///
/// The implementation in this class cleans up the path, removing
Expand Down
72 changes: 72 additions & 0 deletions tensorflow/core/platform/posix/posix_file_system.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ limitations under the License.
#include <fcntl.h>
#include <stdio.h>
#include <sys/mman.h>
#if !defined(__APPLE__)
#include <sys/sendfile.h>
#endif
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
Expand All @@ -34,6 +37,9 @@ limitations under the License.

namespace tensorflow {

// 128KB of copy buffer
constexpr size_t kPosixCopyFileBufferSize = 128 * 1024;

// pread() based random-access
class PosixRandomAccessFile : public RandomAccessFile {
private:
Expand Down Expand Up @@ -276,4 +282,70 @@ Status PosixFileSystem::RenameFile(const string& src, const string& target) {
return result;
}

// Copies the file named `src` to `target` on the local file system.
//
// The number of bytes to copy is the size reported by stat() on `src` when the
// call starts. `target` is created with mode 0644 if it does not exist and is
// truncated if it does, so no bytes of a previous, longer target survive the
// copy. If `src` and `target` resolve to the same file (same device and
// inode), nothing is copied and OK is returned, because truncating the target
// would destroy the source.
//
// On Linux (non-Android) the data is moved with sendfile(); elsewhere it is
// moved with read()/write() through a kPosixCopyFileBufferSize buffer.
//
// Returns an IOError built from errno for the first failing system call, or
// OK on success.
Status PosixFileSystem::CopyFile(const string& src, const string& target) {
  string translated_src = TranslateName(src);
  struct stat sbuf;
  if (stat(translated_src.c_str(), &sbuf) != 0) {
    return IOError(src, errno);
  }
  int src_fd = open(translated_src.c_str(), O_RDONLY);
  if (src_fd < 0) {
    return IOError(src, errno);
  }

  string translated_target = TranslateName(target);
  struct stat target_sbuf;
  if (stat(translated_target.c_str(), &target_sbuf) == 0 &&
      target_sbuf.st_dev == sbuf.st_dev && target_sbuf.st_ino == sbuf.st_ino) {
    // Copying a file onto itself: opening it with O_TRUNC below would wipe the
    // data before it could be read, so treat this as a no-op.
    if (close(src_fd) < 0) {
      return IOError(src, errno);
    }
    return Status::OK();
  }

  // O_WRONLY | O_CREAT | O_TRUNC:
  //   Open the file for writing, create it if it does not exist, and discard
  //   any existing contents so a shorter source fully replaces a longer target.
  // S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH:
  //   Create the file with permission of 0644
  int target_fd = open(translated_target.c_str(), O_WRONLY | O_CREAT | O_TRUNC,
                       S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
  if (target_fd < 0) {
    // Capture errno first: close() may overwrite it.
    const int open_errno = errno;
    close(src_fd);
    return IOError(target, open_errno);
  }

  // ssize_t matches the return type of sendfile()/read()/write().
  ssize_t rc = 0;
  off_t offset = 0;
#if !(defined(__linux__) && !defined(__ANDROID__))
  // Only the read()/write() fallback needs a staging buffer.
  std::unique_ptr<char[]> buffer(new char[kPosixCopyFileBufferSize]);
#endif
  while (offset < sbuf.st_size) {
    // Hold the remaining byte count in a uint64 so it can be compared against
    // SSIZE_MAX without overflow.
    uint64 chunk = sbuf.st_size - offset;
    if (chunk > SSIZE_MAX) {
      chunk = SSIZE_MAX;
    }
#if defined(__linux__) && !defined(__ANDROID__)
    // sendfile() advances `offset` by the number of bytes it transferred.
    rc = sendfile(target_fd, src_fd, &offset, static_cast<size_t>(chunk));
    if (rc <= 0) {
      break;
    }
#else
    if (chunk > kPosixCopyFileBufferSize) {
      chunk = kPosixCopyFileBufferSize;
    }
    rc = read(src_fd, buffer.get(), static_cast<size_t>(chunk));
    if (rc <= 0) {
      break;
    }
    // read() may return fewer bytes than requested; write exactly what was
    // read, and keep writing until all of it has been accepted.
    const ssize_t bytes_read = rc;
    ssize_t bytes_written = 0;
    while (bytes_written < bytes_read) {
      rc = write(target_fd, buffer.get() + bytes_written,
                 static_cast<size_t>(bytes_read - bytes_written));
      if (rc <= 0) {
        break;
      }
      bytes_written += rc;
    }
    if (rc <= 0) {
      break;
    }
    offset += bytes_read;
#endif
  }

  Status result = Status::OK();
  if (rc < 0) {
    result = IOError(target, errno);
  }

  // Always close both descriptors, but keep the first error encountered.
  if (close(target_fd) < 0 && result.ok()) {
    result = IOError(target, errno);
  }
  if (close(src_fd) < 0 && result.ok()) {
    result = IOError(src, errno);
  }

  return result;
}

} // namespace tensorflow
2 changes: 2 additions & 0 deletions tensorflow/core/platform/posix/posix_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class PosixFileSystem : public FileSystem {
Status GetFileSize(const string& fname, uint64* size) override;

Status RenameFile(const string& src, const string& target) override;

Status CopyFile(const string& src, const string& target) override;
};

Status IOError(const string& context, int err_number);
Expand Down
16 changes: 5 additions & 11 deletions tensorflow/python/lib/io/file_io.i
Original file line number Diff line number Diff line change
Expand Up @@ -110,21 +110,15 @@ void RecursivelyCreateDir(const string& dirname, TF_Status* out_status) {
}
}

void CopyFile(const string& oldpath, const string& newpath, bool overwrite,
void CopyFile(const string& src, const string& target, bool overwrite,
TF_Status* out_status) {
// If overwrite is false and the newpath file exists then it's an error.
if (!overwrite && tensorflow::Env::Default()->FileExists(newpath).ok()) {
// If overwrite is false and the target file exists then it's an error.
if (!overwrite && tensorflow::Env::Default()->FileExists(target).ok()) {
TF_SetStatus(out_status, TF_ALREADY_EXISTS, "file already exists");
return;
}
string file_content;
tensorflow::Status status = ReadFileToString(tensorflow::Env::Default(),
oldpath, &file_content);
if (!status.ok()) {
Set_TF_Status_from_Status(out_status, status);
return;
}
status = WriteStringToFile(tensorflow::Env::Default(), newpath, file_content);
tensorflow::Status status =
tensorflow::Env::Default()->CopyFile(src, target);
if (!status.ok()) {
Set_TF_Status_from_Status(out_status, status);
}
Expand Down