Skip to content

Commit 508a0fe

Browse files
committed
Volume: Stream open_read_stream and write_from_stream on LocalPosixVolume
Prerequisite for Phase 4.1 trait unification. Adds a `LocalPosixReadStream` that yields 1 MiB chunks on the blocking pool and a chunked `write_from_stream` that pipes incoming stream chunks to disk via `spawn_blocking`. Flips `supports_streaming()` to `true`. Rewrote the old `export_to_local`/`import_from_local` tests onto the streaming equivalents (they were the only callers of those two methods on LocalPosix).
1 parent a8a0f69 commit 508a0fe

2 files changed

Lines changed: 174 additions & 86 deletions

File tree

apps/desktop/src-tauri/src/file_system/volume/local_posix.rs

Lines changed: 143 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
//! Local POSIX file system volume implementation.
22
33
use super::{
4-
CopyScanResult, ScanConflict, SourceItemInfo, SpaceInfo, Volume, VolumeError, VolumeScanner, VolumeWatcher,
4+
CopyScanResult, ScanConflict, SourceItemInfo, SpaceInfo, Volume, VolumeError, VolumeReadStream, VolumeScanner,
5+
VolumeWatcher,
56
};
67
use crate::file_system::listing::{FileEntry, get_single_entry, list_directory_core};
78
use crate::indexing::scanner::{self, ScanConfig, ScanError, ScanHandle, ScanSummary};
@@ -295,33 +296,102 @@ impl Volume for LocalPosixVolume {
295296
})
296297
}
297298

298-
fn export_to_local<'a>(
299+
fn supports_streaming(&self) -> bool {
300+
true
301+
}
302+
303+
fn open_read_stream<'a>(
299304
&'a self,
300-
source: &'a Path,
301-
local_dest: &'a Path,
302-
_on_progress: &'a (dyn Fn(u64, u64) -> std::ops::ControlFlow<()> + Sync),
303-
) -> Pin<Box<dyn Future<Output = Result<u64, VolumeError>> + Send + 'a>> {
304-
let src_abs = self.resolve(source);
305-
let local_dest = local_dest.to_path_buf();
305+
path: &'a Path,
306+
) -> Pin<Box<dyn Future<Output = Result<Box<dyn VolumeReadStream>, VolumeError>> + Send + 'a>> {
307+
let abs_path = self.resolve(path);
306308
Box::pin(async move {
307-
spawn_blocking(move || copy_recursive(&src_abs, &local_dest))
308-
.await
309-
.unwrap()
309+
spawn_blocking(move || {
310+
let metadata = std::fs::metadata(&abs_path)?;
311+
if metadata.is_dir() {
312+
return Err(VolumeError::IoError {
313+
message: "Cannot stream a directory".into(),
314+
raw_os_error: None,
315+
});
316+
}
317+
let total_size = metadata.len();
318+
let file = std::fs::File::open(&abs_path)?;
319+
Ok(Box::new(LocalPosixReadStream {
320+
file: Some(file),
321+
total_size,
322+
bytes_read: 0,
323+
}) as Box<dyn VolumeReadStream>)
324+
})
325+
.await
326+
.unwrap()
310327
})
311328
}
312329

313-
fn import_from_local<'a>(
330+
fn write_from_stream<'a>(
314331
&'a self,
315-
local_source: &'a Path,
316332
dest: &'a Path,
317-
_on_progress: &'a (dyn Fn(u64, u64) -> std::ops::ControlFlow<()> + Sync),
333+
size: u64,
334+
mut stream: Box<dyn VolumeReadStream>,
335+
on_progress: &'a (dyn Fn(u64, u64) -> std::ops::ControlFlow<()> + Sync),
318336
) -> Pin<Box<dyn Future<Output = Result<u64, VolumeError>> + Send + 'a>> {
319-
let local_source = local_source.to_path_buf();
320337
let dest_abs = self.resolve(dest);
321338
Box::pin(async move {
322-
spawn_blocking(move || copy_recursive(&local_source, &dest_abs))
339+
// Ensure parent directory exists
340+
if let Some(parent) = dest_abs.parent() {
341+
let parent = parent.to_path_buf();
342+
spawn_blocking(move || std::fs::create_dir_all(&parent))
343+
.await
344+
.unwrap()
345+
.map_err(VolumeError::from)?;
346+
}
347+
348+
// Open destination file on the blocking pool.
349+
let dest_for_open = dest_abs.clone();
350+
let mut file = spawn_blocking(move || std::fs::File::create(&dest_for_open))
323351
.await
324352
.unwrap()
353+
.map_err(VolumeError::from)?;
354+
355+
let mut bytes_written = 0u64;
356+
while let Some(chunk_result) = stream.next_chunk().await {
357+
let chunk = chunk_result?;
358+
if chunk.is_empty() {
359+
continue;
360+
}
361+
let chunk_len = chunk.len() as u64;
362+
363+
// Write the chunk on the blocking pool.
364+
let (file_ret, write_res) = spawn_blocking(move || {
365+
use std::io::Write;
366+
let res = file.write_all(&chunk);
367+
(file, res)
368+
})
369+
.await
370+
.unwrap();
371+
file = file_ret;
372+
write_res.map_err(VolumeError::from)?;
373+
374+
bytes_written += chunk_len;
375+
376+
if on_progress(bytes_written, size) == std::ops::ControlFlow::Break(()) {
377+
// Drop the file handle and try to clean up the partial file.
378+
drop(file);
379+
let partial = dest_abs.clone();
380+
let _ = spawn_blocking(move || std::fs::remove_file(&partial)).await;
381+
return Err(VolumeError::Cancelled("Operation cancelled by user".to_string()));
382+
}
383+
}
384+
385+
// Flush and close on the blocking pool.
386+
let flush_res = spawn_blocking(move || {
387+
use std::io::Write;
388+
file.flush()
389+
})
390+
.await
391+
.unwrap();
392+
flush_res.map_err(VolumeError::from)?;
393+
394+
Ok(bytes_written)
325395
})
326396
}
327397

@@ -416,30 +486,65 @@ impl VolumeWatcher for LocalPosixWatcher {
416486
}
417487
}
418488

419-
/// Recursively copies a file or directory from source to destination.
420-
/// Returns total bytes copied.
421-
fn copy_recursive(source: &Path, dest: &Path) -> Result<u64, VolumeError> {
422-
let meta = std::fs::metadata(source)?;
423-
let mut total_bytes = 0;
424-
425-
if meta.is_file() {
426-
// Copy single file
427-
std::fs::copy(source, dest)?;
428-
total_bytes = meta.len();
429-
} else if meta.is_dir() {
430-
// Create destination directory
431-
std::fs::create_dir_all(dest)?;
432-
433-
// Copy all contents
434-
for entry in std::fs::read_dir(source)? {
435-
let entry = entry?;
436-
let src_path = entry.path();
437-
let dest_path = dest.join(entry.file_name());
438-
total_bytes += copy_recursive(&src_path, &dest_path)?;
439-
}
489+
/// Streaming reader for `LocalPosixVolume` files.
490+
///
491+
/// Reads the file in 1 MiB chunks on the blocking thread pool via
492+
/// `tokio::task::spawn_blocking`. Each `next_chunk` call hands the file handle
493+
/// to the blocking pool, reads one chunk, and returns ownership along with the
494+
/// data.
495+
struct LocalPosixReadStream {
496+
file: Option<std::fs::File>,
497+
total_size: u64,
498+
bytes_read: u64,
499+
}
500+
501+
/// 1 MiB chunks — matches `chunked_copy.rs`'s constant.
502+
const LOCAL_STREAM_CHUNK_SIZE: usize = 1024 * 1024;
503+
504+
impl VolumeReadStream for LocalPosixReadStream {
505+
fn next_chunk(&mut self) -> Pin<Box<dyn Future<Output = Option<Result<Vec<u8>, VolumeError>>> + Send + '_>> {
506+
Box::pin(async move {
507+
let mut file = self.file.take()?;
508+
509+
let (file_ret, result) = spawn_blocking(move || {
510+
use std::io::Read;
511+
let mut buf = vec![0u8; LOCAL_STREAM_CHUNK_SIZE];
512+
let n = match file.read(&mut buf) {
513+
Ok(n) => n,
514+
Err(e) => return (file, Err(VolumeError::from(e))),
515+
};
516+
buf.truncate(n);
517+
(file, Ok(buf))
518+
})
519+
.await
520+
.unwrap();
521+
522+
match result {
523+
Ok(buf) if buf.is_empty() => {
524+
// EOF — drop the file handle.
525+
drop(file_ret);
526+
None
527+
}
528+
Ok(buf) => {
529+
self.bytes_read += buf.len() as u64;
530+
self.file = Some(file_ret);
531+
Some(Ok(buf))
532+
}
533+
Err(e) => {
534+
drop(file_ret);
535+
Some(Err(e))
536+
}
537+
}
538+
})
440539
}
441540

442-
Ok(total_bytes)
541+
fn total_size(&self) -> u64 {
542+
self.total_size
543+
}
544+
545+
fn bytes_read(&self) -> u64 {
546+
self.bytes_read
547+
}
443548
}
444549

445550
/// Gets space information for a path.

apps/desktop/src-tauri/src/file_system/volume/local_posix_test.rs

Lines changed: 31 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,11 @@ fn test_supports_watching_returns_true() {
102102
}
103103

104104
#[test]
105-
fn test_supports_streaming_returns_false() {
106-
// LocalPosixVolume uses the default implementation which returns false.
107-
// Streaming is primarily for MTP-to-MTP transfers.
105+
fn test_supports_streaming_returns_true() {
106+
// Since Phase 4, LocalPosixVolume exposes open_read_stream and
107+
// write_from_stream so cross-volume copies can pipe through it.
108108
let volume = LocalPosixVolume::new("Test", "/tmp");
109-
assert!(!volume.supports_streaming());
109+
assert!(volume.supports_streaming());
110110
}
111111

112112
#[tokio::test]
@@ -350,83 +350,67 @@ async fn test_scan_for_copy_directory() {
350350
}
351351

352352
#[tokio::test]
353-
async fn test_export_to_local_single_file() {
353+
async fn test_open_read_stream_single_file() {
354354
use std::fs;
355355

356-
let src_dir = std::env::temp_dir().join("cmdr_export_src_test");
357-
let dst_dir = std::env::temp_dir().join("cmdr_export_dst_test");
356+
let src_dir = std::env::temp_dir().join("cmdr_read_stream_src_test");
358357
let _ = fs::remove_dir_all(&src_dir);
359-
let _ = fs::remove_dir_all(&dst_dir);
360358
fs::create_dir_all(&src_dir).unwrap();
361-
fs::create_dir_all(&dst_dir).unwrap();
362359

363360
fs::write(src_dir.join("source.txt"), "Test content").unwrap();
364361

365362
let volume = LocalPosixVolume::new("Test", src_dir.to_str().unwrap());
366-
let bytes = volume
367-
.export_to_local(Path::new("source.txt"), &dst_dir.join("dest.txt"), &|_, _| {
368-
std::ops::ControlFlow::Continue(())
369-
})
370-
.await
371-
.unwrap();
363+
let mut stream = volume.open_read_stream(Path::new("source.txt")).await.unwrap();
364+
assert_eq!(stream.total_size(), 12);
372365

373-
assert_eq!(bytes, 12); // "Test content" is 12 bytes
374-
assert_eq!(fs::read_to_string(dst_dir.join("dest.txt")).unwrap(), "Test content");
366+
let mut content = Vec::new();
367+
while let Some(chunk) = stream.next_chunk().await {
368+
content.extend_from_slice(&chunk.unwrap());
369+
}
370+
assert_eq!(content, b"Test content");
375371

376372
let _ = fs::remove_dir_all(&src_dir);
377-
let _ = fs::remove_dir_all(&dst_dir);
378373
}
379374

380375
#[tokio::test]
381-
async fn test_export_to_local_directory() {
376+
async fn test_open_read_stream_rejects_directory() {
382377
use std::fs;
383378

384-
let src_dir = std::env::temp_dir().join("cmdr_export_dir_src_test");
385-
let dst_dir = std::env::temp_dir().join("cmdr_export_dir_dst_test");
379+
let src_dir = std::env::temp_dir().join("cmdr_read_stream_dir_test");
386380
let _ = fs::remove_dir_all(&src_dir);
387-
let _ = fs::remove_dir_all(&dst_dir);
388381
fs::create_dir_all(&src_dir).unwrap();
389-
fs::create_dir_all(&dst_dir).unwrap();
390382

391-
// Create source directory with files
392-
let source_subdir = src_dir.join("sourcedir");
393-
fs::create_dir(&source_subdir).unwrap();
394-
fs::write(source_subdir.join("file1.txt"), "AAA").unwrap();
395-
fs::write(source_subdir.join("file2.txt"), "BBBBB").unwrap();
383+
// Create a nested dir so we can attempt to stream it.
384+
fs::create_dir(src_dir.join("sourcedir")).unwrap();
396385

397386
let volume = LocalPosixVolume::new("Test", src_dir.to_str().unwrap());
398-
let bytes = volume
399-
.export_to_local(Path::new("sourcedir"), &dst_dir.join("destdir"), &|_, _| {
400-
std::ops::ControlFlow::Continue(())
401-
})
402-
.await
403-
.unwrap();
404-
405-
assert_eq!(bytes, 8); // 3 + 5 bytes
406-
assert!(dst_dir.join("destdir").is_dir());
407-
assert_eq!(fs::read_to_string(dst_dir.join("destdir/file1.txt")).unwrap(), "AAA");
408-
assert_eq!(fs::read_to_string(dst_dir.join("destdir/file2.txt")).unwrap(), "BBBBB");
387+
let result = volume.open_read_stream(Path::new("sourcedir")).await;
388+
assert!(result.is_err(), "streaming a directory should fail");
409389

410390
let _ = fs::remove_dir_all(&src_dir);
411-
let _ = fs::remove_dir_all(&dst_dir);
412391
}
413392

414393
#[tokio::test]
415-
async fn test_import_from_local_single_file() {
394+
async fn test_write_from_stream_creates_file() {
395+
use super::InMemoryVolume;
416396
use std::fs;
417397

418-
let local_dir = std::env::temp_dir().join("cmdr_import_local_test");
419-
let vol_dir = std::env::temp_dir().join("cmdr_import_vol_test");
420-
let _ = fs::remove_dir_all(&local_dir);
398+
let vol_dir = std::env::temp_dir().join("cmdr_write_from_stream_test");
421399
let _ = fs::remove_dir_all(&vol_dir);
422-
fs::create_dir_all(&local_dir).unwrap();
423400
fs::create_dir_all(&vol_dir).unwrap();
424401

425-
fs::write(local_dir.join("local.txt"), "Imported content").unwrap();
402+
// Source: an in-memory file that we stream into LocalPosix.
403+
let source = InMemoryVolume::new("Source");
404+
source
405+
.create_file(Path::new("/local.txt"), b"Imported content")
406+
.await
407+
.unwrap();
426408

409+
let stream = source.open_read_stream(Path::new("/local.txt")).await.unwrap();
410+
let size = stream.total_size();
427411
let volume = LocalPosixVolume::new("Test", vol_dir.to_str().unwrap());
428412
let bytes = volume
429-
.import_from_local(&local_dir.join("local.txt"), Path::new("imported.txt"), &|_, _| {
413+
.write_from_stream(Path::new("imported.txt"), size, stream, &|_, _| {
430414
std::ops::ControlFlow::Continue(())
431415
})
432416
.await
@@ -438,7 +422,6 @@ async fn test_import_from_local_single_file() {
438422
"Imported content"
439423
);
440424

441-
let _ = fs::remove_dir_all(&local_dir);
442425
let _ = fs::remove_dir_all(&vol_dir);
443426
}
444427

0 commit comments

Comments
 (0)