diff --git a/Cargo.toml b/Cargo.toml index b73edc277..425bfdbec 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,12 +6,27 @@ edition = "2018" [dependencies] env_logger = { version = "0.7", optional = true } -ws = { version = "0.7", optional = true } hex = { version = "0.4", default-features=false, optional = true } log = { version = "0.4", optional = true } serde = { version = "1.0", optional = true, features = ["derive"] } serde_json = { version = "1.0", optional = true } primitive-types = { version = "0.6", default-features = false, features = ["codec"] } +wasm-bindgen = "=0.2.55" +futures-signals = {git="https://github.com/alanpoon/rust-signals.git",branch="master"} + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +ws = { version = "0.7", optional = true } + +[target.'cfg(target_arch = "wasm32")'.dependencies.web-sys] +version = "0.3.19" +features = [ + 'WebSocket', + 'Response', + 'EventTarget', + 'Event', + 'MessageEvent', + 'ErrorEvent' +] [dependencies.primitives] git = "https://github.com/paritytech/substrate" @@ -80,11 +95,6 @@ rev = "3bf9540e72df5ecb3955845764dfee7dcdbb26b5" package = "sr-std" default-features = false -[dev-dependencies.node_runtime] -git = "https://github.com/paritytech/substrate" -rev = "3bf9540e72df5ecb3955845764dfee7dcdbb26b5" -package = "node-runtime" - [dependencies.node_primitives] git = "https://github.com/paritytech/substrate" rev = "3bf9540e72df5ecb3955845764dfee7dcdbb26b5" @@ -98,6 +108,7 @@ package = "pallet-contracts" [dev-dependencies] wabt = "0.9.0" +futures = "0.3.1" [dev-dependencies.keyring] git = "https://github.com/paritytech/substrate" @@ -120,7 +131,6 @@ std = [ "balances/std", "runtime_version/std", "metadata/std", - "runtime_io/std", "indices/std", "primitives/std", "serde/std", diff --git a/src/examples/example_generic_extrinsic.rs b/src/examples/example_generic_extrinsic.rs index f182d2f4e..08ab141da 100755 --- a/src/examples/example_generic_extrinsic.rs +++ b/src/examples/example_generic_extrinsic.rs @@ -39,11 +39,11 @@ fn main() { // call Balances::transfer // the names are given as strings - let xt: UncheckedExtrinsicV4<_> = compose_extrinsic!( + let xt: UncheckedExtrinsicV3<_, sr25519::Pair> = compose_extrinsic!( api.clone(), "Balances", "transfer", - GenericAddress::from(to.clone()), + GenericAddress::from(to.0.clone()), Compact(42 as u128) ); diff --git a/src/examples/example_get_storage.rs b/src/examples/example_get_storage.rs index 464464b54..14e5c7cba 100755 --- a/src/examples/example_get_storage.rs +++ b/src/examples/example_get_storage.rs @@ -23,22 +23,22 @@ use substrate_api_client::{ Api, utils::hexstr_to_u256, }; - -fn main() { +use futures::executor::block_on; +async fn run (){ env_logger::init(); let url = get_node_url_from_cli(); - let mut api = Api::new(format!("ws://{}", url)); + let mut api = Api::new(format!("ws://{}", url)).await; // get some plain storage value - let result_str = api.get_storage("Balances", "TotalIssuance", None).unwrap(); + let result_str = api.get_storage("Balances", "TotalIssuance", None).await.unwrap(); let result = hexstr_to_u256(result_str).unwrap(); println!("[+] TotalIssuance is {}", result); // get Alice's AccountNonce let accountid = AccountKeyring::Alice.to_account_id(); let result_str = api - .get_storage("System", "AccountNonce", Some(accountid.encode())) + .get_storage("System", "AccountNonce", Some(accountid.encode())).await .unwrap(); let result = hexstr_to_u256(result_str).unwrap(); println!("[+] Alice's Account Nonce is {}", result.low_u32()); @@ -46,14 +46,18 @@ fn main() { // get Alice's AccountNonce with the AccountKey let signer = AccountKeyring::Alice.pair(); let result_str = api - .get_storage("System", "AccountNonce", Some(signer.public().encode())) + .get_storage("System", "AccountNonce", Some(signer.public().encode())).await .unwrap(); let result = hexstr_to_u256(result_str).unwrap(); println!("[+] Alice's Account Nonce is {}", result.low_u32()); // get Alice's AccountNonce with api.get_nonce() api.signer = Some(signer); - println!("[+] Alice's Account Nonce is {}", api.get_nonce().unwrap()); + println!("[+] Alice's Account Nonce is {}", api.get_nonce().await.unwrap()); +} +fn main() { + let future = run(); + block_on(future); } pub fn get_node_url_from_cli() -> String { diff --git a/src/extrinsic/balances.rs b/src/extrinsic/balances.rs index 8f710eacb..6a911c712 100644 --- a/src/extrinsic/balances.rs +++ b/src/extrinsic/balances.rs @@ -40,7 +40,7 @@ where P: Pair, MultiSignature: From, { - pub fn balance_transfer(&self, to: GenericAddress, amount: u128) -> BalanceTransferXt { + pub async fn balance_transfer(&self, to: GenericAddress, amount: u128) -> BalanceTransferXt { compose_extrinsic!( self, BALANCES_MODULE, @@ -50,7 +50,7 @@ where ) } - pub fn balance_set_balance(&self, who: GenericAddress, free_balance: u128, reserved_balance: u128) -> BalanceSetBalanceXt { + pub async fn balance_set_balance(&self, who: GenericAddress, free_balance: u128, reserved_balance: u128) -> BalanceSetBalanceXt { compose_extrinsic!( self, BALANCES_MODULE, diff --git a/src/extrinsic/contract.rs b/src/extrinsic/contract.rs index ccb7eecf5..1e11caf22 100644 --- a/src/extrinsic/contract.rs +++ b/src/extrinsic/contract.rs @@ -51,7 +51,7 @@ where P: Pair, MultiSignature: From, { - pub fn contract_put_code(&self, gas_limit: u64, code: Vec) -> ContractPutCodeXt { + pub async fn contract_put_code(&self, gas_limit: u64, code: Vec) -> ContractPutCodeXt { compose_extrinsic!( &self, CONTRACTS_MODULE, @@ -61,7 +61,7 @@ where ) } - pub fn contract_create( + pub async fn contract_create( &self, endowment: u128, gas_limit: u64, @@ -80,7 +80,7 @@ where ) } - pub fn contract_call( + pub async fn contract_call( &self, dest: GenericAddress, value: u128, diff --git a/src/extrinsic/mod.rs b/src/extrinsic/mod.rs index 90ac6b07e..3f821ca67 100644 --- a/src/extrinsic/mod.rs +++ b/src/extrinsic/mod.rs @@ -129,7 +129,7 @@ macro_rules! compose_extrinsic { $crate::compose_extrinsic_offline!( signer, call.clone(), - $api.get_nonce().unwrap(), + $api.get_nonce().await.unwrap(), $api.genesis_hash, $api.runtime_version.spec_version ) diff --git a/src/lib.rs b/src/lib.rs index 2139ce7e0..d4723ac70 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,11 +19,6 @@ use rstd::prelude::*; -#[cfg(feature = "std")] -use std::sync::mpsc::channel; -#[cfg(feature = "std")] -use std::sync::mpsc::Sender as ThreadOut; - use codec::{Decode, Encode}; #[cfg(feature = "std")] @@ -33,9 +28,6 @@ use metadata::RuntimeMetadataPrefixed; use primitives::H256 as Hash; use primitives::crypto::Pair; -#[cfg(feature = "std")] -use ws::Result as WsResult; - #[cfg(feature = "std")] use node_metadata::NodeMetadata; @@ -59,6 +51,8 @@ pub mod utils; pub mod rpc; use runtime_primitives::{AccountId32, MultiSignature}; +use futures_signals::signal::Mutable; +use futures_signals::signal::SignalExt; #[cfg(feature = "std")] #[derive(Clone)] @@ -80,15 +74,15 @@ impl

Api

P: Pair, MultiSignature: From, { - pub fn new(url: String) -> Self { - let genesis_hash = Self::_get_genesis_hash(url.clone()); + pub async fn new(url: String) -> Self { + let genesis_hash = Self::_get_genesis_hash(url.clone()).await; info!("Got genesis hash: {:?}", genesis_hash); - let meta = Self::_get_metadata(url.clone()); + let meta = Self::_get_metadata(url.clone()).await; let metadata = node_metadata::parse_metadata(&meta); info!("Metadata: {:?}", metadata); - let runtime_version = Self::_get_runtime_version(url.clone()); + let runtime_version = Self::_get_runtime_version(url.clone()).await; info!("Runtime Version: {:?}", runtime_version); Self { @@ -100,154 +94,154 @@ impl

Api

} } - pub fn set_signer(mut self, signer: P) -> Self { + pub async fn set_signer(mut self, signer: P) -> Self { self.signer = Some(signer); self } - fn _get_genesis_hash(url: String) -> Hash { + async fn _get_genesis_hash(url: String) -> Hash { let jsonreq = json_req::chain_get_block_hash(); - let genesis_hash_str = Self::_get_request(url, jsonreq.to_string()) + let genesis_hash_str = Self::_get_request(url, jsonreq.to_string()).await .expect("Fetching genesis hash from node failed"); hexstr_to_hash(genesis_hash_str).unwrap() } - fn _get_runtime_version(url: String) -> RuntimeVersion { + async fn _get_runtime_version(url: String) -> RuntimeVersion { let jsonreq = json_req::state_get_runtime_version(); - let version_str = Self::_get_request(url, jsonreq.to_string()).unwrap(); //expect("Fetching runtime version from node failed"); + let version_str = Self::_get_request(url, jsonreq.to_string()).await.unwrap(); //expect("Fetching runtime version from node failed"); debug!("got the following runtime version (raw): {}", version_str); serde_json::from_str(&version_str).unwrap() } - fn _get_metadata(url: String) -> RuntimeMetadataPrefixed { + async fn _get_metadata(url: String) -> RuntimeMetadataPrefixed { let jsonreq = json_req::state_get_metadata(); - let metadata_str = Self::_get_request(url, jsonreq.to_string()).unwrap(); + let metadata_str = Self::_get_request(url, jsonreq.to_string()).await.unwrap(); let _unhex = hexstr_to_vec(metadata_str).unwrap(); let mut _om = _unhex.as_slice(); RuntimeMetadataPrefixed::decode(&mut _om).unwrap() } - fn _get_nonce(url: String, signer: [u8; 32]) -> u32 { + async fn _get_nonce(url: String, signer: [u8; 32]) -> u32 { let result_str = Self::_get_storage( url, "System", "AccountNonce", Some(signer.encode()), - ) + ).await .unwrap(); let nonce = hexstr_to_u256(result_str).unwrap_or(U256::from_little_endian(&[0, 0, 0, 0])); nonce.low_u32() } - fn _get_storage( + async fn _get_storage( url: String, module: &str, storage_key_name: &str, param: Option>, - ) -> WsResult { + ) -> Result { let keyhash = storage_key_hash(module, storage_key_name, param); debug!("with storage key: {}", keyhash); let jsonreq = json_req::state_get_storage(&keyhash); - Self::_get_request(url, jsonreq.to_string()) + Self::_get_request(url, jsonreq.to_string()).await } - fn _get_storage_double_map( + async fn _get_storage_double_map( url: String, module: &str, storage_key_name: &str, first: Vec, second: Vec - ) -> WsResult { + ) -> Result { let keyhash = storage_key_hash_double_map(module, storage_key_name, first, second); debug!("with storage key: {}", keyhash); let jsonreq = json_req::state_get_storage(&keyhash); - Self::_get_request(url, jsonreq.to_string()) + Self::_get_request(url, jsonreq.to_string()).await } // low level access - fn _get_request(url: String, jsonreq: String) -> WsResult { - let (result_in, result_out) = channel(); + async fn _get_request(url: String, jsonreq: String) -> Result { + let result_in = Mutable::new("".to_string()); rpc::get(url, jsonreq.clone(), result_in.clone()); - - Ok(result_out.recv().unwrap()) + let result= result_in.signal_cloned().before("".to_string()).await; + Ok(result.unwrap()) } - pub fn get_metadata(&self) -> RuntimeMetadataPrefixed { - Self::_get_metadata(self.url.clone()) + pub async fn get_metadata(&self) -> RuntimeMetadataPrefixed { + Self::_get_metadata(self.url.clone()).await } - pub fn get_spec_version(&self) -> u32 { - Self::_get_runtime_version(self.url.clone()).spec_version + pub async fn get_spec_version(&self) -> u32 { + Self::_get_runtime_version(self.url.clone()).await.spec_version } - pub fn get_genesis_hash(&self) -> Hash { - Self::_get_genesis_hash(self.url.clone()) + pub async fn get_genesis_hash(&self) -> Hash { + Self::_get_genesis_hash(self.url.clone()).await } - pub fn get_nonce(&self) -> Result { + pub async fn get_nonce(&self) -> Result { match &self.signer { Some(key) => { let mut arr: [u8; 32] = Default::default(); arr.clone_from_slice(key.to_owned().public().as_ref()); - Ok(Self::_get_nonce(self.url.clone(), arr)) + Ok(Self::_get_nonce(self.url.clone(), arr).await) }, None => Err("Can't get nonce when no signer is set"), } } - pub fn get_free_balance(&self, address: &AccountId32) -> U256 { + pub async fn get_free_balance(&self, address: &AccountId32) -> U256 { let id: &[u8; 32] = address.as_ref(); let result_str = self - .get_storage("Balances", "FreeBalance", Some(id.to_owned().encode())) + .get_storage("Balances", "FreeBalance", Some(id.to_owned().encode())).await .unwrap(); hexstr_to_u256(result_str).unwrap() } - pub fn get_request(&self, jsonreq: String) -> WsResult { - Self::_get_request(self.url.clone(), jsonreq) + pub async fn get_request(&self, jsonreq: String) -> Result { + Self::_get_request(self.url.clone(), jsonreq).await } - pub fn get_storage( + pub async fn get_storage( &self, storage_prefix: &str, storage_key_name: &str, param: Option>, - ) -> WsResult { - Self::_get_storage(self.url.clone(), storage_prefix, storage_key_name, param) + ) -> Result { + Self::_get_storage(self.url.clone(), storage_prefix, storage_key_name, param).await } - pub fn get_storage_double_map( + pub async fn get_storage_double_map( &self, storage_prefix: &str, storage_key_name: &str, first: Vec, second: Vec, - ) -> WsResult { + ) -> Result { Self::_get_storage_double_map(self.url.clone(), storage_prefix, storage_key_name, - first, second) + first, second).await } - pub fn send_extrinsic(&self, xthex_prefixed: String) -> WsResult { + pub async fn send_extrinsic(&self, xthex_prefixed: String) -> Result { debug!("sending extrinsic: {:?}", xthex_prefixed); let jsonreq = json_req::author_submit_and_watch_extrinsic(&xthex_prefixed).to_string(); - let (result_in, result_out) = channel(); + let result_in = Mutable::new("".to_string()); rpc::send_extrinsic_and_wait_until_finalized( self.url.clone(), jsonreq.clone(), - result_in.clone(), + result_in.clone() ); - - Ok(hexstr_to_hash(result_out.recv().unwrap()).unwrap()) + let result = result_in.signal_cloned().to_future().await; + Ok(hexstr_to_hash(result).unwrap()) } - pub fn subscribe_events(&self, sender: ThreadOut) { + pub async fn subscribe_events(&self, sender: Mutable) { debug!("subscribing to events"); let key = storage_key_hash("System", "Events", None); let jsonreq = json_req::state_subscribe_storage(&key).to_string(); - rpc::start_event_subscriber(self.url.clone(), jsonreq.clone(), sender.clone()); + rpc::start_event_subscriber(self.url.clone(), jsonreq.clone(), sender).await } } diff --git a/src/rpc/client.rs b/src/rpc/client.rs deleted file mode 100644 index 97713ebf7..000000000 --- a/src/rpc/client.rs +++ /dev/null @@ -1,124 +0,0 @@ -/* - Copyright 2019 Supercomputing Systems AG - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - -*/ - -use std::sync::mpsc::Sender as ThreadOut; - -use crate::rpc::json_req::REQUEST_TRANSFER; -use log::{debug, error, info}; -use ws::{CloseCode, Handler, Handshake, Message, Result, Sender}; - -pub type OnMessageFn = fn(msg: Message, out: Sender, result: ThreadOut) -> Result<()>; - -pub struct RpcClient { - pub out: Sender, - pub request: String, - pub result: ThreadOut, - pub on_message_fn: OnMessageFn, -} - -impl Handler for RpcClient { - fn on_open(&mut self, _: Handshake) -> Result<()> { - info!("sending request: {}", self.request); - self.out.send(self.request.clone()).unwrap(); - Ok(()) - } - - fn on_message(&mut self, msg: Message) -> Result<()> { - info!("got message"); - debug!("{}", msg); - (self.on_message_fn)(msg, self.out.clone(), self.result.clone()) - } -} - -pub fn on_get_request_msg(msg: Message, out: Sender, result: ThreadOut) -> Result<()> { - let retstr = msg.as_text().unwrap(); - let value: serde_json::Value = serde_json::from_str(retstr).unwrap(); - - result.send(value["result"].to_string()).unwrap(); - out.close(CloseCode::Normal).unwrap(); - Ok(()) -} - -pub fn on_subscription_msg(msg: Message, _out: Sender, result: ThreadOut) -> Result<()> { - let retstr = msg.as_text().unwrap(); - let value: serde_json::Value = serde_json::from_str(retstr).unwrap(); - match value["id"].as_str() { - Some(_idstr) => {} - _ => { - // subscriptions - debug!("no id field found in response. must be subscription"); - debug!("method: {:?}", value["method"].as_str()); - match value["method"].as_str() { - Some("state_storage") => { - let _changes = &value["params"]["result"]["changes"]; - let _res_str = _changes[0][1].as_str().unwrap().to_string(); - result.send(_res_str).unwrap(); - } - _ => error!("unsupported method"), - } - } - }; - Ok(()) -} - -pub fn on_extrinsic_msg(msg: Message, out: Sender, result: ThreadOut) -> Result<()> { - let retstr = msg.as_text().unwrap(); - let value: serde_json::Value = serde_json::from_str(retstr).unwrap(); - match value["id"].as_str() { - Some(idstr) => match idstr.parse::() { - Ok(req_id) => match req_id { - REQUEST_TRANSFER => match value.get("error") { - Some(err) => error!("ERROR: {:?}", err), - _ => debug!("no error"), - }, - _ => debug!("Unknown request id"), - }, - Err(_) => error!("error assigning request id"), - }, - _ => { - // subscriptions - debug!("no id field found in response. must be subscription"); - debug!("method: {:?}", value["method"].as_str()); - match value["method"].as_str() { - Some("author_extrinsicUpdate") => { - match value["params"]["result"].as_str() { - Some(res) => debug!("author_extrinsicUpdate: {}", res), - _ => { - debug!( - "author_extrinsicUpdate: finalized: {}", - value["params"]["result"]["finalized"].as_str().unwrap() - ); - // return result to calling thread - result - .send( - value["params"]["result"]["finalized"] - .as_str() - .unwrap() - .to_string(), - ) - .unwrap(); - // we've reached the end of the flow. return - out.close(CloseCode::Normal).unwrap(); - } - } - } - _ => error!("unsupported method"), - } - } - }; - Ok(()) -} diff --git a/src/rpc/client/mod.rs b/src/rpc/client/mod.rs new file mode 100644 index 000000000..d47c13f28 --- /dev/null +++ b/src/rpc/client/mod.rs @@ -0,0 +1,102 @@ +/* + Copyright 2019 Supercomputing Systems AG + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +*/ + +#[cfg(not(target_arch = "wasm32"))] +pub mod ws_client; +#[cfg(not(target_arch = "wasm32"))] +pub use ws_client::{start_rpc_client_thread,RpcClient}; + +#[cfg(target_arch = "wasm32")] +pub mod websys_client; +#[cfg(target_arch = "wasm32")] +pub use websys_client::start_rpc_client_thread; + +pub enum ResultE{ + None, + Close, + S(String), + SClose(String) //send and close +} +pub type OnMessageFn = fn(msg: &str) -> ResultE; +use crate::rpc::json_req::REQUEST_TRANSFER; +use log::{debug, error}; +pub fn on_get_request_msg(msg: &str) -> ResultE { + let value: serde_json::Value = serde_json::from_str(msg).unwrap(); + ResultE::SClose(value["result"].to_string()) +} + +pub fn on_subscription_msg(msg: &str) -> ResultE { + let value: serde_json::Value = serde_json::from_str(msg).unwrap(); + match value["id"].as_str() { + Some(_idstr) => { + ResultE::None + } + _ => { + // subscriptions + debug!("no id field found in response. must be subscription"); + debug!("method: {:?}", value["method"].as_str()); + match value["method"].as_str() { + Some("state_storage") => { + let _changes = &value["params"]["result"]["changes"]; + let _res_str = _changes[0][1].as_str().unwrap().to_string(); + ResultE::S(_res_str) + } + _ => {error!("unsupported method");ResultE::None}, + } + } + } +} + +pub fn on_extrinsic_msg(msg: &str) -> ResultE{ + let value: serde_json::Value = serde_json::from_str(msg).unwrap(); + match value["id"].as_str() { + Some(idstr) => match idstr.parse::() { + Ok(req_id) => match req_id { + REQUEST_TRANSFER => match value.get("error") { + Some(err) => {error!("ERROR: {:?}", err);ResultE::None}, + _ => {debug!("no error");ResultE::None}, + }, + _ => {debug!("Unknown request id");ResultE::None}, + }, + Err(_) => {error!("error assigning request id");ResultE::None}, + }, + _ => { + // subscriptions + debug!("no id field found in response. must be subscription"); + debug!("method: {:?}", value["method"].as_str()); + match value["method"].as_str() { + Some("author_extrinsicUpdate") => { + match value["params"]["result"].as_str() { + Some(res) => {debug!("author_extrinsicUpdate: {}", res);ResultE::None}, + _ => { + debug!( + "author_extrinsicUpdate: finalized: {}", + value["params"]["result"]["finalized"].as_str().unwrap() + ); + // return result to calling thread + ResultE::SClose(value["params"]["result"]["finalized"] + .as_str() + .unwrap() + .to_string()) + } + } + } + _ => {error!("unsupported method");ResultE::None}, + } + } + } +} diff --git a/src/rpc/client/websys_client.rs b/src/rpc/client/websys_client.rs new file mode 100644 index 000000000..fde715930 --- /dev/null +++ b/src/rpc/client/websys_client.rs @@ -0,0 +1,62 @@ +use wasm_bindgen::prelude::*; +use wasm_bindgen::JsCast; +use web_sys::{MessageEvent, WebSocket,ErrorEvent}; +use super::{ResultE,OnMessageFn}; +use futures_signals::signal::Mutable; +macro_rules! console_log { + ($($t:tt)*) => (log(&format_args!($($t)*).to_string())) +} +#[wasm_bindgen] +extern "C" { + #[wasm_bindgen(js_namespace = console)] + fn log(s: &str); +} + +pub fn start_rpc_client_thread( + url: String, + jsonreq: String, + result_in: Mutable, + on_message_fn: OnMessageFn, +) { + let ws = WebSocket::new(&url).unwrap(); + let ws_c = ws.clone(); + let on_message = { + Closure::wrap(Box::new(move |evt: MessageEvent| { + let msgg = evt.data() + .as_string() + .expect("Can't convert received data to a string"); + let res_e = (on_message_fn)(&msgg); + match res_e { + ResultE::None=>{}, + ResultE::Close=>{ + ws_c.close_with_code(1000).unwrap(); + }, + ResultE::S(s)=>{ + result_in.set(s); + }, + ResultE::SClose(s)=>{ + result_in.set(s); + ws_c.close_with_code(1000).unwrap(); + } + } + }) as Box) + }; + + ws.set_onmessage(Some(on_message.as_ref().unchecked_ref())); + on_message.forget(); + let onerror_callback = Closure::wrap(Box::new(move |e: ErrorEvent| { + console_log!("error event: {:?}", e); + }) as Box); + ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref())); + onerror_callback.forget(); + let cloned_ws = ws.clone(); + let onopen_callback = Closure::wrap(Box::new(move |_| { + match cloned_ws.send_with_str(&jsonreq) { + Ok(_) => console_log!("message successfully sent"), + Err(err) => console_log!("error sending message: {:?}", err), + } + }) as Box); + ws.set_onopen(Some(onopen_callback.as_ref().unchecked_ref())); + onopen_callback.forget(); + +} diff --git a/src/rpc/client/ws_client.rs b/src/rpc/client/ws_client.rs new file mode 100644 index 000000000..928103845 --- /dev/null +++ b/src/rpc/client/ws_client.rs @@ -0,0 +1,59 @@ +use ws::connect; + +use ws::{CloseCode,Handler, Result,Message,Sender,Handshake}; +use std::thread; +use super::{ResultE,OnMessageFn}; +use futures_signals::signal::Mutable; + +pub struct RpcClient { + pub out: Sender, + pub request: String, + pub result: Mutable, + pub on_message_fn: OnMessageFn, +} +impl Handler for RpcClient { + fn on_open(&mut self,_: Handshake ) -> Result<()> { + self.out.send(self.request.clone()).unwrap(); + Ok(()) + } + + fn on_message(&mut self, msg: Message) -> Result<()> { + let msgg = msg.as_text().unwrap(); + let res_e = (self.on_message_fn)(&msgg); + match res_e { + ResultE::None=>{}, + ResultE::Close=>{ + self.out.close(CloseCode::Normal).unwrap(); + }, + ResultE::S(s)=>{ + self.result.set(s); + }, + ResultE::SClose(s)=>{ + self.result.set(s); + self.out.close(CloseCode::Normal).unwrap(); + } + } + Ok(()) + } +} + + +pub fn start_rpc_client_thread( + url: String, + jsonreq: String, + result_in: Mutable, + on_message_fn: OnMessageFn, +) { + let _client = thread::Builder::new() + .name("client".to_owned()) + .spawn(move || { + connect(url, |out| RpcClient { + out, + request: jsonreq.clone(), + result: result_in.clone(), + on_message_fn, + }) + .unwrap() + }) + .unwrap(); +} diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 2db45ac2d..78aaf0b1e 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -15,48 +15,25 @@ */ -use std::sync::mpsc::Sender as ThreadOut; -use std::thread; - -use ws::connect; - use client::*; - +use futures_signals::signal::Mutable; mod client; pub mod json_req; -pub fn get(url: String, json_req: String, result_in: ThreadOut) { - start_rpc_client_thread(url, json_req, result_in, on_get_request_msg) +pub fn get(url: String, json_req: String,result_in:Mutable) { + start_rpc_client_thread(url, json_req,result_in, on_get_request_msg) } -pub fn send_extrinsic_and_wait_until_finalized( +pub async fn send_extrinsic_and_wait_until_finalized( url: String, json_req: String, - result_in: ThreadOut, + result_in:Mutable ) { - start_rpc_client_thread(url, json_req, result_in, on_extrinsic_msg) + start_rpc_client_thread(url, json_req,result_in, on_extrinsic_msg) } -pub fn start_event_subscriber(url: String, json_req: String, result_in: ThreadOut) { - start_rpc_client_thread(url, json_req, result_in, on_subscription_msg) +pub async fn start_event_subscriber(url: String, json_req: String,result_in:Mutable) { + start_rpc_client_thread(url, json_req, result_in,on_subscription_msg) } -fn start_rpc_client_thread( - url: String, - jsonreq: String, - result_in: ThreadOut, - on_message_fn: OnMessageFn, -) { - let _client = thread::Builder::new() - .name("client".to_owned()) - .spawn(move || { - connect(url, |out| RpcClient { - out, - request: jsonreq.clone(), - result: result_in.clone(), - on_message_fn, - }) - .unwrap() - }) - .unwrap(); -} +pub use client::start_rpc_client_thread; \ No newline at end of file