Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 1 addition & 36 deletions turbopack/crates/turbo-tasks-backend/benches/overhead.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::{Duration, Instant};
use criterion::{BenchmarkId, Criterion, black_box};
use futures::{FutureExt, StreamExt, stream::FuturesUnordered};
use tokio::spawn;
use turbo_tasks::{TurboTasks, TurboTasksApi};
use turbo_tasks::TurboTasks;
use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage};

#[global_allocator]
Expand Down Expand Up @@ -79,15 +79,6 @@ pub fn overhead(c: &mut Criterion) {
run_turbo::<Uncached>(&rt, b, d, false);
},
);
// Same as turbo-uncached but reports the time as measured by turbotasks itself
// This allows us to understand the cost of the indirection within turbotasks
group.bench_with_input(
BenchmarkId::new("turbo-uncached-stats", micros),
&duration,
|b, &d| {
run_turbo_stats(&rt, b, d);
},
);

group.bench_with_input(
BenchmarkId::new("turbo-cached-same-keys", micros),
Expand Down Expand Up @@ -224,29 +215,3 @@ fn run_turbo<Mode: TurboMode>(
}
});
}

fn run_turbo_stats(rt: &tokio::runtime::Runtime, b: &mut criterion::Bencher<'_>, d: Duration) {
b.to_async(rt).iter_custom(|iters| {
// It is important to create the tt instance here to ensure the cache is not shared across
// iterations.
let tt = TurboTasks::new(TurboTasksBackend::new(
BackendOptions {
storage_mode: None,
..Default::default()
},
noop_backing_storage(),
));
let stats = tt.task_statistics().enable().clone();

async move {
tt.run_once(async move {
for i in 0..iters {
black_box(busy_turbo(i, black_box(d)).await?);
}
Ok(stats.get(&BUSY_TURBO_FUNCTION).duration)
})
.await
.unwrap()
}
});
}
16 changes: 0 additions & 16 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,14 +391,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
self.task_statistics
.map(|stats| stats.increment_cache_miss(task_type.native_fn));
}

fn track_task_duration(&self, task_id: TaskId, duration: std::time::Duration) {
self.task_statistics.map(|stats| {
if let Some(task_type) = self.task_cache.lookup_reverse(&task_id) {
stats.increment_execution_duration(task_type.native_fn, duration);
}
});
}
}

pub(crate) struct OperationGuard<'a, B: BackingStorage> {
Expand Down Expand Up @@ -1751,8 +1743,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
fn task_execution_completed(
&self,
task_id: TaskId,
duration: Duration,
_memory_usage: usize,
result: Result<RawVc, TurboTasksExecutionError>,
cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
stateful: bool,
Expand All @@ -1776,8 +1766,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
let span = tracing::trace_span!("task execution completed", immutable = Empty).entered();
let mut ctx = self.execute_context(turbo_tasks);

self.track_task_duration(task_id, duration);

let Some(TaskExecutionCompletePrepareResult {
new_children,
mut removed_data,
Expand Down Expand Up @@ -3194,8 +3182,6 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
fn task_execution_completed(
&self,
task_id: TaskId,
_duration: Duration,
_memory_usage: usize,
result: Result<RawVc, TurboTasksExecutionError>,
cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
stateful: bool,
Expand All @@ -3204,8 +3190,6 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
) -> bool {
self.0.task_execution_completed(
task_id,
_duration,
_memory_usage,
result,
cell_counters,
stateful,
Expand Down
3 changes: 0 additions & 3 deletions turbopack/crates/turbo-tasks/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::{
hash::{BuildHasherDefault, Hash},
pin::Pin,
sync::Arc,
time::Duration,
};

use anyhow::{Result, anyhow};
Expand Down Expand Up @@ -541,8 +540,6 @@ pub trait Backend: Sync + Send {
fn task_execution_completed(
&self,
task: TaskId,
duration: Duration,
memory_usage: usize,
result: Result<RawVc, TurboTasksExecutionError>,
cell_counters: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
stateful: bool,
Expand Down
77 changes: 4 additions & 73 deletions turbopack/crates/turbo-tasks/src/capture_future.rs
Original file line number Diff line number Diff line change
@@ -1,74 +1,31 @@
use std::{
borrow::Cow,
cell::RefCell,
fmt::Display,
future::Future,
panic,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};

use anyhow::Result;
use pin_project_lite::pin_project;
use serde::{Deserialize, Serialize};
use turbo_tasks_malloc::{AllocationInfo, TurboMalloc};

use crate::{backend::TurboTasksExecutionErrorMessage, panic_hooks::LAST_ERROR_LOCATION};

struct ThreadLocalData {
duration: Duration,
allocations: usize,
deallocations: usize,
}

thread_local! {
static EXTRA: RefCell<Option<*mut ThreadLocalData>> = const { RefCell::new(None) };
}

pin_project! {
pub struct CaptureFuture<T, F: Future<Output = T>> {
#[pin]
future: F,
duration: Duration,
allocations: AllocationInfo,
}
}

impl<T, F: Future<Output = T>> CaptureFuture<T, F> {
pub fn new(future: F) -> Self {
Self {
future,
duration: Duration::ZERO,
allocations: AllocationInfo::ZERO,
}
Self { future }
}
}

fn try_with_thread_local_data(f: impl FnOnce(&mut ThreadLocalData)) {
EXTRA.with_borrow(|cell| {
if let Some(data) = cell {
// Safety: This data is thread local and only accessed in this thread
unsafe {
f(&mut **data);
}
}
});
}

pub fn add_duration(duration: Duration) {
try_with_thread_local_data(|data| {
data.duration += duration;
});
}

pub fn add_allocation_info(alloc_info: AllocationInfo) {
try_with_thread_local_data(|data| {
data.allocations += alloc_info.allocations;
data.deallocations += alloc_info.deallocations;
});
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TurboTasksPanic {
pub message: TurboTasksExecutionErrorMessage,
Expand All @@ -93,21 +50,10 @@ impl Display for TurboTasksPanic {
}

impl<T, F: Future<Output = T>> Future for CaptureFuture<T, F> {
type Output = (Result<T, TurboTasksPanic>, Duration, AllocationInfo);
type Output = Result<T, TurboTasksPanic>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let start = Instant::now();
let start_allocations = TurboMalloc::allocation_counters();
let guard = ThreadLocalDataDropGuard;
let mut data = ThreadLocalData {
duration: Duration::ZERO,
allocations: 0,
deallocations: 0,
};
EXTRA.with_borrow_mut(|cell| {
*cell = Some(&mut data as *mut ThreadLocalData);
});

let result =
panic::catch_unwind(panic::AssertUnwindSafe(|| this.future.poll(cx))).map_err(|err| {
Expand All @@ -132,25 +78,10 @@ impl<T, F: Future<Output = T>> Future for CaptureFuture<T, F> {
})
});

drop(guard);
let elapsed = start.elapsed();
let allocations = start_allocations.until_now();
*this.duration += elapsed + data.duration;
*this.allocations += allocations;
match result {
Err(err) => Poll::Ready((Err(err), *this.duration, this.allocations.clone())),
Ok(Poll::Ready(r)) => Poll::Ready((Ok(r), *this.duration, this.allocations.clone())),
Err(err) => Poll::Ready(Err(err)),
Ok(Poll::Ready(r)) => Poll::Ready(Ok(r)),
Ok(Poll::Pending) => Poll::Pending,
}
}
}

struct ThreadLocalDataDropGuard;

impl Drop for ThreadLocalDataDropGuard {
fn drop(&mut self) {
EXTRA.with_borrow_mut(|cell| {
*cell = None;
});
}
}
8 changes: 3 additions & 5 deletions turbopack/crates/turbo-tasks/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ impl<B: Backend + 'static> TurboTasks<B> {
.scope(
self.pin(),
CURRENT_TASK_STATE.scope(current_task_state, async {
let (result, _duration, _alloc_info) = CaptureFuture::new(future).await;
let result = CaptureFuture::new(future).await;

// wait for all spawned local tasks using `local` to finish
let ltt =
Expand Down Expand Up @@ -736,7 +736,7 @@ impl<B: Backend + 'static> TurboTasks<B> {
};

async {
let (result, duration, alloc_info) = CaptureFuture::new(future).await;
let result = CaptureFuture::new(future).await;

// wait for all spawned local tasks using `local` to finish
let ltt = CURRENT_TASK_STATE
Expand All @@ -758,8 +758,6 @@ impl<B: Backend + 'static> TurboTasks<B> {
.with(|ts| ts.write().unwrap().cell_counters.take().unwrap());
this.backend.task_execution_completed(
task_id,
duration,
alloc_info.memory_usage(),
result,
&cell_counters,
stateful,
Expand Down Expand Up @@ -835,7 +833,7 @@ impl<B: Backend + 'static> TurboTasks<B> {
let TaskExecutionSpec { future, span } =
crate::task::local_task::get_local_task_execution_spec(&*this, &ty, persistence);
async move {
let (result, _duration, _memory_usage) = CaptureFuture::new(future).await;
let result = CaptureFuture::new(future).await;

let result = match result {
Ok(Ok(raw_vc)) => Ok(raw_vc),
Expand Down
27 changes: 8 additions & 19 deletions turbopack/crates/turbo-tasks/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,20 @@ use std::{
pin::Pin,
task::{Context, Poll},
thread,
time::{Duration, Instant},
};

use anyhow::Result;
use futures::{FutureExt, ready};
use tokio::runtime::Handle;
use tracing::{Instrument, Span, info_span};
use turbo_tasks_malloc::{AllocationInfo, TurboMalloc};

use crate::{
TurboTasksPanic,
capture_future::{self, CaptureFuture},
manager::turbo_tasks_future_scope,
turbo_tasks, turbo_tasks_scope,
TurboTasksPanic, capture_future::CaptureFuture, manager::turbo_tasks_future_scope, turbo_tasks,
turbo_tasks_scope,
};

pub struct JoinHandle<T> {
join_handle: tokio::task::JoinHandle<(Result<T, TurboTasksPanic>, Duration, AllocationInfo)>,
join_handle: tokio::task::JoinHandle<Result<T, TurboTasksPanic>>,
}

impl<T: Send + 'static> JoinHandle<T> {
Expand All @@ -35,14 +31,10 @@ impl<T> Future for JoinHandle<T> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
match ready!(this.join_handle.poll_unpin(cx)) {
Ok((res, duration, alloc_info)) => {
capture_future::add_duration(duration);
capture_future::add_allocation_info(alloc_info);
match res {
Ok(res) => Poll::Ready(res),
Err(e) => resume_unwind(e.into_panic()),
}
}
Ok(res) => match res {
Ok(res) => Poll::Ready(res),
Err(e) => resume_unwind(e.into_panic()),
},
Err(e) => resume_unwind(e.into_panic()),
}
}
Expand Down Expand Up @@ -71,10 +63,7 @@ pub fn spawn_blocking<T: Send + 'static>(
let span = Span::current();
let join_handle = tokio::task::spawn_blocking(|| {
let _guard = span.entered();
let start = Instant::now();
let start_allocations = TurboMalloc::allocation_counters();
let r = turbo_tasks_scope(turbo_tasks, func);
(Ok(r), start.elapsed(), start_allocations.until_now())
Ok(turbo_tasks_scope(turbo_tasks, func))
});
JoinHandle { join_handle }
}
Expand Down
15 changes: 0 additions & 15 deletions turbopack/crates/turbo-tasks/src/task_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,6 @@ impl TaskStatistics {
self.with_task_type_statistics(native_fn, |stats| stats.cache_miss += 1)
}

pub fn increment_execution_duration(
&self,
native_fn: &'static NativeFunction,
duration: std::time::Duration,
) {
self.with_task_type_statistics(native_fn, |stats| {
stats.executions += 1;
stats.duration += duration
})
}

fn with_task_type_statistics(
&self,
native_fn: &'static NativeFunction,
Expand All @@ -75,10 +64,6 @@ impl TaskStatistics {
pub struct TaskFunctionStatistics {
pub cache_hit: u32,
pub cache_miss: u32,
// Generally executions == cache_miss, however they can diverge when there are invalidations.
// The caller gets one cache miss but we might execute multiple times.
pub executions: u32,
pub duration: std::time::Duration,
}

impl Serialize for TaskStatistics {
Expand Down
Loading