Skip to content

Commit

Permalink
reimplement subscription: api
Browse files Browse the repository at this point in the history
  • Loading branch information
ecioppettini committed Oct 15, 2021
1 parent 20ba55a commit 9616e68
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 19 deletions.
35 changes: 20 additions & 15 deletions explorer/src/api/graphql/mod.rs
Expand Up @@ -29,7 +29,7 @@ use crate::db::{
self,
chain_storable::{BlockId, CertificateTag},
schema::{BlockMeta, StakePoolMeta, Txn},
ExplorerDb, SeqNum, Settings as ChainSettings,
ExplorerDb, SeqNum,
};
use chain_impl_mockchain::certificate;
use chain_impl_mockchain::key::BftLeaderId;
Expand Down Expand Up @@ -1321,27 +1321,32 @@ pub struct Subscription;

#[Subscription]
impl Subscription {
async fn tip(&self, _context: &Context<'_>) -> impl futures::Stream<Item = Branch> {
// use futures::StreamExt;
// extract_context(context)
// .db
// .tip_subscription()
// // missing a tip update doesn't seem that important, so I think it's
// // fine to ignore the error
// .filter_map(|tip| async move {
// tip.ok()
// .map(|(hash, state)| Branch::from_id_and_state(hash, state))
// })

futures::stream::iter(vec![])
async fn tip(&self, context: &Context<'_>) -> impl futures::Stream<Item = Branch> + '_ {
use futures::StreamExt;
let context = extract_context(context);

let db = context.db.clone();

tokio_stream::wrappers::BroadcastStream::new(context.tip_stream.subscribe())
// missing a tip update doesn't seem that important, so I think it's
// fine to ignore the error
.filter_map(move |tip| {
let db = db.clone();

async move {
let txn = Arc::new(db.get_txn().await.unwrap());
tip.ok().map(|id| Branch { id: id.into(), txn })
}
})
}
}

pub type Schema = async_graphql::Schema<Query, EmptyMutation, Subscription>;

pub struct EContext {
pub db: ExplorerDb,
pub settings: ChainSettings,
pub tip_stream: tokio::sync::broadcast::Sender<HeaderHash>,
pub settings: super::Settings,
}

fn extract_context<'a>(context: &Context<'a>) -> &'a EContext {
Expand Down
21 changes: 17 additions & 4 deletions explorer/src/api/mod.rs
@@ -1,15 +1,23 @@
pub mod graphql;

use crate::db::ExplorerDb;

use self::graphql::EContext;
use crate::db::ExplorerDb;
use async_graphql::http::{playground_source, GraphQLPlaygroundConfig};
use futures::Future;
use jormungandr_lib::interfaces::{Cors, Tls};
use std::{net::SocketAddr, time::Duration};
use warp::http::Response as HttpResponse;
use warp::{Filter, Rejection, Reply};

#[derive(Clone)]
pub struct Settings {
/// This is the prefix that's used for the Address bech32 string representation in the
/// responses (in the queries any prefix can be used). base32 serialization could
/// also be used, but the `Address` struct doesn't have a deserialization method right
/// now
pub address_bech32_prefix: String,
}

pub async fn setup_cors<API>(
api: API,
listen_addr: SocketAddr,
Expand Down Expand Up @@ -67,14 +75,19 @@ async fn serve<API>(

pub fn filter(
db: ExplorerDb,
settings: crate::db::Settings,
tip_stream: tokio::sync::broadcast::Sender<chain_impl_mockchain::key::Hash>,
settings: Settings,
) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
let schema = async_graphql::Schema::build(
crate::api::graphql::Query {},
async_graphql::EmptyMutation,
crate::api::graphql::Subscription {},
)
.data(EContext { db, settings })
.data(EContext {
db,
settings,
tip_stream,
})
.finish();

let graphql_post = async_graphql_warp::graphql(schema.clone())
Expand Down

0 comments on commit 9616e68

Please sign in to comment.