Skip to content

Commit

Permalink
feat: improve logging for tari_mining_node
Browse files Browse the repository at this point in the history
This PR improves the logging for tari_mining_node by making use of the log4rs target, it also sets the default logging level to info.
This PR also adds basic statistics for pool mining which are automatically displayed on a 20s interval.

Review comments
Add additional log target, split logging messages between terminal and file.
Updated stratum miner to display basic statistics every 20 seconds instead of when changing jobs.
Added info message to indicate hashing is in progress for stratum miner.
Added hash rate to displayed statistics in additon to share rate.
Included metric for share and hash rates.

Error handling for rejected share
  • Loading branch information
StriderDM committed Oct 17, 2021
1 parent 1f52ffc commit fbe17c7
Show file tree
Hide file tree
Showing 12 changed files with 390 additions and 110 deletions.
83 changes: 56 additions & 27 deletions applications/tari_mining_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ mod utils;

use crate::{
miner::MiningReport,
stratum::{stratum_controller::controller::Controller, stratum_miner::miner::StratumMiner},
stratum::{
stratum_controller::controller::Controller,
stratum_miner::miner::StratumMiner,
stratum_statistics::stats::Statistics,
},
};
use errors::{err_empty, MinerError};
use miner::Miner;
Expand All @@ -42,6 +46,7 @@ use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
RwLock,
},
thread,
time::Instant,
Expand All @@ -58,14 +63,17 @@ use tokio::{runtime::Runtime, time::sleep};
use tonic::transport::Channel;
use utils::{coinbase_request, extract_outputs_and_kernels};

pub const LOG_TARGET: &str = "tari_mining_node::miner::main";
pub const LOG_TARGET_FILE: &str = "tari_mining_node::logging::miner::main";

/// Application entry point
fn main() {
let rt = Runtime::new().expect("Failed to start tokio runtime");
match rt.block_on(main_inner()) {
Ok(_) => std::process::exit(0),
Err(exit_code) => {
eprintln!("Fatal error: {:?}", exit_code);
error!("Exiting with code: {:?}", exit_code);
error!(target: LOG_TARGET, "Exiting with code: {:?}", exit_code);
std::process::exit(exit_code.as_i32())
},
}
Expand All @@ -80,8 +88,8 @@ async fn main_inner() -> Result<(), ExitCodes> {
config.mining_worker_name = global.mining_worker_name.clone();
config.mining_wallet_address = global.mining_wallet_address.clone();
config.mining_pool_address = global.mining_pool_address.clone();
debug!("{:?}", bootstrap);
debug!("{:?}", config);
debug!(target: LOG_TARGET_FILE, "{:?}", bootstrap);
debug!(target: LOG_TARGET_FILE, "{:?}", config);

if !config.mining_wallet_address.is_empty() && !config.mining_pool_address.is_empty() {
let url = config.mining_pool_address.clone();
Expand All @@ -91,18 +99,25 @@ async fn main_inner() -> Result<(), ExitCodes> {
if !config.mining_worker_name.is_empty() {
miner_address += &format!("{}{}", ".", &config.mining_worker_name);
}
let mut mc = Controller::new().unwrap_or_else(|e| {
let stats = Arc::new(RwLock::new(Statistics::default()));
let mut mc = Controller::new(stats.clone()).unwrap_or_else(|e| {
debug!(target: LOG_TARGET_FILE, "Error loading mining controller: {}", e);
panic!("Error loading mining controller: {}", e);
});
let cc = stratum::controller::Controller::new(&url, Some(miner_address), None, None, mc.tx.clone())
.unwrap_or_else(|e| {
panic!("Error loading stratum client controller: {:?}", e);
});
let cc =
stratum::controller::Controller::new(&url, Some(miner_address), None, None, mc.tx.clone(), stats.clone())
.unwrap_or_else(|e| {
debug!(
target: LOG_TARGET_FILE,
"Error loading stratum client controller: {:?}", e
);
panic!("Error loading stratum client controller: {:?}", e);
});
let miner_stopped = Arc::new(AtomicBool::new(false));
let client_stopped = Arc::new(AtomicBool::new(false));

mc.set_client_tx(cc.tx.clone());
let mut miner = StratumMiner::new(config);
let mut miner = StratumMiner::new(config, stats);
if let Err(e) = miner.start_solvers() {
println!("Error. Please check logs for further info.");
println!("Error details:");
Expand All @@ -115,7 +130,7 @@ async fn main_inner() -> Result<(), ExitCodes> {
.name("mining_controller".to_string())
.spawn(move || {
if let Err(e) = mc.run(miner) {
error!("Error. Please check logs for further info: {:?}", e);
error!(target: LOG_TARGET, "Error. Please check logs for further info: {:?}", e);
return;
}
miner_stopped_internal.store(true, Ordering::Relaxed);
Expand All @@ -139,19 +154,21 @@ async fn main_inner() -> Result<(), ExitCodes> {
Ok(())
} else {
config.mine_on_tip_only = global.mine_on_tip_only;
debug!("mine_on_tip_only is {}", config.mine_on_tip_only);

debug!(
target: LOG_TARGET_FILE,
"mine_on_tip_only is {}", config.mine_on_tip_only
);
let (mut node_conn, mut wallet_conn) = connect(&config, &global).await.map_err(ExitCodes::grpc)?;

let mut blocks_found: u64 = 0;
loop {
debug!("Starting new mining cycle");
debug!(target: LOG_TARGET_FILE, "Starting new mining cycle");
match mining_cycle(&mut node_conn, &mut wallet_conn, &config, &bootstrap).await {
err @ Err(MinerError::GrpcConnection(_)) | err @ Err(MinerError::GrpcStatus(_)) => {
// Any GRPC error we will try to reconnect with a standard delay
error!("Connection error: {:?}", err);
error!(target: LOG_TARGET, "Connection error: {:?}", err);
loop {
debug!("Holding for {:?}", config.wait_timeout());
debug!(target: LOG_TARGET_FILE, "Holding for {:?}", config.wait_timeout());
sleep(config.wait_timeout()).await;
match connect(&config, &global).await {
Ok((nc, wc)) => {
Expand All @@ -160,22 +177,28 @@ async fn main_inner() -> Result<(), ExitCodes> {
break;
},
Err(err) => {
error!("Connection error: {:?}", err);
error!(target: LOG_TARGET, "Connection error: {:?}", err);
continue;
},
}
}
},
Err(MinerError::MineUntilHeightReached(h)) => {
info!("Prescribed blockchain height {} reached. Aborting ...", h);
info!(
target: LOG_TARGET,
"Prescribed blockchain height {} reached. Aborting ...", h
);
return Ok(());
},
Err(MinerError::MinerLostBlock(h)) => {
info!("Height {} already mined by other node. Restarting ...", h);
info!(
target: LOG_TARGET,
"Height {} already mined by other node. Restarting ...", h
);
},
Err(err) => {
error!("Error: {:?}", err);
debug!("Holding for {:?}", config.wait_timeout());
error!(target: LOG_TARGET, "Error: {:?}", err);
debug!(target: LOG_TARGET_FILE, "Holding for {:?}", config.wait_timeout());
sleep(config.wait_timeout()).await;
},
Ok(submitted) => {
Expand All @@ -198,10 +221,10 @@ async fn connect(
global: &GlobalConfig,
) -> Result<(BaseNodeClient<Channel>, WalletClient<Channel>), MinerError> {
let base_node_addr = config.base_node_addr(global);
info!("Connecting to base node at {}", base_node_addr);
info!(target: LOG_TARGET, "Connecting to base node at {}", base_node_addr);
let node_conn = BaseNodeClient::connect(base_node_addr.clone()).await?;
let wallet_addr = config.wallet_addr(global);
info!("Connecting to wallet at {}", wallet_addr);
info!(target: LOG_TARGET, "Connecting to wallet at {}", wallet_addr);
let wallet_conn = WalletClient::connect(wallet_addr.clone()).await?;

Ok((node_conn, wallet_conn))
Expand Down Expand Up @@ -264,26 +287,31 @@ async fn mining_cycle(
if report.difficulty < min_diff {
submit = false;
debug!(
"Mined difficulty {} below minimum difficulty {}. Not submitting.",
report.difficulty, min_diff
target: LOG_TARGET_FILE,
"Mined difficulty {} below minimum difficulty {}. Not submitting.", report.difficulty, min_diff
);
}
}
if let Some(max_diff) = bootstrap.miner_max_diff {
if report.difficulty > max_diff {
submit = false;
debug!(
target: LOG_TARGET_FILE,
"Mined difficulty {} greater than maximum difficulty {}. Not submitting.",
report.difficulty, max_diff
report.difficulty,
max_diff
);
}
}
if submit {
// Mined a block fitting the difficulty
let block_header = BlockHeader::try_from(header.clone()).map_err(MinerError::Conversion)?;
info!(
target: LOG_TARGET,
"Miner {} found block header {} with difficulty {:?}",
report.miner, block_header, report.difficulty,
report.miner,
block_header,
report.difficulty,
);
let mut mined_block = block.clone();
mined_block.header = Some(header);
Expand All @@ -310,6 +338,7 @@ async fn mining_cycle(
async fn display_report(report: &MiningReport, config: &MinerConfig) {
let hashrate = report.hashes as f64 / report.elapsed.as_micros() as f64;
debug!(
target: LOG_TARGET_FILE,
"Miner {} reported {:.2}MH/s with total {:.2}MH/s over {} threads. Height: {}. Target: {})",
report.miner,
hashrate,
Expand Down
26 changes: 14 additions & 12 deletions applications/tari_mining_node/src/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use std::{
use tari_app_grpc::{conversions::timestamp, tari_rpc::BlockHeader};
use thread::JoinHandle;

pub const LOG_TARGET: &str = "tari_mining_node::miner::standalone";

// Identify how often mining thread is reporting / checking context
// ~400_000 hashes per second
const REPORTING_FREQUENCY: u64 = 3_000_000;
Expand Down Expand Up @@ -107,20 +109,20 @@ impl Stream for Miner {
type Item = MiningReport;

fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
trace!("Polling Miner");
trace!(target: LOG_TARGET, "Polling Miner");
// First poll would start all the threads passing async context waker
if self.threads.is_empty() && self.num_threads > 0 {
debug!(
"Starting {} mining threads for target difficulty {}",
self.num_threads, self.target_difficulty
target: LOG_TARGET,
"Starting {} mining threads for target difficulty {}", self.num_threads, self.target_difficulty
);
self.start_threads(ctx);
return Poll::Pending;
} else if self.num_threads == 0 {
error!("Cannot mine: no mining threads");
error!(target: LOG_TARGET, "Cannot mine: no mining threads");
return Poll::Ready(None);
} else if self.channels.is_empty() {
debug!("Finished mining");
debug!(target: LOG_TARGET, "Finished mining");
return Poll::Ready(None);
}

Expand Down Expand Up @@ -167,14 +169,14 @@ pub fn mining_task(
let mut hasher = BlockHeaderSha3::new(header).unwrap();
hasher.random_nonce();
// We're mining over here!
info!("Mining thread {} started", miner);
info!(target: LOG_TARGET, "Mining thread {} started", miner);
// Mining work
loop {
let difficulty = hasher.difficulty();
if difficulty >= target_difficulty {
debug!(
"Miner {} found nonce {} with matching difficulty {}",
miner, hasher.nonce, difficulty
target: LOG_TARGET,
"Miner {} found nonce {} with matching difficulty {}", miner, hasher.nonce, difficulty
);
if let Err(err) = sender.try_send(MiningReport {
miner,
Expand All @@ -186,10 +188,10 @@ pub fn mining_task(
header: Some(hasher.into_header()),
target_difficulty,
}) {
error!("Miner {} failed to send report: {}", miner, err);
error!(target: LOG_TARGET, "Miner {} failed to send report: {}", miner, err);
}
waker.wake();
info!("Mining thread {} stopped", miner);
info!(target: LOG_TARGET, "Mining thread {} stopped", miner);
return;
}
if hasher.nonce % REPORTING_FREQUENCY == 0 {
Expand All @@ -204,9 +206,9 @@ pub fn mining_task(
target_difficulty,
});
waker.clone().wake();
trace!("Reporting from {} result {:?}", miner, res);
trace!(target: LOG_TARGET, "Reporting from {} result {:?}", miner, res);
if let Err(TrySendError::Disconnected(_)) = res {
info!("Mining thread {} disconnected", miner);
info!(target: LOG_TARGET, "Mining thread {} disconnected", miner);
return;
}
hasher.set_forward_timestamp(timestamp().seconds as u64);
Expand Down
Loading

0 comments on commit fbe17c7

Please sign in to comment.