Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the HealthCheckImpl in vtctld instead of the LegacyHealthCheck #10254

Merged
merged 16 commits into from
Jun 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 61 additions & 11 deletions go/vt/discovery/fake_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package discovery

import (
"context"
"fmt"
"sort"
"sync"

Expand All @@ -42,8 +43,9 @@ import (
// NewFakeHealthCheck returns the fake healthcheck object.
func NewFakeHealthCheck(ch chan *TabletHealth) *FakeHealthCheck {
return &FakeHealthCheck{
items: make(map[string]*fhcItem),
ch: ch,
items: make(map[string]*fhcItem),
itemsAlias: make(map[string]*fhcItem),
ch: ch,
}
}

Expand All @@ -52,6 +54,7 @@ type FakeHealthCheck struct {
// mu protects the items map
mu sync.RWMutex
items map[string]*fhcItem
itemsAlias map[string]*fhcItem
currentTabletUID int
// channel to return on subscribe. Pass nil if no subscribe should not return a channel
ch chan *TabletHealth
Expand Down Expand Up @@ -87,6 +90,24 @@ func (fhc *FakeHealthCheck) GetHealthyTabletStats(target *querypb.Target) []*Tab
return result
}

// GetTabletHealthByAlias results the TabletHealth of the tablet that matches the given alias
func (fhc *FakeHealthCheck) GetTabletHealthByAlias(alias *topodatapb.TabletAlias) (*TabletHealth, error) {
return fhc.GetTabletHealth("", alias)
}

// GetTabletHealth results the TabletHealth of the tablet that matches the given alias
func (fhc *FakeHealthCheck) GetTabletHealth(kst KeyspaceShardTabletType, alias *topodatapb.TabletAlias) (*TabletHealth, error) {
fhc.mu.Lock()
defer fhc.mu.Unlock()

if hd, ok := fhc.itemsAlias[alias.String()]; ok {
if hd.ts.Tablet.Alias.String() == alias.String() {
return hd.ts, nil
}
}
return nil, fmt.Errorf("could not find tablet: %s", alias.String())
}

// Subscribe returns the channel in the struct. Subscribe should only be called in one place for this fake health check
func (fhc *FakeHealthCheck) Subscribe() chan *TabletHealth {
return fhc.ch
Expand Down Expand Up @@ -174,6 +195,7 @@ func (fhc *FakeHealthCheck) AddTablet(tablet *topodatapb.Tablet) {
fhc.mu.Lock()
defer fhc.mu.Unlock()
fhc.items[key] = item
fhc.itemsAlias[tablet.Alias.String()] = item
}

// RemoveTablet removes the tablet.
Expand Down Expand Up @@ -216,19 +238,46 @@ func (fhc *FakeHealthCheck) TabletConnection(alias *topodatapb.TabletAlias, targ

// CacheStatus returns the status for each tablet
func (fhc *FakeHealthCheck) CacheStatus() TabletsCacheStatusList {
tcsMap := fhc.CacheStatusMap()
tcsl := make(TabletsCacheStatusList, 0, len(tcsMap))
for _, tcs := range tcsMap {
tcsl = append(tcsl, tcs)
}
sort.Sort(tcsl)
return tcsl
}

// CacheStatusMap returns a map of the health check cache.
func (fhc *FakeHealthCheck) CacheStatusMap() map[string]*TabletsCacheStatus {
tcsMap := make(map[string]*TabletsCacheStatus)
fhc.mu.Lock()
defer fhc.mu.Unlock()
for _, ths := range fhc.items {
key := fmt.Sprintf("%v.%v.%v.%v", ths.ts.Tablet.Alias.Cell, ths.ts.Target.Keyspace, ths.ts.Target.Shard, ths.ts.Target.TabletType.String())
var tcs *TabletsCacheStatus
var ok bool
if tcs, ok = tcsMap[key]; !ok {
tcs = &TabletsCacheStatus{
Cell: ths.ts.Tablet.Alias.Cell,
Target: ths.ts.Target,
}
tcsMap[key] = tcs
}
tcs.TabletsStats = append(tcs.TabletsStats, ths.ts)
}
return tcsMap
}

stats := make(TabletsCacheStatusList, 0, len(fhc.items))
for _, item := range fhc.items {
stats = append(stats, &TabletsCacheStatus{
Cell: "FakeCell",
Target: item.ts.Target,
TabletsStats: TabletStatsList{item.ts},
})
// UpdateHealth adds a TabletHealth for the tablet defined in th.
func (fhc *FakeHealthCheck) UpdateHealth(th *TabletHealth) {
frouioui marked this conversation as resolved.
Show resolved Hide resolved
fhc.mu.Lock()
defer fhc.mu.Unlock()

key := TabletToMapKey(th.Tablet)
if t, ok := fhc.items[key]; ok {
t.ts = th
fhc.itemsAlias[th.Tablet.Alias.String()].ts = th
}
sort.Sort(stats)
return stats
}

// Close is not implemented.
Expand Down Expand Up @@ -275,6 +324,7 @@ func (fhc *FakeHealthCheck) AddFakeTablet(cell, host string, port int32, keyspac
},
}
fhc.items[key] = item
fhc.itemsAlias[t.Alias.String()] = item
}
item.ts.Target = &querypb.Target{
Keyspace: keyspace,
Expand Down
91 changes: 63 additions & 28 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,17 @@ type TabletRecorder interface {
ReplaceTablet(old, new *topodata.Tablet)
}

type keyspaceShardTabletType string
type KeyspaceShardTabletType string
type tabletAliasString string

// HealthCheck declares what the TabletGateway needs from the HealthCheck
type HealthCheck interface {
// CacheStatus returns a displayable version of the health check cache.
CacheStatus() TabletsCacheStatusList

// CacheStatusMap returns a map of the health check cache.
CacheStatusMap() map[string]*TabletsCacheStatus

// Close stops the healthcheck.
Close() error

Expand All @@ -192,6 +195,12 @@ type HealthCheck interface {
// synchronization
GetHealthyTabletStats(target *query.Target) []*TabletHealth

// GetTabletHealth results the TabletHealth of the tablet that matches the given alias
GetTabletHealth(kst KeyspaceShardTabletType, alias *topodata.TabletAlias) (*TabletHealth, error)

// GetTabletHealthByAlias results the TabletHealth of the tablet that matches the given alias
GetTabletHealthByAlias(alias *topodata.TabletAlias) (*TabletHealth, error)

// Subscribe adds a listener. Used by vtgate buffer to learn about primary changes.
Subscribe() chan *TabletHealth

Expand All @@ -201,6 +210,17 @@ type HealthCheck interface {

var _ HealthCheck = (*HealthCheckImpl)(nil)

// Target includes cell which we ignore here
// because tabletStatsCache is intended to be per-cell
func KeyFromTarget(target *query.Target) KeyspaceShardTabletType {
return KeyspaceShardTabletType(fmt.Sprintf("%s.%s.%s", target.Keyspace, target.Shard, topoproto.TabletTypeLString(target.TabletType)))
}

// KeyFromTablet returns the KeyspaceShardTabletType that matches the given topodata.Tablet
func KeyFromTablet(tablet *topodata.Tablet) KeyspaceShardTabletType {
return KeyspaceShardTabletType(fmt.Sprintf("%s.%s.%s", tablet.Keyspace, tablet.Shard, topoproto.TabletTypeLString(tablet.Type)))
}

// HealthCheckImpl performs health checking and stores the results.
// The goal of this object is to maintain a StreamHealth RPC
// to a lot of tablets. Tablets are added / removed by calling the
Expand All @@ -227,9 +247,9 @@ type HealthCheckImpl struct {
// a map keyed by keyspace.shard.tabletType
// contains a map of TabletHealth keyed by tablet alias for each tablet relevant to the keyspace.shard.tabletType
// has to be kept in sync with healthByAlias
healthData map[keyspaceShardTabletType]map[tabletAliasString]*TabletHealth
healthData map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth
// another map keyed by keyspace.shard.tabletType, this one containing a sorted list of TabletHealth
healthy map[keyspaceShardTabletType][]*TabletHealth
healthy map[KeyspaceShardTabletType][]*TabletHealth
// connsWG keeps track of all launched Go routines that monitor tablet connections.
connsWG sync.WaitGroup
// topology watchers that inform healthcheck of tablets being added and deleted
Expand Down Expand Up @@ -266,8 +286,8 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
retryDelay: retryDelay,
healthCheckTimeout: healthCheckTimeout,
healthByAlias: make(map[tabletAliasString]*tabletHealthCheck),
healthData: make(map[keyspaceShardTabletType]map[tabletAliasString]*TabletHealth),
healthy: make(map[keyspaceShardTabletType][]*TabletHealth),
healthData: make(map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth),
healthy: make(map[KeyspaceShardTabletType][]*TabletHealth),
subscribers: make(map[chan *TabletHealth]struct{}),
cellAliases: make(map[string]string),
}
Expand Down Expand Up @@ -341,7 +361,7 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) {
}

// add to our datastore
key := hc.keyFromTarget(target)
key := KeyFromTarget(target)
tabletAlias := topoproto.TabletAliasString(tablet.Alias)
if _, ok := hc.healthByAlias[tabletAliasString(tabletAlias)]; ok {
// We should not add a tablet that we already have
Expand Down Expand Up @@ -377,7 +397,7 @@ func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) {
hc.mu.Lock()
defer hc.mu.Unlock()

key := hc.keyFromTablet(tablet)
key := KeyFromTablet(tablet)
tabletAlias := tabletAliasString(topoproto.TabletAliasString(tablet.Alias))
// delete from authoritative map
th, ok := hc.healthByAlias[tabletAlias]
Expand Down Expand Up @@ -417,14 +437,14 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
return
}

targetKey := hc.keyFromTarget(th.Target)
targetKey := KeyFromTarget(th.Target)
targetChanged := prevTarget.TabletType != th.Target.TabletType || prevTarget.Keyspace != th.Target.Keyspace || prevTarget.Shard != th.Target.Shard
if targetChanged {
// Error counter has to be set here in case we get a new tablet type for the first time in a stream response
hcErrorCounters.Add([]string{th.Target.Keyspace, th.Target.Shard, topoproto.TabletTypeLString(th.Target.TabletType)}, 0)
// keyspace and shard are not expected to change, but just in case ...
// move this tabletHealthCheck to the correct map
oldTargetKey := hc.keyFromTarget(prevTarget)
oldTargetKey := KeyFromTarget(prevTarget)
delete(hc.healthData[oldTargetKey], tabletAlias)
_, ok := hc.healthData[targetKey]
if !ok {
Expand Down Expand Up @@ -476,7 +496,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
hc.recomputeHealthy(targetKey)
}
if targetChanged && prevTarget.TabletType != topodata.TabletType_PRIMARY && hc.isIncluded(th.Target.TabletType, th.Tablet.Alias) { // also recompute old target's healthy list
oldTargetKey := hc.keyFromTarget(prevTarget)
oldTargetKey := KeyFromTarget(prevTarget)
hc.recomputeHealthy(oldTargetKey)
}
}
Expand All @@ -491,7 +511,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
hc.broadcast(th)
}

func (hc *HealthCheckImpl) recomputeHealthy(key keyspaceShardTabletType) {
func (hc *HealthCheckImpl) recomputeHealthy(key KeyspaceShardTabletType) {
all := hc.healthData[key]
allArray := make([]*TabletHealth, 0, len(all))
for _, s := range all {
Expand Down Expand Up @@ -532,7 +552,7 @@ func (hc *HealthCheckImpl) broadcast(th *TabletHealth) {

// CacheStatus returns a displayable version of the cache.
func (hc *HealthCheckImpl) CacheStatus() TabletsCacheStatusList {
tcsMap := hc.cacheStatusMap()
tcsMap := hc.CacheStatusMap()
tcsl := make(TabletsCacheStatusList, 0, len(tcsMap))
for _, tcs := range tcsMap {
tcsl = append(tcsl, tcs)
Expand All @@ -541,7 +561,7 @@ func (hc *HealthCheckImpl) CacheStatus() TabletsCacheStatusList {
return tcsl
}

func (hc *HealthCheckImpl) cacheStatusMap() map[string]*TabletsCacheStatus {
func (hc *HealthCheckImpl) CacheStatusMap() map[string]*TabletsCacheStatus {
tcsMap := make(map[string]*TabletsCacheStatus)
hc.mu.Lock()
defer hc.mu.Unlock()
Expand Down Expand Up @@ -602,7 +622,7 @@ func (hc *HealthCheckImpl) GetHealthyTabletStats(target *query.Target) []*Tablet
if target.Shard == "" {
target.Shard = "0"
}
return append(result, hc.healthy[hc.keyFromTarget(target)]...)
return append(result, hc.healthy[KeyFromTarget(target)]...)
}

// getTabletStats returns all tablets for the given target.
Expand All @@ -613,7 +633,7 @@ func (hc *HealthCheckImpl) getTabletStats(target *query.Target) []*TabletHealth
var result []*TabletHealth
hc.mu.Lock()
defer hc.mu.Unlock()
ths := hc.healthData[hc.keyFromTarget(target)]
ths := hc.healthData[KeyFromTarget(target)]
for _, th := range ths {
result = append(result, th)
}
Expand Down Expand Up @@ -708,28 +728,43 @@ func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*query.
}
}

// GetTabletHealthByAlias results the TabletHealth of the tablet that matches the given alias
func (hc *HealthCheckImpl) GetTabletHealthByAlias(alias *topodata.TabletAlias) (*TabletHealth, error) {
hc.mu.Lock()
defer hc.mu.Unlock()

if hd, ok := hc.healthByAlias[tabletAliasString(topoproto.TabletAliasString(alias))]; ok {
return hd.SimpleCopy(), nil
}
return nil, fmt.Errorf("could not find tablet: %s", alias.String())
}

// GetTabletHealth results the TabletHealth of the tablet that matches the given alias and KeyspaceShardTabletType
// The data is retrieved from the healthData map.
func (hc *HealthCheckImpl) GetTabletHealth(kst KeyspaceShardTabletType, alias *topodata.TabletAlias) (*TabletHealth, error) {
hc.mu.Lock()
defer hc.mu.Unlock()

if hd, ok := hc.healthData[kst]; ok {
if th, ok := hd[tabletAliasString(topoproto.TabletAliasString(alias))]; ok {
return th, nil
}
}
return nil, fmt.Errorf("could not find tablet: %s", alias.String())
}

// TabletConnection returns the Connection to a given tablet.
func (hc *HealthCheckImpl) TabletConnection(alias *topodata.TabletAlias, target *query.Target) (queryservice.QueryService, error) {
hc.mu.Lock()
thc := hc.healthByAlias[tabletAliasString(topoproto.TabletAliasString(alias))]
hc.mu.Unlock()
if thc == nil || thc.Conn == nil {
//TODO: test that throws this error
// TODO: test that throws this error
return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias)
}
return thc.Connection(), nil
}

// Target includes cell which we ignore here
// because tabletStatsCache is intended to be per-cell
func (hc *HealthCheckImpl) keyFromTarget(target *query.Target) keyspaceShardTabletType {
return keyspaceShardTabletType(fmt.Sprintf("%s.%s.%s", target.Keyspace, target.Shard, topoproto.TabletTypeLString(target.TabletType)))
}

func (hc *HealthCheckImpl) keyFromTablet(tablet *topodata.Tablet) keyspaceShardTabletType {
return keyspaceShardTabletType(fmt.Sprintf("%s.%s.%s", tablet.Keyspace, tablet.Shard, topoproto.TabletTypeLString(tablet.Type)))
}

// getAliasByCell should only be called while holding hc.mu
func (hc *HealthCheckImpl) getAliasByCell(cell string) string {
if alias, ok := hc.cellAliases[cell]; ok {
Expand Down Expand Up @@ -811,7 +846,7 @@ func (hc *HealthCheckImpl) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
status := hc.CacheStatus()
b, err := json.MarshalIndent(status, "", " ")
if err != nil {
//Error logged
// Error logged
if _, err := w.Write([]byte(err.Error())); err != nil {
log.Errorf("write to buffer error failed: %v", err)
}
Expand All @@ -822,7 +857,7 @@ func (hc *HealthCheckImpl) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
buf := bytes.NewBuffer(nil)
json.HTMLEscape(buf, b)

//Error logged
// Error logged
if _, err := w.Write(buf.Bytes()); err != nil {
log.Errorf("write to buffer bytes failed: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/discovery/legacy_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (e LegacyTabletStats) NamedStatusURL() string {
func (e LegacyTabletStats) getTabletDebugURL() string {
var buffer bytes.Buffer

//Error logged
// Error logged
if err := tabletURLTemplate.Execute(&buffer, e); err != nil {
log.Errorf("tabletURLTemplate.Execute(&buffer, e) failed: %v", err)
}
Expand Down
Loading