Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Turborepo): Enable rust daemon #5964

Merged
merged 14 commits into from
Sep 27, 2023
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
"request": "launch",
"preLaunchTask": "prepare turbo",
"program": "${workspaceRoot}/target/debug/turbo",
"args": ["daemon"],
"args": ["--skip-infer", "daemon"],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason for this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's for debugging, and it ensures that we run the daemon that we're intending to debug, rather than whatever the repo depends on at the time.

"cwd": "${workspaceRoot}"
}
]
Expand Down
9 changes: 5 additions & 4 deletions cli/internal/daemon/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,10 @@ func (c *Connector) getOrStartDaemon() (int, error) {
lockFile := c.lockFile()
daemonProcess, getDaemonProcessErr := lockFile.GetOwner()
if getDaemonProcessErr != nil {
// If we're in a clean state this isn't an "error" per se.
// We attempt to start a daemon.
if errors.Is(getDaemonProcessErr, fs.ErrNotExist) {
// We expect the daemon to write the pid file, so a non-existent or stale
// pid file is fine. The daemon will write its own, after verifying that it
// doesn't exist or is stale.
if errors.Is(getDaemonProcessErr, fs.ErrNotExist) || errors.Is(getDaemonProcessErr, lockfile.ErrDeadOwner) {
if c.Opts.DontStart {
return 0, ErrDaemonNotRunning
}
Expand Down Expand Up @@ -339,7 +340,7 @@ func (c *Connector) sendHello(ctx context.Context, client turbodprotocol.TurbodC
case codes.FailedPrecondition:
return ErrVersionMismatch
case codes.Unavailable:
return errConnectionFailure
return errUnavailable
default:
return err
}
Expand Down
21 changes: 18 additions & 3 deletions cli/internal/daemonclient/daemonclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package daemonclient

import (
"context"
"path/filepath"

"github.com/vercel/turbo/cli/internal/daemon/connector"
"github.com/vercel/turbo/cli/internal/fs"
Expand Down Expand Up @@ -33,9 +34,14 @@ func New(client *connector.Client) *DaemonClient {

// GetChangedOutputs implements runcache.OutputWatcher.GetChangedOutputs
func (d *DaemonClient) GetChangedOutputs(ctx context.Context, hash string, repoRelativeOutputGlobs []string) ([]string, int, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the output globs being passed in using a \ path separator on Windows?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some parts of the glob have \ separators, I think from filepaths that we concat with the globs.

// The daemon expects globs to be unix paths
var outputGlobs []string
for _, outputGlob := range repoRelativeOutputGlobs {
outputGlobs = append(outputGlobs, filepath.ToSlash(outputGlob))
}
resp, err := d.client.GetChangedOutputs(ctx, &turbodprotocol.GetChangedOutputsRequest{
Hash: hash,
OutputGlobs: repoRelativeOutputGlobs,
OutputGlobs: outputGlobs,
})
if err != nil {
return nil, 0, err
Expand All @@ -45,10 +51,19 @@ func (d *DaemonClient) GetChangedOutputs(ctx context.Context, hash string, repoR

// NotifyOutputsWritten implements runcache.OutputWatcher.NotifyOutputsWritten
func (d *DaemonClient) NotifyOutputsWritten(ctx context.Context, hash string, repoRelativeOutputGlobs fs.TaskOutputs, timeSaved int) error {
// The daemon expects globs to be unix paths
var inclusions []string
var exclusions []string
for _, inclusion := range repoRelativeOutputGlobs.Inclusions {
inclusions = append(inclusions, filepath.ToSlash(inclusion))
}
for _, exclusion := range repoRelativeOutputGlobs.Exclusions {
exclusions = append(exclusions, filepath.ToSlash(exclusion))
}
_, err := d.client.NotifyOutputsWritten(ctx, &turbodprotocol.NotifyOutputsWrittenRequest{
Hash: hash,
OutputGlobs: repoRelativeOutputGlobs.Inclusions,
OutputExclusionGlobs: repoRelativeOutputGlobs.Exclusions,
OutputGlobs: inclusions,
OutputExclusionGlobs: exclusions,
TimeSaved: uint64(timeSaved),
})
return err
Expand Down
4 changes: 3 additions & 1 deletion crates/turborepo-filewatch/src/cookie_jar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::{
sync::{broadcast, oneshot},
time::error::Elapsed,
};
use tracing::debug;
use tracing::{debug, trace};
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, PathRelation};

use crate::NotifyError;
Expand Down Expand Up @@ -101,6 +101,7 @@ impl CookieJar {
opts.truncate(true).create(true).write(true);
{
// dropping the resulting file closes the handle
trace!("writing cookie {}", cookie_path);
_ = cookie_path.open_with_options(opts)?;
}
// ??? -> timeout, recv failure, actual cookie failure
Expand Down Expand Up @@ -129,6 +130,7 @@ async fn watch_cookies(
.try_into()
.expect("Non-absolute path from filewatching");
if root.relation_to_path(abs_path) == PathRelation::Parent {
trace!("saw cookie: {}", abs_path);
if let Some(responder) = watches.cookies.remove(&path) {
if responder.send(Ok(())).is_err() {
// Note that cookie waiters will time out if they don't get a
Expand Down
42 changes: 35 additions & 7 deletions crates/turborepo-filewatch/src/globwatcher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
collections::{HashMap, HashSet},
fmt::Display,
future::IntoFuture,
str::FromStr,
};
Expand All @@ -24,26 +25,53 @@ pub struct GlobSet {
exclude: Any<'static>,
}

#[derive(Debug, Error)]
pub struct GlobError {
// Boxed to minimize error size
underlying: Box<wax::BuildError>,
raw_glob: String,
}

impl Display for GlobError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}: {}", self.underlying, self.raw_glob)
}
}

fn compile_glob(raw: &str) -> Result<Glob<'static>, GlobError> {
Glob::from_str(raw)
.map(|g| g.to_owned())
.map_err(|e| GlobError {
underlying: Box::new(e),
raw_glob: raw.to_owned(),
})
}

impl GlobSet {
pub fn from_raw(
raw_includes: Vec<String>,
raw_excludes: Vec<String>,
) -> Result<Self, wax::BuildError> {
) -> Result<Self, GlobError> {
let include = raw_includes
.into_iter()
.map(|raw_glob| {
let glob = Glob::from_str(&raw_glob)?.to_owned();
let glob = compile_glob(&raw_glob)?;
Ok((raw_glob, glob))
})
.collect::<Result<HashMap<_, _>, wax::BuildError>>()?;
.collect::<Result<HashMap<_, _>, GlobError>>()?;
let excludes = raw_excludes
.into_iter()
.iter()
.map(|raw_glob| {
let glob = Glob::from_str(&raw_glob)?.to_owned();
let glob = compile_glob(raw_glob)?;
Ok(glob)
})
.collect::<Result<Vec<_>, wax::BuildError>>()?;
let exclude = wax::any(excludes)?.to_owned();
.collect::<Result<Vec<_>, GlobError>>()?;
let exclude = wax::any(excludes)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading the doc comment of wax::any, since we're passing Globs I think we'll only fail if the combined excluded globs end up being 10 MB. If we end up hitting that, we probably shouldn't print out that much data to the terminal and just say that the combined excludes are too large.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went to switch this to something like "globs elided, combined excludes are too large", but decided to back out the change. I think in the (hopefully extremely rare) case when this happens, we still want to know what those globs are, as a pointer to where they came from, vs not having any idea.

If in the future we have some provenance information, like "the globs from line 5 of turbo.json are too big", I think it would be better to just do that, but while we're lacking that information, I think explicit is better here so that a user / we have a chance at tracking down where the globs came from.

.map_err(|e| GlobError {
underlying: Box::new(e),
raw_glob: format!("{{{}}}", raw_excludes.join(",")),
})?
.to_owned();
Ok(Self { include, exclude })
}
}
Expand Down
7 changes: 6 additions & 1 deletion crates/turborepo-filewatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use notify::{Config, RecommendedWatcher};
use notify::{Event, EventHandler, RecursiveMode, Watcher};
use thiserror::Error;
use tokio::sync::{broadcast, mpsc};
use tracing::warn;
use tracing::{debug, warn};
// windows -> no recursive watch, watch ancestors
// linux -> recursive watch, watch ancestors
#[cfg(feature = "watch_ancestors")]
Expand All @@ -33,6 +33,7 @@ use {
ErrorKind,
},
std::io,
tracing::trace,
walkdir::WalkDir,
};

Expand Down Expand Up @@ -93,9 +94,11 @@ impl FileSystemWatcher {
let (send_file_events, mut recv_file_events) = mpsc::channel(1024);
let watch_root = root.to_owned();
let broadcast_sender = sender.clone();
debug!("starting filewatcher");
let watcher = run_watcher(&watch_root, send_file_events)?;
let (exit_ch, exit_signal) = tokio::sync::oneshot::channel();
// Ensure we are ready to receive new events, not events for existing state
debug!("waiting for initial filesystem cookie");
wait_for_cookie(root, &mut recv_file_events).await?;
tokio::task::spawn(watch_events(
watcher,
Expand All @@ -104,6 +107,7 @@ impl FileSystemWatcher {
exit_signal,
broadcast_sender,
));
debug!("filewatching ready");
Ok(Self {
sender,
_exit_ch: exit_ch,
Expand Down Expand Up @@ -273,6 +277,7 @@ fn manually_add_recursive_watches(
for dir in WalkDir::new(root).follow_links(false).into_iter() {
let dir = dir?;
if dir.file_type().is_dir() {
trace!("manually watching {}", dir.path().display());
match watcher.watch(dir.path(), RecursiveMode::NonRecursive) {
Ok(()) => {}
// If we try to watch a non-existent path, we can just skip
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ license = "MPL-2.0"
[features]
# Allows configuring a specific tls backend for reqwest.
# See top level Cargo.toml for more details.
default = ["rustls-tls", "go-daemon"]
default = ["rustls-tls"]
native-tls = ["turborepo-api-client/native-tls", "turbo-updater/native-tls"]
rustls-tls = ["turborepo-api-client/rustls-tls", "turbo-updater/rustls-tls"]
run-stub = []
Expand Down
25 changes: 20 additions & 5 deletions crates/turborepo-lib/src/commands/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ pub async fn daemon_client(command: &DaemonCommand, base: &CommandBase) -> Resul
let client = connector.connect().await?;
client.restart().await?;
}
// connector.connect will have already started the daemon if needed,
// so this is a no-op
DaemonCommand::Start => {}
DaemonCommand::Start => {
// We don't care about the client, but we do care that we can connect
// which ensures that daemon is started if it wasn't already.
let _ = connector.connect().await?;
println!("Daemon is running");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

}
DaemonCommand::Stop => {
let client = connector.connect().await?;
client.stop().await?;
Expand Down Expand Up @@ -171,8 +174,20 @@ pub async fn daemon_server(
}
CloseReason::Interrupt
});
// TODO: be more methodical about this choice:
let cookie_dir = base.repo_root.join_component(".git");
// We already store logs in .turbo and recommend it be gitignore'd.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with this, but we should note that we only add .turbo to people's gitignore when they run turbo link, so if someone adds turbo and it starts the daemon, then they might end up accidentally committing this cookie dir.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Watchman uses .git, but we can't guarantee that git is present _or_
// that the turbo root is the same as the git root.
let cookie_dir = base.repo_root.join_components(&[".turbo", "cookies"]);
// We need to ensure that the cookie directory is cleared out first so
// that we can start over with cookies.
if cookie_dir.exists() {
cookie_dir
.remove_dir_all()
.map_err(|e| DaemonError::CookieDir(e, cookie_dir.clone()))?;
}
cookie_dir
.create_dir_all()
.map_err(|e| DaemonError::CookieDir(e, cookie_dir.clone()))?;
let reason = crate::daemon::serve(
&base.repo_root,
cookie_dir,
Expand Down
6 changes: 6 additions & 0 deletions crates/turborepo-lib/src/daemon/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::io;

use thiserror::Error;
use tonic::{Code, Status};
use tracing::info;
use turbopath::AbsoluteSystemPathBuf;

use self::proto::turbod_client::TurbodClient;
use super::{
Expand Down Expand Up @@ -160,6 +163,9 @@ pub enum DaemonError {

#[error("unable to complete daemon clean")]
CleanFailed,

#[error("failed to setup cookie dir {1}: {0}")]
CookieDir(io::Error, AbsoluteSystemPathBuf),
}

impl From<Status> for DaemonError {
Expand Down
15 changes: 11 additions & 4 deletions crates/turborepo-lib/src/daemon/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ use tokio::{
};
use tonic::transport::{NamedService, Server};
use tower::ServiceBuilder;
use tracing::{error, trace, warn};
use tracing::{error, info, trace, warn};
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf};
use turborepo_filewatch::{
cookie_jar::CookieJar,
globwatcher::{Error as GlobWatcherError, GlobSet, GlobWatcher},
globwatcher::{Error as GlobWatcherError, GlobError, GlobSet, GlobWatcher},
FileSystemWatcher, WatchError,
};

Expand Down Expand Up @@ -68,7 +68,7 @@ enum RpcError {
#[error("deadline exceeded")]
DeadlineExceeded,
#[error("invalid glob: {0}")]
InvalidGlob(#[from] wax::BuildError),
InvalidGlob(#[from] GlobError),
#[error("globwatching failed: {0}")]
GlobWatching(#[from] GlobWatcherError),
#[error("filewatching unavailable")]
Expand Down Expand Up @@ -105,6 +105,9 @@ async fn start_filewatching(
Ok(())
}

/// Timeout for every RPC the server handles
const REQUEST_TIMEOUT: Duration = Duration::from_millis(100);

/// run a gRPC server providing the Turbod interface. external_shutdown
/// can be used to deliver a signal to shutdown the server. This is expected
/// to be wired to signal handling.
Expand Down Expand Up @@ -144,6 +147,7 @@ where
error!("filewatching failed to start: {}", e);
let _ = fw_shutdown.send(()).await;
}
info!("filewatching started");
});
// exit_root_watch delivers a signal to the root watch loop to exit.
// In the event that the server shuts down via some other mechanism, this
Expand Down Expand Up @@ -187,13 +191,15 @@ where
));

Server::builder()
// set a max timeout for RPCs
.timeout(REQUEST_TIMEOUT)
.add_service(service)
.serve_with_incoming_shutdown(stream, shutdown_fut)
};
// Wait for the server to exit.
// This can be triggered by timeout, root watcher, or an RPC
let _ = server_fut.await;
trace!("gRPC server exited");
info!("gRPC server exited");
// Ensure our timer will exit
running.store(false, Ordering::SeqCst);
// We expect to have a signal from the grpc server on what triggered the exit
Expand Down Expand Up @@ -223,6 +229,7 @@ struct TurboGrpcService {

impl TurboGrpcService {
async fn trigger_shutdown(&self) {
info!("triggering shutdown");
let _ = self.shutdown.send(()).await;
}

Expand Down
Loading
Loading