Skip to content

Commit

Permalink
fix the bugs below:
Browse files Browse the repository at this point in the history
any dimension(integer) is ok.
any positive cache_size is ok.
  • Loading branch information
ljeagle committed Nov 15, 2019
1 parent 7b3ddb2 commit dba224c
Show file tree
Hide file tree
Showing 14 changed files with 146 additions and 64 deletions.
2 changes: 1 addition & 1 deletion engine/gamma/index/gamma_index.h
Expand Up @@ -137,7 +137,7 @@ struct GammaIndex {
virtual int Dump(const std::string &dir) = 0;
virtual int Load(const std::vector<std::string> &index_dirs) = 0;

size_t d_;
int d_;

const char *docids_bitmap_;
RawVector *raw_vec_;
Expand Down
43 changes: 27 additions & 16 deletions engine/gamma/index/gamma_index_factory.h
Expand Up @@ -17,7 +17,7 @@
namespace tig_gamma {

class GammaIndexFactory {
public:
public:
static GammaIndex *Create(RetrievalModel model, size_t dimension,
const char *docids_bitmap, RawVector *raw_vec,
IVFPQParameters *ivfpq_param) {
Expand All @@ -26,24 +26,35 @@ class GammaIndexFactory {
return nullptr;
}
switch (model) {
case IVFPQ: {
faiss::IndexFlatL2 *coarse_quantizer = new faiss::IndexFlatL2(dimension);
return (GammaIndex *)new GammaIVFPQIndex(
coarse_quantizer, dimension, ivfpq_param->ncentroids,
ivfpq_param->nsubvector, ivfpq_param->nbits_per_idx, docids_bitmap,
raw_vec, ivfpq_param->nprobe);
break;
}

default: {
throw std::invalid_argument("invalid raw feature type");
break;
}
case IVFPQ: {
if (dimension % ivfpq_param->nsubvector != 0) {
dimension = (dimension / ivfpq_param->nsubvector + 1) *
ivfpq_param->nsubvector;
LOG(INFO) << "Dimension [" << raw_vec->GetDimension()
<< "] cannot divide by nsubvector ["
<< ivfpq_param->nsubvector << "], adjusted to ["
<< dimension << "]";
}

faiss::IndexFlatL2 *coarse_quantizer =
new faiss::IndexFlatL2(dimension);

return (GammaIndex *)new GammaIVFPQIndex(
coarse_quantizer, dimension, ivfpq_param->ncentroids,
ivfpq_param->nsubvector, ivfpq_param->nbits_per_idx, docids_bitmap,
raw_vec, ivfpq_param->nprobe);
break;
}

default: {
throw std::invalid_argument("invalid raw feature type");
break;
}
}

return nullptr;
}
};
} // namespace tig_gamma
} // namespace tig_gamma

#endif // GAMMA_INDEX_FACTORY_H_
#endif // GAMMA_INDEX_FACTORY_H_
93 changes: 79 additions & 14 deletions engine/gamma/index/gamma_index_ivfpq.cc
Expand Up @@ -32,6 +32,18 @@

namespace tig_gamma {

static inline void ConvertVectorDim(size_t num, int raw_d, int d,
const float *raw_vec, float *vec) {
memset(vec, 0, num * d * sizeof(float));

#pragma omp parallel for
for (size_t i = 0; i < num; ++i) {
for (int j = 0; j < raw_d; ++j) {
vec[i * d + j] = raw_vec[i * raw_d + j];
}
}
}

IndexIVFPQStats indexIVFPQ_stats;

GammaIVFPQIndex::GammaIVFPQIndex(faiss::Index *quantizer, size_t d,
Expand Down Expand Up @@ -119,10 +131,31 @@ int GammaIVFPQIndex::Indexing() {
<< "] less then 8192, failed!";
return -1;
}
int num = vectors_count > 100000 ? 100000 : vectors_count;
size_t num = vectors_count > 100000 ? 100000 : vectors_count;
const float *header = raw_vec_->GetVectorHeader(0, num);
train(num, header);

int raw_d = raw_vec_->GetDimension();

float *train_vec = nullptr;

if (d_ > raw_d) {
float *vec = new float[num * d_];

ConvertVectorDim(num, raw_d, d, header, vec);

train_vec = vec;
} else {
train_vec = const_cast<float *>(header);
}

train(num, train_vec);

if (d_ > raw_d) {
delete train_vec;
}

raw_vec_->Destroy(header, true);

LOG(INFO) << "train successed!";
return 0;
}
Expand All @@ -147,18 +180,32 @@ int GammaIVFPQIndex::AddRTVecsToIndex() {

for (int i = 0; i < index_count; i++) {
int start_docid = indexed_vec_count_;
int count_per_index =
size_t count_per_index =
(i == (index_count - 1) ? total_stored_vecs - start_docid
: MAX_NUM_PER_INDEX);
// const float *index_ptr = raw_vec_->GetVector(start_docid);
const float *vector_head = raw_vec_->GetVectorHeader(
indexed_vec_count_, indexed_vec_count_ + count_per_index);
// const float *index_ptr = vector_head + (uint64_t)start_docid * d_;
const float *index_ptr = vector_head;
if (!Add(count_per_index, index_ptr)) {

int raw_d = raw_vec_->GetDimension();
float *add_vec = nullptr;

if (d_ > raw_d) {
float *vec = new float[count_per_index * d_];

ConvertVectorDim(count_per_index, raw_d, d, vector_head, vec);
add_vec = vec;
} else {
add_vec = const_cast<float *>(vector_head);
}

if (!Add(count_per_index, add_vec)) {
LOG(ERROR) << "add index from docid " << start_docid << " error!";
ret = -2;
}

if (d_ > raw_d) {
delete add_vec;
}
raw_vec_->Destroy(vector_head, true);
}
}
Expand Down Expand Up @@ -310,13 +357,14 @@ void GammaIVFPQIndex::search_preassigned(
float *recall_simi, idx_t *recall_idxi) {
std::vector<const float *> vecs(recall_num);
raw_vec_->Gets(recall_num, recall_idxi, vecs);
int raw_d = raw_vec_->GetDimension();
for (int j = 0; j < recall_num; j++) {
if (recall_idxi[j] == -1) continue;
float dis = 0;
if (metric_type == faiss::METRIC_INNER_PRODUCT) {
dis = faiss::fvec_inner_product(xi, vecs[j], this->d);
dis = faiss::fvec_inner_product(xi, vecs[j], raw_d);
} else {
dis = faiss::fvec_L2sqr(xi, vecs[j], this->d);
dis = faiss::fvec_L2sqr(xi, vecs[j], raw_d);
}

if (((condition->min_dist >= 0 && dis >= condition->min_dist) &&
Expand Down Expand Up @@ -405,7 +453,6 @@ void GammaIVFPQIndex::search_preassigned(
int *vid_list_data = vid_list.data();
int *curr_ptr = vid_list_data;
for (size_t i = 0; i < docid_list.size(); i++) {
// vids_list[i] = this->raw_vec_->docid2vid_[docid_list[i]];
if (bitmap::test(this->docids_bitmap_, docid_list[i])) {
continue;
}
Expand Down Expand Up @@ -900,21 +947,39 @@ int GammaIVFPQIndex::Search(const VectorQuery *query,
const GammaSearchCondition *condition,
VectorResult &result) {
float *x = reinterpret_cast<float *>(query->value->value);
int n = query->value->len / (d * sizeof(float));
int raw_d = raw_vec_->GetDimension();
size_t n = query->value->len / (raw_d * sizeof(float));

if (condition->metric_type == InnerProduct) {
metric_type = faiss::METRIC_INNER_PRODUCT;
} else {
metric_type = faiss::METRIC_L2;
}
idx_t *idx = reinterpret_cast<idx_t *>(result.docids);

float *vec_q = nullptr;

if (d > raw_d) {
float *vec = new float[n * d];

ConvertVectorDim(n, raw_d, d, x, vec);

vec_q = vec;
} else {
vec_q = x;
}

if (condition->use_direct_search) {
SearchDirectly(n, x, condition, result.dists, idx, result.total.data());
SearchDirectly(n, vec_q, condition, result.dists, idx, result.total.data());
} else {
SearchIVFPQ(n, x, condition, result.dists, idx, result.total.data());
SearchIVFPQ(n, vec_q, condition, result.dists, idx, result.total.data());
}

for (int i = 0; i < n; i++) {
if (d > raw_d) {
delete vec_q;
}

for (size_t i = 0; i < n; i++) {
int pos = 0;

std::map<int, int> docid2count;
Expand Down
12 changes: 4 additions & 8 deletions engine/gamma/search/gamma_common_data.h
Expand Up @@ -175,14 +175,10 @@ struct IVFPQParamHelper {
void SetDefaultValue() {
if (ivfpq_param_->metric_type == -1)
ivfpq_param_->metric_type = InnerProduct;
if (ivfpq_param_->nprobe == -1)
ivfpq_param_->nprobe = 20;
if (ivfpq_param_->ncentroids == -1)
ivfpq_param_->ncentroids = 256;
if (ivfpq_param_->nsubvector == -1)
ivfpq_param_->nsubvector = 64;
if (ivfpq_param_->nbits_per_idx == -1)
ivfpq_param_->nbits_per_idx = 8;
if (ivfpq_param_->nprobe == -1) ivfpq_param_->nprobe = 20;
if (ivfpq_param_->ncentroids == -1) ivfpq_param_->ncentroids = 256;
if (ivfpq_param_->nsubvector == -1) ivfpq_param_->nsubvector = 64;
if (ivfpq_param_->nbits_per_idx == -1) ivfpq_param_->nbits_per_idx = 8;
}

bool Validate() {
Expand Down
2 changes: 1 addition & 1 deletion engine/gamma/tests/README.md
Expand Up @@ -21,4 +21,4 @@ op8=>operation: Close engine
st->op1->op2->op3->op4->op5->op6->op7->op8->e
```
## test
`./test_files profile_10k.txt siftsmall_base.fvecs`
`./test_files profile_10k.txt siftsmall_base.fvecs`
8 changes: 4 additions & 4 deletions engine/gamma/tests/test_files.cc
Expand Up @@ -32,7 +32,7 @@ struct Options {
max_doc_size = 10000 * 200;
add_doc_num = 10000 * 100;
search_num = 10000 * 1;
fields_vec = {"sku", "_id", "cid1", "cid2", "cid3"};
fields_vec = {"key", "_id", "field1", "field2", "field3"};
fields_type = {LONG, STRING, STRING, INT, INT};
vector_name = "abc";
path = "files";
Expand Down Expand Up @@ -144,7 +144,7 @@ int SearchThread(void *engine, size_t num) {

if (idx % 1000 == 0) LOG(INFO) << "idx=" << idx;

string name = "cid2";
string name = "field2";
RangeFilter **range_filters = MakeRangeFilters(2);
RangeFilter *range_filter =
MakeRangeFilter(StringToByteArray(name), StringToByteArray(c1_lower),
Expand All @@ -155,7 +155,7 @@ int SearchThread(void *engine, size_t num) {
upper = 999999;
c1_lower = string((char *)&low, sizeof(low));
c1_upper = string((char *)&upper, sizeof(upper));
name = "cid3";
name = "field3";
range_filter =
MakeRangeFilter(StringToByteArray(name), StringToByteArray(c1_lower),
StringToByteArray(c1_upper), false, true);
Expand All @@ -165,7 +165,7 @@ int SearchThread(void *engine, size_t num) {
TermFilter *term_filter;

std::string term_low = string("1315\00115248");
name = "cid1";
name = "field1";
term_filter = MakeTermFilter(StringToByteArray(name),
StringToByteArray(term_low), true);
SetTermFilter(term_filters, 0, term_filter);
Expand Down
6 changes: 5 additions & 1 deletion engine/gamma/vector/mmap_raw_vector.cc
Expand Up @@ -77,7 +77,7 @@ MmapRawVector::~MmapRawVector() {
}

int MmapRawVector::Init() {
max_buffer_size_ = (int)((long)store_params_->cache_size_ * 1024 * 1024 / vector_byte_size_);
max_buffer_size_ = (int)(store_params_->cache_size_ / vector_byte_size_);

fet_fd_ = open(fet_file_path_.c_str(), O_WRONLY | O_APPEND | O_CREAT, 00664);
if (fet_fd_ == -1) {
Expand All @@ -91,6 +91,10 @@ int MmapRawVector::Init() {
return -1;
}

int remainder = max_buffer_size_ % buffer_chunk_num_;
if (remainder > 0) {
max_buffer_size_ += buffer_chunk_num_ - remainder;
}
vector_buffer_queue_ =
new VectorBufferQueue(max_buffer_size_, dimension_, buffer_chunk_num_);
vector_file_mapper_ =
Expand Down
6 changes: 3 additions & 3 deletions engine/gamma/vector/mmap_raw_vector.h
Expand Up @@ -5,8 +5,8 @@
* found in the LICENSE file in the root directory of this source tree.
*/

#ifndef MEMORY_DISK_RAW_VECTOR_H_
#define MEMORY_DISK_RAW_VECTOR_H_
#ifndef MMAP_RAW_VECTOR_H_
#define MMAP_RAW_VECTOR_H_

#include "raw_vector.h"
#include "vector_buffer_queue.h"
Expand Down Expand Up @@ -49,4 +49,4 @@ class MmapRawVector : public RawVector, public AsyncFlusher {

} // namespace tig_gamma

#endif
#endif // MMAP_RAW_VECTOR_H_
6 changes: 3 additions & 3 deletions engine/gamma/vector/raw_vector.cc
Expand Up @@ -269,11 +269,11 @@ int StoreParams::Parse(const char *str) {
double cache_size = 0;
if (!jp.GetDouble("cache_size", cache_size)) {
if (cache_size > MAX_CACHE_SIZE || cache_size < 0) {
LOG(ERROR) << "invalid cache size=" << cache_size
<< ", limit size=" << MAX_CACHE_SIZE;
LOG(ERROR) << "invalid cache size=" << cache_size << "M"
<< ", limit size=" << MAX_CACHE_SIZE << "M";
return -1;
}
cache_size_ = (int)cache_size;
cache_size_ = (long)cache_size * 1024 * 1024;
}

return 0;
Expand Down
4 changes: 2 additions & 2 deletions engine/gamma/vector/raw_vector.h
Expand Up @@ -174,9 +174,9 @@ void StartFlushingIfNeed(RawVector *vec);
void StopFlushingIfNeed(RawVector *vec);

struct StoreParams {
int cache_size_; // M bytes
long cache_size_; // bytes

StoreParams() { cache_size_ = 1024 * 2; }
StoreParams() { cache_size_ = -1; }
StoreParams(const StoreParams &other) {
this->cache_size_ = other.cache_size_;
}
Expand Down
3 changes: 3 additions & 0 deletions engine/gamma/vector/raw_vector_factory.h
Expand Up @@ -15,6 +15,7 @@
#include "rocksdb_raw_vector.h"
#endif // WITH_ROCKSDB

#include "gamma_common_data.h"
#include <string>

namespace tig_gamma {
Expand All @@ -28,6 +29,8 @@ class RawVectorFactory {
StoreParams store_params;
if (store_param != "" && store_params.Parse(store_param.c_str()))
return nullptr;
if (store_params.cache_size_ == -1)
store_params.cache_size_ = max_doc_size * dimension * sizeof(float);
LOG(INFO) << "store parameters=" << store_params.ToString();
switch (type) {
case Mmap:
Expand Down
2 changes: 1 addition & 1 deletion engine/gamma/vector/rocksdb_raw_vector.cc
Expand Up @@ -41,7 +41,7 @@ RocksDBRawVector::~RocksDBRawVector() {
}

int RocksDBRawVector::Init() {
block_cache_size_ = store_params_->cache_size_ * 1024 * 1024;
block_cache_size_ = store_params_->cache_size_;

raw_vector_io_ = new RawVectorIO(this);
if (raw_vector_io_->Init(true)) {
Expand Down

0 comments on commit dba224c

Please sign in to comment.