forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 2
/
topo_utils.go
169 lines (144 loc) · 7.03 KB
/
topo_utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package worker
import (
"flag"
"fmt"
"math/rand"
"time"
"github.com/xsec-lab/go/vt/vterrors"
"github.com/xsec-lab/go/vt/discovery"
"github.com/xsec-lab/go/vt/log"
"github.com/xsec-lab/go/vt/servenv"
"github.com/xsec-lab/go/vt/topo/topoproto"
"github.com/xsec-lab/go/vt/wrangler"
"golang.org/x/net/context"
topodatapb "github.com/xsec-lab/go/vt/proto/topodata"
)
// waitForHealthyTabletsTimeout bounds how long FindHealthyTablet waits at the
// start for enough healthy tablets to show up. The wait intends to give the
// healthcheck time to automatically return rdonly instances that previous
// *Clone or *Diff runs have taken out. For that reason the default must be
// higher than vttablet's -health_check_interval.
var waitForHealthyTabletsTimeout = flag.Duration(
	"wait_for_healthy_tablets_timeout",
	60*time.Second,
	"maximum time to wait at the start if less than --min_healthy_tablets are available",
)
// FindHealthyTablet returns a random healthy tabletType tablet.
// Since we don't want to use them all, we require at least
// minHealthyRdonlyTablets servers to be healthy.
// May block up to -wait_for_healthy_tablets_timeout.
//
// If tsc is nil, a temporary health check, TabletStatsCache and shard
// replication watcher are created for cell/keyspace/shard. They are torn
// down (via defer) before this function returns.
//
// An error is returned in two cases:
//   - waiting for healthy tablets fails;
//   - no healthy tablet is available to pick from.
func FindHealthyTablet(ctx context.Context, wr *wrangler.Wrangler, tsc *discovery.TabletStatsCache, cell, keyspace, shard string, minHealthyRdonlyTablets int, tabletType topodatapb.TabletType) (*topodatapb.TabletAlias, error) {
	if tsc == nil {
		// No healthcheck instance provided. Create one.
		healthCheck := discovery.NewHealthCheck(*healthcheckRetryDelay, *healthCheckTimeout)
		tsc = discovery.NewTabletStatsCache(healthCheck, wr.TopoServer(), cell)
		watcher := discovery.NewShardReplicationWatcher(wr.TopoServer(), healthCheck, cell, keyspace, shard, *healthCheckTopologyRefresh, discovery.DefaultTopoReadConcurrency)
		defer watcher.Stop()
		defer healthCheck.Close()
	}

	healthyTablets, err := waitForHealthyTablets(ctx, wr, tsc, cell, keyspace, shard, minHealthyRdonlyTablets, *waitForHealthyTabletsTimeout, tabletType)
	if err != nil {
		return nil, err
	}
	// waitForHealthyTablets only guarantees len >= minHealthyRdonlyTablets.
	// That list can be empty when minHealthyRdonlyTablets <= 0, and
	// rand.Intn panics for n <= 0.
	if len(healthyTablets) == 0 {
		return nil, fmt.Errorf("no healthy %v tablets found in (%v,%v/%v)", tabletType, cell, keyspace, shard)
	}

	// A random server in the list is what we want.
	index := rand.Intn(len(healthyTablets))
	return healthyTablets[index].Tablet.Alias, nil
}
// waitForHealthyTablets blocks until tsc reports at least
// minHealthyRdonlyTablets healthy tabletType tablets for keyspace/shard. It
// returns the healthy tablets it found.
//
// It first waits (via tsc.WaitForTablets) for at least one tabletType tablet
// in cell. It then polls the cache roughly once per second. It returns an
// error once timeout elapses or ctx is done, whichever comes first.
func waitForHealthyTablets(ctx context.Context, wr *wrangler.Wrangler, tsc *discovery.TabletStatsCache, cell, keyspace, shard string, minHealthyRdonlyTablets int, timeout time.Duration, tabletType topodatapb.TabletType) ([]discovery.TabletStats, error) {
	busywaitCtx, busywaitCancel := context.WithTimeout(ctx, timeout)
	defer busywaitCancel()

	start := time.Now()
	// A context's deadline never changes, so compute it once.
	// It is only used for the "waiting up to" log messages.
	deadlineForLog, _ := busywaitCtx.Deadline()
	log.V(2).Infof("Waiting for enough healthy %v tablets to become available in (%v,%v/%v). required: %v Waiting up to %.1f seconds.", tabletType,
		cell, keyspace, shard, minHealthyRdonlyTablets, deadlineForLog.Sub(time.Now()).Seconds())

	// Wait for at least one tabletType tablet initially before checking the list.
	if err := tsc.WaitForTablets(busywaitCtx, cell, keyspace, shard, tabletType); err != nil {
		return nil, vterrors.Wrapf(err, "error waiting for %v tablets for (%v,%v/%v)", tabletType, cell, keyspace, shard)
	}

	var healthyTablets []discovery.TabletStats
	for {
		select {
		case <-busywaitCtx.Done():
			return nil, fmt.Errorf("not enough healthy %v tablets to choose from in (%v,%v/%v), have %v healthy ones, need at least %v Context error: %v",
				tabletType, cell, keyspace, shard, len(healthyTablets), minHealthyRdonlyTablets, busywaitCtx.Err())
		default:
		}

		healthyTablets = discovery.RemoveUnhealthyTablets(tsc.GetTabletStats(keyspace, shard, tabletType))
		if len(healthyTablets) >= minHealthyRdonlyTablets {
			// Not inside a select/switch here, so this exits the for loop.
			break
		}

		wr.Logger().Infof("Waiting for enough healthy %v tablets to become available (%v,%v/%v). available: %v required: %v Waiting up to %.1f more seconds.",
			tabletType, cell, keyspace, shard, len(healthyTablets), minHealthyRdonlyTablets, deadlineForLog.Sub(time.Now()).Seconds())
		// Block for 1 second because 2 seconds is the -health_check_interval flag value in integration tests.
		timer := time.NewTimer(1 * time.Second)
		select {
		case <-busywaitCtx.Done():
			timer.Stop()
		case <-timer.C:
		}
	}

	log.V(2).Infof("At least %v healthy %v tablets are available in (%v,%v/%v) (required: %v). Took %.1f seconds to find this out.",
		len(healthyTablets), tabletType, cell, keyspace, shard, minHealthyRdonlyTablets, time.Since(start).Seconds())
	return healthyTablets, nil
}
// FindWorkerTablet will:
// - find a tabletType instance in the keyspace / shard
// - mark it as worker
// - tag it with our worker process
//
// In detail, it:
//   - picks a healthy tablet via FindHealthyTablet;
//   - changes the tablet's type to DRAINED;
//   - sets tag[worker] to this process's listening URL and sets
//     tag[drain_reason];
//   - asks the tablet to refresh its state.
//
// Clean-up actions that remove the tags and restore tabletType are recorded
// in cleaner as soon as the corresponding change has succeeded. Any error
// from those steps is returned with a nil alias.
func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrangler.Cleaner, tsc *discovery.TabletStatsCache, cell, keyspace, shard string, minHealthyTablets int, tabletType topodatapb.TabletType) (*topodatapb.TabletAlias, error) {
	tabletAlias, err := FindHealthyTablet(ctx, wr, tsc, cell, keyspace, shard, minHealthyTablets, tabletType)
	if err != nil {
		return nil, err
	}

	wr.Logger().Infof("Changing tablet %v to '%v'", topoproto.TabletAliasString(tabletAlias), topodatapb.TabletType_DRAINED)
	shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
	err = wr.ChangeSlaveType(shortCtx, tabletAlias, topodatapb.TabletType_DRAINED)
	cancel()
	if err != nil {
		return nil, err
	}
	// Record the clean-up action that takes the tablet back to tabletType
	// right away. This way the tablet is not left DRAINED without a recorded
	// undo if one of the following steps fails.
	wrangler.RecordChangeSlaveTypeAction(cleaner, tabletAlias, topodatapb.TabletType_DRAINED, tabletType)

	ourURL := servenv.ListeningURL.String()
	wr.Logger().Infof("Adding tag[worker]=%v to tablet %v", ourURL, topoproto.TabletAliasString(tabletAlias))
	shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
	_, err = wr.TopoServer().UpdateTabletFields(shortCtx, tabletAlias, func(tablet *topodatapb.Tablet) error {
		if tablet.Tags == nil {
			tablet.Tags = make(map[string]string)
		}
		tablet.Tags["worker"] = ourURL
		tablet.Tags["drain_reason"] = "Used by vtworker"
		return nil
	})
	cancel()
	if err != nil {
		return nil, err
	}
	// The tags must be removed *before* the tablet type is changed back.
	// That is why these actions are recorded after the change-slave-type
	// action above. The order is the same as the original code's deferred
	// calls produced: drain_reason first, then worker.
	wrangler.RecordTabletTagAction(cleaner, tabletAlias, "drain_reason", "")
	wrangler.RecordTabletTagAction(cleaner, tabletAlias, "worker", "")

	// Refresh the destination vttablet so it reloads the tablet record and
	// thereby picks up the worker URL.
	shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
	err = wr.RefreshTabletState(shortCtx, tabletAlias)
	cancel()
	if err != nil {
		return nil, err
	}
	return tabletAlias, nil
}
// init seeds the package-global math/rand source from the wall clock. As a
// result, the rand.Intn pick in FindHealthyTablet differs between processes.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}