Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate to latest actix-based Stronghold #370

Merged
merged 1 commit into from
Aug 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 7 additions & 7 deletions identity-account/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ keywords = ["iota", "tangle", "identity"]
homepage = "https://www.iota.org"

[dependencies]
actix = { version = "0.12.0", optional = true }
async-trait = { version = "0.1", default-features = false }
futures = { version = "0.3" }
hashbrown = { version = "0.11", features = ["serde"] }
Expand All @@ -22,10 +23,9 @@ itoa = { version = "0.4" }
log = { version = "0.4", default-features = false }
once_cell = { version = "1.7", default-features = false, features = ["std"] }
paste = { version = "1.0" }
riker = { version = "0.4", optional = true }
serde = { version = "1.0", default-features = false, features = [
"alloc",
"derive"
"derive",
] }
slog = { version = "2.7" }
strum = { version = "0.21", features = ["derive"] }
Expand All @@ -34,17 +34,17 @@ tokio = { version = "1.5", features = ["sync"] }
zeroize = { version = "1.4" }

[dependencies.iota-crypto]
version = "0.5"
version = "0.7"
features = ["blake2b", "ed25519", "hmac", "pbkdf", "sha", "slip10", "std"]
PhilippGackstatter marked this conversation as resolved.
Show resolved Hide resolved

[dependencies.iota_stronghold]
git = "https://github.com/iotaledger/stronghold.rs"
rev = "6dd92dc9743eba2f3b4126425e7572470d92c80b"
rev = "ad57181e7c5baa4b6ccb66fb464667c97967db08"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like that this was the latest revision. is there a specific reason to not use head?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean to specify branch = "dev" rather than a specific commit? We've just had some bad experience with iota-client not pinning their commits to a specific version, which broke our build randomly. We'd thus rather do a manual upgrade 馃檪

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. if stronghold reaches stable, I would advise using the master branch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this is debatable, because it is always safer to pin to a rev (even if it is a bit more opaque as to which rev you are actually consuming.) The other point is that there is only a main branch, not a master branch.

optional = true

[dependencies.stronghold_engine]
git = "https://github.com/iotaledger/stronghold.rs"
rev = "6dd92dc9743eba2f3b4126425e7572470d92c80b"
rev = "ad57181e7c5baa4b6ccb66fb464667c97967db08"
optional = true

[dev-dependencies]
Expand All @@ -54,15 +54,15 @@ tokio = { version = "1.5", features = [
"macros",
"rt",
"rt-multi-thread",
"sync"
"sync",
] }

[features]
mem-client = []
stronghold = [
"iota_stronghold",
"stronghold_engine",
"riker",
"actix",
"tokio/rt-multi-thread",
]
wasm = ["identity-iota/wasm"]
Expand Down
4 changes: 0 additions & 4 deletions identity-account/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ pub enum Error {
/// Caused by attempting to perform an invalid IO operation.
#[error(transparent)]
IoError(#[from] std::io::Error),
/// Caused by an internal failure of the riker actor system.
#[cfg(feature = "stronghold")]
#[error(transparent)]
ActorSystemError(#[from] riker::system::SystemError),
/// Caused by errors from the [iota_stronghold] crate.
#[cfg(feature = "stronghold")]
#[error(transparent)]
Expand Down
2 changes: 1 addition & 1 deletion identity-account/src/storage/stronghold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl Stronghold {
#[async_trait::async_trait]
impl Storage for Stronghold {
async fn set_password(&self, password: EncryptionKey) -> Result<()> {
self.snapshot.set_password(password)
self.snapshot.set_password(password).await
}

async fn flush_changes(&self) -> Result<()> {
Expand Down
115 changes: 58 additions & 57 deletions identity-account/src/stronghold/context.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,22 @@
// Copyright 2020-2021 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use actix::System;
use core::ops::Deref;
use core::ops::DerefMut;
use hashbrown::HashMap;
use hashbrown::HashSet;
use iota_stronghold::Stronghold;
use iota_stronghold::StrongholdFlags;
use once_cell::sync::OnceCell;
use riker::actors::ActorSystem;
use riker::actors::SystemBuilder;
use once_cell::sync::Lazy;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::MutexGuard;
use std::sync::Once;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use tokio::runtime::Runtime as AsyncRuntime;
use tokio::sync::Mutex as AsyncMutex;
use tokio::sync::MutexGuard as AsyncMutexGuard;
use zeroize::Zeroize;
Expand All @@ -38,21 +35,11 @@ pub struct Context {
runtime: Runtime,
}

fn async_runtime() -> Result<&'static Mutex<AsyncRuntime>> {
static __THIS: OnceCell<Mutex<AsyncRuntime>> = OnceCell::new();

__THIS.get_or_try_init(|| AsyncRuntime::new().map_err(Into::into).map(Mutex::new))
}

fn async_runtime_guard() -> Result<MutexGuard<'static, AsyncRuntime>> {
async_runtime().and_then(|mutex| mutex.lock().map_err(|_| Error::StrongholdMutexPoisoned("runtime")))
}

async fn clear_expired_passwords() -> Result<()> {
let this: &'static Context = Context::get()?;
let this: &'static Context = Context::get().await?;
let interval: Duration = *this.runtime.password_clear()?;

thread::sleep(interval);
tokio::time::sleep(interval).await;

if interval.as_nanos() == 0 {
return Ok(());
Expand All @@ -78,49 +65,59 @@ async fn clear_expired_passwords() -> Result<()> {
Ok(())
}

impl Context {
pub(crate) fn get() -> Result<&'static Self> {
static __THIS: OnceCell<Context> = OnceCell::new();
static __POOL: OnceCell<AsyncRuntime> = OnceCell::new();
static __SWEEP: Once = Once::new();
static CONTEXT: Lazy<core::result::Result<Context, String>> = Lazy::new(|| {
let (sender, receiver) = std::sync::mpsc::channel();

let this: &'static Self = __THIS.get_or_try_init::<_, Error>(|| {
let system: ActorSystem = SystemBuilder::new()
// Disable the default actor system logger
.log(slog::Logger::root(slog::Discard, slog::o!()))
.create()?;
thread::spawn(move || {
let system_runner = System::new();

let stronghold: Stronghold = Stronghold::init_stronghold_system(system, Vec::new(), Vec::new());
let database: Arc<AsyncMutex<Database>> = Arc::new(AsyncMutex::new(Database::new(stronghold)));
// TODO: convert error properly once anyhow is no longer used
let stronghold = system_runner
.block_on(Stronghold::init_stronghold_system(vec![], vec![]))
.map_err(|err| err.to_string());
PhilippGackstatter marked this conversation as resolved.
Show resolved Hide resolved

Ok(Self {
let context = stronghold.map(|sh| {
let database: Arc<AsyncMutex<Database>> = Arc::new(AsyncMutex::new(Database::new(sh)));
Context {
database,
runtime: Runtime::new(),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be fine, as stronghold implicitly runs on a single threaded tokio runtime. access to strongholds functionality is synchronized.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you still think it makes sense to make the Stronghold type clonable and get rid of the mutex here? This can come at a later point, I think, but it would be good to remove the double synchronization. In any case, thanks for taking the time to review!

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stronghold can be made cloneable, since it only stores addresses. One issue i see here, is the synchronization between the actix system and the runtime used by identity. "Issue" in that sense that we need to create two runtimes. Maybe there is a way to lift this issue in the future.

})
})?;

// Spawn a background-process to clear expired passwords
__SWEEP.call_once(|| {
thread::spawn(|| {
// unwrap so an error kills the thread
async_runtime_guard().unwrap().spawn(async {
loop {
// unwrap so an error kills the thread
clear_expired_passwords().await.unwrap();
}
});
});
}
});

sender.send(context).expect("receiver channel has been dropped");

let spawn_successful = System::current().arbiter().spawn(async {
loop {
clear_expired_passwords()
.await
.expect("background password clearing failed");
}
});

Ok(this)
if !spawn_successful {
panic!("failed to send background task to arbiter");
}

system_runner.run().expect("system runner failed");
});

receiver.recv().expect("sender has been disconnected")
PhilippGackstatter marked this conversation as resolved.
Show resolved Hide resolved
});

impl Context {
pub(crate) async fn get() -> Result<&'static Self> {
match CONTEXT.deref() {
Ok(ctx) => Ok(ctx),
Err(err) => Err(Error::StrongholdResult(err.to_owned())),
}
}

pub(crate) async fn scope(
path: &Path,
name: &[u8],
flags: &[StrongholdFlags],
) -> Result<AsyncMutexGuard<'static, Database>> {
let this: &Self = Self::get()?;
let this: &Self = Self::get().await?;
let mut database: _ = this.database.lock().await;

database.switch_snapshot(&this.runtime, path).await?;
Expand All @@ -129,27 +126,31 @@ impl Context {
Ok(database)
}

pub(crate) fn on_change<T>(listener: T) -> Result<()>
pub(crate) async fn on_change<T>(listener: T) -> Result<()>
where
T: FnMut(&Path, &SnapshotStatus) + Send + 'static,
{
Self::get().and_then(|this| this.runtime.on_change(listener))
Self::get().await.and_then(|this| this.runtime.on_change(listener))
}

pub(crate) fn set_password(path: &Path, password: Password) -> Result<()> {
Self::get().and_then(|this| this.runtime.set_password(path, password))
pub(crate) async fn set_password(path: &Path, password: Password) -> Result<()> {
Self::get()
.await
.and_then(|this| this.runtime.set_password(path, password))
}

pub(crate) fn set_password_clear(interval: Duration) -> Result<()> {
Self::get().and_then(|this| this.runtime.set_password_clear(interval))
pub(crate) async fn set_password_clear(interval: Duration) -> Result<()> {
Self::get()
.await
.and_then(|this| this.runtime.set_password_clear(interval))
}

pub(crate) fn snapshot_status(path: &Path) -> Result<SnapshotStatus> {
Self::get().and_then(|this| this.runtime.snapshot_status(path))
pub(crate) async fn snapshot_status(path: &Path) -> Result<SnapshotStatus> {
Self::get().await.and_then(|this| this.runtime.snapshot_status(path))
}

pub(crate) async fn load(path: &Path, password: Password) -> Result<()> {
let this: &Self = Self::get()?;
let this: &Self = Self::get().await?;
let mut database: _ = this.database.lock().await;

this.runtime.set_password(path, password)?;
Expand All @@ -160,7 +161,7 @@ impl Context {
}

pub(crate) async fn unload(path: &Path, persist: bool) -> Result<()> {
let this: &Self = Self::get()?;
let this: &Self = Self::get().await?;
let mut database: _ = this.database.lock().await;

database.flush(&this.runtime, path, persist).await?;
Expand All @@ -169,7 +170,7 @@ impl Context {
}

pub(crate) async fn save(path: &Path) -> Result<()> {
let this: &Self = Self::get()?;
let this: &Self = Self::get().await?;
let mut database: _ = this.database.lock().await;

database.write(&this.runtime, path).await?;
Expand Down
16 changes: 8 additions & 8 deletions identity-account/src/stronghold/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ pub struct Snapshot {
}

impl Snapshot {
pub fn set_password_clear(interval: Duration) -> Result<()> {
Context::set_password_clear(interval)
pub async fn set_password_clear(interval: Duration) -> Result<()> {
Context::set_password_clear(interval).await
}

pub fn on_change<T>(listener: T) -> Result<()>
pub async fn on_change<T>(listener: T) -> Result<()>
where
T: FnMut(&Path, &SnapshotStatus) + Send + 'static,
{
Context::on_change(listener)
Context::on_change(listener).await
}

pub fn new<P>(path: &P) -> Self
Expand Down Expand Up @@ -65,12 +65,12 @@ impl Snapshot {
Records::new(&self.path, name, flags)
}

pub fn status(&self) -> Result<SnapshotStatus> {
Context::snapshot_status(&self.path)
pub async fn status(&self) -> Result<SnapshotStatus> {
Context::snapshot_status(&self.path).await
}

pub fn set_password(&self, password: Password) -> Result<()> {
Context::set_password(&self.path, password)
pub async fn set_password(&self, password: Password) -> Result<()> {
Context::set_password(&self.path, password).await
}

pub async fn load(&self, password: Password) -> Result<()> {
Expand Down
14 changes: 7 additions & 7 deletions identity-account/src/stronghold/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ rusty_fork_test! {
block_on(async {
let interval: Duration = Duration::from_millis(100);

Snapshot::set_password_clear(interval).unwrap();
Snapshot::set_password_clear(interval).await.unwrap();

let filename: PathBuf = generate_filename();
let snapshot: Snapshot = Snapshot::new(&filename);
Expand All @@ -81,7 +81,7 @@ rusty_fork_test! {
);

assert!(
matches!(snapshot.status().unwrap(), SnapshotStatus::Locked),
matches!(snapshot.status().await.unwrap(), SnapshotStatus::Locked),
"unexpected snapshot status",
);
})
Expand All @@ -92,7 +92,7 @@ rusty_fork_test! {
block_on(async {
let interval: Duration = Duration::from_millis(300);

Snapshot::set_password_clear(interval).unwrap();
Snapshot::set_password_clear(interval).await.unwrap();

let filename: PathBuf = generate_filename();
let snapshot: Snapshot = Snapshot::new(&filename);
Expand All @@ -105,7 +105,7 @@ rusty_fork_test! {
let location: Location = location(&format!("persists{}", index));

let set_result = store.set(location, format!("STRONGHOLD{}", index), None).await;
let status: SnapshotStatus = snapshot.status().unwrap();
let status: SnapshotStatus = snapshot.status().await.unwrap();

if let Some(timeout) = interval.checked_sub(instant.elapsed()) {
// Prior to the expiration time, the password should not be cleared yet
Expand All @@ -122,7 +122,7 @@ rusty_fork_test! {
} else {
// If elapsed > interval, set the password again.
// This might happen if the test is stopped by another thread.
snapshot.set_password(Default::default()).unwrap();
snapshot.set_password(Default::default()).await.unwrap();
instant = Instant::now();
}
}
Expand All @@ -131,7 +131,7 @@ rusty_fork_test! {

// Test may have taken too long / been interrupted and cleared the password already, retry
if matches!(result, Err(Error::StrongholdPasswordNotSet)) && interval.checked_sub(instant.elapsed()).is_none() {
snapshot.set_password(Default::default()).unwrap();
snapshot.set_password(Default::default()).await.unwrap();
result = store.get(location("persists1")).await;
}
assert_eq!(result.unwrap(), b"STRONGHOLD1");
Expand All @@ -146,7 +146,7 @@ rusty_fork_test! {
error
);
assert!(
matches!(snapshot.status().unwrap(), SnapshotStatus::Locked),
matches!(snapshot.status().await.unwrap(), SnapshotStatus::Locked),
"unexpected snapshot status",
);
})
Expand Down
2 changes: 1 addition & 1 deletion identity-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ url = { version = "2.2", default-features = false, features = ["serde"] }
zeroize = { version = "1.4", default-features = false }

[dependencies.iota-crypto]
version = "0.5"
version = "0.7"
default-features = false
features = ["blake2b", "ed25519", "random", "sha"]

Expand Down