Skip to content

Commit

Permalink
snapshot integrate (#2814)
Browse files Browse the repository at this point in the history
* update HybridQuery

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* add IterateHandler

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* IterateHandler opt

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* opt

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* opt

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* add SSSegmentReader

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* add src/codecs/snapshot

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update latest code

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update SSSegmentReader interface

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* add SSSegmentWriter

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update SSSegmentWriter interface

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* update unittest

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* fix clang-format

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* fix unittest/db build fail

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* use SSSegmentReader and SSSegmentWriter in SSDBImpl

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* fix clang-format

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* fix build issue

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
  • Loading branch information
cydrain committed Jul 11, 2020
1 parent d004d27 commit 2c3b3a5
Show file tree
Hide file tree
Showing 36 changed files with 2,798 additions and 85 deletions.
2 changes: 2 additions & 0 deletions core/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/tracing tracing_files)

aux_source_directory(${MILVUS_ENGINE_SRC}/codecs codecs_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/codecs/default codecs_default_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/codecs/snapshot codecs_snapshot_files)

aux_source_directory(${MILVUS_ENGINE_SRC}/segment segment_files)

Expand All @@ -156,6 +157,7 @@ set(engine_files
${wrapper_files}
${codecs_files}
${codecs_default_files}
${codecs_snapshot_files}
${segment_files}
)

Expand Down
242 changes: 242 additions & 0 deletions core/src/codecs/snapshot/SSAttrsFormat.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "codecs/snapshot/SSAttrsFormat.h"

#include <fcntl.h>
#include <fiu-local.h>
#include <unistd.h>
#include <algorithm>
#include <memory>

#include <boost/filesystem.hpp>

#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"

namespace milvus {
namespace codec {

void
SSAttrsFormat::read_attrs_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, off_t offset,
size_t num, std::vector<uint8_t>& raw_attrs, size_t& nbytes) {
auto open_res = fs_ptr->reader_ptr_->open(file_path.c_str());
fiu_do_on("read_attrs_internal_open_file_fail", open_res = false);
if (!open_res) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}

fs_ptr->reader_ptr_->read(&nbytes, sizeof(size_t));

num = std::min(num, nbytes - offset);

offset += sizeof(size_t);
fs_ptr->reader_ptr_->seekg(offset);

raw_attrs.resize(num / sizeof(uint8_t));
fs_ptr->reader_ptr_->read(raw_attrs.data(), num);

fs_ptr->reader_ptr_->close();
}

void
SSAttrsFormat::read_uids_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
std::vector<int64_t>& uids) {
auto open_res = fs_ptr->reader_ptr_->open(file_path.c_str());
fiu_do_on("read_uids_internal_open_file_fail", open_res = false);
if (!open_res) {
std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}

size_t num_bytes;
fs_ptr->reader_ptr_->read(&num_bytes, sizeof(size_t));

uids.resize(num_bytes / sizeof(int64_t));
fs_ptr->reader_ptr_->read(uids.data(), num_bytes);

fs_ptr->reader_ptr_->read(uids.data(), num_bytes);
}

void
SSAttrsFormat::read(const milvus::storage::FSHandlerPtr& fs_ptr, milvus::segment::AttrsPtr& attrs_read) {
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
auto is_directory = boost::filesystem::is_directory(dir_path);
fiu_do_on("read_id_directory_false", is_directory = false);
if (!is_directory) {
std::string err_msg = "Directory: " + dir_path + "does not exist";
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_INVALID_ARGUMENT, err_msg);
}

boost::filesystem::path target_path(dir_path);
typedef boost::filesystem::directory_iterator d_it;
d_it it_end;
d_it uid_it(target_path);
std::vector<int64_t> uids;
for (; uid_it != it_end; ++uid_it) {
const auto& path = uid_it->path();
if (path.extension().string() == user_id_extension_) {
read_uids_internal(fs_ptr, path.string(), uids);
break;
}
}

d_it it(target_path);
for (; it != it_end; ++it) {
const auto& path = it->path();
if (path.extension().string() == raw_attr_extension_) {
auto file_name = path.filename().string();
auto field_name = file_name.substr(0, file_name.size() - 3);
std::vector<uint8_t> attr_list;
size_t nbytes;
read_attrs_internal(fs_ptr, path.string(), 0, INT64_MAX, attr_list, nbytes);
milvus::segment::AttrPtr attr =
std::make_shared<milvus::segment::Attr>(attr_list, nbytes, uids, field_name);
attrs_read->attrs.insert(std::pair(field_name, attr));
}
}
}

void
SSAttrsFormat::write(const milvus::storage::FSHandlerPtr& fs_ptr, const milvus::segment::AttrsPtr& attrs_ptr) {
TimeRecorder rc("write attributes");

std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();

auto it = attrs_ptr->attrs.begin();
if (it == attrs_ptr->attrs.end()) {
// std::string err_msg = "Attributes is null";
// LOG_ENGINE_ERROR_ << err_msg;
return;
}

#if 0
const std::string uid_file_path = dir_path + "/" + it->second->GetCollectionId() + user_id_extension_;

int uid_fd = open(uid_file_path.c_str(), O_WRONLY | O_TRUNC | O_CREAT, 00664);
if (uid_fd == -1) {
std::string err_msg = "Failed to open file: " + uid_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}
size_t uid_num_bytes = it->second->GetUids().size() * sizeof(int64_t);
if (::write(uid_fd, &uid_num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to write to file" + uid_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::write(uid_fd, it->second->GetUids().data(), uid_num_bytes) == -1) {
std::string err_msg = "Failed to write to file" + uid_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::close(uid_fd) == -1) {
std::string err_msg = "Failed to close file: " + uid_file_path + ", error: " + std::strerror(errno);
ENGINE_LOG_ERROR << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
rc.RecordSection("write uids done");
#endif

for (; it != attrs_ptr->attrs.end(); it++) {
const std::string ra_file_path = dir_path + "/" + it->second->GetName() + raw_attr_extension_;

int ra_fd = open(ra_file_path.c_str(), O_WRONLY | O_TRUNC | O_CREAT, 00664);
if (ra_fd == -1) {
std::string err_msg = "Failed to open file: " + ra_file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
}

size_t ra_num_bytes = it->second->GetNbytes();
if (::write(ra_fd, &ra_num_bytes, sizeof(size_t)) == -1) {
std::string err_msg = "Failed to write to file: " + ra_file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::write(ra_fd, it->second->GetData().data(), ra_num_bytes) == -1) {
std::string err_msg = "Failed to write to file: " + ra_file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}
if (::close(ra_fd) == -1) {
std::string err_msg = "Failed to close file: " + ra_file_path + ", error: " + std::strerror(errno);
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_WRITE_ERROR, err_msg);
}

rc.RecordSection("write rv done");
}
}

void
SSAttrsFormat::read_attrs(const milvus::storage::FSHandlerPtr& fs_ptr, const std::string& field_name, off_t offset,
size_t num_bytes, std::vector<uint8_t>& raw_attrs) {
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
if (!boost::filesystem::is_directory(dir_path)) {
std::string err_msg = "Directory: " + dir_path + "does not exist";
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_INVALID_ARGUMENT, err_msg);
}

boost::filesystem::path target_path(dir_path);
typedef boost::filesystem::directory_iterator d_it;
d_it it_end;
d_it it(target_path);

for (; it != it_end; ++it) {
const auto& path = it->path();
std::string file_name = path.filename().string();
if (path.extension().string() == raw_attr_extension_ &&
file_name.substr(0, file_name.size() - 3) == field_name) {
size_t nbytes;
read_attrs_internal(fs_ptr, path.string(), offset, num_bytes, raw_attrs, nbytes);
}
}
}

void
SSAttrsFormat::read_uids(const milvus::storage::FSHandlerPtr& fs_ptr, std::vector<int64_t>& uids) {
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
auto is_directory = boost::filesystem::is_directory(dir_path);
fiu_do_on("is_directory_false", is_directory = false);
if (!is_directory) {
std::string err_msg = "Directory: " + dir_path + "does not exist";
LOG_ENGINE_ERROR_ << err_msg;
throw Exception(SERVER_INVALID_ARGUMENT, err_msg);
}

boost::filesystem::path target_path(dir_path);
typedef boost::filesystem::directory_iterator d_it;
d_it it_end;
d_it it(target_path);
// for (auto& it : boost::filesystem::directory_iterator(dir_path)) {
for (; it != it_end; ++it) {
const auto& path = it->path();
if (path.extension().string() == user_id_extension_) {
read_uids_internal(fs_ptr, path.string(), uids);
}
}
}

} // namespace codec
} // namespace milvus
72 changes: 72 additions & 0 deletions core/src/codecs/snapshot/SSAttrsFormat.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <memory>
#include <string>
#include <vector>

#include "segment/Attrs.h"
#include "storage/FSHandler.h"

namespace milvus {
namespace codec {

class SSAttrsFormat {
public:
SSAttrsFormat() = default;

void
read(const storage::FSHandlerPtr& fs_ptr, segment::AttrsPtr& attrs_read);

void
write(const storage::FSHandlerPtr& fs_ptr, const segment::AttrsPtr& attr);

void
read_attrs(const storage::FSHandlerPtr& fs_ptr, const std::string& field_name, off_t offset, size_t num_bytes,
std::vector<uint8_t>& raw_attrs);

void
read_uids(const storage::FSHandlerPtr& fs_ptr, std::vector<int64_t>& uids);

// No copy and move
SSAttrsFormat(const SSAttrsFormat&) = delete;
SSAttrsFormat(SSAttrsFormat&&) = delete;

SSAttrsFormat&
operator=(const SSAttrsFormat&) = delete;
SSAttrsFormat&
operator=(SSAttrsFormat&&) = delete;

private:
void
read_attrs_internal(const storage::FSHandlerPtr& fs_ptr, const std::string&, off_t, size_t, std::vector<uint8_t>&,
size_t&);

void
read_uids_internal(const storage::FSHandlerPtr& fs_ptr, const std::string&, std::vector<int64_t>&);

private:
const std::string raw_attr_extension_ = ".ra";
const std::string user_id_extension_ = ".uid";
};

using SSAttrsFormatPtr = std::shared_ptr<SSAttrsFormat>;

} // namespace codec
} // namespace milvus
Loading

0 comments on commit 2c3b3a5

Please sign in to comment.