Skip to content

Commit

Permalink
chore: store uploaded files list as text
Browse files Browse the repository at this point in the history
For the uploaded files list, rather than serialize to binary, we use a simple text format. The user
can now see the addresses of all the files they've uploaded, and use this information to retrieve
them, if need be. We are also now just using one file, appending to it for each upload.

Now that this file is human readable, the `files ls` command was deemed unnecessary and was
therefore removed.

With respect to the `files` commands, I've also taken the opportunity to make the language in the
text output more concise and matter-of-fact. For example, we don't really need to use things like
exclamation marks in the text.
  • Loading branch information
jacderida committed Sep 20, 2023
1 parent bdfd4f7 commit ba4809e
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 116 deletions.
7 changes: 1 addition & 6 deletions sn_cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mod subcommands;
use crate::{
cli::Opt,
subcommands::{
files::{files_cmds, offline_files_cmds, FilesCmds},
files::files_cmds,
register::register_cmds,
wallet::{wallet_cmds, wallet_cmds_without_client, WalletCmds},
SubCmd,
Expand Down Expand Up @@ -68,11 +68,6 @@ async fn main() -> Result<()> {
wallet_cmds_without_client(cmds, &client_data_dir_path).await?;
return Ok(());
}
} else if let SubCmd::Files(cmd) = &opt.cmd {
if let FilesCmds::Ls {} = cmd {
offline_files_cmds(cmd, &client_data_dir_path).await?;
return Ok(());
}
}
println!("Instantiating a SAFE client...");
let secret_key = get_client_secret_key(&client_data_dir_path)?;
Expand Down
176 changes: 66 additions & 110 deletions sn_cli/src/subcommands/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use sn_protocol::storage::{Chunk, ChunkAddress};
use std::{
collections::BTreeMap,
fs,
io::prelude::*,
io::{BufRead, BufReader},
path::{Path, PathBuf},
time::{Duration, Instant},
};
Expand All @@ -28,7 +30,6 @@ use xor_name::XorName;

#[derive(Parser, Debug)]
pub enum FilesCmds {
Ls {},
Upload {
/// The location of the file(s) to upload.
///
Expand All @@ -54,22 +55,10 @@ pub enum FilesCmds {
///
/// If neither are, all the files uploaded by the current user will be downloaded again.
#[clap(name = "address")]
file_address: Option<String>,
file_addr: Option<String>,
},
}

/// Dispatches `files` subcommands that can run without a network connection.
///
/// Only `FilesCmds::Ls` is handled here; every other variant returns an error
/// telling the user that the command requires a network connection.
///
/// # Arguments
/// * `cmds` - the parsed `files` subcommand to dispatch.
/// * `root_dir` - the client data directory holding the local upload records.
///
/// # Errors
/// Returns an error for any subcommand other than `Ls`, or if listing the
/// locally recorded uploads fails.
pub(crate) async fn offline_files_cmds(cmds: &FilesCmds, root_dir: &Path) -> Result<()> {
match cmds {
FilesCmds::Ls {} => {
// Listing only reads local records, so no client/network is needed.
list_files(root_dir).await?;
}
cmd => {
return Err(eyre!("{cmd:?} requires a network connection"));
}
}
Ok(())
}

pub(crate) async fn files_cmds(
cmds: FilesCmds,
client: Client,
Expand All @@ -82,10 +71,10 @@ pub(crate) async fn files_cmds(
}
FilesCmds::Download {
file_name,
file_address,
file_addr,
} => {
if (file_name.is_some() && file_address.is_none())
|| (file_address.is_some() && file_name.is_none())
if (file_name.is_some() && file_addr.is_none())
|| (file_addr.is_some() && file_name.is_none())
{
return Err(
eyre!("Both the name and address must be supplied if either are used")
Expand All @@ -97,7 +86,7 @@ pub(crate) async fn files_cmds(

let file_api: Files = Files::new(client, root_dir.to_path_buf());

match (file_name, file_address) {
match (file_name, file_addr) {
(Some(name), Some(address)) => {
let bytes = hex::decode(address).expect("Input address is not a hex string");
download_file(
Expand All @@ -118,36 +107,10 @@ pub(crate) async fn files_cmds(
}
}
}
cmd => {
return Err(eyre!("{cmd:?} is an offline command"));
}
};
Ok(())
}

/// Prints the address and name of every file recorded as uploaded.
///
/// Reads each regular file under `<root_dir>/uploaded_files` (subdirectories
/// are skipped), deserializes its contents with bincode as a
/// `Vec<(XorName, String)>`, and prints one `address: file_name` line per
/// entry, with the address rendered as 64 hex characters.
///
/// # Errors
/// Returns an error if the `uploaded_files` directory cannot be read, a file
/// cannot be read, or a file's contents fail to deserialize.
async fn list_files(root_dir: &Path) -> Result<()> {
let file_names_path = root_dir.join("uploaded_files");
let mut all_files: Vec<Vec<(XorName, String)>> = Vec::new();
let entries = fs::read_dir(file_names_path)?;
for entry in entries {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
// Only the flat files in `uploaded_files` hold upload records.
continue;
}
let content = fs::read(&path)?;
// Each record file is a bincode-serialized list of (address, name) pairs.
let deserialized_data: Vec<(XorName, String)> = bincode::deserialize(&content)?;
all_files.push(deserialized_data);
}

for file in all_files.iter() {
for (addr, file_name) in file.iter() {
// `:64x` pads the XorName to the full 64-hex-digit address.
println!("{addr:64x}: {file_name}");
}
}
Ok(())
}

pub(super) async fn chunk_path(
client: &Client,
root_dir: &Path,
Expand Down Expand Up @@ -208,14 +171,10 @@ async fn upload_files(
// Payment shall always be verified.
let chunks_to_upload = chunk_path(&client, root_dir, &files_path).await?;

// the file's xorname and name to be recorded locally
let content_to_be_written_locally = {
let content = chunks_to_upload
.iter()
.map(|(file_addr, chunked_file)| (file_addr, &chunked_file.file_name))
.collect::<Vec<_>>();
bincode::serialize(&content)?
};
let uploaded_file_info = chunks_to_upload
.iter()
.map(|(file_addr, chunked_file)| (*file_addr, chunked_file.file_name.clone()))
.collect::<Vec<_>>();

let chunks_to_upload = chunks_to_upload
.into_iter()
Expand Down Expand Up @@ -252,16 +211,16 @@ async fn upload_files(

let elapsed = now.elapsed();
println!(
"After {elapsed:?}, uploaded {:?} chunks, current progress is {progress}/{total_chunks_uploading}. ",
"Uploaded {:?} chunks in {elapsed:?}. Current progress is {progress}/{total_chunks_uploading}.",
chunks_batch.len(),
);
info!(
"After {elapsed:?}, uploaded {:?} chunks, current progress is {progress}/{total_chunks_uploading}. ",
"Uploaded {:?} chunks in {elapsed:?}. Current progress is {progress}/{total_chunks_uploading}.",
chunks_batch.len(),
);
}

println!("First round of upload completed, verifying and repaying if required...");
println!("First round of upload complete. Verifying and repaying if required...");

// If we are not verifying, we can skip this
if verify_store {
Expand All @@ -271,13 +230,12 @@ async fn upload_files(
while !data_to_verify_or_repay.is_empty() {
tokio::time::sleep(Duration::from_secs(3)).await;
trace!(
"Verifying and potential topping up payment of {:?} chunks",
"Verifying and potentially topping up payment of {:?} chunks",
data_to_verify_or_repay.len()
);
data_to_verify_or_repay =
verify_and_repay_if_needed(file_api.clone(), data_to_verify_or_repay, batch_size)
.await?;
tokio::time::sleep(Duration::from_secs(3)).await;
}
}

Expand All @@ -286,26 +244,17 @@ async fn upload_files(
format_elapsed_time(start_time.elapsed())
);

// Record the uploaded files locally to be able to fetch them later
let content = bincode::serialize(&uploaded_files)?;
let file_names_path = root_dir.join("uploaded_files");
let all_data_put: Vec<_> = upload_results
.into_iter()
.map(|(addr, filename, _, _)| (addr, filename))
.collect();
for (addr, file_name) in all_data_put.iter() {
println!("Uploaded {file_name} to {addr:64x}");
let mut file = std::fs::OpenOptions::new()
.create(true)
.write(true)
.append(true)
.open(file_names_path)?;
for (addr, file_name) in uploaded_file_info.iter() {
println!("Uploaded {} to {:x}", file_name, addr);
writeln!(file, "{:x}: {}", addr, file_name)?;
}
// Write the chunks locally to be able to verify them later
let content = bincode::serialize(&all_data_put)?;
fs::create_dir_all(file_names_path.as_path())?;
let date_time = chrono::Local::now().format("%Y-%m-%d_%H-%M-%S").to_string();
let file_names_path = file_names_path.join(format!("file_names_{date_time}"));
println!(
"Writing {} bytes to {file_names_path:?}",
content_to_be_written_locally.len()
);
fs::write(file_names_path, content_to_be_written_locally)?;
file.flush()?;

Ok(())
}
Expand Down Expand Up @@ -368,7 +317,7 @@ async fn verify_and_repay_if_needed(
let total_chunks = chunks_paths.len();

println!(
"======= Verification: {total_chunks} chunks to be checked and repayed if required ============="
"======= Verification: {total_chunks} chunks to be checked and repaid if required ============="
);

let now = Instant::now();
Expand Down Expand Up @@ -414,19 +363,17 @@ async fn verify_and_repay_if_needed(
.collect::<Vec<_>>();

if total_failed_chunks.is_empty() {
println!(
"======= Verification Completed! All chunks have been paid and stored! ============="
);
println!("======= Verification complete: all chunks paid and stored =============");
return Ok(total_failed_chunks);
}

let num_of_failed_chunks = failed_chunks.len();
println!("======= Verification: {num_of_failed_chunks} chunks were not stored in the network, repaying them in batches =============");
println!("======= Verification: {num_of_failed_chunks} chunks were not stored. Repaying them in batches. =============");

// If there were any failed chunks, we need to repay them
for failed_chunks_batch in failed_chunks.chunks(batch_size) {
println!(
"Failed to fetch {} chunks, attempting to repay them",
"Failed to fetch {} chunks. Attempting to repay them.",
failed_chunks_batch.len()
);

Expand Down Expand Up @@ -464,36 +411,43 @@ async fn verify_and_repay_if_needed(
}

let elapsed = now.elapsed();
println!("After {elapsed:?}, repaid and re-uploaded {num_of_failed_chunks:?} chunks");
println!("Repaid and re-uploaded {num_of_failed_chunks:?} chunks in {elapsed:?}");

Ok(total_failed_chunks)
}

async fn download_files(file_api: &Files, root_dir: &Path) -> Result<()> {
let docs_of_uploaded_files_path = root_dir.join("uploaded_files");
let uploaded_files_path = root_dir.join("uploaded_files");
let download_path = root_dir.join("downloaded_files");
std::fs::create_dir_all(download_path.as_path())?;

for entry in WalkDir::new(docs_of_uploaded_files_path)
.into_iter()
.flatten()
{
if entry.file_type().is_file() {
let index_doc_bytes = Bytes::from(fs::read(entry.path())?);
let index_doc_name = entry.file_name();

println!("Loading file names from index doc {index_doc_name:?}");
let files_to_fetch: Vec<(XorName, String)> = bincode::deserialize(&index_doc_bytes)?;

if files_to_fetch.is_empty() {
println!("No files to download!");
}
for (xorname, file_name) in files_to_fetch.iter() {
download_file(file_api, xorname, file_name, &download_path).await;
}
let file = std::fs::File::open(&uploaded_files_path)?;
let reader = BufReader::new(file);
let mut uploaded_files = Vec::new();
for line in reader.lines() {
let line = line?;
let parts: Vec<&str> = line.split(": ").collect();

if parts.len() == 2 {
let xor_name_hex = parts[0];
let file_name = parts[1];

let bytes = hex::decode(xor_name_hex)?;
let xor_name_bytes: [u8; 32] = bytes
.try_into()
.expect("Failed to parse XorName from hex string");
let xor_name = XorName(xor_name_bytes);

uploaded_files.push((xor_name, file_name.to_string()));
} else {
println!("Skipping malformed line: {}", line);
}
}

for (xorname, file_name) in uploaded_files.iter() {
download_file(file_api, xorname, file_name, &download_path).await;
}

Ok(())
}

Expand All @@ -514,13 +468,9 @@ async fn download_file(
file_name: &String,
download_path: &Path,
) {
println!(
"Downloading file {file_name:?} with address {:64x}",
xorname
);
debug!("Downloading file {file_name:?}");
println!("Downloading {file_name} from {:64x}", xorname);
debug!("Downloading {file_name} from {:64x}", xorname);
let downloaded_file_path = download_path.join(file_name);
// The downloaded file will be writen to the folder directly.
match file_api
.read_bytes(
ChunkAddress::new(*xorname),
Expand All @@ -529,12 +479,18 @@ async fn download_file(
.await
{
Ok(_) => {
debug!("Successfully got file {file_name}!");
println!("Successfully got file {file_name}, stored at {downloaded_file_path:?}!");
debug!(
"Saved {file_name} at {}",
downloaded_file_path.to_string_lossy()
);
println!(
"Saved {file_name} at {}",
downloaded_file_path.to_string_lossy()
);
}
Err(error) => {
error!("Did not get file {file_name:?} from the network! {error}");
println!("Did not get file {file_name:?} from the network! {error}")
error!("Error downloading {file_name:?}: {error}");
println!("Error downloading {file_name:?}: {error}")
}
}
}

0 comments on commit ba4809e

Please sign in to comment.