Skip to content

Commit

Permalink
reimplement subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
ecioppettini committed Oct 15, 2021
1 parent 26e2bd7 commit d82d83a
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 24 deletions.
29 changes: 23 additions & 6 deletions explorer/src/indexer.rs
@@ -1,11 +1,10 @@
use std::sync::Arc;

use crate::db::ExplorerDb;

use chain_impl_mockchain::block::Block;
use chain_impl_mockchain::block::HeaderId as HeaderHash;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::Mutex;
use tracing::{span, Level};

#[derive(Debug, Error)]
pub enum IndexerError {
Expand All @@ -18,22 +17,33 @@ pub enum IndexerError {
#[error("url error")]
UrlError(#[from] url::ParseError),
#[error(transparent)]
DbError(#[from] crate::db::error::ExplorerError),
DbError(#[from] crate::db::error::DbError),
}

#[derive(Clone)]
pub struct Indexer {
pub db: ExplorerDb,
pub tip_broadcast: tokio::sync::broadcast::Sender<HeaderHash>,
tip_candidate: Arc<Mutex<Option<HeaderHash>>>,
}

impl Indexer {
pub fn new(db: crate::db::ExplorerDb) -> Self {
pub fn new(
db: crate::db::ExplorerDb,
tip_broadcast: tokio::sync::broadcast::Sender<HeaderHash>,
) -> Self {
let tip_candidate = Arc::new(Mutex::new(None));
Indexer { db, tip_candidate }
Indexer {
db,
tip_broadcast,
tip_candidate,
}
}

pub async fn apply_block(&self, block: Block) -> Result<(), IndexerError> {
let span = span!(Level::INFO, "Indexer::apply_block");
let _enter = span.enter();

tracing::info!(
"applying {} {}",
block.header.id(),
Expand All @@ -52,13 +62,20 @@ impl Indexer {
}

pub async fn set_tip(&self, tip: HeaderHash) -> Result<(), IndexerError> {
let span = span!(Level::INFO, "Indexer::set_tip");
let _enter = span.enter();

let successful = self.db.set_tip(tip).await?;

if !successful {
let mut guard = self.tip_candidate.lock().await;
guard.replace(tip);
} else {
tracing::info!("tip set to {}", tip);

if let Err(e) = self.tip_broadcast.send(tip) {
tracing::info!(?e);
}
}

Ok(())
Expand Down
47 changes: 29 additions & 18 deletions explorer/src/main.rs
Expand Up @@ -45,7 +45,7 @@ pub enum Error {
#[derive(Debug, Error)]
pub enum BootstrapError {
#[error(transparent)]
DbError(db::error::ExplorerError),
DbError(db::error::DbError),
#[error("empty bootstrap stream")]
EmptyStream,
}
Expand All @@ -57,6 +57,11 @@ enum GlobalState {
ShuttingDown,
}

/// Number of blocks to apply per transaction commit when bootstrapping.
///
/// Each commit flushes the database file, so a bigger number reduces IO overhead.
const BOOTSTRAP_BATCH_SIZE: u32 = 1000;

#[tokio::main]
async fn main() -> Result<(), Error> {
let (_guards, settings) = {
Expand Down Expand Up @@ -93,7 +98,13 @@ async fn main() -> Result<(), Error> {
.context("Couldn't establish connection with node")
.map_err(Error::UnrecoverableError)?;

let open_db = ExplorerDb::open()
if let Some(storage) = settings.storage.as_ref() {
std::fs::create_dir_all(storage)
.context("Couldn't create database directory")
.map_err(Error::UnrecoverableError)?;
}

let open_db = ExplorerDb::open(settings.storage.as_ref())
.context("Couldn't open database")
.map_err(Error::UnrecoverableError)?;

Expand Down Expand Up @@ -135,7 +146,8 @@ async fn main() -> Result<(), Error> {
async move {
let db = bootstrap(sync_stream, open_db).await?;

let msg = GlobalState::Ready(Indexer::new(db));
let (tip_sender, _) = tokio::sync::broadcast::channel(20);
let msg = GlobalState::Ready(Indexer::new(db, tip_sender));

state_tx
.send(msg)
Expand Down Expand Up @@ -302,7 +314,7 @@ async fn bootstrap(

non_commited += 1;

if non_commited == 20 {
if non_commited == BOOTSTRAP_BATCH_SIZE {
batch
.take()
.unwrap()
Expand Down Expand Up @@ -355,11 +367,12 @@ async fn rest_service(mut state: broadcast::Receiver<GlobalState>, settings: Set
}
});

let db = indexer_rx.await.unwrap().db;
let indexer = indexer_rx.await.unwrap();

let api = api::filter(
db,
crate::db::Settings {
indexer.db,
indexer.tip_broadcast.clone(),
crate::api::Settings {
address_bech32_prefix: settings.address_bech32_prefix,
},
);
Expand Down Expand Up @@ -443,17 +456,15 @@ async fn process_subscriptions(
tracing::debug!("received tip event");
let indexer = indexer.clone();

tokio::spawn(
async {
handle_tip(
tip.context("Failed to receive tip from subscription")
.map_err(Error::Other)?,
indexer,
)
.await
}
.instrument(span!(Level::INFO, "handle_tip")),
);
async {
handle_tip(
tip.context("Failed to receive tip from subscription")
.map_err(Error::Other)?,
indexer,
)
.await
}
.instrument(span!(Level::INFO, "handle_tip")).await?;
},
else => break,
};
Expand Down
5 changes: 5 additions & 0 deletions explorer/src/settings.rs
Expand Up @@ -53,6 +53,7 @@ pub struct Settings {
pub tls: Option<Tls>,
pub cors: Option<Cors>,
pub log_settings: Option<LogSettings>,
pub storage: Option<PathBuf>,
}

impl Settings {
Expand Down Expand Up @@ -88,6 +89,8 @@ impl Settings {

let log_settings = Some(Self::log_settings(&cmd, &file));

let storage = cmd.storage.or(file.storage);

let tls = file.tls;
let cors = file.cors;

Expand All @@ -99,6 +102,7 @@ impl Settings {
tls,
cors,
log_settings,
storage,
})
}

Expand Down Expand Up @@ -174,6 +178,7 @@ struct CommandLine {
#[structopt(long)]
pub address_bech32_prefix: Option<String>,
pub config: Option<PathBuf>,
pub storage: Option<PathBuf>,
/// Set log messages minimum severity. If not configured anywhere, defaults to "info".
#[structopt(
long = "log-level",
Expand Down

0 comments on commit d82d83a

Please sign in to comment.