Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions wallet/src/actors/app/builder.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use std::path::PathBuf;
use std::{path::PathBuf, sync::Arc};

use actix::prelude::*;
use failure::Error;

use witnet_net::client::tcp::JsonRpcClient;

use crate::actors::{storage, Crypto, RadExecutor, Storage};
use crate::{
actors::{Crypto, RadExecutor, Storage},
storage,
};

#[derive(Default)]
pub struct AppBuilder {
Expand All @@ -31,19 +34,25 @@ impl AppBuilder {
|| Ok(None),
|url| JsonRpcClient::start(url.as_ref()).map(Some),
)?;
let storage = Storage::build()
.with_path(self.db_path)
.with_file_name("witnet_wallets.db")
.with_options({
let mut db_opts = storage::Options::default();
db_opts.create_if_missing(true);
db_opts
})
.start()?;

let mut db_opts = rocksdb::Options::default();
db_opts.create_if_missing(true);
db_opts.set_merge_operator("merge operator", storage::storage_merge_operator, None);
// From rocksdb docs: every store to stable storage will issue a fsync. This parameter
// should be set to true while storing data to filesystem like ext3 that can lose files
// after a reboot.
db_opts.set_use_fsync(true);

let db = Arc::new(
rocksdb::DB::open(&db_opts, self.db_path.join("witnet_wallet.db"))
.map_err(storage::Error::OpenDbFailed)?,
);
let storage = Storage::build().start()?;
let crypto = Crypto::build().start();
let rad_executor = RadExecutor::start();

let app = super::App {
db,
storage,
rad_executor,
node_client,
Expand Down
4 changes: 0 additions & 4 deletions wallet/src/actors/app/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ use failure::Fail;
use witnet_net::client::tcp;
use witnet_rad::error::RadError;

use crate::storage;

#[derive(Debug, Fail)]
pub enum Error {
#[fail(display = "wallet not connected to a node")]
Expand All @@ -24,8 +22,6 @@ pub enum Error {
RadFailed(#[cause] RadError),
#[fail(display = "could not communicate with database")]
StorageCommFailed(#[cause] actix::MailboxError),
#[fail(display = "{}", _0)]
StorageOpFailed(#[cause] storage::Error),
#[fail(display = "could not communicate with cryptographic engine")]
CryptoCommFailed(#[cause] actix::MailboxError),
}
8 changes: 2 additions & 6 deletions wallet/src/actors/app/handlers/get_wallet_infos.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use actix::prelude::*;
use futures::future;

use crate::actors::storage;
use crate::actors::{app::error, App};
use crate::actors::App;
use crate::api;

impl Message for api::WalletInfosRequest {
Expand All @@ -14,10 +13,7 @@ impl Handler<api::WalletInfosRequest> for App {

fn handle(&mut self, _msg: api::WalletInfosRequest, _ctx: &mut Self::Context) -> Self::Result {
let fut = self
.storage
.send(storage::GetWalletInfos)
.map_err(error::Error::StorageCommFailed)
.and_then(|res| future::result(res).map_err(error::Error::StorageOpFailed))
.get_wallet_infos()
.and_then(|infos| {
future::ok(api::WalletInfosResponse {
total: infos.len(),
Expand Down
3 changes: 3 additions & 0 deletions wallet/src/actors/app/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ mod notification;
mod run_rad_req;
mod send_data_req;
mod send_vtt;
mod stop;
mod subscribe;
mod unlock_wallet;
mod unsubscribe;

pub use stop::*;
18 changes: 18 additions & 0 deletions wallet/src/actors/app/handlers/stop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use actix::prelude::*;

use crate::actors::app::App;

pub struct Stop;

impl Message for Stop {
type Result = Result<(), failure::Error>;
}

impl Handler<Stop> for App {
type Result = ResponseFuture<(), failure::Error>;

fn handle(&mut self, _msg: Stop, _ctx: &mut Self::Context) -> Self::Result {
log::info!("stopping application...");
self.stop()
}
}
31 changes: 29 additions & 2 deletions wallet/src/actors/app/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! # Application actor.
//!
//! See [`App`](App) actor for more information.
use std::sync::Arc;

use actix::prelude::*;
use failure::Error;
Expand All @@ -17,13 +18,17 @@ use crate::wallet;

pub mod builder;
pub mod error;
mod handlers;
pub mod handlers;

/// Expose message to stop application.
pub use handlers::Stop;

/// Application actor.
///
/// The application actor is in charge of managing the state of the application and coordinating the
/// service actors, e.g.: storage, node client, and so on.
pub struct App {
db: Arc<rocksdb::DB>,
storage: Addr<Storage>,
rad_executor: Addr<RadExecutor>,
crypto: Addr<Crypto>,
Expand Down Expand Up @@ -99,6 +104,17 @@ impl App {
}
}

/// Get id and caption of all the wallets stored in the database.
fn get_wallet_infos(&self) -> ResponseFuture<Vec<wallet::WalletInfo>, Error> {
let fut = self
.storage
.send(storage::GetWalletInfos(self.db.clone()))
.map_err(map_storage_failed_err)
.and_then(map_err);

Box::new(fut)
}

/// Create an empty wallet.
fn create_wallet(
&self,
Expand Down Expand Up @@ -129,14 +145,25 @@ impl App {
let wallet = wallet::Wallet::new(info, content);

slf.storage
.send(storage::CreateWallet(wallet, password))
.send(storage::CreateWallet(slf.db.clone(), wallet, password))
.map_err(map_storage_failed_err)
.map(move |_| id)
.into_actor(slf)
});

Box::new(fut)
}

/// Perform all the tasks needed to properly stop the application.
fn stop(&self) -> ResponseFuture<(), Error> {
let fut = self
.storage
.send(storage::Flush(self.db.clone()))
.map_err(map_storage_failed_err)
.and_then(map_err);

Box::new(fut)
}
}

impl Actor for App {
Expand Down
28 changes: 21 additions & 7 deletions wallet/src/actors/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,24 @@ use actix::prelude::*;
use jsonrpc_core as rpc;
use jsonrpc_pubsub as pubsub;

use super::App;
use super::{app, App};
use crate::api;
use witnet_net::server::ws::Server;

/// Controller actor.
pub struct Controller {
_server: Server,
_app: Addr<App>,
server: Option<Server>,
app: Addr<App>,
}

impl Controller {
pub fn build() -> ControllerBuilder {
ControllerBuilder::new()
}

fn stop_server(&mut self) {
drop(self.server.take())
}
}

/// Controller builder used to set optional parameters using the builder-pattern.
Expand Down Expand Up @@ -89,8 +93,8 @@ impl ControllerBuilder {
.start()?;

let controller = Controller {
_app: app,
_server: server,
server: Some(server),
app,
};

Ok(controller.start())
Expand All @@ -110,7 +114,17 @@ impl Message for Shutdown {
impl Handler<Shutdown> for Controller {
type Result = ();

fn handle(&mut self, _: Shutdown, _ctx: &mut Self::Context) -> Self::Result {
System::current().stop();
fn handle(&mut self, _msg: Shutdown, ctx: &mut Self::Context) -> Self::Result {
self.stop_server();
self.app
.send(app::Stop)
.map_err(|_| log::error!("couldn't stop application"))
.and_then(|_| {
log::info!("shutting down system!");
System::current().stop();
Ok(())
})
.into_actor(self)
.spawn(ctx);
}
}
50 changes: 4 additions & 46 deletions wallet/src/actors/storage/builder.rs
Original file line number Diff line number Diff line change
@@ -1,72 +1,30 @@
use std::env;
use std::path::PathBuf;
use std::sync::Arc;

use actix::prelude::*;
use failure::Error;

use super::Storage;
use crate::storage;

pub struct Builder<'a> {
options: Option<rocksdb::Options>,
path: Option<PathBuf>,
name: Option<&'a str>,
pub struct Builder {
params: storage::Params,
}

impl<'a> Builder<'a> {
impl Builder {
pub fn new() -> Self {
let params = storage::Params {
encrypt_hash_iterations: 10_000,
encrypt_iv_length: 16,
encrypt_salt_length: 32,
};

Self {
params,
path: None,
name: None,
options: None,
}
}
/// Create a new instance of the Storage actor using the given database options.
pub fn with_options(mut self, options: rocksdb::Options) -> Self {
self.options = Some(options);
self
}

/// Set the path where to store the database files.
pub fn with_path(mut self, path: PathBuf) -> Self {
self.path = Some(path);
self
}

/// Set the filename of the database.
pub fn with_file_name(mut self, name: &'a str) -> Self {
self.name = Some(name);
self
Self { params }
}

/// Start an instance of the actor inside a SyncArbiter.
pub fn start(self) -> Result<Addr<Storage>, Error> {
let mut options = self.options.unwrap_or_default();
options.set_merge_operator("merge operator", storage::storage_merge_operator, None);
// From rocksdb docs: every store to stable storage will issue a fsync. This parameter
// should be set to true while storing data to filesystem like ext3 that can lose files
// after a reboot.
options.set_use_fsync(true);
let path = self.path.map_or_else(env::current_dir, Ok)?;
let file_name = self.name.unwrap_or_else(|| "witnet_wallets.db");
let db = rocksdb::DB::open(&options, path.join(file_name))
.map_err(storage::Error::OpenDbFailed)?;
let db_ref = Arc::new(db);
let params_ref = Arc::new(self.params);

// Spawn one thread with the storage actor (because is blocking). Do not use more than one
// thread, otherwise you'll receive and error because RocksDB only allows one connection at a
// time.
let addr = SyncArbiter::start(1, move || Storage::new(params_ref.clone(), db_ref.clone()));
let addr = SyncArbiter::start(1, move || Storage::new(self.params.clone()));

Ok(addr)
}
Expand Down
7 changes: 5 additions & 2 deletions wallet/src/actors/storage/handlers/create_wallet.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use actix::prelude::*;

use witnet_protected::ProtectedString;
Expand All @@ -6,6 +8,7 @@ use crate::actors::storage::Storage;
use crate::{storage::Error, wallet};

pub struct CreateWallet(
pub Arc<rocksdb::DB>,
/// Wallet to save
pub wallet::Wallet,
/// Encryption password
Expand All @@ -21,9 +24,9 @@ impl Handler<CreateWallet> for Storage {

fn handle(
&mut self,
CreateWallet(wallet, password): CreateWallet,
CreateWallet(db, wallet, password): CreateWallet,
_ctx: &mut Self::Context,
) -> Self::Result {
self.create_wallet(wallet, password)
self.create_wallet(db.as_ref(), wallet, password)
}
}
21 changes: 21 additions & 0 deletions wallet/src/actors/storage/handlers/flush.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use std::sync::Arc;

use actix::prelude::*;

use crate::actors::storage::Storage;
use crate::storage::Error;

pub struct Flush(pub Arc<rocksdb::DB>);

impl Message for Flush {
type Result = Result<(), Error>;
}

impl Handler<Flush> for Storage {
type Result = <Flush as Message>::Result;

fn handle(&mut self, Flush(db): Flush, _ctx: &mut Self::Context) -> Self::Result {
log::info!("flushing storage");
self.flush(db.as_ref())
}
}
Loading