Skip to content

Commit

Permalink
fix: pass errors in case of pod failure (#869)
Browse files Browse the repository at this point in the history
* fix: pass errors in case of pod failure

* testing poll immediate change in API

* fix: all edge cases passed as error now

* fix: managed invalid client
  • Loading branch information
exu committed Jan 26, 2022
1 parent c9f0fad commit 2f81ad1
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 46 deletions.
36 changes: 19 additions & 17 deletions .github/workflows/test.yml
Expand Up @@ -2,34 +2,36 @@ name: Code build and checks

on:
push:
branches: [ main ]
branches: [main]
pull_request:
branches: [ main ]
branches: [main]

jobs:

build:
runs-on: ubuntu-latest
services:
mongo:
image: bitnami/mongodb
ports:
- 27017:27017

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v2

- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.17

- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.17
- name: Unit test
run: go test -v ./...

- name: Unit test
run: go test -v ./...
- name: Integration tests
run: go test --tags=integration -v ./...

# Don't work yet as expected https://github.com/nwestfall/openapi-action/issues/3
- name: OpenAPI Lint Checks
uses: nwestfall/openapi-action@v1.0.1
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
file: api/v1/testkube.yaml
# Don't work yet as expected https://github.com/nwestfall/openapi-action/issues/3
- name: OpenAPI Lint Checks
uses: nwestfall/openapi-action@v1.0.1
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
file: api/v1/testkube.yaml
8 changes: 6 additions & 2 deletions Makefile
Expand Up @@ -10,7 +10,7 @@ VERSION ?= 0.0.0-$(shell git log -1 --pretty=format:"%h")
LD_FLAGS += -X github.com/kubeshop/testkube/pkg/telemetry.telemetryToken=$(TELEMETRY_TOKEN)

run-api:
DEBUG=1 APISERVER_PORT=8088 go run -ldflags "-X github.com/kubeshop/testkube/internal/pkg/api.Version=$(VERSION) -X github.com/kubeshop/testkube/internal/pkg/api.Commit=$(COMMIT)" cmd/api-server/main.go
SCRAPPERENABLED=true STORAGE_SSL=true DEBUG=1 APISERVER_PORT=8088 go run -ldflags "-X github.com/kubeshop/testkube/internal/pkg/api.Version=$(VERSION) -X github.com/kubeshop/testkube/internal/pkg/api.Commit=$(COMMIT)" cmd/api-server/main.go

run-api-race-detector:
DEBUG=1 APISERVER_PORT=8088 go run -race -ldflags "-X github.com/kubeshop/testkube/internal/pkg/api.Version=$(VERSION) -X github.com/kubeshop/testkube/internal/pkg/api.Commit=$(COMMIT)" cmd/api-server/main.go
Expand Down Expand Up @@ -56,11 +56,15 @@ openapi-generate-model-testkube:


test:
go test ./... -cover
go test ./... -cover -v

test-e2e:
go test --tags=e2e -v ./test/e2e

test-integration:
go test --tags=integration -v ./...


test-e2e-namespace:
NAMESPACE=$(NAMESPACE) go test --tags=e2e -v ./test/e2e

Expand Down
8 changes: 7 additions & 1 deletion cmd/kubectl-testkube/commands/scripts/common.go
Expand Up @@ -49,7 +49,13 @@ func watchLogs(id string, client client.Client) {
for l := range logs {
switch l.Type_ {
case output.TypeError:
ui.Warn(l.Content)
ui.Errf(l.Content)
if l.Result != nil {
ui.Errf("Error: %s", l.Result.ErrorMessage)
ui.Debug("Output: %s", l.Result.Output)
}
os.Exit(1)
return
case output.TypeResult:
ui.Info("Execution completed", l.Result.Output)
default:
Expand Down
1 change: 0 additions & 1 deletion internal/app/api/v1/executions.go
Expand Up @@ -73,7 +73,6 @@ func (s TestKubeAPI) executeScript(ctx context.Context, options client.ExecuteOp
return execution.Errw("can't create new script execution, can't insert into storage: %w", err)
}

// call executor rest or job based and update execution object after queueing execution
s.Log.Infow("calling executor with options", "options", options.Request)
execution.Start()
err = s.ExecutionResults.StartExecution(ctx, execution.Id, execution.StartTime)
Expand Down
2 changes: 2 additions & 0 deletions internal/pkg/api/repository/result/mongo_test.go
@@ -1,3 +1,5 @@
//go:build integration

package result

import (
Expand Down
4 changes: 4 additions & 0 deletions pkg/api/v1/client/factory.go
@@ -1,5 +1,7 @@
package client

import "fmt"

type ClientType string

const (
Expand All @@ -18,6 +20,8 @@ func GetClient(clientType ClientType, namespace string) (client Client, err erro
return client, err
}
client = NewProxyScriptsAPI(clientset, NewProxyConfig(namespace))
default:
err = fmt.Errorf("Client %s is not handled by testkube, use one of: %v", clientType, []ClientType{ClientDirect, ClientProxy})
}

return client, err
Expand Down
14 changes: 8 additions & 6 deletions pkg/executor/client/job.go
Expand Up @@ -66,24 +66,26 @@ func (c JobExecutor) Get(id string) (execution testkube.ExecutionResult, err err
return *exec.ExecutionResult, nil
}

// Logs returns job logs
// TODO too many goroutines - need to be simplified
// Logs returns job logs using kubernetes api
func (c JobExecutor) Logs(id string) (out chan output.Output, err error) {
out = make(chan output.Output)
logs := make(chan []byte)

if err := c.Client.TailJobLogs(id, logs); err != nil {
return out, err
}

go func() {
defer func() {
c.Log.Debug("closing JobExecutor.Logs out log")
close(out)
}()

if err := c.Client.TailJobLogs(id, logs); err != nil {
out <- output.NewOutputError(err)
return
}

for l := range logs {
entry, err := output.GetLogEntry(l)
if err != nil {
out <- output.NewOutputError(err)
return
}
out <- entry
Expand Down
95 changes: 77 additions & 18 deletions pkg/jobs/jobclient.go
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/kubeshop/testkube/pkg/secret"
"go.uber.org/zap"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -35,6 +36,9 @@ const (
GitTokenSecretName = "git-token"
// GitTokenEnvVarName is git token environment var name
GitTokenEnvVarName = "RUNNER_GITTOKEN"

pollTimeout = 24 * time.Hour
pollInterval = 200 * time.Millisecond
)

type JobClient struct {
Expand Down Expand Up @@ -89,6 +93,7 @@ func (c *JobClient) LaunchK8sJobSync(image string, repo result.Repository, execu
// get job pod and
for _, pod := range pods.Items {
if pod.Status.Phase != v1.PodRunning && pod.Labels["job-name"] == execution.Id {
l := c.Log.With("pod", pod.Name, "namespace", pod.Namespace)

// save stop time
defer func() {
Expand All @@ -97,29 +102,31 @@ func (c *JobClient) LaunchK8sJobSync(image string, repo result.Repository, execu
}()

// wait for complete
if err := wait.PollImmediate(time.Second, time.Duration(0)*time.Second, k8sclient.HasPodSucceeded(c.ClientSet, pod.Name, c.Namespace)); err != nil {
c.Log.Errorw("poll immediate error", "error", err)
l.Debugw("waiting for pod complete error", "error", err)

if err := wait.PollImmediate(pollInterval, pollTimeout, IsPodReady(c.ClientSet, pod.Name, c.Namespace)); err != nil {
l.Errorw("waiting for pod complete error", "error", err)
repo.UpdateResult(ctx, execution.Id, result.Err(err))
return result, err
}

var logs []byte
logs, err = c.GetPodLogs(pod.Name)
if err != nil {
c.Log.Errorw("get pod logs error", "error", err)
l.Errorw("get pod logs error", "error", err)
repo.UpdateResult(ctx, execution.Id, result.Err(err))
return
}

// parse job ouput log (JSON stream)
result, _, err := output.ParseRunnerOutput(logs)
if err != nil {
c.Log.Errorw("parse ouput error", "error", err)
l.Errorw("parse ouput error", "error", err)
repo.UpdateResult(ctx, execution.Id, result.Err(err))
return result, err
}

c.Log.Infow("execution completed saving result", "executionId", execution.Id, "status", result.Status)
l.Infow("execution completed saving result", "executionId", execution.Id, "status", result.Status)
repo.UpdateResult(ctx, execution.Id, result)
return result, nil
}
Expand Down Expand Up @@ -165,11 +172,11 @@ func (c *JobClient) LaunchK8sJob(image string, repo result.Repository, execution
execution.Stop()
repo.EndExecution(ctx, execution.Id, execution.EndTime, execution.CalculateDuration())
}()

// wait for complete
if err := wait.PollImmediate(time.Second, time.Duration(0)*time.Second, k8sclient.HasPodSucceeded(c.ClientSet, pod.Name, c.Namespace)); err != nil {
// continue on poll err and try to get logs later
c.Log.Errorw("poll immediate error", "error", err)
repo.UpdateResult(ctx, execution.Id, result.Err(err))
return
}

var logs []byte
Expand Down Expand Up @@ -214,27 +221,52 @@ func (c *JobClient) GetJobPods(podsClient pods.PodInterface, jobName string, ret

// TailJobLogs - locates logs for job pod(s)
func (c *JobClient) TailJobLogs(id string, logs chan []byte) (err error) {

podsClient := c.ClientSet.CoreV1().Pods(c.Namespace)
ctx := context.Background()
pods, err := c.GetJobPods(podsClient, id, 1, 10)

pods, err := c.GetJobPods(podsClient, id, 1, 10)
if err != nil {
close(logs)
return err
}

for _, pod := range pods.Items {
if pod.Labels["job-name"] == id {
if pod.Status.Phase != v1.PodRunning {
c.Log.Debugw("Waiting for pod to be ready", "pod", pod.Name)
if err = wait.PollImmediate(100*time.Millisecond, time.Duration(0)*time.Second, k8sclient.IsPodReady(c.ClientSet, pod.Name, c.Namespace)); err != nil {
c.Log.Errorw("poll immediate error when tailing logs", "error", err)

l := c.Log.With("namespace", pod.Namespace, "pod", pod.Name)

switch pod.Status.Phase {

case v1.PodRunning:
l.Debug("Tailing pod logs immediately")
return c.TailPodLogs(ctx, pod.Name, logs)

case v1.PodFailed:
err := fmt.Errorf("can't get pod logs, pod failed: %s/%s", pod.Namespace, pod.Name)
l.Errorw(err.Error())
return err

default:
l.Debugw("Waiting for pod to be ready")
if err = wait.PollImmediate(pollInterval, pollTimeout, IsPodReady(c.ClientSet, pod.Name, c.Namespace)); err != nil {
l.Errorw("poll immediate error when tailing logs", "error", err)
log, err := c.GetPodLogError(ctx, pod.Name)
if err != nil {
return fmt.Errorf("GetPodLogs error: %w", err)
}

l.Debugw("poll immediete log", "log", string(log))
entry, err := output.GetLogEntry(log)
if err != nil {
return fmt.Errorf("GetLogEntry error: %w", err)
}
close(logs)
return err

return fmt.Errorf("last log entry: %s", entry.String())
}
c.Log.Debug("Tailing pod logs")
return c.TailPodLogs(ctx, pod.Name, logs)
} else if pod.Status.Phase == v1.PodRunning {

l.Debug("Tailing pod logs")
return c.TailPodLogs(ctx, pod.Name, logs)
}
}
Expand All @@ -243,8 +275,11 @@ func (c *JobClient) TailJobLogs(id string, logs chan []byte) (err error) {
return
}

func (c *JobClient) GetPodLogs(podName string) (logs []byte, err error) {
func (c *JobClient) GetPodLogs(podName string, logLinesCount ...int64) (logs []byte, err error) {
count := int64(100)
if len(logLinesCount) > 0 {
count = logLinesCount[0]
}

podLogOptions := v1.PodLogOptions{
Follow: false,
Expand All @@ -271,6 +306,11 @@ func (c *JobClient) GetPodLogs(podName string) (logs []byte, err error) {
return buf.Bytes(), nil
}

// GetPodLogError fetches only the last log line of the given pod; by
// convention the runner emits the error entry as the final line, so a
// single-line tail is sufficient to recover it.
func (c *JobClient) GetPodLogError(ctx context.Context, podName string) (logsBytes []byte, err error) {
	// only the final line is needed - the error entry is always emitted last
	const lastLineCount = 1
	return c.GetPodLogs(podName, lastLineCount)
}

func (c *JobClient) TailPodLogs(ctx context.Context, podName string, logs chan []byte) (err error) {
count := int64(1)

Expand Down Expand Up @@ -386,7 +426,8 @@ func (c *JobClient) CreatePersistentVolumeClaim(name string) error {
// NewJobSpec is a method to create new job spec
func NewJobSpec(id, namespace, image, jsn, scriptName string, hasSecrets bool) *batchv1.Job {
var TTLSecondsAfterFinished int32 = 180
var backOffLimit int32 = 2
// TODO backOff need to be handled correctly by Logs and by Running job spec - currently we can get unexpected results
var backOffLimit int32 = 0

var secretEnvVars []v1.EnvVar
if hasSecrets {
Expand Down Expand Up @@ -472,3 +513,21 @@ var envVars = []v1.EnvVar{
Value: os.Getenv("SCRAPPERENABLED"),
},
}

// IsPodReady returns a wait.ConditionFunc reporting whether the pod has
// reached a state from which logs can be scraped: done (true, nil) when the
// pod succeeded, done with an error when the pod failed, and not-done
// (false, nil) while the pod is still pending/running.
func IsPodReady(c *kubernetes.Clientset, podName, namespace string) wait.ConditionFunc {
	return func() (bool, error) {
		pod, err := c.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{})
		if err != nil {
			// Propagate API errors; wait.PollImmediate aborts on a non-nil error.
			return false, err
		}

		// Use the file's `v1` alias for k8s.io/api/core/v1 for consistency
		// with the rest of jobclient.go (the `corev1` alias is redundant).
		switch pod.Status.Phase {
		case v1.PodSucceeded:
			// Terminal success - stop polling, logs are final.
			return true, nil
		case v1.PodFailed:
			// Terminal failure - stop polling and surface an explicit error
			// so callers can report the failed pod.
			return true, fmt.Errorf("pod %s/%s failed", pod.Namespace, pod.Name)
		}
		// Pending/Running/Unknown - keep polling.
		return false, nil
	}
}
1 change: 0 additions & 1 deletion test/e2e/e2e_test.go
@@ -1,5 +1,4 @@
//go:build e2e
// +build e2e

package main

Expand Down

0 comments on commit 2f81ad1

Please sign in to comment.