Skip to content

Commit

Permalink
fix(vn scanning): only scan since last scan and restart accepted cont…
Browse files Browse the repository at this point in the history
…racts (#4252)

Description
---
The VN was starting the scanning from the genesis block because it had no mechanism for restarting already accepted contracts when restarting the VN. To fix this we're recording all the contracts we're a part of and their states. Already accepted contracts will persist their constitution properties for reference when restarting the side chain later on. 
Additionally, we're storing the last scanned block information so the VN can resume scanning from where it left off.

I've removed the block height interval check. It seemed redundant now that we're using a sleep interval, and it may at some point cause us to miss a contract that was published with a very short expiry time.

Motivation and Context
---
The VN is getting better with how it's managing everything but restarts caused problems that we can now solve via global db.

How Has This Been Tested?
---
Integration tests still pass (although we could now validate the stored state of found contracts)
Manually
  • Loading branch information
brianp committed Jun 30, 2022
1 parent 25e316b commit 43b4a53
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 86 deletions.
172 changes: 116 additions & 56 deletions applications/tari_validator_node/src/contract_worker_manager.rs
Expand Up @@ -22,7 +22,7 @@

use std::{
collections::HashMap,
convert::TryInto,
convert::{TryFrom, TryInto},
sync::{atomic::AtomicBool, Arc},
time::Duration,
};
Expand All @@ -33,7 +33,10 @@ use tari_common_types::types::{FixedHash, FixedHashSizeError, HashDigest, Privat
use tari_comms::{types::CommsPublicKey, NodeIdentity};
use tari_comms_dht::Dht;
use tari_core::{consensus::ConsensusHashWriter, transactions::transaction_components::ContractConstitution};
use tari_crypto::{keys::SecretKey, tari_utilities::hex::Hex};
use tari_crypto::{
keys::SecretKey,
tari_utilities::{hex::Hex, message_format::MessageFormat, ByteArray},
};
use tari_dan_core::{
models::{AssetDefinition, BaseLayerMetadata, Committee},
services::{
Expand All @@ -55,7 +58,11 @@ use tari_dan_core::{
workers::ConsensusWorker,
DigitalAssetError,
};
use tari_dan_storage_sqlite::{global::SqliteGlobalDbBackendAdapter, SqliteDbFactory, SqliteStorageService};
use tari_dan_storage_sqlite::{
global::{models::contract::NewContract, SqliteGlobalDbBackendAdapter},
SqliteDbFactory,
SqliteStorageService,
};
use tari_p2p::{comms_connector::SubscriptionFactory, tari_message::TariMessageType};
use tari_service_framework::ServiceHandles;
use tari_shutdown::ShutdownSignal;
Expand Down Expand Up @@ -131,79 +138,107 @@ impl ContractWorkerManager {
}

pub async fn start(mut self) -> Result<(), WorkerManagerError> {
// TODO: Uncomment line to scan from previous block height once we can
// start up asset workers for existing contracts.
// self.load_initial_state()?;
self.load_initial_state()?;

if self.config.constitution_auto_accept {
info!("constitution_auto_accept is true")
info!("constitution_auto_accept is true");
}

if !self.config.scan_for_assets {
info!(
target: LOG_TARGET,
"scan_for_assets set to false. Contract scanner is sleeping."
"scan_for_assets set to false. Contract scanner is shutting down."
);
self.shutdown.await;
return Ok(());
}

// TODO: Get statuses of active contracts
self.start_active_contracts().await?;

loop {
// TODO: Get statuses of Accepted contracts to see if quorum is met. If quorum is met, start the chain and
// create a checkpoint

let tip = self.base_node_client.get_tip_info().await?;
let next_scan_height = self.last_scanned_height + self.config.constitution_management_polling_interval;
if tip.height_of_longest_chain < next_scan_height {
info!(
target: LOG_TARGET,
"Base layer tip is {}. Next scan will occur at height {}.",
tip.height_of_longest_chain,
next_scan_height
);
tokio::select! {
_ = time::sleep(Duration::from_secs(self.config.constitution_management_polling_interval_in_seconds)) => {},
_ = &mut self.shutdown => break,
}
continue;
if self.config.constitution_auto_accept {
self.scan_and_accept_contracts(&tip).await?;
}
info!(
target: LOG_TARGET,
"Base layer tip is {}. Scanning for new contracts.", tip.height_of_longest_chain,
);

let active_contracts = self.scan_for_new_contracts(&tip).await?;
tokio::select! {
_ = time::sleep(Duration::from_secs(self.config.constitution_management_polling_interval_in_seconds)) => {},
_ = &mut self.shutdown => break
}
}

info!(target: LOG_TARGET, "{} new contract(s) found", active_contracts.len());
Ok(())
}

for contract in active_contracts {
self.global_db
.save_contract(contract.contract_id, contract.mined_height, ContractState::Pending)?;
async fn start_active_contracts(&mut self) -> Result<(), WorkerManagerError> {
let active_contracts = self.global_db.get_active_contracts()?;

if self.config.constitution_auto_accept {
info!(
target: LOG_TARGET,
"Posting acceptance transaction for contract {}", contract.contract_id
);
self.post_contract_acceptance(&contract).await?;
for contract in active_contracts {
let contract_id = FixedHash::try_from(contract.contract_id)?;

self.global_db
.update_contract_state(contract.contract_id, ContractState::Accepted)?;
println!("Starting contract {}", contract_id.to_hex());

// TODO: Scan for acceptances and once enough are present, start working on the contract
// for now, we start working immediately.
let kill = self.spawn_asset_worker(contract.contract_id, &contract.constitution);
self.active_workers.insert(contract.contract_id, kill);
let constitution = ContractConstitution::from_binary(&*contract.constitution).map_err(|error| {
WorkerManagerError::DataCorruption {
details: error.to_string(),
}
}
self.set_last_scanned_block(tip)?;
})?;

tokio::select! {
_ = time::sleep(Duration::from_secs(self.config.constitution_management_polling_interval_in_seconds)) => {},
_ = &mut self.shutdown => break,
let kill = self.spawn_asset_worker(contract_id, &constitution);
self.active_workers.insert(contract_id, kill);
}

Ok(())
}

/// Scans the base layer for new contracts, records each one, posts an acceptance for it and starts its
/// asset worker, then records `tip` as the last scanned block.
///
/// Persisting a newly found contract is best effort: a failed save is logged and processing of that
/// contract carries on. A failure while scanning, posting the acceptance, updating the contract state or
/// storing the last scanned block is returned to the caller immediately; in the first three cases the last
/// scanned block is left unchanged.
async fn scan_and_accept_contracts(&mut self, tip: &BaseLayerMetadata) -> Result<(), WorkerManagerError> {
    info!(
        target: LOG_TARGET,
        "Base layer tip is {}. Scanning for new contracts.", tip.height_of_longest_chain,
    );

    let new_contracts = self.scan_for_new_contracts(tip).await?;

    info!(target: LOG_TARGET, "{} new contract(s) found", new_contracts.len());

    for contract in new_contracts {
        // The contract is cloned because it is still needed below for the acceptance and the worker.
        match self
            .global_db
            .save_contract(contract.clone().into(), ContractState::Pending)
        {
            Ok(_) => info!(
                target: LOG_TARGET,
                "Saving contract data id={}",
                contract.contract_id.to_hex()
            ),
            // Deliberately not propagated: a storage failure here must not stop the acceptance flow.
            Err(error) => error!(
                target: LOG_TARGET,
                "Couldn't save contract data id={} received error={}",
                contract.contract_id.to_hex(),
                error
            ),
        }

        info!(
            target: LOG_TARGET,
            "Posting acceptance transaction for contract {}", contract.contract_id
        );
        self.post_contract_acceptance(&contract).await?;

        // TODO: This should only be set to Accepted but we don't have steps for checking quorums yet.
        self.global_db
            .update_contract_state(contract.contract_id, ContractState::Active)?;

        // TODO: Scan for acceptances and once enough are present, start working on the contract
        // for now, we start working immediately.
        let kill = self.spawn_asset_worker(contract.contract_id, &contract.constitution);
        self.active_workers.insert(contract.contract_id, kill);
    }

    self.set_last_scanned_block(tip)?;

    Ok(())
}

// TODO: Remove once we can start previous contracts
#[allow(dead_code)]
fn load_initial_state(&mut self) -> Result<(), WorkerManagerError> {
self.last_scanned_hash = self
.global_db
Expand Down Expand Up @@ -272,15 +307,27 @@ impl ContractWorkerManager {
tip.height_of_longest_chain
);

self.global_db
.save_contract(contract_id, mined_height, ContractState::Expired)?;
let contract = ActiveContract {
constitution,
contract_id,
mined_height,
};

match self.global_db.save_contract(contract.into(), ContractState::Expired) {
Ok(_) => info!("Saving expired contract data id={}", contract_id.to_hex()),
Err(error) => error!(
"Couldn't save expired contract data id={} received error={}",
contract_id.to_hex(),
error.to_string()
),
}

continue;
}

new_contracts.push(ActiveContract {
contract_id,
constitution,
contract_id,
mined_height,
});
}
Expand Down Expand Up @@ -416,9 +463,11 @@ impl ContractWorkerManager {
Ok(())
}

fn set_last_scanned_block(&mut self, tip: BaseLayerMetadata) -> Result<(), WorkerManagerError> {
self.global_db
.set_data(GlobalDbMetadataKey::LastScannedConstitutionHash, &*tip.tip_hash)?;
fn set_last_scanned_block(&mut self, tip: &BaseLayerMetadata) -> Result<(), WorkerManagerError> {
self.global_db.set_data(
GlobalDbMetadataKey::LastScannedConstitutionHash,
tip.tip_hash.as_bytes(),
)?;
self.global_db.set_data(
GlobalDbMetadataKey::LastScannedConstitutionHeight,
&tip.height_of_longest_chain.to_le_bytes(),
Expand Down Expand Up @@ -451,7 +500,18 @@ pub enum WorkerManagerError {

#[derive(Debug, Clone)]
struct ActiveContract {
pub contract_id: FixedHash,
pub constitution: ContractConstitution,
pub contract_id: FixedHash,
pub mined_height: u64,
}

impl From<ActiveContract> for NewContract {
fn from(value: ActiveContract) -> Self {
Self {
height: value.mined_height as i64,
contract_id: value.contract_id.to_vec(),
constitution: value.constitution.to_binary().unwrap(),
state: 0,
}
}
}
11 changes: 8 additions & 3 deletions dan_layer/core/src/storage/global/global_db.rs
Expand Up @@ -53,12 +53,11 @@ impl<TGlobalDbBackendAdapter: GlobalDbBackendAdapter> GlobalDb<TGlobalDbBackendA

pub fn save_contract(
&self,
contract_id: FixedHash,
mined_height: u64,
contract: TGlobalDbBackendAdapter::NewModel,
state: ContractState,
) -> Result<(), StorageError> {
self.adapter
.save_contract(contract_id, mined_height, state)
.save_contract(contract, state)
.map_err(TGlobalDbBackendAdapter::Error::into)
}

Expand All @@ -67,4 +66,10 @@ impl<TGlobalDbBackendAdapter: GlobalDbBackendAdapter> GlobalDb<TGlobalDbBackendA
.update_contract_state(contract_id, state)
.map_err(TGlobalDbBackendAdapter::Error::into)
}

/// Asks the backend adapter for its active contracts, converting any adapter error into a `StorageError`.
pub fn get_active_contracts(&self) -> Result<Vec<TGlobalDbBackendAdapter::Model>, StorageError> {
    let contracts = self.adapter.get_active_contracts();
    contracts.map_err(Into::into)
}
}
10 changes: 8 additions & 2 deletions dan_layer/core/src/storage/global/global_db_backend_adapter.rs
Expand Up @@ -29,6 +29,8 @@ use crate::storage::StorageError;
pub trait GlobalDbBackendAdapter: Send + Sync + Clone {
type BackendTransaction;
type Error: Into<StorageError>;
type Model;
type NewModel;

fn create_transaction(&self) -> Result<Self::BackendTransaction, Self::Error>;
fn commit(&self, tx: &Self::BackendTransaction) -> Result<(), Self::Error>;
Expand All @@ -39,9 +41,9 @@ pub trait GlobalDbBackendAdapter: Send + Sync + Clone {
key: &GlobalDbMetadataKey,
connection: &Self::BackendTransaction,
) -> Result<Option<Vec<u8>>, Self::Error>;
fn save_contract(&self, contract_id: FixedHash, mined_height: u64, state: ContractState)
-> Result<(), Self::Error>;
fn save_contract(&self, contract: Self::NewModel, state: ContractState) -> Result<(), Self::Error>;
fn update_contract_state(&self, contract_id: FixedHash, state: ContractState) -> Result<(), Self::Error>;
fn get_active_contracts(&self) -> Result<Vec<Self::Model>, Self::Error>;
}

#[derive(Debug, Clone, Copy)]
Expand All @@ -65,6 +67,10 @@ pub enum ContractState {
Pending = 0,
Accepted = 1,
Expired = 2,
QuorumMet = 3,
Active = 4,
Quarantined = 5,
Shutdown = 6,
}

impl ContractState {
Expand Down
13 changes: 7 additions & 6 deletions dan_layer/core/src/storage/mocks/global_db.rs
Expand Up @@ -33,6 +33,8 @@ pub struct MockGlobalDbBackupAdapter;
impl GlobalDbBackendAdapter for MockGlobalDbBackupAdapter {
type BackendTransaction = ();
type Error = StorageError;
type Model = ();
type NewModel = ();

fn create_transaction(&self) -> Result<Self::BackendTransaction, Self::Error> {
todo!()
Expand All @@ -58,16 +60,15 @@ impl GlobalDbBackendAdapter for MockGlobalDbBackupAdapter {
todo!()
}

fn save_contract(
&self,
_contract_id: FixedHash,
_mined_height: u64,
_status: ContractState,
) -> Result<(), Self::Error> {
fn save_contract(&self, _contract: Self::Model, _status: ContractState) -> Result<(), Self::Error> {
todo!()
}

/// Not implemented for the mock adapter; calling it panics via `todo!()`.
fn update_contract_state(&self, _contract_id: FixedHash, _state: ContractState) -> Result<(), Self::Error> {
todo!()
}

/// Not implemented for the mock adapter; calling it panics via `todo!()`.
fn get_active_contracts(&self) -> Result<Vec<Self::Model>, Self::Error> {
todo!()
}
}
Expand Up @@ -21,9 +21,12 @@
-- // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

create table contracts (
id blob primary key not null,
height bigint not null,
state integer not null
id Integer primary key autoincrement not null,
contract_id blob not null,
height bigint not null,
state integer not null,
constitution blob not null
);

-- Secondary indexes on the contract_id and state columns of the contracts table.
-- NOTE(review): the contract_id index is not unique, so the same contract id can be stored in more than
-- one row; confirm this is intended.
create index contracts_contract_id_index on contracts (contract_id);
create index contracts_state_index on contracts (state);
24 changes: 22 additions & 2 deletions dan_layer/storage_sqlite/src/global/models/contract.rs
Expand Up @@ -20,11 +20,31 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use tari_dan_core::storage::global::ContractState;

use crate::global::schema::*;

#[derive(Queryable, Insertable, Identifiable)]
#[derive(Queryable, Identifiable)]
pub struct Contract {
pub id: Vec<u8>,
pub id: i32,
pub contract_id: Vec<u8>,
pub height: i64,
pub state: i32,
pub constitution: Vec<u8>,
}

/// Insertable row for the `contracts` table.
///
/// Carries every column except the auto-incrementing `id`.
#[derive(Insertable)]
#[table_name = "contracts"]
pub struct NewContract {
/// Contract identifier bytes for the `contract_id` blob column.
pub contract_id: Vec<u8>,
/// Value for the `height` column.
pub height: i64,
/// Contract state as an integer; `with_state` writes `ContractState::as_byte()` here.
pub state: i32,
/// Bytes for the `constitution` blob column.
pub constitution: Vec<u8>,
}

impl NewContract {
    /// Overwrites the row's `state` with the numeric form of `state` and hands back `self` so calls can be
    /// chained.
    pub fn with_state(&mut self, state: ContractState) -> &mut Self {
        let encoded = i32::from(state.as_byte());
        self.state = encoded;
        self
    }
}

0 comments on commit 43b4a53

Please sign in to comment.