feat: add Doc::import_file and Doc::export_file (#1793)
## Description

closes #1781 
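
### Usage

A quick sketch of the new API; the `doc`/`author` setup is elided and the key and paths here are hypothetical (see `test_doc_import_export` in the diff below for a complete, runnable example):

```rust
use bytes::Bytes;

// Assumes `doc` is an open client `Doc`, `author` is an `AuthorId`, and the
// source file already exists on disk. `true` imports the file in place,
// referencing it on disk instead of copying it into the blob store.
let import_outcome = doc
    .import_file(author, Bytes::from_static(b"my-key"), "/abs/path/in.bin", true)
    .await?
    .finish()
    .await?;

// Look the entry back up by key and export it to another absolute path.
let entry = doc
    .get_one(Query::author(author).key_exact(b"my-key"))
    .await?
    .expect("entry was just imported");
let export_outcome = doc
    .export_file(entry, "/abs/path/out.bin")
    .await?
    .finish()
    .await?;
```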

### `canonicalize_path`

There is now a `canonicalized_path_to_string` method that takes a path
(which it expects to already be canonicalized) and turns it into a
string.

What was previously `canonicalize_path` has been renamed to
`relative_canonicalized_path_to_string`; it calls
`canonicalized_path_to_string` with the `must_be_relative` param set to
`true`.
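
The helpers themselves live in this commit's other changed files (not shown on this page), so the following is only a sketch of the behavior described above, assuming UTF-8 paths and `/` separators:

```rust
use anyhow::{bail, Context, Result};
use std::path::{Component, Path};

/// Turn an already-canonicalized path into a `/`-separated string.
fn canonicalized_path_to_string(
    path: impl AsRef<Path>,
    must_be_relative: bool,
) -> Result<String> {
    let mut out = String::new();
    for component in path.as_ref().components() {
        match component {
            // A leading `/` is only allowed when the path may be absolute.
            Component::RootDir if !must_be_relative => out.push('/'),
            Component::Normal(part) => {
                let part = part.to_str().context("path must be valid UTF-8")?;
                if !out.is_empty() && !out.ends_with('/') {
                    out.push('/');
                }
                out.push_str(part);
            }
            // `.`, `..`, and prefix components should not survive canonicalization.
            other => bail!("invalid path component: {other:?}"),
        }
    }
    Ok(out)
}

/// The renamed `canonicalize_path`: the same conversion, but the path must be relative.
fn relative_canonicalized_path_to_string(path: impl AsRef<Path>) -> Result<String> {
    canonicalized_path_to_string(path, true)
}
```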
ramfox committed Nov 15, 2023
1 parent 39ed64e commit fe7fc50
Showing 4 changed files with 753 additions and 37 deletions.
275 changes: 267 additions & 8 deletions iroh/src/client.rs
@@ -4,19 +4,20 @@

use std::collections::HashMap;
use std::io::{self, Cursor};
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::result::Result as StdResult;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, Context as AnyhowContext, Result};
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{SinkExt, Stream, StreamExt, TryStreamExt};
use iroh_bytes::provider::AddProgress;
use iroh_bytes::store::ValidateProgress;
// use iroh_bytes::util::progress::FlumeProgressSender;
use iroh_bytes::util::runtime;
use iroh_bytes::Hash;
use iroh_bytes::{BlobFormat, Tag};
@@ -35,12 +36,13 @@ use crate::rpc_protocol::{
BlobListCollectionsResponse, BlobListIncompleteRequest, BlobListIncompleteResponse,
BlobListRequest, BlobListResponse, BlobReadRequest, BlobReadResponse, BlobValidateRequest,
CounterStats, DeleteTagRequest, DocCloseRequest, DocCreateRequest, DocDelRequest,
-DocDelResponse, DocDropRequest, DocGetExactRequest, DocGetManyRequest, DocImportRequest,
-DocLeaveRequest, DocListRequest, DocOpenRequest, DocSetHashRequest, DocSetRequest,
-DocShareRequest, DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, DocTicket,
-DownloadProgress, ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest,
-NodeConnectionInfoResponse, NodeConnectionsRequest, NodeShutdownRequest, NodeStatsRequest,
-NodeStatusRequest, NodeStatusResponse, ProviderService, SetTagOption, ShareMode, WrapOption,
+DocDelResponse, DocDropRequest, DocExportFileRequest, DocExportProgress, DocGetExactRequest,
+DocGetManyRequest, DocImportFileRequest, DocImportProgress, DocImportRequest, DocLeaveRequest,
+DocListRequest, DocOpenRequest, DocSetHashRequest, DocSetRequest, DocShareRequest,
+DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, DocTicket, DownloadProgress,
+ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest, NodeConnectionInfoResponse,
+NodeConnectionsRequest, NodeShutdownRequest, NodeStatsRequest, NodeStatusRequest,
+NodeStatusResponse, ProviderService, SetTagOption, ShareMode, WrapOption,
};
use crate::sync_engine::LiveEvent;

@@ -624,6 +626,47 @@ where
Ok(())
}

/// Add an entry from an absolute file path.
pub async fn import_file(
&self,
author: AuthorId,
key: Bytes,
path: impl AsRef<Path>,
in_place: bool,
) -> Result<DocImportFileProgress> {
self.ensure_open()?;
let stream = self
.0
.rpc
.server_streaming(DocImportFileRequest {
doc_id: self.id(),
author_id: author,
path: path.as_ref().into(),
key,
in_place,
})
.await?;
Ok(DocImportFileProgress::new(stream))
}

/// Export an entry as a file to a given absolute path.
pub async fn export_file(
&self,
entry: Entry,
path: impl AsRef<Path>,
) -> Result<DocExportFileProgress> {
self.ensure_open()?;
let stream = self
.0
.rpc
.server_streaming(DocExportFileRequest {
entry,
path: path.as_ref().into(),
})
.await?;
Ok(DocExportFileProgress::new(stream))
}

/// Read the content of an [`Entry`] as a streaming [`BlobReader`].
pub async fn read(&self, entry: &Entry) -> Result<BlobReader> {
self.ensure_open()?;
@@ -751,6 +794,145 @@ where
}
}

/// Progress stream for doc import operations.
#[derive(derive_more::Debug)]
pub struct DocImportFileProgress {
#[debug(skip)]
stream: Pin<Box<dyn Stream<Item = Result<DocImportProgress>> + Send + Unpin + 'static>>,
}

impl DocImportFileProgress {
fn new(
stream: (impl Stream<Item = Result<impl Into<DocImportProgress>, impl Into<anyhow::Error>>>
+ Send
+ Unpin
+ 'static),
) -> Self {
let stream = stream.map(|item| match item {
Ok(item) => Ok(item.into()),
Err(err) => Err(err.into()),
});
Self {
stream: Box::pin(stream),
}
}

/// Iterate through the import progress stream, returning when the stream has completed.
///
/// Returns a [`DocImportFileOutcome`] which contains the key, hash, and size of the
/// imported content.
pub async fn finish(mut self) -> Result<DocImportFileOutcome> {
let mut entry_size = 0;
let mut entry_hash = None;
while let Some(msg) = self.next().await {
match msg? {
DocImportProgress::Found { size, .. } => {
entry_size = size;
}
DocImportProgress::AllDone { key } => {
let hash = entry_hash
.context("expected DocImportProgress::IngestDone event to occur")?;
let outcome = DocImportFileOutcome {
hash,
key,
size: entry_size,
};
return Ok(outcome);
}
DocImportProgress::Abort(err) => return Err(err.into()),
DocImportProgress::Progress { .. } => {}
DocImportProgress::IngestDone { hash, .. } => {
entry_hash = Some(hash);
}
}
}
Err(anyhow!("Response stream ended prematurely"))
}
}

/// Outcome of a [`Doc::import_file`] operation
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocImportFileOutcome {
/// The hash of the entry's content
hash: Hash,
/// The size of the entry
size: u64,
/// The key of the entry
key: Bytes,
}

impl Stream for DocImportFileProgress {
type Item = Result<DocImportProgress>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.poll_next_unpin(cx)
}
}

/// Progress stream for doc export operations.
#[derive(derive_more::Debug)]
pub struct DocExportFileProgress {
#[debug(skip)]
stream: Pin<Box<dyn Stream<Item = Result<DocExportProgress>> + Send + Unpin + 'static>>,
}
impl DocExportFileProgress {
fn new(
stream: (impl Stream<Item = Result<impl Into<DocExportProgress>, impl Into<anyhow::Error>>>
+ Send
+ Unpin
+ 'static),
) -> Self {
let stream = stream.map(|item| match item {
Ok(item) => Ok(item.into()),
Err(err) => Err(err.into()),
});
Self {
stream: Box::pin(stream),
}
}
/// Iterate through the export progress stream, returning when the stream has completed.
///
/// Returns a [`DocExportFileOutcome`] which contains the file path the data was written
/// to and the size of the content.
pub async fn finish(mut self) -> Result<DocExportFileOutcome> {
let mut total_size = 0;
let mut path = None;
while let Some(msg) = self.next().await {
match msg? {
DocExportProgress::Found { size, outpath, .. } => {
total_size = size;
path = Some(outpath);
}
DocExportProgress::AllDone => {
let path = path.context("expected DocExportProgress::Found event to occur")?;
let outcome = DocExportFileOutcome {
size: total_size,
path,
};
return Ok(outcome);
}
DocExportProgress::Abort(err) => return Err(err.into()),
DocExportProgress::Progress { .. } => {}
}
}
Err(anyhow!("Response stream ended prematurely"))
}
}

/// Outcome of a [`Doc::export_file`] operation
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocExportFileOutcome {
/// The size of the entry
size: u64,
/// The path to which the entry was saved
path: PathBuf,
}

impl Stream for DocExportFileProgress {
type Item = Result<DocExportProgress>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.poll_next_unpin(cx)
}
}

fn flatten<T, E1, E2>(
s: impl Stream<Item = StdResult<StdResult<T, E1>, E2>>,
) -> impl Stream<Item = Result<T>>
@@ -770,6 +952,8 @@ mod tests {
use super::*;

use iroh_bytes::util::runtime;
use rand::RngCore;
use tokio::io::AsyncWriteExt;

#[tokio::test]
async fn test_drop_doc_client_sync() -> Result<()> {
@@ -795,4 +979,79 @@

Ok(())
}

#[tokio::test]
async fn test_doc_import_export() -> Result<()> {
let doc_store = iroh_sync::store::memory::Store::default();
let rt = runtime::Handle::from_current(1)?;
let db = iroh_bytes::store::mem::Store::new(rt.clone());
let node = crate::node::Node::builder(db, doc_store)
.runtime(&rt)
.spawn()
.await?;

// create temp file
let temp_dir = tempfile::tempdir().context("tempdir")?;

let in_root = temp_dir.path().join("in");
tokio::fs::create_dir_all(in_root.clone())
.await
.context("create dir all")?;
let out_root = temp_dir.path().join("out");

let path = in_root.join("test");

let size = 100;
let mut buf = vec![0u8; size];
rand::thread_rng().fill_bytes(&mut buf);
let mut file = tokio::fs::File::create(path.clone())
.await
.context("create file")?;
file.write_all(&buf).await.context("write_all")?;
file.flush().await.context("flush")?;

// create doc & author
let client = node.client();
let doc = client.docs.create().await.context("doc create")?;
let author = client.authors.create().await.context("author create")?;

// import file
let import_outcome = doc
.import_file(
author,
crate::util::fs::path_to_key(path.clone(), None, Some(in_root))?,
path,
true,
)
.await
.context("import file")?
.finish()
.await
.context("import finish")?;

// export file
let entry = doc
.get_one(Query::author(author).key_exact(import_outcome.key))
.await
.context("get one")?
.unwrap();
let key = entry.key().to_vec();
let export_outcome = doc
.export_file(
entry,
crate::util::fs::key_to_path(key, None, Some(out_root))?,
)
.await
.context("export file")?
.finish()
.await
.context("export finish")?;

let got_bytes = tokio::fs::read(export_outcome.path)
.await
.context("tokio read")?;
assert_eq!(buf, got_bytes);

Ok(())
}
}
