Skip to content

Commit 2cbdc7d

Browse files
authored
Merge e199899 into 56b0695
2 parents 56b0695 + e199899 commit 2cbdc7d

File tree

4 files changed

+111
-1
lines changed

4 files changed

+111
-1
lines changed

src/rpc.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,16 @@ impl<D: crate::store::Store> Handler<D> {
317317
#[allow(clippy::manual_flatten)]
318318
for item in tags {
319319
if let Ok((name, HashAndFormat { hash, format })) = item {
320+
if let Some(from) = msg.from.as_ref() {
321+
if &name < from {
322+
continue;
323+
}
324+
}
325+
if let Some(to) = msg.to.as_ref() {
326+
if &name >= to {
327+
break;
328+
}
329+
}
320330
if (format.is_raw() && msg.raw) || (format.is_hash_seq() && msg.hash_seq) {
321331
co.yield_(TagInfo { name, hash, format }).await;
322332
}

src/rpc/client/tags.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,27 @@ where
4242
Self { rpc }
4343
}
4444

45+
/// Get the value of a single tag
46+
pub async fn get(&self, name: impl AsRef<[u8]>) -> Result<Option<TagInfo>> {
47+
let mut stream = self
48+
.rpc
49+
.server_streaming(ListRequest::single(name.as_ref()))
50+
.await?;
51+
Ok(stream.next().await.transpose()?)
52+
}
53+
54+
/// Lists all tags.
55+
pub async fn list_prefix(
56+
&self,
57+
prefix: impl AsRef<[u8]>,
58+
) -> Result<impl Stream<Item = Result<TagInfo>>> {
59+
let stream = self
60+
.rpc
61+
.server_streaming(ListRequest::prefix(prefix.as_ref()))
62+
.await?;
63+
Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
64+
}
65+
4566
/// Lists all tags.
4667
pub async fn list(&self) -> Result<impl Stream<Item = Result<TagInfo>>> {
4768
let stream = self.rpc.server_streaming(ListRequest::all()).await?;

src/rpc/proto/tags.rs

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
//! Tags RPC protocol
2+
use bytes::Bytes;
23
use nested_enum_utils::enum_conversions;
34
use quic_rpc_derive::rpc_requests;
45
use serde::{Deserialize, Serialize};
56

67
use super::{RpcResult, RpcService};
7-
use crate::{net_protocol::BatchId, rpc::client::tags::TagInfo, HashAndFormat, Tag};
8+
use crate::{
9+
net_protocol::BatchId,
10+
rpc::client::tags::TagInfo,
11+
util::{increment_vec, next_prefix},
12+
HashAndFormat, Tag,
13+
};
814

915
#[allow(missing_docs)]
1016
#[derive(strum::Display, Debug, Serialize, Deserialize)]
@@ -73,14 +79,53 @@ pub struct ListRequest {
7379
pub raw: bool,
7480
/// List hash seq tags
7581
pub hash_seq: bool,
82+
/// From tag
83+
pub from: Option<Tag>,
84+
/// To tag (exclusive)
85+
pub to: Option<Tag>,
7686
}
7787

7888
impl ListRequest {
89+
/// List tags with a prefix
90+
pub fn prefix(prefix: &[u8]) -> Self {
91+
let from = prefix.to_vec();
92+
let mut to = from.clone();
93+
let from = Bytes::from(from).into();
94+
let to = if next_prefix(&mut to) {
95+
Some(Bytes::from(to).into())
96+
} else {
97+
None
98+
};
99+
Self {
100+
raw: true,
101+
hash_seq: true,
102+
from: Some(from),
103+
to,
104+
}
105+
}
106+
107+
/// List a single tag
108+
pub fn single(name: &[u8]) -> Self {
109+
let from = name.to_vec();
110+
let mut next = from.clone();
111+
increment_vec(&mut next);
112+
let from = Bytes::from(from).into();
113+
let to = Bytes::from(next).into();
114+
Self {
115+
raw: true,
116+
hash_seq: true,
117+
from: Some(from),
118+
to: Some(to),
119+
}
120+
}
121+
79122
/// List all tags
80123
pub fn all() -> Self {
81124
Self {
82125
raw: true,
83126
hash_seq: true,
127+
from: None,
128+
to: None,
84129
}
85130
}
86131

@@ -89,6 +134,8 @@ impl ListRequest {
89134
Self {
90135
raw: true,
91136
hash_seq: false,
137+
from: None,
138+
to: None,
92139
}
93140
}
94141

@@ -97,6 +144,8 @@ impl ListRequest {
97144
Self {
98145
raw: false,
99146
hash_seq: true,
147+
from: None,
148+
to: None,
100149
}
101150
}
102151
}

src/util.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,36 @@ pub(crate) fn raw_outboard_size(size: u64) -> u64 {
302302
BaoTree::new(size, IROH_BLOCK_SIZE).outboard_size()
303303
}
304304

305+
/// Given a prefix, increment it lexographically.
306+
///
307+
/// If the prefix is all FF, this will return false because there is no
308+
/// higher prefix than that.
309+
#[allow(dead_code)]
310+
pub(crate) fn next_prefix(bytes: &mut [u8]) -> bool {
311+
for byte in bytes.iter_mut().rev() {
312+
if *byte < 255 {
313+
*byte += 1;
314+
return true;
315+
}
316+
*byte = 0;
317+
}
318+
false
319+
}
320+
321+
/// Increment a byte vector, lexographically.
322+
#[allow(dead_code)]
323+
pub(crate) fn increment_vec(bytes: &mut Vec<u8>) {
324+
for byte in bytes.iter_mut().rev() {
325+
if *byte < 255 {
326+
*byte += 1;
327+
return;
328+
}
329+
*byte = 0;
330+
}
331+
332+
bytes.push(0);
333+
}
334+
305335
/// Synchronously compute the outboard of a file, and return hash and outboard.
306336
///
307337
/// It is assumed that the file is not modified while this is running.

0 commit comments

Comments
 (0)