-
Notifications
You must be signed in to change notification settings - Fork 2k
/
server.go
83 lines (68 loc) · 2.54 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package grpcvtctlserver contains the gRPC implementation of the server side
of the remote execution of vtctl commands.
*/
package grpcvtctlserver
import (
"sync"
"google.golang.org/grpc"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl"
"vitess.io/vitess/go/vt/vttablet/tmclient"
"vitess.io/vitess/go/vt/wrangler"
logutilpb "vitess.io/vitess/go/vt/proto/logutil"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
vtctlservicepb "vitess.io/vitess/go/vt/proto/vtctlservice"
)
// VtctlServer is the server-side implementation of the vtctl gRPC service.
type VtctlServer struct {
	// ts is the topo server handed to the wrangler built for each command.
	ts *topo.Server
}
// NewVtctlServer builds a VtctlServer whose commands run against the given
// topo server.
func NewVtctlServer(ts *topo.Server) *VtctlServer {
	return &VtctlServer{ts: ts}
}
// ExecuteVtctlCommand handles the ExecuteVtctlCommand RPC of the Vtctl
// service: it runs the command given by args.Args through a wrangler and
// forwards the log events produced along the way to the caller over stream.
//
// The returned error is the result of vtctl.RunCommand. err is a named result
// so that its address can be handed to the deferred servenv.HandlePanic
// (presumably so a recovered panic is reported through it).
func (s *VtctlServer) ExecuteVtctlCommand(args *vtctldatapb.ExecuteVtctlCommandRequest, stream vtctlservicepb.Vtctl_ExecuteVtctlCommandServer) (err error) {
	defer servenv.HandlePanic("vtctl", &err)

	// Log events may be emitted from several goroutines at once, while
	// stream.Send() is not thread safe in gRPC, so every send is
	// serialized through sendMu.
	var sendMu sync.Mutex
	forward := func(event *logutilpb.Event) {
		resp := &vtctldatapb.ExecuteVtctlCommandResponse{Event: event}

		sendMu.Lock()
		// The Send error is dropped on purpose: if the client has
		// disconnected the log events are simply lost, and the command
		// itself must not be interrupted because of that.
		_ = stream.Send(resp)
		sendMu.Unlock()
	}
	streamLogger := logutil.NewCallbackLogger(forward)

	// Events go both to the caller's stream and to the console logger.
	consoleLogger := logutil.NewConsoleLogger()
	logger := logutil.NewTeeLogger(streamLogger, consoleLogger)

	// The wrangler gets its own tablet manager client, closed on return.
	tmc := tmclient.NewTabletManagerClient()
	defer tmc.Close()
	wr := wrangler.New(logger, s.ts, tmc)

	// Run the command under the stream's context.
	err = vtctl.RunCommand(stream.Context(), wr, args.Args)
	return err
}
// StartServer creates a VtctlServer backed by ts and registers it on the gRPC
// server s as the Vtctl service implementation. It only performs the
// registration; it does not start serving on s.
func StartServer(s *grpc.Server, ts *topo.Server) {
	server := NewVtctlServer(ts)
	vtctlservicepb.RegisterVtctlServer(s, server)
}