Skip to content

Commit

Permalink
chore: master pull
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Jun 27, 2023
2 parents d8bc499 + 9b15bb9 commit ac88374
Show file tree
Hide file tree
Showing 107 changed files with 1,213 additions and 3,434 deletions.
107 changes: 107 additions & 0 deletions CHANGELOG.md

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions enterprise/replay/dumpsloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"time"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-server/services/filemanager"
"github.com/rudderlabs/rudder-go-kit/filemanager"

"github.com/google/uuid"
"github.com/tidwall/gjson"
Expand Down Expand Up @@ -112,7 +112,7 @@ func (gwHandle *GWReplayRequestHandler) fetchDumpsList(ctx context.Context) {
gwHandle.handle.prefix,
gwHandle.handle.startAfterKey,
maxItems,
&gwHandle.handle.uploader,
gwHandle.handle.uploader,
)
for iter.Next() {
object := iter.Get()
Expand Down Expand Up @@ -177,7 +177,7 @@ func (procHandle *ProcErrorRequestHandler) fetchDumpsList(ctx context.Context) {
procHandle.handle.prefix,
procHandle.handle.startAfterKey,
maxItems,
&procHandle.handle.uploader,
procHandle.handle.uploader,
)
for iter.Next() {
object := iter.Get()
Expand Down
2 changes: 1 addition & 1 deletion enterprise/replay/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"time"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/processor/transformer"
"github.com/rudderlabs/rudder-server/services/filemanager"
)

type Handler struct {
Expand Down
9 changes: 5 additions & 4 deletions enterprise/replay/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"strings"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/services/filemanager"
"github.com/rudderlabs/rudder-server/utils/filemanagerutil"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types"
)
Expand All @@ -27,10 +28,9 @@ func initFileManager(log logger.Logger) (filemanager.FileManager, string, error)
}

provider := config.GetString("JOBS_BACKUP_STORAGE_PROVIDER", "S3")
fileManagerFactory := filemanager.DefaultFileManagerFactory

configFromEnv := filemanager.GetProviderConfigForBackupsFromEnv(context.TODO())
uploader, err := fileManagerFactory.New(&filemanager.SettingsT{
configFromEnv := filemanagerutil.ProviderConfigOpts(context.TODO(), provider, config.Default)
uploader, err := filemanager.New(&filemanager.Settings{
Provider: provider,
Config: misc.GetObjectStorageConfig(misc.ObjectStorageOptsT{
Provider: provider,
Expand All @@ -39,6 +39,7 @@ func initFileManager(log logger.Logger) (filemanager.FileManager, string, error)
// TODO: need to figure out how to bring workspaceID here
// when we support IAM role here.
}),
Conf: config.Default,
})
if err != nil {
log.Errorf("[[ Replay ]] Error creating file manager: %s", err.Error())
Expand Down
2 changes: 1 addition & 1 deletion enterprise/replay/sourceWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import (
"github.com/tidwall/gjson"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/processor/integrations"
"github.com/rudderlabs/rudder-server/processor/transformer"
"github.com/rudderlabs/rudder-server/services/filemanager"
)

type SourceWorkerT struct {
Expand Down
2 changes: 2 additions & 0 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -1310,6 +1310,7 @@ func (gateway *HandleT) StartWebHandler(ctx context.Context) error {
)
srvMux.Route("/internal", func(r chi.Router) {
r.Post("/v1/extract", gateway.webExtractHandler)
r.Get("/v1/warehouse/fetch-tables", gateway.whProxy.ServeHTTP)
})

srvMux.Route("/v1", func(r chi.Router) {
Expand All @@ -1331,6 +1332,7 @@ func (gateway *HandleT) StartWebHandler(ctx context.Context) error {
r.Post("/pending-events", gateway.whProxy.ServeHTTP)
r.Post("/trigger-upload", gateway.whProxy.ServeHTTP)
r.Post("/jobs", gateway.whProxy.ServeHTTP)
// TODO: Remove this endpoint once sources change is released
r.Get("/fetch-tables", gateway.whProxy.ServeHTTP)

r.Get("/jobs/status", gateway.whProxy.ServeHTTP)
Expand Down
2 changes: 2 additions & 0 deletions gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1250,7 +1250,9 @@ func endpointsToVerify() ([]string, []string, []string) {
"/v1/job-status/123",
"/v1/job-status/123/failed-records",
"/v1/warehouse/jobs/status",
// TODO: Remove this endpoint once sources change is released
"/v1/warehouse/fetch-tables",
"/internal/v1/warehouse/fetch-tables",
}

postEndpoints := []string{
Expand Down
40 changes: 32 additions & 8 deletions gateway/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"github.com/hashicorp/go-retryablehttp"
"github.com/samber/lo"
"golang.org/x/exp/slices"

"github.com/rudderlabs/rudder-go-kit/logger"
Expand Down Expand Up @@ -370,17 +371,23 @@ func (bt *batchWebhookTransformerT) batchTransformLoop() {
// stats
bt.stats.sourceStats[breq.sourceType].sourceTransform.Since(transformStart)

var reason string
if batchResponse.batchError == nil && len(batchResponse.responses) != len(payloadArr) {
batchResponse.batchError = errors.New("webhook batch transform response events size does not equal sent events size")
reason = "in out mismatch"
pkgLogger.Errorf("%w", batchResponse.batchError)
}
if batchResponse.batchError != nil {
if reason == "" {
reason = "batch response error"
}
statusCode := http.StatusInternalServerError
if batchResponse.statusCode != 0 {
statusCode = batchResponse.statusCode
}
pkgLogger.Errorf("webhook %s source transformation failed with error: %w and status code: %s", breq.sourceType, batchResponse.batchError, statusCode)
countWebhookErrors(breq.sourceType, statusCode, len(breq.batchRequest))
bt.webhook.recordWebhookErrors(breq.sourceType, reason, webRequests, statusCode)

for _, req := range breq.batchRequest {
req.done <- transformerResponse{StatusCode: statusCode, Err: batchResponse.batchError.Error()}
}
Expand All @@ -392,22 +399,24 @@ func (bt *batchWebhookTransformerT) batchTransformLoop() {
for idx, resp := range batchResponse.responses {
webRequest := webRequests[idx]
if resp.Err == "" && resp.Output != nil {
var errMessage string
var errMessage, reason string
outputPayload, err := json.Marshal(resp.Output)
if err != nil {
errMessage = response.SourceTransformerInvalidOutputFormatInResponse
reason = "marshal error"
} else {
errMessage = bt.webhook.enqueueInGateway(webRequest, outputPayload)
reason = "enqueueInGateway failed"
}
if errMessage != "" {
pkgLogger.Errorf("webhook %s source transformation failed: %s", breq.sourceType, errMessage)
countWebhookErrors(breq.sourceType, response.GetErrorStatusCode(errMessage), 1)
bt.webhook.countWebhookErrors(breq.sourceType, webRequest.writeKey, reason, response.GetErrorStatusCode(errMessage), 1)
webRequest.done <- bt.markResponseFail(errMessage)
continue
}
} else if resp.StatusCode != http.StatusOK {
pkgLogger.Errorf("webhook %s source transformation failed with error: %s and status code: %s", breq.sourceType, resp.Err, resp.StatusCode)
countWebhookErrors(breq.sourceType, resp.StatusCode, 1)
bt.webhook.countWebhookErrors(breq.sourceType, webRequest.writeKey, "non 200 response", resp.StatusCode, 1)
}

webRequest.done <- resp
Expand Down Expand Up @@ -457,13 +466,28 @@ func (webhook *HandleT) Shutdown() error {
return webhook.backgroundWait()
}

func countWebhookErrors(sourceType string, statusCode, count int) {
stats.Default.NewTaggedStat("webhook_num_errors", stats.CountType, stats.Tags{
"sourceType": sourceType,
"statusCode": strconv.Itoa(statusCode),
func (webhook *HandleT) countWebhookErrors(sourceType, writeKey, reason string, statusCode, count int) {
stat := webhook.gwHandle.NewSourceStat(writeKey, "webhook")
webhook.stats.NewTaggedStat("webhook_num_errors", stats.CountType, stats.Tags{
"writeKey": writeKey,
"workspaceId": stat.WorkspaceID,
"sourceID": stat.SourceID,
"statusCode": strconv.Itoa(statusCode),
"sourceType": sourceType,
"reason": reason,
}).Count(count)
}

func (webhook *HandleT) recordWebhookErrors(sourceType, reason string, reqs []*webhookT, statusCode int) {
reqsGroupedByWriteKey := lo.GroupBy(reqs, func(request *webhookT) string {
return request.writeKey
})

for writeKey, reqs := range reqsGroupedByWriteKey {
webhook.countWebhookErrors(sourceType, writeKey, reason, statusCode, len(reqs))
}
}

// TODO: Check if correct
func newWebhookStat(sourceType string) *webhookSourceStatT {
tags := map[string]string{
Expand Down
81 changes: 78 additions & 3 deletions gateway/webhook/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/stats/memstats"
gwStats "github.com/rudderlabs/rudder-server/gateway/internal/stats"
mockWebhook "github.com/rudderlabs/rudder-server/gateway/mocks"
"github.com/rudderlabs/rudder-server/gateway/response"
Expand Down Expand Up @@ -64,7 +66,7 @@ func TestWebhookRequestHandlerWithTransformerBatchGeneralError(t *testing.T) {
mockGW.EXPECT().IncrementAckCount(gomock.Any()).Times(1)
mockGW.EXPECT().GetWebhookSourceDefName(sampleWriteKey).Return(sourceDefName, true)
mockGW.EXPECT().TrackRequestMetrics(gomock.Any()).Times(1)
mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwStats.SourceStat{}).Times(1)
mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwStats.SourceStat{}).Times(2)

webhookHandler.Register(sourceDefName)
req := httptest.NewRequest(http.MethodPost, "/v1/webhook?writeKey="+sampleWriteKey, bytes.NewBufferString(sampleJson))
Expand Down Expand Up @@ -105,7 +107,7 @@ func TestWebhookRequestHandlerWithTransformerBatchPayloadLengthMismatchError(t *
mockGW.EXPECT().IncrementAckCount(gomock.Any()).Times(1)
mockGW.EXPECT().GetWebhookSourceDefName(sampleWriteKey).Return(sourceDefName, true)
mockGW.EXPECT().TrackRequestMetrics(gomock.Any()).Times(1)
mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwStats.SourceStat{}).Times(1)
mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwStats.SourceStat{}).Times(2)

webhookHandler.Register(sourceDefName)
req := httptest.NewRequest(http.MethodPost, "/v1/webhook?writeKey="+sampleWriteKey, bytes.NewBufferString(sampleJson))
Expand Down Expand Up @@ -145,7 +147,7 @@ func TestWebhookRequestHandlerWithTransformerRequestError(t *testing.T) {
mockGW.EXPECT().IncrementAckCount(gomock.Any()).Times(1)
mockGW.EXPECT().GetWebhookSourceDefName(sampleWriteKey).Return(sourceDefName, true)
mockGW.EXPECT().TrackRequestMetrics(gomock.Any()).Times(1)
mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwStats.SourceStat{}).Times(1)
mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).Return(&gwStats.SourceStat{}).Times(2)

webhookHandler.Register(sourceDefName)
req := httptest.NewRequest(http.MethodPost, "/v1/webhook?writeKey="+sampleWriteKey, bytes.NewBufferString(sampleJson))
Expand Down Expand Up @@ -281,3 +283,76 @@ func TestWebhookRequestHandlerWithOutputToGatewayAndSource(t *testing.T) {
assert.Equal(t, sampleJson, strings.TrimSpace(w.Body.String()))
_ = webhookHandler.Shutdown()
}

func TestRecordWebhookErrors(t *testing.T) {
initWebhook()
ctrl := gomock.NewController(t)
mockGW := mockWebhook.NewMockGatewayI(ctrl)
statsStore := memstats.New()
webhookHandler := Setup(mockGW, statsStore)
reqs := []*webhookT{
{writeKey: "w1"}, {writeKey: "w2"}, {writeKey: "w1"}, {writeKey: "w3"}, {writeKey: "w2"}, {writeKey: "w1"},
}
mockGW.EXPECT().NewSourceStat(gomock.Any(), gomock.Any()).DoAndReturn(func(writeKey, reqType string) *gwStats.SourceStat {
switch writeKey {
case "w1":
return &gwStats.SourceStat{
Source: "source1",
SourceID: "sourceID1",
WriteKey: writeKey,
ReqType: reqType,
WorkspaceID: "workspaceID1",
SourceType: "webhook1",
}
case "w2":
return &gwStats.SourceStat{
Source: "source2",
SourceID: "sourceID2",
WriteKey: writeKey,
ReqType: reqType,
WorkspaceID: "workspaceID2",
SourceType: "webhook2",
}
case "w3":
return &gwStats.SourceStat{
Source: "source3",
SourceID: "sourceID3",
WriteKey: writeKey,
ReqType: reqType,
WorkspaceID: "workspaceID3",
SourceType: "webhook3",
}
}
return nil
}).Times(3)

webhookHandler.recordWebhookErrors("cio", "err1", reqs, 400)

m := statsStore.Get("webhook_num_errors", stats.Tags{
"writeKey": "w1",
"workspaceId": "workspaceID1",
"sourceID": "sourceID1",
"statusCode": "400",
"sourceType": "cio",
"reason": "err1",
})
require.EqualValues(t, m.LastValue(), 3)
m = statsStore.Get("webhook_num_errors", stats.Tags{
"writeKey": "w2",
"workspaceId": "workspaceID2",
"sourceID": "sourceID2",
"statusCode": "400",
"sourceType": "cio",
"reason": "err1",
})
require.EqualValues(t, m.LastValue(), 2)
m = statsStore.Get("webhook_num_errors", stats.Tags{
"writeKey": "w3",
"workspaceId": "workspaceID3",
"sourceID": "sourceID3",
"statusCode": "400",
"sourceType": "cio",
"reason": "err1",
})
require.EqualValues(t, m.LastValue(), 1)
}
13 changes: 5 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ replace (
github.com/prometheus/client_golang => github.com/prometheus/client_golang v1.15.0
github.com/satori/go.uuid => github.com/satori/go.uuid v1.1.0
github.com/spf13/viper => github.com/spf13/viper v1.15.0
github.com/xitongsys/parquet-go => github.com/rudderlabs/parquet-go v0.0.2
go.mongodb.org/mongo-driver => go.mongodb.org/mongo-driver v1.11.4
golang.org/x/crypto => golang.org/x/crypto v0.8.0
golang.org/x/image => golang.org/x/image v0.5.0
Expand All @@ -21,20 +22,18 @@ replace (
gopkg.in/yaml.v2 => gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 => gopkg.in/yaml.v3 v3.0.1
k8s.io/kubernetes => k8s.io/kubernetes v1.22.2

)

require (
cloud.google.com/go/bigquery v1.51.2
cloud.google.com/go/pubsub v1.31.0
cloud.google.com/go/storage v1.30.1
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/ClickHouse/clickhouse-go v1.5.4
github.com/alexeyco/simpletable v1.0.0
github.com/allisson/go-pglock/v2 v2.0.1
github.com/apache/pulsar-client-go v0.10.0
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
github.com/aws/aws-sdk-go v1.44.287
github.com/aws/aws-sdk-go v1.44.289
github.com/bugsnag/bugsnag-go/v2 v2.2.0
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cenkalti/backoff/v4 v4.2.1
Expand All @@ -60,8 +59,7 @@ require (
github.com/lib/pq v1.10.9
github.com/linkedin/goavro/v2 v2.12.0
github.com/minio/minio-go v6.0.14+incompatible
github.com/minio/minio-go/v6 v6.0.57
github.com/minio/minio-go/v7 v7.0.57
github.com/minio/minio-go/v7 v7.0.58
github.com/mitchellh/mapstructure v1.5.0
github.com/mkmik/multierror v0.3.0
github.com/onsi/ginkgo/v2 v2.11.0
Expand All @@ -71,6 +69,7 @@ require (
github.com/prometheus/client_model v0.4.0
github.com/rs/cors v1.9.0
github.com/rudderlabs/analytics-go v3.3.3+incompatible
github.com/rudderlabs/compose-test v0.1.2
github.com/rudderlabs/rudder-go-kit v0.15.1
github.com/rudderlabs/sql-tunnels v0.1.3
github.com/samber/lo v1.38.1
Expand Down Expand Up @@ -105,6 +104,7 @@ require (
cloud.google.com/go/compute v1.19.3 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.0.1 // indirect
cloud.google.com/go/storage v1.30.1 // indirect
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.2 // indirect
github.com/AthenZ/athenz v1.10.39 // indirect
Expand Down Expand Up @@ -233,7 +233,6 @@ require (
github.com/rivo/uniseg v0.1.0 // indirect
github.com/rs/xid v1.5.0 // indirect
github.com/rs/zerolog v1.28.0 // indirect
github.com/rudderlabs/compose-test v0.1.2
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/segmentio/backo-go v1.0.1 // indirect
github.com/shirou/gopsutil/v3 v3.23.4 // indirect
Expand Down Expand Up @@ -296,5 +295,3 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
gotest.tools/gotestsum v1.8.2 // indirect
)

replace github.com/xitongsys/parquet-go => github.com/rudderlabs/parquet-go v0.0.2
Loading

0 comments on commit ac88374

Please sign in to comment.