diff --git a/Cargo.toml b/Cargo.toml index 06f3f871c..2596b5063 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ serde_repr = "0.1" once_cell = "1.4" iota-core = { git = "https://github.com/iotaledger/iota.rs", branch = "dev" } url = { version = "2.1", features = [ "serde" ] } -tokio = "0.2" +tokio = { version = "0.2", features = ["macros", "sync"] } rand = "0.3" rusqlite = { version = "0.23", features = ["bundled"], optional = true } slip10 = "0.4" diff --git a/README.md b/README.md index 2d26a8b3b..3ba9f75dc 100644 --- a/README.md +++ b/README.md @@ -80,16 +80,18 @@ use iota_wallet::{ storage::sqlite::SqliteStorageAdapter, }; use std::path::PathBuf; + #[tokio::main] async fn main() -> iota_wallet::Result<()> { let storage_folder: PathBuf = "./my-db".into(); let manager = - AccountManager::with_storage_adapter(&storage_folder, SqliteStorageAdapter::new(&storage_folder, "accounts")?)?; + AccountManager::with_storage_adapter(&storage_folder, SqliteStorageAdapter::new(&storage_folder, "accounts")?).await?; let client_options = ClientOptionsBuilder::node("http://api.lb-0.testnet.chrysalis2.com")?.build(); let account = manager .create_account(client_options) .signer_type(SignerType::EnvMnemonic) - .initialise()?; + .initialise() + .await?; Ok(()) } ``` diff --git a/bindings/node/README.md b/bindings/node/README.md index 895bf7c39..62816f2ef 100644 --- a/bindings/node/README.md +++ b/bindings/node/README.md @@ -63,10 +63,6 @@ Creates a new instance of the AccountManager. | [storagePath] | string | undefined | The path where the database file will be saved | | [storageType] | number | undefined | The type of the database. Stronghold = 1, Sqlite = 2 | -#### startBackgroundSync(): void - -Initialises the background polling mechanism and MQTT monitoring. Automatically called on `setStrongholdPassword`. - #### setStrongholdPassword(password): void Sets the stronghold password and initialises it. diff --git a/bindings/node/lib/index.d.ts b/bindings/node/lib/index.d.ts index 9a15d7338..280151448 100644 --- a/bindings/node/lib/index.d.ts +++ b/bindings/node/lib/index.d.ts @@ -128,7 +128,6 @@ export declare interface ManagerOptions { export declare class AccountManager { constructor(storagePath?: string) - startBackgroundSync(): void setStrongholdPassword(password: string): void createAccount(account: AccountToCreate): Account getAccount(accountId: string | number): Account | undefined diff --git a/bindings/node/native/src/classes/account/mod.rs b/bindings/node/native/src/classes/account/mod.rs index 3c078648f..fd28b36e1 100644 --- a/bindings/node/native/src/classes/account/mod.rs +++ b/bindings/node/native/src/classes/account/mod.rs @@ -3,29 +3,18 @@ use std::str::FromStr; -use iota_wallet::{ - account::{Account, AccountIdentifier}, - message::MessageId, -}; +use iota_wallet::{account::AccountIdentifier, message::MessageId}; use neon::prelude::*; mod sync; -pub struct AccountWrapper(pub String); - -impl Drop for AccountWrapper { - fn drop(&mut self) { - crate::remove_account(&self.0); - } -} +pub struct AccountWrapper(pub AccountIdentifier); declare_types! { pub class JsAccount for AccountWrapper { init(mut cx) { - let account = cx.argument::(0)?.value(); - let account: Account = serde_json::from_str(&account).expect("invalid account JSON"); - let id = crate::store_account(account); - Ok(AccountWrapper(id)) + let account_id = cx.argument::(0)?.value(); + Ok(AccountWrapper(serde_json::from_str(&account_id).expect("invalid account identifier"))) } method id(mut cx) { @@ -33,9 +22,7 @@ declare_types! { let this = cx.this(); let guard = cx.lock(); let id = &this.borrow(&guard).0; - let account = crate::get_account(id); - let account = account.read().unwrap(); - account.id().clone() + id.clone() }; match id { @@ -49,9 +36,8 @@ declare_types! { let this = cx.this(); let guard = cx.lock(); let id = &this.borrow(&guard).0; - let account = crate::get_account(id); - let account = account.read().unwrap(); - *account.index() + let account_handle = crate::get_account(id); + crate::block_on(async move { account_handle.index().await }) }; Ok(cx.number(index as f64).upcast()) @@ -62,9 +48,8 @@ declare_types! { let this = cx.this(); let guard = cx.lock(); let id = &this.borrow(&guard).0; - let account = crate::get_account(id); - let account = account.read().unwrap(); - account.alias().clone() + let account_handle = crate::get_account(id); + crate::block_on(async move { account_handle.alias().await }) }; Ok(cx.string(alias).upcast()) @@ -75,9 +60,8 @@ declare_types! { let this = cx.this(); let guard = cx.lock(); let id = &this.borrow(&guard).0; - let account = crate::get_account(id); - let account = account.read().unwrap(); - account.available_balance() + let account_handle = crate::get_account(id); + crate::block_on(async move { account_handle.available_balance().await }) }; Ok(cx.number(balance as f64).upcast()) } @@ -87,9 +71,8 @@ declare_types! { let this = cx.this(); let guard = cx.lock(); let id = &this.borrow(&guard).0; - let account = crate::get_account(id); - let account = account.read().unwrap(); - account.total_balance() + let account_handle = crate::get_account(id); + crate::block_on(async move { account_handle.total_balance().await }) }; Ok(cx.number(balance as f64).upcast()) } @@ -113,17 +96,19 @@ declare_types! { let this = cx.this(); let id = cx.borrow(&this, |r| r.0.clone()); - let account = crate::get_account(&id); - let account = account.read().unwrap(); - let messages = account.list_messages(count, from, filter); - - let js_array = JsArray::new(&mut cx, messages.len() as u32); - for (index, message) in messages.iter().enumerate() { - let value = neon_serde::to_value(&mut cx, &message)?; - js_array.set(&mut cx, index as u32, value)?; - } + let account_handle = crate::get_account(&id); + crate::block_on(async move { + let account = account_handle.read().await; + let messages = account.list_messages(count, from, filter); + + let js_array = JsArray::new(&mut cx, messages.len() as u32); + for (index, message) in messages.iter().enumerate() { + let value = neon_serde::to_value(&mut cx, &message)?; + js_array.set(&mut cx, index as u32, value)?; + } - Ok(js_array.upcast()) + Ok(js_array.upcast()) + }) } method listAddresses(mut cx) { @@ -134,17 +119,19 @@ declare_types! { let this = cx.this(); let id = cx.borrow(&this, |r| r.0.clone()); - let account = crate::get_account(&id); - let account = account.read().unwrap(); - let addresses = account.list_addresses(unspent); - - let js_array = JsArray::new(&mut cx, addresses.len() as u32); - for (index, address) in addresses.iter().enumerate() { - let value = neon_serde::to_value(&mut cx, &address)?; - js_array.set(&mut cx, index as u32, value)?; - } + let account_handle = crate::get_account(&id); + crate::block_on(async move { + let account = account_handle.read().await; + let addresses = account.list_addresses(unspent); + + let js_array = JsArray::new(&mut cx, addresses.len() as u32); + for (index, address) in addresses.iter().enumerate() { + let value = neon_serde::to_value(&mut cx, &address)?; + js_array.set(&mut cx, index as u32, value)?; + } - Ok(js_array.upcast()) + Ok(js_array.upcast()) + }) } method setAlias(mut cx) { @@ -153,10 +140,8 @@ declare_types! { let this = cx.this(); let guard = cx.lock(); let id = &this.borrow(&guard).0; - let account = crate::get_account(id); - let mut account = account.write().unwrap(); - account.set_alias(alias); - account.save_pending_changes().expect("failed to save account"); + let account_handle = crate::get_account(id); + crate::block_on(async move { account_handle.set_alias(alias).await; }); } Ok(cx.undefined().upcast()) } @@ -168,10 +153,8 @@ declare_types! { let this = cx.this(); let guard = cx.lock(); let id = &this.borrow(&guard).0; - let account = crate::get_account(id); - let mut account = account.write().unwrap(); - account.set_client_options(client_options); - account.save_pending_changes().expect("failed to save account"); + let account_handle = crate::get_account(id); + crate::block_on(async move { account_handle.set_client_options(client_options).await; }); } Ok(cx.undefined().upcast()) } @@ -180,13 +163,15 @@ declare_types! { let message_id = MessageId::from_str(cx.argument::(0)?.value().as_str()).expect("invalid message id length"); let this = cx.this(); let id = cx.borrow(&this, |r| r.0.clone()); - let account = crate::get_account(&id); - let account = account.read().unwrap(); - let message = account.get_message(&message_id); - match message { - Some(m) => Ok(neon_serde::to_value(&mut cx, &m)?), - None => Ok(cx.undefined().upcast()) - } + crate::block_on(async move { + let account_handle = crate::get_account(&id); + let account = account_handle.read().await; + let message = account.get_message(&message_id); + match message { + Some(m) => Ok(neon_serde::to_value(&mut cx, &m)?), + None => Ok(cx.undefined().upcast()) + } + }) } method generateAddress(mut cx) { @@ -194,9 +179,10 @@ declare_types! { let this = cx.this(); let guard = cx.lock(); let id = &this.borrow(&guard).0; - let account = crate::get_account(id); - let mut account = account.write().unwrap(); - account.generate_address().expect("error generating address") + crate::block_on(async move { + let account_handle = crate::get_account(id); + account_handle.generate_address().await.expect("error generating address") + }) }; Ok(neon_serde::to_value(&mut cx, &address)?) } @@ -204,13 +190,15 @@ declare_types! { method latestAddress(mut cx) { let this = cx.this(); let id = cx.borrow(&this, |r| r.0.clone()); - let account = crate::get_account(&id); - let account = account.read().unwrap(); - let address = account.latest_address(); - match address { - Some(a) => Ok(neon_serde::to_value(&mut cx, &a)?), - None => Ok(cx.undefined().upcast()) - } + crate::block_on(async move { + let account_handle = crate::get_account(&id); + let account = account_handle.read().await; + let address = account.latest_address(); + match address { + Some(a) => Ok(neon_serde::to_value(&mut cx, &a)?), + None => Ok(cx.undefined().upcast()) + } + }) } method sync(mut cx) { diff --git a/bindings/node/native/src/classes/account/sync.rs b/bindings/node/native/src/classes/account/sync.rs index 98cbc6184..cc5fc0a98 100644 --- a/bindings/node/native/src/classes/account/sync.rs +++ b/bindings/node/native/src/classes/account/sync.rs @@ -1,7 +1,10 @@ // Copyright 2020 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 -use iota_wallet::{account::SyncedAccount, WalletError}; +use iota_wallet::{ + account::{AccountIdentifier, SyncedAccount}, + WalletError, +}; use neon::prelude::*; use serde::Deserialize; @@ -16,7 +19,7 @@ pub struct SyncOptions { } pub struct SyncTask { - pub account_id: String, + pub account_id: AccountIdentifier, pub options: SyncOptions, } @@ -26,30 +29,30 @@ impl Task for SyncTask { type JsEvent = JsValue; fn perform(&self) -> Result { - let account = crate::get_account(&self.account_id); - let mut acc = account.write().unwrap(); - let mut synchronizer = acc.sync(); - if let Some(address_index) = self.options.address_index { - synchronizer = synchronizer.address_index(address_index); - } - if let Some(gap_limit) = self.options.gap_limit { - synchronizer = synchronizer.gap_limit(gap_limit); - } - if let Some(skip_persistance) = self.options.skip_persistance { - if skip_persistance { - synchronizer = synchronizer.skip_persistance(); + crate::block_on(async move { + let account = crate::get_account(&self.account_id); + let mut synchronizer = account.sync().await; + if let Some(address_index) = self.options.address_index { + synchronizer = synchronizer.address_index(address_index); } - } - crate::block_on(crate::convert_async_panics(|| async { synchronizer.execute().await })) + if let Some(gap_limit) = self.options.gap_limit { + synchronizer = synchronizer.gap_limit(gap_limit); + } + if let Some(skip_persistance) = self.options.skip_persistance { + if skip_persistance { + synchronizer = synchronizer.skip_persistance(); + } + } + synchronizer.execute().await + }) } fn complete(self, mut cx: TaskContext, value: Result) -> JsResult { match value { Ok(val) => { - let synced = serde_json::to_string(&val).unwrap(); - let synced = cx.string(synced); - let account_id = cx.string(&self.account_id); - Ok(crate::JsSyncedAccount::new(&mut cx, vec![synced, account_id])?.upcast()) + let id = crate::store_synced_account(val); + let id = cx.string(id); + Ok(crate::JsSyncedAccount::new(&mut cx, vec![id])?.upcast()) } Err(e) => cx.throw_error(e.to_string()), } diff --git a/bindings/node/native/src/classes/account_manager/internal_transfer.rs b/bindings/node/native/src/classes/account_manager/internal_transfer.rs index 44c98a5f1..f73461ad6 100644 --- a/bindings/node/native/src/classes/account_manager/internal_transfer.rs +++ b/bindings/node/native/src/classes/account_manager/internal_transfer.rs @@ -3,13 +3,13 @@ use std::sync::{Arc, RwLock}; -use iota_wallet::{account_manager::AccountManager, message::Message, WalletError}; +use iota_wallet::{account::AccountIdentifier, account_manager::AccountManager, message::Message, WalletError}; use neon::prelude::*; pub struct InternalTransferTask { pub manager: Arc>, - pub from_account_id: String, - pub to_account_id: String, + pub from_account_id: AccountIdentifier, + pub to_account_id: AccountIdentifier, pub amount: u64, } @@ -21,18 +21,9 @@ impl Task for InternalTransferTask { fn perform(&self) -> Result { let manager = self.manager.read().unwrap(); crate::block_on(crate::convert_async_panics(|| async { - let from_account = crate::get_account(&self.from_account_id); - let from_account = from_account.read().unwrap(); - let to_account = crate::get_account(&self.to_account_id); - let to_account = to_account.read().unwrap(); - let res = manager - .internal_transfer(from_account.id(), to_account.id(), self.amount) - .await?; - - crate::update_account(&self.from_account_id, res.from_account); - crate::update_account(&self.to_account_id, res.to_account); - - Ok(res.message) + manager + .internal_transfer(&self.from_account_id, &self.to_account_id, self.amount) + .await })) } diff --git a/bindings/node/native/src/classes/account_manager/mod.rs b/bindings/node/native/src/classes/account_manager/mod.rs index 368b9468b..3cf541744 100644 --- a/bindings/node/native/src/classes/account_manager/mod.rs +++ b/bindings/node/native/src/classes/account_manager/mod.rs @@ -101,8 +101,8 @@ declare_types! { None => Default::default(), }; let manager = match options.storage_type { - StorageType::Sqlite => AccountManager::with_storage_adapter(&options.storage_path, SqliteStorageAdapter::new(&options.storage_path, "accounts").unwrap()), - StorageType::Stronghold => AccountManager::with_storage_adapter(&options.storage_path, StrongholdStorageAdapter::new(&options.storage_path).unwrap()), + StorageType::Sqlite => crate::block_on(AccountManager::with_storage_adapter(&options.storage_path, SqliteStorageAdapter::new(&options.storage_path, "accounts").unwrap())), + StorageType::Stronghold => crate::block_on(AccountManager::with_storage_adapter(&options.storage_path, StrongholdStorageAdapter::new(&options.storage_path).unwrap())), }; let manager = manager.expect("error initializing account manager"); Ok(AccountManagerWrapper(Arc::new(RwLock::new(manager)))) @@ -115,18 +115,7 @@ declare_types! { let guard = cx.lock(); let ref_ = &this.borrow(&guard).0; let mut manager = ref_.write().unwrap(); - manager.set_stronghold_password(password).expect("error setting stronghold password"); - } - Ok(cx.undefined().upcast()) - } - - method startBackgroundSync(mut cx) { - { - let this = cx.this(); - let guard = cx.lock(); - let ref_ = &this.borrow(&guard).0; - let mut manager = ref_.write().unwrap(); - manager.start_background_sync(); + crate::block_on(async move { manager.set_stronghold_password(password).await }).expect("error setting stronghold password"); } Ok(cx.undefined().upcast()) } @@ -159,12 +148,13 @@ declare_types! { .expect("invalid account created at format"), ); } - builder.initialise().expect("error creating account") + crate::block_on(async move { builder.initialise().await }).expect("error creating account") }; - let account = serde_json::to_string(&account).unwrap(); - let account = cx.string(account); - Ok(JsAccount::new(&mut cx, vec![account])?.upcast()) + let id = crate::store_account(account); + let id = cx.string(serde_json::to_string(&id).unwrap()); + + Ok(JsAccount::new(&mut cx, vec![id])?.upcast()) } method getAccount(mut cx) { @@ -175,13 +165,13 @@ declare_types! { let guard = cx.lock(); let ref_ = &this.borrow(&guard).0; let manager = ref_.read().unwrap(); - manager.get_account(&id) + crate::block_on(async move { manager.get_account(&id).await }) }; match account { - Ok(acc) => { - let account = serde_json::to_string(&acc).unwrap(); - let account = cx.string(account); - Ok(JsAccount::new(&mut cx, vec![account])?.upcast()) + Ok(account) => { + let id = crate::store_account(account); + let id = cx.string(serde_json::to_string(&id).unwrap()); + Ok(JsAccount::new(&mut cx, vec![id])?.upcast()) }, Err(_) => Ok(cx.undefined().upcast()) } @@ -194,13 +184,13 @@ declare_types! { let guard = cx.lock(); let ref_ = &this.borrow(&guard).0; let manager = ref_.read().unwrap(); - manager.get_account_by_alias(alias) + crate::block_on(async move { manager.get_account_by_alias(alias).await }) }; match account { - Some(acc) => { - let account = serde_json::to_string(&acc).unwrap(); - let account = cx.string(account); - Ok(JsAccount::new(&mut cx, vec![account])?.upcast()) + Some(account) => { + let id = crate::store_account(account); + let id = cx.string(serde_json::to_string(&id).unwrap()); + Ok(JsAccount::new(&mut cx, vec![id])?.upcast()) }, None => Ok(cx.undefined().upcast()) } @@ -212,14 +202,14 @@ declare_types! { let guard = cx.lock(); let ref_ = &this.borrow(&guard).0; let manager = ref_.read().unwrap(); - manager.get_accounts().expect("failed to get accounts") + crate::block_on(async move { manager.get_accounts().await }) }; let js_array = JsArray::new(&mut cx, accounts.len() as u32); - for (index, account) in accounts.iter().enumerate() { - let account = serde_json::to_string(&account).unwrap(); - let account = cx.string(account); - let js_account = JsAccount::new(&mut cx, vec![account])?; + for (index, account) in accounts.into_iter().enumerate() { + let id = crate::store_account(account); + let id = cx.string(serde_json::to_string(&id).unwrap()); + let js_account = JsAccount::new(&mut cx, vec![id])?; js_array.set(&mut cx, index as u32, js_account)?; } @@ -234,7 +224,7 @@ declare_types! { let guard = cx.lock(); let ref_ = &this.borrow(&guard).0; let manager = ref_.read().unwrap(); - manager.remove_account(&id).expect("error removing account") + crate::block_on(async move { manager.remove_account(&id).await }).expect("error removing account") }; Ok(cx.undefined().upcast()) } diff --git a/bindings/node/native/src/classes/account_manager/sync.rs b/bindings/node/native/src/classes/account_manager/sync.rs index b97909df6..e50f5a003 100644 --- a/bindings/node/native/src/classes/account_manager/sync.rs +++ b/bindings/node/native/src/classes/account_manager/sync.rs @@ -24,10 +24,10 @@ impl Task for SyncTask { match value { Ok(synced_accounts) => { let js_array = JsArray::new(&mut cx, synced_accounts.len() as u32); - for (index, synced_account) in synced_accounts.iter().enumerate() { - let synced = serde_json::to_string(&synced_account).unwrap(); - let synced = cx.string(synced); - let synced_instance = crate::JsSyncedAccount::new(&mut cx, vec![synced])?; + for (index, synced_account) in synced_accounts.into_iter().enumerate() { + let id = crate::store_synced_account(synced_account); + let id = cx.string(id); + let synced_instance = crate::JsSyncedAccount::new(&mut cx, vec![id])?; js_array.set(&mut cx, index as u32, synced_instance)?; } diff --git a/bindings/node/native/src/classes/synced_account/mod.rs b/bindings/node/native/src/classes/synced_account/mod.rs index e93efbf78..75ddd768c 100644 --- a/bindings/node/native/src/classes/synced_account/mod.rs +++ b/bindings/node/native/src/classes/synced_account/mod.rs @@ -1,13 +1,9 @@ // Copyright 2020 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 -use std::{ - str::FromStr, - sync::{Arc, RwLock}, -}; +use std::str::FromStr; use iota_wallet::{ - account::SyncedAccount, address::parse as parse_address, message::{MessageId, RemainderValueStrategy, Transfer}, }; @@ -17,15 +13,19 @@ mod repost; mod send; #[derive(Clone)] -pub struct SyncedAccountWrapper(Arc>, String); +pub struct SyncedAccountWrapper(pub String); + +impl Drop for SyncedAccountWrapper { + fn drop(&mut self) { + crate::remove_synced_account(&self.0); + } +} declare_types! { pub class JsSyncedAccount for SyncedAccountWrapper { init(mut cx) { - let synced = cx.argument::(0)?.value(); - let account_id = cx.argument::(1)?.value(); - let synced: SyncedAccount = serde_json::from_str(&synced).expect("invalid synced account JSON"); - Ok(SyncedAccountWrapper(Arc::new(RwLock::new(synced)), account_id)) + let synced_account_id = cx.argument::(0)?.value(); + Ok(SyncedAccountWrapper(synced_account_id)) } method send(mut cx) { @@ -45,10 +45,9 @@ declare_types! { .remainder_value_strategy(remainder_value_strategy); let this = cx.this(); - let instance = cx.borrow(&this, |r| r.clone()); + let synced_account_id = cx.borrow(&this, |r| r.0.clone()); let task = send::SendTask { - synced: instance.0, - account_id: instance.1, + synced_account_id, transfer, }; task.schedule(cb); @@ -60,10 +59,9 @@ declare_types! { let cb = cx.argument::(1)?; let this = cx.this(); - let instance = cx.borrow(&this, |r| r.clone()); + let synced_account_id = cx.borrow(&this, |r| r.0.clone()); let task = repost::RepostTask { - synced: instance.0, - account_id: instance.1, + synced_account_id, message_id, action: repost::RepostAction::Retry, }; @@ -76,10 +74,9 @@ declare_types! { let cb = cx.argument::(1)?; let this = cx.this(); - let instance = cx.borrow(&this, |r| r.clone()); + let synced_account_id = cx.borrow(&this, |r| r.0.clone()); let task = repost::RepostTask { - synced: instance.0, - account_id: instance.1, + synced_account_id, message_id, action: repost::RepostAction::Reattach, }; @@ -92,10 +89,9 @@ declare_types! { let cb = cx.argument::(1)?; let this = cx.this(); - let instance = cx.borrow(&this, |r| r.clone()); + let synced_account_id = cx.borrow(&this, |r| r.0.clone()); let task = repost::RepostTask { - synced: instance.0, - account_id: instance.1, + synced_account_id, message_id, action: repost::RepostAction::Promote, }; diff --git a/bindings/node/native/src/classes/synced_account/repost.rs b/bindings/node/native/src/classes/synced_account/repost.rs index f410ecb70..79c48f8f4 100644 --- a/bindings/node/native/src/classes/synced_account/repost.rs +++ b/bindings/node/native/src/classes/synced_account/repost.rs @@ -1,10 +1,7 @@ // Copyright 2020 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 -use std::sync::{Arc, RwLock}; - use iota_wallet::{ - account::SyncedAccount, message::{Message, MessageId}, WalletError, }; @@ -17,8 +14,7 @@ pub enum RepostAction { } pub struct RepostTask { - pub synced: Arc>, - pub account_id: String, + pub synced_account_id: String, pub message_id: MessageId, pub action: RepostAction, } @@ -29,7 +25,9 @@ impl Task for RepostTask { type JsEvent = JsValue; fn perform(&self) -> Result { - let synced = self.synced.read().unwrap(); + let synced = crate::get_synced_account(&self.synced_account_id); + let synced = synced.read().unwrap(); + crate::block_on(crate::convert_async_panics(|| async { let message = match self.action { RepostAction::Retry => synced.retry(&self.message_id).await?, @@ -37,10 +35,6 @@ impl Task for RepostTask { RepostAction::Promote => synced.promote(&self.message_id).await?, }; - let account = crate::get_account(&self.account_id); - let mut account = account.write().unwrap(); - account.append_messages(vec![message.clone()]); - Ok(message) })) } diff --git a/bindings/node/native/src/classes/synced_account/send.rs b/bindings/node/native/src/classes/synced_account/send.rs index 5dfc2f1d0..10f0947fe 100644 --- a/bindings/node/native/src/classes/synced_account/send.rs +++ b/bindings/node/native/src/classes/synced_account/send.rs @@ -1,18 +1,14 @@ // Copyright 2020 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 -use std::sync::{Arc, RwLock}; - use iota_wallet::{ - account::SyncedAccount, message::{Message, Transfer}, WalletError, }; use neon::prelude::*; pub struct SendTask { - pub synced: Arc>, - pub account_id: String, + pub synced_account_id: String, pub transfer: Transfer, } @@ -22,11 +18,11 @@ impl Task for SendTask { type JsEvent = JsValue; fn perform(&self) -> Result { - let synced = self.synced.read().unwrap(); + let synced = crate::get_synced_account(&self.synced_account_id); + let synced = synced.read().unwrap(); + crate::block_on(crate::convert_async_panics(|| async { - let res = synced.transfer(self.transfer.clone()).await?; - crate::update_account(&self.account_id, res.account); - Ok(res.message) + synced.transfer(self.transfer.clone()).await })) } diff --git a/bindings/node/native/src/lib.rs b/bindings/node/native/src/lib.rs index 59b340508..747f41e40 100644 --- a/bindings/node/native/src/lib.rs +++ b/bindings/node/native/src/lib.rs @@ -3,15 +3,15 @@ use std::{ any::Any, + borrow::Cow, collections::HashMap, panic::AssertUnwindSafe, sync::{Arc, Mutex, RwLock}, - thread, }; use futures::{Future, FutureExt}; use iota_wallet::{ - account::{Account, AccountIdentifier}, + account::{AccountHandle, AccountIdentifier, SyncedAccount}, WalletError, }; use neon::prelude::*; @@ -22,109 +22,70 @@ use tokio::runtime::Runtime; mod classes; use classes::*; -type AccountInstanceMap = Arc>>>>; - -/// check if the account instance is loaded on the JS side (AccountInstanceMap) and update it by running a callback -fn mutate_account_if_exists(account_id: &AccountIdentifier, cb: F) { - let account_id = account_id.clone(); - thread::spawn(move || { - let map = instances() - .read() - .expect("failed to lock read on account instances: mutate_account_if_exists()"); - - for account in map.values() { - let account_ = account.read().unwrap(); - if account_.id() == &account_id { - std::mem::drop(account_); - let mut account = account.write().unwrap(); - cb(&mut account); - break; - } - } - }); -} +type AccountInstanceMap = Arc>>; +type SyncedAccountHandle = Arc>; +type SyncedAccountInstanceMap = Arc>>; /// Gets the account instances map. -fn instances() -> &'static AccountInstanceMap { - static INSTANCES: Lazy = Lazy::new(|| { - iota_wallet::event::on_balance_change(|event| { - let address = event.cloned_address(); - let balance = *event.balance(); - mutate_account_if_exists(event.account_id(), move |account| { - let addresses = account.addresses_mut(); - if let Some(address) = addresses.iter_mut().find(|a| a == &&address) { - address.set_balance(balance); - } - }); - }); - iota_wallet::event::on_new_transaction(|event| { - let message = event.cloned_message(); - mutate_account_if_exists(event.account_id(), move |account| { - account.append_messages(vec![message]); - }); - }); - iota_wallet::event::on_confirmation_state_change(|event| { - let message = event.cloned_message(); - let confirmed = *event.confirmed(); - mutate_account_if_exists(event.account_id(), move |account| { - if let Some(message) = account.messages_mut().iter_mut().find(|m| m == &&message) { - message.set_confirmed(Some(confirmed)); - } - }); - }); - iota_wallet::event::on_reattachment(|event| { - let message = event.cloned_message(); - mutate_account_if_exists(event.account_id(), move |account| { - account.append_messages(vec![message]); - }); - }); - iota_wallet::event::on_broadcast(|event| { - let message = event.cloned_message(); - mutate_account_if_exists(event.account_id(), move |account| { - if let Some(message) = account.messages_mut().iter_mut().find(|m| m == &&message) { - message.set_broadcasted(true); - } - }); - }); - Default::default() - }); +fn account_instances() -> &'static AccountInstanceMap { + static INSTANCES: Lazy = Lazy::new(Default::default); &INSTANCES } -pub(crate) fn get_account(id: &str) -> Arc> { - let map = instances() +pub(crate) fn get_account(id: &AccountIdentifier) -> AccountHandle { + account_instances() .read() - .expect("failed to lock account instances: get_account()"); - map.get(id).expect("account dropped or not initialised").clone() + .expect("failed to lock account instances: get_account()") + .get(id) + .expect("account dropped or not initialised") + .clone() } -pub(crate) fn store_account(account: Account) -> String { - let mut map = instances() +pub(crate) fn store_account(account_handle: AccountHandle) -> AccountIdentifier { + let handle = account_handle.clone(); + let id = block_on(async move { handle.id().await }); + + account_instances() .write() - .expect("failed to lock account instances: store_account()"); - let id: String = thread_rng().sample_iter(&Alphanumeric).take(10).collect(); - map.insert(id.clone(), Arc::new(RwLock::new(account))); + .expect("failed to lock account instances: store_account()") + .insert(id.clone(), account_handle); + id } -pub(crate) fn update_account(id: &str, account: Account) { - let mut map = instances() +/// Gets the synced account instances map. +fn synced_account_instances() -> &'static SyncedAccountInstanceMap { + static INSTANCES: Lazy = Lazy::new(Default::default); + &INSTANCES +} + +pub(crate) fn get_synced_account(id: &str) -> SyncedAccountHandle { + synced_account_instances() + .read() + .expect("failed to lock synced account instances: get_synced_account()") + .get(id) + .expect("synced account dropped or not initialised") + .clone() +} + +pub(crate) fn store_synced_account(synced_account: SyncedAccount) -> String { + let mut map = synced_account_instances() .write() - .expect("failed to lock account instances: store_account()"); - map.insert(id.to_string(), Arc::new(RwLock::new(account))); + .expect("failed to lock synced account instances: store_synced_account()"); + let id: String = thread_rng().sample_iter(&Alphanumeric).take(10).collect(); + map.insert(id.clone(), Arc::new(RwLock::new(synced_account))); + id } -pub(crate) fn remove_account(id: &str) { - let mut map = instances() +pub(crate) fn remove_synced_account(id: &str) { + synced_account_instances() .write() - .expect("failed to lock account instances: remove_account()"); - map.remove(id); + .expect("failed to lock synced account instances: remove_synced_account()") + .remove(id); } fn panic_to_response_message(panic: Box) -> Result { - let msg = if let Some(message) = panic.downcast_ref::() { - format!("Internal error: {}", message) - } else if let Some(message) = panic.downcast_ref::<&str>() { + let msg = if let Some(message) = panic.downcast_ref::>() { format!("Internal error: {}", message) } else { "Internal error".to_string() diff --git a/examples/account_operations.rs b/examples/account_operations.rs index afc719df1..070cd2054 100644 --- a/examples/account_operations.rs +++ b/examples/account_operations.rs @@ -5,22 +5,26 @@ use iota_wallet::{account_manager::AccountManager, client::ClientOptionsBuilder, #[tokio::main] async fn main() -> iota_wallet::Result<()> { - let mut manager = AccountManager::new().unwrap(); - manager.set_stronghold_password("password").unwrap(); + let mut manager = AccountManager::new().await.unwrap(); + manager.set_stronghold_password("password").await.unwrap(); // first we'll create an example account and store it let client_options = ClientOptionsBuilder::node("https://nodes.devnet.iota.org:443")?.build(); - let mut account = manager.create_account(client_options).alias("alias").initialise()?; + let account = manager + .create_account(client_options) + .alias("alias") + .initialise() + .await?; // update alias - account.set_alias("the new alias"); + account.set_alias("the new alias").await; // list unspent addresses let _ = account.list_addresses(false); // list spent addresses let _ = account.list_addresses(true); // generate a new unused address - let _ = account.generate_address()?; + let _ = account.generate_address().await?; // list messages let _ = account.list_messages(5, 0, Some(MessageType::Failed)); diff --git a/examples/actor.rs b/examples/actor.rs index cd9b48620..d59048d0e 100644 --- a/examples/actor.rs +++ b/examples/actor.rs @@ -1,7 +1,10 @@ // Copyright 2020 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 -use iota_wallet::actor::{AccountToCreate, Message, MessageType, Response, ResponseType, WalletMessageHandler}; +use iota_wallet::{ + actor::{AccountToCreate, Message, MessageType, Response, ResponseType, WalletMessageHandler}, + client::ClientOptionsBuilder, +}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; /// The Wallet actor. @@ -23,13 +26,15 @@ impl WalletActor { fn spawn_actor() -> UnboundedSender { let (tx, rx) = unbounded_channel(); - let actor = WalletActor { - rx, - message_handler: Default::default(), - }; std::thread::spawn(|| { let mut runtime = tokio::runtime::Runtime::new().unwrap(); - runtime.block_on(actor.run()); + runtime.block_on(async move { + let actor = WalletActor { + rx, + message_handler: WalletMessageHandler::new().await.unwrap(), + }; + actor.run().await + }); }); tx } @@ -45,7 +50,13 @@ async fn send_message(tx: &UnboundedSender, message_type: MessageType) async fn main() { let tx = spawn_actor(); - let account = AccountToCreate::default(); + let account = AccountToCreate { + client_options: ClientOptionsBuilder::node("http://node.iota").unwrap().build(), + mnemonic: None, + alias: None, + created_at: None, + }; + send_message(&tx, MessageType::SetStrongholdPassword("password".to_string())).await; let response = send_message(&tx, MessageType::CreateAccount(account)).await; match response.response() { diff --git a/examples/backup_and_restore.rs b/examples/backup_and_restore.rs index 1221994a6..e58069d93 100644 --- a/examples/backup_and_restore.rs +++ b/examples/backup_and_restore.rs @@ -3,25 +3,33 @@ use iota_wallet::{account_manager::AccountManager, client::ClientOptionsBuilder}; -fn main() -> iota_wallet::Result<()> { - let mut manager = AccountManager::new().unwrap(); - manager.set_stronghold_password("password").unwrap(); +#[tokio::main] +async fn main() -> iota_wallet::Result<()> { + let mut manager = AccountManager::new().await.unwrap(); + manager.set_stronghold_password("password").await.unwrap(); // first we'll create an example account let client_options = ClientOptionsBuilder::node("https://nodes.devnet.iota.org:443")?.build(); - let account = manager.create_account(client_options).alias("alias").initialise()?; - let id = account.id(); + let account_handle = manager + .create_account(client_options) + .alias("alias") + .initialise() + .await?; + let id = account_handle.id().await; // backup the stored accounts to ./backup/${backup_name} let backup_path = manager.backup("./backup")?; // delete the account on the current storage - manager.remove_account(&id)?; + manager.remove_account(&id).await?; // import the accounts from the backup and assert that it's the same manager.import_accounts(backup_path)?; - let imported_account = manager.get_account(id)?; - assert_eq!(account, imported_account); + let imported_account_handle = manager.get_account(&id).await?; + + let account = account_handle.read().await; + let imported_account = imported_account_handle.read().await; + assert_eq!(*account, *imported_account); Ok(()) } diff --git a/examples/custom_storage.rs b/examples/custom_storage.rs index f5a400da3..45560770b 100644 --- a/examples/custom_storage.rs +++ b/examples/custom_storage.rs @@ -59,15 +59,21 @@ impl StorageAdapter for MyStorage { } } -fn main() -> iota_wallet::Result<()> { +#[tokio::main] +async fn main() -> iota_wallet::Result<()> { let mut manager = AccountManager::with_storage_adapter("./example-database/sled", MyStorage::new("./example-database/sled")?) + .await .unwrap(); - manager.set_stronghold_password("password").unwrap(); + manager.set_stronghold_password("password").await.unwrap(); // first we'll create an example account let client_options = ClientOptionsBuilder::node("https://nodes.devnet.iota.org:443")?.build(); - manager.create_account(client_options).alias("alias").initialise()?; + manager + .create_account(client_options) + .alias("alias") + .initialise() + .await?; Ok(()) } diff --git a/examples/transfer.rs b/examples/transfer.rs index 46afa9916..1e75ddbdf 100644 --- a/examples/transfer.rs +++ b/examples/transfer.rs @@ -5,18 +5,26 @@ use iota_wallet::{account_manager::AccountManager, client::ClientOptionsBuilder, #[tokio::main] async fn main() -> iota_wallet::Result<()> { - let mut manager = AccountManager::new().unwrap(); - manager.set_stronghold_password("password").unwrap(); + let mut manager = AccountManager::new().await.unwrap(); + manager.set_stronghold_password("password").await.unwrap(); // first we'll create an example account and store it let client_options = ClientOptionsBuilder::node("https://nodes.devnet.iota.org:443")?.build(); - let account = manager.create_account(client_options).alias("alias").initialise()?; + let account = manager + .create_account(client_options) + .alias("alias") + .initialise() + .await?; // we need to synchronize with the Tangle first let sync_accounts = manager.sync_accounts().await?; let sync_account = sync_accounts.first().unwrap(); + sync_account - .transfer(Transfer::new(account.latest_address().unwrap().address().clone(), 150)) + .transfer(Transfer::new( + account.latest_address().await.unwrap().address().clone(), + 150, + )) .await?; Ok(()) diff --git a/src/account/mod.rs b/src/account/mod.rs index 0e71a72c6..c97ec6d08 100644 --- a/src/account/mod.rs +++ b/src/account/mod.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ + account_manager::AccountStore, address::{Address, IotaAddress}, client::ClientOptions, message::{Message, MessageType}, @@ -13,17 +14,19 @@ use getset::{Getters, Setters}; use iota::message::prelude::MessageId; use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; use std::{ collections::HashMap, convert::TryInto, + ops::Deref, path::PathBuf, sync::{Arc, Mutex}, }; mod sync; pub(crate) use sync::{repost_message, RepostAction}; -pub use sync::{AccountSynchronizer, SyncedAccount, TransferMetadata}; +pub use sync::{AccountSynchronizer, SyncedAccount}; type AddressesLock = Arc>>; type AccountAddressesLock = Arc>>; @@ -68,34 +71,36 @@ impl From for AccountIdentifier { } /// Account initialiser. -pub struct AccountInitialiser<'a> { +pub struct AccountInitialiser { + accounts: AccountStore, + storage_path: PathBuf, mnemonic: Option, alias: Option, created_at: Option>, messages: Vec, addresses: Vec
, client_options: ClientOptions, - skip_persistance: bool, - storage_path: &'a PathBuf, signer_type: Option, + skip_persistance: bool, } -impl<'a> AccountInitialiser<'a> { +impl AccountInitialiser { /// Initialises the account builder. - pub(crate) fn new(client_options: ClientOptions, storage_path: &'a PathBuf) -> Self { + pub(crate) fn new(client_options: ClientOptions, accounts: AccountStore, storage_path: PathBuf) -> Self { Self { + accounts, + storage_path, mnemonic: None, alias: None, created_at: None, messages: vec![], addresses: vec![], client_options, - skip_persistance: false, - storage_path, #[cfg(feature = "stronghold")] signer_type: Some(SignerType::Stronghold), #[cfg(not(feature = "stronghold"))] signer_type: None, + skip_persistance: false, } } @@ -144,8 +149,9 @@ impl<'a> AccountInitialiser<'a> { } /// Initialises the account. - pub fn initialise(self) -> crate::Result { - let accounts = crate::storage::with_adapter(self.storage_path, |storage| storage.get_all())?; + pub async fn initialise(self) -> crate::Result { + let mut accounts = self.accounts.write().await; + let alias = self.alias.unwrap_or_else(|| format!("Account {}", accounts.len())); let signer_type = self .signer_type @@ -163,13 +169,10 @@ impl<'a> AccountInitialiser<'a> { } } - // check for empty latest account only when not skipping persistance (account discovery process) - if !self.skip_persistance { - if let Some(latest_account) = accounts.last() { - let latest_account: Account = serde_json::from_str(&latest_account)?; - if latest_account.messages().is_empty() && latest_account.total_balance() == 0 { - return Err(crate::WalletError::LatestAccountIsEmpty); - } + if let Some(latest_account_handle) = accounts.values().last() { + let latest_account = latest_account_handle.read().await; + if latest_account.messages().is_empty() && latest_account.total_balance() == 0 { + return Err(crate::WalletError::LatestAccountIsEmpty); } } @@ -182,17 +185,23 @@ impl<'a> AccountInitialiser<'a> { messages: self.messages, addresses: self.addresses, client_options: self.client_options, - storage_path: self.storage_path.clone(), - has_pending_changes: false, + storage_path: self.storage_path, + has_pending_changes: true, }; let id = with_signer(&signer_type, |signer| signer.init_account(&account, mnemonic))?; account.set_id(id.into()); - if !self.skip_persistance { - account.save()?; - } - Ok(account) + let guard = if self.skip_persistance { + account.into() + } else { + let account_id = account.id().clone(); + let guard: AccountHandle = account.into(); + accounts.insert(account_id, guard.clone()); + guard + }; + + Ok(guard) } } @@ -242,7 +251,134 @@ pub struct Account { has_pending_changes: bool, } +/// A thread guard over an account. +#[derive(Debug, Clone)] +pub struct AccountHandle(Arc>); + +impl From for AccountHandle { + fn from(account: Account) -> Self { + Self(Arc::new(RwLock::new(account))) + } +} + +impl Deref for AccountHandle { + type Target = RwLock; + fn deref(&self) -> &Self::Target { + &self.0.deref() + } +} + +macro_rules! guard_field_getters { + ($ty:ident, $(#[$attr:meta] => $x:ident => $ret:ty),*) => { + impl $ty { + $( + #[$attr] + pub async fn $x(&self) -> $ret { + self.0.read().await.$x().clone() + } + )* + } + } +} + +guard_field_getters!( + AccountHandle, + #[doc = "Bridge to [Account#id](struct.Account.html#method.id)."] => id => AccountIdentifier, + #[doc = "Bridge to [Account#signer_type](struct.Account.html#method.signer_type)."] => signer_type => SignerType, + #[doc = "Bridge to [Account#index](struct.Account.html#method.index)."] => index => usize, + #[doc = "Bridge to [Account#alias](struct.Account.html#method.alias)."] => alias => String, + #[doc = "Bridge to [Account#created_at](struct.Account.html#method.created_at)."] => created_at => DateTime, + #[doc = "Bridge to [Account#messages](struct.Account.html#method.messages). + This method clones the addresses so prefer the using the `read` method to access the account instance."] => messages => Vec, + #[doc = "Bridge to [Account#addresses](struct.Account.html#method.addresses). + This method clones the addresses so prefer the using the `read` method to access the account instance."] => addresses => Vec
, + #[doc = "Bridge to [Account#client_options](struct.Account.html#method.client_options)."] => client_options => ClientOptions +); + +impl AccountHandle { + /// Returns the builder to setup the process to synchronize this account with the Tangle. + pub async fn sync(&self) -> AccountSynchronizer { + AccountSynchronizer::new(self.clone()).await + } + + /// Gets a new unused address and links it to this account. + pub async fn generate_address(&self) -> crate::Result
{ + let mut account = self.0.write().await; + let address = crate::address::get_new_address(&account)?; + + account.do_mut(|account| { + account.addresses.push(address.clone()); + }); + + let _ = crate::monitor::monitor_address_balance(self.clone(), address.address()); + + Ok(address) + } + + /// Bridge to [Account#latest_address](struct.Account.html#method.latest_address). + pub async fn latest_address(&self) -> Option
{ + self.0.read().await.latest_address().cloned() + } + + /// Bridge to [Account#total_balance](struct.Account.html#method.total_balance). + pub async fn total_balance(&self) -> u64 { + self.0.read().await.total_balance() + } + + /// Bridge to [Account#available_balance](struct.Account.html#method.available_balance). + pub async fn available_balance(&self) -> u64 { + self.0.read().await.available_balance() + } + + /// Bridge to [Account#set_alias](struct.Account.html#method.set_alias). + pub async fn set_alias(&self, alias: impl AsRef) { + self.0.write().await.set_alias(alias); + } + + /// Bridge to [Account#set_client_options](struct.Account.html#method.set_client_options). + pub async fn set_client_options(&self, options: ClientOptions) { + self.0.write().await.set_client_options(options); + } + + /// Bridge to [Account#list_messages](struct.Account.html#method.list_messages). + /// This method clones the account's messages so when querying a large list of messages + /// prefer using the `read` method to access the account instance. + pub async fn list_messages(&self, count: usize, from: usize, message_type: Option) -> Vec { + self.0 + .read() + .await + .list_messages(count, from, message_type) + .into_iter() + .cloned() + .collect() + } + + /// Bridge to [Account#list_addresses](struct.Account.html#method.list_addresses). + /// This method clones the account's addresses so when querying a large list of addresses + /// prefer using the `read` method to access the account instance. + pub async fn list_addresses(&self, unspent: bool) -> Vec
{ + self.0 + .read() + .await + .list_addresses(unspent) + .into_iter() + .cloned() + .collect() + } + + /// Bridge to [Account#get_message](struct.Account.html#method.get_message). + pub async fn get_message(&self, message_id: &MessageId) -> Option { + self.0.read().await.get_message(message_id).cloned() + } +} + impl Account { + pub(crate) fn do_mut(&mut self, f: impl FnOnce(&mut Self) -> R) -> R { + let res = f(self); + self.has_pending_changes = true; + res + } + /// Returns the most recent address of the account. pub fn latest_address(&self) -> Option<&Address> { self.addresses @@ -251,11 +387,6 @@ impl Account { .max_by_key(|a| a.key_index()) } - /// Returns the builder to setup the process to synchronize this account with the Tangle. - pub fn sync(&'_ mut self) -> AccountSynchronizer<'_> { - AccountSynchronizer::new(self, self.storage_path.clone()) - } - /// Gets the account's total balance. /// It's read directly from the storage. To read the latest account balance, you should `sync` first. pub fn total_balance(&self) -> u64 { @@ -292,21 +423,14 @@ impl Account { self.client_options = options; } - /// Saves the pending changes on the account. - /// This is automatically performed when the account goes out of scope. - pub fn save_pending_changes(&mut self) -> crate::Result<()> { + pub(crate) fn save(&mut self) -> crate::Result<()> { if self.has_pending_changes { - self.save()?; - self.has_pending_changes = false; + let storage_path = self.storage_path.clone(); + crate::storage::save_account(&storage_path, self)?; } Ok(()) } - pub(crate) fn save(&mut self) -> crate::Result<()> { - let storage_path = self.storage_path.clone(); - crate::storage::save_account(&storage_path, self) - } - /// Gets a list of transactions on this account. /// It's fetched from the storage. To ensure the database is updated with the latest transactions, /// `sync` should be called first. @@ -321,20 +445,25 @@ impl Account { /// use iota_wallet::{account_manager::AccountManager, client::ClientOptionsBuilder, message::MessageType}; /// # use rand::{thread_rng, Rng}; /// - /// # let storage_path: String = thread_rng().gen_ascii_chars().take(10).collect(); - /// # let storage_path = std::path::PathBuf::from(format!("./example-database/{}", storage_path)); - /// // gets 10 received messages, skipping the first 5 most recent messages. - /// let client_options = ClientOptionsBuilder::node("https://nodes.devnet.iota.org:443") - /// .expect("invalid node URL") - /// .build(); - /// let mut manager = AccountManager::new().unwrap(); - /// # let mut manager = AccountManager::with_storage_path(storage_path).unwrap(); - /// manager.set_stronghold_password("password").unwrap(); - /// let mut account = manager - /// .create_account(client_options) - /// .initialise() - /// .expect("failed to add account"); - /// account.list_messages(10, 5, Some(MessageType::Received)); + /// #[tokio::main] + /// async fn main() { + /// # let storage_path: String = thread_rng().gen_ascii_chars().take(10).collect(); + /// # let storage_path = std::path::PathBuf::from(format!("./example-database/{}", storage_path)); + /// // gets 10 received messages, skipping the first 5 most recent messages. + /// let client_options = ClientOptionsBuilder::node("https://nodes.devnet.iota.org:443") + /// .expect("invalid node URL") + /// .build(); + /// let mut manager = AccountManager::new().await.unwrap(); + /// # let mut manager = AccountManager::with_storage_path(storage_path).await.unwrap(); + /// manager.set_stronghold_password("password").await.unwrap(); + /// let account_handle = manager + /// .create_account(client_options) + /// .initialise() + /// .await + /// .expect("failed to add account"); + /// let account = account_handle.read().await; + /// account.list_messages(10, 5, Some(MessageType::Received)); + /// } /// ``` pub fn list_messages(&self, count: usize, from: usize, message_type: Option) -> Vec<&Message> { let mut messages: Vec<&Message> = vec![]; @@ -384,21 +513,10 @@ impl Account { .collect() } - /// Gets a new unused address and links it to this account. - pub fn generate_address(&mut self) -> crate::Result
{ - let address = crate::address::get_new_address(&self)?; - self.addresses.push(address.clone()); - - self.save()?; - - // ignore errors because we fallback to the polling system - let _ = crate::monitor::monitor_address_balance(&self, address.address()); - Ok(address) - } - #[doc(hidden)] pub fn append_messages(&mut self, messages: Vec) { self.messages.extend(messages); + self.has_pending_changes = true; } pub(crate) fn append_addresses(&mut self, addresses: Vec
) { @@ -412,15 +530,14 @@ impl Account { self.addresses.push(address); } }); + self.has_pending_changes = true; } - #[doc(hidden)] - pub fn addresses_mut(&mut self) -> &mut Vec
{ + pub(crate) fn addresses_mut(&mut self) -> &mut Vec
{ &mut self.addresses } - #[doc(hidden)] - pub fn messages_mut(&mut self) -> &mut Vec { + pub(crate) fn messages_mut(&mut self) -> &mut Vec { &mut self.messages } @@ -432,7 +549,7 @@ impl Account { impl Drop for Account { fn drop(&mut self) { - let _ = self.save_pending_changes(); + let _ = self.save(); } } @@ -463,30 +580,36 @@ mod tests { #[test] fn set_alias() { let manager = crate::test_utils::get_account_manager(); + let manager = manager.lock().unwrap(); let updated_alias = "updated alias"; let client_options = ClientOptionsBuilder::node("https://nodes.devnet.iota.org:443") .expect("invalid node URL") .build(); - let account_id = { - let mut account = manager - .create_account(client_options) - .alias("alias") - .initialise() - .expect("failed to add account"); - - account.set_alias(updated_alias); - account.id().clone() - }; - - let account_in_storage = manager - .get_account(&account_id) - .expect("failed to get account from storage"); - assert_eq!( - account_in_storage.alias().to_string(), - updated_alias.to_string() - ); + crate::block_on(async move { + let account_id = { + let account_handle = manager + .create_account(client_options) + .alias("alias") + .initialise() + .await + .expect("failed to add account"); + + account_handle.set_alias(updated_alias).await; + account_handle.id().await + }; + + let account_in_storage = manager + .get_account(&account_id) + .await + .expect("failed to get account from storage"); + let account_in_storage = account_in_storage.read().await; + assert_eq!( + account_in_storage.alias().to_string(), + updated_alias.to_string() + ); + }); } } } diff --git a/src/account/sync/mod.rs b/src/account/sync/mod.rs index b9a3ec811..4b7aac604 100644 --- a/src/account/sync/mod.rs +++ b/src/account/sync/mod.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - account::{get_account_addresses_lock, Account, AccountIdentifier}, + account::{get_account_addresses_lock, Account, AccountHandle}, address::{Address, AddressBuilder, AddressOutput, IotaAddress}, client::get_client, message::{Message, RemainderValueStrategy, Transfer}, @@ -16,13 +16,12 @@ use iota::{ }, ClientMiner, }; -use serde::{Deserialize, Serialize}; +use serde::{ser::Serializer, Serialize}; use slip10::BIP32Path; use std::{ convert::TryInto, num::NonZeroU64, - path::PathBuf, sync::{mpsc::channel, Arc, Mutex, MutexGuard}, thread, time::Duration, @@ -49,7 +48,6 @@ const OUTPUT_LOCK_TIMEOUT: Duration = Duration::from_secs(30); /// Returns a (addresses, messages) tuples representing the address history up to latest unused address, /// and the messages associated with the addresses. async fn sync_addresses( - storage_path: &PathBuf, account: &'_ Account, address_index: usize, gap_limit: usize, @@ -73,7 +71,7 @@ async fn sync_addresses( for (iota_address_index, iota_address_internal, iota_address) in &generated_iota_addresses { futures_.push(async move { let client = crate::client::get_client(account.client_options()); - let client = client.read().unwrap(); + let client = client.read().await; let address_outputs = client.get_address().outputs(&iota_address).await?; let balance = client.get_address().balance(&iota_address).await?; @@ -159,7 +157,7 @@ async fn sync_messages( let client_options = client_options.clone(); async move { let client = crate::client::get_client(&client_options); - let client = client.read().unwrap(); + let client = client.read().await; let address_outputs = client.get_address().outputs(address.address()).await?; let balance = client.get_address().balance(address.address()).await?; @@ -201,42 +199,49 @@ async fn update_account_messages<'a>( new_messages: &'a [(MessageId, Option, IotaMessage)], ) -> crate::Result<()> { let client = get_client(account.client_options()); - let messages = account.messages_mut(); - // sync `broadcasted` state - messages - .iter_mut() - .filter(|message| !message.broadcasted() && new_messages.iter().any(|(id, _, _)| id == message.id())) - .for_each(|message| { - message.set_broadcasted(true); - }); + account.do_mut(|account| { + let messages = account.messages_mut(); + + // sync `broadcasted` state + messages + .iter_mut() + .filter(|message| !message.broadcasted() && new_messages.iter().any(|(id, _, _)| id == message.id())) + .for_each(|message| { + message.set_broadcasted(true); + }); + }); // sync `confirmed` state - let mut unconfirmed_messages: Vec<&mut Message> = messages - .iter_mut() + let unconfirmed_message_ids: Vec = account + .messages() + .iter() .filter(|message| message.confirmed().is_none()) + .map(|m| *m.id()) .collect(); - let client = client.read().unwrap(); + let client = client.read().await; - for message in unconfirmed_messages.iter_mut() { - let metadata = client.get_message().metadata(message.id()).await?; + for message_id in unconfirmed_message_ids { + let metadata = client.get_message().metadata(&message_id).await?; if let Some(inclusion_state) = metadata.ledger_inclusion_state { let confirmed = inclusion_state == "included"; - message.set_confirmed(Some(confirmed)); + account.do_mut(|account| { + let message = account + .messages_mut() + .iter_mut() + .find(|m| m.id() == &message_id) + .unwrap(); + message.set_confirmed(Some(confirmed)); + }); } } Ok(()) } -async fn perform_sync( - mut account: &mut Account, - storage_path: &PathBuf, - address_index: usize, - gap_limit: usize, -) -> crate::Result { - let (found_addresses, found_messages) = sync_addresses(&storage_path, &account, address_index, gap_limit).await?; +async fn perform_sync(mut account: &mut Account, address_index: usize, gap_limit: usize) -> crate::Result { + let (found_addresses, found_messages) = sync_addresses(&account, address_index, gap_limit).await?; let mut new_messages = vec![]; for (found_message_id, confirmed, found_message) in found_messages { @@ -298,25 +303,23 @@ async fn perform_sync( } /// Account sync helper. -pub struct AccountSynchronizer<'a> { - account: &'a mut Account, +pub struct AccountSynchronizer { + account_handle: AccountHandle, address_index: usize, gap_limit: usize, skip_persistance: bool, - storage_path: PathBuf, } -impl<'a> AccountSynchronizer<'a> { +impl AccountSynchronizer { /// Initialises a new instance of the sync helper. - pub(super) fn new(account: &'a mut Account, storage_path: PathBuf) -> Self { - let address_index = account.addresses().len(); + pub(super) async fn new(account_handle: AccountHandle) -> Self { + let address_index = account_handle.read().await.addresses().len(); Self { - account, + account_handle, // by default we synchronize from the latest address (supposedly unspent) address_index: if address_index == 0 { 0 } else { address_index - 1 }, gap_limit: if address_index == 0 { 10 } else { 1 }, skip_persistance: false, - storage_path, } } @@ -342,48 +345,78 @@ impl<'a> AccountSynchronizer<'a> { /// The account syncing process ensures that the latest metadata (balance, transactions) /// associated with an account is fetched from the tangle and is stored locally. pub async fn execute(self) -> crate::Result { - let options = self.account.client_options().clone(); + let options = self.account_handle.client_options().await; let client = get_client(&options); - let _ = crate::monitor::unsubscribe(&self.account); + let _ = crate::monitor::unsubscribe(self.account_handle.clone()); - let mut account_ = self.account.clone(); - let return_value = - match perform_sync(&mut account_, &self.storage_path, self.address_index, self.gap_limit).await { - Ok(is_empty) => { - self.account.set_addresses(account_.addresses().to_vec()); - self.account.set_messages(account_.messages().to_vec()); - if !self.skip_persistance { - self.account.save()?; - } - - let synced_account = SyncedAccount { - account_id: self.account.id().clone(), - deposit_address: self.account.latest_address().unwrap().clone(), - is_empty, - storage_path: self.storage_path, - addresses: self.account.addresses().clone(), - messages: self.account.messages().clone(), - }; - Ok(synced_account) + let mut account_ = { + let account_ref = self.account_handle.read().await; + account_ref.clone() + }; + let message_ids_before_sync: Vec = account_.messages().iter().map(|m| *m.id()).collect(); + let addresses_before_sync: Vec = account_.addresses().iter().map(|a| a.address().to_bech32()).collect(); + + let return_value = match perform_sync(&mut account_, self.address_index, self.gap_limit).await { + Ok(is_empty) => { + if !self.skip_persistance { + let mut account_ref = self.account_handle.write().await; + account_ref.set_addresses(account_.addresses().to_vec()); + account_ref.set_messages(account_.messages().to_vec()); } - Err(e) => Err(e), - }; - let _ = crate::monitor::monitor_account_addresses_balance(&self.account); - let _ = crate::monitor::monitor_unconfirmed_messages(&self.account); + let account_ref = self.account_handle.read().await; + + let synced_account = SyncedAccount { + account_handle: self.account_handle.clone(), + deposit_address: account_ref.latest_address().unwrap().clone(), + is_empty, + addresses: account_ref + .addresses() + .iter() + .filter(|a| { + !addresses_before_sync + .iter() + .any(|addr| addr == &a.address().to_bech32()) + }) + .cloned() + .collect(), + messages: account_ref + .messages() + .iter() + .filter(|m| !message_ids_before_sync.iter().any(|id| id == m.id())) + .cloned() + .collect(), + }; + Ok(synced_account) + } + Err(e) => Err(e), + }; + + let _ = crate::monitor::monitor_account_addresses_balance(self.account_handle.clone()); + let _ = crate::monitor::monitor_unconfirmed_messages(self.account_handle.clone()); return_value } } +fn serialize_as_id(x: &AccountHandle, s: S) -> Result +where + S: Serializer, +{ + crate::block_on(async move { + let account = x.read().await; + account.id().serialize(s) + }) +} + /// Data returned from account synchronization. -#[derive(Debug, Clone, PartialEq, Getters, Serialize, Deserialize)] +#[derive(Debug, Clone, Getters, Serialize)] pub struct SyncedAccount { /// The associated account identifier. - #[serde(rename = "accountId")] + #[serde(rename = "accountId", serialize_with = "serialize_as_id")] #[getset(get = "pub")] - account_id: AccountIdentifier, + account_handle: AccountHandle, /// The account's deposit address. #[serde(rename = "depositAddress")] #[getset(get = "pub")] @@ -398,17 +431,6 @@ pub struct SyncedAccount { /// The account addresses. #[getset(get = "pub")] addresses: Vec
, - #[serde(rename = "storagePath")] - storage_path: PathBuf, -} - -/// Transfer response metadata. -#[derive(Debug)] -pub struct TransferMetadata { - /// The transfer message. - pub message: Message, - /// The transfer source account with new message and addresses attached. - pub account: Account, } impl SyncedAccount { @@ -428,7 +450,7 @@ impl SyncedAccount { &self, locked_addresses: &'a mut MutexGuard<'_, Vec>, threshold: u64, - account: &'a mut Account, + account: &'a Account, address: &'a IotaAddress, ) -> crate::Result<(Vec, Option)> { let mut available_addresses: Vec = account @@ -461,67 +483,69 @@ impl SyncedAccount { } /// Send messages. - pub async fn transfer(&self, transfer_obj: Transfer) -> crate::Result { + pub async fn transfer(&self, transfer_obj: Transfer) -> crate::Result { // validate the transfer if transfer_obj.amount == 0 { return Err(crate::WalletError::ZeroAmount); } + let account_ = self.account_handle.read().await; + // lock the transfer process until we select the input addresses // we do this to prevent multiple threads trying to transfer at the same time // so it doesn't consume the same addresses multiple times, which leads to a conflict state - let account_addresses_locker = get_account_addresses_lock(&self.account_id); + let account_addresses_locker = get_account_addresses_lock(account_.id()); let mut locked_addresses = account_addresses_locker.lock().unwrap(); // prepare the transfer getting some needed objects and values let value: u64 = transfer_obj.amount; - let mut account = crate::storage::get_account(&self.storage_path, &self.account_id)?; let mut addresses_to_watch = vec![]; - if value > account.total_balance() { + if value > account_.total_balance() { return Err(crate::WalletError::InsufficientFunds); } + let available_balance = account_.available_balance(); + drop(account_); + // if the transfer value exceeds the account's available balance, // wait for an account update or sync it with the tangle - if value > account.available_balance() { + if value > available_balance { let (tx, rx) = channel(); let tx = Arc::new(Mutex::new(tx)); - let storage_path = self.storage_path.clone(); - let account_id = self.account_id.clone(); + let account_handle = self.account_handle.clone(); thread::spawn(move || { let tx = tx.lock().unwrap(); for _ in 1..30 { thread::sleep(OUTPUT_LOCK_TIMEOUT / 30); - if let Ok(account) = crate::storage::get_account(&storage_path, &account_id) { - // the account received an update and now the balance is sufficient - if value <= account.available_balance() { - let _ = tx.send(account); - break; - } + let account = crate::block_on(async { account_handle.read().await }); + // the account received an update and now the balance is sufficient + if value <= account.available_balance() { + let _ = tx.send(()); + break; } } }); match rx.recv_timeout(OUTPUT_LOCK_TIMEOUT) { - Ok(acc) => { - account = acc; - } + Ok(_) => {} Err(_) => { // if we got a timeout waiting for the account update, we try to sync it - account.sync().execute().await?; + self.account_handle.sync().await.execute().await?; } } } - let client = crate::client::get_client(account.client_options()); - let client = client.read().unwrap(); + let account_ = self.account_handle.read().await; + + let client = crate::client::get_client(account_.client_options()); + let client = client.read().await; if let RemainderValueStrategy::AccountAddress(ref remainder_target_address) = transfer_obj.remainder_value_strategy { - if !account + if !account_ .addresses() .iter() .any(|addr| addr.address() == remainder_target_address) @@ -534,7 +558,7 @@ impl SyncedAccount { let (input_addresses, remainder_address) = self.select_inputs( &mut locked_addresses, transfer_obj.amount, - &mut account, + &account_, &transfer_obj.address, )?; @@ -545,7 +569,7 @@ impl SyncedAccount { let mut address_index_recorders = vec![]; for input_address in &input_addresses { - let account_address = account + let account_address = account_ .addresses() .iter() .find(|a| a.address() == &input_address.address) @@ -554,13 +578,13 @@ impl SyncedAccount { let mut outputs = vec![]; let address_path = BIP32Path::from_str(&format!( "m/44H/4218H/{}H/{}H/{}H", - *account.index(), + *account_.index(), *account_address.internal() as u32, *account_address.key_index() )) .unwrap(); - for (offset, address_output) in account_address.available_outputs(&account).iter().enumerate() { + for (offset, address_output) in account_address.available_outputs(&account_).iter().enumerate() { outputs.push(( (*address_output).clone(), *account_address.key_index(), @@ -611,12 +635,15 @@ impl SyncedAccount { } } + drop(account_); + let mut account_ = self.account_handle.write().await; + // if there's remainder value, we check the strategy defined in the transfer let mut remainder_value_deposit_address = None; if remainder_value > 0 { let remainder_address = remainder_address.ok_or_else(|| anyhow::anyhow!("remainder address not defined"))?; - let remainder_address = account + let remainder_address = account_ .addresses() .iter() .find(|a| a.address() == &remainder_address.address) @@ -628,12 +655,12 @@ impl SyncedAccount { // generate a new change address to send the remainder value RemainderValueStrategy::ChangeAddress => { if *remainder_address.internal() { - let deposit_address = account.latest_address().unwrap().address().clone(); + let deposit_address = account_.latest_address().unwrap().address().clone(); deposit_address } else { - let change_address = crate::address::get_new_change_address(&account, &remainder_address)?; + let change_address = crate::address::get_new_change_address(&account_, &remainder_address)?; let addr = change_address.address().clone(); - account.append_addresses(vec![change_address]); + account_.append_addresses(vec![change_address]); addresses_to_watch.push(addr.clone()); addr } @@ -657,8 +684,8 @@ impl SyncedAccount { .finish() .map_err(|e| anyhow::anyhow!(format!("{:?}", e)))?; - let unlock_blocks = crate::signing::with_signer(account.signer_type(), |signer| { - signer.sign_message(&account, &essence, &mut address_index_recorders) + let unlock_blocks = crate::signing::with_signer(account_.signer_type(), |signer| { + signer.sign_message(&account_, &essence, &mut address_index_recorders) })?; let mut tx_builder = Transaction::builder().with_essence(essence); for unlock_block in unlock_blocks { @@ -679,32 +706,22 @@ impl SyncedAccount { // if this is a transfer to the account's latest address or we used the latest as deposit of the remainder // value, we generate a new one to keep the latest address unused - let latest_address = account.latest_address().unwrap().address(); + let latest_address = account_.latest_address().unwrap().address(); if latest_address == &transfer_obj.address || (remainder_value_deposit_address.is_some() && &remainder_value_deposit_address.unwrap() == latest_address) { - let addr = crate::address::get_new_address(&account)?; + let addr = crate::address::get_new_address(&account_)?; addresses_to_watch.push(addr.address().clone()); - account.append_addresses(vec![addr]); + account_.append_addresses(vec![addr]); } let message = client.get_message().data(&message_id).await?; - // drop the client ref so it doesn't lock the monitor system - std::mem::drop(client); - - for address in addresses_to_watch { - // ignore errors because we fallback to the polling system - let _ = crate::monitor::monitor_address_balance(&account, &address); - } - - let message = Message::from_iota_message(message_id, account.addresses(), &message, None)?; - account.append_messages(vec![message.clone()]); + let message = Message::from_iota_message(message_id, account_.addresses(), &message, None)?; + account_.append_messages(vec![message.clone()]); - account.save()?; - - let account_addresses_locker = get_account_addresses_lock(&self.account_id); + let account_addresses_locker = get_account_addresses_lock(account_.id()); let mut locked_addresses = account_addresses_locker.lock().unwrap(); for input_address in &input_addresses { let index = locked_addresses @@ -714,25 +731,34 @@ impl SyncedAccount { locked_addresses.remove(index); } + // drop the client and account_ refs so it doesn't lock the monitor system + drop(account_); + drop(client); + + for address in addresses_to_watch { + // ignore errors because we fallback to the polling system + let _ = crate::monitor::monitor_address_balance(self.account_handle.clone(), &address); + } + // ignore errors because we fallback to the polling system - let _ = crate::monitor::monitor_confirmation_state_change(&account, &message_id); + let _ = crate::monitor::monitor_confirmation_state_change(self.account_handle.clone(), &message_id); - Ok(TransferMetadata { message, account }) + Ok(message) } /// Retry message. pub async fn retry(&self, message_id: &MessageId) -> crate::Result { - repost_message(&self.account_id, &self.storage_path, message_id, RepostAction::Retry).await + repost_message(self.account_handle.clone(), message_id, RepostAction::Retry).await } /// Promote message. pub async fn promote(&self, message_id: &MessageId) -> crate::Result { - repost_message(&self.account_id, &self.storage_path, message_id, RepostAction::Promote).await + repost_message(self.account_handle.clone(), message_id, RepostAction::Promote).await } /// Reattach message. pub async fn reattach(&self, message_id: &MessageId) -> crate::Result { - repost_message(&self.account_id, &self.storage_path, message_id, RepostAction::Reattach).await + repost_message(self.account_handle.clone(), message_id, RepostAction::Reattach).await } } @@ -743,12 +769,12 @@ pub(crate) enum RepostAction { } pub(crate) async fn repost_message( - account_id: &AccountIdentifier, - storage_path: &PathBuf, + account_handle: AccountHandle, message_id: &MessageId, action: RepostAction, ) -> crate::Result { - let mut account: Account = crate::storage::get_account(&storage_path, account_id)?; + let mut account = account_handle.write().await; + let message = match account.get_message(message_id) { Some(message_to_repost) => { // get the latest reattachment of the message we want to promote/rettry/reattach @@ -764,7 +790,7 @@ pub(crate) async fn repost_message( } let client = crate::client::get_client(account.client_options()); - let client = client.read().unwrap(); + let client = client.read().await; let (id, message) = match action { RepostAction::Promote => { @@ -811,20 +837,23 @@ mod tests { rusty_fork_test! { #[test] fn account_sync() { + let manager = crate::test_utils::get_account_manager(); + let manager = manager.lock().unwrap(); + + let client_options = ClientOptionsBuilder::node("https://nodes.devnet.iota.org:443") + .unwrap() + .build(); crate::block_on(async move { - let manager = crate::test_utils::get_account_manager(); - let client_options = ClientOptionsBuilder::node("https://nodes.devnet.iota.org:443") - .unwrap() - .build(); let account = manager .create_account(client_options) .alias("alias") .initialise() + .await .unwrap(); - - // let synced_accounts = account.sync().execute().await.unwrap(); - // TODO improve test when the node API is ready to use }); + + // let synced_accounts = account.sync().execute().await.unwrap(); + // TODO improve test when the node API is ready to use } } } diff --git a/src/account_manager.rs b/src/account_manager.rs index efaedcb99..cba4c4940 100644 --- a/src/account_manager.rs +++ b/src/account_manager.rs @@ -3,7 +3,7 @@ use crate::{ account::{ - account_id_to_stronghold_record_id, repost_message, Account, AccountIdentifier, AccountInitialiser, + account_id_to_stronghold_record_id, repost_message, AccountHandle, AccountIdentifier, AccountInitialiser, RepostAction, SyncedAccount, }, client::ClientOptions, @@ -14,22 +14,34 @@ use crate::{ }; use std::{ + borrow::Cow, + collections::HashMap, convert::TryInto, fs, panic::AssertUnwindSafe, path::{Path, PathBuf}, + sync::Arc, thread, time::Duration, }; use futures::FutureExt; use getset::{Getters, Setters}; -use iota::message::prelude::MessageId; +use iota::{MessageId, Payload}; use stronghold::Stronghold; +use tokio::{ + sync::{ + broadcast::{channel as broadcast_channel, Receiver as BroadcastReceiver, Sender as BroadcastSender}, + RwLock, + }, + time::{delay_for, Duration as AsyncDuration}, +}; /// The default storage path. pub const DEFAULT_STORAGE_PATH: &str = "./example-database"; +pub(crate) type AccountStore = Arc>>; + /// The account manager. /// /// Used to manage multiple accounts. @@ -41,67 +53,91 @@ pub struct AccountManager { /// the polling interval. #[getset(get = "pub", set = "pub")] polling_interval: Duration, - started_monitoring: bool, + accounts: AccountStore, + stop_polling_sender: Option>, } -/// Internal transfer response metadata. -pub struct InternalTransferMetadata { - /// Transfer message. - pub message: Message, - /// Source account with new message and addresses attached. - pub from_account: Account, - /// Destination account with new message attached. - pub to_account: Account, +impl Drop for AccountManager { + fn drop(&mut self) { + let accounts = self.accounts.clone(); + let stop_polling_sender = self.stop_polling_sender.clone(); + thread::spawn(move || { + let _ = crate::block_on(async { + for account in accounts.read().await.values() { + let _ = crate::monitor::unsubscribe(account.clone()); + } + + if let Some(sender) = stop_polling_sender { + sender.send(()).expect("failed to stop polling process"); + } + }); + }) + .join() + .expect("failed to stop monitoring and polling systems"); + } } impl AccountManager { /// Initialises a new instance of the account manager with the default storage adapter. - pub fn new() -> crate::Result { - Self::with_storage_path(DEFAULT_STORAGE_PATH) + pub async fn new() -> crate::Result { + Self::with_storage_path(DEFAULT_STORAGE_PATH).await } /// Initialises a new instance of the account manager with the default storage adapter using the specified storage /// path. - pub fn with_storage_path(storage_path: impl AsRef) -> crate::Result { - Self::with_storage_adapter(&storage_path, crate::storage::get_adapter_from_path(&storage_path)?) + pub async fn with_storage_path(storage_path: impl AsRef) -> crate::Result { + Self::with_storage_adapter(&storage_path, crate::storage::get_adapter_from_path(&storage_path)?).await } /// Initialises a new instance of the account manager with the specified adapter. - pub fn with_storage_adapter( + pub async fn with_storage_adapter( storage_path: impl AsRef, adapter: S, ) -> crate::Result { crate::storage::set_adapter(&storage_path, adapter); - let instance = Self { - storage_path: storage_path.as_ref().to_path_buf(), + let storage_path = storage_path.as_ref().to_path_buf(); + let mut instance = Self { + storage_path: storage_path.clone(), polling_interval: Duration::from_millis(30_000), - started_monitoring: false, + accounts: Self::load_accounts(&storage_path) + .await + .unwrap_or_else(|_| Default::default()), + stop_polling_sender: None, }; + + instance.start_background_sync().await; + Ok(instance) } + async fn load_accounts(storage_path: &PathBuf) -> crate::Result { + let accounts = crate::storage::with_adapter(&storage_path, |storage| storage.get_all())?; + let accounts = crate::storage::parse_accounts(&storage_path, &accounts)? + .into_iter() + .map(|account| (account.id().clone(), account.into())) + .collect(); + Ok(Arc::new(RwLock::new(accounts))) + } + /// Starts monitoring the accounts with the node's mqtt topics. - fn start_monitoring(&self) -> crate::Result<()> { - let accounts = crate::storage::with_adapter(&self.storage_path, |storage| storage.get_all())?; - let accounts = crate::storage::parse_accounts(&self.storage_path, &accounts)?; - for account in accounts { - crate::monitor::monitor_account_addresses_balance(&account)?; - crate::monitor::monitor_unconfirmed_messages(&account)?; + async fn start_monitoring(&self) -> crate::Result<()> { + for account in self.accounts.read().await.values() { + crate::monitor::monitor_account_addresses_balance(account.clone()).await?; + crate::monitor::monitor_unconfirmed_messages(account.clone()).await?; } Ok(()) } /// Initialises the background polling and MQTT monitoring. - pub fn start_background_sync(&mut self) { - if !self.started_monitoring { - let monitoring_disabled = self.start_monitoring().is_err(); - self.start_polling(monitoring_disabled); - self.started_monitoring = true; - } + async fn start_background_sync(&mut self) { + let monitoring_disabled = self.start_monitoring().await.is_err(); + let (stop_polling_sender, stop_polling_receiver) = broadcast_channel(1); + self.start_polling(monitoring_disabled, stop_polling_receiver); + self.stop_polling_sender = Some(stop_polling_sender); } /// Sets the stronghold password. - pub fn set_stronghold_password>(&mut self, password: P) -> crate::Result<()> { + pub async fn set_stronghold_password>(&mut self, password: P) -> crate::Result<()> { let stronghold_path = self.storage_path.join(crate::storage::stronghold_snapshot_filename()); let stronghold = Stronghold::new( &stronghold_path, @@ -110,59 +146,93 @@ impl AccountManager { None, )?; crate::init_stronghold(&self.storage_path, stronghold); - self.start_background_sync(); + + if self.accounts.read().await.is_empty() { + self.accounts = Self::load_accounts(&self.storage_path).await?; + let _ = self.start_monitoring().await; + } + Ok(()) } /// Starts the polling mechanism. - fn start_polling(&self, is_monitoring_disabled: bool) -> thread::JoinHandle<()> { + fn start_polling(&self, is_monitoring_disabled: bool, mut stop: BroadcastReceiver<()>) { let storage_path = self.storage_path.clone(); - let interval = self.polling_interval; + let accounts = self.accounts.clone(); + + let interval = AsyncDuration::from_millis(self.polling_interval.as_millis().try_into().unwrap()); + thread::spawn(move || { - loop { - let storage_path_ = storage_path.clone(); - crate::block_on(async move { - if let Err(panic) = AssertUnwindSafe(poll(storage_path_, is_monitoring_disabled)) - .catch_unwind() - .await - { - let msg = if let Some(message) = panic.downcast_ref::() { - format!("Internal error: {}", message) - } else if let Some(message) = panic.downcast_ref::<&str>() { - format!("Internal error: {}", message) - } else { - "Internal error".to_string() - }; - let _error = crate::WalletError::UnknownError(msg); - // when the error is dropped, the on_error event will be triggered + crate::enter(|| { + tokio::spawn(async move { + loop { + tokio::select! { + _ = async { + let storage_path_ = storage_path.clone(); + let accounts_ = accounts.clone(); + + if let Err(panic) = AssertUnwindSafe(poll(accounts_.clone(), storage_path_, is_monitoring_disabled)) + .catch_unwind() + .await { + let msg = if let Some(message) = panic.downcast_ref::>() { + format!("Internal error: {}", message) + } else { + "Internal error".to_string() + }; + let _error = crate::WalletError::UnknownError(msg); + // when the error is dropped, the on_error event will be triggered + } + + let accounts_ = accounts_.read().await; + for account_handle in accounts_.values() { + let mut account = account_handle.write().await; + let _ = account.save(); + } + + delay_for(interval).await; + } => {} + _ = stop.recv() => {} + } } }); - thread::sleep(interval); - } - }) + }); + }).join().expect("failed to start polling"); } /// Adds a new account. - pub fn create_account(&self, client_options: ClientOptions) -> AccountInitialiser<'_> { - AccountInitialiser::new(client_options, &self.storage_path) + pub fn create_account(&self, client_options: ClientOptions) -> AccountInitialiser { + AccountInitialiser::new(client_options, self.accounts.clone(), self.storage_path.clone()) } /// Deletes an account. - pub fn remove_account(&self, account_id: &AccountIdentifier) -> crate::Result<()> { - let account_str = crate::storage::with_adapter(&self.storage_path, |storage| storage.get(&account_id))?; - let account: Account = serde_json::from_str(&account_str)?; - if !(account.messages().is_empty() && account.total_balance() == 0) { - return Err(crate::WalletError::MessageNotEmpty); + pub async fn remove_account(&self, account_id: &AccountIdentifier) -> crate::Result<()> { + let mut accounts = self.accounts.write().await; + + { + let account_handle = accounts.get(&account_id).ok_or(crate::WalletError::AccountNotFound)?; + let account = account_handle.read().await; + + if !(account.messages().is_empty() && account.total_balance() == 0) { + return Err(crate::WalletError::MessageNotEmpty); + } + } + + accounts.remove(account_id); + + if let Err(e) = crate::storage::with_adapter(&self.storage_path, |storage| storage.remove(&account_id)) { + match e { + // if we got an "AccountNotFound" error, that means we didn't save the cached account yet + crate::WalletError::AccountNotFound => {} + _ => return Err(e), + } } - crate::storage::with_adapter(&self.storage_path, |storage| storage.remove(&account_id))?; + Ok(()) } /// Syncs all accounts. pub async fn sync_accounts(&self) -> crate::Result> { - let accounts = crate::storage::with_adapter(&self.storage_path, |storage| storage.get_all())?; - let mut accounts = crate::storage::parse_accounts(&self.storage_path, &accounts)?; - sync_accounts(&self.storage_path, None, &mut accounts).await + sync_accounts(self.accounts.clone(), &self.storage_path, None).await } /// Transfers an amount from an account to another. @@ -171,22 +241,20 @@ impl AccountManager { from_account_id: &AccountIdentifier, to_account_id: &AccountIdentifier, amount: u64, - ) -> crate::Result { - let mut from_account = self.get_account(from_account_id)?; - let to_account = self.get_account(to_account_id)?; - let to_address = to_account + ) -> crate::Result { + let to_address = self + .get_account(to_account_id) + .await? + .read() + .await .latest_address() .ok_or_else(|| anyhow::anyhow!("destination account address list empty"))? .clone(); - let from_synchronized = from_account.sync().execute().await?; - let metadata = from_synchronized + + let from_synchronized = self.get_account(from_account_id).await?.sync().await.execute().await?; + from_synchronized .transfer(Transfer::new(to_address.address().clone(), amount)) - .await?; - Ok(InternalTransferMetadata { - to_account, - from_account: metadata.account, - message: metadata.message, - }) + .await } /// Backups the accounts to the given destination @@ -244,12 +312,14 @@ impl AccountManager { backup_stronghold.account_get_by_id(&account_id_to_stronghold_record_id(account.id())?)?; let created_at_timestamp: u128 = account.created_at().timestamp().try_into().unwrap(); // safe to unwrap since it's > 0 let stronghold_account = crate::with_stronghold_from_path(&self.storage_path, |stronghold| { - stronghold.account_import( - Some(created_at_timestamp), - Some(created_at_timestamp), - stronghold_account.mnemonic().to_string(), - Some("password"), - ) + stronghold + .account_import( + Some(created_at_timestamp), + Some(created_at_timestamp), + stronghold_account.mnemonic().to_string(), + Some("password"), + ) + .map_err(Into::into) }); account.save()?; @@ -259,62 +329,70 @@ impl AccountManager { } /// Gets the account associated with the given identifier. - pub fn get_account(&self, account_id: &AccountIdentifier) -> crate::Result { - let mut account = crate::storage::get_account(&self.storage_path, &account_id)?; - account.set_storage_path(self.storage_path.clone()); - Ok(account) + pub async fn get_account(&self, account_id: &AccountIdentifier) -> crate::Result { + let accounts = self.accounts.read().await; + accounts + .get(account_id) + .cloned() + .ok_or(crate::WalletError::AccountNotFound) } /// Gets the account associated with the given alias (case insensitive). - pub fn get_account_by_alias>(&self, alias: S) -> Option { - let alias = alias.into().to_lowercase(); - if let Ok(accounts) = self.get_accounts() { - accounts.into_iter().find(|acc| acc.alias().to_lowercase() == alias) - } else { - None + pub async fn get_account_by_alias>(&self, alias: S) -> Option { + let alias = alias.as_ref().to_lowercase(); + for account_handle in self.accounts.read().await.values() { + let account = account_handle.read().await; + if account + .alias() + .to_lowercase() + .chars() + .zip(alias.chars()) + .all(|(x, y)| x == y) + { + return Some(account_handle.clone()); + } } + None } /// Gets all accounts from storage. - pub fn get_accounts(&self) -> crate::Result> { - crate::storage::with_adapter(&self.storage_path, |storage| { - crate::storage::parse_accounts(&self.storage_path, &storage.get_all()?) - }) + pub async fn get_accounts(&self) -> Vec { + let accounts = self.accounts.read().await; + accounts.values().cloned().collect() } /// Reattaches an unconfirmed transaction. pub async fn reattach(&self, account_id: &AccountIdentifier, message_id: &MessageId) -> crate::Result { - let mut account = self.get_account(account_id)?; - account.sync().execute().await?.reattach(message_id).await + let account = self.get_account(account_id).await?; + account.sync().await.execute().await?.reattach(message_id).await } /// Promotes an unconfirmed transaction. pub async fn promote(&self, account_id: &AccountIdentifier, message_id: &MessageId) -> crate::Result { - let mut account = self.get_account(account_id)?; - account.sync().execute().await?.promote(message_id).await + let account = self.get_account(account_id).await?; + account.sync().await.execute().await?.promote(message_id).await } /// Retries an unconfirmed transaction. pub async fn retry(&self, account_id: &AccountIdentifier, message_id: &MessageId) -> crate::Result { - let mut account = self.get_account(account_id)?; - account.sync().execute().await?.retry(message_id).await + let account = self.get_account(account_id).await?; + account.sync().await.execute().await?.retry(message_id).await } } -async fn poll(storage_path: PathBuf, syncing: bool) -> crate::Result<()> { +async fn poll(accounts: AccountStore, storage_path: PathBuf, syncing: bool) -> crate::Result<()> { let retried = if syncing { - let accounts_before_sync = crate::storage::with_adapter(&storage_path, |storage| storage.get_all())?; - let mut accounts_before_sync = crate::storage::parse_accounts(&storage_path, &accounts_before_sync)?; - let synced_accounts = sync_accounts(&storage_path, Some(0), &mut accounts_before_sync).await?; - let accounts_after_sync = crate::storage::with_adapter(&storage_path, |storage| storage.get_all())?; - let mut accounts_after_sync = crate::storage::parse_accounts(&storage_path, &accounts_after_sync)?; + let mut accounts_before_sync = Vec::new(); + for account_handle in accounts.read().await.values() { + accounts_before_sync.push(account_handle.read().await.clone()); + } + let synced_accounts = sync_accounts(accounts.clone(), &storage_path, Some(0)).await?; + let accounts_after_sync = accounts.read().await; // compare accounts to check for balance changes and new messages for account_before_sync in &accounts_before_sync { - let account_after_sync = accounts_after_sync - .iter_mut() - .find(|account| account.id() == account_before_sync.id()) - .unwrap(); + let account_after_sync = accounts_after_sync.get(account_before_sync.id()).unwrap(); + let account_after_sync = account_after_sync.read().await; // balance event for address_before_sync in account_before_sync.addresses() { @@ -352,20 +430,26 @@ async fn poll(storage_path: PathBuf, syncing: bool) -> crate::Result<()> { } } } - retry_unconfirmed_transactions(synced_accounts.iter().zip(accounts_after_sync.iter()).collect()).await? + retry_unconfirmed_transactions(synced_accounts).await? } else { - let accounts = crate::storage::with_adapter(&storage_path, |storage| storage.get_all())?; let mut retried_messages = vec![]; - for account in crate::storage::parse_accounts(&storage_path, &accounts)? { - let unconfirmed_messages = - account.list_messages(account.messages().len(), 0, Some(MessageType::Unconfirmed)); + for account_handle in accounts.read().await.values() { + let (account_id, unconfirmed_messages): (AccountIdentifier, Vec<(MessageId, Payload)>) = { + let account = account_handle.read().await; + let account_id = account.id().clone(); + let unconfirmed_messages = account + .list_messages(account.messages().len(), 0, Some(MessageType::Unconfirmed)) + .iter() + .map(|m| (*m.id(), m.payload().clone())) + .collect(); + (account_id, unconfirmed_messages) + }; let mut promotions = vec![]; let mut reattachments = vec![]; - for message in unconfirmed_messages { - let new_message = - repost_message(account.id(), &storage_path, message.id(), RepostAction::Retry).await?; - if new_message.payload() == message.payload() { + for (message_id, payload) in unconfirmed_messages { + let new_message = repost_message(account_handle.clone(), &message_id, RepostAction::Retry).await?; + if new_message.payload() == &payload { reattachments.push(new_message); } else { promotions.push(new_message); @@ -375,7 +459,7 @@ async fn poll(storage_path: PathBuf, syncing: bool) -> crate::Result<()> { retried_messages.push(RetriedData { promoted: promotions, reattached: reattachments, - account_id: account.id().clone(), + account_id, }); } @@ -391,63 +475,75 @@ async fn poll(storage_path: PathBuf, syncing: bool) -> crate::Result<()> { } async fn discover_accounts( + accounts: AccountStore, storage_path: &PathBuf, client_options: &ClientOptions, signer_type: Option, -) -> crate::Result> { +) -> crate::Result> { let mut synced_accounts = vec![]; loop { - let mut account_initialiser = AccountInitialiser::new(client_options.clone(), &storage_path).skip_persistance(); + let mut account_initialiser = + AccountInitialiser::new(client_options.clone(), accounts.clone(), storage_path.clone()).skip_persistance(); if let Some(signer_type) = &signer_type { account_initialiser = account_initialiser.signer_type(signer_type.clone()); } - let mut account = account_initialiser.initialise()?; - let synced_account = account.sync().skip_persistance().execute().await?; + let account = account_initialiser.initialise().await?; + let synced_account = account.sync().await.execute().await?; let is_empty = *synced_account.is_empty(); if is_empty { break; } else { - synced_accounts.push(synced_account); - account.save()?; + synced_accounts.push((account, synced_account)); } } Ok(synced_accounts) } async fn sync_accounts<'a>( + accounts: AccountStore, storage_path: &PathBuf, address_index: Option, - accounts: &mut Vec, ) -> crate::Result> { let mut synced_accounts = vec![]; let mut last_account = None; - for account in accounts { - let mut sync = account.sync(); - if let Some(index) = address_index { - sync = sync.address_index(index); + + { + let mut accounts = accounts.write().await; + for account_handle in accounts.values_mut() { + let mut sync = account_handle.sync().await; + if let Some(index) = address_index { + sync = sync.address_index(index); + } + let synced_account = sync.execute().await?; + + let account = account_handle.read().await; + last_account = Some(( + account.messages().is_empty() || account.addresses().iter().all(|addr| *addr.balance() == 0), + account.client_options().clone(), + account.signer_type().clone(), + )); + synced_accounts.push(synced_account); } - let synced_account = sync.execute().await?; - last_account = Some(account); - synced_accounts.push(synced_account); } let discovered_accounts_res = match last_account { - Some(account) => { - if account.messages().is_empty() || account.addresses().iter().all(|addr| *addr.balance() == 0) { - discover_accounts( - &storage_path, - account.client_options(), - Some(account.signer_type().clone()), - ) - .await + Some((is_empty, client_options, signer_type)) => { + if is_empty { + discover_accounts(accounts.clone(), &storage_path, &client_options, Some(signer_type)).await } else { Ok(vec![]) } } - None => discover_accounts(&storage_path, &ClientOptions::default(), None).await, + None => Ok(vec![]), /* None => discover_accounts(accounts.clone(), &storage_path, &ClientOptions::default(), + * None).await, */ }; + if let Ok(discovered_accounts) = discovered_accounts_res { - synced_accounts.extend(discovered_accounts.into_iter()); + let mut accounts = accounts.write().await; + for (account_handle, synced_account) in discovered_accounts { + accounts.insert(account_handle.id().await, account_handle); + synced_accounts.push(synced_account); + } } Ok(synced_accounts) @@ -459,9 +555,11 @@ struct RetriedData { account_id: AccountIdentifier, } -async fn retry_unconfirmed_transactions(accounts: Vec<(&SyncedAccount, &Account)>) -> crate::Result> { +async fn retry_unconfirmed_transactions(synced_accounts: Vec) -> crate::Result> { let mut retried_messages = vec![]; - for (synced, account) in accounts { + for synced in synced_accounts { + let account = synced.account_handle().read().await; + let unconfirmed_messages = account.list_messages(account.messages().len(), 0, Some(MessageType::Unconfirmed)); let mut reattachments = vec![]; let mut promotions = vec![]; @@ -524,27 +622,33 @@ mod tests { client::ClientOptionsBuilder, message::Message, }; - use iota::message::prelude::{Ed25519Address, Indexation, Message as IotaMessage, MessageId, Payload}; + use iota::{Ed25519Address, Indexation, MessageBuilder, MessageId, Payload}; use rusty_fork::rusty_fork_test; rusty_fork_test! { #[test] fn store_accounts() { let manager = crate::test_utils::get_account_manager(); + let manager = manager.lock().unwrap(); let client_options = ClientOptionsBuilder::node("https://nodes.devnet.iota.org:443") .expect("invalid node URL") .build(); - let account = manager - .create_account(client_options) - .alias("alias") - .initialise() - .expect("failed to add account"); - - manager - .remove_account(account.id()) - .expect("failed to remove account"); + crate::block_on(async move { + let account_handle = manager + .create_account(client_options) + .alias("alias") + .initialise() + .await + .expect("failed to add account"); + let account = account_handle.read().await; + + manager + .remove_account(account.id()) + .await + .expect("failed to remove account"); + }); } } @@ -552,27 +656,41 @@ mod tests { #[test] fn remove_account_with_message_history() { let manager = crate::test_utils::get_account_manager(); + let manager = manager.lock().unwrap(); let client_options = ClientOptionsBuilder::node("https://nodes.devnet.iota.org:443") .expect("invalid node URL") .build(); - let account = manager - .create_account(client_options) - .messages(vec![Message::from_iota_message(MessageId::new([0; 32]), &[], &IotaMessage::builder() + let messages = vec![Message::from_iota_message( + MessageId::new([0; 32]), + &[], + &MessageBuilder::::new() .with_parent1(MessageId::new([0; 32])) .with_parent2(MessageId::new([0; 32])) - .with_payload(Payload::Indexation(Box::new(Indexation::new( - "index".to_string(), - &[0; 16], - ).unwrap()))) + .with_payload(Payload::Indexation(Box::new( + Indexation::new("index".to_string(), &[0; 16]).unwrap(), + ))) .with_network_id(0) + .with_nonce_provider(crate::test_utils::NoopNonceProvider {}, 0f64) .finish() - .unwrap(), None).unwrap()]) - .initialise().unwrap(); + .unwrap(), + None, + ) + .unwrap()]; + + crate::block_on(async move { + let account_handle = manager + .create_account(client_options) + .messages(messages) + .initialise() + .await + .unwrap(); - let remove_response = manager.remove_account(account.id()); - assert!(remove_response.is_err()); + let account = account_handle.read().await; + let remove_response = manager.remove_account(account.id()).await; + assert!(remove_response.is_err()); + }); } } @@ -580,25 +698,30 @@ mod tests { #[test] fn remove_account_with_balance() { let manager = crate::test_utils::get_account_manager(); + let manager = manager.lock().unwrap(); let client_options = ClientOptionsBuilder::node("https://nodes.devnet.iota.org:443") .expect("invalid node URL") .build(); - let account = manager - .create_account(client_options) - .addresses(vec![AddressBuilder::new() - .balance(5) - .key_index(0) - .address(IotaAddress::Ed25519(Ed25519Address::new([0; 32]))) - .outputs(vec![]) - .build() - .unwrap()]) - .initialise() - .unwrap(); - - let remove_response = manager.remove_account(account.id()); - assert!(remove_response.is_err()); + crate::block_on(async move { + let account_handle = manager + .create_account(client_options) + .addresses(vec![AddressBuilder::new() + .balance(5) + .key_index(0) + .address(IotaAddress::Ed25519(Ed25519Address::new([0; 32]))) + .outputs(vec![]) + .build() + .unwrap()]) + .initialise() + .await + .unwrap(); + let account = account_handle.read().await; + + let remove_response = manager.remove_account(account.id()).await; + assert!(remove_response.is_err()); + }); } } @@ -606,19 +729,23 @@ mod tests { #[test] fn create_account_with_latest_without_history() { let manager = crate::test_utils::get_account_manager(); + let manager = manager.lock().unwrap(); let client_options = ClientOptionsBuilder::node("https://nodes.devnet.iota.org:443") .expect("invalid node URL") .build(); - let account = manager - .create_account(client_options.clone()) - .alias("alias") - .initialise() - .expect("failed to add account"); + crate::block_on(async move { + let account = manager + .create_account(client_options.clone()) + .alias("alias") + .initialise() + .await + .expect("failed to add account"); - let create_response = manager.create_account(client_options).initialise(); - assert!(create_response.is_err()); + let create_response = manager.create_account(client_options).initialise().await; + assert!(create_response.is_err()); + }); } } } diff --git a/src/actor/message.rs b/src/actor/message.rs index ca0e189b6..0defe2c06 100644 --- a/src/actor/message.rs +++ b/src/actor/message.rs @@ -12,7 +12,7 @@ use serde::{ser::Serializer, Deserialize, Serialize}; use tokio::sync::mpsc::UnboundedSender; /// An account to create. -#[derive(Clone, Debug, Deserialize, Default)] +#[derive(Clone, Debug, Deserialize)] pub struct AccountToCreate { /// The node options. #[serde(rename = "clientOptions")] diff --git a/src/actor/mod.rs b/src/actor/mod.rs index c47d0aca4..23ae0c74b 100644 --- a/src/actor/mod.rs +++ b/src/actor/mod.rs @@ -11,6 +11,7 @@ use futures::{Future, FutureExt}; use iota::message::prelude::MessageId; use std::{ any::Any, + borrow::Cow, convert::TryInto, panic::{catch_unwind, AssertUnwindSafe}, path::PathBuf, @@ -25,18 +26,8 @@ pub struct WalletMessageHandler { account_manager: AccountManager, } -impl Default for WalletMessageHandler { - fn default() -> Self { - Self { - account_manager: AccountManager::new().unwrap(), - } - } -} - fn panic_to_response_message(panic: Box) -> Result { - let msg = if let Some(message) = panic.downcast_ref::() { - format!("Internal error: {}", message) - } else if let Some(message) = panic.downcast_ref::<&str>() { + let msg = if let Some(message) = panic.downcast_ref::>() { format!("Internal error: {}", message) } else { "Internal error".to_string() @@ -64,17 +55,17 @@ where impl WalletMessageHandler { /// Creates a new instance of the message handler with the default account manager. - pub fn new() -> Result { + pub async fn new() -> Result { let instance = Self { - account_manager: AccountManager::new()?, + account_manager: AccountManager::new().await?, }; Ok(instance) } /// Creates a new instance of the message handler with the account manager using the given storage path. - pub fn with_storage_path(storage_path: PathBuf) -> Result { + pub async fn with_storage_path(storage_path: PathBuf) -> Result { let instance = Self { - account_manager: AccountManager::with_storage_path(storage_path)?, + account_manager: AccountManager::with_storage_path(storage_path).await?, }; Ok(instance) } @@ -87,10 +78,16 @@ impl WalletMessageHandler { /// Handles a message. pub async fn handle(&mut self, message: Message) { let response: Result = match message.message_type() { - MessageType::RemoveAccount(account_id) => convert_panics(|| self.remove_account(account_id)), - MessageType::CreateAccount(account) => convert_panics(|| self.create_account(account)), - MessageType::GetAccount(account_id) => convert_panics(|| self.get_account(account_id)), - MessageType::GetAccounts => convert_panics(|| self.get_accounts()), + MessageType::RemoveAccount(account_id) => { + convert_async_panics(|| async { self.remove_account(account_id).await }).await + } + MessageType::CreateAccount(account) => { + convert_async_panics(|| async { self.create_account(account).await }).await + } + MessageType::GetAccount(account_id) => { + convert_async_panics(|| async { self.get_account(account_id).await }).await + } + MessageType::GetAccounts => convert_async_panics(|| async { self.get_accounts().await }).await, MessageType::CallAccountMethod { account_id, method } => { convert_async_panics(|| async { self.call_account_method(account_id, method).await }).await } @@ -100,7 +97,9 @@ impl WalletMessageHandler { } MessageType::Backup(destination_path) => convert_panics(|| self.backup(destination_path)), MessageType::RestoreBackup(backup_path) => convert_panics(|| self.restore_backup(backup_path)), - MessageType::SetStrongholdPassword(password) => convert_panics(|| self.set_stronghold_password(password)), + MessageType::SetStrongholdPassword(password) => { + convert_async_panics(|| async { self.set_stronghold_password(password).await }).await + } MessageType::SendTransfer { account_id, transfer } => { convert_async_panics(|| async { self.send_transfer(account_id, transfer).await }).await } @@ -153,10 +152,11 @@ impl WalletMessageHandler { account_id: &AccountIdentifier, method: &AccountMethod, ) -> Result { - let mut account = self.account_manager.get_account(account_id)?; + let account_handle = self.account_manager.get_account(account_id).await?; + match method { AccountMethod::GenerateAddress => { - let address = account.generate_address()?; + let address = account_handle.generate_address().await?; Ok(ResponseType::GeneratedAddress(address)) } AccountMethod::ListMessages { @@ -164,6 +164,7 @@ impl WalletMessageHandler { from, message_type, } => { + let account = account_handle.read().await; let messages: Vec = account .list_messages(*count, *from, message_type.clone()) .into_iter() @@ -172,18 +173,28 @@ impl WalletMessageHandler { Ok(ResponseType::Messages(messages)) } AccountMethod::ListAddresses { unspent } => { + let account = account_handle.read().await; let addresses = account.list_addresses(*unspent).into_iter().cloned().collect(); Ok(ResponseType::Addresses(addresses)) } - AccountMethod::GetAvailableBalance => Ok(ResponseType::AvailableBalance(account.available_balance())), - AccountMethod::GetTotalBalance => Ok(ResponseType::TotalBalance(account.total_balance())), - AccountMethod::GetLatestAddress => Ok(ResponseType::LatestAddress(account.latest_address().cloned())), + AccountMethod::GetAvailableBalance => { + let account = account_handle.read().await; + Ok(ResponseType::AvailableBalance(account.available_balance())) + } + AccountMethod::GetTotalBalance => { + let account = account_handle.read().await; + Ok(ResponseType::TotalBalance(account.total_balance())) + } + AccountMethod::GetLatestAddress => { + let account = account_handle.read().await; + Ok(ResponseType::LatestAddress(account.latest_address().cloned())) + } AccountMethod::SyncAccount { address_index, gap_limit, skip_persistance, } => { - let mut synchronizer = account.sync(); + let mut synchronizer = account_handle.sync().await; if let Some(address_index) = address_index { synchronizer = synchronizer.address_index(*address_index); } @@ -202,14 +213,15 @@ impl WalletMessageHandler { } /// The remove account message handler. - fn remove_account(&self, account_id: &AccountIdentifier) -> Result { + async fn remove_account(&self, account_id: &AccountIdentifier) -> Result { self.account_manager .remove_account(&account_id) + .await .map(|_| ResponseType::RemovedAccount(account_id.clone())) } /// The create account message handler. - fn create_account(&self, account: &AccountToCreate) -> Result { + async fn create_account(&self, account: &AccountToCreate) -> Result { let mut builder = self.account_manager.create_account(account.client_options.clone()); if let Some(mnemonic) = &account.mnemonic { @@ -226,28 +238,39 @@ impl WalletMessageHandler { ); } - builder.initialise().map(ResponseType::CreatedAccount) + match builder.initialise().await { + Ok(account_handle) => { + let account = account_handle.read().await; + Ok(ResponseType::CreatedAccount(account.clone())) + } + Err(e) => Err(e), + } } - fn get_account(&self, account_id: &AccountIdentifier) -> Result { - let account = self.account_manager.get_account(&account_id)?; - Ok(ResponseType::ReadAccount(account)) + async fn get_account(&self, account_id: &AccountIdentifier) -> Result { + let account_handle = self.account_manager.get_account(&account_id).await?; + let account = account_handle.read().await; + Ok(ResponseType::ReadAccount(account.clone())) } - fn get_accounts(&self) -> Result { - let accounts = self.account_manager.get_accounts()?; - Ok(ResponseType::ReadAccounts(accounts)) + async fn get_accounts(&self) -> Result { + let accounts = self.account_manager.get_accounts().await; + let mut accounts_ = Vec::new(); + for account_handle in accounts { + accounts_.push(account_handle.read().await.clone()); + } + Ok(ResponseType::ReadAccounts(accounts_)) } - fn set_stronghold_password(&mut self, password: &str) -> Result { - self.account_manager.set_stronghold_password(password)?; + async fn set_stronghold_password(&mut self, password: &str) -> Result { + self.account_manager.set_stronghold_password(password).await?; Ok(ResponseType::StrongholdPasswordSet) } async fn send_transfer(&self, account_id: &AccountIdentifier, transfer: &Transfer) -> Result { - let mut account = self.account_manager.get_account(account_id)?; - let synced = account.sync().execute().await?; - let message = synced.transfer(transfer.clone()).await?.message; + let account = self.account_manager.get_account(account_id).await?; + let synced = account.sync().await.execute().await?; + let message = synced.transfer(transfer.clone()).await?; Ok(ResponseType::SentTransfer(message)) } @@ -260,8 +283,7 @@ impl WalletMessageHandler { let message = self .account_manager .internal_transfer(from_account_id, to_account_id, amount) - .await? - .message; + .await?; Ok(ResponseType::SentTransfer(message)) } } @@ -269,6 +291,7 @@ impl WalletMessageHandler { #[cfg(test)] mod tests { use super::{AccountToCreate, Message, MessageType, Response, ResponseType, WalletMessageHandler}; + use crate::client::ClientOptionsBuilder; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; /// The wallet actor builder. @@ -297,10 +320,10 @@ mod tests { } /// Builds the Wallet actor. - pub fn build(self) -> Wallet { + pub async fn build(self) -> Wallet { Wallet { rx: self.rx.expect("rx is required"), - message_handler: WalletMessageHandler::new().expect("failed to initialise account manager"), + message_handler: self.message_handler.expect("message handler is required"), } } } @@ -319,15 +342,22 @@ mod tests { while let Some(message) = self.rx.recv().await { self.message_handler.handle(message).await; } + println!("DONE"); } } fn spawn_actor() -> UnboundedSender { let (tx, rx) = unbounded_channel(); - let actor = WalletBuilder::new().rx(rx).build(); std::thread::spawn(|| { let mut runtime = tokio::runtime::Runtime::new().unwrap(); - runtime.block_on(actor.run()); + runtime.block_on(async move { + let actor = WalletBuilder::new() + .rx(rx) + .message_handler(WalletMessageHandler::new().await.unwrap()) + .build() + .await; + actor.run().await + }); }); tx } @@ -344,14 +374,24 @@ mod tests { let tx = spawn_actor(); // create an account - let account = AccountToCreate::default(); + let account = AccountToCreate { + client_options: ClientOptionsBuilder::node("http://node.iota").unwrap().build(), + mnemonic: None, + alias: None, + created_at: None, + }; send_message(&tx, MessageType::SetStrongholdPassword("password".to_string())).await; let response = send_message(&tx, MessageType::CreateAccount(account)).await; match response.response() { ResponseType::CreatedAccount(created_account) => { - // remove the created account - let response = send_message(&tx, MessageType::RemoveAccount(created_account.id().clone())).await; - assert!(matches!(response.response(), ResponseType::RemovedAccount(_))); + let id = created_account.id().clone(); + std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_secs(6)); + // remove the created account + let response = + crate::block_on(async move { send_message(&tx, MessageType::RemoveAccount(id)).await }); + assert!(matches!(response.response(), ResponseType::RemovedAccount(_))); + }); } _ => panic!("unexpected response"), } diff --git a/src/client.rs b/src/client.rs index 6a3a54dee..1fd650168 100644 --- a/src/client.rs +++ b/src/client.rs @@ -6,11 +6,12 @@ pub use iota::client::builder::Network; use iota::client::{BrokerOptions, Client, ClientBuilder}; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; use url::Url; use std::{ collections::HashMap, - sync::{Arc, Mutex, RwLock}, + sync::{Arc, Mutex}, }; type ClientInstanceMap = Arc>>>>; @@ -243,7 +244,7 @@ impl ClientOptionsBuilder { } /// The client options type. -#[derive(Default, Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash, Getters)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash, Getters)] #[getset(get = "pub(crate)")] pub struct ClientOptions { node: Option, diff --git a/src/lib.rs b/src/lib.rs index eba136956..f98768506 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -41,6 +41,7 @@ use stronghold::Stronghold; use tokio::runtime::Runtime; static STRONGHOLD_INSTANCE: OnceCell>>> = OnceCell::new(); +static RUNTIME: OnceCell> = OnceCell::new(); /// The wallet error type. #[derive(Debug, thiserror::Error)] @@ -143,37 +144,77 @@ pub(crate) fn remove_stronghold(stronghold_path: PathBuf) { stronghold_map.remove(&stronghold_path); } -pub(crate) fn with_stronghold_from_path T>(path: &PathBuf, cb: F) -> T { +pub(crate) fn with_stronghold_from_path crate::Result>( + path: &PathBuf, + cb: F, +) -> crate::Result { let stronghold_map = STRONGHOLD_INSTANCE.get_or_init(Default::default).lock().unwrap(); if let Some(stronghold) = stronghold_map.get(path) { cb(stronghold) } else { - panic!("should initialize stronghold instance before using it") + Err(anyhow::anyhow!("should initialize stronghold instance before using it").into()) } } pub(crate) fn block_on(cb: C) -> C::Output { - static INSTANCE: OnceCell> = OnceCell::new(); - let runtime = INSTANCE.get_or_init(|| Mutex::new(Runtime::new().unwrap())); + let runtime = RUNTIME.get_or_init(|| Mutex::new(Runtime::new().unwrap())); runtime.lock().unwrap().block_on(cb) } +pub(crate) fn enter R>(cb: C) -> R { + let runtime = RUNTIME.get_or_init(|| Mutex::new(Runtime::new().unwrap())); + runtime.lock().unwrap().enter(cb) +} + #[cfg(test)] mod test_utils { use super::account_manager::AccountManager; + use iota::pow::providers::{Provider as PowProvider, ProviderBuilder as PowProviderBuilder}; use once_cell::sync::OnceCell; use rand::{thread_rng, Rng}; - use std::path::PathBuf; + use std::{path::PathBuf, sync::Mutex, time::Duration}; - static MANAGER_INSTANCE: OnceCell = OnceCell::new(); - pub fn get_account_manager() -> &'static AccountManager { + static MANAGER_INSTANCE: OnceCell> = OnceCell::new(); + pub fn get_account_manager() -> &'static Mutex { MANAGER_INSTANCE.get_or_init(|| { - let storage_path: String = thread_rng().gen_ascii_chars().take(10).collect(); - let storage_path = PathBuf::from(format!("./example-database/{}", storage_path)); + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(async move { + let storage_path: String = thread_rng().gen_ascii_chars().take(10).collect(); + let storage_path = PathBuf::from(format!("./example-database/{}", storage_path)); - let mut manager = AccountManager::with_storage_path(storage_path).unwrap(); - manager.set_stronghold_password("password").unwrap(); - manager + let mut manager = AccountManager::with_storage_path(storage_path).await.unwrap(); + manager.set_polling_interval(Duration::from_secs(4)); + manager.set_stronghold_password("password").await.unwrap(); + Mutex::new(manager) + }) }) } + + /// The miner builder. + #[derive(Default)] + pub struct NoopNonceProviderBuilder; + + impl PowProviderBuilder for NoopNonceProviderBuilder { + type Provider = NoopNonceProvider; + + fn new() -> Self { + Self::default() + } + + fn finish(self) -> NoopNonceProvider { + NoopNonceProvider {} + } + } + + /// The miner used for PoW + pub struct NoopNonceProvider; + + impl PowProvider for NoopNonceProvider { + type Builder = NoopNonceProviderBuilder; + type Error = crate::WalletError; + + fn nonce(&self, bytes: &[u8], target_score: f64) -> std::result::Result { + Ok(0) + } + } } diff --git a/src/monitor.rs b/src/monitor.rs index 671271a12..6b80c372d 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - account::{Account, AccountIdentifier}, + account::AccountHandle, address::{AddressOutput, IotaAddress}, client::ClientOptions, message::{Message, MessageType}, @@ -11,7 +11,7 @@ use crate::{ use iota::{message::prelude::MessageId, MessageMetadata, OutputMetadata, Topic, TopicEvent}; use serde::Deserialize; -use std::{convert::TryInto, path::PathBuf}; +use std::convert::TryInto; #[derive(Deserialize)] struct AddressOutputPayload { @@ -42,85 +42,62 @@ struct AddressOutputPayloadAddress { } /// Unsubscribe from all topics associated with the account. -pub fn unsubscribe(account: &Account) -> crate::Result<()> { +pub async fn unsubscribe(account_handle: AccountHandle) -> crate::Result<()> { + let account = account_handle.read().await; let client = crate::client::get_client(account.client_options()); - let mut client = client.write().unwrap(); + let mut client = client.write().await; client.subscriber().unsubscribe()?; Ok(()) } -fn mutate_account( - account_id: &AccountIdentifier, - storage_path: &PathBuf, - cb: F, -) -> crate::Result<()> { - let mut account = crate::storage::get_account(&storage_path, &account_id)?; - cb(&mut account); - account.save()?; - Ok(()) -} - -fn subscribe_to_topic( +async fn subscribe_to_topic( client_options: &ClientOptions, topic: String, handler: C, ) -> crate::Result<()> { let client = crate::client::get_client(&client_options); - let mut client = client.write().unwrap(); + let mut client = client.write().await; client.subscriber().topic(Topic::new(topic)?).subscribe(handler)?; Ok(()) } /// Monitor account addresses for balance changes. -pub fn monitor_account_addresses_balance(account: &Account) -> crate::Result<()> { +pub async fn monitor_account_addresses_balance(account_handle: AccountHandle) -> crate::Result<()> { + let account = account_handle.read().await; for address in account.addresses() { - monitor_address_balance(&account, address.address())?; + monitor_address_balance(account_handle.clone(), address.address()).await?; } Ok(()) } /// Monitor address for balance changes. -pub fn monitor_address_balance(account: &Account, address: &IotaAddress) -> crate::Result<()> { - let account_id = account.id().clone(); - let storage_path = account.storage_path().clone(); - let client_options = account.client_options().clone(); +pub async fn monitor_address_balance(account_handle: AccountHandle, address: &IotaAddress) -> crate::Result<()> { + let client_options = account_handle.client_options().await; + let client_options_ = client_options.clone(); let address = address.clone(); - let address_bech32 = address.to_bech32(); subscribe_to_topic( - account.client_options(), - format!("addresses/{}/outputs", address_bech32), + &client_options_, + format!("addresses/{}/outputs", address.to_bech32()), move |topic_event| { let topic_event = topic_event.clone(); let address = address.clone(); let client_options = client_options.clone(); - let storage_path = storage_path.clone(); - let account_id = account_id.clone(); - - std::thread::spawn(move || { - crate::block_on(async { - let _ = process_output( - topic_event.payload.clone(), - account_id, - address, - client_options, - storage_path, - ) - .await; - }); + let account_handle = account_handle.clone(); + + crate::block_on(async { + let _ = process_output(topic_event.payload.clone(), account_handle, address, client_options).await; }); }, - )?; - - Ok(()) + ) + .await } async fn process_output( payload: String, - account_id: AccountIdentifier, + account_handle: AccountHandle, address: IotaAddress, client_options: ClientOptions, - storage_path: PathBuf, ) -> crate::Result<()> { let output: AddressOutputPayload = serde_json::from_str(&payload)?; let metadata = OutputMetadata { @@ -139,98 +116,100 @@ async fn process_output( let message = { let client = crate::client::get_client(&client_options_); - let client = client.read().unwrap(); + let client = client.read().await; client.get_message().data(&message_id_).await? }; + let mut account = account_handle.write().await; + let account_id = account.id().clone(); let message_id_ = *message_id; - mutate_account(&account_id, &storage_path, |account| { - { - let addresses = account.addresses_mut(); - let address_to_update = addresses.iter_mut().find(|a| a.address() == &address).unwrap(); - address_to_update.handle_new_output(address_output); - crate::event::emit_balance_change(&account_id, &address_to_update, *address_to_update.balance()); - } - match account.messages_mut().iter().position(|m| m.id() == &message_id_) { - Some(message_index) => { + let addresses = account.addresses_mut(); + let address_to_update = addresses.iter_mut().find(|a| a.address() == &address).unwrap(); + address_to_update.handle_new_output(address_output); + crate::event::emit_balance_change(&account_id, &address_to_update, *address_to_update.balance()); + + match account.messages_mut().iter().position(|m| m.id() == &message_id_) { + Some(message_index) => { + account.do_mut(|account| { let message = &mut account.messages_mut()[message_index]; message.set_confirmed(Some(true)); - } - None => { - let message = - Message::from_iota_message(message_id_, account.addresses(), &message, Some(true)).unwrap(); - crate::event::emit_transaction_event( - crate::event::TransactionEventType::NewTransaction, - &account_id, - &message, - ); + }); + } + None => { + let message = Message::from_iota_message(message_id_, account.addresses(), &message, Some(true)).unwrap(); + crate::event::emit_transaction_event( + crate::event::TransactionEventType::NewTransaction, + account.id(), + &message, + ); + account.do_mut(|account| { account.messages_mut().push(message); - } + }); } - })?; + } Ok(()) } /// Monitor the account's unconfirmed messages for confirmation state change. -pub fn monitor_unconfirmed_messages(account: &Account) -> crate::Result<()> { +pub async fn monitor_unconfirmed_messages(account_handle: AccountHandle) -> crate::Result<()> { + let account = account_handle.read().await; for message in account.list_messages(0, 0, Some(MessageType::Unconfirmed)) { - monitor_confirmation_state_change(&account, message.id())?; + monitor_confirmation_state_change(account_handle.clone(), message.id()).await?; } Ok(()) } /// Monitor message for confirmation state. -pub fn monitor_confirmation_state_change(account: &Account, message_id: &MessageId) -> crate::Result<()> { - let account_id = account.id().clone(); - let storage_path = account.storage_path().clone(); - let message = account - .messages() - .iter() - .find(|message| message.id() == message_id) - .unwrap() - .clone(); +pub async fn monitor_confirmation_state_change( + account_handle: AccountHandle, + message_id: &MessageId, +) -> crate::Result<()> { + let (message, client_options) = { + let account = account_handle.read().await; + let message = account + .messages() + .iter() + .find(|message| message.id() == message_id) + .unwrap() + .clone(); + (message, account.client_options().clone()) + }; let message_id = *message_id; subscribe_to_topic( - account.client_options(), + &client_options, format!("messages/{}/metadata", message_id.to_string()), move |topic_event| { - let account_id = account_id.clone(); - let _ = process_metadata( - topic_event.payload.clone(), - account_id, - message_id, - &message, - &storage_path, - ); + let account_handle = account_handle.clone(); + crate::block_on(async { + let _ = process_metadata(topic_event.payload.clone(), account_handle, message_id, &message).await; + }); }, - )?; - Ok(()) + ) + .await } -fn process_metadata( +async fn process_metadata( payload: String, - account_id: AccountIdentifier, + account_handle: AccountHandle, message_id: MessageId, message: &Message, - storage_path: &PathBuf, ) -> crate::Result<()> { let metadata: MessageMetadata = serde_json::from_str(&payload)?; if let Some(inclusion_state) = metadata.ledger_inclusion_state { let confirmed = inclusion_state == "included"; if message.confirmed().is_none() || confirmed != message.confirmed().unwrap() { - mutate_account(&account_id, &storage_path, |account| { - let message_id = { - let messages = account.messages_mut(); - let message = messages.iter_mut().find(|m| m.id() == &message_id).unwrap(); - message.set_confirmed(Some(confirmed)); - *message.id() - }; - - crate::event::emit_confirmation_state_change(&account_id, &message, confirmed); - })?; + let mut account = account_handle.write().await; + + account.do_mut(|account| { + let messages = account.messages_mut(); + let account_message = messages.iter_mut().find(|m| m.id() == &message_id).unwrap(); + account_message.set_confirmed(Some(confirmed)); + }); + + crate::event::emit_confirmation_state_change(account.id(), &message, confirmed); } } Ok(()) diff --git a/src/signing/stronghold.rs b/src/signing/stronghold.rs index 96c4cd478..dac342ee0 100644 --- a/src/signing/stronghold.rs +++ b/src/signing/stronghold.rs @@ -61,12 +61,13 @@ impl super::Signer for StrongholdSigner { }) .collect::>(); crate::with_stronghold_from_path(account.storage_path(), |stronghold| { - stronghold.get_transaction_unlock_blocks( - &account_id_to_stronghold_record_id(account.id())?, - &essence, - &mut inputs, - ) + stronghold + .get_transaction_unlock_blocks( + &account_id_to_stronghold_record_id(account.id())?, + &essence, + &mut inputs, + ) + .map_err(Into::into) }) - .map_err(|e| e.into()) } } diff --git a/src/storage/stronghold.rs b/src/storage/stronghold.rs index a9bc92600..e929a576c 100644 --- a/src/storage/stronghold.rs +++ b/src/storage/stronghold.rs @@ -85,8 +85,9 @@ impl StorageAdapter for StrongholdStorageAdapter { let mut accounts = vec![]; let (_, index) = crate::with_stronghold_from_path(&self.path, |stronghold| get_account_index(&stronghold))?; for (_, record_id) in index { - let account = - crate::with_stronghold_from_path(&self.path, |stronghold| stronghold.record_read(&record_id))?; + let account = crate::with_stronghold_from_path(&self.path, |stronghold| { + stronghold.record_read(&record_id).map_err(Into::into) + })?; accounts.push(account); } Ok(accounts)