Skip to content

Commit

Permalink
Convert eth_subscribe to async (#398)
Browse files Browse the repository at this point in the history
Here I was unable to convert SubscriptionStream to `impl Stream` because
it has additional methods and runs code on drop.
To get rid of the Unpin requirement anyway we use pin_project
(https://docs.rs/pin-project/1.0.1/pin_project/).
  • Loading branch information
e00E committed Nov 1, 2020
1 parent 96b7ac3 commit 3595756
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 54 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ secp256k1 = { version = "0.19", features = ["recovery"] }
serde = { version = "1.0.90", features = ["derive"] }
serde_json = "1.0.39"
tiny-keccak = { version = "2.0.1", features = ["keccak"] }
pin-project = "1.0"
# Optional deps
## HTTP
base64 = { version = "0.13", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion examples/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn main() -> web3::Result {
})
.await;

sub.unsubscribe();
sub.unsubscribe().await?;

Ok(())
}
87 changes: 35 additions & 52 deletions src/api/eth_subscribe.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
//! `Eth` namespace, subscriptions

use crate::api::Namespace;
use crate::helpers::{self, CallFuture};
use crate::helpers;
use crate::types::{BlockHeader, Filter, Log, SyncState, H256};
use crate::{error, DuplexTransport};
use futures::{
task::{Context, Poll},
Future, FutureExt, Stream, StreamExt,
Stream,
};
use pin_project::{pin_project, pinned_drop};
use std::marker::PhantomData;
use std::pin::Pin;

Expand Down Expand Up @@ -43,10 +44,12 @@ impl From<String> for SubscriptionId {
/// Stream of notifications from a subscription
/// Given a type deserializable from rpc::Value and a subscription id, yields items of that type as
/// notifications are delivered.
#[pin_project(PinnedDrop)]
#[derive(Debug)]
pub struct SubscriptionStream<T: DuplexTransport, I> {
transport: T,
id: SubscriptionId,
#[pin]
rx: T::NotificationStream,
_marker: PhantomData<I>,
}
Expand All @@ -68,92 +71,72 @@ impl<T: DuplexTransport, I> SubscriptionStream<T, I> {
}

/// Unsubscribe from the event represented by this stream
pub fn unsubscribe(self) -> CallFuture<bool, T::Out> {
pub async fn unsubscribe(self) -> error::Result<bool> {
let &SubscriptionId(ref id) = &self.id;
let id = helpers::serialize(&id);
CallFuture::new(self.transport.execute("eth_unsubscribe", vec![id]))
let response = self.transport.execute("eth_unsubscribe", vec![id]).await?;
helpers::decode(response)
}
}

impl<T, I> Stream for SubscriptionStream<T, I>
where
T: DuplexTransport,
T::NotificationStream: Unpin,
I: serde::de::DeserializeOwned + Unpin,
I: serde::de::DeserializeOwned,
{
type Item = error::Result<I>;

fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let x = ready!(self.rx.poll_next_unpin(ctx));
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
let x = ready!(this.rx.poll_next(ctx));
Poll::Ready(x.map(|result| serde_json::from_value(result).map_err(Into::into)))
}
}

impl<T: DuplexTransport, I> Drop for SubscriptionStream<T, I> {
fn drop(&mut self) {
let _ = self.transport.unsubscribe(self.id().clone());
}
}

/// A result of calling a subscription.
#[derive(Debug)]
pub struct SubscriptionResult<T: DuplexTransport, I> {
transport: T,
inner: CallFuture<String, T::Out>,
_marker: PhantomData<I>,
}

impl<T: DuplexTransport, I> SubscriptionResult<T, I> {
/// New `SubscriptionResult`.
pub fn new(transport: T, id_future: CallFuture<String, T::Out>) -> Self {
SubscriptionResult {
transport,
inner: id_future,
_marker: PhantomData,
}
}
}

impl<T, I> Future for SubscriptionResult<T, I>
#[pinned_drop]
impl<T, I> PinnedDrop for SubscriptionStream<T, I>
where
T: DuplexTransport,
I: serde::de::DeserializeOwned + Unpin,
{
type Output = error::Result<SubscriptionStream<T, I>>;

fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
let id = ready!(self.inner.poll_unpin(ctx))?;
Poll::Ready(SubscriptionStream::new(self.transport.clone(), SubscriptionId(id)))
fn drop(self: Pin<&mut Self>) {
let _ = self.transport.unsubscribe(self.id().clone());
}
}

impl<T: DuplexTransport> EthSubscribe<T> {
/// Create a new heads subscription
pub fn subscribe_new_heads(&self) -> SubscriptionResult<T, BlockHeader> {
pub async fn subscribe_new_heads(&self) -> error::Result<SubscriptionStream<T, BlockHeader>> {
let subscription = helpers::serialize(&&"newHeads");
let id_future = CallFuture::new(self.transport.execute("eth_subscribe", vec![subscription]));
SubscriptionResult::new(self.transport().clone(), id_future)
let response = self.transport.execute("eth_subscribe", vec![subscription]).await?;
let id: String = helpers::decode(response)?;
SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
}

/// Create a logs subscription
pub fn subscribe_logs(&self, filter: Filter) -> SubscriptionResult<T, Log> {
pub async fn subscribe_logs(&self, filter: Filter) -> error::Result<SubscriptionStream<T, Log>> {
let subscription = helpers::serialize(&&"logs");
let filter = helpers::serialize(&filter);
let id_future = CallFuture::new(self.transport.execute("eth_subscribe", vec![subscription, filter]));
SubscriptionResult::new(self.transport().clone(), id_future)
let response = self
.transport
.execute("eth_subscribe", vec![subscription, filter])
.await?;
let id: String = helpers::decode(response)?;
SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
}

/// Create a pending transactions subscription
pub fn subscribe_new_pending_transactions(&self) -> SubscriptionResult<T, H256> {
pub async fn subscribe_new_pending_transactions(&self) -> error::Result<SubscriptionStream<T, H256>> {
let subscription = helpers::serialize(&&"newPendingTransactions");
let id_future = CallFuture::new(self.transport.execute("eth_subscribe", vec![subscription]));
SubscriptionResult::new(self.transport().clone(), id_future)
let response = self.transport.execute("eth_subscribe", vec![subscription]).await?;
let id: String = helpers::decode(response)?;
SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
}

/// Create a sync status subscription
pub fn subscribe_syncing(&self) -> SubscriptionResult<T, SyncState> {
pub async fn subscribe_syncing(&self) -> error::Result<SubscriptionStream<T, SyncState>> {
let subscription = helpers::serialize(&&"syncing");
let id_future = CallFuture::new(self.transport.execute("eth_subscribe", vec![subscription]));
SubscriptionResult::new(self.transport().clone(), id_future)
let response = self.transport.execute("eth_subscribe", vec![subscription]).await?;
let id: String = helpers::decode(response)?;
SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
}
}
2 changes: 1 addition & 1 deletion src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod web3;
pub use self::accounts::Accounts;
pub use self::eth::Eth;
pub use self::eth_filter::{BaseFilter, EthFilter};
pub use self::eth_subscribe::{EthSubscribe, SubscriptionId, SubscriptionResult, SubscriptionStream};
pub use self::eth_subscribe::{EthSubscribe, SubscriptionId, SubscriptionStream};
pub use self::net::Net;
pub use self::parity::Parity;
pub use self::parity_accounts::ParityAccounts;
Expand Down

0 comments on commit 3595756

Please sign in to comment.