Skip to content

Commit

Permalink
feat(iroh-bytes): add initial query request
Browse files Browse the repository at this point in the history
not implemented yet, just the request and docs for it
  • Loading branch information
rklaehn committed Aug 26, 2023
1 parent 116eea9 commit 354c33b
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 1 deletion.
9 changes: 9 additions & 0 deletions iroh-bytes/src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,15 @@ pub mod fsm {
postcard::from_bytes::<GetRequest>(&response)
.map_err(ConnectedNextError::PostcardDe)?
}
Request::Query(_) => {
// todo: take the query request out of the state machine entirely
// the state machine is really for get requests and custom get requests,
// not for query requests
return Err(ConnectedNextError::Io(io::Error::new(
io::ErrorKind::InvalidInput,
"query request not supported",
)));
}
};
let hash = request.hash;
let ranges_iter = RangesIter::new(request.ranges);
Expand Down
79 changes: 79 additions & 0 deletions iroh-bytes/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,8 @@ pub enum Request {
Get(GetRequest),
/// A get request that allows the receiver to create a collection
CustomGet(CustomGetRequest),
/// A query request for a blob or collection
Query(QueryRequest),
}

impl Request {
Expand All @@ -441,6 +443,7 @@ impl Request {
match self {
Request::Get(get) => get.token(),
Request::CustomGet(get) => get.token.as_ref(),
Request::Query(query) => query.token(),
}
}

Expand All @@ -449,6 +452,7 @@ impl Request {
match &mut self {
Request::Get(get) => get.token = value,
Request::CustomGet(get) => get.token = value,
Request::Query(query) => query.token = value,
}
self
}
Expand Down Expand Up @@ -517,6 +521,81 @@ impl GetRequest {
}
}

/// A query request
///
/// A query request is identical to a get request, except that it describes a
/// set of chunks for which we want availability information.
///
/// The answer to a query request is a set of chunks that are available, encoded
/// as a [`RangeSpecSeq`].
///
/// The provider has to compute the availability information only for the requested
/// ranges, not for the entire blob or collection. The response must contain
/// availability information for the requested ranges, but may contain availability
/// information for additional ranges if expedient. E.g. if the provider has
/// a blob completely available, it may send `RangeSet2::all()` for that blob
/// instead of limiting the response to the requested ranges.
///
/// However, the provider should take care to minimize the size of the response.
/// E.g. if we have complex availability information for a range that was not
/// requested, it should be omitted.
///
/// The availability information is a snapshot at one point in time.
/// There is no guarantee that the availability won't change.
///
/// E.g. availability might increase when the provider is itself downloading
/// data from other providers, or decrease when the provider is deleting data.
#[derive(Deserialize, Serialize, Debug, PartialEq, Eq, Clone)]
pub struct QueryRequest {
/// blake3 hash
pub hash: Hash,
/// The range of data we request availability information for
///
/// The first element is the parent, all subsequent elements are children.
pub ranges: RangeSpecSeq,
/// Optional Request token
token: Option<RequestToken>,
}

impl QueryRequest {
/// Request a blob or collection with specified ranges
pub fn new(hash: Hash, ranges: RangeSpecSeq) -> Self {
Self {
hash,
ranges,
token: None,
}
}

/// Request a collection and all its children
pub fn all(hash: Hash) -> Self {
Self {
hash,
token: None,
ranges: RangeSpecSeq::all(),
}
}

/// Request just a single blob
pub fn single(hash: Hash) -> Self {
Self {
hash,
token: None,
ranges: RangeSpecSeq::from_ranges([RangeSet2::all()]),
}
}

/// Set the request token
pub fn with_token(self, token: Option<RequestToken>) -> Self {
Self { token, ..self }
}

/// Get the request token
pub fn token(&self) -> Option<&RequestToken> {
self.token.as_ref()
}
}

/// Write the given data to the provider sink, with a unsigned varint length prefix.
pub async fn write_lp<W: AsyncWrite + Unpin>(writer: &mut W, data: &[u8]) -> Result<()> {
ensure!(
Expand Down
17 changes: 16 additions & 1 deletion iroh-bytes/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@ use anyhow::{Context, Result};
use bao_tree::io::fsm::{encode_ranges_validated, Outboard};
use bytes::Bytes;
use futures::future::BoxFuture;
use quinn::VarInt;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncWrite;
use tracing::{debug, debug_span, warn};
use tracing_futures::Instrument;

use crate::baomap::*;
use crate::collection::CollectionParser;
use crate::protocol::{write_lp, CustomGetRequest, GetRequest, RangeSpec, Request, RequestToken};
use crate::protocol::{
write_lp, CustomGetRequest, GetRequest, QueryRequest, RangeSpec, Request, RequestToken,
};
use crate::util::RpcError;
use crate::Hash;

Expand Down Expand Up @@ -434,6 +437,7 @@ async fn handle_stream<D: Map, E: EventSender, C: CollectionParser>(
Request::CustomGet(request) => {
handle_custom_get(db, request, writer, custom_get_handler, collection_parser).await
}
Request::Query(request) => handle_query(db, request, collection_parser, writer).await,
}
}
async fn handle_custom_get<E: EventSender, D: Map, C: CollectionParser>(
Expand Down Expand Up @@ -521,6 +525,17 @@ pub async fn handle_get<D: Map, E: EventSender, C: CollectionParser>(
Ok(())
}

/// Handle a single standard get request.
pub async fn handle_query<D: Map, E: EventSender, C: CollectionParser>(
_db: D,
_request: QueryRequest,
_collection_parser: C,
mut writer: ResponseWriter<E>,
) -> Result<()> {
writer.inner.reset(VarInt::from_u32(1))?;
anyhow::bail!("query not implemented");
}

/// A helper struct that combines a quinn::SendStream with auxiliary information
#[derive(Debug)]
pub struct ResponseWriter<E> {
Expand Down

0 comments on commit 354c33b

Please sign in to comment.