Skip to content

Commit

Permalink
Merge pull request #9395 from y1r/add-context-k8sutil-kvstore
Browse files Browse the repository at this point in the history
core: add context parameter to k8sutil kvstore
  • Loading branch information
leseb committed Dec 13, 2021
2 parents 97b41ea + 0d8c581 commit 200fb56
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 37 deletions.
2 changes: 1 addition & 1 deletion cmd/rook/ceph/osd.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func prepareOSD(cmd *cobra.Command, args []string) error {
Message: err.Error(),
PvcBackedOSD: cfg.pvcBacked,
}
oposd.UpdateNodeOrPVCStatus(kv, cfg.nodeName, status)
oposd.UpdateNodeOrPVCStatus(clusterInfo.Context, kv, cfg.nodeName, status)

rook.TerminateFatal(err)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/daemon/ceph/osd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func Provision(context *clusterd.Context, agent *OsdAgent, crushLocation, topolo

// set the initial orchestration status
status := oposd.OrchestrationStatus{Status: oposd.OrchestrationStatusOrchestrating}
oposd.UpdateNodeOrPVCStatus(agent.kv, agent.nodeName, status)
oposd.UpdateNodeOrPVCStatus(agent.clusterInfo.Context, agent.kv, agent.nodeName, status)

if err := client.WriteCephConfig(context, agent.clusterInfo); err != nil {
return errors.Wrap(err, "failed to generate ceph config")
Expand Down Expand Up @@ -215,7 +215,7 @@ func Provision(context *clusterd.Context, agent *OsdAgent, crushLocation, topolo

// orchestration is about to start, update the status
status = oposd.OrchestrationStatus{Status: oposd.OrchestrationStatusOrchestrating, PvcBackedOSD: agent.pvcBacked}
oposd.UpdateNodeOrPVCStatus(agent.kv, agent.nodeName, status)
oposd.UpdateNodeOrPVCStatus(agent.clusterInfo.Context, agent.kv, agent.nodeName, status)

// start the desired OSDs on devices
logger.Infof("configuring osd devices: %+v", devices)
Expand All @@ -232,7 +232,7 @@ func Provision(context *clusterd.Context, agent *OsdAgent, crushLocation, topolo
if len(deviceOSDs) == 0 {
logger.Warningf("skipping OSD configuration as no devices matched the storage settings for this node %q", agent.nodeName)
status = oposd.OrchestrationStatus{OSDs: deviceOSDs, Status: oposd.OrchestrationStatusCompleted, PvcBackedOSD: agent.pvcBacked}
oposd.UpdateNodeOrPVCStatus(agent.kv, agent.nodeName, status)
oposd.UpdateNodeOrPVCStatus(agent.clusterInfo.Context, agent.kv, agent.nodeName, status)
return nil
}

Expand Down Expand Up @@ -272,7 +272,7 @@ func Provision(context *clusterd.Context, agent *OsdAgent, crushLocation, topolo

// orchestration is completed, update the status
status = oposd.OrchestrationStatus{OSDs: deviceOSDs, Status: oposd.OrchestrationStatusCompleted, PvcBackedOSD: agent.pvcBacked}
oposd.UpdateNodeOrPVCStatus(agent.kv, agent.nodeName, status)
oposd.UpdateNodeOrPVCStatus(agent.clusterInfo.Context, agent.kv, agent.nodeName, status)

return nil
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/operator/ceph/cluster/osd/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
package osd

import (
"context"
"encoding/json"
"fmt"
"time"
Expand Down Expand Up @@ -97,7 +98,7 @@ func (e *provisionErrors) asMessages() string {

// return name of status ConfigMap
func (c *Cluster) updateOSDStatus(node string, status OrchestrationStatus) string {
return UpdateNodeOrPVCStatus(c.kv, node, status)
return UpdateNodeOrPVCStatus(c.clusterInfo.Context, c.kv, node, status)
}

func statusConfigMapLabels(node string) map[string]string {
Expand All @@ -110,13 +111,14 @@ func statusConfigMapLabels(node string) map[string]string {

// UpdateNodeOrPVCStatus updates the status ConfigMap for the OSD on the given node or PVC. It returns the name
// the ConfigMap used.
func UpdateNodeOrPVCStatus(kv *k8sutil.ConfigMapKVStore, nodeOrPVC string, status OrchestrationStatus) string {
func UpdateNodeOrPVCStatus(ctx context.Context, kv *k8sutil.ConfigMapKVStore, nodeOrPVC string, status OrchestrationStatus) string {
labels := statusConfigMapLabels(nodeOrPVC)

// update the status map with the given status now
s, _ := json.Marshal(status)
cmName := statusConfigMapName(nodeOrPVC)
if err := kv.SetValueWithLabels(
ctx,
cmName,
orchestrationStatusKey,
string(s),
Expand All @@ -131,7 +133,7 @@ func UpdateNodeOrPVCStatus(kv *k8sutil.ConfigMapKVStore, nodeOrPVC string, statu
func (c *Cluster) handleOrchestrationFailure(errors *provisionErrors, nodeName, message string, args ...interface{}) {
errors.addError(message, args...)
status := OrchestrationStatus{Status: OrchestrationStatusFailed, Message: message}
UpdateNodeOrPVCStatus(c.kv, nodeName, status)
UpdateNodeOrPVCStatus(c.clusterInfo.Context, c.kv, nodeName, status)
}

func parseOrchestrationStatus(data map[string]string) *OrchestrationStatus {
Expand Down Expand Up @@ -340,7 +342,7 @@ func statusConfigMapName(nodeOrPVCName string) string {
}

func (c *Cluster) deleteStatusConfigMap(nodeOrPVCName string) {
if err := c.kv.ClearStore(statusConfigMapName(nodeOrPVCName)); err != nil {
if err := c.kv.ClearStore(c.clusterInfo.Context, statusConfigMapName(nodeOrPVCName)); err != nil {
logger.Errorf("failed to remove the status configmap %q. %v", statusConfigMapName(nodeOrPVCName), err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/operator/ceph/cluster/osd/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestOrchestrationStatus(t *testing.T) {

// update the status map with some status
status := OrchestrationStatus{Status: OrchestrationStatusOrchestrating, Message: "doing work"}
UpdateNodeOrPVCStatus(kv, nodeName, status)
UpdateNodeOrPVCStatus(ctx, kv, nodeName, status)

// retrieve the status and verify it
statusMap, err := c.context.Clientset.CoreV1().ConfigMaps(c.clusterInfo.Namespace).Get(ctx, cmName, metav1.GetOptions{})
Expand Down Expand Up @@ -94,7 +94,7 @@ func mockNodeOrchestrationCompletion(c *Cluster, nodeName string, statusMapWatch
},
Status: OrchestrationStatusCompleted,
}
UpdateNodeOrPVCStatus(c.kv, nodeName, *status)
UpdateNodeOrPVCStatus(ctx, c.kv, nodeName, *status)

// 2) call modify on the fake watcher so a watch event will get triggered
s, _ := json.Marshal(status)
Expand Down
4 changes: 2 additions & 2 deletions pkg/operator/ceph/object/mime.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ func mimeTypesMountPath() string {
// store mime.types file in a config map
func (c *clusterConfig) generateMimeTypes() error {
k := k8sutil.NewConfigMapKVStore(c.store.Namespace, c.context.Clientset, c.ownerInfo)
if _, err := k.GetValue(c.mimeTypesConfigMapName(), mimeTypesFileName); err == nil || !kerrors.IsNotFound(err) {
if _, err := k.GetValue(c.clusterInfo.Context, c.mimeTypesConfigMapName(), mimeTypesFileName); err == nil || !kerrors.IsNotFound(err) {
logger.Infof("config map %q for object store %q already exists, not overwriting", c.mimeTypesConfigMapName(), c.store.Name)
return nil
}
// is not found
if err := k.SetValue(c.mimeTypesConfigMapName(), mimeTypesFileName, mimeTypes); err != nil {
if err := k.SetValue(c.clusterInfo.Context, c.mimeTypesConfigMapName(), mimeTypesFileName, mimeTypes); err != nil {
return errors.Wrapf(err, "failed to create config map for object store %q", c.store.Name)
}
return nil
Expand Down
16 changes: 6 additions & 10 deletions pkg/operator/k8sutil/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ func NewConfigMapKVStore(namespace string, clientset kubernetes.Interface, owner
}
}

func (kv *ConfigMapKVStore) GetValue(storeName, key string) (string, error) {
ctx := context.TODO()
func (kv *ConfigMapKVStore) GetValue(ctx context.Context, storeName, key string) (string, error) {
cm, err := kv.clientset.CoreV1().ConfigMaps(kv.namespace).Get(ctx, storeName, metav1.GetOptions{})
if err != nil {
return "", err
Expand All @@ -56,12 +55,11 @@ func (kv *ConfigMapKVStore) GetValue(storeName, key string) (string, error) {
return val, nil
}

func (kv *ConfigMapKVStore) SetValue(storeName, key, value string) error {
return kv.SetValueWithLabels(storeName, key, value, nil)
func (kv *ConfigMapKVStore) SetValue(ctx context.Context, storeName, key, value string) error {
return kv.SetValueWithLabels(ctx, storeName, key, value, nil)
}

func (kv *ConfigMapKVStore) SetValueWithLabels(storeName, key, value string, labels map[string]string) error {
ctx := context.TODO()
func (kv *ConfigMapKVStore) SetValueWithLabels(ctx context.Context, storeName, key, value string, labels map[string]string) error {
cm, err := kv.clientset.CoreV1().ConfigMaps(kv.namespace).Get(ctx, storeName, metav1.GetOptions{})
if err != nil {
if !errors.IsNotFound(err) {
Expand Down Expand Up @@ -99,8 +97,7 @@ func (kv *ConfigMapKVStore) SetValueWithLabels(storeName, key, value string, lab
return nil
}

func (kv *ConfigMapKVStore) GetStore(storeName string) (map[string]string, error) {
ctx := context.TODO()
func (kv *ConfigMapKVStore) GetStore(ctx context.Context, storeName string) (map[string]string, error) {
cm, err := kv.clientset.CoreV1().ConfigMaps(kv.namespace).Get(ctx, storeName, metav1.GetOptions{})
if err != nil {
return nil, err
Expand All @@ -109,8 +106,7 @@ func (kv *ConfigMapKVStore) GetStore(storeName string) (map[string]string, error
return cm.Data, nil
}

func (kv *ConfigMapKVStore) ClearStore(storeName string) error {
ctx := context.TODO()
func (kv *ConfigMapKVStore) ClearStore(ctx context.Context, storeName string) error {
err := kv.clientset.CoreV1().ConfigMaps(kv.namespace).Delete(ctx, storeName, metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
// a real error, return it (we're OK with clearing a store that doesn't exist)
Expand Down
35 changes: 21 additions & 14 deletions pkg/operator/k8sutil/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
package k8sutil

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -33,7 +34,7 @@ func TestGetValueStoreNotExist(t *testing.T) {
kv, storeName := newKVStore()

// try to get a value from a store that does not exist
_, err := kv.GetValue(storeName, "key1")
_, err := kv.GetValue(context.TODO(), storeName, "key1")
assert.NotNil(t, err)
assert.True(t, errors.IsNotFound(err))
}
Expand All @@ -44,7 +45,7 @@ func TestGetValueKeyNotExist(t *testing.T) {
kv, storeName := newKVStore(cm)

// try to get a value from a store that does exist but from a key that does not exist
_, err := kv.GetValue(storeName, "key1")
_, err := kv.GetValue(context.TODO(), storeName, "key1")
assert.NotNil(t, err)
assert.True(t, errors.IsNotFound(err))
}
Expand All @@ -57,7 +58,7 @@ func TestGetValue(t *testing.T) {
cm := &v1.ConfigMap{Data: map[string]string{key: value}}
kv, storeName := newKVStore(cm)

actualValue, err := kv.GetValue(storeName, key)
actualValue, err := kv.GetValue(context.TODO(), storeName, key)
assert.Nil(t, err)
assert.Equal(t, value, actualValue)
}
Expand All @@ -66,21 +67,23 @@ func TestSetValueStoreNotExist(t *testing.T) {
key := "key1"
value := "value1"

ctx := context.TODO()

// start with no stores created at all
kv, storeName := newKVStore()

// try to set a value on a store that doesn't exist. The store should be created automatically
// and there should be no error.
err := kv.SetValue(storeName, key, value)
err := kv.SetValue(ctx, storeName, key, value)
assert.Nil(t, err)

// try to get the value that was set, it should be as expected
actualValue, err := kv.GetValue(storeName, key)
actualValue, err := kv.GetValue(ctx, storeName, key)
assert.Nil(t, err)
assert.Equal(t, value, actualValue)

// get a value that doesn't exist
_, err = kv.GetValue(storeName, "key2")
_, err = kv.GetValue(ctx, storeName, "key2")
assert.NotNil(t, err)
assert.True(t, errors.IsNotFound(err))

Expand All @@ -90,25 +93,27 @@ func TestSetValueUpdate(t *testing.T) {
key := "key1"
value := "value1"

ctx := context.TODO()

// create a configmap (store) that has a key value pair in it
cm := &v1.ConfigMap{Data: map[string]string{key: value}}
kv, storeName := newKVStore(cm)

// try to set the already existing key to a new value, which should update it
newValue := "value2"
err := kv.SetValue(storeName, key, newValue)
err := kv.SetValue(ctx, storeName, key, newValue)
assert.Nil(t, err)

// try to get the key, this should return the updated value
actualValue, err := kv.GetValue(storeName, key)
actualValue, err := kv.GetValue(ctx, storeName, key)
assert.Nil(t, err)
assert.Equal(t, newValue, actualValue)
}

func TestGetStoreNotExist(t *testing.T) {
kv, storeName := newKVStore()

_, err := kv.GetStore(storeName)
_, err := kv.GetStore(context.TODO(), storeName)
assert.NotNil(t, err)
assert.True(t, errors.IsNotFound(err))
}
Expand All @@ -121,7 +126,7 @@ func TestGetStore(t *testing.T) {
cm := &v1.ConfigMap{Data: map[string]string{key: value}}
kv, storeName := newKVStore(cm)

actualStore, err := kv.GetStore(storeName)
actualStore, err := kv.GetStore(context.TODO(), storeName)
assert.Nil(t, err)
assert.Equal(t, map[string]string{key: value}, actualStore)
}
Expand All @@ -130,29 +135,31 @@ func TestClearStoreNotExist(t *testing.T) {
kv, storeName := newKVStore()

// clearing a store that does not exist is OK, should be no error
err := kv.ClearStore(storeName)
err := kv.ClearStore(context.TODO(), storeName)
assert.Nil(t, err)
}

func TestClearStore(t *testing.T) {
key := "key1"
value := "value1"

ctx := context.TODO()

// create a configmap (store) that has a key value pair in it
cm := &v1.ConfigMap{Data: map[string]string{key: value}}
kv, storeName := newKVStore(cm)

// verify the store/key/value exist
actualValue, err := kv.GetValue(storeName, key)
actualValue, err := kv.GetValue(ctx, storeName, key)
assert.Nil(t, err)
assert.Equal(t, value, actualValue)

// now clear the store
err = kv.ClearStore(storeName)
err = kv.ClearStore(ctx, storeName)
assert.Nil(t, err)

// getting the store should return an error for not exist
_, err = kv.GetStore(storeName)
_, err = kv.GetStore(ctx, storeName)
assert.NotNil(t, err)
assert.True(t, errors.IsNotFound(err))
}
Expand Down

0 comments on commit 200fb56

Please sign in to comment.