diff --git a/core/lib/types/src/proofs.rs b/core/lib/types/src/proofs.rs index 0067552c829..a23b0f44416 100644 --- a/core/lib/types/src/proofs.rs +++ b/core/lib/types/src/proofs.rs @@ -446,7 +446,7 @@ pub struct SocketAddress { pub port: u16, } -#[derive(Debug)] +#[derive(Debug, Copy, Clone)] pub enum GpuProverInstanceStatus { // The instance is available for processing. Available, diff --git a/prover/prover_fri/src/gpu_prover_job_processor.rs b/prover/prover_fri/src/gpu_prover_job_processor.rs index b56388bce05..5e576bc114b 100644 --- a/prover/prover_fri/src/gpu_prover_job_processor.rs +++ b/prover/prover_fri/src/gpu_prover_job_processor.rs @@ -198,10 +198,21 @@ pub mod gpu_prover { const SERVICE_NAME: &'static str = "FriGpuProver"; async fn get_next_job(&self) -> anyhow::Result> { + let now = Instant::now(); + tracing::info!("Attempting to get new job from assembly queue."); let mut queue = self.witness_vector_queue.lock().await; let is_full = queue.is_full(); + tracing::info!( + "Queue has {} items with max capacity {}. Queue is_full = {}.", + queue.size(), + queue.capacity(), + is_full + ); match queue.remove() { - Err(_) => Ok(None), + Err(_) => { + tracing::warn!("No assembly available in queue after {:?}.", now.elapsed()); + Ok(None) + } Ok(item) => { if is_full { self.prover_connection_pool @@ -216,7 +227,8 @@ pub mod gpu_prover { .await; } tracing::info!( - "Started GPU proving for job: {:?}", + "Assembly received after {:?}. Starting GPU proving for job: {:?}", + now.elapsed(), item.witness_vector_artifacts.prover_job.job_id ); Ok(Some(( diff --git a/prover/prover_fri/src/socket_listener.rs b/prover/prover_fri/src/socket_listener.rs index 653fb4eb8da..36efc772145 100644 --- a/prover/prover_fri/src/socket_listener.rs +++ b/prover/prover_fri/src/socket_listener.rs @@ -90,9 +90,9 @@ pub mod gpu_socket_listener { .await .context("could not accept connection")? .0; - tracing::trace!( - "Received new assembly send connection, waited for {}ms.", - now.elapsed().as_millis() + tracing::info!( + "Received new witness vector generator connection, waited for {:?}.", + now.elapsed() ); self.handle_incoming_file(stream) @@ -110,10 +110,10 @@ pub mod gpu_socket_listener { .await .context("Failed reading from stream")?; let file_size_in_gb = assembly.len() / (1024 * 1024 * 1024); - tracing::trace!( - "Read file of size: {}GB from stream took: {} seconds", + tracing::info!( + "Read file of size: {}GB from stream after {:?}", file_size_in_gb, - started_at.elapsed().as_secs() + started_at.elapsed() ); METRICS.witness_vector_blob_time[&(file_size_in_gb as u64)] @@ -121,12 +121,17 @@ pub mod gpu_socket_listener { let witness_vector = bincode::deserialize::(&assembly) .context("Failed deserializing witness vector")?; + tracing::info!( + "Deserialized witness vector after {:?}", + started_at.elapsed() + ); let assembly = generate_assembly_for_repeated_proving( witness_vector.prover_job.circuit_wrapper.clone(), witness_vector.prover_job.job_id, witness_vector.prover_job.setup_data_key.circuit_id, ) .context("generate_assembly_for_repeated_proving()")?; + tracing::info!("Generated assembly after {:?}", started_at.elapsed()); let gpu_prover_job = GpuProverJob { witness_vector_artifacts: witness_vector, assembly, @@ -138,6 +143,10 @@ pub mod gpu_socket_listener { queue .add(gpu_prover_job) .map_err(|err| anyhow::anyhow!("Failed saving witness vector to queue: {err}"))?; + tracing::info!( + "Added witness vector to queue after {:?}", + started_at.elapsed() + ); let status = if queue.capacity() == queue.size() { GpuProverInstanceStatus::Full } else { @@ -151,6 +160,11 @@ pub mod gpu_socket_listener { .fri_gpu_prover_queue_dal() .update_prover_instance_status(self.address.clone(), status, self.zone.clone()) .await; + tracing::info!( + "Marked prover as {:?} after {:?}", + status, + started_at.elapsed() + ); Ok(()) } } diff --git a/prover/witness_vector_generator/src/generator.rs b/prover/witness_vector_generator/src/generator.rs index 74e25b38988..a81250f7a7f 100644 --- a/prover/witness_vector_generator/src/generator.rs +++ b/prover/witness_vector_generator/src/generator.rs @@ -150,12 +150,20 @@ impl JobProcessor for WitnessVectorGenerator { .await; if let Some(address) = prover { + tracing::info!( + "Found prover after {:?}. Sending witness vector job...", + now.elapsed() + ); let result = send_assembly(job_id, &serialized, &address); handle_send_result(&result, job_id, &address, &self.pool, self.zone.clone()).await; if result.is_ok() { METRICS.prover_waiting_time[&circuit_type].observe(now.elapsed()); METRICS.prover_attempts_count[&circuit_type].observe(attempts as usize); + tracing::info!( + "Sent witness vector job to prover after {:?}", + now.elapsed() + ); return Ok(()); } @@ -167,11 +175,16 @@ impl JobProcessor for WitnessVectorGenerator { ); attempts += 1; } else { + tracing::warn!( + "Could not find available prover. Time elapsed: {:?}. Will sleep for {:?}", + now.elapsed(), + self.config.prover_instance_poll_time() + ); sleep(self.config.prover_instance_poll_time()).await; } } - tracing::trace!( - "Not able to get any free prover instance for sending witness vector for job: {job_id}" + tracing::warn!( + "Not able to get any free prover instance for sending witness vector for job: {job_id} after {:?}", now.elapsed() ); Ok(()) } @@ -222,7 +235,7 @@ async fn handle_send_result( } Err(err) => { - tracing::trace!( + tracing::warn!( "Failed sending assembly to address: {address:?}, socket not reachable \ reason: {err}" );