Skip to content

Commit

Permalink
feat: side load chat (#6042)
Browse files Browse the repository at this point in the history
Description
---
Chat, and the Chat FFI were designed to operate as an independent
functioning node on the network. It brings up it's own p2p and comms
stack, and has it's own identity.

This is great but wallets like Aurora already have this functionality,
and currently to run chat it operates two entire stacks. In the case of
having a wallet already we don't need to run two entire stacks. We can
utilize the wallet resources and sideload chat from an existing node.

Motivation and Context
---
Share resources, lower memory usage, increase network stability, make
chat easier to utilize within Aurora.

How Has This Been Tested?
---
CI

What process can a PR reviewer use to test or verify this change?
---
Inspect the FFI changes for proper error handling and returns.


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify
  • Loading branch information
brianp committed Dec 14, 2023
1 parent c37a0a8 commit d729c45
Show file tree
Hide file tree
Showing 14 changed files with 413 additions and 61 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions base_layer/chat_ffi/chat.h
Expand Up @@ -18,6 +18,8 @@ struct Confirmation;

struct ContactsLivenessData;

struct ContactsServiceHandle;

struct ConversationalistsVector;

struct Message;
Expand Down Expand Up @@ -71,6 +73,37 @@ struct ChatClient *create_chat_client(struct ApplicationConfig *config,
CallbackDeliveryConfirmationReceived callback_delivery_confirmation_received,
CallbackReadConfirmationReceived callback_read_confirmation_received);

/**
* Side loads a chat client
*
* ## Arguments
* `config` - The ApplicationConfig pointer
* `contacts_handler` - A pointer to a contacts handler extracted from the wallet ffi
* `error_out` - Pointer to an int which will be modified
* `callback_contact_status_change` - A callback function pointer. this is called whenever a
* contacts liveness event comes in.
* `callback_message_received` - A callback function pointer. This is called whenever a chat
* message is received.
* `callback_delivery_confirmation_received` - A callback function pointer. This is called when the
* client receives a confirmation of message delivery.
* `callback_read_confirmation_received` - A callback function pointer. This is called when the
* client receives a confirmation of message read.
*
* ## Returns
* `*mut ChatClient` - Returns a pointer to a ChatClient, note that it returns ptr::null_mut()
* if any error was encountered or if the runtime could not be created.
*
* # Safety
* The ```destroy_chat_client``` method must be called when finished with a ClientFFI to prevent a memory leak
*/
struct ChatClient *sideload_chat_client(struct ApplicationConfig *config,
struct ContactsServiceHandle *contacts_handle,
int *error_out,
CallbackContactStatusChange callback_contact_status_change,
CallbackMessageReceived callback_message_received,
CallbackDeliveryConfirmationReceived callback_delivery_confirmation_received,
CallbackReadConfirmationReceived callback_read_confirmation_received);

/**
* Frees memory for a ChatClient
*
Expand Down
103 changes: 102 additions & 1 deletion base_layer/chat_ffi/src/lib.rs
Expand Up @@ -29,6 +29,7 @@ use libc::c_int;
use log::info;
use minotari_app_utilities::identity_management::setup_node_identity;
use tari_chat_client::{config::ApplicationConfig, networking::PeerFeatures, ChatClient as ChatClientTrait, Client};
use tari_contacts::contacts_service::handle::ContactsServiceHandle;
use tokio::runtime::Runtime;

use crate::{
Expand Down Expand Up @@ -148,9 +149,109 @@ pub unsafe extern "C" fn create_chat_client(
};

let mut client = Client::new(identity, (*config).clone());

if let Ok(()) = runtime.block_on(client.initialize()) {
let contacts_handler = match client.contacts.clone() {
Some(contacts_handler) => contacts_handler,
None => {
error =
LibChatError::from(InterfaceError::NullError("No contacts service loaded yet".to_string())).code;
ptr::swap(error_out, &mut error as *mut c_int);
return ptr::null_mut();
},
};

let mut callback_handler = CallbackHandler::new(
contacts_handler,
client.shutdown.to_signal(),
callback_contact_status_change,
callback_message_received,
callback_delivery_confirmation_received,
callback_read_confirmation_received,
);

runtime.spawn(async move {
callback_handler.start().await;
});
}

let client = ChatClient { client, runtime };

Box::into_raw(Box::new(client))
}

/// Side loads a chat client
///
/// ## Arguments
/// `config` - The ApplicationConfig pointer
/// `contacts_handler` - A pointer to a contacts handler extracted from the wallet ffi
/// `error_out` - Pointer to an int which will be modified
/// `callback_contact_status_change` - A callback function pointer. this is called whenever a
/// contacts liveness event comes in.
/// `callback_message_received` - A callback function pointer. This is called whenever a chat
/// message is received.
/// `callback_delivery_confirmation_received` - A callback function pointer. This is called when the
/// client receives a confirmation of message delivery.
/// `callback_read_confirmation_received` - A callback function pointer. This is called when the
/// client receives a confirmation of message read.
///
/// ## Returns
/// `*mut ChatClient` - Returns a pointer to a ChatClient, note that it returns ptr::null_mut()
/// if any error was encountered or if the runtime could not be created.
///
/// # Safety
/// The ```destroy_chat_client``` method must be called when finished with a ClientFFI to prevent a memory leak
#[no_mangle]
pub unsafe extern "C" fn sideload_chat_client(
config: *mut ApplicationConfig,
contacts_handle: *mut ContactsServiceHandle,
error_out: *mut c_int,
callback_contact_status_change: CallbackContactStatusChange,
callback_message_received: CallbackMessageReceived,
callback_delivery_confirmation_received: CallbackDeliveryConfirmationReceived,
callback_read_confirmation_received: CallbackReadConfirmationReceived,
) -> *mut ChatClient {
let mut error = 0;
ptr::swap(error_out, &mut error as *mut c_int);

if config.is_null() {
error = LibChatError::from(InterfaceError::NullError("config".to_string())).code;
ptr::swap(error_out, &mut error as *mut c_int);
return ptr::null_mut();
}

if let Some(log_path) = (*config).clone().chat_client.log_path {
init_logging(log_path, (*config).clone().chat_client.log_verbosity, error_out);

if error > 0 {
return ptr::null_mut();
}
}
info!(
target: LOG_TARGET,
"Sideloading Tari Chat FFI version: {}",
consts::APP_VERSION
);

let runtime = match Runtime::new() {
Ok(r) => r,
Err(e) => {
error = LibChatError::from(InterfaceError::TokioError(e.to_string())).code;
ptr::swap(error_out, &mut error as *mut c_int);
return ptr::null_mut();
},
};

if contacts_handle.is_null() {
error = LibChatError::from(InterfaceError::NullError("contacts_handle".to_string())).code;
ptr::swap(error_out, &mut error as *mut c_int);
return ptr::null_mut();
}

let mut client = Client::sideload((*config).clone(), (*contacts_handle).clone());
if let Ok(()) = runtime.block_on(client.initialize()) {
let mut callback_handler = CallbackHandler::new(
client.contacts.clone().expect("No contacts service loaded yet"),
(*contacts_handle).clone(),
client.shutdown.to_signal(),
callback_contact_status_change,
callback_message_received,
Expand Down
1 change: 1 addition & 0 deletions base_layer/contacts/src/chat_client/Cargo.toml
Expand Up @@ -26,5 +26,6 @@ config = { version = "0.13.0" }
diesel = { version = "2.0.3", features = ["sqlite", "r2d2", "serde_json", "chrono", "64-column-tables"] }
lmdb-zero = "0.4.4"
log = "0.4.17"
rand = "0.8"
serde = "1.0.136"
thiserror = "1.0.50"
58 changes: 39 additions & 19 deletions base_layer/contacts/src/chat_client/src/client.rs
Expand Up @@ -28,16 +28,17 @@ use std::{

use async_trait::async_trait;
use log::debug;
use rand::rngs::OsRng;
use tari_common_types::tari_address::TariAddress;
use tari_comms::{CommsNode, NodeIdentity};
use tari_comms::{peer_manager::PeerFeatures, CommsNode, NodeIdentity};
use tari_contacts::contacts_service::{
handle::ContactsServiceHandle,
service::ContactOnlineStatus,
types::{Message, MessageBuilder, MessageMetadata, MessageMetadataType},
};
use tari_shutdown::Shutdown;

use crate::{config::ApplicationConfig, error::Error, networking};
use crate::{config::ApplicationConfig, error::Error, networking, networking::Multiaddr};

const LOG_TARGET: &str = "contacts::chat_client";

Expand All @@ -51,7 +52,7 @@ pub trait ChatClient {
async fn send_message(&self, message: Message) -> Result<(), Error>;
async fn send_read_receipt(&self, message: Message) -> Result<(), Error>;
async fn get_conversationalists(&self) -> Result<Vec<TariAddress>, Error>;
fn identity(&self) -> &NodeIdentity;
fn address(&self) -> TariAddress;
fn shutdown(&mut self);
}

Expand Down Expand Up @@ -88,26 +89,45 @@ impl Client {
}
}

pub async fn initialize(&mut self) -> Result<(), Error> {
debug!(target: LOG_TARGET, "initializing chat");
pub fn sideload(config: ApplicationConfig, contacts: ContactsServiceHandle) -> Self {
// Create a placeholder ID. It won't be written or used when sideloaded.
let identity = Arc::new(NodeIdentity::random(
&mut OsRng,
Multiaddr::empty(),
PeerFeatures::COMMUNICATION_NODE,
));

let signal = self.shutdown.to_signal();
Self {
config,
contacts: Some(contacts),
identity,
shutdown: Shutdown::new(),
}
}

let (contacts, comms_node) = networking::start(self.identity.clone(), self.config.clone(), signal)
.await
.map_err(|e| Error::InitializationError(e.to_string()))?;
pub async fn initialize(&mut self) -> Result<(), Error> {
debug!(target: LOG_TARGET, "initializing chat");

if !self.config.peer_seeds.peer_seeds.is_empty() {
loop {
debug!(target: LOG_TARGET, "Waiting for peer connections...");
match wait_for_connectivity(comms_node.clone()).await {
Ok(_) => break,
Err(e) => debug!(target: LOG_TARGET, "{}. Still waiting...", e),
// Only run the networking if we're operating as a standalone client. If we're sideloading we can skip all this
if self.contacts.is_none() {
let signal = self.shutdown.to_signal();

let (contacts, comms_node) = networking::start(self.identity.clone(), self.config.clone(), signal)
.await
.map_err(|e| Error::InitializationError(e.to_string()))?;

if !self.config.peer_seeds.peer_seeds.is_empty() {
loop {
debug!(target: LOG_TARGET, "Waiting for peer connections...");
match wait_for_connectivity(comms_node.clone()).await {
Ok(_) => break,
Err(e) => debug!(target: LOG_TARGET, "{}. Still waiting...", e),
}
}
}
}

self.contacts = Some(contacts);
self.contacts = Some(contacts);
}

debug!(target: LOG_TARGET, "Connections established");

Expand All @@ -121,8 +141,8 @@ impl Client {

#[async_trait]
impl ChatClient for Client {
fn identity(&self) -> &NodeIdentity {
&self.identity
fn address(&self) -> TariAddress {
TariAddress::from_public_key(self.identity.public_key(), self.config.chat_client.network)
}

fn shutdown(&mut self) {
Expand Down

0 comments on commit d729c45

Please sign in to comment.