Skip to content

Commit

Permalink
enhance: convert the GetObject util to async (#30166) (#30197)
Browse files Browse the repository at this point in the history
This makes it much easier to use
pr: #30166

Signed-off-by: yah01 <yang.cen@zilliz.com>
  • Loading branch information
yah01 committed Feb 20, 2024
1 parent 3ecda6b commit 4b7c5ba
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 71 deletions.
39 changes: 19 additions & 20 deletions internal/core/src/storage/DiskFileManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
#include "log/Log.h"

#include "storage/DiskFileManagerImpl.h"
#include "storage/LocalChunkManagerSingleton.h"
#include "storage/FileManager.h"
#include "storage/IndexData.h"
#include "storage/Util.h"
#include "storage/LocalChunkManagerSingleton.h"
#include "storage/ThreadPools.h"
#include "storage/Util.h"

namespace milvus::storage {

Expand Down Expand Up @@ -116,16 +117,7 @@ DiskFileManagerImpl::AddBatchIndexFiles(
const std::vector<int64_t>& remote_file_sizes) {
auto local_chunk_manager =
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);

auto LoadIndexFromDisk = [&](
const std::string& file,
const int64_t offset,
const int64_t data_size) -> std::shared_ptr<uint8_t[]> {
auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[data_size]);
local_chunk_manager->Read(file, offset, buf.get(), data_size);
return buf;
};
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);

std::vector<std::future<std::shared_ptr<uint8_t[]>>> futures;
AssertInfo(local_file_offsets.size() == remote_files.size(),
Expand All @@ -134,10 +126,17 @@ DiskFileManagerImpl::AddBatchIndexFiles(
"inconsistent size of file slices with size slices");

for (int64_t i = 0; i < remote_files.size(); ++i) {
futures.push_back(pool.Submit(LoadIndexFromDisk,
local_file_name,
local_file_offsets[i],
remote_file_sizes[i]));
futures.push_back(pool.Submit(
[&](const std::string& file,
const int64_t offset,
const int64_t data_size) -> std::shared_ptr<uint8_t[]> {
auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[data_size]);
local_chunk_manager->Read(file, offset, buf.get(), data_size);
return buf;
},
local_file_name,
local_file_offsets[i],
remote_file_sizes[i]));
}

// hold index data until the index file upload is done
Expand All @@ -155,8 +154,8 @@ DiskFileManagerImpl::AddBatchIndexFiles(
remote_files,
field_meta_,
index_meta_);
for (auto iter = res.begin(); iter != res.end(); ++iter) {
remote_paths_to_size_[iter->first] = iter->second;
for (auto& re : res) {
remote_paths_to_size_[re.first] = re.second;
}
}

Expand Down Expand Up @@ -229,7 +228,7 @@ DiskFileManagerImpl::CacheBatchIndexFilesToDisk(

uint64_t offset = local_file_init_offfset;
for (int i = 0; i < batch_size; ++i) {
auto index_data = index_datas[i];
auto index_data = index_datas[i].get()->GetFieldData();
auto index_size = index_data->Size();
auto uint8_data =
reinterpret_cast<uint8_t*>(const_cast<void*>(index_data->Data()));
Expand Down Expand Up @@ -273,7 +272,7 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {
auto field_datas = GetObjectData(rcm_.get(), batch_files);
int batch_size = batch_files.size();
for (int i = 0; i < batch_size; ++i) {
auto field_data = field_datas[i];
auto field_data = field_datas[i].get()->GetFieldData();
num_rows += uint32_t(field_data->get_num_rows());
AssertInfo(dim == 0 || dim == field_data->get_dim(),
"inconsistent dim value in multi binlogs!");
Expand Down
5 changes: 3 additions & 2 deletions internal/core/src/storage/MemFileManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ MemFileManagerImpl::LoadIndexToMemory(
for (size_t idx = 0; idx < batch_files.size(); ++idx) {
auto file_name =
batch_files[idx].substr(batch_files[idx].find_last_of('/') + 1);
file_to_index_data[file_name] = index_datas[idx];
file_to_index_data[file_name] =
index_datas[idx].get()->GetFieldData();
}
};

Expand Down Expand Up @@ -137,7 +138,7 @@ MemFileManagerImpl::CacheRawDataToMemory(
auto FetchRawData = [&]() {
auto raw_datas = GetObjectData(rcm_.get(), batch_files);
for (auto& data : raw_datas) {
field_datas.emplace_back(data);
field_datas.emplace_back(data.get()->GetFieldData());
}
};

Expand Down
51 changes: 3 additions & 48 deletions internal/core/src/storage/Util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -447,62 +447,17 @@ EncodeAndUploadFieldSlice(ChunkManager* chunk_manager,
return std::make_pair(std::move(object_key), serialized_index_size);
}

// /**
// * Returns the current resident set size (physical memory use) measured
// * in bytes, or zero if the value cannot be determined on this OS.
// */
// size_t
// getCurrentRSS() {
// #if defined(_WIN32)
// /* Windows -------------------------------------------------- */
// PROCESS_MEMORY_COUNTERS info;
// GetProcessMemoryInfo(GetCurrentProcess(), &info, sizeof(info));
// return (size_t)info.WorkingSetSize;

// #elif defined(__APPLE__) && defined(__MACH__)
// /* OSX ------------------------------------------------------ */
// struct mach_task_basic_info info;
// mach_msg_type_number_t infoCount = MACH_TASK_BASIC_INFO_COUNT;
// if (task_info(mach_task_self(), MACH_TASK_BASIC_INFO, (task_info_t)&info, &infoCount) != KERN_SUCCESS)
// return (size_t)0L; /* Can't access? */
// return (size_t)info.resident_size;

// #elif defined(__linux__) || defined(__linux) || defined(linux) || defined(__gnu_linux__)
// /* Linux ---------------------------------------------------- */
// long rss = 0L;
// FILE* fp = NULL;
// if ((fp = fopen("/proc/self/statm", "r")) == NULL)
// return (size_t)0L; /* Can't open? */
// if (fscanf(fp, "%*s%ld", &rss) != 1) {
// fclose(fp);
// return (size_t)0L; /* Can't read? */
// }
// fclose(fp);
// return (size_t)rss * (size_t)sysconf(_SC_PAGESIZE);

// #else
// /* AIX, BSD, Solaris, and Unknown OS ------------------------ */
// return (size_t)0L; /* Unsupported. */
// #endif
// }

std::vector<FieldDataPtr>
std::vector<std::future<std::unique_ptr<DataCodec>>>
GetObjectData(ChunkManager* remote_chunk_manager,
const std::vector<std::string>& remote_files) {
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);
std::vector<std::future<std::unique_ptr<DataCodec>>> futures;
futures.reserve(remote_files.size());
for (auto& file : remote_files) {
futures.emplace_back(pool.Submit(
DownloadAndDecodeRemoteFile, remote_chunk_manager, file));
}

std::vector<FieldDataPtr> datas;
for (int i = 0; i < futures.size(); ++i) {
auto res = futures[i].get();
datas.emplace_back(res->GetFieldData());
}
ReleaseArrowUnused();
return datas;
return futures;
}

std::map<std::string, int64_t>
Expand Down
3 changes: 2 additions & 1 deletion internal/core/src/storage/Util.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <memory>
#include <string>
#include <vector>
#include <future>

#include "storage/FieldData.h"
#include "storage/PayloadStream.h"
Expand Down Expand Up @@ -102,7 +103,7 @@ EncodeAndUploadFieldSlice(ChunkManager* chunk_manager,
const FieldMeta& field_meta,
std::string object_key);

std::vector<FieldDataPtr>
std::vector<std::future<std::unique_ptr<DataCodec>>>
GetObjectData(ChunkManager* remote_chunk_manager,
const std::vector<std::string>& remote_files);

Expand Down

0 comments on commit 4b7c5ba

Please sign in to comment.