Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(backend): On Pipeline delete, clean all associated Minio records #7542

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,20 +233,25 @@ func (r *ResourceManager) DeletePipeline(pipelineId string) error {
return util.Wrap(err, "Delete pipeline failed")
}

// Delete pipeline file and DB entry.
versions, err := r.pipelineStore.ListPipelineVersionIds(pipelineId)
if err != nil {
glog.Errorf("%v", errors.Wrapf(err, "Failed to list all PipelineVersionIds for pipeline %v", pipelineId))
return nil
}

var fileNames []string
for _, v := range versions {
fileNames = append(fileNames, r.objectStore.GetPipelineKey(fmt.Sprint(v)))
}

// Not fail the request if this step failed. A background run will do the cleanup.
// https://github.com/kubeflow/pipelines/issues/388
// TODO(jingzhang36): For now (before exposing version API), we have only 1
// file with both pipeline and version pointing to it; so it is ok to do
// the deletion as follows. After exposing version API, we can have multiple
// versions and hence multiple files, and we shall improve performance by
// either using async deletion in order for this method to be non-blocking
// or exploring other performance optimization tools provided by gcs.
err = r.objectStore.DeleteFile(r.objectStore.GetPipelineKey(fmt.Sprint(pipelineId)))
if err != nil {
glog.Errorf("%v", errors.Wrapf(err, "Failed to delete pipeline file for pipeline %v", pipelineId))
errChan := r.objectStore.DeleteFiles(fileNames)
for err1 := range errChan {
glog.Errorf("%v", errors.Wrapf(err1, "Failed to delete pipeline file for pipeline %v.", pipelineId))
return nil
}
Copy link
Member Author

@difince difince Apr 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that I kept the current behavior: if the MinIO requests fail, I log the errors returned by MinIO and return from the method without an error (even though the Pipeline and its Versions are not deleted).

// Not fail the request if this step failed. A background run will do the cleanup.
// #388

I cannot find this background run that should do the cleanup.

Let me know if you think this behavior needs to change.


err = r.pipelineStore.DeletePipeline(pipelineId)
if err != nil {
glog.Errorf("%v", errors.Wrapf(err, "Failed to delete pipeline DB entry for pipeline %v", pipelineId))
Expand Down
51 changes: 51 additions & 0 deletions backend/src/apiserver/resource/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ func (m *FakeBadObjectStore) DeleteFile(filePath string) error {
return errors.New("Not implemented.")
}

// DeleteFiles simulates a failing bulk delete. It returns an already-closed
// channel that holds exactly one error, built with util.NewInternalServerError
// and mentioning the requested file paths.
func (m *FakeBadObjectStore) DeleteFiles(filePath []string) chan error {
	failures := make(chan error, 1)
	// The buffer of one lets the send complete without a waiting receiver.
	failures <- util.NewInternalServerError(errors.New("Error"), "Failed to delete %v", filePath)
	close(failures)
	return failures
}

// GetFile ignores filePath and always succeeds with empty content.
func (m *FakeBadObjectStore) GetFile(filePath string) ([]byte, error) {
	content := []byte("")
	return content, nil
}
Expand Down Expand Up @@ -501,6 +508,50 @@ func TestCreatePipeline(t *testing.T) {
}
}

// TestDeletePipeline runs the delete scenario twice: once with the manager's
// own object store (nil entry) and once with FakeBadObjectStore swapped in.
func TestDeletePipeline(t *testing.T) {
	type scenario struct {
		objectStore storage.ObjectStoreInterface
	}

	scenarios := []scenario{
		{objectStore: nil},
		{objectStore: &FakeBadObjectStore{}},
	}

	for i := range scenarios {
		testDeletePipeline(t, scenarios[i])
	}
}

// testDeletePipeline creates a pipeline, optionally replaces the manager's
// object store with test.objectStore, deletes the pipeline and verifies that a
// subsequent lookup reports codes.NotFound.
//
// Unexpected outcomes are reported as test failures rather than panics: the
// helper bails out early when the pipeline cannot be created, and the error
// returned by GetPipeline is type-checked before it is inspected.
func testDeletePipeline(t *testing.T, test struct{ objectStore storage.ObjectStoreInterface }) {
	store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
	defer store.Close()
	manager := NewResourceManager(store)

	// Create a pipeline.
	p, err := manager.CreatePipeline("pipeline", "", "", []byte("apiVersion: argoproj.io/v1alpha1\nkind: Workflow"))
	if !assert.Nil(t, err) {
		// Without a pipeline there is nothing to delete; p may be nil here.
		return
	}

	if test.objectStore != nil {
		// Even if the object store request fails, pipeline deletion continues.
		manager.objectStore = test.objectStore
	}

	// Delete the pipeline.
	err = manager.DeletePipeline(p.UUID)
	assert.Nil(t, err)

	// Verify the pipeline doesn't exist.
	_, err = manager.GetPipeline(p.UUID)
	userErr, ok := err.(*util.UserError)
	if !assert.True(t, ok, "expected a *util.UserError from GetPipeline, got %v", err) {
		return
	}
	assert.Equal(t, codes.NotFound, userErr.ExternalStatusCode())
}

// TestDeletePipelineThatDoesNotExists verifies that deleting an unknown
// pipeline ID fails with a user error whose external status is codes.NotFound.
func TestDeletePipelineThatDoesNotExists(t *testing.T) {
	store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
	defer store.Close()
	manager := NewResourceManager(store)

	err := manager.DeletePipeline(FakeUUIDOne)
	if !assert.NotNil(t, err) {
		// assert.NotNil does not stop the test; return so the type assertion
		// below is never evaluated on a nil error.
		return
	}
	userErr, ok := err.(*util.UserError)
	if !assert.True(t, ok, "expected a *util.UserError from DeletePipeline, got %v", err) {
		return
	}
	assert.Equal(t, codes.NotFound, userErr.ExternalStatusCode())
}

func TestGetPipelineByNameAndNamespace(t *testing.T) {
tt := []struct {
msg string
Expand Down
20 changes: 20 additions & 0 deletions backend/src/apiserver/storage/object_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"path"
"regexp"
"sync"

"github.com/ghodss/yaml"
"github.com/kubeflow/pipelines/backend/src/common/util"
Expand All @@ -32,6 +33,7 @@ const (
type ObjectStoreInterface interface {
AddFile(template []byte, filePath string) error
DeleteFile(filePath string) error
DeleteFiles(filePaths []string) chan error
GetFile(filePath string) ([]byte, error)
AddAsYamlFile(o interface{}, filePath string) error
GetFromYamlFile(o interface{}, filePath string) error
Expand Down Expand Up @@ -78,6 +80,24 @@ func (m *MinioObjectStore) DeleteFile(filePath string) error {
return nil
}

// DeleteFiles deletes every path in filePaths concurrently, one goroutine per
// path, each calling DeleteFile. It blocks until all deletions have finished
// and then returns a closed channel containing the errors that occurred; an
// empty channel means every deletion succeeded.
func (m *MinioObjectStore) DeleteFiles(filePaths []string) chan error {
	total := len(filePaths)
	// One buffer slot per path, so a failing goroutine never blocks on send.
	failures := make(chan error, total)

	var pending sync.WaitGroup
	pending.Add(total)
	for _, target := range filePaths {
		go func(key string) {
			defer pending.Done()
			if err := m.DeleteFile(key); err != nil {
				failures <- err
			}
		}(target)
	}

	pending.Wait()
	close(failures)
	return failures
}

func (m *MinioObjectStore) GetFile(filePath string) ([]byte, error) {
reader, err := m.minioClient.GetObject(m.bucketName, filePath, minio.GetObjectOptions{})
if err != nil {
Expand Down
28 changes: 28 additions & 0 deletions backend/src/apiserver/storage/object_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,34 @@ func TestDeleteFileError(t *testing.T) {
assert.Equal(t, codes.Internal, error.(*util.UserError).ExternalStatusCode())
}

// TestDeleteFiles stores two files, deletes them in bulk and inspects how many
// errors DeleteFiles left in its channel: exactly `expected` when `equal` is
// true, anything but `expected` otherwise.
func TestDeleteFiles(t *testing.T) {
	type deleteCase struct {
		minioClient MinioClientInterface
		equal       bool
		expected    int
	}

	cases := []deleteCase{
		{minioClient: NewFakeMinioClient(), equal: true, expected: 0},
		{minioClient: &FakeBadMinioClient{}, equal: false, expected: 0},
	}

	for _, tc := range cases {
		store := &MinioObjectStore{minioClient: tc.minioClient, baseFolder: "pipeline"}
		firstKey := store.GetPipelineKey("1")
		secondKey := store.GetPipelineKey("2")

		// AddFile results are intentionally not checked, as in the bad-client case.
		store.AddFile([]byte("abc"), firstKey)
		store.AddFile([]byte("def"), secondKey)

		errChan := store.DeleteFiles([]string{firstKey, secondKey})
		pendingErrors := len(errChan)

		if !tc.equal {
			assert.NotEqual(t, tc.expected, pendingErrors)
			continue
		}
		assert.Equal(t, tc.expected, pendingErrors)
	}
}

func TestAddAsYamlFile(t *testing.T) {
minioClient := NewFakeMinioClient()
manager := &MinioObjectStore{minioClient: minioClient, baseFolder: "pipeline"}
Expand Down
43 changes: 42 additions & 1 deletion backend/src/apiserver/storage/pipeline_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package storage
import (
"database/sql"
"fmt"

sq "github.com/Masterminds/squirrel"
"github.com/golang/glog"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
Expand Down Expand Up @@ -71,6 +70,7 @@ type PipelineStoreInterface interface {
GetPipelineVersion(versionId string) (*model.PipelineVersion, error)
GetPipelineVersionWithStatus(versionId string, status model.PipelineVersionStatus) (*model.PipelineVersion, error)
ListPipelineVersions(pipelineId string, opts *list.Options) (versions []*model.PipelineVersion, totalSize int, nextPageToken string, err error)
ListPipelineVersionIds(pipelineId string) (versionIds []string, err error)
DeletePipelineVersion(pipelineVersionId string) error
// Change status of a particular version.
UpdatePipelineVersionStatus(pipelineVersionId string, status model.PipelineVersionStatus) error
Expand Down Expand Up @@ -655,6 +655,47 @@ func (s *PipelineStore) scanPipelineVersionRows(rows *sql.Rows) ([]*model.Pipeli
return pipelineVersions, nil
}

// ListPipelineVersionIds returns the UUIDs of the pipeline versions that belong
// to pipelineId and whose status is model.PipelineVersionReady.
//
// A failure while building the query, executing it or scanning its rows is
// returned wrapped by util.NewInternalServerError, together with a nil slice.
func (s *PipelineStore) ListPipelineVersionIds(pipelineId string) (versionIds []string, err error) {
	errorF := func(err error) ([]string, error) {
		return nil, util.NewInternalServerError(err, "Failed to list pipeline version ids: %v", err)
	}

	sqlRequest, args, err := sq.
		Select("UUID").
		From("pipeline_versions").
		Where(sq.And{sq.Eq{"PipelineId": pipelineId}, sq.Eq{"Status": model.PipelineVersionReady}}).
		ToSql()
	if err != nil {
		return errorF(err)
	}

	r, err := s.db.Query(sqlRequest, args...)
	if err != nil {
		return errorF(err)
	}
	// Register Close only after the error check: when Query fails r is nil and
	// a deferred Close on it would panic instead of returning the error.
	defer r.Close()

	versionIds, err = scanIds(r, versionIds)
	if err != nil {
		return errorF(err)
	}
	return versionIds, nil
}

// scanIds reads the single string column of every remaining row in r and
// appends each non-NULL value to versionIds, returning the extended slice.
//
// It returns a nil slice and the error if a row cannot be scanned or if the
// iteration itself stops because of an error. The caller remains responsible
// for closing r.
func scanIds(r *sql.Rows, versionIds []string) ([]string, error) {
	for r.Next() {
		var uuid sql.NullString
		if err := r.Scan(&uuid); err != nil {
			return nil, err
		}
		// NULL values are skipped rather than recorded as empty strings.
		if uuid.Valid {
			versionIds = append(versionIds, uuid.String)
		}
	}
	// Next reports false both at the end of the result set and on failure;
	// Err distinguishes the two so a broken iteration is not mistaken for a
	// complete (but truncated) result.
	if err := r.Err(); err != nil {
		return nil, err
	}
	return versionIds, nil
}
func (s *PipelineStore) ListPipelineVersions(pipelineId string, opts *list.Options) (versions []*model.PipelineVersion, totalSize int, nextPageToken string, err error) {
errorF := func(err error) ([]*model.PipelineVersion, int, string, error) {
return nil, 0, "", util.NewInternalServerError(err, "Failed to list pipeline versions: %v", err)
Expand Down
29 changes: 29 additions & 0 deletions backend/src/apiserver/storage/pipeline_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ func createPipeline(name string) *model.Pipeline {
}}
}

// createPipelineVersion builds a ready-status pipeline version fixture with the
// given name, attached to the pipeline identified by pipelineId.
func createPipelineVersion(name string, pipelineId string) *model.PipelineVersion {
	version := new(model.PipelineVersion)
	version.Name = name
	version.Parameters = `[{"Name": "param1"}]`
	version.PipelineId = pipelineId
	version.Description = "Description"
	version.Status = model.PipelineVersionReady
	return version
}

func TestListPipelines_FilterOutNotReady(t *testing.T) {
db := NewFakeDbOrFatal()
defer db.Close()
Expand Down Expand Up @@ -378,6 +388,25 @@ func TestListPipelinesError(t *testing.T) {
assert.Equal(t, codes.Internal, err.(*util.UserError).ExternalStatusCode())
}

// TestListPipelineVersionIds creates a pipeline, adds one explicit version to
// it, and checks that ListPipelineVersionIds reports two IDs: the pipeline's
// own UUID and the UUID of the added version.
func TestListPipelineVersionIds(t *testing.T) {
	db := NewFakeDbOrFatal()
	defer db.Close()

	firstUUIDs := util.NewFakeUUIDGeneratorOrFatal(defaultFakePipelineId, nil)
	pipelineStore := NewPipelineStore(db, util.NewFakeTimeForEpoch(), firstUUIDs)

	created, createErr := pipelineStore.CreatePipeline(createPipeline("pipeline1"))
	assert.Nil(t, createErr)

	// Switch generators so the explicit version gets a different UUID.
	pipelineStore.uuid = util.NewFakeUUIDGeneratorOrFatal(defaultFakePipelineIdTwo, nil)
	extraVersion, versionErr := pipelineStore.CreatePipelineVersion(createPipelineVersion("versionName1", created.UUID), true)
	assert.Nil(t, versionErr)

	listed, listErr := pipelineStore.ListPipelineVersionIds(created.UUID)
	assert.Nil(t, listErr)

	assert.Equal(t, 2, len(listed))
	want := []string{created.UUID, extraVersion.UUID}
	assert.ElementsMatchf(t, want, listed, "PipelineVersionIds in both lists does not match")
}

func TestGetPipeline(t *testing.T) {
db := NewFakeDbOrFatal()
defer db.Close()
Expand Down