Skip to content

Commit

Permalink
C++ bindings to the Batch API (#220)
Browse files Browse the repository at this point in the history
An example use of the Batch API (requires CUDA v12.1+):
```c++
    // Here we use the batch API to read "/tmp/test-file" into `b_dev` by
    // submitting 4 batch operations.
    constexpr int num_ops_in_batch = 4;
    constexpr int batchsize        = SIZE / num_ops_in_batch;
    kvikio::DriverProperties props;
    check(num_ops_in_batch < props.get_max_batch_io_size());

    // We open the file as usual.
    kvikio::FileHandle f("/tmp/test-file", "r");

    // Then we create a batch
    auto batch = kvikio::BatchHandle(num_ops_in_batch);

    // And submit 4 operations each with its own offset
    std::vector<kvikio::BatchOp> ops;
    for (int i = 0; i < num_ops_in_batch; ++i) {
      ops.push_back(kvikio::BatchOp{.file_handle   = f,
                                    .devPtr_base   = b_dev,
                                    .file_offset   = i * batchsize,
                                    .devPtr_offset = i * batchsize,
                                    .size          = batchsize,
                                    .opcode        = CUFILE_READ});
    }
    batch.submit(ops);

    // Finally, we wait on all 4 operations to be finished and check the result
    auto statuses = batch.status(num_ops_in_batch, num_ops_in_batch);
    check(statuses.size() == num_ops_in_batch);
    size_t total_read = 0;
    for (auto status : statuses) {
      check(status.status == CUFILE_COMPLETE);
      check(status.ret == batchsize);
      total_read += status.ret;
    }
    check(cudaMemcpy(b, b_dev, SIZE, cudaMemcpyDeviceToHost) == cudaSuccess);
    for (int i = 0; i < NELEM; ++i) {
      check(a[i] == b[i]);
    }
    cout << "Batch read using 4 operations: " << total_read << endl;
```

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #220
  • Loading branch information
madsbk committed Jun 1, 2023
1 parent 6503c23 commit b28298f
Show file tree
Hide file tree
Showing 8 changed files with 355 additions and 13 deletions.
20 changes: 12 additions & 8 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ rapids_find_package(
)
if(NOT cuFile_FOUND)
message(WARNING "Building KvikIO without cuFile")
else()
file(READ "${cuFile_INCLUDE_DIRS}/cufile.h" CUFILE_H_STR)
string(FIND "${CUFILE_H_STR}" "cuFileBatchIOSetUp" cuFileBatchIOSetUp_location)
if(cuFileBatchIOSetUp_location EQUAL "-1")
set(cuFile_BATCH_API_FOUND FALSE)
else()
set(cuFile_BATCH_API_FOUND TRUE)
endif()
message(STATUS "Found cuFile's Batch API: ${cuFile_BATCH_API_FOUND}")
endif()

# library targets
Expand All @@ -75,6 +84,9 @@ target_link_libraries(kvikio INTERFACE Threads::Threads)
target_link_libraries(kvikio INTERFACE CUDA::toolkit)
if(cuFile_FOUND)
target_link_libraries(kvikio INTERFACE cufile::cuFile_interface)
if(cuFile_BATCH_API_FOUND)
target_compile_definitions(kvikio INTERFACE CUFILE_BATCH_API_FOUND)
endif()
endif()
target_link_libraries(kvikio INTERFACE ${CMAKE_DL_LIBS})
target_compile_features(kvikio INTERFACE cxx_std_17)
Expand Down Expand Up @@ -128,11 +140,3 @@ rapids_export(
NAMESPACE kvikio::
DOCUMENTATION doc_string
)

# make documentation

# add_custom_command( OUTPUT KvikIO_DOXYGEN WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/doxygen
# COMMAND doxygen Doxyfile VERBATIM COMMENT "Custom command for KvikIO doxygen docs")

# add_custom_target( kvikio_doc DEPENDS KvikIO_DOXYGEN COMMENT "Target for the custom command to
# build the KvikIO doxygen docs")
56 changes: 53 additions & 3 deletions cpp/examples/basic_io.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

#include <cuda_runtime_api.h>

#include <kvikio/batch.hpp>
#include <kvikio/buffer.hpp>
#include <kvikio/defaults.hpp>
#include <kvikio/driver.hpp>
Expand Down Expand Up @@ -48,13 +49,14 @@ int main()
cout << " Compatibility mode: disabled" << endl;
kvikio::DriverProperties props;
cout << "DriverProperties: " << endl;
cout << " Version: " << props.get_nvfs_major_version() << "." << props.get_nvfs_minor_version()
<< endl;
cout << " nvfs version: " << props.get_nvfs_major_version() << "."
<< props.get_nvfs_minor_version() << endl;
cout << " Allow compatibility mode: " << std::boolalpha << props.get_nvfs_allow_compat_mode()
<< endl;
cout << " Pool mode - enabled: " << std::boolalpha << props.get_nvfs_poll_mode()
<< ", threshold: " << props.get_nvfs_poll_thresh_size() << " kb" << endl;
cout << " Max pinned memory: " << props.get_max_pinned_memory_size() << " kb" << endl;
cout << " Max batch IO size: " << props.get_max_batch_io_size() << endl;
}

int* a{};
Expand Down Expand Up @@ -142,4 +144,52 @@ int main()
cout << "Parallel POSIX read (" << kvikio::defaults::thread_pool_nthreads()
<< " threads): " << read << endl;
}

if (kvikio::is_batch_available()) {
// Here we use the batch API to read "/tmp/test-file" into `b_dev` by
// submitting 4 batch operations.
constexpr int num_ops_in_batch = 4;
constexpr int batchsize = SIZE / num_ops_in_batch;
kvikio::DriverProperties props;
check(num_ops_in_batch < props.get_max_batch_io_size());

// We open the file as usual.
kvikio::FileHandle f("/tmp/test-file", "r");

// Then we create a batch
auto batch = kvikio::BatchHandle(num_ops_in_batch);

// And submit 4 operations each with its own offset
std::vector<kvikio::BatchOp> ops;
for (int i = 0; i < num_ops_in_batch; ++i) {
ops.push_back(kvikio::BatchOp{.file_handle = f,
.devPtr_base = b_dev,
.file_offset = i * batchsize,
.devPtr_offset = i * batchsize,
.size = batchsize,
.opcode = CUFILE_READ});
}
batch.submit(ops);

// Finally, we wait on all 4 operations to be finished and check the result
auto statuses = batch.status(num_ops_in_batch, num_ops_in_batch);
check(statuses.size() == num_ops_in_batch);
size_t total_read = 0;
for (auto status : statuses) {
check(status.status == CUFILE_COMPLETE);
check(status.ret == batchsize);
total_read += status.ret;
}
check(cudaMemcpy(b, b_dev, SIZE, cudaMemcpyDeviceToHost) == cudaSuccess);
for (int i = 0; i < NELEM; ++i) {
check(a[i] == b[i]);
}
cout << "Batch read using 4 operations: " << total_read << endl;

batch.submit(ops);
batch.cancel();
statuses = batch.status(num_ops_in_batch, num_ops_in_batch);
check(statuses.empty());
cout << "Batch canceling of all 4 operations" << endl;
}
}
187 changes: 187 additions & 0 deletions cpp/include/kvikio/batch.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <cstddef>
#include <ctime>
#include <utility>
#include <vector>

#include <kvikio/error.hpp>
#include <kvikio/file_handle.hpp>
#include <kvikio/shim/cufile.hpp>

namespace kvikio {

/**
* @brief IO operation used when submiting batches
*/
struct BatchOp {
// The file handle of the file to read or write
FileHandle& file_handle;
// Base address of buffer in device memory (host memory not supported).
void* devPtr_base;
// Offset in the file to read from or write to.
off_t file_offset;
// Offset relative to the `devPtr_base` pointer to write into or read from.
off_t devPtr_offset;
// Size in bytes to read or write.
size_t size;
// The operation type: CUFILE_READ or CUFILE_WRITE.
CUfileOpcode_t opcode;
};

#ifdef CUFILE_BATCH_API_FOUND

/**
* @brief Handle of an cuFile batch using semantic.
*
* The workflow is as follows:
* 1) Create a batch with a large enough `max_num_events`.
* 2) Call `.submit()` with a vector of operations (`vector.size() <= max_num_events`).
* 3) Call `.status()` to wait on the operations to finish, or
* 3) Call `.cancel()` to cancel the operations.
* 4) Go to step 2 or call `.close()` to free up resources.
*
* Notice, a batch handle can only handle one "submit" at a time and is closed
* in the destructor automatically.
*/
class BatchHandle {
private:
bool _initialized{false};
int _max_num_events{};
CUfileBatchHandle_t _handle{};

public:
BatchHandle() noexcept = default;

/**
* @brief Construct a batch handle
*
* @param max_num_events The maximum number of operations supported by this instance.
*/
BatchHandle(int max_num_events) : _initialized{true}, _max_num_events{max_num_events}
{
CUFILE_TRY(cuFileAPI::instance().BatchIOSetUp(&_handle, max_num_events));
}

/**
* @brief BatchHandle support move semantic but isn't copyable
*/
BatchHandle(const BatchHandle&) = delete;
BatchHandle& operator=(BatchHandle const&) = delete;
BatchHandle(BatchHandle&& o) noexcept
: _initialized{std::exchange(o._initialized, false)},
_max_num_events{std::exchange(o._max_num_events, 0)}
{
_handle = std::exchange(o._handle, CUfileBatchHandle_t{});
}
~BatchHandle() noexcept { close(); }

[[nodiscard]] bool closed() const noexcept { return !_initialized; }

/**
* @brief Destroy the batch handle and free up resources
*/
void close() noexcept
{
if (closed()) { return; }
_initialized = false;

cuFileAPI::instance().BatchIODestroy(_handle);
}

/**
* @brief Submit a vector of batch operations
*
* @param operations The vector of batch operations, which must not exceed the
* `max_num_events`.
*/
void submit(const std::vector<BatchOp>& operations)
{
if (convert_size2ssize(operations.size()) > _max_num_events) {
throw CUfileException("Cannot submit more than the max_num_events)");
}
std::vector<CUfileIOParams_t> io_batch_params;
io_batch_params.reserve(operations.size());
for (const auto& op : operations) {
io_batch_params.push_back(CUfileIOParams_t{.mode = CUFILE_BATCH,
.u = {.batch = {.devPtr_base = op.devPtr_base,
.file_offset = op.file_offset,
.devPtr_offset = op.devPtr_offset,
.size = op.size}},
.fh = op.file_handle.handle(),
.opcode = op.opcode,
.cookie = nullptr});
}

CUFILE_TRY(cuFileAPI::instance().BatchIOSubmit(
_handle, io_batch_params.size(), io_batch_params.data(), 0));
}

/**
* @brief Get status of submitted operations
*
* @param min_nr The minimum number of IO entries for which status is requested.
* @param max_nr The maximum number of IO requests to poll for.
* @param timeout This parameter is used to specify the amount of time to wait for
* in this API, even if the minimum number of requests have not completed. If the
* timeout hits, it is possible that the number of returned IOs can be less than `min_nr`
* @return Vector of the status of the completed I/Os in the batch.
*/
std::vector<CUfileIOEvents_t> status(unsigned min_nr,
unsigned max_nr,
struct timespec* timeout = nullptr)
{
std::vector<CUfileIOEvents_t> ret;
ret.resize(_max_num_events);
CUFILE_TRY(cuFileAPI::instance().BatchIOGetStatus(_handle, min_nr, &max_nr, &ret[0], timeout));
ret.resize(max_nr);
return ret;
}

void cancel() { CUFILE_TRY(cuFileAPI::instance().BatchIOCancel(_handle)); }
};

#else

class BatchHandle {
public:
BatchHandle() noexcept = default;

BatchHandle(int max_num_events)
{
throw CUfileException("BatchHandle requires cuFile's batch API, please build with CUDA v12.1+");
}

[[nodiscard]] bool closed() const noexcept { return true; }

void close() noexcept {}

void submit(const std::vector<BatchOp>& operations) {}

std::vector<CUfileIOEvents_t> status(unsigned min_nr,
unsigned max_nr,
struct timespec* timeout = nullptr)
{
return std::vector<CUfileIOEvents_t>{};
}
void cancel() {}
};

#endif

} // namespace kvikio
15 changes: 15 additions & 0 deletions cpp/include/kvikio/driver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,16 @@ class DriverProperties {
CUFILE_TRY(cuFileAPI::instance().DriverSetMaxPinnedMemSize(size_in_kb));
_props.max_device_pinned_mem_size = size_in_kb;
}

[[nodiscard]] std::size_t get_max_batch_io_size()
{
#ifdef CUFILE_BATCH_API_FOUND
lazy_init();
return _props.max_batch_io_size;
#else
return 0;
#endif
}
};

#else
Expand Down Expand Up @@ -253,6 +263,11 @@ struct DriverProperties {
{
throw CUfileException("KvikIO not compiled with cuFile.h");
}

[[nodiscard]] std::size_t get_max_batch_io_size()
{
throw CUfileException("KvikIO not compiled with cuFile.h");
}
};
#endif

Expand Down
17 changes: 17 additions & 0 deletions cpp/include/kvikio/file_handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,23 @@ class FileHandle {
_initialized = false;
}

/**
* @brief Get the underlying cuFile file handle
*
* The file handle must be open and not in compatibility mode i.e.
* both `.closed()` and `.is_compat_mode_on()` must be return false.
*
* @return cuFile's file handle
*/
[[nodiscard]] CUfileHandle_t handle()
{
if (closed()) { throw CUfileException("File handle is closed"); }
if (_compat_mode) {
throw CUfileException("The underlying cuFile handle isn't available in compatibility mode");
}
return _handle;
}

/**
* @brief Get one of the file descriptors
*
Expand Down
Loading

0 comments on commit b28298f

Please sign in to comment.