Skip to content

Commit

Permalink
WIP - Ugly mess of stuff used for Diego's dissertation
Browse files Browse the repository at this point in the history
This is a big rats' nest of changes, none of which are complete or
tested enough to go into master quite yet. I used something close to
this to produce the results in my dissertation, modulo a few flags and
options.

I don't expect anyone to be able to run this code, nor will I support
it. I'm uploading now so that I can later take the good bits and pieces
from here, clean them up, and commit to master in a more sane form. It
was never meant to be public, but it doesn't seem particularly
incriminating.

These changes are (c) 2014 Stanford University. Since I've graduated,
this will likely be my last commit on their behalf.
  • Loading branch information
ongardie committed Nov 19, 2014
1 parent 4f74c25 commit a7ce12d
Show file tree
Hide file tree
Showing 32 changed files with 2,020 additions and 89 deletions.
4 changes: 4 additions & 0 deletions Client/LeaderRPC.cc
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -74,14 +74,18 @@ LeaderRPC::call(OpCode opCode,
case Status::OK: case Status::OK:
return; return;
case Status::SERVICE_SPECIFIC_ERROR: case Status::SERVICE_SPECIFIC_ERROR:
WARNING("service specific error");
handleServiceSpecificError(cachedSession, handleServiceSpecificError(cachedSession,
serviceSpecificError); serviceSpecificError);
break; break;
case Status::RPC_FAILED: case Status::RPC_FAILED:
WARNING("RPC failed");
// If the session is broken, get a new one and try again. // If the session is broken, get a new one and try again.
connectRandom(cachedSession); connectRandom(cachedSession);
break; break;
} }
WARNING("retry RPC");
usleep(10000);
} }
} }


Expand Down
2 changes: 1 addition & 1 deletion Core/ThreadId.cc
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ setName(const std::string& name)
Internal::threadNames[id] = name; Internal::threadNames[id] = name;
// set system thread name, useful for gdb // set system thread name, useful for gdb
// name is truncated at first 16 characters // name is truncated at first 16 characters
prctl(PR_SET_NAME, name.c_str(), 0, 0, 0); //prctl(PR_SET_NAME, name.c_str(), 0, 0, 0);
} }


std::string std::string
Expand Down
5 changes: 5 additions & 0 deletions Protocol/Raft.proto
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ message Entry {
* The term in which the entry was first created. * The term in which the entry was first created.
*/ */
required uint64 term = 1; required uint64 term = 1;
/**
* The index for the entry. It's not used over the network, but it's here
* because some storage backends might want to use it.
*/
optional uint64 index = 5;
/** /**
* See EntryType. * See EntryType.
*/ */
Expand Down
4 changes: 3 additions & 1 deletion RPC/ClientRPC.cc
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ ClientRPC::waitForReply(google::protobuf::Message* response,
{ {
opaqueRPC.waitForReply(); opaqueRPC.waitForReply();
std::string error = opaqueRPC.getErrorMessage(); std::string error = opaqueRPC.getErrorMessage();
if (!error.empty()) if (!error.empty()) {
WARNING("RPC failed: %s", error.c_str());
return Status::RPC_FAILED; return Status::RPC_FAILED;
}
const Buffer& responseBuffer = *opaqueRPC.peekReply(); const Buffer& responseBuffer = *opaqueRPC.peekReply();


// Extract the response's status field. // Extract the response's status field.
Expand Down
6 changes: 3 additions & 3 deletions RPC/ClientSession.cc
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* *
* TODO(ongaro): How does this interact with TCP? * TODO(ongaro): How does this interact with TCP?
*/ */
enum { TIMEOUT_MS = 100 }; enum { TIMEOUT_MS = 1000 };


/** /**
* A message ID reserved for ping messages used to check the server's liveness. * A message ID reserved for ping messages used to check the server's liveness.
Expand Down Expand Up @@ -161,12 +161,12 @@ ClientSession::Timer::handleTimerEvent()


// Send a ping or expire the session. // Send a ping or expire the session.
if (!session.activePing) { if (!session.activePing) {
VERBOSE("ClientSession is suspicious. Sending ping."); WARNING("ClientSession is suspicious. Sending ping.");
session.activePing = true; session.activePing = true;
session.messageSocket->sendMessage(PING_MESSAGE_ID, Buffer()); session.messageSocket->sendMessage(PING_MESSAGE_ID, Buffer());
schedule(TIMEOUT_MS * 1000 * 1000); schedule(TIMEOUT_MS * 1000 * 1000);
} else { } else {
VERBOSE("ClientSession to %s timed out.", PANIC("ClientSession to %s timed out.",
session.address.toString().c_str()); session.address.toString().c_str());
// Fail all current and future RPCs. // Fail all current and future RPCs.
session.errorMessage = ("Server " + session.errorMessage = ("Server " +
Expand Down
38 changes: 21 additions & 17 deletions RPC/MessageSocket.cc
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@


#include <cassert> #include <cassert>
#include <errno.h> #include <errno.h>
#include <string.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <netinet/tcp.h> #include <netinet/tcp.h>
#include <string.h>
#include <sys/epoll.h> #include <sys/epoll.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/types.h> #include <sys/types.h>
Expand Down Expand Up @@ -52,14 +52,17 @@ MessageSocket::SendSocket::SendSocket(Event::Loop& eventLoop,
: Event::File(eventLoop, fd, 0) : Event::File(eventLoop, fd, 0)
, messageSocket(messageSocket) , messageSocket(messageSocket)
{ {
#if 1
int flag = 1; int flag = 1;
int r = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)); int result = setsockopt(fd, /* socket affected */
if (r < 0) { IPPROTO_TCP, /* set option at TCP level */
// This should be a warning, but some unit tests pass weird types of TCP_NODELAY, /* name of option */
// file descriptors in here. It's not very important, anyhow. (char *) &flag, /* the cast is historical
NOTICE("Could not set TCP_NODELAY flag on sending socket %d: %s", cruft */
fd, strerror(errno)); sizeof(int)); /* length of option value */
} if (result < 0)
WARNING("could not set TCP_NODELAY");
#endif
} }


MessageSocket::SendSocket::~SendSocket() MessageSocket::SendSocket::~SendSocket()
Expand All @@ -80,16 +83,17 @@ MessageSocket::ReceiveSocket::ReceiveSocket(Event::Loop& eventLoop,
: Event::File(eventLoop, fd, EPOLLIN) : Event::File(eventLoop, fd, EPOLLIN)
, messageSocket(messageSocket) , messageSocket(messageSocket)
{ {
// I don't know that TCP_NODELAY has any effect if we're only reading from #if 1
// this file descriptor, but I guess it can't hurt.
int flag = 1; int flag = 1;
int r = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)); int result = setsockopt(fd, /* socket affected */
if (r < 0) { IPPROTO_TCP, /* set option at TCP level */
// This should be a warning, but some unit tests pass weird types of TCP_NODELAY, /* name of option */
// file descriptors in here. It's not very important, anyhow. (char *) &flag, /* the cast is historical
NOTICE("Could not set TCP_NODELAY flag on receiving socket %d: %s", cruft */
fd, strerror(errno)); sizeof(int)); /* length of option value */
} if (result < 0)
WARNING("could not set TCP_NODELAY");
#endif
} }


MessageSocket::ReceiveSocket::~ReceiveSocket() MessageSocket::ReceiveSocket::~ReceiveSocket()
Expand Down
9 changes: 7 additions & 2 deletions RPC/Server.cc
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -38,8 +38,13 @@ Server::registerService(uint16_t serviceId,
uint32_t maxThreads) uint32_t maxThreads)
{ {
std::unique_lock<std::mutex> lockGuard(mutex); std::unique_lock<std::mutex> lockGuard(mutex);
services[serviceId] = #if 0
std::make_shared<ThreadDispatchService>(service, 0, maxThreads); if (serviceId == 1) // client TODO
services[serviceId] = service;
else
#endif
services[serviceId] =
std::make_shared<ThreadDispatchService>(service, 0, maxThreads);
} }


void void
Expand Down
1 change: 1 addition & 0 deletions RPC/Server.h
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class Server : public OpaqueServer {
* Maps from service IDs to ThreadDispatchService instances. * Maps from service IDs to ThreadDispatchService instances.
* Protected by #mutex. * Protected by #mutex.
*/ */
public:
std::unordered_map<uint16_t, std::shared_ptr<Service>> services; std::unordered_map<uint16_t, std::shared_ptr<Service>> services;


// Server is non-copyable. // Server is non-copyable.
Expand Down
9 changes: 8 additions & 1 deletion RPC/Service.h
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
#define LOGCABIN_RPC_SERVICE_H #define LOGCABIN_RPC_SERVICE_H


namespace LogCabin { namespace LogCabin {

namespace Server {
class Globals;
}

namespace RPC { namespace RPC {


// forward declaration // forward declaration
Expand All @@ -31,7 +36,7 @@ class Service {
/** /**
* Constructor. * Constructor.
*/ */
Service() {} Service() : globals(NULL) {}


/** /**
* Destructor. * Destructor.
Expand All @@ -52,6 +57,8 @@ class Service {
*/ */
virtual std::string getName() const = 0; virtual std::string getName() const = 0;


Server::Globals* globals;

// Service is non-copyable. // Service is non-copyable.
Service(const Service&) = delete; Service(const Service&) = delete;
Service& operator=(const Service&) = delete; Service& operator=(const Service&) = delete;
Expand Down
6 changes: 6 additions & 0 deletions RPC/ThreadDispatchService.cc
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "Core/StringUtil.h" #include "Core/StringUtil.h"
#include "Core/ThreadId.h" #include "Core/ThreadId.h"
#include "RPC/ThreadDispatchService.h" #include "RPC/ThreadDispatchService.h"
#include "Server/Globals.h"
#include "Server/RaftConsensus.h"


namespace LogCabin { namespace LogCabin {
namespace RPC { namespace RPC {
Expand Down Expand Up @@ -68,6 +70,8 @@ ThreadDispatchService::handleRPC(ServerRPC serverRPC)
{ {
std::unique_lock<std::mutex> lockGuard(mutex); std::unique_lock<std::mutex> lockGuard(mutex);
assert(!exit); assert(!exit);
if (globals != NULL)
++globals->raft->dispatchQueue;
rpcQueue.push(std::move(serverRPC)); rpcQueue.push(std::move(serverRPC));
if (numFreeWorkers == 0 && threads.size() < maxThreads) if (numFreeWorkers == 0 && threads.size() < maxThreads)
threads.emplace_back(&ThreadDispatchService::workerMain, this); threads.emplace_back(&ThreadDispatchService::workerMain, this);
Expand Down Expand Up @@ -99,6 +103,8 @@ ThreadDispatchService::workerMain()
return; return;
rpc = std::move(rpcQueue.front()); rpc = std::move(rpcQueue.front());
rpcQueue.pop(); rpcQueue.pop();
if (globals != NULL)
--globals->raft->dispatchQueue;
} }
// execute RPC handler // execute RPC handler
threadSafeService->handleRPC(std::move(rpc)); threadSafeService->handleRPC(std::move(rpc));
Expand Down
5 changes: 3 additions & 2 deletions SConstruct
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ opts.AddVariables(
("BUILDTYPE", "Build type (RELEASE or DEBUG)", "DEBUG"), ("BUILDTYPE", "Build type (RELEASE or DEBUG)", "DEBUG"),
("VERBOSE", "Show full build information (0 or 1)", "0"), ("VERBOSE", "Show full build information (0 or 1)", "0"),
("NUMCPUS", "Number of CPUs to use for build (0 means auto).", "0"), ("NUMCPUS", "Number of CPUs to use for build (0 means auto).", "0"),
("PROTOC", "x", "protoc"),
) )


env = Environment(options = opts, env = Environment(options = opts,
Expand All @@ -41,7 +42,7 @@ env.Prepend(CXXFLAGS = [
if env["BUILDTYPE"] == "DEBUG": if env["BUILDTYPE"] == "DEBUG":
env.Append(CPPFLAGS = [ "-g", "-DDEBUG" ]) env.Append(CPPFLAGS = [ "-g", "-DDEBUG" ])
elif env["BUILDTYPE"] == "RELEASE": elif env["BUILDTYPE"] == "RELEASE":
env.Append(CPPFLAGS = [ "-DNDEBUG", "-O2" ]) env.Append(CPPFLAGS = [ "-g", "-DNDEBUG", "-O2", "-fno-omit-frame-pointer" ])
else: else:
print "Error BUILDTYPE must be RELEASE or DEBUG" print "Error BUILDTYPE must be RELEASE or DEBUG"
sys.exit(-1) sys.exit(-1)
Expand All @@ -66,7 +67,7 @@ def Protobuf(env, source):
PROTOCOUTDIR = ".")[1] PROTOCOUTDIR = ".")[1]
# Then build the resulting C++ file with no warnings # Then build the resulting C++ file with no warnings
return env.StaticObject(cc, return env.StaticObject(cc,
CXXFLAGS = "-std=c++0x -Ibuild") CXXFLAGS = "-std=c++0x -Ibuild -isystem/home/ongaro/protobuf/protobuf-2.5.0/install/include")
env.AddMethod(Protobuf) env.AddMethod(Protobuf)


def GetNumCPUs(): def GetNumCPUs():
Expand Down
60 changes: 57 additions & 3 deletions Server/ClientService.cc
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ ClientService::handleRPC(RPC::ServerRPC rpc)
{ {
using Protocol::Client::OpCode; using Protocol::Client::OpCode;


++globals.raft->activeWorkers;

// TODO(ongaro): If this is not the current cluster leader, need to // TODO(ongaro): If this is not the current cluster leader, need to
// redirect the client. // redirect the client.


Expand All @@ -82,6 +84,8 @@ ClientService::handleRPC(RPC::ServerRPC rpc)
default: default:
rpc.rejectInvalidRequest(); rpc.rejectInvalidRequest();
} }

--globals.raft->activeWorkers;
} }


std::string std::string
Expand Down Expand Up @@ -123,9 +127,11 @@ std::pair<Result, uint64_t>
ClientService::submit(RPC::ServerRPC& rpc, ClientService::submit(RPC::ServerRPC& rpc,
const google::protobuf::Message& command) const google::protobuf::Message& command)
{ {
// TODO(ongaro): Switch from string to binary format. This is probably RPC::Buffer contents;
// really slow to serialize. RPC::ProtoBuf::serialize(command, contents);
std::string cmdStr = Core::ProtoBuf::dumpString(command); // TODO: silly copy
std::string cmdStr(static_cast<char*>(contents.getData()),
contents.getLength());
std::pair<Result, uint64_t> result = globals.raft->replicate(cmdStr); std::pair<Result, uint64_t> result = globals.raft->replicate(cmdStr);
if (result.first == Result::RETRY || result.first == Result::NOT_LEADER) { if (result.first == Result::RETRY || result.first == Result::NOT_LEADER) {
Protocol::Client::Error error; Protocol::Client::Error error;
Expand All @@ -135,6 +141,11 @@ ClientService::submit(RPC::ServerRPC& rpc,
error.set_leader_hint(leaderHint); error.set_leader_hint(leaderHint);
rpc.returnError(error); rpc.returnError(error);
} }
if (result.first == Result::SUCCESS) {
VERBOSE("%s committed at index %lu",
cmdStr.c_str(), result.second);
}

return result; return result;
} }


Expand All @@ -161,6 +172,9 @@ ClientService::getResponse(RPC::ServerRPC& rpc,
const Protocol::Client::ExactlyOnceRPCInfo& rpcInfo, const Protocol::Client::ExactlyOnceRPCInfo& rpcInfo,
Protocol::Client::CommandResponse& response) Protocol::Client::CommandResponse& response)
{ {
VERBOSE("index %lu, %s",
entryId,
Core::ProtoBuf::dumpString(rpcInfo).c_str());
globals.stateMachine->wait(entryId); globals.stateMachine->wait(entryId);
bool ok = globals.stateMachine->getResponse(rpcInfo, response); bool ok = globals.stateMachine->getResponse(rpcInfo, response);
if (!ok) { if (!ok) {
Expand Down Expand Up @@ -255,23 +269,63 @@ ClientService::readOnlyTreeRPC(RPC::ServerRPC rpc)
rpc.reply(response); rpc.reply(response);
} }


#if 1
void void
ClientService::readWriteTreeRPC(RPC::ServerRPC rpc) ClientService::readWriteTreeRPC(RPC::ServerRPC rpc)
{ {
#if 1
PRELUDE(ReadWriteTree); PRELUDE(ReadWriteTree);
Command command; Command command;
command.set_nanoseconds_since_epoch(timeNanos()); command.set_nanoseconds_since_epoch(timeNanos());
*command.mutable_tree() = request; *command.mutable_tree() = request;
tstat.ticksQueueStart = rdtsc();
++globals.raft->workersRaft;
std::pair<Result, uint64_t> result = submit(rpc, command); std::pair<Result, uint64_t> result = submit(rpc, command);
--globals.raft->workersRaft;
if (result.first != Result::SUCCESS) if (result.first != Result::SUCCESS)
return; return;
++globals.raft->workersSM;
#endif
CommandResponse commandResponse; CommandResponse commandResponse;
#if 1
if (!getResponse(rpc, result.second, request.exactly_once(), if (!getResponse(rpc, result.second, request.exactly_once(),
commandResponse)) { commandResponse)) {
return; return;
} }
#else
commandResponse.mutable_tree()->set_status(Protocol::Client::Status::OK);
#endif
--globals.raft->workersSM;
tstat.ticksReply = rdtsc();
++globals.raft->workersReply;
rpc.reply(commandResponse.tree()); rpc.reply(commandResponse.tree());
--globals.raft->workersReply;

uint64_t total = tstat.ticksReply - tstat.ticksQueueStart;
uint64_t good = tstat.ticksCommitted - tstat.ticksAppended;
uint64_t queued = tstat.ticksReplicateStart - tstat.ticksQueueStart;
uint64_t postCommit = tstat.ticksReply - tstat.ticksCommitted;
VERBOSE("RPC total %lu ticks, good %lu%%, queued %lu%%, post-commit %lu%%",
total,
100 * good / total,
100 * queued / total,
100 * postCommit / total);
}
#else
void
ClientService::readWriteTreeRPC(RPC::ServerRPC rpc)
{
PRELUDE(ReadWriteTree);
Command command;
*command.mutable_tree() = request;
RPC::Buffer contents;
RPC::ProtoBuf::serialize(command, contents);
std::string cmdStr(static_cast<char*>(contents.getData()),
contents.getLength());
globals.raft->replicate2(cmdStr,
ClientRequest { std::move(rpc), std::move(command) });
} }
#endif






Expand Down
4 changes: 4 additions & 0 deletions Server/Consensus.cc
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
namespace LogCabin { namespace LogCabin {
namespace Server { namespace Server {


__thread ThreadStat tstat;

Consensus::Entry::Entry() Consensus::Entry::Entry()
: entryId() : entryId()
, type(SKIP) , type(SKIP)
, data() , data()
, snapshotReader() , snapshotReader()
, request()
{ {
} }


Expand All @@ -32,6 +35,7 @@ Consensus::Entry::Entry(Entry&& other)
, type(other.type) , type(other.type)
, data(std::move(other.data)) , data(std::move(other.data))
, snapshotReader(std::move(other.snapshotReader)) , snapshotReader(std::move(other.snapshotReader))
, request(std::move(other.request))
{ {
} }


Expand Down
Loading

0 comments on commit a7ce12d

Please sign in to comment.