Skip to content

Commit

Permalink
feat: contract auto acceptance (#4177)
Browse files Browse the repository at this point in the history
Description
---
Add the configuration for VN auto acceptance. After finding constitutions you can be a part of, auto respond with an acceptance.

Motivation and Context
---
to make it very easy for VN to become part of committees.


How Has This Been Tested?
---
~YOLO.~
~I joke, i'll write a spec for it before merge.~

Spec written but we've got some other bits we need to work out to get the specs passing again.

Tested manually and the process worked but transaction is rejected by the base node due to an input bug trying to utilize the contract as an input.
  • Loading branch information
brianp committed Jun 9, 2022
1 parent 42269ae commit 87f9969
Show file tree
Hide file tree
Showing 15 changed files with 118 additions and 257 deletions.
4 changes: 2 additions & 2 deletions applications/tari_app_grpc/proto/validator_node.proto
Expand Up @@ -31,11 +31,11 @@ service ValidatorNode {
// rpc ExecuteInstruction(ExecuteInstructionRequest) returns (ExecuteInstructionResponse);
rpc InvokeReadMethod(InvokeReadMethodRequest) returns (InvokeReadMethodResponse);
rpc InvokeMethod(InvokeMethodRequest) returns (InvokeMethodResponse);
rpc GetCommitteeRequests(GetCommitteeRequestsRequest) returns (stream TransactionOutput);
rpc GetConstitutionRequests(GetConstitutionRequestsRequest) returns (stream TransactionOutput);
rpc PublishContractAcceptance(PublishContractAcceptanceRequest) returns (PublishContractAcceptanceResponse);
}

message GetCommitteeRequestsRequest {
message GetConstitutionRequestsRequest {
// empty
}

Expand Down
2 changes: 2 additions & 0 deletions applications/tari_validator_node/src/asset.rs
Expand Up @@ -31,6 +31,7 @@ use std::{
use tari_dan_core::models::AssetDefinition;

#[derive(Debug)]
#[allow(dead_code)]
pub struct Asset {
definition: AssetDefinition,
current_state: bool,
Expand All @@ -40,6 +41,7 @@ pub struct Asset {
kill_signal: Option<Arc<AtomicBool>>,
}

#[allow(dead_code)]
impl Asset {
pub fn new(definition: AssetDefinition) -> Self {
Self {
Expand Down
10 changes: 6 additions & 4 deletions applications/tari_validator_node/src/config.rs
Expand Up @@ -65,8 +65,9 @@ pub struct ValidatorNodeConfig {
pub assets_allow_list: Option<Vec<String>>,
pub data_dir: PathBuf,
pub p2p: P2pConfig,
pub committee_management_polling_interval: u64,
pub committee_management_confirmation_time: u64,
pub constitution_auto_accept: bool,
pub constitution_management_polling_interval: u64,
pub constitution_management_confirmation_time: u64,
pub grpc_address: Option<Multiaddr>,
}

Expand Down Expand Up @@ -101,8 +102,9 @@ impl Default for ValidatorNodeConfig {
new_asset_scanning_interval: 10,
assets_allow_list: None,
data_dir: PathBuf::from("/data/validator_node"),
committee_management_confirmation_time: 10,
committee_management_polling_interval: 5,
constitution_auto_accept: false,
constitution_management_confirmation_time: 10,
constitution_management_polling_interval: 5,
p2p,
grpc_address: Some("/ip4/127.0.0.1/tcp/18144".parse().unwrap()),
}
Expand Down
269 changes: 65 additions & 204 deletions applications/tari_validator_node/src/dan_node.rs
Expand Up @@ -20,238 +20,99 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
sync::{atomic::AtomicBool, Arc},
time::Duration,
};
use std::{sync::Arc, time::Duration};

use log::*;
use log::{error, info};
use tari_common::exit_codes::{ExitCode, ExitError};
use tari_common_types::types::PublicKey;
use tari_comms::{types::CommsPublicKey, NodeIdentity};
use tari_comms_dht::Dht;
use tari_crypto::tari_utilities::hex::Hex;
use tari_dan_core::{
models::{AssetDefinition, Committee},
services::{
BaseNodeClient,
ConcreteAssetProcessor,
ConcreteCheckpointManager,
ConcreteCommitteeManager,
LoggingEventsPublisher,
MempoolServiceHandle,
NodeIdentitySigningService,
TariDanPayloadProcessor,
TariDanPayloadProvider,
},
workers::ConsensusWorker,
};
use tari_dan_storage_sqlite::{SqliteDbFactory, SqliteStorageService};
use tari_p2p::{comms_connector::SubscriptionFactory, tari_message::TariMessageType};
use tari_service_framework::ServiceHandles;
use tari_shutdown::ShutdownSignal;
use tari_common_types::types::Signature;
use tari_comms::NodeIdentity;
use tari_dan_core::services::{BaseNodeClient, WalletClient};
use tokio::{task, time};

use crate::{
config::ValidatorNodeConfig,
default_service_specification::DefaultServiceSpecification,
grpc::services::{base_node_client::GrpcBaseNodeClient, wallet_client::GrpcWalletClient},
monitoring::Monitoring,
p2p::services::{
inbound_connection_service::TariCommsInboundConnectionService,
outbound_connection_service::TariCommsOutboundService,
},
TariCommsValidatorNodeClientFactory,
};

const LOG_TARGET: &str = "tari::validator_node::app";
const _LOG_TARGET: &str = "tari::validator_node::app";

#[derive(Clone)]
pub struct DanNode {
config: ValidatorNodeConfig,
identity: Arc<NodeIdentity>,
}

impl DanNode {
pub fn new(config: ValidatorNodeConfig) -> Self {
Self { config }
pub fn new(config: ValidatorNodeConfig, identity: Arc<NodeIdentity>) -> Self {
Self { config, identity }
}

pub async fn start(
&self,
shutdown: ShutdownSignal,
node_identity: Arc<NodeIdentity>,
mempool_service: MempoolServiceHandle,
db_factory: SqliteDbFactory,
handles: ServiceHandles,
subscription_factory: SubscriptionFactory,
) -> Result<(), ExitError> {
pub async fn start(&self) -> Result<(), ExitError> {
let mut base_node_client = GrpcBaseNodeClient::new(self.config.base_node_grpc_address);
let mut next_scanned_height = 0u64;
let mut last_tip = 0u64;
let mut monitoring = Monitoring::new(self.config.committee_management_confirmation_time);
loop {
let tip = base_node_client
.get_tip_info()
.await
.map_err(|e| ExitError::new(ExitCode::DigitalAssetError, &e))?;
if tip.height_of_longest_chain >= next_scanned_height {
info!(
target: LOG_TARGET,
"Scanning base layer (tip : {}) for new assets", tip.height_of_longest_chain
);
if self.config.scan_for_assets {
next_scanned_height =
tip.height_of_longest_chain + self.config.committee_management_polling_interval;
info!(target: LOG_TARGET, "Next scanning height {}", next_scanned_height);
} else {
next_scanned_height = u64::MAX; // Never run again.
}
let mut assets = base_node_client
.get_assets_for_dan_node(node_identity.public_key().clone())
.await
.map_err(|e| ExitError::new(ExitCode::DigitalAssetError, e))?;
info!(
target: LOG_TARGET,
"Base node returned {} asset(s) to process",
assets.len()
);
if let Some(allow_list) = &self.config.assets_allow_list {
assets.retain(|(asset, _)| allow_list.contains(&asset.public_key.to_hex()));
}
for (asset, mined_height) in assets.clone() {
monitoring.add_if_unmonitored(asset.clone());
monitoring.add_state(asset.public_key, mined_height, true);
}
let mut known_active_public_keys = assets.into_iter().map(|(asset, _)| asset.public_key);
let active_public_keys = monitoring
.get_active_public_keys()
.into_iter()
.cloned()
.collect::<Vec<PublicKey>>();
for public_key in active_public_keys {
if !known_active_public_keys.any(|pk| pk == public_key) {
// Active asset is not part of the newly known active assets, maybe there were no checkpoint for
// the asset. Are we still part of the committee?
if let (false, height) = base_node_client
.check_if_in_committee(public_key.clone(), node_identity.public_key().clone())
.await
.unwrap()
{
// We are not part of the latest committee, set the state to false
monitoring.add_state(public_key.clone(), height, false)
}
let node = self.clone();

if self.config.constitution_auto_accept {
task::spawn(async move {
loop {
if let Ok(metadata) = base_node_client.get_tip_info().await {
last_tip = metadata.height_of_longest_chain;
}

match node
.find_and_accept_constitutions(base_node_client.clone(), last_tip)
.await
{
Ok(()) => info!("Contracts accepted"),
Err(e) => error!("Contracts not accepted becayse {:?}", e),
}

time::sleep(Duration::from_secs(
node.config.constitution_management_polling_interval,
))
.await;
}
}
if tip.height_of_longest_chain > last_tip {
last_tip = tip.height_of_longest_chain;
monitoring.update_height(last_tip, |asset| {
let node_identity = node_identity.as_ref().clone();
let mempool = mempool_service.clone();
let handles = handles.clone();
let subscription_factory = subscription_factory.clone();
let shutdown = shutdown.clone();
// Create a kill signal for each asset
let kill = Arc::new(AtomicBool::new(false));
let dan_config = self.config.clone();
let db_factory = db_factory.clone();
task::spawn(DanNode::start_asset_worker(
asset,
node_identity,
mempool,
handles,
subscription_factory,
shutdown,
dan_config,
db_factory,
kill.clone(),
));
kill
});
}
});
}

loop {
// other work here

time::sleep(Duration::from_secs(120)).await;
}
}

pub async fn start_asset_worker(
asset_definition: AssetDefinition,
node_identity: NodeIdentity,
mempool_service: MempoolServiceHandle,
handles: ServiceHandles,
subscription_factory: SubscriptionFactory,
shutdown: ShutdownSignal,
config: ValidatorNodeConfig,
db_factory: SqliteDbFactory,
kill: Arc<AtomicBool>,
async fn find_and_accept_constitutions(
&self,
mut base_node_client: GrpcBaseNodeClient,
last_tip: u64,
) -> Result<(), ExitError> {
let timeout = Duration::from_secs(asset_definition.phase_timeout);
let committee = asset_definition
.committee
.iter()
.map(|s| {
CommsPublicKey::from_hex(s)
.map_err(|e| ExitError::new(ExitCode::ConfigError, format!("could not convert to hex:{}", e)))
})
.collect::<Result<Vec<_>, _>>()?;

let committee = Committee::new(committee);
let committee_service = ConcreteCommitteeManager::new(committee);

let payload_provider = TariDanPayloadProvider::new(mempool_service.clone());

let events_publisher = LoggingEventsPublisher::default();
let signing_service = NodeIdentitySigningService::new(node_identity.clone());

// let _backend = LmdbAssetStore::initialize(data_dir.join("asset_data"), Default::default())
// .map_err(|err| ExitCodes::DatabaseError(err.to_string()))?;
// let data_store = AssetDataStore::new(backend);
let asset_processor = ConcreteAssetProcessor::default();

let payload_processor = TariDanPayloadProcessor::new(asset_processor);
let mut inbound = TariCommsInboundConnectionService::new(asset_definition.public_key.clone());
let receiver = inbound.get_receiver();

let loopback = inbound.clone_sender();
let shutdown_2 = shutdown.clone();
task::spawn(async move {
let topic_subscription =
subscription_factory.get_subscription(TariMessageType::DanConsensusMessage, "HotStuffMessages");
inbound.run(shutdown_2, topic_subscription).await
});
let dht = handles.expect_handle::<Dht>();
let outbound =
TariCommsOutboundService::new(dht.outbound_requester(), loopback, asset_definition.public_key.clone());
let base_node_client = GrpcBaseNodeClient::new(config.base_node_grpc_address);
let chain_storage = SqliteStorageService {};
let wallet_client = GrpcWalletClient::new(config.wallet_grpc_address);
let checkpoint_manager = ConcreteCheckpointManager::new(asset_definition.clone(), wallet_client);
let validator_node_client_factory = TariCommsValidatorNodeClientFactory::new(dht.dht_requester());
let mut consensus_worker = ConsensusWorker::<DefaultServiceSpecification>::new(
receiver,
outbound,
committee_service,
node_identity.public_key().clone(),
payload_provider,
events_publisher,
signing_service,
payload_processor,
asset_definition,
base_node_client,
timeout,
db_factory,
chain_storage,
checkpoint_manager,
validator_node_client_factory,
);

if let Err(err) = consensus_worker.run(shutdown.clone(), None, kill).await {
error!(target: LOG_TARGET, "Consensus worker failed with error: {}", err);
return Err(ExitError::new(ExitCode::UnknownError, err));
let mut wallet_client = GrpcWalletClient::new(self.config.wallet_grpc_address);

let outputs = base_node_client
.get_constitutions(self.identity.public_key().clone())
.await
.map_err(|e| ExitError::new(ExitCode::DigitalAssetError, &e))?;

for output in outputs {
if let Some(sidechain_features) = output.features.sidechain_features {
let contract_id = sidechain_features.contract_id;
let constitution = sidechain_features.constitution.expect("Constitution wasn't present");

if constitution.acceptance_requirements.acceptance_period_expiry < last_tip {
let signature = Signature::default();

match wallet_client
.submit_contract_acceptance(&contract_id, self.identity.public_key(), &signature)
.await
{
Ok(tx_id) => info!("Accepted with id={}", tx_id),
Err(_) => error!("Did not accept the contract acceptance"),
};
};
}
}

Ok(())
}

// async fn start_asset_proxy(&self) -> Result<(), ExitCodes> {
// todo!()
// }
}
Expand Up @@ -100,7 +100,7 @@ impl BaseNodeClient for GrpcBaseNodeClient {
Ok(output)
}

async fn check_for_constitutions_for_me(
async fn get_constitutions(
&mut self,
dan_node_public_key: PublicKey,
) -> Result<Vec<TransactionOutput>, DigitalAssetError> {
Expand Down
Expand Up @@ -69,7 +69,7 @@ impl<TServiceSpecification: ServiceSpecification> ValidatorNodeGrpcServer<TServi
impl<TServiceSpecification: ServiceSpecification + 'static> rpc::validator_node_server::ValidatorNode
for ValidatorNodeGrpcServer<TServiceSpecification>
{
type GetCommitteeRequestsStream = mpsc::Receiver<Result<TransactionOutput, tonic::Status>>;
type GetConstitutionRequestsStream = mpsc::Receiver<Result<TransactionOutput, tonic::Status>>;

async fn publish_contract_acceptance(
&self,
Expand All @@ -96,10 +96,10 @@ impl<TServiceSpecification: ServiceSpecification + 'static> rpc::validator_node_
}
}

async fn get_committee_requests(
async fn get_constitution_requests(
&self,
_request: tonic::Request<rpc::GetCommitteeRequestsRequest>,
) -> Result<Response<Self::GetCommitteeRequestsStream>, tonic::Status> {
_request: tonic::Request<rpc::GetConstitutionRequestsRequest>,
) -> Result<Response<Self::GetConstitutionRequestsStream>, tonic::Status> {
let (mut _sender, receiver) = mpsc::channel(100);
task::spawn(async move {
let mut _test = 1u64;
Expand Down

0 comments on commit 87f9969

Please sign in to comment.