diff --git a/src/pkg/retention/job.go b/src/pkg/retention/job.go index f0d88a54af6..db69a8d4589 100644 --- a/src/pkg/retention/job.go +++ b/src/pkg/retention/job.go @@ -32,11 +32,11 @@ func (pj *Job) ShouldRetry() bool { } // Validate the parameters -func (pj *Job) Validate(params Parameters) error { +func (pj *Job) Validate(params job.Parameters) error { return nil } // Run the job -func (pj *Job) Run(ctx job.Context, params Parameters) error { +func (pj *Job) Run(ctx job.Context, params job.Parameters) error { return nil } diff --git a/src/pkg/retention/models.go b/src/pkg/retention/models.go index c8bcb0a5ab6..958685782ff 100644 --- a/src/pkg/retention/models.go +++ b/src/pkg/retention/models.go @@ -16,7 +16,7 @@ package retention import "time" -// Retention execution +// Execution of retention type Execution struct { ID string `json:"id"` PolicyID string `json:"policy_id"` @@ -25,7 +25,7 @@ type Execution struct { Status string `json:"status"` } -// Retention history +// History of retention type History struct { ExecutionID string `json:"execution_id"` Rule struct { diff --git a/src/pkg/retention/periodic_job.go b/src/pkg/retention/periodic_job.go index f24f5e0d931..bad14af9b2f 100644 --- a/src/pkg/retention/periodic_job.go +++ b/src/pkg/retention/periodic_job.go @@ -34,11 +34,11 @@ func (pj *PeriodicJob) ShouldRetry() bool { } // Validate the parameters -func (pj *PeriodicJob) Validate(params Parameters) error { +func (pj *PeriodicJob) Validate(params job.Parameters) error { return nil } // Run the job -func (pj *PeriodicJob) Run(ctx job.Context, params Parameters) error { +func (pj *PeriodicJob) Run(ctx job.Context, params job.Parameters) error { return ctx.Checkin(fmt.Sprintf("pong=%d", time.Now().Unix())) } diff --git a/src/pkg/retention/policy/action/performer.go b/src/pkg/retention/policy/action/performer.go index c2ef880719e..da23e14f253 100644 --- a/src/pkg/retention/policy/action/performer.go +++ b/src/pkg/retention/policy/action/performer.go @@ -44,7 +44,16 @@ type retainAction struct { // Perform the action func (ra *retainAction) Perform(candidates []*res.Candidate) ([]*res.Result, error) { - return nil, nil + // TODO: REPLACE SAMPLE CODE WITH REAL IMPLEMENTATION + results := make([]*res.Result, 0) + + for _, c := range candidates { + results = append(results, &res.Result{ + Target: c, + }) + } + + return results, nil } // NewRetainAction is factory method for RetainAction diff --git a/src/pkg/retention/policy/alg/or/processor.go b/src/pkg/retention/policy/alg/or/processor.go index 120db41b1bf..78ae32b924f 100644 --- a/src/pkg/retention/policy/alg/or/processor.go +++ b/src/pkg/retention/policy/alg/or/processor.go @@ -52,7 +52,7 @@ func (p *processor) Process(artifacts []*res.Candidate) ([]*res.Result, error) { // collect errors by wrapping err error // collect processed candidates - processedCandidates = make(map[string][]*res.Candidate) + processedCandidates = make(map[string]cHash) ) // for sync @@ -67,30 +67,36 @@ func (p *processor) Process(artifacts []*res.Candidate) ([]*res.Result, error) { // control chan done := make(chan bool, 1) - defer func() { - // signal the result listener loop exit - done <- true - }() - // go routine for receiving results/error go func() { + defer func() { + // done + done <- true + }() + for { select { case result := <-resChan: + if result == nil { + // chan is closed + return + } + if _, ok := processedCandidates[result.action]; !ok { - processedCandidates[result.action] = make([]*res.Candidate, 0) + processedCandidates[result.action] = make(cHash) } - processedCandidates[result.action] = append(processedCandidates[result.action], result.processed...) + listByAction := processedCandidates[result.action] + for _, rp := range result.processed { + // remove duplicated ones + listByAction[rp.Hash()] = rp + } case e := <-errChan: if err == nil { err = errors.Wrap(e, "artifact processing error") } else { err = errors.Wrap(e, err.Error()) } - case <-done: - // exit - return } } }() @@ -143,6 +149,10 @@ func (p *processor) Process(artifacts []*res.Candidate) ([]*res.Result, error) { // waiting for all the rules are evaluated wg.Wait() + // close result chan + close(resChan) + // check if the receiving loop exists + <-done if err != nil { return nil, err @@ -150,11 +160,13 @@ func (p *processor) Process(artifacts []*res.Candidate) ([]*res.Result, error) { results := make([]*res.Result, 0) // Perform actions - for act, candidates := range processedCandidates { + for act, hash := range processedCandidates { var attachedErr error + cl := hash.toList() + if pf, ok := p.performers[act]; ok { - if theRes, err := pf.Perform(candidates); err != nil { + if theRes, err := pf.Perform(cl); err != nil { attachedErr = err } else { results = append(results, theRes...) @@ -164,7 +176,7 @@ func (p *processor) Process(artifacts []*res.Candidate) ([]*res.Result, error) { } if attachedErr != nil { - for _, c := range candidates { + for _, c := range cl { results = append(results, &res.Result{ Target: c, Error: attachedErr, @@ -187,3 +199,15 @@ func (p *processor) AddEvaluator(evaluator rule.Evaluator, selectors []res.Selec func (p *processor) AddActionPerformer(action string, performer action.Performer) { p.performers[action] = performer } + +type cHash map[string]*res.Candidate + +func (ch cHash) toList() []*res.Candidate { + l := make([]*res.Candidate, 0) + + for _, v := range ch { + l = append(l, v) + } + + return l +} diff --git a/src/pkg/retention/policy/alg/or/processor_test.go b/src/pkg/retention/policy/alg/or/processor_test.go new file mode 100644 index 00000000000..d4c20c46fa4 --- /dev/null +++ b/src/pkg/retention/policy/alg/or/processor_test.go @@ -0,0 +1,103 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package or + +import ( + "github.com/goharbor/harbor/src/pkg/retention/policy/action" + "github.com/goharbor/harbor/src/pkg/retention/policy/alg" + "github.com/goharbor/harbor/src/pkg/retention/policy/rule" + "github.com/goharbor/harbor/src/pkg/retention/policy/rule/lastx" + "github.com/goharbor/harbor/src/pkg/retention/policy/rule/latestk" + "github.com/goharbor/harbor/src/pkg/retention/res" + "github.com/goharbor/harbor/src/pkg/retention/res/selectors/label" + "github.com/goharbor/harbor/src/pkg/retention/res/selectors/regexp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "testing" + "time" +) + +// ProcessorTestSuite is suite for testing processor +type ProcessorTestSuite struct { + suite.Suite + + p alg.Processor + all []*res.Candidate +} + +// TestProcessor is entrance for ProcessorTestSuite +func TestProcessor(t *testing.T) { + suite.Run(t, new(ProcessorTestSuite)) +} + +// SetupSuite ... +func (suite *ProcessorTestSuite) SetupSuite() { + suite.all = []*res.Candidate{ + { + Namespace: "library", + Repository: "harbor", + Kind: "image", + Tag: "latest", + PushedTime: time.Now().Unix(), + Labels: []string{"L1", "L2"}, + }, + { + Namespace: "library", + Repository: "harbor", + Kind: "image", + Tag: "dev", + PushedTime: time.Now().Unix(), + Labels: []string{"L3"}, + }, + } + + p := New() + p.AddActionPerformer(action.Retain, action.NewRetainAction(suite.all)) + + lastxParams := make(map[string]rule.Parameter) + lastxParams[lastx.ParameterX] = 10 + p.AddEvaluator(lastx.New(lastxParams), []res.Selector{ + regexp.New(regexp.Matches, "*dev*"), + label.New(label.With, "L1,L2"), + }) + + latestKParams := make(map[string]rule.Parameter) + latestKParams[latestk.ParameterK] = 10 + p.AddEvaluator(latestk.New(latestKParams), []res.Selector{ + label.New(label.With, "L3"), + }) + + suite.p = p +} + +// TearDownSuite ... +func (suite *ProcessorTestSuite) TearDownSuite() {} + +// TestProcess tests process method +func (suite *ProcessorTestSuite) TestProcess() { + results, err := suite.p.Process(suite.all) + require.NoError(suite.T(), err) + assert.Equal(suite.T(), 2, len(results)) + assert.Condition(suite.T(), func() bool { + for _, r := range results { + if r.Error != nil { + return false + } + } + + return true + }, "no errors in the returned result list") +} diff --git a/src/pkg/retention/policy/rule/evaluator.go b/src/pkg/retention/policy/rule/evaluator.go index 780d9efa91a..91ec27913df 100644 --- a/src/pkg/retention/policy/rule/evaluator.go +++ b/src/pkg/retention/policy/rule/evaluator.go @@ -32,5 +32,5 @@ type Evaluator interface { Action() string } -// RuleFactory defines a factory method for creating rule evaluator -type RuleFactory func(parameters Parameters) Evaluator +// Factory defines a factory method for creating rule evaluator +type Factory func(parameters Parameters) Evaluator diff --git a/src/pkg/retention/policy/rule/index.go b/src/pkg/retention/policy/rule/index.go index 3944ef40bf3..cef42176a06 100644 --- a/src/pkg/retention/policy/rule/index.go +++ b/src/pkg/retention/policy/rule/index.go @@ -37,7 +37,7 @@ type IndexMeta struct { type IndexedParam struct { Name string `json:"name"` - //Type of the param + // Type of the param // "int", "string" or "[]string" Type string `json:"type"` @@ -50,11 +50,11 @@ type IndexedParam struct { type indexedItem struct { Meta *IndexMeta - Factory RuleFactory + Factory Factory } // Register the rule evaluator with the corresponding rule template -func Register(meta *IndexMeta, factory RuleFactory) { +func Register(meta *IndexMeta, factory Factory) { if meta == nil || factory == nil || len(meta.TemplateID) == 0 { // do nothing return diff --git a/src/pkg/retention/policy/rule/lastx/evaluator.go b/src/pkg/retention/policy/rule/lastx/evaluator.go index fc528fcfa8b..284560267b0 100644 --- a/src/pkg/retention/policy/rule/lastx/evaluator.go +++ b/src/pkg/retention/policy/rule/lastx/evaluator.go @@ -38,7 +38,8 @@ type evaluator struct { // Process the candidates based on the rule definition func (e *evaluator) Process(artifacts []*res.Candidate) ([]*res.Candidate, error) { - return nil, nil + // TODO: REPLACE SAMPLE CODE WITH REAL IMPLEMENTATION + return artifacts, nil } // Specify what action is performed to the candidates processed by this evaluator diff --git a/src/pkg/retention/policy/rule/latestk/evaluator.go b/src/pkg/retention/policy/rule/latestk/evaluator.go index fe23ed5ea8a..a49c5e1cc9b 100644 --- a/src/pkg/retention/policy/rule/latestk/evaluator.go +++ b/src/pkg/retention/policy/rule/latestk/evaluator.go @@ -38,7 +38,8 @@ type evaluator struct { // Process the candidates based on the rule definition func (e *evaluator) Process(artifacts []*res.Candidate) ([]*res.Candidate, error) { - return nil, nil + // TODO: REPLACE SAMPLE CODE WITH REAL IMPLEMENTATION + return artifacts, nil } // Specify what action is performed to the candidates processed by this evaluator diff --git a/src/pkg/retention/res/candidate.go b/src/pkg/retention/res/candidate.go index 4894854f942..324b696b543 100644 --- a/src/pkg/retention/res/candidate.go +++ b/src/pkg/retention/res/candidate.go @@ -14,6 +14,11 @@ package res +import ( + "encoding/base64" + "fmt" +) + const ( // Image kind Image = "image" @@ -37,3 +42,10 @@ type Candidate struct { // Labels attached with the candidate Labels []string } + +// Hash code based on the candidate info for differentiation +func (c *Candidate) Hash() string { + raw := fmt.Sprintf("%s:%s/%s:%s", c.Kind, c.Namespace, c.Repository, c.Tag) + + return base64.StdEncoding.EncodeToString([]byte(raw)) +} diff --git a/src/pkg/retention/res/selectors/label/selector.go b/src/pkg/retention/res/selectors/label/selector.go index b8b3926edde..efdfdef691a 100644 --- a/src/pkg/retention/res/selectors/label/selector.go +++ b/src/pkg/retention/res/selectors/label/selector.go @@ -40,7 +40,8 @@ type selector struct { // Select candidates by regular expressions func (s *selector) Select(artifacts []*res.Candidate) ([]*res.Candidate, error) { - return nil, nil + // TODO: REPLACE SAMPLE CODE WITH REAL IMPLEMENTATION + return artifacts, nil } // New is factory method for list selector diff --git a/src/pkg/retention/res/selectors/regexp/selector.go b/src/pkg/retention/res/selectors/regexp/selector.go index 1e320edef6d..0e717c648b1 100644 --- a/src/pkg/retention/res/selectors/regexp/selector.go +++ b/src/pkg/retention/res/selectors/regexp/selector.go @@ -39,7 +39,8 @@ type selector struct { // Select candidates by regular expressions func (s *selector) Select(artifacts []*res.Candidate) ([]*res.Candidate, error) { - return nil, nil + // TODO: REPLACE SAMPLE CODE WITH REAL IMPLEMENTATION + return artifacts, nil } // New is factory method for regexp selector