Skip to content

Commit

Permalink
[service-registry] Refactor MinerClientActor to MinerClientService.
Browse files Browse the repository at this point in the history
  • Loading branch information
jolestar committed Sep 14, 2020
1 parent dd0a1a3 commit f9957ca
Show file tree
Hide file tree
Showing 13 changed files with 89 additions and 71 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion account/service/src/service.rs
Expand Up @@ -176,7 +176,8 @@ mod tests {
registry.put_shared(account_storage).await?;
let service_ref = registry.registry::<AccountService>().await?;
let account = service_ref.get_default_account().await?;
assert!(account.is_none());
//default account will auto create
assert!(account.is_some());
Ok(())
}
}
1 change: 1 addition & 0 deletions cmd/miner_client/Cargo.toml
Expand Up @@ -35,6 +35,7 @@ indicatif = "0.15"
starcoin-rpc-client = { path = "../../rpc/client" }
starcoin-rpc-api = { path = "../../rpc/api" }
tokio-compat = "0.1.5"
starcoin-service-registry = { path = "../../commons/service-registry" }

[[bin]]
name = "starcoin_miner"
Expand Down
7 changes: 7 additions & 0 deletions cmd/miner_client/src/job_client.rs
Expand Up @@ -4,6 +4,7 @@ use crypto::HashValue;
use futures::stream::BoxStream;
use futures::{stream::StreamExt, TryStreamExt};
use starcoin_rpc_client::RpcClient;
use starcoin_types::genesis_config::ConsensusStrategy;
use starcoin_types::U256;

pub struct JobRpcClient {
Expand All @@ -27,4 +28,10 @@ impl JobClient for JobRpcClient {
fn submit_seal(&self, pow_hash: HashValue, nonce: u64) -> Result<()> {
self.rpc_client.miner_submit(pow_hash, nonce)
}

fn consensus(&self) -> Result<ConsensusStrategy> {
self.rpc_client
.node_info()
.map(|node_info| node_info.consensus)
}
}
2 changes: 2 additions & 0 deletions cmd/miner_client/src/lib.rs
Expand Up @@ -9,12 +9,14 @@ use anyhow::Result;
use crypto::HashValue;
use futures::stream::BoxStream;
use rand::Rng;
use starcoin_types::genesis_config::ConsensusStrategy;
use starcoin_types::U256;
use std::ops::Range;

pub trait JobClient {
fn subscribe(&self) -> Result<BoxStream<Result<(HashValue, U256)>>>;
fn submit_seal(&self, pow_hash: HashValue, nonce: u64) -> Result<()>;
fn consensus(&self) -> Result<ConsensusStrategy>;
}

fn partition_nonce(id: u64, total: u64) -> Range<u64> {
Expand Down
6 changes: 2 additions & 4 deletions cmd/miner_client/src/main.rs
@@ -1,6 +1,6 @@
// Copyright (c) The Starcoin Core Contributors
// SPDX-License-Identifier: Apache-2.0
use starcoin_config::{ConsensusStrategy, MinerClientConfig};
use starcoin_config::MinerClientConfig;
use starcoin_miner_client::job_client::JobRpcClient;
use starcoin_miner_client::miner::MinerClient;
use starcoin_rpc_client::RpcClient;
Expand All @@ -13,8 +13,6 @@ pub struct StarcoinOpt {
pub server: String,
#[structopt(long, short = "n", default_value = "1")]
pub thread_num: u16,
#[structopt(long, short = "c", default_value = "argon")]
pub consensus: ConsensusStrategy,
}

fn main() {
Expand All @@ -31,7 +29,7 @@ fn main() {
let client = RpcClient::connect_websocket(&format!("ws://{}", opts.server), &mut rt).unwrap();
rt.block_on_std(async move {
let job_client = JobRpcClient::new(client);
let mut miner_client = MinerClient::new(config, opts.consensus, job_client);
let mut miner_client = MinerClient::new(config, job_client).unwrap();
miner_client.start().await.unwrap();
});
}
82 changes: 43 additions & 39 deletions cmd/miner_client/src/miner.rs
Expand Up @@ -2,14 +2,15 @@
// SPDX-License-Identifier: Apache-2
use crate::worker::{start_worker, WorkerController, WorkerMessage};
use crate::JobClient;
use actix::{Actor, Arbiter, Context};
use actix_rt::Arbiter;
use anyhow::Result;
use crypto::HashValue;
use futures::channel::mpsc;
use futures::stream::StreamExt;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use logger::prelude::*;
use starcoin_config::{ConsensusStrategy, MinerClientConfig};
use starcoin_config::MinerClientConfig;
use starcoin_service_registry::{ActorService, ServiceContext, ServiceFactory};
use starcoin_types::U256;
use std::sync::Mutex;
use std::thread;
Expand All @@ -29,11 +30,8 @@ impl<C> MinerClient<C>
where
C: JobClient,
{
pub fn new(
config: MinerClientConfig,
consensus_strategy: ConsensusStrategy,
job_client: C,
) -> Self {
pub fn new(config: MinerClientConfig, job_client: C) -> Result<Self> {
let consensus_strategy = job_client.consensus()?;
let (nonce_tx, nonce_rx) = mpsc::unbounded();
let (worker_controller, pb) = if config.enable_stderr {
let mp = MultiProgress::new();
Expand All @@ -48,13 +46,13 @@ where
let worker_controller = start_worker(&config, consensus_strategy, nonce_tx, None);
(worker_controller, None)
};
Self {
Ok(Self {
nonce_rx,
worker_controller,
job_client,
pb,
num_seals_found: Mutex::new(0),
}
})
}

pub async fn start(&mut self) -> Result<()> {
Expand Down Expand Up @@ -117,51 +115,57 @@ where
}
}

pub struct MinerClientActor<C>
pub struct MinerClientService<C>
where
C: JobClient,
C: JobClient + Send + Unpin + Clone + Sync + 'static,
{
config: MinerClientConfig,
consensus_strategy: ConsensusStrategy,
job_client: C,
}

impl<C> MinerClientActor<C>
where
C: JobClient,
{
pub fn new(
config: MinerClientConfig,
consensus_strategy: ConsensusStrategy,
job_client: C,
) -> Self {
MinerClientActor {
config,
consensus_strategy,
job_client,
}
}
}

impl<C> Actor for MinerClientActor<C>
impl<C> ActorService for MinerClientService<C>
where
C: JobClient + Unpin + 'static + Clone + Send + Sync,
C: JobClient + Send + Unpin + Clone + Sync,
{
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
fn started(&mut self, _ctx: &mut ServiceContext<Self>) {
let config = self.config.clone();
let consensus_strategy = self.consensus_strategy;
let job_client = self.job_client.clone();
let arbiter = Arbiter::new();
let fut = async move {
let mut miner_cli = MinerClient::new(config, consensus_strategy, job_client);
miner_cli.start().await.unwrap();
let mut miner_cli = match MinerClient::new(config, job_client) {
Err(e) => {
error!("Create MinerClient error: {:?}", e);
return;
}
Ok(cli) => cli,
};
if let Err(e) = miner_cli.start().await {
error!("Start MinerClient error: {:?}", e);
}
};
arbiter.send(Box::pin(fut));
info!("MinerClientActor started");
//FIXME if use cxt.wait, actor can not quit graceful, because MinerClient.start is a loop
//TODO refactor MinerClient, and support graceful quit.
//ctx.wait(fut)
}
}

impl<C> MinerClientService<C>
where
C: JobClient + Send + Unpin + Clone + Sync,
{
pub fn new(config: MinerClientConfig, job_client: C) -> Self {
MinerClientService { config, job_client }
}
}

fn stopped(&mut self, _ctx: &mut Self::Context) {
info!("MinerClientActor stopped");
impl<C> ServiceFactory<Self> for MinerClientService<C>
where
C: JobClient + Send + Unpin + Clone + Sync,
{
fn create(ctx: &mut ServiceContext<MinerClientService<C>>) -> Result<MinerClientService<C>> {
let config = ctx.get_shared::<MinerClientConfig>()?;
let job_client = ctx.get_shared::<C>()?;
Ok(Self::new(config, job_client))
}
}
2 changes: 1 addition & 1 deletion commons/service-registry/src/service.rs
Expand Up @@ -17,7 +17,7 @@ use std::fmt::Debug;
use std::time::Duration;

#[allow(unused_variables)]
pub trait ActorService: Send + Unpin + Sized {
pub trait ActorService: Send + Sized {
fn service_name() -> &'static str {
type_name::<Self>()
}
Expand Down
6 changes: 3 additions & 3 deletions miner/src/create_block_template/test_create_block_template.rs
Expand Up @@ -153,11 +153,11 @@ fn test_new_head() {

let mut master_inner = Inner::new(
node_config.net(),
storage.clone(),
storage,
genesis_id,
txpool.clone(),
txpool,
None,
miner_account.clone(),
miner_account,
)
.unwrap();

Expand Down
10 changes: 8 additions & 2 deletions miner/src/job_bus_client.rs
Expand Up @@ -7,16 +7,18 @@ use futures::executor::block_on;
use futures::stream::BoxStream;
use futures::stream::StreamExt;
use starcoin_miner_client::JobClient;
use starcoin_vm_types::genesis_config::ConsensusStrategy;
use types::U256;

#[derive(Clone)]
pub struct JobBusClient {
bus: Addr<BusActor>,
consensus: ConsensusStrategy,
}

impl JobBusClient {
pub fn new(bus: Addr<BusActor>) -> Self {
Self { bus }
pub fn new(bus: Addr<BusActor>, consensus: ConsensusStrategy) -> Self {
Self { bus, consensus }
}
}

Expand All @@ -35,4 +37,8 @@ impl JobClient for JobBusClient {
let bus = self.bus.clone();
block_on(async move { bus.broadcast(SubmitSealEvent::new(pow_hash, nonce)).await })
}

fn consensus(&self) -> Result<ConsensusStrategy> {
Ok(self.consensus)
}
}
2 changes: 1 addition & 1 deletion miner/src/lib.rs
Expand Up @@ -11,7 +11,7 @@ use futures::prelude::*;
use logger::prelude::*;
use starcoin_config::ConsensusStrategy;
use starcoin_config::NodeConfig;
pub use starcoin_miner_client::miner::{MinerClient, MinerClientActor};
pub use starcoin_miner_client::miner::{MinerClient, MinerClientService};
use starcoin_service_registry::{
ActorService, EventHandler, ServiceContext, ServiceFactory, ServiceRef,
};
Expand Down
32 changes: 13 additions & 19 deletions node/src/node.rs
Expand Up @@ -17,7 +17,7 @@ use starcoin_logger::LoggerHandle;
use starcoin_miner::headblock_pacemaker::HeadBlockPacemaker;
use starcoin_miner::job_bus_client::JobBusClient;
use starcoin_miner::ondemand_pacemaker::OndemandPacemaker;
use starcoin_miner::{CreateBlockTemplateService, MinerClientActor, MinerService};
use starcoin_miner::{CreateBlockTemplateService, MinerClientService, MinerService};
use starcoin_network::{NetworkAsyncService, PeerMsgBroadcasterActor};
use starcoin_network_rpc_api::gen_client::get_rpc_info;
use starcoin_node_api::message::{NodeRequest, NodeResponse};
Expand Down Expand Up @@ -48,7 +48,6 @@ pub struct NodeStartedHandle {
pub chain_actor: ChainActorRef,
pub sync_actor: Addr<SyncActor<NetworkAsyncService>>,
pub rpc_actor: Addr<RpcActor>,
pub miner_client: Option<Addr<MinerClientActor<JobBusClient>>>,
pub chain_notifier: Addr<ChainNotifyHandlerActor>,
pub network: NetworkAsyncService,
pub network_rpc_server: Addr<NetworkRpcServer>,
Expand All @@ -64,7 +63,6 @@ pub struct Node {
pub chain_actor: ChainActorRef,
pub sync_actor: Addr<SyncActor<NetworkAsyncService>>,
pub rpc_actor: Addr<RpcActor>,
pub miner_client: Option<Addr<MinerClientActor<JobBusClient>>>,
pub chain_notifier: Addr<ChainNotifyHandlerActor>,
pub network: NetworkAsyncService,
pub network_rpc_server: Addr<NetworkRpcServer>,
Expand Down Expand Up @@ -295,20 +293,18 @@ pub async fn start(
registry.registry::<MinerService>().await?;

let miner_client_config = config.miner.client_config.clone();
let consensus_strategy = config.net().consensus();
let job_client = JobBusClient::new(bus.clone());
let miner_client = if config.miner.enable_miner_client {
Some(
Arbiter::new()
.exec(move || {
MinerClientActor::new(miner_client_config, consensus_strategy, job_client)
.start()
})
.await?,
)
} else {
None
};
registry.put_shared(miner_client_config).await?;
let job_client = JobBusClient::new(bus.clone(), config.net().consensus());
registry.put_shared(job_client).await?;
registry
.registry::<MinerClientService<JobBusClient>>()
.await?;
if !config.miner.enable_miner_client {
info!("Config.miner.enable_miner_client is false, so stop MinerClientService.");
registry
.stop_service(MinerClientService::<JobBusClient>::service_name())
.await?;
}

let (json_rpc, _io_handler) = RpcActor::launch(
config.clone(),
Expand All @@ -333,7 +329,6 @@ pub async fn start(
chain_actor: chain.clone(),
sync_actor: sync.clone(),
rpc_actor: json_rpc.clone(),
miner_client: miner_client.clone(),
chain_notifier: chain_notify_handler.clone(),
network: network.clone(),
network_rpc_server: network_rpc_server.clone(),
Expand All @@ -351,7 +346,6 @@ pub async fn start(
chain_actor: chain,
sync_actor: sync,
rpc_actor: json_rpc,
miner_client,
chain_notifier: chain_notify_handler,
network,
network_rpc_server,
Expand Down
6 changes: 5 additions & 1 deletion rpc/api/src/node/mod.rs
Expand Up @@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize};
use starcoin_config::ChainNetwork;
use starcoin_consensus::Consensus;
use starcoin_types::peer_info::PeerInfo;
use starcoin_vm_types::genesis_config::ConsensusStrategy;
use std::collections::HashMap;

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand All @@ -18,16 +19,19 @@ pub struct NodeInfo {
pub peer_info: PeerInfo,
pub self_address: String,
pub net: ChainNetwork,
pub consensus: ConsensusStrategy,
pub now: u64,
}

impl NodeInfo {
pub fn new(peer_info: PeerInfo, self_address: String, net: ChainNetwork) -> Self {
let now = net.consensus().now();
let consensus = net.consensus();
let now = consensus.now();
Self {
peer_info,
self_address,
net,
consensus,
now,
}
}
Expand Down

0 comments on commit f9957ca

Please sign in to comment.