-
Notifications
You must be signed in to change notification settings - Fork 390
/
dialer.go
170 lines (138 loc) · 4.29 KB
/
dialer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package piecedeletion
import (
"context"
"strconv"
"sync"
"time"
"github.com/spacemonkeygo/monkit/v3"
"go.uber.org/zap"
"storj.io/common/errs2"
"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/storj"
)
// Dialer dials piecestores and sends them piece deletion requests, batching
// pieces into requests and refusing to redial a node for failThreshold after
// a failure has been recorded for it.
type Dialer struct {
	log    *zap.Logger
	dialer rpc.Dialer

	// requestTimeout bounds each individual DeletePieces request.
	requestTimeout time.Duration
	// failThreshold is how long a node is skipped after markFailed.
	failThreshold time.Duration
	// piecesPerRequest is the batch size handed to batchJobs.
	piecesPerRequest int

	// mu guards dialFailed.
	mu sync.RWMutex
	// dialFailed holds, per node, the time a failure was last recorded.
	dialFailed map[storj.NodeID]time.Time
}
// NewDialer returns a new Dialer.
//
// dialer is used to connect to nodes, requestTimeout bounds every deletion
// request, failThreshold is how long a node is skipped after a recorded
// failure, and piecesPerRequest is the target number of pieces per request.
func NewDialer(log *zap.Logger, dialer rpc.Dialer, requestTimeout, failThreshold time.Duration, piecesPerRequest int) *Dialer {
	d := new(Dialer)
	d.log = log
	d.dialer = dialer
	d.requestTimeout = requestTimeout
	d.failThreshold = failThreshold
	d.piecesPerRequest = piecesPerRequest
	// The map must be non-nil: markFailed writes to it.
	d.dialFailed = make(map[storj.NodeID]time.Time)
	return d
}
// Handle tries to send the deletion requests to the specified node.
//
// Jobs are drained from queue and sent in batches of roughly piecesPerRequest
// pieces. Every job taken from the queue has its promise resolved: Success
// when the request carrying it returned without error, Failure otherwise.
// Jobs left in the queue when Handle returns are failed by the deferred
// FailPending call.
//
// Handle returns without dialing when the node failed recently (see
// recentlyFailed). A failed dial marks the node as failed. A deletion request
// that fails with a cancellation error also marks it, but only while ctx is
// still alive. In that case Handle stops sending to the node, just as a fresh
// Handle call for it would.
func (dialer *Dialer) Handle(ctx context.Context, node storj.NodeURL, queue Queue) {
	defer mon.Task()(&ctx, node.ID.String())(nil)
	defer FailPending(queue)

	if dialer.recentlyFailed(ctx, node) {
		return
	}

	client, conn, err := dialPieceStore(ctx, dialer.dialer, node)
	if err != nil {
		dialer.log.Debug("failed to dial", zap.Stringer("id", node.ID), zap.Error(err))
		dialer.markFailed(ctx, node)
		return
	}
	defer func() {
		if err := conn.Close(); err != nil {
			dialer.log.Debug("closing connection failed", zap.Stringer("id", node.ID), zap.Error(err))
		}
	}()

	// ctx is not reassigned after mon.Task, so the span is loop-invariant.
	span := monkit.SpanFromCtx(ctx)

	for {
		if err := ctx.Err(); err != nil {
			return
		}

		jobs, ok := queue.PopAll()
		// Add metrics on to the span
		span.Annotate("delete jobs size", strconv.Itoa(len(jobs)))
		if !ok {
			return
		}

		nodeFailed := false
		for len(jobs) > 0 {
			batch, promises, rest := batchJobs(jobs, dialer.piecesPerRequest)
			jobs = rest

			// Aggregation metrics
			mon.IntVal("delete_batch_size").Observe(int64(len(batch))) //mon:locked
			// Tracing metrics
			span.Annotate("delete_batch_size", strconv.Itoa(len(batch)))

			requestCtx, cancel := context.WithTimeout(ctx, dialer.requestTimeout)
			resp, err := client.DeletePieces(requestCtx, &pb.DeletePiecesRequest{
				PieceIds: batch,
			})
			cancel()

			for _, promise := range promises {
				if err != nil {
					promise.Failure()
				} else {
					promise.Success()
				}
			}

			if err != nil {
				dialer.log.Debug("deletion request failed", zap.Stringer("id", node.ID), zap.Error(err))
				// Don't try to send to this storage node for a while when the
				// deletion times out. NOTE(review): this assumes a requestTimeout
				// expiry surfaces as errs2.IsCanceled; confirm. If our own ctx is
				// done, the cancellation is ours and the node is not at fault.
				if errs2.IsCanceled(err) && ctx.Err() == nil {
					dialer.markFailed(ctx, node)
					nodeFailed = true
				}
				break
			}
			mon.IntVal("deletion_pieces_unhandled_count").Observe(resp.UnhandledCount) //mon:locked

			jobs = append(jobs, queue.PopAllWithoutClose()...)
		}

		// if we failed early, remaining jobs should be marked as failures
		for _, job := range jobs {
			job.Resolve.Failure()
		}

		if nodeFailed {
			// New Handle calls for this node now bail out via recentlyFailed.
			// Stop here as well; the deferred FailPending fails what is queued.
			return
		}
	}
}
// markFailed records that something involving node just failed, so that
// recentlyFailed reports it for the next failThreshold. An existing timestamp
// is only ever replaced by a later one. ctx is currently unused.
func (dialer *Dialer) markFailed(ctx context.Context, node storj.NodeURL) {
	dialer.mu.Lock()
	defer dialer.mu.Unlock()

	current := time.Now()
	if previous, seen := dialer.dialFailed[node.ID]; seen && !previous.Before(current) {
		// Already marked at or after this instant; keep the existing entry.
		return
	}
	dialer.dialFailed[node.ID] = current
}
// recentlyFailed reports whether a failure for node was recorded less than
// failThreshold ago. ctx is currently unused.
func (dialer *Dialer) recentlyFailed(ctx context.Context, node storj.NodeURL) bool {
	dialer.mu.RLock()
	failedAt, found := dialer.dialFailed[node.ID]
	dialer.mu.RUnlock()

	if !found {
		return false
	}
	// A recent failure means the caller should give up immediately.
	return time.Since(failedAt) < dialer.failThreshold
}
// batchJobs takes jobs from the front of jobs until the collected piece IDs
// reach maxBatchSize. It returns those piece IDs, the promises of the jobs
// taken, and the jobs that were not taken (nil when all were taken).
//
// Jobs are never split, so a batch may exceed maxBatchSize when the last job
// taken carries several pieces. The first job is always taken, even when
// maxBatchSize <= 0, so each call consumes at least one job. Without that, a
// non-positive maxBatchSize would return an empty batch with rest == jobs.
// The batching loop in Handle would then never terminate.
func batchJobs(jobs []Job, maxBatchSize int) (pieces []storj.PieceID, promises []Promise, rest []Job) {
	for i, job := range jobs {
		// i > 0 guarantees progress; for maxBatchSize >= 1 it never changes the result.
		if i > 0 && len(pieces) >= maxBatchSize {
			return pieces, promises, jobs[i:]
		}
		pieces = append(pieces, job.Pieces...)
		promises = append(promises, job.Resolve)
	}
	return pieces, promises, nil
}
// dialPieceStore dials target and wraps the resulting connection in a
// piecestore client. On success the returned conn is the caller's to close;
// on error both the client and conn are nil.
func dialPieceStore(ctx context.Context, dialer rpc.Dialer, target storj.NodeURL) (pb.DRPCPiecestoreClient, *rpc.Conn, error) {
	conn, dialErr := dialer.DialNodeURL(ctx, target)
	if dialErr != nil {
		return nil, nil, dialErr
	}

	client := pb.NewDRPCPiecestoreClient(conn)
	return client, conn, nil
}