Skip to content

Commit

Permalink
stream changes (#473)
Browse files Browse the repository at this point in the history
  • Loading branch information
paulgb committed Nov 7, 2023
1 parent fb0e4fd commit 3e6c729
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 25 deletions.
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod state;
pub mod supervisor;
pub mod timing;
pub mod types;
pub mod util;

/// This is a stand-in for the “never” type until RFC 1216 is stabilized.
/// Because it is not constructable, the compiler enforces that a function
Expand Down
12 changes: 6 additions & 6 deletions core/src/messages/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,25 +58,25 @@ pub enum DroneLogMessageKind {

#[derive(Serialize, Deserialize, Debug, TypedMessage)]
#[typed_message(subject = "backend.#backend_id.log")]
pub struct DroneLogMessage {
pub struct BackendLogMessage {
pub backend_id: BackendId,
pub kind: DroneLogMessageKind,
pub text: String,
}

impl DroneLogMessage {
impl BackendLogMessage {
#[cfg(feature = "bollard")]
pub fn from_log_message(
backend_id: &BackendId,
log_message: &LogOutput,
) -> Option<DroneLogMessage> {
) -> Option<BackendLogMessage> {
match log_message {
bollard::container::LogOutput::StdErr { message } => Some(DroneLogMessage {
bollard::container::LogOutput::StdErr { message } => Some(BackendLogMessage {
backend_id: backend_id.clone(),
kind: DroneLogMessageKind::Stderr,
text: std::str::from_utf8(message).ok()?.to_string(),
}),
bollard::container::LogOutput::StdOut { message } => Some(DroneLogMessage {
bollard::container::LogOutput::StdOut { message } => Some(BackendLogMessage {
backend_id: backend_id.clone(),
kind: DroneLogMessageKind::Stdout,
text: std::str::from_utf8(message).ok()?.to_string(),
Expand All @@ -101,7 +101,7 @@ impl DroneLogMessage {
}
}

impl JetStreamable for DroneLogMessage {
impl JetStreamable for BackendLogMessage {
fn stream_name() -> &'static str {
"backend_logs"
}
Expand Down
18 changes: 13 additions & 5 deletions core/src/messages/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::nats::JetStreamable;
use crate::{nats::JetStreamable, util::LogAndIgnoreError};
use anyhow::anyhow;
pub mod agent;
pub mod cert;
Expand Down Expand Up @@ -29,10 +29,18 @@ async fn add_jetstream_stream<T: JetStreamable>(
pub async fn initialize_jetstreams(
jetstream: &async_nats::jetstream::Context,
) -> anyhow::Result<()> {
let _ = add_jetstream_stream::<state::WorldStateMessage>(jetstream).await;
let _ = add_jetstream_stream::<agent::DroneLogMessage>(jetstream).await;
let _ = add_jetstream_stream::<agent::BackendStateMessage>(jetstream).await;
let _ = add_jetstream_stream::<dns::SetDnsRecord>(jetstream).await;
add_jetstream_stream::<state::WorldStateMessage>(jetstream)
.await
.log_and_ignore_error();
add_jetstream_stream::<agent::BackendLogMessage>(jetstream)
.await
.log_and_ignore_error();
add_jetstream_stream::<agent::BackendStateMessage>(jetstream)
.await
.log_and_ignore_error();
add_jetstream_stream::<dns::SetDnsRecord>(jetstream)
.await
.log_and_ignore_error();

Ok(())
}
13 changes: 13 additions & 0 deletions core/src/util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use std::fmt::Debug;

pub trait LogAndIgnoreError {
fn log_and_ignore_error(self);
}

impl<T, E: Debug> LogAndIgnoreError for Result<T, E> {
fn log_and_ignore_error(self) {
if let Err(error) = self {
tracing::error!(?error);
}
}
}
8 changes: 4 additions & 4 deletions dev/tests/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use plane_controller::{drone_state::monitor_drone_state, run::update_backend_sta
use plane_core::{
messages::{
agent::{
BackendState, BackendStatsMessage, DroneLogMessage, DroneLogMessageKind, SpawnRequest,
TerminationRequest,
BackendLogMessage, BackendState, BackendStatsMessage, DroneLogMessageKind,
SpawnRequest, TerminationRequest,
},
drone_state::{DroneStatusMessage, UpdateBackendStateMessage},
scheduler::DrainDrone,
Expand Down Expand Up @@ -308,7 +308,7 @@ async fn invalid_container_fails() {
let (ctx, mut sub) = do_spawn_request(&mut req).await;
let log_subscription = ctx
.nats_connection
.subscribe(DroneLogMessage::subscribe_subject(&req.backend_id))
.subscribe(BackendLogMessage::subscribe_subject(&req.backend_id))
.await
.unwrap()
.timeout(Duration::from_secs(10));
Expand Down Expand Up @@ -354,7 +354,7 @@ async fn check_that_logs_work() {
let (ctx, mut status_sub) = do_spawn_request(&mut req).await;
let mut log_subscription = Box::pin(
ctx.nats_connection
.subscribe(DroneLogMessage::subscribe_subject(&req.backend_id))
.subscribe(BackendLogMessage::subscribe_subject(&req.backend_id))
.await
.unwrap()
.timeout(Duration::from_secs(10)),
Expand Down
10 changes: 5 additions & 5 deletions drone/src/agent/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::agent::engine::Engine;
use futures::Future;
use plane_core::{
logging::LogError,
messages::agent::DroneLogMessage,
messages::agent::BackendLogMessage,
messages::{
agent::DroneLogMessageKind,
dns::{DnsRecordType, SetDnsRecord},
Expand Down Expand Up @@ -32,7 +32,7 @@ pub struct BackendMonitor {
_stats_loop: AbortOnDrop<()>,
_dns_loop: AbortOnDrop<Result<(), anyhow::Error>>,
_backend_id: BackendId,
meta_log_tx: Sender<DroneLogMessage>,
meta_log_tx: Sender<BackendLogMessage>,
}

impl BackendMonitor {
Expand Down Expand Up @@ -62,8 +62,8 @@ impl BackendMonitor {
&mut self,
text: String,
kind: DroneLogMessageKind,
) -> impl Future<Output = Result<(), SendError<DroneLogMessage>>> + '_ {
self.meta_log_tx.send(DroneLogMessage {
) -> impl Future<Output = Result<(), SendError<BackendLogMessage>>> + '_ {
self.meta_log_tx.send(BackendLogMessage {
backend_id: self._backend_id.clone(),
kind,
text,
Expand Down Expand Up @@ -100,7 +100,7 @@ impl BackendMonitor {
backend_id: &BackendId,
engine: &E,
nc: &TypedNats,
meta_log_rx: ReceiverStream<DroneLogMessage>,
meta_log_rx: ReceiverStream<BackendLogMessage>,
) -> JoinHandle<()> {
let mut stream = engine.log_stream(backend_id).merge(meta_log_rx);
let nc = nc.clone();
Expand Down
4 changes: 2 additions & 2 deletions drone/src/agent/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::Result;
use async_trait::async_trait;
use futures::Stream;
use plane_core::{
messages::agent::{BackendStatsMessage, DroneLogMessage, SpawnRequest},
messages::agent::{BackendLogMessage, BackendStatsMessage, SpawnRequest},
types::{BackendId, ClusterName, DroneId},
};
use std::{net::SocketAddr, pin::Pin};
Expand Down Expand Up @@ -46,7 +46,7 @@ pub trait Engine: Send + Sync + 'static {
fn log_stream(
&self,
backend: &BackendId,
) -> Pin<Box<dyn Stream<Item = DroneLogMessage> + Send>>;
) -> Pin<Box<dyn Stream<Item = BackendLogMessage> + Send>>;

fn stats_stream(
&self,
Expand Down
6 changes: 3 additions & 3 deletions drone/src/agent/engines/docker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use bollard::{
};
use plane_core::{
messages::agent::{
BackendStatsMessage, DockerExecutableConfig, DockerPullPolicy, DroneLogMessage,
BackendLogMessage, BackendStatsMessage, DockerExecutableConfig, DockerPullPolicy,
SpawnRequest,
},
timing::Timer,
Expand Down Expand Up @@ -403,13 +403,13 @@ impl Engine for DockerInterface {
fn log_stream(
&self,
backend: &BackendId,
) -> Pin<Box<dyn Stream<Item = DroneLogMessage> + Send>> {
) -> Pin<Box<dyn Stream<Item = BackendLogMessage> + Send>> {
let stream = self.get_logs(&backend.to_resource_name());
let backend = backend.clone();
let stream = stream.filter_map(move |v| {
v.ok()
.as_ref()
.and_then(|d| DroneLogMessage::from_log_message(&backend, d))
.and_then(|d| BackendLogMessage::from_log_message(&backend, d))
});
Box::pin(stream)
}
Expand Down

0 comments on commit 3e6c729

Please sign in to comment.