Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow decompression when ingesting data #77

Merged
merged 11 commits into from
Sep 10, 2023
4 changes: 4 additions & 0 deletions config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ flows:
ingestion:
# When a token is defined, requests that arrive without it are denied
token: "some-token-here"
# Optional. Listing algorithms here enables decompression of incoming data before it is ingested.
# Decompression happens only when the request carries a matching "Content-Encoding" header.
# Supported values: gzip, zlib, deflate, lzw, zstd, snappy
decompress_ingested_data: ['gzip', 'snappy']
# The amount of items in memory, waiting to be sent to storage. If this queue is full,
# data starts to be dropped (lost)
in_memory_queue_max_size: 1000
Expand Down
172 changes: 172 additions & 0 deletions pkg/adapters/http_in/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package http_in
import (
"bytes"
"fmt"
"math/rand"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"

"github.com/jademcosta/jiboia/pkg/compressor"
"github.com/jademcosta/jiboia/pkg/config"
"github.com/jademcosta/jiboia/pkg/domain/flow"
"github.com/jademcosta/jiboia/pkg/logger"
Expand All @@ -18,6 +21,18 @@ import (

const version string = "0.0.0"

// characters is the alphabet random test payloads are drawn from.
var characters = []rune("abcdefghijklmnopqrstuvwxyz")

// randString returns a random lowercase ASCII string of length n.
func randString(n int) string {
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	out := make([]rune, n)

	for i := 0; i < len(out); i++ {
		out[i] = characters[rng.Intn(len(characters))]
	}
	return string(out)
}

type mockDataFlow struct {
calledWith [][]byte
mu sync.Mutex
Expand Down Expand Up @@ -492,5 +507,162 @@ func TestVersionEndpointInformsTheVersion(t *testing.T) {
assert.Equal(t, fmt.Sprintf("{\"version\":\"%s\"}", version), body, "version informed should be the current one")
}

// TestDecompressionOnIngestion checks that ingested payloads carrying a
// Content-Encoding header are decompressed when the flow supports that
// algorithm, and are forwarded untouched when it does not.
func TestDecompressionOnIngestion(t *testing.T) {
	l := logger.New(&config.Config{Log: config.LogConfig{Level: "error", Format: "json"}})
	c := config.ApiConfig{Port: 9111}

	t.Run("Happy path", func(t *testing.T) {
		testCases := []struct {
			algorithms []string
		}{
			{algorithms: []string{"snappy"}},
			{algorithms: []string{"gzip"}},
			{algorithms: []string{"zstd"}},
			{algorithms: []string{"deflate"}},
			{algorithms: []string{"zlib"}},
			{algorithms: []string{"zlib", "gzip"}},
			{algorithms: []string{"snappy", "gzip", "zstd"}},
			{algorithms: []string{"snappy", "gzip", "zstd", "deflate", "zlib"}},
		}

		for _, tc := range testCases {
			df := &mockDataFlow{calledWith: make([][]byte, 0)}

			flws := []flow.Flow{
				{
					Name:                    "flow-1",
					Entrypoint:              df,
					DecompressionAlgorithms: tc.algorithms,
				},
			}

			api := New(l, c, prometheus.NewRegistry(), version, flws)
			srvr := httptest.NewServer(api.mux)

			expected := make([][]byte, 0)

			for _, algorithm := range tc.algorithms {
				// Two payloads per algorithm: one repetitive (compresses well)
				// and one random (compresses poorly).
				for _, payload := range []string{strings.Repeat("ab", 90), randString(300)} {
					expected = append(expected, []byte(payload))
					postPayload(t, srvr.URL, algorithm, compressPayload(t, algorithm, payload))
				}
			}

			// Close eagerly instead of with defer: a defer inside this loop
			// would keep every test case's server alive until the whole
			// subtest returns.
			srvr.Close()

			assert.Equal(t, expected, df.calledWith,
				"payloads should be decompressed before being sent to dataflow")
		}
	})

	t.Run("Unsupported compressed data is ingested without decompression", func(t *testing.T) {
		df := &mockDataFlow{calledWith: make([][]byte, 0)}

		flws := []flow.Flow{
			{
				Name:                    "flow-1",
				Entrypoint:              df,
				DecompressionAlgorithms: []string{"gzip"},
			},
		}

		api := New(l, c, prometheus.NewRegistry(), version, flws)
		srvr := httptest.NewServer(api.mux)
		defer srvr.Close()

		// gzip is declared on the flow, so this payload must arrive decompressed.
		supportedPayload := randString(10)
		postPayload(t, srvr.URL, "gzip", compressPayload(t, "gzip", supportedPayload))

		// snappy is NOT declared on the flow, so this payload must arrive
		// exactly as sent (still compressed).
		unsupportedPayload := randString(10)
		compressedUnsupported := compressPayload(t, "snappy", unsupportedPayload)
		postPayload(t, srvr.URL, "snappy", compressedUnsupported)

		assert.Equal(t, [][]byte{[]byte(supportedPayload), compressedUnsupported}, df.calledWith,
			"only payloads with a supported algorithm should be decompressed")
	})
}

// compressPayload compresses data with the given algorithm, failing the test
// on any error, and returns the compressed bytes.
func compressPayload(t *testing.T, algorithm string, data string) []byte {
	t.Helper()

	buf := &bytes.Buffer{}
	writer, err := compressor.NewWriter(&config.Compression{Type: algorithm}, buf)
	assert.NoError(t, err, "error on compressor writer creation")
	_, err = writer.Write([]byte(data))
	assert.NoError(t, err, "error on compressing data")
	err = writer.Close()
	assert.NoError(t, err, "error on compressor Close")

	// Compare []byte against []byte: the original string-vs-[]byte NotEqual
	// could never be equal regardless of content, making the check vacuous.
	assert.NotEqual(t, []byte(data), buf.Bytes(), "the data should have been compressed before sending")
	return buf.Bytes()
}

// postPayload POSTs body to the flow-1 async ingestion endpoint with the given
// Content-Encoding header and asserts the request is accepted with a 200.
func postPayload(t *testing.T, baseURL string, encoding string, body []byte) {
	t.Helper()

	req, err := http.NewRequest(http.MethodPost,
		fmt.Sprintf("%s/flow-1/async_ingestion", baseURL), bytes.NewReader(body))
	assert.NoError(t, err, "error on request creation")
	req.Header.Add("Content-Encoding", encoding)

	resp, err := http.DefaultClient.Do(req)
	assert.NoError(t, err, "error on posting data")
	resp.Body.Close()
	assert.Equal(t, http.StatusOK, resp.StatusCode, "status should be OK (200)")
}

//TODO: test the graceful shutdown
//TODO: add tests for metrics serving
71 changes: 61 additions & 10 deletions pkg/adapters/http_in/ingest_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"bytes"
"errors"
"fmt"
"io"
"net/http"

"github.com/jademcosta/jiboia/pkg/adapters/http_in/httpmiddleware"
"github.com/jademcosta/jiboia/pkg/compressor"
"github.com/jademcosta/jiboia/pkg/config"
"github.com/jademcosta/jiboia/pkg/domain/flow"
"go.uber.org/zap"
)
Expand All @@ -21,25 +24,31 @@ func RegisterIngestingRoutes(

for _, flw := range flws {
flwCopy := flw

path := fmt.Sprintf("/%s/async_ingestion", flwCopy.Name)
middlewares := make([]func(http.Handler) http.Handler, 0, 1)

if flwCopy.Token != "" {
api.mux.With(httpmiddleware.Auth(flwCopy.Token)).
Post(path, asyncIngestion(api.log, &flwCopy))
api.mux.With(httpmiddleware.Auth(flwCopy.Token)).
Post("/"+version+path, asyncIngestion(api.log, &flwCopy))
} else {
api.mux.Post(path, asyncIngestion(api.log, &flwCopy))
api.mux.Post("/"+version+path, asyncIngestion(api.log, &flwCopy))
middlewares = append(middlewares, httpmiddleware.Auth(flwCopy.Token))
}

api.mux.With(middlewares...).
Post(path, asyncIngestion(api.log, &flwCopy))
api.mux.With(middlewares...).
Post("/"+version+path, asyncIngestion(api.log, &flwCopy))
}

api.mux.Middlewares()
}

func asyncIngestion(l *zap.SugaredLogger, flw *flow.Flow) http.HandlerFunc {

// This is just for perf
acceptedDecompressionAlgorithms := make(map[string]struct{})
for _, alg := range flw.DecompressionAlgorithms {
acceptedDecompressionAlgorithms[alg] = struct{}{}
}

return func(w http.ResponseWriter, r *http.Request) {
//TODO: implement the "with" on the logger and add the "ingestion_type": "async" here on this fn

currentPath := r.URL.Path
buf := &bytes.Buffer{}
Expand Down Expand Up @@ -72,9 +81,22 @@ func asyncIngestion(l *zap.SugaredLogger, flw *flow.Flow) http.HandlerFunc {
}

data := buf.Bytes()

l.Debugw("data received on async handler", "length", dataLen)

decompressAlgorithm :=
selectDecompressionAlgorithm(acceptedDecompressionAlgorithms, r.Header["Content-Encoding"])

if decompressAlgorithm != "" {
data, err = decompress(data, decompressAlgorithm)
if err != nil {
l.Warnw("failed to decompress data", "algorithm", decompressAlgorithm, "error", err)
//TODO: send a JSON response with the error
w.WriteHeader(http.StatusBadRequest)
increaseErrorCount("enqueue_failed", currentPath)
return
}
}

err = flw.Entrypoint.Enqueue(data)
if err != nil {
l.Warnw("failed while enqueueing data from http request", "error", err)
Expand All @@ -89,3 +111,32 @@ func asyncIngestion(l *zap.SugaredLogger, flw *flow.Flow) http.HandlerFunc {
w.WriteHeader(http.StatusOK)
}
}

// selectDecompressionAlgorithm returns the first Content-Encoding value that
// is in the set of accepted algorithms, or "" when there is no match (in
// which case the payload is ingested without decompression).
//
// NOTE(review): matching is exact — comma-separated header values (e.g.
// "gzip, zstd") and case variants are not normalized; confirm clients always
// send one lowercase encoding per header value.
func selectDecompressionAlgorithm(acceptedAlgorithms map[string]struct{}, contentEncodingHeaders []string) string {
	if len(acceptedAlgorithms) == 0 || len(contentEncodingHeaders) == 0 {
		return ""
	}

	for _, encoding := range contentEncodingHeaders {
		if _, exists := acceptedAlgorithms[encoding]; exists {
			return encoding
		}
	}

	return ""
}

// decompress runs data through a decompression reader for the given algorithm
// and returns the fully decompressed payload, or an error if the reader
// cannot be built or the data cannot be read.
func decompress(data []byte, algorithm string) ([]byte, error) {
	reader, err := compressor.NewReader(&config.Compression{Type: algorithm}, bytes.NewReader(data))
	if err != nil {
		return nil, err
	}

	return io.ReadAll(reader)
}
13 changes: 7 additions & 6 deletions pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,13 @@ func createFlows(llog *zap.SugaredLogger, metricRegistry *prometheus.Registry,
metricRegistry)

f := flow.Flow{
Name: flowConf.Name,
ObjStorage: objStorage,
ExternalQueue: externalQueue,
Uploader: uploader,
UploadWorkers: make([]flow.Runnable, 0, flowConf.MaxConcurrentUploads),
Token: flowConf.Ingestion.Token,
Name: flowConf.Name,
ObjStorage: objStorage,
ExternalQueue: externalQueue,
Uploader: uploader,
UploadWorkers: make([]flow.Runnable, 0, flowConf.MaxConcurrentUploads),
Token: flowConf.Ingestion.Token,
DecompressionAlgorithms: flowConf.Ingestion.DecompressTypes,
}

hasAccumulatorDeclared := flowConf.Accumulator.SizeInBytes > 0 //TODO: this is something that will need to be improved once config is localized inside packages
Expand Down
Loading