Skip to content

Commit

Permalink
rework the package_watcher to expose package changes and use it in hash
Browse files Browse the repository at this point in the history
  • Loading branch information
arlyon committed Feb 2, 2024
1 parent afa8e69 commit 58bea99
Show file tree
Hide file tree
Showing 9 changed files with 439 additions and 25 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/turborepo-filewatch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ dashmap = { workspace = true }
futures = { version = "0.3.26" }
itertools = { workspace = true }
notify = "6.0.1"
serde = { version = "1.0.190", features = ["derive"] }
serde_json = "1.0.106"
thiserror = "1.0.38"
tokio = { workspace = true, features = ["full", "time"] }
tokio-stream = "0.1.14"
tracing = "0.1.37"
tracing-test = "0.2.4"
turbopath = { workspace = true }
Expand Down
181 changes: 181 additions & 0 deletions crates/turborepo-filewatch/src/broadcast_map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
use std::{collections::HashMap, hash::Hash};

use tokio::sync::broadcast::error::SendError;

#[derive(Clone)]
pub enum HashmapEvent<V> {
Insert(V),
Update(V),
Remove,
}

pub struct UpdatingHashMap<K, V>(
HashMap<K, V>,
tokio::sync::broadcast::Sender<(K, HashmapEvent<V>)>,
);

impl<K, V> UpdatingHashMap<K, V>
where
K: Eq + PartialEq + Hash + Clone,
V: Clone + PartialEq,
{
pub fn new() -> Self {

Check warning on line 22 in crates/turborepo-filewatch/src/broadcast_map.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (ubuntu, self-hosted, linux, x64, metal)

associated function `new` is never used

Check warning on line 22 in crates/turborepo-filewatch/src/broadcast_map.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (macos, macos-latest)

associated function `new` is never used

Check warning on line 22 in crates/turborepo-filewatch/src/broadcast_map.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (windows, windows-latest)

associated function `new` is never used

Check warning on line 22 in crates/turborepo-filewatch/src/broadcast_map.rs

View workflow job for this annotation

GitHub Actions / Turborepo Integration (ubuntu-latest)

associated function `new` is never used

Check warning on line 22 in crates/turborepo-filewatch/src/broadcast_map.rs

View workflow job for this annotation

GitHub Actions / Turborepo Integration (macos-latest)

associated function `new` is never used

Check warning on line 22 in crates/turborepo-filewatch/src/broadcast_map.rs

View workflow job for this annotation

GitHub Actions / Turborepo Integration (windows-latest)

associated function `new` is never used
HashMap::new().into()
}

pub fn insert(&mut self, key: K, value: V) -> Option<SendError<(K, HashmapEvent<V>)>> {
if let Some(e) = self.0.get(&key) {
if e == &value {
None
} else {
self.1
.send((key.clone(), HashmapEvent::Update(value.clone())))
.err()
}
} else {
self.0.insert(key.clone(), value.clone());
self.1.send((key, HashmapEvent::Insert(value))).err()
}
}

pub fn remove(&mut self, key: K) -> Option<SendError<(K, HashmapEvent<V>)>> {
if let Some(_) = self.0.remove(&key) {
self.1.send((key, HashmapEvent::Remove)).err()
} else {
None
}
}

/// replace the entire map with a new one, emitting events for changes that
/// actually have a difference
pub fn replace(&mut self, new: HashMap<K, V>) -> Option<SendError<(K, HashmapEvent<V>)>> {
// check items that were removed and emit
for (k, _) in self.0.iter() {
if !new.contains_key(k) {
self.1.send((k.clone(), HashmapEvent::Remove)).err()?;
}
}

// check items that were inserted or changed and emit
for (k, v) in new.iter() {
if !self.0.contains_key(k) {
self.1
.send((k.clone(), HashmapEvent::Insert(v.clone())))
.err();
}
if self.0.get(k) != Some(v) {
self.1
.send((k.clone(), HashmapEvent::Update(v.clone())))
.err();
}
}

self.0 = new;
None
}

pub fn as_inner(&self) -> &HashMap<K, V> {
&self.0
}

pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver<(K, HashmapEvent<V>)> {
self.1.subscribe()
}
}

impl<K, V> From<HashMap<K, V>> for UpdatingHashMap<K, V>
where
K: Eq + PartialEq + Hash + Clone,
V: Clone,
{
fn from(map: HashMap<K, V>) -> Self {
let (tx, _) = tokio::sync::broadcast::channel(100);
let mut updating_map = Self(map, tx);

Check warning on line 93 in crates/turborepo-filewatch/src/broadcast_map.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (ubuntu, self-hosted, linux, x64, metal)

variable does not need to be mutable

Check warning on line 93 in crates/turborepo-filewatch/src/broadcast_map.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (macos, macos-latest)

variable does not need to be mutable

Check warning on line 93 in crates/turborepo-filewatch/src/broadcast_map.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (windows, windows-latest)

variable does not need to be mutable

Check warning on line 93 in crates/turborepo-filewatch/src/broadcast_map.rs

View workflow job for this annotation

GitHub Actions / Turborepo Integration (ubuntu-latest)

variable does not need to be mutable

Check warning on line 93 in crates/turborepo-filewatch/src/broadcast_map.rs

View workflow job for this annotation

GitHub Actions / Turborepo Integration (macos-latest)

variable does not need to be mutable

Check warning on line 93 in crates/turborepo-filewatch/src/broadcast_map.rs

View workflow job for this annotation

GitHub Actions / Turborepo Integration (windows-latest)

variable does not need to be mutable
for (k, v) in updating_map.0.iter() {
updating_map
.1
.send((k.clone(), HashmapEvent::Insert(v.clone())))
.ok();
}
updating_map
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use tokio::{runtime::Runtime, sync::broadcast};

use super::*;

fn run_test<T>(test: T) -> ()
where
T: FnOnce() -> () + Send + 'static,
{
let rt = Runtime::new().unwrap();
rt.block_on(async { test() });
}

#[test]
fn test_insert() {
run_test(|| {
let mut map: UpdatingHashMap<String, i32> = UpdatingHashMap::new();
let mut receiver = map.subscribe();

assert!(map.insert("a".to_string(), 1).is_none());

if let Ok((key, event)) = receiver.try_recv() {
assert_eq!(key, "a");
match event {
HashmapEvent::Insert(value) => assert_eq!(value, 1),
_ => panic!("Expected Insert event"),
}
} else {
panic!("Expected an event");
}
});
}

#[test]
fn test_update() {
run_test(|| {
let mut map: UpdatingHashMap<String, i32> = UpdatingHashMap::new();
let _ = map.insert("a".to_string(), 1);
let mut receiver = map.subscribe();

assert!(map.insert("a".to_string(), 2).is_none());

if let Ok((key, event)) = receiver.try_recv() {
assert_eq!(key, "a");
match event {
HashmapEvent::Update(value) => assert_eq!(value, 2),
_ => panic!("Expected Update event"),
}
} else {
panic!("Expected an event");
}
});
}

#[test]
fn test_remove() {
run_test(|| {
let mut map: UpdatingHashMap<String, i32> = UpdatingHashMap::new();
let _ = map.insert("a".to_string(), 1);
let mut receiver = map.subscribe();

assert!(map.remove("a".to_string()).is_none());

if let Ok((key, event)) = receiver.try_recv() {
assert_eq!(key, "a");
match event {
HashmapEvent::Remove => (),
_ => panic!("Expected Remove event"),
}
} else {
panic!("Expected an event");
}
});
}
}
1 change: 1 addition & 0 deletions crates/turborepo-filewatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use {
walkdir::WalkDir,
};

mod broadcast_map;
pub mod cookie_jar;
#[cfg(target_os = "macos")]
mod fsevent;
Expand Down

0 comments on commit 58bea99

Please sign in to comment.