Skip to content

Commit

Permalink
more consistent handling of peers_preferred during startup (#3578)
Browse files Browse the repository at this point in the history
  • Loading branch information
antiochp committed Mar 2, 2021
1 parent 03b7518 commit 4284458
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 43 deletions.
34 changes: 34 additions & 0 deletions p2p/src/msg.rs
Expand Up @@ -529,6 +529,40 @@ impl Readable for PeerAddrs {
}
}

impl IntoIterator for PeerAddrs {
type Item = PeerAddr;
type IntoIter = std::vec::IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
self.peers.into_iter()
}
}

impl Default for PeerAddrs {
fn default() -> Self {
PeerAddrs { peers: vec![] }
}
}

impl PeerAddrs {
pub fn as_slice(&self) -> &[PeerAddr] {
self.peers.as_slice()
}

pub fn contains(&self, addr: &PeerAddr) -> bool {
self.peers.contains(addr)
}

pub fn difference(&self, other: &[PeerAddr]) -> PeerAddrs {
let peers = self
.peers
.iter()
.filter(|x| !other.contains(x))
.cloned()
.collect();
PeerAddrs { peers }
}
}

/// We found some issue in the communication, sending an error back, usually
/// followed by closing the connection.
pub struct PeerError {
Expand Down
5 changes: 4 additions & 1 deletion p2p/src/peers.rs
Expand Up @@ -27,6 +27,7 @@ use crate::core::core::hash::{Hash, Hashed};
use crate::core::core::{OutputIdentifier, Segment, SegmentIdentifier, TxKernel};
use crate::core::global;
use crate::core::pow::Difficulty;
use crate::msg::PeerAddrs;
use crate::peer::Peer;
use crate::store::{PeerData, PeerStore, State};
use crate::types::{
Expand Down Expand Up @@ -322,8 +323,10 @@ impl Peers {
&self,
max_inbound_count: usize,
max_outbound_count: usize,
preferred_peers: &[PeerAddr],
config: P2PConfig,
) {
let preferred_peers = config.peers_preferred.unwrap_or(PeerAddrs::default());

let mut rm = vec![];

// build a list of peers to be cleaned up
Expand Down
70 changes: 36 additions & 34 deletions servers/src/grin/seed.rs
Expand Up @@ -19,6 +19,7 @@

use chrono::prelude::{DateTime, Utc};
use chrono::{Duration, MIN_DATE};
use p2p::{msg::PeerAddrs, P2PConfig};
use rand::prelude::*;
use std::collections::HashMap;
use std::net::ToSocketAddrs;
Expand Down Expand Up @@ -51,11 +52,9 @@ const TESTNET_DNS_SEEDS: &[&str] = &[
pub fn connect_and_monitor(
p2p_server: Arc<p2p::Server>,
seed_list: Box<dyn Fn() -> Vec<PeerAddr> + Send>,
preferred_peers: &[PeerAddr],
config: P2PConfig,
stop_state: Arc<StopState>,
) -> std::io::Result<thread::JoinHandle<()>> {
let preferred_peers = preferred_peers.to_vec();

thread::Builder::new()
.name("seed".to_string())
.spawn(move || {
Expand All @@ -66,12 +65,7 @@ pub fn connect_and_monitor(
let (tx, rx) = mpsc::channel();

// check seeds first
connect_to_seeds_and_preferred_peers(
peers.clone(),
tx.clone(),
seed_list,
&preferred_peers,
);
connect_to_seeds_and_peers(peers.clone(), tx.clone(), seed_list, config);

let mut prev = MIN_DATE.and_hms(0, 0, 0);
let mut prev_expire_check = MIN_DATE.and_hms(0, 0, 0);
Expand Down Expand Up @@ -109,12 +103,7 @@ pub fn connect_and_monitor(
);

// monitor additional peers if we need to add more
monitor_peers(
peers.clone(),
p2p_server.config.clone(),
tx.clone(),
&preferred_peers,
);
monitor_peers(peers.clone(), p2p_server.config.clone(), tx.clone());

prev = Utc::now();
start_attempt = cmp::min(6, start_attempt + 1);
Expand All @@ -137,12 +126,7 @@ pub fn connect_and_monitor(
})
}

fn monitor_peers(
peers: Arc<p2p::Peers>,
config: p2p::P2PConfig,
tx: mpsc::Sender<PeerAddr>,
preferred_peers: &[PeerAddr],
) {
fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sender<PeerAddr>) {
// regularly check if we need to acquire more peers and if so, gets
// them from db
let mut total_count = 0;
Expand Down Expand Up @@ -195,7 +179,7 @@ fn monitor_peers(
peers.clean_peers(
config.peer_max_inbound_count() as usize,
config.peer_max_outbound_count() as usize,
preferred_peers,
config.clone(),
);

if peers.enough_outbound_peers() {
Expand All @@ -221,13 +205,14 @@ fn monitor_peers(
}

// Attempt to connect to any preferred peers.
for p in preferred_peers {
let peers_preferred = config.peers_preferred.unwrap_or(PeerAddrs::default());
for p in peers_preferred {
if !connected_peers.is_empty() {
if !connected_peers.contains(p) {
tx.send(*p).unwrap();
if !connected_peers.contains(&p) {
let _ = tx.send(p);
}
} else {
tx.send(*p).unwrap();
let _ = tx.send(p);
}
}

Expand Down Expand Up @@ -261,33 +246,50 @@ fn monitor_peers(

// Check if we have any pre-existing peer in db. If so, start with those,
// otherwise use the seeds provided.
fn connect_to_seeds_and_preferred_peers(
fn connect_to_seeds_and_peers(
peers: Arc<p2p::Peers>,
tx: mpsc::Sender<PeerAddr>,
seed_list: Box<dyn Fn() -> Vec<PeerAddr>>,
peers_preferred: &[PeerAddr],
config: P2PConfig,
) {
let peers_deny = config.peers_deny.unwrap_or(PeerAddrs::default());

// If "peers_allow" is explicitly configured then just use this list
// remembering to filter out "peers_deny".
if let Some(peers) = config.peers_allow {
for addr in peers.difference(peers_deny.as_slice()) {
let _ = tx.send(addr);
}
return;
}

// Always try our "peers_preferred" remembering to filter out "peers_deny".
if let Some(peers) = config.peers_preferred {
for addr in peers.difference(peers_deny.as_slice()) {
let _ = tx.send(addr);
}
}

// check if we have some peers in db
// look for peers that are able to give us other peers (via PEER_LIST capability)
let peers = peers.find_peers(p2p::State::Healthy, p2p::Capabilities::PEER_LIST, 100);

// if so, get their addresses, otherwise use our seeds
let mut peer_addrs = if peers.len() > 3 {
let peer_addrs = if peers.len() > 3 {
peers.iter().map(|p| p.addr).collect::<Vec<_>>()
} else {
seed_list()
};

// If we have preferred peers add them to the initial list
peer_addrs.extend_from_slice(peers_preferred);

if peer_addrs.is_empty() {
warn!("No seeds were retrieved.");
}

// connect to this first set of addresses
// connect to this initial set of peer addresses (either seeds or from our local db).
for addr in peer_addrs {
tx.send(addr).unwrap();
if !peers_deny.as_slice().contains(&addr) {
let _ = tx.send(addr);
}
}
}

Expand Down
11 changes: 3 additions & 8 deletions servers/src/grin/server.rs
Expand Up @@ -240,7 +240,7 @@ impl Server {
let mut connect_thread = None;

if config.p2p_config.seeding_type != p2p::Seeding::Programmatic {
let seeder = match config.p2p_config.seeding_type {
let seed_list = match config.p2p_config.seeding_type {
p2p::Seeding::None => {
warn!("No seed configured, will stay solo until connected to");
seed::predefined_seeds(vec![])
Expand All @@ -257,15 +257,10 @@ impl Server {
_ => unreachable!(),
};

let preferred_peers = match &config.p2p_config.peers_preferred {
Some(addrs) => addrs.peers.clone(),
None => vec![],
};

connect_thread = Some(seed::connect_and_monitor(
p2p_server.clone(),
seeder,
&preferred_peers,
seed_list,
config.p2p_config.clone(),
stop_state.clone(),
)?);
}
Expand Down

0 comments on commit 4284458

Please sign in to comment.