-
Notifications
You must be signed in to change notification settings - Fork 311
/
personalizemanager.go
113 lines (98 loc) · 3.9 KB
/
personalizemanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
//go:generate mockgen -destination=../../../mocks/services/streammanager/personalize/mock_personalize.go -package mock_personalize github.com/rudderlabs/rudder-server/services/streammanager/personalize PersonalizeClient
package personalize
import (
"encoding/json"
"fmt"
"github.com/aws/aws-sdk-go/service/personalizeevents"
backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
"github.com/rudderlabs/rudder-server/services/streammanager/common"
"github.com/rudderlabs/rudder-server/utils/awsutils"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/tidwall/gjson"
)
var pkgLogger logger.Logger
func init() {
pkgLogger = logger.NewLogger().Child("streammanager").Child("personalize")
}
type PersonalizeProducer struct {
client PersonalizeClient
}
type PersonalizeClient interface {
PutEvents(input *personalizeevents.PutEventsInput) (*personalizeevents.PutEventsOutput, error)
PutUsers(input *personalizeevents.PutUsersInput) (*personalizeevents.PutUsersOutput, error)
PutItems(input *personalizeevents.PutItemsInput) (*personalizeevents.PutItemsOutput, error)
}
func NewProducer(destination *backendconfig.DestinationT, o common.Opts) (*PersonalizeProducer, error) {
sessionConfig, err := awsutils.NewSessionConfigForDestination(destination, o.Timeout, personalizeevents.ServiceName)
if err != nil {
return nil, err
}
awsSession, err := awsutils.CreateSession(sessionConfig)
if err != nil {
return nil, err
}
return &PersonalizeProducer{client: personalizeevents.New(awsSession)}, nil
}
func (producer *PersonalizeProducer) Produce(jsonData json.RawMessage, _ interface{}) (statusCode int, respStatus, responseMessag string) {
client := producer.client
if client == nil {
return 400, "Could not create producer for Personalize", "Could not create producer for Personalize"
}
var response interface{}
var err error
parsedJSON := gjson.ParseBytes(jsonData)
eventChoice := parsedJSON.Get("choice").String()
eventPayload := []byte(parsedJSON.Get("payload").String())
switch eventChoice {
case "PutEvents":
input := personalizeevents.PutEventsInput{}
err = json.Unmarshal(eventPayload, &input)
if err != nil {
return 400, err.Error(), "Could not unmarshal jsonData according to PutEvents input structure"
}
if err = input.Validate(); err != nil {
return 400, err.Error(), "input does not have required fields"
}
response, err = client.PutEvents(&input)
case "PutUsers":
input := personalizeevents.PutUsersInput{}
err = json.Unmarshal(eventPayload, &input)
if err != nil {
return 400, err.Error(), "Could not unmarshal jsonData according to PutUsers input structure"
}
if err = input.Validate(); err != nil {
return 400, err.Error(), "input does not have required fields"
}
response, err = client.PutUsers(&input)
case "PutItems":
input := personalizeevents.PutItemsInput{}
err = json.Unmarshal(eventPayload, &input)
if err != nil {
return 400, err.Error(), "Could not unmarshal jsonData according to PutItems input structure"
}
if err = input.Validate(); err != nil {
return 400, err.Error(), "input does not have required fields"
}
response, err = client.PutItems(&input)
default:
input := personalizeevents.PutEventsInput{}
err = json.Unmarshal(jsonData, &input)
if err != nil {
return 400, err.Error(), "Could not unmarshal jsonData according to PutEvents input structure"
}
if err = input.Validate(); err != nil {
return 400, err.Error(), "input does not have required fields"
}
response, err = client.PutEvents(&input)
}
if err != nil {
statusCode, respStatus, responseMessage := common.ParseAWSError(err)
pkgLogger.Errorf("[Personalize] error :: %d : %s : %s", statusCode, respStatus, responseMessage)
return statusCode, respStatus, responseMessage
}
return 200, "Success", fmt.Sprintf("Message delivered with Record information %v", response)
}
func (*PersonalizeProducer) Close() error {
// no-op
return nil
}