Skip to content
Permalink
Browse files

feat: rates module for exchange rate APIs

  • Loading branch information
kincaidoneil committed Jan 31, 2020
1 parent cb91ed4 commit bff029ee23724da330ae78068ff4ce5b90b6c6d1

Some generated files are not rendered by default. Learn more.

@@ -11,6 +11,7 @@ members = [
"./crates/interledger-ildcp",
"./crates/interledger-packet",
"./crates/interledger-router",
"./crates/interledger-rates",
"./crates/interledger-service",
"./crates/interledger-service-util",
"./crates/interledger-settlement",
@@ -38,15 +38,15 @@ use interledger::{
ildcp::IldcpService,
packet::Address,
packet::{ErrorCode, RejectBuilder},
rates::{ExchangeRateFetcher, ExchangeRateStore},
router::{Router, RouterStore},
service::{
outgoing_service_fn, Account as AccountTrait, AccountStore, AddressStore, OutgoingRequest,
Username,
},
service_util::{
BalanceStore, EchoService, ExchangeRateFetcher, ExchangeRateService, ExchangeRateStore,
ExpiryShortenerService, MaxPacketAmountService, RateLimitService, RateLimitStore,
ValidatorService,
BalanceStore, EchoService, ExchangeRateService, ExpiryShortenerService,
MaxPacketAmountService, RateLimitService, RateLimitStore, ValidatorService,
},
settlement::{
api::{create_settlements_filter, SettlementMessageService},
@@ -74,7 +74,7 @@ use crate::redis_store::*;
use interledger::service_util::BalanceService;

#[doc(hidden)]
pub use interledger::service_util::ExchangeRateProvider;
pub use interledger::rates::ExchangeRateProvider;

static DEFAULT_ILP_ADDRESS: Lazy<Address> = Lazy::new(|| Address::from_str("local.host").unwrap());

@@ -11,6 +11,7 @@ repository = "https://github.com/interledger-rs/interledger-rs"
interledger-packet = { path = "../interledger-packet", version = "^0.4.0", default-features = false }
interledger-http = { path = "../interledger-http", version = "^0.4.0", default-features = false }
interledger-ildcp = { path = "../interledger-ildcp", version = "^0.4.0", default-features = false }
interledger-rates = { path = "../interledger-rates", version = "^0.4.0", default-features = false }
interledger-router = { path = "../interledger-router", version = "^0.4.0", default-features = false }
interledger-service = { path = "../interledger-service", version = "^0.4.0", default-features = false }
interledger-service-util = { path = "../interledger-service-util", version = "^0.4.0", default-features = false }
@@ -5,11 +5,12 @@ use interledger_ccp::CcpRoutingAccount;
use interledger_errors::NodeStoreError;
use interledger_http::{HttpAccount, HttpStore};
use interledger_packet::Address;
use interledger_rates::ExchangeRateStore;
use interledger_router::RouterStore;
use interledger_service::{
Account, AccountStore, AddressStore, IncomingService, OutgoingService, Username,
};
use interledger_service_util::{BalanceStore, ExchangeRateStore};
use interledger_service_util::BalanceStore;
use interledger_settlement::core::types::{SettlementAccount, SettlementStore};
use interledger_stream::StreamNotificationsStore;
use secrecy::SecretString;
@@ -7,12 +7,13 @@ use interledger_errors::*;
use interledger_http::{deserialize_json, HttpAccount, HttpStore};
use interledger_ildcp::IldcpRequest;
use interledger_ildcp::IldcpResponse;
use interledger_rates::ExchangeRateStore;
use interledger_router::RouterStore;
use interledger_service::{
Account, AccountStore, AddressStore, IncomingService, OutgoingRequest, OutgoingService,
Username,
};
use interledger_service_util::{BalanceStore, ExchangeRateStore};
use interledger_service_util::BalanceStore;
use interledger_settlement::core::{types::SettlementAccount, SettlementClient};
use interledger_spsp::{pay, SpspResponder};
use interledger_stream::{PaymentNotification, StreamNotificationsStore};
@@ -4,6 +4,7 @@ use futures::TryFutureExt;
use interledger_errors::*;
use interledger_http::{deserialize_json, HttpAccount};
use interledger_packet::Address;
use interledger_rates::ExchangeRateStore;
use interledger_router::RouterStore;
use interledger_service::{Account, AccountStore, AddressStore, Username};
use interledger_service_util::ExchangeRateStore;
@@ -11,11 +11,12 @@ use interledger_ccp::{CcpRoutingAccount, RoutingRelation};
use interledger_errors::*;
use interledger_http::{HttpAccount, HttpStore};
use interledger_packet::{Address, ErrorCode, FulfillBuilder, RejectBuilder};
use interledger_rates::ExchangeRateStore;
use interledger_router::RouterStore;
use interledger_service::{
incoming_service_fn, outgoing_service_fn, Account, AccountStore, AddressStore, Username,
};
use interledger_service_util::{BalanceStore, ExchangeRateStore};
use interledger_service_util::BalanceStore;
use interledger_settlement::core::types::{SettlementAccount, SettlementEngineDetails};
use interledger_stream::{PaymentNotification, StreamNotificationsStore};
use once_cell::sync::Lazy;
@@ -0,0 +1,20 @@
[package]
name = "interledger-rates"
version = "0.4.0"
authors = ["Kincaid O'Neil <kincaidoneil@users.noreply.github.com>"]
edition = "2018"

# TODO Add other metadata here!

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0.1.22"
futures = { version = "0.3.1", default-features = false }
interledger-errors = { path = "../interledger-errors", version = "^0.1.0" }
log = { version = "0.4.8", default-features = false }
once_cell = "1.3.1"
reqwest = { version = "0.10.0", default-features = false, features = ["default-tls", "json"] }
secrecy = { version = "0.6", default-features = false, features = ["alloc", "serde"] }
serde = { version = "1.0.101", default-features = false, features = ["derive"]}
tokio = { version = "0.2.6", default-features = false, features = ["macros", "time"] }
@@ -0,0 +1,3 @@
# interledger-rates

Utilities for fetching and caching exchange rates from external APIs, which supports CoinCap and CryptoCompare rate backends.
File renamed without changes.
@@ -0,0 +1,151 @@
use futures::TryFutureExt;
use interledger_errors::ExchangeRateStoreError;
use log::{debug, error, trace, warn};
use reqwest::Client;
use secrecy::SecretString;
use serde::Deserialize;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio;

mod cryptocompare;

mod coincap;

pub trait ExchangeRateStore: Clone {
// TODO we may want to make this async if/when we use pubsub to broadcast
// rate changes to different instances of a horizontally-scalable node
fn set_exchange_rates(&self, rates: HashMap<String, f64>)
-> Result<(), ExchangeRateStoreError>;

fn get_exchange_rates(&self, asset_codes: &[&str]) -> Result<Vec<f64>, ExchangeRateStoreError>;

// TODO should this be on the API instead? That's where it's actually used
// TODO should we combine this method with get_exchange_rates?
// The downside of doing that is in this case we want a HashMap with owned values
// (so that we don't accidentally lock up the RwLock on the store's exchange_rates)
// but in the normal case of getting the rate between two assets, we don't want to
// copy all the rate data
fn get_all_exchange_rates(&self) -> Result<HashMap<String, f64>, ExchangeRateStoreError>;
}

/// This determines which external API service to poll for exchange rates.
#[derive(Debug, Clone, Deserialize)]
pub enum ExchangeRateProvider {
/// Use the [CoinCap] API.
///
/// Note that when configured with YAML, this MUST be specified as
/// "CoinCap", not "coincap".
///
/// [CoinCap]: https://coincap.io/
#[serde(alias = "coincap")]
CoinCap,
/// Use the [CryptoCompare] API. Note this service requires an
/// API key (but the free tier supports 100,000 requests / month at the
/// time of writing).
///
/// Note that when configured with YAML, this MUST be specified as
/// "CryptoCompare", not "crypto_compare".
///
/// [CryptoCompare]: https://cryptocompare.com
#[serde(alias = "cryptocompare")]
CryptoCompare(SecretString),
}

/// Poll exchange rate providers for the current exchange rates
#[derive(Clone)]
pub struct ExchangeRateFetcher<S> {
provider: ExchangeRateProvider,
consecutive_failed_polls: Arc<AtomicU32>,
failed_polls_before_invalidation: u32,
store: S,
client: Client, // TODO What is a Client?
}

impl<S> ExchangeRateFetcher<S>
where
S: ExchangeRateStore + Send + Sync + 'static,
{
/// Simple constructor
pub fn new(
provider: ExchangeRateProvider,
failed_polls_before_invalidation: u32,
store: S,
) -> Self {
ExchangeRateFetcher {
provider,
consecutive_failed_polls: Arc::new(AtomicU32::new(0)),
failed_polls_before_invalidation,
store,
client: Client::new(),
}
}

/// Spawns a future which calls [`self.update_rates()`](./struct.ExchangeRateFetcher.html#method.update_rates) every `interval`
pub fn spawn_interval(self, interval: Duration) {
debug!(
"Starting interval to poll exchange rate provider: {:?} for rates",
self.provider
);
let interval = async move {
let mut interval = tokio::time::interval(interval);
loop {
interval.tick().await;
// Ignore errors so that they don't cause the Interval to stop
let _ = self.update_rates().await;
}
};
tokio::spawn(interval);
}

/// Calls the proper exchange rate provider
async fn fetch_rates(&self) -> Result<HashMap<String, f64>, ()> {
match self.provider {
ExchangeRateProvider::CryptoCompare(ref api_key) => {
cryptocompare::query_cryptocompare(&self.client, api_key).await
}
ExchangeRateProvider::CoinCap => coincap::query_coincap(&self.client).await,
}
}

/// Gets the exchange rates and proceeds to update the store with the newly polled values
async fn update_rates(&self) -> Result<(), ()> {
let consecutive_failed_polls = self.consecutive_failed_polls.clone();
let consecutive_failed_polls_zeroer = consecutive_failed_polls.clone();
let failed_polls_before_invalidation = self.failed_polls_before_invalidation;
let store = self.store.clone();
let store_clone = self.store.clone();
let provider = self.provider.clone();
let mut rates = self.fetch_rates()
.map_err(move |_| {
// Note that a race between the read on this line and the check on the line after
// is quite unlikely as long as the interval between polls is reasonable.
let failed_polls = consecutive_failed_polls.fetch_add(1, Ordering::Relaxed);
if failed_polls < failed_polls_before_invalidation {
warn!("Failed to update exchange rates (previous consecutive failed attempts: {})", failed_polls);
} else {
error!("Failed to update exchange rates (previous consecutive failed attempts: {}), removing old rates for safety", failed_polls);
// Clear out all of the old rates
if store.set_exchange_rates(HashMap::new()).is_err() {
error!("Failed to clear exchange rates cache after exchange rates server became unresponsive; panicking");
panic!("Failed to clear exchange rates cache after exchange rates server became unresponsive");
}
}
}).await?;

trace!("Fetched exchange rates: {:?}", rates);
let num_rates = rates.len();
rates.insert("USD".to_string(), 1.0);
if store_clone.set_exchange_rates(rates).is_ok() {
// Reset our invalidation counter
consecutive_failed_polls_zeroer.store(0, Ordering::Relaxed);
debug!("Updated {} exchange rates from {:?}", num_rates, provider);
Ok(())
} else {
error!("Error setting exchange rates in store");
Err(())
}
}
}
@@ -10,6 +10,7 @@ repository = "https://github.com/interledger-rs/interledger-rs"
[dependencies]
interledger-errors = { path = "../interledger-errors", version = "^0.1.0", default-features = false }
interledger-packet = { path = "../interledger-packet", version = "^0.4.0", default-features = false }
interledger-rates = { path = "../interledger-rates", version = "^0.4.0", default-features = false }
interledger-service = { path = "../interledger-service", version = "^0.4.0", default-features = false }
interledger-settlement = { path = "../interledger-settlement", version = "^0.3.0", default-features = false, features = ["settlement_api"] }

This file was deleted.

0 comments on commit bff029e

Please sign in to comment.
You can’t perform that action at this time.