Skip to content

Commit

Permalink
Garbage collect (req_id, seq_id) pairs
Browse files Browse the repository at this point in the history
  • Loading branch information
rescrv committed Jan 7, 2013
1 parent dd563fb commit dbda322
Show file tree
Hide file tree
Showing 17 changed files with 397 additions and 11 deletions.
12 changes: 12 additions & 0 deletions common/configuration.cc
Expand Up @@ -90,6 +90,18 @@ configuration :: version() const
return m_version; return m_version;
} }


void
configuration :: get_all_addresses(std::vector<std::pair<server_id, po6::net::location> >* addrs)
{
addrs->resize(m_addresses_by_server_id.size());

for (size_t i = 0; i < m_addresses_by_server_id.size(); ++i)
{
(*addrs)[i].first = server_id(m_addresses_by_server_id[i].first);
(*addrs)[i].second = m_addresses_by_server_id[i].second;
}
}

po6::net::location po6::net::location
configuration :: get_address(const server_id& id) const configuration :: get_address(const server_id& id) const
{ {
Expand Down
1 change: 1 addition & 0 deletions common/configuration.h
Expand Up @@ -62,6 +62,7 @@ class configuration


// membership metadata // membership metadata
public: public:
void get_all_addresses(std::vector<std::pair<server_id, po6::net::location> >* addrs);
po6::net::location get_address(const server_id& id) const; po6::net::location get_address(const server_id& id) const;
region_id get_region_id(const virtual_server_id& id) const; region_id get_region_id(const virtual_server_id& id) const;
server_id get_server_id(const virtual_server_id& id) const; server_id get_server_id(const virtual_server_id& id) const;
Expand Down
9 changes: 9 additions & 0 deletions common/counter_map.cc
Expand Up @@ -74,6 +74,15 @@ counter_map :: adopt(const std::vector<region_id>& ris)
tmp.swap(m_counters); tmp.swap(m_counters);
} }


void
counter_map :: peek(std::map<region_id, uint64_t>* ris)
{
for (size_t i = 0; i < m_counters.size(); ++i)
{
(*ris)[m_counters[i].first] = m_counters[i].second;
}
}

bool bool
counter_map :: lookup(const region_id& ri, uint64_t* count) counter_map :: lookup(const region_id& ri, uint64_t* count)
{ {
Expand Down
6 changes: 4 additions & 2 deletions common/counter_map.h
Expand Up @@ -29,14 +29,15 @@
#define hyperdex_common_counter_map_h_ #define hyperdex_common_counter_map_h_


// STL // STL
#include <map>
#include <utility> #include <utility>
#include <vector> #include <vector>


// HyperDex // HyperDex
#include "common/ids.h" #include "common/ids.h"


// The only thread-safe call is "lookup". "adopt" and "take_max" both require // The only thread-safe call is "lookup". "adopt", "peek", and "take_max" all
// external synchronization. // require external synchronization.


namespace hyperdex namespace hyperdex
{ {
Expand All @@ -49,6 +50,7 @@ class counter_map


public: public:
void adopt(const std::vector<region_id>& ris); void adopt(const std::vector<region_id>& ris);
void peek(std::map<region_id, uint64_t>* ris);
bool lookup(const region_id& ri, uint64_t* count); bool lookup(const region_id& ri, uint64_t* count);
bool take_max(const region_id& ri, uint64_t count); bool take_max(const region_id& ri, uint64_t count);


Expand Down
1 change: 1 addition & 0 deletions common/network_msgtype.cc
Expand Up @@ -52,6 +52,7 @@ hyperdex :: operator << (std::ostream& lhs, const network_msgtype& rhs)
STRINGIFY(CHAIN_OP); STRINGIFY(CHAIN_OP);
STRINGIFY(CHAIN_SUBSPACE); STRINGIFY(CHAIN_SUBSPACE);
STRINGIFY(CHAIN_ACK); STRINGIFY(CHAIN_ACK);
STRINGIFY(CHAIN_GC);
STRINGIFY(XFER_OP); STRINGIFY(XFER_OP);
STRINGIFY(XFER_ACK); STRINGIFY(XFER_ACK);
STRINGIFY(CONFIGMISMATCH); STRINGIFY(CONFIGMISMATCH);
Expand Down
1 change: 1 addition & 0 deletions common/network_msgtype.h
Expand Up @@ -60,6 +60,7 @@ enum network_msgtype
CHAIN_OP = 64, CHAIN_OP = 64,
CHAIN_SUBSPACE = 65, CHAIN_SUBSPACE = 65,
CHAIN_ACK = 66, CHAIN_ACK = 66,
CHAIN_GC = 67,


XFER_OP = 80, XFER_OP = 80,
XFER_ACK = 81, XFER_ACK = 81,
Expand Down
62 changes: 61 additions & 1 deletion daemon/communication.cc
Expand Up @@ -25,6 +25,8 @@
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE. // POSSIBILITY OF SUCH DAMAGE.


#define __STDC_LIMIT_MACROS

#ifdef HAVE_CONFIG_H #ifdef HAVE_CONFIG_H
#include "config.h" #include "config.h"
#endif #endif
Expand Down Expand Up @@ -197,6 +199,63 @@ communication :: send_client(const virtual_server_id& from,
return true; return true;
} }


bool
communication :: send(const virtual_server_id& from,
const server_id& to,
network_msgtype msg_type,
std::auto_ptr<e::buffer> msg)
{
assert(msg->size() >= HYPERDEX_HEADER_SIZE_VV);

if (m_daemon->m_us != m_daemon->m_config.get_server_id(from))
{
return false;
}

uint8_t mt = static_cast<uint8_t>(msg_type);
uint8_t flags = 1;
virtual_server_id vto(UINT64_MAX);
msg->pack_at(BUSYBEE_HEADER_SIZE) << mt << flags << m_daemon->m_config.version() << vto.get() << from.get();

if (to == server_id())
{
return false;
}

#ifdef HD_LOG_ALL_MESSAGES
LOG(INFO) << "SEND " << from << "->" << to << " " << msg_type << " " << msg->hex();
#endif

if (to == m_daemon->m_us)
{
m_busybee->deliver(to.get(), msg);
}
else
{
busybee_returncode rc = m_busybee->send(to.get(), msg);

switch (rc)
{
case BUSYBEE_SUCCESS:
break;
case BUSYBEE_DISRUPTED:
handle_disruption(to.get());
return false;
case BUSYBEE_SHUTDOWN:
case BUSYBEE_POLLFAILED:
case BUSYBEE_ADDFDFAIL:
case BUSYBEE_TIMEOUT:
case BUSYBEE_EXTERNAL:
case BUSYBEE_INTERRUPTED:
default:
LOG(ERROR) << "BusyBee unexpectedly returned " << rc;
return false;
}
}

return true;
}

bool bool
communication :: send(const virtual_server_id& from, communication :: send(const virtual_server_id& from,
const virtual_server_id& vto, const virtual_server_id& vto,
Expand Down Expand Up @@ -428,7 +487,8 @@ communication :: recv(server_id* from,
} }


bool from_valid = true; bool from_valid = true;
bool to_valid = m_daemon->m_us == m_daemon->m_config.get_server_id(virtual_server_id(vidt)); bool to_valid = m_daemon->m_us == m_daemon->m_config.get_server_id(*vto) ||
*vto == virtual_server_id(UINT64_MAX);


// If this is a virtual-virtual message // If this is a virtual-virtual message
if ((flags & 0x1)) if ((flags & 0x1))
Expand Down
5 changes: 5 additions & 0 deletions daemon/communication.h
Expand Up @@ -59,6 +59,7 @@
+ sizeof(uint64_t) /*config version*/ \ + sizeof(uint64_t) /*config version*/ \
+ sizeof(uint64_t) /*virt to*/ \ + sizeof(uint64_t) /*virt to*/ \
+ sizeof(uint64_t) /*virt from*/) + sizeof(uint64_t) /*virt from*/)
#define HYPERDEX_HEADER_SIZE_VS HYPERDEX_HEADER_SIZE_VV


namespace hyperdex namespace hyperdex
{ {
Expand Down Expand Up @@ -95,6 +96,10 @@ class communication
const server_id& to, const server_id& to,
network_msgtype msg_type, network_msgtype msg_type,
std::auto_ptr<e::buffer> msg); std::auto_ptr<e::buffer> msg);
bool send(const virtual_server_id& from,
const server_id& to,
network_msgtype msg_type,
std::auto_ptr<e::buffer> msg);
bool send(const virtual_server_id& from, bool send(const virtual_server_id& from,
const virtual_server_id& to, const virtual_server_id& to,
network_msgtype msg_type, network_msgtype msg_type,
Expand Down
9 changes: 9 additions & 0 deletions daemon/coordinator_link.cc
Expand Up @@ -213,6 +213,7 @@ coordinator_link :: register_id(server_id us, const po6::net::location& bind_to)
} }


extern bool s_continue; extern bool s_continue;
extern bool s_alarm;


bool bool
coordinator_link :: wait_for_config(configuration* config) coordinator_link :: wait_for_config(configuration* config)
Expand All @@ -222,6 +223,14 @@ coordinator_link :: wait_for_config(configuration* config)


while (s_continue) while (s_continue)
{ {
if (s_alarm)
{
alarm(30);
s_alarm = false;
m_daemon->m_repl.trip_periodic();
need_to_backoff = false;
}

if (need_to_backoff) if (need_to_backoff)
{ {
LOG(ERROR) << "connection to the coordinator failed; retrying in " << retry / 1000000. << " milliseconds"; LOG(ERROR) << "connection to the coordinator failed; retrying in " << retry / 1000000. << " milliseconds";
Expand Down
50 changes: 50 additions & 0 deletions daemon/daemon.cc
Expand Up @@ -47,6 +47,7 @@
using hyperdex::daemon; using hyperdex::daemon;


bool s_continue = true; bool s_continue = true;
bool s_alarm = false;


static void static void
exit_on_signal(int /*signum*/) exit_on_signal(int /*signum*/)
Expand All @@ -55,6 +56,12 @@ exit_on_signal(int /*signum*/)
s_continue = false; s_continue = false;
} }


static void
handle_alarm(int /*signum*/)
{
s_alarm = true;
}

static void static void
dummy(int /*signum*/) dummy(int /*signum*/)
{ {
Expand Down Expand Up @@ -132,12 +139,33 @@ daemon :: run(bool daemonize,
return EXIT_FAILURE; return EXIT_FAILURE;
} }


if (!install_signal_handler(SIGALRM, handle_alarm))
{
std::cerr << "could not install SIGUSR1 handler; exiting" << std::endl;
return EXIT_FAILURE;
}

if (!install_signal_handler(SIGUSR1, dummy)) if (!install_signal_handler(SIGUSR1, dummy))
{ {
std::cerr << "could not install SIGUSR1 handler; exiting" << std::endl; std::cerr << "could not install SIGUSR1 handler; exiting" << std::endl;
return EXIT_FAILURE; return EXIT_FAILURE;
} }


sigset_t ss;

if (sigfillset(&ss) < 0)
{
PLOG(ERROR) << "sigfillset";
return EXIT_FAILURE;
}

if (pthread_sigmask(SIG_BLOCK, &ss, NULL) < 0)
{
PLOG(ERROR) << "could not block signals";
return EXIT_FAILURE;
}

alarm(30);
google::LogToStderr(); google::LogToStderr();
bool saved = false; bool saved = false;
server_id saved_us; server_id saved_us;
Expand Down Expand Up @@ -362,6 +390,9 @@ daemon :: loop()
case CHAIN_ACK: case CHAIN_ACK:
process_chain_ack(from, vfrom, vto, msg, up); process_chain_ack(from, vfrom, vto, msg, up);
break; break;
case CHAIN_GC:
process_chain_gc(from, vfrom, vto, msg, up);
break;
case XFER_OP: case XFER_OP:
process_xfer_op(from, vfrom, vto, msg, up); process_xfer_op(from, vfrom, vto, msg, up);
break; break;
Expand Down Expand Up @@ -631,6 +662,25 @@ daemon :: process_chain_subspace(server_id,
m_repl.chain_subspace(vfrom, vto, retransmission, region_id(reg_id), seq_id, version, msg, key, value, hashes); m_repl.chain_subspace(vfrom, vto, retransmission, region_id(reg_id), seq_id, version, msg, key, value, hashes);
} }


void
daemon :: process_chain_gc(server_id,
virtual_server_id vfrom,
virtual_server_id,
std::auto_ptr<e::buffer> msg,
e::unpacker up)
{
uint64_t seq_id;

if ((up >> seq_id).error())
{
LOG(WARNING) << "unpack of CHAIN_GC failed; here's some hex: " << msg->hex();
return;
}

region_id ri = m_config.get_region_id(vfrom);
m_repl.chain_gc(ri, seq_id);
}

void void
daemon :: process_chain_ack(server_id, daemon :: process_chain_ack(server_id,
virtual_server_id vfrom, virtual_server_id vfrom,
Expand Down
1 change: 1 addition & 0 deletions daemon/daemon.h
Expand Up @@ -81,6 +81,7 @@ class daemon
void process_chain_op(server_id from, virtual_server_id vfrom, virtual_server_id vto, std::auto_ptr<e::buffer> msg, e::unpacker up); void process_chain_op(server_id from, virtual_server_id vfrom, virtual_server_id vto, std::auto_ptr<e::buffer> msg, e::unpacker up);
void process_chain_subspace(server_id from, virtual_server_id vfrom, virtual_server_id vto, std::auto_ptr<e::buffer> msg, e::unpacker up); void process_chain_subspace(server_id from, virtual_server_id vfrom, virtual_server_id vto, std::auto_ptr<e::buffer> msg, e::unpacker up);
void process_chain_ack(server_id from, virtual_server_id vfrom, virtual_server_id vto, std::auto_ptr<e::buffer> msg, e::unpacker up); void process_chain_ack(server_id from, virtual_server_id vfrom, virtual_server_id vto, std::auto_ptr<e::buffer> msg, e::unpacker up);
void process_chain_gc(server_id from, virtual_server_id vfrom, virtual_server_id vto, std::auto_ptr<e::buffer> msg, e::unpacker up);
void process_xfer_op(server_id from, virtual_server_id vfrom, virtual_server_id vto, std::auto_ptr<e::buffer> msg, e::unpacker up); void process_xfer_op(server_id from, virtual_server_id vfrom, virtual_server_id vto, std::auto_ptr<e::buffer> msg, e::unpacker up);
void process_xfer_ack(server_id from, virtual_server_id vfrom, virtual_server_id vto, std::auto_ptr<e::buffer> msg, e::unpacker up); void process_xfer_ack(server_id from, virtual_server_id vfrom, virtual_server_id vto, std::auto_ptr<e::buffer> msg, e::unpacker up);


Expand Down

0 comments on commit dbda322

Please sign in to comment.