Skip to content
This repository has been archived by the owner on Dec 21, 2021. It is now read-only.

Test race condition #25

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ serde = "1"
serde_json = "1"
serde_yaml = "0.8"
spectral = "0.6"
tokio = { version = "1", features = ["rt-multi-thread"] }
tokio = { version = "1.6", features = ["macros", "rt-multi-thread"] }
uuid = { version = "0.8", features = ["v4"] }
100 changes: 99 additions & 1 deletion tests/service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod test;
use std::time::Duration;
use futures::future::join_all;
use std::{fmt, time::Duration};
use test::prelude::*;

#[test]
Expand Down Expand Up @@ -66,3 +67,100 @@ fn restart_after_ungraceful_shutdown_should_succeed() {
client.verify_pod_condition(&pod, "Ready");
}
}

#[tokio::test(flavor = "multi_thread")]
async fn starting_and_stopping_100_pods_simultaneously_should_succeed() {
let mut client = KubeClient::new()
.await
.expect("Kubernetes client could not be created");
client.timeouts.create = Duration::from_secs(60);
client.timeouts.delete = Duration::from_secs(60);
client.timeouts.verify_pod_condition = Duration::from_secs(60);

setup_repository_async(&client)
.await
.expect("Repository could not be setup.");

const NUM_PODS: u32 = 100;

let max_pods = client
.list_labeled::<Node>("kubernetes.io/arch=stackable-linux")
.await
.expect("List of Stackable nodes could not be retrieved")
.iter()
.map(get_allocatable_pods)
.sum();

assert!(
NUM_PODS <= max_pods,
"The test case tries to create {} pods but only {} pods are allocatable on the nodes.",
NUM_PODS,
max_pods
);

let pod_spec = indoc! {"
apiVersion: v1
kind: Pod
metadata:
name: agent-service-integration-test-race-condition
spec:
containers:
- name: noop-service
image: noop-service:1.0.0
command:
- noop-service-1.0.0/start.sh
tolerations:
- key: kubernetes.io/arch
operator: Equal
value: stackable-linux
"};

let pod_specs = (0..NUM_PODS)
.map(|_| with_unique_name(pod_spec))
.collect::<Vec<_>>();

let (pods, creation_errors) =
partition_results(join_all(pod_specs.iter().map(|spec| client.create::<Pod>(spec))).await);
let pods_created = pods.len();

let (ready_successes, ready_errors) = partition_results(
join_all(
pods.iter()
.map(|pod| client.verify_pod_condition(pod, "Ready")),
)
.await,
);
let pods_ready = ready_successes.len();

let (deletion_successes, deletion_errors) =
partition_results(join_all(pods.into_iter().map(|pod| client.delete(pod))).await);
let pods_deleted = deletion_successes.len();

let mut errors = Vec::new();
errors.extend(creation_errors);
errors.extend(ready_errors);
errors.extend(deletion_errors);

if let Some(error) = errors.first() {
panic!(
"Pods: {created}/{total} created, {ready}/{created} ready, {deleted}/{created} deleted; Error: {error}",
total = NUM_PODS,
created = pods_created,
ready = pods_ready,
deleted = pods_deleted,
error = error
);
}
}

fn partition_results<T, E>(results: Vec<Result<T, E>>) -> (Vec<T>, Vec<E>)
where
E: fmt::Debug,
T: fmt::Debug,
{
let (successes, errors) = results.into_iter().partition::<Vec<_>, _>(Result::is_ok);
let unwrapped_successes = successes.into_iter().map(Result::unwrap).collect();
let unwrapped_errors = errors.into_iter().map(Result::unwrap_err).collect();

(unwrapped_successes, unwrapped_errors)
}
20 changes: 16 additions & 4 deletions tests/test/kube.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,6 @@ impl KubeClient {
.any(|condition| condition.type_ == condition_type && condition.status == "True")
};

if is_condition_true(&pod) {
return Ok(());
}

let timeout_secs = self.timeouts.verify_pod_condition.as_secs() as u32;
let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.namespace);

Expand All @@ -345,6 +341,12 @@ impl KubeClient {
.timeout(timeout_secs);
let mut stream = pods.watch(&lp, "0").await?.boxed();

let pod = pods.get_status(&pod.name()).await?;

if is_condition_true(&pod) {
return Ok(());
}

while let Some(status) = stream.try_next().await? {
if let WatchEvent::Modified(pod) = status {
if is_condition_true(&pod) {
Expand Down Expand Up @@ -442,3 +444,13 @@ pub fn get_node_taints(node: &Node) -> Vec<Taint> {
.and_then(|spec| spec.taints.clone())
.unwrap_or_else(Vec::new)
}

/// Returns the number of allocatable pods of the given node.
pub fn get_allocatable_pods(node: &Node) -> u32 {
node.status
.as_ref()
.and_then(|status| status.allocatable.as_ref())
.and_then(|allocatable| allocatable.get("pods"))
.and_then(|allocatable_pods| allocatable_pods.0.parse().ok())
.unwrap_or_default()
}
2 changes: 1 addition & 1 deletion tests/test/prelude.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub use super::assertions::*;
pub use super::kube::*;
pub use super::repository::setup_repository;
pub use super::repository::*;
pub use super::temporary_resource::TemporaryResource;

pub use indoc::{formatdoc, indoc};
Expand Down
33 changes: 20 additions & 13 deletions tests/test/repository.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
use super::prelude::TestKubeClient;
use indoc::indoc;
use super::prelude::{KubeClient, TestKubeClient};
use anyhow::Result;
use kube_derive::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

const REPO_SPEC: &str = "
apiVersion: stable.stackable.de/v1
kind: Repository
metadata:
name: integration-test-repository
namespace: default
spec:
repo_type: StackableRepo
properties:
url: https://raw.githubusercontent.com/stackabletech/integration-test-repo/main/
";

/// Specification of a Stackable repository
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
Expand All @@ -25,16 +37,11 @@ pub enum RepoType {

pub fn setup_repository(client: &TestKubeClient) {
client.apply_crd(&Repository::crd());
client.apply::<Repository>(REPO_SPEC);
}

client.apply::<Repository>(indoc! {"
apiVersion: stable.stackable.de/v1
kind: Repository
metadata:
name: integration-test-repository
namespace: default
spec:
repo_type: StackableRepo
properties:
url: https://raw.githubusercontent.com/stackabletech/integration-test-repo/main/
"});
pub async fn setup_repository_async(client: &KubeClient) -> Result<()> {
client.apply_crd(&Repository::crd()).await?;
client.apply::<Repository>(REPO_SPEC).await?;
Ok(())
}