Basic network functionality #440
Conversation
core/near-network/src/lib.rs
Outdated
// This is very awkward because we cannot copy sink and have to use a channel
// to tunnel the data through. Definitely need to revisit this later to check for
// better solutions
type NearStream = (Sender<Bytes>, SplitStream<Framed<TcpStream, Codec>>);
The point of writing a codec is to be able to convert a stream/sink of bytes into a stream/sink of objects of our own type. The codec in codec.rs converts a stream/sink of bytes into a stream/sink of bytes, which does not achieve much.
As a result, most of the code here operates on streams of bytes, which is exactly what Codec+FramedStream is meant to prevent. Instead of having this happen here https://github.com/nearprotocol/nearcore/pull/440/files#diff-0e9eae49ad6578692ac36809bfab3b5fR237 and here https://github.com/nearprotocol/nearcore/pull/440/files#diff-0e9eae49ad6578692ac36809bfab3b5fR167, it should happen in the codec itself.
This will also make the code of the codec very small.
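The idea can be sketched in isolation from tokio as a hypothetical length-prefixed framing: the two free functions below stand in for the `Encoder`/`Decoder` trait implementations a real codec would provide, and the point is that callers only ever see whole messages, never raw byte streams. Names and framing format here are invented for illustration.

```rust
/// Encode a message as a 4-byte little-endian length prefix plus payload.
/// (A real codec would implement tokio's Encoder trait instead.)
fn encode(msg: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(4 + msg.len());
    out.extend_from_slice(&(msg.len() as u32).to_le_bytes());
    out.extend_from_slice(msg);
    out
}

/// Decode one frame from the buffer, returning the message and the number
/// of bytes consumed, or None if the frame is still incomplete.
fn decode(buf: &[u8]) -> Option<(Vec<u8>, usize)> {
    if buf.len() < 4 {
        return None;
    }
    let len = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    if buf.len() < 4 + len {
        return None;
    }
    Some((buf[4..4 + len].to_vec(), 4 + len))
}

fn main() {
    let bytes = encode(b"hello");
    let (msg, used) = decode(&bytes).unwrap();
    assert_eq!(msg, b"hello");
    assert_eq!(used, bytes.len());
}
```

With the framing living in the codec, the rest of the networking code deals only in decoded messages, which is what the reviewer is asking for.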
Good point. Fixed in 9896565
core/near-network/src/lib.rs
Outdated
/// peers that we are trying to connect to
pending_peers: HashSet<SocketAddr>,
/// pending connection futures
pending_connections: VecDeque<(SocketAddr, ConnectFuture)>,
As we discussed, having our own pool of pending futures that we explicitly iterate with some task is an exact re-implementation of the core tokio functionality that works with kqueue/epoll. We should not be reimplementing it, because it will be highly inefficient and will require more code than just using tokio.
If we are trying to send a message to a peer, then:
- if we have an open connection to this peer, we just send it over: we spawn a task that puts the message into the open socket;
- if we know the ip:port of that peer, we open a connection and send the message over: we spawn a task to do that;
- if we don't know the ip:port, we fail without panicking.
If we receive an incoming socket connection we create:
- one task that listens to the incoming stream of messages and calls Protocol on it;
- another task that opens a channel and redirects messages from that channel into the sink of the socket.
We need only two more tasks:
- a task that periodically announces its known peer info to random peers;
- a task that listens to such announcements and merges the peer info.
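The per-connection writer task described above can be sketched with std threads and channels (the real code would use tokio tasks and pipe the channel into the socket's sink); `SharedBuf` and `spawn_writer` are hypothetical names standing in for the socket and the spawning helper.

```rust
use std::io::Write;
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

/// Stand-in for a socket: a shared byte buffer that implements Write.
#[derive(Clone)]
struct SharedBuf(Arc<Mutex<Vec<u8>>>);

impl Write for SharedBuf {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.0.lock().unwrap().extend_from_slice(buf);
        Ok(buf.len())
    }
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

/// Spawn a writer "task" for one connection: messages sent on the returned
/// channel are drained into the connection's sink by a dedicated thread,
/// so callers never touch the socket directly.
fn spawn_writer<W: Write + Send + 'static>(
    mut sink: W,
) -> (Sender<Vec<u8>>, thread::JoinHandle<()>) {
    let (tx, rx) = channel::<Vec<u8>>();
    let handle = thread::spawn(move || {
        // The loop ends once every Sender is dropped.
        for msg in rx {
            sink.write_all(&msg).expect("write failed");
        }
    });
    (tx, handle)
}

fn main() {
    let buf = SharedBuf(Arc::new(Mutex::new(Vec::new())));
    let (tx, handle) = spawn_writer(buf.clone());
    tx.send(b"ping".to_vec()).unwrap();
    drop(tx); // close the channel so the writer thread exits
    handle.join().unwrap();
    assert_eq!(buf.0.lock().unwrap().as_slice(), &b"ping"[..]);
}
```

The key property is the one the reviewer describes: sending a message means handing it to a channel, and a dedicated task owns the socket.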
That is what I tried previously, but due to some unknown problem with the lock/asynchronous code, it didn't work. It could be related to the poll method I implemented; I will investigate more.
You could submit it to this or separate branch and we could debug it together.
Fixed in 9896565
core/near-network/src/lib.rs
Outdated
type Error = Error;

fn poll(&mut self) -> Poll<Option<ServiceEvent>, Error> {
    // poll pending connections in a round robin fashion, the futures that are
Explicitly implementing the Future or Stream interface is a low-level approach; usually the general combinators are more than enough. There is only one place in our code where we implement poll explicitly -- TxFlow -- because the async logic there is quite complex.
Looking at this poll, I see no reason why it should be implemented inside one poll method. It is very inefficient in terms of performance, because it linearizes most of the async operations that could otherwise have run in parallel. The major indicator of inefficiency is that the blocks of code in this poll do not return anything to each other -- they can and should work in parallel.
Removed in 9896565
It is sort of subjective whether to put crates in …
This code is getting very good!
let service2 = Arc::new(Mutex::new(Service::new(addr2.clone(), peer_id2, message_tx2)));
let peer = Peer::new(addr1.parse::<SocketAddr>().unwrap(), peer_id1, None);
let message_queue = Arc::new(Mutex::new(vec![]));
thread::spawn({
We can try to do a blocking read from message_rx2 using wait(), which transforms it into a blocking iterator. Then we would not need to have message_queue. It might require adding some timeout future on it, just like you did with the batching for passthrough consensus.
I don't want the test to depend on time. It's better to wait on some specific condition.
There are two points:
- We can try using wait() instead of adding an additional queue, because it is shorter.
- We should have a timeout on the test. Right now the test would hang if message_queue is not populated due to some bug.
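Both points can be illustrated with std's mpsc, where recv_timeout gives a blocking read that still cannot hang forever; the tokio/wait() version would wrap the channel in a timeout future instead. This is a stand-alone sketch, not the actual test code.

```rust
use std::sync::mpsc::channel;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = channel();
    thread::spawn(move || {
        // Stand-in for the network delivering a message to the test.
        tx.send("hello from peer").unwrap();
    });
    // Block until the message arrives, but give up after 5 seconds
    // instead of hanging forever if it never does.
    let msg = rx
        .recv_timeout(Duration::from_secs(5))
        .expect("test timed out waiting for message");
    assert_eq!(msg, "hello from peer");
}
```

The test still waits on a specific condition (the message arriving), and the timeout only bounds how long a buggy run can hang.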
Addressed in e5607f1
core/near-network/src/lib.rs
Outdated
/// to the pending connection queue
pub fn dial(&self, addr: SocketAddr) {
    if self.peer_state.read().contains_key(&addr) {
        return;
The method should return Result. If peer is unknown it should return Err.
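A minimal sketch of the suggested shape, with hypothetical names (the `DialError` enum and its variant are invented for illustration; the quoted code's silent early return becomes an `Err`):

```rust
use std::collections::HashSet;
use std::net::SocketAddr;

#[derive(Debug, PartialEq)]
enum DialError {
    AlreadyConnected,
}

struct Service {
    connected: HashSet<SocketAddr>,
}

impl Service {
    /// Dial a peer, reporting failure via Result instead of returning silently.
    fn dial(&mut self, addr: SocketAddr) -> Result<(), DialError> {
        if self.connected.contains(&addr) {
            return Err(DialError::AlreadyConnected);
        }
        // ... actually open the connection here ...
        self.connected.insert(addr);
        Ok(())
    }
}

fn main() {
    let mut svc = Service { connected: HashSet::new() };
    let addr: SocketAddr = "127.0.0.1:3000".parse().unwrap();
    assert!(svc.dial(addr).is_ok());
    assert_eq!(svc.dial(addr), Err(DialError::AlreadyConnected));
}
```

Callers can then decide whether a failed dial is fatal, rather than the failure being invisible.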
Fixed in d9e5a3f
}

pub fn spawn_listening_task(&mut self) {
    let peer_state = self.peer_state.clone();
I think this method should be private and called exactly once from new. Then we can also remove the listener field.
I was thinking of doing that and spawning all background tasks at once, but then the call to new has to be inside a task, unless we spawn a thread from within new. Do we still want to access the service after initialization?
Ideally we don't want other parts of our infra to communicate with the Service object directly. It should be done through channels, for better compatibility with async code. Nothing outside the network should be calling dial or send_message directly. Instead, dial should happen upon the spawning of the network task and other subtasks.
It will make writing tests harder if we cannot access the service object after calling new.
Unit tests that are located in the same file have access to private methods and fields. Leave the current methods as they are but make them private, then continue using them in unit tests just like you are using them now. The code that is currently in the new method can be renamed to init and invoked from the new method before the tasks are spawned. Unit tests would then use the private init method, while external code would use new.
The problem is that dial and send_message will be wrapped into tasks that expose channels anyway, because that's the only way to call them from code that uses tokio. This wrapping can be done either outside the network code or inside it. It makes sense to do it inside, because the wrapping is network-specific.
Fixed in e5607f1. Switched new and init due to clippy warnings.
core/near-network/src/lib.rs
Outdated
/// gossip account info to some of peers
// TODO: find efficient way of gossiping. Maybe store what has been gossiped to each peer?
pub fn gossip_account_info(&self) {
    let mut rng = thread_rng();
This can also be one long task that, on a given Interval, chooses random peers and gossips to them. This method should also be called from new only. Specifically, have a task that iterates over a stream produced by Interval.
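A dependency-free sketch of such a loop (std thread version; the tokio equivalent would iterate a stream produced by Interval, and real peer selection would be random rather than the round-robin used here to keep the example deterministic):

```rust
use std::thread;
use std::time::Duration;

/// Run a bounded number of gossip rounds, one per interval tick,
/// returning which peer was gossiped to in each round.
fn gossip_rounds(peers: &[&str], rounds: usize, period: Duration) -> Vec<String> {
    let mut gossiped = Vec::new();
    for round in 0..rounds {
        // Pick a peer for this round (a real implementation would sample randomly).
        let peer = peers[round % peers.len()];
        gossiped.push(peer.to_string());
        // Stand-in for the Interval tick.
        thread::sleep(period);
    }
    gossiped
}

fn main() {
    let log = gossip_rounds(&["a", "b", "c"], 4, Duration::from_millis(1));
    assert_eq!(log, ["a", "b", "c", "a"]);
}
```

The important shape is that gossiping is a single long-lived task driven by a timer, started once at construction, rather than a public method callers must remember to invoke.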
core/near-network/src/lib.rs
Outdated
}

/// sending message to peer. Must be used in a task
pub fn send_message(&self, peer: &PeerId, data: Vec<u8>) {
For better compatibility with the rest of our code that uses tokio, this method should not spawn, but instead should return a future. There are several ways of returning futures: https://tokio.rs/docs/going-deeper/returning/ I think in this scenario impl Future works best.
Fixed in d9e5a3f
core/near-network/src/lib.rs
Outdated
// spawn the task that forwards what receiver receives to send through sink
tokio::spawn(
    sink.send_all(
        receiver.map_err(|e| Error::new(ErrorKind::BrokenPipe, format!("{:?}", e)))
I just discovered that I was incorrectly formatting tokio error messages all along. It should be {} not {:?}.
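The difference is easy to see on a std io::Error, where {} (Display) prints the human-readable message while {:?} (Debug) prints the struct-like representation:

```rust
use std::io::{Error, ErrorKind};

fn main() {
    let err = Error::new(ErrorKind::BrokenPipe, "peer hung up");
    // Display gives the message intended for humans/logs.
    let display = format!("{}", err);
    // Debug gives the developer-oriented representation.
    let debug = format!("{:?}", err);
    assert_eq!(display, "peer hung up");
    assert!(debug.contains("BrokenPipe"));
}
```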
Fixed in d0c453b
core/near-network/src/lib.rs
Outdated
// spawn the task that forwards what receiver receives to send through sink
tokio::spawn(
    sink.send_all(
        receiver.map_err(|e| Error::new(ErrorKind::BrokenPipe, format!("{:?}", e)))
forward is better for piping a stream into a sink. See transaction_to_payload for the reference.
Fixed in d0c453b
core/near-network/src/lib.rs
Outdated
#[derive(PartialEq, Eq, Hash, Clone, Debug, Serialize, Deserialize)]
/// Information about a peer
pub struct Peer {
If nothing outside the network is going to be using this struct then it does not have to be pub, and there is no need for a constructor.
Fixed in d9e5a3f
I think it is good enough before we integrate it with …
Implemented basic network functionality to communicate between peers. Currently the network infra is in a separate crate from node/network; the two crates will probably be merged in the future. Still need to figure out how to gossip and route messages between peers.