Skip to content

Commit

Permalink
refactor(db): make MongoDbCollection trait async and move `create_i…
Browse files Browse the repository at this point in the history
…ndexes` in there (#779)

* Make MongoDbCollection trait async

* Fix merge

* Simplify collection creation in launch code

* Format

* Slight refactor

* cargo sort
  • Loading branch information
Alex6323 committed Oct 10, 2022
1 parent 4bab57d commit 598ff2d
Show file tree
Hide file tree
Showing 12 changed files with 101 additions and 97 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
@@ -1,13 +1,13 @@
[package]
name = "chronicle"
version = "1.0.0-beta.22"
authors = [ "IOTA Stiftung" ]
authors = ["IOTA Stiftung"]
edition = "2021"
description = "IOTA permanode implemented as an IOTA Node Extension (INX)."
readme = "README.md"
repository = "https://github.com/iotaledger/inx-chronicle"
license = "Apache-2.0"
keywords = [ "iota", "storage", "permanode", "chronicle", "inx" ]
keywords = ["iota", "storage", "permanode", "chronicle", "inx"]
homepage = "https://www.iota.org"
rust-version = "1.60"

Expand Down Expand Up @@ -121,7 +121,7 @@ opentelemetry = [
"dep:tracing-opentelemetry",
]
rand = [
"bee-block-stardust?/rand"
"bee-block-stardust?/rand",
]
stardust = [
"dep:bee-block-stardust",
Expand Down
18 changes: 4 additions & 14 deletions src/bin/inx-chronicle/main.rs
Expand Up @@ -58,20 +58,10 @@ async fn main() -> Result<(), Error> {
{
use chronicle::db::collections;
let start_indexes = db.get_index_names().await?;
db.create_collection::<collections::OutputCollection>().await;
db.create_collection::<collections::BlockCollection>().await;
db.create_collection::<collections::LedgerUpdateCollection>().await;
db.create_collection::<collections::MilestoneCollection>().await;
db.collection::<collections::OutputCollection>()
.create_indexes()
.await?;
db.collection::<collections::BlockCollection>().create_indexes().await?;
db.collection::<collections::LedgerUpdateCollection>()
.create_indexes()
.await?;
db.collection::<collections::MilestoneCollection>()
.create_indexes()
.await?;
db.create_indexes::<collections::OutputCollection>().await?;
db.create_indexes::<collections::BlockCollection>().await?;
db.create_indexes::<collections::LedgerUpdateCollection>().await?;
db.create_indexes::<collections::MilestoneCollection>().await?;
let end_indexes = db.get_index_names().await?;
for (collection, indexes) in end_indexes {
if let Some(old_indexes) = start_indexes.get(&collection) {
Expand Down
10 changes: 5 additions & 5 deletions src/db/collections/block.rs
Expand Up @@ -43,6 +43,7 @@ pub struct BlockCollection {
collection: mongodb::Collection<BlockDocument>,
}

#[async_trait::async_trait]
impl MongoDbCollection for BlockCollection {
const NAME: &'static str = "stardust_blocks";
type Document = BlockDocument;
Expand All @@ -54,12 +55,8 @@ impl MongoDbCollection for BlockCollection {
fn collection(&self) -> &mongodb::Collection<Self::Document> {
&self.collection
}
}

/// Implements the queries for the core API.
impl BlockCollection {
/// Creates block indexes.
pub async fn create_indexes(&self) -> Result<(), Error> {
async fn create_indexes(&self) -> Result<(), Error> {
self.create_index(
IndexModel::builder()
.keys(doc! { "block.payload.transaction_id": 1 })
Expand Down Expand Up @@ -93,7 +90,10 @@ impl BlockCollection {

Ok(())
}
}

/// Implements the queries for the core API.
impl BlockCollection {
/// Get a [`Block`] by its [`BlockId`].
pub async fn get_block(&self, block_id: &BlockId) -> Result<Option<Block>, Error> {
self.aggregate(
Expand Down
38 changes: 19 additions & 19 deletions src/db/collections/ledger_update.rs
Expand Up @@ -48,6 +48,7 @@ pub struct LedgerUpdateCollection {
collection: mongodb::Collection<LedgerUpdateDocument>,
}

#[async_trait::async_trait]
impl MongoDbCollection for LedgerUpdateCollection {
const NAME: &'static str = "stardust_ledger_updates";
type Document = LedgerUpdateDocument;
Expand All @@ -59,6 +60,24 @@ impl MongoDbCollection for LedgerUpdateCollection {
fn collection(&self) -> &mongodb::Collection<Self::Document> {
&self.collection
}

async fn create_indexes(&self) -> Result<(), Error> {
self.create_index(
IndexModel::builder()
.keys(newest())
.options(
IndexOptions::builder()
.unique(true)
.name("ledger_update_index".to_string())
.build(),
)
.build(),
None,
)
.await?;

Ok(())
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -87,25 +106,6 @@ fn oldest() -> Document {

/// Queries that are related to [`Output`](crate::types::stardust::block::Output)s.
impl LedgerUpdateCollection {
/// Creates ledger update indexes.
pub async fn create_indexes(&self) -> Result<(), Error> {
self.create_index(
IndexModel::builder()
.keys(newest())
.options(
IndexOptions::builder()
.unique(true)
.name("ledger_update_index".to_string())
.build(),
)
.build(),
None,
)
.await?;

Ok(())
}

/// Inserts [`LedgerSpent`] updates.
#[instrument(skip_all, err, level = "trace")]
pub async fn insert_spent_ledger_updates<'a, I>(&self, outputs: I) -> Result<(), Error>
Expand Down
26 changes: 13 additions & 13 deletions src/db/collections/milestone.rs
Expand Up @@ -49,6 +49,7 @@ pub struct MilestoneCollection {
collection: mongodb::Collection<MilestoneDocument>,
}

#[async_trait::async_trait]
impl MongoDbCollection for MilestoneCollection {
const NAME: &'static str = "stardust_milestones";
type Document = MilestoneDocument;
Expand All @@ -60,20 +61,8 @@ impl MongoDbCollection for MilestoneCollection {
fn collection(&self) -> &mongodb::Collection<Self::Document> {
&self.collection
}
}

/// An aggregation type that represents the ranges of completed milestones and gaps.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SyncData {
/// The completed(synced and logged) milestones data
pub completed: Vec<RangeInclusive<MilestoneIndex>>,
/// Gaps/missings milestones data
pub gaps: Vec<RangeInclusive<MilestoneIndex>>,
}

impl MilestoneCollection {
/// Creates ledger update indexes.
pub async fn create_indexes(&self) -> Result<(), Error> {
async fn create_indexes(&self) -> Result<(), Error> {
self.create_index(
IndexModel::builder()
.keys(doc! { "at.milestone_index": BY_OLDEST })
Expand Down Expand Up @@ -104,7 +93,18 @@ impl MilestoneCollection {

Ok(())
}
}

/// An aggregation type that represents the ranges of completed milestones and gaps.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SyncData {
/// The completed(synced and logged) milestones data
pub completed: Vec<RangeInclusive<MilestoneIndex>>,
/// Gaps/missings milestones data
pub gaps: Vec<RangeInclusive<MilestoneIndex>>,
}

impl MilestoneCollection {
/// Gets the [`MilestonePayload`] of a milestone.
pub async fn get_milestone_payload_by_id(
&self,
Expand Down
76 changes: 38 additions & 38 deletions src/db/collections/outputs/mod.rs
Expand Up @@ -52,6 +52,7 @@ pub struct OutputCollection {
collection: mongodb::Collection<OutputDocument>,
}

#[async_trait::async_trait]
impl MongoDbCollection for OutputCollection {
const NAME: &'static str = "stardust_outputs";
type Document = OutputDocument;
Expand All @@ -66,6 +67,43 @@ impl MongoDbCollection for OutputCollection {
fn collection(&self) -> &mongodb::Collection<Self::Document> {
&self.collection
}

async fn create_indexes(&self) -> Result<(), Error> {
self.create_index(
IndexModel::builder()
.keys(doc! { "details.address": 1 })
.options(
IndexOptions::builder()
.unique(false)
.name("address_index".to_string())
.partial_filter_expression(doc! {
"details.address": { "$exists": true },
})
.build(),
)
.build(),
None,
)
.await?;

self.create_index(
IndexModel::builder()
.keys(doc! { "metadata.block_id": 1 })
.options(
IndexOptions::builder()
.unique(false)
.name("metadata_block_id".to_string())
.build(),
)
.build(),
None,
)
.await?;

self.create_indexer_indexes().await?;

Ok(())
}
}

/// Precalculated info and other output details.
Expand Down Expand Up @@ -139,44 +177,6 @@ pub struct UtxoChangesResult {

/// Implements the queries for the core API.
impl OutputCollection {
/// Creates output indexes.
pub async fn create_indexes(&self) -> Result<(), Error> {
self.create_index(
IndexModel::builder()
.keys(doc! { "details.address": 1 })
.options(
IndexOptions::builder()
.unique(false)
.name("address_index".to_string())
.partial_filter_expression(doc! {
"details.address": { "$exists": true },
})
.build(),
)
.build(),
None,
)
.await?;

self.create_index(
IndexModel::builder()
.keys(doc! { "metadata.block_id": 1 })
.options(
IndexOptions::builder()
.unique(false)
.name("metadata_block_id".to_string())
.build(),
)
.build(),
None,
)
.await?;

self.create_indexer_indexes().await?;

Ok(())
}

/// Upserts [`Outputs`](crate::types::stardust::block::Output) with their
/// [`OutputMetadata`](crate::types::ledger::OutputMetadata).
#[instrument(skip_all, err, level = "trace")]
Expand Down
6 changes: 6 additions & 0 deletions src/db/mongodb/collection.rs
Expand Up @@ -20,6 +20,7 @@ use serde::{de::DeserializeOwned, Serialize};
use super::{MongoDb, DUPLICATE_KEY_CODE};

/// A MongoDB collection.
#[async_trait]
pub trait MongoDbCollection {
/// The collection name.
const NAME: &'static str;
Expand All @@ -37,6 +38,11 @@ pub trait MongoDbCollection {
fn with_type<T>(&self) -> mongodb::Collection<T> {
self.collection().clone_with_type()
}

/// Creates the collection indexes.
async fn create_indexes(&self) -> Result<(), Error> {
Ok(())
}
}

/// An extension trait which wraps the basic functionality of a mongodb
Expand Down
4 changes: 3 additions & 1 deletion src/db/mongodb/mod.rs
Expand Up @@ -56,8 +56,10 @@ impl MongoDb {
}

/// Creates a collection if it does not exist.
pub async fn create_collection<T: MongoDbCollection>(&self) {
pub async fn create_indexes<T: MongoDbCollection + Send + Sync>(&self) -> Result<(), Error> {
self.db.create_collection(T::NAME, None).await.ok();
self.collection::<T>().create_indexes().await?;
Ok(())
}

/// Gets a collection of the provided type.
Expand Down
5 changes: 4 additions & 1 deletion tests/blocks.rs
Expand Up @@ -7,7 +7,10 @@ mod common;
mod test_rand {

use chronicle::{
db::collections::{BlockCollection, OutputCollection},
db::{
collections::{BlockCollection, OutputCollection},
MongoDbCollection,
},
types::{
ledger::{
BlockMetadata, ConflictReason, LedgerInclusionState, LedgerOutput, MilestoneIndexTimestamp,
Expand Down
2 changes: 1 addition & 1 deletion tests/claiming.rs
Expand Up @@ -7,7 +7,7 @@ mod common;
mod test_rand {

use chronicle::{
db::collections::OutputCollection,
db::{collections::OutputCollection, MongoDbCollection},
types::{
ledger::{LedgerOutput, LedgerSpent, MilestoneIndexTimestamp, RentStructureBytes, SpentMetadata},
stardust::block::{
Expand Down
2 changes: 1 addition & 1 deletion tests/milestones.rs
Expand Up @@ -6,7 +6,7 @@ mod common;
#[cfg(feature = "rand")]
mod test_rand {
use chronicle::{
db::collections::MilestoneCollection,
db::{collections::MilestoneCollection, MongoDbCollection},
types::stardust::block::payload::{MilestoneId, MilestonePayload},
};

Expand Down
5 changes: 4 additions & 1 deletion tests/outputs.rs
Expand Up @@ -7,7 +7,10 @@ mod common;
mod test_rand {

use chronicle::{
db::collections::{OutputCollection, OutputMetadataResult, OutputWithMetadataResult},
db::{
collections::{OutputCollection, OutputMetadataResult, OutputWithMetadataResult},
MongoDbCollection,
},
types::{
ledger::{LedgerOutput, LedgerSpent, MilestoneIndexTimestamp, RentStructureBytes, SpentMetadata},
stardust::block::{output::OutputId, payload::TransactionId, BlockId, Output},
Expand Down

0 comments on commit 598ff2d

Please sign in to comment.