Fixed many race conditions and a hard deadlock
Also some progress report polishing
fogodev committed May 17, 2024
1 parent a690432 commit 97744e8
Showing 17 changed files with 481 additions and 233 deletions.
77 changes: 52 additions & 25 deletions core/crates/heavy-lifting/src/file_identifier/job.rs
@@ -35,7 +35,7 @@ use futures_concurrency::future::TryJoin;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::time::Instant;
use tracing::{trace, warn};
use tracing::{debug, error, trace, warn};

use super::{
orphan_path_filters_deep, orphan_path_filters_shallow,
@@ -269,10 +269,22 @@ impl FileIdentifier {
)
.await?;

// Multiplying by 2 as each batch will have 2 tasks
self.metadata.total_tasks *= 2;

ctx.progress(vec![
ProgressUpdate::TaskCount(self.metadata.total_tasks),
ProgressUpdate::Message(format!(
"{} files to be identified",
self.metadata.total_found_orphans
)),
])
.await;

self.metadata.seeking_orphans_time = start.elapsed();
} else {
ctx.progress(vec![
ProgressUpdate::TaskCount(self.metadata.total_found_orphans),
ProgressUpdate::TaskCount(self.metadata.total_tasks),
ProgressUpdate::Message(format!(
"{} files to be identified",
self.metadata.total_found_orphans
@@ -336,10 +348,14 @@ impl FileIdentifier {
dispatcher: &JobTaskDispatcher,
) -> Option<TaskHandle<Error>> {
self.metadata.extract_metadata_time += extract_metadata_time;
self.errors.extend(errors);

if identified_files.is_empty() {
self.metadata.completed_tasks += 1;
if !errors.is_empty() {
error!("Non critical errors while extracting metadata: {errors:#?}");
self.errors.extend(errors);
}

let maybe_task = if identified_files.is_empty() {
self.metadata.completed_tasks += 2; // Adding 2 as we will not have an ObjectProcessorTask

ctx.progress(vec![ProgressUpdate::CompletedTaskCount(
self.metadata.completed_tasks,
@@ -348,8 +364,13 @@

None
} else {
ctx.progress_msg(format!("Identified {} files", identified_files.len()))
.await;
self.metadata.completed_tasks += 1;

ctx.progress(vec![
ProgressUpdate::CompletedTaskCount(self.metadata.completed_tasks),
ProgressUpdate::Message(format!("Identified {} files", identified_files.len())),
])
.await;

let with_priority = self.priority_tasks_ids.remove(&task_id);

@@ -367,7 +388,14 @@
}

Some(task)
}
};

debug!(
"Processed {}/{} file identifier tasks, took: {extract_metadata_time:?}",
self.metadata.completed_tasks, self.metadata.total_tasks,
);

maybe_task
}

async fn process_object_processor_output<OuterCtx: OuterContext>(
@@ -408,6 +436,16 @@ impl FileIdentifier {
file_path_ids: file_path_ids_with_new_object,
});
}

debug!(
"Processed {}/{} file identifier tasks, took: {:?}",
self.metadata.completed_tasks,
self.metadata.total_tasks,
assign_cas_ids_time
+ fetch_existing_objects_time
+ assign_to_existing_object_time
+ create_object_time,
);
}

async fn dispatch_priority_identifier_tasks<OuterCtx: OuterContext>(
@@ -450,14 +488,7 @@ impl FileIdentifier {
*last_orphan_file_path_id =
Some(orphan_paths.last().expect("orphan_paths is not empty").id);

ctx.progress(vec![
ProgressUpdate::TaskCount(self.metadata.total_found_orphans),
ProgressUpdate::Message(format!(
"{} files to be identified",
self.metadata.total_found_orphans
)),
])
.await;
self.metadata.total_tasks += 1;

let priority_task = dispatcher
.dispatch(ExtractFileMetadataTask::new(
@@ -521,14 +552,7 @@ impl FileIdentifier {

self.metadata.total_found_orphans += orphan_paths.len() as u64;

ctx.progress(vec![
ProgressUpdate::TaskCount(self.metadata.total_found_orphans),
ProgressUpdate::Message(format!(
"{} files to be identified",
self.metadata.total_found_orphans
)),
])
.await;
self.metadata.total_tasks += 1;

pending_running_tasks.push(
dispatcher
@@ -579,6 +603,7 @@ pub struct Metadata {
created_objects_count: u64,
linked_objects_count: u64,
completed_tasks: u64,
total_tasks: u64,
}

impl From<Metadata> for Vec<ReportOutputMetadata> {
@@ -594,6 +619,7 @@ impl From<Metadata> for Vec<ReportOutputMetadata> {
created_objects_count,
linked_objects_count,
completed_tasks,
total_tasks,
}: Metadata,
) -> Self {
vec![
@@ -618,7 +644,8 @@ impl From<Metadata> for Vec<ReportOutputMetadata> {
("total_found_orphans".into(), json!(total_found_orphans)),
("created_objects_count".into(), json!(created_objects_count)),
("linked_objects_count".into(), json!(linked_objects_count)),
("total_tasks".into(), json!(completed_tasks)),
("completed_tasks".into(), json!(completed_tasks)),
("total_tasks".into(), json!(total_tasks)),
])),
]
}
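
Illustrative note (not part of the commit): the progress accounting above assumes each orphan batch contributes two tasks, an ExtractFileMetadataTask plus an ObjectProcessorTask, which is why total_tasks is doubled and a batch with no identified files advances completed_tasks by 2 instead of 1. A minimal, self-contained sketch of that invariant, using hypothetical names:

```rust
// Sketch only, not Spacedrive code: models the two-tasks-per-batch accounting
// so that completed_tasks can always reach total_tasks.
#[derive(Default)]
struct ProgressModel {
    total_tasks: u64,
    completed_tasks: u64,
}

impl ProgressModel {
    fn on_batches_dispatched(&mut self, batches: u64) {
        // Each batch will have 2 tasks: metadata extraction, then object processing.
        self.total_tasks = batches * 2;
    }

    fn on_extract_metadata_done(&mut self, identified_files: usize) {
        if identified_files == 0 {
            // No ObjectProcessorTask will follow, so count both tasks as done now.
            self.completed_tasks += 2;
        } else {
            self.completed_tasks += 1;
        }
    }

    fn on_object_processor_done(&mut self) {
        self.completed_tasks += 1;
    }
}

fn main() {
    let mut progress = ProgressModel::default();
    progress.on_batches_dispatched(2);
    progress.on_extract_metadata_done(10); // first batch identified files
    progress.on_object_processor_done();
    progress.on_extract_metadata_done(0); // second batch had nothing to identify
    assert_eq!(progress.completed_tasks, progress.total_tasks);
}
```
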
@@ -25,7 +25,7 @@ use std::{
use prisma_client_rust::Select;
use serde::{Deserialize, Serialize};
use tokio::time::Instant;
use tracing::{debug, trace};
use tracing::trace;
use uuid::Uuid;

use super::IdentifiedFile;
@@ -153,7 +153,7 @@ impl Task<Error> for ObjectProcessorTask {
*assign_to_existing_object_time = start.elapsed();
*linked_objects_count = assigned_file_path_pub_ids.len() as u64;

debug!(
trace!(
"Found {} existing Objects, linked file paths to them",
existing_objects_by_cas_id.len()
);
13 changes: 11 additions & 2 deletions core/crates/heavy-lifting/src/indexer/job.rs
@@ -38,7 +38,7 @@ use itertools::Itertools;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::time::Instant;
use tracing::warn;
use tracing::{debug, error, warn};

use super::{
remove_non_existing_file_paths, reverse_update_directories_sizes,
@@ -401,7 +401,10 @@ impl Indexer {
.map(|WalkedEntry { iso_file_path, .. }| iso_file_path.clone()),
);

self.errors.extend(errors);
if !errors.is_empty() {
error!("Non critical errors while indexing: {errors:#?}");
self.errors.extend(errors);
}

let db_delete_time = Instant::now();
self.metadata.removed_count +=
@@ -457,6 +460,8 @@ impl Indexer {
])
.await;

debug!("Processed walk task in the indexer, took: {scan_time:?}");

Ok(handles)
}

@@ -472,6 +477,8 @@
self.metadata.db_write_time += save_duration;

ctx.progress_msg(format!("Saved {saved_count} files")).await;

debug!("Processed save task in the indexer, took: {save_duration:?}");
}

async fn process_update_output<OuterCtx: OuterContext>(
@@ -487,6 +494,8 @@

ctx.progress_msg(format!("Updated {updated_count} files"))
.await;

debug!("Processed update task in the indexer, took: {update_duration:?}");
}

async fn process_handles<OuterCtx: OuterContext>(
17 changes: 17 additions & 0 deletions core/crates/heavy-lifting/src/job_system/mod.rs
@@ -219,6 +219,23 @@ impl<OuterCtx: OuterContext, JobCtx: JobContext<OuterCtx>> JobSystem<OuterCtx, J
.map(|()| id)
}

/// Check if there are any active jobs for the desired [`OuterContext`].
/// # Panics
/// Panics only happen if internal channels are unexpectedly closed
pub async fn has_active_jobs(&self, ctx: OuterCtx) -> bool {
let ctx_id = ctx.id();

let (ack_tx, ack_rx) = oneshot::channel();
self.msgs_tx
.send(RunnerMessage::HasActiveJobs { ctx_id, ack_tx })
.await
.expect("runner msgs channel unexpectedly closed on has active jobs request");

ack_rx
.await
.expect("ack channel closed before receiving has active jobs response")
}

pub fn receive_job_outputs(
&self,
) -> impl Stream<Item = (JobId, Result<JobOutput, JobSystemError>)> {
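
Illustrative sketch (not from the commit): the new has_active_jobs call follows the runner's existing request/ack pattern, sending a oneshot sender over the runner's message channel and awaiting the answer, so the set of active handles is only ever read from the single runner task and needs no extra locking. A minimal, self-contained version of that pattern, assuming tokio with the full feature set and using a plain integer in place of the context Uuid:

```rust
use tokio::sync::{mpsc, oneshot};

// Hypothetical, trimmed-down message type mirroring RunnerMessage::HasActiveJobs.
enum RunnerMessage {
    HasActiveJobs {
        ctx_id: u64,
        ack_tx: oneshot::Sender<bool>,
    },
}

#[tokio::main]
async fn main() {
    let (msgs_tx, mut msgs_rx) = mpsc::channel::<RunnerMessage>(8);

    // Stand-in for the runner loop, which alone owns the active job handles.
    let active_ctx_id = 42_u64;
    tokio::spawn(async move {
        while let Some(msg) = msgs_rx.recv().await {
            match msg {
                RunnerMessage::HasActiveJobs { ctx_id, ack_tx } => {
                    // Answer from the runner's own state; ignore a dropped receiver.
                    let _ = ack_tx.send(ctx_id == active_ctx_id);
                }
            }
        }
    });

    // Caller side, mirroring JobSystem::has_active_jobs.
    let (ack_tx, ack_rx) = oneshot::channel();
    msgs_tx
        .send(RunnerMessage::HasActiveJobs {
            ctx_id: active_ctx_id,
            ack_tx,
        })
        .await
        .expect("runner msgs channel unexpectedly closed");

    assert!(ack_rx.await.expect("ack channel closed before response"));
}
```
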
26 changes: 23 additions & 3 deletions core/crates/heavy-lifting/src/job_system/runner.rs
@@ -68,6 +68,10 @@ pub(super) enum RunnerMessage<OuterCtx: OuterContext, JobCtx: JobContext<OuterCt
ack_tx: oneshot::Sender<bool>,
},
Shutdown,
HasActiveJobs {
ctx_id: Uuid,
ack_tx: oneshot::Sender<bool>,
},
}

struct JobsWorktables {
@@ -202,6 +206,9 @@ impl<OuterCtx: OuterContext, JobCtx: JobContext<OuterCtx>> JobSystemRunner<Outer
.try_join()
.await?;

handle.ctx.invalidate_query("jobs.isActive");
handle.ctx.invalidate_query("jobs.reports");

handles.insert(id, handle);

Ok(())
@@ -362,6 +369,9 @@ impl<OuterCtx: OuterContext, JobCtx: JobContext<OuterCtx>> JobSystemRunner<Outer
.await
.expect("job outputs channel unexpectedly closed on job completion");

handle.ctx.invalidate_query("jobs.isActive");
handle.ctx.invalidate_query("jobs.reports");

Ok(())
}

@@ -438,6 +448,12 @@ impl<OuterCtx: OuterContext, JobCtx: JobContext<OuterCtx>> JobSystemRunner<Outer
.await
.map_err(|e| JobSystemError::StoredJobs(FileIOError::from((store_jobs_file, e))))
}

fn has_active_jobs(&self, ctx_id: Uuid) -> bool {
self.handles
.values()
.any(|handle| handle.ctx.id() == ctx_id)
}
}

async fn serialize_next_jobs_to_shutdown<OuterCtx: OuterContext, JobCtx: JobContext<OuterCtx>>(
@@ -576,6 +592,12 @@ pub(super) async fn run<OuterCtx: OuterContext, JobCtx: JobContext<OuterCtx>>(
.expect("ack channel closed before sending new job response");
}

StreamMessage::RunnerMessage(RunnerMessage::HasActiveJobs { ctx_id, ack_tx }) => {
ack_tx
.send(runner.has_active_jobs(ctx_id))
.expect("ack channel closed before sending has active jobs response");
}

StreamMessage::RunnerMessage(RunnerMessage::GetActiveReports { ack_tx }) => {
ack_tx
.send(runner.get_active_reports().await)
@@ -647,9 +669,7 @@
}

// Memory cleanup tick
StreamMessage::CleanMemoryTick => {
runner.clean_memory();
}
StreamMessage::CleanMemoryTick => runner.clean_memory(),
}
}
}

Check warning on line 675 in core/crates/heavy-lifting/src/job_system/runner.rs

GitHub Actions / Clippy (ubuntu-22.04, macos-14)

warning: this function has too many lines (102/100)
   --> core/crates/heavy-lifting/src/job_system/runner.rs:549:1
    |
549 | / pub(super) async fn run<OuterCtx: OuterContext, JobCtx: JobContext<OuterCtx>>(
550 | |     mut runner: JobSystemRunner<OuterCtx, JobCtx>,
551 | |     store_jobs_file: impl AsRef<Path> + Send,
552 | |     msgs_rx: chan::Receiver<RunnerMessage<OuterCtx, JobCtx>>,
... |
674 | | }
675 | | }
    | |_^
    |
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#too_many_lines
note: the lint level is defined here
   --> core/crates/heavy-lifting/src/lib.rs:4:2
    |
4   |     clippy::pedantic,
    |     ^^^^^^^^^^^^^^^^
    = note: `#[warn(clippy::too_many_lines)]` implied by `#[warn(clippy::pedantic)]`
@@ -44,8 +44,8 @@ pub const TARGET_PX: f32 = 1_048_576.0; // 1024x1024
/// and is treated as a percentage (so 60% in this case, or it's the same as multiplying by `0.6`).
pub const TARGET_QUALITY: f32 = 60.0;

/// How much time we allow for the thumbnail generation process to complete before we give up.
pub const THUMBNAIL_GENERATION_TIMEOUT: Duration = Duration::from_secs(60);
/// How much time we allow for the thumbnailer task to complete before we give up.
pub const THUMBNAILER_TASK_TIMEOUT: Duration = Duration::from_secs(60 * 5);

pub fn get_thumbnails_directory(data_directory: impl AsRef<Path>) -> PathBuf {
data_directory.as_ref().join(THUMBNAIL_CACHE_DIR_NAME)
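
Sketch only, since this hunk does not show where the renamed constant is consumed: a per-task deadline like THUMBNAILER_TASK_TIMEOUT is typically enforced with tokio::time::timeout, which resolves to an Err instead of letting a stuck task hang forever (one way a wedged task can look like a deadlock from the outside). Assumptions: a tokio runtime and a hypothetical generate_thumbnails_batch stand-in for the real work.

```rust
use std::time::Duration;
use tokio::time::{sleep, timeout};

/// How much time we allow for the thumbnailer task to complete before we give up.
pub const THUMBNAILER_TASK_TIMEOUT: Duration = Duration::from_secs(60 * 5);

// Hypothetical stand-in for a batch of thumbnail generation work.
async fn generate_thumbnails_batch() {
    sleep(Duration::from_millis(50)).await;
}

#[tokio::main]
async fn main() {
    match timeout(THUMBNAILER_TASK_TIMEOUT, generate_thumbnails_batch()).await {
        Ok(()) => println!("thumbnailer task finished in time"),
        Err(_elapsed) => eprintln!("thumbnailer task timed out, giving up"),
    }
}
```
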