forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
init_tablet.go
182 lines (161 loc) · 6.65 KB
/
init_tablet.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
// Copyright 2014, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package tabletmanager
// This file handles the initialization of the tablet at startup time.
// It is only enabled if init_tablet_type or init_keyspace is set.
import (
"flag"
"fmt"
"time"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/flagutil"
"github.com/youtube/vitess/go/netutil"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topo/topoproto"
"golang.org/x/net/context"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
var (
initDbNameOverride = flag.String("init_db_name_override", "", "(init parameter) override the name of the db used by vttablet")
initKeyspace = flag.String("init_keyspace", "", "(init parameter) keyspace to use for this tablet")
initShard = flag.String("init_shard", "", "(init parameter) shard to use for this tablet")
initTags flagutil.StringMapValue
initTabletType = flag.String("init_tablet_type", "", "(init parameter) the tablet type to use for this tablet.")
initTimeout = flag.Duration("init_timeout", 1*time.Minute, "(init parameter) timeout to use for the init phase.")
)
func init() {
flag.Var(&initTags, "init_tags", "(init parameter) comma separated list of key:value pairs used to tag the tablet")
}
// InitTablet initializes the tablet record if necessary.
func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error {
// it should be either we have all three of init_keyspace,
// init_shard and init_tablet_type, or none.
if *initKeyspace == "" && *initShard == "" && *initTabletType == "" {
// not initializing the record
return nil
}
if *initKeyspace == "" || *initShard == "" || *initTabletType == "" {
return fmt.Errorf("either need all of init_keyspace, init_shard and init_tablet_type, or none")
}
// parse init_tablet_type
tabletType, err := topoproto.ParseTabletType(*initTabletType)
if err != nil {
return fmt.Errorf("invalid init_tablet_type %v: %v", *initTabletType, err)
}
if tabletType == topodatapb.TabletType_MASTER {
// We disallow MASTER, so we don't have to change
// shard.MasterAlias, and deal with the corner cases.
return fmt.Errorf("init_tablet_type cannot be master, use replica instead")
}
// parse and validate shard name
shard, _, err := topo.ValidateShardName(*initShard)
if err != nil {
return fmt.Errorf("cannot validate shard name %v: %v", *initShard, err)
}
// create a context for this whole operation
ctx, cancel := context.WithTimeout(agent.batchCtx, *initTimeout)
defer cancel()
// read the shard, create it if necessary
log.Infof("Reading shard record %v/%v", *initKeyspace, shard)
si, err := agent.TopoServer.GetOrCreateShard(ctx, *initKeyspace, shard)
if err != nil {
return fmt.Errorf("InitTablet cannot GetOrCreateShard shard: %v", err)
}
if si.MasterAlias != nil && topoproto.TabletAliasEqual(si.MasterAlias, agent.TabletAlias) {
// We're marked as master in the shard record, which could mean the master
// tablet process was just restarted. However, we need to check if a new
// master is in the process of taking over. In that case, it will let us
// know by forcibly updating the old master's tablet record.
oldTablet, err := agent.TopoServer.GetTablet(ctx, agent.TabletAlias)
switch err {
case topo.ErrNoNode:
// There's no existing tablet record, so we can assume
// no one has left us a message to step down.
tabletType = topodatapb.TabletType_MASTER
case nil:
if oldTablet.Type == topodatapb.TabletType_MASTER {
// We're marked as master in the shard record,
// and our existing tablet record agrees.
tabletType = topodatapb.TabletType_MASTER
}
default:
return fmt.Errorf("InitTablet failed to read existing tablet record: %v", err)
}
}
// See if we need to add the tablet's cell to the shard's cell list.
if !si.HasCell(agent.TabletAlias.Cell) {
si, err = agent.TopoServer.UpdateShardFields(ctx, *initKeyspace, shard, func(si *topo.ShardInfo) error {
if si.HasCell(agent.TabletAlias.Cell) {
// Someone else already did it.
return topo.ErrNoUpdateNeeded
}
si.Cells = append(si.Cells, agent.TabletAlias.Cell)
return nil
})
if err != nil {
return fmt.Errorf("couldn't add tablet's cell to shard record: %v", err)
}
}
log.Infof("Initializing the tablet for type %v", tabletType)
// figure out the hostname
hostname := *tabletHostname
if hostname != "" {
log.Infof("Using hostname: %v from -tablet_hostname flag.", hostname)
} else {
hostname, err := netutil.FullyQualifiedHostname()
if err != nil {
return err
}
log.Infof("Using detected machine hostname: %v To change this, fix your machine network configuration or override it with -tablet_hostname.", hostname)
}
// create and populate tablet record
tablet := &topodatapb.Tablet{
Alias: agent.TabletAlias,
Hostname: hostname,
PortMap: make(map[string]int32),
Keyspace: *initKeyspace,
Shard: *initShard,
Type: tabletType,
DbNameOverride: *initDbNameOverride,
Tags: initTags,
}
if port != 0 {
tablet.PortMap["vt"] = port
}
if gRPCPort != 0 {
tablet.PortMap["grpc"] = gRPCPort
}
if err := topo.TabletComplete(tablet); err != nil {
return fmt.Errorf("InitTablet TabletComplete failed: %v", err)
}
// now try to create the record
err = agent.TopoServer.CreateTablet(ctx, tablet)
switch err {
case nil:
// it worked, we're good, can update the replication graph
if err := topo.UpdateTabletReplicationData(ctx, agent.TopoServer, tablet); err != nil {
return fmt.Errorf("UpdateTabletReplicationData failed: %v", err)
}
case topo.ErrNodeExists:
// The node already exists, will just try to update it. So we read it first.
oldTablet, err := agent.TopoServer.GetTablet(ctx, tablet.Alias)
if err != nil {
return fmt.Errorf("InitTablet failed to read existing tablet record: %v", err)
}
// Sanity check the keyspace and shard
if oldTablet.Keyspace != tablet.Keyspace || oldTablet.Shard != tablet.Shard {
return fmt.Errorf("InitTablet failed because existing tablet keyspace and shard %v/%v differ from the provided ones %v/%v", oldTablet.Keyspace, oldTablet.Shard, tablet.Keyspace, tablet.Shard)
}
// Then overwrite everything, ignoring version mismatch.
if err := agent.TopoServer.UpdateTablet(ctx, topo.NewTabletInfo(tablet, -1)); err != nil {
return fmt.Errorf("UpdateTablet failed: %v", err)
}
// Note we don't need to UpdateTabletReplicationData
// as the tablet already existed with the right data
// in the replication graph
default:
return fmt.Errorf("CreateTablet failed: %v", err)
}
return nil
}