Skip to content

Commit

Permalink
Merge pull request #1582 from alainjobart/logrpc
Browse files Browse the repository at this point in the history
Replacing channels in vtctl clients with interface.
  • Loading branch information
alainjobart committed Mar 21, 2016
2 parents a82e8ea + a8b848b commit 9654175
Show file tree
Hide file tree
Showing 21 changed files with 266 additions and 228 deletions.
5 changes: 2 additions & 3 deletions go/cmd/vtcombo/tablet_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/youtube/vitess/go/vt/topo/topoproto"
"github.com/youtube/vitess/go/vt/wrangler"

logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
querypb "github.com/youtube/vitess/go/vt/proto/query"
replicationdatapb "github.com/youtube/vitess/go/vt/proto/replicationdata"
tabletmanagerdatapb "github.com/youtube/vitess/go/vt/proto/tabletmanagerdata"
Expand Down Expand Up @@ -649,8 +648,8 @@ func (itmc *internalTabletManagerClient) PromoteSlave(ctx context.Context, table
return "", fmt.Errorf("not implemented in vtcombo")
}

func (itmc *internalTabletManagerClient) Backup(ctx context.Context, tablet *topo.TabletInfo, concurrency int) (<-chan *logutilpb.Event, tmclient.ErrFunc, error) {
return nil, nil, fmt.Errorf("not implemented in vtcombo")
func (itmc *internalTabletManagerClient) Backup(ctx context.Context, tablet *topo.TabletInfo, concurrency int) (logutil.EventStream, error) {
return nil, fmt.Errorf("not implemented in vtcombo")
}

func (itmc *internalTabletManagerClient) IsTimeoutError(err error) bool {
Expand Down
14 changes: 11 additions & 3 deletions go/vt/logutil/proto3.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
)

// This file contains a few functions to help with proto3. proto3
// will eventually support timestamps, at which point we'll retire
// this.
// This file contains a few functions to help with proto3.

// ProtoToTime converts a logutilpb.Time to a time.Time.
// proto3 will eventually support timestamps, at which point we'll retire
// this.
//
// A nil pointer is like the empty timestamp.
func ProtoToTime(ts *logutilpb.Time) time.Time {
Expand All @@ -34,3 +34,11 @@ func TimeToProto(t time.Time) *logutilpb.Time {
Nanoseconds: int32(nanos),
}
}

// EventStream is an interface used by RPC clients when the streaming
// RPC returns a stream of log events.
type EventStream interface {
// Recv returns the next event in the logs.
// If there are no more, it will return io.EOF.
Recv() (*logutilpb.Event, error)
}
29 changes: 16 additions & 13 deletions go/vt/tabletmanager/agentrpctest/test_agent_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package agentrpctest

import (
"fmt"
"io"
"reflect"
"strings"
"sync"
Expand All @@ -24,7 +25,6 @@ import (
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
"github.com/youtube/vitess/go/vt/topo"

logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
querypb "github.com/youtube/vitess/go/vt/proto/query"
replicationdatapb "github.com/youtube/vitess/go/vt/proto/replicationdata"
tabletmanagerdatapb "github.com/youtube/vitess/go/vt/proto/tabletmanagerdata"
Expand Down Expand Up @@ -90,21 +90,25 @@ func logStuff(logger logutil.Logger, count int) {
}
}

func compareLoggedStuff(t *testing.T, name string, logChannel <-chan *logutilpb.Event, count int) {
func compareLoggedStuff(t *testing.T, name string, stream logutil.EventStream, count int) error {
for i := 0; i < count; i++ {
le, ok := <-logChannel
if !ok {
le, err := stream.Recv()
if err != nil {
t.Errorf("No logged value for %v/%v", name, i)
return
return err
}
if le.Value != testLogString {
t.Errorf("Unexpected log response for %v: got %v expected %v", name, le.Value, testLogString)
}
}
_, ok := <-logChannel
if ok {
_, err := stream.Recv()
if err == nil {
t.Fatalf("log channel wasn't closed for %v", name)
}
if err == io.EOF {
return nil
}
return err
}

func expectRPCWrapPanic(t *testing.T, err error) {
Expand Down Expand Up @@ -1119,24 +1123,23 @@ func (fra *fakeRPCAgent) Backup(ctx context.Context, concurrency int, logger log
}

func agentRPCTestBackup(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo) {
logChannel, errFunc, err := client.Backup(ctx, ti, testBackupConcurrency)
stream, err := client.Backup(ctx, ti, testBackupConcurrency)
if err != nil {
t.Fatalf("Backup failed: %v", err)
}
compareLoggedStuff(t, "Backup", logChannel, 10)
err = errFunc()
err = compareLoggedStuff(t, "Backup", stream, 10)
compareError(t, "Backup", err, true, testBackupCalled)
}

func agentRPCTestBackupPanic(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo) {
logChannel, errFunc, err := client.Backup(ctx, ti, testBackupConcurrency)
stream, err := client.Backup(ctx, ti, testBackupConcurrency)
if err != nil {
t.Fatalf("Backup failed: %v", err)
}
if e, ok := <-logChannel; ok {
e, err := stream.Recv()
if err == nil {
t.Fatalf("Unexpected Backup logs: %v", e)
}
err = errFunc()
expectRPCWrapLockActionPanic(t, err)
}

Expand Down
15 changes: 10 additions & 5 deletions go/vt/tabletmanager/faketmclient/fake_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ package faketmclient
// for yours, feel free to extend this implementation.

import (
"io"
"time"

"golang.org/x/net/context"

"github.com/youtube/vitess/go/vt/hook"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/mysqlctl/tmutils"
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
Expand Down Expand Up @@ -271,12 +273,15 @@ func (client *FakeTabletManagerClient) PromoteSlave(ctx context.Context, tablet
// Backup related methods
//

type eofEventStream struct{}

func (e *eofEventStream) Recv() (*logutilpb.Event, error) {
return nil, io.EOF
}

// Backup is part of the tmclient.TabletManagerClient interface.
func (client *FakeTabletManagerClient) Backup(ctx context.Context, tablet *topo.TabletInfo, concurrency int) (<-chan *logutilpb.Event, tmclient.ErrFunc, error) {
logstream := make(chan *logutilpb.Event, 10)
return logstream, func() error {
return nil
}, nil
func (client *FakeTabletManagerClient) Backup(ctx context.Context, tablet *topo.TabletInfo, concurrency int) (logutil.EventStream, error) {
return &eofEventStream{}, nil
}

//
Expand Down
43 changes: 20 additions & 23 deletions go/vt/tabletmanager/grpctmclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ package grpctmclient

import (
"fmt"
"io"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"

"github.com/youtube/vitess/go/netutil"
"github.com/youtube/vitess/go/vt/hook"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/mysqlctl/tmutils"
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
Expand Down Expand Up @@ -637,40 +637,37 @@ func (client *Client) PromoteSlave(ctx context.Context, tablet *topo.TabletInfo)
//
// Backup related methods
//
type eventStreamAdapter struct {
stream tabletmanagerservicepb.TabletManager_BackupClient
cc *grpc.ClientConn
}

func (e *eventStreamAdapter) Recv() (*logutilpb.Event, error) {
br, err := e.stream.Recv()
if err != nil {
e.cc.Close()
return nil, err
}
return br.Event, nil
}

// Backup is part of the tmclient.TabletManagerClient interface.
func (client *Client) Backup(ctx context.Context, tablet *topo.TabletInfo, concurrency int) (<-chan *logutilpb.Event, tmclient.ErrFunc, error) {
func (client *Client) Backup(ctx context.Context, tablet *topo.TabletInfo, concurrency int) (logutil.EventStream, error) {
cc, c, err := client.dial(ctx, tablet)
if err != nil {
return nil, nil, err
return nil, err
}

logstream := make(chan *logutilpb.Event, 10)
stream, err := c.Backup(ctx, &tabletmanagerdatapb.BackupRequest{
Concurrency: int64(concurrency),
})
if err != nil {
cc.Close()
return nil, nil, err
return nil, err
}

var finalErr error
go func() {
for {
br, err := stream.Recv()
if err != nil {
if err != io.EOF {
finalErr = err
}
close(logstream)
return
}
logstream <- br.Event
}
}()
return logstream, func() error {
cc.Close()
return finalErr
return &eventStreamAdapter{
stream: stream,
cc: cc,
}, nil
}

Expand Down
7 changes: 2 additions & 5 deletions go/vt/tabletmanager/tmclient/rpc_client_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (

log "github.com/golang/glog"
"github.com/youtube/vitess/go/vt/hook"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/mysqlctl/tmutils"
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
"github.com/youtube/vitess/go/vt/topo"
"golang.org/x/net/context"

logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
querypb "github.com/youtube/vitess/go/vt/proto/query"
replicationdatapb "github.com/youtube/vitess/go/vt/proto/replicationdata"
tabletmanagerdatapb "github.com/youtube/vitess/go/vt/proto/tabletmanagerdata"
Expand All @@ -26,9 +26,6 @@ import (
// manager protocol. It is exported for tests only.
var TabletManagerProtocol = flag.String("tablet_manager_protocol", "grpc", "the protocol to use to talk to vttablet")

// ErrFunc is used by streaming RPCs that don't return a specific result
type ErrFunc func() error

// TabletManagerClient defines the interface used to talk to a remote tablet
type TabletManagerClient interface {
//
Expand Down Expand Up @@ -186,7 +183,7 @@ type TabletManagerClient interface {
//

// Backup creates a database backup
Backup(ctx context.Context, tablet *topo.TabletInfo, concurrency int) (<-chan *logutilpb.Event, ErrFunc, error)
Backup(ctx context.Context, tablet *topo.TabletInfo, concurrency int) (logutil.EventStream, error)

//
// RPC related methods
Expand Down
52 changes: 33 additions & 19 deletions go/vt/vtctl/fakevtctlclient/fake_loggerevent_streamingclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package fakevtctlclient

import (
"fmt"
"io"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -81,35 +82,48 @@ func (f *FakeLoggerEventStreamingClient) RegisteredCommands() []string {
return commands
}

// StreamResult returns a channel which streams back a registered result as logging events.
func (f *FakeLoggerEventStreamingClient) StreamResult(args []string) (<-chan *logutilpb.Event, func() error, error) {
type streamResultAdapter struct {
lines []string
index int
err error
}

func (s *streamResultAdapter) Recv() (*logutilpb.Event, error) {
if s.index < len(s.lines) {
result := &logutilpb.Event{
Time: logutil.TimeToProto(time.Now()),
Level: logutilpb.Level_CONSOLE,
File: "fakevtctlclient",
Line: -1,
Value: s.lines[s.index],
}
s.index++
return result, nil
}
if s.err == nil {
return nil, io.EOF
}
return nil, s.err
}

// StreamResult returns an EventStream which streams back a registered result as logging events.
func (f *FakeLoggerEventStreamingClient) StreamResult(args []string) (logutil.EventStream, error) {
f.mu.Lock()
defer f.mu.Unlock()

k := generateKey(args)
result, ok := f.results[k]
if !ok {
return nil, nil, fmt.Errorf("No response was registered for args: %v", args)
return nil, fmt.Errorf("No response was registered for args: %v", args)
}
result.count--
if result.count == 0 {
delete(f.results, k)
}

stream := make(chan *logutilpb.Event)
go func() {
// Each line of the multi-line string "output" is streamed as console text.
for _, line := range strings.Split(result.output, "\n") {
stream <- &logutilpb.Event{
Time: logutil.TimeToProto(time.Now()),
Level: logutilpb.Level_CONSOLE,
File: "fakevtctlclient",
Line: -1,
Value: line,
}
}
close(stream)
}()

return stream, func() error { return result.err }, nil
return &streamResultAdapter{
lines: strings.Split(result.output, "\n"),
index: 0,
err: result.err,
}, nil
}

0 comments on commit 9654175

Please sign in to comment.