Skip to content

Commit a1377e3

Browse files
authored
Merge 6925dd6 into 719cdb4
2 parents 719cdb4 + 6925dd6 commit a1377e3

File tree

3 files changed

+350
-7
lines changed

3 files changed

+350
-7
lines changed

src/api/blobs.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,11 @@ use n0_future::{future, stream, Stream, StreamExt};
2929
use quinn::SendStream;
3030
use range_collections::{range_set::RangeSetRange, RangeSet2};
3131
use ref_cast::RefCast;
32+
use serde::{Deserialize, Serialize};
3233
use tokio::io::AsyncWriteExt;
3334
use tracing::trace;
35+
mod reader;
36+
pub use reader::Reader;
3437

3538
// Public reexports from the proto module.
3639
//
@@ -102,6 +105,14 @@ impl Blobs {
102105
})
103106
}
104107

108+
/// Returns an async [`Reader`] for the blob with the given hash,
/// positioned at the start of the blob.
///
/// Shorthand for [`Self::reader_with_opts`] with default options.
pub fn reader(&self, hash: impl Into<Hash>) -> Reader {
    self.reader_with_opts(ReaderOptions { hash: hash.into() })
}
111+
112+
/// Returns an async [`Reader`] for a blob, configured by `options`.
///
/// The reader implements `AsyncRead` and `AsyncSeek`; it clones this
/// `Blobs` handle, so it is independent of `self`'s lifetime.
pub fn reader_with_opts(&self, options: ReaderOptions) -> Reader {
    Reader::new(self.clone(), options)
}
115+
105116
/// Delete a blob.
106117
///
107118
/// This function is not public, because it does not work as expected when called manually,
@@ -647,6 +658,12 @@ impl<'a> AddProgress<'a> {
647658
}
648659
}
649660

661+
/// Options for an async reader for blobs that supports AsyncRead and AsyncSeek.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReaderOptions {
    /// Hash of the blob to read.
    pub hash: Hash,
}
666+
650667
/// An observe result. Awaiting this will return the current state.
651668
///
652669
/// Calling [`ObserveProgress::stream`] will return a stream of updates, where
@@ -856,7 +873,7 @@ impl ExportRangesProgress {
856873
/// range of 0..100, you will get the entire first chunk, 0..1024.
857874
///
858875
/// It is up to the caller to clip the ranges to the requested ranges.
859-
pub async fn stream(self) -> impl Stream<Item = ExportRangesItem> {
876+
pub fn stream(self) -> impl Stream<Item = ExportRangesItem> {
860877
Gen::new(|co| async move {
861878
let mut rx = match self.inner.await {
862879
Ok(rx) => rx,

src/api/blobs/reader.rs

Lines changed: 328 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,328 @@
1+
use std::{
2+
io::{self, ErrorKind, SeekFrom},
3+
ops::DerefMut,
4+
pin::Pin,
5+
sync::{Arc, Mutex},
6+
task::{Context, Poll},
7+
};
8+
9+
use n0_future::StreamExt;
10+
11+
use crate::api::{
12+
blobs::{Blobs, ReaderOptions},
13+
proto::ExportRangesItem,
14+
};
15+
16+
/// Async reader for a single blob, implementing `tokio::io::AsyncRead`
/// and `tokio::io::AsyncSeek`.
#[derive(Debug)]
pub struct Reader {
    // Blobs API handle used to start export-ranges operations.
    blobs: Blobs,
    // Read options; currently just the hash of the blob to read.
    options: ReaderOptions,
    // State machine shared behind a mutex so the poll_* methods can
    // mutate it through `&self`-style access from a pinned reference.
    state: Arc<Mutex<ReaderState>>,
}
22+
23+
/// Internal state machine for [`Reader`].
#[derive(Default, derive_more::Debug)]
enum ReaderState {
    /// No operation in progress; `position` is the current stream position.
    Idle {
        position: u64,
    },
    /// `start_seek` has recorded a target position that a subsequent
    /// `poll_complete` will commit back to `Idle`.
    Seeking {
        position: u64,
    },
    /// A read is in flight: an export-ranges stream that started at `position`.
    Reading {
        position: u64,
        #[debug(skip)]
        op: n0_future::boxed::BoxStream<ExportRangesItem>,
    },
    /// Placeholder left behind by `mem::take`; only observed if a previous
    /// poll bailed out without restoring the state.
    #[default]
    Poisoned,
}
39+
40+
impl Reader {
41+
pub fn new(blobs: Blobs, options: ReaderOptions) -> Self {
42+
Self {
43+
blobs,
44+
options,
45+
state: Arc::new(Mutex::new(ReaderState::Idle { position: 0 })),
46+
}
47+
}
48+
}
49+
50+
impl tokio::io::AsyncRead for Reader {
51+
fn poll_read(
52+
self: Pin<&mut Self>,
53+
cx: &mut Context<'_>,
54+
buf: &mut tokio::io::ReadBuf<'_>,
55+
) -> Poll<io::Result<()>> {
56+
let this = self.get_mut();
57+
let mut position1 = None;
58+
loop {
59+
let mut guard = this.state.lock().unwrap();
60+
match std::mem::take(guard.deref_mut()) {
61+
ReaderState::Idle { position } => {
62+
// todo: read until next page boundary instead of fixed size
63+
let len = buf.remaining() as u64;
64+
let end = position.checked_add(len).ok_or_else(|| {
65+
io::Error::new(ErrorKind::InvalidInput, "Position overflow when reading")
66+
})?;
67+
// start the export op for the entire size of the buffer, and convert to a stream
68+
let stream = this
69+
.blobs
70+
.export_ranges(this.options.hash, position..end)
71+
.stream();
72+
position1 = Some(position);
73+
*guard = ReaderState::Reading {
74+
position,
75+
op: Box::pin(stream),
76+
};
77+
}
78+
ReaderState::Reading { position, mut op } => {
79+
let position1 = position1.get_or_insert(position);
80+
match op.poll_next(cx) {
81+
Poll::Ready(Some(ExportRangesItem::Data(data))) => {
82+
if data.offset != *position1 {
83+
break Poll::Ready(Err(io::Error::other(
84+
"Data offset does not match expected position",
85+
)));
86+
}
87+
buf.put_slice(&data.data);
88+
// update just local position1, not the position in the state.
89+
*position1 =
90+
position1
91+
.checked_add(data.data.len() as u64)
92+
.ok_or_else(|| {
93+
io::Error::new(ErrorKind::InvalidInput, "Position overflow")
94+
})?;
95+
*guard = ReaderState::Reading { position, op };
96+
}
97+
Poll::Ready(Some(ExportRangesItem::Error(err))) => {
98+
*guard = ReaderState::Idle { position };
99+
break Poll::Ready(Err(io::Error::other(
100+
format!("Error reading data: {err}"),
101+
)));
102+
}
103+
Poll::Ready(Some(ExportRangesItem::Size(_size))) => {
104+
// put back the state and continue reading
105+
*guard = ReaderState::Reading { position, op };
106+
}
107+
Poll::Ready(None) => {
108+
// done with the stream, go back in idle.
109+
*guard = ReaderState::Idle {
110+
position: *position1,
111+
};
112+
break Poll::Ready(Ok(()));
113+
}
114+
Poll::Pending => {
115+
break if position != *position1 {
116+
// we read some data so we need to abort the op.
117+
//
118+
// we can't be sure we won't be called with the same buf size next time.
119+
*guard = ReaderState::Idle {
120+
position: *position1,
121+
};
122+
Poll::Ready(Ok(()))
123+
} else {
124+
// nothing was read yet, we remain in the reading state
125+
//
126+
// we make an assumption here that the next call will be with the same buf size.
127+
*guard = ReaderState::Reading {
128+
position: *position1,
129+
op,
130+
};
131+
Poll::Pending
132+
};
133+
}
134+
}
135+
}
136+
state @ ReaderState::Seeking { .. } => {
137+
*this.state.lock().unwrap() = state;
138+
break Poll::Ready(Err(io::Error::other(
139+
"Can't read while seeking",
140+
)));
141+
}
142+
ReaderState::Poisoned => {
143+
break Poll::Ready(Err(io::Error::other("Reader is poisoned")));
144+
}
145+
};
146+
}
147+
}
148+
}
149+
150+
impl tokio::io::AsyncSeek for Reader {
151+
fn start_seek(
152+
self: std::pin::Pin<&mut Self>,
153+
seek_from: tokio::io::SeekFrom,
154+
) -> io::Result<()> {
155+
let this = self.get_mut();
156+
let mut guard = this.state.lock().unwrap();
157+
match std::mem::take(guard.deref_mut()) {
158+
ReaderState::Idle { position } => {
159+
let position1 = match seek_from {
160+
SeekFrom::Start(pos) => pos,
161+
SeekFrom::Current(offset) => {
162+
position.checked_add_signed(offset).ok_or_else(|| {
163+
io::Error::new(
164+
ErrorKind::InvalidInput,
165+
"Position overflow when seeking",
166+
)
167+
})?
168+
}
169+
SeekFrom::End(_offset) => {
170+
// todo: support seeking from end if we know the size
171+
return Err(io::Error::new(
172+
ErrorKind::InvalidInput,
173+
"Seeking from end is not supported yet",
174+
))?;
175+
}
176+
};
177+
*guard = ReaderState::Seeking {
178+
position: position1,
179+
};
180+
Ok(())
181+
}
182+
ReaderState::Reading { .. } => Err(io::Error::other("Can't seek while reading")),
183+
ReaderState::Seeking { .. } => Err(io::Error::other("Already seeking")),
184+
ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")),
185+
}
186+
}
187+
188+
fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
189+
let this = self.get_mut();
190+
let mut guard = this.state.lock().unwrap();
191+
Poll::Ready(match std::mem::take(guard.deref_mut()) {
192+
ReaderState::Seeking { position } => {
193+
*guard = ReaderState::Idle { position };
194+
Ok(position)
195+
}
196+
ReaderState::Idle { position } => {
197+
// seek calls poll_complete just in case, to finish a pending seek operation
198+
// before the next seek operation. So it is poll_complete/start_seek/poll_complete
199+
*guard = ReaderState::Idle { position };
200+
Ok(position)
201+
}
202+
ReaderState::Reading { .. } => Err(io::Error::other("Can't seek while reading")),
203+
ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")),
204+
})
205+
}
206+
}
207+
208+
#[cfg(test)]
mod tests {
    use bao_tree::ChunkRanges;
    use testresult::TestResult;
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    use super::*;
    use crate::{
        store::{
            fs::{
                tests::{create_n0_bao, test_data, INTERESTING_SIZES},
                FsStore,
            },
            mem::MemStore,
        },
        util::ChunkRangesExt,
    };

    /// Basic read/seek round-trips against fully available blobs of
    /// various interesting sizes: read-to-end from the start, then
    /// seek to the middle and read the remainder.
    async fn reader_smoke(blobs: &Blobs) -> TestResult<()> {
        for size in INTERESTING_SIZES {
            let data = test_data(size);
            let tag = blobs.add_bytes(data.clone()).await?;
            // read all
            {
                let mut reader = blobs.reader(tag.hash);
                let mut buf = Vec::new();
                reader.read_to_end(&mut buf).await?;
                assert_eq!(buf, data);
                let pos = reader.stream_position().await?;
                assert_eq!(pos, data.len() as u64);
            }
            // seek to mid and read all
            {
                let mut reader = blobs.reader(tag.hash);
                let mid = size / 2;
                reader.seek(SeekFrom::Start(mid as u64)).await?;
                let mut buf = Vec::new();
                reader.read_to_end(&mut buf).await?;
                assert_eq!(buf, data[mid..].to_vec());
                let pos = reader.stream_position().await?;
                assert_eq!(pos, data.len() as u64);
            }
        }
        Ok(())
    }

    /// Reading a blob of which only the first chunk (1 KiB) was imported:
    /// reads within the valid range succeed, reads touching missing data
    /// fail, and a failed read must not advance the stream position.
    async fn reader_partial(blobs: &Blobs) -> TestResult<()> {
        for size in INTERESTING_SIZES {
            let data = test_data(size);
            let ranges = ChunkRanges::chunk(0);
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            println!("importing {} bytes", bao.len());
            blobs.import_bao_bytes(hash, ranges.clone(), bao).await?;
            // read the first chunk or the entire blob, whatever is smaller
            // this should work!
            {
                let mut reader = blobs.reader(hash);
                let valid = size.min(1024);
                let mut buf = vec![0u8; valid];
                reader.read_exact(&mut buf).await?;
                assert_eq!(buf, data[..valid]);
                let pos = reader.stream_position().await?;
                assert_eq!(pos, valid as u64);
            }
            if size > 1024 {
                // read the part we don't have - should immediately return an error
                {
                    let mut reader = blobs.reader(hash);
                    let mut rest = vec![0u8; size - 1024];
                    reader.seek(SeekFrom::Start(1024)).await?;
                    let res = reader.read_exact(&mut rest).await;
                    assert!(res.is_err());
                }
                // read crossing the end of the blob - should return an error despite
                // the first bytes being valid.
                // A read that fails should not update the stream position.
                {
                    let mut reader = blobs.reader(hash);
                    let mut buf = vec![0u8; size];
                    let res = reader.read(&mut buf).await;
                    assert!(res.is_err());
                    let pos = reader.stream_position().await?;
                    assert_eq!(pos, 0);
                }
            }
        }
        Ok(())
    }

    // Partial-blob behavior against the filesystem store.
    #[tokio::test]
    async fn reader_partial_fs() -> TestResult<()> {
        let testdir = tempfile::tempdir()?;
        let store = FsStore::load(testdir.path().to_owned()).await?;
        // reader_smoke_raw(store.blobs()).await?;
        reader_partial(store.blobs()).await?;
        Ok(())
    }

    // Partial-blob behavior against the in-memory store.
    #[tokio::test]
    async fn reader_partial_memory() -> TestResult<()> {
        let store = MemStore::new();
        reader_partial(store.blobs()).await?;
        Ok(())
    }

    // Smoke test against the filesystem store.
    #[tokio::test]
    async fn reader_smoke_fs() -> TestResult<()> {
        let testdir = tempfile::tempdir()?;
        let store = FsStore::load(testdir.path().to_owned()).await?;
        // reader_smoke_raw(store.blobs()).await?;
        reader_smoke(store.blobs()).await?;
        Ok(())
    }

    // Smoke test against the in-memory store.
    #[tokio::test]
    async fn reader_smoke_memory() -> TestResult<()> {
        let store = MemStore::new();
        reader_smoke(store.blobs()).await?;
        Ok(())
    }
}

0 commit comments

Comments
 (0)