From fd2b3412295e2fbbbeef46e1d0bfcf66b13f5c32 Mon Sep 17 00:00:00 2001 From: Debanjan Ganguly Date: Wed, 30 Aug 2023 18:28:07 +0530 Subject: [PATCH] feat: support `HSET` in redis (#3768) --- .../kvstoremanager/mock_kvstoremanager.go | 151 ++++++++++++++++++ .../customdestinationmanager.go | 12 +- .../customdestinationmanager_test.go | 53 ++++++ services/kvstoremanager/kvstoremanager.go | 25 +++ services/kvstoremanager/redis.go | 9 ++ 5 files changed, 248 insertions(+), 2 deletions(-) create mode 100644 mocks/services/kvstoremanager/mock_kvstoremanager.go diff --git a/mocks/services/kvstoremanager/mock_kvstoremanager.go b/mocks/services/kvstoremanager/mock_kvstoremanager.go new file mode 100644 index 0000000000..1fe7e5d292 --- /dev/null +++ b/mocks/services/kvstoremanager/mock_kvstoremanager.go @@ -0,0 +1,151 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./services/kvstoremanager/kvstoremanager.go + +// Package mock_kvstoremanager is a generated GoMock package. +package mock_kvstoremanager + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockKVStoreManager is a mock of KVStoreManager interface. +type MockKVStoreManager struct { + ctrl *gomock.Controller + recorder *MockKVStoreManagerMockRecorder +} + +// MockKVStoreManagerMockRecorder is the mock recorder for MockKVStoreManager. +type MockKVStoreManagerMockRecorder struct { + mock *MockKVStoreManager +} + +// NewMockKVStoreManager creates a new mock instance. +func NewMockKVStoreManager(ctrl *gomock.Controller) *MockKVStoreManager { + mock := &MockKVStoreManager{ctrl: ctrl} + mock.recorder = &MockKVStoreManagerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockKVStoreManager) EXPECT() *MockKVStoreManagerMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockKVStoreManager) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockKVStoreManagerMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockKVStoreManager)(nil).Close)) +} + +// Connect mocks base method. +func (m *MockKVStoreManager) Connect() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Connect") +} + +// Connect indicates an expected call of Connect. +func (mr *MockKVStoreManagerMockRecorder) Connect() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Connect", reflect.TypeOf((*MockKVStoreManager)(nil).Connect)) +} + +// DeleteKey mocks base method. +func (m *MockKVStoreManager) DeleteKey(key string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteKey", key) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteKey indicates an expected call of DeleteKey. +func (mr *MockKVStoreManagerMockRecorder) DeleteKey(key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteKey", reflect.TypeOf((*MockKVStoreManager)(nil).DeleteKey), key) +} + +// HGetAll mocks base method. +func (m *MockKVStoreManager) HGetAll(key string) (map[string]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HGetAll", key) + ret0, _ := ret[0].(map[string]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// HGetAll indicates an expected call of HGetAll. +func (mr *MockKVStoreManagerMockRecorder) HGetAll(key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HGetAll", reflect.TypeOf((*MockKVStoreManager)(nil).HGetAll), key) +} + +// HMGet mocks base method. +func (m *MockKVStoreManager) HMGet(key string, fields ...string) ([]interface{}, error) { + m.ctrl.T.Helper() + varargs := []interface{}{key} + for _, a := range fields { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "HMGet", varargs...) + ret0, _ := ret[0].([]interface{}) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// HMGet indicates an expected call of HMGet. +func (mr *MockKVStoreManagerMockRecorder) HMGet(key interface{}, fields ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{key}, fields...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HMGet", reflect.TypeOf((*MockKVStoreManager)(nil).HMGet), varargs...) +} + +// HMSet mocks base method. +func (m *MockKVStoreManager) HMSet(key string, fields map[string]interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HMSet", key, fields) + ret0, _ := ret[0].(error) + return ret0 +} + +// HMSet indicates an expected call of HMSet. +func (mr *MockKVStoreManagerMockRecorder) HMSet(key, fields interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HMSet", reflect.TypeOf((*MockKVStoreManager)(nil).HMSet), key, fields) +} + +// HSet mocks base method. +func (m *MockKVStoreManager) HSet(key, field string, value interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HSet", key, field, value) + ret0, _ := ret[0].(error) + return ret0 +} + +// HSet indicates an expected call of HSet. +func (mr *MockKVStoreManagerMockRecorder) HSet(key, field, value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HSet", reflect.TypeOf((*MockKVStoreManager)(nil).HSet), key, field, value) +} + +// StatusCode mocks base method. +func (m *MockKVStoreManager) StatusCode(err error) int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StatusCode", err) + ret0, _ := ret[0].(int) + return ret0 +} + +// StatusCode indicates an expected call of StatusCode. +func (mr *MockKVStoreManagerMockRecorder) StatusCode(err interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StatusCode", reflect.TypeOf((*MockKVStoreManager)(nil).StatusCode), err) +} diff --git a/router/customdestinationmanager/customdestinationmanager.go b/router/customdestinationmanager/customdestinationmanager.go index 209a46ac79..72c5d8e559 100644 --- a/router/customdestinationmanager/customdestinationmanager.go +++ b/router/customdestinationmanager/customdestinationmanager.go @@ -131,9 +131,17 @@ func (customManager *CustomManagerT) send(jsonData json.RawMessage, client inter streamProducer, _ := client.(common.StreamProducer) statusCode, _, respBody = streamProducer.Produce(jsonData, config) case KV: + var err error kvManager, _ := client.(kvstoremanager.KVStoreManager) - key, fields := kvstoremanager.EventToKeyValue(jsonData) - err := kvManager.HMSet(key, fields) + // if the event supports HSET operation then use HSET + if kvstoremanager.IsHSETCompatibleEvent(jsonData) { + hash, key, value := kvstoremanager.ExtractHashKeyValueFromEvent(jsonData) + err = kvManager.HSet(hash, key, value) + } else { + key, fields := kvstoremanager.EventToKeyValue(jsonData) + err = kvManager.HMSet(key, fields) + } + statusCode = kvManager.StatusCode(err) if err != nil { respBody = err.Error() diff --git a/router/customdestinationmanager/customdestinationmanager_test.go b/router/customdestinationmanager/customdestinationmanager_test.go index 69267c54b0..94b5601873 100644 --- a/router/customdestinationmanager/customdestinationmanager_test.go +++ b/router/customdestinationmanager/customdestinationmanager_test.go @@ -14,6 +14,7 @@ import ( "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" backendconfig "github.com/rudderlabs/rudder-server/backend-config" + mock_kvstoremanager "github.com/rudderlabs/rudder-server/mocks/services/kvstoremanager" mock_streammanager "github.com/rudderlabs/rudder-server/mocks/services/streammanager/common" "github.com/rudderlabs/rudder-server/services/streammanager/kafka" "github.com/rudderlabs/rudder-server/services/streammanager/lambda" @@ -144,3 +145,55 @@ func TestSendDataWithStreamDestination(t *testing.T) { mockProducer.EXPECT().Produce(event, someDestination.Config).Times(1) customManager.SendData(event, someDestination.ID) } + +func TestKVManagerInvocations(t *testing.T) { + initCustomerManager() + customManager := New("REDIS", Opts{}).(*CustomManagerT) + someDestination := backendconfig.DestinationT{ + ID: "someDestinationID1", + DestinationDefinition: backendconfig.DestinationDefinitionT{ + Name: "REDIS", + }, + } + err := customManager.onNewDestination(someDestination) + assert.Nil(t, err) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + t.Run("HSET", func(t *testing.T) { + event := json.RawMessage(`{ + "message": { + "key": "someKey", + "value" : "someValue", + "hash": "someHash" + } + } + `) + ctrl := gomock.NewController(t) + mockKVStoreManager := mock_kvstoremanager.NewMockKVStoreManager(ctrl) + mockKVStoreManager.EXPECT().HSet("someHash", "someKey", "someValue").Times(1) + mockKVStoreManager.EXPECT().StatusCode(nil).Times(1) + customManager.send(event, mockKVStoreManager, someDestination.Config) + }) + + t.Run("HMSET", func(t *testing.T) { + event := json.RawMessage(`{ + "message": { + "key": "someKey", + "fields" : { + "field1": "value1", + "field2": "value2" + } + } + } + `) + mockKVStoreManager := mock_kvstoremanager.NewMockKVStoreManager(ctrl) + mockKVStoreManager.EXPECT().HMSet("someKey", map[string]interface{}{ + "field1": "value1", + "field2": "value2", + }).Times(1) + mockKVStoreManager.EXPECT().StatusCode(nil).Times(1) + customManager.send(event, mockKVStoreManager, someDestination.Config) + }) +} diff --git a/services/kvstoremanager/kvstoremanager.go b/services/kvstoremanager/kvstoremanager.go index f2c1afff69..b008bb87dc 100644 --- a/services/kvstoremanager/kvstoremanager.go +++ b/services/kvstoremanager/kvstoremanager.go @@ -10,6 +10,7 @@ type KVStoreManager interface { Connect() Close() error HMSet(key string, fields map[string]interface{}) error + HSet(key, field string, value interface{}) error StatusCode(err error) int DeleteKey(key string) (err error) HMGet(key string, fields ...string) (result []interface{}, err error) @@ -21,6 +22,12 @@ type SettingsT struct { Config map[string]interface{} } +const ( + hashPath = "message.hash" + keyPath = "message.key" + valuePath = "message.value" +) + func New(provider string, config map[string]interface{}) (m KVStoreManager) { return newManager(SettingsT{ Provider: provider, @@ -49,3 +56,21 @@ func EventToKeyValue(jsonData json.RawMessage) (string, map[string]interface{}) return key, fields } + +// IsHSETCompatibleEvent identifies if the event supports HSET operation +// To support HSET, the event must have the following fields: +// - message.key +// - message.value +// - message.hash +// It doesn't account for the value of the fields. +func IsHSETCompatibleEvent(jsonData json.RawMessage) bool { + return gjson.GetBytes(jsonData, hashPath).Exists() && gjson.GetBytes(jsonData, keyPath).Exists() && gjson.GetBytes(jsonData, valuePath).Exists() +} + +func ExtractHashKeyValueFromEvent(jsonData json.RawMessage) (hash, key, value string) { + hash = gjson.GetBytes(jsonData, hashPath).String() + key = gjson.GetBytes(jsonData, keyPath).String() + value = gjson.GetBytes(jsonData, valuePath).String() + + return hash, key, value +} diff --git a/services/kvstoremanager/redis.go b/services/kvstoremanager/redis.go index 199a4dfce4..ab6bb576c7 100644 --- a/services/kvstoremanager/redis.go +++ b/services/kvstoremanager/redis.go @@ -135,3 +135,12 @@ func (m *redisManagerT) HGetAll(key string) (result map[string]string, err error } return result, err } + +func (m *redisManagerT) HSet(hash, key string, value interface{}) (err error) { + if m.clusterMode { + _, err = m.clusterClient.HSet(hash, key, value).Result() + } else { + _, err = m.client.HSet(hash, key, value).Result() + } + return err +}