Skip to content

Commit

Permalink
feat(comms): tcp-only p2p protocol listener (#3127)
Browse files Browse the repository at this point in the history
<!--- Provide a general summary of your changes in the Title above -->

## Description
<!--- Describe your changes in detail -->
- Allow an optional "auxilary" TCP listener to be configured on the base
  node, in addition to the usual transport.
- Allow the SOCKS transport to have a set of addresses that "bypass" the
  SOCKS proxy and protocol, and instead directly connect using TCP.
- Configuration for base node and wallet are hooked up for these.
- Remove `ConnectionManagerEvent::Listen` and `ConnectionManagerEvent::ListenFailed`
  events. These are replaced by a simpler 'wait_until_listening' call in
  the connection manager handle that waits for both listeners.
- Add `[app].[network].auxilary_tcp_listener_address` config item
- Add `[app].[network].tor_proxy_bypass_addresses` config item

## Motivation and Context
<!--- Why is this change required? What problem does it solve? -->
<!--- If it fixes an open issue, please link to the issue here. -->
Allow wallet and base node to communicate directly

![image](https://user-images.githubusercontent.com/1057902/126602149-20b6eaa5-e92c-4a11-86df-e470fbbca6c3.png)

wallet:
```toml
 # set local address in the UI, or as a seed here:
peer_seeds = ["f27a6ae0321b471d1f65988ded21c48dc57a71edebb936837db1302a91ddc122::/dns4/my-basenode/tcp/9998"]
# Because the tor transport still (correctly) uses the tor network for DNS/IP addresses, we have to explicitly list which addresses to bypass. 
tor_proxy_bypass_addresses = ["/dns4/my-basenode/tcp/9998"]
```

base node:
```toml
auxilary_tcp_listener_address = "/ip4/127.0.0.1/tcp/9998"
```

The config will need to be updated when #3121 is merged

## How Has This Been Tested?
<!--- Please describe in detail how you tested your changes. -->
<!--- Include details of your testing environment, and the tests you ran to -->
<!--- see how your change affects other areas of the code, etc. -->
- New connection manager unit test
- Tested using configured console wallet and base node

## Checklist:
<!--- Go over all the following points, and put an `x` in all the boxes that apply. -->
* [x] I'm merging against the `development` branch.
* [x] I have squashed my commits into a single commit.
  • Loading branch information
aviator-app[bot] committed Jul 22, 2021
2 parents 162e98b + c399e99 commit 6fefd18
Show file tree
Hide file tree
Showing 34 changed files with 559 additions and 225 deletions.
4 changes: 4 additions & 0 deletions applications/tari_app_utilities/src/utilities.rs
Expand Up @@ -172,6 +172,7 @@ pub fn create_transport_type(config: &GlobalConfig) -> TransportType {
tor_socks_config: tor_socks_address.map(|proxy_address| SocksConfig {
proxy_address,
authentication: tor_socks_auth.map(convert_socks_authentication).unwrap_or_default(),
proxy_bypass_addresses: vec![],
}),
},
CommsTransport::TorHiddenService {
Expand All @@ -180,6 +181,7 @@ pub fn create_transport_type(config: &GlobalConfig) -> TransportType {
forward_address,
auth,
onion_port,
tor_proxy_bypass_addresses,
} => {
let identity = Some(&config.base_node_tor_identity_file)
.filter(|p| p.exists())
Expand Down Expand Up @@ -211,6 +213,7 @@ pub fn create_transport_type(config: &GlobalConfig) -> TransportType {
port_mapping: (onion_port, forward_addr).into(),
socks_address_override,
socks_auth: socks::Authentication::None,
tor_proxy_bypass_addresses,
})
},
CommsTransport::Socks5 {
Expand All @@ -221,6 +224,7 @@ pub fn create_transport_type(config: &GlobalConfig) -> TransportType {
socks_config: SocksConfig {
proxy_address,
authentication: convert_socks_authentication(auth),
proxy_bypass_addresses: vec![],
},
listener_address,
},
Expand Down
1 change: 1 addition & 0 deletions applications/tari_base_node/src/bootstrap.rs
Expand Up @@ -234,6 +234,7 @@ where B: BlockchainBackend + 'static
network: self.config.network,
node_identity: self.node_identity.clone(),
transport_type: create_transport_type(self.config),
auxilary_tcp_listener_address: self.config.auxilary_tcp_listener_address.clone(),
datastore_path: self.config.peer_db_path.clone(),
peer_database_name: "peers".to_string(),
max_concurrent_inbound_tasks: 100,
Expand Down
1 change: 1 addition & 0 deletions applications/tari_console_wallet/src/init/mod.rs
Expand Up @@ -320,6 +320,7 @@ pub async fn init_wallet(
node_identity,
user_agent: format!("tari/wallet/{}", env!("CARGO_PKG_VERSION")),
transport_type,
auxilary_tcp_listener_address: None,
datastore_path: config.console_wallet_peer_db_path.clone(),
peer_database_name: "peers".to_string(),
max_concurrent_inbound_tasks: 100,
Expand Down
18 changes: 14 additions & 4 deletions base_layer/p2p/src/initialization.rs
Expand Up @@ -44,6 +44,7 @@ use std::{
use tari_common::configuration::Network;
use tari_comms::{
backoff::ConstantBackoff,
multiaddr::Multiaddr,
peer_manager::{NodeIdentity, Peer, PeerFeatures, PeerManagerError},
pipeline,
pipeline::SinkService,
Expand Down Expand Up @@ -150,6 +151,10 @@ pub struct CommsConfig {
pub dns_seeds_name_server: SocketAddr,
/// All DNS seed records must pass DNSSEC validation
pub dns_seeds_use_dnssec: bool,
/// The address to bind on using the TCP transport _in addition to_ the primary transport. This is typically useful
/// for direct comms between a wallet and base node. If this is set to None, no listener will be bound.
/// Default: None
pub auxilary_tcp_listener_address: Option<Multiaddr>,
}

/// Initialize Tari Comms configured for tests
Expand Down Expand Up @@ -304,7 +309,8 @@ async fn initialize_hidden_service(
.with_socks_address_override(config.socks_address_override)
.with_socks_authentication(config.socks_auth)
.with_control_server_auth(config.control_server_auth)
.with_control_server_address(config.control_server_addr);
.with_control_server_address(config.control_server_addr)
.with_bypass_proxy_addresses(config.tor_proxy_bypass_addresses);

if let Some(identity) = config.identity {
builder = builder.with_tor_identity(*identity);
Expand Down Expand Up @@ -337,12 +343,16 @@ where
let listener_liveness_allowlist_cidrs = parse_cidrs(&config.listener_liveness_allowlist_cidrs)
.map_err(CommsInitializationError::InvalidLivenessCidrs)?;

let mut comms = builder
let builder = builder
.with_listener_liveness_max_sessions(config.listener_liveness_max_sessions)
.with_listener_liveness_allowlist_cidrs(listener_liveness_allowlist_cidrs)
.with_dial_backoff(ConstantBackoff::new(Duration::from_millis(500)))
.with_peer_storage(peer_database, Some(file_lock))
.build()?;
.with_peer_storage(peer_database, Some(file_lock));

let mut comms = match config.auxilary_tcp_listener_address {
Some(ref addr) => builder.with_auxilary_tcp_listener_address(addr.clone()).build()?,
None => builder.build()?,
};

let peer_manager = comms.peer_manager();
let connectivity = comms.connectivity();
Expand Down
3 changes: 3 additions & 0 deletions base_layer/p2p/src/transport.rs
Expand Up @@ -58,6 +58,9 @@ pub struct TorConfig {
pub socks_address_override: Option<Multiaddr>,
/// Authentication for the Tor SOCKS5 proxy
pub socks_auth: socks::Authentication,
/// If the underlying SOCKS transport encounters these addresses, bypass the proxy and dial directly using the
/// TcpTransport
pub tor_proxy_bypass_addresses: Vec<Multiaddr>,
}

impl fmt::Display for TorConfig {
Expand Down
1 change: 1 addition & 0 deletions base_layer/wallet/src/testnet_utils.rs
Expand Up @@ -134,6 +134,7 @@ pub async fn create_wallet(
transport_type: TransportType::Memory {
listener_address: public_address,
},
auxilary_tcp_listener_address: None,
node_identity,
datastore_path: datastore_path.clone(),
peer_database_name: random::string(8),
Expand Down
3 changes: 3 additions & 0 deletions base_layer/wallet/tests/wallet/mod.rs
Expand Up @@ -101,6 +101,7 @@ async fn create_wallet(
transport_type: TransportType::Memory {
listener_address: node_identity.public_address(),
},
auxilary_tcp_listener_address: None,
datastore_path: data_path.to_path_buf(),
peer_database_name: random::string(8),
max_concurrent_inbound_tasks: 100,
Expand Down Expand Up @@ -675,6 +676,7 @@ async fn test_import_utxo() {
listener_address: "/ip4/127.0.0.1/tcp/0".parse().unwrap(),
tor_socks_config: None,
},
auxilary_tcp_listener_address: None,
datastore_path: temp_dir.path().to_path_buf(),
peer_database_name: random::string(8),
max_concurrent_inbound_tasks: 100,
Expand Down Expand Up @@ -767,6 +769,7 @@ async fn test_data_generation() {
transport_type: TransportType::Memory {
listener_address: node_id.public_address(),
},
auxilary_tcp_listener_address: None,
datastore_path: temp_dir.path().to_path_buf(),
peer_database_name: random::string(8),
max_concurrent_inbound_tasks: 100,
Expand Down
2 changes: 2 additions & 0 deletions base_layer/wallet_ffi/src/lib.rs
Expand Up @@ -2486,6 +2486,7 @@ pub unsafe extern "C" fn transport_tor_create(
port_mapping: tor::PortMapping::new(tor_port, "127.0.0.1:0".parse().unwrap()),
socks_address_override: None,
socks_auth: authentication,
tor_proxy_bypass_addresses: vec![],
};
let transport = TariTransportType::Tor(tor_config);

Expand Down Expand Up @@ -2627,6 +2628,7 @@ pub unsafe extern "C" fn comms_config_create(
network: Network::Weatherwax,
node_identity: Arc::new(ni),
transport_type: (*transport_type).clone(),
auxilary_tcp_listener_address: None,
datastore_path,
peer_database_name: database_name_string,
max_concurrent_inbound_tasks: 100,
Expand Down
22 changes: 22 additions & 0 deletions common/config/presets/tari_config_example.toml
Expand Up @@ -204,6 +204,17 @@ tor_control_auth = "none" # or "password=xxxxxx"
#socks5_listener_address = "/ip4/127.0.0.1/tcp/18188"
#socks5_auth = "none" # or "username_password=username:xxxxxxx"

# Optionally bind an additional TCP socket for inbound Tari P2P protocol commms.
# Use cases include:
# - allowing wallets to locally connect to their base node, rather than through tor, when used in conjunction with `tor_proxy_bypass_addresses`
# - multiple P2P addresses, one public over DNS and one private over TOR
# - a "bridge" between TOR and TCP-only nodes
# auxilary_tcp_listener_address = "/ip4/127.0.0.1/tcp/9998"

# When these addresses are encountered when dialing another peer, the tor proxy is bypassed and the connection is made
# direcly over TCP. /ip4, /ip6, /dns, /dns4 and /dns6 are supported.
# tor_proxy_bypass_addresses = ["/dns4/my-foo-base-node/tcp/9998"]

########################################################################################################################
# #
# Base Node Configuration Options #
Expand Down Expand Up @@ -359,6 +370,17 @@ base_node_tor_identity_file = "config/base_node_tor.json"
# A path to the file that stores the console wallet's tor hidden service private key, if using the tor transport.
console_wallet_tor_identity_file = "config/console_wallet_tor.json"

# Optionally bind an additional TCP socket for inbound Tari P2P protocol commms.
# Use cases include:
# - allowing wallets to locally connect to their base node, rather than through tor, when used in conjunction with `tor_proxy_bypass_addresses`
# - multiple P2P addresses, one public over DNS and one private over TOR
# - a "bridge" between TOR and TCP-only nodes
# auxilary_tcp_listener_address = "/ip4/127.0.0.1/tcp/9998"

# When these addresses are encountered when dialing another peer, the tor proxy is bypassed and the connection is made
# direcly over TCP. /ip4, /ip6, /dns, /dns4 and /dns6 are supported.
# tor_proxy_bypass_addresses = ["/dns4/my-foo-base-node/tcp/9998"]

########################################################################################################################
# #
# Mempool Configuration Options #
Expand Down
26 changes: 26 additions & 0 deletions common/src/configuration/global.rs
Expand Up @@ -58,6 +58,7 @@ pub struct GlobalConfig {
pub autoupdate_hashes_sig_url: String,
pub network: Network,
pub comms_transport: CommsTransport,
pub auxilary_tcp_listener_address: Option<Multiaddr>,
pub allow_test_addresses: bool,
pub listnener_liveness_max_sessions: usize,
pub listener_liveness_allowlist_cidrs: Vec<String>,
Expand Down Expand Up @@ -301,6 +302,14 @@ fn convert_node_config(
// Transport
let comms_transport = network_transport_config(&cfg, application, &net_str)?;

let key = config_string("base_node", &net_str, "auxilary_tcp_listener_address");
let auxilary_tcp_listener_address = optional(cfg.get_str(&key))?
.map(|addr| {
addr.parse::<Multiaddr>()
.map_err(|e| ConfigurationError::new(&key, &e.to_string()))
})
.transpose()?;

let key = config_string("base_node", &net_str, "allow_test_addresses");
let allow_test_addresses = cfg
.get_bool(&key)
Expand Down Expand Up @@ -650,6 +659,7 @@ fn convert_node_config(
autoupdate_hashes_sig_url,
network,
comms_transport,
auxilary_tcp_listener_address,
allow_test_addresses,
listnener_liveness_max_sessions: liveness_max_sessions,
listener_liveness_allowlist_cidrs: liveness_allowlist_cidrs,
Expand Down Expand Up @@ -803,6 +813,20 @@ fn network_transport_config(
.get::<NonZeroU16>(&key)
.map_err(|err| ConfigurationError::new(&key, &err.to_string()))?;

// TODO
let key = config_string(app_str, network, "tor_proxy_bypass_addresses");
let tor_proxy_bypass_addresses = optional(cfg.get_array(&key))?
.unwrap_or_default()
.into_iter()
.map(|v| {
v.into_str()
.map_err(|err| ConfigurationError::new(&key, &err.to_string()))
.and_then(|s| {
Multiaddr::from_str(&s).map_err(|err| ConfigurationError::new(&key, &err.to_string()))
})
})
.collect::<Result<_, _>>()?;

let key = config_string(app_str, network, "tor_socks_address_override");
let socks_address_override = match get_conf_str(&key).ok() {
Some(addr) => Some(
Expand All @@ -818,6 +842,7 @@ fn network_transport_config(
socks_address_override,
forward_address,
onion_port,
tor_proxy_bypass_addresses,
})
},
"socks5" => {
Expand Down Expand Up @@ -957,6 +982,7 @@ pub enum CommsTransport {
forward_address: Multiaddr,
auth: TorControlAuthentication,
onion_port: NonZeroU16,
tor_proxy_bypass_addresses: Vec<Multiaddr>,
},
/// Use a SOCKS5 proxy transport. This transport recognises any addresses supported by the proxy.
Socks5 {
Expand Down
1 change: 0 additions & 1 deletion comms/dht/examples/memory_net/utilities.rs
Expand Up @@ -632,7 +632,6 @@ fn connection_manager_logger(
node_name, err
);
},
Listening(_) | ListenFailed(_) => unreachable!(),
NewInboundSubstream(node_id, protocol, _) => {
println!(
"'{}' negotiated protocol '{}' to '{}'",
Expand Down
1 change: 1 addition & 0 deletions comms/examples/stress/node.rs
Expand Up @@ -121,6 +121,7 @@ pub async fn create(
.spawn_with_transport(TcpWithTorTransport::with_tor_socks_proxy(SocksConfig {
proxy_address: TOR_SOCKS_ADDR.parse().unwrap(),
authentication: Default::default(),
proxy_bypass_addresses: vec![],
}))
.await
.unwrap()
Expand Down
42 changes: 11 additions & 31 deletions comms/src/builder/comms_node.rs
Expand Up @@ -27,6 +27,7 @@ use crate::{
ConnectionManagerEvent,
ConnectionManagerRequest,
ConnectionManagerRequester,
ListenerInfo,
},
connectivity::{ConnectivityEventRx, ConnectivityManager, ConnectivityRequest, ConnectivityRequester},
multiaddr::Multiaddr,
Expand All @@ -45,11 +46,11 @@ use crate::{
CommsBuilder,
Substream,
};
use futures::{channel::mpsc, AsyncRead, AsyncWrite, StreamExt};
use futures::{channel::mpsc, AsyncRead, AsyncWrite};
use log::*;
use std::{iter, sync::Arc, time::Duration};
use std::{iter, sync::Arc};
use tari_shutdown::ShutdownSignal;
use tokio::{sync::broadcast, time};
use tokio::sync::broadcast;

const LOG_TARGET: &str = "comms::node";

Expand Down Expand Up @@ -116,25 +117,6 @@ impl UnspawnedCommsNode {
self
}

/// Wait until the ConnectionManager emits a Listening event. This is the signal that comms is ready.
async fn wait_listening(
mut events: broadcast::Receiver<Arc<ConnectionManagerEvent>>,
) -> Result<Multiaddr, CommsBuilderError> {
loop {
let event = time::timeout(Duration::from_secs(10), events.next())
.await
.map_err(|_| CommsBuilderError::ConnectionManagerEventStreamTimeout)?
.ok_or(CommsBuilderError::ConnectionManagerEventStreamClosed)?
.map_err(|_| CommsBuilderError::ConnectionManagerEventStreamLagged)?;

match &*event {
ConnectionManagerEvent::Listening(addr) => return Ok(addr.clone()),
ConnectionManagerEvent::ListenFailed(err) => return Err(err.clone().into()),
_ => {},
}
}
}

pub async fn spawn_with_transport<TTransport>(self, transport: TTransport) -> Result<CommsNode, CommsBuilderError>
where
TTransport: Transport + Unpin + Send + Sync + Clone + 'static,
Expand All @@ -143,7 +125,7 @@ impl UnspawnedCommsNode {
let UnspawnedCommsNode {
builder,
connection_manager_request_rx,
connection_manager_requester,
mut connection_manager_requester,
connectivity_requester,
connectivity_rx,
node_identity,
Expand Down Expand Up @@ -204,8 +186,6 @@ impl UnspawnedCommsNode {
ext_context.register_complete_signal(connection_manager.complete_signal());
connection_manager.add_protocols(ext_context.take_protocols().expect("Protocols already taken"));
connection_manager.add_protocols(protocols);
// Subscribe to events before spawning the actor to ensure that no events are missed
let connection_manager_event_subscription = connection_manager_requester.get_event_subscription();

//---------------------------------- Spawn Actors --------------------------------------------//
connectivity_manager.create().spawn();
Expand All @@ -223,10 +203,10 @@ impl UnspawnedCommsNode {
node_identity.node_id()
);

let listening_addr = Self::wait_listening(connection_manager_event_subscription).await?;
let listening_info = connection_manager_requester.wait_until_listening().await?;
let mut hidden_service = None;
if let Some(mut ctl) = hidden_service_ctl {
ctl.set_proxied_addr(listening_addr.clone());
ctl.set_proxied_addr(listening_info.bind_address().clone());
let hs = ctl.create_hidden_service().await?;
node_identity.set_public_address(hs.get_onion_address());
hidden_service = Some(hs);
Expand All @@ -241,7 +221,7 @@ impl UnspawnedCommsNode {
shutdown_signal,
connection_manager_requester,
connectivity_requester,
listening_addr,
listening_info,
node_identity,
peer_manager,
hidden_service,
Expand Down Expand Up @@ -291,8 +271,8 @@ pub struct CommsNode {
node_identity: Arc<NodeIdentity>,
/// Shared PeerManager instance
peer_manager: Arc<PeerManager>,
/// The resolved Ip-Tcp listening address.
listening_addr: Multiaddr,
/// The bind addresses of the listener(s)
listening_info: ListenerInfo,
/// `Some` if the comms node is configured to run via a hidden service, otherwise `None`
hidden_service: Option<tor::HiddenService>,
/// The 'reciprocal' shutdown signals for each comms service
Expand Down Expand Up @@ -327,7 +307,7 @@ impl CommsNode {

/// Return the Ip/Tcp address that this node is listening on
pub fn listening_address(&self) -> &Multiaddr {
&self.listening_addr
self.listening_info.bind_address()
}

/// Return the Ip/Tcp address that this node is listening on
Expand Down
5 changes: 5 additions & 0 deletions comms/src/builder/mod.rs
Expand Up @@ -145,6 +145,11 @@ impl CommsBuilder {
self
}

pub fn with_auxilary_tcp_listener_address(mut self, listener_address: Multiaddr) -> Self {
self.connection_manager_config.auxilary_tcp_listener_address = Some(listener_address);
self
}

pub fn with_listener_liveness_max_sessions(mut self, max_sessions: usize) -> Self {
self.connection_manager_config.liveness_max_sessions = max_sessions;
self
Expand Down

0 comments on commit 6fefd18

Please sign in to comment.