Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize index creation time #1240

Merged
merged 18 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
40 changes: 26 additions & 14 deletions src/storage/invertedindex/column_inverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ import infinity_exception;
import third_party;
import status;
import logger;
import buf_writer;
import profiler;
import third_party;

namespace infinity {

Expand Down Expand Up @@ -253,24 +256,32 @@ void ColumnInverter::SortForOfflineDump() {
// +-----------+ +----------------++--------------------++--------------------------++-------------------------------------------------------+
// ----------------------------------------------------------------------------------------------------------------------------+
// Data within each group
void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count) {

void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count, UniquePtr<BufWriter>& buf_writer) {
Ma-cat marked this conversation as resolved.
Show resolved Hide resolved
// spill sort results for external merge sort
if (positions_.empty()) {
return;
}
SizeT spill_file_tell = ftell(spill_file);
// size of this Run in bytes
u32 data_size = 0;
u64 data_size_pos = ftell(spill_file);
fwrite(&data_size, sizeof(u32), 1, spill_file);
u64 data_size_pos = spill_file_tell;
buf_writer->Write((const char*)&data_size, sizeof(u32));
spill_file_tell += sizeof(u32);

// number of tuples
u32 num_of_tuples = positions_.size();
tuple_count += num_of_tuples;
fwrite(&num_of_tuples, sizeof(u32), 1, spill_file);
buf_writer->Write((const char*)&num_of_tuples, sizeof(u32));
spill_file_tell += sizeof(u32);

// start offset for next spill
u64 next_start_offset = 0;
u64 next_start_offset_pos = ftell(spill_file);
fwrite(&next_start_offset, sizeof(u64), 1, spill_file);
u64 data_start_offset = ftell(spill_file);
u64 next_start_offset_pos = spill_file_tell;
buf_writer->Write((const char*)&next_start_offset, sizeof(u64));
spill_file_tell += sizeof(u64);

u64 data_start_offset = spill_file_tell;
// sorted data
u32 last_term_num = std::numeric_limits<u32>::max();
StringRef term;
Expand All @@ -282,15 +293,16 @@ void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count) {
term = GetTermFromNum(last_term_num);
}
record_length = term.size() + sizeof(docid_t) + sizeof(u32) + 1;
fwrite(&record_length, sizeof(u32), 1, spill_file);
fwrite(term.data(), term.size(), 1, spill_file);
fwrite(&str_null, sizeof(char), 1, spill_file);
fwrite(&i.doc_id_, sizeof(docid_t), 1, spill_file);
fwrite(&i.term_pos_, sizeof(u32), 1, spill_file);
}

buf_writer->Write((const char*)&record_length, sizeof(u32));
buf_writer->Write(term.data(), term.size());
buf_writer->Write((const char*)&str_null, sizeof(char));
buf_writer->Write((const char*)&(i.doc_id_), sizeof(docid_t));
buf_writer->Write((const char*)&(i.term_pos_), sizeof(u32));
}
buf_writer->Flush();
// update data size
next_start_offset = ftell(spill_file);
next_start_offset = buf_writer->Tell();
data_size = next_start_offset - data_start_offset;
fseek(spill_file, data_size_pos, SEEK_SET);
fwrite(&data_size, sizeof(u32), 1, spill_file); // update offset for next spill
Expand Down
3 changes: 2 additions & 1 deletion src/storage/invertedindex/column_inverter.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import string_ref;
import internal_types;
import posting_writer;
import vector_with_lock;
import buf_writer;

namespace infinity {

Expand Down Expand Up @@ -73,7 +74,7 @@ public:
}
};

void SpillSortResults(FILE *spill_file, u64 &tuple_count);
void SpillSortResults(FILE *spill_file, u64 &tuple_count, UniquePtr<BufWriter>& buf_writer);

private:
using TermBuffer = Vector<char>;
Expand Down
42 changes: 42 additions & 0 deletions src/storage/invertedindex/common/buf_writer.cppm
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
module;

#include <cstring>
#include <cstdio>

export module buf_writer;
import stl;

namespace infinity {
// A simple buffer writer that writes data to a file.
// Now only used for ColumnInverter
// ColumnInverter will use BufWriter sequentially write data and use spill_file pointer randomly write data
export struct BufWriter {
Ma-cat marked this conversation as resolved.
Show resolved Hide resolved
BufWriter(FILE *spill_file, SizeT spill_buf_size) : spill_file_(spill_file), spill_buf_size_(spill_buf_size) {
spill_buffer_ = MakeUnique<char_t[]>(spill_buf_size_);
}

void Write(const char* data, SizeT data_size) {
if (spill_buf_idx_ + data_size > spill_buf_size_) {
Flush();
}
memcpy(spill_buffer_.get() + spill_buf_idx_, data, data_size);
spill_buf_idx_ += data_size;
}

void Flush() {
if (spill_buf_idx_) {
fwrite(spill_buffer_.get(), spill_buf_idx_, 1, spill_file_);
spill_buf_idx_ = 0;
}
}

SizeT Tell() {
return ftell(spill_file_);
}

FILE *spill_file_{nullptr};
SizeT spill_buf_idx_{0};
UniquePtr<char_t[]> spill_buffer_{};
SizeT spill_buf_size_{0};
};
}