Skip to content

Commit

Permalink
Drop excessive cells after task reexecution (#7765)
Browse files Browse the repository at this point in the history
### Description

When cells become unused after recomputation of a task, drop them.

### Testing Instructions

<!--
  Give a quick description of steps to test your changes.
-->


Closes PACK-2788
  • Loading branch information
sokra committed Apr 17, 2024
1 parent d1a7d66 commit adbfe4c
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 8 deletions.
13 changes: 11 additions & 2 deletions crates/turbo-tasks-memory/src/memory_backend.rs
Expand Up @@ -15,6 +15,7 @@ use std::{
use anyhow::{bail, Result};
use auto_hash_map::AutoMap;
use dashmap::{mapref::entry::Entry, DashMap};
use nohash_hasher::NoHashHasher;
use rustc_hash::FxHasher;
use tokio::task::futures::TaskLocalFuture;
use tracing::trace_span;
Expand All @@ -25,7 +26,7 @@ use turbo_tasks::{
},
event::EventListener,
util::{IdFactory, NoMoveVec},
CellId, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi, Unused,
CellId, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi, Unused, ValueTypeId,
};

use crate::{
Expand Down Expand Up @@ -315,11 +316,19 @@ impl Backend for MemoryBackend {
task_id: TaskId,
duration: Duration,
instant: Instant,
cell_counters: AutoMap<ValueTypeId, u32, BuildHasherDefault<NoHashHasher<ValueTypeId>>, 8>,
stateful: bool,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) -> bool {
let reexecute = self.with_task(task_id, |task| {
task.execution_completed(duration, instant, stateful, self, turbo_tasks)
task.execution_completed(
duration,
instant,
cell_counters,
stateful,
self,
turbo_tasks,
)
});
if !reexecute {
self.run_gc(false, turbo_tasks);
Expand Down
5 changes: 4 additions & 1 deletion crates/turbo-tasks-memory/src/memory_backend_with_pg.rs
Expand Up @@ -3,6 +3,7 @@ use std::{
collections::{BinaryHeap, HashMap},
fmt::Debug,
future::Future,
hash::BuildHasherDefault,
mem::{replace, take},
pin::Pin,
sync::{
Expand All @@ -16,6 +17,7 @@ use anyhow::{anyhow, Result};
use auto_hash_map::{AutoMap, AutoSet};
use concurrent_queue::ConcurrentQueue;
use dashmap::{mapref::entry::Entry, DashMap, DashSet};
use nohash_hasher::NoHashHasher;
use turbo_tasks::{
backend::{
Backend, BackendJobId, CellContent, PersistentTaskType, TaskExecutionSpec,
Expand All @@ -27,7 +29,7 @@ use turbo_tasks::{
PersistedGraphApi, ReadTaskState, TaskCell, TaskData,
},
util::{IdFactory, NoMoveVec, SharedError},
CellId, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi, Unused,
CellId, RawVc, TaskId, TaskIdSet, TraitTypeId, TurboTasksBackendApi, Unused, ValueTypeId,
};

type RootTaskFn =
Expand Down Expand Up @@ -1154,6 +1156,7 @@ impl<P: PersistedGraph> Backend for MemoryBackendWithPersistedGraph<P> {
task: TaskId,
duration: Duration,
_instant: Instant,
_cell_counters: AutoMap<ValueTypeId, u32, BuildHasherDefault<NoHashHasher<ValueTypeId>>, 8>,
_stateful: bool,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackendWithPersistedGraph<P>>,
) -> bool {
Expand Down
13 changes: 12 additions & 1 deletion crates/turbo-tasks-memory/src/task.rs
Expand Up @@ -20,7 +20,7 @@ use std::{

use anyhow::Result;
use auto_hash_map::{AutoMap, AutoSet};
use nohash_hasher::BuildNoHashHasher;
use nohash_hasher::{BuildNoHashHasher, NoHashHasher};
use parking_lot::{Mutex, RwLock};
use rustc_hash::FxHasher;
use smallvec::SmallVec;
Expand Down Expand Up @@ -963,6 +963,7 @@ impl Task {
&self,
duration: Duration,
instant: Instant,
cell_counters: AutoMap<ValueTypeId, u32, BuildHasherDefault<NoHashHasher<ValueTypeId>>, 8>,
stateful: bool,
backend: &MemoryBackend,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
Expand All @@ -973,13 +974,20 @@ impl Task {
let mut change_job = None;
#[cfg(feature = "lazy_remove_children")]
let mut remove_job = None;
let mut drained_cells = SmallVec::<[Cell; 8]>::new();
let mut dependencies = DEPENDENCIES_TO_TRACK.with(|deps| deps.take());
{
let mut state = self.full_state_mut();

state
.stats
.register_execution(duration, turbo_tasks.program_duration_until(instant));
for (value_type, cells) in state.cells.iter_mut() {
let counter = cell_counters.get(value_type).copied().unwrap_or_default();
if counter != cells.len() as u32 {
drained_cells.extend(cells.drain(counter as usize..));
}
}
match state.state_type {
InProgress {
ref mut event,
Expand Down Expand Up @@ -1049,6 +1057,9 @@ impl Task {
if !dependencies.is_empty() {
self.clear_dependencies(dependencies, backend, turbo_tasks);
}
for cell in drained_cells {
cell.gc_drop(turbo_tasks);
}
if let Some(job) = change_job {
job();
}
Expand Down
8 changes: 5 additions & 3 deletions crates/turbo-tasks/src/backend.rs
@@ -1,9 +1,9 @@
use std::{
any::Any,
borrow::Cow,
fmt,
fmt::{Debug, Display, Write},
fmt::{self, Debug, Display, Write},
future::Future,
hash::BuildHasherDefault,
mem::take,
pin::Pin,
sync::Arc,
Expand All @@ -12,14 +12,15 @@ use std::{

use anyhow::{anyhow, bail, Result};
use auto_hash_map::AutoMap;
use nohash_hasher::NoHashHasher;
use serde::{Deserialize, Serialize};
use tracing::Span;

pub use crate::id::BackendJobId;
use crate::{
event::EventListener, manager::TurboTasksBackendApi, raw_vc::CellId, registry,
ConcreteTaskInput, FunctionId, RawVc, ReadRef, SharedReference, TaskId, TaskIdProvider,
TaskIdSet, TraitRef, TraitTypeId, VcValueTrait, VcValueType,
TaskIdSet, TraitRef, TraitTypeId, ValueTypeId, VcValueTrait, VcValueType,
};

pub enum TaskType {
Expand Down Expand Up @@ -228,6 +229,7 @@ pub trait Backend: Sync + Send {
task: TaskId,
duration: Duration,
instant: Instant,
cell_counters: AutoMap<ValueTypeId, u32, BuildHasherDefault<NoHashHasher<ValueTypeId>>, 8>,
stateful: bool,
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) -> bool;
Expand Down
9 changes: 8 additions & 1 deletion crates/turbo-tasks/src/manager.rs
Expand Up @@ -490,8 +490,15 @@ impl<B: Backend + 'static> TurboTasks<B> {
});
this.backend.task_execution_result(task_id, result, &*this);
let stateful = this.finish_current_task_state();
let cell_counters =
CELL_COUNTERS.with(|cc| take(&mut *cc.borrow_mut()));
this.backend.task_execution_completed(
task_id, duration, instant, stateful, &*this,
task_id,
duration,
instant,
cell_counters,
stateful,
&*this,
)
}
.instrument(span)
Expand Down

0 comments on commit adbfe4c

Please sign in to comment.