Skip to content

Commit

Permalink
fix(backends): On Pipeline delete, clean all Minio records associated…
Browse files Browse the repository at this point in the history
… with the Pipeline and its Versions

Fixes: #7368

Signed-off-by: Diana Atanasova <dianaa@vmware.com>
  • Loading branch information
difince committed Apr 11, 2022
1 parent 2789d43 commit 79ca4a9
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 11 deletions.
25 changes: 15 additions & 10 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,20 +233,25 @@ func (r *ResourceManager) DeletePipeline(pipelineId string) error {
return util.Wrap(err, "Delete pipeline failed")
}

// Delete pipeline file and DB entry.
versions, err := r.pipelineStore.ListPipelineVersionIds(pipelineId)
if err != nil {
glog.Errorf("%v", errors.Wrapf(err, "Failed to list all PipelineVersionIds for pipeline %v", pipelineId))
return nil
}

var fileNames []string
for _, v := range versions {
fileNames = append(fileNames, r.objectStore.GetPipelineKey(fmt.Sprint(v)))
}

// Not fail the request if this step failed. A background run will do the cleanup.
// https://github.com/kubeflow/pipelines/issues/388
// TODO(jingzhang36): For now (before exposing version API), we have only 1
// file with both pipeline and version pointing to it; so it is ok to do
// the deletion as follows. After exposing version API, we can have multiple
// versions and hence multiple files, and we shall improve performance by
// either using async deletion in order for this method to be non-blocking
// or or exploring other performance optimization tools provided by gcs.
err = r.objectStore.DeleteFile(r.objectStore.GetPipelineKey(fmt.Sprint(pipelineId)))
if err != nil {
glog.Errorf("%v", errors.Wrapf(err, "Failed to delete pipeline file for pipeline %v", pipelineId))
errChan := r.objectStore.DeleteFiles(fileNames)
for err1 := range errChan {
glog.Errorf("%v", errors.Wrapf(err1, "Failed to delete pipeline file for pipeline %v.", pipelineId))
return nil
}

err = r.pipelineStore.DeletePipeline(pipelineId)
if err != nil {
glog.Errorf("%v", errors.Wrapf(err, "Failed to delete pipeline DB entry for pipeline %v", pipelineId))
Expand Down
51 changes: 51 additions & 0 deletions backend/src/apiserver/resource/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ func (m *FakeBadObjectStore) DeleteFile(filePath string) error {
return errors.New("Not implemented.")
}

func (m *FakeBadObjectStore) DeleteFiles(filePath []string) chan error {
errChan := make(chan error, 1)
defer close(errChan)
errChan <- util.NewInternalServerError(errors.New("Error"), "Failed to delete %v", filePath)
return errChan
}

func (m *FakeBadObjectStore) GetFile(filePath string) ([]byte, error) {
return []byte(""), nil
}
Expand Down Expand Up @@ -501,6 +508,50 @@ func TestCreatePipeline(t *testing.T) {
}
}

func TestDeletePipeline(t *testing.T) {
var tests = []struct {
objectStore storage.ObjectStoreInterface
}{
{nil},
{&FakeBadObjectStore{}},
}

for _, test := range tests {
testDeletePipeline(t, test)
}
}

func testDeletePipeline(t *testing.T, test struct{ objectStore storage.ObjectStoreInterface }) {
store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
defer store.Close()
manager := NewResourceManager(store)
// Create a pipeline.
p, err := manager.CreatePipeline("pipeline", "", "", []byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow"))
assert.Nil(t, err)

if test.objectStore != nil {
//However if Object store request fails - Pipeline deletion continues.
manager.objectStore = test.objectStore
}
// Delete the Pipeline
err = manager.DeletePipeline(p.UUID)
assert.Nil(t, err)

// Verify the pipeline doesn't exist.
_, err = manager.GetPipeline(p.UUID)
assert.Equal(t, codes.NotFound, err.(*util.UserError).ExternalStatusCode())
}

func TestDeletePipelineThatDoesNotExists(t *testing.T) {
store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
defer store.Close()
manager := NewResourceManager(store)

err := manager.DeletePipeline(FakeUUIDOne)
assert.NotNil(t, err)
assert.Equal(t, codes.NotFound, err.(*util.UserError).ExternalStatusCode())
}

func TestGetPipelineByNameAndNamespace(t *testing.T) {
tt := []struct {
msg string
Expand Down
20 changes: 20 additions & 0 deletions backend/src/apiserver/storage/object_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"path"
"regexp"
"sync"

"github.com/ghodss/yaml"
"github.com/kubeflow/pipelines/backend/src/common/util"
Expand All @@ -32,6 +33,7 @@ const (
type ObjectStoreInterface interface {
AddFile(template []byte, filePath string) error
DeleteFile(filePath string) error
DeleteFiles(filePaths []string) chan error
GetFile(filePath string) ([]byte, error)
AddAsYamlFile(o interface{}, filePath string) error
GetFromYamlFile(o interface{}, filePath string) error
Expand Down Expand Up @@ -78,6 +80,24 @@ func (m *MinioObjectStore) DeleteFile(filePath string) error {
return nil
}

func (m *MinioObjectStore) DeleteFiles(filePaths []string) chan error {
errors := make(chan error, len(filePaths))
var wg sync.WaitGroup
for _, filePath := range filePaths {
wg.Add(1)
go func(filePath string) {
defer wg.Done()
err := m.DeleteFile(filePath)
if err != nil {
errors <- err
}
}(filePath)
}
wg.Wait()
close(errors)
return errors
}

func (m *MinioObjectStore) GetFile(filePath string) ([]byte, error) {
reader, err := m.minioClient.GetObject(m.bucketName, filePath, minio.GetObjectOptions{})
if err != nil {
Expand Down
29 changes: 29 additions & 0 deletions backend/src/apiserver/storage/object_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
type Foo struct{ ID int }

type FakeBadMinioClient struct {
minioClient map[string][]byte
}

func (c *FakeBadMinioClient) PutObject(bucketName, objectName string, reader io.Reader,
Expand Down Expand Up @@ -88,6 +89,34 @@ func TestDeleteFileError(t *testing.T) {
assert.Equal(t, codes.Internal, error.(*util.UserError).ExternalStatusCode())
}

func TestDeleteFiles(t *testing.T) {
var tests = []struct {
minioClient MinioClientInterface
equal bool
expected int
}{
{NewFakeMinioClient(), true, 0},
{&FakeBadMinioClient{}, false, 0},
}

for _, test := range tests {
manager := &MinioObjectStore{minioClient: test.minioClient, baseFolder: "pipeline"}
manager.AddFile([]byte("abc"), manager.GetPipelineKey("1"))
manager.AddFile([]byte("def"), manager.GetPipelineKey("2"))

var filePaths []string
filePaths = append(filePaths, manager.GetPipelineKey("1"))
filePaths = append(filePaths, manager.GetPipelineKey("2"))

errChan := manager.DeleteFiles(filePaths)
if test.equal {
assert.Equal(t, test.expected, len(errChan))
} else {
assert.NotEqual(t, test.expected, len(errChan))
}
}
}

func TestAddAsYamlFile(t *testing.T) {
minioClient := NewFakeMinioClient()
manager := &MinioObjectStore{minioClient: minioClient, baseFolder: "pipeline"}
Expand Down
43 changes: 42 additions & 1 deletion backend/src/apiserver/storage/pipeline_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package storage
import (
"database/sql"
"fmt"

sq "github.com/Masterminds/squirrel"
"github.com/golang/glog"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
Expand Down Expand Up @@ -71,6 +70,7 @@ type PipelineStoreInterface interface {
GetPipelineVersion(versionId string) (*model.PipelineVersion, error)
GetPipelineVersionWithStatus(versionId string, status model.PipelineVersionStatus) (*model.PipelineVersion, error)
ListPipelineVersions(pipelineId string, opts *list.Options) (versions []*model.PipelineVersion, totalSize int, nextPageToken string, err error)
ListPipelineVersionIds(pipelineId string) (versionIds []string, err error)
DeletePipelineVersion(pipelineVersionId string) error
// Change status of a particular version.
UpdatePipelineVersionStatus(pipelineVersionId string, status model.PipelineVersionStatus) error
Expand Down Expand Up @@ -655,6 +655,47 @@ func (s *PipelineStore) scanPipelineVersionRows(rows *sql.Rows) ([]*model.Pipeli
return pipelineVersions, nil
}

func (s *PipelineStore) ListPipelineVersionIds(pipelineId string) (versionIds []string, err error) {
errorF := func(err error) ([]string, error) {
return nil, util.NewInternalServerError(err, "Failed to list pipeline version ids: %v", err)
}

sqlRequest, args, err := sq.
Select("UUID").
From("pipeline_versions").
Where(sq.And{sq.Eq{"PipelineId": pipelineId}, sq.Eq{"Status": model.PipelineVersionReady}}).
ToSql()
if err != nil {
return errorF(err)
}
r, err := s.db.Query(sqlRequest, args...)
defer r.Close()
if err != nil {
return errorF(err)
}

versionIds, err1 := scanIds(r, versionIds)
if err1 != nil {
return errorF(err1)
}
return versionIds, nil

}

func scanIds(r *sql.Rows, versionIds []string) ([]string, error) {
for r.Next() {
var uuid sql.NullString
if err := r.Scan(
&uuid,
); err != nil {
return nil, err
}
if uuid.Valid {
versionIds = append(versionIds, uuid.String)
}
}
return versionIds, nil
}
func (s *PipelineStore) ListPipelineVersions(pipelineId string, opts *list.Options) (versions []*model.PipelineVersion, totalSize int, nextPageToken string, err error) {
errorF := func(err error) ([]*model.PipelineVersion, int, string, error) {
return nil, 0, "", util.NewInternalServerError(err, "Failed to list pipeline versions: %v", err)
Expand Down
29 changes: 29 additions & 0 deletions backend/src/apiserver/storage/pipeline_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ func createPipeline(name string) *model.Pipeline {
}}
}

func createPipelineVersion(name string, pipelineId string) *model.PipelineVersion {
return &model.PipelineVersion{
Name: name,
Parameters: `[{"Name": "param1"}]`,
PipelineId: pipelineId,
Description: "Description",
Status: model.PipelineVersionReady,
}
}

func TestListPipelines_FilterOutNotReady(t *testing.T) {
db := NewFakeDbOrFatal()
defer db.Close()
Expand Down Expand Up @@ -378,6 +388,25 @@ func TestListPipelinesError(t *testing.T) {
assert.Equal(t, codes.Internal, err.(*util.UserError).ExternalStatusCode())
}

func TestListPipelineVersionIds(t *testing.T) {
db := NewFakeDbOrFatal()
defer db.Close()
pipelineStore := NewPipelineStore(db, util.NewFakeTimeForEpoch(), util.NewFakeUUIDGeneratorOrFatal(defaultFakePipelineId, nil))
pipeline, err := pipelineStore.CreatePipeline(createPipeline("pipeline1"))
assert.Nil(t, err)

pipelineStore.uuid = util.NewFakeUUIDGeneratorOrFatal(defaultFakePipelineIdTwo, nil)
pipelineVersion, err1 := pipelineStore.CreatePipelineVersion(createPipelineVersion("versionName1", pipeline.UUID), true)
assert.Nil(t, err1)

versionIds, err := pipelineStore.ListPipelineVersionIds(pipeline.UUID)
assert.Nil(t, err)

assert.Equal(t, 2, len(versionIds))
expectedVersionIds := []string{pipeline.UUID, pipelineVersion.UUID}
assert.ElementsMatchf(t, expectedVersionIds, versionIds, "PipelineVersionIds in both lists does not match")
}

func TestGetPipeline(t *testing.T) {
db := NewFakeDbOrFatal()
defer db.Close()
Expand Down

0 comments on commit 79ca4a9

Please sign in to comment.