Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IGNORE, WIP] Custom Runtime Implementation Without External Async Dependencies #1506

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 161 additions & 4 deletions opentelemetry-sdk/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,21 @@
//! [async-std]: https://crates.io/crates/async-std

use futures_util::{future::BoxFuture, stream::Stream};
use std::{fmt::Debug, future::Future, time::Duration};
use std::{fmt::Debug, future::Future};
use thiserror::Error;
use std::{
sync::{ Arc, Mutex},
thread,
time::Duration,
};
use std::pin::Pin;

use futures_util::task::{Context, Poll};
//use std::{future::Future, time::Duration};
use futures_executor;
use crossbeam_channel::{self, Sender as CrossbeamSender, Receiver as CrossbeamReceiver};
//use std::task::{Context, Poll};


/// A runtime is an abstraction of an async runtime like [Tokio] or [async-std]. It allows
/// OpenTelemetry to work with any current and hopefully future runtime implementation.
Expand Down Expand Up @@ -43,6 +56,7 @@ pub trait Runtime: Clone + Send + Sync + 'static {
fn delay(&self, duration: Duration) -> Self::Delay;
}


/// Runtime implementation, which works with Tokio's multi thread runtime.
#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
Expand Down Expand Up @@ -140,12 +154,12 @@ impl Runtime for AsyncStd {
/// [span]: crate::trace::BatchSpanProcessor
pub trait RuntimeChannel: Runtime {
/// A future stream to receive batch messages from channels.
type Receiver<T: Debug + Send>: Stream<Item = T> + Send;
type Receiver<T: Debug + Send + 'static>: Stream<Item = T> + Send;
/// A batch messages sender that can be sent across threads safely.
type Sender<T: Debug + Send>: TrySend<Message = T> + Debug;
type Sender<T: Debug + Send + 'static>: TrySend<Message = T> + Debug;

/// Return the sender and receiver used to send batch messages.
fn batch_message_channel<T: Debug + Send>(
fn batch_message_channel<T: Debug + Send + 'static>(
&self,
capacity: usize,
) -> (Self::Sender<T>, Self::Receiver<T>);
Expand Down Expand Up @@ -249,3 +263,146 @@ impl RuntimeChannel for AsyncStd {
async_std::channel::bounded(capacity)
}
}

/// stdthreadtuntime
#[derive(Debug, Clone)]
pub struct StdThreadRuntime {
shutdown_signal: Arc<Mutex<bool>>,
}

impl StdThreadRuntime {
/// new
pub fn new() -> Self {
StdThreadRuntime {
shutdown_signal: Arc::new(Mutex::new(false)),
}
}
}

impl Runtime for StdThreadRuntime {
type Interval = StdInterval;
type Delay = StdDelay;

fn interval(&self, duration: Duration) -> Self::Interval {
StdInterval::new(duration, self.shutdown_signal.clone())
}

fn spawn(&self, future: BoxFuture<'static, ()>) {
thread::spawn(move || {
futures_executor::block_on(future);
});
}

fn delay(&self, duration: Duration) -> Self::Delay {
StdDelay::new(duration)
}
}

/// stdinterval
#[derive(Debug)]
pub struct StdInterval {
duration: Duration,
shutdown_signal: Arc<Mutex<bool>>,
}

impl StdInterval {
/// new
pub fn new(duration: Duration, shutdown_signal: Arc<Mutex<bool>>) -> Self {
StdInterval { duration, shutdown_signal }
}
}

impl Stream for StdInterval {
type Item = ();

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if *self.shutdown_signal.lock().unwrap() {
Poll::Ready(None)
} else {
thread::sleep(self.duration);
cx.waker().wake_by_ref();
Poll::Ready(Some(()))
}
}
}

/// stddelay
#[derive(Debug)]
pub struct StdDelay {
duration: Duration,
elapsed: bool,
}

impl StdDelay {
/// new
pub fn new(duration: Duration) -> Self {
StdDelay { duration, elapsed: false }
}
}

impl Future for StdDelay {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.elapsed {
Poll::Ready(())
} else {
self.elapsed = true;
cx.waker().wake_by_ref();
thread::sleep(self.duration);
Poll::Pending
}
}
}


impl RuntimeChannel for StdThreadRuntime {
type Receiver<T: Debug + Send + 'static> = CrossbeamReceiverStream<T>;
type Sender<T: Debug + Send + 'static> = Arc<CrossbeamSender<T>>;

fn batch_message_channel<T: Debug + Send + 'static>(
&self,
capacity: usize,
) -> (Self::Sender<T>, Self::Receiver<T>) {
let (sender, receiver) = crossbeam_channel::bounded(capacity);
(Arc::new(sender), CrossbeamReceiverStream::new(receiver))
}
}

/// crossbeamreceiverstream
#[derive(Debug)]
pub struct CrossbeamReceiverStream<T> {
receiver: CrossbeamReceiver<T>,
}

impl<T> CrossbeamReceiverStream<T> {
/// new
pub fn new(receiver: CrossbeamReceiver<T>) -> Self {
CrossbeamReceiverStream { receiver }
}
}

impl<T: Send + 'static> Stream for CrossbeamReceiverStream<T> {
type Item = T;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.receiver.try_recv() {
Ok(item) => Poll::Ready(Some(item)),
Err(crossbeam_channel::TryRecvError::Empty) => {
cx.waker().wake_by_ref();
Poll::Pending
}
Err(crossbeam_channel::TryRecvError::Disconnected) => Poll::Ready(None),
}
}
}

impl<T: Send> TrySend for Arc<CrossbeamSender<T>> {
type Message = T;

fn try_send(&self, item: Self::Message) -> Result<(), TrySendError> {
self.send(item).map_err(|_| TrySendError::ChannelClosed)
}
}


Loading