/
envelopeBuilder.go
60 lines (54 loc) · 1.77 KB
/
envelopeBuilder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// Copyright (c) 2022 Silverton Data, Inc.
// You may use, distribute, and modify this code under the terms of the Apache-2.0 license, a copy of
// which may be found at https://github.com/silverton-io/buz/blob/main/LICENSE
package selfdescribing
import (
	"bytes"
	"compress/gzip"
	"io"
	"strings"

	"github.com/gin-gonic/gin"
	"github.com/rs/zerolog/log"
	"github.com/silverton-io/buz/pkg/config"
	"github.com/silverton-io/buz/pkg/envelope"
	"github.com/silverton-io/buz/pkg/meta"
	"github.com/silverton-io/buz/pkg/protocol"
	"github.com/tidwall/gjson"
)
// buildEnvelopesFromRequest reads the request body, decompresses it when the
// Content-Encoding header names gzip, and builds one self-describing envelope
// for every element gjson yields when the body is viewed as an array.
//
// Parameters:
//   - c:    the gin request context; supplies the body, the headers and the
//     request contexts attached to every envelope.
//   - conf: collector configuration; conf.App seeds each new envelope and
//     conf.SelfDescribing is handed to buildEvent.
//   - m:    collector metadata; not used by this function.
//
// Returns the envelopes that were built. If the body cannot be read or
// decompressed the error is logged and a nil slice is returned.
func buildEnvelopesFromRequest(c *gin.Context, conf *config.Config, m *meta.CollectorMeta) []envelope.Envelope {
	var envelopes []envelope.Envelope

	reqBody, err := io.ReadAll(c.Request.Body)
	if err != nil {
		log.Error().Err(err).Msg("🔴 could not read request body")
		return envelopes
	}

	contexts := envelope.BuildContextsFromRequest(c)

	// If the request body is gzipped, decompress it. Content-coding names are
	// case-insensitive in HTTP, so "GZIP" or "Gzip" must be accepted as well.
	if strings.EqualFold(c.GetHeader("Content-Encoding"), "gzip") {
		reader, err := gzip.NewReader(bytes.NewReader(reqBody))
		if err != nil {
			log.Error().Err(err).Msg("🔴 could not decompress gzipped request body")
			return envelopes
		}
		defer reader.Close()

		reqBody, err = io.ReadAll(reader)
		if err != nil {
			log.Error().Err(err).Msg("🔴 could not read decompressed gzipped request body")
			return envelopes
		}
	}

	for _, e := range gjson.ParseBytes(reqBody).Array() {
		n := envelope.NewEnvelope(conf.App)

		evnt, err := buildEvent(e, conf.SelfDescribing)
		if err != nil {
			// NOTE(review): the failure is only logged; the envelope is still
			// populated from evnt and appended below. Confirm this is intended.
			log.Error().Err(err).Msg("🔴 could not build generic event")
		}

		n.Protocol = protocol.SELF_DESCRIBING
		// Only override the envelope's schema when the event carries one.
		if evnt.Payload.Schema != "" {
			n.Schema = evnt.Payload.Schema
		}
		// Every envelope from this request points at the same contexts value.
		n.Contexts = &contexts
		n.Payload = evnt.Payload.Data

		envelopes = append(envelopes, n)
	}

	return envelopes
}