Skip to content

Commit

Permalink
Merge branch 'master' into dumplingSupportEnablePaging
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei committed Dec 12, 2022
2 parents 6dcacc1 + c13dfe3 commit 1e199b9
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 9 deletions.
2 changes: 1 addition & 1 deletion ddl/concurrentddltest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "concurrentddltest_test",
timeout = "long",
timeout = "moderate",
srcs = [
"main_test.go",
"switch_test.go",
Expand Down
2 changes: 1 addition & 1 deletion domain/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

// MockInfoCacheAndLoadInfoSchema only used in unit tests.
func (do *Domain) MockInfoCacheAndLoadInfoSchema(is infoschema.InfoSchema) {
do.infoCache = infoschema.NewCache(16)
do.infoCache.Reset(16)
do.infoCache.Insert(is, 0)
}

Expand Down
1 change: 1 addition & 0 deletions dumpling/export/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ go_library(
"@com_github_coreos_go_semver//semver",
"@com_github_docker_go_units//:go-units",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
Expand Down
7 changes: 7 additions & 0 deletions dumpling/export/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
filter "github.com/pingcap/tidb/util/table-filter"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/pflag"
"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -144,6 +145,9 @@ type Config struct {
PromFactory promutil.Factory `json:"-"`
PromRegistry promutil.Registry `json:"-"`
ExtStorage storage.ExternalStorage `json:"-"`

IOTotalBytes *atomic.Uint64
Net string
}

// ServerInfoUnknown is the unknown database type to dumpling
Expand Down Expand Up @@ -212,6 +216,9 @@ func (conf *Config) GetDriverConfig(db string) *mysql.Config {
driverCfg.User = conf.User
driverCfg.Passwd = conf.Password
driverCfg.Net = "tcp"
if conf.Net != "" {
driverCfg.Net = conf.Net
}
driverCfg.Addr = hostPort
driverCfg.DBName = db
driverCfg.Collation = "utf8mb4_general_ci"
Expand Down
31 changes: 31 additions & 0 deletions dumpling/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"encoding/hex"
"fmt"
"math/big"
"net"
"strconv"
"strings"
"sync/atomic"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/coreos/go-semver/semver"
// import mysql driver
"github.com/go-sql-driver/mysql"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
pclog "github.com/pingcap/log"
Expand All @@ -32,8 +34,10 @@ import (
"github.com/pingcap/tidb/parser/format"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
pd "github.com/tikv/pd/client"
gatomic "go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -106,6 +110,17 @@ func NewDumper(ctx context.Context, conf *Config) (*Dumper, error) {
if err != nil {
return nil, err
}
failpoint.Inject("SetIOTotalBytes", func(_ failpoint.Value) {
d.conf.IOTotalBytes = gatomic.NewUint64(0)
d.conf.Net = uuid.New().String()
go func() {
for {
time.Sleep(10 * time.Millisecond)
d.tctx.L().Logger.Info("IOTotalBytes", zap.Uint64("IOTotalBytes", d.conf.IOTotalBytes.Load()))
}
}()
})

err = runSteps(d,
initLogger,
createExternalStore,
Expand Down Expand Up @@ -1335,6 +1350,22 @@ func startHTTPService(d *Dumper) error {

// openSQLDB is an initialization step of Dumper.
func openSQLDB(d *Dumper) error {
if d.conf.IOTotalBytes != nil {
mysql.RegisterDialContext(d.conf.Net, func(ctx context.Context, addr string) (net.Conn, error) {
dial := &net.Dialer{}
conn, err := dial.DialContext(ctx, "tcp", addr)
if err != nil {
return nil, err
}
tcpConn := conn.(*net.TCPConn)
// try https://github.com/go-sql-driver/mysql/blob/bcc459a906419e2890a50fc2c99ea6dd927a88f2/connector.go#L56-L64
err = tcpConn.SetKeepAlive(true)
if err != nil {
d.tctx.L().Logger.Warn("fail to keep alive", zap.Error(err))
}
return util.NewTCPConnWithIOCounter(tcpConn, d.conf.IOTotalBytes), nil
})
}
conf := d.conf
c, err := mysql.NewConnector(conf.GetDriverConfig(""))
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions dumpling/tests/basic/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,14 @@ run_dumpling --consistency lock -B "$DB_NAME" -L ${DUMPLING_OUTPUT_DIR}/dumpling
cnt=$(grep -w "$DB_NAME" ${DUMPLING_OUTPUT_DIR}/${DB_NAME}-schema-create.sql|wc -l)
echo "records count is ${cnt}"
[ "$cnt" = 1 ]

# Test for recording network usage
run_sql "drop database if exists test_db;"
run_sql "create database test_db;"
run_sql "create table test_db.test_table (a int primary key);"
run_sql "insert into test_db.test_table values (1),(2),(3),(4),(5),(6),(7),(8);"

export GO_FAILPOINTS="github.com/pingcap/tidb/dumpling/export/SetIOTotalBytes=return(1)"
run_dumpling -B "test_db" -L ${DUMPLING_OUTPUT_DIR}/dumpling.log
cnt=$(grep "IOTotalBytes=" ${DUMPLING_OUTPUT_DIR}/dumpling.log | grep -v "IOTotalBytes=0" | wc -l)
[ "$cnt" -ge 1 ]
2 changes: 1 addition & 1 deletion executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ go_library(

go_test(
name = "executor_test",
timeout = "long",
timeout = "moderate",
srcs = [
"adapter_test.go",
"admin_test.go",
Expand Down
2 changes: 1 addition & 1 deletion executor/seqtest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "seqtest_test",
timeout = "moderate",
timeout = "short",
srcs = [
"main_test.go",
"prepared_test.go",
Expand Down
11 changes: 9 additions & 2 deletions infoschema/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,15 @@ type InfoCache struct {
}

// NewCache creates a new InfoCache.
func NewCache(capcity int) *InfoCache {
return &InfoCache{cache: make([]InfoSchema, 0, capcity)}
func NewCache(capacity int) *InfoCache {
return &InfoCache{cache: make([]InfoSchema, 0, capacity)}
}

// Reset resets the cache.
func (h *InfoCache) Reset(capacity int) {
h.mu.Lock()
defer h.mu.Unlock()
h.cache = make([]InfoSchema, 0, capacity)
}

// GetLatest gets the newest information schema.
Expand Down
6 changes: 3 additions & 3 deletions sessiontxn/staleread/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,18 +280,18 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) {
tk.MustExec("set @@tx_read_ts=''")

// `@@tidb_read_staleness`
tk.MustExec("set @@tidb_read_staleness=-5")
tk.MustExec("set @@tidb_read_staleness=-100")
processor = createProcessor(t, tk.Session())
err = processor.OnExecutePreparedStmt(nil)
require.True(t, processor.IsStaleness())
require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion())
expectedTS, err := staleread.CalculateTsWithReadStaleness(tk.Session(), -5*time.Second)
expectedTS, err := staleread.CalculateTsWithReadStaleness(tk.Session(), -100*time.Second)
require.NoError(t, err)
require.Equal(t, expectedTS, processor.GetStalenessReadTS())
tk.MustExec("set @@tidb_read_staleness=''")

// `@@tidb_read_staleness` will be ignored when `as of` or `@@tx_read_ts`
tk.MustExec("set @@tidb_read_staleness=-5")
tk.MustExec("set @@tidb_read_staleness=-100")
processor = createProcessor(t, tk.Session())
err = processor.OnExecutePreparedStmt(func(sctx sessionctx.Context) (uint64, error) {
return p1.ts, nil
Expand Down

0 comments on commit 1e199b9

Please sign in to comment.