diff --git a/applications/tari_mining_node/src/main.rs b/applications/tari_mining_node/src/main.rs index 687b1ce52b..568bc67eb9 100644 --- a/applications/tari_mining_node/src/main.rs +++ b/applications/tari_mining_node/src/main.rs @@ -33,7 +33,11 @@ mod utils; use crate::{ miner::MiningReport, - stratum::{stratum_controller::controller::Controller, stratum_miner::miner::StratumMiner}, + stratum::{ + stratum_controller::controller::Controller, + stratum_miner::miner::StratumMiner, + stratum_statistics::stats::Statistics, + }, }; use errors::{err_empty, MinerError}; use miner::Miner; @@ -42,6 +46,7 @@ use std::{ sync::{ atomic::{AtomicBool, Ordering}, Arc, + RwLock, }, thread, time::Instant, @@ -61,6 +66,9 @@ use tokio::{runtime::Runtime, time::sleep}; use tonic::transport::Channel; use utils::{coinbase_request, extract_outputs_and_kernels}; +pub const LOG_TARGET: &str = "tari_mining_node::miner::main"; +pub const LOG_TARGET_FILE: &str = "tari_mining_node::logging::miner::main"; + /// Application entry point fn main() { let rt = Runtime::new().expect("Failed to start tokio runtime"); @@ -68,7 +76,7 @@ fn main() { Ok(_) => std::process::exit(0), Err(exit_code) => { eprintln!("Fatal error: {:?}", exit_code); - error!("Exiting with code: {:?}", exit_code); + error!(target: LOG_TARGET, "Exiting with code: {:?}", exit_code); std::process::exit(exit_code.as_i32()) }, } @@ -83,8 +91,8 @@ async fn main_inner() -> Result<(), ExitCodes> { config.mining_worker_name = global.mining_worker_name.clone(); config.mining_wallet_address = global.mining_wallet_address.clone(); config.mining_pool_address = global.mining_pool_address.clone(); - debug!("{:?}", bootstrap); - debug!("{:?}", config); + debug!(target: LOG_TARGET_FILE, "{:?}", bootstrap); + debug!(target: LOG_TARGET_FILE, "{:?}", config); if !config.mining_wallet_address.is_empty() && !config.mining_pool_address.is_empty() { let url = config.mining_pool_address.clone(); @@ -94,18 +102,25 @@ async fn main_inner() -> Result<(), ExitCodes> { if !config.mining_worker_name.is_empty() { miner_address += &format!("{}{}", ".", &config.mining_worker_name); } - let mut mc = Controller::new().unwrap_or_else(|e| { + let stats = Arc::new(RwLock::new(Statistics::default())); + let mut mc = Controller::new(stats.clone()).unwrap_or_else(|e| { + debug!(target: LOG_TARGET_FILE, "Error loading mining controller: {}", e); panic!("Error loading mining controller: {}", e); }); - let cc = stratum::controller::Controller::new(&url, Some(miner_address), None, None, mc.tx.clone()) - .unwrap_or_else(|e| { - panic!("Error loading stratum client controller: {:?}", e); - }); + let cc = + stratum::controller::Controller::new(&url, Some(miner_address), None, None, mc.tx.clone(), stats.clone()) + .unwrap_or_else(|e| { + debug!( + target: LOG_TARGET_FILE, + "Error loading stratum client controller: {:?}", e + ); + panic!("Error loading stratum client controller: {:?}", e); + }); let miner_stopped = Arc::new(AtomicBool::new(false)); let client_stopped = Arc::new(AtomicBool::new(false)); mc.set_client_tx(cc.tx.clone()); - let mut miner = StratumMiner::new(config); + let mut miner = StratumMiner::new(config, stats); if let Err(e) = miner.start_solvers() { println!("Error. Please check logs for further info."); println!("Error details:"); @@ -118,7 +133,7 @@ async fn main_inner() -> Result<(), ExitCodes> { .name("mining_controller".to_string()) .spawn(move || { if let Err(e) = mc.run(miner) { - error!("Error. Please check logs for further info: {:?}", e); + error!(target: LOG_TARGET, "Error. Please check logs for further info: {:?}", e); return; } miner_stopped_internal.store(true, Ordering::Relaxed); @@ -142,19 +157,21 @@ async fn main_inner() -> Result<(), ExitCodes> { Ok(()) } else { config.mine_on_tip_only = global.mine_on_tip_only; - debug!("mine_on_tip_only is {}", config.mine_on_tip_only); - + debug!( + target: LOG_TARGET_FILE, + "mine_on_tip_only is {}", config.mine_on_tip_only + ); let (mut node_conn, mut wallet_conn) = connect(&config, &global).await.map_err(ExitCodes::grpc)?; let mut blocks_found: u64 = 0; loop { - debug!("Starting new mining cycle"); + debug!(target: LOG_TARGET_FILE, "Starting new mining cycle"); match mining_cycle(&mut node_conn, &mut wallet_conn, &config, &bootstrap).await { err @ Err(MinerError::GrpcConnection(_)) | err @ Err(MinerError::GrpcStatus(_)) => { // Any GRPC error we will try to reconnect with a standard delay - error!("Connection error: {:?}", err); + error!(target: LOG_TARGET, "Connection error: {:?}", err); loop { - debug!("Holding for {:?}", config.wait_timeout()); + debug!(target: LOG_TARGET_FILE, "Holding for {:?}", config.wait_timeout()); sleep(config.wait_timeout()).await; match connect(&config, &global).await { Ok((nc, wc)) => { @@ -163,22 +180,28 @@ async fn main_inner() -> Result<(), ExitCodes> { break; }, Err(err) => { - error!("Connection error: {:?}", err); + error!(target: LOG_TARGET, "Connection error: {:?}", err); continue; }, } } }, Err(MinerError::MineUntilHeightReached(h)) => { - info!("Prescribed blockchain height {} reached. Aborting ...", h); + info!( + target: LOG_TARGET, + "Prescribed blockchain height {} reached. Aborting ...", h + ); return Ok(()); }, Err(MinerError::MinerLostBlock(h)) => { - info!("Height {} already mined by other node. Restarting ...", h); + info!( + target: LOG_TARGET, + "Height {} already mined by other node. Restarting ...", h + ); }, Err(err) => { - error!("Error: {:?}", err); - debug!("Holding for {:?}", config.wait_timeout()); + error!(target: LOG_TARGET, "Error: {:?}", err); + debug!(target: LOG_TARGET_FILE, "Holding for {:?}", config.wait_timeout()); sleep(config.wait_timeout()).await; }, Ok(submitted) => { @@ -201,10 +224,10 @@ async fn connect( global: &GlobalConfig, ) -> Result<(BaseNodeClient, WalletClient), MinerError> { let base_node_addr = config.base_node_addr(global); - info!("Connecting to base node at {}", base_node_addr); + info!(target: LOG_TARGET, "Connecting to base node at {}", base_node_addr); let node_conn = BaseNodeClient::connect(base_node_addr.clone()).await?; let wallet_addr = config.wallet_addr(global); - info!("Connecting to wallet at {}", wallet_addr); + info!(target: LOG_TARGET, "Connecting to wallet at {}", wallet_addr); let wallet_conn = WalletClient::connect(wallet_addr.clone()).await?; Ok((node_conn, wallet_conn)) @@ -267,8 +290,8 @@ async fn mining_cycle( if report.difficulty < min_diff { submit = false; debug!( - "Mined difficulty {} below minimum difficulty {}. Not submitting.", - report.difficulty, min_diff + target: LOG_TARGET_FILE, + "Mined difficulty {} below minimum difficulty {}. Not submitting.", report.difficulty, min_diff ); } } @@ -276,8 +299,10 @@ async fn mining_cycle( if report.difficulty > max_diff { submit = false; debug!( + target: LOG_TARGET_FILE, "Mined difficulty {} greater than maximum difficulty {}. Not submitting.", - report.difficulty, max_diff + report.difficulty, + max_diff ); } } @@ -285,8 +310,11 @@ async fn mining_cycle( // Mined a block fitting the difficulty let block_header = BlockHeader::try_from(header.clone()).map_err(MinerError::Conversion)?; info!( + target: LOG_TARGET, "Miner {} found block header {} with difficulty {:?}", - report.miner, block_header, report.difficulty, + report.miner, + block_header, + report.difficulty, ); let mut mined_block = block.clone(); mined_block.header = Some(header); @@ -313,6 +341,7 @@ async fn mining_cycle( async fn display_report(report: &MiningReport, config: &MinerConfig) { let hashrate = report.hashes as f64 / report.elapsed.as_micros() as f64; debug!( + target: LOG_TARGET_FILE, "Miner {} reported {:.2}MH/s with total {:.2}MH/s over {} threads. Height: {}. Target: {})", report.miner, hashrate, diff --git a/applications/tari_mining_node/src/miner.rs b/applications/tari_mining_node/src/miner.rs index e2bc4352a9..21f7b891f3 100644 --- a/applications/tari_mining_node/src/miner.rs +++ b/applications/tari_mining_node/src/miner.rs @@ -33,6 +33,8 @@ use std::{ use tari_app_grpc::{conversions::timestamp, tari_rpc::BlockHeader}; use thread::JoinHandle; +pub const LOG_TARGET: &str = "tari_mining_node::miner::standalone"; + // Identify how often mining thread is reporting / checking context // ~400_000 hashes per second const REPORTING_FREQUENCY: u64 = 3_000_000; @@ -107,20 +109,20 @@ impl Stream for Miner { type Item = MiningReport; fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { - trace!("Polling Miner"); + trace!(target: LOG_TARGET, "Polling Miner"); // First poll would start all the threads passing async context waker if self.threads.is_empty() && self.num_threads > 0 { debug!( - "Starting {} mining threads for target difficulty {}", - self.num_threads, self.target_difficulty + target: LOG_TARGET, + "Starting {} mining threads for target difficulty {}", self.num_threads, self.target_difficulty ); self.start_threads(ctx); return Poll::Pending; } else if self.num_threads == 0 { - error!("Cannot mine: no mining threads"); + error!(target: LOG_TARGET, "Cannot mine: no mining threads"); return Poll::Ready(None); } else if self.channels.is_empty() { - debug!("Finished mining"); + debug!(target: LOG_TARGET, "Finished mining"); return Poll::Ready(None); } @@ -167,14 +169,14 @@ pub fn mining_task( let mut hasher = BlockHeaderSha3::new(header).unwrap(); hasher.random_nonce(); // We're mining over here! - info!("Mining thread {} started", miner); + info!(target: LOG_TARGET, "Mining thread {} started", miner); // Mining work loop { let difficulty = hasher.difficulty(); if difficulty >= target_difficulty { debug!( - "Miner {} found nonce {} with matching difficulty {}", - miner, hasher.nonce, difficulty + target: LOG_TARGET, + "Miner {} found nonce {} with matching difficulty {}", miner, hasher.nonce, difficulty ); if let Err(err) = sender.try_send(MiningReport { miner, @@ -186,10 +188,10 @@ pub fn mining_task( header: Some(hasher.into_header()), target_difficulty, }) { - error!("Miner {} failed to send report: {}", miner, err); + error!(target: LOG_TARGET, "Miner {} failed to send report: {}", miner, err); } waker.wake(); - info!("Mining thread {} stopped", miner); + info!(target: LOG_TARGET, "Mining thread {} stopped", miner); return; } if hasher.nonce % REPORTING_FREQUENCY == 0 { @@ -204,9 +206,9 @@ pub fn mining_task( target_difficulty, }); waker.clone().wake(); - trace!("Reporting from {} result {:?}", miner, res); + trace!(target: LOG_TARGET, "Reporting from {} result {:?}", miner, res); if let Err(TrySendError::Disconnected(_)) = res { - info!("Mining thread {} disconnected", miner); + info!(target: LOG_TARGET, "Mining thread {} disconnected", miner); return; } hasher.set_forward_timestamp(timestamp().seconds as u64); diff --git a/applications/tari_mining_node/src/stratum/controller.rs b/applications/tari_mining_node/src/stratum/controller.rs index 1fb6e76256..288e94157e 100644 --- a/applications/tari_mining_node/src/stratum/controller.rs +++ b/applications/tari_mining_node/src/stratum/controller.rs @@ -20,16 +20,19 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // -use crate::stratum::{error::Error, stratum_types as types, stream::Stream}; +use crate::stratum::{error::Error, stratum_statistics::stats, stratum_types as types, stream::Stream}; use log::*; use std::{ self, io::{BufRead, ErrorKind, Write}, - sync::mpsc, + sync::{mpsc, Arc, RwLock}, thread, }; +pub const LOG_TARGET: &str = "tari_mining_node::miner::stratum::controller"; +pub const LOG_TARGET_FILE: &str = "tari_mining_node::logging::miner::stratum::controller"; + pub struct Controller { server_url: String, server_login: Option, @@ -40,6 +43,7 @@ pub struct Controller { pub tx: mpsc::Sender, miner_tx: mpsc::Sender, last_request_id: String, + stats: Arc>, } // fn invalid_error_response() -> types::RpcError { @@ -56,6 +60,7 @@ impl Controller { server_password: Option, server_tls_enabled: Option, miner_tx: mpsc::Sender, + stats: Arc>, ) -> Result { let (tx, rx) = mpsc::channel::(); Ok(Controller { @@ -68,6 +73,7 @@ impl Controller { rx, miner_tx, last_request_id: "".to_string(), + stats, }) } @@ -96,7 +102,7 @@ impl Controller { Err(ref e) if e.kind() == ErrorKind::BrokenPipe => Err(Error::Connection("broken pipe".to_string())), Err(ref e) if e.kind() == ErrorKind::WouldBlock => Ok(None), Err(e) => { - error!("Communication error with stratum server: {}", e); + error!(target: LOG_TARGET, "Communication error with stratum server: {}", e); Err(Error::Connection("broken pipe".to_string())) }, } @@ -106,7 +112,7 @@ impl Controller { if self.stream.is_none() { return Err(Error::Connection(String::from("No server connection"))); } - debug!("sending request: {}", message); + debug!(target: LOG_TARGET_FILE, "sending request: {}", message); let _ = self.stream.as_mut().unwrap().write(message.as_bytes()); let _ = self.stream.as_mut().unwrap().write(b"\n"); let _ = self.stream.as_mut().unwrap().flush(); @@ -172,7 +178,10 @@ impl Controller { } fn send_message_submit(&mut self, job_id: u64, hash: String, nonce: u64) -> Result<(), Error> { - info!("Submitting Solution with hash {} and nonce {}", hash, nonce); + info!( + target: LOG_TARGET, + "Submitting share with hash {} and nonce {}", hash, nonce + ); let params_in = types::submit_params::SubmitParams { id: self.last_request_id.to_string(), job_id, @@ -211,15 +220,15 @@ impl Controller { } pub fn handle_request(&mut self, req: types::rpc_request::RpcRequest) -> Result<(), Error> { - debug!("Received request type: {}", req.method); + debug!(target: LOG_TARGET_FILE, "Received request type: {}", req.method); match req.method.as_str() { "job" => match req.params { None => Err(Error::Request("No params in job request".to_owned())), Some(params) => { let job = serde_json::from_value::(params)?; info!( - "Got a new job for height {} with target difficulty {}", - job.height, job.target + target: LOG_TARGET, + "Got a new job for height {} with target difficulty {}", job.height, job.target ); self.send_miner_job(job) }, @@ -228,13 +237,31 @@ impl Controller { } } + fn handle_error(&mut self, error: types::rpc_error::RpcError) { + if vec![-1, 24].contains(&error.code) { + // unauthorized + let _ = self.send_login(); + } else if vec![21, 20, 22, 23, 25].contains(&error.code) { + // problem with template + let _ = self.send_message_get_job_template(); + } + } + + #[allow(clippy::cognitive_complexity)] pub fn handle_response(&mut self, res: types::rpc_response::RpcResponse) -> Result<(), Error> { - debug!("Received response with id: {}", res.id); + debug!(target: LOG_TARGET_FILE, "Received response with id: {}", res.id); match res.result { Some(result) => { let login_response = serde_json::from_value::(result.clone()); if let Ok(st) = login_response { - info!("Successful login to server, worker identifier is {}", st.id); + info!( + target: LOG_TARGET, + "Successful login to server, worker identifier is {}", st.id + ); + info!( + target: LOG_TARGET, + "Got a new job for height {} with target difficulty {}", st.job.height, st.job.target + ); self.last_request_id = st.id; let _ = self.send_miner_job(st.job); return Ok(()); @@ -242,31 +269,42 @@ impl Controller { let job_response = serde_json::from_value::(result.clone()); if let Ok(st) = job_response { info!( - "Got a new job for height {} with target difficulty {}", - st.height, st.target + target: LOG_TARGET, + "Got a new job for height {} with target difficulty {}", st.height, st.target ); let _ = self.send_miner_job(st); return Ok(()); }; + let submit_response = serde_json::from_value::(result.clone()); + if let Ok(st) = submit_response { + let error = st.error; + if let Some(error) = error { + // rejected share + self.handle_error(error.clone()); + info!(target: LOG_TARGET, "Rejected"); + debug!(target: LOG_TARGET_FILE, "Share rejected: {:?}", error); + let mut stats = self.stats.write().unwrap(); + stats.mining_stats.solution_stats.rejected += 1; + } else { + // accepted share + info!(target: LOG_TARGET, "Accepted"); + debug!(target: LOG_TARGET_FILE, "Share accepted: {:?}", st.status); + } + return Ok(()); + } let rpc_response = serde_json::from_value::(result); if let Ok(st) = rpc_response { let error = st.error; if let Some(error) = error { - if vec![-1, 24].contains(&error.code) { - // unauthorized - let _ = self.send_login(); - } else if vec![21, 20, 22, 23, 25].contains(&error.code) { - // problem with template - let _ = self.send_message_get_job_template(); - } - } else { - info!("{:?}", st.result); + self.handle_error(error); } return Ok(()); + } else { + debug!(target: LOG_TARGET_FILE, "RPC Response: {:?}", rpc_response); }; }, None => { - error!("{:?}", res); + error!(target: LOG_TARGET, "RPC error: {:?}", res); }, } Ok(()) @@ -298,7 +336,7 @@ impl Controller { self.stream = None; } else { let status = format!("Connection Status: Connected to server at {}.", self.server_url); - info!("{}", status); + info!(target: LOG_TARGET, "{}", status); } next_server_retry = time::get_time().sec + server_retry_interval; if self.stream.is_none() { @@ -319,41 +357,41 @@ impl Controller { Ok(Some(m)) => { // figure out what kind of message, // and dispatch appropriately - debug!("Received message: {}", m); + debug!(target: LOG_TARGET_FILE, "Received message: {}", m); // Deserialize to see what type of object it is if let Ok(v) = serde_json::from_str::(&m) { // Is this a response or request? if v["method"] == "job" { // this is a request match serde_json::from_str::(&m) { - Err(e) => error!("Error parsing request {} : {:?}", m, e), + Err(e) => error!(target: LOG_TARGET, "Error parsing request {} : {:?}", m, e), Ok(request) => { if let Err(err) = self.handle_request(request) { - error!("Error handling request {} : :{:?}", m, err) + error!(target: LOG_TARGET, "Error handling request {} : :{:?}", m, err) } }, } } else { // this is a response match serde_json::from_str::(&m) { - Err(e) => error!("Error parsing response {} : {:?}", m, e), + Err(e) => error!(target: LOG_TARGET, "Error parsing response {} : {:?}", m, e), Ok(response) => { if let Err(err) = self.handle_response(response) { - error!("Error handling response {} : :{:?}", m, err) + error!(target: LOG_TARGET, "Error handling response {} : :{:?}", m, err) } }, } } continue; } else { - error!("Error parsing message: {}", m) + error!(target: LOG_TARGET, "Error parsing message: {}", m) } }, Ok(None) => { // noop, nothing to read for this interval }, Err(e) => { - error!("Error reading message: {:?}", e); + error!(target: LOG_TARGET, "Error reading message: {:?}", e); self.stream = None; continue; }, @@ -364,19 +402,19 @@ impl Controller { // Talk to the miner algorithm while let Some(message) = self.rx.try_iter().next() { - debug!("Client received message: {:?}", message); + debug!(target: LOG_TARGET_FILE, "Client received message: {:?}", message); let result = match message { types::client_message::ClientMessage::FoundSolution(job_id, hash, nonce) => { self.send_message_submit(job_id, hash, nonce) }, types::client_message::ClientMessage::KeepAlive => self.send_keepalive(), types::client_message::ClientMessage::Shutdown => { - debug!("Shutting down client controller"); + debug!(target: LOG_TARGET_FILE, "Shutting down client controller"); return; }, }; if let Err(e) = result { - error!("Mining Controller Error {:?}", e); + error!(target: LOG_TARGET, "Mining Controller Error {:?}", e); self.stream = None; } } diff --git a/applications/tari_mining_node/src/stratum/mod.rs b/applications/tari_mining_node/src/stratum/mod.rs index b426a7c4c5..b71d5ca82b 100644 --- a/applications/tari_mining_node/src/stratum/mod.rs +++ b/applications/tari_mining_node/src/stratum/mod.rs @@ -24,5 +24,6 @@ pub mod controller; pub mod error; pub mod stratum_controller; pub mod stratum_miner; +pub mod stratum_statistics; pub mod stratum_types; pub mod stream; diff --git a/applications/tari_mining_node/src/stratum/stratum_controller/controller.rs b/applications/tari_mining_node/src/stratum/stratum_controller/controller.rs index f40e432e6f..b7315600b7 100644 --- a/applications/tari_mining_node/src/stratum/stratum_controller/controller.rs +++ b/applications/tari_mining_node/src/stratum/stratum_controller/controller.rs @@ -22,10 +22,19 @@ // use crate::{ stratum, - stratum::{stratum_miner::miner::StratumMiner, stratum_types as types}, + stratum::{stratum_miner::miner::StratumMiner, stratum_statistics::stats, stratum_types as types}, }; use log::*; -use std::{self, sync::mpsc, thread, time::SystemTime}; +use std::{ + self, + sync::{mpsc, Arc, RwLock}, + thread, + time::{Duration, SystemTime}, +}; + +pub const LOG_TARGET: &str = "tari_mining_node::miner::stratum::controller"; +pub const LOG_TARGET_FILE: &str = "tari_mining_node::logging::miner::stratum::controller"; +const REPORTING_FREQUENCY: u64 = 20; pub struct Controller { rx: mpsc::Receiver, @@ -35,10 +44,12 @@ pub struct Controller { current_job_id: u64, current_blob: String, keep_alive_time: SystemTime, + stats: Arc>, + elapsed: SystemTime, } impl Controller { - pub fn new() -> Result { + pub fn new(stats: Arc>) -> Result { let (tx, rx) = mpsc::channel::(); Ok(Controller { rx, @@ -48,6 +59,8 @@ impl Controller { current_job_id: 0, current_blob: "".to_string(), keep_alive_time: SystemTime::now(), + stats, + elapsed: SystemTime::now(), }) } @@ -55,10 +68,66 @@ impl Controller { self.client_tx = Some(client_tx); } + fn display_stats(&mut self, elapsed: Duration) { + let mut stats = self.stats.write().unwrap(); + debug!(target: LOG_TARGET_FILE, "{:?}", stats.mining_stats); + info!( + target: LOG_TARGET, + "{}", + "--------------- Mining Statistics ---------------".to_string() + ); + info!( + target: LOG_TARGET, + "{}", + format!("Number of solver threads: {}", stats.mining_stats.solvers) + ); + if stats.mining_stats.solution_stats.found > 0 { + info!( + target: LOG_TARGET, + "{}", + format!( + "Estimated combined solver share rate: {:.1$} (S/s)", + stats.mining_stats.sols(), + 5 + ) + ); + } + info!( + target: LOG_TARGET, + "{}", + format!( + "Combined solver hash rate: {:.1$} (Mh/s)", + stats.mining_stats.hash_rate(elapsed), + 5 + ) + ); + info!( + target: LOG_TARGET, + "{}", + format!( + "Shares found: {}, accepted: {}, rejected: {}", + stats.mining_stats.solution_stats.found, + stats.mining_stats.solution_stats.found - stats.mining_stats.solution_stats.rejected, + stats.mining_stats.solution_stats.rejected + ) + ); + info!( + target: LOG_TARGET, + "{}", + "-------------------------------------------------".to_string() + ); + } + pub fn run(&mut self, mut miner: StratumMiner) -> Result<(), stratum::error::Error> { loop { + if let Ok(report) = self.elapsed.elapsed() { + if report.as_secs() >= REPORTING_FREQUENCY { + self.display_stats(report); + self.elapsed = SystemTime::now(); + } + } while let Some(message) = self.rx.try_iter().next() { - debug!("Miner received message: {:?}", message); + debug!(target: LOG_TARGET_FILE, "Miner received message: {:?}", message); let result: Result<(), stratum::error::Error> = match message { types::miner_message::MinerMessage::ReceivedJob(height, job_id, diff, blob) => { self.current_height = height; @@ -72,24 +141,27 @@ impl Controller { ) }, types::miner_message::MinerMessage::StopJob => { - debug!("Stopping jobs"); + debug!(target: LOG_TARGET_FILE, "Stopping jobs"); miner.pause_solvers(); Ok(()) }, types::miner_message::MinerMessage::ResumeJob => { - debug!("Resuming jobs"); + debug!(target: LOG_TARGET_FILE, "Resuming jobs"); miner.resume_solvers(); Ok(()) }, types::miner_message::MinerMessage::Shutdown => { - debug!("Stopping jobs and Shutting down mining controller"); + debug!( + target: LOG_TARGET_FILE, + "Stopping jobs and Shutting down mining controller" + ); miner.stop_solvers(); miner.wait_for_solver_shutdown(); Ok(()) }, }; if let Err(e) = result { - error!("Mining Controller Error {:?}", e); + error!(target: LOG_TARGET, "Mining Controller Error {:?}", e); } } @@ -103,6 +175,8 @@ impl Controller { ss.job_id, ss.hash, ss.nonce, )); self.keep_alive_time = SystemTime::now(); + let mut stats = self.stats.write().unwrap(); + stats.mining_stats.solution_stats.found += 1; } else if self.keep_alive_time.elapsed().unwrap().as_secs() >= 30 { self.keep_alive_time = SystemTime::now(); let _ = self diff --git a/applications/tari_mining_node/src/stratum/stratum_miner/miner.rs b/applications/tari_mining_node/src/stratum/stratum_miner/miner.rs index 50dc44b6c7..303edb9f09 100644 --- a/applications/tari_mining_node/src/stratum/stratum_miner/miner.rs +++ b/applications/tari_mining_node/src/stratum/stratum_miner/miner.rs @@ -24,11 +24,14 @@ use crate::{ config::MinerConfig, difficulty::BlockHeaderSha3, stratum, - stratum::stratum_miner::{ - control_message::ControlMessage, - job_shared_data::{JobSharedData, JobSharedDataType}, - solution::Solution, - solver_instance::SolverInstance, + stratum::{ + stratum_miner::{ + control_message::ControlMessage, + job_shared_data::{JobSharedData, JobSharedDataType}, + solution::Solution, + solver_instance::SolverInstance, + }, + stratum_statistics::stats::Statistics, }, }; use log::*; @@ -37,22 +40,31 @@ use std::{ sync::{mpsc, Arc, RwLock}, thread, time, + time::{Duration, SystemTime}, }; use tari_core::{ blocks::BlockHeader, crypto::tari_utilities::{hex::Hex, Hashable}, }; +pub const LOG_TARGET: &str = "tari_mining_node::miner::stratum::controller"; +pub const LOG_TARGET_FILE: &str = "tari_mining_node::logging::miner::stratum::controller"; + +fn calculate_sols(elapsed: Duration) -> f64 { + 1.0 / ((elapsed.as_secs() * 1_000_000_000 + elapsed.subsec_nanos() as u64) as f64 / 1_000_000_000.0) +} + pub struct StratumMiner { config: MinerConfig, pub shared_data: Arc>, control_txs: Vec>, solver_loop_txs: Vec>, solver_stopped_rxs: Vec>, + stats: Arc>, } impl StratumMiner { - pub fn new(config: MinerConfig) -> StratumMiner { + pub fn new(config: MinerConfig, stats: Arc>) -> StratumMiner { let threads = config.num_mining_threads; StratumMiner { config, @@ -60,6 +72,7 @@ impl StratumMiner { control_txs: vec![], solver_loop_txs: vec![], solver_stopped_rxs: vec![], + stats, } } @@ -70,19 +83,20 @@ impl StratumMiner { control_rx: mpsc::Receiver, solver_loop_rx: mpsc::Receiver, solver_stopped_tx: mpsc::Sender, + statistics: Arc>, ) { let stop_handle = thread::spawn(move || loop { while let Some(message) = control_rx.iter().next() { match message { ControlMessage::Stop => { - info!("Stopping Solvers"); + debug!(target: LOG_TARGET_FILE, "Stopping Solvers"); return; }, ControlMessage::Pause => { - info!("Pausing Solvers"); + debug!(target: LOG_TARGET_FILE, "Pausing Solvers"); }, ControlMessage::Resume => { - info!("Resuming Solvers"); + debug!(target: LOG_TARGET_FILE, "Resuming Solvers"); }, _ => {}, }; @@ -90,16 +104,23 @@ impl StratumMiner { }); let mut paused = true; + let mut timer = SystemTime::now(); loop { if let Some(message) = solver_loop_rx.try_iter().next() { - debug!("solver_thread - solver_loop_rx got msg: {:?}", message); + debug!( + target: LOG_TARGET_FILE, + "solver_thread - solver_loop_rx got msg: {:?}", message + ); match message { ControlMessage::Stop => break, ControlMessage::Pause => { paused = true; solver.solver_reset = true; }, - ControlMessage::Resume => paused = false, + ControlMessage::Resume => { + paused = false; + timer = SystemTime::now(); + }, _ => {}, } } @@ -115,7 +136,6 @@ impl StratumMiner { let height = { shared_data.read().unwrap().height }; let job_id = { shared_data.read().unwrap().job_id }; let target_difficulty = { shared_data.read().unwrap().difficulty }; - let mut hasher = BlockHeaderSha3::new(tari_app_grpc::tari_rpc::BlockHeader::from(header)).unwrap(); if solver.solver_reset { @@ -127,17 +147,20 @@ impl StratumMiner { hasher.inc_nonce(); solver.current_nonce = hasher.nonce; } - + let mut stats = statistics.write().unwrap(); let difficulty = hasher.difficulty(); + stats.mining_stats.add_hash(); if difficulty >= target_difficulty { let block_header: BlockHeader = BlockHeader::try_from(hasher.into_header()).unwrap(); info!( + target: LOG_TARGET, "Miner found share with hash {}, nonce {} and difficulty {:?}", block_header.hash().to_hex(), solver.current_nonce, difficulty ); debug!( + target: LOG_TARGET_FILE, "Miner found share with hash {}, difficulty {:?} and data {:?}", block_header.hash().to_hex(), difficulty, @@ -154,6 +177,11 @@ impl StratumMiner { hash: block_header.hash().to_hex(), nonce: block_header.nonce, }); + if let Ok(elapsed) = timer.elapsed() { + let sols = calculate_sols(elapsed); + stats.mining_stats.add_sols(sols); + } + timer = SystemTime::now(); } } solver.solutions = Solution::default(); @@ -169,8 +197,10 @@ impl StratumMiner { } pub fn start_solvers(&mut self) -> Result<(), stratum::error::Error> { + let mut stats = self.stats.write().unwrap(); + stats.mining_stats.solvers = self.config.num_mining_threads; let num_solvers = self.config.num_mining_threads; - info!("Spawning {} solvers", num_solvers); + info!(target: LOG_TARGET, "Spawning {} solvers", num_solvers); let mut solvers = Vec::with_capacity(num_solvers); while solvers.len() < solvers.capacity() { solvers.push(SolverInstance::new()?); @@ -183,8 +213,9 @@ impl StratumMiner { self.control_txs.push(control_tx); self.solver_loop_txs.push(solver_tx); self.solver_stopped_rxs.push(solver_stopped_rx); + let stats = self.stats.clone(); thread::spawn(move || { - StratumMiner::solver_thread(s, i, sd, control_rx, solver_rx, solver_stopped_tx); + StratumMiner::solver_thread(s, i, sd, control_rx, solver_rx, solver_stopped_tx, stats); }); } Ok(()) @@ -216,6 +247,10 @@ impl StratumMiner { sd.difficulty = difficulty; sd.header = Some(header); if paused { + info!( + target: LOG_TARGET, + "Hashing in progress... height: {}, target difficulty: {}", height, difficulty + ); self.resume_solvers(); } Ok(()) @@ -239,7 +274,7 @@ impl StratumMiner { for t in self.solver_loop_txs.iter() { let _ = t.send(ControlMessage::Stop); } - debug!("Stop message sent"); + debug!(target: LOG_TARGET_FILE, "Stop message sent"); } pub fn pause_solvers(&self) { @@ -249,7 +284,7 @@ impl StratumMiner { for t in self.solver_loop_txs.iter() { let _ = t.send(ControlMessage::Pause); } - debug!("Pause message sent"); + debug!(target: LOG_TARGET_FILE, "Pause message sent"); } pub fn resume_solvers(&self) { @@ -259,13 +294,13 @@ impl StratumMiner { for t in self.solver_loop_txs.iter() { let _ = t.send(ControlMessage::Resume); } - debug!("Resume message sent"); + debug!(target: LOG_TARGET_FILE, "Resume message sent"); } pub fn wait_for_solver_shutdown(&self) { for r in self.solver_stopped_rxs.iter() { if let Some(ControlMessage::SolverStopped(i)) = r.iter().next() { - debug!("Solver stopped: {}", i); + debug!(target: LOG_TARGET_FILE, "Solver stopped: {}", i); } } } diff --git a/applications/tari_mining_node/src/stratum/stratum_statistics/mod.rs b/applications/tari_mining_node/src/stratum/stratum_statistics/mod.rs new file mode 100644 index 0000000000..b3ca0d2f63 --- /dev/null +++ b/applications/tari_mining_node/src/stratum/stratum_statistics/mod.rs @@ -0,0 +1 @@ +pub(crate) mod stats; diff --git a/applications/tari_mining_node/src/stratum/stratum_statistics/stats.rs b/applications/tari_mining_node/src/stratum/stratum_statistics/stats.rs new file mode 100644 index 0000000000..cc0d758dfc --- /dev/null +++ b/applications/tari_mining_node/src/stratum/stratum_statistics/stats.rs @@ -0,0 +1,100 @@ +// Copyright 2021. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +use std::time::Duration; + +#[derive(Clone, Debug)] +pub struct SolutionStatistics { + /// Total found + pub found: u32, + /// Total rejected + pub rejected: u32, +} + +impl Default for SolutionStatistics { + fn default() -> SolutionStatistics { + SolutionStatistics { found: 0, rejected: 0 } + } +} + +#[derive(Clone, Debug)] +pub struct MiningStatistics { + /// Solutions per second + sols: Vec, + /// Hashes per second + hashes: u64, + /// Number Solvers + pub solvers: usize, + /// Solution statistics + pub solution_stats: SolutionStatistics, +} + +impl Default for MiningStatistics { + fn default() -> MiningStatistics { + MiningStatistics { + solvers: 0, + sols: vec![], + hashes: 0, + solution_stats: SolutionStatistics::default(), + } + } +} + +impl MiningStatistics { + pub fn add_sols(&mut self, val: f64) { + self.sols.insert(0, val); + self.sols.truncate(60); + } + + pub fn sols(&self) -> f64 { + if self.sols.is_empty() { + 0.0 + } else { + let sum: f64 = self.sols.iter().sum(); + sum / (self.sols.len() as f64) + } + } + + pub fn add_hash(&mut self) { + self.hashes += 1; + } + + pub fn hash_rate(&mut self, elapsed: Duration) -> f64 { + let hash_rate = self.hashes as f64 / elapsed.as_micros() as f64; + // reset the total number of hashes for this interval + self.hashes = 0; + hash_rate + } +} + +#[derive(Clone, Debug)] +pub struct Statistics { + pub mining_stats: MiningStatistics, +} + +impl Default for Statistics { + fn default() -> Statistics { + Statistics { + mining_stats: MiningStatistics::default(), + } + } +} diff --git a/applications/tari_mining_node/src/stratum/stratum_types/mod.rs b/applications/tari_mining_node/src/stratum/stratum_types/mod.rs index 432ab296ce..4142b77f46 100644 --- a/applications/tari_mining_node/src/stratum/stratum_types/mod.rs +++ b/applications/tari_mining_node/src/stratum/stratum_types/mod.rs @@ -30,5 +30,5 @@ pub(crate) mod rpc_error; pub(crate) mod rpc_request; pub(crate) mod rpc_response; pub(crate) mod submit_params; +pub(crate) mod submit_response; pub(crate) mod worker_identifier; -pub(crate) mod worker_status; diff --git a/applications/tari_mining_node/src/stratum/stratum_types/rpc_error.rs b/applications/tari_mining_node/src/stratum/stratum_types/rpc_error.rs index 118a4977ed..0b26a4891f 100644 --- a/applications/tari_mining_node/src/stratum/stratum_types/rpc_error.rs +++ b/applications/tari_mining_node/src/stratum/stratum_types/rpc_error.rs @@ -22,7 +22,7 @@ // use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct RpcError { pub code: i32, pub message: String, diff --git a/applications/tari_mining_node/src/stratum/stratum_types/worker_status.rs b/applications/tari_mining_node/src/stratum/stratum_types/submit_response.rs similarity index 91% rename from applications/tari_mining_node/src/stratum/stratum_types/worker_status.rs rename to applications/tari_mining_node/src/stratum/stratum_types/submit_response.rs index 142e3fcb67..6c190d860c 100644 --- a/applications/tari_mining_node/src/stratum/stratum_types/worker_status.rs +++ b/applications/tari_mining_node/src/stratum/stratum_types/submit_response.rs @@ -20,14 +20,11 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // +use crate::stratum::stratum_types::rpc_error::RpcError; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug)] -pub struct WorkerStatus { - pub id: String, - pub height: u64, - pub difficulty: u64, - pub accepted: u64, - pub rejected: u64, - pub stale: u64, +pub struct SubmitResponse { + pub status: Option, + pub error: Option, } diff --git a/common/logging/log4rs_sample_mining_node.yml b/common/logging/log4rs_sample_mining_node.yml index 16c4c43739..c3e85184ec 100644 --- a/common/logging/log4rs_sample_mining_node.yml +++ b/common/logging/log4rs_sample_mining_node.yml @@ -44,11 +44,14 @@ loggers: appenders: - mining_node additive: false - tari_mining_node: + tari_mining_node::logging: level: debug appenders: - mining_node + additive: false + tari_mining_node::miner: + level: info + appenders: - stdout additive: false -