Skip to content

Commit

Permalink
Allow decompression when ingesting data (#77)
Browse files Browse the repository at this point in the history
Configurable decompression when ingesting data
  • Loading branch information
jademcosta committed Sep 10, 2023
1 parent 0717479 commit 31cc3a0
Show file tree
Hide file tree
Showing 8 changed files with 480 additions and 26 deletions.
4 changes: 4 additions & 0 deletions config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ flows:
ingestion:
# When a token is defined, requests that arrive without it are denied
token: "some-token-here"
# Optional. Listing algorithms here enables decompression of incoming data before it is ingested.
# Decompression only happens when the matching "Content-Encoding" header is set on the request.
# The only supported values are: gzip, zlib, deflate, lzw, zstd, snappy
decompress_ingested_data: ['gzip', 'snappy']
# The amount of items in memory, waiting to be sent to storage. If this queue is full,
# data starts to be dropped (lost)
in_memory_queue_max_size: 1000
Expand Down
172 changes: 172 additions & 0 deletions pkg/adapters/http_in/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package http_in
import (
"bytes"
"fmt"
"math/rand"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"

"github.com/jademcosta/jiboia/pkg/compressor"
"github.com/jademcosta/jiboia/pkg/config"
"github.com/jademcosta/jiboia/pkg/domain/flow"
"github.com/jademcosta/jiboia/pkg/logger"
Expand All @@ -18,6 +21,18 @@ import (

const version string = "0.0.0"

// characters is the alphabet randString draws from.
var characters = []rune("abcdefghijklmnopqrstuvwxyz")

// randString returns a pseudo-random string of n lowercase ASCII letters,
// seeded from the current time so successive calls differ.
func randString(n int) string {
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	out := make([]rune, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, characters[rng.Intn(len(characters))])
	}
	return string(out)
}

type mockDataFlow struct {
calledWith [][]byte
mu sync.Mutex
Expand Down Expand Up @@ -492,5 +507,162 @@ func TestVersionEndpointInformsTheVersion(t *testing.T) {
assert.Equal(t, fmt.Sprintf("{\"version\":\"%s\"}", version), body, "version informed should be the current one")
}

func TestDecompressionOnIngestion(t *testing.T) {
l := logger.New(&config.Config{Log: config.LogConfig{Level: "error", Format: "json"}})
c := config.ApiConfig{Port: 9111}

t.Run("Happy path", func(t *testing.T) {
testCases := []struct {
algorithms []string
}{
{algorithms: []string{"snappy"}},
{algorithms: []string{"gzip"}},
{algorithms: []string{"zstd"}},
{algorithms: []string{"deflate"}},
{algorithms: []string{"zlib"}},
{algorithms: []string{"zlib", "gzip"}},
{algorithms: []string{"snappy", "gzip", "zstd"}},
{algorithms: []string{"snappy", "gzip", "zstd", "deflate", "zlib"}},
}

for _, tc := range testCases {
df := &mockDataFlow{calledWith: make([][]byte, 0)}

flws := []flow.Flow{
{
Name: "flow-1",
Entrypoint: df,
DecompressionAlgorithms: tc.algorithms,
},
}

api := New(l, c, prometheus.NewRegistry(), version, flws)
srvr := httptest.NewServer(api.mux)
defer srvr.Close()

expected := make([][]byte, 0)

for _, algorithm := range tc.algorithms {

decompressedData1 := strings.Repeat("ab", 90)
expected = append(expected, []byte(decompressedData1))

buf1 := &bytes.Buffer{}
writer, err := compressor.NewWriter(&config.Compression{Type: algorithm}, buf1)
assert.NoError(t, err, "error on compressor writer creation", err)
_, err = writer.Write([]byte(decompressedData1))
assert.NoError(t, err, "error on compressing data", err)
err = writer.Close()
assert.NoError(t, err, "error on compressor Close", err)

assert.NotEqual(t, decompressedData1, buf1.Bytes(), "the data should have been compressed before sending")

req, err := http.NewRequest(http.MethodPost,
fmt.Sprintf("%s/flow-1/async_ingestion", srvr.URL), buf1)
assert.NoError(t, err, "error on request creation", err)
req.Header.Add("Content-Encoding", algorithm)

resp, err := http.DefaultClient.Do(req)
assert.NoError(t, err, "error on posting data", err)
resp.Body.Close()
assert.Equal(t, http.StatusOK, resp.StatusCode,
"status should be Internal Server Ok(200)")

decompressedData2 := randString(300)
expected = append(expected, []byte(decompressedData2))
buf2 := &bytes.Buffer{}
writer, err = compressor.NewWriter(&config.Compression{Type: algorithm}, buf2)
assert.NoError(t, err, "error on compressor writer creation", err)
_, err = writer.Write([]byte(decompressedData2))
assert.NoError(t, err, "error on compressing data", err)
err = writer.Close()
assert.NoError(t, err, "error on compressor Close", err)

assert.NotEqual(t, decompressedData2, buf2.Bytes(), "the data should have been compressed before sending")

req, err = http.NewRequest(http.MethodPost,
fmt.Sprintf("%s/flow-1/async_ingestion", srvr.URL), buf2)
assert.NoError(t, err, "error on request creation", err)
req.Header.Add("Content-Encoding", algorithm)

resp, err = http.DefaultClient.Do(req)
assert.NoError(t, err, "error on posting data", err)
resp.Body.Close()
assert.Equal(t, http.StatusOK, resp.StatusCode,
"status should be Internal Server Ok(200)")
}

assert.Equal(t, expected, df.calledWith,
"payloads should be decompressed before being sent to dataflow")
}
})

t.Run("Unsupported compressed data is ingested without decompression", func(t *testing.T) {

df := &mockDataFlow{calledWith: make([][]byte, 0)}

flws := []flow.Flow{
{
Name: "flow-1",
Entrypoint: df,
DecompressionAlgorithms: []string{"gzip"},
},
}

api := New(l, c, prometheus.NewRegistry(), version, flws)
srvr := httptest.NewServer(api.mux)
defer srvr.Close()

expectedNotCompressed := randString(10)

bufExpected := &bytes.Buffer{}
writer, err := compressor.NewWriter(&config.Compression{Type: "gzip"}, bufExpected)
assert.NoError(t, err, "error on compressor writer creation", err)
_, err = writer.Write([]byte(expectedNotCompressed))
assert.NoError(t, err, "error on compressing data", err)
err = writer.Close()
assert.NoError(t, err, "error on compressor Close", err)

assert.NotEqual(t, expectedNotCompressed, bufExpected.Bytes(), "the data should have been compressed before sending")

req, err := http.NewRequest(http.MethodPost,
fmt.Sprintf("%s/flow-1/async_ingestion", srvr.URL), bufExpected)
assert.NoError(t, err, "error on request creation", err)
req.Header.Add("Content-Encoding", "gzip")

resp, err := http.DefaultClient.Do(req)
assert.NoError(t, err, "error on posting data", err)
resp.Body.Close()
assert.Equal(t, http.StatusOK, resp.StatusCode,
"status should be Internal Server Ok(200) for GZIP")

expectedCompressed := randString(10)
bufUnexpected := &bytes.Buffer{}
writer, err = compressor.NewWriter(&config.Compression{Type: "snappy"}, bufUnexpected)
assert.NoError(t, err, "error on compressor writer creation", err)
_, err = writer.Write([]byte(expectedCompressed))
assert.NoError(t, err, "error on compressing data", err)
err = writer.Close()
assert.NoError(t, err, "error on compressor Close", err)

assert.NotEqual(t, expectedCompressed, bufUnexpected.Bytes(), "the data should have been compressed before sending")
secondPayloadBytes := bufUnexpected.Bytes()

req, err = http.NewRequest(http.MethodPost,
fmt.Sprintf("%s/flow-1/async_ingestion", srvr.URL), bufUnexpected)
assert.NoError(t, err, "error on request creation", err)
req.Header.Add("Content-Encoding", "snappy")

resp, err = http.DefaultClient.Do(req)
assert.NoError(t, err, "error on posting data", err)
resp.Body.Close()
assert.Equal(t, http.StatusOK, resp.StatusCode,
"status should be Internal Server Ok(200)")

assert.Equal(t, [][]byte{[]byte(expectedNotCompressed), secondPayloadBytes}, df.calledWith,
"only payloads with supported algorithm should be accepted")
})
}

//TODO: test the graceful shutdown
//TODO: add tests for metrics serving
71 changes: 61 additions & 10 deletions pkg/adapters/http_in/ingest_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"bytes"
"errors"
"fmt"
"io"
"net/http"

"github.com/jademcosta/jiboia/pkg/adapters/http_in/httpmiddleware"
"github.com/jademcosta/jiboia/pkg/compressor"
"github.com/jademcosta/jiboia/pkg/config"
"github.com/jademcosta/jiboia/pkg/domain/flow"
"go.uber.org/zap"
)
Expand All @@ -21,25 +24,31 @@ func RegisterIngestingRoutes(

for _, flw := range flws {
flwCopy := flw

path := fmt.Sprintf("/%s/async_ingestion", flwCopy.Name)
middlewares := make([]func(http.Handler) http.Handler, 0, 1)

if flwCopy.Token != "" {
api.mux.With(httpmiddleware.Auth(flwCopy.Token)).
Post(path, asyncIngestion(api.log, &flwCopy))
api.mux.With(httpmiddleware.Auth(flwCopy.Token)).
Post("/"+version+path, asyncIngestion(api.log, &flwCopy))
} else {
api.mux.Post(path, asyncIngestion(api.log, &flwCopy))
api.mux.Post("/"+version+path, asyncIngestion(api.log, &flwCopy))
middlewares = append(middlewares, httpmiddleware.Auth(flwCopy.Token))
}

api.mux.With(middlewares...).
Post(path, asyncIngestion(api.log, &flwCopy))
api.mux.With(middlewares...).
Post("/"+version+path, asyncIngestion(api.log, &flwCopy))
}

api.mux.Middlewares()
}

func asyncIngestion(l *zap.SugaredLogger, flw *flow.Flow) http.HandlerFunc {

// This is just for perf
acceptedDecompressionAlgorithms := make(map[string]struct{})
for _, alg := range flw.DecompressionAlgorithms {
acceptedDecompressionAlgorithms[alg] = struct{}{}
}

return func(w http.ResponseWriter, r *http.Request) {
//TODO: implement the "with" on the logger and add the "ingestion_type": "async" here on this fn

currentPath := r.URL.Path
buf := &bytes.Buffer{}
Expand Down Expand Up @@ -72,9 +81,22 @@ func asyncIngestion(l *zap.SugaredLogger, flw *flow.Flow) http.HandlerFunc {
}

data := buf.Bytes()

l.Debugw("data received on async handler", "length", dataLen)

decompressAlgorithm :=
selectDecompressionAlgorithm(acceptedDecompressionAlgorithms, r.Header["Content-Encoding"])

if decompressAlgorithm != "" {
data, err = decompress(data, decompressAlgorithm)
if err != nil {
l.Warnw("failed to decompress data", "algorithm", decompressAlgorithm, "error", err)
//TODO: send a JSON response with the error
w.WriteHeader(http.StatusBadRequest)
increaseErrorCount("enqueue_failed", currentPath)
return
}
}

err = flw.Entrypoint.Enqueue(data)
if err != nil {
l.Warnw("failed while enqueueing data from http request", "error", err)
Expand All @@ -89,3 +111,32 @@ func asyncIngestion(l *zap.SugaredLogger, flw *flow.Flow) http.HandlerFunc {
w.WriteHeader(http.StatusOK)
}
}

// selectDecompressionAlgorithm returns the first value from the request's
// Content-Encoding header(s) that is present in acceptedAlgorithms. It returns
// the empty string when nothing matches, meaning no decompression should run.
func selectDecompressionAlgorithm(acceptedAlgorithms map[string]struct{}, contentEncodingHeaders []string) string {
	// len() is never negative; compare with == 0.
	if len(acceptedAlgorithms) == 0 || len(contentEncodingHeaders) == 0 {
		return ""
	}

	for _, encoding := range contentEncodingHeaders {
		if _, accepted := acceptedAlgorithms[encoding]; accepted {
			return encoding
		}
	}

	return ""
}

// decompress runs data through a reader for the given algorithm and returns
// the fully decompressed payload, or an error if the reader cannot be built
// or the stream is corrupt.
func decompress(data []byte, algorithm string) ([]byte, error) {
	reader, err := compressor.NewReader(&config.Compression{Type: algorithm}, bytes.NewReader(data))
	if err != nil {
		return nil, err
	}

	payload, err := io.ReadAll(reader)
	if err != nil {
		// Return nil (not a partial payload) on a failed read.
		return nil, err
	}
	return payload, nil
}
13 changes: 7 additions & 6 deletions pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,13 @@ func createFlows(llog *zap.SugaredLogger, metricRegistry *prometheus.Registry,
metricRegistry)

f := flow.Flow{
Name: flowConf.Name,
ObjStorage: objStorage,
ExternalQueue: externalQueue,
Uploader: uploader,
UploadWorkers: make([]flow.Runnable, 0, flowConf.MaxConcurrentUploads),
Token: flowConf.Ingestion.Token,
Name: flowConf.Name,
ObjStorage: objStorage,
ExternalQueue: externalQueue,
Uploader: uploader,
UploadWorkers: make([]flow.Runnable, 0, flowConf.MaxConcurrentUploads),
Token: flowConf.Ingestion.Token,
DecompressionAlgorithms: flowConf.Ingestion.DecompressTypes,
}

hasAccumulatorDeclared := flowConf.Accumulator.SizeInBytes > 0 //TODO: this is something that will need to be improved once config is localized inside packages
Expand Down

0 comments on commit 31cc3a0

Please sign in to comment.