Skip to content

Commit

Permalink
fix(prover): Add logging for prover + WVGs (#723)
Browse files Browse the repository at this point in the history
## What ❔

Add better logging for the prover and witness vector generators (WVGs).

## Why ❔

During incidents, core noticed that logging was missing. The missing log
statements were added as a hotfix via a different patch on the deployed version.

These changes are now being backported (and improved) to the main
branch.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `cargo spellcheck
--cfg=./spellcheck/era.cfg --code 1`.
  • Loading branch information
EmilLuta committed Dec 21, 2023
1 parent 5405a53 commit d7ce14c
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 12 deletions.
2 changes: 1 addition & 1 deletion core/lib/types/src/proofs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ pub struct SocketAddress {
pub port: u16,
}

#[derive(Debug)]
#[derive(Debug, Copy, Clone)]
pub enum GpuProverInstanceStatus {
// The instance is available for processing.
Available,
Expand Down
16 changes: 14 additions & 2 deletions prover/prover_fri/src/gpu_prover_job_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,21 @@ pub mod gpu_prover {
const SERVICE_NAME: &'static str = "FriGpuProver";

async fn get_next_job(&self) -> anyhow::Result<Option<(Self::JobId, Self::Job)>> {
let now = Instant::now();
tracing::info!("Attempting to get new job from assembly queue.");
let mut queue = self.witness_vector_queue.lock().await;
let is_full = queue.is_full();
tracing::info!(
"Queue has {} items with max capacity {}. Queue is_full = {}.",
queue.size(),
queue.capacity(),
is_full
);
match queue.remove() {
Err(_) => Ok(None),
Err(_) => {
tracing::warn!("No assembly available in queue after {:?}.", now.elapsed());
Ok(None)
}
Ok(item) => {
if is_full {
self.prover_connection_pool
Expand All @@ -216,7 +227,8 @@ pub mod gpu_prover {
.await;
}
tracing::info!(
"Started GPU proving for job: {:?}",
"Assembly received after {:?}. Starting GPU proving for job: {:?}",
now.elapsed(),
item.witness_vector_artifacts.prover_job.job_id
);
Ok(Some((
Expand Down
26 changes: 20 additions & 6 deletions prover/prover_fri/src/socket_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ pub mod gpu_socket_listener {
.await
.context("could not accept connection")?
.0;
tracing::trace!(
"Received new assembly send connection, waited for {}ms.",
now.elapsed().as_millis()
tracing::info!(
"Received new witness vector generator connection, waited for {:?}.",
now.elapsed()
);

self.handle_incoming_file(stream)
Expand All @@ -110,23 +110,28 @@ pub mod gpu_socket_listener {
.await
.context("Failed reading from stream")?;
let file_size_in_gb = assembly.len() / (1024 * 1024 * 1024);
tracing::trace!(
"Read file of size: {}GB from stream took: {} seconds",
tracing::info!(
"Read file of size: {}GB from stream after {:?}",
file_size_in_gb,
started_at.elapsed().as_secs()
started_at.elapsed()
);

METRICS.witness_vector_blob_time[&(file_size_in_gb as u64)]
.observe(started_at.elapsed());

let witness_vector = bincode::deserialize::<WitnessVectorArtifacts>(&assembly)
.context("Failed deserializing witness vector")?;
tracing::info!(
"Deserialized witness vector after {:?}",
started_at.elapsed()
);
let assembly = generate_assembly_for_repeated_proving(
witness_vector.prover_job.circuit_wrapper.clone(),
witness_vector.prover_job.job_id,
witness_vector.prover_job.setup_data_key.circuit_id,
)
.context("generate_assembly_for_repeated_proving()")?;
tracing::info!("Generated assembly after {:?}", started_at.elapsed());
let gpu_prover_job = GpuProverJob {
witness_vector_artifacts: witness_vector,
assembly,
Expand All @@ -138,6 +143,10 @@ pub mod gpu_socket_listener {
queue
.add(gpu_prover_job)
.map_err(|err| anyhow::anyhow!("Failed saving witness vector to queue: {err}"))?;
tracing::info!(
"Added witness vector to queue after {:?}",
started_at.elapsed()
);
let status = if queue.capacity() == queue.size() {
GpuProverInstanceStatus::Full
} else {
Expand All @@ -151,6 +160,11 @@ pub mod gpu_socket_listener {
.fri_gpu_prover_queue_dal()
.update_prover_instance_status(self.address.clone(), status, self.zone.clone())
.await;
tracing::info!(
"Marked prover as {:?} after {:?}",
status,
started_at.elapsed()
);
Ok(())
}
}
Expand Down
19 changes: 16 additions & 3 deletions prover/witness_vector_generator/src/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,20 @@ impl JobProcessor for WitnessVectorGenerator {
.await;

if let Some(address) = prover {
tracing::info!(
"Found prover after {:?}. Sending witness vector job...",
now.elapsed()
);
let result = send_assembly(job_id, &serialized, &address);
handle_send_result(&result, job_id, &address, &self.pool, self.zone.clone()).await;

if result.is_ok() {
METRICS.prover_waiting_time[&circuit_type].observe(now.elapsed());
METRICS.prover_attempts_count[&circuit_type].observe(attempts as usize);
tracing::info!(
"Sent witness vector job to prover after {:?}",
now.elapsed()
);
return Ok(());
}

Expand All @@ -167,11 +175,16 @@ impl JobProcessor for WitnessVectorGenerator {
);
attempts += 1;
} else {
tracing::warn!(
"Could not find available prover. Time elapsed: {:?}. Will sleep for {:?}",
now.elapsed(),
self.config.prover_instance_poll_time()
);
sleep(self.config.prover_instance_poll_time()).await;
}
}
tracing::trace!(
"Not able to get any free prover instance for sending witness vector for job: {job_id}"
tracing::warn!(
"Not able to get any free prover instance for sending witness vector for job: {job_id} after {:?}", now.elapsed()
);
Ok(())
}
Expand Down Expand Up @@ -222,7 +235,7 @@ async fn handle_send_result(
}

Err(err) => {
tracing::trace!(
tracing::warn!(
"Failed sending assembly to address: {address:?}, socket not reachable \
reason: {err}"
);
Expand Down

0 comments on commit d7ce14c

Please sign in to comment.