Skip to content

Commit

Permalink
feat: most of nodes not subscribe to royalty_transfer topic
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed Nov 28, 2023
1 parent ff09ba9 commit 067017f
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 22 deletions.
4 changes: 2 additions & 2 deletions sn_cli/src/subcommands/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{
use url::Url;

const DEFAULT_RECEIVE_ONLINE_WALLET_DIR: &str = "receive_online";
const TRANSFER_NOTIF_TOPIC: &str = "TRANSFER_NOTIFICATION";
const ROYALTY_TRANSFER_NOTIF_TOPIC: &str = "ROYALTY_TRANSFER_NOTIFICATION";

// Please do not remove the blank lines in these doc comments.
// They are used for inserting line breaks when the help menu is rendered in the UI.
Expand Down Expand Up @@ -363,7 +363,7 @@ async fn listen_notifs_and_deposit(root_dir: &Path, client: &Client, sk: String)
let main_pk = wallet.address();
let pk = main_pk.public_key();

client.subscribe_to_topic(TRANSFER_NOTIF_TOPIC.to_string())?;
client.subscribe_to_topic(ROYALTY_TRANSFER_NOTIF_TOPIC.to_string())?;
let mut events_receiver = client.events_channel();

println!("Current balance in local wallet: {}", wallet.balance());
Expand Down
2 changes: 1 addition & 1 deletion sn_node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ mod spends;
pub use self::{
event::{NodeEvent, NodeEventsChannel, NodeEventsReceiver},
log_markers::Marker,
node::{NodeBuilder, NodeCmd, TRANSFER_NOTIF_TOPIC},
node::{NodeBuilder, NodeCmd, ROYALTY_TRANSFER_NOTIF_TOPIC},
};

use crate::error::{Error, Result};
Expand Down
31 changes: 23 additions & 8 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ use bytes::Bytes;
use libp2p::{autonat::NatStatus, identity::Keypair, Multiaddr};
#[cfg(feature = "open-metrics")]
use prometheus_client::registry::Registry;
use rand::{rngs::StdRng, Rng, SeedableRng};
use rand::{
rngs::{OsRng, StdRng},
Rng, SeedableRng,
};
use sn_networking::{
MsgResponder, Network, NetworkBuilder, NetworkEvent, SwarmDriver, CLOSE_GROUP_SIZE,
};
Expand All @@ -39,10 +42,13 @@ use tokio::{
task::spawn,
};

/// Expected topic name where notifications of transfers are sent on.
/// Expected topic name where notifications of royalty transfers are sent on.
/// The notification msg is expected to contain the serialised public key, followed by the
/// serialised transfer info encrypted against the referenced public key.
pub const TRANSFER_NOTIF_TOPIC: &str = "TRANSFER_NOTIFICATION";
pub const ROYALTY_TRANSFER_NOTIF_TOPIC: &str = "ROYALTY_TRANSFER_NOTIFICATION";

/// Defines the percentage (1/50) of node to act as royalty_transfer_notify forwarder.
const FORWARDER_CHOOSING_FACTOR: usize = 50;

/// Interval to trigger replication on a random close_group peer
const PERIODIC_REPLICATION_INTERVAL: Duration = Duration::from_secs(30);
Expand Down Expand Up @@ -143,10 +149,18 @@ impl NodeBuilder {

// Run the node
node.run(swarm_driver, network_event_receiver);
// subscribe to receive transfer notifications over gossipsub topic TRANSFER_NOTIF_TOPIC
running_node
.subscribe_to_topic(TRANSFER_NOTIF_TOPIC.to_string())
.map(|()| info!("Node has been subscribed to gossipsub topic '{TRANSFER_NOTIF_TOPIC}' to receive network royalties payments notifications."))?;

// Having a portion of nodes (1/50) subscribe to the ROYALTY_TRANSFER_NOTIF_TOPIC
// Such nodes become `forwarder` to ensure the actual beneficary won't miss.
let index: usize = OsRng.gen_range(0..FORWARDER_CHOOSING_FACTOR);
if index == FORWARDER_CHOOSING_FACTOR / 2 {
trace!("Picked as a forwarding node to subscribe to the {ROYALTY_TRANSFER_NOTIF_TOPIC} topic");
// Forwarder only needs to forward topic msgs on libp2p level,
// i.e. no need to handle topic msgs, hence not a `listener`.
running_node
.subscribe_to_topic(ROYALTY_TRANSFER_NOTIF_TOPIC.to_string())
.map(|()| info!("Node has been subscribed to gossipsub topic '{ROYALTY_TRANSFER_NOTIF_TOPIC}' to receive network royalties payments notifications."))?;
}

Ok(running_node)
}
Expand Down Expand Up @@ -393,10 +407,11 @@ impl Node {
}
NetworkEvent::GossipsubMsgReceived { topic, msg }
| NetworkEvent::GossipsubMsgPublished { topic, msg } => {
trace!("Received a gossip msg for the topic of {topic}");
if self.events_channel.receiver_count() == 0 {
return;
}
if topic == TRANSFER_NOTIF_TOPIC {
if topic == ROYALTY_TRANSFER_NOTIF_TOPIC {
// this is expected to be a notification of a transfer which we treat specially,
// and we try to decode it only if it's referring to a PK the user is interested in
if let Some(filter_pk) = self.transfer_notifs_filter {
Expand Down
7 changes: 4 additions & 3 deletions sn_node/src/put_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

use crate::{
node::Node,
node::TRANSFER_NOTIF_TOPIC,
node::ROYALTY_TRANSFER_NOTIF_TOPIC,
spends::{aggregate_spends, check_parent_spends},
Marker,
};
Expand Down Expand Up @@ -538,7 +538,8 @@ impl Node {
));
}

// publish a notification over gossipsub topic TRANSFER_NOTIF_TOPIC for the network royalties payment.
// publish a notification over gossipsub topic ROYALTY_TRANSFER_NOTIF_TOPIC
// for the network royalties payment.
let royalties_pk = *NETWORK_ROYALTIES_PK;
trace!("Publishing a royalties transfer notification over gossipsub for record {pretty_key} and beneficiary {royalties_pk:?}");
let royalties_pk_bytes = royalties_pk.to_bytes();
Expand All @@ -550,7 +551,7 @@ impl Node {
match royalties_cash_notes_r.serialize(&mut serialiser) {
Ok(()) => {
let msg = msg.into_inner().freeze();
if let Err(err) = self.network.publish_on_topic(TRANSFER_NOTIF_TOPIC.to_string(), msg) {
if let Err(err) = self.network.publish_on_topic(ROYALTY_TRANSFER_NOTIF_TOPIC.to_string(), msg) {
debug!("Failed to publish a network royalties payment notification over gossipsub for record {pretty_key} and beneficiary {royalties_pk:?}: {err:?}");
}
}
Expand Down
23 changes: 17 additions & 6 deletions sn_node/tests/nodes_rewards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ use crate::common::{
};
use sn_client::{Client, ClientEvent, WalletClient};
use sn_logging::LogBuilder;
use sn_node::{NodeEvent, TRANSFER_NOTIF_TOPIC};
use sn_node::{NodeEvent, ROYALTY_TRANSFER_NOTIF_TOPIC};
use sn_protocol::safenode_proto::{
safe_node_client::SafeNodeClient, NodeEventsRequest, TransferNotifsFilterRequest,
safe_node_client::SafeNodeClient, GossipsubSubscribeRequest, NodeEventsRequest,
TransferNotifsFilterRequest,
};
use sn_transfers::{
CashNoteRedemption, LocalWallet, MainSecretKey, NanoTokens, NETWORK_ROYALTIES_PK,
Expand Down Expand Up @@ -212,7 +213,7 @@ async fn nodes_rewards_transfer_notifs_filter() -> Result<()> {
let handle_3 = spawn_royalties_payment_listener(node_rpc_addresses[2], royalties_pk, false);

let num_of_chunks = chunks.len();
println!("Paying for {num_of_chunks} random addresses...");
println!("Paying for {num_of_chunks} chunks");
let (_, storage_cost, royalties_fees) = files_api
.pay_and_upload_bytes_test(*content_addr.xorname(), chunks, false)
.await?;
Expand Down Expand Up @@ -283,18 +284,26 @@ fn current_rewards_balance() -> Result<NanoTokens> {
fn spawn_royalties_payment_listener(
rpc_addr: SocketAddr,
royalties_pk: PublicKey,
set_fiter: bool,
set_filter: bool,
) -> JoinHandle<Result<usize, eyre::Report>> {
let endpoint = format!("https://{rpc_addr}");
tokio::spawn(async move {
let mut rpc_client = SafeNodeClient::connect(endpoint).await?;
if set_fiter {

if set_filter {
let _ = rpc_client
.transfer_notifs_filter(Request::new(TransferNotifsFilterRequest {
pk: royalties_pk.to_bytes().to_vec(),
}))
.await?;
}

let _ = rpc_client
.subscribe_to_topic(Request::new(GossipsubSubscribeRequest {
topic: ROYALTY_TRANSFER_NOTIF_TOPIC.to_string(),
}))
.await?;

let response = rpc_client
.node_events(Request::new(NodeEventsRequest {}))
.await?;
Expand All @@ -311,6 +320,7 @@ fn spawn_royalties_payment_listener(
println!("Transfer notif received for key {key:?}");
if key == royalties_pk {
count += 1;
println!("Received {count} royalty notifs so far");
}
}
Ok(_) => { /* ignored */ }
Expand All @@ -337,7 +347,8 @@ fn spawn_royalties_payment_client_listener(
let sk = SecretKey::from_hex(sn_transfers::GENESIS_CASHNOTE_SK)?;
let mut wallet = LocalWallet::load_from_path(&temp_dir, Some(MainSecretKey::new(sk)))?;
let royalties_pk = NETWORK_ROYALTIES_PK.public_key();
client.subscribe_to_topic(TRANSFER_NOTIF_TOPIC.to_string())?;
client.subscribe_to_topic(ROYALTY_TRANSFER_NOTIF_TOPIC.to_string())?;

let mut events_receiver = client.events_channel();

let handle = tokio::spawn(async move {
Expand Down
12 changes: 10 additions & 2 deletions sn_node_rpc_client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ use color_eyre::eyre::{eyre, Result};
use libp2p::Multiaddr;
use sn_client::Client;
use sn_logging::LogBuilder;
use sn_node::NodeEvent;
use sn_node::{NodeEvent, ROYALTY_TRANSFER_NOTIF_TOPIC};
use sn_peers_acquisition::{parse_peers_args, PeersArgs};
use sn_protocol::safenode_proto::{
safe_node_client::SafeNodeClient, NodeEventsRequest, TransferNotifsFilterRequest,
safe_node_client::SafeNodeClient, GossipsubSubscribeRequest, NodeEventsRequest,
TransferNotifsFilterRequest,
};
use sn_protocol::storage::SpendAddress;
use sn_transfers::{LocalWallet, MainPubkey, MainSecretKey};
Expand Down Expand Up @@ -236,6 +237,13 @@ pub async fn transfers_events(
pk: pk.to_bytes().to_vec(),
}))
.await?;

let _ = node_client
.subscribe_to_topic(Request::new(GossipsubSubscribeRequest {
topic: ROYALTY_TRANSFER_NOTIF_TOPIC.to_string(),
}))
.await?;

let response = node_client
.node_events(Request::new(NodeEventsRequest {}))
.await?;
Expand Down

0 comments on commit 067017f

Please sign in to comment.