Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 113 additions & 13 deletions magicblock-chainlink/src/chainlink/fetch_cloner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ struct AccountWithCompanion {
companion_account: Option<ResolvedAccountSharedData>,
}

enum RefreshDecision {
No,
Yes,
YesAndMarkEmptyIfNotFound,
}

#[derive(Debug, Default)]
pub struct FetchAndCloneResult {
pub not_found_on_chain: Vec<(Pubkey, u64)>,
Expand Down Expand Up @@ -1120,7 +1126,7 @@ where
pubkey: &Pubkey,
in_bank: &AccountSharedData,
fetch_origin: AccountFetchOrigin,
) -> bool {
) -> RefreshDecision {
if in_bank.undelegating() {
debug!("Fetching undelegating account {pubkey}. delegated={}, undelegating={}", in_bank.delegated(), in_bank.undelegating());
let deleg_record = self
Expand All @@ -1130,6 +1136,14 @@ where
fetch_origin,
)
.await;

if deleg_record.is_none() {
// If there is no delegation record then it is possible that the account itself
// does not exist either.
// In that case we need to refresh it as empty to clear the undelegation state.
return RefreshDecision::YesAndMarkEmptyIfNotFound;
}

let delegated_on_chain = deleg_record.as_ref().is_some_and(|dr| {
dr.authority.eq(&self.validator_pubkey)
|| dr.authority.eq(&Pubkey::default())
Expand All @@ -1144,14 +1158,14 @@ where
debug!(
"Account {pubkey} marked as undelegating will be overridden since undelegation completed"
);
return true;
return RefreshDecision::Yes;
}
} else if in_bank.owner().eq(&dlp::id()) {
debug!(
"Account {pubkey} owned by deleg program not marked as undelegating"
);
}
false
RefreshDecision::No
}

/// Fetch and clone accounts with request deduplication to avoid parallel fetches of the same account.
Expand Down Expand Up @@ -1191,25 +1205,35 @@ where
let mut await_pending = vec![];
let mut fetch_new = vec![];
let mut in_bank = vec![];
let mut extra_mark_empty = vec![];
for pubkey in pubkeys.iter() {
if let Some(account_in_bank) =
self.accounts_bank.get_account(pubkey)
{
let should_refresh_undelegating = self
let decision = self
.should_refresh_undelegating_in_bank_account(
pubkey,
&account_in_bank,
fetch_origin,
)
.await;
if should_refresh_undelegating {
debug!("Account {pubkey} completed undelegation which we missed and is fetched again");
metrics::inc_unstuck_undelegation_count();
}
if !should_refresh_undelegating {
// Account is in bank and subscribed correctly - no fetch needed
trace!("Account {pubkey} found in bank in valid state, no fetch needed");
in_bank.push(*pubkey);

match decision {
RefreshDecision::Yes
| RefreshDecision::YesAndMarkEmptyIfNotFound => {
debug!("Account {pubkey} completed undelegation which we missed and is fetched again");
metrics::inc_unstuck_undelegation_count();
if let RefreshDecision::YesAndMarkEmptyIfNotFound =
decision
{
extra_mark_empty.push(*pubkey);
}
}
RefreshDecision::No => {
// Account is in bank and subscribed correctly - no fetch needed
trace!("Account {pubkey} found in bank in valid state, no fetch needed");
in_bank.push(*pubkey);
}
}
}
}
Expand Down Expand Up @@ -1244,9 +1268,19 @@ where
// If we have accounts to fetch, delegate to the existing implementation
// but notify all pending requests when done
let result = if !fetch_new.is_empty() {
let mut all_mark_empty = mark_empty_if_not_found
.map(|x| x.to_vec())
.unwrap_or_default();
all_mark_empty.extend(extra_mark_empty);
let mark_empty_ref = if all_mark_empty.is_empty() {
None
} else {
Some(all_mark_empty.as_slice())
};

self.fetch_and_clone_accounts(
&fetch_new,
mark_empty_if_not_found,
mark_empty_ref,
slot,
fetch_origin,
program_ids,
Expand Down Expand Up @@ -2970,4 +3004,70 @@ mod tests {
&[&marked_non_existing_account_pubkey]
);
}

#[tokio::test]
async fn test_fetch_and_clone_undelegating_account_that_is_closed_on_chain()
{
init_logger();
let validator_pubkey = random_pubkey();
let account_pubkey = random_pubkey();
let account_owner = random_pubkey();
const CURRENT_SLOT: u64 = 100;

// The account exists in the bank (undelegating) but is closed on chain
let account_in_bank = Account {
lamports: 1_000_000,
data: vec![1, 2, 3, 4],
owner: account_owner,
executable: false,
rent_epoch: 0,
};

// Setup with NO accounts on chain
let FetcherTestCtx {
accounts_bank,
fetch_cloner,
remote_account_provider,
..
} = setup(
std::iter::empty::<(Pubkey, Account)>(),
CURRENT_SLOT,
validator_pubkey,
)
.await;

// Insert account into bank and mark as undelegating
accounts_bank
.insert(account_pubkey, AccountSharedData::from(account_in_bank));
accounts_bank.set_undelegating(&account_pubkey, true);

// Fetch and clone - should detect closed account and clone empty account
let result = fetch_cloner
.fetch_and_clone_accounts_with_dedup(
&[account_pubkey],
None,
None,
AccountFetchOrigin::GetAccount,
None,
)
.await;

debug!("Test result: {result:?}");
assert!(result.is_ok());

// Account should be replaced with empty account in bank
let cloned_account = accounts_bank.get_account(&account_pubkey);
assert!(cloned_account.is_some());
let cloned_account = cloned_account.unwrap();

assert_eq!(cloned_account.lamports(), 0);
assert!(cloned_account.data().is_empty());
assert_eq!(*cloned_account.owner(), system_program::id());

// Should be subscribed
assert_subscribed_without_delegation_record!(
remote_account_provider,
&[&account_pubkey]
);
}
}