Skip to content

Commit

Permalink
fix(tianmu):The mysqld is crashed when you are starting replication.(#…
Browse files Browse the repository at this point in the history
…1523)

It is possible to update null values with null values in the delta layer,
For example:
update t set name="xiaohua" where id=1;
update t set name=null where id=1;
So when encountering this situation, directly return
  • Loading branch information
konghaiya committed Apr 10, 2023
1 parent 166f87a commit 5defe1b
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 79 deletions.
8 changes: 4 additions & 4 deletions storage/tianmu/core/pack_int.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ namespace Tianmu {
namespace core {
PackInt::PackInt(DPN *dpn, PackCoordinate pc, ColumnShare *s) : Pack(dpn, pc, s) {
is_real_ = ATI::IsRealType(s->ColType().GetTypeName());

if (dpn_->NotTrivial()) {
system::TianmuFile f;
f.OpenReadOnly(s->DataFile());
Expand Down Expand Up @@ -177,7 +176,8 @@ void PackInt::ExpandOrShrink(uint64_t maxv, int64_t delta) {

void PackInt::UpdateValueFloat(size_t locationInPack, const Value &v) {
if (IsNull(locationInPack)) {
ASSERT(v.HasValue());
if(!v.HasValue()) return;
//ASSERT(v.HasValue(), col_share_->DataFile() + " locationInPack: " + std::to_string(locationInPack));

// update null to non-null
dpn_->synced = false;
Expand Down Expand Up @@ -344,8 +344,8 @@ void PackInt::DeleteByRow(size_t locationInPack) {

void PackInt::UpdateValueFixed(size_t locationInPack, const Value &v) {
if (IsNull(locationInPack)) {
ASSERT(v.HasValue());

if(!v.HasValue()) return;
//ASSERT(v.HasValue(), col_share_->DataFile() + " locationInPack: " + std::to_string(locationInPack));
// update null to non-null
dpn_->synced = false;
UnsetNull(locationInPack);
Expand Down
10 changes: 7 additions & 3 deletions storage/tianmu/core/pack_str.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,18 @@ void PackStr::UpdateValue(size_t locationInPack, const Value &v) {
dpn_->synced = false;

if (IsNull(locationInPack)) {
// update null to non-null
// In the delta layer, there may be situations where null
// values are updated with null values, so when encountering this situation, it is directly returned
if(!v.HasValue()) return;

// update null to non-null

// first non-null value?
if (dpn_->NullOnly()) {
dpn_->max_i = -1;
}

ASSERT(v.HasValue());
//ASSERT(v.HasValue(), col_share_->DataFile() + " locationInPack: " + std::to_string(locationInPack));
UnsetNull(locationInPack);
dpn_->numOfNulls--;
auto &str = v.GetString();
Expand Down
33 changes: 8 additions & 25 deletions storage/tianmu/core/tianmu_attr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1046,10 +1046,6 @@ void TianmuAttr::UpdateData(uint64_t row, Value &old_v, Value &new_v) {

auto &dpn = get_dpn(pn);
auto dpn_save = dpn;
if (dpn.Trivial()) {
// need to create pack struct for previous trivial pack
ha_tianmu_engine_->cache.GetOrFetchObject<Pack>(get_pc(pn), this);
}

if (ct.Lookup() && new_v.HasValue()) {
auto &str = new_v.GetString();
Expand Down Expand Up @@ -1079,11 +1075,11 @@ void TianmuAttr::UpdateData(uint64_t row, Value &old_v, Value &new_v) {
ResetMaxMin(dpn);
}

void TianmuAttr::UpdateBatchData(core::Transaction *tx, const std::unordered_map<uint64_t, Value> &rows) {
void TianmuAttr::UpdateBatchData(core::Transaction *tx, const std::unordered_map<uint64_t, std::shared_ptr<Value>> &rows) {
no_change = false;

// group by pn
std::unordered_map<common::PACK_INDEX, std::unordered_map<uint64_t, Value>> packs;
std::unordered_map<common::PACK_INDEX, std::unordered_map<uint64_t, std::shared_ptr<Value>>> packs;
for (const auto &row : rows) {
auto row_id = row.first;
auto row_val = row.second;
Expand All @@ -1092,7 +1088,7 @@ void TianmuAttr::UpdateBatchData(core::Transaction *tx, const std::unordered_map
if (pack != packs.end()) {
pack->second.emplace(row_id, row_val);
} else {
packs.emplace(pn, std::unordered_map<uint64_t, Value>{{row_id, row_val}});
packs.emplace(pn, std::unordered_map<uint64_t, std::shared_ptr<Value>>{{row_id, row_val}});
}
}

Expand All @@ -1102,15 +1098,12 @@ void TianmuAttr::UpdateBatchData(core::Transaction *tx, const std::unordered_map
CopyPackForWrite(pn);
auto &dpn = get_dpn(pn);
auto dpn_save = dpn;
if (dpn.Trivial()) {
ha_tianmu_engine_->cache.GetOrFetchObject<Pack>(get_pc(pn), this);
}

for (const auto &row : pack.second) {
uint64_t row_id = row.first;
Value row_val = row.second;
if (ct.Lookup() && row_val.HasValue()) {
auto &str = row_val.GetString();
std::shared_ptr<Value> row_val = row.second;
if (ct.Lookup() && row_val->HasValue()) {
auto &str = row_val->GetString();
int code = m_dict->GetEncodedValue(str.data(), str.size());
if (code < 0) {
ASSERT(m_tx != nullptr, "attempt to update dictionary in readonly transaction");
Expand All @@ -1123,9 +1116,9 @@ void TianmuAttr::UpdateBatchData(core::Transaction *tx, const std::unordered_map
}
code = m_dict->Add(str.data(), str.size());
}
row_val.SetInt(code);
row_val->SetInt(code);
}
get_pack(pn)->UpdateValue(row2offset(row_id), row_val);
get_pack(pn)->UpdateValue(row2offset(row_id), *row_val);
}

dpn.synced = false;
Expand Down Expand Up @@ -1168,10 +1161,6 @@ void TianmuAttr::DeleteData(uint64_t row) {
CopyPackForWrite(pn);
auto &dpn = get_dpn(pn);
auto dpn_save = dpn;
if (dpn.Trivial()) {
// need to create pack struct for previous trivial pack
ha_tianmu_engine_->cache.GetOrFetchObject<Pack>(get_pc(pn), this);
}
get_pack(pn)->DeleteByRow(row2offset(row));

// update global data
Expand Down Expand Up @@ -1201,9 +1190,6 @@ void TianmuAttr::DeleteBatchData(core::Transaction *tx, const std::vector<uint64
CopyPackForWrite(pn);
auto &dpn = get_dpn(pn);
auto dpn_save = dpn;
if (dpn.Trivial()) {
ha_tianmu_engine_->cache.GetOrFetchObject<Pack>(get_pc(pn), this);
}

for (const auto &row_id : pack.second) {
get_pack(pn)->DeleteByRow(row2offset(row_id));
Expand Down Expand Up @@ -1261,9 +1247,6 @@ void TianmuAttr::CopyPackForWrite(common::PACK_INDEX pi) {
m_idx[pi] = pos;
auto &dpn(get_dpn(pi));

// if (dpn.Trivial())
// return;

const PackCoordinate pc_old(m_tid, m_cid, m_share->GetPackIndex(&old_dpn));
const PackCoordinate pc_new(get_pc(pi));
std::shared_ptr<Pack> new_pack;
Expand Down
2 changes: 1 addition & 1 deletion storage/tianmu/core/tianmu_attr.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class TianmuAttr final : public mm::TraceableObject, public PhysicalColumn, publ

mm::TO_TYPE TraceableType() const override { return mm::TO_TYPE::TO_TEMPORARY; }
void UpdateData(uint64_t row, Value &old_v, Value &new_v);
void UpdateBatchData(core::Transaction *tx, const std::unordered_map<uint64_t, Value> &rows);
void UpdateBatchData(core::Transaction *tx, const std::unordered_map<uint64_t, std::shared_ptr<Value>> &rows);
void UpdateIfIndex(core::Transaction *tx, uint64_t row, uint64_t col, const Value &old_v, const Value &new_v);
void Truncate();
void DeleteData(uint64_t row);
Expand Down
65 changes: 19 additions & 46 deletions storage/tianmu/core/tianmu_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,30 +209,10 @@ class DelayedInsertParser final {
ptr += str_len;
} break;
case common::PackType::INT: {
if (attr->Type().Lookup()) {
uint32_t len = *(uint32_t *)ptr;
ptr += sizeof(uint32_t);
types::BString s(len == 0 ? "" : ptr, len);
int64_t *buf = reinterpret_cast<int64_t *>(vc.Prepare(sizeof(int64_t)));
*buf = attr->EncodeValue_T(s, true);
vc.ExpectedSize(sizeof(int64_t));
ptr += len;
} else {
int64_t *buf = reinterpret_cast<int64_t *>(vc.Prepare(sizeof(int64_t)));
*buf = *(int64_t *)ptr;

// if (attr->GetIfAutoInc()) {
// if (*buf == 0) // Value of auto inc column was not assigned by user
// *buf = attr->AutoIncNext();
// if (static_cast<uint64_t>(*buf) > attr->GetAutoInc()) {
// if (*buf > 0 || ((attr->TypeName() == common::ColumnType::BIGINT) &&
// attr->GetIfUnsigned()))
// attr->SetAutoInc(*buf);
// }
// }
vc.ExpectedSize(sizeof(int64_t));
ptr += sizeof(int64_t);
}
int64_t *buf = reinterpret_cast<int64_t *>(vc.Prepare(sizeof(int64_t)));
*buf = *(int64_t *)ptr;
vc.ExpectedSize(sizeof(int64_t));
ptr += sizeof(int64_t);
} break;
default:
break;
Expand Down Expand Up @@ -280,18 +260,18 @@ class DelayedUpdateParser final {
std::shared_ptr<index::TianmuTableIndex> index)
: attrs(attrs), update_rows(update_rows), index_table(std::move(index)) {}

uint GetRows(std::vector<std::unordered_map<uint64_t, Value>> &update_cols_buf) {
uint GetRows(std::vector<std::unordered_map<uint64_t, std::shared_ptr<Value>>> &update_cols_buf) {
update_cols_buf.reserve(attrs.size());
for (int i = 0; i < attrs.size(); i++) {
update_cols_buf.emplace_back(std::unordered_map<uint64_t, Value>());
update_cols_buf.emplace_back(std::unordered_map<uint64_t, std::shared_ptr<Value>>());
}
uint update_row_num = update_rows->size();
for (auto &[row_id, row] : *update_rows) {
auto row_ptr = const_cast<const char *>(row.get());
DeltaRecordHeadForUpdate rec_head;
row_ptr = rec_head.recordDecode(row_ptr);
for (uint col_id = 0; col_id < attrs.size(); col_id++) {
core::Value val;
std::shared_ptr<Value> val = std::make_shared<Value>();
if (!rec_head.update_mask_[col_id]) {
continue;
}
Expand All @@ -304,29 +284,21 @@ class DelayedUpdateParser final {
switch (attr->GetPackType()) {
case common::PackType::STR: {
uint32_t str_len = rec_head.field_len_[col_id];
val.SetString(const_cast<char *>(row_ptr), str_len);
val->SetString(const_cast<char *>(row_ptr), str_len);
update_cols_buf[col_id].emplace(row_id, val);
row_ptr += str_len;
} break;
case common::PackType::INT: {
if (attr->Type().Lookup()) {
uint32_t len = *(uint32_t *)row_ptr;
row_ptr += sizeof(uint32_t);
types::BString s(len == 0 ? "" : row_ptr, len);
int64_t int_val = attr->EncodeValue_T(s, true);
row_ptr += len;
} else {
int64_t int_val = *(int64_t *)row_ptr;
if (attr->GetIfAutoInc()) {
if (int_val == 0) // Value of auto inc column was not assigned by user
int_val = attr->AutoIncNext();
if (static_cast<uint64_t>(int_val) > attr->GetAutoInc())
attr->SetAutoInc(int_val);
}
val.SetInt(int_val);
update_cols_buf[col_id].emplace(row_id, val);
row_ptr += sizeof(int64_t);
int64_t int_val = *(int64_t *)row_ptr;
if (attr->GetIfAutoInc()) {
if (int_val == 0) // Value of auto inc column was not assigned by user
int_val = attr->AutoIncNext();
if (static_cast<uint64_t>(int_val) > attr->GetAutoInc())
attr->SetAutoInc(int_val);
}
val->SetInt(int_val);
update_cols_buf[col_id].emplace(row_id, val);
row_ptr += sizeof(int64_t);
} break;
default:
break;
Expand Down Expand Up @@ -1659,7 +1631,8 @@ int TianmuTable::AsyncParseUpdateRecords(system::IOParameters *iop,
clock_gettime(CLOCK_REALTIME, &t1);
auto index_table = ha_tianmu_engine_->GetTableIndex(share->Path());
DelayedUpdateParser parser(m_attrs, update_records, index_table);
std::vector<std::unordered_map<uint64_t, Value>> update_cols_buf;
std::vector<std::unordered_map<uint64_t, std::shared_ptr<Value>>> update_cols_buf;

int returned_row_num = parser.GetRows(update_cols_buf);
if (returned_row_num > 0) {
utils::result_set<void> res;
Expand Down

0 comments on commit 5defe1b

Please sign in to comment.