Skip to content

Commit

Permalink
fix: whem mpl core authority changes then go update all asset_authori…
Browse files Browse the repository at this point in the history
…ty belonging to the collection to match the one set on the collection account (#187)
  • Loading branch information
kespinola committed Apr 26, 2024
1 parent 3cc7b04 commit 8b29d79
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 5 deletions.
13 changes: 12 additions & 1 deletion digital_asset_types/src/dao/extensions/asset_grouping.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use sea_orm::{EntityTrait, EnumIter, Related, RelationDef, RelationTrait};

use crate::dao::{asset, asset_grouping};
use crate::dao::{asset, asset_authority, asset_grouping};

#[derive(Copy, Clone, Debug, EnumIter)]
pub enum Relation {
Asset,
AssetAuthority,
}

impl RelationTrait for Relation {
Expand All @@ -14,6 +15,10 @@ impl RelationTrait for Relation {
.from(asset_grouping::Column::AssetId)
.to(asset::Column::Id)
.into(),
Self::AssetAuthority => asset_grouping::Entity::belongs_to(asset_authority::Entity)
.from(asset_grouping::Column::AssetId)
.to(asset_authority::Column::Id)
.into(),
}
}
}
Expand All @@ -23,3 +28,9 @@ impl Related<asset::Entity> for asset_grouping::Entity {
Relation::Asset.def()
}
}

impl Related<asset_authority::Entity> for asset_grouping::Entity {
fn to() -> RelationDef {
Relation::AssetAuthority.def()
}
}
72 changes: 68 additions & 4 deletions program_transformers/src/mpl_core_program/v1_asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use {
entity::{ActiveValue, ColumnTrait, EntityTrait},
query::{JsonValue, QueryFilter, QueryTrait},
sea_query::query::OnConflict,
ConnectionTrait, DbBackend, TransactionTrait,
sea_query::Expr,
ConnectionTrait, CursorTrait, DbBackend, TransactionTrait,
},
serde_json::{value::Value, Map},
solana_sdk::pubkey::Pubkey,
Expand Down Expand Up @@ -76,8 +77,8 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>(

// Note: This indexes both Core Assets and Core Collections.
let asset = match account_data {
MplCoreAccountData::Asset(indexable_asset) => indexable_asset,
MplCoreAccountData::Collection(indexable_asset) => indexable_asset,
MplCoreAccountData::Asset(indexable_asset)
| MplCoreAccountData::Collection(indexable_asset) => indexable_asset,
_ => return Err(ProgramTransformerError::NotImplemented),
};

Expand Down Expand Up @@ -133,6 +134,11 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>(
.await
.map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?;

if matches!(account_data, MplCoreAccountData::Collection(_)) {
update_group_asset_authorities(conn, id_vec.clone(), update_authority.clone(), slot_i)
.await?;
}

//-----------------------
// asset_data table
//-----------------------
Expand Down Expand Up @@ -361,7 +367,7 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>(
)
.build(DbBackend::Postgres);
query.sql = format!(
"{} WHERE excluded.slot_updated > asset_grouping.slot_updated",
"{} WHERE excluded.slot_updated >= asset_grouping.slot_updated",
query.sql
);
txn.execute(query)
Expand Down Expand Up @@ -536,3 +542,61 @@ fn convert_keys_to_snake_case(plugins_json: &mut Value) {
_ => {}
}
}

/// Updates the `asset_authority` for all assets that are part of a collection in a batch.
/// This function performs a cursor-based paginated read and batch update.
async fn update_group_asset_authorities<T: ConnectionTrait + TransactionTrait>(
conn: &T,
group_value: Vec<u8>,
authority: Vec<u8>,
slot: i64,
) -> ProgramTransformerResult<()> {
let mut after = None;

let group_key = "collection".to_string();
let group_value = bs58::encode(group_value).into_string();

let mut query = asset_grouping::Entity::find()
.filter(asset_grouping::Column::GroupKey.eq(group_key))
.filter(asset_grouping::Column::GroupValue.eq(group_value))
.cursor_by(asset_grouping::Column::AssetId);
let mut query = query.first(1_000);

loop {
if let Some(after) = after.clone() {
query = query.after(after);
}

let entries = query.all(conn).await?;

if entries.is_empty() {
break;
}

let asset_ids = entries
.clone()
.into_iter()
.map(|entry| entry.asset_id)
.collect::<Vec<_>>();

asset_authority::Entity::update_many()
.col_expr(
asset_authority::Column::Authority,
Expr::value(authority.clone()),
)
.col_expr(asset_authority::Column::SlotUpdated, Expr::value(slot))
.filter(asset_authority::Column::AssetId.is_in(asset_ids))
.filter(asset_authority::Column::Authority.ne(authority.clone()))
.filter(Expr::cust_with_values(
"asset_authority.slot_updated < $1",
vec![slot],
))
.exec(conn)
.await
.map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?;

after = entries.last().map(|entry| entry.asset_id.clone());
}

Ok(())
}

0 comments on commit 8b29d79

Please sign in to comment.