Skip to content

Commit

Permalink
refactor: remove optionally writting cl_audits. (#192)
Browse files Browse the repository at this point in the history
  • Loading branch information
kespinola committed Jun 14, 2024
1 parent 7c94c94 commit e21e037
Show file tree
Hide file tree
Showing 16 changed files with 56 additions and 91 deletions.
2 changes: 1 addition & 1 deletion integration_tests/tests/integration_tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ pub async fn apply_migrations_and_delete_data(db: Arc<DatabaseConnection>) {
}

async fn load_ingest_program_transformer(pool: sqlx::Pool<sqlx::Postgres>) -> ProgramTransformer {
ProgramTransformer::new(pool, Box::new(|_info| ready(Ok(())).boxed()), false)
ProgramTransformer::new(pool, Box::new(|_info| ready(Ok(())).boxed()))
}

pub async fn get_transaction(
Expand Down
1 change: 0 additions & 1 deletion nft_ingester/src/account_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ pub fn account_worker<T: Messenger>(
let manager = Arc::new(ProgramTransformer::new(
pool,
create_download_metadata_notifier(bg_task_sender),
false,
));
loop {
let e = msg.recv(stream_key, consumption_type.clone()).await;
Expand Down
1 change: 0 additions & 1 deletion nft_ingester/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ pub async fn main() -> Result<(), IngesterError> {
} else {
ConsumptionType::New
},
config.cl_audits.unwrap_or(false),
stream_name,
);
}
Expand Down
2 changes: 0 additions & 2 deletions nft_ingester/src/transaction_notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ pub fn transaction_worker<T: Messenger>(
bg_task_sender: UnboundedSender<TaskData>,
ack_channel: UnboundedSender<(&'static str, String)>,
consumption_type: ConsumptionType,
cl_audits: bool,
stream_key: &'static str,
) -> JoinHandle<()> {
tokio::spawn(async move {
Expand All @@ -35,7 +34,6 @@ pub fn transaction_worker<T: Messenger>(
let manager = Arc::new(ProgramTransformer::new(
pool,
create_download_metadata_notifier(bg_task_sender),
cl_audits,
));
loop {
let e = msg.recv(stream_key, consumption_type.clone()).await;
Expand Down
4 changes: 1 addition & 3 deletions program_transformers/src/bubblegum/burn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@ pub async fn burn<'c, T>(
bundle: &InstructionBundle<'c>,
txn: &'c T,
instruction: &str,
cl_audits: bool,
) -> ProgramTransformerResult<()>
where
T: ConnectionTrait + TransactionTrait,
{
if let Some(cl) = &parsing_result.tree_update {
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits)
.await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
let leaf_index = cl.index;
let (asset_id, _) = Pubkey::find_program_address(
&[
Expand Down
4 changes: 1 addition & 3 deletions program_transformers/src/bubblegum/cancel_redeem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ pub async fn cancel_redeem<'c, T>(
bundle: &InstructionBundle<'c>,
txn: &'c T,
instruction: &str,
cl_audits: bool,
) -> ProgramTransformerResult<()>
where
T: ConnectionTrait + TransactionTrait,
{
if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) {
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits)
.await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
match le.schema {
LeafSchema::V1 {
id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ pub async fn process<'c, T>(
bundle: &InstructionBundle<'c>,
txn: &'c T,
instruction: &str,
cl_audits: bool,
) -> ProgramTransformerResult<()>
where
T: ConnectionTrait + TransactionTrait,
Expand All @@ -44,8 +43,7 @@ where
"Handling collection verification event for {} (verify: {}): {}",
collection, verify, bundle.txn_id
);
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits)
.await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
let id_bytes = match le.schema {
LeafSchema::V1 { id, .. } => id.to_bytes().to_vec(),
};
Expand Down
4 changes: 1 addition & 3 deletions program_transformers/src/bubblegum/creator_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ pub async fn process<'c, T>(
bundle: &InstructionBundle<'c>,
txn: &'c T,
instruction: &str,
cl_audits: bool,
) -> ProgramTransformerResult<()>
where
T: ConnectionTrait + TransactionTrait,
Expand Down Expand Up @@ -60,8 +59,7 @@ where
"Handling creator verification event for creator {} (verify: {}): {}",
creator, verify, bundle.txn_id
);
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits)
.await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;

match le.schema {
LeafSchema::V1 {
Expand Down
68 changes: 32 additions & 36 deletions program_transformers/src/bubblegum/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ pub async fn save_changelog_event<'c, T>(
txn_id: &str,
txn: &T,
instruction: &str,
cl_audits: bool,
) -> ProgramTransformerResult<u64>
where
T: ConnectionTrait + TransactionTrait,
{
insert_change_log(change_log_event, slot, txn_id, txn, instruction, cl_audits).await?;
insert_change_log(change_log_event, slot, txn_id, txn, instruction).await?;
Ok(change_log_event.seq)
}

Expand All @@ -44,7 +43,6 @@ pub async fn insert_change_log<'c, T>(
txn_id: &str,
txn: &T,
instruction: &str,
cl_audits: bool,
) -> ProgramTransformerResult<()>
where
T: ConnectionTrait + TransactionTrait,
Expand Down Expand Up @@ -98,39 +96,37 @@ where
.map_err(|db_err| ProgramTransformerError::StorageWriteError(db_err.to_string()))?;
}

// Insert the audit item after the insert into cl_items have been completed
if cl_audits {
let tx_id_bytes = bs58::decode(txn_id)
.into_vec()
.map_err(|_e| ProgramTransformerError::ChangeLogEventMalformed)?;
let ix = Instruction::from(instruction);
if ix == Instruction::Unknown {
error!("Unknown instruction: {}", instruction);
}
let audit_item_v2 = cl_audits_v2::ActiveModel {
tree: ActiveValue::Set(tree_id.to_vec()),
leaf_idx: ActiveValue::Set(change_log_event.index as i64),
seq: ActiveValue::Set(change_log_event.seq as i64),
tx: ActiveValue::Set(tx_id_bytes),
instruction: ActiveValue::Set(ix),
..Default::default()
};
let query = cl_audits_v2::Entity::insert(audit_item_v2)
.on_conflict(
OnConflict::columns([
cl_audits_v2::Column::Tree,
cl_audits_v2::Column::LeafIdx,
cl_audits_v2::Column::Seq,
])
.do_nothing()
.to_owned(),
)
.build(DbBackend::Postgres);
match txn.execute(query).await {
Ok(_) => {}
Err(e) => {
error!("Error while inserting into cl_audits_v2: {:?}", e);
}
let tx_id_bytes = bs58::decode(txn_id)
.into_vec()
.map_err(|_e| ProgramTransformerError::ChangeLogEventMalformed)?;
let ix = Instruction::from(instruction);
if ix == Instruction::Unknown {
error!("Unknown instruction: {}", instruction);
}
let audit_item_v2 = cl_audits_v2::ActiveModel {
tree: ActiveValue::Set(tree_id.to_vec()),
leaf_idx: ActiveValue::Set(change_log_event.index as i64),
seq: ActiveValue::Set(change_log_event.seq as i64),
tx: ActiveValue::Set(tx_id_bytes),
instruction: ActiveValue::Set(ix),
..Default::default()
};

let query = cl_audits_v2::Entity::insert(audit_item_v2)
.on_conflict(
OnConflict::columns([
cl_audits_v2::Column::Tree,
cl_audits_v2::Column::LeafIdx,
cl_audits_v2::Column::Seq,
])
.do_nothing()
.to_owned(),
)
.build(DbBackend::Postgres);
match txn.execute(query).await {
Ok(_) => {}
Err(e) => {
error!("Error while inserting into cl_audits_v2: {:?}", e);
}
}

Expand Down
4 changes: 1 addition & 3 deletions program_transformers/src/bubblegum/delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ pub async fn delegate<'c, T>(
bundle: &InstructionBundle<'c>,
txn: &'c T,
instruction: &str,
cl_audits: bool,
) -> ProgramTransformerResult<()>
where
T: ConnectionTrait + TransactionTrait,
{
if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) {
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits)
.await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
match le.schema {
LeafSchema::V1 {
id,
Expand Down
4 changes: 1 addition & 3 deletions program_transformers/src/bubblegum/mint_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ pub async fn mint_v1<'c, T>(
bundle: &InstructionBundle<'c>,
txn: &'c T,
instruction: &str,
cl_audits: bool,
) -> ProgramTransformerResult<Option<DownloadMetadataInfo>>
where
T: ConnectionTrait + TransactionTrait,
Expand All @@ -51,8 +50,7 @@ where
&parsing_result.tree_update,
&parsing_result.payload,
) {
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits)
.await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
let metadata = args;
#[allow(unreachable_patterns)]
return match le.schema {
Expand Down
23 changes: 9 additions & 14 deletions program_transformers/src/bubblegum/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ pub async fn handle_bubblegum_instruction<'c, T>(
bundle: &'c InstructionBundle<'c>,
txn: &T,
download_metadata_notifier: &DownloadMetadataNotifier,
cl_audits: bool,
) -> ProgramTransformerResult<()>
where
T: ConnectionTrait + TransactionTrait,
Expand Down Expand Up @@ -63,46 +62,42 @@ where

match ix_type {
InstructionName::Transfer => {
transfer::transfer(parsing_result, bundle, txn, ix_str, cl_audits).await?;
transfer::transfer(parsing_result, bundle, txn, ix_str).await?;
}
InstructionName::Burn => {
burn::burn(parsing_result, bundle, txn, ix_str, cl_audits).await?;
burn::burn(parsing_result, bundle, txn, ix_str).await?;
}
InstructionName::Delegate => {
delegate::delegate(parsing_result, bundle, txn, ix_str, cl_audits).await?;
delegate::delegate(parsing_result, bundle, txn, ix_str).await?;
}
InstructionName::MintV1 | InstructionName::MintToCollectionV1 => {
if let Some(info) =
mint_v1::mint_v1(parsing_result, bundle, txn, ix_str, cl_audits).await?
{
if let Some(info) = mint_v1::mint_v1(parsing_result, bundle, txn, ix_str).await? {
download_metadata_notifier(info)
.await
.map_err(ProgramTransformerError::DownloadMetadataNotify)?;
}
}
InstructionName::Redeem => {
redeem::redeem(parsing_result, bundle, txn, ix_str, cl_audits).await?;
redeem::redeem(parsing_result, bundle, txn, ix_str).await?;
}
InstructionName::CancelRedeem => {
cancel_redeem::cancel_redeem(parsing_result, bundle, txn, ix_str, cl_audits).await?;
cancel_redeem::cancel_redeem(parsing_result, bundle, txn, ix_str).await?;
}
InstructionName::DecompressV1 => {
debug!("No action necessary for decompression")
}
InstructionName::VerifyCreator | InstructionName::UnverifyCreator => {
creator_verification::process(parsing_result, bundle, txn, ix_str, cl_audits).await?;
creator_verification::process(parsing_result, bundle, txn, ix_str).await?;
}
InstructionName::VerifyCollection
| InstructionName::UnverifyCollection
| InstructionName::SetAndVerifyCollection => {
collection_verification::process(parsing_result, bundle, txn, ix_str, cl_audits)
.await?;
collection_verification::process(parsing_result, bundle, txn, ix_str).await?;
}
InstructionName::SetDecompressibleState => (), // Nothing to index.
InstructionName::UpdateMetadata => {
if let Some(info) =
update_metadata::update_metadata(parsing_result, bundle, txn, ix_str, cl_audits)
.await?
update_metadata::update_metadata(parsing_result, bundle, txn, ix_str).await?
{
download_metadata_notifier(info)
.await
Expand Down
4 changes: 1 addition & 3 deletions program_transformers/src/bubblegum/redeem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@ pub async fn redeem<'c, T>(
bundle: &InstructionBundle<'c>,
txn: &'c T,
instruction: &str,
cl_audits: bool,
) -> ProgramTransformerResult<()>
where
T: ConnectionTrait + TransactionTrait,
{
if let Some(cl) = &parsing_result.tree_update {
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits)
.await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
let leaf_index = cl.index;
let (asset_id, _) = Pubkey::find_program_address(
&[
Expand Down
4 changes: 1 addition & 3 deletions program_transformers/src/bubblegum/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ pub async fn transfer<'c, T>(
bundle: &InstructionBundle<'c>,
txn: &'c T,
instruction: &str,
cl_audits: bool,
) -> ProgramTransformerResult<()>
where
T: ConnectionTrait + TransactionTrait,
{
if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) {
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits)
.await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
match le.schema {
LeafSchema::V1 {
id,
Expand Down
4 changes: 1 addition & 3 deletions program_transformers/src/bubblegum/update_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ pub async fn update_metadata<'c, T>(
bundle: &InstructionBundle<'c>,
txn: &'c T,
instruction: &str,
cl_audits: bool,
) -> ProgramTransformerResult<Option<DownloadMetadataInfo>>
where
T: ConnectionTrait + TransactionTrait,
Expand All @@ -49,8 +48,7 @@ where
&parsing_result.tree_update,
&parsing_result.payload,
) {
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits)
.await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;

#[allow(unreachable_patterns)]
return match le.schema {
Expand Down
14 changes: 5 additions & 9 deletions program_transformers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,10 @@ pub struct ProgramTransformer {
download_metadata_notifier: DownloadMetadataNotifier,
parsers: HashMap<Pubkey, Box<dyn ProgramParser>>,
key_set: HashSet<Pubkey>,
cl_audits: bool,
}

impl ProgramTransformer {
pub fn new(
pool: PgPool,
download_metadata_notifier: DownloadMetadataNotifier,
cl_audits: bool,
) -> Self {
pub fn new(pool: PgPool, download_metadata_notifier: DownloadMetadataNotifier) -> Self {
let mut parsers: HashMap<Pubkey, Box<dyn ProgramParser>> = HashMap::with_capacity(3);
let bgum = BubblegumParser {};
let token_metadata = TokenMetadataParser {};
Expand All @@ -112,7 +107,6 @@ impl ProgramTransformer {
download_metadata_notifier,
parsers,
key_set: hs,
cl_audits,
}
}

Expand Down Expand Up @@ -186,7 +180,6 @@ impl ProgramTransformer {
&ix,
&self.storage,
&self.download_metadata_notifier,
self.cl_audits,
)
.await
.map_err(|err| {
Expand All @@ -205,7 +198,10 @@ impl ProgramTransformer {
}

if not_impl == ixlen {
debug!("Not imple");
debug!(
"Not implemented for transaction signature: {:?}",
tx_info.signature
);
return Err(ProgramTransformerError::NotImplemented);
}
Ok(())
Expand Down

0 comments on commit e21e037

Please sign in to comment.