Skip to content

Commit

Permalink
refactor hmr handling
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Aug 7, 2023
1 parent 18cd569 commit 6de13a1
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 154 deletions.
87 changes: 51 additions & 36 deletions packages/next-swc/crates/napi/src/next_api/project.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{path::PathBuf, sync::Arc};

use anyhow::{Context, Result};
use napi::{bindgen_prelude::External, JsFunction, JsUnknown};
use napi::{bindgen_prelude::External, JsFunction};
use next_api::{
project::{Middleware, ProjectContainer, ProjectOptions},
route::{Endpoint, Route},
Expand All @@ -22,16 +22,19 @@ use turbopack_binding::{
trace_writer::{TraceWriter, TraceWriterGuard},
tracing_presets::TRACING_OVERVIEW_TARGETS,
},
core::{error::PrettyPrintError, version::Update},
core::{
error::PrettyPrintError,
version::{PartialUpdate, TotalUpdate, Update},
},
ecmascript_hmr_protocol::{ClientUpdateInstruction, ResourceIdentifier},
},
};

use super::{
endpoint::ExternalEndpoint,
utils::{
get_diagnostics, get_issues, serde_enum_to_string, stream_channel, subscribe,
NapiDiagnostic, NapiIssue, RootTask, TurbopackResult, VcArc,
get_diagnostics, get_issues, serde_enum_to_string, subscribe, NapiDiagnostic, NapiIssue,
RootTask, TurbopackResult, VcArc,
},
};
use crate::register;
Expand Down Expand Up @@ -344,11 +347,6 @@ pub fn project_entrypoints_subscribe(
)
}

#[napi(object)]
struct NapiUpdate {
pub update: JsUnknown,
}

#[napi(ts_return_type = "{ __napiType: \"RootTask\" }")]
pub fn project_hmr_events(
#[napi(ts_arg_type = "{ __napiType: \"Project\" }")] project: External<
Expand All @@ -359,51 +357,68 @@ pub fn project_hmr_events(
) -> napi::Result<External<RootTask>> {
let turbo_tasks = project.turbo_tasks().clone();
let project = **project;
stream_channel(
let session = TransientInstance::new(());
subscribe(
turbo_tasks.clone(),
func,
{
let identifier = identifier.clone();
move |sender| {
let session = session.clone();
move || {
let identifier = identifier.clone();
let session = session.clone();
async move {
project
let state = project
.project()
.hmr_events(identifier, TransientInstance::new(sender))
.await?;

Ok(())
.hmr_version_state(identifier.clone(), session);
let update = project.project().hmr_update(identifier, state);
let issues = get_issues(update).await?;
let diags = get_diagnostics(update).await?;
let update = update.strongly_consistent().await?;
match &*update {
Update::None => {}
Update::Total(TotalUpdate { to }) => {
state.set(to.clone()).await?;
}
Update::Partial(PartialUpdate { to, .. }) => {
state.set(to.clone()).await?;
}
}
Ok((update, issues, diags))
}
}
},
move |ctx| {
let item = ctx.value;
let (update, issues, diags) = ctx.value;

let napi_issues = issues
.iter()
.map(|issue| NapiIssue::from(&**issue))
.collect();
let update_issues = issues
.iter()
.map(|issue| (&**issue).into())
.collect::<Vec<_>>();

let issues: Vec<_> = item.issues.iter().map(|issue| (&**issue).into()).collect();
let identifier = ResourceIdentifier {
path: identifier.clone(),
headers: None,
};
let update = match &*item.update {
Update::Total(_) => ClientUpdateInstruction::restart(&identifier, &issues),
Update::Partial(update) => {
ClientUpdateInstruction::partial(&identifier, &update.instruction, &issues)
}
Update::None => ClientUpdateInstruction::issues(&identifier, &issues),
let update = match &*update {
Update::Total(_) => ClientUpdateInstruction::restart(&identifier, &update_issues),
Update::Partial(update) => ClientUpdateInstruction::partial(
&identifier,
&update.instruction,
&update_issues,
),
Update::None => ClientUpdateInstruction::issues(&identifier, &update_issues),
};

let result = TurbopackResult {
result: NapiUpdate {
update: ctx.env.to_js_value(&update)?,
},
issues: item
.issues
.iter()
.map(|issue| NapiIssue::from(&**issue))
.collect(),
diagnostics: vec![],
};
Ok(vec![result])
Ok(vec![TurbopackResult {
result: ctx.env.to_js_value(&update)?,
issues: napi_issues,
diagnostics: diags.iter().map(|d| NapiDiagnostic::from(d)).collect(),
}])
},
)
}
48 changes: 0 additions & 48 deletions packages/next-swc/crates/napi/src/next_api/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use anyhow::{anyhow, Context, Result};
use napi::{
bindgen_prelude::{External, ToNapiValue},
threadsafe_function::{ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode},
tokio::sync::mpsc::{channel, Sender},
JsFunction, JsObject, JsUnknown, NapiRaw, NapiValue, Status,
};
use serde::Serialize;
Expand Down Expand Up @@ -275,50 +274,3 @@ pub fn subscribe<T: 'static + Send + Sync, F: Future<Output = Result<T>> + Send,
task_id: Some(task_id),
}))
}

pub fn stream_channel<
T: 'static + Send + Sync,
F: Future<Output = Result<()>> + Send,
V: ToNapiValue,
>(
turbo_tasks: Arc<TurboTasks<MemoryBackend>>,
func: JsFunction,
handler: impl 'static + Sync + Send + Clone + Fn(Sender<Result<T>>) -> F,
mapper: impl 'static + Sync + Send + FnMut(ThreadSafeCallContext<T>) -> napi::Result<Vec<V>>,
) -> napi::Result<External<RootTask>> {
let func: ThreadsafeFunction<T> = func.create_threadsafe_function(0, mapper)?;
let task_id = turbo_tasks.spawn_root_task(move || {
let handler = handler.clone();
let func = func.clone();
Box::pin(async move {
let (sx, mut rx) = channel(32);

if let Err(err) = handler(sx).await {
call_with(&func, Err(err))?;
} else {
while let Some(value) = rx.recv().await {
call_with(&func, value)?;
}
}

Ok(unit().node)
})
});
Ok(External::new(RootTask {
turbo_tasks,
task_id: Some(task_id),
}))
}

fn call_with<T>(func: &ThreadsafeFunction<T>, res: Result<T>) -> Result<()> {
let status = func.call(
res.map_err(|err| napi::Error::from_reason(PrettyPrintError(&err).to_string())),
ThreadsafeFunctionCallMode::NonBlocking,
);
if !matches!(status, Status::Ok) {
let error = anyhow!("Error calling JS function: {}", status);
eprintln!("{}", error);
return Err(error);
}
Ok(())
}
Empty file.
109 changes: 39 additions & 70 deletions packages/next-swc/crates/next-api/src/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ use next_core::{
util::NextSourceConfig,
};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::Sender;
use turbo_tasks::{
debug::ValueDebugFormat, trace::TraceRawVcs, unit, Completion, IntoTraitRef, ReadRef, State,
TaskInput, TraitRef, TransientInstance, Vc,
debug::ValueDebugFormat, trace::TraceRawVcs, unit, Completion, IntoTraitRef, State, TaskInput,
TransientInstance, Vc,
};
use turbopack_binding::{
turbo::{
Expand All @@ -31,9 +30,8 @@ use turbopack_binding::{
compile_time_info::CompileTimeInfo,
diagnostics::DiagnosticExt,
environment::ServerAddr,
issue::{IssueFilePathExt, PlainIssue},
output::OutputAssets,
version::{PartialUpdate, TotalUpdate, Update, VersionState, VersionedContent},
version::{Update, Version, VersionState, VersionedContent},
PROJECT_FILESYSTEM_NAME,
},
dev::DevChunkingContext,
Expand Down Expand Up @@ -509,83 +507,54 @@ impl Project {
self.node_root(),
))
}
}

impl Project {
/// Emits opaque HMR events whenever a change is detected in the chunk group
/// internally known as `identifier`.
pub async fn hmr_events(
#[turbo_tasks::function]
async fn hmr_content(
self: Vc<Self>,
identifier: String,
sender: TransientInstance<Sender<Result<ReadRef<UpdateItem>>>>,
) -> Result<()> {
let content = self
) -> Result<Vc<Box<dyn VersionedContent>>> {
Ok(self
.await?
.versioned_content_map
.get(self.client_root().join(identifier));

let version = content.version();
let version_state = VersionState::new(version.into_trait_ref().await?).await?;

compute_update_stream(content, version_state, sender).await?;

Ok(())
.get(self.client_root().join(identifier)))
}
}

#[turbo_tasks::function]
async fn compute_update_stream(
content: Vc<Box<dyn VersionedContent>>,
from: Vc<VersionState>,
sender: TransientInstance<Sender<Result<ReadRef<UpdateItem>>>>,
) -> Result<Vc<()>> {
let item = get_update_item(content, from).strongly_consistent().await;

let send = if let Ok(item) = &item {
match &*item.update {
Update::Total(TotalUpdate { to: version, .. })
| Update::Partial(PartialUpdate { to: version, .. }) => {
from.set(TraitRef::clone(&version)).await?;
true
}
// Don't send no-op updates when there are no issues.
Update::None => !item.issues.is_empty(),
}
} else {
true
};
#[turbo_tasks::function]
async fn hmr_version(self: Vc<Self>, identifier: String) -> Result<Vc<Box<dyn Version>>> {
let content = self.hmr_content(identifier);

if send {
// Send update. Ignore channel closed error.
let _ = sender.send(item).await;
Ok(content.version())
}

Ok(unit())
}

#[turbo_tasks::function]
async fn get_update_item(
content: Vc<Box<dyn VersionedContent>>,
from: Vc<VersionState>,
) -> Result<Vc<UpdateItem>> {
let from = from.get();
let update = content.update(from);
/// Get the version state for a session. Initialized with the first seen
/// version in that session.
#[turbo_tasks::function]
pub async fn hmr_version_state(
self: Vc<Self>,
identifier: String,
session: TransientInstance<()>,
) -> Result<Vc<VersionState>> {
let version = self.hmr_version(identifier);

let issues = {
let captured = update.peek_issues_with_path().await?.await?;
captured.get_plain_issues().await?
};
// The session argument is important to avoid caching this function between
// sessions.
let _ = session;

Ok(UpdateItem {
update: update.await?,
issues,
// INVALIDATION: This is intentionally untracked to avoid invalidating this
// function completely. We want to initialize the VersionState with the
// first seen version of the session.
Ok(VersionState::new(version.into_trait_ref_untracked().await?).await?)
}
.cell())
}

#[derive(Debug)]
#[turbo_tasks::value(serialization = "none")]
pub struct UpdateItem {
pub update: ReadRef<Update>,
pub issues: Vec<ReadRef<PlainIssue>>,
/// Emits opaque HMR events whenever a change is detected in the chunk group
/// internally known as `identifier`.
#[turbo_tasks::function]
pub async fn hmr_update(
self: Vc<Self>,
identifier: String,
from: Vc<VersionState>,
) -> Result<Vc<Update>> {
let from = from.get();
Ok(self.hmr_content(identifier).update(from))
}
}

0 comments on commit 6de13a1

Please sign in to comment.