remove flurry in favor of DashMap #2780

Merged
merged 1 commit on Nov 24, 2022
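For context on the changes below: flurry requires callers to pin() the map first, obtaining a guard tied to its deferred-reclamation scheme, while DashMap shards the table internally and returns Ref/RefMut guards straight from get, insert, and remove. The sketch below illustrates the difference with a hypothetical String-keyed cache (not the backend's real types); the flurry half is shown as comments since this PR drops that dependency.

```rust
use dashmap::DashMap;

// Hypothetical cache, used only to show the API shapes involved.
fn lookup(cache: &DashMap<String, u32>, key: &str) -> Option<u32> {
    // DashMap: no guard object; `get` returns Option<Ref<'_, K, V>>,
    // and the Ref releases its shard read lock when it is dropped.
    cache.get(key).map(|entry| *entry)
}

// The flurry 0.4 version being removed looked roughly like this:
//
//     fn lookup(cache: &flurry::HashMap<String, u32>, key: &str) -> Option<u32> {
//         let guard = cache.pin(); // guard for flurry's deferred reclamation, reusable across calls
//         guard.get(key).copied()
//     }
```

Because each DashMap call manages its own shard lock, the Rust changes further down simply drop every .pin() call and the explicit guard bindings that went with them.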
25 changes: 1 addition & 24 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion crates/turbo-tasks-memory/Cargo.toml
@@ -13,7 +13,6 @@ bench = false
anyhow = "1.0.47"
concurrent-queue = "1.2.2"
dashmap = "5.4.0"
flurry = "0.4.0"
lazy_static = "1.4.0"
nohash-hasher = "0.2.0"
num_cpus = "1.13.1"
100 changes: 49 additions & 51 deletions crates/turbo-tasks-memory/src/memory_backend_with_pg.rs
@@ -14,6 +14,7 @@ use std::{

use anyhow::{anyhow, Result};
use concurrent_queue::ConcurrentQueue;
use dashmap::{mapref::entry::Entry, DashMap, DashSet};
use turbo_tasks::{
backend::{
Backend, BackendJobId, CellContent, CellMappings, PersistentTaskType, TaskExecutionSpec,
@@ -135,19 +136,19 @@ enum BackgroundJob {
pub struct MemoryBackendWithPersistedGraph<P: PersistedGraph> {
pub pg: P,
tasks: NoMoveVec<Task>,
cache: flurry::HashMap<PersistentTaskType, TaskId>,
cache: DashMap<PersistentTaskType, TaskId>,
background_job_id_factory: IdFactory<BackendJobId>,
background_jobs: NoMoveVec<BackgroundJob>,
only_known_to_memory_tasks: flurry::HashSet<TaskId>,
only_known_to_memory_tasks: DashSet<TaskId>,
/// Tasks that were selected to persist
persist_queue1: ConcurrentQueue<TaskId>,
persist_queue1_queued: flurry::HashSet<TaskId>,
need_persisting: flurry::HashSet<TaskId>,
persist_queue1_queued: DashSet<TaskId>,
need_persisting: DashSet<TaskId>,
/// Task sorted by importance, sharded to avoid lock contention
persist_queue_by_duration: [Mutex<BinaryHeap<(Duration, TaskId)>>; 64],
persist_capacity: AtomicUsize,
persist_job: BackendJobId,
partial_lookups: flurry::HashMap<PersistentTaskType, bool>,
partial_lookups: DashMap<PersistentTaskType, bool>,
#[cfg(feature = "unsafe_once_map")]
partial_lookup: turbo_tasks::util::OnceConcurrentlyMap<PersistentTaskType, bool>,
#[cfg(not(feature = "unsafe_once_map"))]
@@ -164,17 +165,17 @@ impl<P: PersistedGraph> MemoryBackendWithPersistedGraph<P> {
Self {
pg,
tasks: NoMoveVec::new(),
cache: flurry::HashMap::new(),
cache: DashMap::new(),
background_job_id_factory,
background_jobs: NoMoveVec::new(),
only_known_to_memory_tasks: flurry::HashSet::new(),
only_known_to_memory_tasks: DashSet::new(),
persist_queue1: ConcurrentQueue::unbounded(),
persist_queue1_queued: flurry::HashSet::new(),
need_persisting: flurry::HashSet::new(),
persist_queue1_queued: DashSet::new(),
need_persisting: DashSet::new(),
persist_queue_by_duration: [(); 64].map(|_| Mutex::new(BinaryHeap::new())),
persist_capacity: AtomicUsize::new(num_cpus::get()),
persist_job,
partial_lookups: flurry::HashMap::new(),
partial_lookups: DashMap::new(),
#[cfg(feature = "unsafe_once_map")]
partial_lookup: turbo_tasks::util::OnceConcurrentlyMap::new(),
#[cfg(not(feature = "unsafe_once_map"))]
@@ -315,22 +316,21 @@ impl<P: PersistedGraph> MemoryBackendWithPersistedGraph<P> {

fn lookup(
&self,
cache: &flurry::HashMapRef<'_, PersistentTaskType, TaskId>,
task_type: &PersistentTaskType,
turbo_tasks: &dyn TurboTasksBackendApi,
) -> Option<TaskId> {
for i in 0..task_type.len() {
let partial = task_type.partial(i);
let complete_cached = self.partial_lookups.pin().get(&partial).copied();
let complete_cached = self.partial_lookups.get(&partial).map(|x| *x);
let complete = complete_cached.unwrap_or_else(|| {
self.partial_lookup.action(&partial, || {
let complete = self.pg_lookup(&partial, turbo_tasks);
self.partial_lookups.pin().insert(partial.clone(), complete);
self.partial_lookups.insert(partial.clone(), complete);
complete
})
});
if complete {
return cache.get(task_type).copied();
return self.cache.get(task_type).map(|x| *x);
}
}
self.pg_lookup_one(task_type, turbo_tasks)
@@ -739,13 +739,11 @@ impl<P: PersistedGraph> MemoryBackendWithPersistedGraph<P> {
}

fn persist(&self, turbo_tasks: &dyn TurboTasksBackendApi) -> bool {
let persist_queue1_queued = self.persist_queue1_queued.pin();
loop {
if let Ok(mut task) = self.persist_queue1.pop() {
persist_queue1_queued.remove(&task);
let need_persisting = self.need_persisting.pin();
self.persist_queue1_queued.remove(&task);
'outer: loop {
need_persisting.remove(&task);
self.need_persisting.remove(&task);
let (mut state, task_info) = self.state_mut(task, turbo_tasks);
if let TaskState {
ref mut persisted,
@@ -775,20 +773,18 @@ impl<P: PersistedGraph> MemoryBackendWithPersistedGraph<P> {
.map(|vc| vc.get_task_id())
.chain(children.iter().copied())
{
if need_persisting.contains(&higher_prio_task) {
if persist_queue1_queued.insert(task) {
if self.need_persisting.contains(&higher_prio_task) {
if self.persist_queue1_queued.insert(task) {
self.persist_queue1.push(task).unwrap();
}
task = higher_prio_task;
continue 'outer;
}
}
if !dependencies.is_empty() {
let only_known_to_memory_tasks =
self.only_known_to_memory_tasks.pin();
for dep in dependencies.iter() {
let task = dep.get_task_id();
only_known_to_memory_tasks.remove(&task);
self.only_known_to_memory_tasks.remove(&task);
}
}
let data = TaskData {
@@ -857,12 +853,12 @@ impl<P: PersistedGraph> MemoryBackendWithPersistedGraph<P> {
// .map(|d| d.get_task_id())
// .collect::<HashSet<_>>();
// for dep in dependencies {
// if persist_queue1_queued.insert(dep) {
// if self.persist_queue1_queued.insert(dep) {
// self.persist_queue1.push(dep).unwrap();
// did_something = true;
// }
// }
if persist_queue1_queued.insert(task) {
if self.persist_queue1_queued.insert(task) {
self.persist_queue1.push(task).unwrap();
did_something = true;
}
@@ -1155,9 +1151,7 @@ impl<P: PersistedGraph> Backend for MemoryBackendWithPersistedGraph<P> {
drop(state);

if let TaskType::Persistent(_) = task_info.task_type {
if has_changes
&& (is_persisted || !self.only_known_to_memory_tasks.pin().contains(&task))
{
if has_changes && (is_persisted || !self.only_known_to_memory_tasks.contains(&task)) {
for task in self.pg_make_dependent_dirty(RawVc::TaskOutput(task), turbo_tasks) {
let (mut state, _) = self.state_mut(task, turbo_tasks);
if !state.scheduled {
@@ -1169,7 +1163,7 @@ }
}
}
if has_changes || is_dirty_persisted {
self.need_persisting.pin().insert(task);
self.need_persisting.insert(task);
self.persist_queue_by_duration[*task % self.persist_queue_by_duration.len()]
.lock()
.unwrap()
@@ -1519,7 +1513,7 @@ impl<P: PersistedGraph> Backend for MemoryBackendWithPersistedGraph<P> {
drop(state);
}
if let TaskType::Persistent(_) = task_info.task_type {
if is_persisted || !self.only_known_to_memory_tasks.pin().contains(&task) {
if is_persisted || !self.only_known_to_memory_tasks.contains(&task) {
for task in self.pg_make_dependent_dirty(RawVc::TaskCell(task, index), turbo_tasks)
{
let (mut state, _) = self.state_mut(task, turbo_tasks);
@@ -1540,12 +1534,11 @@ impl<P: PersistedGraph> Backend for MemoryBackendWithPersistedGraph<P> {
parent_task: TaskId,
turbo_tasks: &dyn TurboTasksBackendApi,
) -> TaskId {
let cache = self.cache.pin();
if let Some(task) = cache.get(&task_type) {
if let Some(task) = self.cache.get(&task_type) {
self.connect(parent_task, *task, turbo_tasks);
return *task;
}
if let Some(task) = self.lookup(&cache, &task_type, turbo_tasks) {
if let Some(task) = self.lookup(&task_type, turbo_tasks) {
// a return value from lookup was already added to the cache by the id mapping
self.connect(parent_task, task, turbo_tasks);
return task;
@@ -1567,23 +1560,24 @@ impl<P: PersistedGraph> Backend for MemoryBackendWithPersistedGraph<P> {
unsafe {
self.tasks.insert(*task, new_task);
}
match cache.try_insert(task_type, task) {
Ok(_) => {
self.only_known_to_memory_tasks.pin().insert(task);
#[cfg(feature = "log_scheduled_tasks")]
println!("schedule({task}) in get_or_create_persistent_task");
turbo_tasks.schedule(task);
self.connect_already_counted(parent_task, task, turbo_tasks);
task
}
Err(e) => {
match self.cache.entry(task_type) {
Entry::Occupied(e) => {
let existing_task = *e.into_ref();
// SAFETY: We are still the only owner of this task and id
unsafe {
self.tasks.remove(*task);
turbo_tasks.reuse_task_id(task);
}
let task = *e.current;
self.connect(parent_task, task, turbo_tasks);
self.connect(parent_task, existing_task, turbo_tasks);
existing_task
}
Entry::Vacant(e) => {
e.insert(task);
self.only_known_to_memory_tasks.insert(task);
#[cfg(feature = "log_scheduled_tasks")]
println!("schedule({task}) in get_or_create_persistent_task");
turbo_tasks.schedule(task);
self.connect_already_counted(parent_task, task, turbo_tasks);
task
}
}
Expand Down Expand Up @@ -1613,7 +1607,7 @@ impl<P: PersistedGraph> Backend for MemoryBackendWithPersistedGraph<P> {
unsafe {
self.tasks.insert(*task, new_task);
}
self.only_known_to_memory_tasks.pin().insert(task);
self.only_known_to_memory_tasks.insert(task);
task
}
}
Expand All @@ -1625,7 +1619,7 @@ struct MemoryBackendPersistedGraphApi<'a, P: PersistedGraph> {

impl<'a, P: PersistedGraph> PersistedGraphApi for MemoryBackendPersistedGraphApi<'a, P> {
fn get_or_create_task_type(&self, task_type: PersistentTaskType) -> TaskId {
let cache = self.backend.cache.pin();
let cache = &self.backend.cache;
// We could try a cache.get first to avoid insert and remove
// but it seems very unlikely that we actually already know the task type
let new_task = Task {
@@ -1638,13 +1632,17 @@ impl<'a, P: PersistedGraph> PersistedGraphApi for MemoryBackendPersistedGraphApi
unsafe {
self.backend.tasks.insert(*task, new_task);
}
match cache.try_insert(task_type, task) {
Ok(_) => task,
Err(e) => {
match cache.entry(task_type) {
Entry::Occupied(e) => {
let value = *e.into_ref();
unsafe {
self.turbo_tasks.reuse_task_id(task);
}
*e.current
value
}
Entry::Vacant(e) => {
e.insert(task);
task
}
}
}
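The most substantive change in this file is swapping flurry's try_insert for DashMap's entry API in get_or_create_persistent_task and get_or_create_task_type: claim the slot if it is vacant, otherwise reuse the id another thread already published. A stripped-down sketch of that pattern follows (hypothetical u32 ids and String keys in place of the backend's TaskId and PersistentTaskType), along with the DashSet insert contract that persist_queue1_queued relies on to enqueue each task at most once.

```rust
use concurrent_queue::ConcurrentQueue;
use dashmap::{mapref::entry::Entry, DashMap, DashSet};

/// Insert-or-reuse: the Entry holds its shard's write lock for the whole match,
/// so the check and the insert are atomic with respect to other threads.
fn get_or_create(cache: &DashMap<String, u32>, key: String, new_id: u32) -> u32 {
    match cache.entry(key) {
        // Another thread won the race: hand back the id it stored.
        Entry::Occupied(e) => *e.into_ref(),
        // We won the race: publish our freshly allocated id.
        Entry::Vacant(e) => {
            e.insert(new_id);
            new_id
        }
    }
}

/// DashSet::insert returns true only when the value was not already present,
/// matching the flurry::HashSet contract the old code depended on.
fn enqueue_once(queued: &DashSet<u32>, queue: &ConcurrentQueue<u32>, task: u32) {
    if queued.insert(task) {
        queue.push(task).unwrap();
    }
}
```

Dropping the pin() guards also simplifies lookup, which no longer needs a flurry::HashMapRef parameter and reads self.cache directly.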
2 changes: 1 addition & 1 deletion crates/turbo-tasks/Cargo.toml
@@ -19,9 +19,9 @@ hanging_detection = []
any_key = "0.1.1"
anyhow = "1.0.47"
bitflags = "1.3.2"
dashmap = "5.4.0"
erased-serde = "0.3.20"
event-listener = "2.5.3"
flurry = "0.4.0"
futures = "0.3.21"
indexmap = { workspace = true, features = ["serde"] }
mopa = "0.2.0"