diff --git a/storage/tianmu/core/engine.cpp b/storage/tianmu/core/engine.cpp index 6888c773e..c321f56a5 100644 --- a/storage/tianmu/core/engine.cpp +++ b/storage/tianmu/core/engine.cpp @@ -291,10 +291,11 @@ int Engine::Init(uint engine_slot) { [this]() { for (auto &delta : m_table_deltas) { TIANMU_LOG(LogCtl_Level::INFO, - "table name: %s, delta table id: %d delta_size: %ld, current load id: %ld, merge id: %ld, current row_id: %ld", - delta.second->FullName().c_str(), - delta.second->GetDeltaTableID(), delta.second->load_id.load() - delta.second->merge_id.load(), - delta.second->load_id.load(), delta.second->merge_id.load(), delta.second->row_id.load()); + "table name: %s, delta table id: %d delta_size: %ld, current load id: %ld, merge id: %ld, " + "current row_id: %ld", + delta.second->FullName().c_str(), delta.second->GetDeltaTableID(), + delta.second->load_id.load() - delta.second->merge_id.load(), delta.second->load_id.load(), + delta.second->merge_id.load(), delta.second->row_id.load()); } }}, {tianmu_sysvar_log_loop_interval * 5, diff --git a/storage/tianmu/data/pack_int.cpp b/storage/tianmu/data/pack_int.cpp index f86a4f5b7..6b664c8c0 100644 --- a/storage/tianmu/data/pack_int.cpp +++ b/storage/tianmu/data/pack_int.cpp @@ -548,7 +548,8 @@ void PackInt::LoadValuesFixed(const loader::ValueCache *vc, const std::optional< auto delta = dpn_->min_i - new_min; if (dpn_->NullOnly() || dpn_->Uniform()) { - if(data_.ptr_) dealloc(data_.ptr_); + if (data_.ptr_) + dealloc(data_.ptr_); data_.ptr_ = alloc(new_vt * new_nr, mm::BLOCK_TYPE::BLOCK_UNCOMPRESSED); std::memset(data_.ptr_, 0, new_vt * new_nr); if (dpn_->Uniform()) { diff --git a/storage/tianmu/data/pack_int.h b/storage/tianmu/data/pack_int.h index 0449e2772..2e8769cd2 100644 --- a/storage/tianmu/data/pack_int.h +++ b/storage/tianmu/data/pack_int.h @@ -73,20 +73,22 @@ class PackInt final : public Pack { void AppendNull() { dpn_->numOfRecords++; - if(likely(NotNull(dpn_->numOfRecords-1))){ - SetNull(dpn_->numOfRecords-1); + if (likely(NotNull(dpn_->numOfRecords - 1))) { + SetNull(dpn_->numOfRecords - 1); } dpn_->numOfNulls++; } void SetValD(uint n, double v) { dpn_->synced = false; - ASSERT(n < dpn_->numOfRecords , "n: "+ std::to_string(n) + " ,dpn->numOfRecords: "+ std::to_string(dpn_->numOfRecords)); + ASSERT(n < dpn_->numOfRecords, + "n: " + std::to_string(n) + " ,dpn->numOfRecords: " + std::to_string(dpn_->numOfRecords)); ASSERT(is_real_); data_.ptr_double_[n] = v; } void SetVal64(uint n, uint64_t v) { dpn_->synced = false; - ASSERT(n < dpn_->numOfRecords , "n: "+ std::to_string(n) + " ,dpn->numOfRecords: "+ std::to_string(dpn_->numOfRecords)); + ASSERT(n < dpn_->numOfRecords, + "n: " + std::to_string(n) + " ,dpn->numOfRecords: " + std::to_string(dpn_->numOfRecords)); switch (data_.value_type_) { case 8: data_.ptr_int64_[n] = v; diff --git a/storage/tianmu/data/pack_str.cpp b/storage/tianmu/data/pack_str.cpp index d6b636d88..36b47c06c 100644 --- a/storage/tianmu/data/pack_str.cpp +++ b/storage/tianmu/data/pack_str.cpp @@ -226,13 +226,14 @@ void PackStr::UpdateValue(size_t locationInPack, const Value &v) { SetPtrSize(locationInPack, nullptr, 0); SetNull(locationInPack); dpn_->numOfNulls++; - if(dpn_->NullOnly()){ + if (dpn_->NullOnly()) { data_.sum_len = 0; } } else { // update non-null to another nonull auto vsize = GetValueBinary(locationInPack).size(); - ASSERT(data_.sum_len >= vsize, col_share_->DataFile() + ", data_.sum_len:" + std::to_string(data_.sum_len) + ", vsize:"+std::to_string(vsize)); + ASSERT(data_.sum_len >= vsize, col_share_->DataFile() + ", data_.sum_len:" + std::to_string(data_.sum_len) + + ", vsize:" + std::to_string(vsize)); data_.sum_len -= vsize; auto &str = v.GetString(); if (str.size() <= vsize) { @@ -262,7 +263,7 @@ void PackStr::DeleteByRow(size_t locationInPack) { SetPtrSize(locationInPack, nullptr, 0); SetNull(locationInPack); dpn_->numOfNulls++; - if(dpn_->NullOnly()){ + if (dpn_->NullOnly()) { data_.sum_len = 0; } } @@ -386,8 +387,8 @@ std::pair PackStr::Compress() { reinterpret_cast(alloc(comp_len_buf_size / 4 * sizeof(uint), mm::BLOCK_TYPE::BLOCK_TEMPORARY)), *this); uint tmp_comp_len_buf_size = comp_len_buf_size - 8; compress::NumCompressor nc; - CprsErr res = nc.Compress(reinterpret_cast(comp_len_buf.get() + 2), tmp_comp_len_buf_size, - nc_buffer.get(), onn, maxv); + CprsErr res = nc.Compress(reinterpret_cast(comp_len_buf.get() + 2), tmp_comp_len_buf_size, nc_buffer.get(), + onn, maxv); if (res != CprsErr::CPRS_SUCCESS) { throw common::InternalException("Compression of lengths of values failed for column " + std::to_string(pc_column(GetCoordinate().co.pack) + 1) + ", pack " + @@ -564,8 +565,9 @@ void PackStr::Save() { SaveUncompressed(&f); } - ASSERT(f.Tell() == off_t(dpn_->dataAddress + dpn_->dataLength), col_share_->DataFile() + ", " + - std::to_string(dpn_->dataAddress) + ":" + std::to_string(dpn_->dataLength) + "/" + std::to_string(f.Tell())); + ASSERT(f.Tell() == off_t(dpn_->dataAddress + dpn_->dataLength), + col_share_->DataFile() + ", " + std::to_string(dpn_->dataAddress) + ":" + std::to_string(dpn_->dataLength) + + "/" + std::to_string(f.Tell())); dpn_->synced = true; } @@ -573,11 +575,10 @@ void PackStr::SaveUncompressed(system::Stream *f) { f->WriteExact(nulls_ptr_.get(), bitmap_size_); f->WriteExact(deletes_ptr_.get(), bitmap_size_); f->WriteExact(data_.lens, (data_.len_mode * (1 << col_share_->pss))); - if (data_.v.empty() || dpn_->NullOnly()){ + if (data_.v.empty() || dpn_->NullOnly()) { return; } - std::unique_ptr buff(new char[data_.sum_len]); char *ptr = buff.get(); for (uint i = 0; i < dpn_->numOfRecords; i++) { @@ -593,7 +594,7 @@ void PackStr::SaveUncompressed(system::Stream *f) { } void PackStr::LoadCompressed(system::Stream *f) { - ASSERT(IsModeCompressionApplied() , "path:" + col_share_->DataFile()); + ASSERT(IsModeCompressionApplied(), "path:" + col_share_->DataFile()); auto compressed_buf = alloc_ptr(dpn_->dataLength + 1, mm::BLOCK_TYPE::BLOCK_COMPRESSED); f->ReadExact(compressed_buf.get(), dpn_->dataLength); @@ -634,8 +635,9 @@ void PackStr::LoadCompressed(system::Stream *f) { std::memcpy(deletes_ptr_.get(), reinterpret_cast(cur_buf) + sizeof(ushort), delete_buf_size); else { compress::BitstreamCompressor bsc; - CprsErr res = bsc.Decompress(reinterpret_cast(deletes_ptr_.get()), delete_buf_size, - reinterpret_cast(cur_buf) + sizeof(ushort), dpn_->numOfRecords, dpn_->numOfDeleted); + CprsErr res = + bsc.Decompress(reinterpret_cast(deletes_ptr_.get()), delete_buf_size, + reinterpret_cast(cur_buf) + sizeof(ushort), dpn_->numOfRecords, dpn_->numOfDeleted); if (res != CprsErr::CPRS_SUCCESS) { throw common::DatabaseException("Decompression of deletes failed for column " + std::to_string(pc_column(GetCoordinate().co.pack) + 1) + ", pack " + @@ -653,8 +655,8 @@ void PackStr::LoadCompressed(system::Stream *f) { compress::NumCompressor nc; mm::MMGuard cn_ptr((uint *)alloc((1 << col_share_->pss) * sizeof(uint), mm::BLOCK_TYPE::BLOCK_TEMPORARY), *this); - CprsErr res = nc.Decompress(cn_ptr.get(), reinterpret_cast(cur_buf + sizeof(uint32_t)*2), comp_len_buf_size - 8, - dpn_->numOfRecords - dpn_->numOfNulls, maxv); + CprsErr res = nc.Decompress(cn_ptr.get(), reinterpret_cast(cur_buf + sizeof(uint32_t) * 2), + comp_len_buf_size - sizeof(uint32_t) * 2, dpn_->numOfRecords - dpn_->numOfNulls, maxv); if (res != CprsErr::CPRS_SUCCESS) { std::stringstream msg_buf; msg_buf << "Decompression of lengths of std::string values failed for column " @@ -675,7 +677,7 @@ void PackStr::LoadCompressed(system::Stream *f) { cur_buf += comp_len_buf_size; auto dlen = *reinterpret_cast(cur_buf); - cur_buf += sizeof(dlen); + cur_buf += sizeof(uint32_t); data_.sum_len = *reinterpret_cast(cur_buf); cur_buf += sizeof(uint32_t); @@ -721,7 +723,7 @@ void PackStr::LoadCompressed(system::Stream *f) { } void PackStr::LoadCompressedTrie(system::Stream *f) { - ASSERT(IsModeCompressionApplied() , "path:" + col_share_->DataFile()); + ASSERT(IsModeCompressionApplied(), "path:" + col_share_->DataFile()); compressed_data_.reset(nullptr); compressed_data_ = alloc_ptr(dpn_->dataLength + 1, mm::BLOCK_TYPE::BLOCK_COMPRESSED); f->ReadExact(compressed_data_.get(), dpn_->dataLength); diff --git a/storage/tianmu/handler/ha_tianmu.cpp b/storage/tianmu/handler/ha_tianmu.cpp index bab7e2435..403607451 100644 --- a/storage/tianmu/handler/ha_tianmu.cpp +++ b/storage/tianmu/handler/ha_tianmu.cpp @@ -2016,8 +2016,6 @@ char *strmov_str(char *dst, const char *src) { static int tianmu_done_func([[maybe_unused]] void *p) { DBUG_ENTER(__PRETTY_FUNCTION__); - assert(p == tianmu_hton); - if (tianmu_hton->data) { core::Engine *eng = reinterpret_cast(tianmu_hton->data); delete eng;