Skip to content

Commit

Permalink
refactor(kafka-broker-pod-failure): Refactor the kafka broker pod fai…
Browse files Browse the repository at this point in the history
…lure (#309)

Signed-off-by: shubhamchaudhary <shubham@chaosnative.com>
  • Loading branch information
ispeakc0de committed Mar 15, 2021
1 parent 524f1df commit c049b15
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 183 deletions.
97 changes: 38 additions & 59 deletions chaoslib/litmus/kafka-broker-pod-failure/lib/pod-delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,57 +6,52 @@ import (
pod_delete "github.com/litmuschaos/litmus-go/chaoslib/litmus/pod-delete/lib"
clients "github.com/litmuschaos/litmus-go/pkg/clients"
"github.com/litmuschaos/litmus-go/pkg/events"
experimentTypes "github.com/litmuschaos/litmus-go/pkg/generic/pod-delete/types"
"github.com/litmuschaos/litmus-go/pkg/kafka"
kafkaTypes "github.com/litmuschaos/litmus-go/pkg/kafka/types"
experimentTypes "github.com/litmuschaos/litmus-go/pkg/kafka/types"
"github.com/litmuschaos/litmus-go/pkg/log"
"github.com/litmuschaos/litmus-go/pkg/probe"
"github.com/litmuschaos/litmus-go/pkg/status"
"github.com/litmuschaos/litmus-go/pkg/types"
"github.com/litmuschaos/litmus-go/pkg/utils/common"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var err error

// PreparePodDelete contains the preparation steps before chaos injection.
// It computes the iteration count via pod_delete.GetIterations, waits for the
// ramp time (when it is non-zero) both before and after the injection, and runs
// the injection in serial mode when Sequence is "serial" and in parallel mode
// for any other value. It returns the error reported by the injection step, or
// nil when the injection completes.
func PreparePodDelete(kafkaDetails *kafkaTypes.ExperimentDetails, experimentsDetails *experimentTypes.ExperimentDetails, clients clients.ClientSets, resultDetails *types.ResultDetails, eventsDetails *types.EventDetails, chaosDetails *types.ChaosDetails) error {
func PreparePodDelete(experimentsDetails *experimentTypes.ExperimentDetails, clients clients.ClientSets, resultDetails *types.ResultDetails, eventsDetails *types.EventDetails, chaosDetails *types.ChaosDetails) error {

// Getting the iteration count for the pod deletion
pod_delete.GetIterations(experimentsDetails)
pod_delete.GetIterations(experimentsDetails.ChaoslibDetail)

// Waiting for the ramp time before chaos injection (skipped when RampTime is 0)
if experimentsDetails.RampTime != 0 {
log.Infof("[Ramp]: Waiting for the %vs ramp time before injecting chaos", experimentsDetails.RampTime)
common.WaitForDuration(experimentsDetails.RampTime)
if experimentsDetails.ChaoslibDetail.RampTime != 0 {
log.Infof("[Ramp]: Waiting for the %vs ramp time before injecting chaos", experimentsDetails.ChaoslibDetail.RampTime)
common.WaitForDuration(experimentsDetails.ChaoslibDetail.RampTime)
}

// Only the exact value "serial" selects serial mode; every other Sequence
// value (including an empty one) falls through to parallel mode.
if experimentsDetails.Sequence == "serial" {
if err = InjectChaosInSerialMode(kafkaDetails, experimentsDetails, clients, chaosDetails, eventsDetails, resultDetails); err != nil {
if experimentsDetails.ChaoslibDetail.Sequence == "serial" {
if err := InjectChaosInSerialMode(experimentsDetails, clients, chaosDetails, eventsDetails, resultDetails); err != nil {
return err
}
} else {
if err = InjectChaosInParallelMode(kafkaDetails, experimentsDetails, clients, chaosDetails, eventsDetails, resultDetails); err != nil {
if err := InjectChaosInParallelMode(experimentsDetails, clients, chaosDetails, eventsDetails, resultDetails); err != nil {
return err
}
}

// Waiting for the ramp time after chaos injection (skipped when RampTime is 0)
if experimentsDetails.RampTime != 0 {
log.Infof("[Ramp]: Waiting for the %vs ramp time after injecting chaos", experimentsDetails.RampTime)
common.WaitForDuration(experimentsDetails.RampTime)
if experimentsDetails.ChaoslibDetail.RampTime != 0 {
log.Infof("[Ramp]: Waiting for the %vs ramp time after injecting chaos", experimentsDetails.ChaoslibDetail.RampTime)
common.WaitForDuration(experimentsDetails.ChaoslibDetail.RampTime)
}
return nil
}

// InjectChaosInSerialMode deletes the kafka broker pods in serial mode (one by one)
func InjectChaosInSerialMode(kafkaDetails *kafkaTypes.ExperimentDetails, experimentsDetails *experimentTypes.ExperimentDetails, clients clients.ClientSets, chaosDetails *types.ChaosDetails, eventsDetails *types.EventDetails, resultDetails *types.ResultDetails) error {
func InjectChaosInSerialMode(experimentsDetails *experimentTypes.ExperimentDetails, clients clients.ClientSets, chaosDetails *types.ChaosDetails, eventsDetails *types.EventDetails, resultDetails *types.ResultDetails) error {

// run the probes during chaos
if len(resultDetails.ProbeDetails) != 0 {
if err = probe.RunProbes(chaosDetails, clients, resultDetails, "DuringChaos", eventsDetails); err != nil {
if err := probe.RunProbes(chaosDetails, clients, resultDetails, "DuringChaos", eventsDetails); err != nil {
return err
}
}
Expand All @@ -65,24 +60,16 @@ func InjectChaosInSerialMode(kafkaDetails *kafkaTypes.ExperimentDetails, experim
//ChaosStartTimeStamp contains the start timestamp, when the chaos injection begins
ChaosStartTimeStamp := time.Now().Unix()

for count := 0; count < experimentsDetails.Iterations; count++ {

//When broker is not defined
if kafkaDetails.TargetPod == "" {
err = kafka.LaunchStreamDeriveLeader(kafkaDetails, clients)
if err != nil {
return errors.Errorf("fail to derive the leader, err: %v", err)
}
}
for count := 0; count < experimentsDetails.ChaoslibDetail.Iterations; count++ {

// Get the target pod details for the chaos execution
// if the target pod is not defined it will derive the random target pod list using pod affected percentage
targetPodList, err := common.GetPodList(experimentsDetails.TargetPods, experimentsDetails.PodsAffectedPerc, clients, chaosDetails)
targetPodList, err := common.GetPodList(experimentsDetails.KafkaBroker, experimentsDetails.ChaoslibDetail.PodsAffectedPerc, clients, chaosDetails)
if err != nil {
return err
}

if experimentsDetails.EngineName != "" {
if experimentsDetails.ChaoslibDetail.EngineName != "" {
msg := "Injecting " + experimentsDetails.ExperimentName + " chaos on application pod"
types.SetEngineEventAttributes(eventsDetails, types.ChaosInject, msg, "Normal", chaosDetails)
events.GenerateEvents(eventsDetails, clients, chaosDetails, "ChaosEngine")
Expand All @@ -94,24 +81,24 @@ func InjectChaosInSerialMode(kafkaDetails *kafkaTypes.ExperimentDetails, experim
log.InfoWithValues("[Info]: Killing the following pods", logrus.Fields{
"PodName": pod.Name})

if experimentsDetails.Force == true {
err = clients.KubeClient.CoreV1().Pods(experimentsDetails.AppNS).Delete(pod.Name, &v1.DeleteOptions{GracePeriodSeconds: &GracePeriod})
if experimentsDetails.ChaoslibDetail.Force == true {
err = clients.KubeClient.CoreV1().Pods(experimentsDetails.ChaoslibDetail.AppNS).Delete(pod.Name, &v1.DeleteOptions{GracePeriodSeconds: &GracePeriod})
} else {
err = clients.KubeClient.CoreV1().Pods(experimentsDetails.AppNS).Delete(pod.Name, &v1.DeleteOptions{})
err = clients.KubeClient.CoreV1().Pods(experimentsDetails.ChaoslibDetail.AppNS).Delete(pod.Name, &v1.DeleteOptions{})
}
if err != nil {
return err
}

//Waiting for the chaos interval after chaos injection
if experimentsDetails.ChaosInterval != 0 {
log.Infof("[Wait]: Wait for the chaos interval %vs", experimentsDetails.ChaosInterval)
common.WaitForDuration(experimentsDetails.ChaosInterval)
if experimentsDetails.ChaoslibDetail.ChaosInterval != 0 {
log.Infof("[Wait]: Wait for the chaos interval %vs", experimentsDetails.ChaoslibDetail.ChaosInterval)
common.WaitForDuration(experimentsDetails.ChaoslibDetail.ChaosInterval)
}

//Verify the status of pod after the chaos injection
log.Info("[Status]: Verification for the recreation of application pod")
if err = status.CheckApplicationStatus(experimentsDetails.AppNS, experimentsDetails.AppLabel, experimentsDetails.Timeout, experimentsDetails.Delay, clients); err != nil {
if err = status.CheckApplicationStatus(experimentsDetails.ChaoslibDetail.AppNS, experimentsDetails.ChaoslibDetail.AppLabel, experimentsDetails.ChaoslibDetail.Timeout, experimentsDetails.ChaoslibDetail.Delay, clients); err != nil {
return err
}

Expand All @@ -122,7 +109,7 @@ func InjectChaosInSerialMode(kafkaDetails *kafkaTypes.ExperimentDetails, experim
//It will be helpful to track the total chaos duration
chaosDiffTimeStamp := ChaosCurrentTimeStamp - ChaosStartTimeStamp

if int(chaosDiffTimeStamp) >= experimentsDetails.ChaosDuration {
if int(chaosDiffTimeStamp) >= experimentsDetails.ChaoslibDetail.ChaosDuration {
log.Infof("[Chaos]: Time is up for experiment: %v", experimentsDetails.ExperimentName)
break
}
Expand All @@ -136,11 +123,11 @@ func InjectChaosInSerialMode(kafkaDetails *kafkaTypes.ExperimentDetails, experim
}

// InjectChaosInParallelMode deletes the kafka broker pods in parallel mode (all at once)
func InjectChaosInParallelMode(kafkaDetails *kafkaTypes.ExperimentDetails, experimentsDetails *experimentTypes.ExperimentDetails, clients clients.ClientSets, chaosDetails *types.ChaosDetails, eventsDetails *types.EventDetails, resultDetails *types.ResultDetails) error {
func InjectChaosInParallelMode(experimentsDetails *experimentTypes.ExperimentDetails, clients clients.ClientSets, chaosDetails *types.ChaosDetails, eventsDetails *types.EventDetails, resultDetails *types.ResultDetails) error {

// run the probes during chaos
if len(resultDetails.ProbeDetails) != 0 {
if err = probe.RunProbes(chaosDetails, clients, resultDetails, "DuringChaos", eventsDetails); err != nil {
if err := probe.RunProbes(chaosDetails, clients, resultDetails, "DuringChaos", eventsDetails); err != nil {
return err
}
}
Expand All @@ -149,24 +136,16 @@ func InjectChaosInParallelMode(kafkaDetails *kafkaTypes.ExperimentDetails, exper
//ChaosStartTimeStamp contains the start timestamp, when the chaos injection begins
ChaosStartTimeStamp := time.Now().Unix()

for count := 0; count < experimentsDetails.Iterations; count++ {

//When broker is not defined
if kafkaDetails.TargetPod == "" {
err = kafka.LaunchStreamDeriveLeader(kafkaDetails, clients)
if err != nil {
return errors.Errorf("fail to derive the leader, err: %v", err)
}
}
for count := 0; count < experimentsDetails.ChaoslibDetail.Iterations; count++ {

// Get the target pod details for the chaos execution
// if the target pod is not defined it will derive the random target pod list using pod affected percentage
targetPodList, err := common.GetPodList(experimentsDetails.TargetPods, experimentsDetails.PodsAffectedPerc, clients, chaosDetails)
targetPodList, err := common.GetPodList(experimentsDetails.KafkaBroker, experimentsDetails.ChaoslibDetail.PodsAffectedPerc, clients, chaosDetails)
if err != nil {
return err
}

if experimentsDetails.EngineName != "" {
if experimentsDetails.ChaoslibDetail.EngineName != "" {
msg := "Injecting " + experimentsDetails.ExperimentName + " chaos on application pod"
types.SetEngineEventAttributes(eventsDetails, types.ChaosInject, msg, "Normal", chaosDetails)
events.GenerateEvents(eventsDetails, clients, chaosDetails, "ChaosEngine")
Expand All @@ -178,25 +157,25 @@ func InjectChaosInParallelMode(kafkaDetails *kafkaTypes.ExperimentDetails, exper
log.InfoWithValues("[Info]: Killing the following pods", logrus.Fields{
"PodName": pod.Name})

if experimentsDetails.Force == true {
err = clients.KubeClient.CoreV1().Pods(experimentsDetails.AppNS).Delete(pod.Name, &v1.DeleteOptions{GracePeriodSeconds: &GracePeriod})
if experimentsDetails.ChaoslibDetail.Force == true {
err = clients.KubeClient.CoreV1().Pods(experimentsDetails.ChaoslibDetail.AppNS).Delete(pod.Name, &v1.DeleteOptions{GracePeriodSeconds: &GracePeriod})
} else {
err = clients.KubeClient.CoreV1().Pods(experimentsDetails.AppNS).Delete(pod.Name, &v1.DeleteOptions{})
err = clients.KubeClient.CoreV1().Pods(experimentsDetails.ChaoslibDetail.AppNS).Delete(pod.Name, &v1.DeleteOptions{})
}
if err != nil {
return err
}
}

//Waiting for the chaos interval after chaos injection
if experimentsDetails.ChaosInterval != 0 {
log.Infof("[Wait]: Wait for the chaos interval %vs", experimentsDetails.ChaosInterval)
common.WaitForDuration(experimentsDetails.ChaosInterval)
if experimentsDetails.ChaoslibDetail.ChaosInterval != 0 {
log.Infof("[Wait]: Wait for the chaos interval %vs", experimentsDetails.ChaoslibDetail.ChaosInterval)
common.WaitForDuration(experimentsDetails.ChaoslibDetail.ChaosInterval)
}

//Verify the status of pod after the chaos injection
log.Info("[Status]: Verification for the recreation of application pod")
if err = status.CheckApplicationStatus(experimentsDetails.AppNS, experimentsDetails.AppLabel, experimentsDetails.Timeout, experimentsDetails.Delay, clients); err != nil {
if err = status.CheckApplicationStatus(experimentsDetails.ChaoslibDetail.AppNS, experimentsDetails.ChaoslibDetail.AppLabel, experimentsDetails.ChaoslibDetail.Timeout, experimentsDetails.ChaoslibDetail.Delay, clients); err != nil {
return err
}

Expand All @@ -207,7 +186,7 @@ func InjectChaosInParallelMode(kafkaDetails *kafkaTypes.ExperimentDetails, exper
//It will be helpful to track the total chaos duration
chaosDiffTimeStamp := ChaosCurrentTimeStamp - ChaosStartTimeStamp

if int(chaosDiffTimeStamp) >= experimentsDetails.ChaosDuration {
if int(chaosDiffTimeStamp) >= experimentsDetails.ChaoslibDetail.ChaosDuration {
log.Infof("[Chaos]: Time is up for experiment: %v", experimentsDetails.ExperimentName)
break
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package experiment

import (
kafka_broker_pod_failure "github.com/litmuschaos/litmus-go/chaoslib/litmus/kafka-broker-pod-failure/lib"
"strings"

kafkaPodDelete "github.com/litmuschaos/litmus-go/chaoslib/litmus/kafka-broker-pod-failure/lib"
clients "github.com/litmuschaos/litmus-go/pkg/clients"
"github.com/litmuschaos/litmus-go/pkg/events"
"github.com/litmuschaos/litmus-go/pkg/kafka"
Expand All @@ -15,7 +17,7 @@ import (
"github.com/sirupsen/logrus"
)

// KafkaBrokerPodFailure inject the kafka-broker-pod-failure chaos
// KafkaBrokerPodFailure derives and kills the kafka broker leader
func KafkaBrokerPodFailure(clients clients.ClientSets) {

var err error
Expand Down Expand Up @@ -71,8 +73,8 @@ func KafkaBrokerPodFailure(clients clients.ClientSets) {
log.Info("[Status]: Verify that the Kafka cluster is healthy(pre-chaos)")
err = kafka.ClusterHealthCheck(&experimentsDetails, clients)
if err != nil {
log.Errorf("Cluster status check failed, err: %v", err)
failStep := "Verify that the AUT (Application Under Test) is running (pre-chaos)"
log.Errorf("Cluster health check failed, err: %v", err)
failStep := "Verify that the Kafka cluster is healthy(pre-chaos)"
result.RecordAfterFailure(&chaosDetails, &resultDetails, failStep, clients, &eventsDetails)
return
}
Expand Down Expand Up @@ -101,47 +103,28 @@ func KafkaBrokerPodFailure(clients clients.ClientSets) {
}

// PRE-CHAOS KAFKA APPLICATION LIVENESS CHECK
// when the kafka broker is provided
if experimentsDetails.KafkaBroker != "" {
if experimentsDetails.KafkaLivenessStream == "enabled" {
_, err := kafka.LivenessStream(&experimentsDetails, clients)
if err != nil {
log.Errorf("Liveness check failed, err: %v", err)
failStep := "Verify liveness check (pre-chaos)"
result.RecordAfterFailure(&chaosDetails, &resultDetails, failStep, clients, &eventsDetails)
return
}
log.Info("The Liveness pod gets established")

} else if experimentsDetails.KafkaLivenessStream == "" || experimentsDetails.KafkaLivenessStream == "disabled" {
kafka.DisplayKafkaBroker(&experimentsDetails)
switch strings.ToLower(experimentsDetails.KafkaLivenessStream) {
case "enabled":
livenessTopicLeader, err := kafka.LivenessStream(&experimentsDetails, clients)
if err != nil {
log.Errorf("Liveness check failed, err: %v", err)
failStep := "Verify liveness check (pre-chaos)"
result.RecordAfterFailure(&chaosDetails, &resultDetails, failStep, clients, &eventsDetails)
return
}
}
log.Info("The Liveness pod gets established")
log.Infof("[Info]: Kafka partition leader is %v", livenessTopicLeader)

// when the kafka broker is not provided
if experimentsDetails.KafkaBroker == "" {
if experimentsDetails.KafkaLivenessStream == "enabled" {
err = kafka.LaunchStreamDeriveLeader(&experimentsDetails, clients)
if err != nil {
log.Errorf("Error: %v", err)
failStep := "Launch the stream derive leader"
result.RecordAfterFailure(&chaosDetails, &resultDetails, failStep, clients, &eventsDetails)
return
}
} else if experimentsDetails.KafkaLivenessStream == "" || experimentsDetails.KafkaLivenessStream == "disabled" {
_, err := kafka.SelectBroker(&experimentsDetails, "", clients)
if err != nil {
log.Errorf("Error: %v", err)
failStep := "Selecting broker when liveness is disabled"
result.RecordAfterFailure(&chaosDetails, &resultDetails, failStep, clients, &eventsDetails)
return
}
if experimentsDetails.KafkaBroker == "" {
experimentsDetails.KafkaBroker = livenessTopicLeader
}
}

kafka.DisplayKafkaBroker(&experimentsDetails)

// Including the litmus lib for kafka-broker-pod-failure
if experimentsDetails.ChaoslibDetail.ChaosLib == "litmus" {
err = kafka_broker_pod_failure.PreparePodDelete(&experimentsDetails, experimentsDetails.ChaoslibDetail, clients, &resultDetails, &eventsDetails, &chaosDetails)
err = kafkaPodDelete.PreparePodDelete(&experimentsDetails, clients, &resultDetails, &eventsDetails, &chaosDetails)
if err != nil {
log.Errorf("Chaos injection failed, err: %v", err)
failStep := "Including the litmus lib for kafka-broker-pod-failure"
Expand All @@ -161,8 +144,8 @@ func KafkaBrokerPodFailure(clients clients.ClientSets) {
log.Info("[Status]: Verify that the Kafka cluster is healthy(post-chaos)")
err = kafka.ClusterHealthCheck(&experimentsDetails, clients)
if err != nil {
log.Errorf("Cluster status check failed, err: %v", err)
failStep := "Verify that the AUT (Application Under Test) is running (pre-chaos)"
log.Errorf("Cluster health check failed, err: %v", err)
failStep := "Verify that the Kafka cluster is healthy(post-chaos)"
result.RecordAfterFailure(&chaosDetails, &resultDetails, failStep, clients, &eventsDetails)
return
}
Expand Down Expand Up @@ -191,22 +174,23 @@ func KafkaBrokerPodFailure(clients clients.ClientSets) {
}

// Liveness Status Check (post-chaos) and cleanup
if experimentsDetails.KafkaLivenessStream != "" {
err = status.CheckApplicationStatus(experimentsDetails.ChaoslibDetail.AppNS, "name=kafka-liveness-"+experimentsDetails.RunID, experimentsDetails.ChaoslibDetail.Timeout, experimentsDetails.ChaoslibDetail.Delay, clients)
if err != nil {
log.Errorf("Application liveness check failed, err: %v", err)
failStep := "Verify that the AUT (Application Under Test) is running (post-chaos)"
switch strings.ToLower(experimentsDetails.KafkaLivenessStream) {
case "enabled":
log.Info("[Status]: Verify that the Kafka liveness pod is running(post-chaos)")
if err = status.CheckApplicationStatus(experimentsDetails.ChaoslibDetail.AppNS, "name=kafka-liveness-"+experimentsDetails.RunID, experimentsDetails.ChaoslibDetail.Timeout, experimentsDetails.ChaoslibDetail.Delay, clients); err != nil {
log.Errorf("Application liveness status check failed, err: %v", err)
failStep := "Verify that the liveness pod is running (post-chaos)"
result.RecordAfterFailure(&chaosDetails, &resultDetails, failStep, clients, &eventsDetails)
return
}

log.Info("[CleanUp]: Deleting the kafka liveness pod(post-chaos)")
if err := kafka.LivenessCleanup(&experimentsDetails, clients); err != nil {
log.Errorf("liveness cleanup failed, err: %v", err)
failStep := "Performing cleanup post chaos"
failStep := "Performing liveness pod cleanup (post-chaos)"
result.RecordAfterFailure(&chaosDetails, &resultDetails, failStep, clients, &eventsDetails)
return
}

}

//Updating the chaosResult in the end of experiment
Expand Down
16 changes: 0 additions & 16 deletions pkg/kafka/display-kafka-broker-info.go

This file was deleted.

Loading

0 comments on commit c049b15

Please sign in to comment.