From 2c4883f1d0aa49cd90ed801615f4d4bb96e3c71b Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Fri, 12 Jan 2024 11:03:19 -0700 Subject: [PATCH 01/14] Check feature_set for enable_partitioned_epoch_reward --- rpc/src/rpc.rs | 147 ++++++++++++++++++++++++++----------------------- 1 file changed, 79 insertions(+), 68 deletions(-) diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 5cc5b82344e0d1..a97be14e763435 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -527,88 +527,99 @@ impl JsonRpcRequestProcessor { let config = config.unwrap_or_default(); let epoch_schedule = self.get_epoch_schedule(); let first_available_block = self.get_first_available_block().await; + let slot_context = RpcContextConfig { + commitment: config.commitment, + min_context_slot: config.min_context_slot, + }; let epoch = match config.epoch { Some(epoch) => epoch, None => epoch_schedule - .get_epoch(self.get_slot(RpcContextConfig { - commitment: config.commitment, - min_context_slot: config.min_context_slot, - })?) + .get_epoch(self.get_slot(slot_context)?) .saturating_sub(1), }; - // Rewards for this epoch are found in the first confirmed block of the next epoch - let first_slot_in_epoch = epoch_schedule.get_first_slot_in_epoch(epoch.saturating_add(1)); - if first_slot_in_epoch < first_available_block { - if self.bigtable_ledger_storage.is_some() { - return Err(RpcCustomError::LongTermStorageSlotSkipped { - slot: first_slot_in_epoch, - } - .into()); - } else { - return Err(RpcCustomError::BlockCleanedUp { - slot: first_slot_in_epoch, - first_available_block, + let bank = self.get_bank_with_config(slot_context)?; + + if bank + .feature_set + .is_active(&feature_set::enable_partitioned_epoch_reward::id()) + { + Ok(vec![]) + } else { + // Rewards for this epoch are found in the first confirmed block of the next epoch + let first_slot_in_epoch = + epoch_schedule.get_first_slot_in_epoch(epoch.saturating_add(1)); + if first_slot_in_epoch < first_available_block { + if self.bigtable_ledger_storage.is_some() { + return Err(RpcCustomError::LongTermStorageSlotSkipped { + slot: first_slot_in_epoch, + } + .into()); + } else { + return Err(RpcCustomError::BlockCleanedUp { + slot: first_slot_in_epoch, + first_available_block, + } + .into()); } - .into()); } - } - let first_confirmed_block_in_epoch = *self - .get_blocks_with_limit(first_slot_in_epoch, 1, config.commitment) - .await? - .first() - .ok_or(RpcCustomError::BlockNotAvailable { - slot: first_slot_in_epoch, - })?; + let first_confirmed_block_in_epoch = *self + .get_blocks_with_limit(first_slot_in_epoch, 1, config.commitment) + .await? + .first() + .ok_or(RpcCustomError::BlockNotAvailable { + slot: first_slot_in_epoch, + })?; - let Ok(Some(first_confirmed_block)) = self - .get_block( - first_confirmed_block_in_epoch, - Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()), - ) - .await - else { - return Err(RpcCustomError::BlockNotAvailable { - slot: first_confirmed_block_in_epoch, - } - .into()); - }; + let Ok(Some(first_confirmed_block)) = self + .get_block( + first_confirmed_block_in_epoch, + Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()), + ) + .await + else { + return Err(RpcCustomError::BlockNotAvailable { + slot: first_confirmed_block_in_epoch, + } + .into()); + }; - let addresses: Vec = addresses - .into_iter() - .map(|pubkey| pubkey.to_string()) - .collect(); + let addresses: Vec = addresses + .into_iter() + .map(|pubkey| pubkey.to_string()) + .collect(); - let reward_hash: HashMap = first_confirmed_block - .rewards - .unwrap_or_default() - .into_iter() - .filter_map(|reward| match reward.reward_type? { - RewardType::Staking | RewardType::Voting => addresses - .contains(&reward.pubkey) - .then(|| (reward.clone().pubkey, reward)), - _ => None, - }) - .collect(); + let reward_hash: HashMap = first_confirmed_block + .rewards + .unwrap_or_default() + .into_iter() + .filter_map(|reward| match reward.reward_type? { + RewardType::Staking | RewardType::Voting => addresses + .contains(&reward.pubkey) + .then(|| (reward.clone().pubkey, reward)), + _ => None, + }) + .collect(); - let rewards = addresses - .iter() - .map(|address| { - if let Some(reward) = reward_hash.get(address) { - return Some(RpcInflationReward { - epoch, - effective_slot: first_confirmed_block_in_epoch, - amount: reward.lamports.unsigned_abs(), - post_balance: reward.post_balance, - commission: reward.commission, - }); - } - None - }) - .collect(); + let rewards = addresses + .iter() + .map(|address| { + if let Some(reward) = reward_hash.get(address) { + return Some(RpcInflationReward { + epoch, + effective_slot: first_confirmed_block_in_epoch, + amount: reward.lamports.unsigned_abs(), + post_balance: reward.post_balance, + commission: reward.commission, + }); + } + None + }) + .collect(); - Ok(rewards) + Ok(rewards) + } } pub fn get_inflation_governor( From 6f22f9009f277a87537b5fae0eb1b8ca373fabfb Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Fri, 12 Jan 2024 11:04:15 -0700 Subject: [PATCH 02/14] Keep common variable outside if case --- rpc/src/rpc.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index a97be14e763435..7babd7eac4e20a 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -537,6 +537,8 @@ impl JsonRpcRequestProcessor { .get_epoch(self.get_slot(slot_context)?) .saturating_sub(1), }; + // Rewards for this epoch are found in the first confirmed block of the next epoch + let first_slot_in_epoch = epoch_schedule.get_first_slot_in_epoch(epoch.saturating_add(1)); let bank = self.get_bank_with_config(slot_context)?; @@ -546,9 +548,6 @@ impl JsonRpcRequestProcessor { { Ok(vec![]) } else { - // Rewards for this epoch are found in the first confirmed block of the next epoch - let first_slot_in_epoch = - epoch_schedule.get_first_slot_in_epoch(epoch.saturating_add(1)); if first_slot_in_epoch < first_available_block { if self.bigtable_ledger_storage.is_some() { return Err(RpcCustomError::LongTermStorageSlotSkipped { From fdfb7f8127051c4d4fbe95d638139eab0b3f38f2 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Fri, 12 Jan 2024 13:37:11 -0700 Subject: [PATCH 03/14] Keep common early return out of if case, since the first_slot_in_epoch must exist for partiion PDA to exist --- rpc/src/rpc.rs | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 7babd7eac4e20a..bd5e12b911fe59 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -540,6 +540,21 @@ impl JsonRpcRequestProcessor { // Rewards for this epoch are found in the first confirmed block of the next epoch let first_slot_in_epoch = epoch_schedule.get_first_slot_in_epoch(epoch.saturating_add(1)); + if first_slot_in_epoch < first_available_block { + if self.bigtable_ledger_storage.is_some() { + return Err(RpcCustomError::LongTermStorageSlotSkipped { + slot: first_slot_in_epoch, + } + .into()); + } else { + return Err(RpcCustomError::BlockCleanedUp { + slot: first_slot_in_epoch, + first_available_block, + } + .into()); + } + } + let bank = self.get_bank_with_config(slot_context)?; if bank @@ -548,21 +563,6 @@ impl JsonRpcRequestProcessor { { Ok(vec![]) } else { - if first_slot_in_epoch < first_available_block { - if self.bigtable_ledger_storage.is_some() { - return Err(RpcCustomError::LongTermStorageSlotSkipped { - slot: first_slot_in_epoch, - } - .into()); - } else { - return Err(RpcCustomError::BlockCleanedUp { - slot: first_slot_in_epoch, - first_available_block, - } - .into()); - } - } - let first_confirmed_block_in_epoch = *self .get_blocks_with_limit(first_slot_in_epoch, 1, config.commitment) .await? From 388dd37f3ab1b107f3412c8f4de09e301044ffcc Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Fri, 12 Jan 2024 12:07:01 -0700 Subject: [PATCH 04/14] Get and parse epoch partition data PDA --- rpc/src/rpc.rs | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index bd5e12b911fe59..f8f6a08b4cebb8 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -7,7 +7,7 @@ use { base64::{prelude::BASE64_STANDARD, Engine}, bincode::{config::Options, serialize}, crossbeam_channel::{unbounded, Receiver, Sender}, - jsonrpc_core::{futures::future, types::error, BoxFuture, Error, Metadata, Result}, + jsonrpc_core::{futures::future, types::error, BoxFuture, Error, ErrorCode, Metadata, Result}, jsonrpc_derive::rpc, solana_account_decoder::{ parse_token::{is_known_spl_token_id, token_amount_to_ui_amount, UiTokenAmount}, @@ -62,6 +62,9 @@ use { clock::{Slot, UnixTimestamp, MAX_RECENT_BLOCKHASHES}, commitment_config::{CommitmentConfig, CommitmentLevel}, epoch_info::EpochInfo, + epoch_rewards_partition_data::{ + get_epoch_rewards_partition_data_address, EpochRewardsPartitionDataVersion, + }, epoch_schedule::EpochSchedule, exit::Exit, feature_set, @@ -538,7 +541,8 @@ impl JsonRpcRequestProcessor { .saturating_sub(1), }; // Rewards for this epoch are found in the first confirmed block of the next epoch - let first_slot_in_epoch = epoch_schedule.get_first_slot_in_epoch(epoch.saturating_add(1)); + let rewards_epoch = epoch.saturating_add(1); + let first_slot_in_epoch = epoch_schedule.get_first_slot_in_epoch(rewards_epoch); if first_slot_in_epoch < first_available_block { if self.bigtable_ledger_storage.is_some() { @@ -561,6 +565,21 @@ impl JsonRpcRequestProcessor { .feature_set .is_active(&feature_set::enable_partitioned_epoch_reward::id()) { + let partition_data_address = get_epoch_rewards_partition_data_address(rewards_epoch); + let partition_data_account = + bank.get_account(&partition_data_address) + .ok_or_else(|| Error { + code: ErrorCode::InternalError, + message: format!( + "Partition data account not found for epoch {:?} at {:?}", + epoch, partition_data_address + ), + data: None, + })?; + let EpochRewardsPartitionDataVersion::V0(_partition_data) = + bincode::deserialize(partition_data_account.data()) + .map_err(|_| Error::internal_error())?; + Ok(vec![]) } else { let first_confirmed_block_in_epoch = *self From 6b52fc6862275f1a9e0f6eb3abd17efa269d72c5 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Fri, 12 Jan 2024 12:07:38 -0700 Subject: [PATCH 05/14] Find partition index for all addresses --- rpc/src/rpc.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index f8f6a08b4cebb8..57e2b6e32c0687 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -62,6 +62,7 @@ use { clock::{Slot, UnixTimestamp, MAX_RECENT_BLOCKHASHES}, commitment_config::{CommitmentConfig, CommitmentLevel}, epoch_info::EpochInfo, + epoch_rewards_hasher::EpochRewardsHasher, epoch_rewards_partition_data::{ get_epoch_rewards_partition_data_address, EpochRewardsPartitionDataVersion, }, @@ -576,9 +577,21 @@ impl JsonRpcRequestProcessor { ), data: None, })?; - let EpochRewardsPartitionDataVersion::V0(_partition_data) = + let EpochRewardsPartitionDataVersion::V0(partition_data) = bincode::deserialize(partition_data_account.data()) .map_err(|_| Error::internal_error())?; + let hasher = EpochRewardsHasher::new( + partition_data.num_partitions, + &partition_data.parent_blockhash, + ); + let mut partition_index_addresses: HashMap> = HashMap::new(); + for address in addresses.iter() { + let partition_index = hasher.clone().hash_address_to_partition(address); + partition_index_addresses + .entry(partition_index) + .and_modify(|list| list.push(address.to_string())) + .or_insert(vec![address.to_string()]); + } Ok(vec![]) } else { From e1a5d3772d0df00187d64551ca53183470a47e0e Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Fri, 12 Jan 2024 12:36:11 -0700 Subject: [PATCH 06/14] Pull relevant blocks and get rewards --- rpc/src/rpc.rs | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 57e2b6e32c0687..c88176955f7c51 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -593,6 +593,42 @@ impl JsonRpcRequestProcessor { .or_insert(vec![address.to_string()]); } + let block_list = self + .get_blocks_with_limit( + first_slot_in_epoch, + partition_data.num_partitions + 1, + config.commitment, + ) + .await?; + + let mut reward_map: HashMap = HashMap::new(); + for (partition_index, addresses) in partition_index_addresses.iter() { + let slot = *block_list + .get(partition_index.saturating_add(1)) + .ok_or_else(Error::internal_error)?; + let Ok(Some(block)) = self + .get_block( + slot, + Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()), + ) + .await + else { + return Err(RpcCustomError::BlockNotAvailable { slot }.into()); + }; + let index_reward_map: HashMap = block + .rewards + .unwrap_or_default() + .into_iter() + .filter_map(|reward| match reward.reward_type? { + RewardType::Staking | RewardType::Voting => addresses + .contains(&reward.pubkey) + .then(|| (reward.clone().pubkey, reward)), + _ => None, + }) + .collect(); + reward_map.extend(index_reward_map); + } + Ok(vec![]) } else { let first_confirmed_block_in_epoch = *self From edfe08ba0514065bbbba400a85d44a43b4008b48 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Fri, 12 Jan 2024 13:02:51 -0700 Subject: [PATCH 07/14] Reuse ordering and reformatting --- rpc/src/rpc.rs | 68 +++++++++++++++++++++++++------------------------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index c88176955f7c51..d38b030de70f61 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -562,7 +562,7 @@ impl JsonRpcRequestProcessor { let bank = self.get_bank_with_config(slot_context)?; - if bank + let reward_map = if bank .feature_set .is_active(&feature_set::enable_partitioned_epoch_reward::id()) { @@ -601,7 +601,7 @@ impl JsonRpcRequestProcessor { ) .await?; - let mut reward_map: HashMap = HashMap::new(); + let mut reward_map: HashMap = HashMap::new(); for (partition_index, addresses) in partition_index_addresses.iter() { let slot = *block_list .get(partition_index.saturating_add(1)) @@ -615,21 +615,20 @@ impl JsonRpcRequestProcessor { else { return Err(RpcCustomError::BlockNotAvailable { slot }.into()); }; - let index_reward_map: HashMap = block + let index_reward_map: HashMap = block .rewards .unwrap_or_default() .into_iter() .filter_map(|reward| match reward.reward_type? { RewardType::Staking | RewardType::Voting => addresses .contains(&reward.pubkey) - .then(|| (reward.clone().pubkey, reward)), + .then(|| (reward.clone().pubkey, (reward, slot))), _ => None, }) .collect(); reward_map.extend(index_reward_map); } - - Ok(vec![]) + reward_map } else { let first_confirmed_block_in_epoch = *self .get_blocks_with_limit(first_slot_in_epoch, 1, config.commitment) @@ -652,41 +651,42 @@ impl JsonRpcRequestProcessor { .into()); }; - let addresses: Vec = addresses - .into_iter() - .map(|pubkey| pubkey.to_string()) - .collect(); + let addresses: Vec = + addresses.iter().map(|pubkey| pubkey.to_string()).collect(); - let reward_hash: HashMap = first_confirmed_block + first_confirmed_block .rewards .unwrap_or_default() .into_iter() .filter_map(|reward| match reward.reward_type? { - RewardType::Staking | RewardType::Voting => addresses - .contains(&reward.pubkey) - .then(|| (reward.clone().pubkey, reward)), - _ => None, - }) - .collect(); - - let rewards = addresses - .iter() - .map(|address| { - if let Some(reward) = reward_hash.get(address) { - return Some(RpcInflationReward { - epoch, - effective_slot: first_confirmed_block_in_epoch, - amount: reward.lamports.unsigned_abs(), - post_balance: reward.post_balance, - commission: reward.commission, - }); + RewardType::Staking | RewardType::Voting => { + addresses.contains(&reward.pubkey).then(|| { + ( + reward.clone().pubkey, + (reward, first_confirmed_block_in_epoch), + ) + }) } - None + _ => None, }) - .collect(); - - Ok(rewards) - } + .collect() + }; + let rewards = addresses + .iter() + .map(|address| { + if let Some((reward, slot)) = reward_map.get(&address.to_string()) { + return Some(RpcInflationReward { + epoch, + effective_slot: *slot, + amount: reward.lamports.unsigned_abs(), + post_balance: reward.post_balance, + commission: reward.commission, + }); + } + None + }) + .collect(); + Ok(rewards) } pub fn get_inflation_governor( From 01626c9232bec9cbf360104cee816ac553bc1fb5 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Fri, 12 Jan 2024 13:24:25 -0700 Subject: [PATCH 08/14] Remove feature deactivation from TestValidator --- validator/src/bin/solana-test-validator.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/validator/src/bin/solana-test-validator.rs b/validator/src/bin/solana-test-validator.rs index aee5fc039df410..3c851e7788e2c3 100644 --- a/validator/src/bin/solana-test-validator.rs +++ b/validator/src/bin/solana-test-validator.rs @@ -19,7 +19,6 @@ use { account::AccountSharedData, clock::Slot, epoch_schedule::EpochSchedule, - feature_set, native_token::sol_to_lamports, pubkey::Pubkey, rent::Rent, @@ -349,9 +348,7 @@ fn main() { exit(1); }); - let mut features_to_deactivate = pubkeys_of(&matches, "deactivate_feature").unwrap_or_default(); - // Remove this when client support is ready for the enable_partitioned_epoch_reward feature - features_to_deactivate.push(feature_set::enable_partitioned_epoch_reward::id()); + let features_to_deactivate = pubkeys_of(&matches, "deactivate_feature").unwrap_or_default(); if TestValidatorGenesis::ledger_exists(&ledger_path) { for (name, long) in &[ From 8107e6c1df319a9ed09d3c01ade24d3c491b8ca2 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Tue, 16 Jan 2024 18:41:09 -0700 Subject: [PATCH 09/14] Restore rewards iteration in first block in epoch for feature case to catch Voting rewards --- rpc/src/rpc.rs | 104 ++++++++++++++++++++++++++----------------------- 1 file changed, 55 insertions(+), 49 deletions(-) diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index d38b030de70f61..32b9c8c7ec5433 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -562,7 +562,50 @@ impl JsonRpcRequestProcessor { let bank = self.get_bank_with_config(slot_context)?; - let reward_map = if bank + let mut reward_map: HashMap = { + let first_confirmed_block_in_epoch = *self + .get_blocks_with_limit(first_slot_in_epoch, 1, config.commitment) + .await? + .first() + .ok_or(RpcCustomError::BlockNotAvailable { + slot: first_slot_in_epoch, + })?; + + let Ok(Some(first_confirmed_block)) = self + .get_block( + first_confirmed_block_in_epoch, + Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()), + ) + .await + else { + return Err(RpcCustomError::BlockNotAvailable { + slot: first_confirmed_block_in_epoch, + } + .into()); + }; + + let addresses: Vec = + addresses.iter().map(|pubkey| pubkey.to_string()).collect(); + + first_confirmed_block + .rewards + .unwrap_or_default() + .into_iter() + .filter_map(|reward| match reward.reward_type? { + RewardType::Staking | RewardType::Voting => { + addresses.contains(&reward.pubkey).then(|| { + ( + reward.clone().pubkey, + (reward, first_confirmed_block_in_epoch), + ) + }) + } + _ => None, + }) + .collect() + }; + + if bank .feature_set .is_active(&feature_set::enable_partitioned_epoch_reward::id()) { @@ -586,11 +629,16 @@ impl JsonRpcRequestProcessor { ); let mut partition_index_addresses: HashMap> = HashMap::new(); for address in addresses.iter() { - let partition_index = hasher.clone().hash_address_to_partition(address); - partition_index_addresses - .entry(partition_index) - .and_modify(|list| list.push(address.to_string())) - .or_insert(vec![address.to_string()]); + let address_string = address.to_string(); + // Skip this address if (Voting) rewards were already found in + // the first block of the epoch + if !reward_map.contains_key(&address_string) { + let partition_index = hasher.clone().hash_address_to_partition(address); + partition_index_addresses + .entry(partition_index) + .and_modify(|list| list.push(address_string.clone())) + .or_insert(vec![address_string]); + } } let block_list = self @@ -601,7 +649,6 @@ impl JsonRpcRequestProcessor { ) .await?; - let mut reward_map: HashMap = HashMap::new(); for (partition_index, addresses) in partition_index_addresses.iter() { let slot = *block_list .get(partition_index.saturating_add(1)) @@ -628,49 +675,8 @@ impl JsonRpcRequestProcessor { .collect(); reward_map.extend(index_reward_map); } - reward_map - } else { - let first_confirmed_block_in_epoch = *self - .get_blocks_with_limit(first_slot_in_epoch, 1, config.commitment) - .await? - .first() - .ok_or(RpcCustomError::BlockNotAvailable { - slot: first_slot_in_epoch, - })?; - - let Ok(Some(first_confirmed_block)) = self - .get_block( - first_confirmed_block_in_epoch, - Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()), - ) - .await - else { - return Err(RpcCustomError::BlockNotAvailable { - slot: first_confirmed_block_in_epoch, - } - .into()); - }; - - let addresses: Vec = - addresses.iter().map(|pubkey| pubkey.to_string()).collect(); + } - first_confirmed_block - .rewards - .unwrap_or_default() - .into_iter() - .filter_map(|reward| match reward.reward_type? { - RewardType::Staking | RewardType::Voting => { - addresses.contains(&reward.pubkey).then(|| { - ( - reward.clone().pubkey, - (reward, first_confirmed_block_in_epoch), - ) - }) - } - _ => None, - }) - .collect() - }; let rewards = addresses .iter() .map(|address| { From a1d2692093ed44794a7812ca122b0aae16f6cb32 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Wed, 17 Jan 2024 14:56:30 -0700 Subject: [PATCH 10/14] Add fn get_reward_map helper to dedupe code --- rpc/src/rpc.rs | 82 ++++++++++++++++++++------------------------------ 1 file changed, 33 insertions(+), 49 deletions(-) diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 32b9c8c7ec5433..1a165cd8e7a714 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -523,6 +523,35 @@ impl JsonRpcRequestProcessor { }) } + async fn get_reward_map( + &self, + slot: Slot, + addresses: &[String], + config: &RpcEpochConfig, + ) -> Result> { + let Ok(Some(block)) = self + .get_block( + slot, + Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()), + ) + .await + else { + return Err(RpcCustomError::BlockNotAvailable { slot }.into()); + }; + + Ok(block + .rewards + .unwrap_or_default() + .into_iter() + .filter_map(|reward| match reward.reward_type? { + RewardType::Staking | RewardType::Voting => addresses + .contains(&reward.pubkey) + .then(|| (reward.clone().pubkey, (reward, slot))), + _ => None, + }) + .collect()) + } + pub async fn get_inflation_reward( &self, addresses: Vec, @@ -571,38 +600,11 @@ impl JsonRpcRequestProcessor { slot: first_slot_in_epoch, })?; - let Ok(Some(first_confirmed_block)) = self - .get_block( - first_confirmed_block_in_epoch, - Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()), - ) - .await - else { - return Err(RpcCustomError::BlockNotAvailable { - slot: first_confirmed_block_in_epoch, - } - .into()); - }; - let addresses: Vec = addresses.iter().map(|pubkey| pubkey.to_string()).collect(); - first_confirmed_block - .rewards - .unwrap_or_default() - .into_iter() - .filter_map(|reward| match reward.reward_type? { - RewardType::Staking | RewardType::Voting => { - addresses.contains(&reward.pubkey).then(|| { - ( - reward.clone().pubkey, - (reward, first_confirmed_block_in_epoch), - ) - }) - } - _ => None, - }) - .collect() + self.get_reward_map(first_confirmed_block_in_epoch, &addresses, &config) + .await? }; if bank @@ -653,26 +655,8 @@ impl JsonRpcRequestProcessor { let slot = *block_list .get(partition_index.saturating_add(1)) .ok_or_else(Error::internal_error)?; - let Ok(Some(block)) = self - .get_block( - slot, - Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()), - ) - .await - else { - return Err(RpcCustomError::BlockNotAvailable { slot }.into()); - }; - let index_reward_map: HashMap = block - .rewards - .unwrap_or_default() - .into_iter() - .filter_map(|reward| match reward.reward_type? { - RewardType::Staking | RewardType::Voting => addresses - .contains(&reward.pubkey) - .then(|| (reward.clone().pubkey, (reward, slot))), - _ => None, - }) - .collect(); + + let index_reward_map = self.get_reward_map(slot, addresses, &config).await?; reward_map.extend(index_reward_map); } } From 80507684e6d47d01d546101856b43f3f6acdfd11 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Wed, 17 Jan 2024 15:03:05 -0700 Subject: [PATCH 11/14] No need to start 2nd get_block_with_limit call with first block again --- rpc/src/rpc.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 1a165cd8e7a714..de8cc9d7ebd3cb 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -591,15 +591,15 @@ impl JsonRpcRequestProcessor { let bank = self.get_bank_with_config(slot_context)?; - let mut reward_map: HashMap = { - let first_confirmed_block_in_epoch = *self - .get_blocks_with_limit(first_slot_in_epoch, 1, config.commitment) - .await? - .first() - .ok_or(RpcCustomError::BlockNotAvailable { - slot: first_slot_in_epoch, - })?; + let first_confirmed_block_in_epoch = *self + .get_blocks_with_limit(first_slot_in_epoch, 1, config.commitment) + .await? + .first() + .ok_or(RpcCustomError::BlockNotAvailable { + slot: first_slot_in_epoch, + })?; + let mut reward_map: HashMap = { let addresses: Vec = addresses.iter().map(|pubkey| pubkey.to_string()).collect(); @@ -645,15 +645,15 @@ impl JsonRpcRequestProcessor { let block_list = self .get_blocks_with_limit( - first_slot_in_epoch, - partition_data.num_partitions + 1, + first_confirmed_block_in_epoch + 1, + partition_data.num_partitions, config.commitment, ) .await?; for (partition_index, addresses) in partition_index_addresses.iter() { let slot = *block_list - .get(partition_index.saturating_add(1)) + .get(*partition_index) .ok_or_else(Error::internal_error)?; let index_reward_map = self.get_reward_map(slot, addresses, &config).await?; From 4ad1470a3204c474c8a9995b6464ae60a3b7cbf3 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Thu, 18 Jan 2024 11:56:54 -0700 Subject: [PATCH 12/14] Replace filter_map to parameterize RewardType filter expression --- rpc/src/rpc.rs | 46 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index de8cc9d7ebd3cb..b5fbdd736e8bb9 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -523,12 +523,16 @@ impl JsonRpcRequestProcessor { }) } - async fn get_reward_map( + async fn get_reward_map( &self, slot: Slot, addresses: &[String], + reward_type_filter: F, config: &RpcEpochConfig, - ) -> Result> { + ) -> Result> + where + F: Fn(RewardType) -> bool, + { let Ok(Some(block)) = self .get_block( slot, @@ -543,12 +547,11 @@ impl JsonRpcRequestProcessor { .rewards .unwrap_or_default() .into_iter() - .filter_map(|reward| match reward.reward_type? { - RewardType::Staking | RewardType::Voting => addresses - .contains(&reward.pubkey) - .then(|| (reward.clone().pubkey, (reward, slot))), - _ => None, + .filter(|reward| { + reward.reward_type.is_some_and(reward_type_filter) + && addresses.contains(&reward.pubkey) }) + .map(|reward| (reward.clone().pubkey, (reward, slot))) .collect()) } @@ -590,6 +593,9 @@ impl JsonRpcRequestProcessor { } let bank = self.get_bank_with_config(slot_context)?; + let partitioned_epoch_reward_enabled = bank + .feature_set + .is_active(&feature_set::enable_partitioned_epoch_reward::id()); let first_confirmed_block_in_epoch = *self .get_blocks_with_limit(first_slot_in_epoch, 1, config.commitment) @@ -603,14 +609,19 @@ impl JsonRpcRequestProcessor { let addresses: Vec = addresses.iter().map(|pubkey| pubkey.to_string()).collect(); - self.get_reward_map(first_confirmed_block_in_epoch, &addresses, &config) - .await? + self.get_reward_map( + first_confirmed_block_in_epoch, + &addresses, + |reward_type| -> bool { + reward_type == RewardType::Voting + || (!partitioned_epoch_reward_enabled && reward_type == RewardType::Staking) + }, + &config, + ) + .await? }; - if bank - .feature_set - .is_active(&feature_set::enable_partitioned_epoch_reward::id()) - { + if partitioned_epoch_reward_enabled { let partition_data_address = get_epoch_rewards_partition_data_address(rewards_epoch); let partition_data_account = bank.get_account(&partition_data_address) @@ -656,7 +667,14 @@ impl JsonRpcRequestProcessor { .get(*partition_index) .ok_or_else(Error::internal_error)?; - let index_reward_map = self.get_reward_map(slot, addresses, &config).await?; + let index_reward_map = self + .get_reward_map( + slot, + addresses, + |reward_type| -> bool { reward_type == RewardType::Staking }, + &config, + ) + .await?; reward_map.extend(index_reward_map); } } From 0cc2104173c2128f4d8d950312c2bc0e12c231ca Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Thu, 18 Jan 2024 12:41:54 -0700 Subject: [PATCH 13/14] Weird thing to make clippy and compiler agree (https://github.com/rust-lang/rust-clippy/issues/8098) --- rpc/src/rpc.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index b5fbdd736e8bb9..b3ee080111e895 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -527,7 +527,7 @@ impl JsonRpcRequestProcessor { &self, slot: Slot, addresses: &[String], - reward_type_filter: F, + reward_type_filter: &F, config: &RpcEpochConfig, ) -> Result> where @@ -612,7 +612,7 @@ impl JsonRpcRequestProcessor { self.get_reward_map( first_confirmed_block_in_epoch, &addresses, - |reward_type| -> bool { + &|reward_type| -> bool { reward_type == RewardType::Voting || (!partitioned_epoch_reward_enabled && reward_type == RewardType::Staking) }, @@ -671,7 +671,7 @@ impl JsonRpcRequestProcessor { .get_reward_map( slot, addresses, - |reward_type| -> bool { reward_type == RewardType::Staking }, + &|reward_type| -> bool { reward_type == RewardType::Staking }, &config, ) .await?; From 89fbcbb88f632faa8259f496db9c90c8209ea24d Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Tue, 23 Jan 2024 14:07:00 -0700 Subject: [PATCH 14/14] Use activated_slot to ensure the right approach for past rewards epochs --- rpc/src/rpc.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index b3ee080111e895..16d78a913bc90b 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -593,9 +593,6 @@ impl JsonRpcRequestProcessor { } let bank = self.get_bank_with_config(slot_context)?; - let partitioned_epoch_reward_enabled = bank - .feature_set - .is_active(&feature_set::enable_partitioned_epoch_reward::id()); let first_confirmed_block_in_epoch = *self .get_blocks_with_limit(first_slot_in_epoch, 1, config.commitment) @@ -604,6 +601,12 @@ impl JsonRpcRequestProcessor { .ok_or(RpcCustomError::BlockNotAvailable { slot: first_slot_in_epoch, })?; + let partitioned_epoch_reward_enabled_slot = bank + .feature_set + .activated_slot(&feature_set::enable_partitioned_epoch_reward::id()); + let partitioned_epoch_reward_enabled = partitioned_epoch_reward_enabled_slot + .map(|slot| slot <= first_confirmed_block_in_epoch) + .unwrap_or(false); let mut reward_map: HashMap = { let addresses: Vec =