diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 5cc5b82344e0d1..16d78a913bc90b 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -7,7 +7,7 @@ use { base64::{prelude::BASE64_STANDARD, Engine}, bincode::{config::Options, serialize}, crossbeam_channel::{unbounded, Receiver, Sender}, - jsonrpc_core::{futures::future, types::error, BoxFuture, Error, Metadata, Result}, + jsonrpc_core::{futures::future, types::error, BoxFuture, Error, ErrorCode, Metadata, Result}, jsonrpc_derive::rpc, solana_account_decoder::{ parse_token::{is_known_spl_token_id, token_amount_to_ui_amount, UiTokenAmount}, @@ -62,6 +62,10 @@ use { clock::{Slot, UnixTimestamp, MAX_RECENT_BLOCKHASHES}, commitment_config::{CommitmentConfig, CommitmentLevel}, epoch_info::EpochInfo, + epoch_rewards_hasher::EpochRewardsHasher, + epoch_rewards_partition_data::{ + get_epoch_rewards_partition_data_address, EpochRewardsPartitionDataVersion, + }, epoch_schedule::EpochSchedule, exit::Exit, feature_set, @@ -519,6 +523,38 @@ impl JsonRpcRequestProcessor { }) } + async fn get_reward_map( + &self, + slot: Slot, + addresses: &[String], + reward_type_filter: &F, + config: &RpcEpochConfig, + ) -> Result> + where + F: Fn(RewardType) -> bool, + { + let Ok(Some(block)) = self + .get_block( + slot, + Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()), + ) + .await + else { + return Err(RpcCustomError::BlockNotAvailable { slot }.into()); + }; + + Ok(block + .rewards + .unwrap_or_default() + .into_iter() + .filter(|reward| { + reward.reward_type.is_some_and(reward_type_filter) + && addresses.contains(&reward.pubkey) + }) + .map(|reward| (reward.clone().pubkey, (reward, slot))) + .collect()) + } + pub async fn get_inflation_reward( &self, addresses: Vec, @@ -527,18 +563,20 @@ impl JsonRpcRequestProcessor { let config = config.unwrap_or_default(); let epoch_schedule = self.get_epoch_schedule(); let first_available_block = self.get_first_available_block().await; + let slot_context = RpcContextConfig { + commitment: config.commitment, + min_context_slot: config.min_context_slot, + }; let epoch = match config.epoch { Some(epoch) => epoch, None => epoch_schedule - .get_epoch(self.get_slot(RpcContextConfig { - commitment: config.commitment, - min_context_slot: config.min_context_slot, - })?) + .get_epoch(self.get_slot(slot_context)?) .saturating_sub(1), }; - // Rewards for this epoch are found in the first confirmed block of the next epoch - let first_slot_in_epoch = epoch_schedule.get_first_slot_in_epoch(epoch.saturating_add(1)); + let rewards_epoch = epoch.saturating_add(1); + let first_slot_in_epoch = epoch_schedule.get_first_slot_in_epoch(rewards_epoch); + if first_slot_in_epoch < first_available_block { if self.bigtable_ledger_storage.is_some() { return Err(RpcCustomError::LongTermStorageSlotSkipped { @@ -554,6 +592,8 @@ impl JsonRpcRequestProcessor { } } + let bank = self.get_bank_with_config(slot_context)?; + let first_confirmed_block_in_epoch = *self .get_blocks_with_limit(first_slot_in_epoch, 1, config.commitment) .await? @@ -561,44 +601,94 @@ impl JsonRpcRequestProcessor { .ok_or(RpcCustomError::BlockNotAvailable { slot: first_slot_in_epoch, })?; + let partitioned_epoch_reward_enabled_slot = bank + .feature_set + .activated_slot(&feature_set::enable_partitioned_epoch_reward::id()); + let partitioned_epoch_reward_enabled = partitioned_epoch_reward_enabled_slot + .map(|slot| slot <= first_confirmed_block_in_epoch) + .unwrap_or(false); - let Ok(Some(first_confirmed_block)) = self - .get_block( + let mut reward_map: HashMap = { + let addresses: Vec = + addresses.iter().map(|pubkey| pubkey.to_string()).collect(); + + self.get_reward_map( first_confirmed_block_in_epoch, - Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()), + &addresses, + &|reward_type| -> bool { + reward_type == RewardType::Voting + || (!partitioned_epoch_reward_enabled && reward_type == RewardType::Staking) + }, + &config, ) - .await - else { - return Err(RpcCustomError::BlockNotAvailable { - slot: first_confirmed_block_in_epoch, - } - .into()); + .await? }; - let addresses: Vec = addresses - .into_iter() - .map(|pubkey| pubkey.to_string()) - .collect(); + if partitioned_epoch_reward_enabled { + let partition_data_address = get_epoch_rewards_partition_data_address(rewards_epoch); + let partition_data_account = + bank.get_account(&partition_data_address) + .ok_or_else(|| Error { + code: ErrorCode::InternalError, + message: format!( + "Partition data account not found for epoch {:?} at {:?}", + epoch, partition_data_address + ), + data: None, + })?; + let EpochRewardsPartitionDataVersion::V0(partition_data) = + bincode::deserialize(partition_data_account.data()) + .map_err(|_| Error::internal_error())?; + let hasher = EpochRewardsHasher::new( + partition_data.num_partitions, + &partition_data.parent_blockhash, + ); + let mut partition_index_addresses: HashMap> = HashMap::new(); + for address in addresses.iter() { + let address_string = address.to_string(); + // Skip this address if (Voting) rewards were already found in + // the first block of the epoch + if !reward_map.contains_key(&address_string) { + let partition_index = hasher.clone().hash_address_to_partition(address); + partition_index_addresses + .entry(partition_index) + .and_modify(|list| list.push(address_string.clone())) + .or_insert(vec![address_string]); + } + } - let reward_hash: HashMap = first_confirmed_block - .rewards - .unwrap_or_default() - .into_iter() - .filter_map(|reward| match reward.reward_type? { - RewardType::Staking | RewardType::Voting => addresses - .contains(&reward.pubkey) - .then(|| (reward.clone().pubkey, reward)), - _ => None, - }) - .collect(); + let block_list = self + .get_blocks_with_limit( + first_confirmed_block_in_epoch + 1, + partition_data.num_partitions, + config.commitment, + ) + .await?; + + for (partition_index, addresses) in partition_index_addresses.iter() { + let slot = *block_list + .get(*partition_index) + .ok_or_else(Error::internal_error)?; + + let index_reward_map = self + .get_reward_map( + slot, + addresses, + &|reward_type| -> bool { reward_type == RewardType::Staking }, + &config, + ) + .await?; + reward_map.extend(index_reward_map); + } + } let rewards = addresses .iter() .map(|address| { - if let Some(reward) = reward_hash.get(address) { + if let Some((reward, slot)) = reward_map.get(&address.to_string()) { return Some(RpcInflationReward { epoch, - effective_slot: first_confirmed_block_in_epoch, + effective_slot: *slot, amount: reward.lamports.unsigned_abs(), post_balance: reward.post_balance, commission: reward.commission, @@ -607,7 +697,6 @@ impl JsonRpcRequestProcessor { None }) .collect(); - Ok(rewards) } diff --git a/validator/src/bin/solana-test-validator.rs b/validator/src/bin/solana-test-validator.rs index aee5fc039df410..3c851e7788e2c3 100644 --- a/validator/src/bin/solana-test-validator.rs +++ b/validator/src/bin/solana-test-validator.rs @@ -19,7 +19,6 @@ use { account::AccountSharedData, clock::Slot, epoch_schedule::EpochSchedule, - feature_set, native_token::sol_to_lamports, pubkey::Pubkey, rent::Rent, @@ -349,9 +348,7 @@ fn main() { exit(1); }); - let mut features_to_deactivate = pubkeys_of(&matches, "deactivate_feature").unwrap_or_default(); - // Remove this when client support is ready for the enable_partitioned_epoch_reward feature - features_to_deactivate.push(feature_set::enable_partitioned_epoch_reward::id()); + let features_to_deactivate = pubkeys_of(&matches, "deactivate_feature").unwrap_or_default(); if TestValidatorGenesis::ledger_exists(&ledger_path) { for (name, long) in &[