fix(tianmu):assert failed on oldv <= dpn_->max_i at pack_int.cpp:337 (#…
konghaiya authored and mergify[bot] committed Apr 27, 2023
1 parent f1a7259 commit 4a41514
Showing 2 changed files with 35 additions and 43 deletions.
68 changes: 29 additions & 39 deletions storage/tianmu/data/pack_int.cpp
@@ -43,7 +43,7 @@ PackInt::PackInt(DPN *dpn, PackCoordinate pc, ColumnShare *s) : Pack(dpn, pc, s)
PackInt::PackInt(const PackInt &apn, const PackCoordinate &pc) : Pack(apn, pc), is_real_(apn.is_real_) {
data_.value_type_ = apn.data_.value_type_;
if (apn.data_.value_type_ > 0) {
ASSERT(apn.data_.ptr_ != nullptr);
ASSERT(apn.data_.ptr_ != nullptr, "path:" + col_share_->DataFile());
data_.ptr_ = alloc(data_.value_type_ * apn.dpn_->numOfRecords, mm::BLOCK_TYPE::BLOCK_UNCOMPRESSED);
std::memcpy(data_.ptr_, apn.data_.ptr_, data_.value_type_ * apn.dpn_->numOfRecords);
}
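
The pattern above repeats throughout this commit: bare ASSERT(cond) calls gain a second argument built from col_share_->DataFile(), so a failed assertion reports which column data file was being touched. A minimal sketch of a two-argument assert of that shape, under a hypothetical name (Tianmu's real ASSERT macro is defined elsewhere and may differ):

    // Hypothetical message-carrying assert, for illustration only.
    #include <stdexcept>
    #include <string>

    #define ASSERT_MSG(cond, msg)                                              \
      do {                                                                     \
        if (!(cond))                                                           \
          throw std::runtime_error(std::string("assertion failed: ") + #cond + \
                                   " [" + (msg) + "]");                        \
      } while (0)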
@@ -167,6 +167,7 @@ void PackInt::ExpandOrShrink(uint64_t maxv, int64_t delta) {
auto new_vt = GetValueSize(maxv);
if (new_vt != data_.value_type_ || delta != 0) {
auto tmp = alloc(new_vt * dpn_->numOfRecords, mm::BLOCK_TYPE::BLOCK_UNCOMPRESSED);
std::memset(tmp, 0, new_vt * dpn_->numOfRecords);
xf[{data_.value_type_, new_vt}](data_.ptr_, data_.ptr_int8_ + (data_.value_type_ * dpn_->numOfRecords), tmp, delta);
dealloc(data_.ptr_);
data_.ptr_ = tmp;
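
The new std::memset zero-fills the scratch buffer before the xf transform re-bases the old values into it; alloc returns uninitialized memory, so any bytes the transform leaves untouched (for example slots belonging to null rows) would otherwise contain garbage. A standalone sketch of the allocate-zero-transform-swap pattern, with malloc/free and memcpy standing in for Tianmu's allocator and transform:

    // Stand-ins for alloc/dealloc and the xf width/offset transform; illustrative only.
    #include <cstdlib>
    #include <cstring>

    void *regrow_zeroed(void *old_buf, std::size_t old_bytes, std::size_t new_bytes) {
      void *tmp = std::malloc(new_bytes);    // like alloc(new_vt * numOfRecords, ...)
      if (tmp == nullptr) return old_buf;    // keep the old buffer on failure
      std::memset(tmp, 0, new_bytes);        // the zero-fill added by this commit
      std::memcpy(tmp, old_buf, old_bytes);  // the real code re-encodes values here
      std::free(old_buf);                    // like dealloc(data_.ptr_)
      return tmp;
    }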
@@ -178,7 +179,6 @@ void PackInt::UpdateValueFloat(size_t locationInPack, const Value &v) {
if (IsNull(locationInPack)) {
if (!v.HasValue())
return;
// ASSERT(v.HasValue(), col_share_->DataFile() + " locationInPack: " + std::to_string(locationInPack));

// update null to non-null
dpn_->synced = false;
@@ -200,7 +200,7 @@ void PackInt::UpdateValueFloat(size_t locationInPack, const Value &v) {
SetValD(locationInPack, d);
} else {
dpn_->numOfNulls--;
ASSERT(data_.ptr_ != nullptr);
ASSERT(data_.ptr_ != nullptr, "path:" + col_share_->DataFile());

SetValD(locationInPack, d);

@@ -214,7 +214,7 @@ void PackInt::UpdateValueFloat(size_t locationInPack, const Value &v) {
// update an original non-null value

if (data_.empty()) {
ASSERT(dpn_->Uniform());
ASSERT(dpn_->Uniform(), "path:" + col_share_->DataFile());
data_.value_type_ = 8;
data_.ptr_ = alloc(data_.value_type_ * dpn_->numOfRecords, mm::BLOCK_TYPE::BLOCK_UNCOMPRESSED);
for (uint i = 0; i < dpn_->numOfRecords; i++) SetValD(i, dpn_->min_d);
@@ -251,7 +251,7 @@ void PackInt::UpdateValueFloat(size_t locationInPack, const Value &v) {

// update non-null to another nonull
auto newv = v.GetDouble();
ASSERT(oldv != newv);
ASSERT(oldv != newv, "path:" + col_share_->DataFile());

newmin = std::min(newmin, newv);
newmax = std::max(newmax, newv);
@@ -290,7 +290,7 @@ void PackInt::DeleteByRow(size_t locationInPack) {
if (!IsNull(locationInPack)) {
if (is_real_) {
if (data_.empty()) {
ASSERT(dpn_->Uniform());
ASSERT(dpn_->Uniform(), "path:" + col_share_->DataFile());
data_.value_type_ = 8;
data_.ptr_ = alloc(data_.value_type_ * dpn_->numOfRecords, mm::BLOCK_TYPE::BLOCK_UNCOMPRESSED);
for (uint i = 0; i < dpn_->numOfRecords; i++) SetValD(i, dpn_->min_d);
@@ -324,7 +324,7 @@ void PackInt::DeleteByRow(size_t locationInPack) {
}
} else {
if (data_.empty()) {
ASSERT(dpn_->Uniform());
ASSERT(dpn_->Uniform(), "path:" + col_share_->DataFile());
data_.value_type_ = GetValueSize(dpn_->max_i - dpn_->min_i);
data_.ptr_ = alloc(data_.value_type_ * dpn_->numOfRecords, mm::BLOCK_TYPE::BLOCK_UNCOMPRESSED);
std::memset(data_.ptr_, 0, data_.value_type_ * dpn_->numOfRecords);
@@ -376,7 +376,6 @@ void PackInt::UpdateValueFixed(size_t locationInPack, const Value &v) {
if (IsNull(locationInPack)) {
if (!v.HasValue())
return;
// ASSERT(v.HasValue(), col_share_->DataFile() + " locationInPack: " + std::to_string(locationInPack));
// update null to non-null
dpn_->synced = false;
UnsetNull(locationInPack);
@@ -397,7 +396,7 @@ void PackInt::UpdateValueFixed(size_t locationInPack, const Value &v) {
dpn_->numOfNulls--;
SetVal64(locationInPack, l - dpn_->min_i);
} else {
ASSERT(data_.ptr_ != nullptr);
ASSERT(data_.ptr_ != nullptr, "path:" + col_share_->DataFile());
dpn_->numOfNulls--;

if (l >= dpn_->min_i && l <= dpn_->max_i) {
@@ -408,31 +407,22 @@ void PackInt::UpdateValueFixed(size_t locationInPack, const Value &v) {
return;
}

decltype(data_.value_type_) new_vt;
int64_t delta = 0;
if (l < dpn_->min_i) {
delta = dpn_->min_i - l;
dpn_->min_i = l;
new_vt = GetValueSize(dpn_->max_i - dpn_->min_i);
} else { // so l > dpn_->max
dpn_->max_i = l;
new_vt = GetValueSize(dpn_->max_i - dpn_->min_i);
}
if (new_vt != data_.value_type_ || delta != 0) {
auto tmp = alloc(new_vt * dpn_->numOfRecords, mm::BLOCK_TYPE::BLOCK_UNCOMPRESSED);
xf[{data_.value_type_, new_vt}](data_.ptr_, data_.ptr_int8_ + (data_.value_type_ * dpn_->numOfRecords), tmp,
delta);
dealloc(data_.ptr_);
data_.ptr_ = tmp;
data_.value_type_ = new_vt;
}

ExpandOrShrink(dpn_->max_i - dpn_->min_i, delta);
SetVal64(locationInPack, l - dpn_->min_i);
}
} else {
// update an original non-null value

if (data_.empty()) {
ASSERT(dpn_->Uniform());
ASSERT(dpn_->Uniform(), "path:" + col_share_->DataFile());
data_.value_type_ = GetValueSize(dpn_->max_i - dpn_->min_i);
data_.ptr_ = alloc(data_.value_type_ * dpn_->numOfRecords, mm::BLOCK_TYPE::BLOCK_UNCOMPRESSED);
std::memset(data_.ptr_, 0, data_.value_type_ * dpn_->numOfRecords);
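
UpdateValueFixed no longer widens and shifts the buffer by hand; it now calls ExpandOrShrink(dpn_->max_i - dpn_->min_i, delta), the routine patched above to zero its scratch buffer. Fixed values are stored as offsets from dpn_->min_i, so when a new value lowers the minimum by delta, every existing offset has to grow by delta. A small sketch of that re-basing idea, with illustrative names only:

    // Offsets are stored as (value - min); lowering min by `delta` shifts them all up.
    #include <cstdint>
    #include <vector>

    void rebase_offsets(std::vector<uint64_t> &offsets, uint64_t delta) {
      // (value - new_min) == (value - old_min) + delta
      for (auto &off : offsets) off += delta;
    }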
@@ -501,17 +491,11 @@ void PackInt::LoadValuesDouble(const loader::ValueCache *vc, const std::optional
auto new_min = std::min(vc->MinDouble(), dpn_->min_d);
auto new_max = std::max(vc->MaxDouble(), dpn_->max_d);
auto new_nr = dpn_->numOfRecords + vc->NumOfValues();
if (dpn_->NullOnly() || dpn_->Uniform()) {
ASSERT(data_.ptr_ == nullptr);
data_.value_type_ = sizeof(double);
data_.ptr_ = alloc(data_.value_type_ * new_nr, mm::BLOCK_TYPE::BLOCK_UNCOMPRESSED);
if (dpn_->Uniform()) {
std::fill_n(reinterpret_cast<double *>(data_.ptr_), new_nr, dpn_->min_d);
}
} else {
// expanding existing pack data_
ASSERT(data_.ptr_ != nullptr);
data_.ptr_ = rc_realloc(data_.ptr_, data_.value_type_ * new_nr, mm::BLOCK_TYPE::BLOCK_UNCOMPRESSED);
data_.value_type_ = sizeof(double);

data_.ptr_ = rc_realloc(data_.ptr_, data_.value_type_ * new_nr, mm::BLOCK_TYPE::BLOCK_UNCOMPRESSED);
if (dpn_->Uniform()) {
std::fill_n(reinterpret_cast<double *>(data_.ptr_), new_nr, dpn_->min_d);
}

dpn_->synced = false;
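
In LoadValuesDouble the separate NullOnly/Uniform (fresh alloc) and existing-buffer (realloc) branches collapse into one path: set value_type_ to sizeof(double), grow the buffer with rc_realloc, and re-fill it with min_d when the pack was uniform. That only works if rc_realloc treats a null data_.ptr_ as a fresh allocation — an assumption here, analogous to C's realloc, since rc_realloc itself is not shown in this diff:

    // Single-path growth, assuming the reallocation primitive accepts a null pointer
    // (as C's realloc does). Illustrative only.
    #include <cstdlib>

    double *ensure_doubles(double *buf, std::size_t new_count) {
      return static_cast<double *>(std::realloc(buf, new_count * sizeof(double)));
    }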
@@ -558,13 +542,15 @@ void PackInt::LoadValuesFixed(const loader::ValueCache *vc, const std::optional<
auto new_vt = GetValueSize(new_max - new_min);
auto new_nr = dpn_->numOfRecords + vc->NumOfValues();

ASSERT(new_vt >= data_.value_type_);
ASSERT(new_vt >= data_.value_type_, col_share_->DataFile() + ", new_vt=:" + std::to_string(new_vt) +
" , data_.value_type_:" + std::to_string(data_.value_type_));

auto delta = dpn_->min_i - new_min;

if (dpn_->NullOnly() || dpn_->Uniform()) {
ASSERT(data_.ptr_ == nullptr);
if(data_.ptr_) dealloc(data_.ptr_);
data_.ptr_ = alloc(new_vt * new_nr, mm::BLOCK_TYPE::BLOCK_UNCOMPRESSED);
std::memset(data_.ptr_, 0, new_vt * new_nr);
if (dpn_->Uniform()) {
switch (new_vt) {
case 8:
@@ -586,8 +572,10 @@ void PackInt::LoadValuesFixed(const loader::ValueCache *vc, const std::optional<
}
} else {
// expanding existing pack data_
ASSERT(data_.ptr_ != nullptr);
ASSERT(data_.ptr_ != nullptr, "path:" + col_share_->DataFile());

auto tmp_ptr = alloc(new_vt * new_nr, mm::BLOCK_TYPE::BLOCK_UNCOMPRESSED);
std::memset(tmp_ptr, 0, new_vt * new_nr);
xf[{data_.value_type_, new_vt}](data_.ptr_, data_.ptr_int8_ + (data_.value_type_ * dpn_->numOfRecords), tmp_ptr,
delta);
dealloc(data_.ptr_);
Expand All @@ -602,10 +590,11 @@ void PackInt::LoadValuesFixed(const loader::ValueCache *vc, const std::optional<
if (vc->NotNull(i)) {
AppendValue(*(reinterpret_cast<uint64_t *>(const_cast<char *>(vc->GetDataBytesPointer(i)))) - new_min);
} else {
if (nv.has_value())
if (nv.has_value()) {
AppendValue(nv->i - new_min);
else
} else {
AppendNull();
}
if (vc->IsDelete(i)) {
if (!IsNull(dpn_->numOfRecords - 1)) {
SetNull(dpn_->numOfRecords - 1);
@@ -616,6 +605,7 @@ void PackInt::LoadValuesFixed(const loader::ValueCache *vc, const std::optional<
}
}
}

// sum has already been updated outside
dpn_->min_i = new_min;
dpn_->max_i = new_max;
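
LoadValuesFixed now tolerates a stale buffer in the NullOnly/Uniform branch instead of asserting data_.ptr_ is null: it frees whatever is there, allocates at the new width, and zero-fills the result, and the expansion branch's scratch buffer gets the same memset treatment. A standalone sketch of that defensive reset, with malloc/free standing in for Tianmu's allocator:

    // Release a possibly stale buffer instead of asserting it is null, then start
    // from zeroed storage at the new size. Illustrative names only.
    #include <cstdlib>
    #include <cstring>

    void reset_pack_buffer(void *&ptr, std::size_t bytes) {
      if (ptr != nullptr) std::free(ptr);  // mirrors: if (data_.ptr_) dealloc(data_.ptr_);
      ptr = std::malloc(bytes);            // mirrors: alloc(new_vt * new_nr, ...)
      if (ptr != nullptr) std::memset(ptr, 0, bytes);
    }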
@@ -656,7 +646,7 @@ void PackInt::LoadDataFromFile(system::Stream *fcurfile) {
fcurfile->ReadExact(uptr.get(), dpn_->dataLength);
dpn_->synced = true;
{
ASSERT(!IsModeNoCompression());
ASSERT(!IsModeNoCompression(), "path:" + col_share_->DataFile());

uint *cur_buf = static_cast<uint *>(uptr.get());
if (data_.ptr_ == nullptr && data_.value_type_ > 0)
@@ -758,7 +748,7 @@ void PackInt::Save() {
else
f.WriteExact(uptr.get(), dpn_->dataLength);

ASSERT(f.Tell() == off_t(dpn_->dataAddress + dpn_->dataLength));
ASSERT(f.Tell() == off_t(dpn_->dataAddress + dpn_->dataLength), "path:" + col_share_->DataFile());

f.Close();
dpn_->synced = true;
10 changes: 6 additions & 4 deletions storage/tianmu/data/pack_int.h
@@ -72,19 +72,21 @@ class PackInt final : public Pack {
}

void AppendNull() {
SetNull(dpn_->numOfRecords);
dpn_->numOfNulls++;
dpn_->numOfRecords++;
if(likely(NotNull(dpn_->numOfRecords-1))){
SetNull(dpn_->numOfRecords-1);
}
dpn_->numOfNulls++;
}
void SetValD(uint n, double v) {
dpn_->synced = false;
ASSERT(n < dpn_->numOfRecords);
ASSERT(n < dpn_->numOfRecords , "n: "+ std::to_string(n) + " ,dpn->numOfRecords: "+ std::to_string(dpn_->numOfRecords));
ASSERT(is_real_);
data_.ptr_double_[n] = v;
}
void SetVal64(uint n, uint64_t v) {
dpn_->synced = false;
ASSERT(n < dpn_->numOfRecords);
ASSERT(n < dpn_->numOfRecords , "n: "+ std::to_string(n) + " ,dpn->numOfRecords: "+ std::to_string(dpn_->numOfRecords));
switch (data_.value_type_) {
case 8:
data_.ptr_int64_[n] = v;
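
In pack_int.h, AppendNull now increments numOfRecords before touching the null bitmap and only calls SetNull when the new last slot is not already flagged, so the bit is set at most once; SetValD and SetVal64 additionally gain descriptive assert messages. A self-contained sketch of the new ordering using a toy bitmap (not Tianmu's DPN/Pack types):

    // Toy pack illustrating the guarded AppendNull ordering; names are illustrative.
    #include <cstdint>
    #include <vector>

    struct ToyPack {
      std::vector<bool> null_bit;
      uint32_t num_records = 0;
      uint32_t num_nulls = 0;

      bool NotNull(uint32_t n) const { return !null_bit[n]; }

      void AppendNull() {
        null_bit.push_back(false);
        num_records++;                 // grow the row count first
        if (NotNull(num_records - 1))  // flag the new last slot at most once
          null_bit[num_records - 1] = true;
        num_nulls++;
      }
    };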