From 68b74a5e220b7e83ba563e5d981964d73b3b4d98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anler=20Hern=C3=A1ndez=20Peral?= Date: Thu, 20 Jun 2019 15:24:58 +0200 Subject: [PATCH] refactor(wallet): implement graceful shutdown of the db --- wallet/src/actors/app/builder.rs | 31 ++++++++---- wallet/src/actors/app/error.rs | 4 -- .../actors/app/handlers/get_wallet_infos.rs | 8 +-- wallet/src/actors/app/handlers/mod.rs | 3 ++ wallet/src/actors/app/handlers/stop.rs | 18 +++++++ wallet/src/actors/app/mod.rs | 31 +++++++++++- wallet/src/actors/controller.rs | 28 ++++++++--- wallet/src/actors/storage/builder.rs | 50 ++----------------- .../actors/storage/handlers/create_wallet.rs | 7 ++- wallet/src/actors/storage/handlers/flush.rs | 21 ++++++++ .../{wallet_infos.rs => get_wallet_infos.rs} | 12 +++-- wallet/src/actors/storage/handlers/mod.rs | 6 ++- wallet/src/actors/storage/mod.rs | 42 +++++++++------- wallet/src/lib.rs | 1 - wallet/src/storage/mod.rs | 8 +++ 15 files changed, 169 insertions(+), 101 deletions(-) create mode 100644 wallet/src/actors/app/handlers/stop.rs create mode 100644 wallet/src/actors/storage/handlers/flush.rs rename wallet/src/actors/storage/handlers/{wallet_infos.rs => get_wallet_infos.rs} (59%) diff --git a/wallet/src/actors/app/builder.rs b/wallet/src/actors/app/builder.rs index b1754243d..ced2ebcf3 100644 --- a/wallet/src/actors/app/builder.rs +++ b/wallet/src/actors/app/builder.rs @@ -1,11 +1,14 @@ -use std::path::PathBuf; +use std::{path::PathBuf, sync::Arc}; use actix::prelude::*; use failure::Error; use witnet_net::client::tcp::JsonRpcClient; -use crate::actors::{storage, Crypto, RadExecutor, Storage}; +use crate::{ + actors::{Crypto, RadExecutor, Storage}, + storage, +}; #[derive(Default)] pub struct AppBuilder { @@ -31,19 +34,25 @@ impl AppBuilder { || Ok(None), |url| JsonRpcClient::start(url.as_ref()).map(Some), )?; - let storage = Storage::build() - .with_path(self.db_path) - .with_file_name("witnet_wallets.db") - .with_options({ - let mut db_opts = storage::Options::default(); - db_opts.create_if_missing(true); - db_opts - }) - .start()?; + + let mut db_opts = rocksdb::Options::default(); + db_opts.create_if_missing(true); + db_opts.set_merge_operator("merge operator", storage::storage_merge_operator, None); + // From rocksdb docs: every store to stable storage will issue a fsync. This parameter + // should be set to true while storing data to filesystem like ext3 that can lose files + // after a reboot. + db_opts.set_use_fsync(true); + + let db = Arc::new( + rocksdb::DB::open(&db_opts, self.db_path.join("witnet_wallet.db")) + .map_err(storage::Error::OpenDbFailed)?, + ); + let storage = Storage::build().start()?; let crypto = Crypto::build().start(); let rad_executor = RadExecutor::start(); let app = super::App { + db, storage, rad_executor, node_client, diff --git a/wallet/src/actors/app/error.rs b/wallet/src/actors/app/error.rs index 82b7fed4a..5434417fd 100644 --- a/wallet/src/actors/app/error.rs +++ b/wallet/src/actors/app/error.rs @@ -4,8 +4,6 @@ use failure::Fail; use witnet_net::client::tcp; use witnet_rad::error::RadError; -use crate::storage; - #[derive(Debug, Fail)] pub enum Error { #[fail(display = "wallet not connected to a node")] @@ -24,8 +22,6 @@ pub enum Error { RadFailed(#[cause] RadError), #[fail(display = "could not communicate with database")] StorageCommFailed(#[cause] actix::MailboxError), - #[fail(display = "{}", _0)] - StorageOpFailed(#[cause] storage::Error), #[fail(display = "could not communicate with cryptographic engine")] CryptoCommFailed(#[cause] actix::MailboxError), } diff --git a/wallet/src/actors/app/handlers/get_wallet_infos.rs b/wallet/src/actors/app/handlers/get_wallet_infos.rs index 04102f57f..a29c6a8c3 100644 --- a/wallet/src/actors/app/handlers/get_wallet_infos.rs +++ b/wallet/src/actors/app/handlers/get_wallet_infos.rs @@ -1,8 +1,7 @@ use actix::prelude::*; use futures::future; -use crate::actors::storage; -use crate::actors::{app::error, App}; +use crate::actors::App; use crate::api; impl Message for api::WalletInfosRequest { @@ -14,10 +13,7 @@ impl Handler for App { fn handle(&mut self, _msg: api::WalletInfosRequest, _ctx: &mut Self::Context) -> Self::Result { let fut = self - .storage - .send(storage::GetWalletInfos) - .map_err(error::Error::StorageCommFailed) - .and_then(|res| future::result(res).map_err(error::Error::StorageOpFailed)) + .get_wallet_infos() .and_then(|infos| { future::ok(api::WalletInfosResponse { total: infos.len(), diff --git a/wallet/src/actors/app/handlers/mod.rs b/wallet/src/actors/app/handlers/mod.rs index 452e32216..b19a81abc 100644 --- a/wallet/src/actors/app/handlers/mod.rs +++ b/wallet/src/actors/app/handlers/mod.rs @@ -11,6 +11,9 @@ mod notification; mod run_rad_req; mod send_data_req; mod send_vtt; +mod stop; mod subscribe; mod unlock_wallet; mod unsubscribe; + +pub use stop::*; diff --git a/wallet/src/actors/app/handlers/stop.rs b/wallet/src/actors/app/handlers/stop.rs new file mode 100644 index 000000000..9dbf74cfa --- /dev/null +++ b/wallet/src/actors/app/handlers/stop.rs @@ -0,0 +1,18 @@ +use actix::prelude::*; + +use crate::actors::app::App; + +pub struct Stop; + +impl Message for Stop { + type Result = Result<(), failure::Error>; +} + +impl Handler for App { + type Result = ResponseFuture<(), failure::Error>; + + fn handle(&mut self, _msg: Stop, _ctx: &mut Self::Context) -> Self::Result { + log::info!("stopping application..."); + self.stop() + } +} diff --git a/wallet/src/actors/app/mod.rs b/wallet/src/actors/app/mod.rs index 6b6ced608..a33fb67cb 100644 --- a/wallet/src/actors/app/mod.rs +++ b/wallet/src/actors/app/mod.rs @@ -1,6 +1,7 @@ //! # Application actor. //! //! See [`App`](App) actor for more information. +use std::sync::Arc; use actix::prelude::*; use failure::Error; @@ -17,13 +18,17 @@ use crate::wallet; pub mod builder; pub mod error; -mod handlers; +pub mod handlers; + +/// Expose message to stop application. +pub use handlers::Stop; /// Application actor. /// /// The application actor is in charge of managing the state of the application and coordinating the /// service actors, e.g.: storage, node client, and so on. pub struct App { + db: Arc, storage: Addr, rad_executor: Addr, crypto: Addr, @@ -99,6 +104,17 @@ impl App { } } + /// Get id and caption of all the wallets stored in the database. + fn get_wallet_infos(&self) -> ResponseFuture, Error> { + let fut = self + .storage + .send(storage::GetWalletInfos(self.db.clone())) + .map_err(map_storage_failed_err) + .and_then(map_err); + + Box::new(fut) + } + /// Create an empty wallet. fn create_wallet( &self, @@ -129,7 +145,7 @@ impl App { let wallet = wallet::Wallet::new(info, content); slf.storage - .send(storage::CreateWallet(wallet, password)) + .send(storage::CreateWallet(slf.db.clone(), wallet, password)) .map_err(map_storage_failed_err) .map(move |_| id) .into_actor(slf) @@ -137,6 +153,17 @@ impl App { Box::new(fut) } + + /// Perform all the tasks needed to properly stop the application. + fn stop(&self) -> ResponseFuture<(), Error> { + let fut = self + .storage + .send(storage::Flush(self.db.clone())) + .map_err(map_storage_failed_err) + .and_then(map_err); + + Box::new(fut) + } } impl Actor for App { diff --git a/wallet/src/actors/controller.rs b/wallet/src/actors/controller.rs index 61e6c5abe..59ec6f9c3 100644 --- a/wallet/src/actors/controller.rs +++ b/wallet/src/actors/controller.rs @@ -10,20 +10,24 @@ use actix::prelude::*; use jsonrpc_core as rpc; use jsonrpc_pubsub as pubsub; -use super::App; +use super::{app, App}; use crate::api; use witnet_net::server::ws::Server; /// Controller actor. pub struct Controller { - _server: Server, - _app: Addr, + server: Option, + app: Addr, } impl Controller { pub fn build() -> ControllerBuilder { ControllerBuilder::new() } + + fn stop_server(&mut self) { + drop(self.server.take()) + } } /// Controller builder used to set optional parameters using the builder-pattern. @@ -89,8 +93,8 @@ impl ControllerBuilder { .start()?; let controller = Controller { - _app: app, - _server: server, + server: Some(server), + app, }; Ok(controller.start()) @@ -110,7 +114,17 @@ impl Message for Shutdown { impl Handler for Controller { type Result = (); - fn handle(&mut self, _: Shutdown, _ctx: &mut Self::Context) -> Self::Result { - System::current().stop(); + fn handle(&mut self, _msg: Shutdown, ctx: &mut Self::Context) -> Self::Result { + self.stop_server(); + self.app + .send(app::Stop) + .map_err(|_| log::error!("couldn't stop application")) + .and_then(|_| { + log::info!("shutting down system!"); + System::current().stop(); + Ok(()) + }) + .into_actor(self) + .spawn(ctx); } } diff --git a/wallet/src/actors/storage/builder.rs b/wallet/src/actors/storage/builder.rs index f5e2d4efe..8a666d7f9 100644 --- a/wallet/src/actors/storage/builder.rs +++ b/wallet/src/actors/storage/builder.rs @@ -1,21 +1,14 @@ -use std::env; -use std::path::PathBuf; -use std::sync::Arc; - use actix::prelude::*; use failure::Error; use super::Storage; use crate::storage; -pub struct Builder<'a> { - options: Option, - path: Option, - name: Option<&'a str>, +pub struct Builder { params: storage::Params, } -impl<'a> Builder<'a> { +impl Builder { pub fn new() -> Self { let params = storage::Params { encrypt_hash_iterations: 10_000, @@ -23,50 +16,15 @@ impl<'a> Builder<'a> { encrypt_salt_length: 32, }; - Self { - params, - path: None, - name: None, - options: None, - } - } - /// Create a new instance of the Storage actor using the given database options. - pub fn with_options(mut self, options: rocksdb::Options) -> Self { - self.options = Some(options); - self - } - - /// Set the path where to store the database files. - pub fn with_path(mut self, path: PathBuf) -> Self { - self.path = Some(path); - self - } - - /// Set the filename of the database. - pub fn with_file_name(mut self, name: &'a str) -> Self { - self.name = Some(name); - self + Self { params } } /// Start an instance of the actor inside a SyncArbiter. pub fn start(self) -> Result, Error> { - let mut options = self.options.unwrap_or_default(); - options.set_merge_operator("merge operator", storage::storage_merge_operator, None); - // From rocksdb docs: every store to stable storage will issue a fsync. This parameter - // should be set to true while storing data to filesystem like ext3 that can lose files - // after a reboot. - options.set_use_fsync(true); - let path = self.path.map_or_else(env::current_dir, Ok)?; - let file_name = self.name.unwrap_or_else(|| "witnet_wallets.db"); - let db = rocksdb::DB::open(&options, path.join(file_name)) - .map_err(storage::Error::OpenDbFailed)?; - let db_ref = Arc::new(db); - let params_ref = Arc::new(self.params); - // Spawn one thread with the storage actor (because is blocking). Do not use more than one // thread, otherwise you'll receive and error because RocksDB only allows one connection at a // time. - let addr = SyncArbiter::start(1, move || Storage::new(params_ref.clone(), db_ref.clone())); + let addr = SyncArbiter::start(1, move || Storage::new(self.params.clone())); Ok(addr) } diff --git a/wallet/src/actors/storage/handlers/create_wallet.rs b/wallet/src/actors/storage/handlers/create_wallet.rs index 5a11e11af..684601e78 100644 --- a/wallet/src/actors/storage/handlers/create_wallet.rs +++ b/wallet/src/actors/storage/handlers/create_wallet.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use actix::prelude::*; use witnet_protected::ProtectedString; @@ -6,6 +8,7 @@ use crate::actors::storage::Storage; use crate::{storage::Error, wallet}; pub struct CreateWallet( + pub Arc, /// Wallet to save pub wallet::Wallet, /// Encryption password @@ -21,9 +24,9 @@ impl Handler for Storage { fn handle( &mut self, - CreateWallet(wallet, password): CreateWallet, + CreateWallet(db, wallet, password): CreateWallet, _ctx: &mut Self::Context, ) -> Self::Result { - self.create_wallet(wallet, password) + self.create_wallet(db.as_ref(), wallet, password) } } diff --git a/wallet/src/actors/storage/handlers/flush.rs b/wallet/src/actors/storage/handlers/flush.rs new file mode 100644 index 000000000..225868f17 --- /dev/null +++ b/wallet/src/actors/storage/handlers/flush.rs @@ -0,0 +1,21 @@ +use std::sync::Arc; + +use actix::prelude::*; + +use crate::actors::storage::Storage; +use crate::storage::Error; + +pub struct Flush(pub Arc); + +impl Message for Flush { + type Result = Result<(), Error>; +} + +impl Handler for Storage { + type Result = ::Result; + + fn handle(&mut self, Flush(db): Flush, _ctx: &mut Self::Context) -> Self::Result { + log::info!("flushing storage"); + self.flush(db.as_ref()) + } +} diff --git a/wallet/src/actors/storage/handlers/wallet_infos.rs b/wallet/src/actors/storage/handlers/get_wallet_infos.rs similarity index 59% rename from wallet/src/actors/storage/handlers/wallet_infos.rs rename to wallet/src/actors/storage/handlers/get_wallet_infos.rs index 951a5ce98..9ec3a7b17 100644 --- a/wallet/src/actors/storage/handlers/wallet_infos.rs +++ b/wallet/src/actors/storage/handlers/get_wallet_infos.rs @@ -1,10 +1,12 @@ +use std::sync::Arc; + use actix::prelude::*; use crate::actors::storage::Storage; use crate::{storage::Error, wallet}; /// Get the list of created wallets along with their ids -pub struct GetWalletInfos; +pub struct GetWalletInfos(pub Arc); impl Message for GetWalletInfos { type Result = Result, Error>; @@ -13,7 +15,11 @@ impl Message for GetWalletInfos { impl Handler for Storage { type Result = Result, Error>; - fn handle(&mut self, _msg: GetWalletInfos, _ctx: &mut Self::Context) -> Self::Result { - self.get_wallet_infos() + fn handle( + &mut self, + GetWalletInfos(db): GetWalletInfos, + _ctx: &mut Self::Context, + ) -> Self::Result { + self.get_wallet_infos(db.as_ref()) } } diff --git a/wallet/src/actors/storage/handlers/mod.rs b/wallet/src/actors/storage/handlers/mod.rs index bae8273b2..40300032f 100644 --- a/wallet/src/actors/storage/handlers/mod.rs +++ b/wallet/src/actors/storage/handlers/mod.rs @@ -1,5 +1,7 @@ mod create_wallet; -mod wallet_infos; +mod flush; +mod get_wallet_infos; pub use create_wallet::*; -pub use wallet_infos::*; +pub use flush::*; +pub use get_wallet_infos::*; diff --git a/wallet/src/actors/storage/mod.rs b/wallet/src/actors/storage/mod.rs index cc17e233a..d0a125bf8 100644 --- a/wallet/src/actors/storage/mod.rs +++ b/wallet/src/actors/storage/mod.rs @@ -2,9 +2,9 @@ //! //! It is charge of managing the connection to the key-value database. This actor is blocking so it //! must be used with a `SyncArbiter`. -use std::sync::Arc; use actix::prelude::*; +use rocksdb::DB; use witnet_protected::ProtectedString; @@ -15,32 +15,27 @@ pub mod handlers; pub use handlers::*; -/// Expose options for tunning the database. -pub type Options = rocksdb::Options; - /// Storage actor. pub struct Storage { - /// Holds the wallets ids in plain text, and the wallets information encrypted with a password. - db: Arc, - params: Arc, + params: storage::Params, } impl Storage { - pub fn build<'a>() -> builder::Builder<'a> { + pub fn build() -> builder::Builder { builder::Builder::new() } - pub fn new(params: Arc, db: Arc) -> Self { - Self { db, params } + pub fn new(params: storage::Params) -> Self { + Self { params } } - pub fn get_wallet_infos(&self) -> Result, storage::Error> { - let ids = self.get_wallet_ids()?; + pub fn get_wallet_infos(&self, db: &DB) -> Result, storage::Error> { + let ids = self.get_wallet_ids(db)?; let len = ids.len(); let infos = ids .into_iter() .try_fold(Vec::with_capacity(len), |mut acc, id| { - let info = storage::get(self.db.as_ref(), storage::keys::wallet_info(id.as_ref()))?; + let info = storage::get(db, storage::keys::wallet_info(id.as_ref()))?; acc.push(info); Ok(acc) @@ -49,12 +44,13 @@ impl Storage { Ok(infos) } - pub fn get_wallet_ids(&self) -> Result, storage::Error> { - storage::get_default(self.db.as_ref(), storage::keys::wallets()) + pub fn get_wallet_ids(&self, db: &DB) -> Result, storage::Error> { + storage::get_default(db, storage::keys::wallets()) } pub fn create_wallet( &self, + db: &DB, wallet: wallet::Wallet, password: ProtectedString, ) -> Result<(), storage::Error> { @@ -66,17 +62,29 @@ impl Storage { storage::put( &mut batch, storage::keys::wallet(id), - &storage::encrypt(self.params.as_ref(), password.as_ref(), &wallet.content)?, + &storage::encrypt(&self.params, password.as_ref(), &wallet.content)?, )?; - storage::write(self.db.as_ref(), batch)?; + storage::write(db, batch)?; Ok(()) } + + fn flush(&self, db: &DB) -> Result<(), storage::Error> { + storage::flush(db) + } } impl Actor for Storage { type Context = SyncContext; + + fn started(&mut self, _ctx: &mut Self::Context) { + log::trace!("storage actor started"); + } + + fn stopped(&mut self, _ctx: &mut Self::Context) { + log::trace!("storage actor stopped"); + } } impl Supervised for Storage {} diff --git a/wallet/src/lib.rs b/wallet/src/lib.rs index 4768290d1..4363831e6 100644 --- a/wallet/src/lib.rs +++ b/wallet/src/lib.rs @@ -34,7 +34,6 @@ pub fn run(conf: Config) -> Result<(), Error> { .start()?; signal::ctrl_c(move || { - log::info!("Shutting down"); controller.do_send(actors::controller::Shutdown); }); diff --git a/wallet/src/storage/mod.rs b/wallet/src/storage/mod.rs index 25f3786ff..834abf485 100644 --- a/wallet/src/storage/mod.rs +++ b/wallet/src/storage/mod.rs @@ -11,6 +11,7 @@ pub mod keys; pub use error::Error; /// Encryption parameters used by the encryption function. +#[derive(Clone)] pub struct Params { pub(crate) encrypt_hash_iterations: u32, pub(crate) encrypt_iv_length: usize, @@ -64,6 +65,13 @@ pub fn write(db: &rocksdb::DB, batch: rocksdb::WriteBatch) -> Result<(), error:: db.write(batch).map_err(error::Error::DbOpFailed) } +/// Flush database. +pub fn flush(db: &rocksdb::DB) -> Result<(), error::Error> { + let mut opts = rocksdb::FlushOptions::default(); + opts.set_wait(true); + db.flush_opt(&opts).map_err(error::Error::DbOpFailed) +} + /// Generate an encryption key. fn gen_key(password: &[u8], salt: &[u8], iter_count: u32) -> Protected { pbkdf2_sha256(password, salt, iter_count)