Skip to content
This repository has been archived by the owner on Dec 21, 2021. It is now read-only.

Commit

Permalink
Synchronize pod state with systemd service state (#164)
Browse files Browse the repository at this point in the history
* Functions to start and stop systemd units now await the JobRemoved signal
* All dependencies upgraded
* Changelog converted to asciidoc
* Version set to 0.2.0 and changelog updated
  • Loading branch information
siegfriedweber committed May 20, 2021
1 parent eb68eb0 commit 0c826f8
Show file tree
Hide file tree
Showing 6 changed files with 371 additions and 125 deletions.
62 changes: 62 additions & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
= Changelog

== 0.2.0 - 2021-05-20

:159: https://github.com/stackabletech/agent/pull/159[#159]
:164: https://github.com/stackabletech/agent/pull/164[#164]

=== Added
* Templating facility added to the `config-directory` parameter ({159}).

=== Fixed
* Pod state synchronized with systemd service state ({164}).

== 0.1.0 - 2021-05-17

:1: https://github.com/stackabletech/agent/pull/1[#1]
:18: https://github.com/stackabletech/agent/pull/18[#18]
:23: https://github.com/stackabletech/agent/pull/23[#23]
:25: https://github.com/stackabletech/agent/pull/25[#25]
:26: https://github.com/stackabletech/agent/pull/26[#26]
:30: https://github.com/stackabletech/agent/pull/30[#30]
:32: https://github.com/stackabletech/agent/pull/32[#32]
:35: https://github.com/stackabletech/agent/pull/35[#35]
:36: https://github.com/stackabletech/agent/pull/36[#36]
:40: https://github.com/stackabletech/agent/pull/40[#40]
:43: https://github.com/stackabletech/agent/pull/43[#43]
:50: https://github.com/stackabletech/agent/pull/50[#50]
:53: https://github.com/stackabletech/agent/pull/53[#53]
:56: https://github.com/stackabletech/agent/pull/56[#56]
:57: https://github.com/stackabletech/agent/pull/57[#57]
:63: https://github.com/stackabletech/agent/pull/63[#63]
:72: https://github.com/stackabletech/agent/pull/72[#72]
:73: https://github.com/stackabletech/agent/pull/73[#73]
:77: https://github.com/stackabletech/agent/pull/77[#77]
:78: https://github.com/stackabletech/agent/pull/78[#78]
:79: https://github.com/stackabletech/agent/pull/79[#79]
:94: https://github.com/stackabletech/agent/pull/94[#94]
:100: https://github.com/stackabletech/agent/pull/100[#100]
:109: https://github.com/stackabletech/agent/pull/109[#109]
:110: https://github.com/stackabletech/agent/pull/110[#110]
:135: https://github.com/stackabletech/agent/pull/135[#135]
:138: https://github.com/stackabletech/agent/pull/138[#138]
:144: https://github.com/stackabletech/agent/pull/144[#144]
:145: https://github.com/stackabletech/agent/pull/145[#145]
:152: https://github.com/stackabletech/agent/pull/152[#152]

=== Added
* Apache license v2.0 set ({23}).
* Krustlet based agent implementation created ({1}, {18}, {26}, {35}, {40}).
* Functionality to stop and restart processes added ({25}).
* Agent restart without impacting running services enabled ({63}).
* Rendering of template variables to environment variables added ({30}).
* Setting of pod condition "ready" for state "running" added ({32}).
* Support for command line parameters added ({36}, {50}, {72}, {109}).
* Integration with systemd implemented ({43}, {53}, {100}, {152}).
* Dependabot and security audit enabled ({56}, {57}).
* Building and publishing of nightly deb and rpm packages added ({73}, {78}, {94}, {110}, {144}).
* Bootstrapping of certificates and kubeconfig added ({77}).
* Support for running of services as application users added ({79}).
* Retrieval of container logs with kubectl logs implemented ({135}).
* Configuration of terminationGracePeriodSeconds considered in systemd units ({138}).
* Systemd dependency adapted so that it is compatible with systemd version 241 ({145}).
20 changes: 0 additions & 20 deletions CHANGELOG.md

This file was deleted.

49 changes: 25 additions & 24 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "stackable-agent"
description = "The component of the Stackable Platform that manages installation of services on the workers"
version = "0.2.0-nightly"
version = "0.2.0"
authors = ["Sönke Liebau <soenke.liebau@stackable.de>"]
edition = "2018"
license = "Apache-2.0"
Expand All @@ -15,6 +15,7 @@ async-trait = "0.1"
byteorder = "1.4"
env_logger = "0.8"
flate2 = "1.0"
futures-util = "0.3"
handlebars = "3.5"
hostname = "0.3"
k8s-openapi = { version = "0.11", default-features = false, features = ["api", "v1_18"] }
Expand Down
85 changes: 71 additions & 14 deletions src/provider/systemdmanager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@
//! The module offers the ability to create, remove, start, stop, enable and
//! disable systemd units.
//!
use super::systemd1_api::{ActiveState, AsyncManagerProxy, StartMode, StopMode};
use super::systemd1_api::{
ActiveState, AsyncJobProxy, AsyncManagerProxy, JobRemovedResult, JobRemovedSignal,
ManagerSignals, StartMode, StopMode,
};
use crate::provider::systemdmanager::systemdunit::SystemDUnit;
use crate::provider::StackableError;
use crate::provider::StackableError::RuntimeError;
use anyhow::anyhow;
use futures_util::{future, stream::StreamExt};
use log::debug;
use std::fs;
use std::fs::File;
use std::future::Future;
use std::io::Write;
use std::path::PathBuf;
use zbus::azync::Connection;
Expand Down Expand Up @@ -273,15 +278,17 @@ impl SystemdManager {
/// systemd at the time this is called.
/// To make a service known please take a look at the [`SystemdManager::enable`] function.
pub async fn start(&self, unit: &str) -> anyhow::Result<()> {
debug!("Attempting to start unit {}", unit);
debug!("Trying to start unit [{}]", unit);

match self.proxy.start_unit(unit, StartMode::Fail).await {
Ok(result) => {
debug!("Successfully started service [{}]: [{:?}]", unit, result);
Ok(())
}
Err(e) => Err(anyhow!("Error starting service [{}]: {}", unit, e)),
let result = self
.call_method(|proxy| proxy.start_unit(unit, StartMode::Fail))
.await;

if result.is_ok() {
debug!("Successfully started service [{}]", unit);
}

result.map_err(|e| anyhow!("Error starting service [{}]: {}", unit, e))
}

/// Attempts to stop a systemd unit
Expand All @@ -291,12 +298,62 @@ impl SystemdManager {
pub async fn stop(&self, unit: &str) -> anyhow::Result<()> {
debug!("Trying to stop systemd unit [{}]", unit);

match self.proxy.stop_unit(unit, StopMode::Fail).await {
Ok(result) => {
debug!("Successfully stopped service [{}]: [{:?}]", unit, result);
Ok(())
}
Err(e) => Err(anyhow!("Error stopping service [{}]: {}", unit, e)),
let result = self
.call_method(|proxy| proxy.stop_unit(unit, StopMode::Fail))
.await;

if result.is_ok() {
debug!("Successfully stopped service [{}]", unit);
}

result.map_err(|e| anyhow!("Error stopping service [{}]: {}", unit, e))
}

/// Calls a systemd method and waits until the dependent job is
/// finished.
///
/// The given method enqueues a job in systemd and returns the job
/// object. Systemd sends out a `JobRemoved` signal when the job is
/// dequeued. The signal contains the reason for the dequeuing like
/// `"done"`, `"failed"`, or `"canceled"`.
///
/// This function subscribes to `JobRemoved` signals, calls the
/// given method, awaits the signal for the corresponding job, and
/// returns `Ok(())` if the result is [`JobRemovedResult::Done`].
/// If the signal contains another result or no signal is returned
/// (which should never happen) then an error with a corresponding
/// message is returned.
async fn call_method<'a, F, Fut>(&'a self, method: F) -> anyhow::Result<()>
where
F: Fn(&'a AsyncManagerProxy) -> Fut,
Fut: Future<Output = zbus::Result<AsyncJobProxy<'a>>>,
{
let signals = self
.proxy
.receive_signal(ManagerSignals::JobRemoved.into())
.await?
.map(|message| message.body::<JobRemovedSignal>().unwrap());

let job = method(&self.proxy).await?;

let mut signals = signals
.filter(|signal| future::ready(&signal.job.to_owned().into_inner() == job.path()));

let signal = signals.next().await;

// Unsubscribe from receiving signals.
// If `signals` goes out of scope prematurely due to an error
// then the subscription is cancelled synchronously in the
// destructor of `SignalStream`.
let _ = signals.into_inner().into_inner().close().await;

match signal {
Some(message) if message.result == JobRemovedResult::Done => Ok(()),
Some(message) => Err(anyhow!("The systemd job failed: {:?}", message)),
None => Err(anyhow!(
"No signal was returned for the systemd job: {:?}",
job
)),
}
}

Expand Down
Loading

0 comments on commit 0c826f8

Please sign in to comment.