diff --git a/config_example.yaml b/config_example.yaml
index 46add54..a90f794 100644
--- a/config_example.yaml
+++ b/config_example.yaml
@@ -14,6 +14,10 @@ flows:
     ingestion:
       # When a token is defined, requests that arrive without it are denied
       token: "some-token-here"
+      # Optional. When values are listed here, incoming data is decompressed before being
+      # ingested, but only if the request carries a matching "Content-Encoding" header.
+      # Possible values are: gzip, zlib, deflate, lzw, zstd, snappy
+      decompress_ingested_data: ['gzip', 'snappy']
   # The amount of items in memory, waiting to be sent to storage. If this queue is full,
   # data starts to be dropped (lost)
   in_memory_queue_max_size: 1000
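Note: with this option enabled, a client is expected to compress the request body and set a matching "Content-Encoding" header. A minimal Go sketch of such a client (the flow name "my-flow" and the default port 9010 are assumptions for illustration):

    buf := &bytes.Buffer{}
    gzipWriter := gzip.NewWriter(buf) // compress/gzip from the standard library
    gzipWriter.Write([]byte("some payload"))
    gzipWriter.Close()

    req, _ := http.NewRequest(http.MethodPost,
        "http://localhost:9010/my-flow/async_ingestion", buf)
    req.Header.Add("Content-Encoding", "gzip")
    http.DefaultClient.Do(req)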
err, "error on request creation", err) + req.Header.Add("Content-Encoding", algorithm) + + resp, err := http.DefaultClient.Do(req) + assert.NoError(t, err, "error on posting data", err) + resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode, + "status should be Internal Server Ok(200)") + + decompressedData2 := randString(300) + expected = append(expected, []byte(decompressedData2)) + buf2 := &bytes.Buffer{} + writer, err = compressor.NewWriter(&config.Compression{Type: algorithm}, buf2) + assert.NoError(t, err, "error on compressor writer creation", err) + _, err = writer.Write([]byte(decompressedData2)) + assert.NoError(t, err, "error on compressing data", err) + err = writer.Close() + assert.NoError(t, err, "error on compressor Close", err) + + assert.NotEqual(t, decompressedData2, buf2.Bytes(), "the data should have been compressed before sending") + + req, err = http.NewRequest(http.MethodPost, + fmt.Sprintf("%s/flow-1/async_ingestion", srvr.URL), buf2) + assert.NoError(t, err, "error on request creation", err) + req.Header.Add("Content-Encoding", algorithm) + + resp, err = http.DefaultClient.Do(req) + assert.NoError(t, err, "error on posting data", err) + resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode, + "status should be Internal Server Ok(200)") + } + + assert.Equal(t, expected, df.calledWith, + "payloads should be decompressed before being sent to dataflow") + } + }) + + t.Run("Unsupported compressed data is ingested without decompression", func(t *testing.T) { + + df := &mockDataFlow{calledWith: make([][]byte, 0)} + + flws := []flow.Flow{ + { + Name: "flow-1", + Entrypoint: df, + DecompressionAlgorithms: []string{"gzip"}, + }, + } + + api := New(l, c, prometheus.NewRegistry(), version, flws) + srvr := httptest.NewServer(api.mux) + defer srvr.Close() + + expectedNotCompressed := randString(10) + + bufExpected := &bytes.Buffer{} + writer, err := compressor.NewWriter(&config.Compression{Type: "gzip"}, bufExpected) + assert.NoError(t, err, "error on compressor writer creation", err) + _, err = writer.Write([]byte(expectedNotCompressed)) + assert.NoError(t, err, "error on compressing data", err) + err = writer.Close() + assert.NoError(t, err, "error on compressor Close", err) + + assert.NotEqual(t, expectedNotCompressed, bufExpected.Bytes(), "the data should have been compressed before sending") + + req, err := http.NewRequest(http.MethodPost, + fmt.Sprintf("%s/flow-1/async_ingestion", srvr.URL), bufExpected) + assert.NoError(t, err, "error on request creation", err) + req.Header.Add("Content-Encoding", "gzip") + + resp, err := http.DefaultClient.Do(req) + assert.NoError(t, err, "error on posting data", err) + resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode, + "status should be Internal Server Ok(200) for GZIP") + + expectedCompressed := randString(10) + bufUnexpected := &bytes.Buffer{} + writer, err = compressor.NewWriter(&config.Compression{Type: "snappy"}, bufUnexpected) + assert.NoError(t, err, "error on compressor writer creation", err) + _, err = writer.Write([]byte(expectedCompressed)) + assert.NoError(t, err, "error on compressing data", err) + err = writer.Close() + assert.NoError(t, err, "error on compressor Close", err) + + assert.NotEqual(t, expectedCompressed, bufUnexpected.Bytes(), "the data should have been compressed before sending") + secondPayloadBytes := bufUnexpected.Bytes() + + req, err = http.NewRequest(http.MethodPost, + fmt.Sprintf("%s/flow-1/async_ingestion", srvr.URL), bufUnexpected) + assert.NoError(t, 
err, "error on request creation", err) + req.Header.Add("Content-Encoding", "snappy") + + resp, err = http.DefaultClient.Do(req) + assert.NoError(t, err, "error on posting data", err) + resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode, + "status should be Internal Server Ok(200)") + + assert.Equal(t, [][]byte{[]byte(expectedNotCompressed), secondPayloadBytes}, df.calledWith, + "only payloads with supported algorithm should be accepted") + }) +} + //TODO: test the graceful shutdown //TODO: add tests for metrics serving diff --git a/pkg/adapters/http_in/ingest_routes.go b/pkg/adapters/http_in/ingest_routes.go index 4bddeec..566019a 100644 --- a/pkg/adapters/http_in/ingest_routes.go +++ b/pkg/adapters/http_in/ingest_routes.go @@ -4,9 +4,12 @@ import ( "bytes" "errors" "fmt" + "io" "net/http" "github.com/jademcosta/jiboia/pkg/adapters/http_in/httpmiddleware" + "github.com/jademcosta/jiboia/pkg/compressor" + "github.com/jademcosta/jiboia/pkg/config" "github.com/jademcosta/jiboia/pkg/domain/flow" "go.uber.org/zap" ) @@ -21,25 +24,31 @@ func RegisterIngestingRoutes( for _, flw := range flws { flwCopy := flw - path := fmt.Sprintf("/%s/async_ingestion", flwCopy.Name) + middlewares := make([]func(http.Handler) http.Handler, 0, 1) + if flwCopy.Token != "" { - api.mux.With(httpmiddleware.Auth(flwCopy.Token)). - Post(path, asyncIngestion(api.log, &flwCopy)) - api.mux.With(httpmiddleware.Auth(flwCopy.Token)). - Post("/"+version+path, asyncIngestion(api.log, &flwCopy)) - } else { - api.mux.Post(path, asyncIngestion(api.log, &flwCopy)) - api.mux.Post("/"+version+path, asyncIngestion(api.log, &flwCopy)) + middlewares = append(middlewares, httpmiddleware.Auth(flwCopy.Token)) } + + api.mux.With(middlewares...). + Post(path, asyncIngestion(api.log, &flwCopy)) + api.mux.With(middlewares...). 
+ Post("/"+version+path, asyncIngestion(api.log, &flwCopy)) } api.mux.Middlewares() } func asyncIngestion(l *zap.SugaredLogger, flw *flow.Flow) http.HandlerFunc { + + // This is just for perf + acceptedDecompressionAlgorithms := make(map[string]struct{}) + for _, alg := range flw.DecompressionAlgorithms { + acceptedDecompressionAlgorithms[alg] = struct{}{} + } + return func(w http.ResponseWriter, r *http.Request) { - //TODO: implement the "with" on the logger and add the "ingestion_type": "async" here on this fn currentPath := r.URL.Path buf := &bytes.Buffer{} @@ -72,9 +81,22 @@ func asyncIngestion(l *zap.SugaredLogger, flw *flow.Flow) http.HandlerFunc { } data := buf.Bytes() - l.Debugw("data received on async handler", "length", dataLen) + decompressAlgorithm := + selectDecompressionAlgorithm(acceptedDecompressionAlgorithms, r.Header["Content-Encoding"]) + + if decompressAlgorithm != "" { + data, err = decompress(data, decompressAlgorithm) + if err != nil { + l.Warnw("failed to decompress data", "algorithm", decompressAlgorithm, "error", err) + //TODO: send a JSON response with the error + w.WriteHeader(http.StatusBadRequest) + increaseErrorCount("enqueue_failed", currentPath) + return + } + } + err = flw.Entrypoint.Enqueue(data) if err != nil { l.Warnw("failed while enqueueing data from http request", "error", err) @@ -89,3 +111,32 @@ func asyncIngestion(l *zap.SugaredLogger, flw *flow.Flow) http.HandlerFunc { w.WriteHeader(http.StatusOK) } } + +func selectDecompressionAlgorithm(acceptedAlgorithms map[string]struct{}, contentEncodingHeaders []string) string { + if len(acceptedAlgorithms) <= 0 || len(contentEncodingHeaders) <= 0 { + return "" + } + + for _, contentType := range contentEncodingHeaders { + if _, exists := acceptedAlgorithms[contentType]; exists { + return contentType + } + } + + return "" +} + +func decompress(data []byte, algorithm string) ([]byte, error) { + + decompressor, err := compressor.NewReader(&config.Compression{Type: algorithm}, bytes.NewReader(data)) + if err != nil { + return nil, err + } + + decompressedData, err := io.ReadAll(decompressor) + if err != nil { + return nil, err + } + + return decompressedData, nil +} diff --git a/pkg/app/app.go b/pkg/app/app.go index 4c5948f..e6ce235 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -241,12 +241,13 @@ func createFlows(llog *zap.SugaredLogger, metricRegistry *prometheus.Registry, metricRegistry) f := flow.Flow{ - Name: flowConf.Name, - ObjStorage: objStorage, - ExternalQueue: externalQueue, - Uploader: uploader, - UploadWorkers: make([]flow.Runnable, 0, flowConf.MaxConcurrentUploads), - Token: flowConf.Ingestion.Token, + Name: flowConf.Name, + ObjStorage: objStorage, + ExternalQueue: externalQueue, + Uploader: uploader, + UploadWorkers: make([]flow.Runnable, 0, flowConf.MaxConcurrentUploads), + Token: flowConf.Ingestion.Token, + DecompressionAlgorithms: flowConf.Ingestion.DecompressTypes, } hasAccumulatorDeclared := flowConf.Accumulator.SizeInBytes > 0 //TODO: this is something that will need to be improved once config is localized inside packages diff --git a/pkg/app/app_test.go b/pkg/app/app_test.go index bdb2fa1..558afc1 100644 --- a/pkg/app/app_test.go +++ b/pkg/app/app_test.go @@ -72,6 +72,23 @@ flows: type: httpstorage config: url: "http://non-existent-27836178236.com" + - name: "int_flow4" + in_memory_queue_max_size: 4 + max_concurrent_uploads: 2 + timeout: 120 + ingestion: + decompress_ingested_data: ['gzip', 'snappy'] + accumulator: + size_in_bytes: 20 + separator: "" + queue_capacity: 10 + 
diff --git a/pkg/app/app_test.go b/pkg/app/app_test.go
index bdb2fa1..558afc1 100644
--- a/pkg/app/app_test.go
+++ b/pkg/app/app_test.go
@@ -72,6 +72,23 @@ flows:
       type: httpstorage
       config:
         url: "http://non-existent-27836178236.com"
+  - name: "int_flow4"
+    in_memory_queue_max_size: 4
+    max_concurrent_uploads: 2
+    timeout: 120
+    ingestion:
+      decompress_ingested_data: ['gzip', 'snappy']
+    accumulator:
+      size_in_bytes: 20
+      separator: ""
+      queue_capacity: 10
+    external_queue:
+      type: noop
+      config: ""
+    object_storage:
+      type: httpstorage
+      config:
+        url: "http://non-existent-flow4.com"
 `
 
 const confForCompressionYaml = `
@@ -374,6 +391,119 @@ func TestApiToken(t *testing.T) {
     <-stopDone
 }
 
+func TestIngestionDecompression(t *testing.T) {
+    if testing.Short() {
+        t.Skip("skipping integration test")
+    }
+
+    // Needed for the other ingestion endpoints
+    createDir(t, testingPathNoAcc)
+    defer func() {
+        deleteDir(t, testingPathNoAcc)
+    }()
+
+    var mu sync.Mutex
+    objStorageReceived := make([][]byte, 0)
+
+    storageServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+        body, _ := io.ReadAll(r.Body)
+        mu.Lock()
+        defer mu.Unlock()
+        objStorageReceived = append(objStorageReceived, body)
+        w.WriteHeader(http.StatusOK)
+    }))
+    defer storageServer.Close()
+
+    confFull := strings.Replace(confYaml, "http://non-existent-flow4.com",
+        fmt.Sprintf("%s/%s", storageServer.URL, "%s"), 1)
+
+    conf, err := config.New([]byte(confFull))
+    assert.NoError(t, err, "should initialize config")
+    l = logger.New(conf)
+
+    app := New(conf, l)
+    go app.Start()
+    time.Sleep(2 * time.Second)
+
+    payload := randSeq(100)
+    buf1 := &bytes.Buffer{}
+    writer, err := compressor.NewWriter(&config.Compression{Type: "snappy"}, buf1)
+    assert.NoError(t, err, "error on compressor writer creation")
+    _, err = writer.Write([]byte(payload))
+    assert.NoError(t, err, "error compressing data")
+    err = writer.Close()
+    assert.NoError(t, err, "error closing compressor")
+
+    highlyCompressRatioPayload := strings.Repeat("ab", 50)
+    buf2 := &bytes.Buffer{}
+    writer, err = compressor.NewWriter(&config.Compression{Type: "gzip"}, buf2)
+    assert.NoError(t, err, "error on compressor writer creation")
+    _, err = writer.Write([]byte(highlyCompressRatioPayload))
+    assert.NoError(t, err, "error compressing data")
+    err = writer.Close()
+    assert.NoError(t, err, "error closing compressor")
+
+    nonCompressedPayload := randSeq(120)
+
+    req, err := http.NewRequest(
+        http.MethodPost,
+        "http://localhost:9099/int_flow4/async_ingestion",
+        buf1,
+    )
+    assert.NoError(t, err, "error creating request")
+
+    req.Header.Add("Content-Encoding", "snappy")
+    resp, err := http.DefaultClient.Do(req)
+    assert.NoError(t, err, "error posting data")
+    resp.Body.Close()
+
+    assert.Equal(t, http.StatusOK, resp.StatusCode, "status should be OK (200)")
+
+    time.Sleep(100 * time.Millisecond)
+
+    req, err = http.NewRequest(
+        http.MethodPost,
+        "http://localhost:9099/int_flow4/async_ingestion",
+        buf2,
+    )
+    assert.NoError(t, err, "error creating request")
+
+    req.Header.Add("Content-Encoding", "gzip")
+    resp, err = http.DefaultClient.Do(req)
+    assert.NoError(t, err, "error posting data")
+    resp.Body.Close()
+
+    assert.Equal(t, http.StatusOK, resp.StatusCode, "status should be OK (200)")
+
+    time.Sleep(100 * time.Millisecond)
+
+    req, err = http.NewRequest(
+        http.MethodPost,
+        "http://localhost:9099/int_flow4/async_ingestion",
+        strings.NewReader(nonCompressedPayload),
+    )
+    assert.NoError(t, err, "error creating request")
+
+    resp, err = http.DefaultClient.Do(req)
+    assert.NoError(t, err, "error posting data")
+    resp.Body.Close()
+
+    assert.Equal(t, http.StatusOK, resp.StatusCode, "status should be OK (200)")
+
+    time.Sleep(100 * time.Millisecond)
+    mu.Lock()
+    assert.Equal(
+        t,
+        [][]byte{[]byte(payload), []byte(highlyCompressRatioPayload), []byte(nonCompressedPayload)},
+        objStorageReceived,
+        "payloads should be decompressed when needed, and then ingested",
+    )
+    mu.Unlock()
+
+    stopDone := app.Stop()
+    <-stopDone
+}
+
 func TestCompression(t *testing.T) {
     if testing.Short() {
         t.Skip("skipping integration test")
diff --git a/pkg/config/config.go b/pkg/config/config.go
index 9d000ef..5ce1d31 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -65,7 +65,8 @@ type ObjectStorage struct {
 }
 
 type IngestionConfig struct {
-    Token string `yaml:"token"`
+    Token           string   `yaml:"token"`
+    DecompressTypes []string `yaml:"decompress_ingested_data"`
 }
 
 func New(confData []byte) (*Config, error) {
@@ -132,11 +133,20 @@ func validateConfig(c *Config) error {
                 }
             }
         }
+
+        for _, decompressionType := range flow.Ingestion.DecompressTypes {
+            if !allowed(allowedValues("compression"), decompressionType) {
+                return fmt.Errorf("ingestion.decompress_ingested_data value %q should be one of %v",
+                    decompressionType, allowedValues("compression"))
+            }
+        }
     }
 
     err := c.Api.validateSizeLimit()
     if err != nil {
-        return fmt.Errorf("invalid api size limit format")
+        return err
     }
 
     return nil
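Note: the validation reuses the allowed compression values, so a config such as the following (hypothetical; "lz4" is not a supported codec) now fails at load time rather than at ingestion time:

    conf := []byte(`
    flows:
      - name: flow_1
        ingestion:
          decompress_ingested_data: ['lz4']`)
    _, err := config.New(conf) // err: ingestion.decompress_ingested_data value "lz4" should be one of [...]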
+ decompress_ingested_data: ["gzip", "zstd"] accumulator: size_in_bytes: 2097152 separator: "_a_" @@ -112,6 +114,7 @@ flows: assert.Equal(t, "", conf.Flows[0].Compression.Level, "should have an empty flow.compression.level") assert.Equal(t, "some token here!", conf.Flows[0].Ingestion.Token, "should have parsed the correct flow.ingestion.token") + assert.Equal(t, []string{"gzip", "zstd"}, conf.Flows[0].Ingestion.DecompressTypes, "should have parsed the correct flow.ingestion.decompress_ingested_data") assert.Equal(t, 2097152, conf.Flows[0].Accumulator.SizeInBytes, "should have parsed the correct flow.accumulator.size_in_bytes") assert.Equal(t, "_a_", conf.Flows[0].Accumulator.Separator, "should have parsed the correct flow.accumulator.separator") @@ -132,6 +135,7 @@ flows: assert.Equal(t, "2", conf.Flows[1].Compression.Level, "should have parsed the correct flow.compression.level") assert.Equal(t, "", conf.Flows[1].Ingestion.Token, "should have parsed the correct flow.ingestion.token (which is empty)") + assert.Equal(t, []string(nil), conf.Flows[1].Ingestion.DecompressTypes, "should have parsed the correct flow.ingestion.decompress_ingested_data (which is empty)") assert.Equal(t, 20, conf.Flows[1].Accumulator.SizeInBytes, "should have parsed the correct flow.accumulator.size_in_bytes") assert.Equal(t, "", conf.Flows[1].Accumulator.Separator, "should have parsed the correct flow.accumulator.separator") @@ -535,6 +539,87 @@ flows: } } +func TestValidateFlowDecompressionTypes(t *testing.T) { + logTemplate := ` +flows: + - name: flow_1 + ingestion: + decompress_ingested_data: [{{TYPES}}]` + + testCases := []struct { + types []string + shouldError bool + }{ + { + types: []string{"aaa"}, + shouldError: true, + }, + { + types: []string{"aaa", "something"}, + shouldError: true, + }, + { + types: []string{"aaa", "gzip"}, + shouldError: true, + }, + { + types: []string{""}, + shouldError: true, + }, + { + types: []string(nil), + shouldError: false, + }, + { + types: []string{"gzip"}, + shouldError: false, + }, + { + types: []string{"snappy"}, + shouldError: false, + }, + { + types: []string{"zstd"}, + shouldError: false, + }, + { + types: []string{"zlib"}, + shouldError: false, + }, + { + types: []string{"deflate"}, + shouldError: false, + }, + { + types: []string{"gzip", "zlib"}, + shouldError: false, + }, + { + types: []string{"gzip", "snappy", "zlib", "zstd", "deflate"}, + shouldError: false, + }, + } + + for _, tc := range testCases { + var conf string + if len(tc.types) == 0 { + conf = strings.ReplaceAll(logTemplate, "{{TYPES}}", "") + } else { + types := strings.Join(tc.types, "\",\"") + types = "\"" + types + "\"" + conf = strings.ReplaceAll(logTemplate, "{{TYPES}}", types) + } + + _, err := config.New([]byte(conf)) + + if tc.shouldError { + assert.Errorf(t, err, "should error when decompression types are %v", tc.types) + } else { + assert.NoErrorf(t, err, "should NOT error when decompression types are %v", tc.types) + } + } +} + func TestErrorOnInvalidLogFormat(t *testing.T) { //TODO: implement me } diff --git a/pkg/domain/flow/flow.go b/pkg/domain/flow/flow.go index 2ab7168..4dfc241 100644 --- a/pkg/domain/flow/flow.go +++ b/pkg/domain/flow/flow.go @@ -17,12 +17,13 @@ type DataFlowRunnable interface { } type Flow struct { - Name string - ObjStorage worker.ObjStorage - ExternalQueue worker.ExternalQueue - Entrypoint domain.DataFlow - Uploader DataFlowRunnable - Accumulator DataFlowRunnable - UploadWorkers []Runnable - Token string + Name string + ObjStorage worker.ObjStorage + ExternalQueue 
diff --git a/pkg/domain/flow/flow.go b/pkg/domain/flow/flow.go
index 2ab7168..4dfc241 100644
--- a/pkg/domain/flow/flow.go
+++ b/pkg/domain/flow/flow.go
@@ -17,12 +17,13 @@ type DataFlowRunnable interface {
 }
 
 type Flow struct {
-    Name          string
-    ObjStorage    worker.ObjStorage
-    ExternalQueue worker.ExternalQueue
-    Entrypoint    domain.DataFlow
-    Uploader      DataFlowRunnable
-    Accumulator   DataFlowRunnable
-    UploadWorkers []Runnable
-    Token         string
+    Name                    string
+    ObjStorage              worker.ObjStorage
+    ExternalQueue           worker.ExternalQueue
+    Entrypoint              domain.DataFlow
+    Uploader                DataFlowRunnable
+    Accumulator             DataFlowRunnable
+    UploadWorkers           []Runnable
+    Token                   string
+    DecompressionAlgorithms []string
 }