Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Update exit-future and make sc-cli compile on wasm #4289

Merged
merged 10 commits into from Dec 4, 2019
38 changes: 21 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions client/cli/Cargo.toml
Expand Up @@ -16,9 +16,8 @@ time = "0.1.42"
ansi_term = "0.12.1"
lazy_static = "1.4.0"
app_dirs = "1.2.1"
tokio = "0.1.22"
tokio = "0.2.1"
futures = { version = "0.3.1", features = ["compat"] }
futures01 = "0.1.29"
fdlimit = "0.1.1"
serde_json = "1.0.41"
panic-handler = { package = "sp-panic-handler", path = "../../primitives/panic-handler" }
Expand All @@ -33,9 +32,11 @@ sc-telemetry = { path = "../telemetry" }
keyring = { package = "sp-keyring", path = "../../primitives/keyring" }
names = "0.11.0"
structopt = "0.3.3"
rpassword = "4.0.1"
sc-tracing = { package = "sc-tracing", path = "../tracing" }

[target.'cfg(not(target_os = "unknown"))'.dependencies]
rpassword = "4.0.1"

[dev-dependencies]
tempfile = "3.1.0"

Expand Down
55 changes: 31 additions & 24 deletions client/cli/src/lib.rs
Expand Up @@ -43,7 +43,7 @@ use primitives::H256;

use std::{
io::{Write, Read, Seek, Cursor, stdin, stdout, ErrorKind}, iter, fs::{self, File},
net::{Ipv4Addr, SocketAddr}, path::{Path, PathBuf}, str::FromStr,
net::{Ipv4Addr, SocketAddr}, path::{Path, PathBuf}, str::FromStr, pin::Pin, task::Poll
};

use names::{Generator, Name};
Expand All @@ -61,8 +61,7 @@ pub use traits::{GetLogFilter, AugmentClap};
use app_dirs::{AppInfo, AppDataType};
use log::info;
use lazy_static::lazy_static;
use futures::{Future, FutureExt, TryFutureExt};
use futures01::{Async, Future as _};
use futures::{Future, compat::Future01CompatExt, executor::block_on};
use sc_telemetry::TelemetryEndpoints;
use sp_runtime::generic::BlockId;
use sp_runtime::traits::Block as BlockT;
Expand Down Expand Up @@ -396,23 +395,23 @@ impl<'a> ParseAndPrepareExport<'a> {
// Note: while we would like the user to handle the exit themselves, we handle it here
// for backwards compatibility reasons.
let (exit_send, exit_recv) = std::sync::mpsc::channel();
let exit = exit.into_exit()
.map(|_| Ok::<_, ()>(()))
.compat();
let exit = exit.into_exit();
std::thread::spawn(move || {
let _ = exit.wait();
block_on(exit);
let _ = exit_send.send(());
});

let mut export_fut = builder(config)?.export_blocks(file, from.into(), to.map(Into::into), json);
let fut = futures01::future::poll_fn(|| {
let mut export_fut = builder(config)?
.export_blocks(file, from.into(), to.map(Into::into), json)
.compat();
let fut = futures::future::poll_fn(|cx| {
if exit_recv.try_recv().is_ok() {
return Ok(Async::Ready(()));
return Poll::Ready(Ok(()));
}
export_fut.poll()
Pin::new(&mut export_fut).poll(cx)
});

let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
let mut runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(fut)?;
Ok(())
}
Expand Down Expand Up @@ -455,23 +454,23 @@ impl<'a> ParseAndPrepareImport<'a> {
// Note: while we would like the user to handle the exit themselves, we handle it here
// for backwards compatibility reasons.
let (exit_send, exit_recv) = std::sync::mpsc::channel();
let exit = exit.into_exit()
.map(|_| Ok::<_, ()>(()))
.compat();
let exit = exit.into_exit();
std::thread::spawn(move || {
let _ = exit.wait();
block_on(exit);
let _ = exit_send.send(());
});

let mut import_fut = builder(config)?.import_blocks(file, false);
let fut = futures01::future::poll_fn(|| {
let mut import_fut = builder(config)?
.import_blocks(file, false)
.compat();
let fut = futures::future::poll_fn(|cx| {
if exit_recv.try_recv().is_ok() {
return Ok(Async::Ready(()));
return Poll::Ready(Ok(()));
}
import_fut.poll()
Pin::new(&mut import_fut).poll(cx)
});

let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
let mut runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(fut)?;
Ok(())
}
Expand Down Expand Up @@ -513,8 +512,10 @@ impl<'a> CheckBlock<'a> {
};

let start = std::time::Instant::now();
let check = builder(config)?.check_block(block_id);
let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
let check = builder(config)?
.check_block(block_id)
.compat();
let mut runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(check)?;
println!("Completed in {} ms.", start.elapsed().as_millis());
Ok(())
Expand Down Expand Up @@ -719,6 +720,7 @@ fn fill_network_configuration(
Ok(())
}

#[cfg(not(target_os = "unknown"))]
fn input_keystore_password() -> Result<String, String> {
rpassword::read_password_from_tty(Some("Keystore password: "))
.map_err(|e| format!("{:?}", e))
Expand All @@ -730,7 +732,12 @@ fn fill_config_keystore_password<C, G, E>(
cli: &RunCmd,
) -> Result<(), String> {
config.keystore_password = if cli.password_interactive {
Some(input_keystore_password()?.into())
#[cfg(not(target_os = "unknown"))]
{
Some(input_keystore_password()?.into())
}
#[cfg(target_os = "unknown")]
None
} else if let Some(ref file) = cli.password_filename {
Some(fs::read_to_string(file).map_err(|e| format!("{}", e))?.into())
} else if let Some(ref password) = cli.password {
Expand Down
17 changes: 14 additions & 3 deletions client/finality-grandpa/src/communication/mod.rs
Expand Up @@ -289,7 +289,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
service: N,
config: crate::Config,
set_state: crate::environment::SharedVoterSetState<B>,
on_exit: impl Future<Item = (), Error = ()> + Clone + Send + 'static,
on_exit: impl futures03::Future<Output = ()> + Clone + Send + Unpin + 'static,
) -> (
Self,
impl Future<Item = (), Error = ()> + Send + 'static,
Expand Down Expand Up @@ -350,9 +350,20 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
// lazily spawn these jobs onto their own tasks. the lazy future has access
// to tokio globals, which aren't available outside.
let mut executor = tokio_executor::DefaultExecutor::current();
executor.spawn(Box::new(rebroadcast_job.select(on_exit.clone()).then(|_| Ok(()))))

use futures03::{FutureExt, TryFutureExt};

let rebroadcast_job = rebroadcast_job
.select(on_exit.clone().map(Ok).compat())
.then(|_| Ok(()));

let reporting_job = reporting_job
.select(on_exit.clone().map(Ok).compat())
.then(|_| Ok(()));

executor.spawn(Box::new(rebroadcast_job))
.expect("failed to spawn grandpa rebroadcast job task");
executor.spawn(Box::new(reporting_job.select(on_exit.clone()).then(|_| Ok(()))))
executor.spawn(Box::new(reporting_job))
.expect("failed to spawn grandpa reporting job task");
Ok(())
});
Expand Down
11 changes: 5 additions & 6 deletions client/finality-grandpa/src/communication/tests.rs
Expand Up @@ -26,7 +26,7 @@ use std::sync::Arc;
use keyring::Ed25519Keyring;
use codec::Encode;
use sp_runtime::traits::NumberFor;

use std::{pin::Pin, task::{Context, Poll}};
use crate::environment::SharedVoterSetState;
use fg_primitives::AuthorityList;
use super::gossip::{self, GossipValidator};
Expand Down Expand Up @@ -175,12 +175,11 @@ fn make_test_network() -> (
#[derive(Clone)]
struct Exit;

impl Future for Exit {
type Item = ();
type Error = ();
impl futures03::Future for Exit {
type Output = ();

fn poll(&mut self) -> Poll<(), ()> {
Ok(Async::NotReady)
fn poll(self: Pin<&mut Self>, _: &mut Context) -> Poll<()> {
Poll::Pending
}
}

Expand Down
8 changes: 5 additions & 3 deletions client/finality-grandpa/src/lib.rs
Expand Up @@ -555,7 +555,7 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, VR, X>(
NumberFor<Block>: BlockNumberOps,
DigestFor<Block>: Encode,
RA: Send + Sync + 'static,
X: Future<Item=(),Error=()> + Clone + Send + 'static,
X: futures03::Future<Output=()> + Clone + Send + Unpin + 'static,
{
let GrandpaParams {
config,
Expand Down Expand Up @@ -634,7 +634,9 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, VR, X>(
let telemetry_task = telemetry_task
.then(|_| futures::future::empty::<(), ()>());

Ok(voter_work.select(on_exit).select2(telemetry_task).then(|_| Ok(())))
use futures03::{FutureExt, TryFutureExt};

Ok(voter_work.select(on_exit.map(Ok).compat()).select2(telemetry_task).then(|_| Ok(())))
}

/// Future that powers the voter.
Expand Down Expand Up @@ -889,7 +891,7 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA, SC, VR, X>(
DigestFor<Block>: Encode,
RA: Send + Sync + 'static,
VR: VotingRule<Block, Client<B, E, Block, RA>> + Clone + 'static,
X: Future<Item=(),Error=()> + Clone + Send + 'static,
X: futures03::Future<Output=()> + Clone + Send + Unpin + 'static,
{
run_grandpa_voter(grandpa_params)
}
Expand Down
6 changes: 4 additions & 2 deletions client/finality-grandpa/src/observer.rs
Expand Up @@ -155,7 +155,7 @@ pub fn run_grandpa_observer<B, E, Block: BlockT<Hash=H256>, N, RA, SC>(
config: Config,
link: LinkHalf<B, E, Block, RA, SC>,
network: N,
on_exit: impl Future<Item=(),Error=()> + Clone + Send + 'static,
on_exit: impl futures03::Future<Output=()> + Clone + Send + Unpin + 'static,
) -> ::sp_blockchain::Result<impl Future<Item=(),Error=()> + Send + 'static> where
B: Backend<Block, Blake2Hasher> + 'static,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
Expand Down Expand Up @@ -195,7 +195,9 @@ pub fn run_grandpa_observer<B, E, Block: BlockT<Hash=H256>, N, RA, SC>(

let observer_work = network_startup.and_then(move |()| observer_work);

Ok(observer_work.select(on_exit).map(|_| ()).map_err(|_| ()))
use futures03::{FutureExt, TryFutureExt};

Ok(observer_work.select(on_exit.map(Ok).compat()).map(|_| ()).map_err(|_| ()))
}

/// Future that powers the observer.
Expand Down
10 changes: 5 additions & 5 deletions client/finality-grandpa/src/tests.rs
Expand Up @@ -39,6 +39,7 @@ use sp_runtime::generic::{BlockId, DigestItem};
use primitives::{NativeOrEncoded, ExecutionContext, crypto::Public};
use fg_primitives::{GRANDPA_ENGINE_ID, AuthorityList, GrandpaApi};
use state_machine::{backend::InMemory, prove_read, read_proof_check};
use std::{pin::Pin, task};

use authorities::AuthoritySet;
use finality_proof::{FinalityProofProvider, AuthoritySetForFinalityProver, AuthoritySetForFinalityChecker};
Expand Down Expand Up @@ -175,12 +176,11 @@ impl TestNetFactory for GrandpaTestNet {
#[derive(Clone)]
struct Exit;

impl Future for Exit {
type Item = ();
type Error = ();
impl futures03::Future for Exit {
type Output = ();

fn poll(&mut self) -> Poll<(), ()> {
Ok(Async::NotReady)
fn poll(self: Pin<&mut Self>, _: &mut task::Context) -> task::Poll<()> {
task::Poll::Pending
}
}

Expand Down