From 1a4bdea06d5f6acbc5db850fdefb2326db6d78d0 Mon Sep 17 00:00:00 2001 From: Evan Hataishi Date: Tue, 12 Jan 2021 11:00:37 -1000 Subject: [PATCH] test: update persistence agent unit tests. Part of #388 (#403) * Update metrics reporter unit tests * Update persistence worker unit tests * Update workflow saver unit tests * Fix spelling typo --- .../persistence/worker/metrics_reporter.go | 2 +- .../worker/metrics_reporter_test.go | 412 +----------------- .../worker/persistence_worker_test.go | 12 +- .../persistence/worker/workflow_saver_test.go | 37 +- 4 files changed, 45 insertions(+), 418 deletions(-) diff --git a/backend/src/agent/persistence/worker/metrics_reporter.go b/backend/src/agent/persistence/worker/metrics_reporter.go index eff74016f96..9ff2a8b1538 100644 --- a/backend/src/agent/persistence/worker/metrics_reporter.go +++ b/backend/src/agent/persistence/worker/metrics_reporter.go @@ -93,7 +93,7 @@ func (r MetricsReporter) collectNodeMetricsOrNil( []*api.RunMetric, error) { defer func() { if panicMessage := recover(); panicMessage != nil { - log.Info("nodeStatus is not yet created. Panic message: '%v'.", panicMessage) + log.Infof("nodeStatus is not yet created. Panic message: '%v'.", panicMessage) } }() if nodeStatus.Status == nil || diff --git a/backend/src/agent/persistence/worker/metrics_reporter_test.go b/backend/src/agent/persistence/worker/metrics_reporter_test.go index ff7b812ba43..ec2dc80c151 100644 --- a/backend/src/agent/persistence/worker/metrics_reporter_test.go +++ b/backend/src/agent/persistence/worker/metrics_reporter_test.go @@ -17,11 +17,10 @@ package worker import ( "testing" - workflowapi "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" - api "github.com/kubeflow/pipelines/backend/api/go_client" "github.com/kubeflow/pipelines/backend/src/agent/persistence/client" "github.com/kubeflow/pipelines/backend/src/common/util" "github.com/stretchr/testify/assert" + workflowapi "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ) @@ -31,20 +30,13 @@ func TestReportMetrics_NoCompletedNode_NoOP(t *testing.T) { reporter := NewMetricsReporter(pipelineFake) - workflow := util.NewWorkflow(&workflowapi.Workflow{ + workflow := util.NewWorkflow(&workflowapi.PipelineRun{ ObjectMeta: metav1.ObjectMeta{ Namespace: "MY_NAMESPACE", Name: "MY_NAME", UID: types.UID("run-1"), }, - Status: workflowapi.WorkflowStatus{ - Nodes: map[string]workflowapi.NodeStatus{ - "node-1": workflowapi.NodeStatus{ - ID: "node-1", - Phase: workflowapi.NodeRunning, - }, - }, - }, + Status: workflowapi.PipelineRunStatus{}, }) err := reporter.ReportMetrics(workflow) assert.Nil(t, err) @@ -56,20 +48,13 @@ func TestReportMetrics_NoRunID_NoOP(t *testing.T) { reporter := NewMetricsReporter(pipelineFake) - workflow := util.NewWorkflow(&workflowapi.Workflow{ + workflow := util.NewWorkflow(&workflowapi.PipelineRun{ ObjectMeta: metav1.ObjectMeta{ Namespace: "MY_NAMESPACE", Name: "MY_NAME", UID: types.UID("run-1"), }, - Status: workflowapi.WorkflowStatus{ - Nodes: map[string]workflowapi.NodeStatus{ - "node-1": workflowapi.NodeStatus{ - ID: "node-1", - Phase: workflowapi.NodeSucceeded, - }, - }, - }, + Status: workflowapi.PipelineRunStatus{}, }) err := reporter.ReportMetrics(workflow) assert.Nil(t, err) @@ -82,21 +67,14 @@ func TestReportMetrics_NoArtifact_NoOP(t *testing.T) { reporter := NewMetricsReporter(pipelineFake) - workflow := util.NewWorkflow(&workflowapi.Workflow{ + workflow := util.NewWorkflow(&workflowapi.PipelineRun{ 
ObjectMeta: metav1.ObjectMeta{ Namespace: "MY_NAMESPACE", Name: "MY_NAME", UID: types.UID("run-1"), Labels: map[string]string{util.LabelKeyWorkflowRunId: "run-1"}, }, - Status: workflowapi.WorkflowStatus{ - Nodes: map[string]workflowapi.NodeStatus{ - "node-1": workflowapi.NodeStatus{ - ID: "node-1", - Phase: workflowapi.NodeSucceeded, - }, - }, - }, + Status: workflowapi.PipelineRunStatus{}, }) err := reporter.ReportMetrics(workflow) assert.Nil(t, err) @@ -109,24 +87,14 @@ func TestReportMetrics_NoMetricsArtifact_NoOP(t *testing.T) { reporter := NewMetricsReporter(pipelineFake) - workflow := util.NewWorkflow(&workflowapi.Workflow{ + workflow := util.NewWorkflow(&workflowapi.PipelineRun{ ObjectMeta: metav1.ObjectMeta{ Namespace: "MY_NAMESPACE", Name: "MY_NAME", UID: types.UID("run-1"), Labels: map[string]string{util.LabelKeyWorkflowRunId: "run-1"}, }, - Status: workflowapi.WorkflowStatus{ - Nodes: map[string]workflowapi.NodeStatus{ - "node-1": workflowapi.NodeStatus{ - ID: "node-1", - Phase: workflowapi.NodeSucceeded, - Outputs: &workflowapi.Outputs{ - Artifacts: []workflowapi.Artifact{{Name: "mlpipeline-ui-metadata"}}, - }, - }, - }, - }, + Status: workflowapi.PipelineRunStatus{}, }) err := reporter.ReportMetrics(workflow) assert.Nil(t, err) @@ -134,364 +102,16 @@ func TestReportMetrics_NoMetricsArtifact_NoOP(t *testing.T) { assert.Nil(t, pipelineFake.GetReportedMetricsRequest()) } -func TestReportMetrics_Succeed(t *testing.T) { - pipelineFake := client.NewPipelineClientFake() - reporter := NewMetricsReporter(pipelineFake) - workflow := util.NewWorkflow(&workflowapi.Workflow{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "MY_NAMESPACE", - Name: "MY_NAME", - UID: types.UID("run-1"), - Labels: map[string]string{util.LabelKeyWorkflowRunId: "run-1"}, - }, - Status: workflowapi.WorkflowStatus{ - Nodes: map[string]workflowapi.NodeStatus{ - "node-1": workflowapi.NodeStatus{ - ID: "node-1", - Phase: workflowapi.NodeSucceeded, - Outputs: &workflowapi.Outputs{ - Artifacts: []workflowapi.Artifact{{Name: "mlpipeline-metrics"}}, - }, - }, - }, - }, - }) - metricsJSON := `{"metrics": [{"name": "accuracy", "numberValue": 0.77}, {"name": "logloss", "numberValue": 1.2}]}` - artifactData, _ := util.ArchiveTgz(map[string]string{"file": metricsJSON}) - pipelineFake.StubArtifact( - &api.ReadArtifactRequest{ - RunId: "run-1", - NodeId: "node-1", - ArtifactName: "mlpipeline-metrics", - }, - &api.ReadArtifactResponse{ - Data: []byte(artifactData), - }) - pipelineFake.StubReportRunMetrics(&api.ReportRunMetricsResponse{ - Results: []*api.ReportRunMetricsResponse_ReportRunMetricResult{}, - }, nil) - - err := reporter.ReportMetrics(workflow) - - assert.Nil(t, err) - expectedMetricsRequest := &api.ReportRunMetricsRequest{ - RunId: "run-1", - Metrics: []*api.RunMetric{ - { - Name: "accuracy", - NodeId: "node-1", - Value: &api.RunMetric_NumberValue{NumberValue: 0.77}, - }, - { - Name: "logloss", - NodeId: "node-1", - Value: &api.RunMetric_NumberValue{NumberValue: 1.2}, - }, - }, - } - assert.Equal(t, expectedMetricsRequest, pipelineFake.GetReportedMetricsRequest()) -} - -func TestReportMetrics_EmptyArchive_Fail(t *testing.T) { - pipelineFake := client.NewPipelineClientFake() - reporter := NewMetricsReporter(pipelineFake) - workflow := util.NewWorkflow(&workflowapi.Workflow{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "MY_NAMESPACE", - Name: "MY_NAME", - UID: types.UID("run-1"), - Labels: map[string]string{util.LabelKeyWorkflowRunId: "run-1"}, - }, - Status: workflowapi.WorkflowStatus{ - Nodes: 
map[string]workflowapi.NodeStatus{ - "node-1": workflowapi.NodeStatus{ - ID: "node-1", - Phase: workflowapi.NodeSucceeded, - Outputs: &workflowapi.Outputs{ - Artifacts: []workflowapi.Artifact{{Name: "mlpipeline-metrics"}}, - }, - }, - }, - }, - }) - artifactData, _ := util.ArchiveTgz(map[string]string{}) - pipelineFake.StubArtifact( - &api.ReadArtifactRequest{ - RunId: "run-1", - NodeId: "node-1", - ArtifactName: "mlpipeline-metrics", - }, - &api.ReadArtifactResponse{ - Data: []byte(artifactData), - }) - - err := reporter.ReportMetrics(workflow) - - assert.NotNil(t, err) - assert.True(t, util.HasCustomCode(err, util.CUSTOM_CODE_PERMANENT)) - // Verify that ReportRunMetrics is not called. - assert.Nil(t, pipelineFake.GetReportedMetricsRequest()) -} - -func TestReportMetrics_MultipleFilesInArchive_Fail(t *testing.T) { - pipelineFake := client.NewPipelineClientFake() - reporter := NewMetricsReporter(pipelineFake) - workflow := util.NewWorkflow(&workflowapi.Workflow{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "MY_NAMESPACE", - Name: "MY_NAME", - UID: types.UID("run-1"), - Labels: map[string]string{util.LabelKeyWorkflowRunId: "run-1"}, - }, - Status: workflowapi.WorkflowStatus{ - Nodes: map[string]workflowapi.NodeStatus{ - "node-1": workflowapi.NodeStatus{ - ID: "node-1", - Phase: workflowapi.NodeSucceeded, - Outputs: &workflowapi.Outputs{ - Artifacts: []workflowapi.Artifact{{Name: "mlpipeline-metrics"}}, - }, - }, - }, - }, - }) - validMetricsJSON := `{"metrics": [{"name": "accuracy", "numberValue": 0.77}, {"name": "logloss", "numberValue": 1.2}]}` - invalidMetricsJSON := `invalid JSON` - artifactData, _ := util.ArchiveTgz(map[string]string{"file1": validMetricsJSON, "file2": invalidMetricsJSON}) - pipelineFake.StubArtifact( - &api.ReadArtifactRequest{ - RunId: "run-1", - NodeId: "node-1", - ArtifactName: "mlpipeline-metrics", - }, - &api.ReadArtifactResponse{ - Data: []byte(artifactData), - }) - - err := reporter.ReportMetrics(workflow) - - assert.NotNil(t, err) - assert.True(t, util.HasCustomCode(err, util.CUSTOM_CODE_PERMANENT)) - // Verify that ReportRunMetrics is not called. - assert.Nil(t, pipelineFake.GetReportedMetricsRequest()) -} - -func TestReportMetrics_InvalidMetricsJSON_Fail(t *testing.T) { - pipelineFake := client.NewPipelineClientFake() - reporter := NewMetricsReporter(pipelineFake) - workflow := util.NewWorkflow(&workflowapi.Workflow{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "MY_NAMESPACE", - Name: "MY_NAME", - UID: types.UID("run-1"), - Labels: map[string]string{util.LabelKeyWorkflowRunId: "run-1"}, - }, - Status: workflowapi.WorkflowStatus{ - Nodes: map[string]workflowapi.NodeStatus{ - "node-1": workflowapi.NodeStatus{ - ID: "node-1", - Phase: workflowapi.NodeSucceeded, - Outputs: &workflowapi.Outputs{ - Artifacts: []workflowapi.Artifact{{Name: "mlpipeline-metrics"}}, - }, - }, - }, - }, - }) - metricsJSON := `invalid JSON` - artifactData, _ := util.ArchiveTgz(map[string]string{"file": metricsJSON}) - pipelineFake.StubArtifact( - &api.ReadArtifactRequest{ - RunId: "run-1", - NodeId: "node-1", - ArtifactName: "mlpipeline-metrics", - }, - &api.ReadArtifactResponse{ - Data: []byte(artifactData), - }) - - err := reporter.ReportMetrics(workflow) - - assert.NotNil(t, err) - assert.True(t, util.HasCustomCode(err, util.CUSTOM_CODE_PERMANENT)) - // Verify that ReportRunMetrics is not called. 
- assert.Nil(t, pipelineFake.GetReportedMetricsRequest()) -} - -func TestReportMetrics_InvalidMetricsJSON_PartialFail(t *testing.T) { - pipelineFake := client.NewPipelineClientFake() - reporter := NewMetricsReporter(pipelineFake) - workflow := util.NewWorkflow(&workflowapi.Workflow{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "MY_NAMESPACE", - Name: "MY_NAME", - UID: types.UID("run-1"), - Labels: map[string]string{util.LabelKeyWorkflowRunId: "run-1"}, - }, - Status: workflowapi.WorkflowStatus{ - Nodes: map[string]workflowapi.NodeStatus{ - "node-1": workflowapi.NodeStatus{ - ID: "node-1", - Phase: workflowapi.NodeSucceeded, - Outputs: &workflowapi.Outputs{ - Artifacts: []workflowapi.Artifact{{Name: "mlpipeline-metrics"}}, - }, - }, - "node-2": workflowapi.NodeStatus{ - ID: "node-2", - Phase: workflowapi.NodeSucceeded, - Outputs: &workflowapi.Outputs{ - Artifacts: []workflowapi.Artifact{{Name: "mlpipeline-metrics"}}, - }, - }, - }, - }, - }) - validMetricsJSON := `{"metrics": [{"name": "accuracy", "numberValue": 0.77}, {"name": "logloss", "numberValue": 1.2}]}` - invalidMetricsJSON := `invalid JSON` - validArtifactData, _ := util.ArchiveTgz(map[string]string{"file": validMetricsJSON}) - invalidArtifactData, _ := util.ArchiveTgz(map[string]string{"file": invalidMetricsJSON}) - // Stub two artifacts, node-1 is invalid, node-2 is valid. - pipelineFake.StubArtifact( - &api.ReadArtifactRequest{ - RunId: "run-1", - NodeId: "node-1", - ArtifactName: "mlpipeline-metrics", - }, - &api.ReadArtifactResponse{ - Data: []byte(invalidArtifactData), - }) - pipelineFake.StubArtifact( - &api.ReadArtifactRequest{ - RunId: "run-1", - NodeId: "node-2", - ArtifactName: "mlpipeline-metrics", - }, - &api.ReadArtifactResponse{ - Data: []byte(validArtifactData), - }) +// Removed TestReportMetrics_Succeed - specific to argo artifact spec - err := reporter.ReportMetrics(workflow) +// Removed TestReportMetrics_EmptyArchive_Fail - specific to argo artifact spec - // Partial failure is reported while valid metrics are reported. 
- assert.NotNil(t, err) - assert.True(t, util.HasCustomCode(err, util.CUSTOM_CODE_PERMANENT)) - expectedMetricsRequest := &api.ReportRunMetricsRequest{ - RunId: "run-1", - Metrics: []*api.RunMetric{ - &api.RunMetric{ - Name: "accuracy", - NodeId: "node-2", - Value: &api.RunMetric_NumberValue{NumberValue: 0.77}, - }, - &api.RunMetric{ - Name: "logloss", - NodeId: "node-2", - Value: &api.RunMetric_NumberValue{NumberValue: 1.2}, - }, - }, - } - assert.Equal(t, expectedMetricsRequest, pipelineFake.GetReportedMetricsRequest()) -} +// Removed TestReportMetrics_MultipleFilesInArchive_Fail - specific to argo artifact spec -func TestReportMetrics_CorruptedArchiveFile_Fail(t *testing.T) { - pipelineFake := client.NewPipelineClientFake() - reporter := NewMetricsReporter(pipelineFake) - workflow := util.NewWorkflow(&workflowapi.Workflow{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "MY_NAMESPACE", - Name: "MY_NAME", - UID: types.UID("run-1"), - Labels: map[string]string{util.LabelKeyWorkflowRunId: "run-1"}, - }, - Status: workflowapi.WorkflowStatus{ - Nodes: map[string]workflowapi.NodeStatus{ - "node-1": workflowapi.NodeStatus{ - ID: "node-1", - Phase: workflowapi.NodeSucceeded, - Outputs: &workflowapi.Outputs{ - Artifacts: []workflowapi.Artifact{{Name: "mlpipeline-metrics"}}, - }, - }, - }, - }, - }) - pipelineFake.StubArtifact( - &api.ReadArtifactRequest{ - RunId: "run-1", - NodeId: "node-1", - ArtifactName: "mlpipeline-metrics", - }, - &api.ReadArtifactResponse{ - Data: []byte("invalid tgz content"), - }) +// Removed TestReportMetrics_InvalidMetricsJSON_Fail - specific to argo artifact spec - err := reporter.ReportMetrics(workflow) +// Removed TestReportMetrics_InvalidMetricsJSON_PartialFail - specific to argo artifact spec - assert.NotNil(t, err) - assert.True(t, util.HasCustomCode(err, util.CUSTOM_CODE_PERMANENT)) - // Verify that ReportRunMetrics is not called. 
- assert.Nil(t, pipelineFake.GetReportedMetricsRequest()) -} +// Removed TestReportMetrics_CorruptedArchiveFile_Fail - specific to argo artifact spec -func TestReportMetrics_MultiplMetricErrors_TransientErrowWin(t *testing.T) { - pipelineFake := client.NewPipelineClientFake() - reporter := NewMetricsReporter(pipelineFake) - workflow := util.NewWorkflow(&workflowapi.Workflow{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "MY_NAMESPACE", - Name: "MY_NAME", - UID: types.UID("run-1"), - Labels: map[string]string{util.LabelKeyWorkflowRunId: "run-1"}, - }, - Status: workflowapi.WorkflowStatus{ - Nodes: map[string]workflowapi.NodeStatus{ - "node-1": workflowapi.NodeStatus{ - ID: "node-1", - Phase: workflowapi.NodeSucceeded, - Outputs: &workflowapi.Outputs{ - Artifacts: []workflowapi.Artifact{{Name: "mlpipeline-metrics"}}, - }, - }, - }, - }, - }) - metricsJSON := - `{"metrics": [{"name": "accuracy", "numberValue": 0.77}, {"name": "log loss", "numberValue": 1.2}, {"name": "accuracy", "numberValue": 1.2}]}` - artifactData, _ := util.ArchiveTgz(map[string]string{"file": metricsJSON}) - pipelineFake.StubArtifact( - &api.ReadArtifactRequest{ - RunId: "run-1", - NodeId: "node-1", - ArtifactName: "mlpipeline-metrics", - }, - &api.ReadArtifactResponse{ - Data: []byte(artifactData), - }) - pipelineFake.StubReportRunMetrics(&api.ReportRunMetricsResponse{ - Results: []*api.ReportRunMetricsResponse_ReportRunMetricResult{ - &api.ReportRunMetricsResponse_ReportRunMetricResult{ - MetricNodeId: "node-1", - MetricName: "accuracy", - Status: api.ReportRunMetricsResponse_ReportRunMetricResult_OK, - }, - // Invalid argument error triggers permanent error - &api.ReportRunMetricsResponse_ReportRunMetricResult{ - MetricNodeId: "node-1", - MetricName: "log loss", - Status: api.ReportRunMetricsResponse_ReportRunMetricResult_INVALID_ARGUMENT, - }, - // Internal error triggers transient error - &api.ReportRunMetricsResponse_ReportRunMetricResult{ - MetricNodeId: "node-1", - MetricName: "accuracy", - Status: api.ReportRunMetricsResponse_ReportRunMetricResult_INTERNAL_ERROR, - }, - }, - }, nil) - - err := reporter.ReportMetrics(workflow) - - assert.NotNil(t, err) - assert.True(t, util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT)) -} +// Removed TestReportMetrics_MultiplMetricErrors_TransientErrowWin - specific to argo artifact spec diff --git a/backend/src/agent/persistence/worker/persistence_worker_test.go b/backend/src/agent/persistence/worker/persistence_worker_test.go index 75141f00d82..3113a591cc7 100644 --- a/backend/src/agent/persistence/worker/persistence_worker_test.go +++ b/backend/src/agent/persistence/worker/persistence_worker_test.go @@ -18,10 +18,10 @@ import ( "fmt" "testing" - workflowapi "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" client "github.com/kubeflow/pipelines/backend/src/agent/persistence/client" "github.com/kubeflow/pipelines/backend/src/common/util" "github.com/stretchr/testify/assert" + workflowapi "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "k8s.io/client-go/tools/cache" @@ -41,7 +41,7 @@ func (h *FakeEventHandler) AddEventHandler(handler cache.ResourceEventHandler) { func TestPersistenceWorker_Success(t *testing.T) { // Set up workflow client - workflow := util.NewWorkflow(&workflowapi.Workflow{ + workflow := util.NewWorkflow(&workflowapi.PipelineRun{ ObjectMeta: metav1.ObjectMeta{ Namespace: "MY_NAMESPACE", Name: "MY_NAME", @@ -73,7 +73,7 @@ func 
TestPersistenceWorker_Success(t *testing.T) { func TestPersistenceWorker_NotFoundError(t *testing.T) { // Set up workflow client - workflow := util.NewWorkflow(&workflowapi.Workflow{ + workflow := util.NewWorkflow(&workflowapi.PipelineRun{ ObjectMeta: metav1.ObjectMeta{ Namespace: "MY_NAMESPACE", Name: "MY_NAME", @@ -103,7 +103,7 @@ func TestPersistenceWorker_NotFoundError(t *testing.T) { func TestPersistenceWorker_GetWorklowError(t *testing.T) { // Set up workflow client - workflow := util.NewWorkflow(&workflowapi.Workflow{ + workflow := util.NewWorkflow(&workflowapi.PipelineRun{ ObjectMeta: metav1.ObjectMeta{ Namespace: "MY_NAMESPACE", Name: "MY_NAME", @@ -134,7 +134,7 @@ func TestPersistenceWorker_GetWorklowError(t *testing.T) { func TestPersistenceWorker_ReportWorkflowRetryableError(t *testing.T) { // Set up workflow client - workflow := util.NewWorkflow(&workflowapi.Workflow{ + workflow := util.NewWorkflow(&workflowapi.PipelineRun{ ObjectMeta: metav1.ObjectMeta{ Namespace: "MY_NAMESPACE", Name: "MY_NAME", @@ -168,7 +168,7 @@ func TestPersistenceWorker_ReportWorkflowRetryableError(t *testing.T) { func TestPersistenceWorker_ReportWorkflowNonRetryableError(t *testing.T) { // Set up workflow client - workflow := util.NewWorkflow(&workflowapi.Workflow{ + workflow := util.NewWorkflow(&workflowapi.PipelineRun{ ObjectMeta: metav1.ObjectMeta{ Namespace: "MY_NAMESPACE", Name: "MY_NAME", diff --git a/backend/src/agent/persistence/worker/workflow_saver_test.go b/backend/src/agent/persistence/worker/workflow_saver_test.go index 22926fbc3d6..8cd76eb349b 100644 --- a/backend/src/agent/persistence/worker/workflow_saver_test.go +++ b/backend/src/agent/persistence/worker/workflow_saver_test.go @@ -19,10 +19,10 @@ import ( "testing" "time" - workflowapi "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/kubeflow/pipelines/backend/src/agent/persistence/client" "github.com/kubeflow/pipelines/backend/src/common/util" "github.com/stretchr/testify/assert" + workflowapi "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ) @@ -31,7 +31,7 @@ func TestWorkflow_Save_Success(t *testing.T) { workflowFake := client.NewWorkflowClientFake() pipelineFake := client.NewPipelineClientFake() - workflow := util.NewWorkflow(&workflowapi.Workflow{ + workflow := util.NewWorkflow(&workflowapi.PipelineRun{ ObjectMeta: metav1.ObjectMeta{ Namespace: "MY_NAMESPACE", Name: "MY_NAME", @@ -84,7 +84,7 @@ func TestWorkflow_Save_PermanentFailureWhileReporting(t *testing.T) { pipelineFake.SetError(util.NewCustomError(fmt.Errorf("Error"), util.CUSTOM_CODE_PERMANENT, "My Permanent Error")) - workflow := util.NewWorkflow(&workflowapi.Workflow{ + workflow := util.NewWorkflow(&workflowapi.PipelineRun{ ObjectMeta: metav1.ObjectMeta{ Namespace: "MY_NAMESPACE", Name: "MY_NAME", @@ -110,7 +110,7 @@ func TestWorkflow_Save_TransientFailureWhileReporting(t *testing.T) { pipelineFake.SetError(util.NewCustomError(fmt.Errorf("Error"), util.CUSTOM_CODE_TRANSIENT, "My Transient Error")) - workflow := util.NewWorkflow(&workflowapi.Workflow{ + workflow := util.NewWorkflow(&workflowapi.PipelineRun{ ObjectMeta: metav1.ObjectMeta{ Namespace: "MY_NAMESPACE", Name: "MY_NAME", @@ -137,16 +137,19 @@ func TestWorkflow_Save_SkippedDueToFinalStatue(t *testing.T) { pipelineFake.SetError(util.NewCustomError(fmt.Errorf("Error"), util.CUSTOM_CODE_PERMANENT, "My Permanent Error")) - workflow := util.NewWorkflow(&workflowapi.Workflow{ + currentTime := 
metav1.Now() + + workflow := util.NewWorkflow(&workflowapi.PipelineRun{ ObjectMeta: metav1.ObjectMeta{ Namespace: "MY_NAMESPACE", Name: "MY_NAME", Labels: map[string]string{util.LabelKeyWorkflowPersistedFinalState: "true"}, }, - Status: workflowapi.WorkflowStatus{ - FinishedAt: metav1.Now(), - }, - }) + Status: workflowapi.PipelineRunStatus{ + PipelineRunStatusFields: workflowapi.PipelineRunStatusFields{ + CompletionTime: &currentTime, + }, + }}) workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow) @@ -166,17 +169,21 @@ func TestWorkflow_Save_FinalStatueNotSkippedDueToExceedTTL(t *testing.T) { pipelineFake.SetError(util.NewCustomError(fmt.Errorf("Error"), util.CUSTOM_CODE_PERMANENT, "My Permanent Error")) - workflow := util.NewWorkflow(&workflowapi.Workflow{ + currentTime := metav1.Now() + + workflow := util.NewWorkflow(&workflowapi.PipelineRun{ ObjectMeta: metav1.ObjectMeta{ Namespace: "MY_NAMESPACE", Name: "MY_NAME", - Labels: map[string]string{ - util.LabelKeyWorkflowRunId: "MY_UUID", + Labels: map[string]string{ + util.LabelKeyWorkflowRunId: "MY_UUID", util.LabelKeyWorkflowPersistedFinalState: "true", }, }, - Status: workflowapi.WorkflowStatus{ - FinishedAt: metav1.Now(), + Status: workflowapi.PipelineRunStatus{ + PipelineRunStatusFields: workflowapi.PipelineRunStatusFields{ + CompletionTime: &currentTime, + }, }, }) @@ -202,7 +209,7 @@ func TestWorkflow_Save_SkippedDDueToMissingRunID(t *testing.T) { pipelineFake.SetError(util.NewCustomError(fmt.Errorf("Error"), util.CUSTOM_CODE_PERMANENT, "My Permanent Error")) - workflow := util.NewWorkflow(&workflowapi.Workflow{ + workflow := util.NewWorkflow(&workflowapi.PipelineRun{ ObjectMeta: metav1.ObjectMeta{ Namespace: "MY_NAMESPACE", Name: "MY_NAME",