From 6f26118dfc1ba6ed0afbb749a8d93fb4c742e752 Mon Sep 17 00:00:00 2001 From: wujinhu Date: Tue, 18 Oct 2022 18:55:42 +0800 Subject: [PATCH 1/4] make ossfs work again --- tensorflow_io/BUILD | 1 - tensorflow_io/core/BUILD | 16 - tensorflow_io/core/filesystems/BUILD | 1 + .../core/filesystems/filesystem_plugins.cc | 3 +- .../core/filesystems/filesystem_plugins.h | 6 + .../core/kernels/oss/oss_file_system.cc | 382 ++++++++++++++++-- .../core/kernels/oss/oss_file_system.h | 63 +-- tests/test_ossfs.py | 1 + 8 files changed, 394 insertions(+), 79 deletions(-) diff --git a/tensorflow_io/BUILD b/tensorflow_io/BUILD index 0159098f3..6373f11e3 100644 --- a/tensorflow_io/BUILD +++ b/tensorflow_io/BUILD @@ -46,7 +46,6 @@ cc_binary( "//tensorflow_io/core:elasticsearch_ops", "//tensorflow_io/core:genome_ops", "//tensorflow_io/core:optimization", - "//tensorflow_io/core:oss_ops", "//tensorflow_io/core/kernels/gsmemcachedfs:gs_memcached_file_system", ], }) + select({ diff --git a/tensorflow_io/core/BUILD b/tensorflow_io/core/BUILD index 511282484..e68c817a2 100644 --- a/tensorflow_io/core/BUILD +++ b/tensorflow_io/core/BUILD @@ -707,22 +707,6 @@ cc_library( alwayslink = 1, ) -cc_library( - name = "oss_ops", - srcs = [ - "kernels/oss/oss_file_system.cc", - "kernels/oss/oss_file_system.h", - ], - copts = tf_io_copts(), - linkstatic = True, - deps = [ - "@aliyun_oss_c_sdk", - "@local_config_tf//:libtensorflow_framework", - "@local_config_tf//:tf_header_lib", - ], - alwayslink = 1, -) - cc_library( name = "sql_ops", srcs = [ diff --git a/tensorflow_io/core/filesystems/BUILD b/tensorflow_io/core/filesystems/BUILD index 780b6a2e4..89ed11ace 100644 --- a/tensorflow_io/core/filesystems/BUILD +++ b/tensorflow_io/core/filesystems/BUILD @@ -39,6 +39,7 @@ cc_library( "//tensorflow_io/core/filesystems/hdfs", "//tensorflow_io/core/filesystems/http", "//tensorflow_io/core/filesystems/s3", + "//tensorflow_io/core/filesystems/oss", ], alwayslink = 1, ) diff --git 
a/tensorflow_io/core/filesystems/filesystem_plugins.cc b/tensorflow_io/core/filesystems/filesystem_plugins.cc index 5db773bf3..5d059bd5b 100644 --- a/tensorflow_io/core/filesystems/filesystem_plugins.cc +++ b/tensorflow_io/core/filesystems/filesystem_plugins.cc @@ -29,7 +29,7 @@ limitations under the License. TFIO_PLUGIN_EXPORT void TF_InitPlugin(TF_FilesystemPluginInfo* info) { info->plugin_memory_allocate = tensorflow::io::plugin_memory_allocate; info->plugin_memory_free = tensorflow::io::plugin_memory_free; - info->num_schemes = 7; + info->num_schemes = 8; info->ops = static_cast( tensorflow::io::plugin_memory_allocate(info->num_schemes * sizeof(info->ops[0]))); @@ -40,4 +40,5 @@ TFIO_PLUGIN_EXPORT void TF_InitPlugin(TF_FilesystemPluginInfo* info) { tensorflow::io::hdfs::ProvideFilesystemSupportFor(&info->ops[4], "hdfs"); tensorflow::io::hdfs::ProvideFilesystemSupportFor(&info->ops[5], "viewfs"); tensorflow::io::hdfs::ProvideFilesystemSupportFor(&info->ops[6], "har"); + tensorflow::io::oss::ProvideFilesystemSupportFor(&info->ops[7], "oss"); } diff --git a/tensorflow_io/core/filesystems/filesystem_plugins.h b/tensorflow_io/core/filesystems/filesystem_plugins.h index b3e708a37..1b39a407a 100644 --- a/tensorflow_io/core/filesystems/filesystem_plugins.h +++ b/tensorflow_io/core/filesystems/filesystem_plugins.h @@ -50,6 +50,12 @@ void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops, const char* uri); } // namespace s3 +namespace oss { + +void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops, const char* uri); + +} // namespace oss + } // namespace io } // namespace tensorflow diff --git a/tensorflow_io/core/kernels/oss/oss_file_system.cc b/tensorflow_io/core/kernels/oss/oss_file_system.cc index 67b789b73..1165d3876 100644 --- a/tensorflow_io/core/kernels/oss/oss_file_system.cc +++ b/tensorflow_io/core/kernels/oss/oss_file_system.cc @@ -9,7 +9,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions and limitations under the License. ==============================================================================*/ -#include "oss_file_system.h" +#include "oss_filesystem.h" #include #include @@ -31,10 +31,11 @@ limitations under the License. #include "tensorflow/core/platform/file_system_helper.h" #include "tensorflow/core/platform/logging.h" #include "tensorflow/core/platform/thread_annotations.h" +#include "tensorflow_io/core/filesystems/filesystem_plugins.h" namespace tensorflow { namespace io { -namespace { +namespace oss { constexpr char kOSSCredentialsDefaultFile[] = ".osscredentials"; constexpr char kOSSCredentialsFileEnvKey[] = "OSS_CREDENTIALS"; @@ -46,11 +47,18 @@ constexpr char kOSSAccessIdKey[] = "id"; constexpr char kOSSAccessKeyKey[] = "key"; constexpr char kOSSHostKey[] = "host"; constexpr char kDelim[] = "/"; +static char oss_user_agent[256] = ""; void oss_initialize_with_throwable() { if (aos_http_io_initialize(NULL, 0) != AOSE_OK) { throw std::exception(); } + std::string user_agent = aos_default_http_transport_options->user_agent; + user_agent += std::string(", TensorFlow I/O"); + if (user_agent.size() < 256) { + strncpy(oss_user_agent, user_agent.c_str(), user_agent.size()); + aos_default_http_transport_options->user_agent = oss_user_agent; + } } Status oss_initialize() { @@ -487,7 +495,6 @@ class OSSWritableFile : public WritableFile { mutex mu_; int64_t part_number_; }; -} // namespace OSSFileSystem::OSSFileSystem() {} @@ -567,7 +574,7 @@ Status OSSFileSystem::NewRandomAccessFile( std::string host, access_id, access_key; TF_RETURN_IF_ERROR( _ParseOSSURIPath(filename, bucket, object, host, access_id, access_key)); - FileStatistics stat; + TF_FileStatistics stat; OSSConnection conn(host, access_id, access_key); TF_RETURN_IF_ERROR(_RetrieveObjectMetadata( conn.getPool(), conn.getRequestOptions(), bucket, object, &stat)); @@ -614,7 +621,7 @@ Status 
OSSFileSystem::NewReadOnlyMemoryRegionFromFile( } Status OSSFileSystem::FileExists(const std::string& fname) { - FileStatistics stat; + TF_FileStatistics stat; if (Stat(fname, &stat).ok()) { return Status::OK(); } else { @@ -687,7 +694,7 @@ Status OSSFileSystem::_StatInternal(aos_pool_t* pool, const oss_request_options_t* options, const std::string& bucket, const std::string& object, - FileStatistics* stat) { + TF_FileStatistics* stat) { Status s = _RetrieveObjectMetadata(pool, options, bucket, object, stat); if (s.ok()) { VLOG(1) << "RetrieveObjectMetadata for object: " << object @@ -728,7 +735,7 @@ Status OSSFileSystem::_StatInternal(aos_pool_t* pool, Status OSSFileSystem::_RetrieveObjectMetadata( aos_pool_t* pool, const oss_request_options_t* options, const std::string& bucket, const std::string& object, - FileStatistics* stat) { + TF_FileStatistics* stat) { aos_string_t oss_bucket; aos_string_t oss_object; aos_table_t* headers = NULL; @@ -788,7 +795,7 @@ Status OSSFileSystem::_RetrieveObjectMetadata( } } -Status OSSFileSystem::Stat(const std::string& fname, FileStatistics* stat) { +Status OSSFileSystem::Stat(const std::string& fname, TF_FileStatistics* stat) { TF_RETURN_IF_ERROR(oss_initialize()); std::string object, bucket; std::string host, access_id, access_key; @@ -816,12 +823,6 @@ Status OSSFileSystem::GetChildren(const std::string& dir, true, 1000); } -Status OSSFileSystem::GetMatchingPaths(const std::string& pattern, - std::vector* results) { - return tensorflow::internal::GetMatchingPaths(this, Env::Default(), pattern, - results); -} - Status OSSFileSystem::_DeleteObjectInternal( const oss_request_options_t* options, const std::string& bucket, const std::string& object) { @@ -873,7 +874,7 @@ Status OSSFileSystem::CreateDir(const std::string& dirname) { return _CreateDirInternal(pool, ossOptions, bucket, object); } - FileStatistics stat; + TF_FileStatistics stat; StringPiece parent = io::Dirname(dirs); if (!_StatInternal(pool, ossOptions, bucket, 
string(parent), &stat).ok()) { @@ -925,7 +926,7 @@ Status OSSFileSystem::_CreateDirInternal(aos_pool_t* pool, const oss_request_options_t* options, const std::string& bucket, const std::string& dirname) { - FileStatistics stat; + TF_FileStatistics stat; if (_RetrieveObjectMetadata(pool, options, bucket, dirname, &stat).ok()) { if (!stat.is_directory) { VLOG(0) << "object already exists as a file: " << dirname; @@ -998,7 +999,7 @@ Status OSSFileSystem::DeleteDir(const std::string& dirname) { } Status OSSFileSystem::GetFileSize(const std::string& fname, uint64* file_size) { - FileStatistics stat; + TF_FileStatistics stat; TF_RETURN_IF_ERROR(Stat(fname, &stat)); *file_size = stat.length; return Status::OK(); @@ -1056,8 +1057,8 @@ Status OSSFileSystem::RenameFile(const std::string& src, aos_str_set(&dest_object, tmp_dobject.c_str()); resp_status = - _RenameFileInternal(oss_options, pool, source_bucket, source_object, - dest_bucket, dest_object); + _CopyFileInternal(oss_options, pool, source_bucket, source_object, + dest_bucket, dest_object); if (!aos_status_is_ok(resp_status)) { string msg; oss_error_message(resp_status, &msg); @@ -1073,8 +1074,8 @@ Status OSSFileSystem::RenameFile(const std::string& src, aos_str_set(&source_object, sobject.c_str()); aos_str_set(&dest_object, dobject.c_str()); - resp_status = _RenameFileInternal(oss_options, pool, source_bucket, - source_object, dest_bucket, dest_object); + resp_status = _CopyFileInternal(oss_options, pool, source_bucket, + source_object, dest_bucket, dest_object); if (!aos_status_is_ok(resp_status)) { string msg; oss_error_message(resp_status, &msg); @@ -1087,7 +1088,7 @@ Status OSSFileSystem::RenameFile(const std::string& src, return _DeleteObjectInternal(oss_options, sbucket, sobject); } -aos_status_t* OSSFileSystem::_RenameFileInternal( +aos_status_t* OSSFileSystem::_CopyFileInternal( const oss_request_options_t* oss_options, aos_pool_t* pool, const aos_string_t& source_bucket, const aos_string_t& source_object, 
const aos_string_t& dest_bucket, const aos_string_t& dest_object) { @@ -1107,7 +1108,7 @@ aos_status_t* OSSFileSystem::_RenameFileInternal( int max_ret = 1000; // get file size - FileStatistics stat; + TF_FileStatistics stat; _StatInternal(pool, oss_options, std::string(source_bucket.data), std::string(source_object.data), &stat); uint64 file_size = stat.length; @@ -1206,7 +1207,7 @@ aos_status_t* OSSFileSystem::_RenameFileInternal( } Status OSSFileSystem::IsDirectory(const std::string& fname) { - FileStatistics stat; + TF_FileStatistics stat; TF_RETURN_IF_ERROR(Stat(fname, &stat)); return stat.is_directory @@ -1215,8 +1216,8 @@ Status OSSFileSystem::IsDirectory(const std::string& fname) { } Status OSSFileSystem::DeleteRecursively(const std::string& dirname, - int64* undeleted_files, - int64* undeleted_dirs) { + uint64* undeleted_files, + uint64* undeleted_dirs) { if (!undeleted_files || !undeleted_dirs) { return errors::Internal( "'undeleted_files' and 'undeleted_dirs' cannot be nullptr."); @@ -1234,7 +1235,7 @@ Status OSSFileSystem::DeleteRecursively(const std::string& dirname, aos_pool_t* pool = oss.getPool(); std::vector children; - FileStatistics stat; + TF_FileStatistics stat; Status s; s = _StatInternal(pool, oss_options, bucket, object, &stat); if (!s.ok() || !stat.is_directory) { @@ -1274,10 +1275,331 @@ Status OSSFileSystem::DeleteRecursively(const std::string& dirname, return Status::OK(); } -namespace { +Status OSSFileSystem::CopyFile(const string& src, const string& target) { + TF_RETURN_IF_ERROR(oss_initialize()); + + std::string sobject, sbucket; + std::string host, access_id, access_key; + TF_RETURN_IF_ERROR( + _ParseOSSURIPath(src, sbucket, sobject, host, access_id, access_key)); + std::string dobject, dbucket; + std::string dhost, daccess_id, daccess_key; + TF_RETURN_IF_ERROR(_ParseOSSURIPath(target, dbucket, dobject, dhost, + daccess_id, daccess_key)); + + if (host != dhost || access_id != daccess_id || access_key != daccess_key) { + VLOG(0) << 
"rename " << src << " to " << target << " failed, with errMsg: " + << " source oss cluster does not match dest oss cluster"; + return errors::Internal( + "rename ", src, " to ", target, " failed, errMsg: ", + "source oss cluster does not match dest oss cluster"); + } + + OSSConnection oss(host, access_id, access_key); + oss_request_options_t* oss_options = oss.getRequestOptions(); + aos_pool_t* pool = oss.getPool(); + + aos_status_t* resp_status; + aos_string_t source_bucket; + aos_string_t source_object; + aos_string_t dest_bucket; + aos_string_t dest_object; + + aos_str_set(&source_bucket, sbucket.c_str()); + aos_str_set(&source_object, sobject.c_str()); + aos_str_set(&dest_bucket, dbucket.c_str()); + aos_str_set(&dest_object, dobject.c_str()); + + resp_status = _CopyFileInternal(oss_options, pool, source_bucket, source_object, + dest_bucket, dest_object); + if (!aos_status_is_ok(resp_status)) { + string msg; + oss_error_message(resp_status, &msg); + VLOG(0) << "copy " << src << " to " << target + << " failed, errMsg: " << msg; + return errors::Internal("copy ", src, " to ", target, + " failed, errMsg: ", msg); + } + return Status::OK(); +} + +void ToTF_Status(const ::tensorflow::Status& s, TF_Status* status) { + TF_SetStatus(status, TF_Code(int(s.code())), s.error_message().c_str()); +} + +// SECTION 1. Implementation for `TF_RandomAccessFile` +// ---------------------------------------------------------------------------- +namespace tf_random_access_file { + +static void Cleanup(TF_RandomAccessFile* file) { + auto oss_file = static_cast(file->plugin_file); + delete oss_file; +} + +static int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n, + char* buffer, TF_Status* status) { + auto oss_file = static_cast(file->plugin_file); + StringPiece result; + ToTF_Status(oss_file->Read(offset, n, &result, buffer), status); + return result.size(); +} + +} // namespace tf_random_access_file + +// SECTION 2. 
Implementation for `TF_WritableFile` +// ---------------------------------------------------------------------------- +namespace tf_writable_file { + +static void Cleanup(TF_WritableFile* file) { + auto oss_file = static_cast(file->plugin_file); + delete oss_file; +} + +static void Append(const TF_WritableFile* file, const char* buffer, size_t n, + TF_Status* status) { + auto oss_file = static_cast(file->plugin_file); + ToTF_Status(oss_file->Append(StringPiece(buffer, n)), status); +} + +static int64_t Tell(const TF_WritableFile* file, TF_Status* status) { + TF_SetStatus(status, TF_UNIMPLEMENTED, "Stat not implemented"); + return -1; +} + +static void Flush(const TF_WritableFile* file, TF_Status* status) { + auto oss_file = static_cast(file->plugin_file); + ToTF_Status(oss_file->Flush(), status); +} + +static void Sync(const TF_WritableFile* file, TF_Status* status) { + auto oss_file = static_cast(file->plugin_file); + ToTF_Status(oss_file->Sync(), status); +} + +static void Close(const TF_WritableFile* file, TF_Status* status) { + auto oss_file = static_cast(file->plugin_file); + ToTF_Status(oss_file->Close(), status); +} + +} // namespace tf_writable_file + +// SECTION 3. Implementation for `TF_ReadOnlyMemoryRegion` +// ---------------------------------------------------------------------------- +namespace tf_read_only_memory_region { +void Cleanup(TF_ReadOnlyMemoryRegion* region) { + auto r = static_cast(region->plugin_memory_region); + delete r; +} + +const void* Data(const TF_ReadOnlyMemoryRegion* region) { + auto r = static_cast(region->plugin_memory_region); + return r->data(); +} + +uint64_t Length(const TF_ReadOnlyMemoryRegion* region) { + auto r = static_cast(region->plugin_memory_region); + return r->length(); +} + +} // namespace tf_read_only_memory_region -REGISTER_FILE_SYSTEM("oss", OSSFileSystem); +// SECTION 4. 
Implementation for `TF_Filesystem`, the actual filesystem +// ---------------------------------------------------------------------------- +namespace tf_oss_filesystem { + +static void Init(TF_Filesystem* filesystem, TF_Status* status) { + filesystem->plugin_filesystem = new OSSFileSystem(); + TF_SetStatus(status, TF_OK, ""); +} + +static void Cleanup(TF_Filesystem* filesystem) { + auto oss_fs = static_cast(filesystem->plugin_filesystem); + delete oss_fs; +} + +void NewRandomAccessFile(const TF_Filesystem* filesystem, const char* path, + TF_RandomAccessFile* file, TF_Status* status) { + auto oss_fs = static_cast(filesystem->plugin_filesystem); + std::unique_ptr result; + ToTF_Status(oss_fs->NewRandomAccessFile(path, &result), status); + if (TF_GetCode(status) == TF_OK) { + file->plugin_file = result.release(); + } +} + +void NewWritableFile(const TF_Filesystem* filesystem, const char* path, + TF_WritableFile* file, TF_Status* status) { + auto oss_fs = static_cast(filesystem->plugin_filesystem); + std::unique_ptr result; + ToTF_Status(oss_fs->NewWritableFile(path, &result), status); + if (TF_GetCode(status) == TF_OK) { + file->plugin_file = result.release(); + } +} + +void NewAppendableFile(const TF_Filesystem* filesystem, const char* path, + TF_WritableFile* file, TF_Status* status) { + auto oss_fs = static_cast(filesystem->plugin_filesystem); + std::unique_ptr result; + ToTF_Status(oss_fs->NewAppendableFile(path, &result), status); + if (TF_GetCode(status) == TF_OK) { + file->plugin_file = result.release(); + } +} + +void NewReadOnlyMemoryRegionFromFile(const TF_Filesystem* filesystem, + const char* path, + TF_ReadOnlyMemoryRegion* region, + TF_Status* status) { + auto oss_fs = static_cast(filesystem->plugin_filesystem); + std::unique_ptr result; + ToTF_Status(oss_fs->NewReadOnlyMemoryRegionFromFile(path, &result), status); + if (TF_GetCode(status) == TF_OK) { + region->plugin_memory_region = result.release(); + } +} + +void CreateDir(const TF_Filesystem* 
filesystem, const char* path, + TF_Status* status) { + auto oss_fs = static_cast(filesystem->plugin_filesystem); + ToTF_Status(oss_fs->CreateDir(path), status); +} + +void RecursivelyCreateDir(const TF_Filesystem* filesystem, + const char* path, TF_Status* status) { + auto oss_fs = static_cast(filesystem->plugin_filesystem); + ToTF_Status(oss_fs->RecursivelyCreateDir(path), status); +} + +void DeleteFile(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { + auto oss_fs = static_cast(filesystem->plugin_filesystem); + ToTF_Status(oss_fs->DeleteFile(path), status); +} + +void DeleteDir(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { + auto oss_fs = static_cast(filesystem->plugin_filesystem); + ToTF_Status(oss_fs->DeleteDir(path), status); +} + +void DeleteRecursively(const TF_Filesystem* filesystem, const char* path, + uint64_t* undeleted_files, + uint64_t* undeleted_dirs, TF_Status* status) { + auto oss_fs = static_cast(filesystem->plugin_filesystem); + ToTF_Status(oss_fs->DeleteRecursively(path, undeleted_files, + undeleted_dirs), status); +} + +void RenameFile(const TF_Filesystem* filesystem, const char* src, + const char* dst, TF_Status* status) { + auto oss_fs = static_cast(filesystem->plugin_filesystem); + ToTF_Status(oss_fs->RenameFile(src, dst), status); +} + +void CopyFile(const TF_Filesystem* filesystem, const char* src, + const char* dst, TF_Status* status) { + auto oss_fs = static_cast(filesystem->plugin_filesystem); + ToTF_Status(oss_fs->CopyFile(src, dst), status); +} + +bool IsDirectory(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { + auto oss_fs = static_cast(filesystem->plugin_filesystem); + ToTF_Status(oss_fs->IsDirectory(path), status); + return TF_GetCode(status) == TF_OK; +} + +void Stat(const TF_Filesystem* filesystem, const char* path, + TF_FileStatistics* stats, TF_Status* status) { + auto oss_fs = static_cast(filesystem->plugin_filesystem); + 
ToTF_Status(oss_fs->Stat(path, stats), status); +} + +void PathExists(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { + TF_FileStatistics stats; + Stat(filesystem, path, &stats, status); +} + +int GetChildren(const TF_Filesystem* filesystem, const char* path, + char*** entries, TF_Status* status) { + auto oss_fs = static_cast(filesystem->plugin_filesystem); + std::vector result; + ToTF_Status(oss_fs->GetChildren(path, &result), status); + int num_entries = result.size(); + *entries = static_cast( + plugin_memory_allocate(num_entries * sizeof((*entries)[0]))); + for (int i = 0; i < num_entries; i++) + (*entries)[i] = strdup(result[i].c_str()); + return TF_GetCode(status) == TF_OK ? num_entries : -1; +} + +int64_t GetFileSize(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { + TF_FileStatistics stats; + Stat(filesystem, path, &stats, status); + return stats.length; +} + +char* TranslateName(const TF_Filesystem* filesystem, const char* uri) { + return strdup(uri); +} + +} // namespace tf_oss_filesystem + +void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops, const char* uri) { + TF_SetFilesystemVersionMetadata(ops); + ops->scheme = strdup(uri); + + ops->random_access_file_ops = static_cast( + plugin_memory_allocate(TF_RANDOM_ACCESS_FILE_OPS_SIZE)); + ops->random_access_file_ops->cleanup = tf_random_access_file::Cleanup; + ops->random_access_file_ops->read = tf_random_access_file::Read; + + ops->writable_file_ops = static_cast( + plugin_memory_allocate(TF_WRITABLE_FILE_OPS_SIZE)); + ops->writable_file_ops->cleanup = tf_writable_file::Cleanup; + ops->writable_file_ops->append = tf_writable_file::Append; + ops->writable_file_ops->tell = tf_writable_file::Tell; + ops->writable_file_ops->flush = tf_writable_file::Flush; + ops->writable_file_ops->sync = tf_writable_file::Sync; + ops->writable_file_ops->close = tf_writable_file::Close; + + ops->read_only_memory_region_ops = static_cast( + 
plugin_memory_allocate(TF_READ_ONLY_MEMORY_REGION_OPS_SIZE)); + ops->read_only_memory_region_ops->cleanup = + tf_read_only_memory_region::Cleanup; + ops->read_only_memory_region_ops->data = tf_read_only_memory_region::Data; + ops->read_only_memory_region_ops->length = tf_read_only_memory_region::Length; + + ops->filesystem_ops = static_cast( + plugin_memory_allocate(TF_FILESYSTEM_OPS_SIZE)); + ops->filesystem_ops->init = tf_oss_filesystem::Init; + ops->filesystem_ops->cleanup = tf_oss_filesystem::Cleanup; + ops->filesystem_ops->new_random_access_file = + tf_oss_filesystem::NewRandomAccessFile; + ops->filesystem_ops->new_writable_file = tf_oss_filesystem::NewWritableFile; + ops->filesystem_ops->new_appendable_file = + tf_oss_filesystem::NewAppendableFile; + ops->filesystem_ops->new_read_only_memory_region_from_file = + tf_oss_filesystem::NewReadOnlyMemoryRegionFromFile; + ops->filesystem_ops->create_dir = tf_oss_filesystem::CreateDir; + ops->filesystem_ops->recursively_create_dir = + tf_oss_filesystem::RecursivelyCreateDir; + ops->filesystem_ops->delete_file = tf_oss_filesystem::DeleteFile; + ops->filesystem_ops->delete_recursively = tf_oss_filesystem::DeleteRecursively; + ops->filesystem_ops->delete_dir = tf_oss_filesystem::DeleteDir; + ops->filesystem_ops->copy_file = tf_oss_filesystem::CopyFile; + ops->filesystem_ops->rename_file = tf_oss_filesystem::RenameFile; + ops->filesystem_ops->path_exists = tf_oss_filesystem::PathExists; + ops->filesystem_ops->stat = tf_oss_filesystem::Stat; + ops->filesystem_ops->is_directory = tf_oss_filesystem::IsDirectory; + ops->filesystem_ops->get_file_size = tf_oss_filesystem::GetFileSize; + ops->filesystem_ops->get_children = tf_oss_filesystem::GetChildren; + ops->filesystem_ops->translate_name = tf_oss_filesystem::TranslateName; +} -} // namespace +} // end namespace oss } // end namespace io } // end namespace tensorflow diff --git a/tensorflow_io/core/kernels/oss/oss_file_system.h 
b/tensorflow_io/core/kernels/oss/oss_file_system.h index 34a3bb21a..9ea11a0c4 100644 --- a/tensorflow_io/core/kernels/oss/oss_file_system.h +++ b/tensorflow_io/core/kernels/oss/oss_file_system.h @@ -10,8 +10,8 @@ See the License for the specific language governing permissions and limitations under the License. ==============================================================================*/ -#ifndef TENSORFLOW_IO_OSS_KERNELS_OSSFS_OSS_FILE_SYSTEM_H_ -#define TENSORFLOW_IO_OSS_KERNELS_OSSFS_OSS_FILE_SYSTEM_H_ +#ifndef TENSORFLOW_IO_OSS_KERNELS_OSSFS_OSS_FILESYSTEM_H_ +#define TENSORFLOW_IO_OSS_KERNELS_OSSFS_OSS_FILESYSTEM_H_ #include #include @@ -25,58 +25,58 @@ limitations under the License. #include "oss_api.h" #include "oss_auth.h" #include "oss_util.h" +#include "tensorflow/c/experimental/filesystem/filesystem_interface.h" #include "tensorflow/core/lib/core/status.h" #include "tensorflow/core/platform/env.h" -#include "tensorflow/core/platform/file_system.h" #include "tensorflow/core/platform/mutex.h" namespace tensorflow { namespace io { +namespace oss { /// Aliyun oss implementation of a file system. 
-class OSSFileSystem : public FileSystem { +class OSSFileSystem { public: OSSFileSystem(); Status NewRandomAccessFile( const string& filename, - std::unique_ptr* result) override; + std::unique_ptr* result); Status NewWritableFile(const string& fname, - std::unique_ptr* result) override; + std::unique_ptr* result); Status NewAppendableFile(const string& fname, - std::unique_ptr* result) override; + std::unique_ptr* result); Status NewReadOnlyMemoryRegionFromFile( const string& filename, - std::unique_ptr* result) override; + std::unique_ptr* result); - Status FileExists(const string& fname) override; + Status FileExists(const string& fname); - Status Stat(const string& fname, FileStatistics* stat) override; + Status Stat(const string& fname, TF_FileStatistics* stat); - Status GetChildren(const string& dir, std::vector* result) override; + Status GetChildren(const string& dir, std::vector* result); - Status GetMatchingPaths(const string& pattern, - std::vector* results) override; + Status DeleteFile(const string& fname); - Status DeleteFile(const string& fname) override; + Status CreateDir(const string& dirname); - Status CreateDir(const string& dirname) override; + Status RecursivelyCreateDir(const string& dirname); - Status RecursivelyCreateDir(const string& dirname) override; + Status DeleteDir(const string& dirname); - Status DeleteDir(const string& dirname) override; + Status GetFileSize(const string& fname, uint64* file_size); - Status GetFileSize(const string& fname, uint64* file_size) override; + Status RenameFile(const string& src, const string& target); - Status RenameFile(const string& src, const string& target) override; + Status CopyFile(const string& src, const string& dst); - Status IsDirectory(const string& fname) override; + Status IsDirectory(const string& fname); - Status DeleteRecursively(const string& dirname, int64* undeleted_files, - int64* undeleted_dirs) override; + Status DeleteRecursively(const string& dirname, uint64* undeleted_files, + 
uint64* undeleted_dirs); private: Status _CreateDirInternal(aos_pool_t* pool, @@ -85,7 +85,7 @@ class OSSFileSystem : public FileSystem { Status _StatInternal(aos_pool_t* pool, const oss_request_options_t* options, const string& bucket, const string& object, - FileStatistics* stat); + TF_FileStatistics* stat); Status _DeleteObjectInternal(const oss_request_options_t* options, const string& bucket, const string& object); @@ -93,14 +93,14 @@ class OSSFileSystem : public FileSystem { Status _RetrieveObjectMetadata(aos_pool_t* pool, const oss_request_options_t* options, const string& bucket, const string& object, - FileStatistics* stat); + TF_FileStatistics* stat); - aos_status_t* _RenameFileInternal(const oss_request_options_t* oss_options, - aos_pool_t* pool, - const aos_string_t& source_bucket, - const aos_string_t& source_object, - const aos_string_t& dest_bucket, - const aos_string_t& dest_object); + aos_status_t* _CopyFileInternal(const oss_request_options_t* oss_options, + aos_pool_t* pool, + const aos_string_t& source_bucket, + const aos_string_t& source_object, + const aos_string_t& dest_bucket, + const aos_string_t& dest_object); Status _ListObjects(aos_pool_t* pool, const oss_request_options_t* options, const string& bucket, const string& key, @@ -131,7 +131,8 @@ class OSSFileSystem : public FileSystem { TF_DISALLOW_COPY_AND_ASSIGN(OSSFileSystem); }; +} // namespace oss } // namespace io } // namespace tensorflow -#endif // TENSORFLOW_IO_OSS_KERNELS_OSSFS_OSS_FILE_SYSTEM_H_ +#endif // TENSORFLOW_IO_OSS_KERNELS_OSSFS_OSS_FILESYSTEM_H_ diff --git a/tests/test_ossfs.py b/tests/test_ossfs.py index 20e794486..619b7203e 100644 --- a/tests/test_ossfs.py +++ b/tests/test_ossfs.py @@ -19,6 +19,7 @@ from tensorflow.python.platform import test from tensorflow.python.platform import gfile +import tensorflow as tf import tensorflow_io as tfio # pylint: disable=unused-import get_oss_path = None From 5470e3fe5e059567eeebc9867406b5eae05126b4 Mon Sep 17 00:00:00 2001 From: 
wujinhu Date: Wed, 19 Oct 2022 13:55:37 +0800 Subject: [PATCH 2/4] make ossfs work again --- tensorflow_io/core/filesystems/oss/BUILD | 24 +++++++++++++ .../{kernels => filesystems}/oss/README.md | 0 .../oss/oss_filesystem.cc} | 35 +++++++++---------- .../oss/oss_filesystem.h} | 8 ++--- 4 files changed, 44 insertions(+), 23 deletions(-) create mode 100644 tensorflow_io/core/filesystems/oss/BUILD rename tensorflow_io/core/{kernels => filesystems}/oss/README.md (100%) rename tensorflow_io/core/{kernels/oss/oss_file_system.cc => filesystems/oss/oss_filesystem.cc} (98%) rename tensorflow_io/core/{kernels/oss/oss_file_system.h => filesystems/oss/oss_filesystem.h} (95%) diff --git a/tensorflow_io/core/filesystems/oss/BUILD b/tensorflow_io/core/filesystems/oss/BUILD new file mode 100644 index 000000000..f25b4e81a --- /dev/null +++ b/tensorflow_io/core/filesystems/oss/BUILD @@ -0,0 +1,24 @@ +licenses(["notice"]) # Apache 2.0 + +package(default_visibility = ["//visibility:public"]) + +load( + "//:tools/build/tensorflow_io.bzl", + "tf_io_copts", +) + +cc_library( + name = "oss", + srcs = [ + "oss_filesystem.cc", + "oss_filesystem.h", + ], + copts = tf_io_copts(), + linkstatic = True, + deps = [ + "//tensorflow_io/core/filesystems:filesystem_plugins_header", + "@aliyun_oss_c_sdk", + "@local_config_tf//:tf_header_lib", + ], + alwayslink = 1, +) diff --git a/tensorflow_io/core/kernels/oss/README.md b/tensorflow_io/core/filesystems/oss/README.md similarity index 100% rename from tensorflow_io/core/kernels/oss/README.md rename to tensorflow_io/core/filesystems/oss/README.md diff --git a/tensorflow_io/core/kernels/oss/oss_file_system.cc b/tensorflow_io/core/filesystems/oss/oss_filesystem.cc similarity index 98% rename from tensorflow_io/core/kernels/oss/oss_file_system.cc rename to tensorflow_io/core/filesystems/oss/oss_filesystem.cc index 1165d3876..3f9e268e4 100644 --- a/tensorflow_io/core/kernels/oss/oss_file_system.cc +++ b/tensorflow_io/core/filesystems/oss/oss_filesystem.cc 
@@ -9,7 +9,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ==============================================================================*/ -#include "oss_filesystem.h" +#include "tensorflow_io/core/filesystems/oss/oss_filesystem.h" #include #include @@ -1056,9 +1056,8 @@ Status OSSFileSystem::RenameFile(const std::string& src, aos_str_set(&source_object, tmp_sobject.c_str()); aos_str_set(&dest_object, tmp_dobject.c_str()); - resp_status = - _CopyFileInternal(oss_options, pool, source_bucket, source_object, - dest_bucket, dest_object); + resp_status = _CopyFileInternal(oss_options, pool, source_bucket, + source_object, dest_bucket, dest_object); if (!aos_status_is_ok(resp_status)) { string msg; oss_error_message(resp_status, &msg); @@ -1310,13 +1309,12 @@ Status OSSFileSystem::CopyFile(const string& src, const string& target) { aos_str_set(&dest_bucket, dbucket.c_str()); aos_str_set(&dest_object, dobject.c_str()); - resp_status = _CopyFileInternal(oss_options, pool, source_bucket, source_object, - dest_bucket, dest_object); + resp_status = _CopyFileInternal(oss_options, pool, source_bucket, + source_object, dest_bucket, dest_object); if (!aos_status_is_ok(resp_status)) { string msg; oss_error_message(resp_status, &msg); - VLOG(0) << "copy " << src << " to " << target - << " failed, errMsg: " << msg; + VLOG(0) << "copy " << src << " to " << target << " failed, errMsg: " << msg; return errors::Internal("copy ", src, " to ", target, " failed, errMsg: ", msg); } @@ -1465,8 +1463,8 @@ void CreateDir(const TF_Filesystem* filesystem, const char* path, ToTF_Status(oss_fs->CreateDir(path), status); } -void RecursivelyCreateDir(const TF_Filesystem* filesystem, - const char* path, TF_Status* status) { +void RecursivelyCreateDir(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { auto oss_fs = 
static_cast(filesystem->plugin_filesystem); ToTF_Status(oss_fs->RecursivelyCreateDir(path), status); } @@ -1484,11 +1482,11 @@ void DeleteDir(const TF_Filesystem* filesystem, const char* path, } void DeleteRecursively(const TF_Filesystem* filesystem, const char* path, - uint64_t* undeleted_files, - uint64_t* undeleted_dirs, TF_Status* status) { + uint64_t* undeleted_files, uint64_t* undeleted_dirs, + TF_Status* status) { auto oss_fs = static_cast(filesystem->plugin_filesystem); - ToTF_Status(oss_fs->DeleteRecursively(path, undeleted_files, - undeleted_dirs), status); + ToTF_Status(oss_fs->DeleteRecursively(path, undeleted_files, undeleted_dirs), + status); } void RenameFile(const TF_Filesystem* filesystem, const char* src, @@ -1497,8 +1495,8 @@ void RenameFile(const TF_Filesystem* filesystem, const char* src, ToTF_Status(oss_fs->RenameFile(src, dst), status); } -void CopyFile(const TF_Filesystem* filesystem, const char* src, - const char* dst, TF_Status* status) { +void CopyFile(const TF_Filesystem* filesystem, const char* src, const char* dst, + TF_Status* status) { auto oss_fs = static_cast(filesystem->plugin_filesystem); ToTF_Status(oss_fs->CopyFile(src, dst), status); } @@ -1546,7 +1544,7 @@ char* TranslateName(const TF_Filesystem* filesystem, const char* uri) { return strdup(uri); } -} // namespace tf_oss_filesystem +} // namespace tf_oss_filesystem void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops, const char* uri) { TF_SetFilesystemVersionMetadata(ops); @@ -1588,7 +1586,8 @@ void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops, const char* uri) { ops->filesystem_ops->recursively_create_dir = tf_oss_filesystem::RecursivelyCreateDir; ops->filesystem_ops->delete_file = tf_oss_filesystem::DeleteFile; - ops->filesystem_ops->delete_recursively = tf_oss_filesystem::DeleteRecursively; + ops->filesystem_ops->delete_recursively = + tf_oss_filesystem::DeleteRecursively; ops->filesystem_ops->delete_dir = tf_oss_filesystem::DeleteDir; 
ops->filesystem_ops->copy_file = tf_oss_filesystem::CopyFile; ops->filesystem_ops->rename_file = tf_oss_filesystem::RenameFile; diff --git a/tensorflow_io/core/kernels/oss/oss_file_system.h b/tensorflow_io/core/filesystems/oss/oss_filesystem.h similarity index 95% rename from tensorflow_io/core/kernels/oss/oss_file_system.h rename to tensorflow_io/core/filesystems/oss/oss_filesystem.h index 9ea11a0c4..4d02bc860 100644 --- a/tensorflow_io/core/kernels/oss/oss_file_system.h +++ b/tensorflow_io/core/filesystems/oss/oss_filesystem.h @@ -39,9 +39,8 @@ class OSSFileSystem { public: OSSFileSystem(); - Status NewRandomAccessFile( - const string& filename, - std::unique_ptr* result); + Status NewRandomAccessFile(const string& filename, + std::unique_ptr* result); Status NewWritableFile(const string& fname, std::unique_ptr* result); @@ -50,8 +49,7 @@ class OSSFileSystem { std::unique_ptr* result); Status NewReadOnlyMemoryRegionFromFile( - const string& filename, - std::unique_ptr* result); + const string& filename, std::unique_ptr* result); Status FileExists(const string& fname); From 7ac0563bd9713ac0f22407728f7eaefde986dae8 Mon Sep 17 00:00:00 2001 From: wujinhu Date: Sun, 23 Oct 2022 21:48:08 +0800 Subject: [PATCH 3/4] make ossfs work again --- .../core/filesystems/oss/oss_filesystem.cc | 37 +- .../core/filesystems/oss/oss_filesystem.h | 7 +- tests/test_ossfs.py | 435 +++++++++++++++++- 3 files changed, 456 insertions(+), 23 deletions(-) diff --git a/tensorflow_io/core/filesystems/oss/oss_filesystem.cc b/tensorflow_io/core/filesystems/oss/oss_filesystem.cc index 3f9e268e4..ce59dc305 100644 --- a/tensorflow_io/core/filesystems/oss/oss_filesystem.cc +++ b/tensorflow_io/core/filesystems/oss/oss_filesystem.cc @@ -634,7 +634,7 @@ Status OSSFileSystem::_ListObjects( aos_pool_t* pool, const oss_request_options_t* options, const std::string& bucket, const std::string& key, std::vector* result, bool return_all, bool return_full_path, - bool should_remove_suffix, int 
max_ret_per_iterator) { + bool should_remove_suffix, bool recursive, int max_ret_per_iterator) { aos_string_t bucket_; aos_status_t* s = NULL; oss_list_object_params_t* params = NULL; @@ -646,6 +646,9 @@ Status OSSFileSystem::_ListObjects( params->max_ret = max_ret_per_iterator; aos_str_set(&params->prefix, key.c_str()); aos_str_set(&params->marker, next_marker); + if (!recursive) { + aos_str_set(&params->delimiter, "/"); + } do { s = oss_list_object(options, &bucket_, params, NULL); @@ -679,6 +682,29 @@ Status OSSFileSystem::_ListObjects( } } + aos_list_for_each_entry(oss_list_object_content_t, content, + &params->common_prefix_list, node) { + int path_length = content->key.len; + if (should_remove_suffix && path_length > 0 && + content->key.data[content->key.len - 1] == '/') { + path_length = content->key.len - 1; + } + if (return_full_path) { + string child(content->key.data, 0, path_length); + result->push_back(child); + } else { + int prefix_len = (key.length() > 0 && key.at(key.length() - 1) != '/') + ?
key.length() + 1 + : key.length(); + // remove prefix for GetChildren + if (content->key.len > prefix_len) { + string child(content->key.data + prefix_len, 0, + path_length - prefix_len); + result->push_back(child); + } + } + } + next_marker = apr_psprintf(pool, "%.*s", params->next_marker.len, params->next_marker.data); @@ -715,7 +741,7 @@ Status OSSFileSystem::_StatInternal(aos_pool_t* pool, // check list if it has children std::vector listing; s = _ListObjects(pool, options, bucket, object, &listing, true, false, false, - 10); + true, 10); if (s == Status::OK() && !listing.empty()) { if (str_util::EndsWith(object, "/")) { @@ -819,8 +845,9 @@ Status OSSFileSystem::GetChildren(const std::string& dir, OSSConnection oss(host, access_id, access_key); oss_request_options_t* oss_options = oss.getRequestOptions(); aos_pool_t* pool = oss.getPool(); + if (!object.empty() && object.back() != '/') object.push_back('/'); return _ListObjects(pool, oss_options, bucket, object, result, true, false, - true, 1000); + true, false, 1000); } Status OSSFileSystem::_DeleteObjectInternal( @@ -1048,7 +1075,7 @@ Status OSSFileSystem::RenameFile(const std::string& src, } std::vector childPaths; _ListObjects(pool, oss_options, sbucket, sobject, &childPaths, true, false, - false, 1000); + false, true, 1000); for (const auto& child : childPaths) { std::string tmp_sobject = sobject + child; std::string tmp_dobject = dobject + child; @@ -1243,7 +1270,7 @@ Status OSSFileSystem::DeleteRecursively(const std::string& dirname, } s = _ListObjects(pool, oss_options, bucket, object, &children, true, true, - false, 1000); + false, true, 1000); if (!s.ok()) { // empty dir, just delete it return _DeleteObjectInternal(oss_options, bucket, object); diff --git a/tensorflow_io/core/filesystems/oss/oss_filesystem.h b/tensorflow_io/core/filesystems/oss/oss_filesystem.h index 4d02bc860..6ee99fc76 100644 --- a/tensorflow_io/core/filesystems/oss/oss_filesystem.h +++ 
b/tensorflow_io/core/filesystems/oss/oss_filesystem.h @@ -10,8 +10,8 @@ See the License for the specific language governing permissions and limitations under the License. ==============================================================================*/ -#ifndef TENSORFLOW_IO_OSS_KERNELS_OSSFS_OSS_FILESYSTEM_H_ -#define TENSORFLOW_IO_OSS_KERNELS_OSSFS_OSS_FILESYSTEM_H_ +#ifndef TENSORFLOW_IO_CORE_FILESYSTEMS_OSS_OSS_FILESYSTEM_H_ +#define TENSORFLOW_IO_CORE_FILESYSTEMS_OSS_OSS_FILESYSTEM_H_ #include #include @@ -105,6 +105,7 @@ class OSSFileSystem { std::vector* result, bool return_all = true, bool return_full_path = false, bool should_remove_suffix = true, + bool recursive = true, int max_ret_per_iterator = 1000); Status _InitOSSCredentials(); @@ -133,4 +134,4 @@ class OSSFileSystem { } // namespace io } // namespace tensorflow -#endif // TENSORFLOW_IO_OSS_KERNELS_OSSFS_OSS_FILESYSTEM_H_ +#endif // TENSORFLOW_IO_CORE_FILESYSTEMS_OSS_OSS_FILESYSTEM_H_ diff --git a/tests/test_ossfs.py b/tests/test_ossfs.py index 619b7203e..5760e6be2 100644 --- a/tests/test_ossfs.py +++ b/tests/test_ossfs.py @@ -17,6 +17,8 @@ import os import unittest +from tensorflow.python.framework import errors +from tensorflow.python.lib.io import file_io from tensorflow.python.platform import test from tensorflow.python.platform import gfile import tensorflow as tf @@ -61,6 +63,14 @@ def setUpClass(cls): # pylint: disable=invalid-name def tearDownClass(cls): # pylint: disable=invalid-name gfile.DeleteRecursively(get_oss_path("")) + def setUp(self): + self._base_dir = file_io.join("oss://%s\x01id=%s\x02key=%s\x02host=%s" + % (bucket, access_id, access_key, host), "base_dir") + file_io.create_dir(self._base_dir) + + def tearDown(self): + file_io.delete_recursively(self._base_dir) + def test_file_operations(self): """Test file operations""" @@ -94,20 +104,6 @@ def test_dir_operations(self): gfile.MakeDirs(d) self.assertTrue(gfile.Stat(d).is_directory) - # Test listing bucket directory with and 
without trailing '/' - content = gfile.ListDirectory( - "oss://%s\x01id=%s\x02key=%s\x02host=%s" - % (bucket, access_id, access_key, host) - ) - content_s = gfile.ListDirectory( - "oss://%s\x01id=%s\x02key=%s\x02host=%s/" - % (bucket, access_id, access_key, host) - ) - self.assertEqual(content, content_s) - self.assertIn("oss_fs_test", content) - self.assertIn("oss_fs_test/d1", content) - self.assertIn("oss_fs_test/d1/d2", content) - # Test listing test directory with and without trailing '/' content = gfile.ListDirectory( "oss://%s\x01id=%s\x02key=%s\x02host=%s" @@ -121,7 +117,6 @@ def test_dir_operations(self): ) self.assertEqual(content, content_s) self.assertIn("d1", content) - self.assertIn("d1/d2", content) # Test listing sub directories. content = gfile.ListDirectory(get_oss_path("d1")) @@ -162,6 +157,416 @@ def test_dir_operations(self): self.assertTrue(gfile.Exists(rename_not_empty_dir)) self.assertTrue(gfile.Exists(rename_not_empty_file)) + def test_file_doesnt_exist(self): + file_path = file_io.join(self._base_dir, "temp_file") + self.assertFalse(gfile.Exists(file_path)) + with self.assertRaises(errors.NotFoundError): + _ = file_io.read_file_to_string(file_path) + + def test_write_to_string(self): + file_path = file_io.join(self._base_dir, "temp_file") + with gfile.Open(file_path, mode="w") as f: + f.write("testing") + self.assertTrue(gfile.Exists(file_path)) + file_contents = file_io.read_file_to_string(file_path) + self.assertEqual("testing", file_contents) + + def test_append(self): + file_path = file_io.join(self._base_dir, "temp_file") + with self.assertRaises(errors.UnimplementedError): + with gfile.Open(file_path, mode="a") as f: + f.write("a1\n") + + def test_read_binary_mode(self): + file_path = file_io.join(self._base_dir, "temp_file") + file_io.write_string_to_file(file_path, "testing") + with gfile.Open(file_path, mode="rb") as f: + self.assertEqual(b"testing", f.read()) + + def test_write_binary_mode(self): + file_path = 
file_io.join(self._base_dir, "temp_file") + with gfile.Open(file_path, mode="wb") as f: + f.write("testing") + with gfile.Open(file_path, mode="r") as f: + self.assertEqual("testing", f.read()) + + def test_multiple_writes(self): + file_path = file_io.join(self._base_dir, "temp_file") + with gfile.Open(file_path, mode="w") as f: + f.write("line1\n") + f.write("line2") + file_contents = file_io.read_file_to_string(file_path) + self.assertEqual("line1\nline2", file_contents) + + def test_file_write_bad_mode(self): + file_path = file_io.join(self._base_dir, "temp_file") + with self.assertRaises(errors.PermissionDeniedError): + gfile.Open(file_path, mode="r").write("testing") + + def test_file_read_bad_mode(self): + file_path = file_io.join(self._base_dir, "temp_file") + with gfile.Open(file_path, mode="wb") as f: + f.write("testing") + self.assertTrue(file_io.file_exists(file_path)) + with self.assertRaises(errors.PermissionDeniedError): + gfile.Open(file_path, mode="w").read() + + def test_file_delete(self): + file_path = file_io.join(self._base_dir, "temp_file") + with gfile.Open(file_path, mode="w") as f: + f.write("testing") + gfile.Remove(file_path) + self.assertFalse(gfile.Exists(file_path)) + + def test_create_recursive_dir(self): + dir_path = file_io.join(self._base_dir, "temp_dir/temp_dir1/temp_dir2") + gfile.MakeDirs(dir_path) + gfile.MakeDirs(dir_path) # repeat creation + file_path = file_io.join(str(dir_path), "temp_file") + with gfile.Open(file_path, mode="w") as f: + f.write("testing") + self.assertTrue(gfile.Exists(file_path)) + gfile.DeleteRecursively(file_io.join(self._base_dir, "temp_dir")) + self.assertFalse(gfile.Exists(file_path)) + + def test_copy(self): + file_path = file_io.join(self._base_dir, "temp_file") + with gfile.Open(file_path, mode="w") as f: + f.write("testing") + copy_path = file_io.join(self._base_dir, "copy_file") + gfile.Copy(file_path, copy_path) + self.assertTrue(file_io.file_exists(copy_path)) + f = gfile.Open(file_path, 
mode="r") + self.assertEqual("testing", f.read()) + self.assertEqual(7, f.tell()) + + def test_copy_overwrite(self): + file_path = file_io.join(self._base_dir, "temp_file") + with gfile.Open(file_path, mode="w") as f: + f.write("testing") + copy_path = file_io.join(self._base_dir, "copy_file") + gfile.Open(copy_path, mode="w").write("copy") + gfile.Copy(file_path, copy_path, overwrite=True) + self.assertTrue(gfile.Exists(copy_path)) + self.assertEqual("testing", gfile.Open(copy_path, mode="r").read()) + + def test_copy_overwrite_false(self): + file_path = file_io.join(self._base_dir, "temp_file") + with gfile.Open(file_path, mode="w") as f: + f.write("testing") + copy_path = file_io.join(self._base_dir, "copy_file") + with gfile.Open(copy_path, mode="w") as f: + f.write("copy") + with self.assertRaises(errors.AlreadyExistsError): + gfile.Copy(file_path, copy_path, overwrite=False) + + def test_rename(self): + file_path = file_io.join(self._base_dir, "temp_file") + with gfile.Open(file_path, mode="w") as f: + f.write("testing") + rename_path = file_io.join(self._base_dir, "rename_file") + gfile.Rename(file_path, rename_path) + self.assertTrue(file_io.file_exists(rename_path)) + self.assertFalse(file_io.file_exists(file_path)) + + def test_rename_overwrite(self): + file_path = file_io.join(self._base_dir, "temp_file") + with gfile.Open(file_path, mode="w") as f: + f.write("testing") + rename_path = file_io.join(self._base_dir, "rename_file") + with gfile.Open(rename_path, mode="w") as f: + f.write("rename") + gfile.Rename(file_path, rename_path, overwrite=True) + self.assertTrue(gfile.Exists(rename_path)) + self.assertFalse(gfile.Exists(file_path)) + + def test_rename_overwrite_false(self): + file_path = file_io.join(self._base_dir, "temp_file") + with gfile.Open(file_path, mode="w") as f: + f.write("testing") + rename_path = file_io.join(self._base_dir, "rename_file") + with gfile.Open(rename_path, mode="w") as f: + f.write("rename") + with 
self.assertRaises(errors.AlreadyExistsError): + gfile.Rename(file_path, rename_path, overwrite=False) + self.assertTrue(gfile.Exists(rename_path)) + self.assertTrue(gfile.Exists(file_path)) + + def test_delete_recursively_fail(self): + fake_dir_path = file_io.join(self._base_dir, "temp_dir") + with self.assertRaises(errors.NotFoundError): + gfile.DeleteRecursively(fake_dir_path) + + def test_is_directory(self): + dir_path = file_io.join(self._base_dir, "test_dir") + # Failure for a non-existing dir. + self.assertFalse(gfile.IsDirectory(dir_path)) + gfile.MkDir(dir_path) + self.assertTrue(gfile.IsDirectory(dir_path)) + file_path = file_io.join(str(dir_path), "test_file") + with gfile.Open(file_path, mode="w") as f: + f.write("testing") + # False for a file. + self.assertFalse(gfile.IsDirectory(file_path)) + # Test that the value returned from `stat()` has `is_directory` set. + file_statistics = gfile.Stat(dir_path) + self.assertTrue(file_statistics.is_directory) + + def test_list_directory(self): + dir_path = file_io.join(self._base_dir, "test_dir") + gfile.MkDir(dir_path) + files = ["file1.txt", "file2.txt", "file3.txt"] + for name in files: + file_path = file_io.join(str(dir_path), name) + with gfile.Open(file_path, mode="w") as f: + f.write("testing") + subdir_path = file_io.join(str(dir_path), "sub_dir") + gfile.MkDir(subdir_path) + subdir_file_path = file_io.join(str(subdir_path), "file4.txt") + with gfile.Open(subdir_file_path, mode="w") as f: + f.write("testing") + dir_list = gfile.ListDirectory(dir_path) + self.assertItemsEqual(files + ["sub_dir"], dir_list) + + def test_list_directory_failure(self): + dir_path = file_io.join(self._base_dir, "test_dir") + with self.assertRaises(errors.NotFoundError): + gfile.ListDirectory(dir_path) + + def _setup_walk_directories(self, dir_path): + # Creating a file structure as follows + # test_dir -> file: file1.txt; dirs: subdir1_1, subdir1_2, subdir1_3 + # subdir1_1 -> file: file3.txt + # subdir1_2 -> dir: subdir2 + 
gfile.MkDir(dir_path) + with gfile.Open(file_io.join(dir_path, "file1.txt"), mode="w") as f: + f.write("testing") + sub_dirs1 = ["subdir1_1", "subdir1_2", "subdir1_3"] + for name in sub_dirs1: + gfile.MkDir(file_io.join(dir_path, name)) + with gfile.Open(file_io.join(dir_path, "subdir1_1/file2.txt"), mode="w") as f: + f.write("testing") + gfile.MkDir(file_io.join(dir_path, "subdir1_2/subdir2")) + + def test_walk_in_order(self): + dir_path_str = file_io.join(self._base_dir, "test_dir") + dir_path = file_io.join(self._base_dir, "test_dir") + self._setup_walk_directories(dir_path_str) + # Now test the walk (in_order = True) + all_dirs = [] + all_subdirs = [] + all_files = [] + for (w_dir, w_subdirs, w_files) in file_io.walk(dir_path, in_order=True): + all_dirs.append(w_dir) + all_subdirs.append(w_subdirs) + all_files.append(w_files) + self.assertItemsEqual(all_dirs, [dir_path_str] + [ + file_io.join(dir_path_str, item) for item in + ["subdir1_1", "subdir1_2", "subdir1_2/subdir2", "subdir1_3"] + ]) + self.assertEqual(dir_path_str, all_dirs[0]) + self.assertLess( + all_dirs.index(file_io.join(dir_path_str, "subdir1_2")), + all_dirs.index(file_io.join(dir_path_str, "subdir1_2/subdir2"))) + self.assertItemsEqual(all_subdirs[1:5], [[], ["subdir2"], [], []]) + self.assertItemsEqual(all_subdirs[0], + ["subdir1_1", "subdir1_2", "subdir1_3"]) + self.assertItemsEqual(all_files, [["file1.txt"], ["file2.txt"], [], [], []]) + self.assertLess( + all_files.index(["file1.txt"]), all_files.index(["file2.txt"])) + + def test_walk_post_order(self): + dir_path = file_io.join(self._base_dir, "test_dir") + self._setup_walk_directories(dir_path) + # Now test the walk (in_order = False) + all_dirs = [] + all_subdirs = [] + all_files = [] + for (w_dir, w_subdirs, w_files) in file_io.walk(dir_path, in_order=False): + all_dirs.append(w_dir) + all_subdirs.append(w_subdirs) + all_files.append(w_files) + self.assertItemsEqual(all_dirs, [ + file_io.join(dir_path, item) for item in + ["subdir1_1", 
"subdir1_2/subdir2", "subdir1_2", "subdir1_3"] + ] + [dir_path]) + self.assertEqual(dir_path, all_dirs[4]) + self.assertLess( + all_dirs.index(file_io.join(dir_path, "subdir1_2/subdir2")), + all_dirs.index(file_io.join(dir_path, "subdir1_2"))) + self.assertItemsEqual(all_subdirs[0:4], [[], [], ["subdir2"], []]) + self.assertItemsEqual(all_subdirs[4], + ["subdir1_1", "subdir1_2", "subdir1_3"]) + self.assertItemsEqual(all_files, [["file2.txt"], [], [], [], ["file1.txt"]]) + self.assertLess( + all_files.index(["file2.txt"]), all_files.index(["file1.txt"])) + + def test_walk_failure(self): + dir_path = file_io.join(self._base_dir, "test_dir") + # Try walking a directory that wasn't created. + all_dirs = [] + all_subdirs = [] + all_files = [] + for (w_dir, w_subdirs, w_files) in file_io.walk(dir_path, in_order=False): + all_dirs.append(w_dir) + all_subdirs.append(w_subdirs) + all_files.append(w_files) + self.assertItemsEqual(all_dirs, []) + self.assertItemsEqual(all_subdirs, []) + self.assertItemsEqual(all_files, []) + + def test_stat(self): + file_path = file_io.join(self._base_dir, "temp_file") + with gfile.Open(file_path, mode="w") as f: + f.write("testing") + file_statistics = gfile.Stat(file_path) + self.assertEqual(7, file_statistics.length) + self.assertFalse(file_statistics.is_directory) + + def test_read_line(self): + file_path = file_io.join(self._base_dir, "temp_file") + with gfile.Open(file_path, mode="r+") as f: + f.write("testing1\ntesting2\ntesting3\n\ntesting5") + self.assertEqual(36, f.size()) + self.assertEqual("testing1\n", f.readline()) + self.assertEqual("testing2\n", f.readline()) + self.assertEqual("testing3\n", f.readline()) + self.assertEqual("\n", f.readline()) + self.assertEqual("testing5", f.readline()) + self.assertEqual("", f.readline()) + + def test_read(self): + file_path = file_io.join(self._base_dir, "temp_file") + with gfile.Open(file_path, mode="r+") as f: + f.write("testing1\ntesting2\ntesting3\n\ntesting5") + self.assertEqual(36, 
f.size()) + self.assertEqual("testing1\n", f.read(9)) + self.assertEqual("testing2\n", f.read(9)) + self.assertEqual("t", f.read(1)) + self.assertEqual("esting3\n\ntesting5", f.read()) + + def test_read_error_reacquires_gil(self): + file_path = file_io.join(self._base_dir, "temp_file") + with gfile.Open(file_path, mode="r+") as f: + f.write("testing1\ntesting2\ntesting3\n\ntesting5") + with self.assertRaises(errors.InvalidArgumentError): + # At present, this is sufficient to convince ourselves that the change + # fixes the problem. That is, this test will seg fault without the change, + # and pass with it. Unfortunately, this is brittle, as it relies on the + # Python layer to pass the argument along to the wrapped C++ without + # checking the argument itself. + f.read(-2) + + def test_tell(self): + file_path = file_io.join(self._base_dir, "temp_file") + with gfile.Open(file_path, mode="r+") as f: + f.write("testing1\ntesting2\ntesting3\n\ntesting5") + self.assertEqual(0, f.tell()) + self.assertEqual("testing1\n", f.readline()) + self.assertEqual(9, f.tell()) + self.assertEqual("testing2\n", f.readline()) + self.assertEqual(18, f.tell()) + self.assertEqual("testing3\n", f.readline()) + self.assertEqual(27, f.tell()) + self.assertEqual("\n", f.readline()) + self.assertEqual(28, f.tell()) + self.assertEqual("testing5", f.readline()) + self.assertEqual(36, f.tell()) + self.assertEqual("", f.readline()) + self.assertEqual(36, f.tell()) + + def test_seek(self): + file_path = file_io.join(self._base_dir, "temp_file") + with gfile.Open(file_path, mode="r+") as f: + f.write("testing1\ntesting2\ntesting3\n\ntesting5") + self.assertEqual("testing1\n", f.readline()) + self.assertEqual(9, f.tell()) + + # Seek to 18 + f.seek(18) + self.assertEqual(18, f.tell()) + self.assertEqual("testing3\n", f.readline()) + + # Seek back to 9 + f.seek(9) + self.assertEqual(9, f.tell()) + self.assertEqual("testing2\n", f.readline()) + + f.seek(0) + self.assertEqual(0, f.tell()) + 
self.assertEqual("testing1\n", f.readline()) + + with self.assertRaises(errors.InvalidArgumentError): + f.seek(-1) + + with self.assertRaises(TypeError): + f.seek() + + with self.assertRaises(TypeError): + f.seek(offset=0, position=0) + f.seek(position=9) + self.assertEqual(9, f.tell()) + self.assertEqual("testing2\n", f.readline()) + + def test_seek_from_what(self): + file_path = file_io.join(self._base_dir, "temp_file") + with gfile.Open(file_path, mode="r+") as f: + f.write("testing1\ntesting2\ntesting3\n\ntesting5") + self.assertEqual("testing1\n", f.readline()) + self.assertEqual(9, f.tell()) + + # Seek to 18 + f.seek(9, 1) + self.assertEqual(18, f.tell()) + self.assertEqual("testing3\n", f.readline()) + + # Seek back to 9 + f.seek(9, 0) + self.assertEqual(9, f.tell()) + self.assertEqual("testing2\n", f.readline()) + + f.seek(-f.size(), 2) + self.assertEqual(0, f.tell()) + self.assertEqual("testing1\n", f.readline()) + + with self.assertRaises(errors.InvalidArgumentError): + f.seek(0, 3) + + def test_reading_iterator(self): + file_path = file_io.join(self._base_dir, "temp_file") + data = ["testing1\n", "testing2\n", "testing3\n", "\n", "testing5"] + with gfile.Open(file_path, mode="r+") as f: + f.write("".join(data)) + actual_data = [] + for line in f: + actual_data.append(line) + self.assertSequenceEqual(actual_data, data) + + def test_read_lines(self): + file_path = file_io.join(self._base_dir, "temp_file") + data = ["testing1\n", "testing2\n", "testing3\n", "\n", "testing5"] + f = gfile.Open(file_path, mode="r+") + f.write("".join(data)) + f.flush() + f.close() + lines = f.readlines() + self.assertSequenceEqual(lines, data) + + def test_utf8_string_path(self): + file_path = file_io.join(self._base_dir, "UTF8测试_file") + file_io.write_string_to_file(file_path, "testing") + self.assertTrue(gfile.Exists(file_path)) + with gfile.Open(file_path, mode="rb") as f: + self.assertEqual(b"testing", f.read()) + + def test_eof(self): + file_path = 
file_io.join(self._base_dir, "temp_file") + f = gfile.Open(file_path, mode="r+") + content = "testing" + f.write(content) + f.flush() + f.close() + self.assertEqual(content, f.read(len(content) + 1)) if __name__ == "__main__": test.main() From 2eabebf101f3a709bda7ef3e3c7c8b7f529506fa Mon Sep 17 00:00:00 2001 From: wujinhu Date: Mon, 31 Oct 2022 18:01:20 +0800 Subject: [PATCH 4/4] fix code style --- tensorflow_io/core/filesystems/BUILD | 2 +- .../core/filesystems/oss/oss_filesystem.h | 3 +- tests/test_ossfs.py | 52 +++++++++++-------- 3 files changed, 33 insertions(+), 24 deletions(-) diff --git a/tensorflow_io/core/filesystems/BUILD b/tensorflow_io/core/filesystems/BUILD index 89ed11ace..b49a3885e 100644 --- a/tensorflow_io/core/filesystems/BUILD +++ b/tensorflow_io/core/filesystems/BUILD @@ -38,8 +38,8 @@ cc_library( "//tensorflow_io/core/filesystems/az", "//tensorflow_io/core/filesystems/hdfs", "//tensorflow_io/core/filesystems/http", - "//tensorflow_io/core/filesystems/s3", "//tensorflow_io/core/filesystems/oss", + "//tensorflow_io/core/filesystems/s3", ], alwayslink = 1, ) diff --git a/tensorflow_io/core/filesystems/oss/oss_filesystem.h b/tensorflow_io/core/filesystems/oss/oss_filesystem.h index 6ee99fc76..3e350bf19 100644 --- a/tensorflow_io/core/filesystems/oss/oss_filesystem.h +++ b/tensorflow_io/core/filesystems/oss/oss_filesystem.h @@ -104,8 +104,7 @@ class OSSFileSystem { const string& bucket, const string& key, std::vector* result, bool return_all = true, bool return_full_path = false, - bool should_remove_suffix = true, - bool recursive = true, + bool should_remove_suffix = true, bool recursive = true, int max_ret_per_iterator = 1000); Status _InitOSSCredentials(); diff --git a/tests/test_ossfs.py b/tests/test_ossfs.py index 5760e6be2..4c2470e79 100644 --- a/tests/test_ossfs.py +++ b/tests/test_ossfs.py @@ -64,8 +64,11 @@ def tearDownClass(cls): # pylint: disable=invalid-name gfile.DeleteRecursively(get_oss_path("")) def setUp(self): - self._base_dir = 
file_io.join("oss://%s\x01id=%s\x02key=%s\x02host=%s" - % (bucket, access_id, access_key, host), "base_dir") + self._base_dir = file_io.join( + "oss://%s\x01id=%s\x02key=%s\x02host=%s" + % (bucket, access_id, access_key, host), + "base_dir", + ) file_io.create_dir(self._base_dir) def tearDown(self): @@ -360,20 +363,23 @@ def test_walk_in_order(self): all_dirs.append(w_dir) all_subdirs.append(w_subdirs) all_files.append(w_files) - self.assertItemsEqual(all_dirs, [dir_path_str] + [ - file_io.join(dir_path_str, item) for item in - ["subdir1_1", "subdir1_2", "subdir1_2/subdir2", "subdir1_3"] - ]) + self.assertItemsEqual( + all_dirs, + [dir_path_str] + + [ + file_io.join(dir_path_str, item) + for item in ["subdir1_1", "subdir1_2", "subdir1_2/subdir2", "subdir1_3"] + ], + ) self.assertEqual(dir_path_str, all_dirs[0]) self.assertLess( all_dirs.index(file_io.join(dir_path_str, "subdir1_2")), - all_dirs.index(file_io.join(dir_path_str, "subdir1_2/subdir2"))) + all_dirs.index(file_io.join(dir_path_str, "subdir1_2/subdir2")), + ) self.assertItemsEqual(all_subdirs[1:5], [[], ["subdir2"], [], []]) - self.assertItemsEqual(all_subdirs[0], - ["subdir1_1", "subdir1_2", "subdir1_3"]) + self.assertItemsEqual(all_subdirs[0], ["subdir1_1", "subdir1_2", "subdir1_3"]) self.assertItemsEqual(all_files, [["file1.txt"], ["file2.txt"], [], [], []]) - self.assertLess( - all_files.index(["file1.txt"]), all_files.index(["file2.txt"])) + self.assertLess(all_files.index(["file1.txt"]), all_files.index(["file2.txt"])) def test_walk_post_order(self): dir_path = file_io.join(self._base_dir, "test_dir") @@ -386,20 +392,23 @@ def test_walk_post_order(self): all_dirs.append(w_dir) all_subdirs.append(w_subdirs) all_files.append(w_files) - self.assertItemsEqual(all_dirs, [ - file_io.join(dir_path, item) for item in - ["subdir1_1", "subdir1_2/subdir2", "subdir1_2", "subdir1_3"] - ] + [dir_path]) + self.assertItemsEqual( + all_dirs, + [ + file_io.join(dir_path, item) + for item in ["subdir1_1", 
"subdir1_2/subdir2", "subdir1_2", "subdir1_3"] + ] + + [dir_path], + ) self.assertEqual(dir_path, all_dirs[4]) self.assertLess( - all_dirs.index(file_io.join(dir_path, "subdir1_2/subdir2")), - all_dirs.index(file_io.join(dir_path, "subdir1_2"))) + all_dirs.index(file_io.join(dir_path, "subdir1_2/subdir2")), + all_dirs.index(file_io.join(dir_path, "subdir1_2")), + ) self.assertItemsEqual(all_subdirs[0:4], [[], [], ["subdir2"], []]) - self.assertItemsEqual(all_subdirs[4], - ["subdir1_1", "subdir1_2", "subdir1_3"]) + self.assertItemsEqual(all_subdirs[4], ["subdir1_1", "subdir1_2", "subdir1_3"]) self.assertItemsEqual(all_files, [["file2.txt"], [], [], [], ["file1.txt"]]) - self.assertLess( - all_files.index(["file2.txt"]), all_files.index(["file1.txt"])) + self.assertLess(all_files.index(["file2.txt"]), all_files.index(["file1.txt"])) def test_walk_failure(self): dir_path = file_io.join(self._base_dir, "test_dir") @@ -568,5 +577,6 @@ def test_eof(self): f.close() self.assertEqual(content, f.read(len(content) + 1)) + if __name__ == "__main__": test.main()