Skip to content

Commit

Permalink
feat(tianmu):Increase assertion printing information and optimize cod…
Browse files Browse the repository at this point in the history
…e logic(#1617)
  • Loading branch information
konghaiya authored and mergify[bot] committed Apr 27, 2023
1 parent 4a41514 commit 70f15f1
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 27 deletions.
9 changes: 5 additions & 4 deletions storage/tianmu/core/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,11 @@ int Engine::Init(uint engine_slot) {
[this]() {
for (auto &delta : m_table_deltas) {
TIANMU_LOG(LogCtl_Level::INFO,
"table name: %s, delta table id: %d delta_size: %ld, current load id: %ld, merge id: %ld, current row_id: %ld",
delta.second->FullName().c_str(),
delta.second->GetDeltaTableID(), delta.second->load_id.load() - delta.second->merge_id.load(),
delta.second->load_id.load(), delta.second->merge_id.load(), delta.second->row_id.load());
"table name: %s, delta table id: %d delta_size: %ld, current load id: %ld, merge id: %ld, "
"current row_id: %ld",
delta.second->FullName().c_str(), delta.second->GetDeltaTableID(),
delta.second->load_id.load() - delta.second->merge_id.load(), delta.second->load_id.load(),
delta.second->merge_id.load(), delta.second->row_id.load());
}
}},
{tianmu_sysvar_log_loop_interval * 5,
Expand Down
3 changes: 2 additions & 1 deletion storage/tianmu/data/pack_int.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,8 @@ void PackInt::LoadValuesFixed(const loader::ValueCache *vc, const std::optional<
auto delta = dpn_->min_i - new_min;

if (dpn_->NullOnly() || dpn_->Uniform()) {
if(data_.ptr_) dealloc(data_.ptr_);
if (data_.ptr_)
dealloc(data_.ptr_);
data_.ptr_ = alloc(new_vt * new_nr, mm::BLOCK_TYPE::BLOCK_UNCOMPRESSED);
std::memset(data_.ptr_, 0, new_vt * new_nr);
if (dpn_->Uniform()) {
Expand Down
10 changes: 6 additions & 4 deletions storage/tianmu/data/pack_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,20 +73,22 @@ class PackInt final : public Pack {

void AppendNull() {
dpn_->numOfRecords++;
if(likely(NotNull(dpn_->numOfRecords-1))){
SetNull(dpn_->numOfRecords-1);
if (likely(NotNull(dpn_->numOfRecords - 1))) {
SetNull(dpn_->numOfRecords - 1);
}
dpn_->numOfNulls++;
}
void SetValD(uint n, double v) {
dpn_->synced = false;
ASSERT(n < dpn_->numOfRecords , "n: "+ std::to_string(n) + " ,dpn->numOfRecords: "+ std::to_string(dpn_->numOfRecords));
ASSERT(n < dpn_->numOfRecords,
"n: " + std::to_string(n) + " ,dpn->numOfRecords: " + std::to_string(dpn_->numOfRecords));
ASSERT(is_real_);
data_.ptr_double_[n] = v;
}
void SetVal64(uint n, uint64_t v) {
dpn_->synced = false;
ASSERT(n < dpn_->numOfRecords , "n: "+ std::to_string(n) + " ,dpn->numOfRecords: "+ std::to_string(dpn_->numOfRecords));
ASSERT(n < dpn_->numOfRecords,
"n: " + std::to_string(n) + " ,dpn->numOfRecords: " + std::to_string(dpn_->numOfRecords));
switch (data_.value_type_) {
case 8:
data_.ptr_int64_[n] = v;
Expand Down
34 changes: 18 additions & 16 deletions storage/tianmu/data/pack_str.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,14 @@ void PackStr::UpdateValue(size_t locationInPack, const Value &v) {
SetPtrSize(locationInPack, nullptr, 0);
SetNull(locationInPack);
dpn_->numOfNulls++;
if(dpn_->NullOnly()){
if (dpn_->NullOnly()) {
data_.sum_len = 0;
}
} else {
// update non-null to another nonull
auto vsize = GetValueBinary(locationInPack).size();
ASSERT(data_.sum_len >= vsize, col_share_->DataFile() + ", data_.sum_len:" + std::to_string(data_.sum_len) + ", vsize:"+std::to_string(vsize));
ASSERT(data_.sum_len >= vsize, col_share_->DataFile() + ", data_.sum_len:" + std::to_string(data_.sum_len) +
", vsize:" + std::to_string(vsize));
data_.sum_len -= vsize;
auto &str = v.GetString();
if (str.size() <= vsize) {
Expand Down Expand Up @@ -262,7 +263,7 @@ void PackStr::DeleteByRow(size_t locationInPack) {
SetPtrSize(locationInPack, nullptr, 0);
SetNull(locationInPack);
dpn_->numOfNulls++;
if(dpn_->NullOnly()){
if (dpn_->NullOnly()) {
data_.sum_len = 0;
}
}
Expand Down Expand Up @@ -386,8 +387,8 @@ std::pair<PackStr::UniquePtr, size_t> PackStr::Compress() {
reinterpret_cast<uint *>(alloc(comp_len_buf_size / 4 * sizeof(uint), mm::BLOCK_TYPE::BLOCK_TEMPORARY)), *this);
uint tmp_comp_len_buf_size = comp_len_buf_size - 8;
compress::NumCompressor<uint> nc;
CprsErr res = nc.Compress(reinterpret_cast<char *>(comp_len_buf.get() + 2), tmp_comp_len_buf_size,
nc_buffer.get(), onn, maxv);
CprsErr res = nc.Compress(reinterpret_cast<char *>(comp_len_buf.get() + 2), tmp_comp_len_buf_size, nc_buffer.get(),
onn, maxv);
if (res != CprsErr::CPRS_SUCCESS) {
throw common::InternalException("Compression of lengths of values failed for column " +
std::to_string(pc_column(GetCoordinate().co.pack) + 1) + ", pack " +
Expand Down Expand Up @@ -564,20 +565,20 @@ void PackStr::Save() {
SaveUncompressed(&f);
}

ASSERT(f.Tell() == off_t(dpn_->dataAddress + dpn_->dataLength), col_share_->DataFile() + ", " +
std::to_string(dpn_->dataAddress) + ":" + std::to_string(dpn_->dataLength) + "/" + std::to_string(f.Tell()));
ASSERT(f.Tell() == off_t(dpn_->dataAddress + dpn_->dataLength),
col_share_->DataFile() + ", " + std::to_string(dpn_->dataAddress) + ":" + std::to_string(dpn_->dataLength) +
"/" + std::to_string(f.Tell()));
dpn_->synced = true;
}

void PackStr::SaveUncompressed(system::Stream *f) {
f->WriteExact(nulls_ptr_.get(), bitmap_size_);
f->WriteExact(deletes_ptr_.get(), bitmap_size_);
f->WriteExact(data_.lens, (data_.len_mode * (1 << col_share_->pss)));
if (data_.v.empty() || dpn_->NullOnly()){
if (data_.v.empty() || dpn_->NullOnly()) {
return;
}


std::unique_ptr<char[]> buff(new char[data_.sum_len]);
char *ptr = buff.get();
for (uint i = 0; i < dpn_->numOfRecords; i++) {
Expand All @@ -593,7 +594,7 @@ void PackStr::SaveUncompressed(system::Stream *f) {
}

void PackStr::LoadCompressed(system::Stream *f) {
ASSERT(IsModeCompressionApplied() , "path:" + col_share_->DataFile());
ASSERT(IsModeCompressionApplied(), "path:" + col_share_->DataFile());

auto compressed_buf = alloc_ptr(dpn_->dataLength + 1, mm::BLOCK_TYPE::BLOCK_COMPRESSED);
f->ReadExact(compressed_buf.get(), dpn_->dataLength);
Expand Down Expand Up @@ -634,8 +635,9 @@ void PackStr::LoadCompressed(system::Stream *f) {
std::memcpy(deletes_ptr_.get(), reinterpret_cast<char *>(cur_buf) + sizeof(ushort), delete_buf_size);
else {
compress::BitstreamCompressor bsc;
CprsErr res = bsc.Decompress(reinterpret_cast<char *>(deletes_ptr_.get()), delete_buf_size,
reinterpret_cast<char *>(cur_buf) + sizeof(ushort), dpn_->numOfRecords, dpn_->numOfDeleted);
CprsErr res =
bsc.Decompress(reinterpret_cast<char *>(deletes_ptr_.get()), delete_buf_size,
reinterpret_cast<char *>(cur_buf) + sizeof(ushort), dpn_->numOfRecords, dpn_->numOfDeleted);
if (res != CprsErr::CPRS_SUCCESS) {
throw common::DatabaseException("Decompression of deletes failed for column " +
std::to_string(pc_column(GetCoordinate().co.pack) + 1) + ", pack " +
Expand All @@ -653,8 +655,8 @@ void PackStr::LoadCompressed(system::Stream *f) {
compress::NumCompressor<uint> nc;
mm::MMGuard<uint> cn_ptr((uint *)alloc((1 << col_share_->pss) * sizeof(uint), mm::BLOCK_TYPE::BLOCK_TEMPORARY),
*this);
CprsErr res = nc.Decompress(cn_ptr.get(), reinterpret_cast<char *>(cur_buf + sizeof(uint32_t)*2), comp_len_buf_size - 8,
dpn_->numOfRecords - dpn_->numOfNulls, maxv);
CprsErr res = nc.Decompress(cn_ptr.get(), reinterpret_cast<char *>(cur_buf + sizeof(uint32_t) * 2),
comp_len_buf_size - sizeof(uint32_t) * 2, dpn_->numOfRecords - dpn_->numOfNulls, maxv);
if (res != CprsErr::CPRS_SUCCESS) {
std::stringstream msg_buf;
msg_buf << "Decompression of lengths of std::string values failed for column "
Expand All @@ -675,7 +677,7 @@ void PackStr::LoadCompressed(system::Stream *f) {
cur_buf += comp_len_buf_size;

auto dlen = *reinterpret_cast<uint32_t *>(cur_buf);
cur_buf += sizeof(dlen);
cur_buf += sizeof(uint32_t);
data_.sum_len = *reinterpret_cast<uint32_t *>(cur_buf);
cur_buf += sizeof(uint32_t);

Expand Down Expand Up @@ -721,7 +723,7 @@ void PackStr::LoadCompressed(system::Stream *f) {
}

void PackStr::LoadCompressedTrie(system::Stream *f) {
ASSERT(IsModeCompressionApplied() , "path:" + col_share_->DataFile());
ASSERT(IsModeCompressionApplied(), "path:" + col_share_->DataFile());
compressed_data_.reset(nullptr);
compressed_data_ = alloc_ptr(dpn_->dataLength + 1, mm::BLOCK_TYPE::BLOCK_COMPRESSED);
f->ReadExact(compressed_data_.get(), dpn_->dataLength);
Expand Down
2 changes: 0 additions & 2 deletions storage/tianmu/handler/ha_tianmu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2016,8 +2016,6 @@ char *strmov_str(char *dst, const char *src) {
static int tianmu_done_func([[maybe_unused]] void *p) {
DBUG_ENTER(__PRETTY_FUNCTION__);

assert(p == tianmu_hton);

if (tianmu_hton->data) {
core::Engine *eng = reinterpret_cast<core::Engine *>(tianmu_hton->data);
delete eng;
Expand Down

0 comments on commit 70f15f1

Please sign in to comment.