Skip to content

Commit

Permalink
removing tokio fs
Browse files Browse the repository at this point in the history
  • Loading branch information
RolandSherwin committed Aug 29, 2023
1 parent 795d774 commit 32066db
Show file tree
Hide file tree
Showing 19 changed files with 158 additions and 183 deletions.
2 changes: 1 addition & 1 deletion sn_cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ sn_transfers = { path = "../sn_transfers", version = "0.10.28" }
sn_logging = { path = "../sn_logging", version = "0.2.4" }
sn_peers_acquisition= { path="../sn_peers_acquisition", version = "0.1.4" }
sn_protocol = { path = "../sn_protocol", version = "0.5.3" }
tokio = { version = "1.32.0", features = ["fs", "io-util", "macros", "parking_lot", "rt", "sync", "time"] }
tokio = { version = "1.32.0", features = ["io-util", "macros", "parking_lot", "rt", "sync", "time"] }
tracing = { version = "~0.1.26" }
tracing-core = "0.1.30"
url = "2.4.0"
Expand Down
16 changes: 8 additions & 8 deletions sn_cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ async fn main() -> Result<()> {
println!("Built with git version: {}", sn_build_info::git_info());
println!("Instantiating a SAFE client...");

let client_data_dir_path = get_client_data_dir_path().await?;
let secret_key = get_client_secret_key(&client_data_dir_path).await?;
let client_data_dir_path = get_client_data_dir_path()?;
let secret_key = get_client_secret_key(&client_data_dir_path)?;

if opt.peers.peers.is_empty() {
if !cfg!(feature = "local-discovery") {
Expand Down Expand Up @@ -89,27 +89,27 @@ async fn main() -> Result<()> {
Ok(())
}

async fn get_client_secret_key(root_dir: &PathBuf) -> Result<SecretKey> {
fn get_client_secret_key(root_dir: &PathBuf) -> Result<SecretKey> {
// create the root directory if it doesn't exist
tokio::fs::create_dir_all(&root_dir).await?;
std::fs::create_dir_all(root_dir)?;
let key_path = root_dir.join(CLIENT_KEY);
let secret_key = if key_path.is_file() {
info!("Client key found. Loading from file...");
let secret_hex_bytes = tokio::fs::read(key_path).await?;
let secret_hex_bytes = std::fs::read(key_path)?;
bls_secret_from_hex(secret_hex_bytes)?
} else {
info!("No key found. Generating a new client key...");
let secret_key = SecretKey::random();
tokio::fs::write(key_path, hex::encode(secret_key.to_bytes())).await?;
std::fs::write(key_path, hex::encode(secret_key.to_bytes()))?;
secret_key
};
Ok(secret_key)
}

async fn get_client_data_dir_path() -> Result<PathBuf> {
fn get_client_data_dir_path() -> Result<PathBuf> {
let mut home_dirs = dirs_next::data_dir().expect("Data directory is obtainable");
home_dirs.push("safe");
home_dirs.push("client");
tokio::fs::create_dir_all(home_dirs.as_path()).await?;
std::fs::create_dir_all(home_dirs.as_path())?;
Ok(home_dirs)
}
15 changes: 8 additions & 7 deletions sn_cli/src/subcommands/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use color_eyre::Result;
use libp2p::futures::future::join_all;
use sn_client::{Client, Files, MAX_CONCURRENT_CHUNK_UPLOAD};
use sn_protocol::storage::{Chunk, ChunkAddress};
use tokio::{fs, sync::Semaphore};
use std::fs;
use tokio::sync::Semaphore;

use std::{
// fs,
Expand Down Expand Up @@ -157,11 +158,11 @@ async fn upload_files(
.collect();

let content = bincode::serialize(&chunks_to_fetch)?;
fs::create_dir_all(file_names_path.as_path()).await?;
fs::create_dir_all(file_names_path.as_path())?;
let date_time = chrono::Local::now().format("%Y-%m-%d_%H-%M-%S").to_string();
let file_names_path = file_names_path.join(format!("file_names_{date_time}"));
println!("Writing {} bytes to {file_names_path:?}", content.len());
fs::write(file_names_path, content).await?;
fs::write(file_names_path, content)?;

Ok(())
}
Expand Down Expand Up @@ -189,7 +190,7 @@ async fn upload_chunks(

let wallet_client = file_api.wallet(wallet_dir).await?;

let chunk = Chunk::new(Bytes::from(fs::read(path).await?));
let chunk = Chunk::new(Bytes::from(fs::read(path)?));

file_api
.upload_chunk_in_parallel(chunk, &wallet_client, verify_store)
Expand All @@ -202,14 +203,14 @@ async fn upload_chunks(
async fn download_files(file_api: &Files, root_dir: &Path) -> Result<()> {
let docs_of_uploaded_files_path = root_dir.join("uploaded_files");
let download_path = root_dir.join("downloaded_files");
tokio::fs::create_dir_all(download_path.as_path()).await?;
std::fs::create_dir_all(download_path.as_path())?;

for entry in WalkDir::new(docs_of_uploaded_files_path)
.into_iter()
.flatten()
{
if entry.file_type().is_file() {
let index_doc_bytes = Bytes::from(fs::read(entry.path()).await?);
let index_doc_bytes = Bytes::from(fs::read(entry.path())?);
let index_doc_name = entry.file_name();

println!("Loading file names from index doc {index_doc_name:?}");
Expand Down Expand Up @@ -244,7 +245,7 @@ async fn download_file(
println!("Successfully got file {file_name}!");
let file_name_path = download_path.join(file_name);
println!("Writing {} bytes to {file_name_path:?}", bytes.len());
if let Err(err) = fs::write(file_name_path, bytes).await {
if let Err(err) = fs::write(file_name_path, bytes) {
println!("Failed to create file {file_name:?} with error {err:?}");
}
}
Expand Down
25 changes: 12 additions & 13 deletions sn_cli/src/subcommands/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,21 +96,21 @@ pub(crate) async fn wallet_cmds(
}

async fn address(root_dir: &Path) -> Result<()> {
let wallet = LocalWallet::load_from(root_dir).await?;
let wallet = LocalWallet::load_from(root_dir)?;
let address_hex = hex::encode(wallet.address().to_bytes());
println!("{address_hex}");
Ok(())
}

async fn balance(root_dir: &Path) -> Result<()> {
let wallet = LocalWallet::load_from(root_dir).await?;
let wallet = LocalWallet::load_from(root_dir)?;
let balance = wallet.balance();
println!("{balance}");
Ok(())
}

async fn get_faucet(root_dir: &Path, url: String) -> Result<()> {
let wallet = LocalWallet::load_from(root_dir).await?;
let wallet = LocalWallet::load_from(root_dir)?;
let address_hex = hex::encode(wallet.address().to_bytes());
let url = if !url.contains("://") {
format!("{}://{}", "http", url)
Expand Down Expand Up @@ -144,17 +144,17 @@ async fn deposit(root_dir: &Path, read_from_stdin: bool, dbc: Option<String>) ->
return deposit_from_dbc_hex(root_dir, dbc_hex).await;
}

let mut wallet = LocalWallet::load_from(root_dir).await?;
let mut wallet = LocalWallet::load_from(root_dir)?;

let previous_balance = wallet.balance();

wallet.try_load_deposits().await?;
wallet.try_load_deposits()?;

let deposited =
sn_dbc::Token::from_nano(wallet.balance().as_nano() - previous_balance.as_nano());
if deposited.is_zero() {
println!("Nothing deposited.");
} else if let Err(err) = wallet.store().await {
} else if let Err(err) = wallet.store() {
println!("Failed to store deposited ({deposited}) amount: {:?}", err);
} else {
println!("Deposited {deposited}.");
Expand All @@ -171,13 +171,13 @@ async fn read_dbc_from_stdin(root_dir: &Path) -> Result<()> {
}

async fn deposit_from_dbc_hex(root_dir: &Path, input: String) -> Result<()> {
let mut wallet = LocalWallet::load_from(root_dir).await?;
let mut wallet = LocalWallet::load_from(root_dir)?;
let dbc = sn_dbc::Dbc::from_hex(input.trim())?;

let old_balance = wallet.balance();
wallet.deposit(vec![dbc]).await?;
wallet.deposit(vec![dbc])?;
let new_balance = wallet.balance();
wallet.store().await?;
wallet.store()?;

println!("Successfully stored dbc to wallet dir. \nOld balance: {old_balance}\nNew balance: {new_balance}");

Expand All @@ -200,7 +200,7 @@ async fn send(
return Ok(());
}

let wallet = LocalWallet::load_from(root_dir).await?;
let wallet = LocalWallet::load_from(root_dir)?;
let mut wallet_client = WalletClient::new(client.clone(), wallet);

match wallet_client.send(amount, address, verify_store).await {
Expand All @@ -209,13 +209,13 @@ async fn send(
let mut wallet = wallet_client.into_wallet();
let new_balance = wallet.balance();

if let Err(err) = wallet.store().await {
if let Err(err) = wallet.store() {
println!("Failed to store wallet: {err:?}");
} else {
println!("Successfully stored wallet with new balance {new_balance}.");
}

wallet.store_dbc(new_dbc).await?;
wallet.store_dbc(new_dbc)?;
println!("Successfully stored new dbc to wallet dir. It can now be sent to the recipient, using any channel of choice.");
}
Err(err) => {
Expand All @@ -240,7 +240,6 @@ pub(super) async fn chunk_and_pay_for_storage(
) -> Result<BTreeMap<XorName, ChunkedFile>> {
trace!("Starting to chunk_and_pay_for_storage");
let wallet = LocalWallet::load_from(root_dir)
.await
.wrap_err("Unable to read wallet file in {path:?}")
.suggestion(
"If you have an old wallet file, it may no longer be compatible. Try removing it",
Expand Down
2 changes: 1 addition & 1 deletion sn_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ sn_registers = { path = "../sn_registers", version = "0.2.3" }
sn_transfers = { path = "../sn_transfers", version = "0.10.28" }
thiserror = "1.0.23"
tiny-keccak = "~2.0.2"
tokio = { version = "1.32.0", features = ["fs", "io-util", "macros", "parking_lot", "rt", "sync", "time"] }
tokio = { version = "1.32.0", features = ["io-util", "macros", "parking_lot", "rt", "sync", "time"] }
tracing = { version = "~0.1.26" }
xor_name = "5.0.0"
7 changes: 3 additions & 4 deletions sn_client/src/faucet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub async fn get_tokens_from_faucet(
/// With all balance transferred from the genesis_wallet to the faucet_wallet.
pub async fn load_faucet_wallet_from_genesis_wallet(client: &Client) -> Result<LocalWallet> {
println!("Loading faucet...");
let mut faucet_wallet = create_faucet_wallet().await;
let mut faucet_wallet = create_faucet_wallet();

let faucet_balance = faucet_wallet.balance();
if !faucet_balance.is_zero() {
Expand All @@ -36,7 +36,7 @@ pub async fn load_faucet_wallet_from_genesis_wallet(client: &Client) -> Result<L
}

println!("Loading genesis...");
let genesis_wallet = load_genesis_wallet().await?;
let genesis_wallet = load_genesis_wallet()?;

// Transfer to faucet. We will transfer almost all of the genesis wallet's
// balance to the faucet,.
Expand All @@ -52,10 +52,9 @@ pub async fn load_faucet_wallet_from_genesis_wallet(client: &Client) -> Result<L
)
.await?;

faucet_wallet.deposit(vec![dbc.clone()]).await?;
faucet_wallet.deposit(vec![dbc.clone()])?;
faucet_wallet
.store()
.await
.expect("Faucet wallet shall be stored successfully.");
println!("Faucet wallet balance: {}", faucet_wallet.balance());

Expand Down
2 changes: 1 addition & 1 deletion sn_client/src/file_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl Files {

/// Create a new WalletClient for a given root directory.
pub async fn wallet(&self, root_dir: &Path) -> Result<WalletClient> {
let wallet = LocalWallet::load_from(root_dir).await?;
let wallet = LocalWallet::load_from(root_dir)?;
Ok(WalletClient::new(self.client.clone(), wallet))
}

Expand Down
11 changes: 4 additions & 7 deletions sn_client/src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl WalletClient {

/// Stores the wallet to disk.
pub async fn store_local_wallet(&self) -> Result<()> {
self.wallet.store().await
self.wallet.store()
}

/// Get the wallet balance
Expand All @@ -59,7 +59,7 @@ impl WalletClient {

/// Get the payment dbc for a given network address
pub async fn get_payment_dbcs(&self, address: &NetworkAddress) -> Vec<Dbc> {
self.wallet.get_payment_dbcs(address).await
self.wallet.get_payment_dbcs(address)
}

/// Send tokens to another wallet.
Expand All @@ -70,7 +70,7 @@ impl WalletClient {
to: PublicAddress,
verify_store: bool,
) -> Result<Dbc> {
let transfer = self.wallet.local_send(vec![(amount, to)], None).await?;
let transfer = self.wallet.local_send(vec![(amount, to)], None)?;

let created_dbcs = transfer.created_dbcs.clone();

Expand Down Expand Up @@ -176,8 +176,7 @@ impl WalletClient {
}

self.wallet
.local_send_storage_payment(all_data_payments, None)
.await?;
.local_send_storage_payment(all_data_payments, None)?;

// send to network
trace!("Sending storage payment transfer to the network");
Expand Down Expand Up @@ -335,11 +334,9 @@ pub async fn send(
let mut wallet = wallet_client.into_wallet();
wallet
.store()
.await
.expect("Wallet shall be successfully stored.");
wallet
.store_dbc(new_dbc.clone())
.await
.expect("Created dbc shall be successfully stored.");

if did_error {
Expand Down
2 changes: 1 addition & 1 deletion sn_networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ sn_protocol = { path = "../sn_protocol", version = "0.5.3" }
sn_dbc = { version = "19.1.1", features = ["serdes"] }
sn_transfers = { path = "../sn_transfers", version = "0.10.28" }
thiserror = "1.0.23"
tokio = { version = "1.32.0", features = ["fs", "io-util", "macros", "parking_lot", "rt", "sync", "time"] }
tokio = { version = "1.32.0", features = ["io-util", "macros", "parking_lot", "rt", "sync", "time"] }
tracing = { version = "~0.1.26" }
xor_name = "5.0.0"

Expand Down
2 changes: 1 addition & 1 deletion sn_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ sn_protocol = { path = "../sn_protocol", version = "0.5.3" }
sn_registers = { path = "../sn_registers", version = "0.2.3" }
sn_transfers = { path = "../sn_transfers", version = "0.10.28" }
thiserror = "1.0.23"
tokio = { version = "1.32.0", features = ["fs", "io-util", "macros", "parking_lot", "rt", "sync", "time"] }
tokio = { version = "1.32.0", features = ["io-util", "macros", "parking_lot", "rt", "sync", "time"] }
tokio-stream = { version = "~0.1.12" }
tonic = { version = "0.6.2" }
tracing = { version = "~0.1.26" }
Expand Down
5 changes: 1 addition & 4 deletions sn_node/src/bin/safenode/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ use std::{
time::Duration,
};
use tokio::{
fs::File,
io::AsyncWriteExt,
runtime::Runtime,
sync::{broadcast::error::RecvError, mpsc},
time::sleep,
Expand Down Expand Up @@ -240,8 +238,7 @@ async fn start_node(
// write the PID to the root dir
let pid = std::process::id();
let pid_file = running_node.root_dir_path().join("safenode.pid");
let mut file = File::create(&pid_file).await?;
file.write_all(pid.to_string().as_bytes()).await?;
std::fs::write(pid_file, pid.to_string().as_bytes())?;

// Channel to receive node ctrl cmds from RPC service (if enabled), and events monitoring task
let (ctrl_tx, mut ctrl_rx) = mpsc::channel::<NodeCtrl>(5);
Expand Down
12 changes: 5 additions & 7 deletions sn_node/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use lazy_static::lazy_static;
use sn_dbc::Token;
use sn_logging::{LogFormat, LogOutputDest};
use std::{net::SocketAddr, path::Path, sync::Once};
use tokio::{fs::remove_dir_all, sync::Mutex};
use tokio::sync::Mutex;
use tonic::Request;
use tracing_core::Level;

Expand Down Expand Up @@ -73,9 +73,7 @@ pub async fn get_client() -> Client {
}

pub async fn get_wallet(root_dir: &Path) -> LocalWallet {
LocalWallet::load_from(root_dir)
.await
.expect("Wallet shall be successfully created.")
LocalWallet::load_from(root_dir).expect("Wallet shall be successfully created.")
}

pub async fn get_funded_wallet(
Expand All @@ -92,7 +90,7 @@ pub async fn get_funded_wallet(

println!("Verifying the transfer from faucet...");
client.verify(&tokens).await?;
local_wallet.deposit(vec![tokens]).await?;
local_wallet.deposit(vec![tokens])?;
assert_eq!(local_wallet.balance(), wallet_balance);
println!("Tokens deposited to the wallet that'll pay for storage: {wallet_balance}.");

Expand Down Expand Up @@ -123,7 +121,7 @@ pub async fn node_restart(addr: SocketAddr) -> Result<()> {
let chunks_records = root_dir.join("record_store");
if let Ok(true) = chunks_records.try_exists() {
println!("Removing Chunks records from {}", chunks_records.display());
remove_dir_all(chunks_records).await?;
std::fs::remove_dir_all(chunks_records)?;
}

// remove Registers records
Expand All @@ -133,7 +131,7 @@ pub async fn node_restart(addr: SocketAddr) -> Result<()> {
"Removing Registers records from {}",
registers_records.display()
);
remove_dir_all(registers_records).await?;
std::fs::remove_dir_all(registers_records)?;
}

let _response = client
Expand Down

0 comments on commit 32066db

Please sign in to comment.