Skip to content

Commit

Permalink
feat: support HSET in redis (#3768)
Browse files Browse the repository at this point in the history
  • Loading branch information
debanjan97 committed Aug 30, 2023
1 parent ebc918e commit fd2b341
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 2 deletions.
151 changes: 151 additions & 0 deletions mocks/services/kvstoremanager/mock_kvstoremanager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions router/customdestinationmanager/customdestinationmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,17 @@ func (customManager *CustomManagerT) send(jsonData json.RawMessage, client inter
streamProducer, _ := client.(common.StreamProducer)
statusCode, _, respBody = streamProducer.Produce(jsonData, config)
case KV:
var err error
kvManager, _ := client.(kvstoremanager.KVStoreManager)
key, fields := kvstoremanager.EventToKeyValue(jsonData)
err := kvManager.HMSet(key, fields)
// if the event supports HSET operation then use HSET
if kvstoremanager.IsHSETCompatibleEvent(jsonData) {
hash, key, value := kvstoremanager.ExtractHashKeyValueFromEvent(jsonData)
err = kvManager.HSet(hash, key, value)
} else {
key, fields := kvstoremanager.EventToKeyValue(jsonData)
err = kvManager.HMSet(key, fields)
}

statusCode = kvManager.StatusCode(err)
if err != nil {
respBody = err.Error()
Expand Down
53 changes: 53 additions & 0 deletions router/customdestinationmanager/customdestinationmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
mock_kvstoremanager "github.com/rudderlabs/rudder-server/mocks/services/kvstoremanager"
mock_streammanager "github.com/rudderlabs/rudder-server/mocks/services/streammanager/common"
"github.com/rudderlabs/rudder-server/services/streammanager/kafka"
"github.com/rudderlabs/rudder-server/services/streammanager/lambda"
Expand Down Expand Up @@ -144,3 +145,55 @@ func TestSendDataWithStreamDestination(t *testing.T) {
mockProducer.EXPECT().Produce(event, someDestination.Config).Times(1)
customManager.SendData(event, someDestination.ID)
}

func TestKVManagerInvocations(t *testing.T) {
initCustomerManager()
customManager := New("REDIS", Opts{}).(*CustomManagerT)
someDestination := backendconfig.DestinationT{
ID: "someDestinationID1",
DestinationDefinition: backendconfig.DestinationDefinitionT{
Name: "REDIS",
},
}
err := customManager.onNewDestination(someDestination)
assert.Nil(t, err)

ctrl := gomock.NewController(t)
defer ctrl.Finish()

t.Run("HSET", func(t *testing.T) {
event := json.RawMessage(`{
"message": {
"key": "someKey",
"value" : "someValue",
"hash": "someHash"
}
}
`)
ctrl := gomock.NewController(t)
mockKVStoreManager := mock_kvstoremanager.NewMockKVStoreManager(ctrl)
mockKVStoreManager.EXPECT().HSet("someHash", "someKey", "someValue").Times(1)
mockKVStoreManager.EXPECT().StatusCode(nil).Times(1)
customManager.send(event, mockKVStoreManager, someDestination.Config)
})

t.Run("HMSET", func(t *testing.T) {
event := json.RawMessage(`{
"message": {
"key": "someKey",
"fields" : {
"field1": "value1",
"field2": "value2"
}
}
}
`)
mockKVStoreManager := mock_kvstoremanager.NewMockKVStoreManager(ctrl)
mockKVStoreManager.EXPECT().HMSet("someKey", map[string]interface{}{
"field1": "value1",
"field2": "value2",
}).Times(1)
mockKVStoreManager.EXPECT().StatusCode(nil).Times(1)
customManager.send(event, mockKVStoreManager, someDestination.Config)
})
}
25 changes: 25 additions & 0 deletions services/kvstoremanager/kvstoremanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type KVStoreManager interface {
Connect()
Close() error
HMSet(key string, fields map[string]interface{}) error
HSet(key, field string, value interface{}) error
StatusCode(err error) int
DeleteKey(key string) (err error)
HMGet(key string, fields ...string) (result []interface{}, err error)
Expand All @@ -21,6 +22,12 @@ type SettingsT struct {
Config map[string]interface{}
}

const (
hashPath = "message.hash"
keyPath = "message.key"
valuePath = "message.value"
)

func New(provider string, config map[string]interface{}) (m KVStoreManager) {
return newManager(SettingsT{
Provider: provider,
Expand Down Expand Up @@ -49,3 +56,21 @@ func EventToKeyValue(jsonData json.RawMessage) (string, map[string]interface{})

return key, fields
}

// IsHSETCompatibleEvent identifies if the event supports HSET operation
// To support HSET, the event must have the following fields:
// - message.key
// - message.value
// - message.hash
// It doesn't account for the value of the fields.
func IsHSETCompatibleEvent(jsonData json.RawMessage) bool {
return gjson.GetBytes(jsonData, hashPath).Exists() && gjson.GetBytes(jsonData, keyPath).Exists() && gjson.GetBytes(jsonData, valuePath).Exists()
}

func ExtractHashKeyValueFromEvent(jsonData json.RawMessage) (hash, key, value string) {
hash = gjson.GetBytes(jsonData, hashPath).String()
key = gjson.GetBytes(jsonData, keyPath).String()
value = gjson.GetBytes(jsonData, valuePath).String()

return hash, key, value
}
9 changes: 9 additions & 0 deletions services/kvstoremanager/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,12 @@ func (m *redisManagerT) HGetAll(key string) (result map[string]string, err error
}
return result, err
}

func (m *redisManagerT) HSet(hash, key string, value interface{}) (err error) {
if m.clusterMode {
_, err = m.clusterClient.HSet(hash, key, value).Result()
} else {
_, err = m.client.HSet(hash, key, value).Result()
}
return err
}

0 comments on commit fd2b341

Please sign in to comment.