-
Notifications
You must be signed in to change notification settings - Fork 402
/
hubspot.go
209 lines (185 loc) · 5.8 KB
/
hubspot.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package analytics
import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/url"
"strings"
"time"
"github.com/spacemonkeygo/monkit/v3"
"go.uber.org/zap"
"storj.io/common/sync2"
)
// mon is this package's monkit scope; Handle uses it to instrument event sending.
var mon = monkit.Package()
const (
// eventPrefix is prepended to the name of every behavioral event sent to HubSpot.
// NOTE(review): presumably HubSpot's "pe<portal ID>" custom-event prefix — the same
// ID (20293085) appears in the form submission URL in EnqueueCreateUser; confirm.
eventPrefix = "pe20293085"
)
// HubSpotConfig configures the event queue, the sender concurrency and the
// HTTP client used to deliver events to HubSpot.
type HubSpotConfig struct {
// APIKey is query-escaped and appended as the hapikey parameter of request URLs.
APIKey string `help:"hubspot api key" default:""`
// ChannelSize is the buffer size of the event queue; batches enqueued while
// the queue is full are dropped.
ChannelSize int `help:"the number of events that can be in the queue before dropping" default:"1000"`
// ConcurrentSends is the limit passed to the sync2 limiter that runs the senders.
ConcurrentSends int `help:"the number of concurrent api requests that can be made" default:"4"`
// DefaultTimeout is the timeout set on the HTTP client.
DefaultTimeout time.Duration `help:"the default timeout for the hubspot http client" default:"10s"`
}
// HubSpotEvent describes a single API request to HubSpot.
type HubSpotEvent struct {
// Data is marshaled to JSON and sent as the request body.
Data map[string]interface{}
// Endpoint is the full URL the request is POSTed to.
Endpoint string
}
// HubSpotEvents queues batches of HubSpot events and delivers them from Run.
type HubSpotEvents struct {
log *zap.Logger
config HubSpotConfig
// events is the buffered queue of batches; the events of one batch are sent in order.
events chan []HubSpotEvent
// escapedAPIKey is the query-escaped API key appended to request URLs.
escapedAPIKey string
// satelliteName is reported with account-creation events and, lowercased,
// is part of their event name.
satelliteName string
// worker runs the batch handlers; it is created with a limit of config.ConcurrentSends.
worker sync2.Limiter
// httpClient is shared by all requests and carries config.DefaultTimeout.
httpClient *http.Client
}
// NewHubSpotEvents builds a HubSpotEvents sender from config.
//
// The event queue is buffered to config.ChannelSize, the worker limiter is
// created with config.ConcurrentSends, and the HTTP client uses
// config.DefaultTimeout. The API key is query-escaped once here so it can be
// appended to request URLs as-is.
func NewHubSpotEvents(log *zap.Logger, config HubSpotConfig, satelliteName string) *HubSpotEvents {
	queue := make(chan []HubSpotEvent, config.ChannelSize)
	apiKey := url.QueryEscape(config.APIKey)
	client := &http.Client{Timeout: config.DefaultTimeout}

	return &HubSpotEvents{
		log:           log,
		config:        config,
		events:        queue,
		escapedAPIKey: apiKey,
		satelliteName: satelliteName,
		worker:        *sync2.NewLimiter(config.ConcurrentSends),
		httpClient:    client,
	}
}
// Run takes batches off the event queue and hands each one to the worker
// limiter, which calls Handle for it. Failures reported by Handle are logged.
//
// Run returns the context's error once ctx is done, after waiting for the
// handlers that were already started.
func (q *HubSpotEvents) Run(ctx context.Context) error {
	defer q.worker.Wait()

	// Checking the context before every select guarantees that a canceled
	// context wins even when batches are still waiting in the queue.
	for ctx.Err() == nil {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case batch := <-q.events:
			q.worker.Go(ctx, func() {
				if err := q.Handle(ctx, batch); err != nil {
					q.log.Error("Sending hubspot event", zap.Error(err))
				}
			})
		}
	}
	return ctx.Err()
}
// EnqueueCreateUser queues the two HubSpot requests that record a newly
// created account: a form submission carrying the contact fields, followed by
// an "account_created" behavioral event.
//
// The call never blocks. When the queue is full the batch is dropped and an
// error is logged.
func (q *HubSpotEvents) EnqueueCreateUser(fields TrackCreateUserFields) {
	// Everything before the first space is the first name and the rest is the
	// last name; a name without a space is used as the first name only.
	firstName, lastName := fields.FullName, ""
	if sep := strings.Index(fields.FullName, " "); sep >= 0 {
		firstName, lastName = fields.FullName[:sep], fields.FullName[sep+1:]
	}

	// Name/value pairs of the form submission, in submission order.
	formValues := [][2]string{
		{"email", fields.Email},
		{"firstname", firstName},
		{"lastname", lastName},
		{"lifecyclestage", "customer"},
		{"origin_header", fields.OriginHeader},
		{"signup_referrer", fields.Referrer},
		{"account_created", "true"},
	}
	formFields := make([]map[string]interface{}, 0, len(formValues))
	for _, pair := range formValues {
		formFields = append(formFields, map[string]interface{}{
			"name":  pair[0],
			"value": pair[1],
		})
	}

	formSubmission := HubSpotEvent{
		Endpoint: "https://api.hsforms.com/submissions/v3/integration/submit/20293085/77cfa709-f533-44b8-bf3a-ed1278ca3202?hapikey=" + q.escapedAPIKey,
		Data: map[string]interface{}{
			"context": map[string]interface{}{
				"hutk": fields.HubspotUTK,
			},
			"fields": formFields,
		},
	}

	accountCreated := HubSpotEvent{
		Endpoint: "https://api.hubapi.com/events/v3/send?hapikey=" + q.escapedAPIKey,
		Data: map[string]interface{}{
			"email":     fields.Email,
			"eventName": eventPrefix + "_" + strings.ToLower(q.satelliteName) + "_" + "account_created",
			"properties": map[string]interface{}{
				"userid":             fields.ID.String(),
				"email":              fields.Email,
				"name":               fields.FullName,
				"satellite_selected": q.satelliteName,
				"account_type":       string(fields.Type),
				"company_size":       fields.EmployeeCount,
				"company_name":       fields.CompanyName,
				"job_title":          fields.JobTitle,
				"have_sales_contact": fields.HaveSalesContact,
			},
		},
	}

	// Both requests travel as one batch so that they are sent in order.
	batch := []HubSpotEvent{formSubmission, accountCreated}
	select {
	case q.events <- batch:
	default:
		q.log.Error("create user hubspot event failed, event channel is full")
	}
}
// EnqueueEvent queues a single behavioral event for the user identified by
// email.
//
// The event name is normalized before sending: spaces become underscores, the
// result is lowercased, and eventPrefix plus an underscore is put in front.
// The call never blocks. When the queue is full the event is dropped and an
// error is logged.
func (q *HubSpotEvents) EnqueueEvent(email, eventName string, properties map[string]interface{}) {
	normalized := eventPrefix + "_" + strings.ToLower(strings.ReplaceAll(eventName, " ", "_"))

	batch := []HubSpotEvent{{
		Endpoint: "https://api.hubapi.com/events/v3/send?hapikey=" + q.escapedAPIKey,
		Data: map[string]interface{}{
			"email":      email,
			"eventName":  normalized,
			"properties": properties,
		},
	}}

	select {
	case q.events <- batch:
	default:
		q.log.Error("sending hubspot event failed, event channel is full")
	}
}
// handleSingleEvent sends one event to HubSpot as a JSON POST request.
//
// ev.Data is marshaled to JSON and posted to ev.Endpoint with the shared HTTP
// client. An error is returned when the payload cannot be marshaled, the
// request cannot be built or sent, HubSpot answers with a non-2xx status, or
// the response body cannot be closed.
func (q *HubSpotEvents) handleSingleEvent(ctx context.Context, ev HubSpotEvent) (err error) {
	payloadBytes, err := json.Marshal(ev.Data)
	if err != nil {
		return Error.New("json marshal failed: %w", err)
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, ev.Endpoint, bytes.NewReader(payloadBytes))
	if err != nil {
		return Error.New("new request failed: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := q.httpClient.Do(req)
	if err != nil {
		// The client reports failures as *url.Error, whose message embeds the
		// full request URL. Endpoints built in this file carry the API key as
		// the hapikey query parameter, so keep only the underlying cause to
		// stop the key from ending up in logs.
		if urlErr, ok := err.(*url.Error); ok {
			err = urlErr.Err
		}
		return Error.New("send request failed: %w", err)
	}
	defer func() {
		// Always close the body, but never let a close failure hide an
		// earlier, more relevant error.
		closeErr := resp.Body.Close()
		if err == nil && closeErr != nil {
			err = Error.New("closing resp body failed: %w", closeErr)
		}
	}()

	// A completed round trip is not a success by itself: HubSpot reports
	// rejected requests through the status code.
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		var failure struct {
			Message string `json:"message"`
		}
		// Best effort only: the status is reported even when the body is not
		// JSON or has no "message" field, so the decode error is ignored.
		// NOTE(review): assumes HubSpot error bodies carry a "message" field — confirm.
		_ = json.NewDecoder(resp.Body).Decode(&failure)
		if failure.Message == "" {
			return Error.New("sending event failed: %s", resp.Status)
		}
		return Error.New("sending event failed: %s: %s", resp.Status, failure.Message)
	}

	return nil
}
// Handle sends the events of one batch to HubSpot, one after another and in
// slice order. It stops at the first event that fails and returns that
// failure wrapped in the package error class; later events of the batch are
// not sent. The call is instrumented through the package's monkit scope.
func (q *HubSpotEvents) Handle(ctx context.Context, events []HubSpotEvent) (err error) {
	defer mon.Task()(&ctx)(&err)

	for i := range events {
		if sendErr := q.handleSingleEvent(ctx, events[i]); sendErr != nil {
			return Error.New("handle event: %w", sendErr)
		}
	}
	return nil
}