Skip to content

Commit

Permalink
Merge 34ac502 into 117cbd1
Browse files Browse the repository at this point in the history
  • Loading branch information
sile committed Mar 29, 2020
2 parents 117cbd1 + 34ac502 commit 7f57d1e
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 41 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ default = [ "stacktrace" ]
backtrace = { version = "0.3", optional = true }
crossbeam-channel = "0.4"
rand = "0.7"
tokio = { version = "0.2", optional = true, features = ["sync"] }
trackable = "0.2"
109 changes: 79 additions & 30 deletions src/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,24 @@ use std::borrow::Cow;
use std::io::{Read, Write};
use std::time::SystemTime;

/// Finished span receiver.
pub type SpanReceiver<T> = crossbeam_channel::Receiver<FinishedSpan<T>>;
/// Sender of finished spans to the destination channel.
pub type SpanSender<T> = crossbeam_channel::Sender<FinishedSpan<T>>;
/// The default receiver of finished spans.
pub type DefaultSpanReceiver<T> = crossbeam_channel::Receiver<FinishedSpan<T>>;

/// The default sender of finished spans.
pub type DefaultSpanSender<T> = crossbeam_channel::Sender<FinishedSpan<T>>;

/// Span.
///
/// When this span is dropped, it will be converted to `FinishedSpan` and
/// it will be sent to the associated `SpanReceiver`.
#[derive(Debug)]
pub struct Span<T>(Option<SpanInner<T>>);
impl<T> Span<T> {
pub struct Span<T, Sender = DefaultSpanSender<T>>(Option<SpanInner<T, Sender>>)
where
Sender: SpanSend<T>;
impl<T, Sender> Span<T, Sender>
where
Sender: SpanSend<T>,
{
/// Makes an inactive span.
///
/// This span is never traced.
Expand All @@ -38,7 +44,7 @@ impl<T> Span<T> {
}

/// Returns a handle of this span.
pub fn handle(&self) -> SpanHandle<T>
pub fn handle(&self) -> SpanHandle<T, Sender>
where
T: Clone,
{
Expand Down Expand Up @@ -169,21 +175,21 @@ impl<T> Span<T> {
}

/// Starts a `ChildOf` span if this span is sampled.
pub fn child<N, F>(&self, operation_name: N, f: F) -> Span<T>
pub fn child<N, F>(&self, operation_name: N, f: F) -> Span<T, Sender>
where
N: Into<Cow<'static, str>>,
T: Clone,
F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
F: FnOnce(StartSpanOptions<AllSampler, T, Sender>) -> Span<T, Sender>,
{
self.handle().child(operation_name, f)
}

/// Starts a `FollowsFrom` span if this span is sampled.
pub fn follower<N, F>(&self, operation_name: N, f: F) -> Span<T>
pub fn follower<N, F>(&self, operation_name: N, f: F) -> Span<T, Sender>
where
N: Into<Cow<'static, str>>,
T: Clone,
F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
F: FnOnce(StartSpanOptions<AllSampler, T, Sender>) -> Span<T, Sender>,
{
self.handle().follower(operation_name, f)
}
Expand All @@ -195,7 +201,7 @@ impl<T> Span<T> {
tags: Vec<Tag>,
state: T,
baggage_items: Vec<BaggageItem>,
span_tx: SpanSender<T>,
span_tx: Sender,
) -> Self {
let context = SpanContext::new(state, baggage_items);
let inner = SpanInner {
Expand All @@ -211,9 +217,12 @@ impl<T> Span<T> {
Span(Some(inner))
}
}
impl<T> Drop for Span<T> {
impl<T, Sender> Drop for Span<T, Sender>
where
Sender: SpanSend<T>,
{
fn drop(&mut self) {
if let Some(inner) = self.0.take() {
if let Some(mut inner) = self.0.take() {
let finished = FinishedSpan {
operation_name: inner.operation_name,
start_time: inner.start_time,
Expand All @@ -223,7 +232,7 @@ impl<T> Drop for Span<T> {
logs: inner.logs,
context: inner.context,
};
let _ = inner.span_tx.try_send(finished);
inner.span_tx.send(finished);
}
}
}
Expand All @@ -234,15 +243,15 @@ impl<T> MaybeAsRef<SpanContext<T>> for Span<T> {
}

#[derive(Debug)]
struct SpanInner<T> {
struct SpanInner<T, Sender> {
operation_name: Cow<'static, str>,
start_time: SystemTime,
finish_time: Option<SystemTime>,
references: Vec<SpanReference<T>>,
tags: Vec<Tag>,
logs: Vec<Log>,
context: SpanContext<T>,
span_tx: SpanSender<T>,
span_tx: Sender,
}

/// Finished span.
Expand Down Expand Up @@ -485,18 +494,20 @@ impl<'a, T: 'a> CandidateSpan<'a, T> {

/// Options for starting a span.
#[derive(Debug)]
pub struct StartSpanOptions<'a, S: 'a, T: 'a> {
pub struct StartSpanOptions<'a, S: 'a, T: 'a, Sender: 'a = DefaultSpanSender<T>> {
operation_name: Cow<'static, str>,
start_time: Option<SystemTime>,
tags: Vec<Tag>,
references: Vec<SpanReference<T>>,
baggage_items: Vec<BaggageItem>,
span_tx: &'a SpanSender<T>,
span_tx: &'a Sender,
sampler: &'a S,
}
impl<'a, S: 'a, T: 'a> StartSpanOptions<'a, S, T>
impl<'a, S, T, Sender> StartSpanOptions<'a, S, T, Sender>
where
S: Sampler<T>,
S: 'a + Sampler<T>,
T: 'a,
Sender: 'a + SpanSend<T>,
{
/// Sets the start time of this span.
pub fn start_time(mut self, time: SystemTime) -> Self {
Expand Down Expand Up @@ -541,7 +552,7 @@ where
}

/// Starts a new span.
pub fn start(mut self) -> Span<T>
pub fn start(mut self) -> Span<T, Sender>
where
T: for<'b> From<CandidateSpan<'b, T>>,
{
Expand All @@ -562,7 +573,7 @@ where
}

/// Starts a new span with the explicit `state`.
pub fn start_with_state(mut self, state: T) -> Span<T> {
pub fn start_with_state(mut self, state: T) -> Span<T, Sender> {
self.normalize();
if !self.is_sampled() {
return Span(None);
Expand All @@ -578,7 +589,7 @@ where
)
}

pub(crate) fn new<N>(operation_name: N, span_tx: &'a SpanSender<T>, sampler: &'a S) -> Self
pub(crate) fn new<N>(operation_name: N, span_tx: &'a Sender, sampler: &'a S) -> Self
where
N: Into<Cow<'static, str>>,
{
Expand Down Expand Up @@ -627,8 +638,11 @@ where

/// Immutable handle of `Span`.
#[derive(Debug, Clone)]
pub struct SpanHandle<T>(Option<(SpanContext<T>, SpanSender<T>)>);
impl<T> SpanHandle<T> {
pub struct SpanHandle<T, Sender = DefaultSpanSender<T>>(Option<(SpanContext<T>, Sender)>);
impl<T, Sender> SpanHandle<T, Sender>
where
Sender: SpanSend<T>,
{
/// Returns `true` if this span is sampled (i.e., being traced).
pub fn is_sampled(&self) -> bool {
self.0.is_some()
Expand All @@ -649,11 +663,11 @@ impl<T> SpanHandle<T> {
}

/// Starts a `ChildOf` span if this span is sampled.
pub fn child<N, F>(&self, operation_name: N, f: F) -> Span<T>
pub fn child<N, F>(&self, operation_name: N, f: F) -> Span<T, Sender>
where
N: Into<Cow<'static, str>>,
T: Clone,
F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
F: FnOnce(StartSpanOptions<AllSampler, T, Sender>) -> Span<T, Sender>,
{
if let Some(&(ref context, ref span_tx)) = self.0.as_ref() {
let options =
Expand All @@ -665,11 +679,11 @@ impl<T> SpanHandle<T> {
}

/// Starts a `FollowsFrom` span if this span is sampled.
pub fn follower<N, F>(&self, operation_name: N, f: F) -> Span<T>
pub fn follower<N, F>(&self, operation_name: N, f: F) -> Span<T, Sender>
where
N: Into<Cow<'static, str>>,
T: Clone,
F: FnOnce(StartSpanOptions<AllSampler, T>) -> Span<T>,
F: FnOnce(StartSpanOptions<AllSampler, T, Sender>) -> Span<T, Sender>,
{
if let Some(&(ref context, ref span_tx)) = self.0.as_ref() {
let options =
Expand All @@ -680,3 +694,38 @@ impl<T> SpanHandle<T> {
}
}
}

/// This trait allows sending finished spans to a receiver.
pub trait SpanSend<T>: Clone {
/// Sends a finished span to the associated receiver.
///
/// Note that this method should be implemented in a non-blocking manner.
/// And if the receiver is a temporarily full or has dropped,
/// this method should just discard the span without any errors.
fn send(&mut self, span: FinishedSpan<T>);
}

impl<T> SpanSend<T> for crossbeam_channel::Sender<FinishedSpan<T>> {
fn send(&mut self, span: FinishedSpan<T>) {
let _ = self.try_send(span);
}
}

#[cfg(feature = "tokio")]
impl<T> SpanSend<T> for tokio::sync::mpsc::Sender<FinishedSpan<T>> {
fn send(&mut self, span: FinishedSpan<T>) {
let _ = self.try_send(span);
}
}

#[cfg(all(test, feature = "tokio"))]
mod tests {
use crate::sampler::AllSampler;
use crate::Tracer;

#[test]
fn tokio_channel() {
let (span_tx, _span_rx) = tokio::sync::mpsc::channel(10);
let _tracer = Tracer::<_, (), _>::with_sender(AllSampler, span_tx);
}
}
41 changes: 30 additions & 11 deletions src/tracer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::sampler::Sampler;
use crate::span::{SpanReceiver, SpanSender, StartSpanOptions};
use crate::span::{DefaultSpanReceiver, DefaultSpanSender, SpanSend, StartSpanOptions};
use std::borrow::Cow;
use std::marker::PhantomData;
use std::sync::Arc;

/// Tracer.
Expand All @@ -20,50 +21,68 @@ use std::sync::Arc;
/// assert_eq!(span.operation_name(), "foo");
/// ```
#[derive(Debug)]
pub struct Tracer<S, T> {
pub struct Tracer<S, T, Sender = DefaultSpanSender<T>> {
sampler: Arc<S>,
span_tx: SpanSender<T>,
span_tx: Sender,
_span_state: PhantomData<T>,
}
impl<S: Sampler<T>, T> Tracer<S, T> {
impl<S, T> Tracer<S, T, DefaultSpanSender<T>>
where
S: Sampler<T>,
{
/// This constructor is mainly for backward compatibility, it has the same interface
/// as in previous versions except the type of `SpanReceiver`.
/// It builds an unbounded channel which may cause memory issues if there is no reader,
/// prefer `with_sender()` alternative with a bounded one.
pub fn new(sampler: S) -> (Self, SpanReceiver<T>) {
pub fn new(sampler: S) -> (Self, DefaultSpanReceiver<T>) {
let (span_tx, span_rx) = crossbeam_channel::unbounded();
(Self::with_sender(sampler, span_tx), span_rx)
}

}
impl<S, T, Sender> Tracer<S, T, Sender>
where
S: Sampler<T>,
Sender: SpanSend<T>,
{
/// Makes a new `Tracer` instance.
pub fn with_sender(sampler: S, span_tx: SpanSender<T>) -> Self {
pub fn with_sender(sampler: S, span_tx: Sender) -> Self {
Tracer {
sampler: Arc::new(sampler),
span_tx,
_span_state: PhantomData,
}
}

/// Returns `StartSpanOptions` for starting a span which has the name `operation_name`.
pub fn span<N>(&self, operation_name: N) -> StartSpanOptions<S, T>
pub fn span<N>(&self, operation_name: N) -> StartSpanOptions<S, T, Sender>
where
N: Into<Cow<'static, str>>,
{
StartSpanOptions::new(operation_name, &self.span_tx, &self.sampler)
}
}
impl<S, T> Tracer<S, T> {
impl<S, T, Sender> Tracer<S, T, Sender>
where
Sender: SpanSend<T>,
{
/// Clone with the given `sampler`.
pub fn clone_with_sampler<U: Sampler<T>>(&self, sampler: U) -> Tracer<U, T> {
pub fn clone_with_sampler<U: Sampler<T>>(&self, sampler: U) -> Tracer<U, T, Sender> {
Tracer {
sampler: Arc::new(sampler),
span_tx: self.span_tx.clone(),
_span_state: PhantomData,
}
}
}
impl<S, T> Clone for Tracer<S, T> {
impl<S, T, Sender> Clone for Tracer<S, T, Sender>
where
Sender: SpanSend<T>,
{
fn clone(&self) -> Self {
Tracer {
sampler: Arc::clone(&self.sampler),
span_tx: self.span_tx.clone(),
_span_state: PhantomData,
}
}
}

0 comments on commit 7f57d1e

Please sign in to comment.