|
6 | 6 | //! performance and fail-fast behavior. |
7 | 7 |
|
8 | 8 | use super::{ |
9 | | - CopyScanResult, ScanConflict, SmbConnectionState, SourceItemInfo, SpaceInfo, Volume, VolumeError, path_to_id, |
| 9 | + CopyScanResult, ScanConflict, SmbConnectionState, SourceItemInfo, SpaceInfo, Volume, VolumeError, VolumeReadStream, |
| 10 | + path_to_id, |
10 | 11 | }; |
11 | 12 | use crate::file_system::listing::FileEntry; |
12 | 13 | use log::{debug, warn}; |
@@ -561,6 +562,41 @@ impl SmbVolume { |
561 | 562 | } |
562 | 563 | } |
563 | 564 |
|
| 565 | +// ── Streaming support ────────────────────────��───────────────────── |
| 566 | + |
| 567 | +/// Chunk size for `SmbReadStream` iteration (1 MB). |
| 568 | +const SMB_STREAM_CHUNK_SIZE: usize = 1024 * 1024; |
| 569 | + |
| 570 | +/// Streaming reader for SMB files. |
| 571 | +/// |
| 572 | +/// The file data is read into memory in one shot via smb2's pipelined read, |
| 573 | +/// then yielded in fixed-size chunks. This avoids temp files for cross-volume |
| 574 | +/// copies (MTP↔SMB) while keeping memory bounded to one file at a time. |
| 575 | +struct SmbReadStream { |
| 576 | + data: Vec<u8>, |
| 577 | + offset: usize, |
| 578 | +} |
| 579 | + |
| 580 | +impl VolumeReadStream for SmbReadStream { |
| 581 | + fn next_chunk(&mut self) -> Option<Result<Vec<u8>, VolumeError>> { |
| 582 | + if self.offset >= self.data.len() { |
| 583 | + return None; |
| 584 | + } |
| 585 | + let end = (self.offset + SMB_STREAM_CHUNK_SIZE).min(self.data.len()); |
| 586 | + let chunk = self.data[self.offset..end].to_vec(); |
| 587 | + self.offset = end; |
| 588 | + Some(Ok(chunk)) |
| 589 | + } |
| 590 | + |
| 591 | + fn total_size(&self) -> u64 { |
| 592 | + self.data.len() as u64 |
| 593 | + } |
| 594 | + |
| 595 | + fn bytes_read(&self) -> u64 { |
| 596 | + self.offset as u64 |
| 597 | + } |
| 598 | +} |
| 599 | + |
564 | 600 | impl Volume for SmbVolume { |
565 | 601 | fn name(&self) -> &str { |
566 | 602 | &self.name |
@@ -1076,6 +1112,56 @@ impl Volume for SmbVolume { |
1076 | 1112 | Ok(conflicts) |
1077 | 1113 | } |
1078 | 1114 |
|
| 1115 | + fn supports_streaming(&self) -> bool { |
| 1116 | + true |
| 1117 | + } |
| 1118 | + |
| 1119 | + fn open_read_stream(&self, path: &Path) -> Result<Box<dyn VolumeReadStream>, VolumeError> { |
| 1120 | + let smb_path = self.to_smb_path(path); |
| 1121 | + let handle = self.runtime_handle.clone(); |
| 1122 | + let sp = smb_path.clone(); |
| 1123 | + |
| 1124 | + debug!( |
| 1125 | + "SmbVolume::open_read_stream: share={}, path={:?}", |
| 1126 | + self.share_name, smb_path |
| 1127 | + ); |
| 1128 | + |
| 1129 | + let data = self.with_smb("open_read_stream", |client, tree| { |
| 1130 | + handle.block_on(client.read_file_pipelined(tree, &sp)) |
| 1131 | + })?; |
| 1132 | + |
| 1133 | + Ok(Box::new(SmbReadStream { data, offset: 0 })) |
| 1134 | + } |
| 1135 | + |
| 1136 | + fn write_from_stream( |
| 1137 | + &self, |
| 1138 | + dest: &Path, |
| 1139 | + _size: u64, |
| 1140 | + mut stream: Box<dyn VolumeReadStream>, |
| 1141 | + ) -> Result<u64, VolumeError> { |
| 1142 | + let smb_path = self.to_smb_path(dest); |
| 1143 | + let handle = self.runtime_handle.clone(); |
| 1144 | + |
| 1145 | + debug!( |
| 1146 | + "SmbVolume::write_from_stream: share={}, path={:?}", |
| 1147 | + self.share_name, smb_path |
| 1148 | + ); |
| 1149 | + |
| 1150 | + // Collect all chunks into a buffer, then write in one pipelined call |
| 1151 | + let mut data = Vec::new(); |
| 1152 | + while let Some(result) = stream.next_chunk() { |
| 1153 | + data.extend_from_slice(&result?); |
| 1154 | + } |
| 1155 | + |
| 1156 | + let len = data.len() as u64; |
| 1157 | + let sp = smb_path; |
| 1158 | + self.with_smb("write_from_stream", |client, tree| { |
| 1159 | + handle.block_on(client.write_file_pipelined(tree, &sp, &data)) |
| 1160 | + })?; |
| 1161 | + |
| 1162 | + Ok(len) |
| 1163 | + } |
| 1164 | + |
1079 | 1165 | fn smb_connection_state(&self) -> Option<SmbConnectionState> { |
1080 | 1166 | match self.connection_state() { |
1081 | 1167 | ConnectionState::Direct => Some(SmbConnectionState::Direct), |
@@ -1799,4 +1885,81 @@ mod tests { |
1799 | 1885 | assert!(space.available_bytes > 0); |
1800 | 1886 | assert!(space.used_bytes <= space.total_bytes); |
1801 | 1887 | } |
| 1888 | + |
| 1889 | + // ── SmbReadStream tests ──────────────────────────────────────── |
| 1890 | + |
| 1891 | + #[test] |
| 1892 | + fn smb_read_stream_empty_file() { |
| 1893 | + let mut stream = SmbReadStream { |
| 1894 | + data: vec![], |
| 1895 | + offset: 0, |
| 1896 | + }; |
| 1897 | + assert_eq!(stream.total_size(), 0); |
| 1898 | + assert_eq!(stream.bytes_read(), 0); |
| 1899 | + assert!(stream.next_chunk().is_none()); |
| 1900 | + } |
| 1901 | + |
| 1902 | + #[test] |
| 1903 | + fn smb_read_stream_small_file_single_chunk() { |
| 1904 | + let data = vec![1u8; 100]; |
| 1905 | + let mut stream = SmbReadStream { data, offset: 0 }; |
| 1906 | + assert_eq!(stream.total_size(), 100); |
| 1907 | + |
| 1908 | + let chunk = stream.next_chunk().unwrap().unwrap(); |
| 1909 | + assert_eq!(chunk.len(), 100); |
| 1910 | + assert_eq!(stream.bytes_read(), 100); |
| 1911 | + assert!(stream.next_chunk().is_none()); |
| 1912 | + } |
| 1913 | + |
| 1914 | + #[test] |
| 1915 | + fn smb_read_stream_exact_chunk_boundary() { |
| 1916 | + let data = vec![0u8; SMB_STREAM_CHUNK_SIZE]; |
| 1917 | + let mut stream = SmbReadStream { data, offset: 0 }; |
| 1918 | + |
| 1919 | + let chunk = stream.next_chunk().unwrap().unwrap(); |
| 1920 | + assert_eq!(chunk.len(), SMB_STREAM_CHUNK_SIZE); |
| 1921 | + assert!(stream.next_chunk().is_none()); |
| 1922 | + } |
| 1923 | + |
| 1924 | + #[test] |
| 1925 | + fn smb_read_stream_multiple_chunks() { |
| 1926 | + let size = SMB_STREAM_CHUNK_SIZE * 2 + 500; |
| 1927 | + let data = vec![0xAB; size]; |
| 1928 | + let mut stream = SmbReadStream { data, offset: 0 }; |
| 1929 | + assert_eq!(stream.total_size(), size as u64); |
| 1930 | + |
| 1931 | + let c1 = stream.next_chunk().unwrap().unwrap(); |
| 1932 | + assert_eq!(c1.len(), SMB_STREAM_CHUNK_SIZE); |
| 1933 | + assert_eq!(stream.bytes_read(), SMB_STREAM_CHUNK_SIZE as u64); |
| 1934 | + |
| 1935 | + let c2 = stream.next_chunk().unwrap().unwrap(); |
| 1936 | + assert_eq!(c2.len(), SMB_STREAM_CHUNK_SIZE); |
| 1937 | + |
| 1938 | + let c3 = stream.next_chunk().unwrap().unwrap(); |
| 1939 | + assert_eq!(c3.len(), 500); |
| 1940 | + assert_eq!(stream.bytes_read(), size as u64); |
| 1941 | + |
| 1942 | + assert!(stream.next_chunk().is_none()); |
| 1943 | + } |
| 1944 | + |
| 1945 | + #[test] |
| 1946 | + fn smb_read_stream_data_integrity() { |
| 1947 | + let data: Vec<u8> = (0..=255).cycle().take(SMB_STREAM_CHUNK_SIZE + 100).collect(); |
| 1948 | + let expected = data.clone(); |
| 1949 | + let mut stream = SmbReadStream { data, offset: 0 }; |
| 1950 | + |
| 1951 | + let mut reassembled = Vec::new(); |
| 1952 | + while let Some(Ok(chunk)) = stream.next_chunk() { |
| 1953 | + reassembled.extend_from_slice(&chunk); |
| 1954 | + } |
| 1955 | + assert_eq!(reassembled, expected); |
| 1956 | + } |
| 1957 | + |
| 1958 | + #[test] |
| 1959 | + fn smb_supports_streaming() { |
| 1960 | + // SmbVolume should report streaming support so cross-volume copies |
| 1961 | + // (MTP↔SMB) use the streaming path instead of NotSupported/temp files. |
| 1962 | + let (vol, _rt) = make_test_volume(); |
| 1963 | + assert!(vol.supports_streaming()); |
| 1964 | + } |
1802 | 1965 | } |
0 commit comments