Skip to content

Commit

Permalink
Refactor MongoDb (#99)
Browse files Browse the repository at this point in the history
* Refactor `MongoDb` and `model`

* Address comments

* Remove `.clone()`s
  • Loading branch information
grtlr committed Apr 29, 2022
1 parent 1adb4fe commit 46cc56c
Show file tree
Hide file tree
Showing 22 changed files with 447 additions and 723 deletions.
4 changes: 0 additions & 4 deletions Cargo.toml
Expand Up @@ -66,7 +66,6 @@ bee-rest-api-stardust = { package = "bee-rest-api", git = "https://github.com/io
default = [
"api-analytics",
"api-explorer",
"api-indexer",
"api-node",
"inx",
"stardust",
Expand All @@ -85,9 +84,6 @@ api-analytics = [
api-explorer = [
"api",
]
api-indexer = [
"api",
]
api-node = [
"api",
]
Expand Down
9 changes: 1 addition & 8 deletions bin/inx-chronicle/src/api/routes.rs
Expand Up @@ -8,7 +8,6 @@ use chronicle::db::{
};
use futures::TryStreamExt;
use hyper::Method;
use mongodb::{bson::doc, options::FindOptions};
use tower_http::{
catch_panic::CatchPanicLayer,
cors::{Any, CorsLayer},
Expand Down Expand Up @@ -52,13 +51,7 @@ async fn info() -> InfoResponse {
}

async fn sync(database: Extension<MongoDb>) -> ApiResult<SyncDataResponse> {
let mut res = database
.collection::<SyncRecord>()
.find(
doc! { "synced": true },
FindOptions::builder().sort(doc! {"milestone_index": 1}).build(),
)
.await?;
let mut res = database.sync_records_sorted().await?;
let mut sync_data = SyncData::default();
let mut last_record: Option<SyncRecord> = None;
while let Some(sync_record) = res.try_next().await? {
Expand Down
76 changes: 4 additions & 72 deletions bin/inx-chronicle/src/api/stardust/analytics/routes.rs
Expand Up @@ -2,16 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use axum::{routing::get, Extension, Router};
use chronicle::{
db::{
bson::DocExt,
model::{inclusion_state::LedgerInclusionState, stardust::message::MessageRecord},
MongoDb,
},
stardust::payload::TransactionPayload,
};
use futures::TryStreamExt;
use mongodb::bson::doc;
use chronicle::db::{bson::DocExt, MongoDb};

use super::responses::AddressAnalyticsResponse;
use crate::api::{
Expand All @@ -35,68 +26,9 @@ async fn address_analytics(
let end_milestone = end_milestone(&database, end_timestamp).await?;

let res = database
.doc_collection::<MessageRecord>()
.aggregate(
vec![
doc! { "$match": {
"inclusion_state": LedgerInclusionState::Included,
"milestone_index": { "$gt": start_milestone, "$lt": end_milestone },
"message.payload.data.kind": TransactionPayload::KIND as i32,
} },
doc! { "$unwind": { "path": "$message.payload.data.essence.data.inputs", "includeArrayIndex": "message.payload.data.essence.data.inputs.idx" } },
doc! { "$lookup": {
"from": "stardust_messages",
"let": { "transaction_id": "$message.payload.data.essence.data.inputs.transaction_id", "index": "$message.payload.data.essence.data.inputs.index" },
"pipeline": [
{ "$match": {
"inclusion_state": LedgerInclusionState::Included,
"message.payload.transaction_id": "$$transaction_id",
} },
{ "$set": {
"message.payload.data.essence.data.outputs": {
"$arrayElemAt": [
"$message.payload.data.essence.data.outputs",
"$$index"
]
}
} },
],
"as": "spent_transaction"
} },
doc! { "$set": { "send_address": "$spent_transaction.message.payload.data.essence.data.outputs.address.data" } },
doc! { "$unwind": { "path": "$message.payload.data.essence.data.outputs", "includeArrayIndex": "message.payload.data.essence.data.outputs.idx" } },
doc! { "$set": { "recv_address": "$message.payload.data.essence.data.outputs.address.data" } },
doc! { "$facet": {
"total": [
{ "$set": { "address": ["$send_address", "$recv_address"] } },
{ "$unwind": { "path": "$address" } },
{ "$group" : {
"_id": "$address",
"addresses": { "$count": { } }
}},
],
"recv": [
{ "$group" : {
"_id": "$recv_address",
"addresses": { "$count": { } }
}},
],
"send": [
{ "$group" : {
"_id": "$send_address",
"addresses": { "$count": { } }
}},
],
} },
doc! { "$project": {
"total_addresses": { "$arrayElemAt": ["$total.addresses", 0] },
"recv_addresses": { "$arrayElemAt": ["$recv.addresses", 0] },
"send_addresses": { "$arrayElemAt": ["$send.addresses", 0] },
} },
],
None,
)
.await?.try_next().await?.ok_or(ApiError::NoResults)?;
.aggregate_addresses(start_milestone, end_milestone)
.await?
.ok_or(ApiError::NoResults)?;

Ok(AddressAnalyticsResponse {
total_addresses: res.get_as_u64("total_addresses")?,
Expand Down
65 changes: 2 additions & 63 deletions bin/inx-chronicle/src/api/stardust/explorer/routes.rs
Expand Up @@ -4,11 +4,10 @@
use axum::{extract::Path, routing::get, Extension, Router};
use chronicle::db::{
bson::{BsonExt, DocExt},
model::{inclusion_state::LedgerInclusionState, stardust::message::MessageRecord},
model::inclusion_state::LedgerInclusionState,
MongoDb,
};
use futures::TryStreamExt;
use mongodb::bson::doc;

use super::responses::TransactionHistoryResponse;
use crate::api::{
Expand Down Expand Up @@ -38,67 +37,7 @@ async fn transaction_history(
let end_milestone = end_milestone(&database, end_timestamp).await?;

let records = database
.collection::<MessageRecord>()
.aggregate(vec![
// Only outputs for this address
doc! { "$match": {
"milestone_index": { "$gt": start_milestone, "$lt": end_milestone },
"inclusion_state": LedgerInclusionState::Included,
"message.payload.data.essence.data.outputs.address.data": &address
} },
doc! { "$set": {
"message.payload.data.essence.data.outputs": {
"$filter": {
"input": "$message.payload.data.essence.data.outputs",
"as": "output",
"cond": { "$eq": [ "$$output.address.data", &address ] }
}
}
} },
// One result per output
doc! { "$unwind": { "path": "$message.payload.data.essence.data.outputs", "includeArrayIndex": "message.payload.data.essence.data.outputs.idx" } },
// Lookup spending inputs for each output, if they exist
doc! { "$lookup": {
"from": "stardust_messages",
// Keep track of the output id
"let": { "transaction_id": "$message.payload.transaction_id", "index": "$message.payload.data.essence.data.outputs.idx" },
"pipeline": [
// Match using the output's index
{ "$match": {
"inclusion_state": LedgerInclusionState::Included,
"message.payload.data.essence.data.inputs.transaction_id": "$$transaction_id",
"message.payload.data.essence.data.inputs.index": "$$index"
} },
{ "$set": {
"message.payload.data.essence.data.inputs": {
"$filter": {
"input": "$message.payload.data.essence.data.inputs",
"as": "input",
"cond": { "$and": {
"$eq": [ "$$input.transaction_id", "$$transaction_id" ],
"$eq": [ "$$input.index", "$$index" ],
} }
}
}
} },
// One result per spending input
{ "$unwind": { "path": "$message.payload.data.essence.data.outputs", "includeArrayIndex": "message.payload.data.essence.data.outputs.idx" } },
],
// Store the result
"as": "spending_transaction"
} },
// Add a null spending transaction so that unwind will create two records
doc! { "$set": { "spending_transaction": { "$concatArrays": [ "$spending_transaction", [ null ] ] } } },
// Unwind the outputs into one or two results
doc! { "$unwind": { "path": "$spending_transaction", "preserveNullAndEmptyArrays": true } },
// Replace the milestone index with the spending transaction's milestone index if there is one
doc! { "$set": {
"milestone_index": { "$cond": [ { "$not": [ "$spending_transaction" ] }, "$milestone_index", "$spending_transaction.0.milestone_index" ] }
} },
doc! { "$sort": { "milestone_index": -1 } },
doc! { "$skip": (page_size * page) as i64 },
doc! { "$limit": page_size as i64 },
], None)
.get_transaction_history(&address, page_size, page, start_milestone, end_milestone)
.await?
.try_collect::<Vec<_>>()
.await?;
Expand Down
92 changes: 0 additions & 92 deletions bin/inx-chronicle/src/api/stardust/indexer/extractors.rs

This file was deleted.

8 changes: 0 additions & 8 deletions bin/inx-chronicle/src/api/stardust/indexer/mod.rs

This file was deleted.

34 changes: 0 additions & 34 deletions bin/inx-chronicle/src/api/stardust/indexer/responses.rs

This file was deleted.

0 comments on commit 46cc56c

Please sign in to comment.