Skip to content

Commit

Permalink
feat: remove spawn blocking calls from wallet db (contacts service) (#…
Browse files Browse the repository at this point in the history
…4575)

Description
---
- Removed spawn blocking calls for db operations from the wallet in the contacts service. (This is another PR in a couple of PRs required to implement this fully throughout the wallet code.)
- Reset the wallet's default db connection pool size back to 16 (from 5).

Motivation and Context
---
As per #3982 and #4555

How Has This Been Tested?
---
Unit tests
Cucumber tests
System-level test
  • Loading branch information
hansieodendaal committed Aug 31, 2022
1 parent 0b2a155 commit 7464581
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 61 deletions.
2 changes: 1 addition & 1 deletion base_layer/wallet/src/config.rs
Expand Up @@ -136,7 +136,7 @@ impl Default for WalletConfig {
base_node_service_config: Default::default(),
data_dir: PathBuf::from_str("data/wallet").unwrap(),
db_file: PathBuf::from_str("db/console_wallet.db").unwrap(),
db_connection_pool_size: 5, // TODO: get actual default
db_connection_pool_size: 16, // Note: Do not reduce this default number
password: None,
contacts_auto_ping_interval: Duration::from_secs(30),
contacts_online_ping_window: 30,
Expand Down
23 changes: 11 additions & 12 deletions base_layer/wallet/src/contacts_service/service.rs
Expand Up @@ -148,7 +148,7 @@ where T: ContactsBackend + 'static
pin_mut!(shutdown);

// Add all contacts as monitored peers to the liveness service
let result = self.db.get_contacts().await;
let result = self.db.get_contacts();
if let Ok(ref contacts) = result {
self.add_contacts_to_liveness_service(contacts).await?;
}
Expand Down Expand Up @@ -195,14 +195,14 @@ where T: ContactsBackend + 'static
) -> Result<ContactsServiceResponse, ContactsServiceError> {
match request {
ContactsServiceRequest::GetContact(pk) => {
let result = self.db.get_contact(pk.clone()).await;
let result = self.db.get_contact(pk.clone());
if let Ok(ref contact) = result {
self.liveness.check_add_monitored_peer(contact.node_id.clone()).await?;
};
Ok(result.map(ContactsServiceResponse::Contact)?)
},
ContactsServiceRequest::UpsertContact(c) => {
self.db.upsert_contact(c.clone()).await?;
self.db.upsert_contact(c.clone())?;
self.liveness.check_add_monitored_peer(c.node_id).await?;
info!(
target: LOG_TARGET,
Expand All @@ -211,7 +211,7 @@ where T: ContactsBackend + 'static
Ok(ContactsServiceResponse::ContactSaved)
},
ContactsServiceRequest::RemoveContact(pk) => {
let result = self.db.remove_contact(pk.clone()).await?;
let result = self.db.remove_contact(pk.clone())?;
self.liveness
.check_remove_monitored_peer(result.node_id.clone())
.await?;
Expand All @@ -222,7 +222,7 @@ where T: ContactsBackend + 'static
Ok(ContactsServiceResponse::ContactRemoved(result))
},
ContactsServiceRequest::GetContacts => {
let result = self.db.get_contacts().await;
let result = self.db.get_contacts();
if let Ok(ref contacts) = result {
self.add_contacts_to_liveness_service(contacts).await?;
}
Expand Down Expand Up @@ -254,11 +254,11 @@ where T: ContactsBackend + 'static
match event {
// Received a ping, check if it contains ContactsLiveness
LivenessEvent::ReceivedPing(event) => {
self.update_with_ping_pong(event, ContactMessageType::Ping).await?;
self.update_with_ping_pong(event, ContactMessageType::Ping)?;
},
// Received a pong, check if our neighbour sent it and it contains ContactsLiveness
LivenessEvent::ReceivedPong(event) => {
self.update_with_ping_pong(event, ContactMessageType::Pong).await?;
self.update_with_ping_pong(event, ContactMessageType::Pong)?;
},
// New ping round has begun
LivenessEvent::PingRoundBroadcast(num_peers) => {
Expand All @@ -277,7 +277,7 @@ where T: ContactsBackend + 'static
self.resize_contacts_liveness_data_buffer(*num_peers);

// Update offline status
if let Ok(contacts) = self.db.get_contacts().await {
if let Ok(contacts) = self.db.get_contacts() {
for contact in contacts {
let online_status = self.get_online_status(&contact).await?;
if online_status == ContactOnlineStatus::Online {
Expand Down Expand Up @@ -332,7 +332,7 @@ where T: ContactsBackend + 'static
Utc::now().naive_utc().sub(last_seen) <= ping_window
}

async fn update_with_ping_pong(
fn update_with_ping_pong(
&mut self,
event: &PingPongEvent,
message_type: ContactMessageType,
Expand All @@ -356,15 +356,14 @@ where T: ContactsBackend + 'static
}
let this_public_key = self
.db
.update_contact_last_seen(&event.node_id, last_seen.naive_utc(), latency)
.await?;
.update_contact_last_seen(&event.node_id, last_seen.naive_utc(), latency)?;

let data = ContactsLivenessData::new(
this_public_key,
event.node_id.clone(),
latency,
Some(last_seen.naive_utc()),
message_type.clone(),
message_type,
ContactOnlineStatus::Online,
);
self.liveness_data.push(data.clone());
Expand Down
68 changes: 22 additions & 46 deletions base_layer/wallet/src/contacts_service/storage/database.rs
Expand Up @@ -118,65 +118,46 @@ where T: ContactsBackend + 'static
Self { db: Arc::new(db) }
}

pub async fn get_contact(&self, pub_key: CommsPublicKey) -> Result<Contact, ContactsServiceStorageError> {
pub fn get_contact(&self, pub_key: CommsPublicKey) -> Result<Contact, ContactsServiceStorageError> {
let db_clone = self.db.clone();
tokio::task::spawn_blocking(move || fetch!(db_clone, pub_key.clone(), Contact))
.await
.map_err(|err| ContactsServiceStorageError::BlockingTaskSpawnError(err.to_string()))
.and_then(|inner_result| inner_result)
fetch!(db_clone, pub_key, Contact)
}

pub async fn get_contacts(&self) -> Result<Vec<Contact>, ContactsServiceStorageError> {
pub fn get_contacts(&self) -> Result<Vec<Contact>, ContactsServiceStorageError> {
let db_clone = self.db.clone();

let c = tokio::task::spawn_blocking(move || match db_clone.fetch(&DbKey::Contacts) {
match db_clone.fetch(&DbKey::Contacts) {
Ok(None) => log_error(
DbKey::Contacts,
ContactsServiceStorageError::UnexpectedResult("Could not retrieve contacts".to_string()),
),
Ok(Some(DbValue::Contacts(c))) => Ok(c),
Ok(Some(other)) => unexpected_result(DbKey::Contacts, other),
Err(e) => log_error(DbKey::Contacts, e),
})
.await
.map_err(|err| ContactsServiceStorageError::BlockingTaskSpawnError(err.to_string()))??;
Ok(c)
}
}

pub async fn upsert_contact(&self, contact: Contact) -> Result<(), ContactsServiceStorageError> {
let db_clone = self.db.clone();

tokio::task::spawn_blocking(move || {
db_clone.write(WriteOperation::Upsert(Box::new(DbKeyValuePair::Contact(
contact.public_key.clone(),
contact,
))))
})
.await
.map_err(|err| ContactsServiceStorageError::BlockingTaskSpawnError(err.to_string()))??;
pub fn upsert_contact(&self, contact: Contact) -> Result<(), ContactsServiceStorageError> {
self.db.write(WriteOperation::Upsert(Box::new(DbKeyValuePair::Contact(
contact.public_key.clone(),
contact,
))))?;
Ok(())
}

pub async fn update_contact_last_seen(
pub fn update_contact_last_seen(
&self,
node_id: &NodeId,
last_seen: NaiveDateTime,
latency: Option<u32>,
) -> Result<CommsPublicKey, ContactsServiceStorageError> {
let db_clone = self.db.clone();
let node_id_clone = node_id.clone();

let result = tokio::task::spawn_blocking(move || {
db_clone.write(WriteOperation::UpdateLastSeen(Box::new(DbKeyValuePair::LastSeen(
node_id_clone,
let result = self
.db
.write(WriteOperation::UpdateLastSeen(Box::new(DbKeyValuePair::LastSeen(
node_id.clone(),
last_seen,
latency.map(|val| val as i32),
))))
})
.await
.map_err(|err| ContactsServiceStorageError::BlockingTaskSpawnError(err.to_string()))
.and_then(|inner_result| inner_result)?
.ok_or_else(|| ContactsServiceStorageError::ValueNotFound(DbKey::ContactId(node_id.clone())))?;
))))?
.ok_or_else(|| ContactsServiceStorageError::ValueNotFound(DbKey::ContactId(node_id.clone())))?;
match result {
DbValue::PublicKey(k) => Ok(*k),
_ => Err(ContactsServiceStorageError::UnexpectedResult(
Expand All @@ -185,16 +166,11 @@ where T: ContactsBackend + 'static
}
}

pub async fn remove_contact(&self, pub_key: CommsPublicKey) -> Result<Contact, ContactsServiceStorageError> {
let db_clone = self.db.clone();
let pub_key_clone = pub_key.clone();
let result =
tokio::task::spawn_blocking(move || db_clone.write(WriteOperation::Remove(DbKey::Contact(pub_key_clone))))
.await
.map_err(|err| ContactsServiceStorageError::BlockingTaskSpawnError(err.to_string()))
.and_then(|inner_result| inner_result)?
.ok_or_else(|| ContactsServiceStorageError::ValueNotFound(DbKey::Contact(pub_key.clone())))?;

pub fn remove_contact(&self, pub_key: CommsPublicKey) -> Result<Contact, ContactsServiceStorageError> {
let result = self
.db
.write(WriteOperation::Remove(DbKey::Contact(pub_key.clone())))?
.ok_or_else(|| ContactsServiceStorageError::ValueNotFound(DbKey::Contact(pub_key.clone())))?;
match result {
DbValue::Contact(c) => Ok(*c),
DbValue::Contacts(_) | DbValue::PublicKey(_) => Err(ContactsServiceStorageError::UnexpectedResult(
Expand Down
4 changes: 2 additions & 2 deletions common/config/presets/d_console_wallet.toml
Expand Up @@ -32,8 +32,8 @@
# DO NOT EVER DELETE THIS FILE unless you (a) have backed up your seed phrase and (b) know what you are doing!
#db_file = "db/console_wallet.db"

# The main wallet db sqlite database backend connection pool size for concurrent reads (default = 5)
#db_connection_pool_size = 5
# The main wallet db sqlite database backend connection pool size for concurrent reads (default = 16)
#db_connection_pool_size = 16

# Console wallet password. Should you wish to start your console wallet without typing in your password, the following
# options are available:
Expand Down

0 comments on commit 7464581

Please sign in to comment.