Skip to content
This repository has been archived by the owner on Dec 21, 2021. It is now read-only.

Commit

Permalink
Fix race condition (#176)
Browse files Browse the repository at this point in the history
* zbus version upgraded to prevent deadlock
* Version set to 0.3.0 and changelog updated
* DBus message queue size made dependent on maximum number of pods
  • Loading branch information
siegfriedweber committed May 27, 2021
1 parent 4905e57 commit 15b143b
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 50 deletions.
18 changes: 17 additions & 1 deletion CHANGELOG.adoc
Original file line number Diff line number Diff line change
@@ -1,6 +1,22 @@
= Changelog

== 0.3.0 - unreleased
== 0.3.0 - 2021-05-27

:165: https://github.com/stackabletech/agent/pull/165[#165]
:169: https://github.com/stackabletech/agent/pull/169[#169]
:173: https://github.com/stackabletech/agent/pull/173[#173]
:176: https://github.com/stackabletech/agent/pull/176[#176]

=== Added
* Artifacts for merge requests are created ({169}, {173}).

=== Changed
* Structure of the documentation changed so that it can be incorporated
into the overall Stackable documentation ({165}).

=== Fixed
* Deadlock fixed which occurred when multiple pods were started or
stopped simultaneously ({176}).

== 0.2.0 - 2021-05-20

Expand Down
76 changes: 46 additions & 30 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 10 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "stackable-agent"
description = "The component of the Stackable Platform that manages installation of services on the workers"
version = "0.3.0-nightly"
version = "0.3.0"
authors = ["Sönke Liebau <soenke.liebau@stackable.de>"]
edition = "2018"
license = "Apache-2.0"
Expand Down Expand Up @@ -42,8 +42,15 @@ tar = "0.4"
thiserror = "1.0"
tokio = { version = "1.6", features = ["macros", "rt-multi-thread", "time"] }
url = "2.2"
zbus = "2.0.0-beta.3"
zvariant = "2.6"
# The current zbus version 2.0.0-beta.3 causes deadlocks (see
# https://gitlab.freedesktop.org/dbus/zbus/-/issues/150). So the
# dependency is pinned to the current HEAD of the main branch until the
# next version is released.
zbus = { git = "https://gitlab.freedesktop.org/dbus/zbus.git", rev = "9c551554e665532abc76469cdc73c1943bfb6285" }
# The current zvariant version 2.6.0 is not compatible with the pinned
# zbus version. So the dependency is pinned to the current HEAD of the
# main branch until the next version is released.
zvariant = { git = "https://gitlab.freedesktop.org/dbus/zbus.git", rev = "9c551554e665532abc76469cdc73c1943bfb6285" }

[dev-dependencies]
indoc = "1.0"
Expand Down
1 change: 1 addition & 0 deletions src/bin/stackable-agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ async fn main() -> anyhow::Result<()> {
agent_config.log_directory.clone(),
agent_config.session,
agent_config.pod_cidr,
krustlet_config.max_pods,
)
.await
.expect("Error initializing provider.");
Expand Down
3 changes: 2 additions & 1 deletion src/provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,9 @@ impl StackableProvider {
log_directory: PathBuf,
session: bool,
pod_cidr: String,
max_pods: u16,
) -> Result<Self, StackableError> {
let systemd_manager = Arc::new(SystemdManager::new(session).await?);
let systemd_manager = Arc::new(SystemdManager::new(session, max_pods).await?);

let provider_state = ProviderState {
handles: Default::default(),
Expand Down
10 changes: 7 additions & 3 deletions src/provider/systemdmanager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct SystemdManager {
impl SystemdManager {
/// Creates a new instance. The flag `user_mode` selects whether to run
/// within the user session or manage services system-wide; `max_pods`
/// is used to size the queue of DBus messages on the connection.
pub async fn new(user_mode: bool) -> Result<Self, StackableError> {
pub async fn new(user_mode: bool, max_pods: u16) -> Result<Self, StackableError> {
// Connect to session or system bus depending on the value of [user_mode]
let connection = if user_mode {
Connection::new_session().await.map_err(|e| RuntimeError {
Expand All @@ -57,8 +57,12 @@ impl SystemdManager {
})?
};

let proxy =
AsyncManagerProxy::new(&connection).map_err(|e| RuntimeError { msg: e.to_string() })?;
// The maximum number of queued DBus messages must be higher
// than the number of containers which can be started and
// stopped simultaneously.
let connection = connection.set_max_queued(max_pods as usize * 2);

let proxy = AsyncManagerProxy::new(&connection);

// Depending on whether we are supposed to run in user space or system-wide
// we'll pick the default directory to initialize the systemd manager with
Expand Down
24 changes: 12 additions & 12 deletions src/provider/systemdmanager/systemd1_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ macro_rules! impl_type_for_enum {
}

/// Type of an entry in a changes list
#[derive(Debug, Display, EnumString, EnumVariantNames, Eq, PartialEq)]
#[derive(Clone, Debug, Display, EnumString, EnumVariantNames, Eq, PartialEq)]
#[strum(serialize_all = "kebab-case")]
pub enum ChangeType {
Symlink,
Expand All @@ -91,7 +91,7 @@ impl_deserialize_for_enum!(ChangeType);
impl_type_for_enum!(ChangeType);

/// Entry of a changes list
#[derive(Debug, Type, Deserialize)]
#[derive(Clone, Debug, Type, Deserialize)]
pub struct Change {
pub change_type: ChangeType,
pub filename: String,
Expand All @@ -102,7 +102,7 @@ pub struct Change {
type Changes = Vec<Change>;

/// Mode in which a unit will be started
#[derive(Debug, Display, AsRefStr)]
#[derive(Clone, Debug, Display, AsRefStr)]
#[strum(serialize_all = "kebab-case")]
pub enum StartMode {
/// The unit and its dependencies will be started, possibly
Expand Down Expand Up @@ -134,7 +134,7 @@ impl_serialize_for_enum!(StartMode);
impl_type_for_enum!(StartMode);

/// Mode in which a unit will be stopped
#[derive(Debug, Display, AsRefStr)]
#[derive(Clone, Debug, Display, AsRefStr)]
#[strum(serialize_all = "kebab-case")]
pub enum StopMode {
/// The unit and its dependencies will be stopped, possibly
Expand Down Expand Up @@ -172,7 +172,7 @@ impl_type_for_enum!(StopMode);
/// ```
/// # use stackable_agent::provider::systemdmanager::systemd1_api::*;
/// let connection = zbus::Connection::new_system().unwrap();
/// let manager = ManagerProxy::new(&connection).unwrap();
/// let manager = ManagerProxy::new(&connection);
/// let unit = manager.load_unit("dbus.service").unwrap();
/// ```
///
Expand All @@ -182,7 +182,7 @@ impl_type_for_enum!(StopMode);
/// # use stackable_agent::provider::systemdmanager::systemd1_api::*;
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let connection = zbus::azync::Connection::new_system().await.unwrap();
/// let manager = AsyncManagerProxy::new(&connection).unwrap();
/// let manager = AsyncManagerProxy::new(&connection);
/// let unit = manager.load_unit("dbus.service").await.unwrap();
/// # });
/// ```
Expand Down Expand Up @@ -262,20 +262,20 @@ trait Manager {
///
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let connection = zbus::azync::Connection::new_system().await.unwrap();
/// let manager = AsyncManagerProxy::new(&connection).unwrap();
/// let manager = AsyncManagerProxy::new(&connection);
/// let signals = manager
/// .receive_signal(ManagerSignals::JobRemoved.into()).await.unwrap()
/// .map(|message| message.body::<JobRemovedSignal>().unwrap());
/// # });
/// ```
#[derive(Debug, Display, Eq, PartialEq, IntoStaticStr)]
#[derive(Clone, Debug, Display, Eq, PartialEq, IntoStaticStr)]
pub enum ManagerSignals {
/// Sent out each time a job is dequeued
JobRemoved,
}

/// Result in the `JobRemoved` signal.
#[derive(Debug, Display, EnumString, EnumVariantNames, Eq, PartialEq)]
#[derive(Clone, Debug, Display, EnumString, EnumVariantNames, Eq, PartialEq)]
#[strum(serialize_all = "kebab-case")]
pub enum JobRemovedResult {
/// Indicates successful execution of a job
Expand Down Expand Up @@ -305,7 +305,7 @@ impl_deserialize_for_enum!(JobRemovedResult);
impl_type_for_enum!(JobRemovedResult);

/// Message body of [`ManagerSignals::JobRemoved`]
#[derive(Debug, Deserialize, Type)]
#[derive(Clone, Debug, Deserialize, Type)]
pub struct JobRemovedSignal {
/// Numeric job ID
pub id: u32,
Expand All @@ -322,7 +322,7 @@ pub struct JobRemovedSignal {

/// ActiveState contains a state value that reflects whether the unit is
/// currently active or not.
#[derive(Debug, Display, EnumString, Eq, PartialEq)]
#[derive(Clone, Debug, Display, EnumString, Eq, PartialEq)]
#[strum(serialize_all = "kebab-case")]
pub enum ActiveState {
/// The unit is active.
Expand Down Expand Up @@ -358,7 +358,7 @@ impl TryFrom<OwnedValue> for ActiveState {
}

/// Unique ID for a runtime cycle of a unit
#[derive(Debug, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct InvocationId(Vec<u8>);

impl TryFrom<OwnedValue> for InvocationId {
Expand Down

0 comments on commit 15b143b

Please sign in to comment.