Skip to content
Permalink
Browse files

chore: address review comments

  • Loading branch information
gakonst committed Oct 18, 2019
1 parent f3e23d0 commit 6d4c0820a8086434b4b9a25fd2daefe6823cdfab

Some generated files are not rendered by default. Learn more.

@@ -7,6 +7,7 @@ use hex::FromHex;
#[doc(hidden)]
pub use interledger::api::AccountDetails;
pub use interledger::service_util::ExchangeRateProvider;
use std::{sync::Arc, time::Instant};

use interledger::{
api::{NodeApi, NodeStore},
@@ -23,3 +23,6 @@ http = { version = "0.1.18", default-features = false }
chrono = { version = "0.4.9", features = ["clock"], default-features = false }
regex = { version ="1.3.1", default-features = false, features = ["std"] }
lazy_static = { version ="1.4.0", default-features = false }

[features]
idempotency = []
@@ -91,20 +91,6 @@ pub const IDEMPOTENT_STORE_CALL_ERROR_TYPE: ApiErrorType = ApiErrorType {
status: StatusCode::CONFLICT,
};

// Number coversion errors
pub const CONVERSION_ERROR_TYPE: ApiErrorType = ApiErrorType {
r#type: &ProblemType::Default,
title: "Conversion error",
status: StatusCode::INTERNAL_SERVER_ERROR,
};

// Account without an engine error
pub const NO_ENGINE_CONFIGURED_ERROR_TYPE: ApiErrorType = ApiErrorType {
r#type: &ProblemType::Default,
status: StatusCode::NOT_FOUND,
title: "No settlement engine configured",
};

lazy_static! {
pub static ref IDEMPOTENT_STORE_CALL_ERROR: ApiError =
ApiError::from_api_error_type(&IDEMPOTENT_STORE_CALL_ERROR_TYPE)
@@ -1,6 +1,5 @@
mod error_types;
pub use error_types::*;
pub use http::StatusCode;

use chrono::{DateTime, Local};
use http::header::HeaderValue;
@@ -6,8 +6,24 @@ use futures::{
Future,
};
use http::StatusCode;
use log::error;

pub type IdempotentData = (StatusCode, Bytes, [u8; 32]);
#[derive(Debug, Clone, PartialEq)]
pub struct IdempotentData {
pub status: StatusCode,
pub body: Bytes,
pub input_hash: [u8; 32],
}

impl IdempotentData {
pub fn new(status: StatusCode, body: Bytes, input_hash: [u8; 32]) -> Self {
Self {
status,
body,
input_hash,
}
}
}

pub trait IdempotentStore {
/// Returns the API response that was saved when the idempotency key was used
@@ -48,8 +64,8 @@ where
// of the provided input data. If not, we should error out since
// the caller provided an idempotency key that was used for a
// different input.
if ret.2 == input_hash {
Ok(Some((ret.0, ret.1)))
if ret.input_hash == input_hash {
Ok(Some((ret.status, ret.body)))
} else {
Ok(Some((
StatusCode::from_u16(409).unwrap(),
@@ -66,7 +82,7 @@ where
// can reuse it for both the messages and the settlements calls
pub fn make_idempotent_call<S, F>(
store: S,
f: F,
non_idempotent_function: F,
input_hash: [u8; 32],
idempotency_key: Option<String>,
) -> impl Future<Item = (StatusCode, Bytes), Error = ApiError>
@@ -98,7 +114,7 @@ where
}
} else {
Either::B(
f().map_err({
non_idempotent_function().map_err({
let store = store.clone();
let idempotency_key = idempotency_key.clone();
move |ret: ApiError| {
@@ -109,10 +125,9 @@ where
input_hash,
status_code,
data,
));
).map_err(move |_| error!("Failed to connect to the store! The request will not be idempotent if retried.")));
ret
}
})
}})
.and_then(
move |ret: (StatusCode, Bytes)| {
store
@@ -122,7 +137,10 @@ where
ret.0,
ret.1.clone(),
)
.map_err(move |_| IDEMPOTENT_STORE_CALL_ERROR.clone())
.map_err(move |_| {
error!("Failed to connect to the store! The request will not be idempotent if retried.");
IDEMPOTENT_STORE_CALL_ERROR.clone()
})
.and_then(move |_| Ok((ret.0, ret.1)))
},
),
@@ -133,6 +151,8 @@ where
)
} else {
// otherwise just make the call w/o any idempotency saves
Either::B(f().and_then(move |ret: (StatusCode, Bytes)| Ok((ret.0, ret.1))))
Either::B(
non_idempotent_function().and_then(move |ret: (StatusCode, Bytes)| Ok((ret.0, ret.1))),
)
}
}
@@ -15,6 +15,9 @@ mod server;

// So that settlement engines can use errors
pub mod error;

// Idempotent API-related traits and helpers
#[cfg(feature = "idempotency")]
pub mod idempotency;

pub use self::client::HttpClientService;
@@ -11,7 +11,7 @@ repository = "https://github.com/interledger-rs/interledger-rs"
bytes = { version = "0.4.12", default-features = false }
futures = { version = "0.1.29", default-features = false }
hyper = { version = "0.12.35", default-features = false }
interledger-http = { path = "../interledger-http", version = "^0.2.2-alpha.1", default-features = false }
interledger-http = { path = "../interledger-http", version = "^0.2.2-alpha.1", default-features = false, features = ["idempotency"] }
interledger-packet = { path = "../interledger-packet", version = "^0.2.2-alpha.1", default-features = false }
interledger-service = { path = "../interledger-service", version = "^0.2.2-alpha.1", default-features = false }
log = { version = "0.4.8", default-features = false }
@@ -29,6 +29,20 @@ static PEER_PROTOCOL_CONDITION: [u8; 32] = [
110, 226, 51, 179, 144, 42, 89, 29, 13, 95, 41, 37,
];

// Number conversion errors
pub const CONVERSION_ERROR_TYPE: ApiErrorType = ApiErrorType {
r#type: &ProblemType::Default,
title: "Conversion error",
status: StatusCode::INTERNAL_SERVER_ERROR,
};

// Account without an engine error
pub const NO_ENGINE_CONFIGURED_ERROR_TYPE: ApiErrorType = ApiErrorType {
r#type: &ProblemType::Default,
status: StatusCode::NOT_FOUND,
title: "No settlement engine configured",
};

pub fn create_settlements_filter<S, O, A>(
store: S,
outgoing_handler: O,
@@ -47,26 +61,30 @@ where
{
let with_store = warp::any().map(move || store.clone()).boxed();
let idempotency = warp::header::optional::<String>("idempotency-key");
let account_id = warp::path("accounts").and(warp::path::param2::<String>()); // account_id
let account_id_filter = warp::path("accounts").and(warp::path::param2::<String>()); // account_id

// POST /accounts/:account_id/settlements (optional idempotency-key header)
// Body is a Quantity object
let settlement_endpoint = account_id.and(warp::path("settlements"));
let settlement_endpoint = account_id_filter.and(warp::path("settlements"));
let settlements = warp::post2()
.and(settlement_endpoint)
.and(warp::path::end())
.and(idempotency)
.and(warp::body::json())
.and(with_store.clone())
.and_then(
move |id: String, idempotency_key: Option<String>, quantity: Quantity, store: S| {
let input = format!("{}{:?}", id, quantity);
move |account_id: String,
idempotency_key: Option<String>,
quantity: Quantity,
store: S| {
let input = format!("{}{:?}", account_id, quantity);
let input_hash = get_hash_of(input.as_ref());

let idempotency_key_clone = idempotency_key.clone();
let store_clone = store.clone();
let receive_settlement_fn =
move || do_receive_settlement(store_clone, id, quantity, idempotency_key_clone);
let receive_settlement_fn = move || {
do_receive_settlement(store_clone, account_id, quantity, idempotency_key_clone)
};
make_idempotent_call(store, receive_settlement_fn, input_hash, idempotency_key)
.map_err::<_, Rejection>(move |err| err.into())
.and_then(move |(status_code, message)| {
@@ -89,7 +107,7 @@ where
// POST /accounts/:account_id/messages (optional idempotency-key header)
// Body is a Vec<u8> object
let with_outgoing_handler = warp::any().map(move || outgoing_handler.clone()).boxed();
let messages_endpoint = account_id.and(warp::path("messages"));
let messages_endpoint = account_id_filter.and(warp::path("messages"));
let messages = warp::post2()
.and(messages_endpoint)
.and(warp::path::end())
@@ -98,22 +116,23 @@ where
.and(with_store.clone())
.and(with_outgoing_handler.clone())
.and_then(
move |id: String,
move |account_id: String,
idempotency_key: Option<String>,
body: warp::body::FullBody,
store: S,
outgoing_handler: O| {
// Gets called by our settlement engine, forwards the request outwards
// until it reaches the peer's settlement engine.
let message = Vec::from_buf(body);
let input = format!("{}{:?}", id, message);
let input = format!("{}{:?}", account_id, message);
let input_hash = get_hash_of(input.as_ref());

let store_clone = store.clone();
// Wrap do_send_outgoing_message in a closure to be invoked by
// the idempotency wrapper
let send_outgoing_message_fn =
move || do_send_outgoing_message(store_clone, outgoing_handler, id, message);
let send_outgoing_message_fn = move || {
do_send_outgoing_message(store_clone, outgoing_handler, account_id, message)
};
make_idempotent_call(store, send_outgoing_message_fn, input_hash, idempotency_key)
.map_err::<_, Rejection>(move |err| err.into())
.and_then(move |(status_code, message)| {
@@ -482,8 +501,8 @@ mod tests {

let cache_hits = s.cache_hits.read();
assert_eq!(*cache_hits, 4);
assert_eq!(cached_data.0, StatusCode::OK);
let quantity: Quantity = serde_json::from_slice(&cached_data.1).unwrap();
assert_eq!(cached_data.status, StatusCode::OK);
let quantity: Quantity = serde_json::from_slice(&cached_data.body).unwrap();
assert_eq!(quantity, Quantity::new(2, CONNECTOR_SCALE));
}

@@ -590,8 +609,8 @@ mod tests {

let cache_hits = s.cache_hits.read();
assert_eq!(*cache_hits, 1);
assert_eq!(cached_data.0, 404);
assert_eq!(cached_data.1, &Bytes::from("Account 0 does not have settlement engine details configured. Cannot handle incoming settlement"));
assert_eq!(cached_data.status, 404);
assert_eq!(cached_data.body, &Bytes::from("Account 0 does not have settlement engine details configured. Cannot handle incoming settlement"));
}

#[test]
@@ -626,8 +645,8 @@ mod tests {

let cache_hits = s.cache_hits.read();
assert_eq!(*cache_hits, 2);
assert_eq!(cached_data.0, 400);
assert_eq!(cached_data.1, &Bytes::from("a is an invalid account ID"));
assert_eq!(cached_data.status, 400);
assert_eq!(cached_data.body, &Bytes::from("a is an invalid account ID"));
}

#[test]
@@ -647,8 +666,8 @@ mod tests {
let cached_data = cache.get(IDEMPOTENCY).unwrap();
let cache_hits = s.cache_hits.read();
assert_eq!(*cache_hits, 1);
assert_eq!(cached_data.0, 404);
assert_eq!(cached_data.1, &Bytes::from("Account 0 was not found"));
assert_eq!(cached_data.status, 404);
assert_eq!(cached_data.body, &Bytes::from("Account 0 was not found"));
}
}

@@ -721,8 +740,8 @@ mod tests {
let cached_data = cache.get(IDEMPOTENCY).unwrap();
let cache_hits = s.cache_hits.read();
assert_eq!(*cache_hits, 4);
assert_eq!(cached_data.0, StatusCode::OK);
assert_eq!(cached_data.1, &Bytes::from("hello!"));
assert_eq!(cached_data.status, StatusCode::OK);
assert_eq!(cached_data.body, &Bytes::from("hello!"));
}

#[test]
@@ -742,8 +761,8 @@ mod tests {
let cached_data = cache.get(IDEMPOTENCY).unwrap();
let cache_hits = s.cache_hits.read();
assert_eq!(*cache_hits, 1);
assert_eq!(cached_data.0, 502);
assert_eq!(cached_data.1, &Bytes::from("Error sending message to peer settlement engine. Packet rejected with code: F02, message: No other outgoing handler!"));
assert_eq!(cached_data.status, 502);
assert_eq!(cached_data.body, &Bytes::from("Error sending message to peer settlement engine. Packet rejected with code: F02, message: No other outgoing handler!"));
}

#[test]
@@ -769,8 +788,8 @@ mod tests {

let cache_hits = s.cache_hits.read();
assert_eq!(*cache_hits, 2);
assert_eq!(cached_data.0, 400);
assert_eq!(cached_data.1, &Bytes::from("a is an invalid account ID"));
assert_eq!(cached_data.status, 400);
assert_eq!(cached_data.body, &Bytes::from("a is an invalid account ID"));
}

#[test]
@@ -791,8 +810,8 @@ mod tests {

let cache_hits = s.cache_hits.read();
assert_eq!(*cache_hits, 1);
assert_eq!(cached_data.0, 404);
assert_eq!(cached_data.1, &Bytes::from("Account 0 was not found"));
assert_eq!(cached_data.status, 404);
assert_eq!(cached_data.body, &Bytes::from("Account 0 was not found"));
}
}
}
@@ -121,7 +121,7 @@ impl IdempotentStore for TestStore {
if let Some(data) = cache.get(&idempotency_key) {
let mut guard = self.cache_hits.write();
*guard += 1; // used to test how many times this branch gets executed
Box::new(ok(Some((data.0, data.1.clone(), data.2))))
Box::new(ok(Some(data.clone())))
} else {
Box::new(ok(None))
}
@@ -135,7 +135,10 @@ impl IdempotentStore for TestStore {
data: Bytes,
) -> Box<dyn Future<Item = (), Error = ()> + Send> {
let mut cache = self.cache.write();
cache.insert(idempotency_key, (status_code, data, input_hash));
cache.insert(
idempotency_key,
IdempotentData::new(status_code, data, input_hash),
);
Box::new(ok(()))
}
}

0 comments on commit 6d4c082

Please sign in to comment.
You can’t perform that action at this time.