-
Notifications
You must be signed in to change notification settings - Fork 402
/
pending.go
175 lines (145 loc) · 4.77 KB
/
pending.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package gracefulexit
import (
"context"
"sync"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase"
)
// PendingFinishedPromise for waiting for information about finished state.
type PendingFinishedPromise struct {
addedWorkChan chan struct{}
finishCalledChan chan struct{}
finishErr error
}
func newFinishedPromise() *PendingFinishedPromise {
return &PendingFinishedPromise{
addedWorkChan: make(chan struct{}),
finishCalledChan: make(chan struct{}),
}
}
// Wait should be called (once) after acquiring the finished promise and will return whether the pending map is finished.
func (promise *PendingFinishedPromise) Wait(ctx context.Context) (bool, error) {
select {
case <-ctx.Done():
return true, ctx.Err()
case <-promise.addedWorkChan:
return false, nil
case <-promise.finishCalledChan:
return true, promise.finishErr
}
}
func (promise *PendingFinishedPromise) addedWork() {
close(promise.addedWorkChan)
}
func (promise *PendingFinishedPromise) finishCalled(err error) {
promise.finishErr = err
close(promise.finishCalledChan)
}
// PendingTransfer is the representation of work on the pending map.
// It contains information about a transfer request that has been sent to a storagenode by the satellite.
type PendingTransfer struct {
StreamID uuid.UUID
Position metabase.SegmentPosition
PieceSize int64
SatelliteMessage *pb.SatelliteMessage
OriginalRootPieceID storj.PieceID
PieceNum uint16
}
// PendingMap for managing concurrent access to the pending transfer map.
type PendingMap struct {
mu sync.RWMutex
data map[storj.PieceID]*PendingTransfer
doneSending bool
doneSendingErr error
finishedPromise *PendingFinishedPromise
}
// NewPendingMap creates a new PendingMap.
func NewPendingMap() *PendingMap {
newData := make(map[storj.PieceID]*PendingTransfer)
return &PendingMap{
data: newData,
}
}
// Put adds work to the map. If there is already work associated with this piece ID it returns an error.
// If there is a PendingFinishedPromise waiting, that promise is updated to return false.
func (pm *PendingMap) Put(pieceID storj.PieceID, pendingTransfer *PendingTransfer) error {
pm.mu.Lock()
defer pm.mu.Unlock()
if pm.doneSending {
return Error.New("cannot add work: pending map already finished")
}
if _, ok := pm.data[pieceID]; ok {
return Error.New("piece ID already exists in pending map")
}
pm.data[pieceID] = pendingTransfer
if pm.finishedPromise != nil {
pm.finishedPromise.addedWork()
pm.finishedPromise = nil
}
return nil
}
// Get returns the pending transfer item from the map, if it exists.
func (pm *PendingMap) Get(pieceID storj.PieceID) (*PendingTransfer, bool) {
pm.mu.RLock()
defer pm.mu.RUnlock()
pendingTransfer, ok := pm.data[pieceID]
return pendingTransfer, ok
}
// Length returns the number of elements in the map.
func (pm *PendingMap) Length() int {
pm.mu.RLock()
defer pm.mu.RUnlock()
return len(pm.data)
}
// Delete removes the pending transfer item from the map and returns an error if the data does not exist.
func (pm *PendingMap) Delete(pieceID storj.PieceID) error {
pm.mu.Lock()
defer pm.mu.Unlock()
if _, ok := pm.data[pieceID]; !ok {
return Error.New("piece ID does not exist in pending map")
}
delete(pm.data, pieceID)
return nil
}
// IsFinishedPromise returns a promise for the caller to wait on to determine the finished status of the pending map.
// If we have enough information to determine the finished status, we update the promise to have an answer immediately.
// Otherwise, we attach the promise to the pending map to be updated and cleared by either Put or DoneSending (whichever happens first).
func (pm *PendingMap) IsFinishedPromise() *PendingFinishedPromise {
pm.mu.Lock()
defer pm.mu.Unlock()
if pm.finishedPromise != nil {
return pm.finishedPromise
}
newPromise := newFinishedPromise()
if len(pm.data) > 0 {
newPromise.addedWork()
return newPromise
}
if pm.doneSending {
newPromise.finishCalled(pm.doneSendingErr)
return newPromise
}
pm.finishedPromise = newPromise
return newPromise
}
// DoneSending is called (with an optional error) when no more work will be added to the map.
// If DoneSending has already been called, an error is returned.
// If a PendingFinishedPromise is waiting on a response, it is updated to return true.
func (pm *PendingMap) DoneSending(err error) error {
pm.mu.Lock()
defer pm.mu.Unlock()
if pm.doneSending {
return Error.New("DoneSending() already called on pending map")
}
if pm.finishedPromise != nil {
pm.finishedPromise.finishCalled(err)
pm.finishedPromise = nil
}
pm.doneSending = true
pm.doneSendingErr = err
return nil
}