Skip to content

Commit

Permalink
feat(internal telemetry at shutdown): close internal sources after ex…
Browse files Browse the repository at this point in the history
…ternal ones (#17741)

<!--
**Your PR title must conform to the conventional commit spec!**

  <type>(<scope>)!: <description>

  * `type` = chore, enhancement, feat, fix, docs
  * `!` = OPTIONAL: signals a breaking change
* `scope` = Optional when `type` is "chore" or "docs", available scopes
https://github.com/vectordotdev/vector/blob/master/.github/semantic.yml#L20
  * `description` = short description of the change

Examples:

  * enhancement(file source): Add `sort` option to sort discovered files
  * feat(new source): Initial `statsd` source
  * fix(file source): Fix a bug discovering new files
  * chore(external docs): Clarify `batch_size` option
-->

We would like to close the internal logs, metrics, and trace sources
sent from Vector as late as possible during shutdown to facilitate
debugging. In this PR, we wait until all other sources are shut down
before shutting down internal telemetry sources. This means that
shutdown may be a bit longer, but we will have better observability on
the shutdown process.

issue: #15912
  • Loading branch information
DominicBurkart committed Jun 30, 2023
1 parent c19938c commit 812929b
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 16 deletions.
33 changes: 22 additions & 11 deletions lib/vector-common/src/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,11 @@ impl ShutdownSignal {
}
}

type IsInternal = bool;

#[derive(Debug, Default)]
pub struct SourceShutdownCoordinator {
shutdown_begun_triggers: HashMap<ComponentKey, Trigger>,
shutdown_begun_triggers: HashMap<ComponentKey, (IsInternal, Trigger)>,
shutdown_force_triggers: HashMap<ComponentKey, Trigger>,
shutdown_complete_tripwires: HashMap<ComponentKey, Tripwire>,
}
Expand All @@ -121,13 +123,14 @@ impl SourceShutdownCoordinator {
pub fn register_source(
&mut self,
id: &ComponentKey,
internal: bool,
) -> (ShutdownSignal, impl Future<Output = ()>) {
let (shutdown_begun_trigger, shutdown_begun_tripwire) = Tripwire::new();
let (force_shutdown_trigger, force_shutdown_tripwire) = Tripwire::new();
let (shutdown_complete_trigger, shutdown_complete_tripwire) = Tripwire::new();

self.shutdown_begun_triggers
.insert(id.clone(), shutdown_begun_trigger);
.insert(id.clone(), (internal, shutdown_begun_trigger));
self.shutdown_force_triggers
.insert(id.clone(), force_shutdown_trigger);
self.shutdown_complete_tripwires
Expand Down Expand Up @@ -201,13 +204,14 @@ impl SourceShutdownCoordinator {
/// Panics if this coordinator has had its triggers removed (ie
/// has been taken over with `Self::takeover_source`).
pub fn shutdown_all(self, deadline: Option<Instant>) -> impl Future<Output = ()> {
let mut complete_futures = Vec::new();
let mut internal_sources_complete_futures = Vec::new();
let mut external_sources_complete_futures = Vec::new();

let shutdown_begun_triggers = self.shutdown_begun_triggers;
let mut shutdown_complete_tripwires = self.shutdown_complete_tripwires;
let mut shutdown_force_triggers = self.shutdown_force_triggers;

for (id, trigger) in shutdown_begun_triggers {
for (id, (internal, trigger)) in shutdown_begun_triggers {
trigger.cancel();

let shutdown_complete_tripwire =
Expand All @@ -229,10 +233,16 @@ impl SourceShutdownCoordinator {
deadline,
);

complete_futures.push(source_complete);
if internal {
internal_sources_complete_futures.push(source_complete);
} else {
external_sources_complete_futures.push(source_complete);
}
}

futures::future::join_all(complete_futures).map(|_| ())
futures::future::join_all(external_sources_complete_futures)
.then(|_| futures::future::join_all(internal_sources_complete_futures))
.map(|_| ())
}

/// Sends the signal to the given source to begin shutting down. Returns a future that resolves
Expand All @@ -250,11 +260,12 @@ impl SourceShutdownCoordinator {
id: &ComponentKey,
deadline: Instant,
) -> impl Future<Output = bool> {
let begin_shutdown_trigger = self.shutdown_begun_triggers.remove(id).unwrap_or_else(|| {
panic!(
let (_, begin_shutdown_trigger) =
self.shutdown_begun_triggers.remove(id).unwrap_or_else(|| {
panic!(
"shutdown_begun_trigger for source \"{id}\" not found in the ShutdownCoordinator"
)
});
});
// This is what actually triggers the source to begin shutting down.
begin_shutdown_trigger.cancel();

Expand Down Expand Up @@ -336,7 +347,7 @@ mod test {
let mut shutdown = SourceShutdownCoordinator::default();
let id = ComponentKey::from("test");

let (shutdown_signal, _) = shutdown.register_source(&id);
let (shutdown_signal, _) = shutdown.register_source(&id, false);

let deadline = Instant::now() + Duration::from_secs(1);
let shutdown_complete = shutdown.shutdown_source(&id, deadline);
Expand All @@ -352,7 +363,7 @@ mod test {
let mut shutdown = SourceShutdownCoordinator::default();
let id = ComponentKey::from("test");

let (_shutdown_signal, force_shutdown_tripwire) = shutdown.register_source(&id);
let (_shutdown_signal, force_shutdown_tripwire) = shutdown.register_source(&id, false);

let deadline = Instant::now() + Duration::from_secs(1);
let shutdown_complete = shutdown.shutdown_source(&id, deadline);
Expand Down
2 changes: 1 addition & 1 deletion src/config/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl SourceContext {
out: SourceSender,
) -> (Self, crate::shutdown::SourceShutdownCoordinator) {
let mut shutdown = crate::shutdown::SourceShutdownCoordinator::default();
let (shutdown_signal, _) = shutdown.register_source(key);
let (shutdown_signal, _) = shutdown.register_source(key, false);
(
Self {
key: key.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/sources/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ mod test {
source_id: &ComponentKey,
shutdown: &mut SourceShutdownCoordinator,
) -> (SocketAddr, JoinHandle<Result<(), ()>>) {
let (shutdown_signal, _) = shutdown.register_source(source_id);
let (shutdown_signal, _) = shutdown.register_source(source_id, false);
init_udp_inner(sender, source_id, shutdown_signal, None, false).await
}

Expand Down
2 changes: 1 addition & 1 deletion src/sources/util/framestream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ mod test {
let source_id = ComponentKey::from(source_id);
let socket_path = frame_handler.socket_path();
let mut shutdown = SourceShutdownCoordinator::default();
let (shutdown_signal, _) = shutdown.register_source(&source_id);
let (shutdown_signal, _) = shutdown.register_source(&source_id, false);
let server = build_framestream_unix_source(frame_handler, shutdown_signal, pipeline)
.expect("Failed to build framestream unix source.");

Expand Down
7 changes: 5 additions & 2 deletions src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ static TRANSFORM_CONCURRENCY_LIMIT: Lazy<usize> = Lazy::new(|| {
.unwrap_or_else(crate::num_threads)
});

const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"];

/// Builds only the new pieces, and doesn't check their topology.
pub async fn build_pieces(
config: &super::Config,
Expand Down Expand Up @@ -313,8 +315,9 @@ impl<'a> Builder<'a> {

let pipeline = builder.build();

let (shutdown_signal, force_shutdown_tripwire) =
self.shutdown_coordinator.register_source(key);
let (shutdown_signal, force_shutdown_tripwire) = self
.shutdown_coordinator
.register_source(key, INTERNAL_SOURCES.contains(&typetag));

let context = SourceContext {
key: key.clone(),
Expand Down

0 comments on commit 812929b

Please sign in to comment.