Skip to content

Commit

Permalink
refactor: add lock error to the StorageError (#3998)
Browse files Browse the repository at this point in the history
Description
---
These changes remove unwrapping grpc connections from options in every method.
Also it adds the `LockError` variant to the `StorageError`.

Motivation and Context
---
Reducing amount of `unwrap` calls, replace with `Result` when possible.

How Has This Been Tested?
---
CI
  • Loading branch information
therustmonk committed Apr 5, 2022
1 parent df36ee6 commit 9279de5
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,34 @@ use tari_dan_core::{

const LOG_TARGET: &str = "tari::validator_node::app";

type Inner = grpc::base_node_client::BaseNodeClient<tonic::transport::Channel>;

#[derive(Clone)]
pub struct GrpcBaseNodeClient {
endpoint: SocketAddr,
inner: Option<grpc::base_node_client::BaseNodeClient<tonic::transport::Channel>>,
inner: Option<Inner>,
}

impl GrpcBaseNodeClient {
pub fn new(endpoint: SocketAddr) -> GrpcBaseNodeClient {
Self { endpoint, inner: None }
}

pub async fn connect(&mut self) -> Result<(), DigitalAssetError> {
self.inner = Some(grpc::base_node_client::BaseNodeClient::connect(format!("http://{}", self.endpoint)).await?);
Ok(())
pub async fn connection(&mut self) -> Result<&mut Inner, DigitalAssetError> {
if self.inner.is_none() {
let url = format!("http://{}", self.endpoint);
let inner = Inner::connect(url).await?;
self.inner = Some(inner);
}
self.inner
.as_mut()
.ok_or_else(|| DigitalAssetError::FatalError("no connection".into()))
}
}
#[async_trait]
impl BaseNodeClient for GrpcBaseNodeClient {
async fn get_tip_info(&mut self) -> Result<BaseLayerMetadata, DigitalAssetError> {
let inner = match self.inner.as_mut() {
Some(i) => i,
None => {
self.connect().await?;
self.inner.as_mut().unwrap()
},
};
let inner = self.connection().await?;
let request = grpc::Empty {};
let result = inner.get_tip_info(request).await?.into_inner();
Ok(BaseLayerMetadata {
Expand All @@ -74,13 +76,7 @@ impl BaseNodeClient for GrpcBaseNodeClient {
asset_public_key: PublicKey,
checkpoint_unique_id: Vec<u8>,
) -> Result<Option<BaseLayerOutput>, DigitalAssetError> {
let inner = match self.inner.as_mut() {
Some(i) => i,
None => {
self.connect().await?;
self.inner.as_mut().unwrap()
},
};
let inner = self.connection().await?;
let request = grpc::GetTokensRequest {
asset_public_key: asset_public_key.as_bytes().to_vec(),
unique_ids: vec![checkpoint_unique_id],
Expand Down Expand Up @@ -140,13 +136,7 @@ impl BaseNodeClient for GrpcBaseNodeClient {
&mut self,
dan_node_public_key: PublicKey,
) -> Result<Vec<(AssetDefinition, u64)>, DigitalAssetError> {
let inner = match self.inner.as_mut() {
Some(i) => i,
None => {
self.connect().await?;
self.inner.as_mut().unwrap()
},
};
let inner = self.connection().await?;
// TODO: probably should use output mmr indexes here
let request = grpc::ListAssetRegistrationsRequest { offset: 0, count: 100 };
let mut result = inner.list_asset_registrations(request).await?.into_inner();
Expand Down Expand Up @@ -198,18 +188,12 @@ impl BaseNodeClient for GrpcBaseNodeClient {
&mut self,
asset_public_key: PublicKey,
) -> Result<Option<BaseLayerOutput>, DigitalAssetError> {
let conn = match self.inner.as_mut() {
Some(i) => i,
None => {
self.connect().await?;
self.inner.as_mut().unwrap()
},
};
let inner = self.connection().await?;

let req = grpc::GetAssetMetadataRequest {
asset_public_key: asset_public_key.to_vec(),
};
let output = conn.get_asset_metadata(req).await.unwrap().into_inner();
let output = inner.get_asset_metadata(req).await.unwrap().into_inner();

let mined_height = output.mined_height;
let output = output
Expand Down
24 changes: 13 additions & 11 deletions applications/tari_validator_node/src/grpc/services/wallet_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,28 @@ use tari_comms::types::CommsPublicKey;
use tari_crypto::tari_utilities::ByteArray;
use tari_dan_core::{models::StateRoot, services::WalletClient, DigitalAssetError};

type Inner = grpc::wallet_client::WalletClient<tonic::transport::Channel>;

#[derive(Clone)]
pub struct GrpcWalletClient {
endpoint: SocketAddr,
inner: Option<grpc::wallet_client::WalletClient<tonic::transport::Channel>>,
inner: Option<Inner>,
}

impl GrpcWalletClient {
pub fn new(endpoint: SocketAddr) -> GrpcWalletClient {
Self { endpoint, inner: None }
}

pub async fn connect(&mut self) -> Result<(), DigitalAssetError> {
self.inner = Some(grpc::wallet_client::WalletClient::connect(format!("http://{}", self.endpoint)).await?);
Ok(())
pub async fn connection(&mut self) -> Result<&mut Inner, DigitalAssetError> {
if self.inner.is_none() {
let url = format!("http://{}", self.endpoint);
let inner = Inner::connect(url).await?;
self.inner = Some(inner);
}
self.inner
.as_mut()
.ok_or_else(|| DigitalAssetError::FatalError("no connection".into()))
}
}

Expand All @@ -55,13 +63,7 @@ impl WalletClient for GrpcWalletClient {
state_root: &StateRoot,
next_committee: Vec<CommsPublicKey>,
) -> Result<(), DigitalAssetError> {
let inner = match self.inner.as_mut() {
Some(i) => i,
None => {
self.connect().await?;
self.inner.as_mut().unwrap()
},
};
let inner = self.connection().await?;

let request = CreateFollowOnAssetCheckpointRequest {
asset_public_key: asset_public_key.as_bytes().to_vec(),
Expand Down
2 changes: 1 addition & 1 deletion dan_layer/core/src/services/asset_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::{
const LOG_TARGET: &str = "tari::dan_layer::core::services::asset_proxy";

#[async_trait]
pub trait AssetProxy {
pub trait AssetProxy: Send + Sync {
async fn invoke_method(
&self,
asset_public_key: &PublicKey,
Expand Down
2 changes: 1 addition & 1 deletion dan_layer/core/src/services/base_node_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
};

#[async_trait]
pub trait BaseNodeClient {
pub trait BaseNodeClient: Send + Sync {
async fn get_tip_info(&mut self) -> Result<BaseLayerMetadata, DigitalAssetError>;

async fn get_current_checkpoint(
Expand Down
22 changes: 8 additions & 14 deletions dan_layer/core/src/services/service_specification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,32 +45,26 @@ use crate::{
/// This trait is intended to only include `types` and no methods.
pub trait ServiceSpecification: Default + Clone {
type Addr: NodeAddressable;
type AssetProcessor: AssetProcessor + Clone + Sync + Send + 'static;
type AssetProxy: AssetProxy + Clone + Sync + Send + 'static;
type BaseNodeClient: BaseNodeClient + Clone + Sync + Send + 'static;
type AssetProcessor: AssetProcessor + Clone;
type AssetProxy: AssetProxy + Clone;
type BaseNodeClient: BaseNodeClient + Clone;
type ChainDbBackendAdapter: ChainDbBackendAdapter;
type ChainStorageService: ChainStorageService<Self::Payload>;
type CheckpointManager: CheckpointManager<Self::Addr>;
type CommitteeManager: CommitteeManager<Self::Addr>;
type DbFactory: DbFactory<
StateDbBackendAdapter = Self::StateDbBackendAdapter,
ChainDbBackendAdapter = Self::ChainDbBackendAdapter,
> + Clone
+ Sync
+ Send
+ 'static;
> + Clone;
type EventsPublisher: EventsPublisher<ConsensusWorkerDomainEvent>;
type InboundConnectionService: InboundConnectionService<Addr = Self::Addr, Payload = Self::Payload>
+ 'static
+ Send
+ Sync;
type MempoolService: MempoolService + Clone + Sync + Send + 'static;
type InboundConnectionService: InboundConnectionService<Addr = Self::Addr, Payload = Self::Payload>;
type MempoolService: MempoolService + Clone;
type OutboundService: OutboundService<Addr = Self::Addr, Payload = Self::Payload>;
type Payload: Payload;
type PayloadProcessor: PayloadProcessor<Self::Payload>;
type PayloadProvider: PayloadProvider<Self::Payload>;
type SigningService: SigningService<Self::Addr>;
type StateDbBackendAdapter: StateDbBackendAdapter;
type ValidatorNodeClientFactory: ValidatorNodeClientFactory<Addr = Self::Addr> + Clone + Sync + Send + 'static;
type WalletClient: WalletClient + Clone + Sync + Send + 'static;
type ValidatorNodeClientFactory: ValidatorNodeClientFactory<Addr = Self::Addr> + Clone;
type WalletClient: WalletClient + Clone;
}
6 changes: 3 additions & 3 deletions dan_layer/core/src/services/validator_node_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ use crate::{
services::infrastructure_services::NodeAddressable,
};

pub trait ValidatorNodeClientFactory {
pub trait ValidatorNodeClientFactory: Send + Sync {
type Addr: NodeAddressable;
type Client: ValidatorNodeRpcClient + Sync + Send;
type Client: ValidatorNodeRpcClient;
fn create_client(&self, address: &Self::Addr) -> Self::Client;
}

#[async_trait]
pub trait ValidatorNodeRpcClient {
pub trait ValidatorNodeRpcClient: Send + Sync {
async fn invoke_read_method(
&mut self,
asset_public_key: &PublicKey,
Expand Down
18 changes: 9 additions & 9 deletions dan_layer/core/src/storage/chain/chain_db_unit_of_work.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl<TBackendAdapter: ChainDbBackendAdapter> ChainDbUnitOfWork for ChainDbUnitOf
// }

fn commit(&mut self) -> Result<(), StorageError> {
let mut inner = self.inner.write().unwrap();
let mut inner = self.inner.write()?;
let tx = inner
.backend_adapter
.create_transaction()
Expand Down Expand Up @@ -143,7 +143,7 @@ impl<TBackendAdapter: ChainDbBackendAdapter> ChainDbUnitOfWork for ChainDbUnitOf
}

fn add_node(&mut self, hash: TreeNodeHash, parent: TreeNodeHash, height: u32) -> Result<(), StorageError> {
self.inner.write().unwrap().nodes.push((
self.inner.write()?.nodes.push((
None,
UnitOfWorkTracker::new(
DbNode {
Expand All @@ -159,15 +159,15 @@ impl<TBackendAdapter: ChainDbBackendAdapter> ChainDbUnitOfWork for ChainDbUnitOf
}

fn add_instruction(&mut self, node_hash: TreeNodeHash, instruction: Instruction) -> Result<(), StorageError> {
self.inner.write().unwrap().instructions.push((
self.inner.write()?.instructions.push((
None,
UnitOfWorkTracker::new(DbInstruction { node_hash, instruction }, true),
));
Ok(())
}

fn get_locked_qc(&mut self) -> Result<QuorumCertificate, StorageError> {
let mut inner = self.inner.write().unwrap();
let mut inner = self.inner.write()?;

if let Some(locked_qc) = &inner.locked_qc {
let locked_qc = locked_qc.get();
Expand Down Expand Up @@ -197,7 +197,7 @@ impl<TBackendAdapter: ChainDbBackendAdapter> ChainDbUnitOfWork for ChainDbUnitOf
}

fn set_locked_qc(&mut self, qc: &QuorumCertificate) -> Result<(), StorageError> {
let mut inner = self.inner.write().unwrap();
let mut inner = self.inner.write()?;

if let Some(locked_qc) = &inner.locked_qc.as_ref() {
let mut locked_qc = locked_qc.get_mut();
Expand Down Expand Up @@ -230,7 +230,7 @@ impl<TBackendAdapter: ChainDbBackendAdapter> ChainDbUnitOfWork for ChainDbUnitOf
}

fn get_prepare_qc(&mut self) -> Result<Option<QuorumCertificate>, StorageError> {
let mut inner = self.inner.write().unwrap();
let mut inner = self.inner.write()?;

if let Some(prepare_qc) = &inner.prepare_qc {
let prepare_qc = prepare_qc.get();
Expand Down Expand Up @@ -265,7 +265,7 @@ impl<TBackendAdapter: ChainDbBackendAdapter> ChainDbUnitOfWork for ChainDbUnitOf
fn set_prepare_qc(&mut self, qc: &QuorumCertificate) -> Result<(), StorageError> {
// put it in the tracker
let _ = self.get_prepare_qc()?;
let mut inner = self.inner.write().unwrap();
let mut inner = self.inner.write()?;
match inner.prepare_qc.as_mut() {
None => {
inner.prepare_qc = Some(UnitOfWorkTracker::new(
Expand All @@ -291,15 +291,15 @@ impl<TBackendAdapter: ChainDbBackendAdapter> ChainDbUnitOfWork for ChainDbUnitOf
}

fn commit_node(&mut self, node_hash: &TreeNodeHash) -> Result<(), StorageError> {
let mut inner = self.inner.write().unwrap();
let mut inner = self.inner.write()?;
let found_node = inner.find_proposed_node(node_hash)?;
let mut node = found_node.1.get_mut();
node.is_committed = true;
Ok(())
}

fn get_tip_node(&self) -> Result<Option<Node>, StorageError> {
let inner = self.inner.read().unwrap();
let inner = self.inner.read()?;
inner.get_tip_node()
}
}
Expand Down
10 changes: 9 additions & 1 deletion dan_layer/core/src/storage/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::io;
use std::{io, sync::PoisonError};

use lmdb_zero as lmdb;
use tari_mmr::error::MerkleMountainRangeError;
Expand Down Expand Up @@ -52,4 +52,12 @@ pub enum StorageError {
MerkleMountainRangeError(#[from] MerkleMountainRangeError),
#[error("General storage error: {details}")]
General { details: String },
#[error("Lock error")]
LockError,
}

impl<T> From<PoisonError<T>> for StorageError {
fn from(_err: PoisonError<T>) -> Self {
Self::LockError
}
}
Loading

0 comments on commit 9279de5

Please sign in to comment.