Skip to content

Commit

Permalink
fix(validator-node): fix consensus stall after genesis (#3855)
Browse files Browse the repository at this point in the history
Description
---
- Fixes consensus "stall" after genesis
- Clean up state generics using ServiceSpecification 

Motivation and Context
---
After genesis, consensus would remain stuck on ViewId(1); this has been fixed.

How Has This Been Tested?
---
Tested manually; minting instructions were processed successfully.
  • Loading branch information
sdbondi committed Feb 24, 2022
1 parent 0322402 commit 64efeff
Show file tree
Hide file tree
Showing 12 changed files with 266 additions and 343 deletions.
Expand Up @@ -36,7 +36,12 @@ use tari_dan_core::{
TariDanPayloadProvider,
},
};
use tari_dan_storage_sqlite::{SqliteDbFactory, SqliteStateDbBackendAdapter, SqliteStorageService};
use tari_dan_storage_sqlite::{
SqliteChainBackendAdapter,
SqliteDbFactory,
SqliteStateDbBackendAdapter,
SqliteStorageService,
};

use crate::{
grpc::services::{base_node_client::GrpcBaseNodeClient, wallet_client::GrpcWalletClient},
Expand All @@ -55,6 +60,7 @@ impl ServiceSpecification for DefaultServiceSpecification {
type AssetProcessor = ConcreteAssetProcessor;
type AssetProxy = ConcreteAssetProxy<Self>;
type BaseNodeClient = GrpcBaseNodeClient;
type ChainDbBackendAdapter = SqliteChainBackendAdapter;
type ChainStorageService = SqliteStorageService;
type CheckpointManager = ConcreteCheckpointManager<Self::WalletClient>;
type CommitteeManager = ConcreteCommitteeManager;
Expand Down
8 changes: 8 additions & 0 deletions dan_layer/core/src/models/view.rs
Expand Up @@ -20,6 +20,8 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::fmt::{Display, Formatter};

use crate::models::ViewId;

// TODO: Encapsulate
Expand All @@ -38,3 +40,9 @@ impl View {
self.view_id
}
}

/// Human-readable rendering of a [`View`], e.g. `View(Id: 3, leader: true)`.
/// Used by consensus-worker log lines to show the current view state.
impl Display for View {
    fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result {
        let id = self.view_id.as_u64();
        let leader = self.is_leader;
        write!(formatter, "View(Id: {}, leader: {})", id, leader)
    }
}
1 change: 1 addition & 0 deletions dan_layer/core/src/services/asset_proxy.rs
Expand Up @@ -115,6 +115,7 @@ impl<TServiceSpecification: ServiceSpecification<Addr = PublicKey>> ConcreteAsse
method: String,
args: Vec<u8>,
) -> Result<Option<Vec<u8>>, DigitalAssetError> {
debug!(target: LOG_TARGET, "Forwarding '{}' instruction to {}", member, method);
let mut client = self.validator_node_client_factory.create_client(member);
let resp = client
.invoke_method(asset_public_key, template_id, method, args)
Expand Down
11 changes: 9 additions & 2 deletions dan_layer/core/src/services/service_specification.rs
Expand Up @@ -37,7 +37,7 @@ use crate::{
SigningService,
ValidatorNodeClientFactory,
},
storage::{state::StateDbBackendAdapter, ChainStorageService, DbFactory},
storage::{chain::ChainDbBackendAdapter, state::StateDbBackendAdapter, ChainStorageService, DbFactory},
};

/// A trait to describe a specific configuration of services. This type allows other services to
Expand All @@ -48,10 +48,17 @@ pub trait ServiceSpecification: Clone {
type AssetProcessor: AssetProcessor + Clone + Sync + Send + 'static;
type AssetProxy: AssetProxy + Clone + Sync + Send + 'static;
type BaseNodeClient: BaseNodeClient + Clone + Sync + Send + 'static;
type ChainDbBackendAdapter: ChainDbBackendAdapter;
type ChainStorageService: ChainStorageService<Self::Payload>;
type CheckpointManager: CheckpointManager<Self::Addr>;
type CommitteeManager: CommitteeManager<Self::Addr>;
type DbFactory: DbFactory<StateDbBackendAdapter = Self::StateDbBackendAdapter> + Clone + Sync + Send + 'static;
type DbFactory: DbFactory<
StateDbBackendAdapter = Self::StateDbBackendAdapter,
ChainDbBackendAdapter = Self::ChainDbBackendAdapter,
> + Clone
+ Sync
+ Send
+ 'static;
type EventsPublisher: EventsPublisher<ConsensusWorkerDomainEvent>;
type InboundConnectionService: InboundConnectionService<Addr = Self::Addr, Payload = Self::Payload>
+ 'static
Expand Down
62 changes: 33 additions & 29 deletions dan_layer/core/src/workers/consensus_worker.rs
Expand Up @@ -30,7 +30,7 @@ use crate::{
models::{domain_events::ConsensusWorkerDomainEvent, AssetDefinition, ConsensusWorkerState, View, ViewId},
services::{CheckpointManager, CommitteeManager, EventsPublisher, PayloadProvider, ServiceSpecification},
storage::{
chain::ChainDbUnitOfWork,
chain::{ChainDb, ChainDbUnitOfWork},
state::{StateDbUnitOfWork, StateDbUnitOfWorkImpl, StateDbUnitOfWorkReader},
DbFactory,
},
Expand Down Expand Up @@ -116,14 +116,25 @@ impl<TSpecification: ServiceSpecification<Addr = PublicKey>> ConsensusWorker<TSp
shutdown: ShutdownSignal,
max_views_to_process: Option<u64>,
) -> Result<(), DigitalAssetError> {
let chain_db = self
.db_factory
.get_or_create_chain_db(&self.asset_definition.public_key)?;
self.current_view_id = chain_db
.get_tip_node()?
.map(|n| ViewId(n.height() as u64))
.unwrap_or_else(|| ViewId(0));
info!(
target: LOG_TARGET,
"Consensus worker started for asset '{}'. Tip: {}", self.asset_definition.public_key, self.current_view_id
);
let starting_view = self.current_view_id;
loop {
if let Some(max) = max_views_to_process {
if max <= self.current_view_id.0 - starting_view.0 {
break;
}
}
let next_event = self.next_state_event(&shutdown).await?;
let next_event = self.next_state_event(&chain_db, &shutdown).await?;
if next_event.must_shutdown() {
info!(
target: LOG_TARGET,
Expand All @@ -133,7 +144,10 @@ impl<TSpecification: ServiceSpecification<Addr = PublicKey>> ConsensusWorker<TSp
break;
}
let (from, to) = self.transition(next_event)?;
debug!(target: LOG_TARGET, "Transitioning from {:?} to {:?}", from, to);
debug!(
target: LOG_TARGET,
"Transitioning from {:?} to {:?} ({})", from, to, self.current_view_id
);

self.events_publisher
.publish(ConsensusWorkerDomainEvent::StateChanged { from, to });
Expand All @@ -144,12 +158,13 @@ impl<TSpecification: ServiceSpecification<Addr = PublicKey>> ConsensusWorker<TSp

async fn next_state_event(
&mut self,
chain_db: &ChainDb<TSpecification::ChainDbBackendAdapter>,
shutdown: &ShutdownSignal,
) -> Result<ConsensusWorkerStateEvent, DigitalAssetError> {
use ConsensusWorkerState::*;
match &mut self.state {
Starting => {
states::Starting::default()
states::Starting::<TSpecification>::new()
.next_event(
&mut self.base_node_client,
&self.asset_definition,
Expand All @@ -160,7 +175,7 @@ impl<TSpecification: ServiceSpecification<Addr = PublicKey>> ConsensusWorker<TSp
.await
},
Synchronizing => {
states::Synchronizing::new()
states::Synchronizing::<TSpecification>::new()
.next_event(
&mut self.base_node_client,
&self.asset_definition,
Expand All @@ -171,18 +186,17 @@ impl<TSpecification: ServiceSpecification<Addr = PublicKey>> ConsensusWorker<TSp
.await
},
Prepare => {
let db = self
.db_factory
.get_or_create_chain_db(&self.asset_definition.public_key)?;
let mut unit_of_work = db.new_unit_of_work();
let mut unit_of_work = chain_db.new_unit_of_work();
let mut state_tx = self
.db_factory
.get_state_db(&self.asset_definition.public_key)?
.ok_or(DigitalAssetError::MissingDatabase)?
.new_unit_of_work(self.current_view_id.as_u64());

let mut prepare =
states::Prepare::new(self.node_address.clone(), self.asset_definition.public_key.clone());
let mut prepare = states::Prepare::<TSpecification>::new(
self.node_address.clone(),
self.asset_definition.public_key.clone(),
);
let res = prepare
.next_event(
&self.get_current_view()?,
Expand All @@ -206,11 +220,8 @@ impl<TSpecification: ServiceSpecification<Addr = PublicKey>> ConsensusWorker<TSp
Ok(res)
},
PreCommit => {
let db = self
.db_factory
.get_or_create_chain_db(&self.asset_definition.public_key)?;
let mut unit_of_work = db.new_unit_of_work();
let mut state = states::PreCommitState::new(
let mut unit_of_work = chain_db.new_unit_of_work();
let mut state = states::PreCommitState::<TSpecification>::new(
self.node_address.clone(),
self.committee_manager.current_committee()?.clone(),
self.asset_definition.public_key.clone(),
Expand All @@ -230,11 +241,8 @@ impl<TSpecification: ServiceSpecification<Addr = PublicKey>> ConsensusWorker<TSp
},

Commit => {
let db = self
.db_factory
.get_or_create_chain_db(&self.asset_definition.public_key)?;
let mut unit_of_work = db.new_unit_of_work();
let mut state = states::CommitState::new(
let mut unit_of_work = chain_db.new_unit_of_work();
let mut state = states::CommitState::<TSpecification>::new(
self.node_address.clone(),
self.asset_definition.public_key.clone(),
self.committee_manager.current_committee()?.clone(),
Expand All @@ -255,11 +263,8 @@ impl<TSpecification: ServiceSpecification<Addr = PublicKey>> ConsensusWorker<TSp
Ok(res)
},
Decide => {
let db = self
.db_factory
.get_or_create_chain_db(&self.asset_definition.public_key)?;
let mut unit_of_work = db.new_unit_of_work();
let mut state = states::DecideState::new(
let mut unit_of_work = chain_db.new_unit_of_work();
let mut state = states::DecideState::<TSpecification>::new(
self.node_address.clone(),
self.asset_definition.public_key.clone(),
self.committee_manager.current_committee()?.clone(),
Expand Down Expand Up @@ -300,7 +305,7 @@ impl<TSpecification: ServiceSpecification<Addr = PublicKey>> ConsensusWorker<TSp
self.payload_provider.get_payload_queue().await,
);
self.state_db_unit_of_work = None;
let mut state = states::NextViewState::default();
let mut state = states::NextViewState::<TSpecification>::new();
state
.next_event(
&self.get_current_view()?,
Expand Down Expand Up @@ -335,8 +340,7 @@ impl<TSpecification: ServiceSpecification<Addr = PublicKey>> ConsensusWorker<TSp
(_, NotPartOfCommittee) => Idle,
(Idle, TimedOut) => Starting,
(_, TimedOut) => {
warn!(target: LOG_TARGET, "State timed out");
self.current_view_id = self.current_view_id.saturating_sub(1.into());
warn!(target: LOG_TARGET, "State timed out for {}", self.current_view_id);
NextView
},
(NextView, NewView { new_view }) => {
Expand Down

0 comments on commit 64efeff

Please sign in to comment.