Skip to content

Commit

Permalink
More bug fixes.
Browse files Browse the repository at this point in the history
1. RENAME now unblocks blpop/brpop.
2. Fix a deadlock bug with blpop running with the same key multiple times.
  • Loading branch information
romange committed Apr 6, 2022
1 parent 1fc9f11 commit abbefd0
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 73 deletions.
7 changes: 4 additions & 3 deletions src/core/compact_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -674,21 +674,23 @@ std::optional<int64_t> CompactObj::TryGetInt() const {
}

void CompactObj::SetString(std::string_view str) {
uint8_t mask = mask_ & ~kEncMask;

// Trying auto-detection heuristics first.
if (str.size() <= 20) {
long long ival;
static_assert(sizeof(long long) == 8);

// We use redis string2ll to be compatible with Redis.
if (string2ll(str.data(), str.size(), &ival)) {
SetMeta(INT_TAG, mask_ & ~kEncMask);
SetMeta(INT_TAG, mask);
u_.ival = ival;

return;
}

if (str.size() <= kInlineLen) {
SetMeta(str.size(), mask_ & ~kEncMask);
SetMeta(str.size(), mask);

memcpy(u_.inline_str, str.data(), str.size());
return;
Expand All @@ -698,7 +700,6 @@ void CompactObj::SetString(std::string_view str) {
DCHECK_GT(str.size(), kInlineLen);

string_view encoded = str;
uint8_t mask = mask_ & ~kEncMask;
bool is_ascii = kUseAsciiEncoding && validate_ascii_fast(str.data(), str.size());

if (is_ascii) {
Expand Down
31 changes: 16 additions & 15 deletions src/server/engine_shard_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -310,21 +310,22 @@ void EngineShard::ProcessAwakened(Transaction* completed_t) {
auto& queue = wq.items;
DCHECK(!queue.empty()); // since it's active

if (queue.front().trans == completed_t) {
do {
const WatchItem& bi = queue.front();
Transaction* head = bi.trans.get();

// If a transaction blpops on the same key multiple times, it will appear
// here several times as well, hence we check != completed_t.
if (head != completed_t && head->NotifySuspended(wq.notify_txid, shard_id()))
break;
queue.pop_front();
} while (!queue.empty());

if (queue.empty()) {
wt.RemoveEntry(w_it);
}
if (queue.front().trans != completed_t)
continue;

DVLOG(1) << "Wakening next transaction for key " << key;

do {
const WatchItem& bi = queue.front();
Transaction* head = bi.trans.get();

if (head->NotifySuspended(wq.notify_txid, shard_id()))
break;
queue.pop_front();
} while (!queue.empty());

if (queue.empty()) {
wt.RemoveEntry(w_it);
}
}

Expand Down
112 changes: 69 additions & 43 deletions src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class Renamer {
void Finalize(Transaction* t, bool skip_exist_dest);

private:
OpStatus MoveSrc(Transaction* t, EngineShard* es);
OpStatus UpdateDest(Transaction* t, EngineShard* es);

DbIndex db_indx_;
ShardId src_sid_;

Expand All @@ -50,6 +53,9 @@ class Renamer {
bool found = false;
};

PrimeValue pv_;
string str_val_;

FindResult src_res_, dest_res_; // index 0 for source, 1 for destination
OpResult<void> status_;
};
Expand Down Expand Up @@ -95,52 +101,66 @@ void Renamer::Finalize(Transaction* t, bool skip_exist_dest) {

DCHECK(src_res_.ref_val.IsRef());

PrimeValue pv;
string str_val;

auto move_src = [&](Transaction* t, EngineShard* shard) {
if (shard->shard_id() == src_sid_) { // Handle source key.
// TODO: to call PreUpdate/PostUpdate.
auto it = shard->db_slice().FindExt(db_indx_, src_res_.key).first;
CHECK(IsValid(it));
// Src key exist and we need to override the destination.
// Alternatively, we could apply an optimistic algorithm and move src at Find step.
// We would need to restore the state in case of cleanups.
t->Execute([&](Transaction* t, EngineShard* shard) { return MoveSrc(t, shard); }, false);
t->Execute([&](Transaction* t, EngineShard* shard) { return UpdateDest(t, shard); }, true);
}

if (it->second.ObjType() == OBJ_STRING) {
it->second.GetString(&str_val);
} else {
pv = std::move(it->second);
}
CHECK(shard->db_slice().Del(db_indx_, it)); // delete the entry with empty value in it.
OpStatus Renamer::MoveSrc(Transaction* t, EngineShard* es) {
if (es->shard_id() == src_sid_) { // Handle source key.
// TODO: to call PreUpdate/PostUpdate.
auto it = es->db_slice().FindExt(db_indx_, src_res_.key).first;
CHECK(IsValid(it));

// We distinguish because of the SmallString that is pinned to its thread by design,
// thus can not be accessed via another thread.
// Therefore, we copy it to standard string in its thread.
if (it->second.ObjType() == OBJ_STRING) {
it->second.GetString(&str_val_);
} else {
bool has_expire = it->second.HasExpire();
pv_ = std::move(it->second);
it->second.SetExpire(has_expire);
}
return OpStatus::OK;
};
CHECK(es->db_slice().Del(db_indx_, it)); // delete the entry with empty value in it.
}

return OpStatus::OK;
}

t->Execute(move(move_src), false);
OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) {
if (es->shard_id() != src_sid_) {
auto& db_slice = es->db_slice();
string_view dest_key = dest_res_.key;
PrimeIterator dest_it = db_slice.FindExt(db_indx_, dest_key).first;
bool is_prior_list = false;

// Src key exist and we need to override the destination.
auto set_dest = [&](Transaction* t, EngineShard* shard) {
if (shard->shard_id() != src_sid_) {
auto& db_slice = shard->db_slice();
string_view dest_key = dest_res_.key;
PrimeIterator dest_it = db_slice.FindExt(db_indx_, dest_key).first;
if (IsValid(dest_it)) {
if (src_res_.ref_val.ObjType() == OBJ_STRING) {
dest_it->second.SetString(str_val);
} else {
dest_it->second = std::move(pv);
}
db_slice.Expire(db_indx_, dest_it, src_res_.expire_ts);
if (IsValid(dest_it)) {
bool has_expire = dest_it->second.HasExpire();
is_prior_list = dest_it->second.ObjType() == OBJ_LIST;

if (src_res_.ref_val.ObjType() == OBJ_STRING) {
dest_it->second.SetString(str_val_);
} else {
if (src_res_.ref_val.ObjType() == OBJ_STRING) {
db_slice.AddNew(db_indx_, dest_key, PrimeValue{str_val}, src_res_.expire_ts);
} else {
db_slice.AddNew(db_indx_, dest_key, std::move(pv), src_res_.expire_ts);
}
dest_it->second = std::move(pv_);
}
dest_it->second.SetExpire(has_expire); // preserve expire flag.
db_slice.Expire(db_indx_, dest_it, src_res_.expire_ts);
} else {
if (src_res_.ref_val.ObjType() == OBJ_STRING) {
pv_.SetString(str_val_);
}
dest_it = db_slice.AddNew(db_indx_, dest_key, std::move(pv_), src_res_.expire_ts);
}

return OpStatus::OK;
};
t->Execute(move(set_dest), true);
if (!is_prior_list && dest_it->second.ObjType() == OBJ_LIST) {
es->AwakeWatched(db_indx_, dest_key);
}
}

return OpStatus::OK;
}

const char* ObjTypeName(int type) {
Expand Down Expand Up @@ -565,17 +585,20 @@ OpResult<uint32_t> GenericFamily::OpExists(const OpArgs& op_args, ArgSlice keys)
return res;
}

OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from, string_view to,
OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from_key, string_view to_key,
bool skip_exists) {
auto& db_slice = op_args.shard->db_slice();
auto [from_it, from_expire] = db_slice.FindExt(op_args.db_ind, from);
auto [from_it, from_expire] = db_slice.FindExt(op_args.db_ind, from_key);
if (!IsValid(from_it))
return OpStatus::KEY_NOTFOUND;

auto [to_it, to_expire] = db_slice.FindExt(op_args.db_ind, to);
bool is_prior_list = false;
auto [to_it, to_expire] = db_slice.FindExt(op_args.db_ind, to_key);
if (IsValid(to_it)) {
if (skip_exists)
return OpStatus::KEY_EXISTS;

is_prior_list = (to_it->second.ObjType() == OBJ_LIST);
}

uint64_t exp_ts =
Expand All @@ -584,7 +607,7 @@ OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from, str
// we keep the value we want to move.
PrimeValue from_obj = std::move(from_it->second);

// Restore the expire flag on 'from'.
// Restore the expire flag on 'from' so that we can delete it from the expire table.
from_it->second.SetExpire(IsValid(from_expire));

if (IsValid(to_it)) {
Expand All @@ -601,9 +624,12 @@ OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from, str
// On the other hand, AddNew does not rely on the iterators - this is why we keep
// the value in `from_obj`.
CHECK(db_slice.Del(op_args.db_ind, from_it));
db_slice.AddNew(op_args.db_ind, to, std::move(from_obj), exp_ts);
to_it = db_slice.AddNew(op_args.db_ind, to_key, std::move(from_obj), exp_ts);
}

if (!is_prior_list && to_it->second.ObjType() == OBJ_LIST) {
op_args.shard->AwakeWatched(op_args.db_ind, to_key);
}
return OpStatus::OK;
}

Expand Down
38 changes: 37 additions & 1 deletion src/server/list_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) {
RespVec blpop_resp;

auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, kKey1, "0"});
blpop_resp = Run({"blpop", kKey1, kKey2, kKey2, kKey1, "0"});
});

do {
Expand All @@ -287,6 +287,42 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) {
});
pop_fb.join();
EXPECT_THAT(blpop_resp, ElementsAre(kKey1, "bar"));

pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, kKey2, kKey2, kKey1, "0"});
});

do {
this_fiber::sleep_for(30us);
} while (!IsLocked(0, kKey1));

pp_->at(1)->Await([&] {
EXPECT_EQ(1, CheckedInt({"lpush", kKey2, "bar"}));
});
pop_fb.join();
EXPECT_THAT(blpop_resp, ElementsAre(kKey2, "bar"));
}

// Checks that a client blocked in BLPOP on kKey1 is woken up when a list stored under
// another key ("a") is RENAMEd to kKey1, and that it receives the renamed list's element.
TEST_F(ListFamilyTest, BPopRename) {
RespVec blpop_resp;

// NOTE(review): presumably this touches both keys up front and the assertion guards the
// two-shard configuration this scenario relies on — confirm against the test fixture.
Run({"exists", kKey1, kKey2});
ASSERT_EQ(2, GetDebugInfo().shards_count);

// Issue the blocking pop from its own fiber on proactor 0 so the test body can continue.
// The trailing "0" is the BLPOP timeout argument.
auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, "0"});
});

// Poll until kKey1 is reported as locked before producing any data.
// NOTE(review): presumably this indicates the blpop transaction has been registered — confirm.
do {
this_fiber::sleep_for(30us);
} while (!IsLocked(0, kKey1));

// From proactor 1: create a one-element list under "a", then rename it onto the key
// the blocked client is waiting for.
pp_->at(1)->Await([&] {
EXPECT_EQ(1, CheckedInt({"lpush", "a", "bar"}));
Run({"rename", "a", kKey1});
});
// join() returns only once the blpop call has completed; the reply must carry the
// destination key name and the element pushed to the source list.
pop_fb.join();
EXPECT_THAT(blpop_resp, ElementsAre(kKey1, "bar"));
}

} // namespace dfly
27 changes: 16 additions & 11 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1074,9 +1074,9 @@ void Transaction::UnregisterWatch() {

// Runs only in the shard thread.
OpStatus Transaction::AddToWatchedShardCb(EngineShard* shard) {
ShardId sid = SidToId(shard->shard_id());
ShardId idx = SidToId(shard->shard_id());

auto& sd = shard_data_[sid];
auto& sd = shard_data_[idx];
CHECK_EQ(0, sd.local_mask & SUSPENDED_Q);
DCHECK_EQ(0, sd.local_mask & ARMED);

Expand All @@ -1085,23 +1085,24 @@ OpStatus Transaction::AddToWatchedShardCb(EngineShard* shard) {
shard->AddWatched(s, this);
}
sd.local_mask |= SUSPENDED_Q;
DVLOG(1) << "AddWatched " << DebugId() << " local_mask:" << sd.local_mask;

return OpStatus::OK;
}

// Runs only in the shard thread.
// Quadratic complexity in number of arguments and queue length.
bool Transaction::RemoveFromWatchedShardCb(EngineShard* shard) {
ShardId sid = SidToId(shard->shard_id());
auto& sd = shard_data_[sid];
ShardId idx = SidToId(shard->shard_id());
auto& sd = shard_data_[idx];

constexpr uint16_t kQueueMask =
-Transaction::SUSPENDED_Q | Transaction::AWAKED_Q | Transaction::EXPIRED_Q;
Transaction::SUSPENDED_Q | Transaction::AWAKED_Q | Transaction::EXPIRED_Q;

if ((sd.local_mask & kQueueMask) == 0)
return false;

sd.local_mask &= kQueueMask;
sd.local_mask &= ~kQueueMask;

// TODO: what if args have keys and values?
auto args = ShardArgsInShard(shard->shard_id());
Expand Down Expand Up @@ -1129,17 +1130,21 @@ bool Transaction::IsGlobal() const {
}

// Runs only in the shard thread.
// Returns true if the transaction has changed its state from suspended to awakened,
// false otherwise.
bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) {
unsigned sd_id = SidToId(sid);
auto& sd = shard_data_[sd_id];
unsigned idx = SidToId(sid);
auto& sd = shard_data_[idx];
unsigned local_mask = sd.local_mask;
CHECK_NE(0u, local_mask & SUSPENDED_Q);
DVLOG(1) << "NotifyBlocked " << DebugId() << ", local_mask: " << local_mask;

if (local_mask & Transaction::EXPIRED_Q) {
return false;
}

DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask;

// local_mask could be awaked (i.e. not suspended) if the transaction has been
// awakened by another key or awakened by the same key multiple times.
if (local_mask & SUSPENDED_Q) {
DCHECK_EQ(0u, local_mask & AWAKED_Q);

Expand All @@ -1159,7 +1164,7 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) {
}

CHECK(sd.local_mask & AWAKED_Q);
return true;
return false;
}

void Transaction::BreakOnClose() {
Expand Down

0 comments on commit abbefd0

Please sign in to comment.