diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 89b080c391..15c68514f2 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -60,6 +60,15 @@ rapids_find_package( ) if(NOT cuFile_FOUND) message(WARNING "Building KvikIO without cuFile") +else() + file(READ "${cuFile_INCLUDE_DIRS}/cufile.h" CUFILE_H_STR) + string(FIND "${CUFILE_H_STR}" "cuFileBatchIOSetUp" cuFileBatchIOSetUp_location) + if(cuFileBatchIOSetUp_location EQUAL "-1") + set(cuFile_BATCH_API_FOUND FALSE) + else() + set(cuFile_BATCH_API_FOUND TRUE) + endif() + message(STATUS "Found cuFile's Batch API: ${cuFile_BATCH_API_FOUND}") endif() # library targets @@ -75,6 +84,9 @@ target_link_libraries(kvikio INTERFACE Threads::Threads) target_link_libraries(kvikio INTERFACE CUDA::toolkit) if(cuFile_FOUND) target_link_libraries(kvikio INTERFACE cufile::cuFile_interface) + if(cuFile_BATCH_API_FOUND) + target_compile_definitions(kvikio INTERFACE CUFILE_BATCH_API_FOUND) + endif() endif() target_link_libraries(kvikio INTERFACE ${CMAKE_DL_LIBS}) target_compile_features(kvikio INTERFACE cxx_std_17) @@ -128,11 +140,3 @@ rapids_export( NAMESPACE kvikio:: DOCUMENTATION doc_string ) - -# make documentation - -# add_custom_command( OUTPUT KvikIO_DOXYGEN WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/doxygen -# COMMAND doxygen Doxyfile VERBATIM COMMENT "Custom command for KvikIO doxygen docs") - -# add_custom_target( kvikio_doc DEPENDS KvikIO_DOXYGEN COMMENT "Target for the custom command to -# build the KvikIO doxygen docs") diff --git a/cpp/examples/basic_io.cpp b/cpp/examples/basic_io.cpp index eccc5fc916..9811eb210b 100644 --- a/cpp/examples/basic_io.cpp +++ b/cpp/examples/basic_io.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2022, NVIDIA CORPORATION. + * Copyright (c) 2021-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ #include +#include #include #include #include @@ -48,13 +49,14 @@ int main() cout << " Compatibility mode: disabled" << endl; kvikio::DriverProperties props; cout << "DriverProperties: " << endl; - cout << " Version: " << props.get_nvfs_major_version() << "." << props.get_nvfs_minor_version() - << endl; + cout << " nvfs version: " << props.get_nvfs_major_version() << "." + << props.get_nvfs_minor_version() << endl; cout << " Allow compatibility mode: " << std::boolalpha << props.get_nvfs_allow_compat_mode() << endl; cout << " Pool mode - enabled: " << std::boolalpha << props.get_nvfs_poll_mode() << ", threshold: " << props.get_nvfs_poll_thresh_size() << " kb" << endl; cout << " Max pinned memory: " << props.get_max_pinned_memory_size() << " kb" << endl; + cout << " Max batch IO size: " << props.get_max_batch_io_size() << endl; } int* a{}; @@ -142,4 +144,52 @@ int main() cout << "Parallel POSIX read (" << kvikio::defaults::thread_pool_nthreads() << " threads): " << read << endl; } + + if (kvikio::is_batch_available()) { + // Here we use the batch API to read "/tmp/test-file" into `b_dev` by + // submitting 4 batch operations. + constexpr int num_ops_in_batch = 4; + constexpr int batchsize = SIZE / num_ops_in_batch; + kvikio::DriverProperties props; + check(num_ops_in_batch < props.get_max_batch_io_size()); + + // We open the file as usual. + kvikio::FileHandle f("/tmp/test-file", "r"); + + // Then we create a batch + auto batch = kvikio::BatchHandle(num_ops_in_batch); + + // And submit 4 operations each with its own offset + std::vector ops; + for (int i = 0; i < num_ops_in_batch; ++i) { + ops.push_back(kvikio::BatchOp{.file_handle = f, + .devPtr_base = b_dev, + .file_offset = i * batchsize, + .devPtr_offset = i * batchsize, + .size = batchsize, + .opcode = CUFILE_READ}); + } + batch.submit(ops); + + // Finally, we wait on all 4 operations to be finished and check the result + auto statuses = batch.status(num_ops_in_batch, num_ops_in_batch); + check(statuses.size() == num_ops_in_batch); + size_t total_read = 0; + for (auto status : statuses) { + check(status.status == CUFILE_COMPLETE); + check(status.ret == batchsize); + total_read += status.ret; + } + check(cudaMemcpy(b, b_dev, SIZE, cudaMemcpyDeviceToHost) == cudaSuccess); + for (int i = 0; i < NELEM; ++i) { + check(a[i] == b[i]); + } + cout << "Batch read using 4 operations: " << total_read << endl; + + batch.submit(ops); + batch.cancel(); + statuses = batch.status(num_ops_in_batch, num_ops_in_batch); + check(statuses.empty()); + cout << "Batch canceling of all 4 operations" << endl; + } } diff --git a/cpp/include/kvikio/batch.hpp b/cpp/include/kvikio/batch.hpp new file mode 100644 index 0000000000..e6b29129ab --- /dev/null +++ b/cpp/include/kvikio/batch.hpp @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include + +#include +#include +#include + +namespace kvikio { + +/** + * @brief IO operation used when submiting batches + */ +struct BatchOp { + // The file handle of the file to read or write + FileHandle& file_handle; + // Base address of buffer in device memory (host memory not supported). + void* devPtr_base; + // Offset in the file to read from or write to. + off_t file_offset; + // Offset relative to the `devPtr_base` pointer to write into or read from. + off_t devPtr_offset; + // Size in bytes to read or write. + size_t size; + // The operation type: CUFILE_READ or CUFILE_WRITE. + CUfileOpcode_t opcode; +}; + +#ifdef CUFILE_BATCH_API_FOUND + +/** + * @brief Handle of an cuFile batch using semantic. + * + * The workflow is as follows: + * 1) Create a batch with a large enough `max_num_events`. + * 2) Call `.submit()` with a vector of operations (`vector.size() <= max_num_events`). + * 3) Call `.status()` to wait on the operations to finish, or + * 3) Call `.cancel()` to cancel the operations. + * 4) Go to step 2 or call `.close()` to free up resources. + * + * Notice, a batch handle can only handle one "submit" at a time and is closed + * in the destructor automatically. + */ +class BatchHandle { + private: + bool _initialized{false}; + int _max_num_events{}; + CUfileBatchHandle_t _handle{}; + + public: + BatchHandle() noexcept = default; + + /** + * @brief Construct a batch handle + * + * @param max_num_events The maximum number of operations supported by this instance. + */ + BatchHandle(int max_num_events) : _initialized{true}, _max_num_events{max_num_events} + { + CUFILE_TRY(cuFileAPI::instance().BatchIOSetUp(&_handle, max_num_events)); + } + + /** + * @brief BatchHandle support move semantic but isn't copyable + */ + BatchHandle(const BatchHandle&) = delete; + BatchHandle& operator=(BatchHandle const&) = delete; + BatchHandle(BatchHandle&& o) noexcept + : _initialized{std::exchange(o._initialized, false)}, + _max_num_events{std::exchange(o._max_num_events, 0)} + { + _handle = std::exchange(o._handle, CUfileBatchHandle_t{}); + } + ~BatchHandle() noexcept { close(); } + + [[nodiscard]] bool closed() const noexcept { return !_initialized; } + + /** + * @brief Destroy the batch handle and free up resources + */ + void close() noexcept + { + if (closed()) { return; } + _initialized = false; + + cuFileAPI::instance().BatchIODestroy(_handle); + } + + /** + * @brief Submit a vector of batch operations + * + * @param operations The vector of batch operations, which must not exceed the + * `max_num_events`. + */ + void submit(const std::vector& operations) + { + if (convert_size2ssize(operations.size()) > _max_num_events) { + throw CUfileException("Cannot submit more than the max_num_events)"); + } + std::vector io_batch_params; + io_batch_params.reserve(operations.size()); + for (const auto& op : operations) { + io_batch_params.push_back(CUfileIOParams_t{.mode = CUFILE_BATCH, + .u = {.batch = {.devPtr_base = op.devPtr_base, + .file_offset = op.file_offset, + .devPtr_offset = op.devPtr_offset, + .size = op.size}}, + .fh = op.file_handle.handle(), + .opcode = op.opcode, + .cookie = nullptr}); + } + + CUFILE_TRY(cuFileAPI::instance().BatchIOSubmit( + _handle, io_batch_params.size(), io_batch_params.data(), 0)); + } + + /** + * @brief Get status of submitted operations + * + * @param min_nr The minimum number of IO entries for which status is requested. + * @param max_nr The maximum number of IO requests to poll for. + * @param timeout This parameter is used to specify the amount of time to wait for + * in this API, even if the minimum number of requests have not completed. If the + * timeout hits, it is possible that the number of returned IOs can be less than `min_nr` + * @return Vector of the status of the completed I/Os in the batch. + */ + std::vector status(unsigned min_nr, + unsigned max_nr, + struct timespec* timeout = nullptr) + { + std::vector ret; + ret.resize(_max_num_events); + CUFILE_TRY(cuFileAPI::instance().BatchIOGetStatus(_handle, min_nr, &max_nr, &ret[0], timeout)); + ret.resize(max_nr); + return ret; + } + + void cancel() { CUFILE_TRY(cuFileAPI::instance().BatchIOCancel(_handle)); } +}; + +#else + +class BatchHandle { + public: + BatchHandle() noexcept = default; + + BatchHandle(int max_num_events) + { + throw CUfileException("BatchHandle requires cuFile's batch API, please build with CUDA v12.1+"); + } + + [[nodiscard]] bool closed() const noexcept { return true; } + + void close() noexcept {} + + void submit(const std::vector& operations) {} + + std::vector status(unsigned min_nr, + unsigned max_nr, + struct timespec* timeout = nullptr) + { + return std::vector{}; + } + void cancel() {} +}; + +#endif + +} // namespace kvikio diff --git a/cpp/include/kvikio/driver.hpp b/cpp/include/kvikio/driver.hpp index 0b9e52ba41..24526565c5 100644 --- a/cpp/include/kvikio/driver.hpp +++ b/cpp/include/kvikio/driver.hpp @@ -175,6 +175,16 @@ class DriverProperties { CUFILE_TRY(cuFileAPI::instance().DriverSetMaxPinnedMemSize(size_in_kb)); _props.max_device_pinned_mem_size = size_in_kb; } + + [[nodiscard]] std::size_t get_max_batch_io_size() + { +#ifdef CUFILE_BATCH_API_FOUND + lazy_init(); + return _props.max_batch_io_size; +#else + return 0; +#endif + } }; #else @@ -253,6 +263,11 @@ struct DriverProperties { { throw CUfileException("KvikIO not compiled with cuFile.h"); } + + [[nodiscard]] std::size_t get_max_batch_io_size() + { + throw CUfileException("KvikIO not compiled with cuFile.h"); + } }; #endif diff --git a/cpp/include/kvikio/file_handle.hpp b/cpp/include/kvikio/file_handle.hpp index 58b0fe4198..834d44d48d 100644 --- a/cpp/include/kvikio/file_handle.hpp +++ b/cpp/include/kvikio/file_handle.hpp @@ -224,6 +224,23 @@ class FileHandle { _initialized = false; } + /** + * @brief Get the underlying cuFile file handle + * + * The file handle must be open and not in compatibility mode i.e. + * both `.closed()` and `.is_compat_mode_on()` must be return false. + * + * @return cuFile's file handle + */ + [[nodiscard]] CUfileHandle_t handle() + { + if (closed()) { throw CUfileException("File handle is closed"); } + if (_compat_mode) { + throw CUfileException("The underlying cuFile handle isn't available in compatibility mode"); + } + return _handle; + } + /** * @brief Get one of the file descriptors * diff --git a/cpp/include/kvikio/shim/cufile.hpp b/cpp/include/kvikio/shim/cufile.hpp index 6e6bd78371..21e5b736fc 100644 --- a/cpp/include/kvikio/shim/cufile.hpp +++ b/cpp/include/kvikio/shim/cufile.hpp @@ -47,6 +47,15 @@ class cuFileAPI { decltype(cuFileDriverSetMaxCacheSize)* DriverSetMaxCacheSize{nullptr}; decltype(cuFileDriverSetMaxPinnedMemSize)* DriverSetMaxPinnedMemSize{nullptr}; +#ifdef CUFILE_BATCH_API_FOUND + decltype(cuFileBatchIOSetUp)* BatchIOSetUp{nullptr}; + decltype(cuFileBatchIOSubmit)* BatchIOSubmit{nullptr}; + decltype(cuFileBatchIOGetStatus)* BatchIOGetStatus{nullptr}; + decltype(cuFileBatchIOCancel)* BatchIOCancel{nullptr}; + decltype(cuFileBatchIODestroy)* BatchIODestroy{nullptr}; +#endif + bool batch_available = false; + private: cuFileAPI() { @@ -74,6 +83,26 @@ class cuFileAPI { get_symbol(DriverSetMaxCacheSize, lib, KVIKIO_STRINGIFY(cuFileDriverSetMaxCacheSize)); get_symbol(DriverSetMaxPinnedMemSize, lib, KVIKIO_STRINGIFY(cuFileDriverSetMaxPinnedMemSize)); +#ifdef CUFILE_BATCH_API_FOUND + get_symbol(BatchIOSetUp, lib, KVIKIO_STRINGIFY(cuFileBatchIOSetUp)); + get_symbol(BatchIOSubmit, lib, KVIKIO_STRINGIFY(cuFileBatchIOSubmit)); + get_symbol(BatchIOGetStatus, lib, KVIKIO_STRINGIFY(cuFileBatchIOGetStatus)); + get_symbol(BatchIOCancel, lib, KVIKIO_STRINGIFY(cuFileBatchIOCancel)); + get_symbol(BatchIODestroy, lib, KVIKIO_STRINGIFY(cuFileBatchIODestroy)); + + // HACK: we use the mangled name of the `cuFileBatchContextState` to determine if cuFile's + // batch API is available. Notice, the symbols of `cuFileBatchIOSetUp` & co. exist all + // the way back to CUDA v11.5 but calling them is undefined behavior. + // TODO: when CUDA v12.2 is released, use `cuFileReadAsync` to determine the availability of + // both the batch and async API. + try { + void* s{}; + get_symbol(s, lib, "_ZTS23cuFileBatchContextState"); + batch_available = true; + } catch (const std::runtime_error&) { + } +#endif + // cuFile is supposed to open and close the driver automatically but because of a bug in // CUDA 11.8, it sometimes segfault. See . CUfileError_t const error = DriverOpen(); @@ -139,4 +168,22 @@ inline bool is_cufile_available() return is_cufile_library_available() && run_udev_readable() && !is_running_in_wsl(); } +/** + * @brief Check if cuFile's batch API is available + * + * @return The boolean answer + */ +#ifdef CUFILE_BATCH_API_FOUND +inline bool is_batch_available() +{ + try { + return cuFileAPI::instance().batch_available; + } catch (const std::runtime_error&) { + return false; + } +} +#else +constexpr bool is_batch_available() { return false; } +#endif + } // namespace kvikio diff --git a/cpp/include/kvikio/shim/cufile_h_wrapper.hpp b/cpp/include/kvikio/shim/cufile_h_wrapper.hpp index 3cbce1f8d1..9f1a28cf33 100644 --- a/cpp/include/kvikio/shim/cufile_h_wrapper.hpp +++ b/cpp/include/kvikio/shim/cufile_h_wrapper.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, NVIDIA CORPORATION. + * Copyright (c) 2022-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,3 +36,25 @@ using CUfileDriverControlFlags_t = enum CUfileDriverControlFlags { }; using CUfileHandle_t = void*; #endif + +// If the Batch API isn't defined, we define some of the data types here. +// Notice, this doesn't need to be ABI compatible with the cufile definitions. +#ifndef CUFILE_BATCH_API_FOUND +typedef enum CUfileOpcode { CUFILE_READ = 0, CUFILE_WRITE } CUfileOpcode_t; + +typedef enum CUFILEStatus_enum { + CUFILE_WAITING = 0x000001, /* required value prior to submission */ + CUFILE_PENDING = 0x000002, /* once enqueued */ + CUFILE_INVALID = 0x000004, /* request was ill-formed or could not be enqueued */ + CUFILE_CANCELED = 0x000008, /* request successfully canceled */ + CUFILE_COMPLETE = 0x0000010, /* request successfully completed */ + CUFILE_TIMEOUT = 0x0000020, /* request timed out */ + CUFILE_FAILED = 0x0000040 /* unable to complete */ +} CUfileStatus_t; + +typedef struct CUfileIOEvents { + void* cookie; + CUfileStatus_t status; /* status of the operation */ + size_t ret; /* -ve error or amount of I/O done. */ +} CUfileIOEvents_t; +#endif diff --git a/cpp/include/kvikio/utils.hpp b/cpp/include/kvikio/utils.hpp index 56f922532a..4348135d58 100644 --- a/cpp/include/kvikio/utils.hpp +++ b/cpp/include/kvikio/utils.hpp @@ -39,7 +39,7 @@ inline constexpr std::size_t page_size = 4096; return static_cast(x); } -[[nodiscard]] inline off_t convert_size2ssize(std::size_t x) +[[nodiscard]] inline ssize_t convert_size2ssize(std::size_t x) { if (x >= static_cast(std::numeric_limits::max())) { throw CUfileException("size_t argument too large to fit ssize_t");