Skip to content

Commit

Permalink
payload union
Browse files Browse the repository at this point in the history
Signed-off-by: Lee Benson <lee@leebenson.com>
  • Loading branch information
leebenson committed Mar 26, 2021
1 parent 36b297f commit 10df625
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 98 deletions.
8 changes: 8 additions & 0 deletions src/api/schema/events/encoding.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use async_graphql::Enum;

#[derive(Enum, Copy, Clone, PartialEq, Eq)]
/// Encoding format for the event
pub enum EventEncodingType {
Json,
Yaml,
}
68 changes: 0 additions & 68 deletions src/api/schema/events/event.rs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use super::event::EventEncodingType;
use super::EventEncodingType;
use crate::event;

use async_graphql::Object;
use chrono::{DateTime, Utc};

#[derive(Debug)]
pub struct LogEvent {
pub struct Log {
component_name: String,
event: event::LogEvent,
}

impl LogEvent {
impl Log {
pub fn new(component_name: &str, event: event::LogEvent) -> Self {
Self {
component_name: component_name.to_string(),
Expand All @@ -28,7 +29,7 @@ impl LogEvent {

#[Object]
/// Log event with fields for querying log data
impl LogEvent {
impl Log {
/// Name of the component associated with the log event
async fn component_name(&self) -> &str {
&self.component_name
Expand Down
45 changes: 27 additions & 18 deletions src/api/schema/events/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
mod event;
mod log_event;
mod encoding;
mod log;
mod notification;
mod output;

use event::Event;
use output::OutputEventsPayload;

use crate::{api::tap::TapSink, topology::WatchRx};

use async_graphql::{validators::IntRange, Context, Subscription};
use async_graphql::{validators::IntRange, Context, Enum, Subscription};
use futures::StreamExt;
use itertools::Itertools;
use rand::{rngs::SmallRng, Rng, SeedableRng};
use tokio::{select, stream::Stream, sync::mpsc, time};

#[derive(Enum, Copy, Clone, PartialEq, Eq)]
/// Encoding format for the event
pub enum EventEncodingType {
Json,
Yaml,
}

#[derive(Debug, Default)]
pub struct EventsSubscription;

Expand All @@ -23,7 +32,7 @@ impl EventsSubscription {
component_names: Vec<String>,
#[graphql(default = 500)] interval: u32,
#[graphql(default = 100, validator(IntRange(min = "1", max = "10_000")))] limit: u32,
) -> impl Stream<Item = Vec<Event>> + 'a {
) -> impl Stream<Item = Vec<OutputEventsPayload>> + 'a {
let watch_rx = ctx.data_unchecked::<WatchRx>().clone();

// Client input is confined to `u32` to provide sensible bounds.
Expand All @@ -39,15 +48,15 @@ fn create_events_stream(
component_names: Vec<String>,
interval: u64,
limit: usize,
) -> impl Stream<Item = Vec<Event>> {
) -> impl Stream<Item = Vec<OutputEventsPayload>> {
// Channel for receiving individual tap payloads. Since we can process at most `limit` per
// interval, this is capped to the same value.
let (tap_tx, mut tap_rx) = mpsc::channel(limit);

// The resulting vector of `Event` sent to the client. Only one result set will be streamed
// back to the client at a time. This value is set higher than `1` to prevent blocking the event
// pipeline on slower client connections, but low enough to apply a modest cap on mem usage.
let (mut event_tx, event_rx) = mpsc::channel::<Vec<Event>>(10);
let (mut event_tx, event_rx) = mpsc::channel::<Vec<OutputEventsPayload>>(10);

tokio::spawn(async move {
// Create a tap sink. When this drops out of scope, clean up will be performed on the
Expand All @@ -58,14 +67,14 @@ fn create_events_stream(
let mut interval = time::interval(time::Duration::from_millis(interval));

// Temporary structure to hold sortable values of `Event`.
struct SortableEvent {
struct SortableOutputEventsPayload {
batch: usize,
event: Event,
payload: OutputEventsPayload,
}

// Collect a vector of results, with a capacity of `limit`. As new `Event`s come in,
// they will be sampled and added to results.
let mut results = Vec::<SortableEvent>::with_capacity(limit);
let mut results = Vec::<SortableOutputEventsPayload>::with_capacity(limit);

// Random number generator to allow for sampling. Speed trumps cryptographic security here.
// The RNG must be Send + Sync to use with the `select!` loop below, hence `SmallRng`.
Expand All @@ -80,32 +89,32 @@ fn create_events_stream(
// Process `TapPayload`s. A tap payload could contain log/metric events or a
// notification. Notifications are emitted immediately; events buffer until
// the next `interval`.
Some(event) = tap_rx.next() => {
let event = event.into();
Some(payload) = tap_rx.next() => {
let payload = payload.into();

// Emit notifications immediately; these don't count as a 'batch'.
if let Event::Notification(_) = event {
if let OutputEventsPayload::Notification(_) = payload {
// If an error occurs when sending, the subscription has likely gone
// away. Break the loop to terminate the thread.
if let Err(err) = event_tx.send(vec![event]).await {
if let Err(err) = event_tx.send(vec![payload]).await {
debug!(message = "Couldn't send notification.", error = ?err);
break;
}
} else {
// Wrap tap in a 'sortable' wrapper, using the batch as a key, to
// re-sort after random eviction.
let event = SortableEvent { batch, event };
let payload = SortableOutputEventsPayload { batch, payload };

// A simple implementation of "Algorithm R" per
// https://en.wikipedia.org/wiki/Reservoir_sampling. As we're unable to
// pluck the nth result, this is chosen over the more optimal "Algorithm L"
// since discarding results isn't an option.
if limit > results.len() {
results.push(event);
results.push(payload);
} else {
let random_number = rng.gen_range(0..batch);
if random_number < results.len() {
results[random_number] = event;
results[random_number] = payload;
}
}
// Increment the batch count, to be used for the next Algo R loop.
Expand All @@ -123,7 +132,7 @@ fn create_events_stream(
let results = results
.drain(..)
.sorted_by_key(|r| r.batch)
.map(|r| r.event)
.map(|r| r.payload)
.collect();

// If we get an error here, it likely means that the subscription has
Expand Down
29 changes: 29 additions & 0 deletions src/api/schema/events/notification.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use async_graphql::{Enum, SimpleObject};

#[derive(Enum, Debug, Copy, Clone, PartialEq, Eq)]
/// Event notification type
pub enum EventNotificationType {
/// A component was found that matched the provided pattern
Matched,
/// There isn't currently a component that matches this pattern
NotMatched,
}

#[derive(Debug, SimpleObject)]
/// A notification regarding events observation
pub struct EventNotification {
/// Name of the component associated with the notification
component_name: String,

/// Event notification type
notification: EventNotificationType,
}

impl EventNotification {
pub fn new(component_name: &str, notification: EventNotificationType) -> Self {
Self {
component_name: component_name.to_string(),
notification,
}
}
}
37 changes: 37 additions & 0 deletions src/api/schema/events/output.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use super::{
log::Log,
notification::{EventNotification, EventNotificationType},
};
use crate::api::tap::{TapNotification, TapPayload};

use async_graphql::Union;

#[derive(Union, Debug)]
/// An event or a notification
pub enum OutputEventsPayload {
/// Log event
Log(Log),

// Notification
Notification(EventNotification),
}

/// Convert an `api::TapPayload` to the equivalent GraphQL type.
impl From<TapPayload> for OutputEventsPayload {
fn from(t: TapPayload) -> Self {
match t {
TapPayload::Log(name, ev) => Self::Log(Log::new(&name, ev)),
TapPayload::Notification(name, n) => match n {
TapNotification::Matched => Self::Notification(EventNotification::new(
&name,
EventNotificationType::Matched,
)),
TapNotification::NotMatched => Self::Notification(EventNotification::new(
&name,
EventNotificationType::NotMatched,
)),
},
_ => unreachable!("TODO: implement metrics"),
}
}
}
13 changes: 5 additions & 8 deletions src/api/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ pub enum TapNotification {
/// to be communicated back to the client to alert them about the status of the tap request.
#[derive(Debug)]
pub enum TapPayload {
LogEvent(String, LogEvent),
MetricEvent(String, LogEvent),
Log(String, LogEvent),
Metric(String, LogEvent),
Notification(String, TapNotification),
}

Expand Down Expand Up @@ -92,7 +92,7 @@ async fn send_not_matched(mut tx: TapSender, pattern: &str) -> Result<(), SendEr
tx.send(TapPayload::not_matched(pattern)).await
}

/// Makes a `RouterSink` that relays `LogEvent` as `TapPayload::LogEvent` to a client.
/// Makes a `RouterSink` that relays `Log` as `TapPayload::Log` to a client.
fn make_router(mut tx: TapSender, component_name: &str) -> fanout::RouterSink {
let (event_tx, mut event_rx) = futures_mpsc::unbounded();
let component_name = component_name.to_string();
Expand All @@ -102,10 +102,7 @@ fn make_router(mut tx: TapSender, component_name: &str) -> fanout::RouterSink {

while let Some(ev) = event_rx.next().await {
if let Event::Log(ev) = ev {
if let Err(err) = tx
.send(TapPayload::LogEvent(component_name.clone(), ev))
.await
{
if let Err(err) = tx.send(TapPayload::Log(component_name.clone(), ev)).await {
debug!(
message = "Couldn't send log event.",
error = ?err,
Expand Down Expand Up @@ -334,7 +331,7 @@ mod tests {
// 3rd payload should be the log event
assert!(matches!(
sink_rx.recv().await,
Some(TapPayload::LogEvent(returned_name, _)) if returned_name == name
Some(TapPayload::Log(returned_name, _)) if returned_name == name
));
}
}

0 comments on commit 10df625

Please sign in to comment.