Skip to content

Commit

Permalink
refine or.processor implementation and also add UT case
Browse files Browse the repository at this point in the history
Signed-off-by: Steven Zou <szou@vmware.com>
  • Loading branch information
steven-zou committed Jul 9, 2019
1 parent 3409065 commit 185f4f3
Show file tree
Hide file tree
Showing 13 changed files with 182 additions and 30 deletions.
4 changes: 2 additions & 2 deletions src/pkg/retention/job.go
Expand Up @@ -32,11 +32,11 @@ func (pj *Job) ShouldRetry() bool {
}

// Validate the parameters
func (pj *Job) Validate(params Parameters) error {
func (pj *Job) Validate(params job.Parameters) error {
return nil
}

// Run the job
func (pj *Job) Run(ctx job.Context, params Parameters) error {
func (pj *Job) Run(ctx job.Context, params job.Parameters) error {
return nil
}
4 changes: 2 additions & 2 deletions src/pkg/retention/models.go
Expand Up @@ -16,7 +16,7 @@ package retention

import "time"

// Retention execution
// Execution of retention
type Execution struct {
ID string `json:"id"`
PolicyID string `json:"policy_id"`
Expand All @@ -25,7 +25,7 @@ type Execution struct {
Status string `json:"status"`
}

// Retention history
// History of retention
type History struct {
ExecutionID string `json:"execution_id"`
Rule struct {
Expand Down
4 changes: 2 additions & 2 deletions src/pkg/retention/periodic_job.go
Expand Up @@ -34,11 +34,11 @@ func (pj *PeriodicJob) ShouldRetry() bool {
}

// Validate the parameters
func (pj *PeriodicJob) Validate(params Parameters) error {
func (pj *PeriodicJob) Validate(params job.Parameters) error {
return nil
}

// Run the job
func (pj *PeriodicJob) Run(ctx job.Context, params Parameters) error {
func (pj *PeriodicJob) Run(ctx job.Context, params job.Parameters) error {
return ctx.Checkin(fmt.Sprintf("pong=%d", time.Now().Unix()))
}
11 changes: 10 additions & 1 deletion src/pkg/retention/policy/action/performer.go
Expand Up @@ -44,7 +44,16 @@ type retainAction struct {

// Perform the action
func (ra *retainAction) Perform(candidates []*res.Candidate) ([]*res.Result, error) {
return nil, nil
// TODO: REPLACE SAMPLE CODE WITH REAL IMPLEMENTATION
results := make([]*res.Result, 0)

for _, c := range candidates {
results = append(results, &res.Result{
Target: c,
})
}

return results, nil
}

// NewRetainAction is factory method for RetainAction
Expand Down
52 changes: 38 additions & 14 deletions src/pkg/retention/policy/alg/or/processor.go
Expand Up @@ -52,7 +52,7 @@ func (p *processor) Process(artifacts []*res.Candidate) ([]*res.Result, error) {
// collect errors by wrapping
err error
// collect processed candidates
processedCandidates = make(map[string][]*res.Candidate)
processedCandidates = make(map[string]cHash)
)

// for sync
Expand All @@ -67,30 +67,36 @@ func (p *processor) Process(artifacts []*res.Candidate) ([]*res.Result, error) {
// control chan
done := make(chan bool, 1)

defer func() {
// signal the result listener loop exit
done <- true
}()

// go routine for receiving results/error
go func() {
defer func() {
// done
done <- true
}()

for {
select {
case result := <-resChan:
if result == nil {
// chan is closed
return
}

if _, ok := processedCandidates[result.action]; !ok {
processedCandidates[result.action] = make([]*res.Candidate, 0)
processedCandidates[result.action] = make(cHash)
}

processedCandidates[result.action] = append(processedCandidates[result.action], result.processed...)
listByAction := processedCandidates[result.action]
for _, rp := range result.processed {
// remove duplicated ones
listByAction[rp.Hash()] = rp
}
case e := <-errChan:
if err == nil {
err = errors.Wrap(e, "artifact processing error")
} else {
err = errors.Wrap(e, err.Error())
}
case <-done:
// exit
return
}
}
}()
Expand Down Expand Up @@ -143,18 +149,24 @@ func (p *processor) Process(artifacts []*res.Candidate) ([]*res.Result, error) {

// waiting for all the rules are evaluated
wg.Wait()
// close result chan
close(resChan)
// check if the receiving loop exists
<-done

if err != nil {
return nil, err
}

results := make([]*res.Result, 0)
// Perform actions
for act, candidates := range processedCandidates {
for act, hash := range processedCandidates {
var attachedErr error

cl := hash.toList()

if pf, ok := p.performers[act]; ok {
if theRes, err := pf.Perform(candidates); err != nil {
if theRes, err := pf.Perform(cl); err != nil {
attachedErr = err
} else {
results = append(results, theRes...)
Expand All @@ -164,7 +176,7 @@ func (p *processor) Process(artifacts []*res.Candidate) ([]*res.Result, error) {
}

if attachedErr != nil {
for _, c := range candidates {
for _, c := range cl {
results = append(results, &res.Result{
Target: c,
Error: attachedErr,
Expand All @@ -187,3 +199,15 @@ func (p *processor) AddEvaluator(evaluator rule.Evaluator, selectors []res.Selec
func (p *processor) AddActionPerformer(action string, performer action.Performer) {
p.performers[action] = performer
}

type cHash map[string]*res.Candidate

func (ch cHash) toList() []*res.Candidate {
l := make([]*res.Candidate, 0)

for _, v := range ch {
l = append(l, v)
}

return l
}
103 changes: 103 additions & 0 deletions src/pkg/retention/policy/alg/or/processor_test.go
@@ -0,0 +1,103 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package or

import (
"github.com/goharbor/harbor/src/pkg/retention/policy/action"
"github.com/goharbor/harbor/src/pkg/retention/policy/alg"
"github.com/goharbor/harbor/src/pkg/retention/policy/rule"
"github.com/goharbor/harbor/src/pkg/retention/policy/rule/lastx"
"github.com/goharbor/harbor/src/pkg/retention/policy/rule/latestk"
"github.com/goharbor/harbor/src/pkg/retention/res"
"github.com/goharbor/harbor/src/pkg/retention/res/selectors/label"
"github.com/goharbor/harbor/src/pkg/retention/res/selectors/regexp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"testing"
"time"
)

// ProcessorTestSuite is suite for testing processor
type ProcessorTestSuite struct {
suite.Suite

p alg.Processor
all []*res.Candidate
}

// TestProcessor is entrance for ProcessorTestSuite
func TestProcessor(t *testing.T) {
suite.Run(t, new(ProcessorTestSuite))
}

// SetupSuite ...
func (suite *ProcessorTestSuite) SetupSuite() {
suite.all = []*res.Candidate{
{
Namespace: "library",
Repository: "harbor",
Kind: "image",
Tag: "latest",
PushedTime: time.Now().Unix(),
Labels: []string{"L1", "L2"},
},
{
Namespace: "library",
Repository: "harbor",
Kind: "image",
Tag: "dev",
PushedTime: time.Now().Unix(),
Labels: []string{"L3"},
},
}

p := New()
p.AddActionPerformer(action.Retain, action.NewRetainAction(suite.all))

lastxParams := make(map[string]rule.Parameter)
lastxParams[lastx.ParameterX] = 10
p.AddEvaluator(lastx.New(lastxParams), []res.Selector{
regexp.New(regexp.Matches, "*dev*"),
label.New(label.With, "L1,L2"),
})

latestKParams := make(map[string]rule.Parameter)
latestKParams[latestk.ParameterK] = 10
p.AddEvaluator(latestk.New(latestKParams), []res.Selector{
label.New(label.With, "L3"),
})

suite.p = p
}

// TearDownSuite ...
func (suite *ProcessorTestSuite) TearDownSuite() {}

// TestProcess tests process method
func (suite *ProcessorTestSuite) TestProcess() {
results, err := suite.p.Process(suite.all)
require.NoError(suite.T(), err)
assert.Equal(suite.T(), 2, len(results))
assert.Condition(suite.T(), func() bool {
for _, r := range results {
if r.Error != nil {
return false
}
}

return true
}, "no errors in the returned result list")
}
4 changes: 2 additions & 2 deletions src/pkg/retention/policy/rule/evaluator.go
Expand Up @@ -32,5 +32,5 @@ type Evaluator interface {
Action() string
}

// RuleFactory defines a factory method for creating rule evaluator
type RuleFactory func(parameters Parameters) Evaluator
// Factory defines a factory method for creating rule evaluator
type Factory func(parameters Parameters) Evaluator
6 changes: 3 additions & 3 deletions src/pkg/retention/policy/rule/index.go
Expand Up @@ -37,7 +37,7 @@ type IndexMeta struct {
type IndexedParam struct {
Name string `json:"name"`

//Type of the param
// Type of the param
// "int", "string" or "[]string"
Type string `json:"type"`

Expand All @@ -50,11 +50,11 @@ type IndexedParam struct {
type indexedItem struct {
Meta *IndexMeta

Factory RuleFactory
Factory Factory
}

// Register the rule evaluator with the corresponding rule template
func Register(meta *IndexMeta, factory RuleFactory) {
func Register(meta *IndexMeta, factory Factory) {
if meta == nil || factory == nil || len(meta.TemplateID) == 0 {
// do nothing
return
Expand Down
3 changes: 2 additions & 1 deletion src/pkg/retention/policy/rule/lastx/evaluator.go
Expand Up @@ -38,7 +38,8 @@ type evaluator struct {

// Process the candidates based on the rule definition
func (e *evaluator) Process(artifacts []*res.Candidate) ([]*res.Candidate, error) {
return nil, nil
// TODO: REPLACE SAMPLE CODE WITH REAL IMPLEMENTATION
return artifacts, nil
}

// Specify what action is performed to the candidates processed by this evaluator
Expand Down
3 changes: 2 additions & 1 deletion src/pkg/retention/policy/rule/latestk/evaluator.go
Expand Up @@ -38,7 +38,8 @@ type evaluator struct {

// Process the candidates based on the rule definition
func (e *evaluator) Process(artifacts []*res.Candidate) ([]*res.Candidate, error) {
return nil, nil
// TODO: REPLACE SAMPLE CODE WITH REAL IMPLEMENTATION
return artifacts, nil
}

// Specify what action is performed to the candidates processed by this evaluator
Expand Down
12 changes: 12 additions & 0 deletions src/pkg/retention/res/candidate.go
Expand Up @@ -14,6 +14,11 @@

package res

import (
"encoding/base64"
"fmt"
)

const (
// Image kind
Image = "image"
Expand All @@ -37,3 +42,10 @@ type Candidate struct {
// Labels attached with the candidate
Labels []string
}

// Hash code based on the candidate info for differentiation
func (c *Candidate) Hash() string {
raw := fmt.Sprintf("%s:%s/%s:%s", c.Kind, c.Namespace, c.Repository, c.Tag)

return base64.StdEncoding.EncodeToString([]byte(raw))
}
3 changes: 2 additions & 1 deletion src/pkg/retention/res/selectors/label/selector.go
Expand Up @@ -40,7 +40,8 @@ type selector struct {

// Select candidates by regular expressions
func (s *selector) Select(artifacts []*res.Candidate) ([]*res.Candidate, error) {
return nil, nil
// TODO: REPLACE SAMPLE CODE WITH REAL IMPLEMENTATION
return artifacts, nil
}

// New is factory method for list selector
Expand Down
3 changes: 2 additions & 1 deletion src/pkg/retention/res/selectors/regexp/selector.go
Expand Up @@ -39,7 +39,8 @@ type selector struct {

// Select candidates by regular expressions
func (s *selector) Select(artifacts []*res.Candidate) ([]*res.Candidate, error) {
return nil, nil
// TODO: REPLACE SAMPLE CODE WITH REAL IMPLEMENTATION
return artifacts, nil
}

// New is factory method for regexp selector
Expand Down

0 comments on commit 185f4f3

Please sign in to comment.