/
checkpoint.go
132 lines (107 loc) · 3.56 KB
/
checkpoint.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package kinesis
import (
"context"
"fmt"
"time"
"github.com/justtrackio/gosoline/pkg/clock"
"github.com/justtrackio/gosoline/pkg/conc"
"github.com/justtrackio/gosoline/pkg/ddb"
"github.com/justtrackio/gosoline/pkg/mdl"
)
// checkpointWrapper is used with atomic.Value. It allows us to store different types of the same interface in it without a panic.
type checkpointWrapper struct {
Checkpoint
}
// once we replace the checkpoint value, we use this type which doesn't care and never fails if we release it twice
type nopCheckpoint struct{}
func (c nopCheckpoint) GetSequenceNumber() SequenceNumber {
return ""
}
func (c nopCheckpoint) Advance(_ SequenceNumber) error {
return nil
}
func (c nopCheckpoint) Done(_ SequenceNumber) error {
return nil
}
func (c nopCheckpoint) Persist(_ context.Context) (shouldRelease bool, err error) {
return false, nil
}
func (c nopCheckpoint) Release(_ context.Context) error {
return nil
}
type checkpoint struct {
repo ddb.Repository
clock clock.Clock
lck conc.PoisonedLock
namespace string
shardId ShardId
owningClientId ClientId
sequenceNumber SequenceNumber
finalSequenceNumber SequenceNumber
finishedAt *time.Time
}
func (c *checkpoint) GetSequenceNumber() SequenceNumber {
return c.sequenceNumber
}
func (c *checkpoint) Advance(sequenceNumber SequenceNumber) error {
if err := c.lck.TryLock(); err != nil {
return fmt.Errorf("can not advance already released checkpoint: %w", err)
}
defer c.lck.Unlock()
c.sequenceNumber = sequenceNumber
return nil
}
func (c *checkpoint) Done(sequenceNumber SequenceNumber) error {
if err := c.lck.TryLock(); err != nil {
return fmt.Errorf("can not mark already released checkpoint as done: %w", err)
}
defer c.lck.Unlock()
c.finishedAt = mdl.Box(c.clock.Now())
c.finalSequenceNumber = sequenceNumber
return nil
}
func (c *checkpoint) Persist(ctx context.Context) (shouldRelease bool, err error) {
if err := c.lck.TryLock(); err != nil {
return false, fmt.Errorf("can not persist already released checkpoint: %w", err)
}
defer c.lck.Unlock()
record := &CheckpointRecord{
BaseRecord: BaseRecord{
Namespace: c.namespace,
Resource: string(c.shardId),
UpdatedAt: c.clock.Now(),
Ttl: mdl.Box(c.clock.Now().Add(ShardTimeout).Unix()),
},
OwningClientId: c.owningClientId,
SequenceNumber: c.sequenceNumber,
FinishedAt: c.finishedAt,
}
if c.sequenceNumber != c.finalSequenceNumber && c.finalSequenceNumber != "" {
record.FinishedAt = nil
}
qb := c.repo.PutItemBuilder().WithCondition(ddb.Eq("owningClientId", c.owningClientId))
if result, err := c.repo.PutItem(ctx, qb, record); err != nil {
return false, fmt.Errorf("failed to persist checkpoint: %w", err)
} else if result.ConditionalCheckFailed {
return false, ErrCheckpointNoLongerOwned
}
return record.FinishedAt != nil, nil
}
func (c *checkpoint) Release(ctx context.Context) error {
return c.lck.PoisonIf(func() (bool, error) {
qb := c.repo.UpdateItemBuilder().
WithHash(c.namespace).
WithRange(c.shardId).
Remove("owningClientId").
Set("updatedAt", c.clock.Now()).
Set("ttl", mdl.Box(c.clock.Now().Add(ShardTimeout).Unix())).
Set("sequenceNumber", c.sequenceNumber).
WithCondition(ddb.Eq("owningClientId", c.owningClientId))
if result, err := c.repo.UpdateItem(ctx, qb, &CheckpointRecord{}); err != nil {
return false, fmt.Errorf("failed to release checkpoint: %w", err)
} else if result.ConditionalCheckFailed {
return true, ErrCheckpointAlreadyReleased
}
return true, nil
})
}