Skip to content

Commit

Permalink
chore: valid json, gzip and retries
Browse files Browse the repository at this point in the history
  • Loading branch information
lvrach committed May 25, 2024
1 parent 363f0f1 commit 18e49ce
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 13 deletions.
4 changes: 2 additions & 2 deletions processor/transformer/http_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func BenchmarkHTTP(b *testing.B) {
tr.stat = stats.NOP
tr.logger = logger.NOP
tr.conf = config.New()
tr.config.useFasthttpClient = func() bool { return true }
tr.config.useFasthttpClient = config.SingleValueLoader(true)
tr.fasthttpClient = &fasthttp.Client{
MaxConnsPerHost: 1000,
MaxIdleConnDuration: 30 * time.Second,
Expand Down Expand Up @@ -64,7 +64,7 @@ func BenchmarkHTTP(b *testing.B) {
tr.stat = stats.NOP
tr.logger = logger.NOP
tr.conf = config.New()
tr.config.useFasthttpClient = func() bool { return false }
tr.config.useFasthttpClient = config.SingleValueLoader(false)
tr.client = &http.Client{
Transport: &http.Transport{
MaxIdleConns: 1000,
Expand Down
55 changes: 49 additions & 6 deletions processor/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package transformer

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -184,7 +185,11 @@ type handle struct {
guardConcurrency chan struct{}

config struct {
useFasthttpClient func() bool
useFasthttpClient config.ValueLoader[bool]
useGzip config.ValueLoader[bool]
retryAllNon200 config.ValueLoader[bool]
reValidateJSON config.ValueLoader[bool]

maxConcurrency int
maxHTTPConnections int
maxHTTPIdleConnections int
Expand Down Expand Up @@ -214,7 +219,11 @@ func NewTransformer(conf *config.Config, log logger.Logger, stat stats.Stats, op
trans.receivedStat = stat.NewStat("processor.transformer_received", stats.CountType)
trans.cpDownGauge = stat.NewStat("processor.control_plane_down", stats.GaugeType)

trans.config.useFasthttpClient = func() bool { return conf.GetBool("Transformer.Client.useFasthttp", true) }
trans.config.useFasthttpClient = conf.GetReloadableBoolVar(false, "Transformer.Client.useFasthttp")
trans.config.useGzip = conf.GetReloadableBoolVar(false, "Transformer.Client.useGzip")
trans.config.retryAllNon200 = conf.GetReloadableBoolVar(false, "Transformer.Client.retryAllNon200")
trans.config.reValidateJSON = conf.GetReloadableBoolVar(false, "Transformer.Client.reValidateJSON")

trans.config.maxConcurrency = conf.GetInt("Processor.maxConcurrency", 200)
trans.config.maxHTTPConnections = conf.GetInt("Transformer.Client.maxHTTPConnections", 100)
trans.config.maxHTTPIdleConnections = conf.GetInt("Transformer.Client.maxHTTPIdleConnections", 10)
Expand Down Expand Up @@ -389,6 +398,10 @@ func (trans *handle) request(ctx context.Context, url, stage string, data []Tran
return nil
}

if trans.config.reValidateJSON.Load() && !json.Valid(rawJSON) {
panic("invalid json")
}

Check warning on line 403 in processor/transformer/transformer.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/transformer.go#L403

Added line #L403 was not covered by tests

var (
respData []byte
statusCode int
Expand All @@ -399,7 +412,7 @@ func (trans *handle) request(ctx context.Context, url, stage string, data []Tran
endlessBackoff.MaxElapsedTime = 0 // no max time -> ends only when no error

var postFunc func(ctx context.Context, rawJSON []byte, url, stage string, tags stats.Tags) ([]byte, int)
if trans.config.useFasthttpClient() {
if trans.config.useFasthttpClient.Load() {
postFunc = trans.doFasthttpPost
} else {
postFunc = trans.doPost
Expand Down Expand Up @@ -484,13 +497,39 @@ func (trans *handle) doPost(ctx context.Context, rawJSON []byte, url, stage stri
requestStartTime := time.Now()

trace.WithRegion(ctx, "request/post", func() {
var req *http.Request
req, reqErr = http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(rawJSON))
var (
req *http.Request
buf bytes.Buffer
)

if trans.config.useGzip.Load() {
zw, err := gzip.NewWriterLevel(&buf, gzip.BestSpeed)
if err != nil {
panic(err)
}

Check warning on line 509 in processor/transformer/transformer.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/transformer.go#L507-L509

Added lines #L507 - L509 were not covered by tests

_, err = zw.Write(rawJSON)
if err != nil {
panic(err)
}

Check warning on line 514 in processor/transformer/transformer.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/transformer.go#L512-L514

Added lines #L512 - L514 were not covered by tests

if err := zw.Close(); err != nil {
panic(err)
}

Check warning on line 518 in processor/transformer/transformer.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/transformer.go#L517-L518

Added lines #L517 - L518 were not covered by tests
} else {
buf.Write(rawJSON)
}

req, reqErr = http.NewRequestWithContext(ctx, "POST", url, &buf)
if reqErr != nil {
return
}

req.Header.Set("Content-Type", "application/json; charset=utf-8")
if trans.config.useGzip.Load() {
req.Header.Set("Content-Encoding", "gzip")
} else {

Check warning on line 530 in processor/transformer/transformer.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/transformer.go#L530

Added line #L530 was not covered by tests
req.Header.Set("Content-Type", "application/json; charset=utf-8")
}
req.Header.Set("X-Feature-Gzip-Support", "?1")
// Header to let transformer know that the client understands event filter code
req.Header.Set("X-Feature-Filter-Code", "?1")
Expand All @@ -504,6 +543,10 @@ func (trans *handle) doPost(ctx context.Context, rawJSON []byte, url, stage stri

defer func() { httputil.CloseResponse(resp) }()

if trans.config.retryAllNon200.Load() && resp.StatusCode != http.StatusOK {
return fmt.Errorf("transformer returned status code: %v", resp.StatusCode)
}

Check warning on line 548 in processor/transformer/transformer.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/transformer.go#L548

Added line #L548 was not covered by tests

if !isJobTerminated(resp.StatusCode) && resp.StatusCode != StatusCPDown {
return fmt.Errorf("transformer returned status code: %v", resp.StatusCode)
}
Expand Down
19 changes: 14 additions & 5 deletions processor/transformer/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ func TestTransformer(t *testing.T) {
tr.receivedStat = tr.stat.NewStat("transformer_received", stats.CountType)
tr.cpDownGauge = tr.stat.NewStat("control_plane_down", stats.GaugeType)
tr.config.timeoutDuration = 1 * time.Second
tr.config.useFasthttpClient = func() bool { return true }
tr.config.useFasthttpClient = config.SingleValueLoader(true)
tr.config.reValidateJSON = config.SingleValueLoader(true)

tr.fasthttpClient = &fasthttp.Client{}
tr.config.failOnUserTransformTimeout = config.SingleValueLoader(true)
tr.config.failOnError = config.SingleValueLoader(true)
Expand Down Expand Up @@ -305,7 +307,8 @@ func TestTransformer(t *testing.T) {

tr := handle{}
tr.config.timeoutDuration = 1 * time.Millisecond
tr.config.useFasthttpClient = func() bool { return true }
tr.config.useFasthttpClient = config.SingleValueLoader(true)
tr.config.reValidateJSON = config.SingleValueLoader(true)
tr.fasthttpClient = &fasthttp.Client{}
tr.stat = stats.Default
tr.logger = logger.NOP
Expand Down Expand Up @@ -367,7 +370,9 @@ func TestTransformer(t *testing.T) {
tr.client = srv.Client()
tr.config.maxRetry = config.SingleValueLoader(1)
tr.config.timeoutDuration = 1 * time.Second
tr.config.useFasthttpClient = func() bool { return true }
tr.config.useFasthttpClient = config.SingleValueLoader(true)
tr.config.reValidateJSON = config.SingleValueLoader(true)

tr.fasthttpClient = &fasthttp.Client{}
tr.config.failOnUserTransformTimeout = config.SingleValueLoader(false)
tr.cpDownGauge = tr.stat.NewStat("control_plane_down", stats.GaugeType)
Expand Down Expand Up @@ -494,7 +499,9 @@ func TestTransformer(t *testing.T) {
tr.config.failOnError = config.SingleValueLoader(tc.failOnError)
tr.cpDownGauge = tr.stat.NewStat("control_plane_down", stats.GaugeType)
tr.config.timeoutDuration = 1 * time.Second
tr.config.useFasthttpClient = func() bool { return true }
tr.config.useFasthttpClient = config.SingleValueLoader(true)
tr.config.reValidateJSON = config.SingleValueLoader(true)

tr.fasthttpClient = &fasthttp.Client{}

if tc.expectPanic {
Expand Down Expand Up @@ -588,7 +595,9 @@ func TestTransformer(t *testing.T) {
tr.cpDownGauge = tr.stat.NewStat("control_plane_down", stats.GaugeType)
tr.config.maxRetry = config.SingleValueLoader(1)
tr.config.timeoutDuration = 1 * time.Second
tr.config.useFasthttpClient = func() bool { return true }
tr.config.useFasthttpClient = config.SingleValueLoader(true)
tr.config.reValidateJSON = config.SingleValueLoader(true)

tr.fasthttpClient = &fasthttp.Client{}

if tc.expectPanic {
Expand Down

0 comments on commit 18e49ce

Please sign in to comment.