Skip to content

Commit

Permalink
Add: Streaming and in-memory serialization in C++
Browse files Browse the repository at this point in the history
  • Loading branch information
ashvardanian committed Aug 4, 2023
1 parent 4b910a8 commit 7da44a2
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 37 deletions.
66 changes: 45 additions & 21 deletions include/usearch/index.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1175,7 +1175,7 @@ class input_file_t {
};

/**
* @brief Represents a memory-mapped file.
* @brief Represents a memory-mapped file or a pre-allocated anonymous memory region.
*
* This class provides a convenient way to memory-map a file and access its contents as a block of
* memory. The class handles platform-specific memory-mapping operations on Windows, Linux, and MacOS.
Expand Down Expand Up @@ -1212,6 +1212,8 @@ class memory_mapped_file_t {
{
}

memory_mapped_file_t(byte_t* data, std::size_t length) noexcept : ptr_(data), length_(length) {}

memory_mapped_file_t& operator=(memory_mapped_file_t&& other) noexcept {
std::swap(path_, other.path_);
std::swap(ptr_, other.ptr_);
Expand All @@ -1227,6 +1229,8 @@ class memory_mapped_file_t {

serialization_result_t open_if_not() noexcept {
serialization_result_t result;
if (!path_)
return result;

#if defined(USEARCH_DEFINED_WINDOWS)

Expand Down Expand Up @@ -1282,8 +1286,11 @@ class memory_mapped_file_t {
}

void close() noexcept {
if (!ptr_)
if (!path_) {
ptr_ = nullptr;
length_ = 0;
return;
}
#if defined(USEARCH_DEFINED_WINDOWS)
UnmapViewOfFile(ptr_);
CloseHandle(mapping_handle_);
Expand Down Expand Up @@ -2256,16 +2263,27 @@ class index_gt {
#pragma region Serialization

/**
* @brief Saves serialized binary index representation to disk,
* co-locating vectors and neighbors lists.
* Available on Linux, MacOS, Windows.
* @brief Saves serialized binary index representation to a file, generally on disk.
*/
template <typename progress_at = dummy_progress_t>
serialization_result_t save(output_file_t file, progress_at&& progress = {}) const noexcept {

serialization_result_t result = file.open_if_not();
if (!result)
return result;
if (result)
stream([&](void* buffer, std::size_t length) {
result = file.write(buffer, length);
return !!result;
});
return result;
}

/**
* @brief Saves serialized binary index representation to a stream.
*/
template <typename output_callback_at, typename progress_at = dummy_progress_t>
serialization_result_t stream(output_callback_at&& callback, progress_at&& progress = {}) const noexcept {

serialization_result_t result;

// Export some basic metadata
index_serialized_header_t header;
Expand All @@ -2274,37 +2292,44 @@ class index_gt {
header.connectivity_base = config_.connectivity_base;
header.max_level = max_level_;
header.entry_slot = entry_slot_;
result = file.write(&header, sizeof(header));
if (!result)
return result;
if (!callback(&header, sizeof(header)))
return result.failed("Failed to serialize into stream");

// Export the number of levels per node
// That is both enough to estimate the overall memory consumption,
// and to be able to estimate the offsets of every entry in the file.
for (std::size_t i = 0; i != header.size; ++i) {
node_t node = node_at_(i);
level_t level = node.level();
result = file.write(&level, sizeof(level));
if (!result)
return result;
if (!callback(&level, sizeof(level)))
return result.failed("Failed to serialize into stream");
}

// After that dump the nodes themselves
for (std::size_t i = 0; i != header.size; ++i) {
span_bytes_t node_bytes = node_bytes_(node_at_(i));
result = file.write(node_bytes.data(), node_bytes.size());
if (!result)
return result;
if (!callback(node_bytes.data(), node_bytes.size()))
return result.failed("Failed to serialize into stream");
progress(i, header.size);
}

return {};
}

/**
* @brief Loads the serialized binary index representation from disk,
* copying both vectors and neighbors lists into RAM.
* Available on Linux, MacOS, Windows.
* @brief Estimate the binary length (in bytes) of the serialized index.
*/
std::size_t stream_length() const noexcept {
std::size_t neighbors_length = 0;
for (std::size_t i = 0; i != size(); ++i)
neighbors_length += node_bytes_(node_at_(i).level());
return sizeof(index_serialized_header_t) + neighbors_length;
}

/**
* @brief Loads the serialized binary index representation from disk to RAM.
* Adjusts the configuration properties of the constructed index to
* match the settings in the file.
*/
template <typename progress_at = dummy_progress_t>
serialization_result_t load(input_file_t file, progress_at&& progress = {}) noexcept {
Expand Down Expand Up @@ -2361,8 +2386,7 @@ class index_gt {

/**
* @brief Memory-maps the serialized binary index representation from disk,
* @b without copying the vectors and neighbors lists into RAM.
* Available on Linux, MacOS, Windows.
* @b without copying data into RAM, and fetching it on-demand.
*/
template <typename progress_at = dummy_progress_t>
serialization_result_t view(memory_mapped_file_t file, std::size_t offset = 0,
Expand Down
52 changes: 36 additions & 16 deletions include/usearch/index_dense.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -594,11 +594,22 @@ class index_dense_gt {
* @return Outcome descriptor explicitly convertible to boolean.
*/
serialization_result_t save(output_file_t file, serialization_config_t config = {}) const {

serialization_result_t result = file.open_if_not();
if (!result)
return result;
if (result)
stream([&](void* buffer, std::size_t length) {
result = file.write(buffer, length);
return !!result;
});
return result;
}

/**
* @brief Saves serialized binary index representation to a stream.
*/
template <typename output_callback_at, typename progress_at = dummy_progress_t>
serialization_result_t stream(output_callback_at&& callback, serialization_config_t config = {}) const noexcept {

serialization_result_t result;
std::uint64_t matrix_rows = 0;
std::uint64_t matrix_cols = 0;

Expand All @@ -609,28 +620,25 @@ class index_dense_gt {
std::uint32_t dimensions[2];
dimensions[0] = static_cast<std::uint32_t>(typed_->size());
dimensions[1] = static_cast<std::uint32_t>(metric_.bytes_per_vector());
result = file.write(&dimensions, sizeof(dimensions));
if (!result)
return result;
if (!callback(&dimensions, sizeof(dimensions)))
return result.failed("Failed to serialize into stream");
matrix_rows = dimensions[0];
matrix_cols = dimensions[1];
} else {
std::uint64_t dimensions[2];
dimensions[0] = static_cast<std::uint64_t>(typed_->size());
dimensions[1] = static_cast<std::uint64_t>(metric_.bytes_per_vector());
result = file.write(&dimensions, sizeof(dimensions));
if (!result)
return result;
if (!callback(&dimensions, sizeof(dimensions)))
return result.failed("Failed to serialize into stream");
matrix_rows = dimensions[0];
matrix_cols = dimensions[1];
}

// Dump the vectors one after another
for (std::uint64_t i = 0; i != matrix_rows; ++i) {
byte_t* vector = vectors_lookup_[i];
result = file.write(vector, matrix_cols);
if (!result)
return result;
if (!callback(vector, matrix_cols))
return result.failed("Failed to serialize into stream");
}
}

Expand All @@ -657,13 +665,25 @@ class index_dense_gt {
head.count_deleted = typed_->size() - size();
head.dimensions = dimensions();

result = file.write(&buffer, sizeof(buffer));
if (!result)
return result;
if (!callback(&buffer, sizeof(buffer)))
return result.failed("Failed to serialize into stream");
}

// Save the actual proximity graph
return typed_->save(std::move(file));
return typed_->stream(std::forward<output_callback_at>(callback));
}

/**
* @brief Estimate the binary length (in bytes) of the serialized index.
*/
std::size_t stream_length(serialization_config_t config = {}) const noexcept {
std::size_t dimensions_length = 0;
std::size_t matrix_length = 0;
if (!config.exclude_vectors) {
dimensions_length = config.use_64_bit_dimensions ? sizeof(std::uint64_t) * 2 : sizeof(std::uint32_t) * 2;
matrix_length = typed_->size() * metric_.bytes_per_vector();
}
return dimensions_length + matrix_length + sizeof(index_dense_head_buffer_t) + typed_->stream_length();
}

/**
Expand Down

0 comments on commit 7da44a2

Please sign in to comment.