Skip to content

Commit

Permalink
Small inx-client and mongodb config and usage refactor (#85)
Browse files Browse the repository at this point in the history
* Small refactor of inx config

* More refactoring

* Fmt;Rebase fixes

* Adapt config template

* Format

* Fix intra-doc links

* Fix tests
  • Loading branch information
Alex6323 committed Apr 28, 2022
1 parent a227d67 commit c5e090e
Show file tree
Hide file tree
Showing 16 changed files with 188 additions and 174 deletions.
@@ -1,8 +1,8 @@
[mongodb]
location = "mongodb://localhost:27017"
connect_url = "mongodb://localhost:27017"
username = "root"
password = "root"

[inx]
address = "http://localhost:9029"
connect_url = "http://localhost:9029"
connection_retry_interval = "5s"
6 changes: 3 additions & 3 deletions bin/inx-chronicle/src/api/mod.rs
Expand Up @@ -17,7 +17,7 @@ mod routes;
use async_trait::async_trait;
use axum::Server;
use chronicle::{
db::MongoDatabase,
db::MongoDb,
runtime::actor::{context::ActorContext, Actor},
};
pub use error::ApiError;
Expand All @@ -32,13 +32,13 @@ pub type ApiResult<T> = Result<T, ApiError>;
/// The Chronicle API actor
#[derive(Debug)]
pub struct ApiWorker {
db: MongoDatabase,
db: MongoDb,
server_handle: Option<(JoinHandle<hyper::Result<()>>, oneshot::Sender<()>)>,
}

impl ApiWorker {
/// Create a new Chronicle API actor from a mongo connection.
pub fn new(db: MongoDatabase) -> Self {
pub fn new(db: MongoDb) -> Self {
Self {
db,
server_handle: None,
Expand Down
6 changes: 3 additions & 3 deletions bin/inx-chronicle/src/api/routes.rs
Expand Up @@ -4,7 +4,7 @@
use axum::{handler::Handler, routing::get, Extension, Router};
use chronicle::db::{
model::sync::{SyncData, SyncRecord},
MongoDatabase,
MongoDb,
};
use futures::TryStreamExt;
use hyper::Method;
Expand All @@ -17,7 +17,7 @@ use tower_http::{

use super::{error::ApiError, responses::*, ApiResult};

pub fn routes(db: MongoDatabase) -> Router {
pub fn routes(db: MongoDb) -> Router {
#[allow(unused_mut)]
let mut router = Router::new().route("/info", get(info)).route("/sync", get(sync));

Expand Down Expand Up @@ -51,7 +51,7 @@ async fn info() -> InfoResponse {
}
}

async fn sync(database: Extension<MongoDatabase>) -> ApiResult<SyncDataResponse> {
async fn sync(database: Extension<MongoDb>) -> ApiResult<SyncDataResponse> {
let mut res = database
.collection::<SyncRecord>()
.find(
Expand Down
4 changes: 2 additions & 2 deletions bin/inx-chronicle/src/api/stardust/analytics/routes.rs
Expand Up @@ -6,7 +6,7 @@ use chronicle::{
bson::DocExt,
db::{
model::{inclusion_state::LedgerInclusionState, stardust::message::MessageRecord},
MongoDatabase,
MongoDb,
},
stardust::payload::TransactionPayload,
};
Expand All @@ -25,7 +25,7 @@ pub fn routes() -> Router {
}

async fn address_analytics(
database: Extension<MongoDatabase>,
database: Extension<MongoDb>,
TimeRange {
start_timestamp,
end_timestamp,
Expand Down
4 changes: 2 additions & 2 deletions bin/inx-chronicle/src/api/stardust/explorer/routes.rs
Expand Up @@ -6,7 +6,7 @@ use chronicle::{
bson::{BsonExt, DocExt},
db::{
model::{inclusion_state::LedgerInclusionState, stardust::message::MessageRecord},
MongoDatabase,
MongoDb,
},
};
use futures::TryStreamExt;
Expand All @@ -28,7 +28,7 @@ pub fn routes() -> Router {
}

async fn transaction_history(
database: Extension<MongoDatabase>,
database: Extension<MongoDb>,
Path(address): Path<String>,
Pagination { page_size, page }: Pagination,
TimeRange {
Expand Down
6 changes: 3 additions & 3 deletions bin/inx-chronicle/src/api/stardust/indexer/routes.rs
Expand Up @@ -8,7 +8,7 @@ use chronicle::{
bson::{BsonExt, DocExt},
db::{
model::{inclusion_state::LedgerInclusionState, stardust::message::MessageRecord},
MongoDatabase,
MongoDb,
},
stardust::{output::OutputId, payload::transaction::TransactionId},
};
Expand All @@ -33,7 +33,7 @@ pub fn routes() -> Router {
}

async fn messages_query(
database: Extension<MongoDatabase>,
database: Extension<MongoDb>,
query: MessagesQuery,
Pagination { page_size, page }: Pagination,
Expanded { expanded }: Expanded,
Expand Down Expand Up @@ -91,7 +91,7 @@ async fn messages_query(
}

async fn outputs_query(
database: Extension<MongoDatabase>,
database: Extension<MongoDb>,
query: OutputsQuery,
Pagination { page_size, page }: Pagination,
Expanded { expanded }: Expanded,
Expand Down
6 changes: 3 additions & 3 deletions bin/inx-chronicle/src/api/stardust/mod.rs
Expand Up @@ -4,7 +4,7 @@
use axum::Router;
use chronicle::{
bson::DocExt,
db::{model::stardust::milestone::MilestoneRecord, MongoDatabase},
db::{model::stardust::milestone::MilestoneRecord, MongoDb},
};
use mongodb::{
bson::{doc, DateTime},
Expand Down Expand Up @@ -51,7 +51,7 @@ pub fn routes() -> Router {
router
}

pub(crate) async fn start_milestone(database: &MongoDatabase, start_timestamp: OffsetDateTime) -> ApiResult<u32> {
pub(crate) async fn start_milestone(database: &MongoDb, start_timestamp: OffsetDateTime) -> ApiResult<u32> {
database
.doc_collection::<MilestoneRecord>()
.find(
Expand All @@ -69,7 +69,7 @@ pub(crate) async fn start_milestone(database: &MongoDatabase, start_timestamp: O
.ok_or(ApiError::NotFound)
}

pub(crate) async fn end_milestone(database: &MongoDatabase, end_timestamp: OffsetDateTime) -> ApiResult<u32> {
pub(crate) async fn end_milestone(database: &MongoDb, end_timestamp: OffsetDateTime) -> ApiResult<u32> {
database
.doc_collection::<MilestoneRecord>()
.find(
Expand Down
20 changes: 10 additions & 10 deletions bin/inx-chronicle/src/api/stardust/v2/routes.rs
Expand Up @@ -15,7 +15,7 @@ use chronicle::{
inclusion_state::LedgerInclusionState,
stardust::{message::MessageRecord, milestone::MilestoneRecord},
},
MongoDatabase,
MongoDb,
},
stardust::output::OutputId,
};
Expand Down Expand Up @@ -56,7 +56,7 @@ pub fn routes() -> Router {
.route("/milestones/:index", get(milestone))
}

async fn message(database: Extension<MongoDatabase>, Path(message_id): Path<String>) -> ApiResult<MessageResponse> {
async fn message(database: Extension<MongoDb>, Path(message_id): Path<String>) -> ApiResult<MessageResponse> {
let mut rec = database
.doc_collection::<MessageRecord>()
.find_one(doc! {"message_id": &message_id}, None)
Expand All @@ -75,7 +75,7 @@ async fn message(database: Extension<MongoDatabase>, Path(message_id): Path<Stri
})
}

async fn message_raw(database: Extension<MongoDatabase>, Path(message_id): Path<String>) -> ApiResult<Vec<u8>> {
async fn message_raw(database: Extension<MongoDb>, Path(message_id): Path<String>) -> ApiResult<Vec<u8>> {
let mut rec = database
.doc_collection::<MessageRecord>()
.find_one(doc! {"message_id": &message_id}, None)
Expand All @@ -86,7 +86,7 @@ async fn message_raw(database: Extension<MongoDatabase>, Path(message_id): Path<
}

async fn message_metadata(
database: Extension<MongoDatabase>,
database: Extension<MongoDb>,
Path(message_id): Path<String>,
) -> ApiResult<MessageMetadataResponse> {
let mut rec = database
Expand Down Expand Up @@ -138,7 +138,7 @@ async fn message_metadata(
}

async fn message_children(
database: Extension<MongoDatabase>,
database: Extension<MongoDb>,
Path(message_id): Path<String>,
Pagination { page_size, page }: Pagination,
Expanded { expanded }: Expanded,
Expand Down Expand Up @@ -182,7 +182,7 @@ async fn message_children(
})
}

async fn output(database: Extension<MongoDatabase>, Path(output_id): Path<String>) -> ApiResult<OutputResponse> {
async fn output(database: Extension<MongoDb>, Path(output_id): Path<String>) -> ApiResult<OutputResponse> {
let output_id = OutputId::from_str(&output_id).map_err(ApiError::bad_parse)?;
output_by_transaction_id(
database,
Expand All @@ -192,7 +192,7 @@ async fn output(database: Extension<MongoDatabase>, Path(output_id): Path<String
}

async fn output_by_transaction_id(
database: Extension<MongoDatabase>,
database: Extension<MongoDb>,
Path((transaction_id, idx)): Path<(String, u16)>,
) -> ApiResult<OutputResponse> {
let mut output = database
Expand Down Expand Up @@ -234,7 +234,7 @@ async fn output_by_transaction_id(
}

async fn transaction_for_message(
database: Extension<MongoDatabase>,
database: Extension<MongoDb>,
Path(message_id): Path<String>,
) -> ApiResult<TransactionResponse> {
let mut rec = database
Expand All @@ -253,7 +253,7 @@ async fn transaction_for_message(
}

async fn transaction_included_message(
database: Extension<MongoDatabase>,
database: Extension<MongoDb>,
Path(transaction_id): Path<String>,
) -> ApiResult<MessageResponse> {
let mut rec = database
Expand Down Expand Up @@ -281,7 +281,7 @@ async fn transaction_included_message(
})
}

async fn milestone(database: Extension<MongoDatabase>, Path(index): Path<u32>) -> ApiResult<MilestoneResponse> {
async fn milestone(database: Extension<MongoDb>, Path(index): Path<u32>) -> ApiResult<MilestoneResponse> {
database
.doc_collection::<MilestoneRecord>()
.find_one(doc! {"milestone_index": &index}, None)
Expand Down
6 changes: 3 additions & 3 deletions bin/inx-chronicle/src/broker.rs
Expand Up @@ -5,7 +5,7 @@ use async_trait::async_trait;
#[cfg(feature = "stardust")]
use chronicle::db::model::stardust;
use chronicle::{
db::{MongoDatabase, MongoDbError},
db::{MongoDb, MongoDbError},
runtime::{
actor::{context::ActorContext, event::HandleEvent, Actor},
error::RuntimeError,
Expand All @@ -23,11 +23,11 @@ pub enum BrokerError {

#[derive(Debug)]
pub struct Broker {
db: MongoDatabase,
db: MongoDb,
}

impl Broker {
pub fn new(db: MongoDatabase) -> Self {
pub fn new(db: MongoDb) -> Self {
Self { db }
}
}
Expand Down
21 changes: 10 additions & 11 deletions bin/inx-chronicle/src/config.rs
Expand Up @@ -3,7 +3,7 @@

use std::{fs, path::Path};

use chronicle::db::MongoConfig;
use chronicle::db::MongoDbConfig;
use serde::{Deserialize, Serialize};
use thiserror::Error;

Expand All @@ -20,28 +20,27 @@ pub enum ConfigError {

/// Configuration of Chronicle.
#[derive(Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Config {
pub mongodb: MongoConfig,
#[cfg(feature = "stardust")]
pub struct ChronicleConfig {
pub mongodb: MongoDbConfig,
#[cfg(feature = "inx")]
pub inx: InxConfig,
}

impl Config {
/// Reads a configuration file in `.toml` format.
impl ChronicleConfig {
pub fn from_file(path: impl AsRef<Path>) -> Result<Self, ConfigError> {
fs::read_to_string(&path)
.map_err(ConfigError::FileRead)
.and_then(|contents| toml::from_str::<Self>(&contents).map_err(ConfigError::TomlDeserialization))
}

/// Applies the appropriate command line arguments to the [`Config`].
/// Applies the appropriate command line arguments to the [`ChronicleConfig`].
pub fn apply_cli_args(&mut self, args: super::CliArgs) {
#[cfg(feature = "stardust")]
if let Some(inx) = args.inx {
self.inx = InxConfig::new(inx);
}
if let Some(db) = args.db {
self.mongodb = MongoConfig::new(db);
if let Some(connect_url) = args.db {
self.mongodb = MongoDbConfig::new().with_connect_url(connect_url);
}
}
}
Expand All @@ -52,9 +51,9 @@ mod test {

#[test]
fn config_file_conformity() -> Result<(), ConfigError> {
let _ = Config::from_file(concat!(
let _ = ChronicleConfig::from_file(concat!(
env!("CARGO_MANIFEST_DIR"),
"/bin/inx-chronicle/config.example.toml"
"/bin/inx-chronicle/config.template.toml"
))?;

Ok(())
Expand Down
20 changes: 3 additions & 17 deletions bin/inx-chronicle/src/inx/config.rs
Expand Up @@ -3,7 +3,6 @@

use std::time::Duration;

use inx::{client::InxClient, tonic::Channel};
use serde::{Deserialize, Serialize};

pub use super::InxWorkerError;
Expand All @@ -13,7 +12,7 @@ pub use super::InxWorkerError;
#[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct InxConfig {
/// The bind address of node's INX interface.
pub address: String,
pub connect_url: String,
/// The time that has to pass until a new connection attempt is made.
#[serde(with = "humantime_serde")]
pub connection_retry_interval: Duration,
Expand All @@ -22,7 +21,7 @@ pub struct InxConfig {
impl Default for InxConfig {
fn default() -> Self {
Self {
address: "http://localhost:9029".into(),
connect_url: "http://localhost:9029".into(),
connection_retry_interval: Duration::from_secs(5),
}
}
Expand All @@ -32,21 +31,8 @@ impl InxConfig {
/// Creates a new [`InxConfig`]. The `address` is the address of the node's INX interface.
pub fn new(address: impl Into<String>) -> Self {
Self {
address: address.into(),
connect_url: address.into(),
..Default::default()
}
}

/// Constructs an [`InxClient`] by consuming the [`InxConfig`].
pub async fn build(&self) -> Result<InxClient<Channel>, InxWorkerError> {
let url = url::Url::parse(&self.address)?;

if url.scheme() != "http" {
return Err(InxWorkerError::InvalidAddress(self.address.clone()));
}

InxClient::connect(self.address.clone())
.await
.map_err(InxWorkerError::ConnectionError)
}
}
1 change: 1 addition & 0 deletions bin/inx-chronicle/src/inx/listener.rs
Expand Up @@ -56,6 +56,7 @@ impl Actor for InxListener {

async fn init(&mut self, cx: &mut ActorContext<Self>) -> Result<Self::State, Self::Error> {
let message_stream = self.inx.listen_to_messages(MessageFilter {}).await?.into_inner();

cx.spawn_actor_supervised::<MessageStream, _>(
InxStreamListener::new(self.broker_addr.clone())?.with_stream(message_stream),
)
Expand Down

0 comments on commit c5e090e

Please sign in to comment.