Skip to content

Commit

Permalink
Merge pull request #4758 from oasisprotocol/kostko/stable/22.1.x/back…
Browse files Browse the repository at this point in the history
…port-multi-3

[BACKPORT/22.1.x] Multiple backports
  • Loading branch information
kostko committed May 25, 2022
2 parents 12a22e8 + 74a1fd4 commit 26bf3c4
Show file tree
Hide file tree
Showing 27 changed files with 620 additions and 107 deletions.
4 changes: 2 additions & 2 deletions .buildkite/go/nancy_audit.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ set -euxo pipefail
########################################
# Check dependencies for vulnerabilities
########################################
pushd go
go list -json -m all | nancy sleuth
pushd go/oasis-node
go list -json -deps | nancy sleuth
popd
7 changes: 7 additions & 0 deletions .changelog/4709.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
runtime: Emit runtime logs as oasis-node logs

Previously, runtime logs used a slightly different format.
Also, they were written to stdout in a manner that was not
synchronized with node logs, so the two sets of logs
sometimes intertwined mid-line. Those annoyances are gone,
plus runtime logs are now annotated with the runtime ID.
1 change: 1 addition & 0 deletions .changelog/4754.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
docker/oasis-core-dev: Fix golangci-lint install
4 changes: 4 additions & 0 deletions .changelog/4757.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
runtime: Add support for reporting EnclaveRPC peer feedback

This makes EnclaveRPC more robust as the higher-level layer in the
runtime can trigger peer replacement on high level errors.
Empty file added .changelog/4762.trivial.md
Empty file.
117 changes: 102 additions & 15 deletions client/src/enclave_rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub enum RpcClientError {
type SendqRequest = (
Arc<Context>,
types::Request,
oneshot::Sender<Result<types::Response, RpcClientError>>,
oneshot::Sender<Result<(u64, types::Response), RpcClientError>>,
usize,
);

Expand Down Expand Up @@ -120,13 +120,7 @@ impl RpcClient {

/// Construct an unconnected RPC client with runtime-internal transport.
pub fn new_runtime(builder: Builder, protocol: Arc<Protocol>, endpoint: &str) -> Self {
Self::new(
Box::new(RuntimeTransport {
protocol,
endpoint: endpoint.to_owned(),
}),
builder,
)
Self::new(Box::new(RuntimeTransport::new(protocol, endpoint)), builder)
}

/// Call a remote method.
Expand All @@ -145,18 +139,27 @@ impl RpcClient {
args: cbor::to_value(args),
};

let response = self.execute_call(ctx, request).await?;
match response.body {
types::Body::Success(value) => Ok(cbor::from_value(value)?),
let (pfid, response) = self.execute_call(ctx, request).await?;
let result = match response.body {
types::Body::Success(value) => cbor::from_value(value).map_err(Into::into),
types::Body::Error(error) => Err(RpcClientError::CallFailed(error)),
}
};

// Report peer feedback based on whether call was successful.
let pf = match result {
Ok(_) => types::PeerFeedback::Success,
Err(_) => types::PeerFeedback::Failure,
};
self.inner.transport.set_peer_feedback(pfid, Some(pf));

result
}

async fn execute_call(
&self,
ctx: Context,
request: types::Request,
) -> Result<types::Response, RpcClientError> {
) -> Result<(u64, types::Response), RpcClientError> {
// Spawn a new controller if we haven't spawned one yet.
if self
.inner
Expand Down Expand Up @@ -186,10 +189,23 @@ impl RpcClient {
}
.await;

// Update peer feedback for next request.
let pfid = inner.transport.get_peer_feedback_id();
if result.is_err() {
// In case there was a transport error we need to reset the session
// immediately as no progress is possible.
let mut session = inner.session.lock().unwrap();
session.reset();
// Set peer feedback immediately so retries can try new peers.
inner
.transport
.set_peer_feedback(pfid, Some(types::PeerFeedback::Failure));
}

match result {
ref r if r.is_ok() || retries >= inner.max_retries => {
// Request was successful or number of retries has been exceeded.
let _ = rsp_tx.send(result);
let _ = rsp_tx.send(result.map(|rsp| (pfid, rsp)));
}

_ => {
Expand Down Expand Up @@ -373,6 +389,8 @@ mod test {
rak: Arc<RAK>,
demux: Arc<Mutex<Demux>>,
next_error: Arc<AtomicBool>,
peer_feedback: Arc<Mutex<(u64, Option<types::PeerFeedback>)>>,
peer_feedback_history: Arc<Mutex<Vec<(u64, Option<types::PeerFeedback>)>>>,
}

impl MockTransport {
Expand All @@ -383,6 +401,8 @@ mod test {
rak: rak.clone(),
demux: Arc::new(Mutex::new(Demux::new(rak))),
next_error: Arc::new(AtomicBool::new(false)),
peer_feedback: Arc::new(Mutex::new((0, None))),
peer_feedback_history: Arc::new(Mutex::new(Vec::new())),
}
}

Expand All @@ -394,6 +414,17 @@ mod test {
fn induce_transport_error(&self) {
self.next_error.store(true, Ordering::SeqCst);
}

fn take_peer_feedback_history(&self) -> Vec<(u64, Option<types::PeerFeedback>)> {
let mut pfh: Vec<_> = {
let mut pfh = self.peer_feedback_history.lock().unwrap();
std::mem::take(&mut pfh)
};
// Also add the pending feedback.
let pf = self.peer_feedback.lock().unwrap();
pfh.push(pf.clone());
pfh
}
}

impl Transport for MockTransport {
Expand All @@ -402,9 +433,23 @@ mod test {
_ctx: Context,
data: Vec<u8>,
) -> BoxFuture<Result<Vec<u8>, anyhow::Error>> {
let pf = {
let mut pf = self.peer_feedback.lock().unwrap();
let peer_feedback = pf.1.take();

if !matches!(peer_feedback, None | Some(types::PeerFeedback::Success)) {
pf.0 += 1;
}

(pf.0, peer_feedback)
};
self.peer_feedback_history.lock().unwrap().push(pf);

// Induce error when configured to do so.
if self
.next_error
.compare_and_swap(true, false, Ordering::SeqCst)
.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
return Box::pin(future::err(anyhow!("transport error")));
}
Expand Down Expand Up @@ -438,6 +483,19 @@ mod test {
}
}
}

fn set_peer_feedback(&self, pfid: u64, peer_feedback: Option<types::PeerFeedback>) {
let mut pf = self.peer_feedback.lock().unwrap();
if pf.0 != pfid {
return;
}

pf.1 = peer_feedback;
}

fn get_peer_feedback_id(&self) -> u64 {
self.peer_feedback.lock().unwrap().0
}
}

#[test]
Expand All @@ -454,6 +512,15 @@ mod test {
.block_on(client.call(Context::background(), "test", 42))
.unwrap();
assert_eq!(result, 42, "call should work");
assert_eq!(
transport.take_peer_feedback_history(),
vec![
(0, None), // Handshake.
(0, None), // Handshake.
(0, None), // Call.
(0, Some(types::PeerFeedback::Success)), // Handled call.
]
);

// Reset all sessions on the server and make sure that we can still get a response.
transport.reset();
Expand All @@ -462,6 +529,16 @@ mod test {
.block_on(client.call(Context::background(), "test", 43))
.unwrap();
assert_eq!(result, 43, "call should work");
assert_eq!(
transport.take_peer_feedback_history(),
vec![
(0, Some(types::PeerFeedback::Success)), // Previous handled call.
(1, Some(types::PeerFeedback::Failure)), // Failed call due to session reset.
(1, None), // New handshake.
(1, None), // New handshake.
(1, Some(types::PeerFeedback::Success)), // Handled call.
]
);

// Induce a single transport error without resetting the server sessions and make sure we
// can still get a response.
Expand All @@ -471,5 +548,15 @@ mod test {
.block_on(client.call(Context::background(), "test", 44))
.unwrap();
assert_eq!(result, 44, "call should work");
assert_eq!(
transport.take_peer_feedback_history(),
vec![
(1, Some(types::PeerFeedback::Success)), // Previous handled call.
(2, Some(types::PeerFeedback::Failure)), // Failed call due to induced error.
(2, None), // New handshake.
(2, None), // New handshake.
(2, Some(types::PeerFeedback::Success)), // Handled call.
]
);
}
}
51 changes: 50 additions & 1 deletion client/src/enclave_rpc/transport.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use anyhow::{anyhow, Error as AnyError};
use futures::future::{self, BoxFuture};
Expand Down Expand Up @@ -30,13 +30,34 @@ pub trait Transport: Send + Sync {
ctx: Context,
data: Vec<u8>,
) -> BoxFuture<Result<Vec<u8>, AnyError>>;

fn set_peer_feedback(&self, _pfid: u64, _peer_feedback: Option<types::PeerFeedback>) {
// Default implementation doesn't do anything.
}

fn get_peer_feedback_id(&self) -> u64 {
// Default implementation doesn't do anything.
0
}
}

/// A transport implementation which can be used from inside the runtime and uses the Runtime Host
/// Protocol to transport EnclaveRPC frames.
pub struct RuntimeTransport {
pub protocol: Arc<Protocol>,
pub endpoint: String,

peer_feedback: Mutex<(u64, Option<types::PeerFeedback>)>,
}

impl RuntimeTransport {
pub fn new(protocol: Arc<Protocol>, endpoint: &str) -> Self {
Self {
protocol,
endpoint: endpoint.to_string(),
peer_feedback: Mutex::new((0, None)),
}
}
}

impl Transport for RuntimeTransport {
Expand All @@ -45,13 +66,28 @@ impl Transport for RuntimeTransport {
ctx: Context,
data: Vec<u8>,
) -> BoxFuture<Result<Vec<u8>, AnyError>> {
let peer_feedback = {
let mut pf = self.peer_feedback.lock().unwrap();
let peer_feedback = pf.1.take();

// If non-success feedback was propagated this means that the peer will be changed for
// subsequent requests. Increment pfid to make sure that we don't incorporate stale
// feedback.
if !matches!(peer_feedback, None | Some(types::PeerFeedback::Success)) {
pf.0 += 1;
}

peer_feedback
};

// NOTE: This is not actually async in SGX, but futures should be
// dispatched on the current thread anyway.
let rsp = self.protocol.call_host(
ctx,
Body::HostRPCCallRequest {
endpoint: self.endpoint.clone(),
request: data,
peer_feedback,
},
);

Expand All @@ -61,4 +97,17 @@ impl Transport for RuntimeTransport {
Ok(_) => Box::pin(future::err(anyhow!("bad response type"))),
}
}

fn set_peer_feedback(&self, pfid: u64, peer_feedback: Option<types::PeerFeedback>) {
let mut pf = self.peer_feedback.lock().unwrap();
if pf.0 != pfid {
return;
}

pf.1 = peer_feedback;
}

fn get_peer_feedback_id(&self) -> u64 {
self.peer_feedback.lock().unwrap().0
}
}
10 changes: 5 additions & 5 deletions docker/development/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
FROM ubuntu:20.04

# Package versions.
ARG GO_VERSION=1.17.7
ARG GO_NANCY_VERSION=1.0.0
ARG GO_NANCY_CHECKSUM=13804837a34c07e7a933b0d6f52c5e580b03ccb209e38fc3d6394b791b414c33
ARG GO_VERSION=1.17.9
ARG GO_NANCY_VERSION=1.0.33
ARG GO_NANCY_CHECKSUM=a4bf5290d41b095c04f941ed5380674770c79d59735e33b1bd07a5cd5fbb135d
ARG GO_PROTOC_VERSION=3.6.1
ARG GO_PROTOC_GEN_GO_VERSION=1.21.0
ARG GOLANGCILINT_VERSION=1.41.1
Expand Down Expand Up @@ -61,12 +61,12 @@ RUN wget https://dl.google.com/go/go${GO_VERSION}.linux-amd64.tar.gz && \
# Install a specific version of protoc-gen-go.
go install google.golang.org/protobuf/cmd/protoc-gen-go@v${GO_PROTOC_GEN_GO_VERSION} && \
# Install golangci-lint.
curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | bash -s -- -b /tmp/bin v${GOLANGCILINT_VERSION} && \
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b /tmp/bin v${GOLANGCILINT_VERSION} && \
mv /tmp/bin/golangci-lint /go/bin && \
# Install gocovmerge for e2e coverage.
go install github.com/wadey/gocovmerge@${GOCOVMERGE_VERSION} && \
# Install nancy for auditing dependencies.
curl -sfL -o nancy https://github.com/sonatype-nexus-community/nancy/releases/download/v${GO_NANCY_VERSION}/nancy-linux.amd64-v${GO_NANCY_VERSION} && \
curl -sfL -o nancy https://github.com/sonatype-nexus-community/nancy/releases/download/v${GO_NANCY_VERSION}/nancy-v${GO_NANCY_VERSION}-linux-amd64 && \
echo "${GO_NANCY_CHECKSUM} nancy" | sha256sum -c && \
mv nancy /go/bin/nancy && \
chmod +x /go/bin/nancy && \
Expand Down
1 change: 0 additions & 1 deletion go/.nancy-ignore
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
CVE-2020-26160 # Until viper and etcd/prometheus are upgraded to not need jwt-go.

0 comments on commit 26bf3c4

Please sign in to comment.