Skip to content

Commit

Permalink
Use num_partitions to find specific stake rewards in partitions (anza…
Browse files Browse the repository at this point in the history
…-xyz#1677)

* Add helper to find and filter rewards from a slot

* Check feature enabled for desired epoch

* Refactor existing rewards code to support vote-rewards after activation

* Append stake rewards from partitions

* Remove feature deactivation from TestValidator

* Improve comments

* Add comment about retaining feature activation slot logic

* Add custom error and use in getInflationReward

* Review nit
  • Loading branch information
CriesofCarrots committed Jun 26, 2024
1 parent 9ee36fa commit 0496b06
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 24 deletions.
22 changes: 22 additions & 0 deletions rpc-client-api/src/custom_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub const JSON_RPC_SERVER_ERROR_TRANSACTION_SIGNATURE_LEN_MISMATCH: i64 = -32013
pub const JSON_RPC_SERVER_ERROR_BLOCK_STATUS_NOT_AVAILABLE_YET: i64 = -32014;
pub const JSON_RPC_SERVER_ERROR_UNSUPPORTED_TRANSACTION_VERSION: i64 = -32015;
pub const JSON_RPC_SERVER_ERROR_MIN_CONTEXT_SLOT_NOT_REACHED: i64 = -32016;
pub const JSON_RPC_SERVER_ERROR_EPOCH_REWARDS_PERIOD_ACTIVE: i64 = -32017;

#[derive(Error, Debug)]
pub enum RpcCustomError {
Expand Down Expand Up @@ -65,6 +66,12 @@ pub enum RpcCustomError {
UnsupportedTransactionVersion(u8),
#[error("MinContextSlotNotReached")]
MinContextSlotNotReached { context_slot: Slot },
#[error("EpochRewardsPeriodActive")]
EpochRewardsPeriodActive {
slot: Slot,
current_block_height: u64,
rewards_complete_block_height: u64,
},
}

#[derive(Debug, Serialize, Deserialize)]
Expand All @@ -79,6 +86,13 @@ pub struct MinContextSlotNotReachedErrorData {
pub context_slot: Slot,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct EpochRewardsPeriodActiveErrorData {
pub current_block_height: u64,
pub rewards_complete_block_height: u64,
}

impl From<EncodeError> for RpcCustomError {
fn from(err: EncodeError) -> Self {
match err {
Expand Down Expand Up @@ -206,6 +220,14 @@ impl From<RpcCustomError> for Error {
context_slot,
})),
},
RpcCustomError::EpochRewardsPeriodActive { slot, current_block_height, rewards_complete_block_height } => Self {
code: ErrorCode::ServerError(JSON_RPC_SERVER_ERROR_EPOCH_REWARDS_PERIOD_ACTIVE),
message: format!("Epoch rewards period still active at slot {slot}"),
data: Some(serde_json::json!(EpochRewardsPeriodActiveErrorData {
current_block_height,
rewards_complete_block_height,
})),
},
}
}
}
164 changes: 144 additions & 20 deletions rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use {
clock::{Slot, UnixTimestamp, MAX_PROCESSING_AGE},
commitment_config::{CommitmentConfig, CommitmentLevel},
epoch_info::EpochInfo,
epoch_rewards_hasher::EpochRewardsHasher,
epoch_schedule::EpochSchedule,
exit::Exit,
feature_set,
Expand Down Expand Up @@ -93,8 +94,9 @@ use {
solana_transaction_status::{
map_inner_instructions, BlockEncodingOptions, ConfirmedBlock,
ConfirmedTransactionStatusWithSignature, ConfirmedTransactionWithStatusMeta,
EncodedConfirmedTransactionWithStatusMeta, Reward, RewardType, TransactionBinaryEncoding,
TransactionConfirmationStatus, TransactionStatus, UiConfirmedBlock, UiTransactionEncoding,
EncodedConfirmedTransactionWithStatusMeta, Reward, RewardType, Rewards,
TransactionBinaryEncoding, TransactionConfirmationStatus, TransactionStatus,
UiConfirmedBlock, UiTransactionEncoding,
},
solana_vote_program::vote_state::{VoteState, MAX_LOCKOUT_HISTORY},
spl_token_2022::{
Expand Down Expand Up @@ -548,6 +550,34 @@ impl JsonRpcRequestProcessor {
})
}

fn filter_map_rewards<'a, F>(
rewards: &'a Option<Rewards>,
slot: Slot,
addresses: &'a [String],
reward_type_filter: &'a F,
) -> HashMap<String, (Reward, Slot)>
where
F: Fn(RewardType) -> bool,
{
Self::filter_rewards(rewards, reward_type_filter)
.filter(|reward| addresses.contains(&reward.pubkey))
.map(|reward| (reward.pubkey.clone(), (reward.clone(), slot)))
.collect()
}

fn filter_rewards<'a, F>(
rewards: &'a Option<Rewards>,
reward_type_filter: &'a F,
) -> impl Iterator<Item = &'a Reward>
where
F: Fn(RewardType) -> bool,
{
rewards
.iter()
.flatten()
.filter(move |reward| reward.reward_type.is_some_and(reward_type_filter))
}

pub async fn get_inflation_reward(
&self,
addresses: Vec<Pubkey>,
Expand Down Expand Up @@ -592,7 +622,22 @@ impl JsonRpcRequestProcessor {
slot: first_slot_in_epoch,
})?;

let Ok(Some(first_confirmed_block)) = self
// Determine if partitioned epoch rewards were enabled for the desired
// epoch
let bank = self.get_bank_with_config(context_config)?;

// DO NOT CLEAN UP with feature_set::enable_partitioned_epoch_reward
// This logic needs to be retained indefinitely to support historical
// rewards before and after feature activation.
let partitioned_epoch_reward_enabled_slot = bank
.feature_set
.activated_slot(&feature_set::enable_partitioned_epoch_reward::id());
let partitioned_epoch_reward_enabled = partitioned_epoch_reward_enabled_slot
.map(|slot| slot <= first_confirmed_block_in_epoch)
.unwrap_or(false);

// Get first block in the epoch
let Ok(Some(epoch_boundary_block)) = self
.get_block(
first_confirmed_block_in_epoch,
Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()),
Expand All @@ -605,30 +650,109 @@ impl JsonRpcRequestProcessor {
.into());
};

let addresses: Vec<String> = addresses
.into_iter()
.map(|pubkey| pubkey.to_string())
.collect();
// Collect rewards from first block in the epoch if partitioned epoch
// rewards not enabled, or address is a vote account
let mut reward_map: HashMap<String, (Reward, Slot)> = {
let addresses: Vec<String> =
addresses.iter().map(|pubkey| pubkey.to_string()).collect();
Self::filter_map_rewards(
&epoch_boundary_block.rewards,
first_confirmed_block_in_epoch,
&addresses,
&|reward_type| -> bool {
reward_type == RewardType::Voting
|| (!partitioned_epoch_reward_enabled && reward_type == RewardType::Staking)
},
)
};

let reward_hash: HashMap<String, Reward> = first_confirmed_block
.rewards
.unwrap_or_default()
.into_iter()
.filter_map(|reward| match reward.reward_type? {
RewardType::Staking | RewardType::Voting => addresses
.contains(&reward.pubkey)
.then(|| (reward.clone().pubkey, reward)),
_ => None,
})
.collect();
// Append stake account rewards from partitions if partitions epoch
// rewards is enabled
if partitioned_epoch_reward_enabled {
let num_partitions = epoch_boundary_block.num_reward_partitions.expect(
"epoch-boundary block should have num_reward_partitions after partitioned epoch \
rewards enabled",
);

let num_partitions = usize::try_from(num_partitions)
.expect("num_partitions should never exceed usize::MAX");
let hasher = EpochRewardsHasher::new(
num_partitions,
&Hash::from_str(&epoch_boundary_block.previous_blockhash)
.expect("UiConfirmedBlock::previous_blockhash should be properly formed"),
);
let mut partition_index_addresses: HashMap<usize, Vec<String>> = HashMap::new();
for address in addresses.iter() {
let address_string = address.to_string();
// Skip this address if (Voting) rewards were already found in
// the first block of the epoch
if !reward_map.contains_key(&address_string) {
let partition_index = hasher.clone().hash_address_to_partition(address);
partition_index_addresses
.entry(partition_index)
.and_modify(|list| list.push(address_string.clone()))
.or_insert(vec![address_string]);
}
}

let block_list = self
.get_blocks_with_limit(
first_confirmed_block_in_epoch + 1,
num_partitions,
Some(context_config),
)
.await?;

for (partition_index, addresses) in partition_index_addresses.iter() {
let slot = *block_list.get(*partition_index).ok_or_else(|| {
// If block_list.len() too short to contain
// partition_index, the epoch rewards period must be
// currently active.
let rewards_complete_block_height = epoch_boundary_block
.block_height
.map(|block_height| {
block_height
.saturating_add(num_partitions as u64)
.saturating_add(1)
})
.expect(
"every block after partitioned_epoch_reward_enabled should have a \
populated block_height",
);
RpcCustomError::EpochRewardsPeriodActive {
slot: bank.slot(),
current_block_height: bank.block_height(),
rewards_complete_block_height,
}
})?;

let Ok(Some(block)) = self
.get_block(
slot,
Some(RpcBlockConfig::rewards_with_commitment(config.commitment).into()),
)
.await
else {
return Err(RpcCustomError::BlockNotAvailable { slot }.into());
};

let index_reward_map = Self::filter_map_rewards(
&block.rewards,
slot,
addresses,
&|reward_type| -> bool { reward_type == RewardType::Staking },
);
reward_map.extend(index_reward_map);
}
}

let rewards = addresses
.iter()
.map(|address| {
if let Some(reward) = reward_hash.get(address) {
if let Some((reward, slot)) = reward_map.get(&address.to_string()) {
return Some(RpcInflationReward {
epoch,
effective_slot: first_confirmed_block_in_epoch,
effective_slot: *slot,
amount: reward.lamports.unsigned_abs(),
post_balance: reward.post_balance,
commission: reward.commission,
Expand Down
5 changes: 1 addition & 4 deletions validator/src/bin/solana-test-validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use {
account::AccountSharedData,
clock::Slot,
epoch_schedule::EpochSchedule,
feature_set,
native_token::sol_to_lamports,
pubkey::Pubkey,
rent::Rent,
Expand Down Expand Up @@ -352,9 +351,7 @@ fn main() {
exit(1);
});

let mut features_to_deactivate = pubkeys_of(&matches, "deactivate_feature").unwrap_or_default();
// Remove this when client support is ready for the enable_partitioned_epoch_reward feature
features_to_deactivate.push(feature_set::enable_partitioned_epoch_reward::id());
let features_to_deactivate = pubkeys_of(&matches, "deactivate_feature").unwrap_or_default();

if TestValidatorGenesis::ledger_exists(&ledger_path) {
for (name, long) in &[
Expand Down

0 comments on commit 0496b06

Please sign in to comment.