Skip to content

Commit

Permalink
Add logs update
Browse files Browse the repository at this point in the history
  • Loading branch information
CodeSandwich committed Sep 20, 2019
1 parent 0c41cad commit edc564f
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 12 deletions.
3 changes: 2 additions & 1 deletion jormungandr/src/blockchain/process.rs
Expand Up @@ -109,6 +109,7 @@ pub fn handle_input(
Ok(maybe_updated) => {
if let Some(new_block_ref) = maybe_updated {
let header = new_block_ref.header().clone();
let date = header.block_date().clone().into();
process_new_ref(
blockchain.clone(),
blockchain_tip.clone(),
Expand All @@ -117,7 +118,7 @@ pub fn handle_input(
.wait()
.unwrap();
tx_msg_box
.try_send(TransactionMsg::RemoveTransactions(fragment_ids))
.try_send(TransactionMsg::RemoveTransactions(fragment_ids, date))
.unwrap_or_else(|err| {
error!(info.logger(), "cannot remove fragments from pool: {}", err)
});
Expand Down
12 changes: 12 additions & 0 deletions jormungandr/src/fragment/logs.rs
Expand Up @@ -48,6 +48,18 @@ impl Logs {
self.run_on_inner(move |inner| inner.modify(&fragment_id.into(), status))
}

pub fn modify_all(
&mut self,
fragment_ids: impl IntoIterator<Item = FragmentId>,
status: FragmentStatus,
) -> impl Future<Item = (), Error = ()> {
self.run_on_inner(move |inner| {
for fragment_id in fragment_ids {
inner.modify(&fragment_id.into(), status.clone())
}
})
}

pub fn remove(&mut self, fragment_id: FragmentId) -> impl Future<Item = (), Error = ()> {
self.run_on_inner(move |inner| inner.remove(&fragment_id.into()))
}
Expand Down
22 changes: 15 additions & 7 deletions jormungandr/src/fragment/pool.rs
Expand Up @@ -6,7 +6,7 @@ use crate::{
};
use chain_core::property::Fragment as _;
use chain_impl_mockchain::transaction::AuthenticatedTransaction;
use jormungandr_lib::interfaces::{FragmentLog, FragmentOrigin};
use jormungandr_lib::interfaces::{BlockDate, FragmentLog, FragmentOrigin, FragmentStatus};
use slog::Logger;
use std::time::Duration;
use tokio::{
Expand Down Expand Up @@ -86,19 +86,26 @@ impl Pool {
)
}

pub fn remove_by_ids(
pub fn remove_added_to_block(
&mut self,
fragment_ids: impl IntoIterator<Item = FragmentId>,
fragment_ids: Vec<FragmentId>,
date: BlockDate,
) -> impl Future<Item = (), Error = ()> {
let mut lock = self.pool.clone();
future::poll_fn(move || Ok(lock.poll_lock()))
.map(move |mut pool| pool.remove_all(fragment_ids))
let mut pool_lock = self.pool.clone();
let mut logs = self.logs.clone();
future::poll_fn(move || Ok(pool_lock.poll_lock()))
.map(move |mut pool| {
pool.remove_all(fragment_ids.iter().cloned());
fragment_ids
})
.and_then(move |fragment_ids| {
logs.modify_all(fragment_ids, FragmentStatus::InABlock { date })
})
}

pub fn poll_purge(&mut self) -> impl Future<Item = (), Error = timer::Error> {
let mut lock = self.pool.clone();
let purge_logs = self.logs.poll_purge();

future::poll_fn(move || Ok(lock.poll_lock()))
.and_then(move |mut guard| future::poll_fn(move || guard.poll_purge()))
.and_then(move |()| purge_logs)
Expand All @@ -117,6 +124,7 @@ impl Pool {
let mut lock = self.pool.clone();
let logs = self.logs().clone();

// FIXME deadlock hazard, nested pool lock and logs lock
future::poll_fn(move || Ok(lock.poll_lock()))
.and_then(move |pool| logs.inner().map(|logs| (pool, logs)))
.and_then(move |(mut pool, mut logs)| {
Expand Down
4 changes: 2 additions & 2 deletions jormungandr/src/fragment/process.rs
Expand Up @@ -73,8 +73,8 @@ impl Process {
.insert_and_propagate_all(origin, txs, service_info.logger().clone())
.map(move |count| stats_counter.add_tx_recv_cnt(count)))
}
TransactionMsg::RemoveTransactions(fragment_ids) => {
B(self.pool.clone().remove_by_ids(fragment_ids))
TransactionMsg::RemoveTransactions(fragment_ids, date) => {
B(self.pool.clone().remove_added_to_block(fragment_ids, date))
}
}
})
Expand Down
4 changes: 2 additions & 2 deletions jormungandr/src/intercom.rs
Expand Up @@ -3,7 +3,7 @@ use crate::network::p2p::topology::NodeId;
use blockchain::{Checkpoints, Ref};
use futures::prelude::*;
use futures::sync::{mpsc, oneshot};
use jormungandr_lib::interfaces::FragmentOrigin;
use jormungandr_lib::interfaces::{BlockDate, FragmentOrigin};
use network_core::error as core_error;
use slog::Logger;
use std::{
Expand Down Expand Up @@ -327,7 +327,7 @@ pub fn stream_request<T, E>(buffer: usize) -> (RequestStreamHandle<T>, RequestSi
#[derive(Debug)]
pub enum TransactionMsg {
SendTransaction(FragmentOrigin, Vec<Fragment>),
RemoveTransactions(Vec<FragmentId>),
RemoveTransactions(Vec<FragmentId>, BlockDate),
}

/// Client messages, mainly requests from connected peers to our node.
Expand Down

0 comments on commit edc564f

Please sign in to comment.