feat(experiments): Add pod memory hog experiment (#31)
Signed-off-by: Udit Gaurav <uditgaurav@gmail.com>
uditgaurav committed Jul 1, 2020
1 parent 3803ac3 commit c8271b9
Showing 7 changed files with 474 additions and 25 deletions.
5 changes: 2 additions & 3 deletions chaoslib/litmus/cpu_hog/cpu-hog.go
@@ -19,10 +19,9 @@ import (
"github.com/sirupsen/logrus"
core_v1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-"k8s.io/klog"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/remotecommand"
+"k8s.io/klog"
)

// StressCPU Uses the REST API to exec into the target container of the target pod
@@ -78,7 +77,7 @@ func StressCPU(containerName, podName, namespace string, clients environment.Cli
return nil
}

-//This function orchestrates the experiment by calling the StressCPU function for every core, of every container, of every pod that is targetted
+//ExperimentCPU function orchestrates the experiment by calling the StressCPU function for every core, of every container, of every pod that is targeted
func ExperimentCPU(experimentsDetails *types.ExperimentDetails, clients environment.ClientSets, resultDetails *types.ResultDetails) error {

var endTime <-chan time.Time
250 changes: 250 additions & 0 deletions chaoslib/litmus/pod_memory_hog/pod-memory-hog.go
@@ -0,0 +1,250 @@
package pod_memory_hog

import (
"fmt"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"

"github.com/litmuschaos/litmus-go/pkg/environment"
"github.com/litmuschaos/litmus-go/pkg/events"
"github.com/litmuschaos/litmus-go/pkg/log"
"github.com/litmuschaos/litmus-go/pkg/math"
"github.com/litmuschaos/litmus-go/pkg/result"
"github.com/litmuschaos/litmus-go/pkg/types"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
core_v1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/remotecommand"
)

// StressMemory uses the REST API to exec into the target container of the target pod
// It consumes the requested amount of memory inside the container and holds it there
// TOTAL_CHAOS_DURATION specifies for how long the experiment will last
func StressMemory(MemoryConsumption, containerName, podName, namespace string, clients environment.ClientSets) error {

log.Infof("The memory consumption is: %v", MemoryConsumption)

command := fmt.Sprintf("dd if=/dev/zero of=/dev/null bs=%vM", MemoryConsumption)
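// dd keeps a block of bs bytes resident while copying /dev/zero to /dev/null,
// so bs=<MEMORY_CONSUMPTION>M holds roughly that many megabytes of memory until the process is killed.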

req := clients.KubeClient.CoreV1().RESTClient().Post().
Resource("pods").
Name(podName).
Namespace(namespace).
SubResource("exec")
scheme := runtime.NewScheme()
if err := core_v1.AddToScheme(scheme); err != nil {
return fmt.Errorf("error adding to scheme: %v", err)
}

parameterCodec := runtime.NewParameterCodec(scheme)
req.VersionedParams(&core_v1.PodExecOptions{
Command: strings.Fields(command),
Container: containerName,
Stdin: false,
Stdout: true,
Stderr: true,
TTY: false,
}, parameterCodec)

exec, err := remotecommand.NewSPDYExecutor(clients.KubeConfig, "POST", req.URL())
if err != nil {
return fmt.Errorf("error while creating Executor: %v", err)
}

stdout := os.Stdout
stderr := os.Stderr

err = exec.Stream(remotecommand.StreamOptions{
Stdin: nil,
Stdout: stdout,
Stderr: stderr,
Tty: false,
})

if err != nil {
// An exit code of 143 (SIGTERM) is expected when the stress process is killed deliberately
if !strings.Contains(err.Error(), "143") {
log.Infof("[Chaos]:Memory stress error: %v", err.Error())
return err
}
}

return nil
}

//ExperimentMemory orchestrates the experiment by calling the StressMemory function for every container of every pod that is targeted
func ExperimentMemory(experimentsDetails *types.ExperimentDetails, clients environment.ClientSets, resultDetails *types.ResultDetails) error {

var endTime <-chan time.Time
timeDelay := time.Duration(experimentsDetails.ChaosDuration) * time.Second

//Getting the list of all the target pods for the memory stress
realpods, err := PreparePodList(experimentsDetails, clients, resultDetails)
if err != nil {
return err
}

for _, pod := range realpods.Items {

for _, container := range pod.Status.ContainerStatuses {
if !container.Ready {
return errors.Errorf("containers are not yet in running state")
}
log.InfoWithValues("The running status of the container to be stressed is as follows", logrus.Fields{
"container": container.Name, "Pod": pod.Name, "Status": pod.Status.Phase})

log.Infof("[Chaos]:Stressing: %v Megabytes", strconv.Itoa(experimentsDetails.MemoryConsumption))

go StressMemory(strconv.Itoa(experimentsDetails.MemoryConsumption), container.Name, pod.Name, experimentsDetails.AppNS, clients)
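// StressMemory blocks until the dd process exits, so it is run in a goroutine;
// any stream error is logged inside StressMemory itself, hence the discarded return value.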

log.Infof("[Chaos]:Waiting for: %vs", strconv.Itoa(experimentsDetails.ChaosDuration))

// signChan channel is used to transmit signal notifications.
signChan := make(chan os.Signal, 1)
// Catch and relay SIGINT/SIGTERM to the signChan channel (SIGKILL cannot be trapped, so registering it would have no effect).
signal.Notify(signChan, os.Interrupt, syscall.SIGTERM)
loop:
for {
endTime = time.After(timeDelay)
select {
case <-signChan:
log.Info("[Chaos]: Killing process started because of terminated signal received")
err = KillStressMemory(container.Name, pod.Name, experimentsDetails.AppNS, clients)
if err != nil {
klog.V(0).Infof("Error while killing the stress process: %v", err)
return err
}
resultDetails.FailStep = "Memory hog Chaos injection stopped!"
resultDetails.Verdict = "Stopped"
result.ChaosResult(experimentsDetails, clients, resultDetails, "EOT")
os.Exit(1)
case <-endTime:
log.Infof("[Chaos]: Time is up for experiment: %v", experimentsDetails.ExperimentName)
endTime = nil
break loop
}
}
err = KillStressMemory(container.Name, pod.Name, experimentsDetails.AppNS, clients)
if err != nil {
// As in StressMemory, exit code 143 (SIGTERM) is expected when the stress process is killed
if !strings.Contains(err.Error(), "143") {
log.Infof("[Chaos]:Memory stress error: %v", err.Error())
return err
}
}
}
}

return nil
}

//PrepareMemoryStress contains the preparation steps before chaos injection
func PrepareMemoryStress(experimentsDetails *types.ExperimentDetails, clients environment.ClientSets, resultDetails *types.ResultDetails, recorder *events.Recorder) error {

//Waiting for the ramp time before chaos injection
if experimentsDetails.RampTime != 0 {
log.Infof("[Ramp]: Waiting for the %vs ramp time before injecting chaos", strconv.Itoa(experimentsDetails.RampTime))
waitForRampTime(experimentsDetails)
}
//Starting the Memory stress experiment
err := ExperimentMemory(experimentsDetails, clients, resultDetails)
if err != nil {
return err
}
//Waiting for the ramp time after chaos injection
if experimentsDetails.RampTime != 0 {
log.Infof("[Ramp]: Waiting for the %vs ramp time after injecting chaos", strconv.Itoa(experimentsDetails.RampTime))
waitForRampTime(experimentsDetails)
}
return nil
}

//waitForRampTime waits for the given ramp time duration (in seconds)
func waitForRampTime(experimentsDetails *types.ExperimentDetails) {
time.Sleep(time.Duration(experimentsDetails.RampTime) * time.Second)
}

//PreparePodList returns the list of target pods, trimming it to the percentage specified in the PODS_AFFECTED_PERC variable
func PreparePodList(experimentsDetails *types.ExperimentDetails, clients environment.ClientSets, resultDetails *types.ResultDetails) (*core_v1.PodList, error) {

log.Infof("[Chaos]:Percentage of pods to affect is %v", strconv.Itoa(experimentsDetails.PodsAffectedPerc))

//Getting the list of pods with the given labels and namespaces
pods, err := clients.KubeClient.CoreV1().Pods(experimentsDetails.AppNS).List(v1.ListOptions{LabelSelector: experimentsDetails.AppLabel})
if err != nil {
resultDetails.FailStep = "Getting the list of pods with the given labels and namespaces"
return nil, err
}

//If the default value has changed, it means we are targeting a subset of the pods.
if experimentsDetails.PodsAffectedPerc != 100 {

newPodListLength := math.Adjustment(experimentsDetails.PodsAffectedPerc, len(pods.Items))

pods.Items = pods.Items[:newPodListLength]

log.Infof("[Chaos]:Number of pods targeted: %v", strconv.Itoa(newPodListLength))

}
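// (math.Adjustment is assumed here to scale the total count by the given percentage,
// e.g. PODS_AFFECTED_PERC=50 over 10 matching pods leaves 5 targets)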
return pods, nil
}

//KillStressMemory kills the dd stress process inside the target container, triggered either by expiry of the chaos duration or by termination of the experiment
func KillStressMemory(containerName, podName, namespace string, clients environment.ClientSets) error {

command := []string{"/bin/sh", "-c", "kill $(find /proc -name exe -lname '*/dd' 2>&1 | grep -v 'Permission denied' | awk -F/ '{print $(NF-1)}' | head -n 1)"}
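// The pipeline above walks /proc for exe symlinks that resolve to dd, extracts the PID
// from the matching /proc/<pid>/exe path, and kills the first such process.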

req := clients.KubeClient.CoreV1().RESTClient().Post().
Resource("pods").
Name(podName).
Namespace(namespace).
SubResource("exec")
scheme := runtime.NewScheme()
if err := core_v1.AddToScheme(scheme); err != nil {
return fmt.Errorf("error adding to scheme: %v", err)
}

parameterCodec := runtime.NewParameterCodec(scheme)
req.VersionedParams(&core_v1.PodExecOptions{
Command: command,
Container: containerName,
Stdin: false,
Stdout: true,
Stderr: true,
TTY: false,
}, parameterCodec)

exec, err := remotecommand.NewSPDYExecutor(clients.KubeConfig, "POST", req.URL())
if err != nil {
return fmt.Errorf("error while creating Executor: %v", err)
}

stdout := os.Stdout
stderr := os.Stderr

err = exec.Stream(remotecommand.StreamOptions{
Stdin: nil,
Stdout: stdout,
Stderr: stderr,
Tty: false,
})

//The kill command returns a 143 when it kills a process. This is expected
if err != nil {
if !strings.Contains(err.Error(), "143") {
log.Infof("[Chaos]:Memory stress error: %v", err.Error())
return err
}
}

return nil
}
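
Aside: the duration/abort handling in ExperimentMemory above follows the common Go pattern of selecting over a signal channel and a timer. A minimal self-contained sketch of the same pattern (all names here are illustrative, not part of this commit):

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	chaosDuration := 5 * time.Second

	// Relay SIGINT/SIGTERM so the experiment can clean up before exiting.
	signChan := make(chan os.Signal, 1)
	signal.Notify(signChan, os.Interrupt, syscall.SIGTERM)

	select {
	case <-signChan:
		fmt.Println("abort signal received, cleaning up stress process")
		os.Exit(1)
	case <-time.After(chaosDuration):
		fmt.Println("chaos duration elapsed, stopping stress")
	}
}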
69 changes: 69 additions & 0 deletions experiments/generic/pod-memory-hog/pod-memory-hog-k8s-job.yml
@@ -0,0 +1,69 @@
---
apiVersion: batch/v1
kind: Job
metadata:
  generateName: pod-memory-hog-
spec:
  template:
    metadata:
      labels:
        experiment: pod-memory-hog
    spec:
      # Placeholder that is updated by the executor for automated runs
      # Provide an appropriate SA (with the desired permissions) if executed manually
      serviceAccountName: %CHAOS_SERVICE_ACCOUNT%
      restartPolicy: Never
      containers:
      - name: gotest
        image: litmuschaos/go-runner:ci
        imagePullPolicy: Always
        env:
          - name: APP_NAMESPACE
            value: ''

          - name: APP_LABEL
            value: ''

          - name: APP_KIND
            value: ''

          ## Total duration of chaos injection (in sec)
          - name: TOTAL_CHAOS_DURATION
            value: '60'

          - name: CHAOS_INTERVAL
            value: '10'

          # Amount of memory (in megabytes) to be consumed by the application pod
          # default: 500 (Megabytes)
          - name: MEMORY_CONSUMPTION
            value: '500'

          ## Percentage of total pods to target
          - name: PODS_AFFECTED_PERC
            value: '100'

          ## Period to wait before injection of chaos (in sec)
          - name: RAMP_TIME
            value: ''

          - name: CHAOS_NAMESPACE
            value: ''

          ## env var that describes the library used to execute the chaos
          ## default: litmus. Supported values: litmus, powerfulseal, chaoskube
          - name: LIB
            value: ''

          - name: POD_NAME
            valueFrom:
              fieldRef:
                fieldPath: metadata.name

          - name: CHAOS_SERVICE_ACCOUNT
            valueFrom:
              fieldRef:
                fieldPath: spec.serviceAccountName

        command: ["/bin/bash"]
        args: ["-c", "./experiments/pod-memory-hog"]
