Skip to content

Commit

Permalink
Add rpc support for partitioned rewards (#34773)
Browse files Browse the repository at this point in the history
* Check feature_set for enable_partitioned_epoch_reward

* Keep common variable outside if case

* Keep common early return out of if case, since the first_slot_in_epoch must exist for partiion PDA to exist

* Get and parse epoch partition data PDA

* Find partition index for all addresses

* Pull relevant blocks and get rewards

* Reuse ordering and reformatting

* Remove feature deactivation from TestValidator

* Restore rewards iteration in first block in epoch for feature case to catch Voting rewards

* Add fn get_reward_map helper to dedupe code

* No need to start 2nd get_block_with_limit call with first block again

* Replace filter_map to parameterize RewardType filter expression

* Weird thing to make clippy and compiler agree (rust-lang/rust-clippy#8098)

* Use activated_slot to ensure the right approach for past rewards epochs

(cherry picked from commit 22500c2)
  • Loading branch information
CriesofCarrots authored and mergify[bot] committed Jan 25, 2024
1 parent 2c5f24a commit 7a271f2
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 38 deletions.
157 changes: 123 additions & 34 deletions rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use {
base64::{prelude::BASE64_STANDARD, Engine},
bincode::{config::Options, serialize},
crossbeam_channel::{unbounded, Receiver, Sender},
jsonrpc_core::{futures::future, types::error, BoxFuture, Error, Metadata, Result},
jsonrpc_core::{futures::future, types::error, BoxFuture, Error, ErrorCode, Metadata, Result},
jsonrpc_derive::rpc,
solana_account_decoder::{
parse_token::{is_known_spl_token_id, token_amount_to_ui_amount, UiTokenAmount},
Expand Down Expand Up @@ -62,6 +62,10 @@ use {
clock::{Slot, UnixTimestamp, MAX_RECENT_BLOCKHASHES},
commitment_config::{CommitmentConfig, CommitmentLevel},
epoch_info::EpochInfo,
epoch_rewards_hasher::EpochRewardsHasher,
epoch_rewards_partition_data::{
get_epoch_rewards_partition_data_address, EpochRewardsPartitionDataVersion,
},
epoch_schedule::EpochSchedule,
exit::Exit,
feature_set,
Expand Down Expand Up @@ -519,6 +523,38 @@ impl JsonRpcRequestProcessor {
})
}

async fn get_reward_map<F>(
&self,
slot: Slot,
addresses: &[String],
reward_type_filter: &F,
config: &RpcEpochConfig,
) -> Result<HashMap<String, (Reward, Slot)>>
where
F: Fn(RewardType) -> bool,
{
let Ok(Some(block)) = self
.get_block(
slot,
Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()),
)
.await
else {
return Err(RpcCustomError::BlockNotAvailable { slot }.into());
};

Ok(block
.rewards
.unwrap_or_default()
.into_iter()
.filter(|reward| {
reward.reward_type.is_some_and(reward_type_filter)
&& addresses.contains(&reward.pubkey)
})
.map(|reward| (reward.clone().pubkey, (reward, slot)))
.collect())
}

pub async fn get_inflation_reward(
&self,
addresses: Vec<Pubkey>,
Expand All @@ -527,18 +563,20 @@ impl JsonRpcRequestProcessor {
let config = config.unwrap_or_default();
let epoch_schedule = self.get_epoch_schedule();
let first_available_block = self.get_first_available_block().await;
let slot_context = RpcContextConfig {
commitment: config.commitment,
min_context_slot: config.min_context_slot,
};
let epoch = match config.epoch {
Some(epoch) => epoch,
None => epoch_schedule
.get_epoch(self.get_slot(RpcContextConfig {
commitment: config.commitment,
min_context_slot: config.min_context_slot,
})?)
.get_epoch(self.get_slot(slot_context)?)
.saturating_sub(1),
};

// Rewards for this epoch are found in the first confirmed block of the next epoch
let first_slot_in_epoch = epoch_schedule.get_first_slot_in_epoch(epoch.saturating_add(1));
let rewards_epoch = epoch.saturating_add(1);
let first_slot_in_epoch = epoch_schedule.get_first_slot_in_epoch(rewards_epoch);

if first_slot_in_epoch < first_available_block {
if self.bigtable_ledger_storage.is_some() {
return Err(RpcCustomError::LongTermStorageSlotSkipped {
Expand All @@ -554,51 +592,103 @@ impl JsonRpcRequestProcessor {
}
}

let bank = self.get_bank_with_config(slot_context)?;

let first_confirmed_block_in_epoch = *self
.get_blocks_with_limit(first_slot_in_epoch, 1, config.commitment)
.await?
.first()
.ok_or(RpcCustomError::BlockNotAvailable {
slot: first_slot_in_epoch,
})?;
let partitioned_epoch_reward_enabled_slot = bank
.feature_set
.activated_slot(&feature_set::enable_partitioned_epoch_reward::id());
let partitioned_epoch_reward_enabled = partitioned_epoch_reward_enabled_slot
.map(|slot| slot <= first_confirmed_block_in_epoch)
.unwrap_or(false);

let Ok(Some(first_confirmed_block)) = self
.get_block(
let mut reward_map: HashMap<String, (Reward, Slot)> = {
let addresses: Vec<String> =
addresses.iter().map(|pubkey| pubkey.to_string()).collect();

self.get_reward_map(
first_confirmed_block_in_epoch,
Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()),
&addresses,
&|reward_type| -> bool {
reward_type == RewardType::Voting
|| (!partitioned_epoch_reward_enabled && reward_type == RewardType::Staking)
},
&config,
)
.await
else {
return Err(RpcCustomError::BlockNotAvailable {
slot: first_confirmed_block_in_epoch,
}
.into());
.await?
};

let addresses: Vec<String> = addresses
.into_iter()
.map(|pubkey| pubkey.to_string())
.collect();
if partitioned_epoch_reward_enabled {
let partition_data_address = get_epoch_rewards_partition_data_address(rewards_epoch);
let partition_data_account =
bank.get_account(&partition_data_address)
.ok_or_else(|| Error {
code: ErrorCode::InternalError,
message: format!(
"Partition data account not found for epoch {:?} at {:?}",
epoch, partition_data_address
),
data: None,
})?;
let EpochRewardsPartitionDataVersion::V0(partition_data) =
bincode::deserialize(partition_data_account.data())
.map_err(|_| Error::internal_error())?;
let hasher = EpochRewardsHasher::new(
partition_data.num_partitions,
&partition_data.parent_blockhash,
);
let mut partition_index_addresses: HashMap<usize, Vec<String>> = HashMap::new();
for address in addresses.iter() {
let address_string = address.to_string();
// Skip this address if (Voting) rewards were already found in
// the first block of the epoch
if !reward_map.contains_key(&address_string) {
let partition_index = hasher.clone().hash_address_to_partition(address);
partition_index_addresses
.entry(partition_index)
.and_modify(|list| list.push(address_string.clone()))
.or_insert(vec![address_string]);
}
}

let reward_hash: HashMap<String, Reward> = first_confirmed_block
.rewards
.unwrap_or_default()
.into_iter()
.filter_map(|reward| match reward.reward_type? {
RewardType::Staking | RewardType::Voting => addresses
.contains(&reward.pubkey)
.then(|| (reward.clone().pubkey, reward)),
_ => None,
})
.collect();
let block_list = self
.get_blocks_with_limit(
first_confirmed_block_in_epoch + 1,
partition_data.num_partitions,
config.commitment,
)
.await?;

for (partition_index, addresses) in partition_index_addresses.iter() {
let slot = *block_list
.get(*partition_index)
.ok_or_else(Error::internal_error)?;

let index_reward_map = self
.get_reward_map(
slot,
addresses,
&|reward_type| -> bool { reward_type == RewardType::Staking },
&config,
)
.await?;
reward_map.extend(index_reward_map);
}
}

let rewards = addresses
.iter()
.map(|address| {
if let Some(reward) = reward_hash.get(address) {
if let Some((reward, slot)) = reward_map.get(&address.to_string()) {
return Some(RpcInflationReward {
epoch,
effective_slot: first_confirmed_block_in_epoch,
effective_slot: *slot,
amount: reward.lamports.unsigned_abs(),
post_balance: reward.post_balance,
commission: reward.commission,
Expand All @@ -607,7 +697,6 @@ impl JsonRpcRequestProcessor {
None
})
.collect();

Ok(rewards)
}

Expand Down
5 changes: 1 addition & 4 deletions validator/src/bin/solana-test-validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use {
account::AccountSharedData,
clock::Slot,
epoch_schedule::EpochSchedule,
feature_set,
native_token::sol_to_lamports,
pubkey::Pubkey,
rent::Rent,
Expand Down Expand Up @@ -349,9 +348,7 @@ fn main() {
exit(1);
});

let mut features_to_deactivate = pubkeys_of(&matches, "deactivate_feature").unwrap_or_default();
// Remove this when client support is ready for the enable_partitioned_epoch_reward feature
features_to_deactivate.push(feature_set::enable_partitioned_epoch_reward::id());
let features_to_deactivate = pubkeys_of(&matches, "deactivate_feature").unwrap_or_default();

if TestValidatorGenesis::ledger_exists(&ledger_path) {
for (name, long) in &[
Expand Down

0 comments on commit 7a271f2

Please sign in to comment.