Permalink
Browse files

WIP - Ugly mess of stuff used for Diego's dissertation

This is a big rats' nest of changes, none of which are complete or
tested enough to go into master quite yet. I used something close to
this to produce the results in my dissertation, modulo a few flags and
options.

I don't expect anyone to be able to run this code, nor will I support
it. I'm uploading now so that I can later take the good bits and pieces
from here, clean them up, and commit to master in a more sane form. It
was never meant to be public, but it doesn't seem particularly
incriminating.

These changes are (c) 2014 Stanford University. Since I've graduated,
this will likely be my last commit on their behalf.
  • Loading branch information...
ongardie committed Nov 19, 2014
1 parent 4f74c25 commit a7ce12da98eb181ba073087ffbd063d476ee8c49
@@ -74,14 +74,18 @@ LeaderRPC::call(OpCode opCode,
case Status::OK:
return;
case Status::SERVICE_SPECIFIC_ERROR:
WARNING("service specific error");
handleServiceSpecificError(cachedSession,
serviceSpecificError);
break;
case Status::RPC_FAILED:
WARNING("RPC failed");
// If the session is broken, get a new one and try again.
connectRandom(cachedSession);
break;
}
WARNING("retry RPC");
usleep(10000);
}
}

@@ -89,7 +89,7 @@ setName(const std::string& name)
Internal::threadNames[id] = name;
// set system thread name, useful for gdb
// name is truncated at first 16 characters
prctl(PR_SET_NAME, name.c_str(), 0, 0, 0);
//prctl(PR_SET_NAME, name.c_str(), 0, 0, 0);
}

std::string
@@ -104,6 +104,11 @@ message Entry {
* The term in which the entry was first created.
*/
required uint64 term = 1;
/**
* The index for the entry. It's not used over the network, but it's here
* because some storage backends might want to use it.
*/
optional uint64 index = 5;
/**
* See EntryType.
*/
@@ -92,8 +92,10 @@ ClientRPC::waitForReply(google::protobuf::Message* response,
{
opaqueRPC.waitForReply();
std::string error = opaqueRPC.getErrorMessage();
if (!error.empty())
if (!error.empty()) {
WARNING("RPC failed: %s", error.c_str());
return Status::RPC_FAILED;
}
const Buffer& responseBuffer = *opaqueRPC.peekReply();

// Extract the response's status field.
@@ -33,7 +33,7 @@
*
* TODO(ongaro): How does this interact with TCP?
*/
enum { TIMEOUT_MS = 100 };
enum { TIMEOUT_MS = 1000 };

/**
* A message ID reserved for ping messages used to check the server's liveness.
@@ -161,12 +161,12 @@ ClientSession::Timer::handleTimerEvent()

// Send a ping or expire the session.
if (!session.activePing) {
VERBOSE("ClientSession is suspicious. Sending ping.");
WARNING("ClientSession is suspicious. Sending ping.");
session.activePing = true;
session.messageSocket->sendMessage(PING_MESSAGE_ID, Buffer());
schedule(TIMEOUT_MS * 1000 * 1000);
} else {
VERBOSE("ClientSession to %s timed out.",
PANIC("ClientSession to %s timed out.",
session.address.toString().c_str());
// Fail all current and future RPCs.
session.errorMessage = ("Server " +
@@ -15,9 +15,9 @@

#include <cassert>
#include <errno.h>
#include <string.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
@@ -52,14 +52,17 @@ MessageSocket::SendSocket::SendSocket(Event::Loop& eventLoop,
: Event::File(eventLoop, fd, 0)
, messageSocket(messageSocket)
{
#if 1
int flag = 1;
int r = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
if (r < 0) {
// This should be a warning, but some unit tests pass weird types of
// file descriptors in here. It's not very important, anyhow.
NOTICE("Could not set TCP_NODELAY flag on sending socket %d: %s",
fd, strerror(errno));
}
int result = setsockopt(fd, /* socket affected */
IPPROTO_TCP, /* set option at TCP level */
TCP_NODELAY, /* name of option */
(char *) &flag, /* the cast is historical
cruft */
sizeof(int)); /* length of option value */
if (result < 0)
WARNING("could not set TCP_NODELAY");
#endif
}

MessageSocket::SendSocket::~SendSocket()
@@ -80,16 +83,17 @@ MessageSocket::ReceiveSocket::ReceiveSocket(Event::Loop& eventLoop,
: Event::File(eventLoop, fd, EPOLLIN)
, messageSocket(messageSocket)
{
// I don't know that TCP_NODELAY has any effect if we're only reading from
// this file descriptor, but I guess it can't hurt.
#if 1
int flag = 1;
int r = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
if (r < 0) {
// This should be a warning, but some unit tests pass weird types of
// file descriptors in here. It's not very important, anyhow.
NOTICE("Could not set TCP_NODELAY flag on receiving socket %d: %s",
fd, strerror(errno));
}
int result = setsockopt(fd, /* socket affected */
IPPROTO_TCP, /* set option at TCP level */
TCP_NODELAY, /* name of option */
(char *) &flag, /* the cast is historical
cruft */
sizeof(int)); /* length of option value */
if (result < 0)
WARNING("could not set TCP_NODELAY");
#endif
}

MessageSocket::ReceiveSocket::~ReceiveSocket()
@@ -38,8 +38,13 @@ Server::registerService(uint16_t serviceId,
uint32_t maxThreads)
{
std::unique_lock<std::mutex> lockGuard(mutex);
services[serviceId] =
std::make_shared<ThreadDispatchService>(service, 0, maxThreads);
#if 0
if (serviceId == 1) // client TODO
services[serviceId] = service;
else
#endif
services[serviceId] =
std::make_shared<ThreadDispatchService>(service, 0, maxThreads);
}

void
@@ -89,6 +89,7 @@ class Server : public OpaqueServer {
* Maps from service IDs to ThreadDispatchService instances.
* Protected by #mutex.
*/
public:
std::unordered_map<uint16_t, std::shared_ptr<Service>> services;

// Server is non-copyable.
@@ -17,6 +17,11 @@
#define LOGCABIN_RPC_SERVICE_H

namespace LogCabin {

namespace Server {
class Globals;
}

namespace RPC {

// forward declaration
@@ -31,7 +36,7 @@ class Service {
/**
* Constructor.
*/
Service() {}
Service() : globals(NULL) {}

/**
* Destructor.
@@ -52,6 +57,8 @@ class Service {
*/
virtual std::string getName() const = 0;

Server::Globals* globals;

// Service is non-copyable.
Service(const Service&) = delete;
Service& operator=(const Service&) = delete;
@@ -18,6 +18,8 @@
#include "Core/StringUtil.h"
#include "Core/ThreadId.h"
#include "RPC/ThreadDispatchService.h"
#include "Server/Globals.h"
#include "Server/RaftConsensus.h"

namespace LogCabin {
namespace RPC {
@@ -68,6 +70,8 @@ ThreadDispatchService::handleRPC(ServerRPC serverRPC)
{
std::unique_lock<std::mutex> lockGuard(mutex);
assert(!exit);
if (globals != NULL)
++globals->raft->dispatchQueue;
rpcQueue.push(std::move(serverRPC));
if (numFreeWorkers == 0 && threads.size() < maxThreads)
threads.emplace_back(&ThreadDispatchService::workerMain, this);
@@ -99,6 +103,8 @@ ThreadDispatchService::workerMain()
return;
rpc = std::move(rpcQueue.front());
rpcQueue.pop();
if (globals != NULL)
--globals->raft->dispatchQueue;
}
// execute RPC handler
threadSafeService->handleRPC(std::move(rpc));
@@ -15,6 +15,7 @@ opts.AddVariables(
("BUILDTYPE", "Build type (RELEASE or DEBUG)", "DEBUG"),
("VERBOSE", "Show full build information (0 or 1)", "0"),
("NUMCPUS", "Number of CPUs to use for build (0 means auto).", "0"),
("PROTOC", "x", "protoc"),
)

env = Environment(options = opts,
@@ -41,7 +42,7 @@ env.Prepend(CXXFLAGS = [
if env["BUILDTYPE"] == "DEBUG":
env.Append(CPPFLAGS = [ "-g", "-DDEBUG" ])
elif env["BUILDTYPE"] == "RELEASE":
env.Append(CPPFLAGS = [ "-DNDEBUG", "-O2" ])
env.Append(CPPFLAGS = [ "-g", "-DNDEBUG", "-O2", "-fno-omit-frame-pointer" ])
else:
print "Error BUILDTYPE must be RELEASE or DEBUG"
sys.exit(-1)
@@ -66,7 +67,7 @@ def Protobuf(env, source):
PROTOCOUTDIR = ".")[1]
# Then build the resulting C++ file with no warnings
return env.StaticObject(cc,
CXXFLAGS = "-std=c++0x -Ibuild")
CXXFLAGS = "-std=c++0x -Ibuild -isystem/home/ongaro/protobuf/protobuf-2.5.0/install/include")
env.AddMethod(Protobuf)

def GetNumCPUs():
@@ -56,6 +56,8 @@ ClientService::handleRPC(RPC::ServerRPC rpc)
{
using Protocol::Client::OpCode;

++globals.raft->activeWorkers;

// TODO(ongaro): If this is not the current cluster leader, need to
// redirect the client.

@@ -82,6 +84,8 @@ ClientService::handleRPC(RPC::ServerRPC rpc)
default:
rpc.rejectInvalidRequest();
}

--globals.raft->activeWorkers;
}

std::string
@@ -123,9 +127,11 @@ std::pair<Result, uint64_t>
ClientService::submit(RPC::ServerRPC& rpc,
const google::protobuf::Message& command)
{
// TODO(ongaro): Switch from string to binary format. This is probably
// really slow to serialize.
std::string cmdStr = Core::ProtoBuf::dumpString(command);
RPC::Buffer contents;
RPC::ProtoBuf::serialize(command, contents);
// TODO: silly copy
std::string cmdStr(static_cast<char*>(contents.getData()),
contents.getLength());
std::pair<Result, uint64_t> result = globals.raft->replicate(cmdStr);
if (result.first == Result::RETRY || result.first == Result::NOT_LEADER) {
Protocol::Client::Error error;
@@ -135,6 +141,11 @@ ClientService::submit(RPC::ServerRPC& rpc,
error.set_leader_hint(leaderHint);
rpc.returnError(error);
}
if (result.first == Result::SUCCESS) {
VERBOSE("%s committed at index %lu",
cmdStr.c_str(), result.second);
}

return result;
}

@@ -161,6 +172,9 @@ ClientService::getResponse(RPC::ServerRPC& rpc,
const Protocol::Client::ExactlyOnceRPCInfo& rpcInfo,
Protocol::Client::CommandResponse& response)
{
VERBOSE("index %lu, %s",
entryId,
Core::ProtoBuf::dumpString(rpcInfo).c_str());
globals.stateMachine->wait(entryId);
bool ok = globals.stateMachine->getResponse(rpcInfo, response);
if (!ok) {
@@ -255,23 +269,63 @@ ClientService::readOnlyTreeRPC(RPC::ServerRPC rpc)
rpc.reply(response);
}

#if 1
void
ClientService::readWriteTreeRPC(RPC::ServerRPC rpc)
{
#if 1
PRELUDE(ReadWriteTree);
Command command;
command.set_nanoseconds_since_epoch(timeNanos());
*command.mutable_tree() = request;
tstat.ticksQueueStart = rdtsc();
++globals.raft->workersRaft;
std::pair<Result, uint64_t> result = submit(rpc, command);
--globals.raft->workersRaft;
if (result.first != Result::SUCCESS)
return;
++globals.raft->workersSM;
#endif
CommandResponse commandResponse;
#if 1
if (!getResponse(rpc, result.second, request.exactly_once(),
commandResponse)) {
return;
}
#else
commandResponse.mutable_tree()->set_status(Protocol::Client::Status::OK);
#endif
--globals.raft->workersSM;
tstat.ticksReply = rdtsc();
++globals.raft->workersReply;
rpc.reply(commandResponse.tree());
--globals.raft->workersReply;

uint64_t total = tstat.ticksReply - tstat.ticksQueueStart;
uint64_t good = tstat.ticksCommitted - tstat.ticksAppended;
uint64_t queued = tstat.ticksReplicateStart - tstat.ticksQueueStart;
uint64_t postCommit = tstat.ticksReply - tstat.ticksCommitted;
VERBOSE("RPC total %lu ticks, good %lu%%, queued %lu%%, post-commit %lu%%",
total,
100 * good / total,
100 * queued / total,
100 * postCommit / total);
}
#else
void
ClientService::readWriteTreeRPC(RPC::ServerRPC rpc)
{
PRELUDE(ReadWriteTree);
Command command;
*command.mutable_tree() = request;
RPC::Buffer contents;
RPC::ProtoBuf::serialize(command, contents);
std::string cmdStr(static_cast<char*>(contents.getData()),
contents.getLength());
globals.raft->replicate2(cmdStr,
ClientRequest { std::move(rpc), std::move(command) });
}
#endif



@@ -19,11 +19,14 @@
namespace LogCabin {
namespace Server {

__thread ThreadStat tstat;

Consensus::Entry::Entry()
: entryId()
, type(SKIP)
, data()
, snapshotReader()
, request()
{
}

@@ -32,6 +35,7 @@ Consensus::Entry::Entry(Entry&& other)
, type(other.type)
, data(std::move(other.data))
, snapshotReader(std::move(other.snapshotReader))
, request(std::move(other.request))
{
}

Oops, something went wrong.

0 comments on commit a7ce12d

Please sign in to comment.