Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(loging): Improve logging output of provider and get #932

Merged
merged 1 commit into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 38 additions & 23 deletions src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use futures::{Future, StreamExt};
use postcard::experimental::max_size::MaxSize;
use range_collections::RangeSet2;
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
use tracing::{debug, error};
use tracing::{debug, debug_span, error};
use tracing_futures::Instrument;

pub use crate::util::Hash;

Expand Down Expand Up @@ -153,17 +154,24 @@ where
C: FnMut(Hash, DataStream, String) -> FutC,
FutC: Future<Output = Result<DataStream>>,
{
let start = Instant::now();
let connection = dial_ticket(ticket, keylog, max_concurrent.into()).await?;
run_connection(
connection,
ticket.hash(),
ticket.token(),
start,
on_connected,
on_collection,
on_blob,
)
let span = debug_span!("get", hash=%ticket.hash());
async move {
let start = Instant::now();
let connection = dial_ticket(ticket, keylog, max_concurrent.into()).await?;
let span = debug_span!("connection", remote_addr=%connection.remote_address());
run_connection(
connection,
ticket.hash(),
ticket.token(),
start,
on_connected,
on_collection,
on_blob,
)
.instrument(span)
.await
}
.instrument(span)
.await
}

Expand Down Expand Up @@ -238,17 +246,24 @@ where
C: FnMut(Hash, DataStream, String) -> FutC,
FutC: Future<Output = Result<DataStream>>,
{
let now = Instant::now();
let connection = dial_peer(opts).await?;
run_connection(
connection,
hash,
auth_token,
now,
on_connected,
on_collection,
on_blob,
)
let span = debug_span!("get", %hash);
async move {
let now = Instant::now();
let connection = dial_peer(opts).await?;
let span = debug_span!("connection", remote_addr=%connection.remote_address());
run_connection(
connection,
hash,
auth_token,
now,
on_connected,
on_collection,
on_blob,
)
.instrument(span)
.await
}
.instrument(span)
.await
}

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ mod tests {

#[tokio::test]
async fn many_files() -> Result<()> {
setup_logging();
let num_files = [10, 100, 1000, 10000];
for num in num_files {
println!("NUM_FILES: {num}");
Expand Down
15 changes: 8 additions & 7 deletions src/provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::{broadcast, mpsc};
use tokio::task::JoinError;
use tokio_util::sync::CancellationToken;
use tracing::{debug, debug_span, warn};
use tracing::{debug, debug_span, trace, trace_span, warn};
use tracing_futures::Instrument;
use walkdir::WalkDir;

Expand Down Expand Up @@ -749,7 +749,7 @@ async fn transfer_collection(

writer.write_all(&encoded).await?;
for (i, blob) in c.blobs().iter().enumerate() {
debug!("writing blob {}/{}", i, c.blobs().len());
trace!("writing blob {}/{}", i, c.blobs().len());
tokio::task::yield_now().await;
let (status, writer1, size) = send_blob(db.clone(), blob.hash, writer, buffer).await?;
writer = writer1;
Expand Down Expand Up @@ -810,7 +810,7 @@ async fn handle_stream(
};

let hash = request.name;
debug!("got request for ({hash})");
debug!(%hash, "received request");
let _ = events.send(Event::RequestReceived {
connection_id,
hash,
Expand All @@ -822,7 +822,7 @@ async fn handle_stream(
// We only respond to requests for collections, not individual blobs
Some(BlobOrCollection::Collection { outboard, data }) => (outboard, data),
_ => {
debug!("not found {}", hash);
debug!("not found");
notify_transfer_aborted(events, connection_id, request_id);
write_response(&mut writer, &mut out_buffer, Res::NotFound).await?;
writer.finish().await?;
Expand Down Expand Up @@ -975,7 +975,8 @@ fn compute_outboard(
"can only transfer blob data: {}",
path.display()
);
tracing::debug!("computing outboard for {}", path.display());
let span = trace_span!("outboard.compute", path = %path.display());
let _guard = span.enter();
let file = std::fs::File::open(&path)?;
// compute outboard size so we can pre-allocate the buffer.
//
Expand All @@ -1001,7 +1002,7 @@ fn compute_outboard(
let hash =
bao_tree::io::sync::outboard_post_order(&mut reader, size, IROH_BLOCK_SIZE, &mut outboard)?;
let ob = PostOrderMemOutboard::load(hash, Cursor::new(&outboard), IROH_BLOCK_SIZE)?.flip();
tracing::debug!("done. hash for {} is {hash}", path.display());
trace!(%hash, "done");

Ok((hash.into(), ob.into_inner()))
}
Expand Down Expand Up @@ -1133,7 +1134,7 @@ async fn write_response<W: AsyncWrite + Unpin>(

write_lp(&mut writer, used).await?;

debug!("written response of length {}", used.len());
trace!(len = used.len(), "wrote response message frame");
Ok(())
}

Expand Down