Skip to content

Commit

Permalink
feat: Upgrade RPC local_node_id to local_node_info
Browse files Browse the repository at this point in the history
BREAKING CHANGE: RPC local_node_id no longer exists, use new added RPC
local_node_info to get node addresses.
  • Loading branch information
jjyr committed Jan 28, 2019
1 parent a159cdf commit 64e41f6
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 30 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 8 additions & 5 deletions network/src/identify_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use std::time::Instant;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::timer::Interval;

const MAX_LISTEND_ADDRS: usize = 10;

pub struct IdentifyService {
pub client_version: String,
pub protocol_version: String,
Expand Down Expand Up @@ -71,10 +73,7 @@ impl IdentifyService {
// get an external addrs for our node
if let Some(ext_addr) = transport.nat_traversal(original_address, &observed_addr) {
debug!(target: "network", "get new external address {:?}", ext_addr);
let mut listened_addresses = network.listened_addresses.write();
if !listened_addresses.iter().any(|a| a == &ext_addr) {
listened_addresses.push(ext_addr.clone());
}
network.discovery_listened_address(ext_addr.to_owned());
}
}

Expand Down Expand Up @@ -127,7 +126,11 @@ where
public_key: network.local_public_key().clone(),
protocol_version: format!("ckb/{}", self.protocol_version).to_owned(),
agent_version: format!("ckb/{}", self.client_version).to_owned(),
listen_addrs: network.listened_addresses.read().clone(),
listen_addrs: network
.listened_addresses(MAX_LISTEND_ADDRS)
.into_iter()
.map(|(addr, _)| addr)
.collect(),
protocols: vec![], // TODO FIXME: report local protocols
},
&addr,
Expand Down
62 changes: 50 additions & 12 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::NetworkConfig;
use crate::{Error, ErrorKind, PeerIndex, ProtocolId};
use bytes::Bytes;
use ckb_util::{Mutex, RwLock};
use fnv::FnvHashMap;
use futures::future::{self, select_all, Future};
use futures::sync::mpsc::UnboundedSender;
use futures::sync::oneshot;
Expand Down Expand Up @@ -66,7 +67,7 @@ impl PeerInfo {
pub struct Network {
peers_registry: RwLock<PeersRegistry>,
peer_store: Arc<RwLock<dyn PeerStore>>,
pub(crate) listened_addresses: RwLock<Vec<Multiaddr>>,
listened_addresses: RwLock<FnvHashMap<Multiaddr, u8>>,
pub(crate) original_listened_addresses: RwLock<Vec<Multiaddr>>,
pub(crate) ckb_protocols: CKBProtocols<Arc<CKBProtocolHandler>>,
local_private_key: secio::SecioKeyPair,
Expand All @@ -86,6 +87,21 @@ impl Network {
&self.local_peer_id
}

pub(crate) fn discovery_listened_address(&self, addr: Multiaddr) {
let mut listened_addresses = self.listened_addresses.write();
let score = listened_addresses.entry(addr).or_insert(0);
*score = score.saturating_add(1);
}

pub(crate) fn listened_addresses(&self, count: usize) -> Vec<(Multiaddr, u8)> {
let listened_addresses = self.listened_addresses.read();
listened_addresses
.iter()
.take(count)
.map(|(addr, score)| (addr.to_owned(), *score))
.collect()
}

pub(crate) fn get_peer_index(&self, peer_id: &PeerId) -> Option<PeerIndex> {
let peers_registry = self.peers_registry.read();
peers_registry
Expand Down Expand Up @@ -189,19 +205,31 @@ impl Network {
self.local_private_key.to_public_key()
}

pub fn external_url(&self) -> Option<String> {
self.original_listened_addresses
.read()
.get(0)
.map(|addr| self.to_external_url(addr))
pub fn external_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
let mut urls = Vec::with_capacity(max_urls);
let original_listened_addresses = self.original_listened_addresses.read();

urls.append(
&mut self
.listened_addresses(max_urls.saturating_sub(original_listened_addresses.len()))
.into_iter()
.map(|(addr, score)| (self.to_external_url(&addr), score))
.collect(),
);
urls.append(
&mut original_listened_addresses
.iter()
.map(|addr| (self.to_external_url(addr), 1))
.collect(),
);
urls
}
pub fn node_id(&self) -> String {
self.local_private_key.to_peer_id().to_base58()
}

fn to_external_url(&self, addr: &Multiaddr) -> String {
format!(
"{}/p2p/{}",
addr,
self.local_private_key.to_peer_id().to_base58()
)
format!("{}/p2p/{}", addr, self.node_id())
}

pub(crate) fn send(
Expand Down Expand Up @@ -470,7 +498,17 @@ impl Network {
Some(private_key) => private_key?,
None => return Err(ErrorKind::Other("secret_key not set".to_owned()).into()),
};
let listened_addresses = config.public_addresses.clone();
let listened_addresses = {
let mut listened_addresses = FnvHashMap::with_capacity_and_hasher(
config.public_addresses.len(),
Default::default(),
);
// set max score to public addresses
for addr in &config.public_addresses {
listened_addresses.insert(addr.to_owned(), std::u8::MAX);
}
listened_addresses
};
let peer_store: Arc<RwLock<dyn PeerStore>> = {
let mut peer_store = SqlitePeerStore::default();
let bootnodes = config.bootnodes()?;
Expand Down
11 changes: 8 additions & 3 deletions network/src/network_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,13 @@ impl Drop for NetworkService {

impl NetworkService {
#[inline]
pub fn external_url(&self) -> Option<String> {
self.network.external_url()
pub fn external_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
self.network.external_urls(max_urls)
}

#[inline]
pub fn node_id(&self) -> String {
self.network.node_id()
}

pub fn with_protocol_context<F, T>(&self, protocol_id: ProtocolId, f: F) -> Option<T>
Expand Down Expand Up @@ -86,7 +91,7 @@ impl NetworkService {
// This method will not wait for the server stopped, you should use server_future or
// thread_handle to achieve that.
fn shutdown(&mut self) -> Result<(), IoError> {
debug!(target: "network", "shutdown network service self: {:?}", self.external_url());
debug!(target: "network", "shutdown network service self: {:?}", self.external_urls(1).get(0).map(|(addr, _)|addr.to_owned()));
if let Some(close_tx) = self.close_tx.take() {
let _ = close_tx
.send(())
Expand Down
1 change: 1 addition & 0 deletions rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ flatbuffers = "0.5.0"
num_cpus = "1.0"
faster-hex = "0.3"
jsonrpc-types = { path = "../util/jsonrpc-types" }
build-info = { path = "../util/build-info" }

[dev-dependencies]
ckb-db = { path = "../db" }
Expand Down
19 changes: 14 additions & 5 deletions rpc/doc.md
Original file line number Diff line number Diff line change
Expand Up @@ -288,21 +288,30 @@ curl -d '{"id": 2, "jsonrpc": "2.0", "method":"get_tip_block_number","params": [
}
```

# local_node_id
# local_node_info

Returns the local node id.
Returns the local node information.

## Examples

```shell
curl -d '{"id": 2, "jsonrpc": "2.0", "method":"local_node_id","params": []}' -H 'content-type:application/json' 'http://localhost:8114'
curl -d '{"id": 2, "jsonrpc": "2.0", "method":"local_node_info","params": []}' -H 'content-type:application/json' 'http://localhost:8114'
```

```json
{
"jsonrpc": "2.0",
"result": "/ip4/0.0.0.0/tcp/8115/p2p/QmdSxB6iTcbhj6gbZNthvJrwRkJrwnsohNpVixY4FtcZwv",
"id": 2
"result": {
"addresses": [
{
"address": "/ip4/0.0.0.0/tcp/12344/p2p/QmWRU2NSro4wKgVbFX6y8SPFkcJ1tE2X5xzk9msMhdRmdS",
"score": 1
}
],
"node_id": "QmWRU2NSro4wKgVbFX6y8SPFkcJ1tE2X5xzk9msMhdRmdS",
"version": "0.5.0"
}
"id": 2,
}
```

Expand Down
23 changes: 18 additions & 5 deletions rpc/src/module/net.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use build_info::{get_version, Version};
use ckb_network::NetworkService;
use jsonrpc_core::Result;
use jsonrpc_macros::build_rpc_trait;
use jsonrpc_types::{LocalNode, NodeAddress};
use std::sync::Arc;

const MAX_ADDRS: usize = 50;

build_rpc_trait! {
pub trait NetworkRpc {
// curl -d '{"id": 2, "jsonrpc": "2.0", "method":"send_transaction","params": [{"version":2, "deps":[], "inputs":[], "outputs":[]}]}' -H 'content-type:application/json' 'http://localhost:8114'
#[rpc(name = "local_node_id")]
fn local_node_id(&self) -> Result<Option<String>>;
// curl -d '{"id": 2, "jsonrpc": "2.0", "method":"local_node_info","params": []}' -H 'content-type:application/json' 'http://localhost:8114'
#[rpc(name = "local_node_info")]
fn local_node_info(&self) -> Result<LocalNode>;
}
}

Expand All @@ -16,7 +20,16 @@ pub(crate) struct NetworkRpcImpl {
}

impl NetworkRpc for NetworkRpcImpl {
fn local_node_id(&self) -> Result<Option<String>> {
Ok(self.network.external_url())
fn local_node_info(&self) -> Result<LocalNode> {
Ok(LocalNode {
version: get_version!().to_string(),
node_id: self.network.node_id(),
addresses: self
.network
.external_urls(MAX_ADDRS)
.into_iter()
.map(|(address, score)| NodeAddress { address, score })
.collect(),
})
}
}
2 changes: 2 additions & 0 deletions util/jsonrpc-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod block_template;
mod blockchain;
mod bytes;
mod cell;
mod local_node;
mod proposal_short_id;

pub use self::block_template::{
Expand All @@ -10,4 +11,5 @@ pub use self::block_template::{
pub use self::blockchain::{Block, Header, OutPoint, Transaction, UncleBlock};
pub use self::bytes::Bytes;
pub use self::cell::{CellOutputWithOutPoint, CellWithStatus};
pub use self::local_node::{LocalNode, NodeAddress};
pub use jsonrpc_core::types::{error, id, params, request, response, version};
14 changes: 14 additions & 0 deletions util/jsonrpc-types/src/local_node.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use serde_derive::{Deserialize, Serialize};

#[derive(Clone, Default, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
pub struct LocalNode {
pub version: String,
pub node_id: String,
pub addresses: Vec<NodeAddress>,
}

#[derive(Clone, Default, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
pub struct NodeAddress {
pub address: String,
pub score: u8,
}

0 comments on commit 64e41f6

Please sign in to comment.