diff --git a/backend-config/namespace_config.go b/backend-config/namespace_config.go index 4f740355a8..8bc5ade931 100644 --- a/backend-config/namespace_config.go +++ b/backend-config/namespace_config.go @@ -145,6 +145,8 @@ func (nc *namespaceConfig) getFromAPI(ctx context.Context) (map[string]ConfigT, )) } workspace = &previousConfig + } else { + workspace.ApplyReplaySources() } // always set connection flags to true for hosted and multi-tenant warehouse service diff --git a/backend-config/replay_types.go b/backend-config/replay_types.go new file mode 100644 index 0000000000..558cbebef9 --- /dev/null +++ b/backend-config/replay_types.go @@ -0,0 +1,80 @@ +package backendconfig + +import ( + "github.com/samber/lo" +) + +type EventReplayConfigs map[string]*EventReplayConfig + +// ApplyReplaySources reads the event replay configuration and adds replay sources to the config +// A replay source is a copy of the original source with a different ID and source definition +// This replay source contains as destinations replay destinations which are copies of the original destinations but with a different ID +func (config *ConfigT) ApplyReplaySources() { + if len(config.EventReplays) == 0 { + return + } + originalSources := config.SourcesMap() + originalDestinations := config.DestinationsMap() + for _, replay := range config.EventReplays { + sources := lo.OmitByValues(lo.MapValues(replay.Sources, func(value EventReplaySource, id string) *SourceT { + s, ok := originalSources[value.OriginalSourceID] + if !ok { + return nil + } + newSource := *s + newSource.ID = id + newSource.OriginalID = s.ID + newSource.WriteKey = id + newSource.EventSchemasEnabled = false + newSource.Config = lo.OmitByKeys(newSource.Config, []string{"eventUpload"}) // no event uploads for replay sources for now + newSource.Destinations = nil // destinations are added later + return &newSource + }), []*SourceT{nil}) + destinations := lo.OmitByValues(lo.MapValues(replay.Destinations, func(value EventReplayDestination, id string) *DestinationT { + d, ok := originalDestinations[value.OriginalDestinationID] + if !ok { + return nil + } + newDestination := *d + newDestination.ID = id + return &newDestination + }), []*DestinationT{nil}) + + // add destinations to sources + for _, connection := range replay.Connections { + source, ok := sources[connection.SourceID] + if !ok { + continue + } + destination, ok := destinations[connection.DestinationID] + if !ok { + continue + } + source.Destinations = append(source.Destinations, *destination) + } + + // add replay sources to config, only the ones that have destinations + config.Sources = append(config.Sources, lo.FilterMap(lo.Values(sources), func(source *SourceT, _ int) (SourceT, bool) { + return *source, len(source.Destinations) > 0 + })...) + } +} + +type EventReplayConfig struct { + Sources map[string]EventReplaySource `json:"sources"` + Destinations map[string]EventReplayDestination `json:"destinations"` + Connections []EventReplayConnection `json:"connections"` +} + +type EventReplaySource struct { + OriginalSourceID string `json:"originalId"` +} + +type EventReplayDestination struct { + OriginalDestinationID string `json:"originalId"` +} + +type EventReplayConnection struct { + SourceID string `json:"sourceId"` + DestinationID string `json:"destinationId"` +} diff --git a/backend-config/replay_types_test.go b/backend-config/replay_types_test.go new file mode 100644 index 0000000000..92040e7286 --- /dev/null +++ b/backend-config/replay_types_test.go @@ -0,0 +1,134 @@ +package backendconfig + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestApplyReplayConfig(t *testing.T) { + t.Run("Valid Replay Config", func(t *testing.T) { + c := &ConfigT{ + Sources: []SourceT{ + { + ID: "s-1", + Config: map[string]interface{}{"eventUpload": true}, + SourceDefinition: SourceDefinitionT{ + ID: "sd-1", + Type: "type-1", + Category: "category-1", + }, + Destinations: []DestinationT{ + { + ID: "d-1", + }, + }, + }, + }, + EventReplays: map[string]EventReplayConfig{ + "er-1": { + Sources: map[string]EventReplaySource{ + "er-s-1": { + OriginalSourceID: "s-1", + }, + }, + Destinations: map[string]EventReplayDestination{ + "er-d-1": { + OriginalDestinationID: "d-1", + }, + }, + Connections: []EventReplayConnection{ + { + SourceID: "er-s-1", + DestinationID: "er-d-1", + }, + }, + }, + }, + } + c.ApplyReplaySources() + + require.Len(t, c.Sources, 2) + require.Equal(t, "s-1", c.Sources[0].ID) + require.Equal(t, "er-s-1", c.Sources[1].ID) + require.Equal(t, "s-1", c.Sources[1].OriginalID) + require.Equal(t, "er-s-1", c.Sources[1].WriteKey) + require.Equal(t, map[string]interface{}{}, c.Sources[1].Config) + require.Len(t, c.Sources[1].Destinations, 1) + require.Equal(t, "er-d-1", c.Sources[1].Destinations[0].ID) + }) + + t.Run("Invalid Replay Config", func(t *testing.T) { + c := &ConfigT{ + Sources: []SourceT{ + { + ID: "s-1", + Config: map[string]interface{}{"eventUpload": true}, + SourceDefinition: SourceDefinitionT{ + ID: "sd-1", + Type: "type-1", + Category: "category-1", + }, + Destinations: []DestinationT{ + { + ID: "d-1", + }, + }, + }, + }, + EventReplays: map[string]EventReplayConfig{ + "er-1": { + Sources: map[string]EventReplaySource{ + "er-s-1": { + OriginalSourceID: "s-1", + }, + "er-s-2": { + OriginalSourceID: "s-2", + }, + }, + Destinations: map[string]EventReplayDestination{ + "er-d-1": { + OriginalDestinationID: "d-1", + }, + "er-d-2": { + OriginalDestinationID: "d-2", + }, + }, + Connections: []EventReplayConnection{ + { + SourceID: "er-s-1", + DestinationID: "er-d-1", + }, + { + SourceID: "er-s-1", + DestinationID: "er-d-2", + }, + { + SourceID: "er-s-2", + DestinationID: "er-d-1", + }, + { + SourceID: "er-s-2", + DestinationID: "er-d-2", + }, + { + SourceID: "er-s-3", + DestinationID: "er-d-3", + }, + }, + }, + }, + } + + c.ApplyReplaySources() + + require.Len(t, c.Sources, 2) + require.Equal(t, "s-1", c.Sources[0].ID) + require.Equal(t, "er-s-1", c.Sources[1].ID) + require.Equal(t, "s-1", c.Sources[1].OriginalID) + require.Equal(t, "er-s-1", c.Sources[1].WriteKey) + require.Equal(t, map[string]interface{}{}, c.Sources[1].Config) + require.Len(t, c.Sources[1].Destinations, 1) + require.Equal(t, "er-d-1", c.Sources[1].Destinations[0].ID) + }) +} diff --git a/backend-config/single_workspace.go b/backend-config/single_workspace.go index a8825439cb..02d0adc3cc 100644 --- a/backend-config/single_workspace.go +++ b/backend-config/single_workspace.go @@ -98,6 +98,7 @@ func (wc *singleWorkspaceConfig) getFromAPI(ctx context.Context) (map[string]Con pkgLogger.Errorf("Error while parsing request: %v", err) return config, err } + sourcesJSON.ApplyReplaySources() workspaceID := sourcesJSON.WorkspaceID wc.workspaceIDOnce.Do(func() { diff --git a/backend-config/types.go b/backend-config/types.go index 0da913ca65..ffd00c2244 100644 --- a/backend-config/types.go +++ b/backend-config/types.go @@ -59,6 +59,7 @@ type DestinationT struct { type SourceT struct { ID string + OriginalID string Name string SourceDefinition SourceDefinitionT Config map[string]interface{} @@ -87,13 +88,32 @@ type SourceRegulationT struct { } type ConfigT struct { - EnableMetrics bool `json:"enableMetrics"` - WorkspaceID string `json:"workspaceId"` - Sources []SourceT `json:"sources"` - Libraries LibrariesT `json:"libraries"` - ConnectionFlags ConnectionFlags `json:"flags"` - Settings Settings `json:"settings"` - UpdatedAt time.Time `json:"updatedAt"` + EnableMetrics bool `json:"enableMetrics"` + WorkspaceID string `json:"workspaceId"` + Sources []SourceT `json:"sources"` + EventReplays map[string]EventReplayConfig `json:"eventReplays"` + Libraries LibrariesT `json:"libraries"` + ConnectionFlags ConnectionFlags `json:"flags"` + Settings Settings `json:"settings"` + UpdatedAt time.Time `json:"updatedAt"` +} + +func (c *ConfigT) SourcesMap() map[string]*SourceT { + sourcesMap := make(map[string]*SourceT) + for _, source := range c.Sources { + sourcesMap[source.ID] = &source + } + return sourcesMap +} + +func (c *ConfigT) DestinationsMap() map[string]*DestinationT { + destinationsMap := make(map[string]*DestinationT) + for _, source := range c.Sources { + for _, destination := range source.Destinations { + destinationsMap[destination.ID] = &destination + } + } + return destinationsMap } type Settings struct {