Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Don't remove confirmed requests to early. #4933

Merged
merged 2 commits into from Mar 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions rpc/src/v1/helpers/mod.rs
Expand Up @@ -38,5 +38,6 @@ pub use self::requests::{
};
pub use self::signing_queue::{
ConfirmationsQueue, ConfirmationPromise, ConfirmationResult, SigningQueue, QueueEvent, DefaultAccount,
QUEUE_LIMIT as SIGNING_QUEUE_LIMIT,
};
pub use self::signer::SignerService;
2 changes: 1 addition & 1 deletion rpc/src/v1/helpers/signing_queue.rs
Expand Up @@ -77,7 +77,7 @@ pub enum QueueAddError {
}

// TODO [todr] to consider: timeout instead of limit?
const QUEUE_LIMIT: usize = 50;
pub const QUEUE_LIMIT: usize = 50;

/// A queue of transactions awaiting to be confirmed and signed.
pub trait SigningQueue: Send + Sync {
Expand Down
45 changes: 33 additions & 12 deletions rpc/src/v1/impls/signing.rs
Expand Up @@ -27,7 +27,7 @@ use jsonrpc_core::Error;
use v1::helpers::{
errors,
DefaultAccount,
SigningQueue, ConfirmationPromise, ConfirmationResult, SignerService
SIGNING_QUEUE_LIMIT, SigningQueue, ConfirmationPromise, ConfirmationResult, SignerService
};
use v1::helpers::dispatch::{self, Dispatcher};
use v1::metadata::Metadata;
Expand All @@ -42,7 +42,10 @@ use v1::types::{
Origin,
};

const MAX_PENDING_DURATION: u64 = 60 * 60;
/// After 60s entries that are not queried with `check_request` will get garbage collected.
const MAX_PENDING_DURATION_SEC: u64 = 60;
/// Max number of total requests pending and completed, before we start garbage collecting them.
const MAX_TOTAL_REQUESTS: usize = SIGNING_QUEUE_LIMIT;

enum DispatchResult {
Promise(ConfirmationPromise),
Expand Down Expand Up @@ -71,14 +74,29 @@ fn handle_dispatch<OnResponse>(res: Result<DispatchResult, Error>, on_response:
}
}

fn collect_garbage(map: &mut TransientHashMap<U256, ConfirmationPromise>) {
map.prune();
if map.len() > MAX_TOTAL_REQUESTS {
// Remove all non-waiting entries.
let non_waiting: Vec<_> = map
.iter()
.filter(|&(_, val)| val.result() != ConfirmationResult::Waiting)
.map(|(key, _)| *key)
.collect();
for k in non_waiting {
map.remove(&k);
}
}
}

impl<D: Dispatcher + 'static> SigningQueueClient<D> {
/// Creates a new signing queue client given shared signing queue.
pub fn new(signer: &Arc<SignerService>, dispatcher: D, accounts: &Arc<AccountProvider>) -> Self {
SigningQueueClient {
signer: Arc::downgrade(signer),
accounts: Arc::downgrade(accounts),
dispatcher: dispatcher,
pending: Arc::new(Mutex::new(TransientHashMap::new(MAX_PENDING_DURATION))),
pending: Arc::new(Mutex::new(TransientHashMap::new(MAX_PENDING_DURATION_SEC))),
}
}

Expand Down Expand Up @@ -124,7 +142,10 @@ impl<D: Dispatcher + 'static> ParitySigning for SigningQueueClient<D> {
DispatchResult::Value(v) => RpcEither::Or(v),
DispatchResult::Promise(promise) => {
let id = promise.id();
pending.lock().insert(id, promise);
let mut pending = pending.lock();
collect_garbage(&mut pending);
pending.insert(id, promise);

RpcEither::Either(id.into())
},
})
Expand All @@ -138,26 +159,26 @@ impl<D: Dispatcher + 'static> ParitySigning for SigningQueueClient<D> {
DispatchResult::Value(v) => RpcEither::Or(v),
DispatchResult::Promise(promise) => {
let id = promise.id();
pending.lock().insert(id, promise);
let mut pending = pending.lock();
collect_garbage(&mut pending);
pending.insert(id, promise);

RpcEither::Either(id.into())
},
})
.boxed()
}

fn check_request(&self, id: RpcU256) -> Result<Option<RpcConfirmationResponse>, Error> {
let mut pending = self.pending.lock();
let id: U256 = id.into();
let res = match pending.get(&id) {
match self.pending.lock().get(&id) {
Some(ref promise) => match promise.result() {
ConfirmationResult::Waiting => { return Ok(None); }
ConfirmationResult::Waiting => Ok(None),
ConfirmationResult::Rejected => Err(errors::request_rejected()),
ConfirmationResult::Confirmed(rpc_response) => rpc_response.map(Some),
},
_ => { return Err(errors::request_not_found()); }
};
pending.remove(&id);
res
_ => Err(errors::request_not_found()),
}
}

fn decrypt_message(&self, meta: Metadata, address: RpcH160, data: RpcBytes) -> BoxFuture<RpcBytes, Error> {
Expand Down