Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(vn scanning): only scan since last scan and restart accepted contracts #4252

Merged
merged 6 commits into from
Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 116 additions & 56 deletions applications/tari_validator_node/src/contract_worker_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

use std::{
collections::HashMap,
convert::TryInto,
convert::{TryFrom, TryInto},
sync::{atomic::AtomicBool, Arc},
time::Duration,
};
Expand All @@ -33,7 +33,10 @@ use tari_common_types::types::{FixedHash, FixedHashSizeError, HashDigest, Privat
use tari_comms::{types::CommsPublicKey, NodeIdentity};
use tari_comms_dht::Dht;
use tari_core::{consensus::ConsensusHashWriter, transactions::transaction_components::ContractConstitution};
use tari_crypto::{keys::SecretKey, tari_utilities::hex::Hex};
use tari_crypto::{
keys::SecretKey,
tari_utilities::{hex::Hex, message_format::MessageFormat, ByteArray},
};
use tari_dan_core::{
models::{AssetDefinition, BaseLayerMetadata, Committee},
services::{
Expand All @@ -55,7 +58,11 @@ use tari_dan_core::{
workers::ConsensusWorker,
DigitalAssetError,
};
use tari_dan_storage_sqlite::{global::SqliteGlobalDbBackendAdapter, SqliteDbFactory, SqliteStorageService};
use tari_dan_storage_sqlite::{
global::{models::contract::NewContract, SqliteGlobalDbBackendAdapter},
SqliteDbFactory,
SqliteStorageService,
};
use tari_p2p::{comms_connector::SubscriptionFactory, tari_message::TariMessageType};
use tari_service_framework::ServiceHandles;
use tari_shutdown::ShutdownSignal;
Expand Down Expand Up @@ -131,79 +138,107 @@ impl ContractWorkerManager {
}

pub async fn start(mut self) -> Result<(), WorkerManagerError> {
// TODO: Uncomment line to scan from previous block height once we can
// start up asset workers for existing contracts.
// self.load_initial_state()?;
self.load_initial_state()?;

if self.config.constitution_auto_accept {
info!("constitution_auto_accept is true")
info!("constitution_auto_accept is true");
}

if !self.config.scan_for_assets {
info!(
target: LOG_TARGET,
"scan_for_assets set to false. Contract scanner is sleeping."
"scan_for_assets set to false. Contract scanner is shutting down."
);
self.shutdown.await;
return Ok(());
}

// TODO: Get statuses of active contracts
self.start_active_contracts().await?;

loop {
// TODO: Get statuses of Accepted contracts to see if quorum is me if quorum is met, start the chain and
// create a checkpoint

let tip = self.base_node_client.get_tip_info().await?;
let next_scan_height = self.last_scanned_height + self.config.constitution_management_polling_interval;
if tip.height_of_longest_chain < next_scan_height {
info!(
target: LOG_TARGET,
"Base layer tip is {}. Next scan will occur at height {}.",
tip.height_of_longest_chain,
next_scan_height
);
tokio::select! {
_ = time::sleep(Duration::from_secs(self.config.constitution_management_polling_interval_in_seconds)) => {},
_ = &mut self.shutdown => break,
}
continue;
if self.config.constitution_auto_accept {
self.scan_and_accept_contracts(&tip).await?;
}
info!(
target: LOG_TARGET,
"Base layer tip is {}. Scanning for new contracts.", tip.height_of_longest_chain,
);

let active_contracts = self.scan_for_new_contracts(&tip).await?;
tokio::select! {
_ = time::sleep(Duration::from_secs(self.config.constitution_management_polling_interval_in_seconds)) => {},
_ = &mut self.shutdown => break
}
}

info!(target: LOG_TARGET, "{} new contract(s) found", active_contracts.len());
Ok(())
}

for contract in active_contracts {
self.global_db
.save_contract(contract.contract_id, contract.mined_height, ContractState::Pending)?;
async fn start_active_contracts(&mut self) -> Result<(), WorkerManagerError> {
let active_contracts = self.global_db.get_active_contracts()?;

if self.config.constitution_auto_accept {
info!(
target: LOG_TARGET,
"Posting acceptance transaction for contract {}", contract.contract_id
);
self.post_contract_acceptance(&contract).await?;
for contract in active_contracts {
let contract_id = FixedHash::try_from(contract.contract_id)?;

self.global_db
.update_contract_state(contract.contract_id, ContractState::Accepted)?;
println!("Starting contract {}", contract_id.to_hex());

// TODO: Scan for acceptances and once enough are present, start working on the contract
// for now, we start working immediately.
let kill = self.spawn_asset_worker(contract.contract_id, &contract.constitution);
self.active_workers.insert(contract.contract_id, kill);
let constitution = ContractConstitution::from_binary(&*contract.constitution).map_err(|error| {
WorkerManagerError::DataCorruption {
details: error.to_string(),
}
}
self.set_last_scanned_block(tip)?;
})?;

tokio::select! {
_ = time::sleep(Duration::from_secs(self.config.constitution_management_polling_interval_in_seconds)) => {},
_ = &mut self.shutdown => break,
let kill = self.spawn_asset_worker(contract_id, &constitution);
self.active_workers.insert(contract_id, kill);
}

Ok(())
}

async fn scan_and_accept_contracts(&mut self, tip: &BaseLayerMetadata) -> Result<(), WorkerManagerError> {
info!(
target: LOG_TARGET,
"Base layer tip is {}. Scanning for new contracts.", tip.height_of_longest_chain,
);

let new_contracts = self.scan_for_new_contracts(tip).await?;

info!(target: LOG_TARGET, "{} new contract(s) found", new_contracts.len());

for contract in new_contracts {
match self
.global_db
.save_contract(contract.clone().into(), ContractState::Pending)
{
Ok(_) => info!("Saving contract data id={}", contract.contract_id.to_hex()),
Err(error) => error!(
"Couldn't save contract data id={} received error={}",
contract.contract_id.to_hex(),
error.to_string()
),
}

info!(
target: LOG_TARGET,
"Posting acceptance transaction for contract {}", contract.contract_id
);
self.post_contract_acceptance(&contract).await?;

// TODO: This should only be set to Accepted but we don't have steps for checking quorums yet.
self.global_db
.update_contract_state(contract.contract_id, ContractState::Active)?;

// TODO: Scan for acceptances and once enough are present, start working on the contract
// for now, we start working immediately.
let kill = self.spawn_asset_worker(contract.contract_id, &contract.constitution);
self.active_workers.insert(contract.contract_id, kill);
}

self.set_last_scanned_block(tip)?;

Ok(())
}

// TODO: Remove once we can start previous contracts
#[allow(dead_code)]
fn load_initial_state(&mut self) -> Result<(), WorkerManagerError> {
self.last_scanned_hash = self
.global_db
Expand Down Expand Up @@ -272,15 +307,27 @@ impl ContractWorkerManager {
tip.height_of_longest_chain
);

self.global_db
.save_contract(contract_id, mined_height, ContractState::Expired)?;
let contract = ActiveContract {
constitution,
contract_id,
mined_height,
};

match self.global_db.save_contract(contract.into(), ContractState::Expired) {
Ok(_) => info!("Saving expired contract data id={}", contract_id.to_hex()),
Err(error) => error!(
"Couldn't save expired contract data id={} received error={}",
contract_id.to_hex(),
error.to_string()
),
}

continue;
}

new_contracts.push(ActiveContract {
contract_id,
constitution,
contract_id,
mined_height,
});
}
Expand Down Expand Up @@ -416,9 +463,11 @@ impl ContractWorkerManager {
Ok(())
}

fn set_last_scanned_block(&mut self, tip: BaseLayerMetadata) -> Result<(), WorkerManagerError> {
self.global_db
.set_data(GlobalDbMetadataKey::LastScannedConstitutionHash, &*tip.tip_hash)?;
fn set_last_scanned_block(&mut self, tip: &BaseLayerMetadata) -> Result<(), WorkerManagerError> {
self.global_db.set_data(
GlobalDbMetadataKey::LastScannedConstitutionHash,
tip.tip_hash.as_bytes(),
)?;
self.global_db.set_data(
GlobalDbMetadataKey::LastScannedConstitutionHeight,
&tip.height_of_longest_chain.to_le_bytes(),
Expand Down Expand Up @@ -451,7 +500,18 @@ pub enum WorkerManagerError {

#[derive(Debug, Clone)]
struct ActiveContract {
pub contract_id: FixedHash,
pub constitution: ContractConstitution,
pub contract_id: FixedHash,
pub mined_height: u64,
}

impl From<ActiveContract> for NewContract {
fn from(value: ActiveContract) -> Self {
Self {
height: value.mined_height as i64,
contract_id: value.contract_id.to_vec(),
constitution: value.constitution.to_binary().unwrap(),
state: 0,
}
}
}
11 changes: 8 additions & 3 deletions dan_layer/core/src/storage/global/global_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,11 @@ impl<TGlobalDbBackendAdapter: GlobalDbBackendAdapter> GlobalDb<TGlobalDbBackendA

pub fn save_contract(
&self,
contract_id: FixedHash,
mined_height: u64,
contract: TGlobalDbBackendAdapter::NewModel,
state: ContractState,
) -> Result<(), StorageError> {
self.adapter
.save_contract(contract_id, mined_height, state)
.save_contract(contract, state)
.map_err(TGlobalDbBackendAdapter::Error::into)
}

Expand All @@ -67,4 +66,10 @@ impl<TGlobalDbBackendAdapter: GlobalDbBackendAdapter> GlobalDb<TGlobalDbBackendA
.update_contract_state(contract_id, state)
.map_err(TGlobalDbBackendAdapter::Error::into)
}

pub fn get_active_contracts(&self) -> Result<Vec<TGlobalDbBackendAdapter::Model>, StorageError> {
self.adapter
.get_active_contracts()
.map_err(TGlobalDbBackendAdapter::Error::into)
}
}
10 changes: 8 additions & 2 deletions dan_layer/core/src/storage/global/global_db_backend_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use crate::storage::StorageError;
pub trait GlobalDbBackendAdapter: Send + Sync + Clone {
type BackendTransaction;
type Error: Into<StorageError>;
type Model;
type NewModel;

fn create_transaction(&self) -> Result<Self::BackendTransaction, Self::Error>;
fn commit(&self, tx: &Self::BackendTransaction) -> Result<(), Self::Error>;
Expand All @@ -39,9 +41,9 @@ pub trait GlobalDbBackendAdapter: Send + Sync + Clone {
key: &GlobalDbMetadataKey,
connection: &Self::BackendTransaction,
) -> Result<Option<Vec<u8>>, Self::Error>;
fn save_contract(&self, contract_id: FixedHash, mined_height: u64, state: ContractState)
-> Result<(), Self::Error>;
fn save_contract(&self, contract: Self::NewModel, state: ContractState) -> Result<(), Self::Error>;
fn update_contract_state(&self, contract_id: FixedHash, state: ContractState) -> Result<(), Self::Error>;
fn get_active_contracts(&self) -> Result<Vec<Self::Model>, Self::Error>;
}

#[derive(Debug, Clone, Copy)]
Expand All @@ -65,6 +67,10 @@ pub enum ContractState {
Pending = 0,
Accepted = 1,
Expired = 2,
QuorumMet = 3,
Active = 4,
Quarantined = 5,
Shutdown = 6,
}

impl ContractState {
Expand Down
13 changes: 7 additions & 6 deletions dan_layer/core/src/storage/mocks/global_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub struct MockGlobalDbBackupAdapter;
impl GlobalDbBackendAdapter for MockGlobalDbBackupAdapter {
type BackendTransaction = ();
type Error = StorageError;
type Model = ();
type NewModel = ();

fn create_transaction(&self) -> Result<Self::BackendTransaction, Self::Error> {
todo!()
Expand All @@ -58,16 +60,15 @@ impl GlobalDbBackendAdapter for MockGlobalDbBackupAdapter {
todo!()
}

fn save_contract(
&self,
_contract_id: FixedHash,
_mined_height: u64,
_status: ContractState,
) -> Result<(), Self::Error> {
fn save_contract(&self, _contract: Self::Model, _status: ContractState) -> Result<(), Self::Error> {
todo!()
}

fn update_contract_state(&self, _contract_id: FixedHash, _state: ContractState) -> Result<(), Self::Error> {
todo!()
}

fn get_active_contracts(&self) -> Result<Vec<Self::Model>, Self::Error> {
todo!()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
-- // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

create table contracts (
id blob primary key not null,
height bigint not null,
state integer not null
id Integer primary key autoincrement not null,
contract_id blob not null,
height bigint not null,
state integer not null,
constitution blob not null
);

create index contracts_contract_id_index on contracts (contract_id);
create index contracts_state_index on contracts (state);
24 changes: 22 additions & 2 deletions dan_layer/storage_sqlite/src/global/models/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,31 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use tari_dan_core::storage::global::ContractState;

use crate::global::schema::*;

#[derive(Queryable, Insertable, Identifiable)]
#[derive(Queryable, Identifiable)]
pub struct Contract {
pub id: Vec<u8>,
pub id: i32,
pub contract_id: Vec<u8>,
pub height: i64,
pub state: i32,
pub constitution: Vec<u8>,
}

#[derive(Insertable)]
#[table_name = "contracts"]
pub struct NewContract {
pub contract_id: Vec<u8>,
pub height: i64,
pub state: i32,
pub constitution: Vec<u8>,
}

impl NewContract {
pub fn with_state(&mut self, state: ContractState) -> &mut Self {
self.state = i32::from(state.as_byte());
self
}
}
Loading