Skip to content

Commit 4f2f3be

Browse files
authored
Merge f75b275 into 56b0695
2 parents 56b0695 + f75b275 commit 4f2f3be

File tree

11 files changed

+243
-33
lines changed

11 files changed

+243
-33
lines changed

Cargo.lock

Lines changed: 4 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ async-channel = "2.3.1"
1818
bao-tree = { version = "0.13", features = [
1919
"tokio_fsm",
2020
"validate",
21+
"experimental-mixed",
2122
], default-features = false }
2223
blake3 = { version = "1.4.5", package = "iroh-blake3" }
2324
bytes = { version = "1.7", features = ["serde"] }
@@ -77,6 +78,7 @@ walkdir = { version = "2.5.0", optional = true }
7778
# Examples
7879
console = { version = "0.15.8", optional = true }
7980
tracing-test = "0.2.5"
81+
positioned-io = "0.3.3"
8082

8183
[dev-dependencies]
8284
http-body = "1.0"
@@ -189,3 +191,4 @@ incremental = false
189191
iroh = { git = "https://github.com/n0-computer/iroh.git", branch = "main" }
190192
iroh-base = { git = "https://github.com/n0-computer/iroh.git", branch = "main" }
191193
quic-rpc = { git = "https://github.com/n0-computer/quic-rpc", branch = "main" }
194+
bao-tree = { git = "https://github.com/n0-computer/bao-tree", branch = "read_and_seek" }

src/rpc.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ use proto::{
2323
AddPathRequest, AddPathResponse, AddStreamRequest, AddStreamResponse, AddStreamUpdate,
2424
BatchAddPathRequest, BatchAddPathResponse, BatchAddStreamRequest, BatchAddStreamResponse,
2525
BatchAddStreamUpdate, BatchCreateRequest, BatchCreateResponse, BatchCreateTempTagRequest,
26-
BatchUpdate, BlobStatusRequest, BlobStatusResponse, ConsistencyCheckRequest,
27-
CreateCollectionRequest, CreateCollectionResponse, DeleteRequest, DownloadResponse,
28-
ExportRequest, ExportResponse, ListIncompleteRequest, ListRequest, ReadAtRequest,
29-
ReadAtResponse, ValidateRequest,
26+
BatchUpdate, BlobEntryInfoRequest, BlobStatusRequest, BlobStatusResponse,
27+
ConsistencyCheckRequest, CreateCollectionRequest, CreateCollectionResponse, DeleteRequest,
28+
DownloadResponse, ExportRequest, ExportResponse, ListIncompleteRequest, ListRequest,
29+
ReadAtRequest, ReadAtResponse, ValidateRequest,
3030
},
3131
tags::{
3232
CreateRequest as TagsCreateRequest, DeleteRequest as TagDeleteRequest,
@@ -51,7 +51,9 @@ use crate::{
5151
},
5252
net_protocol::{BlobDownloadRequest, Blobs, BlobsInner},
5353
provider::{AddProgress, BatchAddPathProgress},
54-
store::{ConsistencyCheckProgress, ImportProgress, MapEntry, ValidateProgress},
54+
store::{
55+
ConsistencyCheckProgress, EntryPathOrData, ImportProgress, MapEntry, ValidateProgress,
56+
},
5557
util::{
5658
local_pool::LocalPoolHandle,
5759
progress::{AsyncChannelProgressSender, ProgressSender},
@@ -203,6 +205,7 @@ impl<D: crate::store::Store> Handler<D> {
203205
.await
204206
}
205207
BatchCreateTempTag(msg) => chan.rpc(msg, self, Self::batch_create_temp_tag).await,
208+
EntryInfo(msg) => chan.rpc(msg, self, Self::blob_entry_info).await,
206209
}
207210
}
208211

@@ -309,6 +312,17 @@ impl<D: crate::store::Store> Handler<D> {
309312
Ok(())
310313
}
311314

315+
async fn blob_entry_info(
316+
self,
317+
msg: BlobEntryInfoRequest,
318+
) -> RpcResult<Option<EntryPathOrData>> {
319+
Ok(self
320+
.store()
321+
.entry_path_or_data(msg.hash)
322+
.await
323+
.map_err(|e| RpcError::new(&e))?)
324+
}
325+
312326
fn blob_list_tags(self, msg: TagListRequest) -> impl Stream<Item = TagInfo> + Send + 'static {
313327
tracing::info!("blob_list_tags");
314328
let blobs = self;

src/rpc/client/blobs.rs

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ use std::{
6666
};
6767

6868
use anyhow::{anyhow, Context as _, Result};
69+
use bao_tree::{
70+
io::{baofile::BaoFile, outboard::PreOrderOutboard},
71+
BaoTree,
72+
};
6973
use bytes::Bytes;
7074
use futures_lite::{Stream, StreamExt};
7175
use futures_util::SinkExt;
@@ -87,10 +91,10 @@ use crate::{
8791
format::collection::{Collection, SimpleStore},
8892
get::db::DownloadProgress as BytesDownloadProgress,
8993
net_protocol::BlobDownloadRequest,
90-
rpc::proto::RpcService,
94+
rpc::proto::{blobs::BlobEntryInfoRequest, RpcService},
9195
store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
92-
util::SetTagOption,
93-
BlobFormat, Hash, Tag,
96+
util::{MemOrFile, SetTagOption},
97+
BlobFormat, Hash, Tag, IROH_BLOCK_SIZE,
9498
};
9599

96100
mod batch;
@@ -380,6 +384,31 @@ where
380384
))
381385
}
382386

387+
/// Open a blob as an independent bao file that implements `std::io::Read` and `std::io::Seek`
388+
pub async fn open(&self, hash: Hash) -> Result<impl std::io::Read + std::io::Seek> {
389+
let Some(info) = self.rpc.rpc(BlobEntryInfoRequest { hash }).await?? else {
390+
return Err(anyhow!("Blob not found"));
391+
};
392+
let (data, size) = match info.data {
393+
MemOrFile::Mem(data) => (MemOrFile::Mem(data.clone()), data.len() as u64),
394+
MemOrFile::File((path, size)) => (MemOrFile::File(std::fs::File::open(path)?), size),
395+
};
396+
let outboard = match info.outboard {
397+
MemOrFile::Mem(data) => MemOrFile::Mem(data.clone()),
398+
MemOrFile::File(path) => MemOrFile::File(std::fs::File::open(path)?),
399+
};
400+
let file = BaoFile {
401+
data,
402+
outboard: PreOrderOutboard {
403+
tree: BaoTree::new(size, IROH_BLOCK_SIZE),
404+
root: hash.into(),
405+
data: outboard,
406+
},
407+
};
408+
let file = positioned_io::Cursor::new(file);
409+
Ok(file)
410+
}
411+
383412
/// Export a blob from the internal blob store to a path on the node's filesystem.
384413
///
385414
/// `destination` should be a writeable, absolute path on the local node's filesystem.

src/rpc/proto/blobs.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ use crate::{
1515
provider::{AddProgress, BatchAddPathProgress},
1616
rpc::client::blobs::{BlobInfo, BlobStatus, IncompleteBlobInfo, ReadAtLen, WrapOption},
1717
store::{
18-
BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ImportMode,
19-
ValidateProgress,
18+
BaoBlobSize, ConsistencyCheckProgress, EntryPathOrData, ExportFormat, ExportMode,
19+
ImportMode, ValidateProgress,
2020
},
2121
util::SetTagOption,
2222
BlobFormat, Hash, HashAndFormat, Tag,
@@ -63,6 +63,8 @@ pub enum Request {
6363
BatchAddPath(BatchAddPathRequest),
6464
#[rpc(response = RpcResult<()>)]
6565
BatchCreateTempTag(BatchCreateTempTagRequest),
66+
#[rpc(response = RpcResult<Option<EntryPathOrData>>)]
67+
EntryInfo(BlobEntryInfoRequest),
6668
}
6769

6870
#[allow(missing_docs)]
@@ -83,6 +85,7 @@ pub enum Response {
8385
BatchCreate(BatchCreateResponse),
8486
BatchAddStream(BatchAddStreamResponse),
8587
BatchAddPath(BatchAddPathResponse),
88+
EntryInfo(RpcResult<Option<EntryPathOrData>>),
8689
}
8790

8891
/// A request to the node to provide the data at the given path
@@ -313,6 +316,13 @@ pub struct BatchAddPathRequest {
313316
pub batch: BatchId,
314317
}
315318

319+
/// Request the location (file path or inline bytes) of a blob's data and outboard
320+
#[derive(Serialize, Deserialize, Debug)]
321+
pub struct BlobEntryInfoRequest {
322+
/// The hash of the blob
323+
pub hash: Hash,
324+
}
325+
316326
/// Response to a batch add path request
317327
#[derive(Serialize, Deserialize, Debug)]
318328
pub struct BatchAddPathResponse(pub BatchAddPathProgress);

src/store/bao_file.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -189,16 +189,7 @@ impl FileStorage {
189189
}
190190

191191
fn current_size(&self) -> io::Result<u64> {
192-
let len = self.sizes.metadata()?.len();
193-
if len < 8 {
194-
Ok(0)
195-
} else {
196-
// todo: use the last full u64 in case the sizes file is not a multiple of 8
197-
// bytes. Not sure how that would happen, but we should handle it.
198-
let mut buf = [0u8; 8];
199-
self.sizes.read_exact_at(len - 8, &mut buf)?;
200-
Ok(u64::from_le_bytes(buf))
201-
}
192+
read_current_size(&self.sizes)
202193
}
203194

204195
fn write_batch(&mut self, size: u64, batch: &[BaoContentItem]) -> io::Result<()> {
@@ -470,6 +461,18 @@ impl AsyncSliceReader for OutboardReader {
470461
}
471462
}
472463

464+
pub fn read_current_size(sizes: &File) -> io::Result<u64> {
465+
let len = sizes.metadata()?.len();
466+
if len < 8 {
467+
Ok(0)
468+
} else {
469+
let len = len & !7;
470+
let mut buf = [0u8; 8];
471+
sizes.read_exact_at(len - 8, &mut buf)?;
472+
Ok(u64::from_le_bytes(buf))
473+
}
474+
}
475+
473476
enum HandleChange {
474477
None,
475478
MemToFile,

src/store/fs.rs

Lines changed: 83 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,9 @@ use tables::{ReadOnlyTables, ReadableTables, Tables};
9898

9999
use self::{tables::DeleteSet, test_support::EntryData, util::PeekableFlumeReceiver};
100100
use super::{
101-
bao_file::{BaoFileConfig, BaoFileHandle, BaoFileHandleWeak, CreateCb},
102-
temp_name, BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, EntryStatus, ExportMode,
103-
ExportProgressCb, ImportMode, ImportProgress, Map, ReadableStore, TempCounterMap,
101+
bao_file::{read_current_size, BaoFileConfig, BaoFileHandle, BaoFileHandleWeak, CreateCb},
102+
temp_name, BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, EntryPathOrData, EntryStatus,
103+
ExportMode, ExportProgressCb, ImportMode, ImportProgress, Map, ReadableStore, TempCounterMap,
104104
};
105105
use crate::{
106106
store::{
@@ -532,6 +532,10 @@ pub(crate) enum ActorMessage {
532532
hash: Hash,
533533
tx: oneshot::Sender<ActorResult<EntryStatus>>,
534534
},
535+
EntryPathOrData {
536+
hash: Hash,
537+
tx: oneshot::Sender<ActorResult<Option<EntryPathOrData>>>,
538+
},
535539
#[cfg(test)]
536540
/// Query method: get the full entry state for a hash, both in memory and in redb.
537541
/// This is everything we got about the entry, including the actual inline outboard and data.
@@ -664,6 +668,7 @@ impl ActorMessage {
664668
| Self::Tags { .. }
665669
| Self::GcStart { .. }
666670
| Self::GetFullEntryState { .. }
671+
| Self::EntryPathOrData { .. }
667672
| Self::Dump => MessageCategory::ReadOnly,
668673
Self::Import { .. }
669674
| Self::Export { .. }
@@ -870,6 +875,14 @@ impl StoreInner {
870875
Ok(tags)
871876
}
872877

878+
async fn entry_path_or_data(&self, hash: Hash) -> OuterResult<Option<EntryPathOrData>> {
879+
let (tx, rx) = oneshot::channel();
880+
self.tx
881+
.send(ActorMessage::EntryPathOrData { hash, tx })
882+
.await?;
883+
Ok(rx.await??)
884+
}
885+
873886
async fn set_tag(&self, tag: Tag, value: Option<HashAndFormat>) -> OuterResult<()> {
874887
let (tx, rx) = oneshot::channel();
875888
self.tx
@@ -1371,6 +1384,10 @@ impl super::Store for Store {
13711384
.await??)
13721385
}
13731386

1387+
async fn entry_path_or_data(&self, hash: Hash) -> io::Result<Option<EntryPathOrData>> {
1388+
Ok(self.0.entry_path_or_data(hash).await?)
1389+
}
1390+
13741391
async fn set_tag(&self, name: Tag, hash: Option<HashAndFormat>) -> io::Result<()> {
13751392
Ok(self.0.set_tag(name, hash).await?)
13761393
}
@@ -2266,6 +2283,65 @@ impl ActorState {
22662283
Ok(())
22672284
}
22682285

2286+
fn entry_path_or_data(
2287+
&mut self,
2288+
tables: &impl ReadableTables,
2289+
hash: Hash,
2290+
) -> ActorResult<Option<EntryPathOrData>> {
2291+
let data_path = || self.options.path.owned_data_path(&hash);
2292+
let outboard_path = || self.options.path.owned_outboard_path(&hash);
2293+
let sizes_path = || self.options.path.owned_sizes_path(&hash);
2294+
Ok(match tables.blobs().get(hash)? {
2295+
Some(guard) => match guard.value() {
2296+
EntryState::Complete {
2297+
data_location,
2298+
outboard_location,
2299+
} => {
2300+
let data = match data_location {
2301+
DataLocation::External(paths, size) => {
2302+
let path = paths.first().ok_or_else(|| {
2303+
ActorError::Inconsistent("external data missing".to_owned())
2304+
})?;
2305+
MemOrFile::File((path.clone(), size))
2306+
}
2307+
DataLocation::Owned(size) => MemOrFile::File((data_path(), size)),
2308+
DataLocation::Inline(_) => {
2309+
let data = tables.inline_data().get(hash)?.ok_or_else(|| {
2310+
ActorError::Inconsistent("inline data missing".to_owned())
2311+
})?;
2312+
MemOrFile::Mem(data.value().to_vec().into())
2313+
}
2314+
};
2315+
let outboard = match outboard_location {
2316+
OutboardLocation::Owned => MemOrFile::File(outboard_path()),
2317+
OutboardLocation::Inline(_) => MemOrFile::Mem(
2318+
tables
2319+
.inline_outboard()
2320+
.get(hash)?
2321+
.ok_or_else(|| {
2322+
ActorError::Inconsistent("inline outboard missing".to_owned())
2323+
})?
2324+
.value()
2325+
.to_vec()
2326+
.into(),
2327+
),
2328+
OutboardLocation::NotNeeded => MemOrFile::Mem(Bytes::new()),
2329+
};
2330+
Some(EntryPathOrData { data, outboard })
2331+
}
2332+
EntryState::Partial { .. } => {
2333+
let sizes = std::fs::File::open(sizes_path())?;
2334+
let size = read_current_size(&sizes)?;
2335+
Some(EntryPathOrData {
2336+
data: MemOrFile::File((data_path(), size)),
2337+
outboard: MemOrFile::File(outboard_path()),
2338+
})
2339+
}
2340+
},
2341+
None => None,
2342+
})
2343+
}
2344+
22692345
fn handle_toplevel(&mut self, db: &redb::Database, msg: ActorMessage) -> ActorResult<()> {
22702346
match msg {
22712347
ActorMessage::UpdateInlineOptions {
@@ -2339,6 +2415,10 @@ impl ActorState {
23392415
let res = self.get_full_entry_state(tables, hash);
23402416
tx.send(res).ok();
23412417
}
2418+
ActorMessage::EntryPathOrData { hash, tx } => {
2419+
let res = self.entry_path_or_data(tables, hash);
2420+
tx.send(res).ok();
2421+
}
23422422
x => return Ok(Err(x)),
23432423
}
23442424
Ok(Ok(()))

0 commit comments

Comments
 (0)