-
Notifications
You must be signed in to change notification settings - Fork 0
/
lock.go
137 lines (123 loc) · 3.81 KB
/
lock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/*
* Copyright (C) Continental Automotive GmbH 2019
* Alle Rechte vorbehalten. All Rights Reserved.
* The reproduction, transmission or use of this document or its contents is not
* permitted without express written authority. Offenders will be liable for
* damages. All rights, including rights created by patent grant or registration of
* a utility model or design, are reserved.
*/
package sqs
import (
"context"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"go.uber.org/zap"
"github.com/scraly/go.pkg/log"
)
type releaseRequest struct {
retryDelay *time.Duration
}
// MessageLock holds a message which is kept invisible to other consumers until Release is called.
//
// The invibility is garanteed be a background routine which peridically resets the visibility
// timeout of the locked message.
type MessageLock struct {
ctx context.Context
svc sqsiface.SQSAPI
queue string
visibilityTimeout time.Duration
heartbeatInterval time.Duration
message *sqs.Message
release chan releaseRequest
err chan error
}
// NewMessageLock creates a lock for the given SQS message.
func NewMessageLock(ctx context.Context, svc sqsiface.SQSAPI, queue string, visibilityTimeout, heartbeatInterval time.Duration, message *sqs.Message) *MessageLock {
lock := &MessageLock{
ctx: ctx,
svc: svc,
queue: queue,
visibilityTimeout: visibilityTimeout,
heartbeatInterval: heartbeatInterval,
message: message,
release: make(chan releaseRequest, 1),
err: make(chan error, 1),
}
go lock.loop()
return lock
}
// Message returns the locked SQS message.
func (l *MessageLock) Message() *sqs.Message {
return l.message
}
// Release the lock.
//
// If retryDelay is nil, then the locked message is deleted.
// Otherwise, the message visibility timeout is set to the given duration.
//
// Release must be called only once.
func (l *MessageLock) Release(retryDelay *time.Duration) error {
l.release <- releaseRequest{
retryDelay: retryDelay,
}
close(l.release)
return <-l.err
}
func (l *MessageLock) loop() {
for {
select {
case release := <-l.release:
if release.retryDelay == nil {
l.err <- l.deleteMessage()
} else {
l.err <- l.changeMessageVisibility(*release.retryDelay)
}
return
case <-l.ctx.Done():
_ = l.changeMessageVisibility(0)
l.err <- l.ctx.Err()
return
case <-time.After(l.heartbeatInterval):
_ = l.changeMessageVisibility(l.visibilityTimeout)
}
}
}
func (l *MessageLock) deleteMessage() error {
_, err := l.svc.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: aws.String(l.queue),
ReceiptHandle: l.message.ReceiptHandle,
})
if err != nil {
log.For(l.ctx).Error("Failed to delete message",
zap.String("messageID", aws.StringValue(l.message.MessageId)),
zap.Error(err),
)
return err
}
log.For(l.ctx).Info("Message deleted",
zap.String("messageID", aws.StringValue(l.message.MessageId)),
)
return nil
}
func (l *MessageLock) changeMessageVisibility(visibilityTimeout time.Duration) error {
_, err := l.svc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
QueueUrl: aws.String(l.queue),
ReceiptHandle: l.message.ReceiptHandle,
VisibilityTimeout: aws.Int64(int64(visibilityTimeout / time.Second)),
})
if err != nil {
log.For(l.ctx).Warn("Failed to update visibility timeout",
zap.String("messageID", aws.StringValue(l.message.MessageId)),
zap.Duration("visibilityTimeout", visibilityTimeout),
zap.Error(err),
)
return err
}
log.For(l.ctx).Debug("Visibility timeout updated",
zap.String("messageID", aws.StringValue(l.message.MessageId)),
zap.Duration("visibilityTimeout", visibilityTimeout),
)
return nil
}