/
apigw_events.go
112 lines (96 loc) · 3.06 KB
/
apigw_events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package aws
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"strings"
"github.com/aws/aws-lambda-go/events"
tuUtils "github.com/pixie79/tiny-utils/utils"
"github.com/tidwall/gjson"
"github.com/twmb/franz-go/pkg/kgo"
)
// ApiGwCreateKafkaEvent converts an API Gateway proxy request into Kafka records.
//
// The "proxy" path parameter, lower-cased, names the source and drives topic
// selection. The request body must be base64-encoded JSON holding either a
// single payload or an array of payloads; one record is produced per payload,
// and every record is keyed with tuUtils.CreateKey(key).
//
// A missing path parameter, undecodable base64 or invalid JSON is reported
// through tuUtils.MaybeDie.
// NOTE(review): presumably MaybeDie aborts on a non-nil error — confirm; the
// code below does not otherwise stop after such a failure.
//
// ctx is returned unchanged alongside the records.
func ApiGwCreateKafkaEvent(ctx context.Context, event events.APIGatewayProxyRequest, key []byte) ([]*kgo.Record, context.Context) {
	var (
		keyValue = tuUtils.CreateKey(key)

		// Per-source routing settings. Only "electrum" overrides them below.
		// NOTE(review): for any other source payloadKey stays empty while
		// topicInBody is true, so the topic becomes "<source>-" — confirm
		// that this default is intended.
		payloadKey        string
		topicInBody       = true
		partialPayloadKey string
		partialPayload    = false
	)

	source, found := event.PathParameters["proxy"]
	if !found {
		tuUtils.MaybeDie(fmt.Errorf("no source found"), "api gw proxy path parameter not found")
	}
	source = strings.ToLower(source)

	if source == "electrum" {
		payloadKey = "type"
		topicInBody = true
		partialPayload = false
		partialPayloadKey = "tranInfo"
	}
	tuUtils.Print("DEBUG", fmt.Sprintf("Source is: %s", source))

	decodedPayload, err := base64.StdEncoding.DecodeString(event.Body)
	tuUtils.MaybeDie(err, "unable to decode base64 payload")

	if !gjson.ValidBytes(decodedPayload) {
		tuUtils.MaybeDie(errors.New("invalid json"), "error parsing json")
	}

	// Result.Array returns the elements of a JSON array, or a one-element
	// slice holding the value itself when the document is not an array, so a
	// single object and a batch of objects share the same loop.
	payloads := gjson.ParseBytes(decodedPayload).Array()
	tuUtils.Print("DEBUG", fmt.Sprintf("number of payloads: %d", len(payloads)))

	kafkaRecords := make([]*kgo.Record, 0, len(payloads))
	for _, payload := range payloads {
		topic := source
		if topicInBody {
			// Topic is "<source>-<value of payloadKey in the payload>".
			topic = source + "-" + payload.Get(payloadKey).String()
		}

		// By default the record carries the payload's JSON exactly as received.
		value := []byte(payload.Raw)
		if partialPayload {
			// Forward only the sub-document stored under partialPayloadKey.
			value = []byte(payload.Get(partialPayloadKey).String())
		}

		tuUtils.Print("DEBUG", fmt.Sprintf("topic: %s, value: %s, key: %s", topic, value, key))
		kafkaRecords = append(kafkaRecords, &kgo.Record{
			Topic: topic,
			Value: value,
			Key:   keyValue,
		})
	}

	// Return the kafka records to be sent to kafka
	return kafkaRecords, ctx
}
// ReturnListFromString decodes a JSON body into a list of objects.
//
// body may be either a JSON array of objects or a single JSON object; a single
// object is returned as a one-element list. Any other input (malformed JSON, a
// scalar, an array containing non-objects) is reported through
// tuUtils.MaybeDie instead of panicking on a failed type assertion.
// NOTE(review): presumably MaybeDie aborts on a non-nil error — confirm.
func ReturnListFromString(body string) []map[string]interface{} {
	var payloads []map[string]interface{}

	raw := []byte(body)
	if err := json.Unmarshal(raw, &payloads); err != nil {
		// Not a list of objects: retry as a single object. Decoding straight
		// into a map makes a type mismatch surface as an error, and the fresh
		// slice discards anything the failed first attempt may have filled in.
		var payload map[string]interface{}
		err = json.Unmarshal(raw, &payload)
		tuUtils.MaybeDie(err, "unable to unmarshal json body")
		payloads = []map[string]interface{}{payload}
	}

	tuUtils.Print("INFO", fmt.Sprintf("number of items in payload: %d", len(payloads)))
	return payloads
}