Skip to content

Commit

Permalink
Merge branch 'zeromicro:master' into fix-core-errorx
Browse files Browse the repository at this point in the history
  • Loading branch information
ch3nnn committed Mar 1, 2024
2 parents 4bd0c1d + 52e5d85 commit 37cdb12
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 2 deletions.
12 changes: 11 additions & 1 deletion core/stores/sqlx/stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sqlx
import (
"context"
"database/sql"
"errors"
"time"

"github.com/zeromicro/go-zero/core/breaker"
Expand Down Expand Up @@ -70,6 +71,9 @@ func (s statement) ExecCtx(ctx context.Context, args ...any) (result sql.Result,
}, func(err error) bool {
return s.accept(err)
})
if errors.Is(err, breaker.ErrServiceUnavailable) {
metricReqErr.Inc("stmt_exec", "breaker")
}

return
}
Expand Down Expand Up @@ -137,7 +141,8 @@ func (s statement) QueryRowsPartialCtx(ctx context.Context, v any, args ...any)
func (s statement) queryRows(ctx context.Context, scanFn func(any, rowsScanner) error,
v any, args ...any) error {
var scanFailed bool
return s.brk.DoWithAcceptable(func() error {

err := s.brk.DoWithAcceptable(func() error {
return queryStmt(ctx, s.stmt, func(rows *sql.Rows) error {
err := scanFn(v, rows)
if err != nil {
Expand All @@ -148,6 +153,11 @@ func (s statement) queryRows(ctx context.Context, scanFn func(any, rowsScanner)
}, func(err error) bool {
return scanFailed || s.accept(err)
})
if errors.Is(err, breaker.ErrServiceUnavailable) {
metricReqErr.Inc("stmt_queryRows", "breaker")
}

return err
}

// DisableLog disables logging of sql statements, includes info and slow logs.
Expand Down
48 changes: 47 additions & 1 deletion core/stores/sqlx/stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func TestNilGuard(t *testing.T) {
assert.Equal(t, nilGuard{}, guard)
}

func TestStmtScanFailed(t *testing.T) {
func TestStmtBreaker(t *testing.T) {
dbtest.RunTest(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
mock.ExpectPrepare("any")

Expand All @@ -242,6 +242,52 @@ func TestStmtScanFailed(t *testing.T) {
assert.NotErrorIs(t, err, breaker.ErrServiceUnavailable)
}
})

dbtest.RunTest(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
mock.ExpectPrepare("any")
conn := NewSqlConnFromDB(db)
stmt, err := conn.Prepare("any")
assert.NoError(t, err)

for i := 0; i < 1000; i++ {
assert.Error(t, conn.Transact(func(session Session) error {
return nil
}))
}

var breakerTriggered bool
for i := 0; i < 1000; i++ {
_, err = stmt.Exec("any")
if errors.Is(err, breaker.ErrServiceUnavailable) {
breakerTriggered = true
break
}
}
assert.True(t, breakerTriggered)
})

dbtest.RunTest(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
mock.ExpectPrepare("any")
conn := NewSqlConnFromDB(db)
stmt, err := conn.Prepare("any")
assert.NoError(t, err)

for i := 0; i < 1000; i++ {
assert.Error(t, conn.Transact(func(session Session) error {
return nil
}))
}

var breakerTriggered bool
for i := 0; i < 1000; i++ {
err = stmt.QueryRows(&struct{}{}, "any")
if errors.Is(err, breaker.ErrServiceUnavailable) {
breakerTriggered = true
break
}
}
assert.True(t, breakerTriggered)
})
}

type mockedSessionConn struct {
Expand Down

0 comments on commit 37cdb12

Please sign in to comment.