-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
resilient_server.go
106 lines (89 loc) · 3.56 KB
/
resilient_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package srvtopo
import (
"time"
"github.com/spf13/pflag"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
)
var (
// srvTopoCacheTTL and srvTopoCacheRefresh control the behavior of
// the caching for both watched and unwatched values.
//
// For entries we don't watch (like the list of Keyspaces), we refresh
// the cached list from the topo after srv_topo_cache_refresh elapses.
// If the fetch fails, we hold onto the cached value until
// srv_topo_cache_ttl elapses.
//
// For entries we watch (like the SrvKeyspace for a given cell), if
// setting the watch fails, we will use the last known value until
// srv_topo_cache_ttl elapses and we only try to re-establish the watch
// once every srv_topo_cache_refresh interval.
srvTopoTimeout = 5 * time.Second
srvTopoCacheTTL = 1 * time.Second
srvTopoCacheRefresh = 1 * time.Second
)
func registerFlags(fs *pflag.FlagSet) {
fs.DurationVar(&srvTopoTimeout, "srv_topo_timeout", srvTopoTimeout, "topo server timeout")
fs.DurationVar(&srvTopoCacheTTL, "srv_topo_cache_ttl", srvTopoCacheTTL, "how long to use cached entries for topology")
fs.DurationVar(&srvTopoCacheRefresh, "srv_topo_cache_refresh", srvTopoCacheRefresh, "how frequently to refresh the topology for cached entries")
}
func init() {
servenv.OnParseFor("vtgate", registerFlags)
servenv.OnParseFor("vttablet", registerFlags)
servenv.OnParseFor("vtcombo", registerFlags)
}
const (
queryCategory = "query"
cachedCategory = "cached"
errorCategory = "error"
)
// ResilientServer is an implementation of srvtopo.Server based
// on a topo.Server that uses a cache for two purposes:
// - limit the QPS to the underlying topo.Server
// - return the last known value of the data if there is an error
type ResilientServer struct {
topoServer *topo.Server
counts *stats.CountersWithSingleLabel
*SrvKeyspaceWatcher
*SrvVSchemaWatcher
*SrvKeyspaceNamesQuery
}
// NewResilientServer creates a new ResilientServer
// based on the provided topo.Server.
func NewResilientServer(base *topo.Server, counterPrefix string) *ResilientServer {
if srvTopoCacheRefresh > srvTopoCacheTTL {
log.Fatalf("srv_topo_cache_refresh must be less than or equal to srv_topo_cache_ttl")
}
var metric string
if counterPrefix == "" {
metric = counterPrefix + "Counts"
} else {
metric = ""
}
counts := stats.NewCountersWithSingleLabel(metric, "Resilient srvtopo server operations", "type")
return &ResilientServer{
topoServer: base,
counts: counts,
SrvKeyspaceWatcher: NewSrvKeyspaceWatcher(base, counts, srvTopoCacheRefresh, srvTopoCacheTTL),
SrvVSchemaWatcher: NewSrvVSchemaWatcher(base, counts, srvTopoCacheRefresh, srvTopoCacheTTL),
SrvKeyspaceNamesQuery: NewSrvKeyspaceNamesQuery(base, counts, srvTopoCacheRefresh, srvTopoCacheTTL),
}
}
// GetTopoServer returns the topo.Server that backs the resilient server.
func (server *ResilientServer) GetTopoServer() (*topo.Server, error) {
return server.topoServer, nil
}