Skip to content

Commit

Permalink
feat: parse replay configuration from backend config (#3703)
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Aug 10, 2023
1 parent c948122 commit 35f55e0
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 7 deletions.
2 changes: 2 additions & 0 deletions backend-config/namespace_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ func (nc *namespaceConfig) getFromAPI(ctx context.Context) (map[string]ConfigT,
))
}
workspace = &previousConfig
} else {
workspace.ApplyReplaySources()
}

// always set connection flags to true for hosted and multi-tenant warehouse service
Expand Down
80 changes: 80 additions & 0 deletions backend-config/replay_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package backendconfig

import (
"github.com/samber/lo"
)

type EventReplayConfigs map[string]*EventReplayConfig

// ApplyReplaySources reads the event replay configuration and adds replay sources to the config
// A replay source is a copy of the original source with a different ID and source definition
// This replay source contains as destinations replay destinations which are copies of the original destinations but with a different ID
func (config *ConfigT) ApplyReplaySources() {
if len(config.EventReplays) == 0 {
return
}
originalSources := config.SourcesMap()
originalDestinations := config.DestinationsMap()
for _, replay := range config.EventReplays {
sources := lo.OmitByValues(lo.MapValues(replay.Sources, func(value EventReplaySource, id string) *SourceT {
s, ok := originalSources[value.OriginalSourceID]
if !ok {
return nil
}
newSource := *s
newSource.ID = id
newSource.OriginalID = s.ID
newSource.WriteKey = id
newSource.EventSchemasEnabled = false
newSource.Config = lo.OmitByKeys(newSource.Config, []string{"eventUpload"}) // no event uploads for replay sources for now
newSource.Destinations = nil // destinations are added later
return &newSource
}), []*SourceT{nil})
destinations := lo.OmitByValues(lo.MapValues(replay.Destinations, func(value EventReplayDestination, id string) *DestinationT {
d, ok := originalDestinations[value.OriginalDestinationID]
if !ok {
return nil
}
newDestination := *d
newDestination.ID = id
return &newDestination
}), []*DestinationT{nil})

// add destinations to sources
for _, connection := range replay.Connections {
source, ok := sources[connection.SourceID]
if !ok {
continue
}
destination, ok := destinations[connection.DestinationID]
if !ok {
continue
}
source.Destinations = append(source.Destinations, *destination)
}

// add replay sources to config, only the ones that have destinations
config.Sources = append(config.Sources, lo.FilterMap(lo.Values(sources), func(source *SourceT, _ int) (SourceT, bool) {
return *source, len(source.Destinations) > 0
})...)
}
}

type EventReplayConfig struct {
Sources map[string]EventReplaySource `json:"sources"`
Destinations map[string]EventReplayDestination `json:"destinations"`
Connections []EventReplayConnection `json:"connections"`
}

type EventReplaySource struct {
OriginalSourceID string `json:"originalId"`
}

type EventReplayDestination struct {
OriginalDestinationID string `json:"originalId"`
}

type EventReplayConnection struct {
SourceID string `json:"sourceId"`
DestinationID string `json:"destinationId"`
}
134 changes: 134 additions & 0 deletions backend-config/replay_types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package backendconfig

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestApplyReplayConfig(t *testing.T) {
t.Run("Valid Replay Config", func(t *testing.T) {
c := &ConfigT{
Sources: []SourceT{
{
ID: "s-1",
Config: map[string]interface{}{"eventUpload": true},
SourceDefinition: SourceDefinitionT{
ID: "sd-1",
Type: "type-1",
Category: "category-1",
},
Destinations: []DestinationT{
{
ID: "d-1",
},
},
},
},
EventReplays: map[string]EventReplayConfig{
"er-1": {
Sources: map[string]EventReplaySource{
"er-s-1": {
OriginalSourceID: "s-1",
},
},
Destinations: map[string]EventReplayDestination{
"er-d-1": {
OriginalDestinationID: "d-1",
},
},
Connections: []EventReplayConnection{
{
SourceID: "er-s-1",
DestinationID: "er-d-1",
},
},
},
},
}
c.ApplyReplaySources()

require.Len(t, c.Sources, 2)
require.Equal(t, "s-1", c.Sources[0].ID)
require.Equal(t, "er-s-1", c.Sources[1].ID)
require.Equal(t, "s-1", c.Sources[1].OriginalID)
require.Equal(t, "er-s-1", c.Sources[1].WriteKey)
require.Equal(t, map[string]interface{}{}, c.Sources[1].Config)
require.Len(t, c.Sources[1].Destinations, 1)
require.Equal(t, "er-d-1", c.Sources[1].Destinations[0].ID)
})

t.Run("Invalid Replay Config", func(t *testing.T) {
c := &ConfigT{
Sources: []SourceT{
{
ID: "s-1",
Config: map[string]interface{}{"eventUpload": true},
SourceDefinition: SourceDefinitionT{
ID: "sd-1",
Type: "type-1",
Category: "category-1",
},
Destinations: []DestinationT{
{
ID: "d-1",
},
},
},
},
EventReplays: map[string]EventReplayConfig{
"er-1": {
Sources: map[string]EventReplaySource{
"er-s-1": {
OriginalSourceID: "s-1",
},
"er-s-2": {
OriginalSourceID: "s-2",
},
},
Destinations: map[string]EventReplayDestination{
"er-d-1": {
OriginalDestinationID: "d-1",
},
"er-d-2": {
OriginalDestinationID: "d-2",
},
},
Connections: []EventReplayConnection{
{
SourceID: "er-s-1",
DestinationID: "er-d-1",
},
{
SourceID: "er-s-1",
DestinationID: "er-d-2",
},
{
SourceID: "er-s-2",
DestinationID: "er-d-1",
},
{
SourceID: "er-s-2",
DestinationID: "er-d-2",
},
{
SourceID: "er-s-3",
DestinationID: "er-d-3",
},
},
},
},
}

c.ApplyReplaySources()

require.Len(t, c.Sources, 2)
require.Equal(t, "s-1", c.Sources[0].ID)
require.Equal(t, "er-s-1", c.Sources[1].ID)
require.Equal(t, "s-1", c.Sources[1].OriginalID)
require.Equal(t, "er-s-1", c.Sources[1].WriteKey)
require.Equal(t, map[string]interface{}{}, c.Sources[1].Config)
require.Len(t, c.Sources[1].Destinations, 1)
require.Equal(t, "er-d-1", c.Sources[1].Destinations[0].ID)
})
}
1 change: 1 addition & 0 deletions backend-config/single_workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (wc *singleWorkspaceConfig) getFromAPI(ctx context.Context) (map[string]Con
pkgLogger.Errorf("Error while parsing request: %v", err)
return config, err
}
sourcesJSON.ApplyReplaySources()
workspaceID := sourcesJSON.WorkspaceID

wc.workspaceIDOnce.Do(func() {
Expand Down
34 changes: 27 additions & 7 deletions backend-config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type DestinationT struct {

type SourceT struct {
ID string
OriginalID string
Name string
SourceDefinition SourceDefinitionT
Config map[string]interface{}
Expand Down Expand Up @@ -87,13 +88,32 @@ type SourceRegulationT struct {
}

type ConfigT struct {
EnableMetrics bool `json:"enableMetrics"`
WorkspaceID string `json:"workspaceId"`
Sources []SourceT `json:"sources"`
Libraries LibrariesT `json:"libraries"`
ConnectionFlags ConnectionFlags `json:"flags"`
Settings Settings `json:"settings"`
UpdatedAt time.Time `json:"updatedAt"`
EnableMetrics bool `json:"enableMetrics"`
WorkspaceID string `json:"workspaceId"`
Sources []SourceT `json:"sources"`
EventReplays map[string]EventReplayConfig `json:"eventReplays"`
Libraries LibrariesT `json:"libraries"`
ConnectionFlags ConnectionFlags `json:"flags"`
Settings Settings `json:"settings"`
UpdatedAt time.Time `json:"updatedAt"`
}

func (c *ConfigT) SourcesMap() map[string]*SourceT {
sourcesMap := make(map[string]*SourceT)
for _, source := range c.Sources {
sourcesMap[source.ID] = &source
}
return sourcesMap
}

func (c *ConfigT) DestinationsMap() map[string]*DestinationT {
destinationsMap := make(map[string]*DestinationT)
for _, source := range c.Sources {
for _, destination := range source.Destinations {
destinationsMap[destination.ID] = &destination
}
}
return destinationsMap
}

type Settings struct {
Expand Down

0 comments on commit 35f55e0

Please sign in to comment.