Skip to content

Commit

Permalink
Revert "Remove --rpc-only-in-reply flag from VTGate"
Browse files Browse the repository at this point in the history
  • Loading branch information
aaijazi committed Oct 14, 2015
1 parent 6860161 commit 5ce1f54
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 86 deletions.
3 changes: 3 additions & 0 deletions go/vt/vtgate/gorpcvtgateconn/conn_rpc_test.go
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/youtube/vitess/go/rpcplus"
"github.com/youtube/vitess/go/rpcwrap/bsonrpc"
"github.com/youtube/vitess/go/vt/vtgate"
"github.com/youtube/vitess/go/vt/vtgate/gorpcvtgateservice"
"github.com/youtube/vitess/go/vt/vtgate/vtgateconntest"
"golang.org/x/net/context"
Expand All @@ -31,6 +32,8 @@ func TestGoRPCVTGateConn(t *testing.T) {
// Create a Go Rpc server and listen on the port
server := rpcplus.NewServer()
server.Register(gorpcvtgateservice.New(service))
// TODO(aaijazi): remove this flag once all VtGate Gorpc clients properly support the new behavior
*vtgate.RPCErrorOnlyInReply = true

// create the HTTP server, serve the server from it
handler := http.NewServeMux()
Expand Down
119 changes: 92 additions & 27 deletions go/vt/vtgate/gorpcvtgateservice/server.go
Expand Up @@ -48,7 +48,10 @@ func (vtg *VTGate) Execute(ctx context.Context, request *proto.Query, reply *pro
request.NotInTransaction,
reply)
vtgate.AddVtGateError(vtgErr, &reply.Err)
return nil
if *vtgate.RPCErrorOnlyInReply {
return nil
}
return vtgErr
}

// ExecuteShard is the RPC version of vtgateservice.VTGateService method
Expand All @@ -69,7 +72,10 @@ func (vtg *VTGate) ExecuteShard(ctx context.Context, request *proto.QueryShard,
request.NotInTransaction,
reply)
vtgate.AddVtGateError(vtgErr, &reply.Err)
return nil
if *vtgate.RPCErrorOnlyInReply {
return nil
}
return vtgErr
}

// ExecuteKeyspaceIds is the RPC version of vtgateservice.VTGateService method
Expand All @@ -90,7 +96,10 @@ func (vtg *VTGate) ExecuteKeyspaceIds(ctx context.Context, request *proto.Keyspa
request.NotInTransaction,
reply)
vtgate.AddVtGateError(vtgErr, &reply.Err)
return nil
if *vtgate.RPCErrorOnlyInReply {
return nil
}
return vtgErr
}

// ExecuteKeyRanges is the RPC version of vtgateservice.VTGateService method
Expand All @@ -111,7 +120,10 @@ func (vtg *VTGate) ExecuteKeyRanges(ctx context.Context, request *proto.KeyRange
request.NotInTransaction,
reply)
vtgate.AddVtGateError(vtgErr, &reply.Err)
return nil
if *vtgate.RPCErrorOnlyInReply {
return nil
}
return vtgErr
}

// ExecuteEntityIds is the RPC version of vtgateservice.VTGateService method
Expand All @@ -133,7 +145,10 @@ func (vtg *VTGate) ExecuteEntityIds(ctx context.Context, request *proto.EntityId
request.NotInTransaction,
reply)
vtgate.AddVtGateError(vtgErr, &reply.Err)
return nil
if *vtgate.RPCErrorOnlyInReply {
return nil
}
return vtgErr
}

// ExecuteBatchShard is the RPC version of vtgateservice.VTGateService method
Expand All @@ -151,7 +166,10 @@ func (vtg *VTGate) ExecuteBatchShard(ctx context.Context, request *proto.BatchQu
request.Session,
reply)
vtgate.AddVtGateError(vtgErr, &reply.Err)
return nil
if *vtgate.RPCErrorOnlyInReply {
return nil
}
return vtgErr
}

// ExecuteBatchKeyspaceIds is the RPC version of
Expand All @@ -170,7 +188,10 @@ func (vtg *VTGate) ExecuteBatchKeyspaceIds(ctx context.Context, request *proto.K
request.Session,
reply)
vtgate.AddVtGateError(vtgErr, &reply.Err)
return nil
if *vtgate.RPCErrorOnlyInReply {
return nil
}
return vtgErr
}

// StreamExecute is the RPC version of vtgateservice.VTGateService method
Expand Down Expand Up @@ -204,10 +225,18 @@ func (vtg *VTGate) StreamExecute2(ctx context.Context, request *proto.Query, sen
if vtgErr == nil {
return nil
}
// If there was an app error, send a QueryResult back with it.
qr := new(proto.QueryResult)
vtgate.AddVtGateError(vtgErr, &qr.Err)
return sendReply(qr)
if *vtgate.RPCErrorOnlyInReply {
// If there was an app error, send a QueryResult back with it.
qr := new(proto.QueryResult)
vtgate.AddVtGateError(vtgErr, &qr.Err)
// Sending back errors this way is not backwards compatible. If a (new) server sends an additional
// QueryResult with an error, and the (old) client doesn't know how to read it, it will cause
// problems where the client will get out of sync with the number of QueryResults sent.
// That's why this the error is only sent this way when the --rpc_errors_only_in_reply flag is set
// (signalling that all clients are able to handle new-style errors).
return sendReply(qr)
}
return vtgErr
}

// StreamExecuteShard is the RPC version of vtgateservice.VTGateService method
Expand Down Expand Up @@ -245,10 +274,18 @@ func (vtg *VTGate) StreamExecuteShard2(ctx context.Context, request *proto.Query
if vtgErr == nil {
return nil
}
// If there was an app error, send a QueryResult back with it.
qr := new(proto.QueryResult)
vtgate.AddVtGateError(vtgErr, &qr.Err)
return sendReply(qr)
if *vtgate.RPCErrorOnlyInReply {
// If there was an app error, send a QueryResult back with it.
qr := new(proto.QueryResult)
vtgate.AddVtGateError(vtgErr, &qr.Err)
// Sending back errors this way is not backwards compatible. If a (new) server sends an additional
// QueryResult with an error, and the (old) client doesn't know how to read it, it will cause
// problems where the client will get out of sync with the number of QueryResults sent.
// That's why this the error is only sent this way when the --rpc_errors_only_in_reply flag is set
// (signalling that all clients are able to handle new-style errors).
return sendReply(qr)
}
return vtgErr
}

// StreamExecuteKeyspaceIds is the RPC version of
Expand Down Expand Up @@ -288,10 +325,18 @@ func (vtg *VTGate) StreamExecuteKeyspaceIds2(ctx context.Context, request *proto
if vtgErr == nil {
return nil
}
// If there was an app error, send a QueryResult back with it.
qr := new(proto.QueryResult)
vtgate.AddVtGateError(vtgErr, &qr.Err)
return sendReply(qr)
if *vtgate.RPCErrorOnlyInReply {
// If there was an app error, send a QueryResult back with it.
qr := new(proto.QueryResult)
vtgate.AddVtGateError(vtgErr, &qr.Err)
// Sending back errors this way is not backwards compatible. If a (new) server sends an additional
// QueryResult with an error, and the (old) client doesn't know how to read it, it will cause
// problems where the client will get out of sync with the number of QueryResults sent.
// That's why this the error is only sent this way when the --rpc_errors_only_in_reply flag is set
// (signalling that all clients are able to handle new-style errors).
return sendReply(qr)
}
return vtgErr
}

// StreamExecuteKeyRanges is the RPC version of
Expand Down Expand Up @@ -331,10 +376,18 @@ func (vtg *VTGate) StreamExecuteKeyRanges2(ctx context.Context, request *proto.K
if vtgErr == nil {
return nil
}
// If there was an app error, send a QueryResult back with it.
qr := new(proto.QueryResult)
vtgate.AddVtGateError(vtgErr, &qr.Err)
return sendReply(qr)
if *vtgate.RPCErrorOnlyInReply {
// If there was an app error, send a QueryResult back with it.
qr := new(proto.QueryResult)
vtgate.AddVtGateError(vtgErr, &qr.Err)
// Sending back errors this way is not backwards compatible. If a (new) server sends an additional
// QueryResult with an error, and the (old) client doesn't know how to read it, it will cause
// problems where the client will get out of sync with the number of QueryResults sent.
// That's why this the error is only sent this way when the --rpc_errors_only_in_reply flag is set
// (signalling that all clients are able to handle new-style errors).
return sendReply(qr)
}
return vtgErr
}

// Begin is the RPC version of vtgateservice.VTGateService method
Expand Down Expand Up @@ -373,7 +426,10 @@ func (vtg *VTGate) Begin2(ctx context.Context, request *proto.BeginRequest, repl
reply.Session = &proto.Session{}
vtgErr := vtg.server.Begin(ctx, reply.Session)
vtgate.AddVtGateError(vtgErr, &reply.Err)
return nil
if *vtgate.RPCErrorOnlyInReply {
return nil
}
return vtgErr
}

// Commit2 is the RPC version of vtgateservice.VTGateService method
Expand All @@ -386,7 +442,10 @@ func (vtg *VTGate) Commit2(ctx context.Context, request *proto.CommitRequest, re
callerid.NewImmediateCallerID("gorpc client"))
vtgErr := vtg.server.Commit(ctx, request.Session)
vtgate.AddVtGateError(vtgErr, &reply.Err)
return nil
if *vtgate.RPCErrorOnlyInReply {
return nil
}
return vtgErr
}

// Rollback2 is the RPC version of vtgateservice.VTGateService method
Expand All @@ -399,7 +458,10 @@ func (vtg *VTGate) Rollback2(ctx context.Context, request *proto.RollbackRequest
callerid.NewImmediateCallerID("gorpc client"))
vtgErr := vtg.server.Rollback(ctx, request.Session)
vtgate.AddVtGateError(vtgErr, &reply.Err)
return nil
if *vtgate.RPCErrorOnlyInReply {
return nil
}
return vtgErr
}

// SplitQuery is the RPC version of vtgateservice.VTGateService method
Expand All @@ -418,7 +480,10 @@ func (vtg *VTGate) SplitQuery(ctx context.Context, request *proto.SplitQueryRequ
request.SplitCount)
reply.Splits = splits
vtgate.AddVtGateError(vtgErr, &reply.Err)
return nil
if *vtgate.RPCErrorOnlyInReply {
return nil
}
return vtgErr
}

// GetSrvKeyspace is the RPC version of vtgateservice.VTGateService method
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vtgate/vtgate.go
Expand Up @@ -8,6 +8,7 @@ package vtgate

import (
"errors"
"flag"
"fmt"
"math"
"strings"
Expand Down Expand Up @@ -96,6 +97,11 @@ type RegisterVTGate func(vtgateservice.VTGateService)
// RegisterVTGates stores register funcs for VTGate server.
var RegisterVTGates []RegisterVTGate

var (
// RPCErrorOnlyInReply informs vtgateservice(s) about how to return errors.
RPCErrorOnlyInReply = flag.Bool("rpc-error-only-in-reply", false, "if true, supported RPC calls from vtgateservice(s) will only return errors as part of the RPC server response")
)

// Init initializes VTGate server.
func Init(hc discovery.HealthCheck, topoServer topo.Server, serv SrvTopoServer, schema *planbuilder.Schema, cell string, retryDelay time.Duration, retryCount int, connTimeoutTotal, connTimeoutPerConn, connLife time.Duration, maxInFlight int, testGateway string) {
if rpcVTGate != nil {
Expand Down

This file was deleted.

Expand Up @@ -5,12 +5,10 @@
public class SplitQueryResponse {
private Map<Query, Long> queries;
private String error;
private RPCError err;

public SplitQueryResponse(Map<Query, Long> queries, String error, RPCError err) {
public SplitQueryResponse(Map<Query, Long> queries, String error) {
this.queries = queries;
this.error = error;
this.err = err;
}

public Map<Query, Long> getQueries() {
Expand All @@ -21,8 +19,4 @@ public String getError() {
return error;
}

public RPCError getErr() {
return err;
}

}
Expand Up @@ -16,7 +16,6 @@
import com.youtube.vitess.vtgate.QueryResult;
import com.youtube.vitess.vtgate.Row;
import com.youtube.vitess.vtgate.Row.Cell;
import com.youtube.vitess.vtgate.RPCError;
import com.youtube.vitess.vtgate.SplitQueryRequest;
import com.youtube.vitess.vtgate.SplitQueryResponse;

Expand Down Expand Up @@ -203,20 +202,6 @@ public static SplitQueryResponse bsonToSplitQueryResponse(BSONObject reply) {
error = new String(err);
}
}

RPCError err = null;
if (reply.containsField("Err")) {
BSONObject errBson = (BSONObject) reply.get("Err");
if (errBson != null) {
long code = (long) errBson.get("Code");
String messageString = null;
byte[] message = (byte[]) errBson.get("Message");
if (message.length > 0) {
messageString = new String(message);
}
err = new RPCError(code, messageString);
}
}
BasicBSONList result = (BasicBSONList) reply.get("Splits");
Map<Query, Long> queries = new HashMap<>();
for (Object split : result) {
Expand Down Expand Up @@ -268,6 +253,6 @@ public static SplitQueryResponse bsonToSplitQueryResponse(BSONObject reply) {
long size = (long) splitObj.get("Size");
queries.put(q, size);
}
return new SplitQueryResponse(queries, error, err);
return new SplitQueryResponse(queries, error);
}
}
Expand Up @@ -9,7 +9,6 @@
import com.youtube.vitess.vtgate.Query;
import com.youtube.vitess.vtgate.QueryResponse;
import com.youtube.vitess.vtgate.QueryResult;
import com.youtube.vitess.vtgate.RPCError;
import com.youtube.vitess.vtgate.SplitQueryRequest;
import com.youtube.vitess.vtgate.SplitQueryResponse;
import com.youtube.vitess.vtgate.rpcclient.RpcClient;
Expand Down Expand Up @@ -110,18 +109,7 @@ public void close() throws ConnectionException {
public SplitQueryResponse splitQuery(SplitQueryRequest request) throws ConnectionException {
String callMethod = "VTGate.SplitQuery";
Response response = call(callMethod, Bsonify.splitQueryRequestToBson(request));
SplitQueryResponse splitQueryResponse = Bsonify.bsonToSplitQueryResponse(
(BSONObject) response.getReply());
try {
RPCError err = splitQueryResponse.getErr();
if (err != null) {
throw new ConnectionException(err.getMessage());
}
} catch (ConnectionException e) {
LOGGER.error("vtgate exception", e);
throw new ConnectionException("vtgate exception: " + e.getMessage());
}
return splitQueryResponse;
return Bsonify.bsonToSplitQueryResponse((BSONObject) response.getReply());
}

private Response call(String methodName, Object args) throws ConnectionException {
Expand Down
4 changes: 2 additions & 2 deletions test/java_vtgate_test_helper.py
Expand Up @@ -94,6 +94,7 @@ def set_up(self):
utils.run_vtctl(['ApplyVSchema', '-vschema_file', self.vschema])
utils.VtGate(port=self.vtgate_port).start(
cache_ttl='500s',
rpc_error_only_in_reply=False
)
except:
self.shutdown()
Expand All @@ -104,8 +105,7 @@ def shutdown(self):
# StreamingServerShutdownIT.java expects an EOF from the vtgate
# client and not an error that vttablet killed the query (which is
# seen when vtgate is killed last).
if utils.vtgate:
utils.vtgate.kill()
utils.vtgate.kill()
tablet.kill_tablets(self.tablets)
teardown_procs = [t.teardown_mysql() for t in self.tablets]
utils.wait_procs(teardown_procs, raise_on_error=False)
Expand Down
1 change: 1 addition & 0 deletions test/python_client_test.py
Expand Up @@ -45,6 +45,7 @@ def setUpModule():
args.extend(['-grpc_port', str(vtgateclienttest_grpc_port)])
if protocols_flavor().service_map():
args.extend(['-service_map', ','.join(protocols_flavor().service_map())])
args.extend(['-rpc-error-only-in-reply=true'])

vtgateclienttest_process = utils.run_bg(args)
utils.wait_for_vars('vtgateclienttest', vtgateclienttest_port)
Expand Down

0 comments on commit 5ce1f54

Please sign in to comment.