Fulltext benchmark improvement #1263

Merged · 7 commits · May 31, 2024

2 changes: 1 addition & 1 deletion docs/references/benchmark.md
@@ -149,7 +149,7 @@ psql -h 0.0.0.0 -p 5432 -c "SELECT doctitle, ROW_ID(), SCORE() FROM infinity_enw
| | Time to insert & build index | Time to import & build index | P95 Latency(ms)| QPS (16 python clients) | Memory | vCPU |
| ----------------- | ---------------------------- | ---------------------------- | ---------------| ------------------------| --------| ----- |
| **Elasticsearch** | 2289 s | N/A | 14.75 | 1340 | 21.0GB | 10.6 |
| **Infinity** | 2321 s | 2890 s | 1.86 | 12328 | 10.0GB | 11.0 |
| **Infinity** | 1562 s | 2244 s | 1.86 | 12328 | 10.0GB | 11.0 |

---

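The headline change: the Infinity row's time to insert & build the fulltext index drops from 2321 s (old row) to 1562 s (new row), and import & build from 2890 s to 2244 s. A quick sanity check of the relative improvement (a sketch; the timings come from the table above):

```python
# Relative speedup of the new benchmark numbers over the old ones.
old_insert, new_insert = 2321, 1562   # seconds, insert & build index
old_import, new_import = 2890, 2244   # seconds, import & build index

print(f"insert+build: {1 - new_insert / old_insert:.1%} faster")  # ~32.7%
print(f"import+build: {1 - new_import / old_import:.1%} faster")  # ~22.4%
```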
46 changes: 39 additions & 7 deletions python/benchmark/clients/infinity_client.py
@@ -3,13 +3,41 @@
import h5py
from typing import Any
import logging
import requests

import infinity
import infinity.index as index
from infinity import NetworkAddress
from .base_client import BaseClient


class InfinityHttpClient:
def __init__(self, db_name, table_name):
self.url = (
"http://localhost:23820/" + f"databases/{db_name}/tables/{table_name}/docs"
)
self.headers = {
"accept": "application/json",
"content-type": "application/json",
}

def request(self, method, data={}):
match method:
case "get":
response = requests.get(self.url, headers=self.headers, json=data)
case "post":
response = requests.post(self.url, headers=self.headers, json=data)
case "put":
response = requests.put(self.url, headers=self.headers, json=data)
case "delete":
response = requests.delete(self.url, headers=self.headers, json=data)
return response

def insert(self, values=[]):
r = self.request("post", values)
return r


class InfinityClient(BaseClient):
def __init__(self, conf_path: str) -> None:
"""
@@ -45,6 +73,13 @@ def upload(self):
db_obj.drop_table(self.table_name)
db_obj.create_table(self.table_name, self.data["schema"])
table_obj = db_obj.get_table(self.table_name)
# create index
indexs = self._parse_index_schema(self.data["index"])
for i, idx in enumerate(indexs):
table_obj.create_index(f"index{i}", [idx])

inf_http_client = InfinityHttpClient("default_db", self.table_name)

dataset_path = os.path.join(self.path_prefix, self.data["data_path"])
if not os.path.exists(dataset_path):
self.download_data(self.data["data_link"], dataset_path)
@@ -104,16 +139,13 @@ def upload(self):
}
current_batch.append(row_dict)
if len(current_batch) >= batch_size:
table_obj.insert(current_batch)
# table_obj.insert(current_batch)
inf_http_client.insert(current_batch)
current_batch = []

if current_batch:
table_obj.insert(current_batch)

# create index
indexs = self._parse_index_schema(self.data["index"])
for i, idx in enumerate(indexs):
table_obj.create_index(f"index{i}", [idx])
# table_obj.insert(current_batch)
inf_http_client.insert(current_batch)

def setup_clients(self, num_threads=1):
host, port = self.data["host"].split(":")
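The switch from `table_obj.insert(...)` to the new `InfinityHttpClient` is the heart of the client-side change: batches now go through Infinity's HTTP API on port 23820, and the fulltext index is created before ingestion rather than after. A usage sketch of the new helper (hypothetical table name and rows; assumes a local server and that `upload()` already created the table and index):

```python
from infinity_client import InfinityHttpClient  # the class added above; module path assumed

client = InfinityHttpClient("default_db", "infinity_enwiki")
batch = [
    {"doctitle": "Anarchism", "docdate": "2002-02-25", "body": "Anarchism is ..."},
    {"doctitle": "Autism", "docdate": "2002-02-25", "body": "Autism is ..."},
]
# POST http://localhost:23820/databases/default_db/tables/infinity_enwiki/docs
response = client.insert(batch)
response.raise_for_status()
```

One reviewer-style nit: `request(self, method, data={})` and `insert(self, values=[])` use mutable default arguments; harmless here since they are never mutated, but worth keeping an eye on.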
2 changes: 1 addition & 1 deletion python/benchmark/configs/infinity_enwiki.json
@@ -10,7 +10,7 @@
"query_link": "to_be_set",
"mode": "fulltext",
"topK": 10,
"use_import": true,
"use_import": false,
"schema": {
"doctitle": {"type": "varchar", "default":""},
"docdate": {"type": "varchar", "default":""},
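Flipping `use_import` to false routes the benchmark through the row-insert path (the first timing column in the table above) instead of bulk import. A sketch of how a harness might branch on the flag (hypothetical; the real branching lives in the benchmark's base client):

```python
import json

with open("python/benchmark/configs/infinity_enwiki.json") as f:
    conf = json.load(f)

# use_import=True measures "time to import & build index";
# False makes the harness time row-by-row inserts instead.
path = "import" if conf["use_import"] else "insert"
print(f"benchmark will use the {path} path, topK={conf['topK']}")
```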
2 changes: 2 additions & 0 deletions src/main/infinity_context.cpp
@@ -56,6 +56,8 @@ void InfinityContext::Init(const SharedPtr<String> &config_path) {
storage_ = MakeUnique<Storage>(config_.get());
storage_->Init();

inverting_thread_pool_.resize(config_->CPULimit());
commiting_thread_pool_.resize(config_->CPULimit());
initialized_ = true;
}
}
7 changes: 7 additions & 0 deletions src/main/infinity_context.cppm
@@ -23,6 +23,7 @@ import task_scheduler;
import storage;
import singleton;
import session_manager;
import third_party;

namespace infinity {

@@ -38,6 +39,9 @@

[[nodiscard]] inline SessionManager *session_manager() noexcept { return session_mgr_.get(); }

[[nodiscard]] inline ThreadPool &GetFulltextInvertingThreadPool() { return inverting_thread_pool_; }
[[nodiscard]] inline ThreadPool &GetFulltextCommitingThreadPool() { return commiting_thread_pool_; }

void Init(const SharedPtr<String> &config_path);

void UnInit();
@@ -52,6 +56,9 @@
UniquePtr<TaskScheduler> task_scheduler_{};
UniquePtr<Storage> storage_{};
UniquePtr<SessionManager> session_mgr_{};
// For fulltext index
ThreadPool inverting_thread_pool_{4};
ThreadPool commiting_thread_pool_{2};

bool initialized_{false};
};
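These two files centralize the fulltext thread pools: instead of each TableIndexEntry owning a 4-thread inverting pool and a 2-thread committing pool (removed at the bottom of this diff), InfinityContext owns one pair for the whole process, default-constructed at 4/2 and resized to `config_->CPULimit()` during `Init`. A rough Python analogy of the ownership change (all names hypothetical):

```python
import os
from concurrent.futures import ThreadPoolExecutor

class InfinityContext:
    """Process-wide singleton owning the shared fulltext pools."""
    _instance = None

    def __init__(self, cpu_limit: int):
        # One inverting and one committing pool for the whole process,
        # instead of a 4/2-thread pair per table index entry.
        self.inverting_pool = ThreadPoolExecutor(max_workers=cpu_limit)
        self.committing_pool = ThreadPoolExecutor(max_workers=cpu_limit)

    @classmethod
    def instance(cls) -> "InfinityContext":
        if cls._instance is None:
            cls._instance = cls(os.cpu_count() or 4)
        return cls._instance
```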
17 changes: 10 additions & 7 deletions src/storage/invertedindex/format/inmem_doc_list_decoder.cpp
@@ -34,7 +34,9 @@ bool InMemDocListDecoder::DecodeSkipList(docid_t start_doc_id, docid_t &prev_las
if (skiplist_reader_ == nullptr) {
prev_last_doc_id = 0;
current_ttf = 0;
return DecodeSkipListWithoutSkipList(0, 0, start_doc_id, last_doc_id);
// If the skiplist is absent, we allow the doc_id buffer to be decoded only once.
// So here we pass zero as the encoded doc_id buffer offset.
return DecodeSkipListWithoutSkipList(last_doc_id_in_prev_record_, 0, start_doc_id, last_doc_id);
}
auto ret = skiplist_reader_->SkipTo((u32)start_doc_id, last_doc_id_, last_doc_id_in_prev_record_, offset_, record_len_);
if (!ret) {
@@ -53,24 +55,25 @@
}

bool InMemDocListDecoder::DecodeSkipListWithoutSkipList(docid_t last_doc_id_in_prev_record, u32 offset, docid_t start_doc_id, docid_t &last_doc_id) {
if (finish_decoded_) {
return false;
}
// allocate space
doc_buffer_to_copy_ = new docid_t[MAX_DOC_PER_RECORD];
if (doc_buffer_to_copy_ == nullptr)
doc_buffer_to_copy_ = new docid_t[MAX_DOC_PER_RECORD];

doc_list_reader_.Seek(offset);
finish_decoded_ = false;
if (!doc_list_reader_.Seek(offset)) {
return false;
}
if (!doc_list_reader_.Decode(doc_buffer_to_copy_, MAX_DOC_PER_RECORD, decode_count_)) {
return false;
}
finish_decoded_ = true;
last_doc_id = last_doc_id_in_prev_record;
for (SizeT i = 0; i < decode_count_; ++i) {
last_doc_id += doc_buffer_to_copy_[i];
}
if (start_doc_id > last_doc_id) {
return false;
}
finish_decoded_ = true;
return true;
}

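Several behavioral fixes land in this decoder: with no skiplist, decoding now starts from `last_doc_id_in_prev_record_` (not a hard-coded 0) at buffer offset 0, the copy buffer is allocated only when still null, and a failed `Seek` or `Decode` propagates as `false` instead of crashing, with the buffer decoded only once per the new comment. A simplified Python rendering of the decode-once pattern (hypothetical structure, not the real class):

```python
class InMemDocListDecoderSketch:
    """Decode-once guard pattern from this hunk (illustrative only)."""

    MAX_DOC_PER_RECORD = 128  # assumed constant

    def __init__(self, reader):
        self.reader = reader
        self.doc_buffer = None
        self.finish_decoded = False

    def decode_without_skiplist(self, prev_last_doc_id: int, offset: int) -> bool:
        # The in-memory doc-id buffer may be decoded only once.
        if self.finish_decoded:
            return False
        # Allocate the copy buffer lazily, and only once.
        if self.doc_buffer is None:
            self.doc_buffer = [0] * self.MAX_DOC_PER_RECORD
        # Propagate seek/decode failures instead of asserting.
        if not self.reader.seek(offset):
            return False
        if not self.reader.decode(self.doc_buffer):
            return False
        self.finish_decoded = True  # consumed; later calls return False
        return True
```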
@@ -27,10 +27,13 @@ public:
posting_fields_ = posting_byte_slice_->GetPostingFields();
}

void Seek(u32 pos) {
byte_slice_reader_.Seek(pos);
bool Seek(u32 pos) {
SizeT ret = byte_slice_reader_.Seek(pos);
location_cursor_ = 0;
posting_buffer_cursor_ = 0;
if (ret == ByteSliceReader::BYTE_SLICE_EOF)
return false;
return true;
}

u32 Tell() const { return byte_slice_reader_.Tell(); }
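The hunk above (the scrape dropped its file header; the cursor fields suggest PostingByteSliceReader) changes `Seek` from `void` to `bool`: cursors are reset either way, but the caller now learns when the target position is past the end. A small sketch of the new contract (hypothetical wrapper names):

```python
BYTE_SLICE_EOF = 2**64 - 1  # SizeT(-1); see the byte_slice_reader notes below

def seek(reader, pos: int) -> bool:
    """Reset the posting cursors, then report whether pos was reachable."""
    ret = reader.seek_raw(pos)  # stands in for ByteSliceReader::Seek
    reader.location_cursor = 0
    reader.posting_buffer_cursor = 0
    return ret != BYTE_SLICE_EOF
```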
12 changes: 4 additions & 8 deletions src/storage/invertedindex/memory_indexer.cpp
@@ -59,6 +59,7 @@ import mmap;
import buf_writer;
import profiler;
import third_party;
import infinity_context;

namespace infinity {
constexpr int MAX_TUPLE_LENGTH = 1024; // we assume that analyzed term, together with docid/offset info, will never exceed such length
@@ -69,15 +70,10 @@ bool MemoryIndexer::KeyComp::operator()(const String &lhs, const String &rhs) co

MemoryIndexer::PostingTable::PostingTable() {}

MemoryIndexer::MemoryIndexer(const String &index_dir,
const String &base_name,
RowID base_row_id,
optionflag_t flag,
const String &analyzer,
ThreadPool &inverting_thread_pool,
ThreadPool &commiting_thread_pool)
MemoryIndexer::MemoryIndexer(const String &index_dir, const String &base_name, RowID base_row_id, optionflag_t flag, const String &analyzer)
: index_dir_(index_dir), base_name_(base_name), base_row_id_(base_row_id), flag_(flag), analyzer_(analyzer),
inverting_thread_pool_(inverting_thread_pool), commiting_thread_pool_(commiting_thread_pool), ring_inverted_(15UL), ring_sorted_(13UL) {
inverting_thread_pool_(infinity::InfinityContext::instance().GetFulltextInvertingThreadPool()),
commiting_thread_pool_(infinity::InfinityContext::instance().GetFulltextCommitingThreadPool()), ring_inverted_(15UL), ring_sorted_(13UL) {
posting_table_ = MakeShared<PostingTable>();
prepared_posting_ = MakeShared<PostingWriter>(PostingFormatOption(flag_), column_lengths_);
Path path = Path(index_dir) / (base_name + ".tmp.merge");
8 changes: 1 addition & 7 deletions src/storage/invertedindex/memory_indexer.cppm
@@ -49,13 +49,7 @@ public:
PostingTableStore store_;
};

MemoryIndexer(const String &index_dir,
const String &base_name,
RowID base_row_id,
optionflag_t flag,
const String &analyzer,
ThreadPool &inverting_thread_pool,
ThreadPool &commiting_thread_pool);
MemoryIndexer(const String &index_dir, const String &base_name, RowID base_row_id, optionflag_t flag, const String &analyzer);

~MemoryIndexer();

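MemoryIndexer's constructor loses its two `ThreadPool&` parameters; the pools are resolved internally from the InfinityContext singleton, which is what lets the three construction sites in segment_index_entry.cpp (below) collapse to one-liners. Continuing the earlier Python analogy (hypothetical names, reusing the InfinityContext sketch above):

```python
class MemoryIndexer:
    def __init__(self, index_dir: str, base_name: str, base_row_id: int,
                 flag: int, analyzer: str):
        # Pools are no longer injected by the caller; they are fetched
        # from the process-wide context instead.
        ctx = InfinityContext.instance()
        self.inverting_pool = ctx.inverting_pool
        self.committing_pool = ctx.committing_pool
        self.index_dir, self.base_name = index_dir, base_name
        self.base_row_id, self.flag, self.analyzer = base_row_id, flag, analyzer
```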
11 changes: 6 additions & 5 deletions src/storage/io/byte_slice_reader.cpp
@@ -1,5 +1,7 @@
module;

#include <cassert>

module byte_slice_reader;

import stl;
@@ -111,10 +113,8 @@ SizeT ByteSliceReader::ReadMayCopy(void *&value, SizeT len) {

SizeT ByteSliceReader::Seek(SizeT offset) {
if (offset < global_offset_) {
// fmt::format("invalid offset value: seek offset = {}, State: list length = {}, offset = {}", offset, GetSize(), global_offset_));
String error_message = "Invalid offset value";
LOG_CRITICAL(error_message);
UnrecoverableError(error_message);
// seeking backward is disallowed
return BYTE_SLICE_EOF;
}

SizeT len = offset - global_offset_;
@@ -125,6 +125,7 @@ SizeT ByteSliceReader::Seek(SizeT offset) {
if (current_slice_offset_ + len < GetSliceDataSize(current_slice_)) {
current_slice_offset_ += len;
global_offset_ += len;
assert(global_offset_ == offset);
return global_offset_;
} else {
// current byteslice is not long enough, seek to next byteslices
@@ -154,7 +155,7 @@
current_slice_ = ByteSlice::GetEmptySlice();
current_slice_offset_ = 0;
}

assert(global_offset_ == offset);
return global_offset_;
}
}
2 changes: 1 addition & 1 deletion src/storage/io/byte_slice_reader.cppm
@@ -11,7 +11,7 @@ namespace infinity {

export class ByteSliceReader {
public:
static const int BYTE_SLICE_EOF = -1;
static const SizeT BYTE_SLICE_EOF = -1;

public:
ByteSliceReader();
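`Seek` used to treat a backward seek as an unrecoverable error; it now returns `BYTE_SLICE_EOF`, and the sentinel's type changes from `int` to `SizeT` so it can be returned through Seek's `SizeT` return type. In two's-complement, `SizeT(-1)` is `SIZE_MAX`, so the sentinel can never collide with a real offset (assuming a 64-bit unsigned SizeT):

```python
# What `static const SizeT BYTE_SLICE_EOF = -1;` evaluates to for a
# 64-bit unsigned SizeT: -1 wraps around to SIZE_MAX.
SIZE_MAX = 2**64 - 1
BYTE_SLICE_EOF = -1 % 2**64
assert BYTE_SLICE_EOF == SIZE_MAX
print(hex(BYTE_SLICE_EOF))  # 0xffffffffffffffff
```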
4 changes: 2 additions & 2 deletions src/storage/io/local_file_system.cpp
@@ -61,7 +61,7 @@ Pair<UniquePtr<FileHandler>, Status> LocalFileSystem::OpenFile(const String &pat
if (read_flag && write_flag) {
file_flags = O_RDWR;
} else if (read_flag) {
file_flags = O_RDONLY;
file_flags = O_RDONLY | O_NOATIME;
} else if (write_flag) {
file_flags = O_WRONLY;
} else {
@@ -369,7 +369,7 @@ int LocalFileSystem::MmapFile(const String &file_path, u8 *&data_ptr, SizeT &dat
long len_f = fs::file_size(file_path);
if (len_f == 0)
return -1;
int f = open(file_path.c_str(), O_RDONLY);
int f = open(file_path.c_str(), O_RDONLY | O_NOATIME);
void *tmpd = mmap(NULL, len_f, PROT_READ, MAP_SHARED, f, 0);
if (tmpd == MAP_FAILED)
return -1;
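Read-only opens (including the mmap path) now pass `O_NOATIME`, skipping access-time updates during index reads. One caveat: open(2) rejects `O_NOATIME` with `EPERM` when the calling process does not own the file, so a defensive variant would retry without the flag. A sketch of that fallback (the PR itself sets the flag unconditionally):

```python
import errno
import os

def open_read_noatime(path: str) -> int:
    """Open read-only without touching atime, falling back when not allowed."""
    flags = os.O_RDONLY | getattr(os, "O_NOATIME", 0)  # O_NOATIME is Linux-only
    try:
        return os.open(path, flags)
    except PermissionError as err:
        # open(2) returns EPERM for O_NOATIME on files the caller doesn't own.
        if err.errno == errno.EPERM and flags != os.O_RDONLY:
            return os.open(path, os.O_RDONLY)
        raise
```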
2 changes: 2 additions & 0 deletions src/storage/meta/entry/block_entry.cpp
@@ -124,6 +124,8 @@ void BlockEntry::UpdateBlockReplay(SharedPtr<BlockEntry> block_entry, String blo

SizeT BlockEntry::row_count(TxnTimeStamp check_ts) const {
std::shared_lock lock(rw_locker_);
if (check_ts >= max_row_ts_)
return row_count_;

auto block_version_handle = this->block_version_->Load();
const auto *block_version = reinterpret_cast<const BlockVersion *>(block_version_handle.GetData());
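`row_count(check_ts)` gains a fast path: once the probing timestamp is at or beyond the block's `max_row_ts_`, every row is visible, so the BlockVersion blob never has to be loaded from the buffer manager. A sketch of the control flow (field names mirror the C++; the helpers are hypothetical):

```python
def visible_row_count(block, check_ts: int) -> int:
    # Fast path added by this hunk: at or beyond the block's last commit
    # timestamp, the full row count is visible without version lookups.
    if check_ts >= block.max_row_ts:
        return block.row_count
    # Slow path: consult the block's MVCC version info as before.
    block_version = block.load_block_version()  # hypothetical helper
    return block_version.row_count_at(check_ts)
```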
22 changes: 5 additions & 17 deletions src/storage/meta/entry/segment_index_entry.cpp
@@ -255,9 +255,7 @@ void SegmentIndexEntry::MemIndexInsert(SharedPtr<BlockEntry> block_entry,
base_name,
begin_row_id,
index_fulltext->flag_,
index_fulltext->analyzer_,
table_index_entry_->GetFulltextInvertingThreadPool(),
table_index_entry_->GetFulltextCommitingThreadPool());
index_fulltext->analyzer_);
}
table_index_entry_->UpdateFulltextSegmentTs(commit_ts);
} else {
@@ -396,13 +394,8 @@ void SegmentIndexEntry::MemIndexLoad(const String &base_name, RowID base_row_id)
// Init the mem index from previously spilled one.
assert(memory_indexer_.get() == nullptr);
const IndexFullText *index_fulltext = static_cast<const IndexFullText *>(index_base);
memory_indexer_ = MakeUnique<MemoryIndexer>(*table_index_entry_->index_dir(),
base_name,
base_row_id,
index_fulltext->flag_,
index_fulltext->analyzer_,
table_index_entry_->GetFulltextInvertingThreadPool(),
table_index_entry_->GetFulltextCommitingThreadPool());
memory_indexer_ =
MakeUnique<MemoryIndexer>(*table_index_entry_->index_dir(), base_name, base_row_id, index_fulltext->flag_, index_fulltext->analyzer_);
memory_indexer_->Load();
}

@@ -435,13 +428,8 @@ void SegmentIndexEntry::PopulateEntirely(const SegmentEntry *segment_entry, Txn
u32 seg_id = segment_entry->segment_id();
RowID base_row_id(seg_id, 0);
String base_name = fmt::format("ft_{:016x}", base_row_id.ToUint64());
memory_indexer_ = MakeUnique<MemoryIndexer>(*table_index_entry_->index_dir(),
base_name,
base_row_id,
index_fulltext->flag_,
index_fulltext->analyzer_,
table_index_entry_->GetFulltextInvertingThreadPool(),
table_index_entry_->GetFulltextCommitingThreadPool());
memory_indexer_ =
MakeUnique<MemoryIndexer>(*table_index_entry_->index_dir(), base_name, base_row_id, index_fulltext->flag_, index_fulltext->analyzer_);
u64 column_id = column_def->id();
auto block_entry_iter = BlockEntryIter(segment_entry);
for (const auto *block_entry = block_entry_iter.Next(); block_entry != nullptr; block_entry = block_entry_iter.Next()) {
4 changes: 3 additions & 1 deletion src/storage/meta/entry/table_entry.cpp
@@ -698,11 +698,13 @@ void TableEntry::MemIndexInsertInner(TableIndexEntry *table_index_entry, Txn *tx
if (block_entry->GetAvailableCapacity() <= 0)
dump_idx = i;
}

for (SizeT i = 0; i < num_ranges; i++) {
AppendRange &range = append_ranges[i];
SharedPtr<BlockEntry> block_entry = block_entries[i];
segment_index_entry->MemIndexInsert(block_entry, range.start_offset_, range.row_count_, txn->CommitTS(), txn->buffer_mgr());
if (i == dump_idx && segment_index_entry->MemIndexRowCount() >= infinity::InfinityContext::instance().config()->MemIndexCapacity()) {
if ((i == dump_idx && segment_index_entry->MemIndexRowCount() >= infinity::InfinityContext::instance().config()->MemIndexCapacity()) ||
(i == num_ranges - 1 && segment_entry->Room() <= 0)) {
SharedPtr<ChunkIndexEntry> chunk_index_entry = segment_index_entry->MemIndexDump();
if (chunk_index_entry.get() != nullptr) {
chunk_index_entry->Commit(txn->CommitTS());
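The mem-index dump now also triggers on the last appended range when the segment has no room left, not only when the row count crosses `MemIndexCapacity()`, so a full segment's fulltext index is flushed promptly. The widened condition, restated as a runnable sketch (names illustrative):

```python
def should_dump_mem_index(i: int, dump_idx: int, num_ranges: int,
                          mem_rows: int, capacity: int, room_left: int) -> bool:
    """Dump on capacity, or when the last range lands in a full segment."""
    return ((i == dump_idx and mem_rows >= capacity)
            or (i == num_ranges - 1 and room_left <= 0))

# A full segment now dumps even below the MemIndexCapacity threshold:
assert should_dump_mem_index(i=2, dump_idx=1, num_ranges=3,
                             mem_rows=100, capacity=1_000_000, room_left=0)
```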
3 changes: 1 addition & 2 deletions src/storage/meta/entry/table_index_entry.cpp
@@ -70,8 +70,7 @@ TableIndexEntry::TableIndexEntry(const SharedPtr<IndexBase> &index_base,
TransactionID txn_id,
TxnTimeStamp begin_ts)
: BaseEntry(EntryType::kTableIndex, is_delete, TableIndexEntry::EncodeIndex(*index_base->index_name_, table_index_meta)),
inverting_thread_pool_(4), commiting_thread_pool_(2), table_index_meta_(table_index_meta), index_base_(std::move(index_base)),
index_dir_(index_entry_dir) {
table_index_meta_(table_index_meta), index_base_(std::move(index_base)), index_dir_(index_entry_dir) {
if (!is_delete) {
assert(index_base.get() != nullptr);
const String &column_name = index_base->column_name();
4 changes: 0 additions & 4 deletions src/storage/meta/entry/table_index_entry.cppm
@@ -125,8 +125,6 @@ public:

Status CreateIndexDo(BaseTableRef *table_ref, HashMap<SegmentID, atomic_u64> &create_index_idxes, Txn *txn);

ThreadPool &GetFulltextInvertingThreadPool() { return inverting_thread_pool_; }
ThreadPool &GetFulltextCommitingThreadPool() { return commiting_thread_pool_; }
TxnTimeStamp GetFulltexSegmentUpdateTs() {
std::shared_lock lock(segment_update_ts_mutex_);
return segment_update_ts_;
@@ -148,8 +146,6 @@

private:
// For fulltext index
ThreadPool inverting_thread_pool_{};
ThreadPool commiting_thread_pool_{};
std::shared_mutex segment_update_ts_mutex_{};
TxnTimeStamp segment_update_ts_{0};
