Skip to content

Commit 88f047f

Browse files
authored
restore connections to other musig2 signers (#2894)
* restore connections to other musig2 signers * adjust to new code * review suggestions
1 parent 986d954 commit 88f047f

File tree

9 files changed

+142
-71
lines changed

9 files changed

+142
-71
lines changed

bitacross-worker/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bitacross-worker/bitacross/core/bc-enclave-registry/src/lib.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ pub trait EnclaveRegistryUpdater {
137137
pub trait EnclaveRegistryLookup {
138138
fn contains_key(&self, account: &Address32) -> bool;
139139
fn get_all(&self) -> Vec<(Address32, String)>;
140+
fn get_worker_url(&self, account: &Address32) -> Option<String>;
140141
}
141142

142143
impl EnclaveRegistrySealer for EnclaveRegistry {
@@ -226,29 +227,20 @@ impl EnclaveRegistryUpdater for EnclaveRegistry {
226227
}
227228

228229
impl EnclaveRegistryLookup for EnclaveRegistry {
229-
#[cfg(feature = "std")]
230-
fn contains_key(&self, account: &Address32) -> bool {
231-
let registry = self.registry.read().unwrap();
232-
registry.contains_key(account)
233-
}
234-
235-
#[cfg(feature = "std")]
236230
fn get_all(&self) -> Vec<(Address32, String)> {
237231
let registry = self.registry.read().unwrap();
238232
registry.iter().map(|(k, v)| (*k, v.clone())).collect()
239233
}
240234

241-
#[cfg(feature = "sgx")]
242235
fn contains_key(&self, account: &Address32) -> bool {
243236
// Using unwrap because poisoned locks are unrecoverable errors
244237
let registry = self.registry.read().unwrap();
245238
registry.contains_key(account)
246239
}
247240

248-
#[cfg(feature = "sgx")]
249-
fn get_all(&self) -> Vec<(Address32, String)> {
241+
fn get_worker_url(&self, account: &Address32) -> Option<String> {
250242
// Using unwrap because poisoned locks are unrecoverable errors
251243
let registry = self.registry.read().unwrap();
252-
registry.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
244+
registry.get(account).cloned()
253245
}
254246
}

bitacross-worker/bitacross/core/bc-musig2-ceremony/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ pub enum CeremonyCommand {
8686
KillCeremony,
8787
}
8888

89-
// commands are created by ceremony and executed by runner
89+
// events are created by ceremony and executed by runner
9090
#[derive(Debug, Eq, PartialEq)]
9191
pub enum CeremonyEvent {
9292
FirstRoundStarted(Signers, CeremonyId, PubNonce),

bitacross-worker/bitacross/core/bc-musig2-event/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ threadpool = { version = "1.8.0", optional = true }
1414
sgx_tstd = { git = "https://github.com/apache/teaclave-sgx-sdk.git", branch = "master", optional = true, features = ["net", "thread"] }
1515
threadpool_sgx = { git = "https://github.com/mesalock-linux/rust-threadpool-sgx", package = "threadpool", tag = "sgx_1.1.3", optional = true }
1616

17+
bc-enclave-registry = { path = "../bc-enclave-registry", default-features = false }
1718
bc-musig2-ceremony = { path = "../bc-musig2-ceremony", default-features = false }
1819
itc-direct-rpc-client = { path = "../../../core/direct-rpc-client", default-features = false }
1920
itc-direct-rpc-server = { path = "../../../core/direct-rpc-server", default-features = false }
@@ -45,6 +46,7 @@ std = [
4546
"litentry-primitives/std",
4647
"itp-rpc/std",
4748
"bc-musig2-ceremony/std",
49+
"bc-enclave-registry/std",
4850
"lc-direct-call/std",
4951
"itp-sgx-crypto/std",
5052
"rand",
@@ -57,6 +59,7 @@ sgx = [
5759
"litentry-primitives/sgx",
5860
"itp-rpc/sgx",
5961
"bc-musig2-ceremony/sgx",
62+
"bc-enclave-registry/sgx",
6063
"lc-direct-call/sgx",
6164
"itp-sgx-crypto/sgx",
6265
"sgx_rand",

bitacross-worker/bitacross/core/bc-musig2-event/src/lib.rs

Lines changed: 111 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ extern crate sgx_tstd as std;
2323
#[cfg(all(feature = "std", feature = "sgx"))]
2424
compile_error!("feature \"std\" and feature \"sgx\" cannot be enabled at the same time");
2525

26+
use core::time::Duration;
2627
#[cfg(feature = "std")]
2728
use threadpool::ThreadPool;
2829

@@ -35,34 +36,45 @@ use std::sync::Mutex;
3536
#[cfg(feature = "sgx")]
3637
use std::sync::SgxMutex as Mutex;
3738

38-
use bc_musig2_ceremony::{CeremonyEvent, CeremonyId};
39+
#[cfg(feature = "std")]
40+
use std::sync::RwLock;
41+
42+
#[cfg(feature = "sgx")]
43+
use std::sync::SgxRwLock as RwLock;
44+
45+
use bc_enclave_registry::EnclaveRegistryLookup;
46+
use bc_musig2_ceremony::{CeremonyEvent, CeremonyId, CeremonyRegistry, SignerId};
3947
use codec::Encode;
40-
use itc_direct_rpc_client::{DirectRpcClient, RpcClient};
48+
use itc_direct_rpc_client::{DirectRpcClient, DirectRpcClientFactory, RpcClient, RpcClientFactory};
4149
use itc_direct_rpc_server::SendRpcResponse;
4250
use itp_ocall_api::EnclaveAttestationOCallApi;
4351
use itp_rpc::{Id, RpcRequest};
44-
use itp_sgx_crypto::key_repository::AccessKey;
52+
use itp_sgx_crypto::{key_repository::AccessKey, schnorr::Pair as SchnorrPair};
4553
pub use itp_types::{DirectRequestStatus, Hash};
4654
use itp_utils::hex::ToHexPrefixed;
4755
use lc_direct_call::CeremonyRoundCall;
4856
use litentry_primitives::{Address32, Identity, PlainRequest, ShardIdentifier};
4957
use log::*;
5058
use sp_core::{blake2_256, ed25519, Pair as SpCorePair, H256};
51-
use std::{collections::HashMap, string::ToString, sync::Arc, vec};
59+
use std::{collections::HashMap, string::ToString, sync::Arc, thread::sleep, vec};
5260

5361
#[allow(clippy::too_many_arguments)]
54-
pub fn process_event<OCallApi, SIGNINGAK, Responder>(
62+
pub fn process_event<OCallApi, SIGNINGAK, Responder, ECL, BKR>(
5563
signing_key_access: Arc<SIGNINGAK>,
5664
ocall_api: Arc<OCallApi>,
5765
responder: Arc<Responder>,
66+
enclave_registry_lookup: Arc<ECL>,
5867
event: CeremonyEvent,
5968
ceremony_id: CeremonyId,
6069
event_threads_pool: ThreadPool,
6170
peers_map: Arc<Mutex<HashMap<[u8; 32], DirectRpcClient>>>,
71+
ceremony_registry: Arc<RwLock<CeremonyRegistry<BKR>>>,
6272
) where
6373
OCallApi: EnclaveAttestationOCallApi + 'static,
6474
SIGNINGAK: AccessKey<KeyType = ed25519::Pair> + Send + Sync + 'static,
6575
Responder: SendRpcResponse<Hash = H256> + 'static,
76+
ECL: EnclaveRegistryLookup + Send + Sync + 'static,
77+
BKR: AccessKey<KeyType = SchnorrPair> + Send + Sync + 'static,
6678
{
6779
let my_identity: Address32 = signing_key_access.retrieve_key().unwrap().public().0.into();
6880
let identity = Identity::Substrate(my_identity);
@@ -80,20 +92,21 @@ pub fn process_event<OCallApi, SIGNINGAK, Responder>(
8092
);
8193

8294
let signer_id = *signer_id;
83-
let client = peers_map.lock().unwrap().get(&signer_id).cloned();
84-
if let Some(mut client) = client {
85-
let request = request.clone();
86-
event_threads_pool.execute(move || {
87-
if let Err(e) = client.send(&request) {
88-
error!(
89-
"Could not send request to signer: {:?}, reason: {:?}",
90-
signer_id, e
91-
)
92-
}
93-
});
94-
} else {
95-
error!("Fail to share nonce, unknown signer: {:?}", signer_id);
96-
}
95+
let peers_map_clone = peers_map.clone();
96+
let request = request.clone();
97+
let enclave_lookup_cloned = enclave_registry_lookup.clone();
98+
let ceremony_registry_cloned = ceremony_registry.clone();
99+
let ceremony_id_cloned = ceremony_id.clone();
100+
event_threads_pool.execute(move || {
101+
send_request(
102+
signer_id,
103+
&ceremony_id_cloned,
104+
request,
105+
peers_map_clone,
106+
enclave_lookup_cloned,
107+
ceremony_registry_cloned,
108+
);
109+
});
97110
});
98111
},
99112
CeremonyEvent::SecondRoundStarted(signers, message, signature) => {
@@ -108,20 +121,21 @@ pub fn process_event<OCallApi, SIGNINGAK, Responder>(
108121
);
109122

110123
let signer_id = *signer_id;
111-
let client = peers_map.lock().unwrap().get(&signer_id).cloned();
112-
if let Some(mut client) = client {
113-
let request = request.clone();
114-
event_threads_pool.execute(move || {
115-
if let Err(e) = client.send(&request) {
116-
error!(
117-
"Could not send request to signer: {:?}, reason: {:?}",
118-
signer_id, e
119-
)
120-
}
121-
});
122-
} else {
123-
error!("Fail to share partial signature, unknown signer: {:?}", signer_id);
124-
}
124+
let peers_map_clone = peers_map.clone();
125+
let request = request.clone();
126+
let enclave_lookup_cloned = enclave_registry_lookup.clone();
127+
let ceremony_registry_cloned = ceremony_registry.clone();
128+
let ceremony_id_cloned = ceremony_id.clone();
129+
event_threads_pool.execute(move || {
130+
send_request(
131+
signer_id,
132+
&ceremony_id_cloned,
133+
request,
134+
peers_map_clone,
135+
enclave_lookup_cloned,
136+
ceremony_registry_cloned,
137+
);
138+
});
125139
});
126140
},
127141
CeremonyEvent::CeremonyEnded(signature, is_check_run, verification_result) => {
@@ -168,25 +182,75 @@ pub fn process_event<OCallApi, SIGNINGAK, Responder>(
168182
);
169183

170184
let signer_id = *signer_id;
171-
let client = peers_map.lock().unwrap().get(&signer_id).cloned();
172-
if let Some(mut client) = client {
173-
let request = request.clone();
174-
event_threads_pool.execute(move || {
175-
if let Err(e) = client.send(&request) {
176-
error!(
177-
"Could not send request to signer: {:?}, reason: {:?}",
178-
signer_id, e
179-
)
180-
}
181-
});
182-
} else {
183-
error!("Fail to share killing info, unknown signer: {:?}", signer_id);
184-
}
185+
let peers_map_clone = peers_map.clone();
186+
let request = request.clone();
187+
let enclave_lookup_cloned = enclave_registry_lookup.clone();
188+
let ceremony_registry_cloned = ceremony_registry.clone();
189+
let ceremony_id_cloned = ceremony_id.clone();
190+
event_threads_pool.execute(move || {
191+
send_request(
192+
signer_id,
193+
&ceremony_id_cloned,
194+
request,
195+
peers_map_clone,
196+
enclave_lookup_cloned,
197+
ceremony_registry_cloned,
198+
);
199+
});
185200
});
186201
},
187202
}
188203
}
189204

205+
// it will try to send request until it succeeds, the peer is removed from registry or ceremony is removed
206+
fn send_request<ECL, BKR>(
207+
signer_id: SignerId,
208+
ceremony_id: &CeremonyId,
209+
request: RpcRequest,
210+
peers_map: Arc<Mutex<HashMap<[u8; 32], DirectRpcClient>>>,
211+
enclave_registry_lookup: Arc<ECL>,
212+
ceremony_registry: Arc<RwLock<CeremonyRegistry<BKR>>>,
213+
) where
214+
ECL: EnclaveRegistryLookup,
215+
BKR: AccessKey<KeyType = SchnorrPair>,
216+
{
217+
loop {
218+
let client = peers_map.lock().unwrap().get(&signer_id).cloned();
219+
if let Some(mut client) = client {
220+
if let Err(e) = client.send(&request) {
221+
error!("Could not send request to signer: {:?}, reason: {:?}", signer_id, e);
222+
sleep(Duration::from_secs(5));
223+
let mut peers_lock = peers_map.lock().unwrap();
224+
peers_lock.remove(&signer_id);
225+
} else {
226+
// finish if request was sent
227+
break
228+
}
229+
} else {
230+
// check if ceremony still exists, if not stop
231+
if !ceremony_registry.read().unwrap().contains_key(ceremony_id) {
232+
break
233+
}
234+
235+
if let Some(url) = enclave_registry_lookup.get_worker_url(&Address32::from(signer_id)) {
236+
match (DirectRpcClientFactory {}).create(&url) {
237+
Ok(new_client) => {
238+
peers_map.lock().unwrap().insert(signer_id, new_client.clone());
239+
},
240+
Err(e) => {
241+
error!("Could not connect to peer {}, reason: {:?}", url, e);
242+
sleep(Duration::from_secs(5));
243+
},
244+
}
245+
} else {
246+
error!("Could not find {:?} in registry", signer_id.to_hex());
247+
// stop if peer is not found in registry
248+
break
249+
}
250+
}
251+
}
252+
}
253+
190254
fn prepare_request<SIGNINGAK>(
191255
signing_key_access: &SIGNINGAK,
192256
mr_enclave: [u8; 32],

bitacross-worker/bitacross/core/bc-task-processor/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,10 +417,12 @@ fn handle_ceremony_command<SKR, SIGNINGAK, EKR, BKR, S, H, O, RRL, ERL, SRL, Res
417417
context.signing_key_access.clone(),
418418
context.ocall_api.clone(),
419419
context.responder.clone(),
420+
context.enclave_registry_lookup.clone(),
420421
event,
421422
ceremony_id.clone(),
422423
event_threads_pool.clone(),
423424
peers_map.clone(),
425+
context.ceremony_registry.clone(),
424426
);
425427
}
426428
}

bitacross-worker/core-primitives/rpc/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ pub struct RpcResponse {
6464
pub id: Id,
6565
}
6666

67-
#[derive(Clone, Encode, Decode, Serialize, Deserialize)]
67+
#[derive(Clone, Encode, Decode, Serialize, Deserialize, Debug)]
6868
pub struct RpcRequest {
6969
pub jsonrpc: String,
7070
pub method: String,

bitacross-worker/core/direct-rpc-client/src/lib.rs

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ pub trait RpcClient {
114114

115115
#[derive(Clone)]
116116
pub struct DirectRpcClient {
117-
request_sink: Sender<String>,
117+
request_sink: Sender<(String, Sender<bool>)>,
118118
}
119119

120120
impl DirectRpcClient {
@@ -133,20 +133,21 @@ impl DirectRpcClient {
133133
client_tls_with_config(server_url.as_str(), stream, None, Some(connector))
134134
.map_err(|e| format!("Could not open websocket connection: {:?}", e))?;
135135

136-
let (request_sender, request_receiver) = channel();
136+
let (request_sender, request_receiver) = channel::<(String, Sender<bool>)>();
137137

138138
//it fails to perform handshake in non_blocking mode so we are setting it up after the handshake is performed
139139
Self::switch_to_non_blocking(&mut socket);
140140

141141
std::thread::spawn(move || {
142-
loop {
143-
// let's flush all pending requests first
144-
while let Ok(request) = request_receiver.try_recv() {
145-
if let Err(e) = socket.write_message(Message::Text(request)) {
146-
error!("Could not write message to socket, reason: {:?}", e)
147-
}
142+
while let Ok((request, result_sender)) = request_receiver.recv() {
143+
let mut result = true;
144+
if let Err(e) = socket.write_message(Message::Text(request)) {
145+
error!("Could not write message to socket, reason: {:?}", e);
146+
result = false;
147+
}
148+
if let Err(e) = result_sender.send(result) {
149+
log::error!("Could not send rpc result back, reason: {:?}", e);
148150
}
149-
std::thread::sleep(Duration::from_millis(1))
150151
}
151152
});
152153
debug!("Connected to peer: {}", url);
@@ -183,8 +184,15 @@ impl RpcClient for DirectRpcClient {
183184
/// Serializes `request` to JSON, hands it to the connection's writer thread and blocks
/// until that thread reports whether the socket write succeeded.
///
/// # Errors
///
/// Returns an error when
/// * the request cannot be serialized,
/// * the request cannot be queued because the writer thread's receiving end is gone,
/// * the acknowledgement channel is closed before a result arrives, or
/// * the writer thread reports that writing to the socket failed.
fn send(&mut self, request: &RpcRequest) -> Result<(), Box<dyn Error>> {
    let request = serde_json::to_string(request)
        .map_err(|e| format!("Could not parse RpcRequest {:?}", e))?;

    // Per-request channel on which the writer thread reports the outcome of the write.
    let (sender, receiver) = channel();
    self.request_sink
        .send((request, sender))
        // Queueing failed, which is not a parsing problem: report it as a write failure.
        .map_err(|e| format!("Could not write message, reason: {:?}", e))?;

    // `recv` fails if the writer thread dropped its sender without answering.
    if receiver.recv()? {
        Ok(())
    } else {
        Err("Could not send request".into())
    }
}
190198
}

bitacross-worker/enclave-runtime/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)