Skip to content

Commit

Permalink
feat: publish to non-subscribed topic
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed Nov 7, 2023
1 parent f035dec commit 4831fb5
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 33 deletions.
2 changes: 1 addition & 1 deletion sn_node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ mod spends;
pub use self::{
event::{NodeEvent, NodeEventsChannel, NodeEventsReceiver},
log_markers::Marker,
node::{NodeBuilder, NodeCmd},
node::{ROYALTY_TRANSFER_NOTIF_TOPIC, NodeBuilder, NodeCmd},
};

use crate::error::{Error, Result};
Expand Down
12 changes: 5 additions & 7 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ use tokio::{
task::spawn,
};

/// Expected topic name where notifications of transfers are sent on.
/// Expected topic name where notifications of royalty transfers are sent on.
/// The notification msg is expected to contain the serialised public key, followed by the
/// serialised transfer info encrypted against the referenced public key.
pub(super) const TRANSFER_NOTIF_TOPIC: &str = "TRANSFER_NOTIFICATION";
pub const ROYALTY_TRANSFER_NOTIF_TOPIC: &str = "ROYALTY_TRANSFER_NOTIFICATION";

/// Interval to trigger replication on a random close_group peer
const PERIODIC_REPLICATION_INTERVAL: Duration = Duration::from_secs(10);
Expand Down Expand Up @@ -141,10 +141,6 @@ impl NodeBuilder {

// Run the node
node.run(swarm_driver, network_event_receiver);
// subscribe to receive transfer notifications over gossipsub topic TRANSFER_NOTIF_TOPIC
running_node
.subscribe_to_topic(TRANSFER_NOTIF_TOPIC.to_string())
.map(|()| info!("Node has been subscribed to gossipsub topic '{TRANSFER_NOTIF_TOPIC}' to receive network royalties payments notifications."))?;

Ok(running_node)
}
Expand Down Expand Up @@ -401,8 +397,10 @@ impl Node {
}
NetworkEvent::GossipsubMsgReceived { topic, msg }
| NetworkEvent::GossipsubMsgPublished { topic, msg } => {
trace!("Received a gossip msg for the topic of {topic}");

if self.events_channel.receiver_count() > 0 {
if topic == TRANSFER_NOTIF_TOPIC {
if topic == ROYALTY_TRANSFER_NOTIF_TOPIC {
// this is expected to be a notification of a transfer which we treat specially,
// and we try to decode it only if it's referring to a PK the user is interested in
if let Some(filter_pk) = self.transfer_notifs_filter {
Expand Down
7 changes: 4 additions & 3 deletions sn_node/src/put_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

use crate::{
node::Node,
node::TRANSFER_NOTIF_TOPIC,
node::ROYALTY_TRANSFER_NOTIF_TOPIC,
spends::{aggregate_spends, check_parent_spends},
Marker,
};
Expand Down Expand Up @@ -534,7 +534,8 @@ impl Node {
));
}

// publish a notification over gossipsub topic TRANSFER_NOTIF_TOPIC for the network royalties payment.
// publish a notification over gossipsub topic ROYALTY_TRANSFER_NOTIF_TOPIC
// for the network royalties payment.
let royalties_pk = *NETWORK_ROYALTIES_PK;
trace!("Publishing a royalties transfer notification over gossipsub for record {pretty_key} and beneficiary {royalties_pk:?}");
let royalties_pk_bytes = royalties_pk.to_bytes();
Expand All @@ -547,7 +548,7 @@ impl Node {
match bincode::serialize_into(&mut msg, &royalties_transfers) {
Ok(_) => {
let msg = msg.into_inner().freeze();
if let Err(err) = self.network.publish_on_topic(TRANSFER_NOTIF_TOPIC.to_string(), msg) {
if let Err(err) = self.network.publish_on_topic(ROYALTY_TRANSFER_NOTIF_TOPIC.to_string(), msg) {
debug!("Failed to publish a network royalties payment notification over gossipsub for record {pretty_key} and beneficiary {royalties_pk:?}: {err:?}");
}
}
Expand Down
72 changes: 51 additions & 21 deletions sn_node/tests/nodes_rewards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ mod common;
use crate::common::{get_client_and_wallet, random_content};
use sn_client::WalletClient;
use sn_logging::LogBuilder;
use sn_networking::CLOSE_GROUP_SIZE;
use sn_node::NodeEvent;
use sn_node::{ROYALTY_TRANSFER_NOTIF_TOPIC, NodeEvent};
use sn_protocol::safenode_proto::{
safe_node_client::SafeNodeClient, NodeEventsRequest, TransferNotifsFilterRequest,
safe_node_client::SafeNodeClient, GossipsubSubscribeRequest, NodeEventsRequest,
TransferNotifsFilterRequest,
};
use sn_transfers::{
LocalWallet, NanoTokens, NETWORK_ROYALTIES_AMOUNT_PER_ADDR, NETWORK_ROYALTIES_PK,
Expand Down Expand Up @@ -120,11 +120,17 @@ async fn nodes_rewards_for_chunks_notifs_over_gossipsub() -> Result<()> {
paying_wallet_dir.to_path_buf(),
chunks_dir.path().to_path_buf(),
)?;
let num_of_chunks = chunks.len();

println!("Paying for {num_of_chunks} random addresses...");

println!("Paying for {} random addresses...", chunks.len());
let royalties_pk = NETWORK_ROYALTIES_PK.public_key();
let handle =
spawn_royalties_payment_listener("https://127.0.0.1:12001".to_string(), royalties_pk, true);
let handle = spawn_royalties_payment_listener(
"https://127.0.0.1:12001".to_string(),

Check notice

Code scanning / devskim

Accessing localhost could indicate debug code, or could hinder scaling. Note

Do not leave debug code in production
royalties_pk,
true,
chunks.len(),
);

let _cost = files_api
.pay_and_upload_bytes_test(*content_addr.xorname(), chunks)
Expand All @@ -134,7 +140,7 @@ async fn nodes_rewards_for_chunks_notifs_over_gossipsub() -> Result<()> {

let count = handle.await??;
println!("Number of notifications received by node: {count}");
assert_eq!(count, CLOSE_GROUP_SIZE, "Not enough notifications received");
assert_eq!(count, num_of_chunks, "Not enough notifications received");

Ok(())
}
Expand All @@ -154,9 +160,14 @@ async fn nodes_rewards_for_register_notifs_over_gossipsub() -> Result<()> {
let register_addr = XorName::random(&mut rng);

println!("Paying for random Register address {register_addr:?} ...");

let royalties_pk = NETWORK_ROYALTIES_PK.public_key();
let handle =
spawn_royalties_payment_listener("https://127.0.0.1:12001".to_string(), royalties_pk, true);
let handle = spawn_royalties_payment_listener(
"https://127.0.0.1:12001".to_string(),

Check notice

Code scanning / devskim

Accessing localhost could indicate debug code, or could hinder scaling. Note

Do not leave debug code in production
royalties_pk,
true,
1,
);

let _register = client
.create_and_pay_for_register(register_addr, &mut wallet_client, true)
Expand All @@ -165,7 +176,7 @@ async fn nodes_rewards_for_register_notifs_over_gossipsub() -> Result<()> {

let count = handle.await??;
println!("Number of notifications received by node: {count}");
assert_eq!(count, CLOSE_GROUP_SIZE, "Not enough notifications received");
assert_eq!(count, 1, "Not enough notifications received");

Ok(())
}
Expand All @@ -186,22 +197,34 @@ async fn nodes_rewards_transfer_notifs_filter() -> Result<()> {
paying_wallet_dir.to_path_buf(),
chunks_dir.path().to_path_buf(),
)?;
let num_of_chunks = chunks.len();

// this node shall receive the notifications since we set the correct royalties pk as filter
let royalties_pk = NETWORK_ROYALTIES_PK.public_key();
let handle_1 =
spawn_royalties_payment_listener("https://127.0.0.1:12001".to_string(), royalties_pk, true);
let handle_1 = spawn_royalties_payment_listener(
"https://127.0.0.1:12001".to_string(),

Check notice

Code scanning / devskim

Accessing localhost could indicate debug code, or could hinder scaling. Note

Do not leave debug code in production
royalties_pk,
true,
num_of_chunks,
);
// this other node shall *not* receive any notification since we set the wrong pk as filter
let random_pk = SecretKey::random().public_key();
let handle_2 =
spawn_royalties_payment_listener("https://127.0.0.1:12002".to_string(), random_pk, true);
let handle_2 = spawn_royalties_payment_listener(
"https://127.0.0.1:12002".to_string(),

Check notice

Code scanning / devskim

Accessing localhost could indicate debug code, or could hinder scaling. Note

Do not leave debug code in production
random_pk,
true,
num_of_chunks,
);
// this other node shall *not* receive any notification either since we don't set any pk as filter
let handle_3 = spawn_royalties_payment_listener(
"https://127.0.0.1:12003".to_string(),
royalties_pk,
false,
num_of_chunks,
);

println!("Paying for {num_of_chunks} chunks");

let _cost = files_api
.pay_and_upload_bytes_test(*content_addr.xorname(), chunks)
.await?;
Expand All @@ -213,10 +236,7 @@ async fn nodes_rewards_transfer_notifs_filter() -> Result<()> {
let count_3 = handle_3.await??;
println!("Number of notifications received by node #3: {count_3}");

assert_eq!(
count_1, CLOSE_GROUP_SIZE,
"Not enough notifications received"
);
assert_eq!(count_1, num_of_chunks, "Not enough notifications received");
assert_eq!(count_2, 0, "Notifications were not expected");
assert_eq!(count_3, 0, "Notifications were not expected");

Expand Down Expand Up @@ -271,17 +291,26 @@ fn current_rewards_balance() -> Result<NanoTokens> {
fn spawn_royalties_payment_listener(
endpoint: String,
royalties_pk: PublicKey,
set_fiter: bool,
set_filter: bool,
expected_cout: usize,
) -> JoinHandle<Result<usize, eyre::Report>> {
tokio::spawn(async move {
let mut rpc_client = SafeNodeClient::connect(endpoint).await?;
if set_fiter {

if set_filter {
let _ = rpc_client
.transfer_notifs_filter(Request::new(TransferNotifsFilterRequest {
pk: royalties_pk.to_bytes().to_vec(),
}))
.await?;
}

let _ = rpc_client
.subscribe_to_topic(Request::new(GossipsubSubscribeRequest {
topic: ROYALTY_TRANSFER_NOTIF_TOPIC.to_string(),
}))
.await?;

let response = rpc_client
.node_events(Request::new(NodeEventsRequest {}))
.await?;
Expand All @@ -298,7 +327,8 @@ fn spawn_royalties_payment_listener(
println!("Transfer notif received for key {key:?}");
if key == royalties_pk {
count += 1;
if count == CLOSE_GROUP_SIZE {
println!("Received {count}/{expected_cout} royalty notifs so far");
if count == expected_cout {
break;
}
}
Expand Down
10 changes: 9 additions & 1 deletion sn_node_rpc_client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use color_eyre::eyre::{eyre, Result};
use libp2p::{Multiaddr, PeerId};
use sn_client::Client;
use sn_logging::LogBuilder;
use sn_node::NodeEvent;
use sn_node::{ROYALTY_TRANSFER_NOTIF_TOPIC, NodeEvent};
use sn_peers_acquisition::{parse_peers_args, PeersArgs};
use sn_protocol::safenode_proto::{
safe_node_client::SafeNodeClient, GossipsubPublishRequest, GossipsubSubscribeRequest,
Expand Down Expand Up @@ -242,11 +242,19 @@ pub async fn transfers_events(
};
let endpoint = format!("https://{addr}");
let mut node_client = SafeNodeClient::connect(endpoint).await?;

let _ = node_client
.transfer_notifs_filter(Request::new(TransferNotifsFilterRequest {
pk: pk.to_bytes().to_vec(),
}))
.await?;

let _ = node_client
.subscribe_to_topic(Request::new(GossipsubSubscribeRequest {
topic: ROYALTY_TRANSFER_NOTIF_TOPIC.to_string(),
}))
.await?;

let response = node_client
.node_events(Request::new(NodeEventsRequest {}))
.await?;
Expand Down

0 comments on commit 4831fb5

Please sign in to comment.