Skip to content

Commit

Permalink
feat: adaptations of additional helpers packages from go-kit (#4562)
Browse files Browse the repository at this point in the history
* feat: additional of some helpers fot go-kit

* chore: review comments

* chore: upgrade go-kit
  • Loading branch information
achettyiitr committed Apr 10, 2024
1 parent d7e9a9e commit 3deadee
Show file tree
Hide file tree
Showing 87 changed files with 439 additions and 736 deletions.
1 change: 1 addition & 0 deletions app/app.go
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
configenv "github.com/rudderlabs/rudder-server/enterprise/config-env"
"github.com/rudderlabs/rudder-server/enterprise/replay"
Expand Down
9 changes: 5 additions & 4 deletions app/apphandlers/embeddedAppHandler.go
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/app"
"github.com/rudderlabs/rudder-server/app/cluster"
"github.com/rudderlabs/rudder-server/archiver"
Expand Down Expand Up @@ -48,10 +49,10 @@ type embeddedApp struct {
log logger.Logger
config struct {
enableReplay bool
processorDSLimit misc.ValueLoader[int]
routerDSLimit misc.ValueLoader[int]
batchRouterDSLimit misc.ValueLoader[int]
gatewayDSLimit misc.ValueLoader[int]
processorDSLimit config.ValueLoader[int]
routerDSLimit config.ValueLoader[int]
batchRouterDSLimit config.ValueLoader[int]
gatewayDSLimit config.ValueLoader[int]
}
}

Expand Down
3 changes: 2 additions & 1 deletion app/apphandlers/gatewayAppHandler.go
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/app"
"github.com/rudderlabs/rudder-server/app/cluster"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
Expand All @@ -33,7 +34,7 @@ type gatewayApp struct {
versionHandler func(w http.ResponseWriter, r *http.Request)
log logger.Logger
config struct {
gatewayDSLimit misc.ValueLoader[int]
gatewayDSLimit config.ValueLoader[int]
}
}

Expand Down
8 changes: 4 additions & 4 deletions app/apphandlers/processorAppHandler.go
Expand Up @@ -49,10 +49,10 @@ type processorApp struct {
versionHandler func(w http.ResponseWriter, r *http.Request)
log logger.Logger
config struct {
processorDSLimit misc.ValueLoader[int]
routerDSLimit misc.ValueLoader[int]
batchRouterDSLimit misc.ValueLoader[int]
gatewayDSLimit misc.ValueLoader[int]
processorDSLimit config.ValueLoader[int]
routerDSLimit config.ValueLoader[int]
batchRouterDSLimit config.ValueLoader[int]
gatewayDSLimit config.ValueLoader[int]
http struct {
ReadTimeout time.Duration
ReadHeaderTimeout time.Duration
Expand Down
1 change: 1 addition & 0 deletions app/cluster/integration_test.go
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/admin"
"github.com/rudderlabs/rudder-server/app/cluster"
arc "github.com/rudderlabs/rudder-server/archiver"
Expand Down
4 changes: 2 additions & 2 deletions backend-config/backend-config.go
Expand Up @@ -19,11 +19,11 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/backend-config/internal/cache"
"github.com/rudderlabs/rudder-server/rruntime"
"github.com/rudderlabs/rudder-server/services/controlplane/identity"
"github.com/rudderlabs/rudder-server/services/diagnostics"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/pubsub"
"github.com/rudderlabs/rudder-server/utils/sysUtils"
"github.com/rudderlabs/rudder-server/utils/types"
Expand All @@ -34,7 +34,7 @@ var (
// environment variables
configBackendURL string
cpRouterURL string
pollInterval misc.ValueLoader[time.Duration]
pollInterval config.ValueLoader[time.Duration]
configJSONPath string
configFromFile bool
configEnvReplacementEnabled bool
Expand Down
4 changes: 2 additions & 2 deletions backend-config/backend_config_test.go
Expand Up @@ -23,10 +23,10 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

adminpkg "github.com/rudderlabs/rudder-server/admin"
"github.com/rudderlabs/rudder-server/backend-config/internal/cache"
"github.com/rudderlabs/rudder-server/services/diagnostics"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/pubsub"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
)
Expand Down Expand Up @@ -331,7 +331,7 @@ func TestWaitForConfig(t *testing.T) {
defer ctrl.Finish()

pkgLogger = logger.NOP
pollInterval = misc.SingleValueLoader(time.Millisecond)
pollInterval = config.SingleValueLoader(time.Millisecond)
bc := &backendConfigImpl{initialized: false}

var done int32
Expand Down
4 changes: 3 additions & 1 deletion cmd/backupfilemigrator/file_migrator.go
Expand Up @@ -12,6 +12,7 @@ import (
"strings"
"time"

"github.com/rudderlabs/rudder-go-kit/stringify"
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"

kitconfig "github.com/rudderlabs/rudder-go-kit/config"
Expand All @@ -31,6 +32,7 @@ import (

"github.com/rudderlabs/rudder-go-kit/filemanager"
kitlogger "github.com/rudderlabs/rudder-go-kit/logger"

"github.com/rudderlabs/rudder-server/enterprise/replay/utils"
)

Expand Down Expand Up @@ -116,7 +118,7 @@ func (m *fileMigrator) convertToNewFormat(lineBytes []byte, createdAt time.Time)
j.UserID = userID
j.EventPayload = payloadBytes
j.CreatedAt = createdAt
j.MessageID = misc.GetStringifiedData(singleEvent["messageId"])
j.MessageID = stringify.Any(singleEvent["messageId"])
listOfNewEvents = append(listOfNewEvents, j)
}
return listOfNewEvents, nil
Expand Down
3 changes: 1 addition & 2 deletions enterprise/reporting/error_extractor.go
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
)

const (
Expand Down Expand Up @@ -69,7 +68,7 @@ var lowercasedDeprecationKeywords = lo.MapKeys(deprecationKeywords, func(_ int,
type ExtractorHandle struct {
log logger.Logger
ErrorMessageKeys []string // the keys where in we may have error message
versionDeprecationThresholdScore misc.ValueLoader[int]
versionDeprecationThresholdScore config.ValueLoader[int]
}

func NewErrorDetailExtractor(log logger.Logger) *ExtractorHandle {
Expand Down
4 changes: 2 additions & 2 deletions enterprise/reporting/error_index/error_index_reporting.go
Expand Up @@ -16,8 +16,8 @@ import (
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
kitsync "github.com/rudderlabs/rudder-go-kit/sync"

"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/utils/misc"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
"github.com/rudderlabs/rudder-server/utils/workerpool"
Expand All @@ -43,7 +43,7 @@ type ErrorIndexReporter struct {
update kitsync.Limiter
}

concurrency misc.ValueLoader[int]
concurrency config.ValueLoader[int]

statsFactory stats.Stats
stats struct {
Expand Down
6 changes: 3 additions & 3 deletions enterprise/reporting/error_index/worker.go
Expand Up @@ -55,11 +55,11 @@ type worker struct {
lastUploadTime time.Time

config struct {
parquetParallelWriters, parquetRowGroupSize, parquetPageSize misc.ValueLoader[int64]
parquetParallelWriters, parquetRowGroupSize, parquetPageSize config.ValueLoader[int64]
bucketName, instanceID string
payloadLimit, eventsLimit misc.ValueLoader[int64]
payloadLimit, eventsLimit config.ValueLoader[int64]
minWorkerSleep, uploadFrequency, jobsDBCommandTimeout time.Duration
jobsDBMaxRetries misc.ValueLoader[int]
jobsDBMaxRetries config.ValueLoader[int]
}
}

Expand Down
20 changes: 10 additions & 10 deletions enterprise/reporting/error_index/worker_test.go
Expand Up @@ -36,8 +36,8 @@ import (
kitsync "github.com/rudderlabs/rudder-go-kit/sync"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/minio"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"

"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/utils/misc"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

Expand Down Expand Up @@ -84,9 +84,9 @@ func TestWorkerWriter(t *testing.T) {
buf := bytes.NewBuffer(make([]byte, 0, 1024))

w := worker{}
w.config.parquetRowGroupSize = misc.SingleValueLoader(512 * bytesize.MB)
w.config.parquetPageSize = misc.SingleValueLoader(8 * bytesize.KB)
w.config.parquetParallelWriters = misc.SingleValueLoader(int64(8))
w.config.parquetRowGroupSize = config.SingleValueLoader(512 * bytesize.MB)
w.config.parquetPageSize = config.SingleValueLoader(8 * bytesize.KB)
w.config.parquetParallelWriters = config.SingleValueLoader(int64(8))

require.NoError(t, w.encodeToParquet(buf, toEncode))

Expand Down Expand Up @@ -125,9 +125,9 @@ func TestWorkerWriter(t *testing.T) {
require.NoError(t, err)

w := worker{}
w.config.parquetRowGroupSize = misc.SingleValueLoader(512 * bytesize.MB)
w.config.parquetPageSize = misc.SingleValueLoader(8 * bytesize.KB)
w.config.parquetParallelWriters = misc.SingleValueLoader(int64(8))
w.config.parquetRowGroupSize = config.SingleValueLoader(512 * bytesize.MB)
w.config.parquetPageSize = config.SingleValueLoader(8 * bytesize.KB)
w.config.parquetParallelWriters = config.SingleValueLoader(int64(8))

require.NoError(t, w.encodeToParquet(fw, toEncode))

Expand Down Expand Up @@ -713,9 +713,9 @@ func BenchmarkFileFormat(b *testing.B) {
}

w := worker{}
w.config.parquetRowGroupSize = misc.SingleValueLoader(512 * bytesize.MB)
w.config.parquetPageSize = misc.SingleValueLoader(8 * bytesize.KB)
w.config.parquetParallelWriters = misc.SingleValueLoader(int64(8))
w.config.parquetRowGroupSize = config.SingleValueLoader(512 * bytesize.MB)
w.config.parquetPageSize = config.SingleValueLoader(8 * bytesize.KB)
w.config.parquetParallelWriters = config.SingleValueLoader(int64(8))

buf := bytes.NewBuffer(make([]byte, 0, 1024))

Expand Down
8 changes: 4 additions & 4 deletions enterprise/reporting/error_reporting.go
Expand Up @@ -22,9 +22,9 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/misc"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
)
Expand Down Expand Up @@ -65,9 +65,9 @@ type ErrorDetailReporter struct {

instanceID string
region string
sleepInterval misc.ValueLoader[time.Duration]
mainLoopSleepInterval misc.ValueLoader[time.Duration]
maxConcurrentRequests misc.ValueLoader[int]
sleepInterval config.ValueLoader[time.Duration]
mainLoopSleepInterval config.ValueLoader[time.Duration]
maxConcurrentRequests config.ValueLoader[int]
maxOpenConnections int

httpClient *http.Client
Expand Down
8 changes: 4 additions & 4 deletions enterprise/reporting/reporting.go
Expand Up @@ -25,9 +25,9 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/misc"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
)
Expand Down Expand Up @@ -61,12 +61,12 @@ type DefaultReporter struct {
instanceID string
whActionsOnly bool
region string
sleepInterval misc.ValueLoader[time.Duration]
mainLoopSleepInterval misc.ValueLoader[time.Duration]
sleepInterval config.ValueLoader[time.Duration]
mainLoopSleepInterval config.ValueLoader[time.Duration]
dbQueryTimeout *config.Reloadable[time.Duration]
sourcesWithEventNameTrackingDisabled []string
maxOpenConnections int
maxConcurrentRequests misc.ValueLoader[int]
maxConcurrentRequests config.ValueLoader[int]

getMinReportedAtQueryTime stats.Measurement
getReportsQueryTime stats.Measurement
Expand Down
2 changes: 1 addition & 1 deletion enterprise/suppress-user/factory.go
Expand Up @@ -189,7 +189,7 @@ func (m *Factory) retryIndefinitely(ctx context.Context, f func() error, wait ti
}
}

func (m *Factory) newSyncerWithBadgerRepo(repoPath string, seederSource func() (io.ReadCloser, error), maxSeedWaitTime time.Duration, identity identity.Identifier, pollInterval misc.ValueLoader[time.Duration]) (*Syncer, Repository, error) {
func (m *Factory) newSyncerWithBadgerRepo(repoPath string, seederSource func() (io.ReadCloser, error), maxSeedWaitTime time.Duration, identity identity.Identifier, pollInterval config.ValueLoader[time.Duration]) (*Syncer, Repository, error) {
repo, err := NewBadgerRepository(
repoPath,
m.Log,
Expand Down
4 changes: 3 additions & 1 deletion gateway/gateway_test.go
Expand Up @@ -17,6 +17,7 @@ import (
"time"

kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
kituuid "github.com/rudderlabs/rudder-go-kit/uuid"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/stats/memstats"

"github.com/rudderlabs/rudder-server/admin"
"github.com/rudderlabs/rudder-server/app"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
Expand Down Expand Up @@ -219,7 +221,7 @@ var _ = Describe("Reconstructing JSON for ServerSide SDK", func() {
{"anonymousId":"anon_id_1","event":"event_1_3"}
]}`
response, payloadError := getUsersPayload([]byte(testValidBody))
key, err := misc.GetMD5UUID(inputKey)
key, err := kituuid.GetMD5UUID(inputKey)
Expect(string(response[key.String()])).To(Equal(value))
Expect(err).To(BeNil())
Expect(payloadError).To(BeNil())
Expand Down

0 comments on commit 3deadee

Please sign in to comment.