Skip to content

Commit

Permalink
enhance: reduce many I/O operations while loading disk index (#30189) (
Browse files Browse the repository at this point in the history
…#30690)

before this, every time the index chunk data was written to the disk,
there were 4 I/O operations:
- open the file
- seek to the offset
- write the data
- close the file

this change optimizes that to open the file only once and continuously write all data.

This also makes it concurrent to load the files from object storage

pr: #30189

Signed-off-by: yah01 <yang.cen@zilliz.com>
  • Loading branch information
yah01 committed Feb 20, 2024
1 parent 8734bcc commit d4c4bf9
Showing 1 changed file with 21 additions and 23 deletions.
44 changes: 21 additions & 23 deletions internal/core/src/storage/DiskFileManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <sys/fcntl.h>
#include <algorithm>
#include <boost/filesystem.hpp>
#include <mutex>
#include <utility>

#include "common/Common.h"
#include "common/Consts.h"
#include "common/EasyAssert.h"
#include "common/File.h"
#include "common/Slice.h"
#include "log/Log.h"

#include "storage/DiskFileManagerImpl.h"
#include "storage/FieldData.h"
#include "storage/FieldDataInterface.h"
#include "storage/FileManager.h"
#include "storage/IndexData.h"
#include "storage/LocalChunkManagerSingleton.h"
Expand Down Expand Up @@ -120,6 +126,7 @@ DiskFileManagerImpl::AddBatchIndexFiles(
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);

std::vector<std::future<std::shared_ptr<uint8_t[]>>> futures;
futures.reserve(remote_file_sizes.size());
AssertInfo(local_file_offsets.size() == remote_files.size(),
"inconsistent size of offset slices with file slices");
AssertInfo(remote_files.size() == remote_file_sizes.size(),
Expand Down Expand Up @@ -167,7 +174,7 @@ DiskFileManagerImpl::CacheIndexToDisk(

std::map<std::string, std::vector<int>> index_slices;
for (auto& file_path : remote_files) {
auto pos = file_path.find_last_of("_");
auto pos = file_path.find_last_of('_');
index_slices[file_path.substr(0, pos)].emplace_back(
std::stoi(file_path.substr(pos + 1)));
}
Expand All @@ -176,39 +183,30 @@ DiskFileManagerImpl::CacheIndexToDisk(
std::sort(slices.second.begin(), slices.second.end());
}

auto EstimateParallelDegree = [&](const std::string& file) -> uint64_t {
auto fileSize = rcm_->Size(file);
return uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / fileSize);
};

for (auto& slices : index_slices) {
auto prefix = slices.first;
auto local_index_file_name =
GetLocalIndexObjectPrefix() +
prefix.substr(prefix.find_last_of('/') + 1);
local_chunk_manager->CreateFile(local_index_file_name);
int64_t offset = 0;
auto file =
File::Open(local_index_file_name, O_CREAT | O_RDWR | O_TRUNC);

// Get the remote files
std::vector<std::string> batch_remote_files;
uint64_t max_parallel_degree = INT_MAX;
batch_remote_files.reserve(slices.second.size());
for (int& iter : slices.second) {
if (batch_remote_files.size() == max_parallel_degree) {
auto next_offset = CacheBatchIndexFilesToDisk(
batch_remote_files, local_index_file_name, offset);
offset = next_offset;
batch_remote_files.clear();
}
auto origin_file = prefix + "_" + std::to_string(iter);
if (batch_remote_files.size() == 0) {
// Use first file size as average size to estimate
max_parallel_degree = EstimateParallelDegree(origin_file);
}
batch_remote_files.push_back(origin_file);
}
if (batch_remote_files.size() > 0) {
auto next_offset = CacheBatchIndexFilesToDisk(
batch_remote_files, local_index_file_name, offset);
offset = next_offset;
batch_remote_files.clear();

auto index_chunks = GetObjectData(rcm_.get(), batch_remote_files);
for (auto& chunk : index_chunks) {
auto index_data = chunk.get()->GetFieldData();
auto index_size = index_data->Size();
auto chunk_data = reinterpret_cast<uint8_t*>(
const_cast<void*>(index_data->Data()));
file.Write(chunk_data, index_size);
}
local_paths_.emplace_back(local_index_file_name);
}
Expand Down

0 comments on commit d4c4bf9

Please sign in to comment.