Skip to content

Commit 346f2c8

Browse files
authored
Merge eb2f9fb into 56b0695
2 parents 56b0695 + eb2f9fb commit 346f2c8

File tree

12 files changed

+612
-111
lines changed

12 files changed

+612
-111
lines changed

src/hash.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,12 @@ pub struct HashAndFormat {
256256
pub format: BlobFormat,
257257
}
258258

259+
impl From<Hash> for HashAndFormat {
260+
fn from(hash: Hash) -> Self {
261+
Self::raw(hash)
262+
}
263+
}
264+
259265
#[cfg(feature = "redb")]
260266
mod redb_support {
261267
use postcard::experimental::max_size::MaxSize;

src/rpc.rs

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ impl<D: crate::store::Store> Handler<D> {
295295

296296
async fn blob_delete_tag(self, msg: TagDeleteRequest) -> RpcResult<()> {
297297
self.store()
298-
.set_tag(msg.name, None)
298+
.delete_tags(msg.from, msg.to)
299299
.await
300300
.map_err(|e| RpcError::new(&e))?;
301301
Ok(())
@@ -313,7 +313,7 @@ impl<D: crate::store::Store> Handler<D> {
313313
tracing::info!("blob_list_tags");
314314
let blobs = self;
315315
Gen::new(|co| async move {
316-
let tags = blobs.store().tags().await.unwrap();
316+
let tags = blobs.store().tags(msg.from, msg.to).await.unwrap();
317317
#[allow(clippy::manual_flatten)]
318318
for item in tags {
319319
if let Ok((name, HashAndFormat { hash, format })) = item {
@@ -393,13 +393,11 @@ impl<D: crate::store::Store> Handler<D> {
393393
blobs.store().sync().await.map_err(|e| RpcError::new(&e))?;
394394
}
395395
if let Some(batch) = msg.batch {
396-
if let Some(content) = msg.value.as_ref() {
397-
blobs
398-
.batches()
399-
.await
400-
.remove_one(batch, content)
401-
.map_err(|e| RpcError::new(&*e))?;
402-
}
396+
blobs
397+
.batches()
398+
.await
399+
.remove_one(batch, &msg.value)
400+
.map_err(|e| RpcError::new(&*e))?;
403401
}
404402
Ok(())
405403
}
@@ -572,10 +570,7 @@ impl<D: crate::store::Store> Handler<D> {
572570
let HashAndFormat { hash, format } = *hash_and_format;
573571
let tag = match tag {
574572
SetTagOption::Named(tag) => {
575-
blobs
576-
.store()
577-
.set_tag(tag.clone(), Some(*hash_and_format))
578-
.await?;
573+
blobs.store().set_tag(tag.clone(), *hash_and_format).await?;
579574
tag
580575
}
581576
SetTagOption::Auto => blobs.store().create_tag(*hash_and_format).await?,
@@ -764,10 +759,7 @@ impl<D: crate::store::Store> Handler<D> {
764759
let HashAndFormat { hash, format } = hash_and_format;
765760
let tag = match msg.tag {
766761
SetTagOption::Named(tag) => {
767-
blobs
768-
.store()
769-
.set_tag(tag.clone(), Some(hash_and_format))
770-
.await?;
762+
blobs.store().set_tag(tag.clone(), hash_and_format).await?;
771763
tag
772764
}
773765
SetTagOption::Auto => blobs.store().create_tag(hash_and_format).await?,
@@ -907,7 +899,7 @@ impl<D: crate::store::Store> Handler<D> {
907899
SetTagOption::Named(tag) => {
908900
blobs
909901
.store()
910-
.set_tag(tag.clone(), Some(*hash_and_format))
902+
.set_tag(tag.clone(), *hash_and_format)
911903
.await
912904
.map_err(|e| RpcError::new(&e))?;
913905
tag
@@ -922,7 +914,7 @@ impl<D: crate::store::Store> Handler<D> {
922914
for tag in tags_to_delete {
923915
blobs
924916
.store()
925-
.set_tag(tag, None)
917+
.delete_tags(Some(tag.clone()), Some(tag.successor()))
926918
.await
927919
.map_err(|e| RpcError::new(&e))?;
928920
}
@@ -959,7 +951,7 @@ impl<D: crate::store::Store> Handler<D> {
959951
progress.send(DownloadProgress::AllDone(stats)).await.ok();
960952
match tag {
961953
SetTagOption::Named(tag) => {
962-
self.store().set_tag(tag, Some(hash_and_format)).await?;
954+
self.store().set_tag(tag, hash_and_format).await?;
963955
}
964956
SetTagOption::Auto => {
965957
self.store().create_tag(hash_and_format).await?;

src/rpc/client/blobs/batch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ where
441441
.rpc
442442
.rpc(tags::SetRequest {
443443
name: tag,
444-
value: Some(tt.hash_and_format()),
444+
value: tt.hash_and_format(),
445445
batch: Some(self.0.batch),
446446
sync: SyncMode::Full,
447447
})

src/rpc/client/tags.rs

Lines changed: 225 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,19 @@
1010
//! [`Client::list_hash_seq`] can be used to list all tags with a hash_seq format.
1111
//!
1212
//! [`Client::delete`] can be used to delete a tag.
13+
use std::ops::{Bound, RangeBounds};
14+
1315
use anyhow::Result;
1416
use futures_lite::{Stream, StreamExt};
1517
use quic_rpc::{client::BoxedConnector, Connector, RpcClient};
1618
use serde::{Deserialize, Serialize};
1719

1820
use crate::{
1921
rpc::proto::{
20-
tags::{DeleteRequest, ListRequest},
22+
tags::{DeleteRequest, ListRequest, SetRequest, SyncMode},
2123
RpcService,
2224
},
23-
BlobFormat, Hash, Tag,
25+
BlobFormat, Hash, HashAndFormat, Tag,
2426
};
2527

2628
/// Iroh tags client.
@@ -30,6 +32,147 @@ pub struct Client<C = BoxedConnector<RpcService>> {
3032
pub(super) rpc: RpcClient<RpcService, C>,
3133
}
3234

35+
/// Options for a list operation.
36+
#[derive(Debug, Clone)]
37+
pub struct ListOptions {
38+
/// List tags to hash seqs
39+
pub hash_seq: bool,
40+
/// List tags to raw blobs
41+
pub raw: bool,
42+
/// Optional from tag (inclusive)
43+
pub from: Option<Tag>,
44+
/// Optional to tag (exclusive)
45+
pub to: Option<Tag>,
46+
}
47+
48+
fn tags_from_range<R, E>(range: R) -> (Option<Tag>, Option<Tag>)
49+
where
50+
R: RangeBounds<E>,
51+
E: AsRef<[u8]>,
52+
{
53+
let from = match range.start_bound() {
54+
Bound::Included(start) => Some(Tag::from(start.as_ref())),
55+
Bound::Excluded(start) => Some(Tag::from(start.as_ref()).successor()),
56+
Bound::Unbounded => None,
57+
};
58+
let to = match range.end_bound() {
59+
Bound::Included(end) => Some(Tag::from(end.as_ref()).successor()),
60+
Bound::Excluded(end) => Some(Tag::from(end.as_ref())),
61+
Bound::Unbounded => None,
62+
};
63+
(from, to)
64+
}
65+
66+
impl ListOptions {
67+
/// List a range of tags
68+
pub fn range<R, E>(range: R) -> Self
69+
where
70+
R: RangeBounds<E>,
71+
E: AsRef<[u8]>,
72+
{
73+
let (from, to) = tags_from_range(range);
74+
Self {
75+
from,
76+
to,
77+
raw: true,
78+
hash_seq: true,
79+
}
80+
}
81+
82+
/// List tags with a prefix
83+
pub fn prefix(prefix: &[u8]) -> Self {
84+
let from = Tag::from(prefix);
85+
let to = from.next_prefix();
86+
Self {
87+
raw: true,
88+
hash_seq: true,
89+
from: Some(from),
90+
to,
91+
}
92+
}
93+
94+
/// List a single tag
95+
pub fn single(name: &[u8]) -> Self {
96+
let from = Tag::from(name);
97+
Self {
98+
to: Some(from.successor()),
99+
from: Some(from),
100+
raw: true,
101+
hash_seq: true,
102+
}
103+
}
104+
105+
/// List all tags
106+
pub fn all() -> Self {
107+
Self {
108+
raw: true,
109+
hash_seq: true,
110+
from: None,
111+
to: None,
112+
}
113+
}
114+
115+
/// List raw tags
116+
pub fn raw() -> Self {
117+
Self {
118+
raw: true,
119+
hash_seq: false,
120+
from: None,
121+
to: None,
122+
}
123+
}
124+
125+
/// List hash seq tags
126+
pub fn hash_seq() -> Self {
127+
Self {
128+
raw: false,
129+
hash_seq: true,
130+
from: None,
131+
to: None,
132+
}
133+
}
134+
}
135+
136+
/// Options for a delete operation.
137+
#[derive(Debug, Clone)]
138+
pub struct DeleteOptions {
139+
/// Optional from tag (inclusive)
140+
pub from: Option<Tag>,
141+
/// Optional to tag (exclusive)
142+
pub to: Option<Tag>,
143+
}
144+
145+
impl DeleteOptions {
146+
/// Delete a single tag
147+
pub fn single(name: &[u8]) -> Self {
148+
let name = Tag::from(name);
149+
Self {
150+
to: Some(name.successor()),
151+
from: Some(name),
152+
}
153+
}
154+
155+
/// Delete a range of tags
156+
pub fn range<R, E>(range: R) -> Self
157+
where
158+
R: RangeBounds<E>,
159+
E: AsRef<[u8]>,
160+
{
161+
let (from, to) = tags_from_range(range);
162+
Self { from, to }
163+
}
164+
165+
/// Delete tags with a prefix
166+
pub fn prefix(prefix: &[u8]) -> Self {
167+
let from = Tag::from(prefix);
168+
let to = from.next_prefix();
169+
Self {
170+
from: Some(from),
171+
to,
172+
}
173+
}
174+
}
175+
33176
/// A client that uses the memory connector.
34177
pub type MemClient = Client<crate::rpc::MemConnector>;
35178

@@ -42,27 +185,100 @@ where
42185
Self { rpc }
43186
}
44187

188+
/// List all tags with options.
189+
///
190+
/// This is the most flexible way to list tags. All the other list methods are just convenience
191+
/// methods that call this one with the appropriate options.
192+
pub async fn list_with_opts(
193+
&self,
194+
options: ListOptions,
195+
) -> Result<impl Stream<Item = Result<TagInfo>>> {
196+
let stream = self
197+
.rpc
198+
.server_streaming(ListRequest::from(options))
199+
.await?;
200+
Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
201+
}
202+
203+
/// Set the value for a single tag
204+
pub async fn set(&self, name: impl AsRef<[u8]>, value: impl Into<HashAndFormat>) -> Result<()> {
205+
self.rpc
206+
.rpc(SetRequest {
207+
name: Tag::from(name.as_ref()),
208+
value: value.into(),
209+
batch: None,
210+
sync: SyncMode::Full,
211+
})
212+
.await??;
213+
Ok(())
214+
}
215+
216+
/// Get the value of a single tag
217+
pub async fn get(&self, name: impl AsRef<[u8]>) -> Result<Option<TagInfo>> {
218+
let mut stream = self
219+
.list_with_opts(ListOptions::single(name.as_ref()))
220+
.await?;
221+
stream.next().await.transpose()
222+
}
223+
224+
/// List a range of tags
225+
pub async fn list_range<R, E>(&self, range: R) -> Result<impl Stream<Item = Result<TagInfo>>>
226+
where
227+
R: RangeBounds<E>,
228+
E: AsRef<[u8]>,
229+
{
230+
self.list_with_opts(ListOptions::range(range)).await
231+
}
232+
233+
/// Lists all tags with the given prefix.
234+
pub async fn list_prefix(
235+
&self,
236+
prefix: impl AsRef<[u8]>,
237+
) -> Result<impl Stream<Item = Result<TagInfo>>> {
238+
self.list_with_opts(ListOptions::prefix(prefix.as_ref()))
239+
.await
240+
}
241+
45242
/// Lists all tags.
46243
pub async fn list(&self) -> Result<impl Stream<Item = Result<TagInfo>>> {
47-
let stream = self.rpc.server_streaming(ListRequest::all()).await?;
48-
Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
244+
self.list_with_opts(ListOptions::all()).await
49245
}
50246

51247
/// Lists all tags with a hash_seq format.
52248
pub async fn list_hash_seq(&self) -> Result<impl Stream<Item = Result<TagInfo>>> {
53-
let stream = self.rpc.server_streaming(ListRequest::hash_seq()).await?;
54-
Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
249+
self.list_with_opts(ListOptions::hash_seq()).await
55250
}
56251

57252
/// Deletes a tag.
58-
pub async fn delete(&self, name: Tag) -> Result<()> {
59-
self.rpc.rpc(DeleteRequest { name }).await??;
253+
pub async fn delete_with_opts(&self, options: DeleteOptions) -> Result<()> {
254+
self.rpc.rpc(DeleteRequest::from(options)).await??;
60255
Ok(())
61256
}
257+
258+
/// Deletes a tag.
259+
pub async fn delete(&self, name: impl AsRef<[u8]>) -> Result<()> {
260+
self.delete_with_opts(DeleteOptions::single(name.as_ref()))
261+
.await
262+
}
263+
264+
/// Deletes a range of tags.
265+
pub async fn delete_range<R, E>(&self, range: R) -> Result<()>
266+
where
267+
R: RangeBounds<E>,
268+
E: AsRef<[u8]>,
269+
{
270+
self.delete_with_opts(DeleteOptions::range(range)).await
271+
}
272+
273+
/// Delete all tags with the given prefix.
274+
pub async fn delete_prefix(&self, prefix: impl AsRef<[u8]>) -> Result<()> {
275+
self.delete_with_opts(DeleteOptions::prefix(prefix.as_ref()))
276+
.await
277+
}
62278
}
63279

64280
/// Information about a tag.
65-
#[derive(Debug, Serialize, Deserialize)]
281+
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
66282
pub struct TagInfo {
67283
/// Name of the tag
68284
pub name: Tag,

0 commit comments

Comments
 (0)