-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.go
171 lines (141 loc) · 4.43 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
// Event logger is a tool that captures SQS events and displays the most recent
// ones as an HTML page. It keeps the last 10 messages received, and those
// results can be filtered using the ?detail-type and ?detail query parameters.
// When filtering, it waits roughly 10 seconds for a matching message to arrive
// before responding.
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"html"
	"log"
	"net/http"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
	"github.com/ministryofjustice/opg-go-common/env"
)
// Tuning values for the background receive loop and for filtered page
// requests.
const (
	// Receive loop: how often the queue is polled, and how many of the
	// received messages are kept in memory.
	receiveTick = 1 * time.Second
	maxMessages = 10

	// Filtering: the interval between checks for matching messages, the
	// number of matches that is enough to respond, and the number of
	// intervals to wait before responding regardless.
	waitTick     = 1 * time.Second
	waitMinimum  = 1
	waitMaxTicks = 10
)
// message holds the fields of a received queue message body that the event
// logger keeps and displays. It is populated by JSON-decoding the body, so
// untagged fields are matched against JSON keys by encoding/json's default
// (case-insensitive) field-name matching.
type message struct {
// Time is the event timestamp; stored messages are ordered newest-first by it.
Time time.Time
// Detail is the event payload, kept as undecoded JSON so that it can be
// substring-matched when filtering and shown verbatim on the page.
Detail json.RawMessage
// DetailType is read from the "detail-type" key and compared exactly against
// the ?detail-type query parameter when filtering.
DetailType string `json:"detail-type"`
// Source is shown in the page's Source column.
Source string
}
// main runs the event logger.
//
// It starts a background goroutine that polls the SQS queue named by
// QUEUE_NAME once per receiveTick, decodes each message body into a message,
// keeps the newest maxMessages of them in memory and deletes the received
// messages from the queue. It then serves an HTML table of the stored messages
// on PORT. When the ?detail-type query parameter is set, only messages with
// that detail type whose detail contains ?detail are shown, and the response is
// held back until at least waitMinimum such messages exist, waitMaxTicks
// intervals of waitTick have passed, or the client goes away.
//
// Environment variables: AWS_BASE_URL (optional endpoint override), PORT
// (default "8080") and QUEUE_NAME (default "event-queue").
func main() {
	var (
		awsBaseURL = env.Get("AWS_BASE_URL", "")
		port       = env.Get("PORT", "8080")
		queueName  = env.Get("QUEUE_NAME", "event-queue")
		ctx        = context.Background()

		// queueURL is resolved lazily and only touched by the receive goroutine.
		queueURL string

		// mu guards messages, which is written by the receive goroutine and
		// read by HTTP handlers running on other goroutines.
		mu       sync.Mutex
		messages []message
	)

	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(fmt.Errorf("unable to load SDK config: %w", err))
	}
	cfg.Region = "eu-west-1"
	// Only override the endpoint when one was configured, so that the SDK's
	// default endpoint resolution stays in place otherwise.
	if awsBaseURL != "" {
		cfg.BaseEndpoint = aws.String(awsBaseURL)
	}

	client := sqs.NewFromConfig(cfg)

	// Receive loop: runs for the lifetime of the process.
	go func() {
		for range time.Tick(receiveTick) {
			if queueURL == "" {
				urlResponse, err := client.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
					QueueName: aws.String(queueName),
				})
				if err != nil {
					log.Println("failed to get queue url:", err)
					continue
				}
				// aws.ToString tolerates a nil pointer; an empty result is
				// simply looked up again on the next tick.
				queueURL = aws.ToString(urlResponse.QueueUrl)
			}

			messageResponse, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
				MessageAttributeNames: []string{string(types.QueueAttributeNameAll)},
				QueueUrl:              aws.String(queueURL),
				MaxNumberOfMessages:   10, // may as well ask for as many as possible
				VisibilityTimeout:     5,
			})
			if err != nil {
				log.Println("failed to retrieve message:", err)
				continue
			}

			log.Println("received", len(messageResponse.Messages))
			if len(messageResponse.Messages) == 0 {
				continue
			}

			toDelete := make([]types.DeleteMessageBatchRequestEntry, 0, len(messageResponse.Messages))
			received := make([]message, 0, len(messageResponse.Messages))
			for _, m := range messageResponse.Messages {
				// Undecodable messages are still deleted so they do not come back.
				toDelete = append(toDelete, types.DeleteMessageBatchRequestEntry{Id: m.MessageId, ReceiptHandle: m.ReceiptHandle})

				var v message
				if err := json.Unmarshal([]byte(aws.ToString(m.Body)), &v); err != nil {
					log.Println("could not unmarshal message: ", err)
					continue
				}
				received = append(received, v)
			}

			// Store, sort newest-first and trim to the last N messages before
			// deleting, so that a failed delete cannot skip the trim and let
			// the slice grow.
			mu.Lock()
			messages = append(messages, received...)
			sort.Slice(messages, func(i, j int) bool {
				return messages[i].Time.After(messages[j].Time)
			})
			if len(messages) > maxMessages {
				messages = messages[:maxMessages]
			}
			mu.Unlock()

			deleteResponse, err := client.DeleteMessageBatch(ctx, &sqs.DeleteMessageBatchInput{
				QueueUrl: aws.String(queueURL),
				Entries:  toDelete,
			})
			if err != nil {
				log.Println("problem deleting messages:", err)
				continue
			}

			log.Println("deleting messages:", len(deleteResponse.Successful), "success,", len(deleteResponse.Failed), "failed")
		}
	}()

	// snapshot returns a copy of the stored messages that is safe to read
	// without holding mu.
	snapshot := func() []message {
		mu.Lock()
		defer mu.Unlock()
		return append([]message(nil), messages...)
	}

	// filterMessages returns the stored messages, optionally filtered.
	//
	// With an empty detailType every stored message is returned immediately.
	// Otherwise it returns the messages whose DetailType equals detailType and
	// whose Detail contains detail. It checks straight away and then once per
	// waitTick, taking a fresh snapshot each time so that messages received
	// while waiting are seen. It returns as soon as at least waitMinimum
	// messages match, after waitMaxTicks ticks, or when ctx is done.
	filterMessages := func(ctx context.Context, detailType, detail string) []message {
		if detailType == "" {
			return snapshot()
		}

		ticker := time.NewTicker(waitTick)
		defer ticker.Stop()

		for tick := 0; ; tick++ {
			// Rebuilt on every pass so repeated checks never duplicate matches.
			var matching []message
			for _, m := range snapshot() {
				if m.DetailType == detailType && strings.Contains(string(m.Detail), detail) {
					matching = append(matching, m)
				}
			}

			if len(matching) >= waitMinimum || tick >= waitMaxTicks {
				return matching
			}

			select {
			case <-ctx.Done():
				// The client has gone away; stop waiting on its behalf.
				return matching
			case <-ticker.C:
			}
		}
	}

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		matching := filterMessages(r.Context(), r.FormValue("detail-type"), r.FormValue("detail"))

		fmt.Fprint(w, "<!DOCTYPE html><body><table><thead><tr><th>Time</th><th>Source</th><th>DetailType</th><th>Detail</th></tr></thead><tbody>")
		for _, m := range matching {
			// Message fields come from the queue, so escape them before
			// embedding them in the page.
			fmt.Fprintf(w, "<tr><td>%s</td><td>%s</td><td>%s</td><td>%s</td></tr>",
				m.Time,
				html.EscapeString(m.Source),
				html.EscapeString(m.DetailType),
				html.EscapeString(string(m.Detail)))
		}
		fmt.Fprint(w, "</tbody></table></body>")
	})

	// ListenAndServe only returns on failure (for example the port is already
	// in use); report it instead of exiting silently.
	log.Fatal(http.ListenAndServe(":"+port, nil))
}