VTGate related changes:
Change default retryDelay to 2ms.
Only retry on tx_pool_full, retry, and fatal errors, and only while not in a transaction, not cancelled, and not past the deadline.
Add integration tests to test retry logic with vttablet shutting down.
guoliang100 committed Feb 27, 2015
1 parent dcf8ab0 commit ff817ac
Showing 6 changed files with 297 additions and 100 deletions.
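In outline, the new retry policy reads as follows — a minimal sketch paraphrasing the shard_conn.go hunk below, with local stand-in constants for tabletconn's error codes and the per-case ctx.Done() checks hoisted into a single up-front probe (the real code checks inside each case):

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins for tabletconn's error codes, for illustration only.
const (
	errNormal = iota
	errRetry
	errFatal
	errTxPoolFull
)

// canRetrySketch paraphrases the new policy: never retry once the context
// is cancelled or past its deadline; retry tx_pool_full on the same
// connection; retry RETRY/FATAL only when not inside a transaction.
func canRetrySketch(ctx context.Context, code int, inTransaction bool) bool {
	select {
	case <-ctx.Done(): // closed on cancellation and on deadline expiry
		return false
	default:
	}
	switch code {
	case errTxPoolFull:
		return true
	case errRetry, errFatal:
		return !inTransaction // the real code also marks the conn down here
	default:
		return false // normal server errors and operational (conn) errors
	}
}

func main() {
	ctx := context.Background()
	fmt.Println(canRetrySketch(ctx, errTxPoolFull, false)) // true
	fmt.Println(canRetrySketch(ctx, errRetry, true))       // false: in a transaction
}
```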
2 changes: 1 addition & 1 deletion go/cmd/vtgate/vtgate.go
@@ -19,7 +19,7 @@ import (
var (
cell = flag.String("cell", "test_nj", "cell to use")
schemaFile = flag.String("vschema_file", "", "JSON schema file")
-   retryDelay = flag.Duration("retry-delay", 200*time.Millisecond, "retry delay")
+   retryDelay = flag.Duration("retry-delay", 2*time.Millisecond, "retry delay")
retryCount = flag.Int("retry-count", 2, "retry count")
connTimeout = flag.Duration("conn-timeout", 3*time.Second, "vttablet connection timeout")
maxInFlight = flag.Int("max-in-flight", 0, "maximum number of calls to allow simultaneously")
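For a sense of scale, the worst-case time a call can spend sleeping between attempts drops by two orders of magnitude. This back-of-envelope sketch assumes one retryDelay sleep per retry, which is a simplification of ShardConn's actual loop:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	retryCount := 2
	oldDelay := 200 * time.Millisecond // previous default
	newDelay := 2 * time.Millisecond   // new default
	// Worst case under the one-sleep-per-retry assumption.
	fmt.Println(time.Duration(retryCount) * oldDelay) // 400ms
	fmt.Println(time.Duration(retryCount) * newDelay) // 4ms
}
```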
8 changes: 4 additions & 4 deletions go/vt/tabletserver/gorpctabletconn/conn.go
@@ -274,13 +274,13 @@ func tabletError(err error) error {
var code int
errStr := err.Error()
switch {
-   case strings.HasPrefix(errStr, "fatal"):
+   case strings.Contains(errStr, "fatal: "):
        code = tabletconn.ERR_FATAL
-   case strings.HasPrefix(errStr, "retry"):
+   case strings.Contains(errStr, "retry: "):
        code = tabletconn.ERR_RETRY
-   case strings.HasPrefix(errStr, "tx_pool_full"):
+   case strings.Contains(errStr, "tx_pool_full: "):
        code = tabletconn.ERR_TX_POOL_FULL
-   case strings.HasPrefix(errStr, "not_in_tx"):
+   case strings.Contains(errStr, "not_in_tx: "):
        code = tabletconn.ERR_NOT_IN_TX
code = tabletconn.ERR_NOT_IN_TX
default:
code = tabletconn.ERR_NORMAL
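The move from strings.HasPrefix to strings.Contains (with a trailing ": ") keeps the classification working when the tabletserver's error text arrives embedded in a longer, wrapped message, and the added ": " makes an accidental substring match less likely. A standalone illustration, using a made-up wrapped error string:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical error as it might arrive after wrapping by another layer.
	errStr := "vttablet: retry: operation not allowed in state NOT_SERVING"
	fmt.Println(strings.HasPrefix(errStr, "retry"))  // false: no longer at the front
	fmt.Println(strings.Contains(errStr, "retry: ")) // true: found mid-string
}
```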
35 changes: 18 additions & 17 deletions go/vt/vtgate/shard_conn.go
@@ -286,31 +286,32 @@ func (sdc *ShardConn) canRetry(ctx context.Context, err error, transactionID int
    if serverError, ok := err.(*tabletconn.ServerError); ok {
        switch serverError.Code {
        case tabletconn.ERR_TX_POOL_FULL:
-           // Do not retry if deadline passed.
-           if deadline, ok := ctx.Deadline(); ok {
-               if deadline.Before(time.Now()) {
-                   return false
-               }
-           }
-           // Retry without reconnecting.
-           return true
+           // Do not retry if ctx.Done() is closed.
+           select {
+           case <-ctx.Done():
+               return false
+           default:
+               return true
+           }
        case tabletconn.ERR_RETRY, tabletconn.ERR_FATAL:
-           // No-op: treat these errors as operational by breaking out of this switch
+           // Retry on RETRY and FATAL if not in a transaction.
+           inTransaction := (transactionID != 0)
+           sdc.markDown(conn, err.Error())
+           // Do not retry if ctx.Done() is closed.
+           select {
+           case <-ctx.Done():
+               return false
+           default:
+               return !inTransaction
+           }
        default:
            // Should not retry for normal server errors.
            return false
        }
    }
-   // Non-server errors or fatal/retry errors. Retry if we're not in a transaction.
-   inTransaction := (transactionID != 0)
+   // Do not retry on operational error.

[Inline review comment on the line above]
@shrutip (Contributor), Feb 27, 2015:
Do you want to elaborate a little on the code comment here? It will help give context in the future.

@guoliang100 (Author), Feb 27, 2015:
Added a todo mentioning we need to fix a retry case.
    sdc.markDown(conn, err.Error())
-   // Do not retry if deadline passed.
-   if deadline, ok := ctx.Deadline(); ok {
-       if deadline.Before(time.Now()) {
-           return false
-       }
-   }
-   return !inTransaction
+   return false
}

// markDown closes conn and temporarily marks the associated
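The select-with-default in the new code is Go's non-blocking probe of a context: ctx.Done() is closed both on explicit cancellation and when a deadline passes, which is why the separate ctx.Deadline() comparison could be folded into one check. A self-contained illustration of the idiom:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// done reports, without blocking, whether ctx was cancelled or its
// deadline has passed -- the same idiom canRetry now uses.
func done(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
	defer cancel()
	fmt.Println(done(ctx)) // false: deadline still in the future
	time.Sleep(10 * time.Millisecond)
	fmt.Println(done(ctx)) // true: Done() closed when the deadline expired
}
```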
63 changes: 40 additions & 23 deletions go/vt/vtgate/shard_conn_test.go
@@ -186,20 +186,22 @@ func testShardConnGeneric(t *testing.T, name string, f func() error) {
    }

    // conn error (one failure)
+   // no retry on OperationalError
    s.Reset()
    sbc = &sandboxConn{mustFailConn: 1}
    s.MapTestConn("0", sbc)
    err = f()
-   if err != nil {
-       t.Errorf("want nil, got %v", err)
+   want = fmt.Sprintf("shard, host: %v.0., {Uid:0 Host:0 NamedPortMap:map[vt:1] Health:map[]}, error: conn", name)
+   if err == nil || err.Error() != want {
+       t.Errorf("want %v, got %v", want, err)
    }
-   // Ensure we dialed twice (second one succeeded)
-   if s.DialCounter != 2 {
-       t.Errorf("want 2, got %v", s.DialCounter)
+   // Ensure we did not redial.
+   if s.DialCounter != 1 {
+       t.Errorf("want 1, got %v", s.DialCounter)
    }
-   // Ensure we executed twice (second one succeeded)
-   if sbc.ExecCount != 2 {
-       t.Errorf("want 2, got %v", sbc.ExecCount)
+   // Ensure we did not re-execute.
+   if sbc.ExecCount != 1 {
+       t.Errorf("want 1, got %v", sbc.ExecCount)
    }

    // no failures
@@ -309,9 +311,9 @@ func TestShardConnReconnect(t *testing.T) {
        t.Errorf("want 2, got %v", s.EndPointCounter)
    }

-   // case 2.2: resolve 1 endpoint and execute failed -> resolve and retry without spamming
+   // case 2.2: resolve 1 endpoint and execute failed with retryable error -> resolve and retry without spamming
    s.Reset()
-   sbc = &sandboxConn{mustFailConn: 1}
+   sbc = &sandboxConn{mustFailRetry: 1}
s.MapTestConn("0", sbc)
sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond)
timeStart = time.Now()
@@ -327,6 +329,21 @@
t.Errorf("want 3, got %v", s.EndPointCounter)
}

+   // case 2.3: resolve 1 endpoint and execute failed with OperationalError -> no retry
+   s.Reset()
+   sbc = &sandboxConn{mustFailConn: 1}
+   s.MapTestConn("0", sbc)
+   sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond)
+   timeStart = time.Now()
+   sdc.Execute(context.Background(), "query", nil, 0)
+   timeDuration = time.Now().Sub(timeStart)
+   if timeDuration > retryDelay {
+       t.Errorf("want instant fail %v, got %v", retryDelay, timeDuration)
+   }
+   if s.EndPointCounter != 1 {
+       t.Errorf("want 1, got %v", s.EndPointCounter)
+   }
+
// case 3.1: resolve 3 endpoints, failed connection to 1st one -> resolve and connect to 2nd one
s.Reset()
s.DialMustFail = 1
@@ -355,7 +372,7 @@ func TestShardConnReconnect(t *testing.T) {
countConnUse := 0
onConnUse := func(conn *sandboxConn) {
if countConnUse == 0 {
-           conn.mustFailConn = 1
+           conn.mustFailRetry = 1
}
countConnUse++
}
@@ -388,7 +405,7 @@ func TestShardConnReconnect(t *testing.T) {
countConnUse = 0
onConnUse = func(conn *sandboxConn) {
if countConnUse == 0 {
-           conn.mustFailConn = 1
+           conn.mustFailRetry = 1
}
countConnUse++
}
@@ -425,9 +442,9 @@ func TestShardConnReconnect(t *testing.T) {
}
countConnUse++
}
-   sbc0 = &sandboxConn{mustFailConn: 1, onConnUse: onConnUse}
-   sbc1 = &sandboxConn{mustFailConn: 1, onConnUse: onConnUse}
-   sbc2 = &sandboxConn{mustFailConn: 1, onConnUse: onConnUse}
+   sbc0 = &sandboxConn{mustFailRetry: 1, onConnUse: onConnUse}
+   sbc1 = &sandboxConn{mustFailRetry: 1, onConnUse: onConnUse}
+   sbc2 = &sandboxConn{mustFailRetry: 1, onConnUse: onConnUse}
s.MapTestConn("0", sbc0)
s.MapTestConn("0", sbc1)
s.MapTestConn("0", sbc2)
@@ -460,7 +477,7 @@ func TestShardConnReconnect(t *testing.T) {
onConnUse = func(conn *sandboxConn) {
if firstConn == nil {
firstConn = conn
-           conn.mustFailConn = 1
+           conn.mustFailRetry = 1
}
}
sbc0 = &sandboxConn{onConnUse: onConnUse}
@@ -513,10 +530,10 @@ func TestShardConnReconnect(t *testing.T) {
}
countConnUse++
}
-   sbc0 = &sandboxConn{mustFailConn: 1, onConnUse: onConnUse}
-   sbc1 = &sandboxConn{mustFailConn: 1, onConnUse: onConnUse}
-   sbc2 = &sandboxConn{mustFailConn: 1, onConnUse: onConnUse}
-   sbc3 = &sandboxConn{mustFailConn: 1}
+   sbc0 = &sandboxConn{mustFailRetry: 1, onConnUse: onConnUse}
+   sbc1 = &sandboxConn{mustFailRetry: 1, onConnUse: onConnUse}
+   sbc2 = &sandboxConn{mustFailRetry: 1, onConnUse: onConnUse}
+   sbc3 = &sandboxConn{mustFailRetry: 1}
s.MapTestConn("0", sbc0)
s.MapTestConn("0", sbc1)
s.MapTestConn("0", sbc2)
@@ -563,9 +580,9 @@ func TestShardConnReconnect(t *testing.T) {
firstConn = conn
}
}
-   sbc0 = &sandboxConn{mustFailConn: 1, onConnUse: onConnUse}
-   sbc1 = &sandboxConn{mustFailConn: 1, onConnUse: onConnUse}
-   sbc2 = &sandboxConn{mustFailConn: 1, onConnUse: onConnUse}
+   sbc0 = &sandboxConn{mustFailRetry: 1, onConnUse: onConnUse}
+   sbc1 = &sandboxConn{mustFailRetry: 1, onConnUse: onConnUse}
+   sbc2 = &sandboxConn{mustFailRetry: 1, onConnUse: onConnUse}
sbc3 = &sandboxConn{}
sbc4 := &sandboxConn{}
sbc5 := &sandboxConn{}
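A note on the test changes above: the cases that exercise the retry-and-reconnect path switch their sandbox connections from mustFailConn to mustFailRetry. Under the new policy a plain connection (operational) error no longer triggers a retry, so tests that want to observe redialing must inject a retryable server error instead; case 2.3 covers the no-retry behavior for operational errors explicitly, using the elapsed-time bound to verify that the call failed immediately rather than sleeping for a retry.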
13 changes: 11 additions & 2 deletions test/tablet.py
@@ -583,11 +583,20 @@ def get_status(self):
  def get_healthz(self):
    return urllib2.urlopen('http://localhost:%u/healthz' % self.port).read()

-  def kill_vttablet(self):
-    logging.debug('killing vttablet: %s', self.tablet_alias)
+  def kill_vttablet(self, wait=True):
+    logging.debug('killing vttablet: %s, wait: %s', self.tablet_alias, str(wait))
    if self.proc is not None:
      Tablet.tablets_running -= 1
      self.proc.terminate()
+      if wait:
+        self.proc.wait()
      self.proc = None

+  def hard_kill_vttablet(self):
+    logging.debug('hard killing vttablet: %s', self.tablet_alias)
+    if self.proc is not None:
+      Tablet.tablets_running -= 1
+      self.proc.kill()
+      self.proc.wait()
+      self.proc = None

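The two shutdown paths in tablet.py differ in the signal they send: terminate() delivers SIGTERM, which vttablet can catch for a graceful shutdown, and the new wait flag lets a test return before the process exits so it can keep driving queries through vtgate while the tablet is going down; hard_kill_vttablet() uses kill(), i.e. SIGKILL, which cannot be trapped, approximating an abrupt crash for the retry integration tests.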

1 comment on commit ff817ac

@shrutip (Contributor) commented:
Looks good overall, a few small suggestions.
