Skip to content

Commit

Permalink
impl. of BatchHandle
Browse files Browse the repository at this point in the history
  • Loading branch information
madsbk committed May 16, 2023
1 parent fffd9b0 commit 3d49f43
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 12 deletions.
8 changes: 0 additions & 8 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,3 @@ rapids_export(
NAMESPACE kvikio::
DOCUMENTATION doc_string
)

# make documentation

# add_custom_command( OUTPUT KvikIO_DOXYGEN WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/doxygen
# COMMAND doxygen Doxyfile VERBATIM COMMENT "Custom command for KvikIO doxygen docs")

# add_custom_target( kvikio_doc DEPENDS KvikIO_DOXYGEN COMMENT "Target for the custom command to
# build the KvikIO doxygen docs")
53 changes: 50 additions & 3 deletions cpp/examples/basic_io.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

#include <cuda_runtime_api.h>

#include <kvikio/batch.hpp>
#include <kvikio/buffer.hpp>
#include <kvikio/defaults.hpp>
#include <kvikio/driver.hpp>
Expand Down Expand Up @@ -48,13 +49,14 @@ int main()
cout << " Compatibility mode: disabled" << endl;
kvikio::DriverProperties props;
cout << "DriverProperties: " << endl;
cout << " Version: " << props.get_nvfs_major_version() << "." << props.get_nvfs_minor_version()
<< endl;
cout << " nvfs version: " << props.get_nvfs_major_version() << "."
<< props.get_nvfs_minor_version() << endl;
cout << " Allow compatibility mode: " << std::boolalpha << props.get_nvfs_allow_compat_mode()
<< endl;
cout << " Pool mode - enabled: " << std::boolalpha << props.get_nvfs_poll_mode()
<< ", threshold: " << props.get_nvfs_poll_thresh_size() << " kb" << endl;
cout << " Max pinned memory: " << props.get_max_pinned_memory_size() << " kb" << endl;
cout << " max_batch_io_size: " << props.get_max_batch_io_size() << endl;
}

int* a{};
Expand Down Expand Up @@ -142,4 +144,49 @@ int main()
cout << "Parallel POSIX read (" << kvikio::defaults::thread_pool_nthreads()
<< " threads): " << read << endl;
}

#ifdef CUFILE_BATCH_API_FOUND
{
// Here we use the batch API to read "/tmp/test-file" into `b_dev` by
// submitting 4 batch operations.
constexpr int num_of_batches = 4;
constexpr int batchsize = SIZE / num_of_batches;
kvikio::DriverProperties props;
check(num_of_batches < props.get_max_batch_io_size());

// We open the file as usual.
kvikio::FileHandle f("/tmp/test-file", "r");

// Then we create a batch
auto batch = kvikio::BatchHandle(num_of_batches);
std::vector<kvikio::BatchOp> ops;
ops.reserve(num_of_batches);

// And submit 4 operations each with its own offset
for (int i = 0; i < num_of_batches; ++i) {
ops.push_back(kvikio::BatchOp{.file_handle = f,
.devPtr_base = b_dev,
.file_offset = i * batchsize,
.devPtr_offset = i * batchsize,
.size = batchsize,
.opcode = CUFILE_READ});
}
batch.submit(ops);

// Finally, we wait on all 4 operations to be finished and check the result
auto statuses = batch.status(num_of_batches, num_of_batches);
check(statuses.size() == num_of_batches);
size_t total_read = 0;
for (auto status : statuses) {
check(status.status == CUFILE_COMPLETE);
check(status.ret == batchsize);
total_read += status.ret;
}
check(cudaMemcpy(b, b_dev, SIZE, cudaMemcpyDeviceToHost) == cudaSuccess);
for (int i = 0; i < NELEM; ++i) {
check(a[i] == b[i]);
}
cout << "Batch read using 4 operations: " << total_read << endl;
}
#endif
}
116 changes: 116 additions & 0 deletions cpp/include/kvikio/batch.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <cstddef>
#include <ctime>
#include <utility>
#include <vector>

#include <kvikio/error.hpp>
#include <kvikio/file_handle.hpp>
#include <kvikio/shim/cufile.hpp>

namespace kvikio {

struct BatchOp {
FileHandle& file_handle;
void* devPtr_base;
off_t file_offset;
off_t devPtr_offset;
size_t size;
CUfileOpcode_t opcode;
};

class BatchHandle {
private:
bool _initialized{false};
int _max_num_events{};
#ifdef CUFILE_BATCH_API_FOUND
CUfileBatchHandle_t _handle{};
#endif
public:
BatchHandle() noexcept = default;

BatchHandle(int max_num_events) : _initialized{true}, _max_num_events{max_num_events}
{
#ifdef CUFILE_BATCH_API_FOUND
CUFILE_TRY(cuFileAPI::instance().FileBatchIOSetUp(&_handle, max_num_events));
#else
throw CUfileException("BatchHandle requires cuFile's batch API, please use CUDA v12.1+");
#endif
}

/**
* @brief BatchHandle support move semantic but isn't copyable
*/
BatchHandle(const BatchHandle&) = delete;
BatchHandle& operator=(BatchHandle const&) = delete;
BatchHandle(BatchHandle&& o) noexcept
: _initialized{std::exchange(o._initialized, false)},
_max_num_events{std::exchange(o._max_num_events, 0)}
{
#ifdef CUFILE_BATCH_API_FOUND
_handle = std::exchange(o._handle, CUfileBatchHandle_t{});
#endif
}
~BatchHandle() noexcept { close(); }

[[nodiscard]] bool closed() const noexcept { return !_initialized; }

void close() noexcept
{
if (closed()) { return; }
_initialized = false;

#ifdef CUFILE_BATCH_API_FOUND
cuFileAPI::instance().FileBatchIODestroy(_handle);
#endif
}

void submit(const std::vector<BatchOp>& operations)
{
std::vector<CUfileIOParams_t> io_batch_params;
io_batch_params.reserve(operations.size());
for (const auto& op : operations) {
io_batch_params.push_back(CUfileIOParams_t{.mode = CUFILE_BATCH,
.u = {.batch = {.devPtr_base = op.devPtr_base,
.file_offset = op.file_offset,
.devPtr_offset = op.devPtr_offset,
.size = op.size}},
.fh = op.file_handle.handle(),
.opcode = op.opcode,
.cookie = nullptr});
}

CUFILE_TRY(cuFileAPI::instance().FileBatchIOSubmit(
_handle, io_batch_params.size(), &io_batch_params[0], 0));
}

std::vector<CUfileIOEvents_t> status(unsigned min_nr,
unsigned max_nr,
struct timespec* timeout = nullptr)
{
std::vector<CUfileIOEvents_t> ret;
ret.resize(_max_num_events);
CUFILE_TRY(
cuFileAPI::instance().FileBatchIOGetStatus(_handle, min_nr, &max_nr, &ret[0], timeout));
ret.resize(max_nr);
return ret;
}
};

} // namespace kvikio
6 changes: 6 additions & 0 deletions cpp/include/kvikio/driver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ class DriverProperties {
CUFILE_TRY(cuFileAPI::instance().DriverSetMaxPinnedMemSize(size_in_kb));
_props.max_device_pinned_mem_size = size_in_kb;
}

[[nodiscard]] std::size_t get_max_batch_io_size()
{
lazy_init();
return _props.max_batch_io_size;
}
};

#else
Expand Down
17 changes: 17 additions & 0 deletions cpp/include/kvikio/file_handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,23 @@ class FileHandle {
_initialized = false;
}

/**
* @brief Get the underlying cuFile file handle
*
* The file handle must be open and not in compatibility mode i.e.
* both `.closed()` and `.is_compat_mode_on()` must be return false.
*
* @return cuFile's file handle
*/
[[nodiscard]] CUfileHandle_t handle()
{
if (closed()) { throw CUfileException("File handle is closed"); }
if (_compat_mode) {
throw CUfileException("The underlying cuFile handle isn't available in compatibility mode");
}
return _handle;
}

/**
* @brief Get one of the file descriptors
*
Expand Down
16 changes: 16 additions & 0 deletions cpp/include/kvikio/shim/cufile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ class cuFileAPI {
decltype(cuFileDriverSetMaxCacheSize)* DriverSetMaxCacheSize{nullptr};
decltype(cuFileDriverSetMaxPinnedMemSize)* DriverSetMaxPinnedMemSize{nullptr};

#ifdef CUFILE_BATCH_API_FOUND
decltype(cuFileBatchIOSetUp)* FileBatchIOSetUp{nullptr};
decltype(cuFileBatchIOSubmit)* FileBatchIOSubmit{nullptr};
decltype(cuFileBatchIOGetStatus)* FileBatchIOGetStatus{nullptr};
decltype(cuFileBatchIOCancel)* FileBatchIOCancel{nullptr};
decltype(cuFileBatchIODestroy)* FileBatchIODestroy{nullptr};
#endif

private:
cuFileAPI()
{
Expand Down Expand Up @@ -74,6 +82,14 @@ class cuFileAPI {
get_symbol(DriverSetMaxCacheSize, lib, KVIKIO_STRINGIFY(cuFileDriverSetMaxCacheSize));
get_symbol(DriverSetMaxPinnedMemSize, lib, KVIKIO_STRINGIFY(cuFileDriverSetMaxPinnedMemSize));

#ifdef CUFILE_BATCH_API_FOUND
get_symbol(FileBatchIOSetUp, lib, KVIKIO_STRINGIFY(cuFileBatchIOSetUp));
get_symbol(FileBatchIOSubmit, lib, KVIKIO_STRINGIFY(cuFileBatchIOSubmit));
get_symbol(FileBatchIOGetStatus, lib, KVIKIO_STRINGIFY(cuFileBatchIOGetStatus));
get_symbol(FileBatchIOCancel, lib, KVIKIO_STRINGIFY(cuFileBatchIOCancel));
get_symbol(FileBatchIODestroy, lib, KVIKIO_STRINGIFY(cuFileBatchIODestroy));
#endif

// cuFile is supposed to open and close the driver automatically but because of a bug in
// CUDA 11.8, it sometimes segfault. See <https://github.com/rapidsai/kvikio/issues/159>.
CUfileError_t const error = DriverOpen();
Expand Down
21 changes: 20 additions & 1 deletion cpp/include/kvikio/shim/cufile_h_wrapper.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,4 +35,23 @@ using CUfileDriverControlFlags_t = enum CUfileDriverControlFlags {
CU_FILE_ALLOW_COMPAT_MODE = 1 /*!< allow COMPATIBILITY mode. properties.allow_compat_mode*/
};
using CUfileHandle_t = void*;

typedef enum CUfileOpcode { CUFILE_READ = 0, CUFILE_WRITE } CUfileOpcode_t;

typedef enum CUFILEStatus_enum {
CUFILE_WAITING = 0x000001, /* required value prior to submission */
CUFILE_PENDING = 0x000002, /* once enqueued */
CUFILE_INVALID = 0x000004, /* request was ill-formed or could not be enqueued */
CUFILE_CANCELED = 0x000008, /* request successfully canceled */
CUFILE_COMPLETE = 0x0000010, /* request successfully completed */
CUFILE_TIMEOUT = 0x0000020, /* request timed out */
CUFILE_FAILED = 0x0000040 /* unable to complete */
} CUfileStatus_t;

typedef struct CUfileIOEvents {
void* cookie;
CUfileStatus_t status; /* status of the operation */
size_t ret; /* -ve error or amount of I/O done. */
} CUfileIOEvents_t;

#endif

0 comments on commit 3d49f43

Please sign in to comment.