Skip to content

Commit

Permalink
implement stage2 sync
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Bertschy <matthias.bertschy@gmail.com>
  • Loading branch information
matthyx committed Nov 20, 2023
1 parent 9d427c8 commit c8ad63c
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 22 deletions.
44 changes: 33 additions & 11 deletions adapters/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/armosec/utils-k8s-go/armometadata"
jsonpatch "github.com/evanphx/json-patch"
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
Expand All @@ -12,17 +13,19 @@ import (
)

type MockAdapter struct {
callbacks domain.Callbacks
patchStrategy bool // true for client, false for server
Resources map[string][]byte
shadowObjects map[string][]byte
callbacks domain.Callbacks
checkResourceVersion bool // false for client, true for server
patchStrategy bool // true for client, false for server
Resources map[string][]byte
shadowObjects map[string][]byte
}

func NewMockAdapter(patchStrategy bool) *MockAdapter {
func NewMockAdapter(isClient bool) *MockAdapter {
return &MockAdapter{
patchStrategy: patchStrategy,
Resources: map[string][]byte{},
shadowObjects: map[string][]byte{},
checkResourceVersion: !isClient,
patchStrategy: isClient,
Resources: map[string][]byte{},
shadowObjects: map[string][]byte{},
}
}

Expand Down Expand Up @@ -88,16 +91,35 @@ func (m *MockAdapter) patchObject(id domain.KindName, checksum string, patch []b
if newChecksum != checksum {
return object, fmt.Errorf("checksum mismatch: %s != %s", newChecksum, checksum)
}
m.Resources[id.String()] = modified
m.saveIfNewer(id, modified)
m.shadowObjects[id.String()] = modified
return object, nil
return nil, nil
}

func (m *MockAdapter) PutObject(_ context.Context, id domain.KindName, object []byte) error {
m.Resources[id.String()] = object
m.saveIfNewer(id, object)
return nil
}

// saveIfNewer saves the object only if it is newer than the existing one
// this reference implementation should be implemented in the ingester on the backend side
func (m *MockAdapter) saveIfNewer(id domain.KindName, newObject []byte) {
if m.checkResourceVersion {
err, _, _, _, _, resourceVersion := armometadata.ExtractMetadataFromJsonBytes(newObject)
if err == nil {
if oldObject, ok := m.Resources[id.String()]; ok {
err, _, _, _, _, oldResourceVersion := armometadata.ExtractMetadataFromJsonBytes(oldObject)
if err == nil {
if !utils.StringValueBigger(resourceVersion, oldResourceVersion) {
return
}
}
}
}
}
m.Resources[id.String()] = newObject
}

func (m *MockAdapter) RegisterCallbacks(mainCtx context.Context, callbacks domain.Callbacks) {
m.callbacks = callbacks
}
Expand Down
5 changes: 2 additions & 3 deletions core/synchronizer_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,11 @@ func TestSynchronizer_ObjectModifiedOnServer(t *testing.T) {
clientAdapter.Resources[kindKnownServers.String()] = object
serverAdapter.Resources[kindKnownServers.String()] = object
// modify object
objectV2 := []byte(`{"kind":"kind","metadata":{"name":"name","version":"2"}}`)
err := serverAdapter.TestCallPutOrPatch(ctx, kindKnownServers, nil, objectV2)
err := serverAdapter.TestCallPutOrPatch(ctx, kindKnownServers, nil, objectServerV2)
assert.NoError(t, err)
time.Sleep(1 * time.Second)
// check object modified
clientObj, ok := clientAdapter.Resources[kindKnownServers.String()]
assert.True(t, ok)
assert.Equal(t, objectV2, clientObj)
assert.Equal(t, objectServerV2, clientObj)
}
33 changes: 29 additions & 4 deletions core/synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ var (
Name: "name",
Namespace: "namespace",
}
object = []byte(`{"kind":"kind","metadata":{"name":"name"}}`)
object = []byte(`{"kind":"kind","metadata":{"name":"name","resourceVersion":"1"}}`)
objectClientV2 = []byte(`{"kind":"kind","metadata":{"name":"client","resourceVersion":"2"}}`)
objectServerV2 = []byte(`{"kind":"kind","metadata":{"name":"server","resourceVersion":"2"}}`)
)

func initTest(t *testing.T) (context.Context, *adapters.MockAdapter, *adapters.MockAdapter) {
Expand Down Expand Up @@ -80,12 +82,35 @@ func TestSynchronizer_ObjectModified(t *testing.T) {
clientAdapter.Resources[kindDeployment.String()] = object
serverAdapter.Resources[kindDeployment.String()] = object
// modify object
objectV2 := []byte(`{"kind":"kind","metadata":{"name":"name","version":"2"}}`)
err := clientAdapter.TestCallPutOrPatch(ctx, kindDeployment, object, objectV2)
err := clientAdapter.TestCallPutOrPatch(ctx, kindDeployment, object, objectClientV2)
assert.NoError(t, err)
time.Sleep(1 * time.Second)
// check object modified
serverObj, ok := serverAdapter.Resources[kindDeployment.String()]
assert.True(t, ok)
assert.Equal(t, objectV2, serverObj)
assert.Equal(t, objectClientV2, serverObj)
}

func TestSynchronizer_ObjectModifiedOnBothSides(t *testing.T) {
ctx, clientAdapter, serverAdapter := initTest(t)
// pre: add object
clientAdapter.Resources[kindKnownServers.String()] = object
serverAdapter.Resources[kindKnownServers.String()] = object
// manually modify object on server (PutObject message will be sent later)
serverAdapter.Resources[kindKnownServers.String()] = objectServerV2
// we create a race condition here
// object is modified on client, but we don't know about server modification
err := clientAdapter.TestCallPutOrPatch(ctx, kindKnownServers, object, objectClientV2)
assert.NoError(t, err)
// server message arrives just now on client
err = clientAdapter.PutObject(ctx, kindKnownServers, objectServerV2)
assert.NoError(t, err)
time.Sleep(1 * time.Second)
// check both sides have the one from the server
clientObj, ok := clientAdapter.Resources[kindKnownServers.String()]
assert.True(t, ok)
assert.Equal(t, objectServerV2, clientObj)
serverObj, ok := clientAdapter.Resources[kindKnownServers.String()]
assert.True(t, ok)
assert.Equal(t, objectServerV2, serverObj)
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.20
require (
github.com/SergJa/jsonhash v0.0.0-20210531165746-fc45f346aa74
github.com/apache/pulsar-client-go v0.11.0
github.com/armosec/utils-k8s-go v0.0.16
github.com/armosec/utils-k8s-go v0.0.21
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/gobwas/ws v1.3.0
Expand Down Expand Up @@ -65,6 +65,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/olvrng/ujson v1.1.0 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ github.com/armosec/armoapi-go v0.0.254 h1:+4fnVHbVTwSQW99t7/XlkQAT2yJ5bPr4HAYIIg
github.com/armosec/armoapi-go v0.0.254/go.mod h1:CJT5iH5VF30zjdQYXaQhsAm8IEHtM1T87HcFVXeLX54=
github.com/armosec/gojay v1.2.15 h1:sSB2vnAvacUNkw9nzUYZKcPzhJOyk6/5LK2JCNdmoZY=
github.com/armosec/gojay v1.2.15/go.mod h1:vzVAaay2TWJAngOpxu8aqLbye9jMgoKleuAOK+xsOts=
github.com/armosec/utils-k8s-go v0.0.16 h1:h46PoxAb4OHA2p719PzcAS03lADw4lH4TyRMaZ3ix/g=
github.com/armosec/utils-k8s-go v0.0.16/go.mod h1:QX0QAGlH7KCZq810eO9QjTYqkhjw8cvrr96TZfaUGrk=
github.com/armosec/utils-k8s-go v0.0.21 h1:/3k+TOssKgYMaYKJY4dhvHmPnrzmVAKW9PjNirRSGrY=
github.com/armosec/utils-k8s-go v0.0.21/go.mod h1:CXgkHFgY8xlKN+wiQ8TyjwNj0+VSgj6NolB3itZ2lY8=
github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0=
github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
Expand Down Expand Up @@ -103,7 +103,6 @@ github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dvsekhvalnov/jose2go v1.5.0 h1:3j8ya4Z4kMCwT5nXIKFSV84YS+HdqSSO0VsTQxaLAeM=
github.com/dvsekhvalnov/jose2go v1.5.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU=
github.com/emicklei/go-restful v2.9.5+incompatible h1:spTtZBk5DYEvbxMVutUuTyh1Ao2r4iyvLdACqsl/Ljk=
github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -300,6 +299,8 @@ github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJE
github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/olvrng/ujson v1.1.0 h1:8xVUzVlqwdMVWh5d1UHBtLQ1D50nxoPuPEq9Wozs8oA=
github.com/olvrng/ujson v1.1.0/go.mod h1:Mz4G3RODTUfbkKyvi0lgmPx/7vd3Saksk+1jgk8s9xo=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE=
github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8=
Expand Down
12 changes: 12 additions & 0 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,15 @@ func StartLivenessProbe() {
}
}()
}

func StringValueBigger(s1, s2 string) bool {
i1, err := strconv.Atoi(s1)
if err != nil {
return false
}
i2, err := strconv.Atoi(s2)
if err != nil {
return false
}
return i1 > i2
}

0 comments on commit c8ad63c

Please sign in to comment.