Skip to content

Commit

Permalink
fix(db)!: migration version checking (#1097)
Browse files Browse the repository at this point in the history
* Fix migration

* last migration

* suggestions

* clippy

* ergonomics

* Add last migration at startup

* block to allow migrations fn pointers

* clippy!!!! 😤

* log

* doc

* Ignore already missing indexes when dropping

* log

* ergonomics

* docs

* Minor cleanup of migration logic (#1098)

* Minor cleanup of migration logic

* fmt

* comment

---------

Co-authored-by: Alexandcoats <alexandcoats@gmail.com>

* log

* Refactor

* Switch to id based migrations with metadata

* Just migrate in the app

* refactor and check first run

* No block

* Fis migrations during restarts

* Re-add migration log

---------

Co-authored-by: Jochen Görtler <grtlr@users.noreply.github.com>
  • Loading branch information
DaughterOfMars and grtlr committed Feb 6, 2023
1 parent 315bf0c commit 4d1bc3e
Show file tree
Hide file tree
Showing 11 changed files with 336 additions and 151 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Expand Up @@ -43,7 +43,7 @@ serde = { version = "1.0", features = [ "derive" ], default-features = false }
serde_bytes = { version = "0.11", default-features = false }
serde_json = { version = "1.0", default-features = false, features = [ "std" ] }
thiserror = { version = "1.0", default-features = false }
time = { version = "0.3", default-features = false, features = [ "std" ] }
time = { version = "0.3", default-features = false, features = [ "std", "serde", "macros" ] }
tokio = { version = "1.24", default-features = false, features = [ "macros", "rt-multi-thread", "signal" ] }
tokio-stream = { version = "0.1", default-features = false }
tracing = { version = "0.1", default-features = false, features = [ "std", "attributes", "release_max_level_debug" ] }
Expand Down
6 changes: 3 additions & 3 deletions src/bin/inx-chronicle/cli.rs
Expand Up @@ -360,10 +360,10 @@ impl ClArgs {
tracing::info!("Indexes built successfully.");
return Ok(PostCommand::Exit);
}
Subcommands::MigrateTo { version } => {
Subcommands::Migrate => {
tracing::info!("Connecting to database using hosts: `{}`.", config.mongodb.hosts_str()?);
let db = chronicle::db::MongoDb::connect(&config.mongodb).await?;
crate::migrations::migrate(version, &db).await?;
crate::migrations::migrate(&db).await?;
tracing::info!("Migration completed successfully.");
return Ok(PostCommand::Exit);
}
Expand Down Expand Up @@ -404,7 +404,7 @@ pub enum Subcommands {
/// Manually build indexes.
BuildIndexes,
/// Migrate to a new version.
MigrateTo { version: String },
Migrate,
}

#[derive(Copy, Clone, PartialEq, Eq)]
Expand Down
7 changes: 6 additions & 1 deletion src/bin/inx-chronicle/main.rs
Expand Up @@ -20,7 +20,10 @@ use tokio::task::JoinSet;
use tracing::{debug, error, info};
use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

use self::cli::{ClArgs, PostCommand};
use self::{
cli::{ClArgs, PostCommand},
migrations::check_migration_version,
};

#[tokio::main]
async fn main() -> eyre::Result<()> {
Expand All @@ -44,6 +47,8 @@ async fn main() -> eyre::Result<()> {
ByteSize::b(db.size().await?)
);

check_migration_version(&db).await?;

build_indexes(&db).await?;

let mut tasks: JoinSet<eyre::Result<()>> = JoinSet::new();
Expand Down
124 changes: 124 additions & 0 deletions src/bin/inx-chronicle/migrations/migrate_0.rs
@@ -0,0 +1,124 @@
// Copyright 2023 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use async_trait::async_trait;
use chronicle::{
db::{collections::OutputCollection, MongoDb, MongoDbCollectionExt},
types::stardust::block::output::{AliasId, NftId, OutputId},
};
use futures::TryStreamExt;
use mongodb::{bson::doc, options::IndexOptions, IndexModel};
use serde::Deserialize;

use super::Migration;

pub struct Migrate;

#[async_trait]
impl Migration for Migrate {
const ID: usize = 0;
const APP_VERSION: &'static str = "1.0.0-beta.32";
const DATE: time::Date = time::macros::date!(2023 - 02 - 03);

async fn migrate(db: &MongoDb) -> eyre::Result<()> {
let collection = db.collection::<OutputCollection>();

#[derive(Deserialize)]
struct Res {
output_id: OutputId,
}

// Convert the outputs with implicit IDs
let outputs = collection
.aggregate::<Res>(
[
doc! { "$match": { "$or": [
{ "output.alias_id": AliasId::implicit() },
{ "output.nft_id": NftId::implicit() }
] } },
doc! { "$project": {
"output_id": "$_id"
} },
],
None,
)
.await?
.map_ok(|res| res.output_id)
.try_collect::<Vec<_>>()
.await?;

for output_id in outputs {
// Alias and nft are the same length so both can be done this way since they are just serialized as bytes
let id = AliasId::from(output_id);
collection
.update_one(
doc! { "_id": output_id },
doc! { "$set": { "details.indexed_id": id } },
None,
)
.await?;
}

// Get the outputs that don't have implicit IDs
collection
.update_many(
doc! {
"output.kind": "alias",
"output.alias_id": { "$ne": AliasId::implicit() },
},
vec![doc! { "$set": {
"details.indexed_id": "$output.alias_id",
} }],
None,
)
.await?;

collection
.update_many(
doc! {
"output.kind": "nft",
"output.nft_id": { "$ne": NftId::implicit() },
},
vec![doc! { "$set": {
"details.indexed_id": "$output.nft_id",
} }],
None,
)
.await?;

collection
.update_many(
doc! { "output.kind": "foundry" },
vec![doc! { "$set": {
"details.indexed_id": "$output.foundry_id",
} }],
None,
)
.await?;

collection.drop_index("output_alias_id_index", None).await?;

collection.drop_index("output_foundry_id_index", None).await?;

collection.drop_index("output_nft_id_index", None).await?;

collection
.create_index(
IndexModel::builder()
.keys(doc! { "details.indexed_id": 1 })
.options(
IndexOptions::builder()
.name("output_indexed_id_index".to_string())
.partial_filter_expression(doc! {
"details.indexed_id": { "$exists": true },
})
.build(),
)
.build(),
None,
)
.await?;

Ok(())
}
}
120 changes: 0 additions & 120 deletions src/bin/inx-chronicle/migrations/migrate_1_0_0_beta_31.rs

This file was deleted.

0 comments on commit 4d1bc3e

Please sign in to comment.