diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index d7ed952a622..f1215235ff0 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -2,12 +2,11 @@ name: Code build and checks
 
 on:
   push:
-    branches: [ main ]
+    branches: [main]
   pull_request:
-    branches: [ main ]
+    branches: [main]
 
 jobs:
-
   build:
     runs-on: ubuntu-latest
     services:
@@ -15,21 +14,24 @@ jobs:
         image: bitnami/mongodb
         ports:
           - 27017:27017
-
+
     steps:
-    - uses: actions/checkout@v2
+      - uses: actions/checkout@v2
+
+      - name: Set up Go
+        uses: actions/setup-go@v2
+        with:
+          go-version: 1.17
 
-    - name: Set up Go
-      uses: actions/setup-go@v2
-      with:
-        go-version: 1.17
+      - name: Unit test
+        run: go test -v ./...
 
-    - name: Unit test
-      run: go test -v ./...
+      - name: Integration tests
+        run: go test --tags=integration -v ./...
 
-    # Don't work yet as expected https://github.com/nwestfall/openapi-action/issues/3
-    - name: OpenAPI Lint Checks
-      uses: nwestfall/openapi-action@v1.0.1
-      with:
-        github_token: ${{ secrets.GITHUB_TOKEN }}
-      file: api/v1/testkube.yaml
\ No newline at end of file
+      # Doesn't work as expected yet: https://github.com/nwestfall/openapi-action/issues/3
+      - name: OpenAPI Lint Checks
+        uses: nwestfall/openapi-action@v1.0.1
+        with:
+          github_token: ${{ secrets.GITHUB_TOKEN }}
+          file: api/v1/testkube.yaml
diff --git a/Makefile b/Makefile
index e77dc47d1e8..a0c72385ca9 100644
--- a/Makefile
+++ b/Makefile
@@ -10,7 +10,7 @@ VERSION ?= 0.0.0-$(shell git log -1 --pretty=format:"%h")
 LD_FLAGS += -X github.com/kubeshop/testkube/pkg/telemetry.telemetryToken=$(TELEMETRY_TOKEN)
 
 run-api:
-	DEBUG=1 APISERVER_PORT=8088 go run -ldflags "-X github.com/kubeshop/testkube/internal/pkg/api.Version=$(VERSION) -X github.com/kubeshop/testkube/internal/pkg/api.Commit=$(COMMIT)" cmd/api-server/main.go
+	SCRAPPERENABLED=true STORAGE_SSL=true DEBUG=1 APISERVER_PORT=8088 go run -ldflags "-X github.com/kubeshop/testkube/internal/pkg/api.Version=$(VERSION) -X github.com/kubeshop/testkube/internal/pkg/api.Commit=$(COMMIT)" cmd/api-server/main.go
 
 run-api-race-detector:
 	DEBUG=1 APISERVER_PORT=8088 go run -race -ldflags "-X github.com/kubeshop/testkube/internal/pkg/api.Version=$(VERSION) -X github.com/kubeshop/testkube/internal/pkg/api.Commit=$(COMMIT)" cmd/api-server/main.go
 
@@ -56,11 +56,15 @@ openapi-generate-model-testkube:
 
 test:
-	go test ./... -cover
+	go test ./... -cover -v
 
 test-e2e:
	go test --tags=e2e -v ./test/e2e
 
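+# NOTE: integration tests talk to real dependencies; they assume a MongoDB
+# reachable on localhost:27017, like the bitnami/mongodb service started in
+# .github/workflows/test.yml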
+test-integration:
+	go test --tags=integration -v ./...
+
+
 test-e2e-namespace:
	NAMESPACE=$(NAMESPACE) go test --tags=e2e -v ./test/e2e
diff --git a/cmd/kubectl-testkube/commands/scripts/common.go b/cmd/kubectl-testkube/commands/scripts/common.go
index 51cf981ea04..ab882f1d3de 100644
--- a/cmd/kubectl-testkube/commands/scripts/common.go
+++ b/cmd/kubectl-testkube/commands/scripts/common.go
@@ -49,7 +49,13 @@ func watchLogs(id string, client client.Client) {
 	for l := range logs {
 		switch l.Type_ {
 		case output.TypeError:
-			ui.Warn(l.Content)
+			ui.Errf(l.Content)
+			if l.Result != nil {
+				ui.Errf("Error: %s", l.Result.ErrorMessage)
+				ui.Debug("Output: %s", l.Result.Output)
+			}
+			os.Exit(1)
+			return
 		case output.TypeResult:
 			ui.Info("Execution completed", l.Result.Output)
 		default:
diff --git a/internal/app/api/v1/executions.go b/internal/app/api/v1/executions.go
index d31bdd63d91..40512b17ed1 100644
--- a/internal/app/api/v1/executions.go
+++ b/internal/app/api/v1/executions.go
@@ -73,7 +73,6 @@ func (s TestKubeAPI) executeScript(ctx context.Context, options client.ExecuteOp
 		return execution.Errw("can't create new script execution, can't insert into storage: %w", err)
 	}
 
-	// call executor rest or job based and update execution object after queueing execution
 	s.Log.Infow("calling executor with options", "options", options.Request)
 	execution.Start()
 	err = s.ExecutionResults.StartExecution(ctx, execution.Id, execution.StartTime)
diff --git a/internal/pkg/api/repository/result/mongo_test.go b/internal/pkg/api/repository/result/mongo_test.go
index 5308cab0fc4..e16c149cd4f 100644
--- a/internal/pkg/api/repository/result/mongo_test.go
+++ b/internal/pkg/api/repository/result/mongo_test.go
@@ -1,3 +1,5 @@
+//go:build integration
+
 package result
 
 import (
diff --git a/pkg/api/v1/client/factory.go b/pkg/api/v1/client/factory.go
index 89ee94c8a1f..8ffb98749a6 100644
--- a/pkg/api/v1/client/factory.go
+++ b/pkg/api/v1/client/factory.go
@@ -1,5 +1,7 @@
 package client
 
+import "fmt"
+
 type ClientType string
 
 const (
@@ -18,6 +20,8 @@ func GetClient(clientType ClientType, namespace string) (client Client, err erro
 			return client, err
 		}
 		client = NewProxyScriptsAPI(clientset, NewProxyConfig(namespace))
+	default:
+		err = fmt.Errorf("Client %s is not handled by testkube, use one of: %v", clientType, []ClientType{ClientDirect, ClientProxy})
 	}
 
 	return client, err
diff --git a/pkg/executor/client/job.go b/pkg/executor/client/job.go
index 55825853011..e14a4a89bd5 100644
--- a/pkg/executor/client/job.go
+++ b/pkg/executor/client/job.go
@@ -66,24 +66,26 @@ func (c JobExecutor) Get(id string) (execution testkube.ExecutionResult, err err
 	return *exec.ExecutionResult, nil
 }
 
-// Logs returns job logs
-// TODO too many goroutines - need to be simplified
+// Logs returns job logs using the Kubernetes API
 func (c JobExecutor) Logs(id string) (out chan output.Output, err error) {
 	out = make(chan output.Output)
 	logs := make(chan []byte)
 
-	if err := c.Client.TailJobLogs(id, logs); err != nil {
-		return out, err
-	}
-
 	go func() {
 		defer func() {
 			c.Log.Debug("closing JobExecutor.Logs out log")
 			close(out)
 		}()
+
+		if err := c.Client.TailJobLogs(id, logs); err != nil {
+			out <- output.NewOutputError(err)
+			return
+		}
+
 		for l := range logs {
 			entry, err := output.GetLogEntry(l)
 			if err != nil {
+				out <- output.NewOutputError(err)
 				return
 			}
 			out <- entry
diff --git a/pkg/jobs/jobclient.go b/pkg/jobs/jobclient.go
index 7ed401feaa3..1d47970579f 100644
--- a/pkg/jobs/jobclient.go
+++ b/pkg/jobs/jobclient.go
@@ -18,6 +18,7 @@ import (
 	"github.com/kubeshop/testkube/pkg/secret"
 	"go.uber.org/zap"
 	batchv1 "k8s.io/api/batch/v1"
+	corev1 "k8s.io/api/core/v1"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -35,6 +36,9 @@ const (
 	GitTokenSecretName = "git-token"
 	// GitTokenEnvVarName is git token environment var name
 	GitTokenEnvVarName = "RUNNER_GITTOKEN"
+
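+	// polling settings for job pods: check every 200ms, give up after 24h
+	// (presumably a deliberately generous ceiling for long-running test jobs)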
"k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -35,6 +36,9 @@ const ( GitTokenSecretName = "git-token" // GitTokenEnvVarName is git token environment var name GitTokenEnvVarName = "RUNNER_GITTOKEN" + + pollTimeout = 24 * time.Hour + pollInterval = 200 * time.Millisecond ) type JobClient struct { @@ -89,6 +93,7 @@ func (c *JobClient) LaunchK8sJobSync(image string, repo result.Repository, execu // get job pod and for _, pod := range pods.Items { if pod.Status.Phase != v1.PodRunning && pod.Labels["job-name"] == execution.Id { + l := c.Log.With("pod", pod.Name, "namespace", pod.Namespace) // save stop time defer func() { @@ -97,8 +102,10 @@ func (c *JobClient) LaunchK8sJobSync(image string, repo result.Repository, execu }() // wait for complete - if err := wait.PollImmediate(time.Second, time.Duration(0)*time.Second, k8sclient.HasPodSucceeded(c.ClientSet, pod.Name, c.Namespace)); err != nil { - c.Log.Errorw("poll immediate error", "error", err) + l.Debugw("waiting for pod complete error", "error", err) + + if err := wait.PollImmediate(pollInterval, pollTimeout, IsPodReady(c.ClientSet, pod.Name, c.Namespace)); err != nil { + l.Errorw("waiting for pod complete error", "error", err) repo.UpdateResult(ctx, execution.Id, result.Err(err)) return result, err } @@ -106,7 +113,7 @@ func (c *JobClient) LaunchK8sJobSync(image string, repo result.Repository, execu var logs []byte logs, err = c.GetPodLogs(pod.Name) if err != nil { - c.Log.Errorw("get pod logs error", "error", err) + l.Errorw("get pod logs error", "error", err) repo.UpdateResult(ctx, execution.Id, result.Err(err)) return } @@ -114,12 +121,12 @@ func (c *JobClient) LaunchK8sJobSync(image string, repo result.Repository, execu // parse job ouput log (JSON stream) result, _, err := output.ParseRunnerOutput(logs) if err != nil { - c.Log.Errorw("parse ouput error", "error", err) + l.Errorw("parse ouput error", "error", err) repo.UpdateResult(ctx, execution.Id, result.Err(err)) return result, err } - c.Log.Infow("execution completed saving result", "executionId", execution.Id, "status", result.Status) + l.Infow("execution completed saving result", "executionId", execution.Id, "status", result.Status) repo.UpdateResult(ctx, execution.Id, result) return result, nil } @@ -165,11 +172,11 @@ func (c *JobClient) LaunchK8sJob(image string, repo result.Repository, execution execution.Stop() repo.EndExecution(ctx, execution.Id, execution.EndTime, execution.CalculateDuration()) }() + // wait for complete if err := wait.PollImmediate(time.Second, time.Duration(0)*time.Second, k8sclient.HasPodSucceeded(c.ClientSet, pod.Name, c.Namespace)); err != nil { + // continue on poll err and try to get logs later c.Log.Errorw("poll immediate error", "error", err) - repo.UpdateResult(ctx, execution.Id, result.Err(err)) - return } var logs []byte @@ -214,10 +221,11 @@ func (c *JobClient) GetJobPods(podsClient pods.PodInterface, jobName string, ret // TailJobLogs - locates logs for job pod(s) func (c *JobClient) TailJobLogs(id string, logs chan []byte) (err error) { + podsClient := c.ClientSet.CoreV1().Pods(c.Namespace) ctx := context.Background() - pods, err := c.GetJobPods(podsClient, id, 1, 10) + pods, err := c.GetJobPods(podsClient, id, 1, 10) if err != nil { close(logs) return err @@ -225,16 +233,40 @@ func (c *JobClient) TailJobLogs(id string, logs chan []byte) (err error) { for _, pod := range pods.Items { if pod.Labels["job-name"] == id { - if 
+				l.Debugw("Waiting for pod to be ready")
+				if err = wait.PollImmediate(pollInterval, pollTimeout, IsPodReady(c.ClientSet, pod.Name, c.Namespace)); err != nil {
+					l.Errorw("poll immediate error when tailing logs", "error", err)
+					log, err := c.GetPodLogError(ctx, pod.Name)
+					if err != nil {
+						return fmt.Errorf("GetPodLogs error: %w", err)
+					}
+
+					l.Debugw("poll immediate log", "log", string(log))
+					entry, err := output.GetLogEntry(log)
+					if err != nil {
+						return fmt.Errorf("GetLogEntry error: %w", err)
+					}
 					close(logs)
-					return err
+
+					return fmt.Errorf("last log entry: %s", entry.String())
 				}
-				c.Log.Debug("Tailing pod logs")
-				return c.TailPodLogs(ctx, pod.Name, logs)
-			} else if pod.Status.Phase == v1.PodRunning {
+
+				l.Debug("Tailing pod logs")
 				return c.TailPodLogs(ctx, pod.Name, logs)
 			}
 		}
@@ -243,8 +275,11 @@ func (c *JobClient) TailJobLogs(id string, logs chan []byte) (err error) {
 	return
 }
 
-func (c *JobClient) GetPodLogs(podName string) (logs []byte, err error) {
+func (c *JobClient) GetPodLogs(podName string, logLinesCount ...int64) (logs []byte, err error) {
 	count := int64(100)
+	if len(logLinesCount) > 0 {
+		count = logLinesCount[0]
+	}
 
 	podLogOptions := v1.PodLogOptions{
 		Follow: false,
@@ -271,6 +306,11 @@ func (c *JobClient) GetPodLogs(podName string) (logs []byte, err error) {
 	return buf.Bytes(), nil
 }
 
+func (c *JobClient) GetPodLogError(ctx context.Context, podName string) (logsBytes []byte, err error) {
+	// the error line should be the last line of the log
+	return c.GetPodLogs(podName, 1)
+}
+
 func (c *JobClient) TailPodLogs(ctx context.Context, podName string, logs chan []byte) (err error) {
 	count := int64(1)
@@ -386,7 +426,8 @@ func (c *JobClient) CreatePersistentVolumeClaim(name string) error {
 // NewJobSpec is a method to create new job spec
 func NewJobSpec(id, namespace, image, jsn, scriptName string, hasSecrets bool) *batchv1.Job {
 	var TTLSecondsAfterFinished int32 = 180
-	var backOffLimit int32 = 2
+	// TODO backoff needs to be handled correctly by Logs and by the running job spec - currently we can get unexpected results
+	var backOffLimit int32 = 0
 
 	var secretEnvVars []v1.EnvVar
 	if hasSecrets {
@@ -472,3 +513,21 @@ var envVars = []v1.EnvVar{
 		Value: os.Getenv("SCRAPPERENABLED"),
 	},
 }
+
+// IsPodReady is a wait condition that is satisfied once the pod reaches a terminal phase, so its logs can be scraped
+func IsPodReady(c *kubernetes.Clientset, podName, namespace string) wait.ConditionFunc {
+	return func() (bool, error) {
+		pod, err := c.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+
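+		// both terminal phases complete the wait: Succeeded cleanly, Failed
+		// with an error so the caller can go fetch the pod's error log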
+		switch pod.Status.Phase {
+		case corev1.PodSucceeded:
+			return true, nil
+		case corev1.PodFailed:
+			return true, fmt.Errorf("pod %s/%s failed", pod.Namespace, pod.Name)
+		}
+		return false, nil
+	}
+}
diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index e493a636439..6bf505a1593 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -1,5 +1,4 @@
 //go:build e2e
-// +build e2e
 
 package main