Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/run-tests-on-push-to-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ jobs:
cargo clippy \
--package acropolis_common \
--package acropolis_codec \
--package acropolis_module_accounts_state \
--package acropolis_module_address_state \
--package acropolis_module_assets_state \
--package acropolis_module_block_unpacker \
Expand Down
19 changes: 10 additions & 9 deletions modules/accounts_state/src/accounts_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub struct AccountsState;

impl AccountsState {
/// Async run loop
#[allow(clippy::too_many_arguments)]
async fn run(
history: Arc<Mutex<StateHistory<State>>>,
spdd_store: Option<Arc<Mutex<SPDDStore>>>,
Expand Down Expand Up @@ -166,8 +167,8 @@ impl AccountsState {
block = block_info.number
);
async {
Self::check_sync(&current_block, &block_info);
state.handle_drep_state(&dreps_msg);
Self::check_sync(&current_block, block_info);
state.handle_drep_state(dreps_msg);

let drdd = state.generate_drdd();
if let Err(e) = drep_publisher.publish_drdd(block_info, drdd).await {
Expand All @@ -188,7 +189,7 @@ impl AccountsState {
let span =
info_span!("account_state.handle_spo_state", block = block_info.number);
async {
Self::check_sync(&current_block, &block_info);
Self::check_sync(&current_block, block_info);
state
.handle_spo_state(spo_msg)
.inspect_err(|e| error!("SPOState handling error: {e:#}"))
Expand Down Expand Up @@ -225,7 +226,7 @@ impl AccountsState {
block = block_info.number
);
async {
Self::check_sync(&current_block, &block_info);
Self::check_sync(&current_block, block_info);
state
.handle_parameters(params_msg)
.inspect_err(|e| error!("Messaging handling error: {e}"))
Expand All @@ -247,9 +248,9 @@ impl AccountsState {
block = block_info.number
);
async {
Self::check_sync(&current_block, &block_info);
Self::check_sync(&current_block, block_info);
let after_epoch_result = state
.handle_epoch_activity(ea_msg, &verifier)
.handle_epoch_activity(ea_msg, verifier)
.await
.inspect_err(|e| error!("EpochActivity handling error: {e:#}"))
.ok();
Expand Down Expand Up @@ -285,7 +286,7 @@ impl AccountsState {
Message::Cardano((block_info, CardanoMessage::TxCertificates(tx_certs_msg))) => {
let span = info_span!("account_state.handle_certs", block = block_info.number);
async {
Self::check_sync(&current_block, &block_info);
Self::check_sync(&current_block, block_info);
state
.handle_tx_certificates(tx_certs_msg)
.inspect_err(|e| error!("TxCertificates handling error: {e:#}"))
Expand All @@ -307,7 +308,7 @@ impl AccountsState {
block = block_info.number
);
async {
Self::check_sync(&current_block, &block_info);
Self::check_sync(&current_block, block_info);
state
.handle_withdrawals(withdrawals_msg)
.inspect_err(|e| error!("Withdrawals handling error: {e:#}"))
Expand All @@ -329,7 +330,7 @@ impl AccountsState {
block = block_info.number
);
async {
Self::check_sync(&current_block, &block_info);
Self::check_sync(&current_block, block_info);
state
.handle_stake_deltas(deltas_msg)
.inspect_err(|e| error!("StakeAddressDeltas handling error: {e:#}"))
Expand Down
2 changes: 1 addition & 1 deletion modules/accounts_state/src/monetary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub fn calculate_monetary_change(

// Handle monetary expansion - movement from reserves to rewards and treasury
let eta = calculate_eta(params, total_non_obft_blocks)?;
let monetary_expansion = calculate_monetary_expansion(&params, old_pots.reserves, &eta);
let monetary_expansion = calculate_monetary_expansion(params, old_pots.reserves, &eta);

// Total rewards available is monetary expansion plus fees from last epoch
// TODO not sure why this is one epoch behind
Expand Down
31 changes: 17 additions & 14 deletions modules/accounts_state/src/rewards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ pub fn calculate_rewards(
registrations: &HashSet<StakeAddress>,
deregistrations: &HashSet<StakeAddress>,
) -> Result<RewardsResult> {
let mut result = RewardsResult::default();
result.epoch = epoch;
let mut result = RewardsResult {
epoch,
..Default::default()
};

// If no blocks produced in previous epoch, don't do anything
let total_blocks = performance.blocks;
Expand Down Expand Up @@ -142,16 +144,16 @@ pub fn calculate_rewards(
// Check all SPOs to see if they match this reward account
for (other_id, other_spo) in staking.spos.iter() {
if other_spo.reward_account == staking_spo.reward_account
&& other_id.cmp(&operator_id) == Ordering::Less
&& other_id.cmp(operator_id) == Ordering::Less
// Lower ID (hash) wins
{
// It must have been paid a reward - we assume that checking it produced
// any blocks is enough here - if not we'll have to do this as a post-process
if performance.spos.get(other_id).map(|s| s.blocks_produced).unwrap_or(0) > 0 {
pay_to_pool_reward_account = false;
warn!("Shelley shared reward account bug: Dropping reward to {} in favour of {} on shared account {}",
hex::encode(&operator_id),
hex::encode(&other_id),
hex::encode(operator_id),
hex::encode(other_id),
staking_spo.reward_account);
break;
}
Expand Down Expand Up @@ -220,6 +222,7 @@ pub fn calculate_rewards(
}

/// Calculate rewards for an individual SPO
#[allow(clippy::too_many_arguments)]
fn calculate_spo_rewards(
operator_id: &KeyHash,
spo: &SnapshotSPO,
Expand All @@ -238,21 +241,21 @@ fn calculate_spo_rewards(
// Active stake (sigma)
let pool_stake = BigDecimal::from(spo.total_stake);
if pool_stake.is_zero() {
warn!("SPO {} has no stake - skipping", hex::encode(&operator_id),);
warn!("SPO {} has no stake - skipping", hex::encode(operator_id));

// No stake, no rewards or earnings
return vec![];
}

// Get the stake actually delegated by the owners accounts to this SPO
let pool_owner_stake =
staking.get_stake_delegated_to_spo_by_addresses(&operator_id, &spo.pool_owners);
staking.get_stake_delegated_to_spo_by_addresses(operator_id, &spo.pool_owners);

// If they haven't met their pledge, no dice
if pool_owner_stake < spo.pledge {
debug!(
"SPO {} has owner stake {} less than pledge {} - skipping",
hex::encode(&operator_id),
hex::encode(operator_id),
pool_owner_stake,
spo.pledge
);
Expand All @@ -263,11 +266,11 @@ fn calculate_spo_rewards(

// Relative stake as fraction of total supply (sigma), and capped with 1/k (sigma')
let relative_pool_stake = &pool_stake / total_supply;
let capped_relative_pool_stake = min(&relative_pool_stake, &relative_pool_saturation_size);
let capped_relative_pool_stake = min(&relative_pool_stake, relative_pool_saturation_size);

// Stake pledged by operator (s) and capped with 1/k (s')
let relative_pool_pledge = &pool_pledge / total_supply;
let capped_relative_pool_pledge = min(&relative_pool_pledge, &relative_pool_saturation_size);
let capped_relative_pool_pledge = min(&relative_pool_pledge, relative_pool_saturation_size);

// Get the optimum reward for this pool
let optimum_rewards = ((stake_rewards / (BigDecimal::one() + pledge_influence_factor))
Expand Down Expand Up @@ -301,7 +304,7 @@ fn calculate_spo_rewards(

debug!(%pool_stake, %relative_pool_stake, %pool_performance,
%optimum_rewards, %pool_rewards, pool_owner_stake, %pool_pledge,
"Pool {}", hex::encode(&operator_id));
"Pool {}", hex::encode(operator_id));

// Subtract fixed costs
let fixed_cost = BigDecimal::from(spo.fixed_cost);
Expand All @@ -316,7 +319,7 @@ fn calculate_spo_rewards(
let margin =
BigDecimal::from(spo.margin.numerator) / BigDecimal::from(spo.margin.denominator);

let relative_owner_stake = &pool_owner_stake / total_supply;
let relative_owner_stake = pool_owner_stake / total_supply;
let margin_cost = ((&pool_rewards - &fixed_cost)
* (&margin
+ (BigDecimal::one() - &margin) * (relative_owner_stake / relative_pool_stake)))
Expand Down Expand Up @@ -345,7 +348,7 @@ fn calculate_spo_rewards(
delegator_stake_address);

// Pool owners don't get member rewards (seems unfair!)
if spo.pool_owners.contains(&delegator_stake_address) {
if spo.pool_owners.contains(delegator_stake_address) {
debug!(
"Skipping pool owner reward account {}, losing {to_pay}",
delegator_stake_address
Expand Down Expand Up @@ -397,7 +400,7 @@ fn calculate_spo_rewards(
} else {
info!(
"SPO {}'s reward account {} not paid {}",
hex::encode(&operator_id),
hex::encode(operator_id),
spo.reward_account,
spo_benefit,
);
Expand Down
1 change: 1 addition & 0 deletions modules/accounts_state/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub struct Snapshot {

impl Snapshot {
/// Get a stake snapshot based the current stake addresses
#[allow(clippy::too_many_arguments)]
pub fn new(
epoch: u64,
stake_addresses: &StakeAddressMap,
Expand Down
69 changes: 33 additions & 36 deletions modules/accounts_state/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl State {
}

/// Get Pools Live stake
pub fn get_pools_live_stakes(&self, pool_operators: &Vec<KeyHash>) -> Vec<u64> {
pub fn get_pools_live_stakes(&self, pool_operators: &[KeyHash]) -> Vec<u64> {
self.stake_addresses.lock().unwrap().get_pools_live_stakes(pool_operators)
}

Expand Down Expand Up @@ -657,39 +657,36 @@ impl State {

// If rewards have been calculated, save the results
if let Some(task) = task.take() {
match task.await {
Ok(Ok(reward_result)) => {
// Collect rewards to stake addresses reward deltas
for (_, rewards) in &reward_result.rewards {
reward_deltas.extend(
rewards
.iter()
.map(|reward| StakeRewardDelta {
stake_address: reward.account.clone(),
delta: reward.amount as i64,
})
.collect::<Vec<_>>(),
);
}
if let Ok(Ok(reward_result)) = task.await {
// Collect rewards to stake addresses reward deltas
for rewards in reward_result.rewards.values() {
reward_deltas.extend(
rewards
.iter()
.map(|reward| StakeRewardDelta {
stake_address: reward.account.clone(),
delta: reward.amount as i64,
})
.collect::<Vec<_>>(),
);
}

// Verify them
verifier.verify_rewards(reward_result.epoch, &reward_result);
// Verify them
verifier.verify_rewards(reward_result.epoch, &reward_result);

// Pay the rewards
let mut stake_addresses = self.stake_addresses.lock().unwrap();
for (_, rewards) in reward_result.rewards {
for reward in rewards {
stake_addresses.add_to_reward(&reward.account, reward.amount);
}
// Pay the rewards
let mut stake_addresses = self.stake_addresses.lock().unwrap();
for (_, rewards) in reward_result.rewards {
for reward in rewards {
stake_addresses.add_to_reward(&reward.account, reward.amount);
}
}

// save SPO rewards
spo_rewards = reward_result.spo_rewards.into_iter().collect();
// save SPO rewards
spo_rewards = reward_result.spo_rewards.into_iter().collect();

// Adjust the reserves for next time with amount actually paid
self.pots.reserves -= reward_result.total_paid;
}
_ => (),
// Adjust the reserves for next time with amount actually paid
self.pots.reserves -= reward_result.total_paid;
}
};

Expand Down Expand Up @@ -796,7 +793,7 @@ impl State {
fn register_stake_address(&mut self, stake_address: &StakeAddress, deposit: Option<Lovelace>) {
// Stake addresses can be registered after being used in UTXOs
let mut stake_addresses = self.stake_addresses.lock().unwrap();
if stake_addresses.register_stake_address(&stake_address) {
if stake_addresses.register_stake_address(stake_address) {
// Account for the deposit
let deposit = match deposit {
Some(deposit) => deposit,
Expand Down Expand Up @@ -824,7 +821,7 @@ impl State {
fn deregister_stake_address(&mut self, stake_address: &StakeAddress, refund: Option<Lovelace>) {
// Check if it existed
let mut stake_addresses = self.stake_addresses.lock().unwrap();
if stake_addresses.deregister_stake_address(&stake_address) {
if stake_addresses.deregister_stake_address(stake_address) {
// Account for the deposit, if registered before
let deposit = match refund {
Some(deposit) => deposit,
Expand Down Expand Up @@ -856,7 +853,7 @@ impl State {
/// Record a stake delegation
fn record_stake_delegation(&mut self, stake_address: &StakeAddress, spo: &KeyHash) {
let mut stake_addresses = self.stake_addresses.lock().unwrap();
stake_addresses.record_stake_delegation(&stake_address, spo);
stake_addresses.record_stake_delegation(stake_address, spo);
}

/// Handle an MoveInstantaneousReward (pre-Conway only)
Expand All @@ -868,7 +865,7 @@ impl State {
/// record a drep delegation
fn record_drep_delegation(&mut self, stake_address: &StakeAddress, drep: &DRepChoice) {
let mut stake_addresses = self.stake_addresses.lock().unwrap();
stake_addresses.record_drep_delegation(&stake_address, drep);
stake_addresses.record_drep_delegation(stake_address, drep);
}

/// Handle TxCertificates
Expand All @@ -877,15 +874,15 @@ impl State {
for tx_cert in tx_certs_msg.certificates.iter() {
match &tx_cert.cert {
TxCertificate::StakeRegistration(reg) => {
self.register_stake_address(&reg, None);
self.register_stake_address(reg, None);
}

TxCertificate::StakeDeregistration(dreg) => {
self.deregister_stake_address(&dreg, None);
self.deregister_stake_address(dreg, None);
}

TxCertificate::MoveInstantaneousReward(mir) => {
self.handle_mir(&mir).unwrap_or_else(|e| error!("MIR failed: {e:#}"));
self.handle_mir(mir).unwrap_or_else(|e| error!("MIR failed: {e:#}"));
}

TxCertificate::Registration(reg) => {
Expand Down