Skip to content

Commit

Permalink
dht: fix value edition, add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
aberaud committed Jan 27, 2023
1 parent b1009a5 commit e6971c1
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 9 deletions.
7 changes: 5 additions & 2 deletions include/opendht/value.h
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ struct OPENDHT_PUBLIC Value
/**
* Returns true if value contents are equals (not considering the value ID)
*/
inline bool contentEquals(const Value& o) {
inline bool contentEquals(const Value& o) const {
return isEncrypted() ? cypher == o.cypher :
((owner == o.owner || (owner and o.owner and *owner == *o.owner))
&& type == o.type
Expand All @@ -436,9 +436,12 @@ struct OPENDHT_PUBLIC Value
&& signature == o.signature);
}

inline bool operator== (const Value& o) {
inline bool operator== (const Value& o) const {
return id == o.id and contentEquals(o);
}
inline bool operator!= (const Value& o) const {
return !(*this == o);
}

inline void setRecipient(const InfoHash& r) {
recipient = r;
Expand Down
5 changes: 3 additions & 2 deletions src/dht.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1289,10 +1289,11 @@ Dht::storageStore(const InfoHash& id, const Sp<Value>& value, time_point created
if (total_store_size > max_store_size) {
auto value = vs->data;
auto value_diff = store.second.values_diff;
auto value_edit = store.second.edited_values;
expireStore();
storageChanged(id, st->second, value, value_diff > 0);
storageChanged(id, st->second, value, value_diff > 0 || value_edit > 0);
} else {
storageChanged(id, st->second, vs->data, store.second.values_diff > 0);
storageChanged(id, st->second, vs->data, store.second.values_diff > 0 || store.second.edited_values > 0);
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/op_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ OpValueCache::onValuesAdded(const std::vector<Sp<Value>>& vals, const system_clo
auto viop = values.emplace(v->id, v);
if (viop.second) {
newValues.emplace_back(v);
} else if (*viop.first->second.data != *v) {
// Special case for edition
if (v->seq > viop.first->second.data->seq) {
viop.first->second.data = v;
newValues.emplace_back(v);
}
} else {
viop.first->second.refCount++;
}
Expand Down
8 changes: 5 additions & 3 deletions src/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ struct Storage {
ssize_t values_diff;
/** Difference in number of listeners */
ssize_t listeners_diff;
/** Number of edited values */
size_t edited_values;
};

Storage() {}
Expand Down Expand Up @@ -230,7 +232,7 @@ Storage::store(const InfoHash& id, const Sp<Value>& value, time_point created, t
sb->insert(id, *value, expiration);
it->data = value;
total_size += size_diff;
return std::make_pair(&(*it), StoreDiff{size_diff, 0, 0});
return std::make_pair(&(*it), StoreDiff{size_diff, 0, 0, 1});
}
} else {
//DHT_LOG.DEBUG("Storing %s -> %s", id.toString().c_str(), value->toString().c_str());
Expand All @@ -240,7 +242,7 @@ Storage::store(const InfoHash& id, const Sp<Value>& value, time_point created, t
values.back().store_bucket = sb;
if (sb)
sb->insert(id, *value, expiration);
return std::make_pair(&values.back(), StoreDiff{size_new, 1, 0});
return std::make_pair(&values.back(), StoreDiff{size_new, 1, 0, 0});
}
}
return std::make_pair(nullptr, StoreDiff{});
Expand Down Expand Up @@ -272,7 +274,7 @@ Storage::clear()
ssize_t tot_size = total_size;
values.clear();
total_size = 0;
return {-tot_size, -num_values, 0};
return {-tot_size, -num_values, 0, 0};
}

std::pair<ssize_t, std::vector<Sp<Value>>>
Expand Down
39 changes: 37 additions & 2 deletions tests/dhtrunnertester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ DhtRunnerTester::testIdOps() {
std::mutex mutex;
std::condition_variable cv;
unsigned valueCount(0);
unsigned valueCountEdit(0);

dht::DhtRunner::Config config2;
config2.dht_config.node_config.max_peer_req_per_sec = -1;
Expand Down Expand Up @@ -280,10 +281,44 @@ DhtRunnerTester::testIdOps() {
return true;
});

auto key2 = dht::InfoHash::get("key2");
auto editValue = std::make_shared<dht::Value>("v1");
node1.putSigned(key2, editValue, [&](bool ok){
CPPUNIT_ASSERT(ok);
std::lock_guard<std::mutex> lk(mutex);
valueCountEdit++;
cv.notify_all();
});
node2.listen(key2, [&](const std::vector<std::shared_ptr<dht::Value>>& values, bool expired){
for (const auto& v : values) {
if (v->seq == 0)
CPPUNIT_ASSERT_EQUAL("v1"s, dht::unpackMsg<std::string>(v->data));
else if (v->seq == 1)
CPPUNIT_ASSERT_EQUAL("v2"s, dht::unpackMsg<std::string>(v->data));
CPPUNIT_ASSERT_EQUAL(v->owner->getLongId(), node1.getPublicKey()->getLongId());
}
std::lock_guard<std::mutex> lk(mutex);
valueCountEdit += values.size();
cv.notify_all();
return true;
});

{
std::unique_lock<std::mutex> lk(mutex);
CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCount == 7; }));
CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCount == 7 && valueCountEdit == 2; }));
}

// editValue->data = dht::packMsg("v2");
editValue = std::make_shared<dht::Value>(editValue->id);
editValue->data = dht::packMsg("v2");
node1.putSigned(key2, editValue, [&](bool ok){
CPPUNIT_ASSERT(ok);
std::lock_guard<std::mutex> lk(mutex);
valueCountEdit++;
cv.notify_all();
});
std::unique_lock<std::mutex> lk(mutex);
CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCountEdit == 4; }));
}

void
Expand All @@ -297,7 +332,7 @@ DhtRunnerTester::testListenLotOfBytes() {
std::string data(10000, 'a');

auto foo = dht::InfoHash::get("foo");
constexpr unsigned N = 50;
constexpr unsigned N = 1024;

for (unsigned i=0; i<N; i++) {
node2.put(foo, data, [&](bool ok) {
Expand Down

0 comments on commit e6971c1

Please sign in to comment.