Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 53 additions & 45 deletions src/burnchains/burnchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1291,32 +1291,36 @@ impl Burnchain {
// TODO: don't re-process blocks. See if the block hash is already present in the burn db,
// and if so, do nothing.
let download_thread: thread::JoinHandle<Result<(), burnchain_error>> =
thread::spawn(move || {
while let Ok(Some(ipc_header)) = downloader_recv.recv() {
debug!("Try recv next header");

let download_start = get_epoch_time_ms();
let ipc_block = downloader.download(&ipc_header)?;
let download_end = get_epoch_time_ms();

debug!(
"Downloaded block {} in {}ms",
ipc_block.height(),
download_end.saturating_sub(download_start)
);
thread::Builder::new()
.name("burnchain-downloader".to_string())
.spawn(move || {
while let Ok(Some(ipc_header)) = downloader_recv.recv() {
debug!("Try recv next header");

let download_start = get_epoch_time_ms();
let ipc_block = downloader.download(&ipc_header)?;
let download_end = get_epoch_time_ms();

debug!(
"Downloaded block {} in {}ms",
ipc_block.height(),
download_end.saturating_sub(download_start)
);

parser_send
.send(Some(ipc_block))
.map_err(|_e| burnchain_error::ThreadChannelError)?;
}
parser_send
.send(Some(ipc_block))
.send(None)
.map_err(|_e| burnchain_error::ThreadChannelError)?;
}
parser_send
.send(None)
.map_err(|_e| burnchain_error::ThreadChannelError)?;
Ok(())
});
Ok(())
})
.unwrap();

let parse_thread: thread::JoinHandle<Result<(), burnchain_error>> =
thread::spawn(move || {
let parse_thread: thread::JoinHandle<Result<(), burnchain_error>> = thread::Builder::new()
.name("burnchain-parser".to_string())
.spawn(move || {
while let Ok(Some(ipc_block)) = parser_recv.recv() {
debug!("Try recv next block");

Expand All @@ -1338,34 +1342,38 @@ impl Burnchain {
.send(None)
.map_err(|_e| burnchain_error::ThreadChannelError)?;
Ok(())
});
})
.unwrap();

let db_thread: thread::JoinHandle<Result<BurnchainBlockHeader, burnchain_error>> =
thread::spawn(move || {
let mut last_processed = burn_chain_tip;
while let Ok(Some(burnchain_block)) = db_recv.recv() {
debug!("Try recv next parsed block");
thread::Builder::new()
.name("burnchain-db".to_string())
.spawn(move || {
let mut last_processed = burn_chain_tip;
while let Ok(Some(burnchain_block)) = db_recv.recv() {
debug!("Try recv next parsed block");

if burnchain_block.block_height() == 0 {
continue;
}

if burnchain_block.block_height() == 0 {
continue;
}
let insert_start = get_epoch_time_ms();
last_processed =
Burnchain::process_block(&myself, &mut burnchain_db, &burnchain_block)?;
if !coord_comm.announce_new_burn_block() {
return Err(burnchain_error::CoordinatorClosed);
}
let insert_end = get_epoch_time_ms();

let insert_start = get_epoch_time_ms();
last_processed =
Burnchain::process_block(&myself, &mut burnchain_db, &burnchain_block)?;
if !coord_comm.announce_new_burn_block() {
return Err(burnchain_error::CoordinatorClosed);
debug!(
"Inserted block {} in {}ms",
burnchain_block.block_height(),
insert_end.saturating_sub(insert_start)
);
}
let insert_end = get_epoch_time_ms();

debug!(
"Inserted block {} in {}ms",
burnchain_block.block_height(),
insert_end.saturating_sub(insert_start)
);
}
Ok(last_processed)
});
Ok(last_processed)
})
.unwrap();

// feed the pipeline!
let input_headers = indexer.read_headers(start_block + 1, end_block + 1)?;
Expand Down
5 changes: 4 additions & 1 deletion src/util/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ fn print_msg_header(mut rd: &mut dyn RecordDecorator, record: &Record) -> io::Re
write!(rd, " ")?;
write!(rd, "[{}:{}]", record.file(), record.line())?;
write!(rd, " ")?;
write!(rd, "[{:?}]", thread::current().id())?;
match thread::current().name() {
None => write!(rd, "[{:?}]", thread::current().id())?,
Some(name) => write!(rd, "[{}]", name)?,
}

rd.start_whitespace()?;
write!(rd, " ")?;
Expand Down
Loading