Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SpanSend trait to allow custom channels, and optional support of tokio's channel. #9

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ default = [ "stacktrace" ]
backtrace = { version = "0.3", optional = true }
crossbeam-channel = "0.4"
rand = "0.7"
tokio = { version = "0.2", optional = true, features = ["sync"] }
trackable = "0.2"
109 changes: 79 additions & 30 deletions src/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,24 @@ use std::borrow::Cow;
use std::io::{Read, Write};
use std::time::SystemTime;

/// Finished span receiver.
pub type SpanReceiver<T> = crossbeam_channel::Receiver<FinishedSpan<T>>;
/// Sender of finished spans to the destination channel.
pub type SpanSender<T> = crossbeam_channel::Sender<FinishedSpan<T>>;
/// The default receiver of finished spans.
pub type DefaultSpanReceiver<T> = crossbeam_channel::Receiver<FinishedSpan<T>>;

/// The default sender of finished spans.
pub type DefaultSpanSender<T> = crossbeam_channel::Sender<FinishedSpan<T>>;

/// Span.
///
/// When this span is dropped, it will be converted to `FinishedSpan` and
/// it will be sent to the associated `SpanReceiver`.
#[derive(Debug)]
pub struct Span<T>(Option<SpanInner<T>>);
impl<T> Span<T> {
pub struct Span<T, Sender = DefaultSpanSender<T>>(Option<SpanInner<T, Sender>>)
where
Sender: SpanSend<T>;
impl<T, Sender> Span<T, Sender>
where
Sender: SpanSend<T>,
{
/// Makes an inactive span.
///
/// This span is never traced.
Expand All @@ -38,7 +44,7 @@ impl<T> Span<T> {
}

/// Returns a handle of this span.
pub fn handle(&self) -> SpanHandle<T>
pub fn handle(&self) -> SpanHandle<T, Sender>
where
T: Clone,
{
Expand Down Expand Up @@ -169,21 +175,21 @@ impl<T> Span<T> {
}

/// Starts a `ChildOf` span if this span is sampled.
pub fn child<N, F>(&self, operation_name: N, f: F) -> Span<T>
pub fn child<N, F>(&self, operation_name: N, f: F) -> Span<T, Sender>
where
N: Into<Cow<'static, str>>,
T: Clone,
F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
F: FnOnce(StartSpanOptions<AllSampler, T, Sender>) -> Span<T, Sender>,
{
self.handle().child(operation_name, f)
}

/// Starts a `FollowsFrom` span if this span is sampled.
pub fn follower<N, F>(&self, operation_name: N, f: F) -> Span<T>
pub fn follower<N, F>(&self, operation_name: N, f: F) -> Span<T, Sender>
where
N: Into<Cow<'static, str>>,
T: Clone,
F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
F: FnOnce(StartSpanOptions<AllSampler, T, Sender>) -> Span<T, Sender>,
{
self.handle().follower(operation_name, f)
}
Expand All @@ -195,7 +201,7 @@ impl<T> Span<T> {
tags: Vec<Tag>,
state: T,
baggage_items: Vec<BaggageItem>,
span_tx: SpanSender<T>,
span_tx: Sender,
) -> Self {
let context = SpanContext::new(state, baggage_items);
let inner = SpanInner {
Expand All @@ -211,9 +217,12 @@ impl<T> Span<T> {
Span(Some(inner))
}
}
impl<T> Drop for Span<T> {
impl<T, Sender> Drop for Span<T, Sender>
where
Sender: SpanSend<T>,
{
fn drop(&mut self) {
if let Some(inner) = self.0.take() {
if let Some(mut inner) = self.0.take() {
let finished = FinishedSpan {
operation_name: inner.operation_name,
start_time: inner.start_time,
Expand All @@ -223,7 +232,7 @@ impl<T> Drop for Span<T> {
logs: inner.logs,
context: inner.context,
};
let _ = inner.span_tx.try_send(finished);
inner.span_tx.send(finished);
}
}
}
Expand All @@ -234,15 +243,15 @@ impl<T> MaybeAsRef<SpanContext<T>> for Span<T> {
}

#[derive(Debug)]
struct SpanInner<T> {
struct SpanInner<T, Sender> {
operation_name: Cow<'static, str>,
start_time: SystemTime,
finish_time: Option<SystemTime>,
references: Vec<SpanReference<T>>,
tags: Vec<Tag>,
logs: Vec<Log>,
context: SpanContext<T>,
span_tx: SpanSender<T>,
span_tx: Sender,
}

/// Finished span.
Expand Down Expand Up @@ -485,18 +494,20 @@ impl<'a, T: 'a> CandidateSpan<'a, T> {

/// Options for starting a span.
#[derive(Debug)]
pub struct StartSpanOptions<'a, S: 'a, T: 'a> {
pub struct StartSpanOptions<'a, S: 'a, T: 'a, Sender: 'a = DefaultSpanSender<T>> {
operation_name: Cow<'static, str>,
start_time: Option<SystemTime>,
tags: Vec<Tag>,
references: Vec<SpanReference<T>>,
baggage_items: Vec<BaggageItem>,
span_tx: &'a SpanSender<T>,
span_tx: &'a Sender,
sampler: &'a S,
}
impl<'a, S: 'a, T: 'a> StartSpanOptions<'a, S, T>
impl<'a, S, T, Sender> StartSpanOptions<'a, S, T, Sender>
where
S: Sampler<T>,
S: 'a + Sampler<T>,
T: 'a,
Sender: 'a + SpanSend<T>,
{
/// Sets the start time of this span.
pub fn start_time(mut self, time: SystemTime) -> Self {
Expand Down Expand Up @@ -541,7 +552,7 @@ where
}

/// Starts a new span.
pub fn start(mut self) -> Span<T>
pub fn start(mut self) -> Span<T, Sender>
where
T: for<'b> From<CandidateSpan<'b, T>>,
{
Expand All @@ -562,7 +573,7 @@ where
}

/// Starts a new span with the explicit `state`.
pub fn start_with_state(mut self, state: T) -> Span<T> {
pub fn start_with_state(mut self, state: T) -> Span<T, Sender> {
self.normalize();
if !self.is_sampled() {
return Span(None);
Expand All @@ -578,7 +589,7 @@ where
)
}

pub(crate) fn new<N>(operation_name: N, span_tx: &'a SpanSender<T>, sampler: &'a S) -> Self
pub(crate) fn new<N>(operation_name: N, span_tx: &'a Sender, sampler: &'a S) -> Self
where
N: Into<Cow<'static, str>>,
{
Expand Down Expand Up @@ -627,8 +638,11 @@ where

/// Immutable handle of `Span`.
#[derive(Debug, Clone)]
pub struct SpanHandle<T>(Option<(SpanContext<T>, SpanSender<T>)>);
impl<T> SpanHandle<T> {
pub struct SpanHandle<T, Sender = DefaultSpanSender<T>>(Option<(SpanContext<T>, Sender)>);
impl<T, Sender> SpanHandle<T, Sender>
where
Sender: SpanSend<T>,
{
/// Returns `true` if this span is sampled (i.e., being traced).
pub fn is_sampled(&self) -> bool {
self.0.is_some()
Expand All @@ -649,11 +663,11 @@ impl<T> SpanHandle<T> {
}

/// Starts a `ChildOf` span if this span is sampled.
pub fn child<N, F>(&self, operation_name: N, f: F) -> Span<T>
pub fn child<N, F>(&self, operation_name: N, f: F) -> Span<T, Sender>
where
N: Into<Cow<'static, str>>,
T: Clone,
F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
F: FnOnce(StartSpanOptions<AllSampler, T, Sender>) -> Span<T, Sender>,
{
if let Some(&(ref context, ref span_tx)) = self.0.as_ref() {
let options =
Expand All @@ -665,11 +679,11 @@ impl<T> SpanHandle<T> {
}

/// Starts a `FollowsFrom` span if this span is sampled.
pub fn follower<N, F>(&self, operation_name: N, f: F) -> Span<T>
pub fn follower<N, F>(&self, operation_name: N, f: F) -> Span<T, Sender>
where
N: Into<Cow<'static, str>>,
T: Clone,
F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
F: FnOnce(StartSpanOptions<AllSampler, T, Sender>) -> Span<T, Sender>,
{
if let Some(&(ref context, ref span_tx)) = self.0.as_ref() {
let options =
Expand All @@ -680,3 +694,38 @@ impl<T> SpanHandle<T> {
}
}
}

/// This trait allows sending finished spans to a receiver.
pub trait SpanSend<T>: Clone {
/// Sends a finished span to the associated receiver.
///
/// Note that this method should be implemented in a non-blocking manner.
/// And if the receiver is a temporarily full or has dropped,
/// this method should just discard the span without any errors.
fn send(&mut self, span: FinishedSpan<T>);
}

impl<T> SpanSend<T> for crossbeam_channel::Sender<FinishedSpan<T>> {
fn send(&mut self, span: FinishedSpan<T>) {
let _ = self.try_send(span);
}
}

#[cfg(feature = "tokio")]
impl<T> SpanSend<T> for tokio::sync::mpsc::Sender<FinishedSpan<T>> {
fn send(&mut self, span: FinishedSpan<T>) {
let _ = self.try_send(span);
}
}

#[cfg(all(test, feature = "tokio"))]
mod tests {
use crate::sampler::AllSampler;
use crate::Tracer;

#[test]
fn tokio_channel() {
let (span_tx, _span_rx) = tokio::sync::mpsc::channel(10);
let _tracer = Tracer::<_, (), _>::with_sender(AllSampler, span_tx);
}
}
41 changes: 30 additions & 11 deletions src/tracer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::sampler::Sampler;
use crate::span::{SpanReceiver, SpanSender, StartSpanOptions};
use crate::span::{DefaultSpanReceiver, DefaultSpanSender, SpanSend, StartSpanOptions};
use std::borrow::Cow;
use std::marker::PhantomData;
use std::sync::Arc;

/// Tracer.
Expand All @@ -20,50 +21,68 @@ use std::sync::Arc;
/// assert_eq!(span.operation_name(), "foo");
/// ```
#[derive(Debug)]
pub struct Tracer<S, T> {
pub struct Tracer<S, T, Sender = DefaultSpanSender<T>> {
sampler: Arc<S>,
span_tx: SpanSender<T>,
span_tx: Sender,
_span_state: PhantomData<T>,
}
impl<S: Sampler<T>, T> Tracer<S, T> {
impl<S, T> Tracer<S, T, DefaultSpanSender<T>>
where
S: Sampler<T>,
{
/// This constructor is mainly for backward compatibility, it has the same interface
/// as in previous versions except the type of `SpanReceiver`.
/// It builds an unbounded channel which may cause memory issues if there is no reader,
/// prefer `with_sender()` alternative with a bounded one.
pub fn new(sampler: S) -> (Self, SpanReceiver<T>) {
pub fn new(sampler: S) -> (Self, DefaultSpanReceiver<T>) {
let (span_tx, span_rx) = crossbeam_channel::unbounded();
(Self::with_sender(sampler, span_tx), span_rx)
}

}
impl<S, T, Sender> Tracer<S, T, Sender>
where
S: Sampler<T>,
Sender: SpanSend<T>,
{
/// Makes a new `Tracer` instance.
pub fn with_sender(sampler: S, span_tx: SpanSender<T>) -> Self {
pub fn with_sender(sampler: S, span_tx: Sender) -> Self {
Tracer {
sampler: Arc::new(sampler),
span_tx,
_span_state: PhantomData,
}
}

/// Returns `StartSpanOptions` for starting a span which has the name `operation_name`.
pub fn span<N>(&self, operation_name: N) -> StartSpanOptions<S, T>
pub fn span<N>(&self, operation_name: N) -> StartSpanOptions<S, T, Sender>
where
N: Into<Cow<'static, str>>,
{
StartSpanOptions::new(operation_name, &self.span_tx, &self.sampler)
}
}
impl<S, T> Tracer<S, T> {
impl<S, T, Sender> Tracer<S, T, Sender>
where
Sender: SpanSend<T>,
{
/// Clone with the given `sampler`.
pub fn clone_with_sampler<U: Sampler<T>>(&self, sampler: U) -> Tracer<U, T> {
pub fn clone_with_sampler<U: Sampler<T>>(&self, sampler: U) -> Tracer<U, T, Sender> {
Tracer {
sampler: Arc::new(sampler),
span_tx: self.span_tx.clone(),
_span_state: PhantomData,
}
}
}
impl<S, T> Clone for Tracer<S, T> {
impl<S, T, Sender> Clone for Tracer<S, T, Sender>
where
Sender: SpanSend<T>,
{
fn clone(&self) -> Self {
Tracer {
sampler: Arc::clone(&self.sampler),
span_tx: self.span_tx.clone(),
_span_state: PhantomData,
}
}
}