Skip to content

Commit

Permalink
chore: using new kit kafka client and docker resources (#4350)
Browse files Browse the repository at this point in the history
* chore: using new kit kafka client and docker resources

* chore: go mod tidy

* chore: goimports

* chore: sorting out imports

* chore: renaming setup for pulsar

* chore: removing unused structs

* chore: removing unused structs

* chore: ssh testdata

* chore: adding avro schemas

* chore: linting

* chore: updating kit

* chore: updating deps for snyk
  • Loading branch information
fracasula authored and atzoum committed Feb 12, 2024
1 parent 59ecda8 commit 4fa88cb
Show file tree
Hide file tree
Showing 90 changed files with 697 additions and 3,515 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/verify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,5 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.55.0
version: v1.55.2
args: -v
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ install-tools:
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2.0
go install gotest.tools/gotestsum@v1.10.0
go install golang.org/x/tools/cmd/goimports@latest
bash ./scripts/install-golangci-lint.sh v1.55.0
bash ./scripts/install-golangci-lint.sh v1.55.2

.PHONY: lint
lint: fmt ## Run linters on all go files
Expand Down
4 changes: 2 additions & 2 deletions app/apphandlers/apphandlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"
"github.com/rudderlabs/rudder-server/app"
"github.com/rudderlabs/rudder-server/services/db"
)
Expand Down Expand Up @@ -58,7 +58,7 @@ func TestAppHandlerStartSequence(t *testing.T) {
func startJobsDBPostgresql(t *testing.T) {
pool, err := dockertest.NewPool("")
require.NoError(t, err)
r, err := resource.SetupPostgres(pool, t)
r, err := postgres.Setup(pool, t)
require.NoError(t, err)
config.Set("DB.port", r.Port)
config.Set("DB.user", r.User)
Expand Down
18 changes: 8 additions & 10 deletions archiver/archiver_isolation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,24 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"
"github.com/rudderlabs/rudder-server/testhelper/destination"

"golang.org/x/sync/errgroup"

"github.com/google/uuid"
"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/minio"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"
trand "github.com/rudderlabs/rudder-go-kit/testhelper/rand"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/runner"
"github.com/rudderlabs/rudder-server/testhelper"
"github.com/rudderlabs/rudder-server/testhelper/destination"
"github.com/rudderlabs/rudder-server/testhelper/health"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
Expand Down Expand Up @@ -66,7 +64,7 @@ func BenchmarkArchiverIsolation(b *testing.B) {
func dummyConfig(
numWorkspace,
numSourcesPerWorkspace int,
minio *resource.MinioResource,
minio *minio.Resource,
) map[string]backendconfig.ConfigT {
configMap := map[string]backendconfig.ConfigT{}
for i := 0; i < numWorkspace; i++ {
Expand Down Expand Up @@ -130,10 +128,10 @@ func ArchivalScenario(
cleanup := &testhelper.Cleanup{}
defer cleanup.Run()

postgresContainer, err := resource.SetupPostgres(pool, cleanup, postgres.WithShmSize(256*bytesize.MB))
postgresContainer, err := postgres.Setup(pool, cleanup, postgres.WithShmSize(256*bytesize.MB))
require.NoError(t, err, "failed to setup postgres container")

minioResource, err := resource.SetupMinio(pool, cleanup)
minioResource, err := minio.Setup(pool, cleanup)
require.NoError(t, err, "failed to setup minio container")
transformerContainer, err := destination.SetupTransformer(pool, cleanup)
require.NoError(t, err, "failed to setup transformer container")
Expand Down
13 changes: 6 additions & 7 deletions archiver/archiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,19 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/minio"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"
trand "github.com/rudderlabs/rudder-go-kit/testhelper/rand"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/utils/misc"

"github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/utils/misc"
)

func TestJobsArchival(t *testing.T) {
var (
prefixByWorkspace = map[int]string{0: trand.String(10), 1: trand.String(10), 2: trand.String(10)}
minioResource []*resource.MinioResource
minioResource []*minio.Resource

// test data - contains jobs from 3 workspaces(1 - 1 source, 2 & 3 - 2 sources each)
seedJobsFileName = "testdata/MultiWorkspaceBackupJobs.json.gz"
Expand All @@ -47,7 +46,7 @@ func TestJobsArchival(t *testing.T) {
pool, err := dockertest.NewPool("")
require.NoError(t, err, "Failed to create docker pool")

postgresResource, err := resource.SetupPostgres(pool, t, postgres.WithShmSize(256*bytesize.MB))
postgresResource, err := postgres.Setup(pool, t, postgres.WithShmSize(256*bytesize.MB))
require.NoError(t, err, "failed to setup postgres resource")
c := config.New()
c.Set("DB.name", postgresResource.Database)
Expand All @@ -60,9 +59,9 @@ func TestJobsArchival(t *testing.T) {
jd := jobsdb.NewForReadWrite("archiver", jobsdb.WithClearDB(false), jobsdb.WithConfig(c))
require.NoError(t, jd.Start())

minioResource = make([]*resource.MinioResource, uniqueWorkspaces)
minioResource = make([]*minio.Resource, uniqueWorkspaces)
for i := 0; i < uniqueWorkspaces; i++ {
minioResource[i], err = resource.SetupMinio(pool, t)
minioResource[i], err = minio.Setup(pool, t)
require.NoError(t, err, "failed to setup minio resource")
}

Expand Down
12 changes: 6 additions & 6 deletions backend-config/replay_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ type EventReplayConfigs map[string]*EventReplayConfig
// ApplyReplaySources reads the event replay configuration and adds replay sources to the config
// A replay source is a copy of the original source with a different ID and source definition
// This replay source contains as destinations replay destinations which are copies of the original destinations but with a different ID
func (config *ConfigT) ApplyReplaySources() {
if len(config.EventReplays) == 0 {
func (c *ConfigT) ApplyReplaySources() {
if len(c.EventReplays) == 0 {
return
}
originalSources := config.SourcesMap()
originalDestinations := config.DestinationsMap()
for _, replay := range config.EventReplays {
originalSources := c.SourcesMap()
originalDestinations := c.DestinationsMap()
for _, replay := range c.EventReplays {
sources := lo.OmitByValues(lo.MapValues(replay.Sources, func(value EventReplaySource, id string) *SourceT {
s, ok := originalSources[value.OriginalSourceID]
if !ok {
Expand Down Expand Up @@ -54,7 +54,7 @@ func (config *ConfigT) ApplyReplaySources() {
}

// add replay sources to config, only the ones that have destinations
config.Sources = append(config.Sources, lo.FilterMap(lo.Values(sources), func(source *SourceT, _ int) (SourceT, bool) {
c.Sources = append(c.Sources, lo.FilterMap(lo.Values(sources), func(source *SourceT, _ int) (SourceT, bool) {
return *source, len(source.Destinations) > 0
})...)
}
Expand Down
33 changes: 0 additions & 33 deletions backend-config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,21 +78,6 @@ func (s *SourceT) IsReplaySource() bool {
return s.OriginalID != ""
}

type WorkspaceRegulationT struct {
ID string
RegulationType string
WorkspaceID string
UserID string
}

type SourceRegulationT struct {
ID string
RegulationType string
WorkspaceID string
SourceID string
UserID string
}

type ConfigT struct {
EnableMetrics bool `json:"enableMetrics"`
WorkspaceID string `json:"workspaceId"`
Expand Down Expand Up @@ -171,24 +156,6 @@ type ConnectionFlags struct {
Services map[string]bool `json:"services"`
}

type WRegulationsT struct {
WorkspaceRegulations []WorkspaceRegulationT `json:"workspaceRegulations"`
Start int `json:"start"`
Limit int `json:"limit"`
Size int `json:"size"`
End bool `json:"end"`
Next int `json:"next"`
}

type SRegulationsT struct {
SourceRegulations []SourceRegulationT `json:"sourceRegulations"`
Start int `json:"start"`
Limit int `json:"limit"`
Size int `json:"size"`
End bool `json:"end"`
Next int `json:"next"`
}

type TransformationT struct {
VersionID string
ID string
Expand Down
16 changes: 6 additions & 10 deletions enterprise/reporting/error_index/error_index_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,19 @@ import (
"sync"
"time"

"github.com/rudderlabs/rudder-server/utils/misc"

kitsync "github.com/rudderlabs/rudder-go-kit/sync"

"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/utils/workerpool"

"github.com/google/uuid"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
kitsync "github.com/rudderlabs/rudder-go-kit/sync"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/utils/misc"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
"github.com/rudderlabs/rudder-server/utils/workerpool"
)

type ErrorIndexReporter struct {
Expand Down
28 changes: 13 additions & 15 deletions enterprise/reporting/error_index/error_index_reporting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/samber/lo"

"github.com/ory/dockertest/v3"

"github.com/samber/lo"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/minio"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"
"github.com/rudderlabs/rudder-server/jobsdb"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
Expand Down Expand Up @@ -215,7 +213,7 @@ func TestErrorIndexReporter(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
postgresContainer, err := resource.SetupPostgres(pool, t)
postgresContainer, err := postgres.Setup(pool, t)
require.NoError(t, err)

c := config.New()
Expand Down Expand Up @@ -282,7 +280,7 @@ func TestErrorIndexReporter(t *testing.T) {
})

t.Run("graceful shutdown", func(t *testing.T) {
postgresContainer, err := resource.SetupPostgres(pool, t)
postgresContainer, err := postgres.Setup(pool, t)
require.NoError(t, err)

c := config.New()
Expand Down Expand Up @@ -316,9 +314,9 @@ func TestErrorIndexReporter(t *testing.T) {

t.Run("using 1 syncer", func(t *testing.T) {
t.Run("wrong transaction", func(t *testing.T) {
pg1, err := resource.SetupPostgres(pool, t)
pg1, err := postgres.Setup(pool, t)
require.NoError(t, err)
pg2, err := resource.SetupPostgres(pool, t)
pg2, err := postgres.Setup(pool, t)
require.NoError(t, err)

c := config.New()
Expand Down Expand Up @@ -377,11 +375,11 @@ func TestErrorIndexReporter(t *testing.T) {
})

t.Run("using 2 syncers", func(t *testing.T) {
pg1, err := resource.SetupPostgres(pool, t)
pg1, err := postgres.Setup(pool, t)
require.NoError(t, err)
pg2, err := resource.SetupPostgres(pool, t)
pg2, err := postgres.Setup(pool, t)
require.NoError(t, err)
pg3, err := resource.SetupPostgres(pool, t)
pg3, err := postgres.Setup(pool, t)
require.NoError(t, err)

c := config.New()
Expand Down Expand Up @@ -478,9 +476,9 @@ func TestErrorIndexReporter(t *testing.T) {
})

t.Run("sync data", func(t *testing.T) {
postgresContainer, err := resource.SetupPostgres(pool, t)
postgresContainer, err := postgres.Setup(pool, t)
require.NoError(t, err)
minioResource, err := resource.SetupMinio(pool, t)
minioResource, err := minio.Setup(pool, t)
require.NoError(t, err)

reports := []*types.PUReportedMetric{
Expand Down

0 comments on commit 4fa88cb

Please sign in to comment.