diff --git a/internal/engine/common/common_query_data.h b/internal/engine/common/common_query_data.h index cc638b84..1e465c20 100644 --- a/internal/engine/common/common_query_data.h +++ b/internal/engine/common/common_query_data.h @@ -224,7 +224,7 @@ struct Ranker { virtual Status Parse() = 0; - std::string ToString() { + virtual std::string ToString() { std::stringstream ss; ss << "ranker type =" << type << ", "; ss << "params: " << params; @@ -242,7 +242,7 @@ struct WeightedRanker : public Ranker { virtual ~WeightedRanker() {} - Status Parse() { + virtual Status Parse() { Status status; std::string msg = "weighted ranker params err: " + std::string(raw_str); cJSON* jsonroot = cJSON_Parse(raw_str.c_str()); @@ -288,7 +288,7 @@ struct WeightedRanker : public Ranker { return Status::OK(); } - std::string ToString() { + virtual std::string ToString() { std::stringstream ss; ss << "ranker type =" << type << ", "; ss << "params: " << params << ", "; diff --git a/internal/engine/index/impl/hnswlib/hnswalg.h b/internal/engine/index/impl/hnswlib/hnswalg.h index e392ca35..703f5aaa 100644 --- a/internal/engine/index/impl/hnswlib/hnswalg.h +++ b/internal/engine/index/impl/hnswlib/hnswalg.h @@ -612,6 +612,7 @@ class HierarchicalNSW : public AlgorithmInterface { void resizeIndex(size_t new_max_elements) { pthread_rwlock_wrlock(&shared_mutex_); + LOG(INFO) << "hnsw resize new_max_elements=" << new_max_elements << ", realloc base layer size: " << new_max_elements * size_data_per_element_; if (new_max_elements < cur_element_count) throw std::runtime_error( "Cannot resize, max element is less than the current number of " diff --git a/internal/engine/index/index.cc b/internal/engine/index/index.cc index 051a1df8..904375a8 100644 --- a/internal/engine/index/index.cc +++ b/internal/engine/index/index.cc @@ -86,9 +86,9 @@ Status IndexIVFFlat::init(const std::string &index_param) { LOG(INFO) << "store params=" << store_params.ToJsonStr(); - docids_bitmap_ = new bitmap::BitmapManager(); + docids_bitmap_ = new bitmap::RocksdbBitmapManager(); int init_bitmap_size = 1000 * 10000; - if (docids_bitmap_->Init(init_bitmap_size) != 0) { + if (docids_bitmap_->Init(init_bitmap_size, index_root_path_ + "/bitmap") != 0) { std::string msg = "Cannot create bitmap!"; LOG(ERROR) << msg; return Status::IOError(msg); @@ -222,9 +222,9 @@ Status IndexIVFPQ::init(const std::string &index_param) { LOG(INFO) << "store params=" << store_params.ToJsonStr(); - docids_bitmap_ = new bitmap::BitmapManager(); + docids_bitmap_ = new bitmap::RocksdbBitmapManager(); int init_bitmap_size = 1000 * 10000; - if (docids_bitmap_->Init(init_bitmap_size) != 0) { + if (docids_bitmap_->Init(init_bitmap_size, index_root_path_ + "/bitmap") != 0) { std::string msg = "Cannot create bitmap!"; LOG(ERROR) << msg; return Status::IOError(msg); @@ -364,9 +364,9 @@ int IndexScann::init(const std::string &index_param) { LOG(INFO) << "store params=" << store_params.ToJsonStr(); - docids_bitmap_ = new bitmap::BitmapManager(); + docids_bitmap_ = new bitmap::RocksdbBitmapManager(); int init_bitmap_size = 1000 * 10000; - if (docids_bitmap_->Init(init_bitmap_size) != 0) { + if (docids_bitmap_->Init(init_bitmap_size, index_root_path_ + "/bitmap") != 0) { LOG(ERROR) << "Cannot create bitmap!"; return INTERNAL_ERR; } diff --git a/internal/engine/search/engine.cc b/internal/engine/search/engine.cc index 62b92ff9..626965ec 100644 --- a/internal/engine/search/engine.cc +++ b/internal/engine/search/engine.cc @@ -177,6 +177,28 @@ Engine::~Engine() { } } +void Engine::Close() { + if (vec_manager_) { + delete vec_manager_; + vec_manager_ = nullptr; + } + + if (table_) { + delete table_; + table_ = nullptr; + } + + if (field_range_index_) { + delete field_range_index_; + field_range_index_ = nullptr; + } + + if (docids_bitmap_) { + delete docids_bitmap_; + docids_bitmap_ = nullptr; + } +} + Engine *Engine::GetInstance(const std::string &index_root_path, const std::string &space_name) { Engine *engine = new Engine(index_root_path, space_name); @@ -184,6 +206,8 @@ Engine *Engine::GetInstance(const std::string &index_root_path, if (!status.ok()) { LOG(ERROR) << "Build " << space_name << " [" << index_root_path << "] failed!"; + delete engine; + engine = nullptr; return nullptr; } return engine; @@ -191,30 +215,30 @@ Engine *Engine::GetInstance(const std::string &index_root_path, Status Engine::Setup() { if (!utils::isFolderExist(index_root_path_.c_str())) { - mkdir(index_root_path_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + if(mkdir(index_root_path_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH)) { + std::string msg = "mkdir " + index_root_path_ + " error"; + LOG(ERROR) << msg; + return Status::IOError(msg); + } } dump_path_ = index_root_path_ + "/retrieval_model_index"; if (!utils::isFolderExist(dump_path_.c_str())) { - mkdir(dump_path_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + if(mkdir(dump_path_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH)) { + std::string msg = "mkdir " + dump_path_ + " error"; + LOG(ERROR) << msg; + return Status::IOError(msg); + } } - docids_bitmap_ = new bitmap::BitmapManager(); - docids_bitmap_->SetDumpFilePath(index_root_path_ + "/bitmap"); + docids_bitmap_ = new bitmap::RocksdbBitmapManager(); int init_bitmap_size = 5000 * 10000; - bool is_load = false; - int file_bytes_size = docids_bitmap_->FileBytesSize(); - if (file_bytes_size != 0) { - init_bitmap_size = file_bytes_size * 8; - is_load = true; - } - - if (docids_bitmap_->Init(init_bitmap_size) != 0) { + if (docids_bitmap_->Init(init_bitmap_size, index_root_path_ + "/bitmap") != 0) { std::string msg = "Cannot create bitmap!"; LOG(ERROR) << msg; return Status::IOError(msg); } - if (is_load) { + if (docids_bitmap_->IsLoad()) { docids_bitmap_->Load(); } else { docids_bitmap_->Dump(); @@ -496,6 +520,7 @@ Status Engine::CreateTable(TableInfo &table) { space_name_ + " cannot create VectorTable: " + status.ToString(); LOG(ERROR) << msg; vec_manager_->Close(); + this->Close(); return Status::ParamError(msg); } TableParams disk_table_params; @@ -601,14 +626,19 @@ int Engine::AddOrUpdate(Doc &doc) { #ifdef PERFORMANCE_TESTING double end_table = utils::getmillisecs(); #endif - + int ret = 0; // add vectors by VectorManager - if (vec_manager_->AddToStore(max_docid_, fields_vec) != 0) { - LOG(ERROR) << "Add to store error max_docid [" << max_docid_ << "]"; + ret = vec_manager_->AddToStore(max_docid_, fields_vec); + if (ret != 0) { + LOG(ERROR) << "Add to store error max_docid [" << max_docid_ << "] err=" << ret; return -4; } ++max_docid_; - docids_bitmap_->SetMaxID(max_docid_); + ret = docids_bitmap_->SetMaxID(max_docid_); + if (ret != 0) { + LOG(ERROR) << "Bitmap set max_docid [" << max_docid_ << "] err= " << ret; + return -5; + }; if (not b_running_ and index_status_ == UNINDEXED) { if (max_docid_ >= training_threshold_) { @@ -659,7 +689,7 @@ int Engine::Update(int doc_id, field_range_index_->Add(doc_id, idx); } - LOG(DEBUG) << "update success! key=" << fields_table["_id"].value; + LOG(DEBUG) << "update success! key=" << fields_table["_id"].value << ", doc_id=" << doc_id; is_dirty_ = true; return 0; } @@ -672,8 +702,12 @@ int Engine::Delete(std::string &key) { if (docids_bitmap_->Test(docid)) { return ret; } + ret = docids_bitmap_->Set(docid); + if (ret) { + LOG(ERROR) << "bitmap set failed: ret=" << ret; + return ret; + } ++delete_num_; - docids_bitmap_->Set(docid); docids_bitmap_->Dump(docid, 1); const auto &name_to_idx = table_->FieldMap(); for (const auto &ite : name_to_idx) { @@ -691,7 +725,7 @@ int Engine::GetDoc(const std::string &key, Doc &doc) { int docid = -1, ret = 0; ret = table_->GetDocIDByKey(key, docid); if (ret != 0 || docid < 0) { - LOG(INFO) << space_name_ << " GetDocIDbyKey [" << key << "] not found!"; + LOG(DEBUG) << space_name_ << " GetDocIDbyKey [" << key << "] not found!"; return -1; } @@ -1094,6 +1128,7 @@ int Engine::Load() { af_exector_->Start(); last_dump_dir_ = last_dir; LOG(INFO) << "load engine success! max docid=" << max_docid_ + << ", delete_num=" << delete_num_ << ", load directory=" << last_dir << ", clean directorys(not done)=" << utils::join(folders_not_done, ','); diff --git a/internal/engine/search/engine.h b/internal/engine/search/engine.h index 84c41b9b..132ea22b 100644 --- a/internal/engine/search/engine.h +++ b/internal/engine/search/engine.h @@ -90,6 +90,8 @@ class Engine { const std::string SpaceName() { return space_name_; } + void Close(); + private: Engine(const std::string &index_root_path, const std::string &space_name); diff --git a/internal/engine/util/bitmap_manager.cc b/internal/engine/util/bitmap_manager.cc index 2c70d5c4..d938ed1f 100644 --- a/internal/engine/util/bitmap_manager.cc +++ b/internal/engine/util/bitmap_manager.cc @@ -27,6 +27,7 @@ BitmapManager::BitmapManager() { size_ = 0; fd_ = -1; fpath_ = ""; + is_load_ = false; } BitmapManager::~BitmapManager() { @@ -128,17 +129,16 @@ int BitmapManager::Dump(uint32_t begin_bit_id, uint32_t bit_len) { return ret; } -int BitmapManager::Load(uint32_t begin_bit_id, uint32_t bit_len) { +int BitmapManager::Load(uint32_t bit_len) { if (bit_len == 0) bit_len = size_; - if (begin_bit_id < 0 || bit_len < 0 || begin_bit_id + bit_len > size_) { - LOG(ERROR) << "parameters error, begin_bit_id=" << begin_bit_id - << " load_bit_len=" << bit_len << " size=" << size_; + if (bit_len < 0 || bit_len > size_) { + LOG(ERROR) << "parameters error, load_bit_len=" << bit_len << " size=" << size_; return -1; } - uint32_t begin_bytes = begin_bit_id >> 3; - uint32_t end_bytes = (begin_bit_id + bit_len - 1) >> 3; + uint32_t begin_bytes = 0; + uint32_t end_bytes = (bit_len - 1) >> 3; uint32_t load_bytes = end_bytes - begin_bytes + 1; int ret = 0; if (fd_ != -1) { @@ -191,8 +191,8 @@ bool BitmapManager::Test(uint32_t bit_id) { return false; } -void BitmapManager::SetMaxID(uint32_t bit_id) { - if (size_ > bit_id) return; +int BitmapManager::SetMaxID(uint32_t bit_id) { + if (size_ > bit_id) return 0; uint32_t old_bytes_count = (size_ >> 3) + 1; size_ *= 2; @@ -200,7 +200,7 @@ void BitmapManager::SetMaxID(uint32_t bit_id) { char *bitmap = new char[bytes_count]; if (bitmap == nullptr) { LOG(INFO) << "new char [" << bytes_count << "] error."; - return; + return -1; } memset(bitmap, 0, bytes_count); char *old = bitmap_; @@ -213,6 +213,234 @@ void BitmapManager::SetMaxID(uint32_t bit_id) { 1000 * 100, [](char *bitmap) { delete[] bitmap; }, old); // after 100s LOG(INFO) << "Current bitmap size [" << size_ << "]"; + return 0; +} + +RocksdbBitmapManager::RocksdbBitmapManager() { + bitmap_ = nullptr; + size_ = 0; + fd_ = -1; + fpath_ = ""; + db_ = nullptr; + is_load_ = false; +} + +RocksdbBitmapManager::~RocksdbBitmapManager() { + if (db_ != nullptr) { + db_->Close(); + delete db_; + db_ = nullptr; + } +} + +int RocksdbBitmapManager::Init(uint32_t bit_size, const std::string &fpath, + char *bitmap) { + if (bit_size <= 0) { + LOG(ERROR) << "bit_size <= 0"; + return -1; + } + this->size_ = bit_size; + + if (fpath != "") { + int ret = RocksdbBitmapManager::SetDumpFilePath(fpath); + if (ret) { + LOG(ERROR) << "RoskdDB BitmapManager init path err:" << ret; + return ret; + } + } else { + LOG(ERROR) << "RoskdDB BitmapManager init path should not be empty."; + return -1; + } + + // load bitmap size + std::string value; + rocksdb::Status s = db_->Get(rocksdb::ReadOptions(), rocksdb::Slice(kBitmapSizeKey), &value); + if (s.ok()) { + size_ = atol(value.c_str()); + LOG(INFO) << "RoskdDB set dump file path successed, load size_=" << size_; + } else { + // dump bitmap size + std::string value = std::to_string(size_); + rocksdb::Status s = + db_->Put(rocksdb::WriteOptions(), rocksdb::Slice(kBitmapSizeKey), + rocksdb::Slice(value)); + if (!s.ok()) { + LOG(ERROR) << "rocksdb set bitmap size error:" << s.ToString() << ", key=" << kBitmapSizeKey << ", value=" << value; + return s.code(); + } + } + + uint32_t bytes_count = (size_ >> 3) + 1; + if (bitmap) { + bitmap_ = bitmap; + } else { + bitmap_ = new char[bytes_count]; + if (bitmap_ == nullptr) { + LOG(ERROR) << "new char[" << bytes_count << "] error."; + return -1; + } + } + memset(bitmap_, 0, bytes_count); + + LOG(INFO) << "RoskdDB BitmapManager init successed. bytes_count=" << bytes_count + << " bit_size=" << bit_size; + return 0; +} + +int RocksdbBitmapManager::SetDumpFilePath(const std::string &fpath) { + if (db_ == nullptr && fpath != "") { + rocksdb::BlockBasedTableOptions table_options; + std::shared_ptr cache = + rocksdb::NewLRUCache(kBitmapCacheSize); + table_options.block_cache = cache; + rocksdb::Options options; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + options.IncreaseParallelism(); + // options.OptimizeLevelStyleCompaction(); + // create the DB if it's not already present + options.create_if_missing = true; + + if (!utils::isFolderExist(fpath.c_str())) { + if(mkdir(fpath.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH)) { + std::string msg = "mkdir " + fpath + " error"; + LOG(ERROR) << msg; + return -1; + } + } else { + is_load_ = true; + } + + // open DB + rocksdb::Status s = rocksdb::DB::Open(options, fpath, &db_); + if (!s.ok()) { + LOG(ERROR) << "open rocks db error: " << s.ToString(); + return -2; + } + return 0; + } + return -1; +} + +int RocksdbBitmapManager::Dump(uint32_t begin_bit_id, uint32_t bit_len) { + return 0; +} + +int RocksdbBitmapManager::Load(uint32_t bit_len) { + if (bit_len == 0) bit_len = size_; + + if (bit_len < 0 || bit_len > size_) { + LOG(ERROR) << "parameters error, load_bit_len=" << bit_len << " size=" << size_; + return -1; + } + int load_num = 0; + for(uint32_t i = 0; i < bit_len; i += kBitmapSegmentBits) { + std::string key, value; + ToRowKey(i, key); + rocksdb::Status s = + db_->Get(rocksdb::ReadOptions(), rocksdb::Slice(key), &value); + if (s.ok()) { + memcpy((void *)(bitmap_ + i / kBitmapSegmentBits * kBitmapSegmentBytes), value.c_str(), kBitmapSegmentBytes); + load_num += 1; + } + } + LOG(INFO) << "RoskdDB BitmapManager load successed. size_=" << size_ << ", load_num=" << load_num; + return 0; +} + +uint32_t RocksdbBitmapManager::FileBytesSize() { + return 0; +} + +void RocksdbBitmapManager::ToRowKey(uint32_t bit_id, std::string &key) { + //max length:6 because (2^32 -1) / 1024 / 8 = 524287 + char data[7]; + snprintf(data, 7, "%06d", bit_id / kBitmapSegmentBits); + key.assign(data, 7); +} + +int RocksdbBitmapManager::Set(uint32_t bit_id) { + if (bit_id >= 0 && bit_id < size_ && bitmap_ != nullptr) { + bitmap_[bit_id >> 3] |= (0x1 << (bit_id & 0x7)); + + std::string key, value; + ToRowKey(bit_id, key); + + rocksdb::Status s = + db_->Put(rocksdb::WriteOptions(), rocksdb::Slice(key), + rocksdb::Slice((const char *)(bitmap_ + bit_id / kBitmapSegmentBits * kBitmapSegmentBytes), kBitmapSegmentBytes)); + if (!s.ok()) { + LOG(ERROR) << "rocksdb set bitmap error:" << s.ToString() << ", key=" << key << ", value=" << value; + //reset + bitmap_[bit_id >> 3] &= ~(0x1 << (bit_id & 0x7)); + return s.code(); + } + return 0; + } + return -1; +} + +int RocksdbBitmapManager::Unset(uint32_t bit_id) { + if (bit_id >= 0 && bit_id < size_ && bitmap_ != nullptr) { + bitmap_[bit_id >> 3] &= ~(0x1 << (bit_id & 0x7)); + + std::string key, value; + ToRowKey(bit_id, key); + + rocksdb::Status s = + db_->Put(rocksdb::WriteOptions(), rocksdb::Slice(key), + rocksdb::Slice((const char *)(bitmap_ + bit_id / kBitmapSegmentBits * kBitmapSegmentBytes), kBitmapSegmentBytes)); + if (!s.ok()) { + LOG(ERROR) << "rocksdb unset bitmap error:" << s.ToString() << ", key=" << key << ", value=" << value; + //reset + bitmap_[bit_id >> 3] |= (0x1 << (bit_id & 0x7)); + return s.code(); + } + return 0; + } + return -1; +} + +bool RocksdbBitmapManager::Test(uint32_t bit_id) { + if (bit_id >= 0 && bit_id < size_ && bitmap_ != nullptr) { + return (bitmap_[bit_id >> 3] & (0x1 << (bit_id & 0x7))); + } + return false; +} + +int RocksdbBitmapManager::SetMaxID(uint32_t bit_id) { + if (size_ > bit_id) return 0; + + size_t new_size = size_ * 2; + std::string value = std::to_string(new_size); + rocksdb::Status s = + db_->Put(rocksdb::WriteOptions(), rocksdb::Slice(kBitmapSizeKey), + rocksdb::Slice(value)); + if (!s.ok()) { + LOG(ERROR) << "rocksdb set bitmap size error:" << s.ToString() << ", key=" << kBitmapSizeKey << ", value=" << value; + return s.code(); + } + + uint32_t old_bytes_count = (size_ >> 3) + 1; + size_ = new_size; + uint32_t bytes_count = (size_ >> 3) + 1; + char *bitmap = new char[bytes_count]; + if (bitmap == nullptr) { + LOG(INFO) << "new char [" << bytes_count << "] error."; + return -1; + } + memset(bitmap, 0, bytes_count); + char *old = bitmap_; + memcpy(bitmap, old, old_bytes_count); + bitmap_ = bitmap; + + // delay free + utils::AsyncWait( + 1000 * 100, [](char *bitmap) { delete[] bitmap; }, old); // after 100s + + LOG(INFO) << "Current bitmap size [" << size_ << "]"; + + return 0; } -} // namespace bitmap +} // namespace bitmap \ No newline at end of file diff --git a/internal/engine/util/bitmap_manager.h b/internal/engine/util/bitmap_manager.h index d998266c..6235d1dd 100644 --- a/internal/engine/util/bitmap_manager.h +++ b/internal/engine/util/bitmap_manager.h @@ -9,43 +9,86 @@ #include +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/table.h" +#include "util/status.h" + namespace bitmap { class BitmapManager { public: BitmapManager(); - ~BitmapManager(); + virtual ~BitmapManager(); - int Init(uint32_t bit_size, const std::string &fpath = "", + virtual int Init(uint32_t bit_size, const std::string &fpath = "", char *bitmap = nullptr); - int SetDumpFilePath(const std::string &fpath); + virtual int SetDumpFilePath(const std::string &fpath); + + virtual int Dump(uint32_t begin_bit_id = 0, uint32_t bit_len = 0); - int Dump(uint32_t begin_bit_id = 0, uint32_t bit_len = 0); + virtual int Load(uint32_t bit_len = 0); - int Load(uint32_t begin_bit_id = 0, uint32_t bit_len = 0); + virtual uint32_t FileBytesSize(); - uint32_t FileBytesSize(); + bool IsLoad() { return is_load_; } - int Set(uint32_t bit_id); + virtual int Set(uint32_t bit_id); - int Unset(uint32_t bit_id); + virtual int Unset(uint32_t bit_id); - bool Test(uint32_t bit_id); + virtual bool Test(uint32_t bit_id); - uint32_t BitSize() { return size_; } + virtual uint32_t BitSize() { return size_; } char *Bitmap() { return bitmap_; } - uint32_t BytesSize() { return (size_ >> 3) + 1; } + virtual uint32_t BytesSize() { return (size_ >> 3) + 1; } - void SetMaxID(uint32_t bit_id); + virtual int SetMaxID(uint32_t bit_id); - private: char *bitmap_; uint32_t size_; int fd_; std::string fpath_; + bool is_load_; +}; + +constexpr uint32_t kBitmapSegmentBits = 1024 * 8; +constexpr uint32_t kBitmapSegmentBytes = 1024; +constexpr uint32_t kBitmapCacheSize = 10 * 1024 * 1024; +const std::string kBitmapSizeKey = "bitmap_size"; + + +class RocksdbBitmapManager : public BitmapManager { + public: + RocksdbBitmapManager(); + virtual ~RocksdbBitmapManager(); + + virtual int Init(uint32_t bit_size, const std::string &fpath = "", + char *bitmap = nullptr); + + virtual int SetDumpFilePath(const std::string &fpath); + + virtual int Dump(uint32_t begin_bit_id = 0, uint32_t bit_len = 0); + + virtual int Load(uint32_t bit_len = 0); + + virtual uint32_t FileBytesSize(); + + virtual int Set(uint32_t bit_id); + + virtual int Unset(uint32_t bit_id); + + virtual bool Test(uint32_t bit_id); + + virtual int SetMaxID(uint32_t bit_id); + + virtual void ToRowKey(uint32_t bit_id, std::string &key); + + rocksdb::DB *db_; + bool should_load_; }; } // namespace bitmap diff --git a/internal/engine/vector/rocksdb_wrapper.cc b/internal/engine/vector/rocksdb_wrapper.cc index 3c3d5356..2e54b0fb 100644 --- a/internal/engine/vector/rocksdb_wrapper.cc +++ b/internal/engine/vector/rocksdb_wrapper.cc @@ -16,8 +16,11 @@ namespace vearch { RocksDBWrapper::RocksDBWrapper() : db_(nullptr) {} RocksDBWrapper::~RocksDBWrapper() { - delete db_; - db_ = nullptr; + if (db_ != nullptr) { + db_->Close(); + delete db_; + db_ = nullptr; + } } Status RocksDBWrapper::Open(string db_path, size_t block_cache_size) { diff --git a/internal/ps/engine/gammacb/gamma.go b/internal/ps/engine/gammacb/gamma.go index 75dbe5a5..2b8ebdc6 100644 --- a/internal/ps/engine/gammacb/gamma.go +++ b/internal/ps/engine/gammacb/gamma.go @@ -77,6 +77,11 @@ func New(cfg EngineConfig) (engine.Engine, error) { SpaceName: cfg.Space.Name + "-" + cast.ToString(cfg.PartitionID), LogDir: config.Conf().GetLogDir()} + gamma_engine_instance := gamma.Init(config) + if gamma_engine_instance == nil { + return nil, fmt.Errorf("init engine err") + } + ge := &gammaEngine{ ctx: ctx, cancel: cancel, @@ -84,7 +89,7 @@ func New(cfg EngineConfig) (engine.Engine, error) { space: cfg.Space, partitionID: cfg.PartitionID, path: cfg.Path, - gamma: gamma.Init(config), + gamma: gamma_engine_instance, counter: atomic.NewAtomicInt64(0), hasClosed: false, } @@ -97,7 +102,7 @@ func New(cfg EngineConfig) (engine.Engine, error) { if status := gamma.CreateTable(ge.gamma, table); status.Code != 0 { log.Error("create table [%s] err [%s] cost time: [%v]", cfg.Space.Name, status.Msg, time.Since(startTime).Seconds()) ge.Close() - return nil, fmt.Errorf("create table err:[%s]", status.Msg) + return nil, fmt.Errorf("create engine table err:[%s]", status.Msg) } gammaDirs := make([]string, 0) @@ -112,6 +117,7 @@ func New(cfg EngineConfig) (engine.Engine, error) { if code != 0 { vearchlog.LogErrNotNil(fmt.Errorf("load data err code:[%d]", code)) ge.Close() + return nil, fmt.Errorf("load data err code:[%d]", code) } } diff --git a/internal/ps/engine/gammacb/reader.go b/internal/ps/engine/gammacb/reader.go index 1b047e90..b62a0b64 100644 --- a/internal/ps/engine/gammacb/reader.go +++ b/internal/ps/engine/gammacb/reader.go @@ -64,8 +64,7 @@ func (ri *readerImpl) GetDoc(ctx context.Context, doc *vearchpb.Document, getByD code = gamma.GetDocByID(ri.engine.gamma, primaryKey, docGamma) } if code != 0 { - msg := "doc not found" - return vearchpb.NewError(vearchpb.ErrorEnum_DOCUMENT_NOT_EXIST, errors.New(msg)) + return vearchpb.NewError(vearchpb.ErrorEnum_DOCUMENT_NOT_EXIST, nil) } doc.Fields = docGamma.Fields return nil diff --git a/test/test_document_delete.py b/test/test_document_delete.py index be163585..d6dae1d1 100644 --- a/test/test_document_delete.py +++ b/test/test_document_delete.py @@ -171,3 +171,76 @@ def test_vearch_document_delete_multiple_badcase(self, index: int, wrong_type: s # destroy for badcase def test_destroy_cluster_badcase(self): destroy(router_url, db_name, space_name) + +class TestDocumentDeleteAndUpsert: + def setup_class(self): + self.logger = logger + self.xb = xb + + # prepare + def test_prepare_cluster(self): + prepare_cluster_for_document_test(self.logger, 1, self.xb) + + def test_prepare_delete_and_upsert(self): + add(1, 1, self.xb, with_id=True, full_field=True) + + assert get_space_num() == 1 + + response = get_space(router_url, db_name, space_name) + assert response.json()["data"]["partitions"][0]["max_docid"] == 0 + + query_dict = { + "document_ids":["0"], + "limit": 1, + "db_name": db_name, + "space_name": space_name, + } + partition_id = get_partition(router_url, db_name, space_name)[0] + query_dict_partition = { + "document_ids":["0"], + "partition_id": partition_id, + "limit": 1, + "db_name": db_name, + "space_name": space_name, + } + url = router_url + "/document/query" + json_str = json.dumps(query_dict) + response = requests.post(url, auth=(username, password), data=json_str) + logger.info(response.json()["data"]) + assert response.json()["data"]["total"] == 1 + + delete_interface(self.logger, 1, 1, delete_type="by_ids") + + response = requests.post(url, auth=(username, password), json=query_dict_partition) + logger.info(response.json()["data"]) + assert response.json()["data"]["total"] == 0 + + assert get_space_num() == 0 + + response = requests.post(url, auth=(username, password), data=json_str) + logger.info(response.json()["data"]) + + add(1, 1, self.xb, with_id=True, full_field=True) + + assert get_space_num() == 1 + + response = requests.post(url, auth=(username, password), data=json_str) + logger.info(response.json()["data"]) + assert response.status_code == 200 + + # add same _id then delete and add again, max_doc_id will increase + response = get_space(router_url, db_name, space_name) + assert response.json()["data"]["partitions"][0]["max_docid"] == 1 + + response = requests.post(url, auth=(username, password), json=query_dict_partition) + logger.info(response.json()["data"]) + assert response.json()["data"]["total"] == 0 + + query_dict_partition["document_ids"] = ["1"] + response = requests.post(url, auth=(username, password), json=query_dict_partition) + logger.info(response.json()["data"]) + assert response.json()["data"]["total"] == 1 + + # destroy + def test_destroy_cluster(self): + destroy(router_url, db_name, space_name) \ No newline at end of file diff --git a/test/test_module_space.py b/test/test_module_space.py index 8bd3822e..c3b592d4 100644 --- a/test/test_module_space.py +++ b/test/test_module_space.py @@ -250,17 +250,15 @@ def test_vearch_space_create_badcase(self, wrong_index, wrong_type, index_type): training_threshold = 1 if wrong_index == 2: create_space_name = "wrong-name" - if wrong_index == 3: - replica_num = 3 nlinks = 32 - if wrong_index == 4: + if wrong_index == 3: nlinks = 97 - if wrong_index == 5: + if wrong_index == 4: nlinks = 7 efConstruction = 100 - if wrong_index == 6: + if wrong_index == 5: efConstruction = 1025 - if wrong_index == 7: + if wrong_index == 6: efConstruction = 15 ncentroids = 2048 if wrong_index == 7 or wrong_index == 8: @@ -365,6 +363,7 @@ def test_vearch_space_describe_badcase(self, wrong_index, wrong_type): } response = create_space(router_url, db_name, space_config) + logger.info(response.json()) assert response.json()["code"] == 0 describe_db_name = db_name