Skip to content

Commit

Permalink
tabletserver: Move QueryService fakes into subpackage.
Browse files Browse the repository at this point in the history
Moved StreamHealthQueryService fake out of wait_for_drain_test.go and use it in other tests as well.
  • Loading branch information
michael-berlin committed Apr 29, 2016
1 parent 4aa8402 commit 442e30e
Show file tree
Hide file tree
Showing 10 changed files with 245 additions and 268 deletions.
93 changes: 93 additions & 0 deletions go/vt/tabletserver/queryservice/fakes/error_query_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package fakes

import (
"fmt"

"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/tabletserver/queryservice"
"github.com/youtube/vitess/go/vt/tabletserver/querytypes"
"golang.org/x/net/context"

querypb "github.com/youtube/vitess/go/vt/proto/query"
)

// ErrorQueryService is an implementation of QueryService that returns a
// configurable error for some of its methods.
//
// It is used as base for other, more specialised QueryService fakes e.g.
// StreamHealthQueryService.
type ErrorQueryService struct {
GetSessionIdError error
}

// GetSessionId is part of QueryService interface
func (e *ErrorQueryService) GetSessionId(keyspace, shard string) (int64, error) {
return 0, e.GetSessionIdError
}

// Begin is part of QueryService interface
func (e *ErrorQueryService) Begin(ctx context.Context, target *querypb.Target, sessionID int64) (int64, error) {
return 0, fmt.Errorf("ErrorQueryService does not implement any method")
}

// Commit is part of QueryService interface
func (e *ErrorQueryService) Commit(ctx context.Context, target *querypb.Target, sessionID, transactionID int64) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
}

// Rollback is part of QueryService interface
func (e *ErrorQueryService) Rollback(ctx context.Context, target *querypb.Target, sessionID, transactionID int64) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
}

// Execute is part of QueryService interface
func (e *ErrorQueryService) Execute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, sessionID, transactionID int64) (*sqltypes.Result, error) {
return nil, fmt.Errorf("ErrorQueryService does not implement any method")
}

// StreamExecute is part of QueryService interface
func (e *ErrorQueryService) StreamExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, sessionID int64, sendReply func(*sqltypes.Result) error) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
}

// ExecuteBatch is part of QueryService interface
func (e *ErrorQueryService) ExecuteBatch(ctx context.Context, target *querypb.Target, queries []querytypes.BoundQuery, sessionID int64, asTransaction bool, transactionID int64) ([]sqltypes.Result, error) {
return nil, fmt.Errorf("ErrorQueryService does not implement any method")
}

// SplitQuery is part of QueryService interface
// TODO(erez): Remove once the migration to SplitQuery V2 is done.
func (e *ErrorQueryService) SplitQuery(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, splitColumn string, splitCount int64, sessionID int64) ([]querytypes.QuerySplit, error) {
return nil, fmt.Errorf("ErrorQueryService does not implement any method")
}

// SplitQueryV2 is part of QueryService interface
func (e *ErrorQueryService) SplitQueryV2(
ctx context.Context,
target *querypb.Target,
sql string,
bindVariables map[string]interface{},
splitColumns []string,
splitCount int64,
numRowsPerQueryPart int64,
algorithm querypb.SplitQueryRequest_Algorithm,
sessionID int64) ([]querytypes.QuerySplit, error) {
return nil, fmt.Errorf("ErrorQueryService does not implement any method")
}

// StreamHealthRegister is part of QueryService interface
func (e *ErrorQueryService) StreamHealthRegister(chan<- *querypb.StreamHealthResponse) (int, error) {
return 0, fmt.Errorf("ErrorQueryService does not implement any method")
}

// StreamHealthUnregister is part of QueryService interface
func (e *ErrorQueryService) StreamHealthUnregister(int) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
}

// HandlePanic is part of QueryService interface
func (e *ErrorQueryService) HandlePanic(*error) {
}

// make sure ErrorQueryService implements QueryService
var _ queryservice.QueryService = &ErrorQueryService{}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package fakes

import (
"github.com/golang/protobuf/proto"

querypb "github.com/youtube/vitess/go/vt/proto/query"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)

const (
// DefaultSecondsBehindMaster is the default MySQL replication lag which is
// reported in all faked stream health responses.
DefaultSecondsBehindMaster uint32 = 1
)

// StreamHealthQueryService is a QueryService implementation which allows to
// send custom StreamHealthResponse messages by adding them to a channel.
// Note that it only works with one connected client because messages going
// into "healthResponses" are not duplicated to all clients.
//
// If you want to override other QueryService methods, embed this struct
// as anonymous field in your own QueryService fake.
type StreamHealthQueryService struct {
ErrorQueryService
healthResponses chan *querypb.StreamHealthResponse
target querypb.Target
}

// NewStreamHealthQueryService creates a new fake query service for the target.
func NewStreamHealthQueryService(target querypb.Target) *StreamHealthQueryService {
return &StreamHealthQueryService{
healthResponses: make(chan *querypb.StreamHealthResponse, 10),
target: target,
}
}

// StreamHealthRegister implements the QueryService interface.
// It sends all queued and future healthResponses to the connected client e.g.
// the healthcheck module.
func (q *StreamHealthQueryService) StreamHealthRegister(c chan<- *querypb.StreamHealthResponse) (int, error) {
go func() {
for shr := range q.healthResponses {
c <- shr
}
}()
return 0, nil
}

// AddDefaultHealthResponse adds a faked health response to the buffer channel.
// The response will have default values typical for a healthy tablet.
func (q *StreamHealthQueryService) AddDefaultHealthResponse() {
q.healthResponses <- &querypb.StreamHealthResponse{
Target: proto.Clone(&q.target).(*querypb.Target),
Serving: true,
RealtimeStats: &querypb.RealtimeStats{
SecondsBehindMaster: DefaultSecondsBehindMaster,
},
}
}

// AddHealthResponseWithQPS adds a faked health response to the buffer channel.
// Only "qps" is different in this message.
func (q *StreamHealthQueryService) AddHealthResponseWithQPS(qps float64) {
q.healthResponses <- &querypb.StreamHealthResponse{
Target: proto.Clone(&q.target).(*querypb.Target),
Serving: true,
RealtimeStats: &querypb.RealtimeStats{
Qps: qps,
SecondsBehindMaster: DefaultSecondsBehindMaster,
},
}
}

// UpdateType changes the type of the query service.
// Only newly sent health messages will use the new type.
func (q *StreamHealthQueryService) UpdateType(tabletType topodatapb.TabletType) {
q.target.TabletType = tabletType
}
80 changes: 0 additions & 80 deletions go/vt/tabletserver/queryservice/queryservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
package queryservice

import (
"fmt"

"github.com/youtube/vitess/go/sqltypes"
"golang.org/x/net/context"

Expand Down Expand Up @@ -70,84 +68,6 @@ type QueryService interface {
HandlePanic(*error)
}

// ErrorQueryService is an implementation of QueryService that returns a
// configurable error for some of its methods.
type ErrorQueryService struct {
GetSessionIdError error
}

// GetSessionId is part of QueryService interface
func (e *ErrorQueryService) GetSessionId(keyspace, shard string) (int64, error) {
return 0, e.GetSessionIdError
}

// Begin is part of QueryService interface
func (e *ErrorQueryService) Begin(ctx context.Context, target *querypb.Target, sessionID int64) (int64, error) {
return 0, fmt.Errorf("ErrorQueryService does not implement any method")
}

// Commit is part of QueryService interface
func (e *ErrorQueryService) Commit(ctx context.Context, target *querypb.Target, sessionID, transactionID int64) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
}

// Rollback is part of QueryService interface
func (e *ErrorQueryService) Rollback(ctx context.Context, target *querypb.Target, sessionID, transactionID int64) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
}

// Execute is part of QueryService interface
func (e *ErrorQueryService) Execute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, sessionID, transactionID int64) (*sqltypes.Result, error) {
return nil, fmt.Errorf("ErrorQueryService does not implement any method")
}

// StreamExecute is part of QueryService interface
func (e *ErrorQueryService) StreamExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, sessionID int64, sendReply func(*sqltypes.Result) error) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
}

// ExecuteBatch is part of QueryService interface
func (e *ErrorQueryService) ExecuteBatch(ctx context.Context, target *querypb.Target, queries []querytypes.BoundQuery, sessionID int64, asTransaction bool, transactionID int64) ([]sqltypes.Result, error) {
return nil, fmt.Errorf("ErrorQueryService does not implement any method")
}

// SplitQuery is part of QueryService interface
// TODO(erez): Remove once the migration to SplitQuery V2 is done.
func (e *ErrorQueryService) SplitQuery(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, splitColumn string, splitCount int64, sessionID int64) ([]querytypes.QuerySplit, error) {
return nil, fmt.Errorf("ErrorQueryService does not implement any method")
}

// SplitQuery is part of QueryService interface
func (e *ErrorQueryService) SplitQueryV2(
ctx context.Context,
target *querypb.Target,
sql string,
bindVariables map[string]interface{},
splitColumns []string,
splitCount int64,
numRowsPerQueryPart int64,
algorithm querypb.SplitQueryRequest_Algorithm,
sessionID int64) ([]querytypes.QuerySplit, error) {
return nil, fmt.Errorf("ErrorQueryService does not implement any method")
}

// StreamHealthRegister is part of QueryService interface
func (e *ErrorQueryService) StreamHealthRegister(chan<- *querypb.StreamHealthResponse) (int, error) {
return 0, fmt.Errorf("ErrorQueryService does not implement any method")
}

// StreamHealthUnregister is part of QueryService interface
func (e *ErrorQueryService) StreamHealthUnregister(int) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
}

// HandlePanic is part of QueryService interface
func (e *ErrorQueryService) HandlePanic(*error) {
}

// make sure ErrorQueryService implements QueryService
var _ QueryService = &ErrorQueryService{}

// CallCorrectSplitQuery calls the correct SplitQuery.
// This trivial logic is encapsulated in a function here so it can be easily tested.
// TODO(erez): Remove once the migration to SplitQueryV2 is done.
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtctld/tablet_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
"github.com/youtube/vitess/go/vt/tabletserver/grpcqueryservice"
"github.com/youtube/vitess/go/vt/tabletserver/queryservice"
"github.com/youtube/vitess/go/vt/tabletserver/queryservice/fakes"
"github.com/youtube/vitess/go/vt/vttest/fakesqldb"
"github.com/youtube/vitess/go/vt/wrangler"
"github.com/youtube/vitess/go/vt/wrangler/testlib"
Expand All @@ -23,7 +23,7 @@ import (

// streamHealthTabletServer is a local QueryService implementation to support the tests
type streamHealthTabletServer struct {
queryservice.ErrorQueryService
fakes.ErrorQueryService
t *testing.T

// streamHealthMutex protects all the following fields
Expand Down
33 changes: 9 additions & 24 deletions go/vt/worker/split_clone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/youtube/vitess/go/vt/mysqlctl/replication"
"github.com/youtube/vitess/go/vt/mysqlctl/tmutils"
"github.com/youtube/vitess/go/vt/tabletserver/grpcqueryservice"
"github.com/youtube/vitess/go/vt/tabletserver/queryservice"
"github.com/youtube/vitess/go/vt/tabletserver/queryservice/fakes"
"github.com/youtube/vitess/go/vt/vttest/fakesqldb"
"github.com/youtube/vitess/go/vt/wrangler/testlib"
"github.com/youtube/vitess/go/vt/zktopo/zktestserver"
Expand All @@ -28,12 +28,11 @@ import (
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)

// testQueryService is a local QueryService implementation to support the tests
// testQueryService is a local QueryService implementation to support the tests.
type testQueryService struct {
queryservice.ErrorQueryService
t *testing.T
keyspace string
shard string
t *testing.T

*fakes.StreamHealthQueryService
}

func (sq *testQueryService) StreamExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, sessionID int64, sendReply func(reply *sqltypes.Result) error) error {
Expand Down Expand Up @@ -93,21 +92,6 @@ func (sq *testQueryService) StreamExecute(ctx context.Context, target *querypb.T
return nil
}

func (sq *testQueryService) StreamHealthRegister(c chan<- *querypb.StreamHealthResponse) (int, error) {
c <- &querypb.StreamHealthResponse{
Target: &querypb.Target{
Keyspace: sq.keyspace,
Shard: sq.shard,
TabletType: topodatapb.TabletType_RDONLY,
},
Serving: true,
RealtimeStats: &querypb.RealtimeStats{
SecondsBehindMaster: 1,
},
}
return 0, nil
}

type ExpectedExecuteFetch struct {
Query string
MaxRows int
Expand Down Expand Up @@ -356,10 +340,11 @@ func testSplitClone(t *testing.T, v3 bool) {
"STOP SLAVE",
"START SLAVE",
}
qs := fakes.NewStreamHealthQueryService(sourceRdonly.Target())
qs.AddDefaultHealthResponse()
grpcqueryservice.RegisterForTest(sourceRdonly.RPCServer, &testQueryService{
t: t,
keyspace: sourceRdonly.Tablet.Keyspace,
shard: sourceRdonly.Tablet.Shard,
t: t,
StreamHealthQueryService: qs,
})
}

Expand Down

0 comments on commit 442e30e

Please sign in to comment.