Skip to content

Commit

Permalink
fix(api): clean up receipt route handlers and db queries (#344)
Browse files Browse the repository at this point in the history
* Clean up receipt route handlers and db queries

* Refactor

* Format
  • Loading branch information
Alex6323 committed Jun 27, 2022
1 parent 986cdbf commit aa09e5c
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 139 deletions.
35 changes: 17 additions & 18 deletions bin/inx-chronicle/src/api/stardust/core/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@ use chronicle::{
db::MongoDb,
types::{
ledger::{OutputMetadata, OutputWithMetadata},
stardust::block::{BlockId, MilestoneId, MilestoneOption, OutputId, TransactionId},
stardust::block::{BlockId, MilestoneId, OutputId, TransactionId},
tangle::MilestoneIndex,
},
};
use futures::TryStreamExt;
use lazy_static::lazy_static;
use mongodb::bson;
use packable::PackableExt;

use super::responses::BlockChildrenResponse;
Expand Down Expand Up @@ -226,38 +225,38 @@ async fn transaction_included_block(
}

async fn receipts(database: Extension<MongoDb>) -> ApiResult<ReceiptsResponse> {
let mut milestone_options = database.get_milestone_options().await?;
let mut receipts_at = database.stream_all_receipts().await?;
let mut receipts = Vec::new();
while let Some(doc) = milestone_options.try_next().await? {
// TODO: unwrap
let (index, opt): (MilestoneIndex, MilestoneOption) = bson::from_document(doc).unwrap();
let opt: &bee_block_stardust::payload::milestone::MilestoneOption = &opt.try_into().unwrap();
let opt: MilestoneOptionDto = opt.into();
while let Some((receipt, at)) = receipts_at.try_next().await? {
let receipt: &bee_block_stardust::payload::milestone::MilestoneOption = &receipt.try_into()?;
let receipt: bee_block_stardust::payload::milestone::option::dto::MilestoneOptionDto = receipt.into();

if let MilestoneOptionDto::Receipt(receipt) = opt {
if let MilestoneOptionDto::Receipt(receipt) = receipt {
receipts.push(ReceiptDto {
receipt,
milestone_index: *index,
milestone_index: *at,
});
} else {
unreachable!("the query only returns receipt milestone options");
}
}
Ok(ReceiptsResponse { receipts })
}

async fn receipts_migrated_at(database: Extension<MongoDb>, Path(index): Path<u32>) -> ApiResult<ReceiptsResponse> {
let mut milestone_options = database.get_milestone_options_migrated_at(index.into()).await?;
let mut receipts_at = database.stream_receipts_migrated_at(index.into()).await?;
let mut receipts = Vec::new();
while let Some(doc) = milestone_options.try_next().await? {
// TODO: unwrap
let (index, opt): (MilestoneIndex, MilestoneOption) = bson::from_document(doc).unwrap();
let opt: &bee_block_stardust::payload::milestone::MilestoneOption = &opt.try_into().unwrap();
let opt: MilestoneOptionDto = opt.into();
while let Some((receipt, at)) = receipts_at.try_next().await? {
let receipt: &bee_block_stardust::payload::milestone::MilestoneOption = &receipt.try_into()?;
let receipt: bee_block_stardust::payload::milestone::option::dto::MilestoneOptionDto = receipt.into();

if let MilestoneOptionDto::Receipt(receipt) = opt {
if let MilestoneOptionDto::Receipt(receipt) = receipt {
receipts.push(ReceiptDto {
receipt,
milestone_index: *index,
milestone_index: *at,
});
} else {
unreachable!("the query only returns receipt milestone options");
}
}
Ok(ReceiptsResponse { receipts })
Expand Down
178 changes: 57 additions & 121 deletions src/db/collections/milestone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@

use std::ops::RangeInclusive;

use bee_rest_api_stardust::types::dtos as bee;
use futures::{Stream, TryStreamExt};
use mongodb::{
bson::{self, doc, Document},
bson::{self, doc},
error::Error,
options::{FindOneOptions, FindOptions, IndexOptions, UpdateOptions},
IndexModel,
Expand All @@ -17,7 +16,7 @@ use crate::{
db::MongoDb,
types::{
stardust::{
block::{MilestoneId, MilestonePayload},
block::{MilestoneId, MilestoneOption, MilestonePayload},
milestone::MilestoneTimestamp,
},
tangle::MilestoneIndex,
Expand Down Expand Up @@ -127,7 +126,7 @@ impl MongoDb {
.aggregate(
vec![
doc! { "$match": { "milestone_id": milestone_id } },
doc! { "$replaceRoot": { "newRoot": "$payload" } },
doc! { "$replaceWith": "$payload" },
],
None,
)
Expand All @@ -148,7 +147,7 @@ impl MongoDb {
.aggregate(
vec![
doc! { "$match": { "milestone_index": index } },
doc! { "$replaceRoot": { "newRoot": "$payload" } },
doc! { "$replaceWith": "$payload" },
],
None,
)
Expand Down Expand Up @@ -335,140 +334,77 @@ impl MongoDb {
Ok(sync_data)
}

// TODO: use dedicated type `MilestoneOptionsWithIndex` instead of Document
/// Returns a stream of all available receipt milestone options together with their corresponding `MilestoneIndex`.
pub async fn get_milestone_options(&self) -> Result<impl Stream<Item = Result<Document, Error>>, Error> {
self.0
.collection::<Document>(MilestoneDocument::COLLECTION)
/// Streams all available receipt milestone options together with their corresponding `MilestoneIndex`.
pub async fn stream_all_receipts(
&self,
) -> Result<impl Stream<Item = Result<(MilestoneOption, MilestoneIndex), Error>>, Error> {
#[derive(Deserialize)]
struct ReceiptAtIndex {
receipt: MilestoneOption,
at: MilestoneIndex,
}

Ok(self
.0
.collection::<ReceiptAtIndex>(MilestoneDocument::COLLECTION)
.aggregate(
vec![
doc! { "$unwind": "payload.essence.options"},
doc! { "$match": {
"payload.essence.options.receipt.migrated_at": { "$exists": true },
"options.receipt.migrated_at": { "$exists": true },
} },
doc! { "$replaceWith": {
"receipt": "options.receipt" ,
"at": "$milestone_index" ,
} },
doc! { "$replaceRoot": { "newRoot": {
"milestone_index": "$milestone_index" ,
"milestone_options": "$payload.essence.options" ,
} } },
doc! { "$sort": { "at": 1 } },
],
None,
)
.await
.await?
.map_ok(|doc| {
// Panic: we made sure that this is infallible.
let ReceiptAtIndex { receipt, at } = bson::from_document::<ReceiptAtIndex>(doc).unwrap();

(receipt, at)
}))
}

// TODO: use dedicated type `MilestoneOptionsWithIndex` instead of Document
// TODO: might be better to return a Vec right away instead of a Stream.
/// Returns a stream of all available receipt milestone options belonging to a given migration index.
pub async fn get_milestone_options_migrated_at(
/// Streams all available receipt milestone options together with their corresponding `MilestoneIndex` that were
/// migrated at the given index.
pub async fn stream_receipts_migrated_at(
&self,
migrated_at: MilestoneIndex,
) -> Result<impl Stream<Item = Result<Document, Error>>, Error> {
self.0
.collection::<Document>(MilestoneDocument::COLLECTION)
) -> Result<impl Stream<Item = Result<(MilestoneOption, MilestoneIndex), Error>>, Error> {
#[derive(Deserialize)]
struct ReceiptAtIndex {
receipt: MilestoneOption,
at: MilestoneIndex,
}

Ok(self
.0
.collection::<ReceiptAtIndex>(MilestoneDocument::COLLECTION)
.aggregate(
vec![
doc! { "$unwind": "payload.essence.options"},
doc! { "$match": {
"payload.essence.options.receipt.migrated_at": migrated_at ,
"options.receipt.migrated_at": { "$and": [ { "$exists": true }, { "$eq": migrated_at } ] },
} },
doc! { "$replaceWith": {
"receipt": "options.receipt" ,
"at": "$milestone_index" ,
} },
doc! { "$replaceRoot": { "newRoot": {
"milestone_index": "$milestone_index" ,
"milestone_options": "$payload.essence.options" ,
} } },
doc! { "$sort": { "at": 1 } },
],
None,
)
.await
}

async fn milestone_records_sorted_with_receipt(
&self,
) -> Result<impl Stream<Item = Result<MilestoneRecord, Error>>, Error> {
self.0
.collection::<MilestoneRecord>(MilestoneDocument::COLLECTION)
.find(
doc! { "payload.essence.options.receipt.migrated_at": { "$ne": 0} },
FindOptions::builder().sort(doc! {"milestone_index": 1u32}).build(),
)
.await
}

/// Returns all stored receipts.
pub async fn get_receipts(&self) -> Result<Vec<bee::ReceiptDto>, Error> {
let mut milestone_records = self.milestone_records_sorted_with_receipt().await?;
let mut receipt_dtos = vec![];
while let Some(milestone_record) = milestone_records.try_next().await? {
receipt_dtos.extend(
milestone_record
.payload
.essence
.options
.iter()
.cloned()
.filter_map(|o| {
// TODO: fix this uglyness
let o: &bee_block_stardust::payload::milestone::MilestoneOption = &o.try_into().unwrap();
let o: bee_block_stardust::payload::milestone::option::dto::MilestoneOptionDto = o.into();
if let bee_block_stardust::payload::milestone::option::dto::MilestoneOptionDto::Receipt(
receipt,
) = o
{
Some(bee::ReceiptDto {
receipt,
milestone_index: *milestone_record.milestone_index,
})
} else {
None
}
}),
);
}
Ok(receipt_dtos)
}

/// Returns all stored receipts with the given migration index.
pub async fn get_receipts_migrated_at(&self, migrated_at: MilestoneIndex) -> Result<Vec<bee::ReceiptDto>, Error> {
let mut milestone_records = self
.milestone_records_sorted_with_receipt_migrated_at(migrated_at)
.await?;
let mut receipt_dtos = vec![];
while let Some(milestone_record) = milestone_records.try_next().await? {
receipt_dtos.extend(
milestone_record
.payload
.essence
.options
.iter()
.cloned()
.filter_map(|o| {
// TODO: fix this uglyness
let o: &bee_block_stardust::payload::milestone::MilestoneOption = &o.try_into().unwrap();
let o: bee_block_stardust::payload::milestone::option::dto::MilestoneOptionDto = o.into();
if let bee_block_stardust::payload::milestone::option::dto::MilestoneOptionDto::Receipt(
receipt,
) = o
{
Some(bee::ReceiptDto {
receipt,
milestone_index: *milestone_record.milestone_index,
})
} else {
None
}
}),
);
}
Ok(receipt_dtos)
}
.await?
.map_ok(|doc| {
// Panic: we made sure that this is infallible.
let ReceiptAtIndex { receipt, at } = bson::from_document::<ReceiptAtIndex>(doc).unwrap();

async fn milestone_records_sorted_with_receipt_migrated_at(
&self,
migrated_at: MilestoneIndex,
) -> Result<impl Stream<Item = Result<MilestoneRecord, Error>>, Error> {
self.0
.collection::<MilestoneRecord>(MilestoneDocument::COLLECTION)
.find(
doc! { "payload.essence.options.receipt.migrated_at": migrated_at },
FindOptions::builder().sort(doc! {"milestone_index": 1u32}).build(),
)
.await
(receipt, at)
}))
}
}

0 comments on commit aa09e5c

Please sign in to comment.