Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
add import goroutine leak integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
IANTHEREAL committed Nov 12, 2019
1 parent fbf1fb3 commit b0daec2
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 10 deletions.
52 changes: 42 additions & 10 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,20 @@ func NewWorker(loader *Loader, id int) (worker *Worker, err error) {

// Close shuts the worker down and blocks until the goroutines tracked by
// w.wg have finished.
//
// Only the first caller that flips w.closed from 0 to 1 closes w.jobQueue;
// every later caller just waits on w.wg and returns, so the channel is never
// closed twice.
func (w *Worker) Close() {
// Test hook: when the "workerCantClose" failpoint is enabled, log it and
// leave without closing anything.
// NOTE(review): failpoint.Return() is a marker call; presumably it becomes a
// plain `return` from Close once failpoints are rewritten in — confirm.
failpoint.Inject("workerCantClose", func(_ failpoint.Value) {
w.tctx.L().Info("", zap.String("failpoint", "workerCantClose"))
failpoint.Return()
})

// The compare-and-swap fails when w.closed is no longer 0, i.e. Close has
// already been entered; wait for the outstanding goroutines and stop here.
if !atomic.CompareAndSwapInt64(&w.closed, 0, 1) {
w.wg.Wait()
w.tctx.L().Info("already closed...")
return
}

w.tctx.L().Info("start to close...")

// Closing the queue lets receivers on w.jobQueue observe the close; then
// wait for everything registered on w.wg to return.
close(w.jobQueue)
w.wg.Wait()

w.tctx.L().Info("closed !!!")
}

Expand All @@ -129,16 +133,19 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, workerWg *

ctctx := w.tctx.WithContext(newCtx)

w.wg.Add(1)
go func() {
defer w.wg.Done()
doJob := func() {
for {
select {
case <-newCtx.Done():
w.tctx.L().Debug("execution goroutine exits")
w.tctx.L().Info("context canceled, execution goroutine exits")
return
case job, ok := <-w.jobQueue:
if !ok || job == nil {
if job == nil {
w.tctx.L().Info("jobs are finished, execution goroutine exits")
return
}
if !ok {
w.tctx.L().Info("job queue was closed, execution goroutine exits")
return
}
sqls := make([]string, 0, 3)
Expand All @@ -158,7 +165,12 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, workerWg *

failpoint.Inject("LoadDataSlowDown", nil)

if err := w.conn.executeSQL(ctctx, sqls); err != nil {
err := w.conn.executeSQL(ctctx, sqls)
failpoint.Inject("executeSQLError", func(_ failpoint.Value) {
w.tctx.L().Info("", zap.String("failpoint", "executeSQLError"))
err = errors.New("inject failpoint executeSQLError")
})
if err != nil {
// expect pause rather than exit
err = terror.WithScope(terror.Annotatef(err, "file %s", job.file), terror.ScopeDownstream)
runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
Expand All @@ -167,19 +179,26 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, workerWg *
w.loader.finishedDataSize.Add(job.offset - job.lastOffset)
}
}
}()
}

// worker main routine
for {
select {
case <-newCtx.Done():
w.tctx.L().Info("context canceled, main goroutine exits")
return
case job, ok := <-fileJobQueue:
if !ok {
w.tctx.L().Debug("main routine exit.")
w.tctx.L().Info("file queue was closed, main routine exit.")
return
}

w.wg.Add(1)
go func() {
defer w.wg.Done()
doJob()
}()

// restore a table
if err := w.restoreDataFile(ctx, filepath.Join(w.cfg.Dir, job.dataFile), job.offset, job.info); err != nil {
// expect pause rather than exit
Expand All @@ -198,6 +217,17 @@ func (w *Worker) restoreDataFile(ctx context.Context, filePath string, offset in
return err
}

failpoint.Inject("dispatchError", func(_ failpoint.Value) {
w.tctx.L().Info("", zap.String("failpoint", "dispatchError"))
failpoint.Return(errors.New("inject failpoint dispatchError"))
})

// dispatchSQL completed, send nil to make sure all dmls are applied to target database
// we don't want to close and re-make chan frequently
// but if we need to re-call w.run, we need re-make jobQueue chan
w.jobQueue <- nil
w.wg.Wait()

w.tctx.L().Info("finish to restore dump sql file", zap.String("data file", filePath))
return nil
}
Expand Down Expand Up @@ -576,6 +606,8 @@ func (l *Loader) Close() {
func (l *Loader) stopLoad() {
// before re-write workflow, simply close all job queue and job workers
// when resuming, re-create them
l.tctx.L().Info("stop importing data process")

l.closeFileJobQueue()
l.workerWg.Wait()

Expand Down
9 changes: 9 additions & 0 deletions tests/import_goroutine_leak/conf/dm-master.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Master Configuration.

[[deploy]]
source-id = "mysql-replica-01"
dm-worker = "127.0.0.1:8262"

[[deploy]]
source-id = "mysql-replica-02"
dm-worker = "127.0.0.1:8263"
49 changes: 49 additions & 0 deletions tests/import_goroutine_leak/conf/dm-task.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
---
name: test
task-mode: full
is-sharding: false
meta-schema: "dm_meta"
remove-meta: false
enable-heartbeat: true
timezone: "Asia/Shanghai"

target-database:
host: "127.0.0.1"
port: 4000
user: "root"
password: ""

mysql-instances:
- source-id: "mysql-replica-01"
black-white-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

- source-id: "mysql-replica-02"
black-white-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

black-white-list:
instance:
do-dbs: ["import_goroutine_leak"]

mydumpers:
global:
mydumper-path: "./bin/mydumper"
threads: 4
chunk-filesize: 0
skip-tz-utc: true
extra-args: "--statement-size=100"

loaders:
global:
pool-size: 16
dir: "./dumped_data"

syncers:
global:
worker-count: 16
batch: 100
13 changes: 13 additions & 0 deletions tests/import_goroutine_leak/conf/dm-worker1.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Worker Configuration.

source-id = "mysql-replica-01"
flavor = ""
enable-gtid = false
relay-binlog-name = ""
relay-binlog-gtid = ""

[from]
host = "127.0.0.1"
user = "root"
password = ""
port = 3306
13 changes: 13 additions & 0 deletions tests/import_goroutine_leak/conf/dm-worker2.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Worker Configuration.

source-id = "mysql-replica-02"
flavor = ""
enable-gtid = false
relay-binlog-name = ""
relay-binlog-gtid = ""

[from]
host = "127.0.0.1"
user = "root"
password = ""
port = 3307
77 changes: 77 additions & 0 deletions tests/import_goroutine_leak/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#!/bin/bash
# Integration test for the import (loader) unit: starts DM workers with loader
# failpoints enabled and checks how they go down and come back up.

# Abort on the first failing command (-e) and on any use of an unset variable (-u).
set -eu

# Absolute path of the directory that contains this script.
cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
# Shared test helpers and environment.
# NOTE(review): helper functions used below (run_sql_file, run_dm_worker, ...)
# are not defined in this script; presumably they come from test_prepare — confirm.
source $cur/../_utils/test_prepare
# Per-test working directory.
# NOTE(review): TEST_DIR and TEST_NAME are not set here; presumably provided by
# test_prepare or the calling environment — confirm.
WORK_DIR=$TEST_DIR/$TEST_NAME

# Number of INSERT statements generated per table by prepare_datafile
# (each statement inserts two rows).
COUNT=200
# prepare_datafile generates the SQL seed files for the two upstream databases:
# $WORK_DIR/db1.prepare.sql and $WORK_DIR/db2.prepare.sql.
#
# Each file recreates the import_goroutine_leak database, creates table t<i>,
# and adds $COUNT two-row INSERT statements whose j values embed the loop
# counter and the source index i.
#
# Reads the globals WORK_DIR and COUNT. Output is appended, so an existing file
# is extended rather than overwritten.
function prepare_datafile() {
    # Keep loop state out of the global scope of the script.
    local i j data_file

    for i in $(seq 2); do
        data_file="$WORK_DIR/db$i.prepare.sql"
        # One quoted redirect for the whole group: the file is opened once
        # instead of once per line, and a WORK_DIR containing whitespace no
        # longer triggers an "ambiguous redirect" error.
        {
            echo 'DROP DATABASE if exists import_goroutine_leak;'
            echo 'CREATE DATABASE import_goroutine_leak;'
            echo 'USE import_goroutine_leak;'
            echo "CREATE TABLE t$i(i TINYINT, j INT UNIQUE KEY);"
            for j in $(seq "$COUNT"); do
                echo "INSERT INTO t$i VALUES ($i,${j}000$i),($i,${j}001$i);"
            done
        } >> "$data_file"
    done
}

# run executes the scenario in two phases.
#
# Phase 1 starts both workers and the master with four loader failpoints
# enabled (including workerCantClose), starts the task, then expects both
# worker ports to go offline and worker1's stdout log to contain exactly one
# line matching "panic".
#
# Phase 2 restarts both workers with workerCantClose left out and only checks
# that they come back online. GO_FAILPOINTS is cleared at the end.
function run() {
    # Common package path of every failpoint used by this test.
    local loader_fp="github.com/pingcap/dm/loader"

    # Generate the seed SQL and load it into the two upstream instances.
    prepare_datafile
    run_sql_file $WORK_DIR/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1
    run_sql_file $WORK_DIR/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2

    # Phase 1: import errors are injected and workers are prevented from closing.
    inject_points=(
        "$loader_fp/dispatchError=return(1)"
        "$loader_fp/LoadDataSlowDown=sleep(1000)"
        "$loader_fp/executeSQLError=return(1)"
        "$loader_fp/workerCantClose=return(1)"
    )
    export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"

    # Bring each component up and wait for its RPC endpoint before the next one.
    run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
    run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
    run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
    check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT

    dmctl_start_task

    # Both worker processes are expected to stop listening.
    # NOTE(review): the meaning of the second argument (20) is defined by the
    # external helper, presumably a retry/timeout bound — confirm.
    check_port_offline $WORKER1_PORT 20
    check_port_offline $WORKER2_PORT 20

    # worker1 must have logged exactly one line containing "panic".
    err_cnt=$(grep "panic" $WORK_DIR/worker1/log/stdout.log | wc -l)
    if [ $err_cnt -ne 1 ]; then
        echo "dm-worker1 doesn't panic, panic count ${err_cnt}"
        exit 2
    fi

    # Phase 2: same error injection, but workers are allowed to close again.
    inject_points=(
        "$loader_fp/dispatchError=return(1)"
        "$loader_fp/LoadDataSlowDown=sleep(1000)"
        "$loader_fp/executeSQLError=return(1)"
    )
    export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
    run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
    run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

    # Do not leak failpoint settings to whatever runs after this function.
    export GO_FAILPOINTS=''
}

# Script entry point: clean up, run the scenario, clean up again.
# NOTE(review): cleanup_data and cleanup_process are defined outside this
# script; presumably cleanup_data removes the import_goroutine_leak data left
# by an earlier run — confirm.
cleanup_data import_goroutine_leak
# Also clean up DM processes in case the previous run failed.
cleanup_process $*
run $*
# Stop the processes started by run.
cleanup_process $*

# Only reached when every step above succeeded (the script runs under set -e).
echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"

0 comments on commit b0daec2

Please sign in to comment.