Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: publish to non-subscribed topic #933

Merged
merged 1 commit into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions sn_cli/src/subcommands/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{
use url::Url;

const DEFAULT_RECEIVE_ONLINE_WALLET_DIR: &str = "receive_online";
const TRANSFER_NOTIF_TOPIC: &str = "TRANSFER_NOTIFICATION";
const ROYALTY_TRANSFER_NOTIF_TOPIC: &str = "ROYALTY_TRANSFER_NOTIFICATION";

// Please do not remove the blank lines in these doc comments.
// They are used for inserting line breaks when the help menu is rendered in the UI.
Expand Down Expand Up @@ -407,7 +407,7 @@ async fn listen_notifs_and_deposit(root_dir: &Path, client: &Client, sk: String)
let main_pk = wallet.address();
let pk = main_pk.public_key();

client.subscribe_to_topic(TRANSFER_NOTIF_TOPIC.to_string())?;
client.subscribe_to_topic(ROYALTY_TRANSFER_NOTIF_TOPIC.to_string())?;
let mut events_receiver = client.events_channel();

println!("Current balance in local wallet: {}", wallet.balance());
Expand Down
2 changes: 1 addition & 1 deletion sn_node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ mod spends;
pub use self::{
event::{NodeEvent, NodeEventsChannel, NodeEventsReceiver},
log_markers::Marker,
node::{NodeBuilder, NodeCmd, TRANSFER_NOTIF_TOPIC},
node::{NodeBuilder, NodeCmd, ROYALTY_TRANSFER_NOTIF_TOPIC},
};

use crate::error::{Error, Result};
Expand Down
31 changes: 23 additions & 8 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ use bytes::Bytes;
use libp2p::{autonat::NatStatus, identity::Keypair, Multiaddr};
#[cfg(feature = "open-metrics")]
use prometheus_client::registry::Registry;
use rand::{rngs::StdRng, Rng, SeedableRng};
use rand::{
rngs::{OsRng, StdRng},
Rng, SeedableRng,
};
use sn_networking::{
MsgResponder, Network, NetworkBuilder, NetworkEvent, SwarmDriver, CLOSE_GROUP_SIZE,
};
Expand All @@ -39,10 +42,13 @@ use tokio::{
task::spawn,
};

/// Expected topic name where notifications of transfers are sent on.
/// Expected topic name where notifications of royalty transfers are sent on.
/// The notification msg is expected to contain the serialised public key, followed by the
/// serialised transfer info encrypted against the referenced public key.
pub const TRANSFER_NOTIF_TOPIC: &str = "TRANSFER_NOTIFICATION";
pub const ROYALTY_TRANSFER_NOTIF_TOPIC: &str = "ROYALTY_TRANSFER_NOTIFICATION";

/// Defines the percentage (1/50) of node to act as royalty_transfer_notify forwarder.
const FORWARDER_CHOOSING_FACTOR: usize = 50;

/// Interval to trigger replication on a random close_group peer
const PERIODIC_REPLICATION_INTERVAL: Duration = Duration::from_secs(30);
Expand Down Expand Up @@ -143,10 +149,18 @@ impl NodeBuilder {

// Run the node
node.run(swarm_driver, network_event_receiver);
// subscribe to receive transfer notifications over gossipsub topic TRANSFER_NOTIF_TOPIC
running_node
.subscribe_to_topic(TRANSFER_NOTIF_TOPIC.to_string())
.map(|()| info!("Node has been subscribed to gossipsub topic '{TRANSFER_NOTIF_TOPIC}' to receive network royalties payments notifications."))?;

// Having a portion of nodes (1/50) subscribe to the ROYALTY_TRANSFER_NOTIF_TOPIC
// Such nodes become `forwarder` to ensure the actual beneficary won't miss.
let index: usize = OsRng.gen_range(0..FORWARDER_CHOOSING_FACTOR);
if index == FORWARDER_CHOOSING_FACTOR / 2 {
trace!("Picked as a forwarding node to subscribe to the {ROYALTY_TRANSFER_NOTIF_TOPIC} topic");
// Forwarder only needs to forward topic msgs on libp2p level,
// i.e. no need to handle topic msgs, hence not a `listener`.
running_node
.subscribe_to_topic(ROYALTY_TRANSFER_NOTIF_TOPIC.to_string())
.map(|()| info!("Node has been subscribed to gossipsub topic '{ROYALTY_TRANSFER_NOTIF_TOPIC}' to receive network royalties payments notifications."))?;
}

Ok(running_node)
}
Expand Down Expand Up @@ -393,10 +407,11 @@ impl Node {
}
NetworkEvent::GossipsubMsgReceived { topic, msg }
| NetworkEvent::GossipsubMsgPublished { topic, msg } => {
trace!("Received a gossip msg for the topic of {topic}");
if self.events_channel.receiver_count() == 0 {
return;
}
if topic == TRANSFER_NOTIF_TOPIC {
if topic == ROYALTY_TRANSFER_NOTIF_TOPIC {
// this is expected to be a notification of a transfer which we treat specially,
// and we try to decode it only if it's referring to a PK the user is interested in
if let Some(filter_pk) = self.transfer_notifs_filter {
Expand Down
7 changes: 4 additions & 3 deletions sn_node/src/put_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

use crate::{
node::Node,
node::TRANSFER_NOTIF_TOPIC,
node::ROYALTY_TRANSFER_NOTIF_TOPIC,
spends::{aggregate_spends, check_parent_spends},
Marker,
};
Expand Down Expand Up @@ -538,7 +538,8 @@ impl Node {
));
}

// publish a notification over gossipsub topic TRANSFER_NOTIF_TOPIC for the network royalties payment.
// publish a notification over gossipsub topic ROYALTY_TRANSFER_NOTIF_TOPIC
// for the network royalties payment.
let royalties_pk = *NETWORK_ROYALTIES_PK;
trace!("Publishing a royalties transfer notification over gossipsub for record {pretty_key} and beneficiary {royalties_pk:?}");
let royalties_pk_bytes = royalties_pk.to_bytes();
Expand All @@ -550,7 +551,7 @@ impl Node {
match royalties_cash_notes_r.serialize(&mut serialiser) {
Ok(()) => {
let msg = msg.into_inner().freeze();
if let Err(err) = self.network.publish_on_topic(TRANSFER_NOTIF_TOPIC.to_string(), msg) {
if let Err(err) = self.network.publish_on_topic(ROYALTY_TRANSFER_NOTIF_TOPIC.to_string(), msg) {
debug!("Failed to publish a network royalties payment notification over gossipsub for record {pretty_key} and beneficiary {royalties_pk:?}: {err:?}");
}
}
Expand Down
25 changes: 19 additions & 6 deletions sn_node/tests/nodes_rewards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ use crate::common::{
};
use sn_client::{Client, ClientEvent, WalletClient};
use sn_logging::LogBuilder;
use sn_node::{NodeEvent, TRANSFER_NOTIF_TOPIC};
use sn_node::{NodeEvent, ROYALTY_TRANSFER_NOTIF_TOPIC};
use sn_protocol::safenode_proto::{
safe_node_client::SafeNodeClient, NodeEventsRequest, TransferNotifsFilterRequest,
safe_node_client::SafeNodeClient, GossipsubSubscribeRequest, NodeEventsRequest,
TransferNotifsFilterRequest,
};
use sn_transfers::{
CashNoteRedemption, LocalWallet, MainSecretKey, NanoTokens, NETWORK_ROYALTIES_PK,
Expand Down Expand Up @@ -211,8 +212,10 @@ async fn nodes_rewards_transfer_notifs_filter() -> Result<()> {
// this other node shall *not* receive any notification either since we don't set any pk as filter
let handle_3 = spawn_royalties_payment_listener(node_rpc_addresses[2], royalties_pk, false);

sleep(Duration::from_secs(20)).await;

let num_of_chunks = chunks.len();
println!("Paying for {num_of_chunks} random addresses...");
println!("Paying for {num_of_chunks} chunks");
let (_, storage_cost, royalties_fees) = files_api
.pay_and_upload_bytes_test(*content_addr.xorname(), chunks, false)
.await?;
Expand Down Expand Up @@ -283,18 +286,26 @@ fn current_rewards_balance() -> Result<NanoTokens> {
fn spawn_royalties_payment_listener(
rpc_addr: SocketAddr,
royalties_pk: PublicKey,
set_fiter: bool,
set_filter: bool,
) -> JoinHandle<Result<usize, eyre::Report>> {
let endpoint = format!("https://{rpc_addr}");
tokio::spawn(async move {
let mut rpc_client = SafeNodeClient::connect(endpoint).await?;
if set_fiter {

if set_filter {
let _ = rpc_client
.transfer_notifs_filter(Request::new(TransferNotifsFilterRequest {
pk: royalties_pk.to_bytes().to_vec(),
}))
.await?;
}

let _ = rpc_client
.subscribe_to_topic(Request::new(GossipsubSubscribeRequest {
topic: ROYALTY_TRANSFER_NOTIF_TOPIC.to_string(),
}))
.await?;

let response = rpc_client
.node_events(Request::new(NodeEventsRequest {}))
.await?;
Expand All @@ -311,6 +322,7 @@ fn spawn_royalties_payment_listener(
println!("Transfer notif received for key {key:?}");
if key == royalties_pk {
count += 1;
println!("Received {count} royalty notifs so far");
}
}
Ok(_) => { /* ignored */ }
Expand All @@ -337,7 +349,8 @@ fn spawn_royalties_payment_client_listener(
let sk = SecretKey::from_hex(sn_transfers::GENESIS_CASHNOTE_SK)?;
let mut wallet = LocalWallet::load_from_path(&temp_dir, Some(MainSecretKey::new(sk)))?;
let royalties_pk = NETWORK_ROYALTIES_PK.public_key();
client.subscribe_to_topic(TRANSFER_NOTIF_TOPIC.to_string())?;
client.subscribe_to_topic(ROYALTY_TRANSFER_NOTIF_TOPIC.to_string())?;

let mut events_receiver = client.events_channel();

let handle = tokio::spawn(async move {
Expand Down
12 changes: 10 additions & 2 deletions sn_node_rpc_client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ use color_eyre::eyre::{eyre, Result};
use libp2p::Multiaddr;
use sn_client::Client;
use sn_logging::LogBuilder;
use sn_node::NodeEvent;
use sn_node::{NodeEvent, ROYALTY_TRANSFER_NOTIF_TOPIC};
use sn_peers_acquisition::{parse_peers_args, PeersArgs};
use sn_protocol::safenode_proto::{
safe_node_client::SafeNodeClient, NodeEventsRequest, TransferNotifsFilterRequest,
safe_node_client::SafeNodeClient, GossipsubSubscribeRequest, NodeEventsRequest,
TransferNotifsFilterRequest,
};
use sn_protocol::storage::SpendAddress;
use sn_transfers::{LocalWallet, MainPubkey, MainSecretKey};
Expand Down Expand Up @@ -236,6 +237,13 @@ pub async fn transfers_events(
pk: pk.to_bytes().to_vec(),
}))
.await?;

let _ = node_client
.subscribe_to_topic(Request::new(GossipsubSubscribeRequest {
topic: ROYALTY_TRANSFER_NOTIF_TOPIC.to_string(),
}))
.await?;

let response = node_client
.node_events(Request::new(NodeEventsRequest {}))
.await?;
Expand Down