diff --git a/switch/mining2/src/work_manager.rs b/switch/mining2/src/work_manager.rs index 06e53f6..7843f18 100644 --- a/switch/mining2/src/work_manager.rs +++ b/switch/mining2/src/work_manager.rs @@ -33,7 +33,7 @@ use log::warn; use parking_lot::RwLock; use std::sync::Arc; use std::collections::HashMap; -use serde::export::fmt::Debug; +use serde::export::fmt::{Debug, Display}; use yee_serde_hex::SerdeHex; use tokio::runtime::Runtime; use std::thread; @@ -50,10 +50,12 @@ use log::{info, debug}; use futures::future; use jsonrpc_core::BoxFuture; use yee_runtime::BlockNumber; +use futures::future::{Loop, Either}; const EXTRA_DATA: &str = "yee-switch"; const RAW_WORK_LIFE: Duration = Duration::from_secs(60); -const REFRESH_JOB_DELAY: Duration = Duration::from_secs(2); +const REFRESH_JOB_DELAY: Duration = Duration::from_millis(500); +const REFRESH_MAX_TRY_TIMES: usize = 40; pub trait WorkManager { type Hashing: HashT; @@ -70,7 +72,7 @@ pub trait WorkManager { } impl WorkManager for DefaultWorkManager where - Number: Clone + Debug + SerdeHex + DeserializeOwned + Send + Sync + 'static, + Number: Clone + Debug + SerdeHex + DeserializeOwned + Send + Sync + PartialOrd + Display + 'static, AuthorityId: Clone + Debug + DeserializeOwned + Send + Sync + 'static, Hashing: HashT + Send + Sync + 'static, Hashing::Output: Ord + Encode + Decode, @@ -157,7 +159,8 @@ impl WorkManager for DefaultWorkManager::Output, Self::Number>) -> error::Result<()> { - self.accept_work(work) + tokio::run(self.accept_work_future(work).map_err(|_|())); + Ok(()) } fn submit_work_future(&self, work: Work<::Output, Self::Number>) -> Box + Send> { @@ -256,7 +259,7 @@ pub struct DefaultWorkManager where impl DefaultWorkManager where - Number: Send + Sync + Debug + DeserializeOwned + SerdeHex + Clone + 'static, + Number: Send + Sync + Debug + DeserializeOwned + SerdeHex + Clone + PartialOrd + Display + 'static, AuthorityId: Send + Sync + Debug + DeserializeOwned + Clone + 'static, Hashing: HashT + Send + Sync + 'static, Hashing::Output: Ord + DeserializeOwned + Encode + Decode + Send + Sync + 'static, @@ -308,95 +311,6 @@ impl DefaultWorkManager) -> error::Result<()> { - let nonce = work.nonce.expect("qed"); - let nonce_target = work.nonce_target.expect("qed"); - let target = work.target.clone(); - - info!("accept_work: merkle_root: {:?}, nonce: {}, nonce_target: {:#x}, target: {:#x}", work.merkle_root, nonce, nonce_target, target); - - let merkle_root = &work.merkle_root; - - let raw_work = Self::get_cache(self.work_cache.clone(), merkle_root).ok_or(error::Error::from(error::ErrorKind::WorkExpired))?; - - let shard_count = raw_work.shard_count; - - let shard_jobs = raw_work.shard_jobs; - let merkle_tree = raw_work.merkle_tree.expect("qed"); - - let merkle_tree = Arc::new(merkle_tree); - let work = Arc::new(work); - - let mut tasks = Vec::new(); - - for actual_shard_num in 0..shard_count { - if let Some((config_shard_num, job)) = shard_jobs.get(&actual_shard_num) { - let job_target = job.digest_item.pow_target; - - let config_shard_num = config_shard_num.clone(); - let job = job.clone(); - let merkle_tree = merkle_tree.clone(); - let work = work.clone(); - - let shard = Arc::new(self.config.shards.get(&format!("{}", config_shard_num)).expect("qed").to_owned()); - let shard2 = shard.clone(); - - let jobs = self.jobs.clone(); - - let task = future::lazy(move || { - if nonce_target <= job_target { - let merkle_proof = merkle_tree.gen_proof(actual_shard_num as usize); - - let merkle_proof = OriginalMerkleProof { - proof: merkle_proof, - num: actual_shard_num, - count: shard_count, - }; - let merkle_proof: CompactMerkleProof = merkle_proof.into(); - - let job_result = JobResult { - hash: job.hash, - digest_item: ResultDigestItem { - work_proof: WorkProof::Multi(ProofMulti { - extra_data: work.extra_data.clone(), - merkle_root: work.merkle_root.clone(), - merkle_proof: merkle_proof.proof, - nonce, - }) - }, - }; - future::ok(job_result) - } else { - warn!("nonce_target: {:#x}, job_target: {:#x}", nonce_target, job_target); - future::err(error::Error::from(error::ErrorKind::TargetNotAccpect)) - } - }).and_then(move |job_result| { - info!("New block mined: actual_shard_num: {}, config_shard_num: {}, nonce_target: {:#x}, job_target: {:#x}", actual_shard_num, config_shard_num, nonce_target, job_target); - Self::submit_job_future(&shard, job_result) - }).and_then(move |result| { - debug!("Job submitted: actual_shard_num: {}, config_shard_num: {}, new_block_hash: {:?}", actual_shard_num, config_shard_num, result); - Delay::new(Instant::now().add(REFRESH_JOB_DELAY)).then(move |_| { - Self::get_job_future(&shard2).then(move |job| { - info!("Job refreshed: actual_shard_num: {}, config_shard_num: {}, job_hash: {:?}", actual_shard_num, config_shard_num, job.as_ref().map(|job| job.hash)); - match job { - Ok(job) => jobs.write().insert(config_shard_num, job), - Err(_e) => jobs.write().remove(&config_shard_num), - }; - Ok(()) - }) - }) - }); - tasks.push(task); - } - } - - let task = future::join_all(tasks).map(|_| ()).map_err(|e| warn!("{:?}", e)); - - tokio::run(task); - - Ok(()) - } - fn accept_work_future(&self, work: Work) -> Box + Send> { let nonce = work.nonce.expect("qed"); let nonce_target = work.nonce_target.expect("qed"); @@ -456,24 +370,53 @@ impl DefaultWorkManager jobs.write().insert(config_shard_num, job), - Err(_e) => jobs.write().remove(&config_shard_num), - }; - Ok(()) + }).and_then(move |(job_result, job)| { + info!("New block mined: actual_shard_num: {}, config_shard_num: {}, number: {}, nonce_target: {:#x}, job_target: {:#x}", + actual_shard_num, config_shard_num, job.header.number, nonce_target, job_target); + Self::submit_job_future(&shard, job_result).map(|result| (result, job)) + }).and_then(move |(result, job)| { + info!("Job submitted: actual_shard_num: {}, config_shard_num: {}, number: {}, new_block_hash: {:?}", + actual_shard_num, config_shard_num, job.header.number, result); + + future::loop_fn(0usize, move |i| { + let job_block_number = job.header.number.clone(); + let jobs = jobs.clone(); + Self::get_job_future(&shard2).then(move |new_job| { + let delay_continue = Delay::new(Instant::now().add(REFRESH_JOB_DELAY)).then(move |_|{ + future::ok(Loop::Continue(i+1)) + }); + match new_job { + Ok(new_job) => { + if new_job.header.number > job_block_number { + info!("Job refreshed: actual_shard_num: {}, config_shard_num: {}, number: {}, job_hash: {:?}", + actual_shard_num, config_shard_num, new_job.header.number, new_job.hash); + jobs.write().insert(config_shard_num, new_job); + Either::A(future::ok(Loop::Break(()))) + }else{ + if i < REFRESH_MAX_TRY_TIMES { + info!("Job not refreshed: number not incr, delay to continue: {}", i); + Either::B(delay_continue) + } else { + info!("Job not refreshed: number not incr, break"); + Either::A(future::ok(Loop::Break(()))) + } + } + }, + Err(e) => { + if i < REFRESH_MAX_TRY_TIMES { + info!("Job not refreshed: encounter error, delay to continue: {}", i); + Either::B(delay_continue) + } else { + info!("Job not refreshed: encounter error, break"); + Either::A(future::ok(Loop::Break(()))) + } + } + } }) }) });