Skip to content

Commit

Permalink
refactor: make package discovery use interior mutability
Browse files Browse the repository at this point in the history
Most uses actually do not need mutability anyways, and those that do
can often mitigate it quite trivially.
  • Loading branch information
arlyon committed Feb 8, 2024
1 parent 3b368bf commit d0140a8
Show file tree
Hide file tree
Showing 14 changed files with 79 additions and 55 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/turborepo-filewatch/src/package_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl<T: PackageDiscovery + Send + 'static> Subscriber<T> {
exit_rx: oneshot::Receiver<()>,
repo_root: AbsoluteSystemPathBuf,
recv: broadcast::Receiver<Result<Event, NotifyError>>,
mut discovery: T,
discovery: T,
) -> Result<Self, Error> {
let initial_discovery = discovery.discover_packages().await?;

Expand Down Expand Up @@ -356,7 +356,7 @@ mod test {
}

impl super::PackageDiscovery for MockDiscovery {
async fn discover_packages(&mut self) -> Result<DiscoveryResponse, discovery::Error> {
async fn discover_packages(&self) -> Result<DiscoveryResponse, discovery::Error> {
Ok(DiscoveryResponse {
package_manager: self.manager,
workspaces: self.package_data.lock().unwrap().clone(),
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/daemon/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ mod test {
struct MockDiscovery;
impl PackageDiscovery for MockDiscovery {
async fn discover_packages(
&mut self,
&self,
) -> Result<
turborepo_repository::discovery::DiscoveryResponse,
turborepo_repository::discovery::Error,
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/engine/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ mod test {
struct MockDiscovery;
impl PackageDiscovery for MockDiscovery {
async fn discover_packages(
&mut self,
&self,
) -> Result<
turborepo_repository::discovery::DiscoveryResponse,
turborepo_repository::discovery::Error,
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ mod test {

impl<'a> PackageDiscovery for DummyDiscovery<'a> {
async fn discover_packages(
&mut self,
&self,
) -> Result<
turborepo_repository::discovery::DiscoveryResponse,
turborepo_repository::discovery::Error,
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ impl Run {
)
};
let fallback_discovery = FallbackPackageDiscovery::new(
daemon.as_mut().map(DaemonPackageDiscovery::new),
daemon.clone().map(DaemonPackageDiscovery::new),
fallback,
duration,
);
Expand Down
18 changes: 10 additions & 8 deletions crates/turborepo-lib/src/run/package_discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,24 @@ use turborepo_repository::discovery::{DiscoveryResponse, Error, PackageDiscovery
use crate::daemon::{proto::PackageManager, DaemonClient};

#[derive(Debug)]
pub struct DaemonPackageDiscovery<'a, C: Clone> {
daemon: &'a mut DaemonClient<C>,
pub struct DaemonPackageDiscovery<C> {
daemon: DaemonClient<C>,
}

impl<'a, C: Clone> DaemonPackageDiscovery<'a, C> {
pub fn new(daemon: &'a mut DaemonClient<C>) -> Self {
impl<C> DaemonPackageDiscovery<C> {
pub fn new(daemon: DaemonClient<C>) -> Self {
Self { daemon }
}
}

impl<'a, C: Clone + Send> PackageDiscovery for DaemonPackageDiscovery<'a, C> {
async fn discover_packages(&mut self) -> Result<DiscoveryResponse, Error> {
impl<C: Clone + Send + Sync> PackageDiscovery for DaemonPackageDiscovery<C> {
async fn discover_packages(&self) -> Result<DiscoveryResponse, Error> {
tracing::debug!("discovering packages using daemon");

let response = self
.daemon
// clone here so we can make concurrent requests
let mut daemon = self.daemon.clone();

let response = daemon
.discover_packages()
.await
.map_err(|e| Error::Failed(Box::new(e)))?;
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/run/scope/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ mod test {
struct MockDiscovery;
impl PackageDiscovery for MockDiscovery {
async fn discover_packages(
&mut self,
&self,
) -> Result<
turborepo_repository::discovery::DiscoveryResponse,
turborepo_repository::discovery::Error,
Expand Down
4 changes: 2 additions & 2 deletions crates/turborepo-lsp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ impl Backend {
}

pub async fn package_discovery(&self) -> Result<DiscoveryResponse, discovery::Error> {
let mut daemon = {
let daemon = {
let mut daemon = self.daemon.clone();
let daemon = daemon.wait_for(|d| d.is_some()).await;
let daemon = daemon.as_ref().expect("only fails if self is dropped");
Expand All @@ -594,7 +594,7 @@ impl Backend {
.clone()
};

DaemonPackageDiscovery::new(&mut daemon)
DaemonPackageDiscovery::new(daemon)
.discover_packages()
.await
}
Expand Down
1 change: 1 addition & 0 deletions crates/turborepo-repository/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ workspace = true

[dependencies]
anyhow = { workspace = true }
async-once-cell = "0.5.3"
globwalk = { version = "0.1.0", path = "../turborepo-globwalk" }
itertools = { workspace = true }
lazy-regex = "2.5.0"
Expand Down
81 changes: 47 additions & 34 deletions crates/turborepo-repository/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
//! these strategies will implement some sort of monad-style composition so that
//! we can track areas of run that are performing sub-optimally.

use std::sync::Arc;

use tokio_stream::{iter, StreamExt};
use turbopath::AbsoluteSystemPathBuf;

Expand Down Expand Up @@ -41,7 +43,7 @@ pub enum Error {
pub trait PackageDiscovery {
// desugar to assert that the future is Send
fn discover_packages(
&mut self,
&self,
) -> impl std::future::Future<Output = Result<DiscoveryResponse, Error>> + Send;
}

Expand All @@ -57,8 +59,8 @@ pub trait PackageDiscoveryBuilder {
fn build(self) -> Result<Self::Output, Self::Error>;
}

impl<T: PackageDiscovery + Send> PackageDiscovery for Option<T> {
async fn discover_packages(&mut self) -> Result<DiscoveryResponse, Error> {
impl<T: PackageDiscovery + Send + Sync> PackageDiscovery for Option<T> {
async fn discover_packages(&self) -> Result<DiscoveryResponse, Error> {
tracing::debug!("discovering packages using optional strategy");

match self {
Expand All @@ -71,6 +73,12 @@ impl<T: PackageDiscovery + Send> PackageDiscovery for Option<T> {
}
}

impl<T: PackageDiscovery + Send + Sync> PackageDiscovery for Arc<T> {
async fn discover_packages(&self) -> Result<DiscoveryResponse, Error> {
self.as_ref().discover_packages().await
}
}

pub struct LocalPackageDiscovery {
repo_root: AbsoluteSystemPathBuf,
package_manager: PackageManager,
Expand Down Expand Up @@ -125,7 +133,7 @@ impl PackageDiscoveryBuilder for LocalPackageDiscoveryBuilder {
}

impl PackageDiscovery for LocalPackageDiscovery {
async fn discover_packages(&mut self) -> Result<DiscoveryResponse, Error> {
async fn discover_packages(&self) -> Result<DiscoveryResponse, Error> {
tracing::debug!("discovering packages using local strategy");

let package_paths = match self.package_manager.get_package_jsons(&self.repo_root) {
Expand Down Expand Up @@ -192,10 +200,10 @@ impl<T: PackageDiscovery> PackageDiscoveryBuilder for T {
}
}

impl<A: PackageDiscovery + Send, B: PackageDiscovery + Send> PackageDiscovery
impl<A: PackageDiscovery + Send + Sync, B: PackageDiscovery + Send + Sync> PackageDiscovery
for FallbackPackageDiscovery<A, B>
{
async fn discover_packages(&mut self) -> Result<DiscoveryResponse, Error> {
async fn discover_packages(&self) -> Result<DiscoveryResponse, Error> {
tracing::debug!("discovering packages using fallback strategy");

tracing::debug!("attempting primary strategy");
Expand All @@ -220,65 +228,66 @@ impl<A: PackageDiscovery + Send, B: PackageDiscovery + Send> PackageDiscovery

pub struct CachingPackageDiscovery<P: PackageDiscovery> {
primary: P,
data: Option<DiscoveryResponse>,
data: async_once_cell::OnceCell<DiscoveryResponse>,
}

impl<P: PackageDiscovery> CachingPackageDiscovery<P> {
pub fn new(primary: P) -> Self {
Self {
primary,
data: None,
data: Default::default(),
}
}
}

impl<P: PackageDiscovery + Send> PackageDiscovery for CachingPackageDiscovery<P> {
async fn discover_packages(&mut self) -> Result<DiscoveryResponse, Error> {
impl<P: PackageDiscovery + Send + Sync> PackageDiscovery for CachingPackageDiscovery<P> {
async fn discover_packages(&self) -> Result<DiscoveryResponse, Error> {
tracing::debug!("discovering packages using caching strategy");
match self.data.clone() {
Some(data) => Ok(data),
None => {
tracing::debug!("no cached data, running primary strategy");
let data = self.primary.discover_packages().await?;
self.data = Some(data.clone());
Ok(data)
}
}
self.data
.get_or_try_init(async {
tracing::debug!("discovering packages using primary strategy");
self.primary.discover_packages().await
})
.await
.map(ToOwned::to_owned)
}
}

#[cfg(test)]
mod fallback_tests {
use std::time::Duration;
use std::{
sync::atomic::{AtomicUsize, Ordering},
time::Duration,
};

use tokio::runtime::Runtime;

use super::*;

struct MockDiscovery {
should_fail: bool,
calls: usize,
calls: AtomicUsize,
}

impl MockDiscovery {
fn new(should_fail: bool) -> Self {
Self {
should_fail,
calls: 0,
calls: Default::default(),
}
}
}

impl PackageDiscovery for MockDiscovery {
async fn discover_packages(&mut self) -> Result<DiscoveryResponse, Error> {
async fn discover_packages(&self) -> Result<DiscoveryResponse, Error> {
if self.should_fail {
Err(Error::Failed(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"mock error",
))))
} else {
tokio::time::sleep(Duration::from_millis(100)).await;
self.calls += 1;
self.calls.fetch_add(1, Ordering::SeqCst);
// Simulate successful package discovery
Ok(DiscoveryResponse {
package_manager: PackageManager::Npm,
Expand All @@ -305,8 +314,8 @@ mod fallback_tests {
assert!(result.is_ok());

// Assert that the fallback was used
assert_eq!(discovery.primary.calls, 0);
assert_eq!(discovery.fallback.calls, 1);
assert_eq!(*discovery.primary.calls.get_mut(), 0);
assert_eq!(*discovery.fallback.calls.get_mut(), 1);
});
}

Expand All @@ -327,25 +336,27 @@ mod fallback_tests {
assert!(result.is_ok());

// Assert that the fallback was used
assert_eq!(discovery.primary.calls, 0);
assert_eq!(discovery.fallback.calls, 1);
assert_eq!(*discovery.primary.calls.get_mut(), 0);
assert_eq!(*discovery.fallback.calls.get_mut(), 1);
});
}
}

#[cfg(test)]
mod caching_tests {
use std::sync::atomic::{AtomicUsize, Ordering};

use tokio::runtime::Runtime;

use super::*;

struct MockPackageDiscovery {
call_count: usize,
call_count: AtomicUsize,
}

impl PackageDiscovery for MockPackageDiscovery {
async fn discover_packages(&mut self) -> Result<DiscoveryResponse, Error> {
self.call_count += 1;
async fn discover_packages(&self) -> Result<DiscoveryResponse, Error> {
self.call_count.fetch_add(1, Ordering::SeqCst);
// Simulate successful package discovery
Ok(DiscoveryResponse {
package_manager: PackageManager::Npm,
Expand All @@ -358,16 +369,18 @@ mod caching_tests {
fn test_caching_package_discovery() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let primary = MockPackageDiscovery { call_count: 0 };
let primary = MockPackageDiscovery {
call_count: Default::default(),
};
let mut discovery = CachingPackageDiscovery::new(primary);

// First call should use primary discovery
let _first_result = discovery.discover_packages().await.unwrap();
assert_eq!(discovery.primary.call_count, 1);
assert_eq!(*discovery.primary.call_count.get_mut(), 1);

// Second call should use cached data and not increase call count
let _second_result = discovery.discover_packages().await.unwrap();
assert_eq!(discovery.primary.call_count, 1);
assert_eq!(*discovery.primary.call_count.get_mut(), 1);
});
}
}
6 changes: 3 additions & 3 deletions crates/turborepo-repository/src/package_graph/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl<'a, P> PackageGraphBuilder<'a, P> {
impl<'a, T> PackageGraphBuilder<'a, T>
where
T: PackageDiscoveryBuilder,
T::Output: Send,
T::Output: Send + Sync,
T::Error: Into<crate::package_manager::Error>,
{
/// Build the `PackageGraph`.
Expand Down Expand Up @@ -325,7 +325,7 @@ impl<'a, T: PackageDiscovery> BuildState<'a, ResolvedPackageManager, T> {
workspace_graph,
node_lookup,
lockfile,
mut package_discovery,
package_discovery,
..
} = self;

Expand Down Expand Up @@ -767,7 +767,7 @@ mod test {
struct MockDiscovery;
impl PackageDiscovery for MockDiscovery {
async fn discover_packages(
&mut self,
&self,
) -> Result<crate::discovery::DiscoveryResponse, crate::discovery::Error> {
Ok(crate::discovery::DiscoveryResponse {
package_manager: crate::package_manager::PackageManager::Npm,
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-repository/src/package_graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ mod test {
struct MockDiscovery;
impl PackageDiscovery for MockDiscovery {
async fn discover_packages(
&mut self,
&self,
) -> Result<crate::discovery::DiscoveryResponse, crate::discovery::Error> {
Ok(crate::discovery::DiscoveryResponse {
package_manager: PackageManager::Npm,
Expand Down

0 comments on commit d0140a8

Please sign in to comment.