-
Notifications
You must be signed in to change notification settings - Fork 132
/
process.rs
131 lines (120 loc) · 4.8 KB
/
process.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
use crate::{
fragment::{Logs, Pools},
intercom::{NetworkMsg, TransactionMsg},
stats_counter::StatsCounter,
utils::{
async_msg::{MessageBox, MessageQueue},
task::TokioServiceInfo,
},
};
use std::collections::HashMap;
use std::fs::File;
use thiserror::Error;
use tokio_stream::StreamExt;
use tracing::{span, Level};
use tracing_futures::Instrument;
/// Fragment-processing task state: holds everything needed to build the
/// fragment `Pools` once `Process::start` is called.
pub struct Process {
/// Forwarded unchanged as the first argument of `Pools::new` in `start`.
/// Presumably the entry limit of each pool — confirm against `Pools::new`.
pool_max_entries: usize,
/// Fragment log store, built in `new` via `Logs::new(logs_max_entries)` and
/// handed over to `Pools::new` in `start`.
logs: Logs,
/// Message box for `NetworkMsg`; handed over to `Pools::new` in `start`.
network_msg_box: MessageBox<NetworkMsg>,
}
/// Errors returned by `Process::start`.
#[derive(Debug, Error)]
pub enum Error {
/// An operation on the fragment pool failed. The `#[from]` attribute lets
/// `?` convert a `crate::fragment::pool::Error` into this variant.
#[error("transaction pool error")]
Pool(#[from] crate::fragment::pool::Error),
}
impl Process {
    /// Builds a fragment process that is ready to be started.
    ///
    /// * `pool_max_entries` is stored as-is and later forwarded to
    ///   `Pools::new` by [`Process::start`].
    /// * `logs_max_entries` is used right away to construct the `Logs` store.
    /// * `network_msg_box` is kept so it can be handed to the pools on start.
    pub fn new(
        pool_max_entries: usize,
        logs_max_entries: usize,
        network_msg_box: MessageBox<NetworkMsg>,
    ) -> Self {
        Self {
            pool_max_entries,
            logs: Logs::new(logs_max_entries),
            network_msg_box,
        }
    }

    /// Consumes the process, builds the fragment pools and serves every
    /// `TransactionMsg` received on `input` until the queue yields `None`.
    ///
    /// `n_pools` and `persistent_log` are forwarded to `Pools::new` together
    /// with the values stored by [`Process::new`]. The whole message loop runs
    /// inside a `TRACE` span named `"process"` whose parent is
    /// `service_info.span()`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Pool`] — and stops processing further messages — as
    /// soon as `insert_and_propagate_all` fails for a `SendTransaction`
    /// message.
    pub async fn start(
        self,
        n_pools: usize,
        service_info: TokioServiceInfo,
        stats_counter: StatsCounter,
        mut input: MessageQueue<TransactionMsg>,
        persistent_log: Option<File>,
    ) -> Result<(), Error> {
        let Process {
            pool_max_entries,
            logs,
            network_msg_box,
        } = self;
        let mut pools = Pools::new(
            pool_max_entries,
            n_pools,
            logs,
            network_msg_box,
            persistent_log,
        );

        async move {
            while let Some(msg) = input.next().await {
                match msg {
                    TransactionMsg::SendTransaction(origin, txs) => {
                        // apply_block cannot be used here: there is no valid context to
                        // apply these against, and a transaction that is valid in one
                        // context may be invalid in another (fee calculation, UTXO
                        // existence, account solvency, ...).
                        //
                        // FIXME/TODO: check that the transactions are valid on their own
                        // against basic requirements (e.g. inputs >= outputs). It should
                        // also stay possible to filter out repeated queries and txids
                        // that are known to be discarded.
                        //
                        // This interface only makes sense for messages coming from
                        // arbitrary users (transactions, certificates). Other kinds of
                        // messages should not arrive through it and possibly belong in
                        // another pool.
                        let counter = stats_counter.clone();
                        let received = pools.insert_and_propagate_all(origin, txs).await?;
                        // Record the count reported by the pools for this message.
                        counter.add_tx_recv_cnt(received);
                    }
                    TransactionMsg::RemoveTransactions(fragment_ids, status) => {
                        tracing::debug!(
                            "removing fragments added to block {:?}: {:?}",
                            status,
                            fragment_ids
                        );
                        pools.remove_added_to_block(fragment_ids, status);
                    }
                    TransactionMsg::GetLogs(reply_handle) => {
                        // Reply with a cloned snapshot of every log entry.
                        reply_handle.reply_ok(pools.logs().logs().cloned().collect());
                    }
                    TransactionMsg::GetStatuses(fragment_ids, reply_handle) => {
                        // Map each fragment id returned by the lookup to a clone of
                        // its log status.
                        let statuses: HashMap<_, _> = pools
                            .logs()
                            .logs_by_ids(fragment_ids)
                            .into_iter()
                            .map(|(fragment_id, log)| (fragment_id, log.status().clone()))
                            .collect();
                        reply_handle.reply_ok(statuses);
                    }
                    TransactionMsg::SelectTransactions {
                        pool_idx,
                        ledger,
                        ledger_params,
                        selection_alg,
                        reply_handle,
                        soft_deadline_future,
                        hard_deadline_future,
                    } => {
                        let selected = pools
                            .select(
                                pool_idx,
                                ledger,
                                ledger_params,
                                selection_alg,
                                soft_deadline_future,
                                hard_deadline_future,
                            )
                            .await;
                        reply_handle.reply_ok(selected);
                    }
                }
            }
            // The input queue is exhausted: nothing more to serve.
            Ok(())
        }
        .instrument(span!(
            parent: service_info.span(),
            Level::TRACE,
            "process",
            kind = "fragment"
        ))
        .await
    }
}