-
Notifications
You must be signed in to change notification settings - Fork 307
/
kinesismanager.go
113 lines (95 loc) · 3.41 KB
/
kinesismanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
//go:generate mockgen -destination=../../../mocks/services/streammanager/kinesis/mock_kinesis.go -package mock_kinesis github.com/rudderlabs/rudder-server/services/streammanager/kinesis KinesisClient
package kinesis
import (
"encoding/json"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/tidwall/gjson"
backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
"github.com/rudderlabs/rudder-server/services/streammanager/common"
"github.com/rudderlabs/rudder-server/utils/awsutils"
"github.com/rudderlabs/rudder-server/utils/logger"
)
// pkgLogger is the package-level logger; it is assigned in init.
var pkgLogger logger.Logger
// Config is the config that is required to send data to Kinesis. Produce
// fills it by round-tripping the destination config through JSON.
type Config struct {
// Stream is used as the StreamName of the record put to Kinesis.
Stream string
// UseMessageID, when true, makes Produce try message.messageId as the
// partition key before falling back to userId.
UseMessageID bool
}
// init assigns pkgLogger: a "streammanager" child logger with a further child
// named after the Kinesis service.
func init() {
	parent := logger.NewLogger().Child("streammanager")
	pkgLogger = parent.Child(kinesis.ServiceName)
}
// KinesisProducer sends records to Kinesis through a KinesisClient.
type KinesisProducer struct {
// client performs the PutRecord calls; Produce returns a 400 status when it is nil.
client KinesisClient
}
// KinesisClient is the part of the Kinesis API that KinesisProducer depends on.
// NewProducer stores the client returned by kinesis.New in it, and the
// go:generate directive in this file generates a mock for it.
type KinesisClient interface {
// PutRecord sends the record described by input and returns the service's output or an error.
PutRecord(input *kinesis.PutRecordInput) (*kinesis.PutRecordOutput, error)
}
// NewProducer builds a KinesisProducer for the given destination.
//
// It derives an AWS session configuration from the destination and o.Timeout,
// creates a session from that configuration, and wraps a Kinesis client bound
// to the session. If either step fails, the error is returned unchanged along
// with a nil producer.
func NewProducer(destination *backendconfig.DestinationT, o common.Opts) (*KinesisProducer, error) {
	cfg, err := awsutils.NewSessionConfigForDestination(destination, o.Timeout, kinesis.ServiceName)
	if err != nil {
		return nil, err
	}

	sess, err := awsutils.CreateSession(cfg)
	if err != nil {
		return nil, err
	}

	// Both fallible steps succeeded, so there is no error left to report.
	producer := &KinesisProducer{client: kinesis.New(sess)}
	return producer, nil
}
// Produce sends the "message" field of jsonData to Kinesis as a single record
// using the producer's existing client.
//
// destConfig is round-tripped through JSON into a Config to obtain the stream
// name and the partition-key preference. The partition key is
// message.messageId when Config.UseMessageID is set and that field is
// non-empty; otherwise the top-level userId of jsonData is used.
//
// It returns a status code, a short status string and a detailed response
// message:
//   - 400 for problems detected locally: nil client, destination config that
//     cannot be marshalled/unmarshalled, no "message" value in jsonData, or a
//     PutRecordInput that fails its own Validate check;
//   - whatever common.ParseAWSError derives from the error when PutRecord fails;
//   - 200 and "Success" otherwise, with the record's sequence number and shard
//     ID in the response message.
func (producer *KinesisProducer) Produce(jsonData json.RawMessage, destConfig interface{}) (int, string, string) {
	client := producer.client
	if client == nil {
		return 400, "Could not create producer for Kinesis", "Could not create producer for Kinesis"
	}

	// Decode the loosely-typed destination config into Config via JSON.
	config := Config{}
	jsonConfig, err := json.Marshal(destConfig)
	if err != nil {
		outErr := fmt.Errorf("[KinesisManager] Error while Marshalling destination config %+v Error: %w", destConfig, err)
		return 400, outErr.Error(), outErr.Error()
	}
	err = json.Unmarshal(jsonConfig, &config)
	if err != nil {
		outErr := fmt.Errorf("[KinesisManager] Error while Unmarshalling destination config: %w", err)
		return 400, outErr.Error(), outErr.Error()
	}
	streamName := aws.String(config.Stream)

	// Only the "message" value is forwarded; it is re-encoded as the record data.
	parsedJSON := gjson.ParseBytes(jsonData)
	data := parsedJSON.Get("message").Value()
	if data == nil {
		return 400, "InvalidPayload", "Empty Payload"
	}
	value, err := json.Marshal(data)
	if err != nil {
		return 400, err.Error(), err.Error()
	}

	// Prefer the message ID when configured, falling back to userId when the
	// message ID is disabled or empty.
	var partitionKey string
	if config.UseMessageID {
		partitionKey = parsedJSON.Get("message.messageId").String()
	}
	if partitionKey == "" {
		partitionKey = parsedJSON.Get("userId").String()
	}

	putInput := kinesis.PutRecordInput{
		Data:         value,
		StreamName:   streamName,
		PartitionKey: aws.String(partitionKey),
	}
	// Validate locally so malformed input is reported as 400 without a network call.
	if err = putInput.Validate(); err != nil {
		return 400, "InvalidInput", err.Error()
	}

	putOutput, err := client.PutRecord(&putInput)
	if err != nil {
		statusCode, respStatus, responseMessage := common.ParseAWSError(err)
		pkgLogger.Errorf("[Kinesis] error :: %d : %s : %s", statusCode, respStatus, responseMessage)
		return statusCode, respStatus, responseMessage
	}

	// SequenceNumber and ShardId are *string in the SDK output; formatting the
	// pointers themselves with %v would print memory addresses, so dereference
	// them first (aws.StringValue yields "" for a nil pointer).
	message := fmt.Sprintf(
		"Message delivered at SequenceNumber: %v , shard Id: %v",
		aws.StringValue(putOutput.SequenceNumber),
		aws.StringValue(putOutput.ShardId),
	)
	return 200, "Success", message
}
// Close does nothing and always returns nil.
func (*KinesisProducer) Close() error {
// no-op
return nil
}