Skip to content

Commit

Permalink
Merge feature/paxos-cache-limit into master.
Browse files Browse the repository at this point in the history
Limit paxos entries in memory.

See merge request !94
  • Loading branch information
mefyl committed Sep 7, 2017
2 parents acb03e9 + 89e69ca commit 2ad25f8
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 122 deletions.
1 change: 1 addition & 0 deletions src/memo/environ.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ namespace
{"LOOKAHEAD_THREADS", ""},
{"MAX_EMBED_SIZE", ""},
{"MAX_SQUASH_SIZE", ""},
{"PAXOS_CACHE_SIZE", ""},
{"PAXOS_LENIENT_FETCH", ""},
{"PREEMPT_DECODE", ""},
{"PREFETCH_DEPTH", ""},
Expand Down
192 changes: 75 additions & 117 deletions src/memo/model/doughnut/consensus/Paxos.cc
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,8 @@ namespace memo
{
if (auto it = elle::find(self._addresses, a))
{
auto& paxos = it->second.paxos;
auto decision = it->decision;
auto& paxos = decision->paxos;
if (auto value = paxos.current_value())
{
elle::reactor::for_each_parallel(
Expand Down Expand Up @@ -844,7 +845,13 @@ namespace memo
Paxos::LocalPeer::_load(Address address)
{
if (auto decision = elle::find(this->_addresses, address))
return BlockOrPaxos(&decision->second);
{
this->_addresses.modify(decision, [&](DecisionEntry& de)
{
de.use = elle::Clock::now();
});
return BlockOrPaxos(decision->decision.get());
}
else
{
ELLE_TRACE_SCOPE("%s: load %f from storage", *this, address);
Expand Down Expand Up @@ -886,27 +893,40 @@ namespace memo
return stored;
}
else if (stored.paxos)
return BlockOrPaxos(
&this->_load_paxos(address, std::move(*stored.paxos)));
{
auto decision = this->_load_paxos(address,
std::move(*stored.paxos));
ELLE_DEBUG("%s: reloaded %f with state %s", this, address,
decision->paxos);
return BlockOrPaxos(decision.get());
}
else
ELLE_ABORT("no block and no paxos?");
}
}

Paxos::LocalPeer::Decision&
std::shared_ptr<Paxos::LocalPeer::Decision>
Paxos::LocalPeer::_load_paxos(
Address address,
boost::optional<PaxosServer::Quorum> peers,
std::shared_ptr<blocks::Block> value)
{
try
{
if (auto decision = elle::find(this->_addresses, address))
{
this->_addresses.modify(decision, [&](DecisionEntry& de)
{
de.use = elle::Clock::now();
});
return decision->decision;
}
auto res = this->_load(address);
if (res.block)
elle::err("immutable block found when paxos was expected: %s",
address);
else
return *res.paxos;
return ELLE_ENFORCE(elle::find(this->_addresses, address))->decision;
}
catch (silo::MissingKey const& e)
{
Expand Down Expand Up @@ -936,7 +956,7 @@ namespace memo
}
}

Paxos::LocalPeer::Decision&
std::shared_ptr<Paxos::LocalPeer::Decision>
Paxos::LocalPeer::_load_paxos(Address address,
Paxos::LocalPeer::Decision decision)
{
Expand All @@ -949,8 +969,28 @@ namespace memo
ELLE_DUMP("schedule %f for rebalancing after load", address);
this->_rebalancable.emplace(address, false);
}
return this->_addresses.emplace(
address, std::move(decision)).first->second;
auto ir = this->_addresses.insert(DecisionEntry{
address,
elle::Clock::now(),
std::make_shared<Decision>(std::move(decision))
});
ELLE_ASSERT(ir.second);
auto res = ir.first->decision;
auto it = this->_addresses.get<1>().begin();
for (int i = 0;
i < signed(this->_addresses.size()) - this->_max_addresses_size;
++i)
// Don't unload blocks if they are in use, as they could be reloaded
// into _addresses in the meantime and be duplicated, entailing a
// local split brain.
if (it->decision.use_count() == 1)
{
ELLE_DEBUG(
"dropping cache entry for %f with use index %s at state %s",
it->address, it->use, it->decision->paxos);
it = this->_addresses.get<1>().erase(it);
}
return res;
}

void
Expand Down Expand Up @@ -1198,7 +1238,7 @@ namespace memo
if (it == this->_addresses.end())
// The block was deleted in the meantime.
continue;
auto quorum = it->second.paxos.current_quorum();
auto quorum = it->decision->paxos.current_quorum();
// We can't actually rebalance this block, under_represented
// was wrong. Don't think this can happen but better safe
// than sorry.
Expand Down Expand Up @@ -1249,11 +1289,11 @@ namespace memo
{
ELLE_TRACE_SCOPE("%s: get proposal at %f: %s%s",
*this, address, p, insert ? " (insert)" : "");
auto& decision = this->_load_paxos(
auto decision = this->_load_paxos(
address, insert ? boost::optional<PaxosServer::Quorum>(peers)
: boost::optional<PaxosServer::Quorum>());
auto res = decision.paxos.propose(peers, p);
BlockOrPaxos data(&decision);
auto res = decision->paxos.propose(peers, p);
BlockOrPaxos data(decision.get());
this->storage()->set(
address,
elle::serialization::binary::serialize(data,
Expand All @@ -1270,11 +1310,11 @@ namespace memo
{
try
{
auto& decision = this->_load_paxos(address);
auto decision = this->_load_paxos(address);
auto& dht = this->paxos().doughnut();
auto peers = Details::lookup_nodes(
dht,
decision.paxos.current_quorum(),
decision->paxos.current_quorum(),
address);
Paxos::PaxosClient client(address, std::move(peers));
client.state();
Expand Down Expand Up @@ -1322,8 +1362,8 @@ namespace memo
if (auto res = block->validate(this->doughnut(), true)); else
throw ValidationFailed(res.reason());
}
auto& decision = this->_load_paxos(address);
auto& paxos = decision.paxos;
auto decision = this->_load_paxos(address);
auto& paxos = decision->paxos;
if (block)
if (auto previous = paxos.current_value())
{
Expand All @@ -1336,7 +1376,7 @@ namespace memo
auto res = paxos.accept(std::move(peers), p, value);
{
ELLE_DEBUG_SCOPE("store accepted paxos");
BlockOrPaxos data(&decision);
BlockOrPaxos data(decision.get());
this->storage()->set(
address,
elle::serialization::binary::serialize(
Expand Down Expand Up @@ -1435,7 +1475,7 @@ namespace memo
boost::optional<int> local_version)
{
ELLE_TRACE_SCOPE("%s: get %f from %f", *this, address, peers);
auto res = this->_load_paxos(address).paxos.get(peers);
auto res = this->_load_paxos(address)->paxos.get(peers);
// Honor local_version
if (local_version && res &&
res->value.template is<std::shared_ptr<blocks::Block>>())
Expand All @@ -1460,11 +1500,11 @@ namespace memo
{
ELLE_TRACE_SCOPE("%s: propagate %f on %f at %f",
this, block->address(), q, p);
auto& decision = this->_load_paxos(block->address(), q, block);
decision.paxos.propose(q, p);
decision.paxos.accept(q, p, q);
decision.paxos.confirm(q, p);
BlockOrPaxos data(&decision);
auto decision = this->_load_paxos(block->address(), q, block);
decision->paxos.propose(q, p);
decision->paxos.accept(q, p, q);
decision->paxos.confirm(q, p);
BlockOrPaxos data(decision.get());
this->storage()->set(
block->address(),
elle::serialization::binary::serialize(
Expand Down Expand Up @@ -1563,101 +1603,19 @@ namespace memo
Paxos::LocalPeer::_fetch(Address address,
boost::optional<int> local_version) const
{
if (this->doughnut().version() >= elle::Version(0, 5, 0))
{
elle::serialization::Context context;
context.set<Doughnut*>(&this->doughnut());
context.set<elle::Version>(
elle_serialization_version(this->doughnut().version()));
auto data =
elle::serialization::binary::deserialize<BlockOrPaxos>(
this->storage()->get(address), true, context);
if (!data.block)
{
ELLE_TRACE("%s: plain fetch called on mutable block", *this);
elle::err("plain fetch called on mutable block %f", address);
}
return std::unique_ptr<blocks::Block>(data.block.release());
}
// Backward compatibility pre-0.5.0
auto decision = this->_addresses.find(address);
if (decision == this->_addresses.end())
try
{
elle::serialization::Context context;
context.set<Doughnut*>(&this->doughnut());
context.set<elle::Version>(
elle_serialization_version(this->doughnut().version()));
auto data =
elle::serialization::binary::deserialize<BlockOrPaxos>(
this->storage()->get(address), true, context);
if (data.block)
{
ELLE_DEBUG("loaded immutable block from storage");
return std::unique_ptr<blocks::Block>(data.block.release());
}
else
{
ELLE_DEBUG("loaded mutable block from storage");
decision = const_cast<LocalPeer*>(this)->_addresses.emplace(
address, std::move(*data.paxos)).first;
}
}
catch (silo::MissingKey const& e)
{
ELLE_TRACE("missing block %x", address);
throw MissingBlock(e.key());
}
else
ELLE_DEBUG("mutable block already loaded");
auto& paxos = decision->second.paxos;
if (auto highest = paxos.current_value())
{
auto version = highest->proposal.version;
if (decision->second.chosen == version
&& highest->value.is<std::shared_ptr<blocks::Block>>())
{
ELLE_DEBUG("return already chosen mutable block");
return highest->value.get<std::shared_ptr<blocks::Block>>()
->clone();
}
else
{
ELLE_TRACE_SCOPE(
"finalize running Paxos for version %s (last chosen %s)"
, version, decision->second.chosen);
auto block = highest->value;
auto peers = Details::lookup_nodes(
this->doughnut(), paxos.quorum(), address);
if (peers.empty())
elle::err("no peer available for fetching %f", address);
Paxos::PaxosClient client(this->doughnut().id(),
std::move(peers));
auto chosen = client.choose(version, block);
// FIXME: factor with the end of doughnut::Local::store
ELLE_DEBUG("%s: store chosen block", *this)
elle::unconst(decision->second).chosen = version;
{
BlockOrPaxos data(const_cast<Decision*>(&decision->second));
this->storage()->set(
address,
elle::serialization::binary::serialize(
data,
this->doughnut().version()),
true, true);
}
// ELLE_ASSERT(block.unique());
// FIXME: Don't clone, it's useless, find a way to steal
// ownership from the shared_ptr.
return block.get<std::shared_ptr<blocks::Block>>()->clone();
}
}
else
elle::serialization::Context context;
context.set<Doughnut*>(&this->doughnut());
context.set<elle::Version>(
elle_serialization_version(this->doughnut().version()));
auto data =
elle::serialization::binary::deserialize<BlockOrPaxos>(
this->storage()->get(address), true, context);
if (!data.block)
{
ELLE_TRACE("%s: block has running Paxos but no value: %x",
*this, address);
throw MissingBlock(address);
ELLE_TRACE("%s: plain fetch called on mutable block", *this);
elle::err("plain fetch called on mutable block %f", address);
}
return std::unique_ptr<blocks::Block>(data.block.release());
}

void
Expand Down
28 changes: 24 additions & 4 deletions src/memo/model/doughnut/consensus/Paxos.hh
Original file line number Diff line number Diff line change
Expand Up @@ -311,19 +311,39 @@ namespace memo
boost::optional<int> local_version) const override;
void
_register_rpcs(Connection& rpcs) override;
using Addresses = elle::unordered_map<Address, Decision>;
ELLE_ATTRIBUTE(Addresses, addresses);
struct DecisionEntry
{
Address address;
elle::Time use;
std::shared_ptr<Decision> decision;
};
using Addresses = bmi::multi_index_container<
DecisionEntry,
bmi::indexed_by<
bmi::hashed_unique<
bmi::member<
DecisionEntry,
Address,
&DecisionEntry::address>>,
bmi::ordered_unique<
bmi::member<
DecisionEntry,
elle::Time,
&DecisionEntry::use>>
>>;
ELLE_ATTRIBUTE_R(Addresses, addresses);
ELLE_ATTRIBUTE_R(int, max_addresses_size);
private:
void
_remove(Address address);
BlockOrPaxos
_load(Address address);
Decision&
std::shared_ptr<Decision>
_load_paxos(
Address address,
boost::optional<PaxosServer::Quorum> peers = {},
std::shared_ptr<blocks::Block> value = nullptr);
Decision&
std::shared_ptr<Decision>
_load_paxos(Address address, Decision decision);
void
_cache(Address address, bool immutable, Quorum quorum);
Expand Down
1 change: 1 addition & 0 deletions src/memo/model/doughnut/consensus/Paxos.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace memo
, _rebalance_auto_expand(rebalance_auto_expand)
, _rebalance_inspect(rebalance_inspect)
, _node_timeout(node_timeout)
, _max_addresses_size(elle::os::getenv("MEMO_PAXOS_CACHE_SIZE", 100))
, _rebalancable()
, _rebalanced()
, _rebalance_thread(elle::sprintf("%s: rebalance", this),
Expand Down
Loading

0 comments on commit 2ad25f8

Please sign in to comment.