/
notification.go
151 lines (135 loc) · 4.45 KB
/
notification.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package tfc_hooks
import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/hashicorp/go-tfe"
"github.com/kr/pretty"
"github.com/labstack/echo/v4"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog/log"
"github.com/zapier/tfbuddy/pkg/runstream"
"github.com/zapier/tfbuddy/pkg/tfc_api"
"go.opentelemetry.io/otel"
)
var (
commonLabels = []string{
"organization",
"workspace",
"status",
}
tfcNotificationsReceived = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "tfbuddy_tfc_notifications_received",
Help: "Count of all TFC notification webhooks received",
},
[]string{
"status",
},
)
tfcNotificationPublishSuccess = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "tfbuddy_tfc_notifications_success",
Help: "Count of all TFC Notifications that were processed successfully",
}, commonLabels)
tfcNotificationPublishFailed = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "tfbuddy_tfc_notifications_failed",
Help: "Count of all TFC Notifications that could not be processed",
}, commonLabels)
)
func init() {
r := prometheus.DefaultRegisterer
r.MustRegister(tfcNotificationsReceived)
r.MustRegister(tfcNotificationPublishSuccess)
r.MustRegister(tfcNotificationPublishFailed)
}
type NotificationHandler struct {
api tfc_api.ApiClient
stream runstream.StreamClient
}
func NewNotificationHandler(api tfc_api.ApiClient, stream runstream.StreamClient) *NotificationHandler {
h := &NotificationHandler{
api: api,
stream: stream,
}
// subscribe to Run Polling Tasks queue
_, err := stream.SubscribeTFRunPollingTasks(h.pollingStreamCallback)
if err != nil {
log.Fatal().Err(err).Msg("could not create Run Polling Task subscription")
}
return h
}
func (h *NotificationHandler) Handler() func(c echo.Context) error {
return func(c echo.Context) error {
ctx, span := otel.Tracer("TFBuddy").Start(c.Request().Context(), "NotificationHandler")
defer span.End()
labels := prometheus.Labels{
"status": "processed",
}
event := NotificationPayload{}
if err := (&echo.DefaultBinder{}).BindBody(c, &event); err != nil {
log.Error().Err(err).Msg("failed to unmarshall event payload")
labels["status"] = "error"
tfcNotificationsReceived.With(labels).Inc()
return err
}
log.Debug().Str("event", pretty.Sprint(event))
// do something with event
h.processNotification(ctx, &event)
tfcNotificationsReceived.With(labels).Inc()
return c.String(http.StatusOK, "OK")
}
}
type NotificationPayload struct {
PayloadVersion int `json:"payload_version"`
NotificationConfigurationId string `json:"notification_configuration_id"`
RunUrl string `json:"run_url"`
RunId string `json:"run_id"`
RunMessage string `json:"run_message"`
RunCreatedAt time.Time `json:"run_created_at"`
RunCreatedBy string `json:"run_created_by"`
WorkspaceId string `json:"workspace_id"`
WorkspaceName string `json:"workspace_name"`
OrganizationName string `json:"organization_name"`
Notifications []struct {
Message string `json:"message"`
Trigger string `json:"trigger"`
RunStatus tfe.RunStatus `json:"run_status"`
RunUpdatedAt time.Time `json:"run_updated_at"`
RunUpdatedBy string `json:"run_updated_by"`
} `json:"notifications"`
}
func (h *NotificationHandler) processNotification(ctx context.Context, n *NotificationPayload) {
ctx, span := otel.Tracer("TFBuddy").Start(ctx, "ProcessNotification")
defer span.End()
log.Debug().Interface("NotificationPayload", *n).Msg("processNotification()")
if n.RunId == "" {
return
}
run, err := h.api.GetRun(ctx, n.RunId)
if err != nil {
span.RecordError(err)
log.Error().Err(err)
}
runJson, _ := json.Marshal(run)
log.Debug().Str("run", string(runJson))
fmt.Println(string(runJson))
// notifying
labels := prometheus.Labels{
"status": string(n.Notifications[0].RunStatus),
"organization": n.OrganizationName,
"workspace": n.WorkspaceName,
}
err = h.stream.PublishTFRunEvent(ctx, &runstream.TFRunEvent{
Organization: n.OrganizationName,
Workspace: n.WorkspaceName,
RunID: n.RunId,
NewStatus: string(n.Notifications[0].RunStatus),
})
if err != nil {
span.RecordError(err)
tfcNotificationPublishFailed.With(labels).Inc()
} else {
tfcNotificationPublishSuccess.With(labels).Inc()
}
}