Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/api/v1/atlascluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func init() {
}

type ProviderName string
type ClusterType string

const (
ProviderAWS ProviderName = "AWS"
Expand All @@ -40,6 +39,8 @@ const (
ProviderTenant ProviderName = "TENANT"
)

type ClusterType string

const (
TypeReplicaSet ClusterType = "REPLICASET"
TypeSharded ClusterType = "SHARDED"
Expand Down
67 changes: 47 additions & 20 deletions pkg/controller/atlascluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,31 +40,31 @@ func (r *AtlasClusterReconciler) ensureClusterState(ctx *workflow.Context, proje

switch c.StateName {
case "IDLE":
if done, err := clusterMatchesSpec(ctx.Log, c, cluster.Spec); err != nil {
resultingCluster, err := mergedCluster(*c, cluster.Spec)
if err != nil {
return c, workflow.Terminate(workflow.Internal, err.Error())
} else if done {
return c, workflow.OK()
}

spec, err := cluster.Spec.Cluster()
if err != nil {
return c, workflow.Terminate(workflow.Internal, err.Error())
if done := clustersEqual(ctx.Log, *c, resultingCluster); done {
return c, workflow.OK()
}

if cluster.Spec.Paused != nil {
if c.Paused == nil || *c.Paused != *cluster.Spec.Paused {
// paused is different from Atlas
// we need to first send a special (un)pause request before reconciling everything else
spec = &mongodbatlas.Cluster{
resultingCluster = mongodbatlas.Cluster{
Paused: cluster.Spec.Paused,
}
} else {
// otherwise, don't send the paused field
spec.Paused = nil
resultingCluster.Paused = nil
}
}

c, _, err = ctx.Client.Clusters.Update(context.Background(), project.Status.ID, cluster.Spec.Name, spec)
resultingCluster = cleanupCluster(resultingCluster)

c, _, err = ctx.Client.Clusters.Update(context.Background(), project.Status.ID, cluster.Spec.Name, &resultingCluster)
if err != nil {
return c, workflow.Terminate(workflow.ClusterNotUpdatedInAtlas, err.Error())
}
Expand All @@ -84,22 +84,49 @@ func (r *AtlasClusterReconciler) ensureClusterState(ctx *workflow.Context, proje
}
}

// clusterMatchesSpec will merge everything from the Spec into existing Cluster and use that to detect change.
// Direct comparison is not feasible because Atlas will set a lot of fields to default values, so we need to apply our changes on top of that.
func clusterMatchesSpec(log *zap.SugaredLogger, cluster *mongodbatlas.Cluster, spec mdbv1.AtlasClusterSpec) (bool, error) {
clusterMerged := mongodbatlas.Cluster{}
if err := compat.JSONCopy(&clusterMerged, cluster); err != nil {
return false, err
// cleanupCluster will unset some fields that cannot be changed via API or are deprecated.
func cleanupCluster(cluster mongodbatlas.Cluster) mongodbatlas.Cluster {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] can we add some comment about why we need to clean these fields?

cluster.ID = ""
cluster.MongoDBVersion = ""
cluster.MongoURI = ""
cluster.MongoURIUpdated = ""
cluster.MongoURIWithOptions = ""
cluster.SrvAddress = ""
cluster.StateName = ""
cluster.ReplicationFactor = nil
cluster.ReplicationSpec = nil
cluster.ConnectionStrings = nil
return cluster
}

// mergedCluster will return the result of merging AtlasClusterSpec with Atlas Cluster
func mergedCluster(cluster mongodbatlas.Cluster, spec mdbv1.AtlasClusterSpec) (result mongodbatlas.Cluster, err error) {
if err = compat.JSONCopy(&result, cluster); err != nil {
return
}

if err = compat.JSONCopy(&result, spec); err != nil {
return
}

if err := compat.JSONCopy(&clusterMerged, spec); err != nil {
return false, err
// TODO: might need to do this with other slices
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we file a ticket for this? Seems like we need to cover all slices with tests...

if err = compat.JSONSliceMerge(&result.ReplicationSpecs, cluster.ReplicationSpecs); err != nil {
return
}

d := cmp.Diff(*cluster, clusterMerged, cmpopts.EquateEmpty())
if err = compat.JSONSliceMerge(&result.ReplicationSpecs, spec.ReplicationSpecs); err != nil {
return
}

return
}

// clustersEqual compares two Atlas Clusters, treating nil and empty
// collections as equivalent. Any difference found is logged at debug level.
func clustersEqual(log *zap.SugaredLogger, clusterA mongodbatlas.Cluster, clusterB mongodbatlas.Cluster) bool {
	diff := cmp.Diff(clusterA, clusterB, cmpopts.EquateEmpty())
	if diff == "" {
		return true
	}

	log.Debugf("Clusters are different: %s", diff)
	return false
}
17 changes: 11 additions & 6 deletions pkg/controller/atlascluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,34 @@ import (

func TestClusterMatchesSpec(t *testing.T) {
t.Run("Clusters match (enums)", func(t *testing.T) {
atlasClusterEnum := mongodbatlas.Cluster{
atlasCluster := mongodbatlas.Cluster{
ProviderSettings: &mongodbatlas.ProviderSettings{
ProviderName: "AWS",
},
ClusterType: "GEOSHARDED",
}
operatorClusterEnum := mdbv1.AtlasClusterSpec{
operatorCluster := mdbv1.AtlasClusterSpec{
ProviderSettings: &mdbv1.ProviderSettingsSpec{
ProviderName: mdbv1.ProviderAWS,
},
ClusterType: mdbv1.TypeGeoSharded,
}

match, err := clusterMatchesSpec(zap.S(), &atlasClusterEnum, operatorClusterEnum)
merged, err := mergedCluster(atlasCluster, operatorCluster)
assert.NoError(t, err)
assert.True(t, match)

equal := clustersEqual(zap.S(), atlasCluster, merged)
assert.True(t, equal)
})

t.Run("Clusters don't match (enums)", func(t *testing.T) {
atlasClusterEnum := mongodbatlas.Cluster{ClusterType: "GEOSHARDED"}
operatorClusterEnum := mdbv1.AtlasClusterSpec{ClusterType: mdbv1.TypeReplicaSet}

match, err := clusterMatchesSpec(zap.S(), &atlasClusterEnum, operatorClusterEnum)
merged, err := mergedCluster(atlasClusterEnum, operatorClusterEnum)
assert.NoError(t, err)
assert.False(t, match)

equal := clustersEqual(zap.S(), atlasClusterEnum, merged)
assert.False(t, equal)
})
}
59 changes: 59 additions & 0 deletions pkg/util/compat/json_slice_merge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package compat

import (
"errors"
"fmt"
"reflect"
)

// JSONSliceMerge will merge two slices using JSONCopy according to these rules:
//
// 1. If `dst` and `src` are the same length, all elements are merged
//
// 2. If `dst` is longer, only the first `len(src)` elements are merged
//
// 3. If `src` is longer, first `len(dst)` elements are merged, then remaining elements are appended to `dst`
func JSONSliceMerge(dst, src interface{}) error {
dstVal := reflect.ValueOf(dst)
srcVal := reflect.ValueOf(src)

if dstVal.Kind() != reflect.Ptr {
return errors.New("dst must be a pointer to slice")
}

dstVal = reflect.Indirect(dstVal)
srcVal = reflect.Indirect(srcVal)

if dstVal.Kind() != reflect.Slice {
return errors.New("dst must be pointing to a slice")
}

if srcVal.Kind() != reflect.Slice {
return errors.New("src must be a slice or a pointer to slice")
}

minLen := dstVal.Len()
if srcVal.Len() < minLen {
minLen = srcVal.Len()
}

// merge common elements
for i := 0; i < minLen; i++ {
dstX := dstVal.Index(i).Addr().Interface()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[q] am I right that this will work only if the items of the slice are pointers as well? (looking at the tests below seems only the destination slice must have pointers?)
If so - may need to add to the method documentation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They don't have to be pointers - I used pointers in the test case just to check that pointers work too

if err := JSONCopy(dstX, srcVal.Index(i).Interface()); err != nil {
return fmt.Errorf("cannot copy value at index %d: %w", i, err)
}
}

// append extra elements (if any)
dstType := reflect.TypeOf(dst).Elem().Elem()
for i := minLen; i < srcVal.Len(); i++ {
newVal := reflect.New(dstType).Interface()
if err := JSONCopy(&newVal, srcVal.Index(i).Interface()); err != nil {
return fmt.Errorf("cannot copy value at index %d: %w", i, err)
}
dstVal.Set(reflect.Append(dstVal, reflect.ValueOf(newVal).Elem()))
}

return nil
}
98 changes: 98 additions & 0 deletions pkg/util/compat/json_slice_merge_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package compat_test

import (
"errors"
"testing"

"github.com/stretchr/testify/require"

. "github.com/mongodb/mongodb-atlas-kubernetes/pkg/util/compat"
)

func TestJSONSliceMerge(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great to have the unit test!
Do you think we can add the use cases:

  • when the src has smaller size than the dst?
  • the src is empty/nil
  • the dst is empty/nil

type Item struct {
ID string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
}

type OtherItem struct {
OtherID string `json:"id,omitempty"`
OtherName string `json:"name,omitempty"`
}

tests := []struct {
name string
dst, src, expected interface{}
expectedError error
}{
{
name: "src is longer",
dst: &[]*Item{
{"00001", "dst1"},
{"00002", "dst2"},
{"00003", "dst3"},
},
src: []OtherItem{ // copying from different element type
{"99999", "src1"}, // different key, different value
{"", "src2"}, // no key, different value
{"", ""}, // no key, no value
{"12345", "extra"}, // extra value
},
expected: &[]*Item{ // kept dst element type
{"99999", "src1"}, // key & value replaced by src
{"00002", "src2"}, // only value replaced by src
{"00003", "dst3"}, // untouched
{"12345", "extra"}, // appended from src
},
},
{
name: "dst is longer",
dst: &[]*Item{
{"00001", "dst1"},
{"00002", "dst2"},
{"00003", "dst3"},
},
src: []OtherItem{
{"99999", "src1"},
},
expected: &[]*Item{
{"99999", "src1"}, // key & value replaced by src
{"00002", "dst2"}, // untouched
{"00003", "dst3"}, // untouched
},
},
{
name: "src is nil",
dst: &[]*Item{
{"00001", "dst1"},
{"00002", "dst2"},
{"00003", "dst3"},
},
src: nil,
expectedError: errors.New("src must be a slice or a pointer to slice"),
expected: &[]*Item{
{"00001", "dst1"}, // untouched
{"00002", "dst2"}, // untouched
{"00003", "dst3"}, // untouched
},
},
{
name: "dst is nil",
dst: nil,
expectedError: errors.New("dst must be a pointer to slice"),
src: []OtherItem{
{"99999", "src1"},
},
expected: nil,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
err := JSONSliceMerge(tt.dst, tt.src)
require.Equal(tt.expectedError, err)
require.Equal(tt.expected, tt.dst)
})
}
}
Loading