Skip to content

Commit b78ef73

Browse files
author
Chris Tilt
committed
Apply some formatting and spelling corrections
1 parent 9a24135 commit b78ef73

File tree

2 files changed

+92
-75
lines changed

2 files changed

+92
-75
lines changed

common/src/resolver.rs

Lines changed: 57 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616
//! ```
1717
1818
use anyhow::{bail, Context, Result};
19-
use serde::{Serialize, Deserialize};
2019
use bytes::Bytes;
2120
use dashmap::DashMap;
2221
use memmap2::Mmap;
22+
use serde::{Deserialize, Serialize};
2323
use std::{fs::File, ops::Range, sync::Arc};
2424

2525
/***
@@ -178,24 +178,17 @@ mod tests {
178178
// use std::{sync::Arc, thread};
179179

180180
fn unique_path() -> std::path::PathBuf {
181-
let nanos = SystemTime::now()
182-
.duration_since(UNIX_EPOCH)
183-
.unwrap()
184-
.as_nanos();
181+
let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
185182
std::env::temp_dir().join(format!("acropolis_resolver_{nanos}.bin"))
186183
}
187184

188185
fn create_file_with(bytes: &[u8]) -> Result<File> {
189186
let path = unique_path();
190-
let mut f = OpenOptions::new()
191-
.read(true)
192-
.write(true)
193-
.create(true)
194-
.truncate(true)
195-
.open(&path)?;
187+
let mut f =
188+
OpenOptions::new().read(true).write(true).create(true).truncate(true).open(&path)?;
196189
f.write_all(bytes)?;
197190
f.sync_all()?; // ensure size/content visible to mmap
198-
// Reopen read-only (optional, but mirrors production “reader” role)
191+
// Reopen read-only (optional, but mirrors production “reader” role)
199192
drop(f);
200193
let f = OpenOptions::new().read(true).open(&path)?;
201194
Ok(f)
@@ -244,7 +237,10 @@ mod tests {
244237
let loc = Loc {
245238
store,
246239
object: obj,
247-
region: Region { offset: 100, len: 20 },
240+
region: Region {
241+
offset: 100,
242+
len: 20,
243+
},
248244
inline: None,
249245
};
250246

@@ -328,56 +324,57 @@ mod tests {
328324
}
329325

330326
#[test]
331-
fn concurrent_resolves_share_backing() -> Result<()> {
332-
use std::sync::Arc;
327+
fn concurrent_resolves_share_backing() -> Result<()> {
328+
use std::sync::Arc;
333329

334-
// 1) Create file content.
335-
let mut bytes = Vec::with_capacity(1024);
336-
for i in 0..1024u32 {
337-
bytes.push((i % 251) as u8);
338-
}
330+
// 1) Create file content.
331+
let mut bytes = Vec::with_capacity(1024);
332+
for i in 0..1024u32 {
333+
bytes.push((i % 251) as u8);
334+
}
339335

340-
// 2) Pre-compute the expected slice and share it via Arc.
341-
let expected: Arc<Vec<u8>> = Arc::new(bytes[128..384].to_vec());
342-
343-
// 3) Register the file once.
344-
let file = create_file_with(&bytes)?;
345-
let reg = Arc::new(Registry::default());
346-
let store = StoreId(11);
347-
let obj = ObjectId(0xABCD);
348-
reg.register_file(store, obj, &file)?;
349-
350-
let loc = Loc {
351-
store,
352-
object: obj,
353-
region: Region { offset: 128, len: 256 },
354-
inline: None,
355-
};
356-
357-
358-
// 4) Resolve in parallel without ever capturing `bytes`.
359-
let mut handles = Vec::new();
360-
for _ in 0..8 {
361-
let reg_cloned = Arc::clone(&reg);
362-
let loc_cloned = loc.clone();
363-
let expected_cloned = Arc::clone(&expected);
364-
365-
handles.push(std::thread::spawn(move || {
366-
let resolver = Resolver::new(&reg_cloned);
367-
let r = resolver.resolve(&loc_cloned).expect("resolve");
368-
assert_eq!(r.as_slice(), &expected_cloned[..]);
369-
}));
370-
}
336+
// 2) Pre-compute the expected slice and share it via Arc.
337+
let expected: Arc<Vec<u8>> = Arc::new(bytes[128..384].to_vec());
371338

372-
// 4) Concurrently, sanity check in the main thread.
373-
let resolver_main = Resolver::new(&reg);
374-
let r_main = resolver_main.resolve(&loc)?;
375-
assert_eq!(r_main.as_slice(), &expected[..]);
339+
// 3) Register the file once.
340+
let file = create_file_with(&bytes)?;
341+
let reg = Arc::new(Registry::default());
342+
let store = StoreId(11);
343+
let obj = ObjectId(0xABCD);
344+
reg.register_file(store, obj, &file)?;
376345

377-
for h in handles {
378-
h.join().expect("thread join ok");
379-
}
380-
Ok(())
381-
}
346+
let loc = Loc {
347+
store,
348+
object: obj,
349+
region: Region {
350+
offset: 128,
351+
len: 256,
352+
},
353+
inline: None,
354+
};
382355

356+
// 4) Resolve in parallel without ever capturing `bytes`.
357+
let mut handles = Vec::new();
358+
for _ in 0..8 {
359+
let reg_cloned = Arc::clone(&reg);
360+
let loc_cloned = loc.clone();
361+
let expected_cloned = Arc::clone(&expected);
362+
363+
handles.push(std::thread::spawn(move || {
364+
let resolver = Resolver::new(&reg_cloned);
365+
let r = resolver.resolve(&loc_cloned).expect("resolve");
366+
assert_eq!(r.as_slice(), &expected_cloned[..]);
367+
}));
368+
}
369+
370+
// 4) Concurrently, sanity check in the main thread.
371+
let resolver_main = Resolver::new(&reg);
372+
let r_main = resolver_main.resolve(&loc)?;
373+
assert_eq!(r_main.as_slice(), &expected[..]);
374+
375+
for h in handles {
376+
h.join().expect("thread join ok");
377+
}
378+
Ok(())
379+
}
383380
}

common/tests/loc_over_bus.rs

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,16 @@ use std::{
77
time::Duration,
88
};
99

10-
use anyhow::{Result};
10+
use anyhow::Result;
1111
use caryatid_sdk::{module, Context, Module};
1212
use serde::{Deserialize, Serialize};
1313
use tempfile::NamedTempFile;
1414
use tokio::{sync::watch, time::timeout};
15-
use tracing::{info};
15+
use tracing::info;
1616

17-
18-
use config::{Config, Environment, File};
19-
use caryatid_process::Process;
2017
use acropolis_common::resolver::{Loc, ObjectId, Region, Registry, Resolver, StoreId};
18+
use caryatid_process::Process;
19+
use config::{Config, Environment, File};
2120

2221
// --------- shared test completion signaling ---------
2322
static TEST_COMPLETION_TX: Mutex<Option<watch::Sender<bool>>> = Mutex::new(None);
@@ -40,7 +39,7 @@ fn registry() -> Arc<Registry> {
4039
enum BusMsg {
4140
#[default]
4241
None, // Just so we have a simple default
43-
42+
4443
Loc(Loc),
4544
Ack(String), // response back to publisher
4645
}
@@ -73,14 +72,32 @@ impl Subscriber {
7372
let slice = view.as_slice();
7473
// trivial check: non-empty
7574
if !slice.is_empty() {
76-
context.publish(&ack_topic, Arc::new(BusMsg::Ack("ok".to_string()))).await.expect("Failed to publish ACK");
75+
context
76+
.publish(
77+
&ack_topic,
78+
Arc::new(BusMsg::Ack("ok".to_string())),
79+
)
80+
.await
81+
.expect("Failed to publish ACK");
7782
} else {
78-
context.publish(&ack_topic, Arc::new(BusMsg::Ack("empty".to_string()))).await.expect("Failed to publish ACK");
83+
context
84+
.publish(
85+
&ack_topic,
86+
Arc::new(BusMsg::Ack("empty".to_string())),
87+
)
88+
.await
89+
.expect("Failed to publish ACK");
7990
}
8091
break; // test done
8192
}
8293
Err(_) => {
83-
context.publish(&ack_topic, Arc::new(BusMsg::Ack("resolve_err".to_string()))).await.expect("Failed to publish ACK");
94+
context
95+
.publish(
96+
&ack_topic,
97+
Arc::new(BusMsg::Ack("resolve_err".to_string())),
98+
)
99+
.await
100+
.expect("Failed to publish ACK");
84101
break;
85102
}
86103
}
@@ -120,9 +137,12 @@ impl Publisher {
120137
context.run(async move {
121138
// Custom struct
122139
let message = BusMsg::Loc(Loc {
123-
store: StoreId(1),
140+
store: StoreId(1),
124141
object: ObjectId(0xFEED_CAFE_BEEF),
125-
region: Region { offset: 100, len: 40 },
142+
region: Region {
143+
offset: 100,
144+
len: 40,
145+
},
126146
inline: None,
127147
});
128148
info!("Sending {:?}", message);
@@ -138,7 +158,7 @@ impl Publisher {
138158
signal_test_completion();
139159
break;
140160
} else {
141-
assert!(false, "Unexpected ACK message: {}", s);
161+
panic!("Unexpected ACK message: {}", s);
142162
}
143163
}
144164
}
@@ -179,7 +199,7 @@ async fn loc_round_trip_over_caryatid() -> Result<()> {
179199

180200
// Create the process
181201
let mut process = Process::<BusMsg>::create(config).await;
182-
202+
183203
// Register modules
184204
Subscriber::register(&mut process);
185205
Publisher::register(&mut process);
@@ -189,7 +209,7 @@ async fn loc_round_trip_over_caryatid() -> Result<()> {
189209

190210
match timeout(Duration::from_secs(5), async {
191211
tokio::select! {
192-
// run everythng
212+
// run everything
193213
result = process.run() => {
194214
result
195215
}
@@ -202,7 +222,7 @@ async fn loc_round_trip_over_caryatid() -> Result<()> {
202222
{
203223
Ok(result) => result?,
204224
Err(_) => {
205-
assert!(false, "Test timed out after 5 seconds");
225+
panic!("Test timed out after 5 seconds");
206226
}
207227
}
208228
Ok(())

0 commit comments

Comments
 (0)