forked from goharbor/harbor
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Introduce system artifact manager cleanup job (goharbor#16879)
Signed-off-by: prahaladdarkin <prahaladd@vmware.com>
- Loading branch information
1 parent
e65f573
commit 89dd95f
Showing
10 changed files
with
582 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
package systemartifact | ||
|
||
import ( | ||
"context" | ||
"github.com/goharbor/harbor/src/jobservice/logger" | ||
"github.com/goharbor/harbor/src/lib/log" | ||
"github.com/goharbor/harbor/src/pkg/scheduler" | ||
"github.com/goharbor/harbor/src/pkg/task" | ||
) | ||
|
||
const ( | ||
SystemArtifactCleanupCallback = "SYSTEM_ARTIFACT_CLEANUP" | ||
) | ||
|
||
var ( | ||
cleanupController = Ctl | ||
) | ||
|
||
func init() { | ||
if err := scheduler.RegisterCallbackFunc(SystemArtifactCleanupCallback, cleanupCallBack); err != nil { | ||
log.Fatalf("failed to register the callback for the system artifact cleanup schedule, error %v", err) | ||
} | ||
} | ||
|
||
func cleanupCallBack(ctx context.Context, param string) error { | ||
|
||
err := cleanupController.Start(ctx, true, task.ExecutionTriggerSchedule) | ||
if err != nil { | ||
logger.Errorf("System artifact cleanup job encountered errors: %v", err) | ||
} | ||
return err | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
package systemartifact | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"github.com/goharbor/harbor/src/pkg/task" | ||
"github.com/goharbor/harbor/src/testing/controller/systemartifact" | ||
"github.com/goharbor/harbor/src/testing/mock" | ||
testifymock "github.com/stretchr/testify/mock" | ||
"github.com/stretchr/testify/suite" | ||
"testing" | ||
) | ||
|
||
type CallbackTestSuite struct { | ||
suite.Suite | ||
cleanupController *systemartifact.Controller | ||
} | ||
|
||
func (suite *CallbackTestSuite) SetupSuite() { | ||
suite.cleanupController = &systemartifact.Controller{} | ||
cleanupController = suite.cleanupController | ||
} | ||
|
||
func (suite *CallbackTestSuite) TestCleanupCallbackSuccess() { | ||
{ | ||
ctx := context.TODO() | ||
suite.cleanupController.On("Start", mock.Anything, mock.Anything, mock.Anything).Return(nil) | ||
triggerScheduleMatcher := testifymock.MatchedBy(func(trigger string) bool { | ||
return trigger == task.ExecutionTriggerSchedule | ||
}) | ||
err := cleanupCallBack(ctx, "") | ||
suite.NoErrorf(err, "Unexpected error : %v", err) | ||
suite.cleanupController.AssertCalled(suite.T(), "Start", mock.Anything, true, triggerScheduleMatcher) | ||
} | ||
{ | ||
suite.cleanupController = nil | ||
suite.cleanupController = &systemartifact.Controller{} | ||
cleanupController = suite.cleanupController | ||
} | ||
|
||
{ | ||
ctx := context.TODO() | ||
suite.cleanupController.On("Start", mock.Anything, mock.Anything, mock.Anything).Return(errors.New("test error")) | ||
triggerScheduleMatcher := testifymock.MatchedBy(func(trigger string) bool { | ||
return trigger == task.ExecutionTriggerSchedule | ||
}) | ||
err := cleanupCallBack(ctx, "") | ||
suite.Error(err) | ||
suite.cleanupController.AssertCalled(suite.T(), "Start", mock.Anything, true, triggerScheduleMatcher) | ||
} | ||
|
||
} | ||
|
||
func TestCallbackTestSuite(t *testing.T) { | ||
suite.Run(t, &CallbackTestSuite{}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
package systemartifact | ||
|
||
import ( | ||
"context" | ||
"github.com/goharbor/harbor/src/jobservice/job" | ||
"github.com/goharbor/harbor/src/jobservice/logger" | ||
"github.com/goharbor/harbor/src/lib/log" | ||
"github.com/goharbor/harbor/src/lib/orm" | ||
"github.com/goharbor/harbor/src/lib/q" | ||
"github.com/goharbor/harbor/src/lib/retry" | ||
"github.com/goharbor/harbor/src/pkg/scheduler" | ||
"github.com/goharbor/harbor/src/pkg/systemartifact" | ||
"github.com/goharbor/harbor/src/pkg/task" | ||
"time" | ||
) | ||
|
||
const ( | ||
VendorTypeSystemArtifactCleanup = "SYSTEM_ARTIFACT_CLEANUP" | ||
cronTypeDaily = "Daily" | ||
cronSpec = "0 0 0 * * *" | ||
) | ||
|
||
var ( | ||
sched = scheduler.Sched | ||
) | ||
|
||
func init() { | ||
task.SetExecutionSweeperCount(VendorTypeSystemArtifactCleanup, 50) | ||
} | ||
|
||
var Ctl = NewController() | ||
|
||
type Controller interface { | ||
Start(ctx context.Context, async bool, trigger string) error | ||
} | ||
|
||
func NewController() Controller { | ||
return &controller{ | ||
execMgr: task.ExecMgr, | ||
taskMgr: task.Mgr, | ||
systemArtifactMgr: systemartifact.Mgr, | ||
makeCtx: orm.Context, | ||
} | ||
} | ||
|
||
type controller struct { | ||
execMgr task.ExecutionManager | ||
taskMgr task.Manager | ||
systemArtifactMgr systemartifact.Manager | ||
makeCtx func() context.Context | ||
} | ||
|
||
func (c *controller) Start(ctx context.Context, async bool, trigger string) error { | ||
execId, err := c.execMgr.Create(ctx, VendorTypeSystemArtifactCleanup, 0, trigger) | ||
if err != nil { | ||
return err | ||
} | ||
// cleanup job would always be scheduled in async mode in production | ||
// allowing for sync mode execution only for test mode purposes | ||
// if there are any trigger settings then pass them to the cleanup manager first | ||
jobParams := job.Parameters{} | ||
|
||
if !async { | ||
err := c.createCleanupTask(ctx, jobParams, execId) | ||
if err != nil { | ||
log.Errorf("failed to create system artifact clean-up task: %v", err) | ||
return err | ||
} | ||
|
||
logger.Info("Created job for scan data export successfully") | ||
return nil | ||
} | ||
go func(ctx context.Context) { | ||
err := retry.Retry(func() error { | ||
_, err := c.execMgr.Get(ctx, execId) | ||
return err | ||
}) | ||
if err != nil { | ||
log.Errorf("failed to get the execution %d for the export data cleanup job", execId) | ||
return | ||
} | ||
err = c.createCleanupTask(ctx, jobParams, execId) | ||
if err != nil { | ||
logger.Errorf("Encountered error in scan data artifact cleanup : %v", err) | ||
return | ||
} | ||
}(c.makeCtx()) | ||
|
||
return nil | ||
} | ||
|
||
func (c *controller) createCleanupTask(ctx context.Context, jobParams job.Parameters, execId int64) error { | ||
j := &task.Job{ | ||
Name: job.SystemArtifactCleanup, | ||
Metadata: &job.Metadata{ | ||
JobKind: job.KindGeneric, | ||
}, | ||
Parameters: jobParams, | ||
} | ||
|
||
_, err := c.taskMgr.Create(ctx, execId, j) | ||
|
||
if err != nil { | ||
logger.Errorf("Unable to create a scan data export job in clean-up mode : %v", err) | ||
c.markError(ctx, execId, err) | ||
return err | ||
} | ||
return nil | ||
} | ||
|
||
func (c *controller) markError(ctx context.Context, executionID int64, err error) { | ||
|
||
// try to stop the execution first in case that some tasks are already created | ||
if err := c.execMgr.StopAndWait(ctx, executionID, 10*time.Second); err != nil { | ||
logger.Errorf("failed to stop the execution %d: %v", executionID, err) | ||
} | ||
if err := c.execMgr.MarkError(ctx, executionID, err.Error()); err != nil { | ||
logger.Errorf("failed to mark error for the execution %d: %v", executionID, err) | ||
} | ||
} | ||
|
||
// ScheduleCleanupTask schedules a system artifact cleanup task | ||
func ScheduleCleanupTask(ctx context.Context) { | ||
scheduleSystemArtifactCleanJob(ctx) | ||
} | ||
|
||
func scheduleSystemArtifactCleanJob(ctx context.Context) { | ||
schedule, err := getSystemArtifactCleanupSchedule(ctx) | ||
if err != nil { | ||
return | ||
} | ||
if schedule != nil { | ||
logger.Debugf(" Export data cleanup job already scheduled with ID : %v.", schedule.ID) | ||
return | ||
} | ||
scheduleId, err := sched.Schedule(ctx, VendorTypeSystemArtifactCleanup, 0, cronTypeDaily, cronSpec, SystemArtifactCleanupCallback, nil, nil) | ||
if err != nil { | ||
log.Errorf("Encountered error when scheduling scan data export cleanup job : %v", err) | ||
return | ||
} | ||
log.Infof("Scheduled scan data export cleanup job with ID : %v", scheduleId) | ||
} | ||
|
||
func getSystemArtifactCleanupSchedule(ctx context.Context) (*scheduler.Schedule, error) { | ||
query := q.New(map[string]interface{}{"vendor_type": VendorTypeSystemArtifactCleanup}) | ||
schedules, err := sched.ListSchedules(ctx, query) | ||
if err != nil { | ||
logger.Errorf("Unable to check if export data cleanup job is already scheduled : %v", err) | ||
return nil, err | ||
} | ||
if len(schedules) > 0 { | ||
logger.Infof("Found export data cleanup job with schedule id : %v", schedules[0].ID) | ||
return schedules[0], nil | ||
} | ||
return nil, nil | ||
} |
Oops, something went wrong.