Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Full command forwarding #75

Merged
merged 21 commits into from May 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 0 additions & 6 deletions CMakeLists.txt
Expand Up @@ -43,12 +43,6 @@ if(RECORD_TRACE)
add_definitions(-DNO_COMMITTED_TX_HISTORY)
endif()

option(RPC_FORWARD_TO_LEADER "Followers verify signatures and forward RPC to leader" OFF)
achamayou marked this conversation as resolved.
Show resolved Hide resolved
if (RPC_FORWARD_TO_LEADER)
add_definitions(-DRPC_FORWARD_TO_LEADER)
set(TEST_FORWARD_TO_LEADER "--leader-forwarding")
endif()

option(BUILD_SMALLBANK "Build SmallBank sample app and clients" ON)

# MemberClient executable
Expand Down
32 changes: 18 additions & 14 deletions src/apps/luageneric/luageneric_test.cpp
Expand Up @@ -114,13 +114,15 @@ template <typename F, typename K, typename V>
void check_store_load(F frontend, K k, V v)
{
const Cert u0 = {0};
enclave::RpcContext rpc_ctx(0, u0);

// store
const auto pc0 = make_pc("store", {{"k", k}, {"v", v}});
check_success(frontend->process(u0, pc0), true);
check_success(frontend->process(rpc_ctx, pc0), true);

// load and check that we get the right result
const auto pc1 = make_pc("load", {{"k", k}});
check_success(frontend->process(u0, pc1), v);
check_success(frontend->process(rpc_ctx, pc1), v);
}

TEST_CASE("simple lua apps")
Expand All @@ -130,6 +132,7 @@ TEST_CASE("simple lua apps")
// create network with 1 user and 3 active members
auto frontend = init_frontend(network, notifier, 1, 3);
const Cert u0 = {0};
enclave::RpcContext rpc_ctx(0, u0);

SUBCASE("echo")
{
Expand All @@ -149,7 +152,7 @@ TEST_CASE("simple lua apps")
// call "echo" function with "hello"
const string verb = "hello";
const auto pc = make_pc("echo", {{"verb", verb}});
check_success(frontend->process(u0, pc), verb);
check_success(frontend->process(rpc_ctx, pc), verb);
}

SUBCASE("store/load different types in generic table")
Expand Down Expand Up @@ -191,7 +194,7 @@ TEST_CASE("simple lua apps")

// (3) attempt to read non-existing key (set of integers)
const auto pc = make_pc("load", {{"k", set{5, 6, 7}}});
check_error(frontend->process(u0, pc), ErrorCodes::INVALID_PARAMS);
check_error(frontend->process(rpc_ctx, pc), ErrorCodes::INVALID_PARAMS);
}

SUBCASE("access gov tables")
Expand Down Expand Up @@ -223,12 +226,12 @@ TEST_CASE("simple lua apps")
map<string, MemberInfo> expected = {{"0", {MemberStatus::ACTIVE}},
{"1", {MemberStatus::ACTIVE}},
{"2", {MemberStatus::ACTIVE}}};
check_success(frontend->process(u0, pc), expected);
check_success(frontend->process(rpc_ctx, pc), expected);

// (2) try to write to members table
const auto pc1 = make_pc(
"put_member", {{"k", 99}, {"v", MemberInfo{MemberStatus::ACTIVE}}});
check_error(frontend->process(u0, pc1), ErrorCodes::SCRIPT_ERROR);
check_error(frontend->process(rpc_ctx, pc1), ErrorCodes::SCRIPT_ERROR);
}
}

Expand All @@ -239,6 +242,7 @@ TEST_CASE("simple bank")
// create network with 1 user and 3 active members
auto frontend = init_frontend(network, notifier, 1, 3);
const Cert u0 = {0};
enclave::RpcContext rpc_ctx(0, u0);

constexpr auto app = R"xxx(
tables, gov_tables, caller_id, method, params = ...
Expand Down Expand Up @@ -293,34 +297,34 @@ TEST_CASE("simple bank")

{
const auto pc = make_pc("SB_create", {{"dst", 1}, {"amt", 123}});
check_success<string>(frontend->process(u0, pc), "OK");
check_success<string>(frontend->process(rpc_ctx, pc), "OK");

const auto pc1 = make_pc("SB_read", {{"account", 1}});
check_success(frontend->process(u0, pc1), 123);
check_success(frontend->process(rpc_ctx, pc1), 123);
}

{
const auto pc = make_pc("SB_create", {{"dst", 2}, {"amt", 999}});
check_success<string>(frontend->process(u0, pc), "OK");
check_success<string>(frontend->process(rpc_ctx, pc), "OK");

const auto pc1 = make_pc("SB_read", {{"account", 2}});
check_success(frontend->process(u0, pc1), 999);
check_success(frontend->process(rpc_ctx, pc1), 999);
}

{
const auto pc = make_pc("SB_read", {{"account", 3}});
check_error(frontend->process(u0, pc), ErrorCodes::INVALID_PARAMS);
check_error(frontend->process(rpc_ctx, pc), ErrorCodes::INVALID_PARAMS);
}

{
const auto pc =
make_pc("SB_transfer", {{"src", 1}, {"dst", 2}, {"amt", 5}});
check_success<string>(frontend->process(u0, pc), "OK");
check_success<string>(frontend->process(rpc_ctx, pc), "OK");

const auto pc1 = make_pc("SB_read", {{"account", 1}});
check_success(frontend->process(u0, pc1), 123 - 5);
check_success(frontend->process(rpc_ctx, pc1), 123 - 5);

const auto pc2 = make_pc("SB_read", {{"account", 2}});
check_success(frontend->process(u0, pc2), 999 + 5);
check_success(frontend->process(rpc_ctx, pc2), 999 + 5);
}
}
28 changes: 16 additions & 12 deletions src/enclave/enclave.h
Expand Up @@ -11,6 +11,7 @@
#include "node/nodestate.h"
#include "node/nodetypes.h"
#include "node/notifier.h"
#include "node/rpc/forwarder.h"
#include "node/rpc/managementfrontend.h"
#include "node/rpc/memberfrontend.h"
#include "node/rpc/nodefrontend.h"
Expand All @@ -28,6 +29,7 @@ namespace enclave
ccf::NetworkState network;
ccf::NodeState node;
std::shared_ptr<ccf::NodeToNode> n2n_channels;
std::shared_ptr<ccf::Forwarder> cmd_forwarder;
ccf::Notifier notifier;
std::shared_ptr<RpcMap> rpc_map;
bool recover = false;
Expand All @@ -38,8 +40,9 @@ namespace enclave
writer_factory(circuit, config->writer_config),
rpcsessions(writer_factory),
n2n_channels(std::make_shared<ccf::NodeToNode>(writer_factory)),
node(writer_factory, network, rpcsessions, notifier),
notifier(writer_factory)
node(writer_factory, network, rpcsessions),
notifier(writer_factory),
cmd_forwarder(std::make_shared<ccf::Forwarder>(rpcsessions, n2n_channels))
{
rpc_map = std::make_shared<RpcMap>();
rpc_map->emplace(
Expand All @@ -62,14 +65,20 @@ namespace enclave
frontend->set_sig_intervals(
config->signature_intervals.sig_max_tx,
config->signature_intervals.sig_max_ms);
frontend->set_n2n_channels(n2n_channels);

// TODO: All frontends should be able to forward to/be forwarded to.
achamayou marked this conversation as resolved.
Show resolved Hide resolved
if (r.first == ccf::Actors::USERS)
{
frontend->set_cmd_forwarder(cmd_forwarder);
}
}

logger::config::msg() = AdminMessage::log_msg;
logger::config::writer() = writer_factory.create_writer_to_outside();

node.initialize(config->raft_config, n2n_channels);
rpcsessions.initialize(rpc_map);
cmd_forwarder->initialize(rpc_map);
}

bool create_node(
Expand Down Expand Up @@ -132,22 +141,17 @@ namespace enclave

DISPATCHER_SET_MESSAGE_HANDLER(
bp, ccf::node_inbound, [this](const uint8_t* data, size_t size) {
auto [body] =
const auto [body] =
ringbuffer::read_message<ccf::node_inbound>(data, size);

const auto& body_ = body;
auto p = body_.data();
auto psize = body_.size();
auto p = body.data();
auto psize = body.size();

if (
serialized::peek<ccf::NodeMsgType>(p, psize) ==
ccf::NodeMsgType::forwarded_msg)
{
serialized::skip(p, psize, sizeof(ccf::NodeMsgType));
LOG_DEBUG << "RPC forwarded: " << ccf::Actors::USERS << std::endl;

rpc_map->at(std::string(ccf::Actors::USERS))
->process_forwarded(p, psize);
cmd_forwarder->recv_message(p, psize);
}
else
{
Expand Down
21 changes: 19 additions & 2 deletions src/enclave/rpcendpoint.h
Expand Up @@ -14,6 +14,7 @@ namespace enclave
private:
std::shared_ptr<RpcMap> rpc_map;
std::shared_ptr<RpcHandler> handler;
size_t session_id;
CBuffer caller;

public:
Expand All @@ -23,7 +24,8 @@ namespace enclave
ringbuffer::AbstractWriterFactory& writer_factory,
std::unique_ptr<tls::Context> ctx) :
FramedTLSEndpoint(session_id, writer_factory, move(ctx)),
rpc_map(rpc_map_)
rpc_map(rpc_map_),
session_id(session_id)
{}

bool handle_data(const std::vector<uint8_t>& data)
Expand All @@ -42,7 +44,22 @@ namespace enclave
caller = peer_cert();
}

send(handler->process(caller, data));
// Create a new RPC context for each command since some may require
// forwarding to the leader.
RpcContext rpc_ctx(session_id, caller);
auto rep = handler->process(rpc_ctx, data);

if (rpc_ctx.is_forwarded)
{
// If the RPC has been forwarded, hold the connection.
return true;
}
else
{
// Otherwise, reply to the client synchronously.
send(rep);
}

return true;
}
};
Expand Down
22 changes: 17 additions & 5 deletions src/enclave/rpchandler.h
@@ -1,24 +1,36 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once
#include "../ds/buffer.h"
#include "ds/buffer.h"

#include <chrono>
#include <limits>
#include <stdint.h>
#include <vector>

namespace enclave
{
static constexpr size_t InvalidSessionId = std::numeric_limits<size_t>::max();

struct RpcContext
{
const size_t session_id;
const CBuffer caller;
bool is_forwarded = false;

RpcContext(size_t session_id_, CBuffer caller_) :
session_id(session_id_),
caller(caller_)
{}
};

class RpcHandler
{
public:
virtual ~RpcHandler() {}

virtual std::vector<uint8_t> process(
CBuffer caller, const std::vector<uint8_t>& input) = 0;

virtual std::vector<uint8_t> process_forwarded(
const uint8_t* data, size_t size) = 0;
RpcContext& rpc_ctx, const std::vector<uint8_t>& input) = 0;

virtual void tick(std::chrono::milliseconds elapsed_ms_count) {}
};
Expand Down
14 changes: 14 additions & 0 deletions src/enclave/rpcsessions.h
Expand Up @@ -78,6 +78,20 @@ namespace enclave
sessions.insert(std::make_pair(id, std::move(session)));
}

void reply_forwarded(size_t id, const std::vector<uint8_t>& data)
{
std::lock_guard<SpinLock> guard(lock);

auto search = sessions.find(id);
if (search == sessions.end())
{
throw std::logic_error(
"reply forwarded for unknown session: " + std::to_string(id));
}

search->second->send(data);
}

void remove_session(size_t id)
{
std::lock_guard<SpinLock> guard(lock);
Expand Down
4 changes: 2 additions & 2 deletions src/enclave/tlsendpoint.h
Expand Up @@ -153,8 +153,8 @@ namespace enclave
// MBEDTLS_ERR_SSL_WANT_READ. Probably hit a size limit - try again
if (exact && (total < up_to))
{
LOG_INFO << "Asked for exactly " << up_to << ", received " << total
<< ", retrying" << std::endl;
LOG_DEBUG << "Asked for exactly " << up_to << ", received " << total
<< ", retrying" << std::endl;
read_buffer = move(data);
return read(up_to, exact);
}
Expand Down
2 changes: 1 addition & 1 deletion src/node/channels.h
Expand Up @@ -112,7 +112,7 @@ namespace ccf
const GcmHdr& header, CBuffer aad, CBuffer cipher, Buffer plain)
{
if (status != ESTABLISHED)
throw std::logic_error("Channel is not established for encrypting");
throw std::logic_error("Channel is not established for decrypting");

return key->decrypt(header.getIv(), header.tag, cipher, aad, plain.p);
}
Expand Down
13 changes: 4 additions & 9 deletions src/node/nodestate.h
Expand Up @@ -19,7 +19,6 @@
#include "kv/replicator.h"
#include "networkstate.h"
#include "nodetonode.h"
#include "notifier.h"
#include "rpc/consts.h"
#include "rpc/frontend.h"
#include "rpc/serialization.h"
Expand Down Expand Up @@ -126,7 +125,6 @@ namespace ccf
enclave::RPCSessions& rpcsessions;
std::shared_ptr<kv::TxHistory> history;
std::shared_ptr<kv::AbstractTxEncryptor> encryptor;
Notifier& notifier;

//
// join protocol
Expand Down Expand Up @@ -160,15 +158,13 @@ namespace ccf
NodeState(
ringbuffer::AbstractWriterFactory& writer_factory,
NetworkState& network,
enclave::RPCSessions& rpcsessions,
Notifier& notifier) :
enclave::RPCSessions& rpcsessions) :
sm(State::uninitialized),
self(INVALID_ID),
writer_factory(writer_factory),
to_host(writer_factory.create_writer_to_outside()),
network(network),
rpcsessions(rpcsessions),
notifier(notifier)
rpcsessions(rpcsessions)
{
::EverCrypt_AutoConfig2_init();
}
Expand Down Expand Up @@ -880,14 +876,14 @@ namespace ccf
#endif
}

bool node_msg(const std::vector<uint8_t>& data)
void node_msg(const std::vector<uint8_t>& data)
{
// Only process messages once part of network
if (
!sm.check(State::partOfNetwork) &&
!sm.check(State::partOfPublicNetwork))
{
return false;
return;
}

auto p = data.data();
Expand All @@ -912,7 +908,6 @@ namespace ccf
default:
{}
}
return true;
}

bool pbft_msg(const uint8_t* data, size_t size)
Expand Down