Skip to content

Commit

Permalink
Follow-up-of-PR-2532 (#2570)
Browse files Browse the repository at this point in the history
* 1. remove separate thread to clean up fully processed registry item; Relay on assertion-build return value / error / time out.
2. update according to PR comments from PR-2532

* fmt
  • Loading branch information
BillyWooo committed Mar 13, 2024
1 parent c075eb1 commit 851f2fc
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 51 deletions.
2 changes: 0 additions & 2 deletions tee-worker/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions tee-worker/enclave-runtime/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 27 additions & 42 deletions tee-worker/litentry/core/vc-task/receiver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ use std::{
mpsc::{channel, Sender},
Arc,
},
thread::{self, sleep},
time::{Duration, Instant},
thread,
time::Instant,
vec::Vec,
};

Expand Down Expand Up @@ -107,13 +107,6 @@ pub fn run_vc_handler_runner<ShieldingKeyRepository, A, S, H, O, Z, N>(

// use local registry to manage request reponse status
let req_registry = RequestRegistry::new();
let req_registry_cloned = req_registry.clone();
thread::spawn(move || loop {
// Doesn't harm anything to clean up slowly.
let wait_seconds = 200u64;
req_registry_cloned.clean_up(wait_seconds);
sleep(Duration::from_secs(wait_seconds));
});

while let Ok(mut req) = vc_task_receiver.recv() {
let request = &mut req.request;
Expand Down Expand Up @@ -200,9 +193,9 @@ pub fn run_vc_handler_runner<ShieldingKeyRepository, A, S, H, O, Z, N>(
tcs.call.clone(),
);

//Totally fine to `unwrap` here. Because new item just added above.
let (size, len) = req_registry_pool.update_item(connection_hash, 0).unwrap();
send_vc_response(connection_hash, context_pool, response, 0u8, len, size < len);
// Totally fine to `unwrap` here. Because new item was just added above.
let do_watch = req_registry_pool.update_item(connection_hash).unwrap();
send_vc_response(connection_hash, context_pool, response, 0u8, 1, do_watch);
});
} else if let TrustedCall::request_batch_vc(
signer,
Expand Down Expand Up @@ -253,28 +246,27 @@ pub fn run_vc_handler_runner<ShieldingKeyRepository, A, S, H, O, Z, N>(
new_call,
);

//Totally fine to `unwrap` here. Because new item just added above.
let (size, len) =
req_registry_pool.update_item(connection_hash, idx as u8).unwrap();
// Totally fine to `unwrap` here. Because new item was just added above.
let do_watch = req_registry_pool.update_item(connection_hash).unwrap();
send_vc_response(
connection_hash,
context_pool,
response,
idx as u8,
assertion_len,
size < len,
do_watch,
);
});
} else {
//Totally fine to `unwrap` here. Because new item just added above.
let (size, len) = req_registry.update_item(connection_hash, idx as u8).unwrap();
// Totally fine to `unwrap` here. Because new item was just added above.
let do_watch = req_registry.update_item(connection_hash).unwrap();
send_vc_response(
connection_hash,
context.clone(),
Err("Duplicate assertion request".to_string()),
idx as u8,
assertion_len,
size < len,
do_watch,
);
}
}
Expand All @@ -298,46 +290,39 @@ struct RequestRegistry {
}

struct AssertionStatus {
pub time_stamp: Instant,
pub len: u8,
pub indexes: Vec<u8>,
pub total: u8,
pub processed: u8,
}

impl RequestRegistry {
pub fn new() -> Self {
Self { status_map: Arc::new(Mutex::new(HashMap::new())) }
}

pub fn add_new_item(&self, key: H256, len: u8) {
pub fn add_new_item(&self, key: H256, total: u8) {
let mut map = self.status_map.lock().unwrap();
map.insert(key, AssertionStatus { time_stamp: Instant::now(), len, indexes: Vec::new() });
map.insert(key, AssertionStatus { total, processed: 0u8 });
}

pub fn update_item(&self, key: H256, idx: u8) -> Result<(u8, u8), &'static str> {
// Return value indicates whether some item is still not yet processed.
pub fn update_item(&self, key: H256) -> Result<bool, &'static str> {
let mut map = self.status_map.lock().unwrap();

#[allow(unused_assignments)]
let mut all_processed = false;

if let Some(entry) = map.get_mut(&key) {
entry.time_stamp = Instant::now();
entry.indexes.push(idx);
Ok((entry.indexes.len() as u8, entry.len))
entry.processed += 1;
all_processed = entry.processed == entry.total;
} else {
Err("Item not found in map")
return Err("Item not found in map")
}
}

pub fn clean_up(&self, wait_seconds: u64) {
let mut map = self.status_map.lock().unwrap();
let items_to_remove: Vec<H256> = map
.iter()
.filter(|(_, status)| {
status.time_stamp.elapsed() >= Duration::from_secs(wait_seconds)
&& status.indexes.len() as u8 == status.len
})
.map(|(key, _)| *key)
.collect();

for key in items_to_remove {
if all_processed {
map.remove(&key);
}

Ok(!all_processed)
}
}

Expand Down
5 changes: 0 additions & 5 deletions tee-worker/sidechain/rpc-handler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@ edition = "2021"
# sgx dependencies
sgx_tstd = { branch = "master", git = "https://github.com/apache/teaclave-sgx-sdk.git", optional = true }

# substrate deps
sp-runtime = { default-features = false, git = "https://github.com/paritytech/substrate.git", branch = "polkadot-v0.9.42" }

# local dependencies
ita-stf = { path = "../../app-libs/stf", default-features = false }
itp-rpc = { path = "../../core-primitives/rpc", default-features = false }
itp-stf-primitives = { path = "../../core-primitives/stf-primitives", default-features = false }
itp-top-pool-author = { path = "../../core-primitives/top-pool-author", default-features = false }
Expand Down Expand Up @@ -52,7 +48,6 @@ std = [
"log/std",
"rust-base58",
"lc-vc-task-sender/std",
"sp-runtime/std",
]
sgx = [
"futures_sgx",
Expand Down

0 comments on commit 851f2fc

Please sign in to comment.