Skip to content

Commit

Permalink
refactor(ilp-node): make ilp-node able to compile entirely without Redis
Browse files Browse the repository at this point in the history
BREAKING CHANGE: the "redis_url" command-line option to the ilp-node binary
has been renamed to "database_url"
  • Loading branch information
Ben Striegel committed Dec 12, 2019
1 parent 80afa3c commit ad6edd3
Show file tree
Hide file tree
Showing 18 changed files with 223 additions and 180 deletions.
12 changes: 9 additions & 3 deletions crates/ilp-node/Cargo.toml
Expand Up @@ -9,11 +9,18 @@ repository = "https://github.com/interledger-rs/interledger-rs"
default-run = "ilp-node"

[features]
default = ["balance-tracking"]
default = ["balance-tracking", "redis"]
balance-tracking = []
# This is an experimental feature that enables submitting packet
# records to Google Cloud PubSub. This may be removed in the future.
google-pubsub = ["base64", "chrono", "parking_lot", "reqwest", "serde_json", "yup-oauth2"]
redis = ["redis_crate", "interledger/redis"]

[[test]]
name = "redis_tests"
path = "tests/redis/redis_tests.rs"
required-features = ["redis"]


[dependencies]
bytes = { version = "0.4.12", default-features = false }
Expand All @@ -27,7 +34,7 @@ metrics = { version = "0.12.0", default-features = false, features = ["std"] }
metrics-core = { version = "0.5.1", default-features = false }
metrics-runtime = { version = "0.12.0", default-features = false, features = ["metrics-observer-prometheus"] }
num-bigint = { version = "0.2.3", default-features = false, features = ["std"] }
redis = { version = "0.13.0", default-features = false, features = ["executor"] }
redis_crate = { package = "redis", version = "0.13.0", default-features = false, features = ["executor"], optional = true }
ring = { version = "0.16.9", default-features = false }
serde = { version = "1.0.101", default-features = false }
tokio = { version = "0.1.22", default-features = false }
Expand All @@ -53,7 +60,6 @@ approx = { version = "0.3.2", default-features = false }
base64 = { version = "0.10.1", default-features = false }
net2 = { version = "0.2.33", default-features = false }
rand = { version = "0.7.2", default-features = false }
redis = { version = "0.13.0", default-features = false, features = ["executor"] }
reqwest = { version = "0.9.22", default-features = false, features = ["default-tls"] }
serde_json = { version = "1.0.41", default-features = false }
tokio-retry = { version = "0.2.0", default-features = false }
Expand Down
11 changes: 9 additions & 2 deletions crates/ilp-node/src/lib.rs
@@ -1,8 +1,15 @@
#![type_length_limit = "1152885"]

#[cfg(feature = "google-pubsub")]
mod google_pubsub;
mod metrics;
mod node;
mod trace;

#[cfg(feature = "google-pubsub")]
mod google_pubsub;
#[cfg(feature = "redis")]
mod redis_store;

pub use node::*;
#[allow(deprecated)]
#[cfg(feature = "redis")]
pub use redis_store::insert_account_with_redis_store;
23 changes: 14 additions & 9 deletions crates/ilp-node/src/main.rs
@@ -1,9 +1,19 @@
#![type_length_limit = "1152885"]

mod metrics;
mod node;
mod trace;

#[cfg(feature = "google-pubsub")]
mod google_pubsub;
#[cfg(feature = "redis")]
mod redis_store;

use clap::{crate_version, App, Arg, ArgMatches};
use config::{Config, Source};
use config::{FileFormat, Value};
use libc::{c_int, isatty};
use node::InterledgerNode;
use std::{
ffi::{OsStr, OsString},
io::Read,
Expand All @@ -14,13 +24,6 @@ use tracing_subscriber::{
fmt::{time::ChronoUtc, Subscriber},
};

#[cfg(feature = "google-pubsub")]
mod google_pubsub;
mod metrics;
mod node;
mod trace;
use node::InterledgerNode;

pub fn main() {
Subscriber::builder()
.with_timer(ChronoUtc::rfc3339())
Expand Down Expand Up @@ -69,8 +72,10 @@ pub fn main() {
.takes_value(true)
.required(true)
.help("HTTP Authorization token for the node admin (sent as a Bearer token)"),
Arg::with_name("redis_url")
.long("redis_url")
Arg::with_name("database_url")
.long("database_url")
// temporary alias for backwards compatibility
.alias("redis_url")
.takes_value(true)
.default_value("redis://127.0.0.1:6379")
.help("Redis URI (for example, \"redis://127.0.0.1:6379\" or \"unix:/tmp/redis.sock\")"),
Expand Down
171 changes: 66 additions & 105 deletions crates/ilp-node/src/node.rs
@@ -1,20 +1,11 @@
use crate::metrics::{incoming_metrics, outgoing_metrics};
use crate::trace::{trace_forwarding, trace_incoming, trace_outgoing};
use bytes::Bytes;
use futures::{
future::{err, result, Either},
future::{err, Either},
Future,
};
use hex::FromHex;
#[doc(hidden)]
pub use interledger::api::AccountDetails;
pub use interledger::service_util::ExchangeRateProvider;
use std::sync::Arc;

#[cfg(feature = "google-pubsub")]
use crate::google_pubsub::{create_google_pubsub_wrapper, PubsubConfig};
use crate::metrics::{incoming_metrics, outgoing_metrics};
use crate::trace::{trace_forwarding, trace_incoming, trace_outgoing};
#[cfg(feature = "balance-tracking")]
use interledger::service_util::BalanceService;
use interledger::{
api::{NodeApi, NodeStore},
btp::{btp_service_as_filter, connect_client, BtpOutgoingService, BtpStore},
Expand All @@ -30,40 +21,46 @@ use interledger::{
},
service_util::{
BalanceStore, EchoService, ExchangeRateFetcher, ExchangeRateService, ExchangeRateStore,
ExpiryShortenerService, MaxPacketAmountAccount, MaxPacketAmountService, RateLimitAccount,
RateLimitService, RateLimitStore, RoundTripTimeAccount, ValidatorService,
ExpiryShortenerService, MaxPacketAmountService, RateLimitService, RateLimitStore,
ValidatorService,
},
settlement::{
api::{create_settlements_filter, SettlementMessageService},
core::{
idempotency::IdempotentStore,
types::{LeftoversStore, SettlementAccount, SettlementStore},
types::{LeftoversStore, SettlementStore},
},
},
store::{account::Account, redis::RedisStoreBuilder},
store::account::Account,
stream::{StreamNotificationsStore, StreamReceiverService},
};
use lazy_static::lazy_static;
use metrics_core::{Builder, Drain, Observe};
use metrics_runtime;
use num_bigint::BigUint;
use redis::{ConnectionInfo, IntoConnectionInfo};
use ring::hmac;
use serde::{de::Error as DeserializeError, Deserialize, Deserializer, Serialize};
use serde::{de::Error as DeserializeError, Deserialize, Deserializer};
use std::sync::Arc;
use std::{convert::TryFrom, net::SocketAddr, str, str::FromStr, time::Duration};
use tokio::spawn;
use tracing::{debug, debug_span, error, info};
use tracing_futures::Instrument;
use url::Url;
use uuid::Uuid;
use warp::{
self,
http::{Response, StatusCode},
Filter,
};

static REDIS_SECRET_GENERATION_STRING: &str = "ilp_redis_secret";
static DEFAULT_REDIS_URL: &str = "redis://127.0.0.1:6379";
#[cfg(feature = "google-pubsub")]
use crate::google_pubsub::{create_google_pubsub_wrapper, PubsubConfig};
#[cfg(feature = "redis")]
use crate::redis_store::*;
#[cfg(feature = "balance-tracking")]
use interledger::service_util::BalanceService;

#[doc(hidden)]
pub use interledger::service_util::ExchangeRateProvider;

lazy_static! {
static ref DEFAULT_ILP_ADDRESS: Address = Address::from_str("local.host").unwrap();
}
Expand All @@ -74,8 +71,15 @@ fn default_settlement_api_bind_address() -> SocketAddr {
fn default_http_bind_address() -> SocketAddr {
SocketAddr::from(([127, 0, 0, 1], 7770))
}
fn default_redis_url() -> ConnectionInfo {
DEFAULT_REDIS_URL.into_connection_info().unwrap()
// We allow unreachable code on the below function because there must always be exactly one default
// regardless of how many data sources the crate is compiled to support,
// but we don't know which will be enabled or in which quantities or configurations.
// This return-based pattern effectively gives us fallthrough behavior.
#[allow(unreachable_code)]
fn default_database_url() -> String {
#[cfg(feature = "redis")]
return default_redis_url();
panic!("no backing store configured")
}

fn deserialize_optional_address<'de, D>(deserializer: D) -> Result<Option<Address>, D::Error>
Expand Down Expand Up @@ -116,21 +120,6 @@ where
}
}

fn deserialize_redis_connection<'de, D>(deserializer: D) -> Result<ConnectionInfo, D::Error>
where
D: Deserializer<'de>,
{
Url::parse(&String::deserialize(deserializer)?)
.map_err(|err| DeserializeError::custom(format!("Invalid URL: {:?}", err)))?
.into_connection_info()
.map_err(|err| {
DeserializeError::custom(format!(
"Error converting into Redis connection info: {:?}",
err
))
})
}

/// Configuration for [Prometheus](https://prometheus.io) metrics collection.
#[derive(Deserialize, Clone)]
pub struct PrometheusConfig {
Expand Down Expand Up @@ -195,7 +184,9 @@ impl ExchangeRateConfig {
}

/// An all-in-one Interledger node that includes sender and receiver functionality,
/// a connector, and a management API. The node uses Redis for persistence.
/// a connector, and a management API.
/// Will connect to the database at the given URL; see the crate features defined in
/// Cargo.toml to see a list of all supported stores.
#[derive(Deserialize, Clone)]
pub struct InterledgerNode {
/// ILP address of the node
Expand All @@ -207,13 +198,13 @@ pub struct InterledgerNode {
pub secret_seed: [u8; 32],
/// HTTP Authorization token for the node admin (sent as a Bearer token)
pub admin_auth_token: String,
/// Redis URI (for example, "redis://127.0.0.1:6379" or "unix:/tmp/redis.sock")
/// Data store URI (for example, "redis://127.0.0.1:6379" or "redis+unix:/tmp/redis.sock")
#[serde(
deserialize_with = "deserialize_redis_connection",
default = "default_redis_url",
default = "default_database_url",
// temporary alias for backwards compatibility
alias = "redis_url"
)]
pub redis_connection: ConnectionInfo,
pub database_url: String,
/// IP address and port to listen for HTTP connections
/// This is used for both the API and ILP over HTTP packets
#[serde(default = "default_http_bind_address")]
Expand Down Expand Up @@ -259,48 +250,42 @@ impl InterledgerNode {
}
}

fn serve_node(self) -> impl Future<Item = (), Error = ()> {
let redis_addr = self.redis_connection.addr.clone();
let redis_secret = generate_redis_secret(&self.secret_seed);
fn serve_node(self) -> Box<dyn Future<Item = (), Error = ()> + Send + 'static> {
let ilp_address = if let Some(address) = &self.ilp_address {
address.clone()
} else {
DEFAULT_ILP_ADDRESS.clone()
};

debug!(target: "interledger-node",
"Starting Interledger node with ILP address: {}",
ilp_address
);
// TODO: store a Url directly in InterledgerNode rather than a String?
let database_url = match Url::parse(&self.database_url) {
Ok(url) => url,
Err(e) => {
error!(
"The string '{}' could not be parsed as a URL: {}",
&self.database_url, e
);
return Box::new(err(()));
}
};

Box::new(RedisStoreBuilder::new(self.redis_connection.clone(), redis_secret)
.node_ilp_address(ilp_address.clone())
.connect()
.map_err(move |err| error!(target: "interledger-node", "Error connecting to Redis: {:?} {:?}", redis_addr, err))
.and_then(move |store| self.chain_services(store, ilp_address)))
match database_url.scheme() {
#[cfg(feature = "redis")]
"redis" | "redis+unix" => Box::new(serve_redis_node(self, ilp_address)),
other => {
error!("unsupported data source scheme: {}", other);
Box::new(err(()))
}
}
}

#[allow(clippy::cognitive_complexity)]
fn chain_services<S>(self, store: S, ilp_address: Address) -> impl Future<Item = (), Error = ()>
pub(crate) fn chain_services<S>(
self,
store: S,
ilp_address: Address,
) -> impl Future<Item = (), Error = ()>
where
// Should we use a generic rather than the concrete Account type?
// I spent long enough banging my head against this that I don't care
// anymore, but feel free to take a whack at it.
/*A: AccountTrait
+ SettlementAccount
+ HttpAccount
+ BtpAccount
+ CcpRoutingAccount
+ RateLimitAccount
+ RoundTripTimeAccount
+ MaxPacketAmountAccount
+ Send
+ Sync
+ 'static
+ Serialize
+ Clone,*/
// Likewise, should this be generic?
//AT: ToString,
S: NodeStore<Account = Account>
+ BtpStore<Account = Account>
+ HttpStore<Account = Account>
Expand All @@ -321,6 +306,11 @@ impl InterledgerNode {
+ Sync
+ 'static,
{
debug!(target: "interledger-node",
"Starting Interledger node with ILP address: {}",
ilp_address
);

let secret_seed = Bytes::from(&self.secret_seed[..]);
let http_bind_address = self.http_bind_address;
let settlement_api_bind_address = self.settlement_api_bind_address;
Expand Down Expand Up @@ -513,7 +503,7 @@ impl InterledgerNode {
},
)
})
.in_current_span()
.in_current_span()
}

/// Starts a Prometheus metrics server that will listen on the configured address.
Expand Down Expand Up @@ -576,35 +566,6 @@ impl InterledgerNode {
pub fn run(self) {
tokio_run(self.serve());
}

#[doc(hidden)]
#[allow(dead_code)]
pub fn insert_account(&self, account: AccountDetails) -> impl Future<Item = Uuid, Error = ()> {
let redis_secret = generate_redis_secret(&self.secret_seed);
result(self.redis_connection.clone().into_connection_info())
.map_err(|err| error!(target: "interledger-node", "Invalid Redis connection details: {:?}", err))
.and_then(move |redis_url| RedisStoreBuilder::new(redis_url, redis_secret).connect())
.map_err(|err| error!(target: "interledger-node", "Error connecting to Redis: {:?}", err))
.and_then(move |store| {
store
.insert_account(account)
.map_err(|_| error!(target: "interledger-node", "Unable to create account"))
.and_then(|account| {
debug!(target: "interledger-node", "Created account: {}", account.id());
Ok(account.id())
})
})
}
}

fn generate_redis_secret(secret_seed: &[u8; 32]) -> [u8; 32] {
let mut redis_secret: [u8; 32] = [0; 32];
let sig = hmac::sign(
&hmac::Key::new(hmac::HMAC_SHA256, secret_seed),
REDIS_SECRET_GENERATION_STRING.as_bytes(),
);
redis_secret.copy_from_slice(sig.as_ref());
redis_secret
}

#[doc(hidden)]
Expand Down

0 comments on commit ad6edd3

Please sign in to comment.