Permalink
Browse files

Introduce a dependence upon BusyBee.

BusyBee is an optimized version of the HyperDex event loop.
  • Loading branch information...
rescrv committed May 11, 2012
1 parent 01259fa commit 9e7884d7a3869ddb7d57da57e05365e8e97db08b
Showing with 55 additions and 970 deletions.
  1. +1 −3 Makefile.am
  2. +46 −32 hyperdaemon/logical.cc
  3. +8 −7 hyperdaemon/logical.h
  4. +0 −746 hyperdaemon/physical.cc
  5. +0 −182 hyperdaemon/physical.h
View
@@ -336,7 +336,6 @@ libhyperdaemon_noinst_headers = \
hyperdaemon/logical.h \
hyperdaemon/network_worker.h \
hyperdaemon/ongoing_state_transfers.h \
- hyperdaemon/physical.h \
hyperdaemon/replication/clientop.h \
hyperdaemon/replication/keypair.h \
hyperdaemon/replication_manager.cc \
@@ -354,7 +353,6 @@ libhyperdaemon_la_SOURCES = \
hyperdaemon/logical.cc \
hyperdaemon/network_worker.cc \
hyperdaemon/ongoing_state_transfers.cc \
- hyperdaemon/physical.cc \
hyperdaemon/replication_manager.cc \
hyperdaemon/runtimeconfig.cc \
hyperdaemon/searches.cc
@@ -509,7 +507,7 @@ hyperdex_daemon_LDADD = \
libhyperdisk.la \
libhyperdaemon.la \
$(COVERAGE_LDADD) \
- -lpopt -lglog -lrt
+ -lbusybee-mta -lpopt -lglog -lrt
hyperdex_daemon_CPPFLAGS = \
$(PO6_CFLAGS) \
$(CPPFLAGS)
View
@@ -41,6 +41,9 @@
// Google Log
#include <glog/logging.h>
+// BusyBee
+#include <busybee_constants.h>
+
// HyperDex
#include "hyperdex/hyperdex/coordinatorlink.h"
@@ -102,12 +105,12 @@ hyperdaemon :: logical :: logical(coordinatorlink* cl, const po6::net::ipaddr& i
, m_client_nums()
, m_client_locs()
, m_client_counter(0)
- , m_physical(ip, incoming, outgoing, true, num_threads)
+ , m_busybee(ip, incoming, outgoing, num_threads)
{
- assert(m_physical.inbound().address == m_physical.outbound().address);
- m_us.address = m_physical.inbound().address;
- m_us.inbound_port = m_physical.inbound().port;
- m_us.outbound_port = m_physical.outbound().port;
+ assert(m_busybee.inbound().address == m_busybee.outbound().address);
+ m_us.address = m_busybee.inbound().address;
+ m_us.inbound_port = m_busybee.inbound().port;
+ m_us.outbound_port = m_busybee.outbound().port;
m_us.inbound_version = 0;
m_us.outbound_version = 0;
}
@@ -121,7 +124,7 @@ typedef std::map<hyperdex::entityid, hyperdex::instance>::iterator mapiter;
size_t
hyperdaemon :: logical :: header_size() const
{
- return sizeof(uint8_t) + m_physical.header_size() + sizeof(uint64_t) + 2 * sizeof(uint16_t) + 2 * entityid::SERIALIZEDSIZE;
+ return sizeof(uint8_t) + BUSYBEE_HEADER_SIZE + sizeof(uint64_t) + 2 * sizeof(uint16_t) + 2 * entityid::SERIALIZEDSIZE;
}
void
@@ -144,7 +147,7 @@ hyperdaemon :: logical :: reconfigure(const configuration& newconfig,
{
if (em.config_version <= m_config.version())
{
- m_physical.deliver(em.loc, em.msg);
+ m_busybee.deliver(em.loc, em.msg);
}
else
{
@@ -216,36 +219,41 @@ hyperdaemon :: logical :: send(const hyperdex::entityid& from,
uint8_t mt = static_cast<uint8_t>(msg_type);
assert(msg->capacity() >= header_size());
- msg->pack_at(m_physical.header_size())
+ msg->pack_at(BUSYBEE_HEADER_SIZE)
<< mt << m_config.version() << src.outbound_version << dst.inbound_version << from << to;
if (dst == m_us)
{
- m_physical.deliver(po6::net::location(m_us.address, m_us.outbound_port), msg);
+ m_busybee.deliver(po6::net::location(m_us.address, m_us.outbound_port), msg);
}
else
{
po6::net::location loc(dst.address, dst.inbound_port);
- switch (m_physical.send(loc, msg))
+ switch (m_busybee.send(loc, msg))
{
- case physical::SUCCESS:
- case physical::QUEUED:
+ case BUSYBEE_SUCCESS:
+ case BUSYBEE_QUEUED:
break;
- case physical::CONNECTFAIL:
- handle_connectfail(loc);
+ case BUSYBEE_SHUTDOWN:
+ LOG(ERROR) << "busybee unexpectedly returned SHUTDOWN";
+ return false;
+ case BUSYBEE_POLLFAILED:
+ PLOG(ERROR) << "busybee unexpectedly returned POLLFAILED";
return false;
- case physical::DISCONNECT:
+ case BUSYBEE_DISCONNECT:
handle_disconnect(loc);
return false;
- case physical::SHUTDOWN:
- LOG(ERROR) << "physical::recv unexpectedly returned SHUTDOWN.";
+ case BUSYBEE_CONNECTFAIL:
+ handle_connectfail(loc);
+ return false;
+ case BUSYBEE_ADDFDFAIL:
+ handle_connectfail(loc);
return false;
- case physical::LOGICERROR:
- LOG(ERROR) << "physical::recv unexpectedly returned LOGICERROR.";
+ case BUSYBEE_BUFFERFULL:
return false;
default:
- LOG(ERROR) << "physical::recv unexpectedly returned unknown state.";
+ LOG(ERROR) << "busybee wandered into bad state, and returned garbage";
return false;
}
}
@@ -277,26 +285,32 @@ hyperdaemon :: logical :: recv(hyperdex::entityid* from,
// - The message is to the correct version of our port bindings.
while (true)
{
- switch(m_physical.recv(&loc, msg))
+ switch(m_busybee.recv(&loc, msg))
{
- case physical::SHUTDOWN:
- return false;
- case physical::SUCCESS:
+ case BUSYBEE_SUCCESS:
break;
- case physical::DISCONNECT:
+ case BUSYBEE_SHUTDOWN:
+ return false;
+ case BUSYBEE_QUEUED:
+ LOG(ERROR) << "busybee unexpectedly returned QUEUED";
+ continue;
+ case BUSYBEE_POLLFAILED:
+ PLOG(ERROR) << "busybee unexpectedly returned POLLFAILED";
+ continue;
+ case BUSYBEE_DISCONNECT:
handle_disconnect(loc);
continue;
- case physical::CONNECTFAIL:
+ case BUSYBEE_CONNECTFAIL:
handle_connectfail(loc);
continue;
- case physical::QUEUED:
- LOG(ERROR) << "physical::recv unexpectedly returned QUEUED.";
+ case BUSYBEE_ADDFDFAIL:
+ PLOG(ERROR) << "busybee unexpectedly returned ADDFDFAIL";
continue;
- case physical::LOGICERROR:
- LOG(ERROR) << "physical::recv unexpectedly returned LOGICERROR.";
+ case BUSYBEE_BUFFERFULL:
+ LOG(ERROR) << "busybee unexpectedly returned BUFFERFULL";
continue;
default:
- LOG(ERROR) << "physical::recv unexpectedly returned unknown state.";
+ LOG(ERROR) << "busybee unexpectedly returned unknown state.";
continue;
}
@@ -389,7 +403,7 @@ hyperdaemon :: logical :: recv(hyperdex::entityid* from,
mt = static_cast<uint8_t>(hyperdex::CONFIGMISMATCH);
(*msg)->pack_at(sizeof(uint32_t))
<< mt << m_config.version() << tover << fromver << *to << *from;
- m_physical.send(loc, *msg);
+ m_busybee.send(loc, *msg);
}
// Otherwise, it's an early arrival. We should postpone it, because it
// could become valid in the future.
View
@@ -37,16 +37,17 @@
// e
#include <e/buffer.h>
+#include <e/lockfree_fifo.h>
#include <e/lockfree_hash_map.h>
+// BusyBee
+#include <busybee_mta.h>
+
// HyperDex
#include "hyperdex/hyperdex/configuration.h"
#include "hyperdex/hyperdex/instance.h"
#include "hyperdex/hyperdex/network_constants.h"
-// HyperDaemon
-#include "hyperdaemon/physical.h"
-
// Forward Declarations
namespace hyperdex
{
@@ -78,9 +79,9 @@ class logical
// Pause/unpause or completely stop recv of messages. Paused threads will
// not hold locks, and therefore will not pose risk of deadlock.
public:
- void pause() { m_physical.pause(); }
- void unpause() { m_physical.unpause(); }
- void shutdown() { m_physical.shutdown(); }
+ void pause() { m_busybee.pause(); }
+ void unpause() { m_busybee.unpause(); }
+ void shutdown() { m_busybee.shutdown(); }
// Send and recv messages.
public:
@@ -117,7 +118,7 @@ class logical
e::lockfree_hash_map<po6::net::location, uint64_t, po6::net::location::hash> m_client_nums;
e::lockfree_hash_map<uint64_t, po6::net::location, id> m_client_locs;
uint64_t m_client_counter;
- physical m_physical;
+ busybee_mta m_busybee;
};
} // namespace hyperdaemon
Oops, something went wrong.

0 comments on commit 9e7884d

Please sign in to comment.