diff --git a/engine/gamma/index/gamma_index.h b/engine/gamma/index/gamma_index.h index 25b4c98a..032497f9 100644 --- a/engine/gamma/index/gamma_index.h +++ b/engine/gamma/index/gamma_index.h @@ -137,7 +137,7 @@ struct GammaIndex { virtual int Dump(const std::string &dir) = 0; virtual int Load(const std::vector &index_dirs) = 0; - size_t d_; + int d_; const char *docids_bitmap_; RawVector *raw_vec_; diff --git a/engine/gamma/index/gamma_index_factory.h b/engine/gamma/index/gamma_index_factory.h index 93c8785b..543ea24b 100644 --- a/engine/gamma/index/gamma_index_factory.h +++ b/engine/gamma/index/gamma_index_factory.h @@ -17,7 +17,7 @@ namespace tig_gamma { class GammaIndexFactory { -public: + public: static GammaIndex *Create(RetrievalModel model, size_t dimension, const char *docids_bitmap, RawVector *raw_vec, IVFPQParameters *ivfpq_param) { @@ -26,24 +26,35 @@ class GammaIndexFactory { return nullptr; } switch (model) { - case IVFPQ: { - faiss::IndexFlatL2 *coarse_quantizer = new faiss::IndexFlatL2(dimension); - return (GammaIndex *)new GammaIVFPQIndex( - coarse_quantizer, dimension, ivfpq_param->ncentroids, - ivfpq_param->nsubvector, ivfpq_param->nbits_per_idx, docids_bitmap, - raw_vec, ivfpq_param->nprobe); - break; - } - - default: { - throw std::invalid_argument("invalid raw feature type"); - break; - } + case IVFPQ: { + if (dimension % ivfpq_param->nsubvector != 0) { + dimension = (dimension / ivfpq_param->nsubvector + 1) * + ivfpq_param->nsubvector; + LOG(INFO) << "Dimension [" << raw_vec->GetDimension() + << "] cannot divide by nsubvector [" + << ivfpq_param->nsubvector << "], adjusted to [" + << dimension << "]"; + } + + faiss::IndexFlatL2 *coarse_quantizer = + new faiss::IndexFlatL2(dimension); + + return (GammaIndex *)new GammaIVFPQIndex( + coarse_quantizer, dimension, ivfpq_param->ncentroids, + ivfpq_param->nsubvector, ivfpq_param->nbits_per_idx, docids_bitmap, + raw_vec, ivfpq_param->nprobe); + break; + } + + default: { + throw std::invalid_argument("invalid raw feature type"); + break; + } } return nullptr; } }; -} // namespace tig_gamma +} // namespace tig_gamma -#endif // GAMMA_INDEX_FACTORY_H_ +#endif // GAMMA_INDEX_FACTORY_H_ diff --git a/engine/gamma/index/gamma_index_ivfpq.cc b/engine/gamma/index/gamma_index_ivfpq.cc index c7d83d1f..40d61d95 100644 --- a/engine/gamma/index/gamma_index_ivfpq.cc +++ b/engine/gamma/index/gamma_index_ivfpq.cc @@ -32,6 +32,18 @@ namespace tig_gamma { +static inline void ConvertVectorDim(size_t num, int raw_d, int d, + const float *raw_vec, float *vec) { + memset(vec, 0, num * d * sizeof(float)); + +#pragma omp parallel for + for (size_t i = 0; i < num; ++i) { + for (int j = 0; j < raw_d; ++j) { + vec[i * d + j] = raw_vec[i * raw_d + j]; + } + } +} + IndexIVFPQStats indexIVFPQ_stats; GammaIVFPQIndex::GammaIVFPQIndex(faiss::Index *quantizer, size_t d, @@ -119,10 +131,31 @@ int GammaIVFPQIndex::Indexing() { << "] less then 8192, failed!"; return -1; } - int num = vectors_count > 100000 ? 100000 : vectors_count; + size_t num = vectors_count > 100000 ? 100000 : vectors_count; const float *header = raw_vec_->GetVectorHeader(0, num); - train(num, header); + + int raw_d = raw_vec_->GetDimension(); + + float *train_vec = nullptr; + + if (d_ > raw_d) { + float *vec = new float[num * d_]; + + ConvertVectorDim(num, raw_d, d, header, vec); + + train_vec = vec; + } else { + train_vec = const_cast(header); + } + + train(num, train_vec); + + if (d_ > raw_d) { + delete train_vec; + } + raw_vec_->Destroy(header, true); + LOG(INFO) << "train successed!"; return 0; } @@ -147,18 +180,32 @@ int GammaIVFPQIndex::AddRTVecsToIndex() { for (int i = 0; i < index_count; i++) { int start_docid = indexed_vec_count_; - int count_per_index = + size_t count_per_index = (i == (index_count - 1) ? total_stored_vecs - start_docid : MAX_NUM_PER_INDEX); - // const float *index_ptr = raw_vec_->GetVector(start_docid); const float *vector_head = raw_vec_->GetVectorHeader( indexed_vec_count_, indexed_vec_count_ + count_per_index); - // const float *index_ptr = vector_head + (uint64_t)start_docid * d_; - const float *index_ptr = vector_head; - if (!Add(count_per_index, index_ptr)) { + + int raw_d = raw_vec_->GetDimension(); + float *add_vec = nullptr; + + if (d_ > raw_d) { + float *vec = new float[count_per_index * d_]; + + ConvertVectorDim(count_per_index, raw_d, d, vector_head, vec); + add_vec = vec; + } else { + add_vec = const_cast(vector_head); + } + + if (!Add(count_per_index, add_vec)) { LOG(ERROR) << "add index from docid " << start_docid << " error!"; ret = -2; } + + if (d_ > raw_d) { + delete add_vec; + } raw_vec_->Destroy(vector_head, true); } } @@ -310,13 +357,14 @@ void GammaIVFPQIndex::search_preassigned( float *recall_simi, idx_t *recall_idxi) { std::vector vecs(recall_num); raw_vec_->Gets(recall_num, recall_idxi, vecs); + int raw_d = raw_vec_->GetDimension(); for (int j = 0; j < recall_num; j++) { if (recall_idxi[j] == -1) continue; float dis = 0; if (metric_type == faiss::METRIC_INNER_PRODUCT) { - dis = faiss::fvec_inner_product(xi, vecs[j], this->d); + dis = faiss::fvec_inner_product(xi, vecs[j], raw_d); } else { - dis = faiss::fvec_L2sqr(xi, vecs[j], this->d); + dis = faiss::fvec_L2sqr(xi, vecs[j], raw_d); } if (((condition->min_dist >= 0 && dis >= condition->min_dist) && @@ -405,7 +453,6 @@ void GammaIVFPQIndex::search_preassigned( int *vid_list_data = vid_list.data(); int *curr_ptr = vid_list_data; for (size_t i = 0; i < docid_list.size(); i++) { - // vids_list[i] = this->raw_vec_->docid2vid_[docid_list[i]]; if (bitmap::test(this->docids_bitmap_, docid_list[i])) { continue; } @@ -900,7 +947,8 @@ int GammaIVFPQIndex::Search(const VectorQuery *query, const GammaSearchCondition *condition, VectorResult &result) { float *x = reinterpret_cast(query->value->value); - int n = query->value->len / (d * sizeof(float)); + int raw_d = raw_vec_->GetDimension(); + size_t n = query->value->len / (raw_d * sizeof(float)); if (condition->metric_type == InnerProduct) { metric_type = faiss::METRIC_INNER_PRODUCT; @@ -908,13 +956,30 @@ int GammaIVFPQIndex::Search(const VectorQuery *query, metric_type = faiss::METRIC_L2; } idx_t *idx = reinterpret_cast(result.docids); + + float *vec_q = nullptr; + + if (d > raw_d) { + float *vec = new float[n * d]; + + ConvertVectorDim(n, raw_d, d, x, vec); + + vec_q = vec; + } else { + vec_q = x; + } + if (condition->use_direct_search) { - SearchDirectly(n, x, condition, result.dists, idx, result.total.data()); + SearchDirectly(n, vec_q, condition, result.dists, idx, result.total.data()); } else { - SearchIVFPQ(n, x, condition, result.dists, idx, result.total.data()); + SearchIVFPQ(n, vec_q, condition, result.dists, idx, result.total.data()); } - for (int i = 0; i < n; i++) { + if (d > raw_d) { + delete vec_q; + } + + for (size_t i = 0; i < n; i++) { int pos = 0; std::map docid2count; diff --git a/engine/gamma/search/gamma_common_data.h b/engine/gamma/search/gamma_common_data.h index 6928ba65..79a696b7 100644 --- a/engine/gamma/search/gamma_common_data.h +++ b/engine/gamma/search/gamma_common_data.h @@ -175,14 +175,10 @@ struct IVFPQParamHelper { void SetDefaultValue() { if (ivfpq_param_->metric_type == -1) ivfpq_param_->metric_type = InnerProduct; - if (ivfpq_param_->nprobe == -1) - ivfpq_param_->nprobe = 20; - if (ivfpq_param_->ncentroids == -1) - ivfpq_param_->ncentroids = 256; - if (ivfpq_param_->nsubvector == -1) - ivfpq_param_->nsubvector = 64; - if (ivfpq_param_->nbits_per_idx == -1) - ivfpq_param_->nbits_per_idx = 8; + if (ivfpq_param_->nprobe == -1) ivfpq_param_->nprobe = 20; + if (ivfpq_param_->ncentroids == -1) ivfpq_param_->ncentroids = 256; + if (ivfpq_param_->nsubvector == -1) ivfpq_param_->nsubvector = 64; + if (ivfpq_param_->nbits_per_idx == -1) ivfpq_param_->nbits_per_idx = 8; } bool Validate() { diff --git a/engine/gamma/tests/README.md b/engine/gamma/tests/README.md index aabda206..eedbb6d8 100644 --- a/engine/gamma/tests/README.md +++ b/engine/gamma/tests/README.md @@ -21,4 +21,4 @@ op8=>operation: Close engine st->op1->op2->op3->op4->op5->op6->op7->op8->e ``` ## test -`./test_files profile_10k.txt siftsmall_base.fvecs` +`./test_files profile_10k.txt siftsmall_base.fvecs` \ No newline at end of file diff --git a/engine/gamma/tests/test_files.cc b/engine/gamma/tests/test_files.cc index f5b08106..8a54bbc5 100644 --- a/engine/gamma/tests/test_files.cc +++ b/engine/gamma/tests/test_files.cc @@ -32,7 +32,7 @@ struct Options { max_doc_size = 10000 * 200; add_doc_num = 10000 * 100; search_num = 10000 * 1; - fields_vec = {"sku", "_id", "cid1", "cid2", "cid3"}; + fields_vec = {"key", "_id", "field1", "field2", "field3"}; fields_type = {LONG, STRING, STRING, INT, INT}; vector_name = "abc"; path = "files"; @@ -144,7 +144,7 @@ int SearchThread(void *engine, size_t num) { if (idx % 1000 == 0) LOG(INFO) << "idx=" << idx; - string name = "cid2"; + string name = "field2"; RangeFilter **range_filters = MakeRangeFilters(2); RangeFilter *range_filter = MakeRangeFilter(StringToByteArray(name), StringToByteArray(c1_lower), @@ -155,7 +155,7 @@ int SearchThread(void *engine, size_t num) { upper = 999999; c1_lower = string((char *)&low, sizeof(low)); c1_upper = string((char *)&upper, sizeof(upper)); - name = "cid3"; + name = "field3"; range_filter = MakeRangeFilter(StringToByteArray(name), StringToByteArray(c1_lower), StringToByteArray(c1_upper), false, true); @@ -165,7 +165,7 @@ int SearchThread(void *engine, size_t num) { TermFilter *term_filter; std::string term_low = string("1315\00115248"); - name = "cid1"; + name = "field1"; term_filter = MakeTermFilter(StringToByteArray(name), StringToByteArray(term_low), true); SetTermFilter(term_filters, 0, term_filter); diff --git a/engine/gamma/vector/mmap_raw_vector.cc b/engine/gamma/vector/mmap_raw_vector.cc index 1a3fc9aa..c10f2d99 100644 --- a/engine/gamma/vector/mmap_raw_vector.cc +++ b/engine/gamma/vector/mmap_raw_vector.cc @@ -77,7 +77,7 @@ MmapRawVector::~MmapRawVector() { } int MmapRawVector::Init() { - max_buffer_size_ = (int)((long)store_params_->cache_size_ * 1024 * 1024 / vector_byte_size_); + max_buffer_size_ = (int)(store_params_->cache_size_ / vector_byte_size_); fet_fd_ = open(fet_file_path_.c_str(), O_WRONLY | O_APPEND | O_CREAT, 00664); if (fet_fd_ == -1) { @@ -91,6 +91,10 @@ int MmapRawVector::Init() { return -1; } + int remainder = max_buffer_size_ % buffer_chunk_num_; + if (remainder > 0) { + max_buffer_size_ += buffer_chunk_num_ - remainder; + } vector_buffer_queue_ = new VectorBufferQueue(max_buffer_size_, dimension_, buffer_chunk_num_); vector_file_mapper_ = diff --git a/engine/gamma/vector/mmap_raw_vector.h b/engine/gamma/vector/mmap_raw_vector.h index b2d6a8fa..0c39935b 100644 --- a/engine/gamma/vector/mmap_raw_vector.h +++ b/engine/gamma/vector/mmap_raw_vector.h @@ -5,8 +5,8 @@ * found in the LICENSE file in the root directory of this source tree. */ -#ifndef MEMORY_DISK_RAW_VECTOR_H_ -#define MEMORY_DISK_RAW_VECTOR_H_ +#ifndef MMAP_RAW_VECTOR_H_ +#define MMAP_RAW_VECTOR_H_ #include "raw_vector.h" #include "vector_buffer_queue.h" @@ -49,4 +49,4 @@ class MmapRawVector : public RawVector, public AsyncFlusher { } // namespace tig_gamma -#endif +#endif // MMAP_RAW_VECTOR_H_ diff --git a/engine/gamma/vector/raw_vector.cc b/engine/gamma/vector/raw_vector.cc index d864b86b..52176185 100644 --- a/engine/gamma/vector/raw_vector.cc +++ b/engine/gamma/vector/raw_vector.cc @@ -269,11 +269,11 @@ int StoreParams::Parse(const char *str) { double cache_size = 0; if (!jp.GetDouble("cache_size", cache_size)) { if (cache_size > MAX_CACHE_SIZE || cache_size < 0) { - LOG(ERROR) << "invalid cache size=" << cache_size - << ", limit size=" << MAX_CACHE_SIZE; + LOG(ERROR) << "invalid cache size=" << cache_size << "M" + << ", limit size=" << MAX_CACHE_SIZE << "M"; return -1; } - cache_size_ = (int)cache_size; + cache_size_ = (long)cache_size * 1024 * 1024; } return 0; diff --git a/engine/gamma/vector/raw_vector.h b/engine/gamma/vector/raw_vector.h index 1acc9526..7a7e0f14 100644 --- a/engine/gamma/vector/raw_vector.h +++ b/engine/gamma/vector/raw_vector.h @@ -174,9 +174,9 @@ void StartFlushingIfNeed(RawVector *vec); void StopFlushingIfNeed(RawVector *vec); struct StoreParams { - int cache_size_; // M bytes + long cache_size_; // bytes - StoreParams() { cache_size_ = 1024 * 2; } + StoreParams() { cache_size_ = -1; } StoreParams(const StoreParams &other) { this->cache_size_ = other.cache_size_; } diff --git a/engine/gamma/vector/raw_vector_factory.h b/engine/gamma/vector/raw_vector_factory.h index 4f7eaaa4..b95769d3 100644 --- a/engine/gamma/vector/raw_vector_factory.h +++ b/engine/gamma/vector/raw_vector_factory.h @@ -15,6 +15,7 @@ #include "rocksdb_raw_vector.h" #endif // WITH_ROCKSDB +#include "gamma_common_data.h" #include namespace tig_gamma { @@ -28,6 +29,8 @@ class RawVectorFactory { StoreParams store_params; if (store_param != "" && store_params.Parse(store_param.c_str())) return nullptr; + if (store_params.cache_size_ == -1) + store_params.cache_size_ = max_doc_size * dimension * sizeof(float); LOG(INFO) << "store parameters=" << store_params.ToString(); switch (type) { case Mmap: diff --git a/engine/gamma/vector/rocksdb_raw_vector.cc b/engine/gamma/vector/rocksdb_raw_vector.cc index 82789d6c..d57c7f7b 100644 --- a/engine/gamma/vector/rocksdb_raw_vector.cc +++ b/engine/gamma/vector/rocksdb_raw_vector.cc @@ -41,7 +41,7 @@ RocksDBRawVector::~RocksDBRawVector() { } int RocksDBRawVector::Init() { - block_cache_size_ = store_params_->cache_size_ * 1024 * 1024; + block_cache_size_ = store_params_->cache_size_; raw_vector_io_ = new RawVectorIO(this); if (raw_vector_io_->Init(true)) { diff --git a/engine/gamma/vector/vector_buffer_queue.cc b/engine/gamma/vector/vector_buffer_queue.cc index 5cb3f011..09f6041b 100644 --- a/engine/gamma/vector/vector_buffer_queue.cc +++ b/engine/gamma/vector/vector_buffer_queue.cc @@ -50,6 +50,13 @@ int VectorBufferQueue::Init(string &fet_file_path) { if (dimension_ <= 0) { return 1; } + if (max_vector_size_ % chunk_num_ != 0) { + LOG(ERROR) << "max_vector_size(" << max_vector_size_ << ") % chunk_num(" + << chunk_num_ << ") != 0"; + return 1; + } + chunk_size_ = max_vector_size_ / chunk_num_; + vector_byte_size_ = sizeof(float) * dimension_; buffer_ = (float *)calloc(max_vector_size_, vector_byte_size_); if (buffer_ == NULL) { @@ -90,12 +97,6 @@ int VectorBufferQueue::Init(string &fet_file_path) { pop_index_ = push_index_; } - if (max_vector_size_ % chunk_num_ != 0) { - LOG(ERROR) << "max_vector_size_ % chunk_num_ != 0"; - return 1; - } - chunk_size_ = max_vector_size_ / chunk_num_; - shared_mutexes_ = new pthread_rwlock_t[chunk_num_]; for (int i = 0; i < chunk_num_; i++) { int ret = pthread_rwlock_init(&shared_mutexes_[i], NULL); @@ -107,6 +108,7 @@ int VectorBufferQueue::Init(string &fet_file_path) { LOG(INFO) << "vector buffer queue init success! buffer byte size=" << (long)max_vector_size_ * vector_byte_size_ << ", buffer vector size=" << max_vector_size_ + << ", chunk number=" << chunk_num_ << ", stored number=" << stored_num_; return 0; } diff --git a/engine/gamma/vector/vector_manager.cc b/engine/gamma/vector/vector_manager.cc index dcd18ea6..c63834cb 100644 --- a/engine/gamma/vector/vector_manager.cc +++ b/engine/gamma/vector/vector_manager.cc @@ -18,9 +18,9 @@ static bool InnerProductCmp(const VectorDoc &a, const VectorDoc &b) { return a.score > b.score; } -static bool L2Cmp(const VectorDoc &a, const VectorDoc &b) { - return a.score < b.score; -} +// static bool L2Cmp(const VectorDoc &a, const VectorDoc &b) { +// return a.score < b.score; +// } static ByteArray *CopyByteArray(ByteArray *ba) { return MakeByteArray(ba->value, ba->len); @@ -233,7 +233,8 @@ int VectorManager::Search(const GammaQuery &query, GammaResult *results) { return -1; } - n = query.vec_query[i]->value->len / (sizeof(float) * iter->second->d_); + int d = iter->second->raw_vec_->GetDimension(); + n = query.vec_query[i]->value->len / (sizeof(float) * d); if (!all_vector_results[i].init(n, query.condition->topn)) { LOG(ERROR) << "Query name " << name << "init vector result error"; return -1;