forked from cockroachdb/cockroach
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
179 lines (161 loc) · 5.24 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
// Copyright 2015 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package ts
import (
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
gwruntime "github.com/grpc-ecosystem/grpc-gateway/runtime"
)
const (
	// URLPrefix is the prefix for all time series endpoints hosted by the
	// server.
	URLPrefix = "/ts/"
	// queryWorkerMax is the default maximum number of worker goroutines that
	// the time series server can use to service incoming queries. It sizes
	// the workerSem semaphore unless overridden via ServerConfig.
	queryWorkerMax = 250
)
// ServerConfig provides a means for tests to override settings in the time
// series server.
type ServerConfig struct {
	// QueryWorkerMax is the maximum number of query workers used by the
	// server. If this value is zero, the package default (queryWorkerMax)
	// is used instead.
	QueryWorkerMax int
}
// Server handles incoming external requests related to time series data.
type Server struct {
	log.AmbientContext
	// db provides access to the stored time series data (queried in Query).
	db *DB
	// stopper coordinates the async worker tasks spawned by Query so they
	// drain cleanly on shutdown.
	stopper *stop.Stopper
	// workerSem is a counting semaphore bounding the number of concurrent
	// query worker goroutines; its capacity is set in MakeServer.
	workerSem chan struct{}
}
// MakeServer instantiates a new Server which services requests with data from
// the supplied DB. The cfg parameter lets callers (typically tests) override
// the worker-pool size; a zero QueryWorkerMax selects the package default.
func MakeServer(
	ambient log.AmbientContext, db *DB, cfg ServerConfig, stopper *stop.Stopper,
) Server {
	ambient.AddLogTag("ts-srv", nil)

	// Size the query worker semaphore, preferring any non-zero override
	// from the config over the package default.
	workers := cfg.QueryWorkerMax
	if workers == 0 {
		workers = queryWorkerMax
	}

	return Server{
		AmbientContext: ambient,
		db:             db,
		stopper:        stopper,
		workerSem:      make(chan struct{}, workers),
	}
}
// RegisterService registers the GRPC service with the given gRPC server,
// exposing this Server as the TimeSeries service implementation.
func (s *Server) RegisterService(g *grpc.Server) {
	tspb.RegisterTimeSeriesServer(g, s)
}
// RegisterGateway starts the gateway (i.e. reverse proxy) that proxies HTTP requests
// to the appropriate gRPC endpoints. The handler translates REST-style calls
// on mux into gRPC calls over conn.
func (s *Server) RegisterGateway(
	ctx context.Context, mux *gwruntime.ServeMux, conn *grpc.ClientConn,
) error {
	return tspb.RegisterTimeSeriesHandler(ctx, mux, conn)
}
// Query is an endpoint that returns data for one or more metrics over a
// specific time span.
//
// Each query in the request is serviced by its own worker goroutine, bounded
// by the s.workerSem semaphore. Workers write their results into disjoint
// indexes of a shared response slice and report completion (or an error) on
// an unbuffered channel; the first worker error aborts the whole request.
func (s *Server) Query(
	ctx context.Context, request *tspb.TimeSeriesQueryRequest,
) (*tspb.TimeSeriesQueryResponse, error) {
	ctx = s.AnnotateCtx(ctx)
	if len(request.Queries) == 0 {
		// NOTE(review): grpc.Errorf is deprecated in modern grpc-go in favor
		// of status.Errorf; left as-is to match this file's older API vintage
		// (it also uses golang.org/x/net/context) — confirm before migrating.
		return nil, grpc.Errorf(codes.InvalidArgument, "Queries cannot be empty")
	}

	// If not set, sampleNanos should default to ten second resolution.
	sampleNanos := request.SampleNanos
	if sampleNanos == 0 {
		sampleNanos = Resolution10s.SampleDuration()
	}

	// Pre-size the result slice; each worker writes only to its own index,
	// so no locking is needed beyond the output-channel synchronization
	// below.
	response := tspb.TimeSeriesQueryResponse{
		Results: make([]tspb.TimeSeriesQueryResponse_Result, len(request.Queries)),
	}

	// Defer cancellation of context passed to worker tasks; if main task
	// returns early, worker tasks should be torn down quickly.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Channel which workers use to report their result, which is either an
	// error or nil (when successful). It is unbuffered: every send pairs
	// with a receive in the collection loop below, or is abandoned via
	// ctx.Done() once the request has been cancelled.
	workerOutput := make(chan error)

	// Start a task which is itself responsible for starting per-query worker
	// tasks. This is needed because RunLimitedAsyncTask can block; in the
	// case where a single request has more queries than the semaphore limit,
	// a deadlock would occur because queries cannot complete until
	// they have written their result to the "output" channel, which is
	// processed later in the main function.
	if err := s.stopper.RunAsyncTask(ctx, "ts.Server: queries", func(ctx context.Context) {
		for queryIdx, query := range request.Queries {
			// Rebind the loop variables: they are captured by the worker
			// closure below, which outlives this iteration (required on Go
			// versions predating 1.22's per-iteration loop variables).
			queryIdx := queryIdx
			query := query
			if err := s.stopper.RunLimitedAsyncTask(
				ctx,
				"ts.Server: query",
				s.workerSem,
				true, /* wait */
				func(ctx context.Context) {
					datapoints, sources, err := s.db.Query(
						ctx,
						query,
						Resolution10s,
						sampleNanos,
						request.StartNanos,
						request.EndNanos,
					)
					if err == nil {
						response.Results[queryIdx] = tspb.TimeSeriesQueryResponse_Result{
							Query:      query,
							Datapoints: datapoints,
						}
						response.Results[queryIdx].Sources = sources
					}
					// Report the outcome, or bail out if the request context
					// has been cancelled (e.g. by an earlier worker's error).
					select {
					case workerOutput <- err:
					case <-ctx.Done():
					}
				},
			); err != nil {
				// Stopper has been closed and is draining. Return an error and
				// exit the worker-spawning loop.
				select {
				case workerOutput <- err:
				case <-ctx.Done():
				}
				return
			}
		}
	}); err != nil {
		return nil, err
	}

	// Collect exactly one result per query. The channel receive establishes
	// a happens-before edge with the worker's write to response.Results,
	// making the final read of response below safe.
	for range request.Queries {
		select {
		case err := <-workerOutput:
			if err != nil {
				// Return the first error encountered. This will cancel the
				// worker context and cause all other in-progress workers to
				// exit.
				return nil, err
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	return &response, nil
}