Skip to content

Commit

Permalink
Add BanksService to Validator
Browse files Browse the repository at this point in the history
  • Loading branch information
garious committed Jul 11, 2020
1 parent 17ba625 commit eb9236a
Show file tree
Hide file tree
Showing 12 changed files with 268 additions and 109 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2421,7 +2421,8 @@ impl Node {
let rpc_pubsub_addr =
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_pubsub_port);
let rpc_banks_port = find_available_port_in_range(bind_ip_addr, (1024, 65535)).unwrap();
let rpc_banks_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_banks_port);
let rpc_banks_addr =
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_banks_port);

let broadcast = vec![UdpSocket::bind("0.0.0.0:0").unwrap()];
let retransmit_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
Expand Down
83 changes: 49 additions & 34 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use solana_runtime::{
bank_forks::{BankForks, SnapshotConfig},
commitment::BlockCommitmentCache,
hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
rpc_banks_service::RpcBanksService,
};
use solana_sdk::{
clock::Slot,
Expand Down Expand Up @@ -138,7 +139,7 @@ impl ValidatorExit {
pub struct Validator {
pub id: Pubkey,
validator_exit: Arc<RwLock<Option<ValidatorExit>>>,
rpc_service: Option<(JsonRpcService, PubSubService)>,
rpc_service: Option<(JsonRpcService, PubSubService, RpcBanksService)>,
transaction_status_service: Option<TransactionStatusService>,
rewards_recorder_service: Option<RewardsRecorderService>,
gossip_service: GossipService,
Expand Down Expand Up @@ -260,37 +261,46 @@ impl Validator {
));

let rpc_override_health_check = Arc::new(AtomicBool::new(false));
let rpc_service = config.rpc_ports.map(|(rpc_port, rpc_pubsub_port, rpc_banks_port)| {
if ContactInfo::is_valid_address(&node.info.rpc) {
assert!(ContactInfo::is_valid_address(&node.info.rpc_pubsub));
assert_eq!(rpc_port, node.info.rpc.port());
assert_eq!(rpc_pubsub_port, node.info.rpc_pubsub.port());
assert_eq!(rpc_banks_port, node.info.rpc_banks.port());
} else {
assert!(!ContactInfo::is_valid_address(&node.info.rpc_pubsub));
}
(
JsonRpcService::new(
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port),
config.rpc_config.clone(),
config.snapshot_config.clone(),
bank_forks.clone(),
block_commitment_cache.clone(),
blockstore.clone(),
cluster_info.clone(),
genesis_config.hash(),
ledger_path,
validator_exit.clone(),
config.trusted_validators.clone(),
rpc_override_health_check.clone(),
),
PubSubService::new(
&subscriptions,
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_pubsub_port),
&exit,
),
)
});
let rpc_service = config
.rpc_ports
.map(|(rpc_port, rpc_pubsub_port, rpc_banks_port)| {
if ContactInfo::is_valid_address(&node.info.rpc) {
assert!(ContactInfo::is_valid_address(&node.info.rpc_pubsub));
assert_eq!(rpc_port, node.info.rpc.port());
assert_eq!(rpc_pubsub_port, node.info.rpc_pubsub.port());
assert_eq!(rpc_banks_port, node.info.rpc_banks.port());
} else {
assert!(!ContactInfo::is_valid_address(&node.info.rpc_pubsub));
}
let tpu_address = cluster_info.my_contact_info().tpu;
(
JsonRpcService::new(
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port),
config.rpc_config.clone(),
config.snapshot_config.clone(),
bank_forks.clone(),
block_commitment_cache.clone(),
blockstore.clone(),
cluster_info.clone(),
genesis_config.hash(),
ledger_path,
validator_exit.clone(),
config.trusted_validators.clone(),
rpc_override_health_check.clone(),
),
PubSubService::new(
&subscriptions,
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_pubsub_port),
&exit,
),
RpcBanksService::new(
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_banks_port),
tpu_address,
&bank_forks,
&exit,
),
)
});

let (transaction_status_sender, transaction_status_service) =
if rpc_service.is_some() && config.rpc_config.enable_rpc_transaction_history {
Expand Down Expand Up @@ -549,9 +559,10 @@ impl Validator {
pub fn join(self) -> Result<()> {
self.poh_service.join()?;
drop(self.poh_recorder);
if let Some((rpc_service, rpc_pubsub_service)) = self.rpc_service {
if let Some((rpc_service, rpc_pubsub_service, rpc_banks_service)) = self.rpc_service {
rpc_service.join()?;
rpc_pubsub_service.join()?;
rpc_banks_service.join()?;
}
if let Some(transaction_status_service) = self.transaction_status_service {
transaction_status_service.join()?;
Expand Down Expand Up @@ -833,7 +844,11 @@ impl TestValidator {

let leader_voting_keypair = Arc::new(voting_keypair);
let config = ValidatorConfig {
rpc_ports: Some((node.info.rpc.port(), node.info.rpc_pubsub.port(), node.info.rpc_banks.port())),
rpc_ports: Some((
node.info.rpc.port(),
node.info.rpc_pubsub.port(),
node.info.rpc_banks.port(),
)),
..ValidatorConfig::default()
};
let node = Validator::new(
Expand Down
9 changes: 7 additions & 2 deletions local-cluster/src/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ impl LocalCluster {
leader_config.rpc_ports = Some((
leader_node.info.rpc.port(),
leader_node.info.rpc_pubsub.port(),
leader_node.info.rpc_banks.port(),
));
leader_config.account_paths = vec![leader_ledger_path.join("accounts")];
let leader_server = Validator::new(
Expand Down Expand Up @@ -299,6 +300,7 @@ impl LocalCluster {
config.rpc_ports = Some((
validator_node.info.rpc.port(),
validator_node.info.rpc_pubsub.port(),
validator_node.info.rpc_banks.port(),
));
let voting_keypair = Arc::new(voting_keypair);
config.account_paths = vec![ledger_path.join("accounts")];
Expand Down Expand Up @@ -546,8 +548,11 @@ impl Cluster for LocalCluster {
// Update the stored ContactInfo for this node
let node = Node::new_localhost_with_pubkey(&pubkey);
cluster_validator_info.info.contact_info = node.info.clone();
cluster_validator_info.config.rpc_ports =
Some((node.info.rpc.port(), node.info.rpc_pubsub.port()));
cluster_validator_info.config.rpc_ports = Some((
node.info.rpc.port(),
node.info.rpc_pubsub.port(),
node.info.rpc_banks.port(),
));

let entry_point_info = {
if *pubkey == self.entry_point_info.id {
Expand Down
Loading

0 comments on commit eb9236a

Please sign in to comment.