Skip to content

Commit 78dc7e4

Browse files
authored
Merge b2b0216 into 719cdb4
2 parents 719cdb4 + b2b0216 commit 78dc7e4

File tree

3 files changed

+350
-7
lines changed

3 files changed

+350
-7
lines changed

src/api/blobs.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,11 @@ use n0_future::{future, stream, Stream, StreamExt};
2929
use quinn::SendStream;
3030
use range_collections::{range_set::RangeSetRange, RangeSet2};
3131
use ref_cast::RefCast;
32+
use serde::{Deserialize, Serialize};
3233
use tokio::io::AsyncWriteExt;
3334
use tracing::trace;
35+
mod reader;
36+
pub use reader::BlobReader;
3437

3538
// Public reexports from the proto module.
3639
//
@@ -102,6 +105,14 @@ impl Blobs {
102105
})
103106
}
104107

108+
pub fn reader(&self, hash: impl Into<Hash>) -> BlobReader {
109+
self.reader_with_opts(ReaderOptions { hash: hash.into() })
110+
}
111+
112+
pub fn reader_with_opts(&self, options: ReaderOptions) -> BlobReader {
113+
BlobReader::new(self.clone(), options)
114+
}
115+
105116
/// Delete a blob.
106117
///
107118
/// This function is not public, because it does not work as expected when called manually,
@@ -647,6 +658,12 @@ impl<'a> AddProgress<'a> {
647658
}
648659
}
649660

661+
/// Options for an async reader for blobs that supports AsyncRead and AsyncSeek.
662+
#[derive(Debug, Clone, Serialize, Deserialize)]
663+
pub struct ReaderOptions {
664+
pub hash: Hash,
665+
}
666+
650667
/// An observe result. Awaiting this will return the current state.
651668
///
652669
/// Calling [`ObserveProgress::stream`] will return a stream of updates, where
@@ -856,7 +873,7 @@ impl ExportRangesProgress {
856873
/// range of 0..100, you will get the entire first chunk, 0..1024.
857874
///
858875
/// It is up to the caller to clip the ranges to the requested ranges.
859-
pub async fn stream(self) -> impl Stream<Item = ExportRangesItem> {
876+
pub fn stream(self) -> impl Stream<Item = ExportRangesItem> {
860877
Gen::new(|co| async move {
861878
let mut rx = match self.inner.await {
862879
Ok(rx) => rx,

src/api/blobs/reader.rs

Lines changed: 328 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,328 @@
1+
use std::{
2+
io::{self, ErrorKind, SeekFrom},
3+
pin::Pin,
4+
task::{Context, Poll},
5+
};
6+
7+
use n0_future::StreamExt;
8+
9+
use crate::api::{
10+
blobs::{Blobs, ReaderOptions},
11+
proto::ExportRangesItem,
12+
};
13+
14+
#[derive(Debug)]
15+
pub struct BlobReader {
16+
blobs: Blobs,
17+
options: ReaderOptions,
18+
state: ReaderState,
19+
}
20+
21+
#[derive(Default, derive_more::Debug)]
22+
enum ReaderState {
23+
Idle {
24+
position: u64,
25+
},
26+
Seeking {
27+
position: u64,
28+
},
29+
Reading {
30+
position: u64,
31+
#[debug(skip)]
32+
op: n0_future::boxed::BoxStream<ExportRangesItem>,
33+
},
34+
#[default]
35+
Poisoned,
36+
}
37+
38+
impl BlobReader {
39+
pub fn new(blobs: Blobs, options: ReaderOptions) -> Self {
40+
Self {
41+
blobs,
42+
options,
43+
state: ReaderState::Idle { position: 0 },
44+
}
45+
}
46+
}
47+
48+
impl tokio::io::AsyncRead for BlobReader {
49+
fn poll_read(
50+
self: Pin<&mut Self>,
51+
cx: &mut Context<'_>,
52+
buf: &mut tokio::io::ReadBuf<'_>,
53+
) -> Poll<io::Result<()>> {
54+
let this = self.get_mut();
55+
let mut position1 = None;
56+
loop {
57+
let guard = &mut this.state;
58+
match std::mem::take(guard) {
59+
ReaderState::Idle { position } => {
60+
// todo: read until next page boundary instead of fixed size
61+
let len = buf.remaining() as u64;
62+
let end = position.checked_add(len).ok_or_else(|| {
63+
io::Error::new(ErrorKind::InvalidInput, "Position overflow when reading")
64+
})?;
65+
// start the export op for the entire size of the buffer, and convert to a stream
66+
let stream = this
67+
.blobs
68+
.export_ranges(this.options.hash, position..end)
69+
.stream();
70+
position1 = Some(position);
71+
*guard = ReaderState::Reading {
72+
position,
73+
op: Box::pin(stream),
74+
};
75+
}
76+
ReaderState::Reading { position, mut op } => {
77+
let position1 = position1.get_or_insert(position);
78+
match op.poll_next(cx) {
79+
Poll::Ready(Some(ExportRangesItem::Size(_))) => {
80+
*guard = ReaderState::Reading { position, op };
81+
}
82+
Poll::Ready(Some(ExportRangesItem::Data(data))) => {
83+
if data.offset != *position1 {
84+
break Poll::Ready(Err(io::Error::other(
85+
"Data offset does not match expected position",
86+
)));
87+
}
88+
buf.put_slice(&data.data);
89+
// update just local position1, not the position in the state.
90+
*position1 =
91+
position1
92+
.checked_add(data.data.len() as u64)
93+
.ok_or_else(|| {
94+
io::Error::new(ErrorKind::InvalidInput, "Position overflow")
95+
})?;
96+
*guard = ReaderState::Reading { position, op };
97+
}
98+
Poll::Ready(Some(ExportRangesItem::Error(err))) => {
99+
*guard = ReaderState::Idle { position };
100+
break Poll::Ready(Err(io::Error::other(format!(
101+
"Error reading data: {err}"
102+
))));
103+
}
104+
Poll::Ready(None) => {
105+
// done with the stream, go back in idle.
106+
*guard = ReaderState::Idle {
107+
position: *position1,
108+
};
109+
break Poll::Ready(Ok(()));
110+
}
111+
Poll::Pending => {
112+
break if position != *position1 {
113+
// we read some data so we need to abort the op.
114+
//
115+
// we can't be sure we won't be called with the same buf size next time.
116+
*guard = ReaderState::Idle {
117+
position: *position1,
118+
};
119+
Poll::Ready(Ok(()))
120+
} else {
121+
// nothing was read yet, we remain in the reading state
122+
//
123+
// we make an assumption here that the next call will be with the same buf size.
124+
*guard = ReaderState::Reading {
125+
position: *position1,
126+
op,
127+
};
128+
Poll::Pending
129+
};
130+
}
131+
}
132+
}
133+
state @ ReaderState::Seeking { .. } => {
134+
// should I try to recover from this or just keep it poisoned?
135+
this.state = state;
136+
break Poll::Ready(Err(io::Error::other("Can't read while seeking")));
137+
}
138+
ReaderState::Poisoned => {
139+
break Poll::Ready(Err(io::Error::other("Reader is poisoned")));
140+
}
141+
};
142+
}
143+
}
144+
}
145+
146+
impl tokio::io::AsyncSeek for BlobReader {
147+
fn start_seek(
148+
self: std::pin::Pin<&mut Self>,
149+
seek_from: tokio::io::SeekFrom,
150+
) -> io::Result<()> {
151+
let this = self.get_mut();
152+
let guard = &mut this.state;
153+
match std::mem::take(guard) {
154+
ReaderState::Idle { position } => {
155+
let position1 = match seek_from {
156+
SeekFrom::Start(pos) => pos,
157+
SeekFrom::Current(offset) => {
158+
position.checked_add_signed(offset).ok_or_else(|| {
159+
io::Error::new(
160+
ErrorKind::InvalidInput,
161+
"Position overflow when seeking",
162+
)
163+
})?
164+
}
165+
SeekFrom::End(_offset) => {
166+
// todo: support seeking from end if we know the size
167+
return Err(io::Error::new(
168+
ErrorKind::InvalidInput,
169+
"Seeking from end is not supported yet",
170+
))?;
171+
}
172+
};
173+
*guard = ReaderState::Seeking {
174+
position: position1,
175+
};
176+
Ok(())
177+
}
178+
ReaderState::Reading { .. } => Err(io::Error::other("Can't seek while reading")),
179+
ReaderState::Seeking { .. } => Err(io::Error::other("Already seeking")),
180+
ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")),
181+
}
182+
}
183+
184+
fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
185+
let this = self.get_mut();
186+
let guard = &mut this.state;
187+
Poll::Ready(match std::mem::take(guard) {
188+
ReaderState::Seeking { position } => {
189+
*guard = ReaderState::Idle { position };
190+
Ok(position)
191+
}
192+
ReaderState::Idle { position } => {
193+
// seek calls poll_complete just in case, to finish a pending seek operation
194+
// before the next seek operation. So it is poll_complete/start_seek/poll_complete
195+
*guard = ReaderState::Idle { position };
196+
Ok(position)
197+
}
198+
state @ ReaderState::Reading { .. } => {
199+
// should I try to recover from this or just keep it poisoned?
200+
*guard = state;
201+
Err(io::Error::other("Can't seek while reading"))
202+
}
203+
ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")),
204+
})
205+
}
206+
}
207+
208+
#[cfg(test)]
mod tests {
    use bao_tree::ChunkRanges;
    use testresult::TestResult;
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    use super::*;
    use crate::{
        store::{
            fs::{
                tests::{create_n0_bao, test_data, INTERESTING_SIZES},
                FsStore,
            },
            mem::MemStore,
        },
        util::ChunkRangesExt,
    };

    /// Reads fully-available blobs of various sizes: straight through, and
    /// again after seeking to the middle. Verifies data and stream position.
    async fn reader_smoke(blobs: &Blobs) -> TestResult<()> {
        for size in INTERESTING_SIZES {
            let data = test_data(size);
            let tag = blobs.add_bytes(data.clone()).await?;
            // read all
            {
                let mut reader = blobs.reader(tag.hash);
                let mut buf = Vec::new();
                reader.read_to_end(&mut buf).await?;
                assert_eq!(buf, data);
                let pos = reader.stream_position().await?;
                assert_eq!(pos, data.len() as u64);
            }
            // seek to mid and read all
            {
                let mut reader = blobs.reader(tag.hash);
                let mid = size / 2;
                reader.seek(SeekFrom::Start(mid as u64)).await?;
                let mut buf = Vec::new();
                reader.read_to_end(&mut buf).await?;
                assert_eq!(buf, data[mid..].to_vec());
                let pos = reader.stream_position().await?;
                assert_eq!(pos, data.len() as u64);
            }
        }
        Ok(())
    }

    /// Reads blobs where only the first chunk was imported: the valid prefix
    /// reads fine, reading past it errors, and a failed read must not
    /// advance the stream position.
    async fn reader_partial(blobs: &Blobs) -> TestResult<()> {
        for size in INTERESTING_SIZES {
            let data = test_data(size);
            let ranges = ChunkRanges::chunk(0);
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            blobs.import_bao_bytes(hash, ranges.clone(), bao).await?;
            // read the first chunk or the entire blob, whatever is smaller
            // this should work!
            {
                let mut reader = blobs.reader(hash);
                let valid = size.min(1024);
                let mut buf = vec![0u8; valid];
                reader.read_exact(&mut buf).await?;
                assert_eq!(buf, data[..valid]);
                let pos = reader.stream_position().await?;
                assert_eq!(pos, valid as u64);
            }
            if size > 1024 {
                // read the part we don't have - should immediately return an error
                {
                    let mut reader = blobs.reader(hash);
                    let mut rest = vec![0u8; size - 1024];
                    reader.seek(SeekFrom::Start(1024)).await?;
                    let res = reader.read_exact(&mut rest).await;
                    assert!(res.is_err());
                }
                // read crossing the end of the blob - should return an error despite
                // the first bytes being valid.
                // A read that fails should not update the stream position.
                {
                    let mut reader = blobs.reader(hash);
                    let mut buf = vec![0u8; size];
                    let res = reader.read(&mut buf).await;
                    assert!(res.is_err());
                    let pos = reader.stream_position().await?;
                    assert_eq!(pos, 0);
                }
            }
        }
        Ok(())
    }

    #[tokio::test]
    async fn reader_partial_fs() -> TestResult<()> {
        let testdir = tempfile::tempdir()?;
        let store = FsStore::load(testdir.path().to_owned()).await?;
        reader_partial(store.blobs()).await?;
        Ok(())
    }

    #[tokio::test]
    async fn reader_partial_memory() -> TestResult<()> {
        let store = MemStore::new();
        reader_partial(store.blobs()).await?;
        Ok(())
    }

    #[tokio::test]
    async fn reader_smoke_fs() -> TestResult<()> {
        let testdir = tempfile::tempdir()?;
        let store = FsStore::load(testdir.path().to_owned()).await?;
        reader_smoke(store.blobs()).await?;
        Ok(())
    }

    #[tokio::test]
    async fn reader_smoke_memory() -> TestResult<()> {
        let store = MemStore::new();
        reader_smoke(store.blobs()).await?;
        Ok(())
    }
}

0 commit comments

Comments
 (0)