Skip to content

Commit

Permalink
refresh job after submitting
Browse files Browse the repository at this point in the history
  • Loading branch information
guoxbin committed Jun 28, 2020
1 parent acf837f commit 75aa83d
Showing 1 changed file with 51 additions and 108 deletions.
159 changes: 51 additions & 108 deletions switch/mining2/src/work_manager.rs
Expand Up @@ -33,7 +33,7 @@ use log::warn;
use parking_lot::RwLock;
use std::sync::Arc;
use std::collections::HashMap;
use serde::export::fmt::Debug;
use serde::export::fmt::{Debug, Display};
use yee_serde_hex::SerdeHex;
use tokio::runtime::Runtime;
use std::thread;
Expand All @@ -50,10 +50,12 @@ use log::{info, debug};
use futures::future;
use jsonrpc_core::BoxFuture;
use yee_runtime::BlockNumber;
use futures::future::{Loop, Either};

const EXTRA_DATA: &str = "yee-switch";
const RAW_WORK_LIFE: Duration = Duration::from_secs(60);
const REFRESH_JOB_DELAY: Duration = Duration::from_secs(2);
const REFRESH_JOB_DELAY: Duration = Duration::from_millis(500);
const REFRESH_MAX_TRY_TIMES: usize = 40;

pub trait WorkManager {
type Hashing: HashT;
Expand All @@ -70,7 +72,7 @@ pub trait WorkManager {
}

impl<Number, AuthorityId, Hashing> WorkManager for DefaultWorkManager<Number, AuthorityId, Hashing> where
Number: Clone + Debug + SerdeHex + DeserializeOwned + Send + Sync + 'static,
Number: Clone + Debug + SerdeHex + DeserializeOwned + Send + Sync + PartialOrd + Display + 'static,
AuthorityId: Clone + Debug + DeserializeOwned + Send + Sync + 'static,
Hashing: HashT + Send + Sync + 'static,
Hashing::Output: Ord + Encode + Decode,
Expand Down Expand Up @@ -157,7 +159,8 @@ impl<Number, AuthorityId, Hashing> WorkManager for DefaultWorkManager<Number, Au
}

fn submit_work(&self, work: Work<<Self::Hashing as HashT>::Output, Self::Number>) -> error::Result<()> {
self.accept_work(work)
tokio::run(self.accept_work_future(work).map_err(|_|()));
Ok(())
}

fn submit_work_future(&self, work: Work<<Self::Hashing as HashT>::Output, Self::Number>) -> Box<dyn Future<Item=(), Error=error::Error> + Send> {
Expand Down Expand Up @@ -256,7 +259,7 @@ pub struct DefaultWorkManager<Number, AuthorityId, Hashing> where

impl<Number, AuthorityId, Hashing> DefaultWorkManager<Number, AuthorityId, Hashing>
where
Number: Send + Sync + Debug + DeserializeOwned + SerdeHex + Clone + 'static,
Number: Send + Sync + Debug + DeserializeOwned + SerdeHex + Clone + PartialOrd + Display + 'static,
AuthorityId: Send + Sync + Debug + DeserializeOwned + Clone + 'static,
Hashing: HashT + Send + Sync + 'static,
Hashing::Output: Ord + DeserializeOwned + Encode + Decode + Send + Sync + 'static,
Expand Down Expand Up @@ -308,95 +311,6 @@ impl<Number, AuthorityId, Hashing> DefaultWorkManager<Number, AuthorityId, Hashi
Ok(())
}

fn accept_work(&self, work: Work<Hashing::Output, Number>) -> error::Result<()> {
let nonce = work.nonce.expect("qed");
let nonce_target = work.nonce_target.expect("qed");
let target = work.target.clone();

info!("accept_work: merkle_root: {:?}, nonce: {}, nonce_target: {:#x}, target: {:#x}", work.merkle_root, nonce, nonce_target, target);

let merkle_root = &work.merkle_root;

let raw_work = Self::get_cache(self.work_cache.clone(), merkle_root).ok_or(error::Error::from(error::ErrorKind::WorkExpired))?;

let shard_count = raw_work.shard_count;

let shard_jobs = raw_work.shard_jobs;
let merkle_tree = raw_work.merkle_tree.expect("qed");

let merkle_tree = Arc::new(merkle_tree);
let work = Arc::new(work);

let mut tasks = Vec::new();

for actual_shard_num in 0..shard_count {
if let Some((config_shard_num, job)) = shard_jobs.get(&actual_shard_num) {
let job_target = job.digest_item.pow_target;

let config_shard_num = config_shard_num.clone();
let job = job.clone();
let merkle_tree = merkle_tree.clone();
let work = work.clone();

let shard = Arc::new(self.config.shards.get(&format!("{}", config_shard_num)).expect("qed").to_owned());
let shard2 = shard.clone();

let jobs = self.jobs.clone();

let task = future::lazy(move || {
if nonce_target <= job_target {
let merkle_proof = merkle_tree.gen_proof(actual_shard_num as usize);

let merkle_proof = OriginalMerkleProof {
proof: merkle_proof,
num: actual_shard_num,
count: shard_count,
};
let merkle_proof: CompactMerkleProof<Hashing> = merkle_proof.into();

let job_result = JobResult {
hash: job.hash,
digest_item: ResultDigestItem {
work_proof: WorkProof::Multi(ProofMulti {
extra_data: work.extra_data.clone(),
merkle_root: work.merkle_root.clone(),
merkle_proof: merkle_proof.proof,
nonce,
})
},
};
future::ok(job_result)
} else {
warn!("nonce_target: {:#x}, job_target: {:#x}", nonce_target, job_target);
future::err(error::Error::from(error::ErrorKind::TargetNotAccpect))
}
}).and_then(move |job_result| {
info!("New block mined: actual_shard_num: {}, config_shard_num: {}, nonce_target: {:#x}, job_target: {:#x}", actual_shard_num, config_shard_num, nonce_target, job_target);
Self::submit_job_future(&shard, job_result)
}).and_then(move |result| {
debug!("Job submitted: actual_shard_num: {}, config_shard_num: {}, new_block_hash: {:?}", actual_shard_num, config_shard_num, result);
Delay::new(Instant::now().add(REFRESH_JOB_DELAY)).then(move |_| {
Self::get_job_future(&shard2).then(move |job| {
info!("Job refreshed: actual_shard_num: {}, config_shard_num: {}, job_hash: {:?}", actual_shard_num, config_shard_num, job.as_ref().map(|job| job.hash));
match job {
Ok(job) => jobs.write().insert(config_shard_num, job),
Err(_e) => jobs.write().remove(&config_shard_num),
};
Ok(())
})
})
});
tasks.push(task);
}
}

let task = future::join_all(tasks).map(|_| ()).map_err(|e| warn!("{:?}", e));

tokio::run(task);

Ok(())
}

fn accept_work_future(&self, work: Work<Hashing::Output, Number>) -> Box<dyn Future<Item=(), Error=error::Error> + Send> {
let nonce = work.nonce.expect("qed");
let nonce_target = work.nonce_target.expect("qed");
Expand Down Expand Up @@ -456,24 +370,53 @@ impl<Number, AuthorityId, Hashing> DefaultWorkManager<Number, AuthorityId, Hashi
})
},
};
future::ok(job_result)
future::ok((job_result, job))
} else {
warn!("nonce_target: {:#x}, job_target: {:#x}", nonce_target, job_target);
future::err(error::Error::from(error::ErrorKind::TargetNotAccpect))
}
}).and_then(move |job_result| {
info!("New block mined: actual_shard_num: {}, config_shard_num: {}, nonce_target: {:#x}, job_target: {:#x}", actual_shard_num, config_shard_num, nonce_target, job_target);
Self::submit_job_future(&shard, job_result)
}).and_then(move |result| {
info!("Job submitted: actual_shard_num: {}, config_shard_num: {}, new_block_hash: {:?}", actual_shard_num, config_shard_num, result);
Delay::new(Instant::now().add(REFRESH_JOB_DELAY)).then(move |_| {
Self::get_job_future(&shard2).then(move |job| {
info!("Job refreshed: actual_shard_num: {}, config_shard_num: {}, job_hash: {:?}", actual_shard_num, config_shard_num, job.as_ref().map(|job| job.hash));
match job {
Ok(job) => jobs.write().insert(config_shard_num, job),
Err(_e) => jobs.write().remove(&config_shard_num),
};
Ok(())
}).and_then(move |(job_result, job)| {
info!("New block mined: actual_shard_num: {}, config_shard_num: {}, number: {}, nonce_target: {:#x}, job_target: {:#x}",
actual_shard_num, config_shard_num, job.header.number, nonce_target, job_target);
Self::submit_job_future(&shard, job_result).map(|result| (result, job))
}).and_then(move |(result, job)| {
info!("Job submitted: actual_shard_num: {}, config_shard_num: {}, number: {}, new_block_hash: {:?}",
actual_shard_num, config_shard_num, job.header.number, result);

future::loop_fn(0usize, move |i| {
let job_block_number = job.header.number.clone();
let jobs = jobs.clone();
Self::get_job_future(&shard2).then(move |new_job| {
let delay_continue = Delay::new(Instant::now().add(REFRESH_JOB_DELAY)).then(move |_|{
future::ok(Loop::Continue(i+1))
});
match new_job {
Ok(new_job) => {
if new_job.header.number > job_block_number {
info!("Job refreshed: actual_shard_num: {}, config_shard_num: {}, number: {}, job_hash: {:?}",
actual_shard_num, config_shard_num, new_job.header.number, new_job.hash);
jobs.write().insert(config_shard_num, new_job);
Either::A(future::ok(Loop::Break(())))
}else{
if i < REFRESH_MAX_TRY_TIMES {
info!("Job not refreshed: number not incr, delay to continue: {}", i);
Either::B(delay_continue)
} else {
info!("Job not refreshed: number not incr, break");
Either::A(future::ok(Loop::Break(())))
}
}
},
Err(e) => {
if i < REFRESH_MAX_TRY_TIMES {
info!("Job not refreshed: encounter error, delay to continue: {}", i);
Either::B(delay_continue)
} else {
info!("Job not refreshed: encounter error, break");
Either::A(future::ok(Loop::Break(())))
}
}
}
})
})
});
Expand Down

0 comments on commit 75aa83d

Please sign in to comment.