forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 1
/
topo_utils.go
153 lines (134 loc) · 6.54 KB
/
topo_utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// Copyright 2013, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package worker
import (
"flag"
"fmt"
"math/rand"
"time"
"github.com/youtube/vitess/go/vt/discovery"
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/topo/topoproto"
"github.com/youtube/vitess/go/vt/wrangler"
"golang.org/x/net/context"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
// Command line flags tuning how a worker discovers healthy rdonly tablets.
var (
	// waitForHealthyEndPointsTimeout bounds how long we wait for the
	// healthcheck to report enough healthy rdonly instances. Instances
	// taken out by previous *Clone or *Diff runs are expected to come
	// back on their own, so the default for this variable must be
	// higher than vttablet's -health_check_interval.
	waitForHealthyEndPointsTimeout = flag.Duration(
		"wait_for_healthy_rdonly_endpoints_timeout",
		60*time.Second,
		"maximum time to wait if less than --min_healthy_rdonly_endpoints are available",
	)

	// healthCheckTopologyRefresh is the refresh interval handed to the
	// shard replication watcher.
	healthCheckTopologyRefresh = flag.Duration(
		"worker_healthcheck_topology_refresh",
		30*time.Second,
		"refresh interval for re-reading the topology",
	)

	// healthcheckRetryDelay is the retry delay handed to the discovery
	// healthcheck.
	healthcheckRetryDelay = flag.Duration(
		"worker_healthcheck_retry_delay",
		5*time.Second,
		"delay before retrying a failed healthcheck",
	)

	// healthCheckTimeout is the timeout handed to the discovery
	// healthcheck.
	healthCheckTimeout = flag.Duration(
		"worker_healthcheck_timeout",
		time.Minute,
		"the health check timeout period",
	)
)
// FindHealthyRdonlyEndPoint returns the alias of a randomly chosen healthy
// rdonly endpoint of keyspace/shard in the given cell.
//
// An endpoint counts as healthy when the healthcheck has stats for it, its
// HealthError is empty and its SecondsBehindMaster is at most 30. Since we
// don't want to use them all, we require at least minHealthyRdonlyEndPoints
// endpoints to be healthy before picking one. Values below 1 are treated as
// 1, because an endpoint can only be picked from a non-empty list.
//
// May block up to -wait_for_healthy_rdonly_endpoints_timeout. An error is
// returned if waiting for the first rdonly endpoint fails, or if the timeout
// (or ctx) expires before enough healthy endpoints are seen.
func FindHealthyRdonlyEndPoint(ctx context.Context, wr *wrangler.Wrangler, cell, keyspace, shard string, minHealthyRdonlyEndPoints int) (*topodatapb.TabletAlias, error) {
	// Always require at least one healthy endpoint. Without this floor a
	// non-positive minimum lets the wait loop finish with an empty list,
	// and rand.Intn(0) below would panic.
	required := minHealthyRdonlyEndPoints
	if required < 1 {
		required = 1
	}

	busywaitCtx, busywaitCancel := context.WithTimeout(ctx, *waitForHealthyEndPointsTimeout)
	defer busywaitCancel()

	// create a discovery healthcheck, wait for it to have one rdonly
	// endpoints at this point
	healthCheck := discovery.NewHealthCheck(*remoteActionsTimeout, *healthcheckRetryDelay, *healthCheckTimeout, "" /* statsSuffix */)
	watcher := discovery.NewShardReplicationWatcher(wr.TopoServer(), healthCheck, cell, keyspace, shard, *healthCheckTopologyRefresh, 5 /*topoReadConcurrency*/)
	defer watcher.Stop()
	defer healthCheck.Close()

	// The deadline is fixed when busywaitCtx is created, so it is read once
	// and reused for every log line. The "ok" result is ignored because a
	// context built by WithTimeout always carries a deadline.
	deadlineForLog, _ := busywaitCtx.Deadline()
	wr.Logger().Infof("Waiting for enough healthy rdonly endpoints to become available. required: %v Waiting up to %.1f seconds.",
		required, deadlineForLog.Sub(time.Now()).Seconds())
	if err := discovery.WaitForEndPoints(busywaitCtx, healthCheck, cell, keyspace, shard, []topodatapb.TabletType{topodatapb.TabletType_RDONLY}); err != nil {
		return nil, fmt.Errorf("error waiting for rdonly endpoints for (%v,%v/%v): %v", cell, keyspace, shard, err)
	}

	var healthyEndpoints []*topodatapb.EndPoint
	for {
		// Give up as soon as the wait budget is exhausted (non-blocking check).
		select {
		case <-busywaitCtx.Done():
			return nil, fmt.Errorf("not enough healthy rdonly endpoints to choose from in (%v,%v/%v), have %v healthy ones, need at least %v Context error: %v",
				cell, keyspace, shard, len(healthyEndpoints), required, busywaitCtx.Err())
		default:
		}

		addrs := healthCheck.GetEndPointStatsFromTarget(keyspace, shard, topodatapb.TabletType_RDONLY)
		healthyEndpoints = make([]*topodatapb.EndPoint, 0, len(addrs))
		for _, addr := range addrs {
			// Note we do not check the 'Serving' flag here.
			// This is mainly to avoid the case where we run a
			// Diff between a source and destination, and the source
			// is not serving (disabled by TabletControl).
			// When we switch the tablet to 'worker', it will
			// go back to serving state.
			if addr.Stats == nil || addr.Stats.HealthError != "" || addr.Stats.SecondsBehindMaster > 30 {
				continue
			}
			healthyEndpoints = append(healthyEndpoints, addr.EndPoint)
		}

		if len(healthyEndpoints) >= required {
			break
		}

		wr.Logger().Infof("Waiting for enough healthy rdonly endpoints to become available. available: %v required: %v Waiting up to %.1f more seconds.",
			len(healthyEndpoints), required, deadlineForLog.Sub(time.Now()).Seconds())

		// Block for 1 second because 2 seconds is the -health_check_interval flag value in integration tests.
		// If the context ends first, the check at the top of the loop
		// produces the error on the next iteration.
		timer := time.NewTimer(1 * time.Second)
		select {
		case <-busywaitCtx.Done():
			timer.Stop()
		case <-timer.C:
		}
	}

	// random server in the list is what we want; the list is guaranteed to
	// be non-empty here because required >= 1.
	index := rand.Intn(len(healthyEndpoints))
	return &topodatapb.TabletAlias{
		Cell: cell,
		Uid:  healthyEndpoints[index].Uid,
	}, nil
}
// FindWorkerTablet reserves a rdonly tablet for this worker process. It will:
// - pick a healthy rdonly instance in the keyspace / shard
// - tag it with the URL of our worker process
// - change its type to worker
// Matching clean-up actions (reset the tag, go back to rdonly) are recorded
// in the given cleaner. The alias of the chosen tablet is returned.
func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrangler.Cleaner, cell, keyspace, shard string, minHealthyRdonlyEndPoints int) (*topodatapb.TabletAlias, error) {
	alias, err := FindHealthyRdonlyEndPoint(ctx, wr, cell, keyspace, shard, minHealthyRdonlyEndPoints)
	if err != nil {
		return nil, err
	}

	// The tag is written before ChangeSlaveType is called, so that the
	// destination vttablet picks up the worker URL when it reloads the
	// tablet record.
	workerURL := servenv.ListeningURL.String()
	wr.Logger().Infof("Adding tag[worker]=%v to tablet %v", workerURL, topoproto.TabletAliasString(alias))
	setWorkerTag := func(t *topodatapb.Tablet) error {
		if t.Tags == nil {
			t.Tags = map[string]string{}
		}
		t.Tags["worker"] = workerURL
		return nil
	}
	tagCtx, tagCancel := context.WithTimeout(ctx, *remoteActionsTimeout)
	_, err = wr.TopoServer().UpdateTabletFields(tagCtx, alias, setWorkerTag)
	tagCancel()
	if err != nil {
		return nil, err
	}

	// The tag has to be removed *before* the tablet type is changed back,
	// which means the tag clean-up must be recorded in the cleaner after
	// the type-change clean-up. Deferring the call achieves that ordering.
	defer wrangler.RecordTabletTagAction(cleaner, alias, "worker", "")

	wr.Logger().Infof("Changing tablet %v to '%v'", topoproto.TabletAliasString(alias), topodatapb.TabletType_WORKER)
	typeCtx, typeCancel := context.WithTimeout(ctx, *remoteActionsTimeout)
	err = wr.ChangeSlaveType(typeCtx, alias, topodatapb.TabletType_WORKER)
	typeCancel()
	if err != nil {
		return nil, err
	}

	// Schedule the tablet to be put back to rdonly on clean-up.
	// This action will be altered later on to let the tablet go back to
	// 'spare' if we have stopped replication for too long on it.
	wrangler.RecordChangeSlaveTypeAction(cleaner, alias, topodatapb.TabletType_RDONLY)
	return alias, nil
}
// init seeds the global math/rand source from the current wall-clock time.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}