Skip to content

Commit 42a2789

Browse files
authored
Merge aa970fe into ee3e710
2 parents ee3e710 + aa970fe commit 42a2789

File tree

13 files changed

+455
-153
lines changed

13 files changed

+455
-153
lines changed

examples/expiring-tags.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ async fn delete_expired_tags(blobs: &Store, prefix: &str, bulk: bool) -> anyhow:
7373
// find tags to delete one by one and then delete them
7474
//
7575
// this allows us to print the tags before deleting them
76-
let mut tags = blobs.tags().list().await?;
76+
let mut tags = blobs.tags().list().stream();
7777
let mut to_delete = Vec::new();
7878
while let Some(tag) = tags.next().await {
7979
let tag = tag?.name;
@@ -102,7 +102,7 @@ async fn delete_expired_tags(blobs: &Store, prefix: &str, bulk: bool) -> anyhow:
102102

103103
async fn print_store_info(store: &Store) -> anyhow::Result<()> {
104104
let now = chrono::Utc::now();
105-
let mut tags = store.tags().list().await?;
105+
let mut tags = store.tags().list().stream();
106106
println!(
107107
"Current time: {}",
108108
now.to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
@@ -112,7 +112,7 @@ async fn print_store_info(store: &Store) -> anyhow::Result<()> {
112112
let tag = tag?;
113113
println!(" {tag:?}");
114114
}
115-
let mut blobs = store.list().stream().await?;
115+
let mut blobs = store.list().stream();
116116
println!("Blobs:");
117117
while let Some(item) = blobs.next().await {
118118
println!(" {}", item?);

src/api/blobs.rs

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,13 @@ use super::{
5656
ApiClient, RequestResult, Tags,
5757
};
5858
use crate::{
59-
api::proto::{BatchRequest, ImportByteStreamUpdate},
59+
api::proto::{BatchRequest, ImportByteStreamUpdate, ListBlobsItem},
6060
provider::StreamContext,
6161
store::IROH_BLOCK_SIZE,
62-
util::temp_tag::TempTag,
62+
util::{
63+
irpc::{IrpcReceiverFutExt, IrpcStreamItem},
64+
temp_tag::TempTag,
65+
},
6366
BlobFormat, Hash, HashAndFormat,
6467
};
6568

@@ -835,34 +838,48 @@ impl ImportBaoHandle {
835838

836839
/// A progress handle for a blobs list operation.
837840
pub struct BlobsListProgress {
838-
inner: future::Boxed<irpc::Result<mpsc::Receiver<super::Result<Hash>>>>,
841+
inner: future::Boxed<irpc::Result<mpsc::Receiver<ListBlobsItem>>>,
839842
}
840843

841844
impl BlobsListProgress {
842845
fn new(
843-
fut: impl Future<Output = irpc::Result<mpsc::Receiver<super::Result<Hash>>>> + Send + 'static,
846+
fut: impl Future<Output = irpc::Result<mpsc::Receiver<ListBlobsItem>>> + Send + 'static,
844847
) -> Self {
845848
Self {
846849
inner: Box::pin(fut),
847850
}
848851
}
849852

850-
pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
851-
let mut rx: mpsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
852-
let mut hashes = Vec::new();
853-
while let Some(item) = rx.recv().await? {
854-
hashes.push(item?);
853+
pub async fn hashes(self) -> super::Result<Vec<Hash>> {
854+
self.inner.try_collect().await
855+
}
856+
857+
pub fn stream(self) -> impl Stream<Item = super::Result<Hash>> {
858+
self.inner.into_stream()
859+
}
860+
}
861+
862+
impl IrpcStreamItem for ListBlobsItem {
863+
type Error = super::Error;
864+
type Item = Hash;
865+
866+
fn into_result_opt(self) -> Option<Result<Hash, super::Error>> {
867+
match self {
868+
Self::Item(hash) => Some(Ok(hash)),
869+
Self::Error(e) => Some(Err(e)),
870+
Self::Done => None,
855871
}
856-
Ok(hashes)
857872
}
858873

859-
pub async fn stream(self) -> irpc::Result<impl Stream<Item = super::Result<Hash>>> {
860-
let mut rx = self.inner.await?;
861-
Ok(Gen::new(|co| async move {
862-
while let Ok(Some(item)) = rx.recv().await {
863-
co.yield_(item).await;
864-
}
865-
}))
874+
fn from_result(item: std::result::Result<Hash, super::Error>) -> Self {
875+
match item {
876+
Ok(hash) => Self::Item(hash),
877+
Err(e) => Self::Error(e),
878+
}
879+
}
880+
881+
fn done() -> Self {
882+
Self::Done
866883
}
867884
}
868885

src/api/proto.rs

Lines changed: 148 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
//! The file system store is quite complex and optimized, so to get started take a look at
1515
//! the much simpler memory store.
1616
use std::{
17+
collections::HashSet,
1718
fmt::{self, Debug},
19+
future::{Future, IntoFuture},
1820
io,
1921
num::NonZeroU64,
2022
ops::{Bound, RangeBounds},
@@ -32,13 +34,20 @@ use irpc::{
3234
channel::{mpsc, oneshot},
3335
rpc_requests,
3436
};
35-
use n0_future::Stream;
37+
use n0_future::{future, Stream};
3638
use range_collections::RangeSet2;
3739
use serde::{Deserialize, Serialize};
3840
pub(crate) mod bitfield;
3941
pub use bitfield::Bitfield;
4042

41-
use crate::{store::util::Tag, util::temp_tag::TempTag, BlobFormat, Hash, HashAndFormat};
43+
use crate::{
44+
store::util::Tag,
45+
util::{
46+
irpc::{IrpcReceiverFutExt, IrpcStreamItem},
47+
temp_tag::TempTag,
48+
},
49+
BlobFormat, Hash, HashAndFormat,
50+
};
4251

4352
pub(crate) trait HashSpecific {
4453
fn hash(&self) -> Hash;
@@ -89,7 +98,7 @@ impl HashSpecific for CreateTagMsg {
8998
#[rpc_requests(message = Command, alias = "Msg")]
9099
#[derive(Debug, Serialize, Deserialize)]
91100
pub enum Request {
92-
#[rpc(tx = mpsc::Sender<super::Result<Hash>>)]
101+
#[rpc(tx = mpsc::Sender<ListBlobsItem>)]
93102
ListBlobs(ListRequest),
94103
#[rpc(tx = oneshot::Sender<Scope>, rx = mpsc::Receiver<BatchResponse>)]
95104
Batch(BatchRequest),
@@ -113,7 +122,7 @@ pub enum Request {
113122
ImportPath(ImportPathRequest),
114123
#[rpc(tx = mpsc::Sender<ExportProgressItem>)]
115124
ExportPath(ExportPathRequest),
116-
#[rpc(tx = oneshot::Sender<Vec<super::Result<TagInfo>>>)]
125+
#[rpc(tx = mpsc::Sender<ListTagsItem>)]
117126
ListTags(ListTagsRequest),
118127
#[rpc(tx = oneshot::Sender<super::Result<()>>)]
119128
SetTag(SetTagRequest),
@@ -123,7 +132,7 @@ pub enum Request {
123132
RenameTag(RenameTagRequest),
124133
#[rpc(tx = oneshot::Sender<super::Result<Tag>>)]
125134
CreateTag(CreateTagRequest),
126-
#[rpc(tx = oneshot::Sender<Vec<HashAndFormat>>)]
135+
#[rpc(tx = mpsc::Sender<ListTempTagsItem>)]
127136
ListTempTags(ListTempTagsRequest),
128137
#[rpc(tx = oneshot::Sender<TempTag>)]
129138
CreateTempTag(CreateTempTagRequest),
@@ -351,6 +360,109 @@ pub struct TagInfo {
351360
pub hash: Hash,
352361
}
353362

363+
#[derive(Debug, Serialize, Deserialize)]
364+
pub enum ListBlobsItem {
365+
Item(Hash),
366+
Error(super::Error),
367+
Done,
368+
}
369+
370+
#[derive(Debug, Serialize, Deserialize)]
371+
pub enum ListTagsItem {
372+
Item(TagInfo),
373+
Error(super::Error),
374+
Done,
375+
}
376+
377+
impl From<std::result::Result<TagInfo, super::Error>> for ListTagsItem {
378+
fn from(item: std::result::Result<TagInfo, super::Error>) -> Self {
379+
match item {
380+
Ok(item) => ListTagsItem::Item(item),
381+
Err(err) => ListTagsItem::Error(err),
382+
}
383+
}
384+
}
385+
386+
impl IrpcStreamItem for ListTagsItem {
387+
type Error = super::Error;
388+
type Item = TagInfo;
389+
390+
fn into_result_opt(self) -> Option<Result<TagInfo, super::Error>> {
391+
match self {
392+
ListTagsItem::Item(item) => Some(Ok(item)),
393+
ListTagsItem::Done => None,
394+
ListTagsItem::Error(err) => Some(Err(err)),
395+
}
396+
}
397+
398+
fn from_result(item: std::result::Result<TagInfo, super::Error>) -> Self {
399+
match item {
400+
Ok(i) => Self::Item(i),
401+
Err(e) => Self::Error(e),
402+
}
403+
}
404+
405+
fn done() -> Self {
406+
Self::Done
407+
}
408+
}
409+
410+
pub struct ListTempTagsProgress {
411+
inner: future::Boxed<irpc::Result<mpsc::Receiver<ListTempTagsItem>>>,
412+
}
413+
414+
impl IntoFuture for ListTempTagsProgress {
415+
fn into_future(self) -> Self::IntoFuture {
416+
Box::pin(self.inner.try_collect())
417+
}
418+
419+
type IntoFuture = future::Boxed<Self::Output>;
420+
421+
type Output = super::Result<HashSet<HashAndFormat>>;
422+
}
423+
424+
impl ListTempTagsProgress {
425+
pub(super) fn new(
426+
fut: impl Future<Output = irpc::Result<mpsc::Receiver<ListTempTagsItem>>> + Send + 'static,
427+
) -> Self {
428+
Self {
429+
inner: Box::pin(fut),
430+
}
431+
}
432+
433+
pub fn stream(self) -> impl Stream<Item = super::Result<HashAndFormat>> {
434+
self.inner.into_stream()
435+
}
436+
}
437+
438+
pub struct ListTagsProgress {
439+
inner: future::Boxed<irpc::Result<mpsc::Receiver<ListTagsItem>>>,
440+
}
441+
442+
impl IntoFuture for ListTagsProgress {
443+
fn into_future(self) -> Self::IntoFuture {
444+
Box::pin(self.inner.try_collect())
445+
}
446+
447+
type IntoFuture = future::Boxed<Self::Output>;
448+
449+
type Output = super::Result<Vec<TagInfo>>;
450+
}
451+
452+
impl ListTagsProgress {
453+
pub(super) fn new(
454+
fut: impl Future<Output = irpc::Result<mpsc::Receiver<ListTagsItem>>> + Send + 'static,
455+
) -> Self {
456+
Self {
457+
inner: Box::pin(fut),
458+
}
459+
}
460+
461+
pub fn stream(self) -> impl Stream<Item = super::Result<TagInfo>> {
462+
self.inner.into_stream()
463+
}
464+
}
465+
354466
impl From<TagInfo> for HashAndFormat {
355467
fn from(tag_info: TagInfo) -> Self {
356468
HashAndFormat {
@@ -410,6 +522,37 @@ pub struct CreateTempTagRequest {
410522
#[derive(Debug, Serialize, Deserialize)]
411523
pub struct ListTempTagsRequest;
412524

525+
#[derive(Debug, Serialize, Deserialize)]
526+
pub enum ListTempTagsItem {
527+
Item(HashAndFormat),
528+
Error(super::Error),
529+
Done,
530+
}
531+
532+
impl IrpcStreamItem for ListTempTagsItem {
533+
type Error = super::Error;
534+
type Item = HashAndFormat;
535+
536+
fn into_result_opt(self) -> Option<Result<HashAndFormat, super::Error>> {
537+
match self {
538+
ListTempTagsItem::Item(item) => Some(Ok(item)),
539+
ListTempTagsItem::Done => None,
540+
ListTempTagsItem::Error(err) => Some(Err(err)),
541+
}
542+
}
543+
544+
fn from_result(item: std::result::Result<HashAndFormat, super::Error>) -> Self {
545+
match item {
546+
Ok(i) => Self::Item(i),
547+
Err(e) => Self::Error(e),
548+
}
549+
}
550+
551+
fn done() -> Self {
552+
Self::Done
553+
}
554+
}
555+
413556
/// Rename a tag atomically
414557
#[derive(Debug, Serialize, Deserialize)]
415558
pub struct RenameTagRequest {

0 commit comments

Comments
 (0)