Skip to content

Commit

Permalink
Updates
Browse files Browse the repository at this point in the history
  • Loading branch information
linxGnu committed Oct 17, 2020
1 parent 45d9ab6 commit be66bda
Show file tree
Hide file tree
Showing 7 changed files with 354 additions and 112 deletions.
122 changes: 14 additions & 108 deletions mssqlx.go
Expand Up @@ -524,34 +524,6 @@ func getDBFromBalancer(target *balancer) (db *wrapper, err error) {
return
}

func retryBackoff(query string, exec func() (interface{}, error)) (v interface{}, err error) {
for retry := 0; retry < 100; retry++ {
if v, err = exec(); err == nil {
return
}

switch err {
case sql.ErrConnDone:

case sql.ErrTxDone, sql.ErrNoRows:
return

default:
if isErrBadConn(err) || IsDeadlock(err) {
time.Sleep(5 * time.Millisecond)
} else {
return
}
}
}

if err == sql.ErrConnDone || isErrBadConn(err) {
reportError(query, err)
}

return
}

func shouldFailure(w *wrapper, isWsrep bool, err error) bool {
if err = parseError(w, err); err == nil {
return false
Expand All @@ -576,7 +548,7 @@ func _namedQuery(ctx context.Context, target *balancer, query string, arg interf
return
}

r, err = retryBackoff(query, func() (interface{}, error) {
r, err = retryFunc(query, func() (interface{}, error) {
return w.db.NamedQueryContext(ctx, query, arg)
})
if r != nil {
Expand Down Expand Up @@ -630,7 +602,7 @@ func _namedExec(ctx context.Context, target *balancer, query string, arg interfa
}

// executing
r, err = retryBackoff(query, func() (interface{}, error) {
r, err = retryFunc(query, func() (interface{}, error) {
return w.db.NamedExecContext(ctx, query, arg)
})
if r != nil {
Expand Down Expand Up @@ -684,7 +656,7 @@ func _query(ctx context.Context, target *balancer, query string, args ...interfa
}

// executing
r, err = retryBackoff(query, func() (interface{}, error) {
r, err = retryFunc(query, func() (interface{}, error) {
return w.db.QueryContext(ctx, query, args...)
})
if r != nil {
Expand Down Expand Up @@ -743,7 +715,7 @@ func _queryx(ctx context.Context, target *balancer, query string, args ...interf
}

// executing
r, err = retryBackoff(query, func() (interface{}, error) {
r, err = retryFunc(query, func() (interface{}, error) {
return w.db.QueryxContext(ctx, query, args...)
})
if r != nil {
Expand Down Expand Up @@ -888,7 +860,7 @@ func _select(ctx context.Context, target *balancer, dest interface{}, query stri
}

// executing
_, err = retryBackoff(query, func() (interface{}, error) {
_, err = retryFunc(query, func() (interface{}, error) {
return nil, w.db.SelectContext(ctx, dest, query, args...)
})

Expand Down Expand Up @@ -941,7 +913,7 @@ func _get(ctx context.Context, target *balancer, dest interface{}, query string,
}

// executing
_, err = retryBackoff(query, func() (interface{}, error) {
_, err = retryFunc(query, func() (interface{}, error) {
return nil, w.db.GetContext(ctx, dest, query, args...)
})

Expand Down Expand Up @@ -1001,7 +973,7 @@ func _exec(ctx context.Context, target *balancer, query string, args ...interfac
}

// executing
r, err = retryBackoff(query, func() (interface{}, error) {
r, err = retryFunc(query, func() (interface{}, error) {
return w.db.ExecContext(ctx, query, args...)
})
if r != nil {
Expand Down Expand Up @@ -1051,7 +1023,7 @@ func _prepareContext(ctx context.Context, target *balancer, query string) (dbx *
}

// executing
r, err = retryBackoff(query, func() (interface{}, error) {
r, err = retryFunc(query, func() (interface{}, error) {
return w.db.PrepareContext(ctx, query)
})
if r != nil {
Expand Down Expand Up @@ -1118,7 +1090,7 @@ func _preparexContext(ctx context.Context, target *balancer, query string) (dbx
}

// executing
r, err = retryBackoff(query, func() (interface{}, error) {
r, err = retryFunc(query, func() (interface{}, error) {
return w.db.PreparexContext(ctx, query)
})
if r != nil {
Expand Down Expand Up @@ -1189,7 +1161,7 @@ func _prepareNamedContext(ctx context.Context, target *balancer, query string) (
}

// executing
r, err = retryBackoff(query, func() (interface{}, error) {
r, err = retryFunc(query, func() (interface{}, error) {
return w.db.PrepareNamedContext(ctx, query)
})
if r != nil {
Expand Down Expand Up @@ -1239,7 +1211,7 @@ func _mustExec(ctx context.Context, target *balancer, query string, args ...inte
panic(err)
}

r, err = retryBackoff(query, func() (interface{}, error) {
r, err = retryFunc(query, func() (interface{}, error) {
return w.db.ExecContext(ctx, query, args...)
})
if r != nil {
Expand Down Expand Up @@ -1279,72 +1251,6 @@ func (dbs *DBs) MustExecContextOnSlave(ctx context.Context, query string, args .
return _mustExec(ctx, dbs.slaves, query, args...)
}

// Tx wraps std sql.Tx
type Tx struct {
*sql.Tx
}

func (t *Tx) Commit() (err error) {
for retry := 0; retry < 50; retry++ {
if err = t.Tx.Commit(); err == nil {
return
}

switch err {
case sql.ErrConnDone:

case sql.ErrTxDone, sql.ErrNoRows:
return

default:
if isErrBadConn(err) || IsDeadlock(err) {
time.Sleep(5 * time.Millisecond)
} else {
return
}
}
}

if err == sql.ErrConnDone || isErrBadConn(err) {
reportError("transaction", err)
}

return
}

// Txx wraps std sqlx.Tx
type Txx struct {
*sqlx.Tx
}

func (t *Txx) Commit() (err error) {
for retry := 0; retry < 50; retry++ {
if err = t.Tx.Commit(); err == nil {
return
}

switch err {
case sql.ErrConnDone:

case sql.ErrTxDone, sql.ErrNoRows:
return

default:
if isErrBadConn(err) || IsDeadlock(err) {
time.Sleep(5 * time.Millisecond)
} else {
return
}
}
}

if err == sql.ErrConnDone || isErrBadConn(err) {
reportError("transaction", err)
}

return
}

// MustBegin starts a transaction, and panics on error.
// Transaction is bound to one of master connections.
func (dbs *DBs) MustBegin() *Tx {
Expand Down Expand Up @@ -1415,7 +1321,7 @@ func (dbs *DBs) BeginTx(ctx context.Context, opts *sql.TxOptions) (res *Tx, err
}

// executing
r, err = retryBackoff("START TRANSACTION", func() (interface{}, error) {
r, err = retryFunc("START TRANSACTION", func() (interface{}, error) {
return w.db.BeginTx(ctx, opts)
})
if r != nil {
Expand Down Expand Up @@ -1450,7 +1356,7 @@ func (dbs *DBs) Beginx() (res *Txx, err error) {
}

// executing
r, err = retryBackoff("START TRANSACTION", func() (interface{}, error) {
r, err = retryFunc("START TRANSACTION", func() (interface{}, error) {
return w.db.Beginx()
})
if r != nil {
Expand Down Expand Up @@ -1490,7 +1396,7 @@ func (dbs *DBs) BeginTxx(ctx context.Context, opts *sql.TxOptions) (res *Txx, er
}

// executing
r, err = retryBackoff("START TRANSACTION", func() (interface{}, error) {
r, err = retryFunc("START TRANSACTION", func() (interface{}, error) {
return w.db.BeginTxx(ctx, opts)
})
if r != nil {
Expand Down
16 changes: 12 additions & 4 deletions mssqlx_test.go
Expand Up @@ -1865,8 +1865,12 @@ func TestStressQueries(t *testing.T) {
if e != nil {
t.Log(e)
} else {
_, _ = tx.Exec(db.Rebind("INSERT INTO stress VALUES (?, ?)"), "b", 13)
_, _ = tx.Exec(db.Rebind("DELETE FROM stress WHERE k = ?"), "a")
_, err := tx.Exec(db.Rebind("INSERT INTO stress VALUES (?, ?)"), "b", 13)
require.Nil(t, err)

_, err = tx.Exec(db.Rebind("DELETE FROM stress WHERE k = ?"), "a")
require.Nil(t, err)

if e = tx.Commit(); e != nil {
require.Nil(t, tx.Rollback())
t.Log(e)
Expand All @@ -1877,8 +1881,12 @@ func TestStressQueries(t *testing.T) {
if e != nil {
t.Log(e)
} else {
_, _ = txx.Exec(db.Rebind("INSERT INTO stress VALUES (?, ?)"), "c", 13)
_, _ = txx.Exec(db.Rebind("DELETE FROM stress WHERE k = ?"), "b")
_, err := txx.Exec(db.Rebind("INSERT INTO stress VALUES (?, ?)"), "c", 13)
require.Nil(t, err)

_, err = txx.Exec(db.Rebind("DELETE FROM stress WHERE k = ?"), "b")
require.Nil(t, err)

if e = txx.Commit(); e != nil {
require.Nil(t, txx.Rollback())
t.Log(e)
Expand Down
34 changes: 34 additions & 0 deletions retry.go
@@ -0,0 +1,34 @@
package mssqlx

import (
"database/sql"
"time"
)

func retryFunc(query string, f func() (interface{}, error)) (result interface{}, err error) {
for retry := 0; retry < 50; retry++ {
if result, err = f(); err == nil {
return
}

switch err {
case sql.ErrConnDone:

case sql.ErrTxDone, sql.ErrNoRows:
return

default:
if isErrBadConn(err) || IsDeadlock(err) {
time.Sleep(5 * time.Millisecond)
} else {
return
}
}
}

if err == sql.ErrConnDone || isErrBadConn(err) {
reportError(query, err)
}

return
}
42 changes: 42 additions & 0 deletions stmt.go
@@ -0,0 +1,42 @@
package mssqlx

import (
"context"
"database/sql"
)

type Stmt struct {
*sql.Stmt
}

// Exec executes a prepared statement with the given arguments and returns a Result summarizing the effect of the statement.
func (s *Stmt) Exec(args ...interface{}) (sql.Result, error) {
return s.ExecContext(context.Background(), args...)
}

// ExecContext executes a prepared statement with the given arguments and returns a Result summarizing the effect of the statement.
func (s *Stmt) ExecContext(ctx context.Context, args ...interface{}) (result sql.Result, err error) {
r, err := retryFunc("stmt_exec", func() (interface{}, error) {
return s.Stmt.ExecContext(ctx, args...)
})
if err == nil {
result = r.(sql.Result)
}
return
}

// Query executes a prepared query statement with the given arguments and returns the query results as a *Rows.
func (s *Stmt) Query(args ...interface{}) (*sql.Rows, error) {
return s.QueryContext(context.Background(), args...)
}

// QueryContext executes a prepared query statement with the given arguments and returns the query results as a *Rows.
func (s *Stmt) QueryContext(ctx context.Context, args ...interface{}) (result *sql.Rows, err error) {
r, err := retryFunc("stmt_query", func() (interface{}, error) {
return s.Stmt.QueryContext(ctx, args...)
})
if err == nil {
result = r.(*sql.Rows)
}
return
}

0 comments on commit be66bda

Please sign in to comment.