Skip to content

Commit c742d55

Browse files
authored
Merge 525440a into a279ad1
2 parents a279ad1 + 525440a commit c742d55

File tree

25 files changed

+592
-634
lines changed

25 files changed

+592
-634
lines changed

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ futures-util = "0.3.30"
9696
testdir = "0.9.1"
9797

9898
[features]
99-
default = ["fs-store", "net_protocol"]
99+
default = ["fs-store", "net_protocol", "formats-collection"]
100100
downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"]
101101
net_protocol = ["downloader", "dep:futures-util"]
102102
fs-store = ["dep:reflink-copy", "redb", "dep:tempfile"]
@@ -114,6 +114,8 @@ rpc = [
114114
"dep:ssh-key",
115115
"downloader",
116116
]
117+
formats = []
118+
formats-collection = ["formats"]
117119

118120
example-iroh = [
119121
"dep:clap",
@@ -129,6 +131,7 @@ rustdoc-args = ["--cfg", "iroh_docsrs"]
129131

130132
[[example]]
131133
name = "provide-bytes"
134+
required-features = ["formats-collection"]
132135

133136
[[example]]
134137
name = "fetch-fsm"

examples/hello-world-fetch.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,13 @@ async fn main() -> Result<()> {
6767
"'Hello World' example expects to fetch a single blob, but the ticket indicates a collection.",
6868
);
6969

70-
// `download` returns a stream of `DownloadProgress` events. You can iterate through these updates to get progress
70+
// `download` returns a stream of `DownloadProgressEvent`. You can iterate through these updates to get progress
7171
// on the state of your download.
7272
let download_stream = blobs_client
7373
.download(ticket.hash(), ticket.node_addr().clone())
7474
.await?;
7575

76-
// You can also just `await` the stream, which will poll the `DownloadProgress` stream for you.
76+
// You can also just `await` the stream, which will poll the `DownloadProgressEvent` stream for you.
7777
let outcome = download_stream.await.context("unable to download hash")?;
7878

7979
println!(

examples/local-swarm-discovery.rs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -140,13 +140,14 @@ mod progress {
140140
ProgressStyle,
141141
};
142142
use iroh_blobs::{
143-
get::{db::DownloadProgress, progress::BlobProgress, Stats},
143+
get::Stats,
144+
rpc::client::blobs::{BlobProgressEvent, DownloadProgressEvent},
144145
Hash,
145146
};
146147

147148
pub async fn show_download_progress(
148149
hash: Hash,
149-
mut stream: impl Stream<Item = Result<DownloadProgress>> + Unpin,
150+
mut stream: impl Stream<Item = Result<DownloadProgressEvent>> + Unpin,
150151
) -> Result<()> {
151152
eprintln!("Fetching: {}", hash);
152153
let mp = MultiProgress::new();
@@ -157,7 +158,7 @@ mod progress {
157158
let mut seq = false;
158159
while let Some(x) = stream.next().await {
159160
match x? {
160-
DownloadProgress::InitialState(state) => {
161+
DownloadProgressEvent::InitialState(state) => {
161162
if state.connected {
162163
op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
163164
}
@@ -177,21 +178,21 @@ mod progress {
177178
ip.set_length(size.value());
178179
ip.reset();
179180
match blob.progress {
180-
BlobProgress::Pending => {}
181-
BlobProgress::Progressing(offset) => ip.set_position(offset),
182-
BlobProgress::Done => ip.finish_and_clear(),
181+
BlobProgressEvent::Pending => {}
182+
BlobProgressEvent::Progressing(offset) => ip.set_position(offset),
183+
BlobProgressEvent::Done => ip.finish_and_clear(),
183184
}
184185
if !seq {
185186
op.finish_and_clear();
186187
}
187188
}
188189
}
189190
}
190-
DownloadProgress::FoundLocal { .. } => {}
191-
DownloadProgress::Connected => {
191+
DownloadProgressEvent::FoundLocal { .. } => {}
192+
DownloadProgressEvent::Connected => {
192193
op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
193194
}
194-
DownloadProgress::FoundHashSeq { children, .. } => {
195+
DownloadProgressEvent::FoundHashSeq { children, .. } => {
195196
op.set_message(format!(
196197
"{} Downloading {} blob(s)\n",
197198
style("[3/3]").bold().dim(),
@@ -201,7 +202,7 @@ mod progress {
201202
op.reset();
202203
seq = true;
203204
}
204-
DownloadProgress::Found { size, child, .. } => {
205+
DownloadProgressEvent::Found { size, child, .. } => {
205206
if seq {
206207
op.set_position(child.into());
207208
} else {
@@ -210,13 +211,13 @@ mod progress {
210211
ip.set_length(size);
211212
ip.reset();
212213
}
213-
DownloadProgress::Progress { offset, .. } => {
214+
DownloadProgressEvent::Progress { offset, .. } => {
214215
ip.set_position(offset);
215216
}
216-
DownloadProgress::Done { .. } => {
217+
DownloadProgressEvent::Done { .. } => {
217218
ip.finish_and_clear();
218219
}
219-
DownloadProgress::AllDone(Stats {
220+
DownloadProgressEvent::AllDone(Stats {
220221
bytes_read,
221222
elapsed,
222223
..
@@ -230,7 +231,7 @@ mod progress {
230231
);
231232
break;
232233
}
233-
DownloadProgress::Abort(e) => {
234+
DownloadProgressEvent::Abort(e) => {
234235
bail!("download aborted: {}", e);
235236
}
236237
}

src/cli.rs

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,10 @@ use iroh::{NodeAddr, PublicKey, RelayUrl};
1919
use tokio::io::AsyncWriteExt;
2020

2121
use crate::{
22-
get::{db::DownloadProgress, progress::BlobProgress, Stats},
23-
net_protocol::DownloadMode,
24-
provider::AddProgress,
22+
get::Stats,
2523
rpc::client::blobs::{
26-
self, BlobInfo, BlobStatus, CollectionInfo, DownloadOptions, IncompleteBlobInfo, WrapOption,
24+
self, AddProgressEvent, BlobInfo, BlobProgressEvent, BlobStatus, CollectionInfo,
25+
DownloadMode, DownloadOptions, DownloadProgressEvent, IncompleteBlobInfo, WrapOption,
2726
},
2827
store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ReportLevel, ValidateProgress},
2928
ticket::BlobTicket,
@@ -895,29 +894,29 @@ pub struct ProvideResponseEntry {
895894
pub hash: Hash,
896895
}
897896

898-
/// Combines the [`AddProgress`] outputs from a [`Stream`] into a single tuple.
897+
/// Combines the [`AddProgressEvent`] outputs from a [`Stream`] into a single tuple.
899898
pub async fn aggregate_add_response(
900-
mut stream: impl Stream<Item = Result<AddProgress>> + Unpin,
899+
mut stream: impl Stream<Item = Result<AddProgressEvent>> + Unpin,
901900
) -> Result<(Hash, BlobFormat, Vec<ProvideResponseEntry>)> {
902901
let mut hash_and_format = None;
903902
let mut collections = BTreeMap::<u64, (String, u64, Option<Hash>)>::new();
904903
let mut mp = Some(ProvideProgressState::new());
905904
while let Some(item) = stream.next().await {
906905
match item? {
907-
AddProgress::Found { name, id, size } => {
906+
AddProgressEvent::Found { name, id, size } => {
908907
tracing::trace!("Found({id},{name},{size})");
909908
if let Some(mp) = mp.as_mut() {
910909
mp.found(name.clone(), id, size);
911910
}
912911
collections.insert(id, (name, size, None));
913912
}
914-
AddProgress::Progress { id, offset } => {
913+
AddProgressEvent::Progress { id, offset } => {
915914
tracing::trace!("Progress({id}, {offset})");
916915
if let Some(mp) = mp.as_mut() {
917916
mp.progress(id, offset);
918917
}
919918
}
920-
AddProgress::Done { hash, id } => {
919+
AddProgressEvent::Done { hash, id } => {
921920
tracing::trace!("Done({id},{hash:?})");
922921
if let Some(mp) = mp.as_mut() {
923922
mp.done(id, hash);
@@ -931,15 +930,15 @@ pub async fn aggregate_add_response(
931930
}
932931
}
933932
}
934-
AddProgress::AllDone { hash, format, .. } => {
933+
AddProgressEvent::AllDone { hash, format, .. } => {
935934
tracing::trace!("AllDone({hash:?})");
936935
if let Some(mp) = mp.take() {
937936
mp.all_done();
938937
}
939938
hash_and_format = Some(HashAndFormat { hash, format });
940939
break;
941940
}
942-
AddProgress::Abort(e) => {
941+
AddProgressEvent::Abort(e) => {
943942
if let Some(mp) = mp.take() {
944943
mp.error();
945944
}
@@ -1032,7 +1031,7 @@ impl ProvideProgressState {
10321031
/// Displays the download progress for a given stream.
10331032
pub async fn show_download_progress(
10341033
hash: Hash,
1035-
mut stream: impl Stream<Item = Result<DownloadProgress>> + Unpin,
1034+
mut stream: impl Stream<Item = Result<DownloadProgressEvent>> + Unpin,
10361035
) -> Result<()> {
10371036
eprintln!("Fetching: {}", hash);
10381037
let mp = MultiProgress::new();
@@ -1043,7 +1042,7 @@ pub async fn show_download_progress(
10431042
let mut seq = false;
10441043
while let Some(x) = stream.next().await {
10451044
match x? {
1046-
DownloadProgress::InitialState(state) => {
1045+
DownloadProgressEvent::InitialState(state) => {
10471046
if state.connected {
10481047
op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
10491048
}
@@ -1063,21 +1062,21 @@ pub async fn show_download_progress(
10631062
ip.set_length(size.value());
10641063
ip.reset();
10651064
match blob.progress {
1066-
BlobProgress::Pending => {}
1067-
BlobProgress::Progressing(offset) => ip.set_position(offset),
1068-
BlobProgress::Done => ip.finish_and_clear(),
1065+
BlobProgressEvent::Pending => {}
1066+
BlobProgressEvent::Progressing(offset) => ip.set_position(offset),
1067+
BlobProgressEvent::Done => ip.finish_and_clear(),
10691068
}
10701069
if !seq {
10711070
op.finish_and_clear();
10721071
}
10731072
}
10741073
}
10751074
}
1076-
DownloadProgress::FoundLocal { .. } => {}
1077-
DownloadProgress::Connected => {
1075+
DownloadProgressEvent::FoundLocal { .. } => {}
1076+
DownloadProgressEvent::Connected => {
10781077
op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
10791078
}
1080-
DownloadProgress::FoundHashSeq { children, .. } => {
1079+
DownloadProgressEvent::FoundHashSeq { children, .. } => {
10811080
op.set_message(format!(
10821081
"{} Downloading {} blob(s)\n",
10831082
style("[3/3]").bold().dim(),
@@ -1087,7 +1086,7 @@ pub async fn show_download_progress(
10871086
op.reset();
10881087
seq = true;
10891088
}
1090-
DownloadProgress::Found { size, child, .. } => {
1089+
DownloadProgressEvent::Found { size, child, .. } => {
10911090
if seq {
10921091
op.set_position(child.into());
10931092
} else {
@@ -1096,13 +1095,13 @@ pub async fn show_download_progress(
10961095
ip.set_length(size);
10971096
ip.reset();
10981097
}
1099-
DownloadProgress::Progress { offset, .. } => {
1098+
DownloadProgressEvent::Progress { offset, .. } => {
11001099
ip.set_position(offset);
11011100
}
1102-
DownloadProgress::Done { .. } => {
1101+
DownloadProgressEvent::Done { .. } => {
11031102
ip.finish_and_clear();
11041103
}
1105-
DownloadProgress::AllDone(Stats {
1104+
DownloadProgressEvent::AllDone(Stats {
11061105
bytes_read,
11071106
elapsed,
11081107
..
@@ -1116,7 +1115,7 @@ pub async fn show_download_progress(
11161115
);
11171116
break;
11181117
}
1119-
DownloadProgress::Abort(e) => {
1118+
DownloadProgressEvent::Abort(e) => {
11201119
bail!("download aborted: {}", e);
11211120
}
11221121
}

src/downloader.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ use tokio_util::{either::Either, sync::CancellationToken, time::delay_queue};
5555
use tracing::{debug, error, error_span, trace, warn, Instrument};
5656

5757
use crate::{
58-
get::{db::DownloadProgress, Stats},
58+
get::{progress::DownloadProgressEvent, Stats},
5959
metrics::Metrics,
6060
store::Store,
6161
util::{local_pool::LocalPoolHandle, progress::ProgressSender},
@@ -797,7 +797,7 @@ impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
797797
if let Some(sender) = handlers.on_progress {
798798
self.progress_tracker.unsubscribe(&kind, &sender);
799799
sender
800-
.send(DownloadProgress::Abort(serde_error::Error::new(
800+
.send(DownloadProgressEvent::Abort(serde_error::Error::new(
801801
&*anyhow::Error::from(DownloadError::Cancelled),
802802
)))
803803
.await

src/downloader/get.rs

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,20 @@ use iroh::endpoint;
77

88
use super::{progress::BroadcastProgressSender, DownloadKind, FailureAction, GetStartFut, Getter};
99
use crate::{
10-
get::{db::get_to_db_in_steps, error::GetError},
11-
store::Store,
10+
get::Error,
11+
store::{get_to_db_in_steps, FetchState, FetchStateNeedsConn, Store},
1212
};
1313

14-
impl From<GetError> for FailureAction {
15-
fn from(e: GetError) -> Self {
14+
impl From<Error> for FailureAction {
15+
fn from(e: Error) -> Self {
1616
match e {
17-
e @ GetError::NotFound(_) => FailureAction::AbortRequest(e.into()),
18-
e @ GetError::RemoteReset(_) => FailureAction::RetryLater(e.into()),
19-
e @ GetError::NoncompliantNode(_) => FailureAction::DropPeer(e.into()),
20-
e @ GetError::Io(_) => FailureAction::RetryLater(e.into()),
21-
e @ GetError::BadRequest(_) => FailureAction::AbortRequest(e.into()),
17+
e @ Error::NotFound(_) => FailureAction::AbortRequest(e.into()),
18+
e @ Error::RemoteReset(_) => FailureAction::RetryLater(e.into()),
19+
e @ Error::NoncompliantNode(_) => FailureAction::DropPeer(e.into()),
20+
e @ Error::Io(_) => FailureAction::RetryLater(e.into()),
21+
e @ Error::BadRequest(_) => FailureAction::AbortRequest(e.into()),
2222
// TODO: what do we want to do on local failures?
23-
e @ GetError::LocalFailure(_) => FailureAction::AbortRequest(e.into()),
23+
e @ Error::LocalFailure(_) => FailureAction::AbortRequest(e.into()),
2424
}
2525
}
2626
}
@@ -34,7 +34,7 @@ pub(crate) struct IoGetter<S: Store> {
3434

3535
impl<S: Store> Getter for IoGetter<S> {
3636
type Connection = endpoint::Connection;
37-
type NeedsConn = crate::get::db::GetStateNeedsConn;
37+
type NeedsConn = FetchStateNeedsConn;
3838

3939
fn get(
4040
&mut self,
@@ -45,10 +45,8 @@ impl<S: Store> Getter for IoGetter<S> {
4545
async move {
4646
match get_to_db_in_steps(store, kind.hash_and_format(), progress_sender).await {
4747
Err(err) => Err(err.into()),
48-
Ok(crate::get::db::GetState::Complete(stats)) => {
49-
Ok(super::GetOutput::Complete(stats))
50-
}
51-
Ok(crate::get::db::GetState::NeedsConn(needs_conn)) => {
48+
Ok(FetchState::Complete(stats)) => Ok(super::GetOutput::Complete(stats)),
49+
Ok(FetchState::NeedsConn(needs_conn)) => {
5250
Ok(super::GetOutput::NeedsConn(needs_conn))
5351
}
5452
}
@@ -57,7 +55,7 @@ impl<S: Store> Getter for IoGetter<S> {
5755
}
5856
}
5957

60-
impl super::NeedsConn<endpoint::Connection> for crate::get::db::GetStateNeedsConn {
58+
impl super::NeedsConn<endpoint::Connection> for FetchStateNeedsConn {
6159
fn proceed(self, conn: endpoint::Connection) -> super::GetProceedFut {
6260
async move {
6361
let res = self.proceed(conn).await;
@@ -73,7 +71,7 @@ impl super::NeedsConn<endpoint::Connection> for crate::get::db::GetStateNeedsCon
7371
}
7472

7573
#[cfg(feature = "metrics")]
76-
fn track_metrics(res: &Result<crate::get::Stats, GetError>) {
74+
fn track_metrics(res: &Result<crate::get::Stats, Error>) {
7775
use iroh_metrics::{inc, inc_by};
7876

7977
use crate::metrics::Metrics;
@@ -90,7 +88,7 @@ fn track_metrics(res: &Result<crate::get::Stats, GetError>) {
9088
inc_by!(Metrics, download_time_total, elapsed.as_millis() as u64);
9189
}
9290
Err(e) => match &e {
93-
GetError::NotFound(_) => inc!(Metrics, downloads_notfound),
91+
Error::NotFound(_) => inc!(Metrics, downloads_notfound),
9492
_ => inc!(Metrics, downloads_error),
9593
},
9694
}

0 commit comments

Comments
 (0)