Skip to content

Commit

Permalink
ddl(ticdc): support batch create table ddl (#11262)
Browse files Browse the repository at this point in the history
close #11009
  • Loading branch information
hongyunyan committed Jun 12, 2024
1 parent 3887861 commit 269d2a9
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 13 deletions.
4 changes: 2 additions & 2 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,11 +380,11 @@ func parseJob(v []byte, startTs, CRTs uint64, fromHistoryTable bool) (*timodel.J
}

if fromHistoryTable {
// we only want to get `create table` ddl from tidb_ddl_history, so we just throw out others ddls.
// We only want `create table` and `create tables` DDLs from tidb_ddl_history, so we discard all other DDLs.
// We only want jobs in `JobStateSynced`, which means the DDL job finished successfully.
// Besides, to satisfy the subsequent processing,
// we need to set the job state to Done so that it will be replayed in schemaStorage.
if job.Type != timodel.ActionCreateTable || job.State != timodel.JobStateSynced {
if (job.Type != timodel.ActionCreateTable && job.Type != timodel.ActionCreateTables) || job.State != timodel.JobStateSynced {
return nil, nil
}
job.State = timodel.JobStateDone
Expand Down
11 changes: 10 additions & 1 deletion cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (s *Snapshot) PreTableInfo(job *timodel.Job) (*model.TableInfo, error) {
switch job.Type {
case timodel.ActionCreateSchema, timodel.ActionModifySchemaCharsetAndCollate, timodel.ActionDropSchema:
return nil, nil
case timodel.ActionCreateTable, timodel.ActionCreateView, timodel.ActionRecoverTable:
case timodel.ActionCreateTable, timodel.ActionCreateTables, timodel.ActionCreateView, timodel.ActionRecoverTable:
// no pre table info
return nil, nil
case timodel.ActionRenameTable, timodel.ActionDropTable, timodel.ActionDropView, timodel.ActionTruncateTable, timodel.ActionAlterTablePartitioning, timodel.ActionRemovePartitioning:
Expand Down Expand Up @@ -472,6 +472,15 @@ func (s *Snapshot) DoHandleDDL(job *timodel.Job) error {
if err != nil {
return errors.Trace(err)
}
case timodel.ActionCreateTables:
multiTableInfos := job.BinlogInfo.MultipleTableInfos
for _, tableInfo := range multiTableInfos {
err := s.inner.createTable(model.WrapTableInfo(job.SchemaID, job.SchemaName,
job.BinlogInfo.FinishedTS, tableInfo), job.BinlogInfo.FinishedTS)
if err != nil {
return errors.Trace(err)
}
}
case timodel.ActionCreateTable, timodel.ActionCreateView, timodel.ActionRecoverTable:
err := s.inner.createTable(getWrapTableInfo(job), job.BinlogInfo.FinishedTS)
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package entry
import (
"context"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -407,6 +408,20 @@ func (s *schemaStorage) BuildDDLEvents(
if err != nil {
return nil, errors.Trace(err)
}
case timodel.ActionCreateTables:
if job.BinlogInfo != nil && job.BinlogInfo.MultipleTableInfos != nil {
querys := strings.Split(job.Query, ";")
multiTableInfos := job.BinlogInfo.MultipleTableInfos
for index, tableInfo := range multiTableInfos {
newTableInfo := model.WrapTableInfo(job.SchemaID, job.SchemaName, job.BinlogInfo.FinishedTS, tableInfo)
job.Query = querys[index]
event := new(model.DDLEvent)
event.FromJob(job, nil, newTableInfo)
ddlEvents = append(ddlEvents, event)
}
} else {
return nil, errors.Errorf("there is no multiple table infos in the create tables job: %s", job)
}
default:
// parse preTableInfo
preSnap, err := s.GetSnapshot(ctx, job.BinlogInfo.FinishedTS-1)
Expand Down
23 changes: 23 additions & 0 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -390,6 +391,28 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
return false, cerror.WrapError(cerror.ErrHandleDDLFailed,
errors.Trace(err), job.Query, job.StartTS, job.StartTS)
}
case timodel.ActionCreateTables:
// Only multiTableInfos and the split queries are used when generating the job event,
// so if a table should be discarded, we just drop its entry from both multiTableInfos and the queries.
var newMultiTableInfos []*timodel.TableInfo
var newQuerys []string

multiTableInfos := job.BinlogInfo.MultipleTableInfos
querys := strings.Split(job.Query, ";")

for index, tableInfo := range multiTableInfos {
// check whether each table needs to be skipped
if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, tableInfo.Name.O) {
continue
}
newMultiTableInfos = append(newMultiTableInfos, multiTableInfos[index])
newQuerys = append(newQuerys, querys[index])
}

skip = len(newMultiTableInfos) == 0

job.BinlogInfo.MultipleTableInfos = newMultiTableInfos
job.Query = strings.Join(newQuerys, ";")
case timodel.ActionRenameTable:
oldTable, ok := snap.PhysicalTableByID(job.TableID)
if !ok {
Expand Down
1 change: 1 addition & 0 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var ddlWhiteListMap = map[timodel.ActionType]bf.EventType{

// table related DDLs
timodel.ActionCreateTable: bf.CreateTable,
timodel.ActionCreateTables: bf.CreateTable,
timodel.ActionDropTable: bf.DropTable,
timodel.ActionTruncateTable: bf.TruncateTable,
timodel.ActionRenameTable: bf.RenameTable,
Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func TestShouldDiscardDDL(t *testing.T) {
}

func TestIsAllowedDDL(t *testing.T) {
require.Len(t, ddlWhiteListMap, 36)
require.Len(t, ddlWhiteListMap, 37)
type testCase struct {
timodel.ActionType
allowed bool
Expand Down
2 changes: 2 additions & 0 deletions tests/integration_tests/batch_add_table/conf/changefeed.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Replicate all tables except test.t_1; run.sh checks that test.t_1 is not created downstream.
[filter]
rules = ['*.*','!test.t_1']
24 changes: 23 additions & 1 deletion tests/integration_tests/batch_add_table/data/prepare.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,31 @@ create table a2 (id int primary key);
create table a3 (id int primary key);
create table a4 (id int primary key);
create table a5 (id int primary key);
create table a6 (id int primary key);
create table a7 (id int primary key);
create table a8 (id int primary key);
create table a9 (id int primary key);
create table a10 (id int primary key);
create table a11 (id int primary key);
create table a12 (id int primary key);
create table a13 (id int primary key);
create table a14 (id int primary key);
create table a15 (id int primary key);
create table a16 (id int primary key);

insert into a1 values (1);
insert into a2 values (1);
insert into a3 values (1);
insert into a4 values (1);
insert into a5 values (1);
insert into a5 values (1);
insert into a6 values (1);
insert into a7 values (1);
insert into a8 values (1);
insert into a9 values (1);
insert into a10 values (1);
insert into a11 values (1);
insert into a12 values (1);
insert into a13 values (1);
insert into a14 values (1);
insert into a15 values (1);
insert into a16 values (1);
12 changes: 11 additions & 1 deletion tests/integration_tests/batch_add_table/data/test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ insert into a2 values (2);
insert into a3 values (2);
insert into a4 values (2);
insert into a5 values (2);

insert into a6 values (2);
insert into a7 values (2);
insert into a8 values (2);
insert into a9 values (2);
insert into a10 values (2);
insert into a11 values (2);
insert into a12 values (2);
insert into a13 values (2);
insert into a14 values (2);
insert into a15 values (2);
insert into a16 values (2);

create table finish_mark (id int primary key);
25 changes: 18 additions & 7 deletions tests/integration_tests/batch_add_table/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ function run_with_fast_create_table() {

run_sql "set global tidb_enable_fast_create_table=on" ${UP_TIDB_HOST} ${UP_TIDB_PORT}

run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql_file $CUR/data/prepare.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

TOPIC_NAME="ticdc-batch-add-table-test-$RANDOM"
Expand All @@ -32,16 +29,31 @@ function run_with_fast_create_table() {
;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
esac
run_cdc_cli changefeed create --sink-uri="$SINK_URI"
run_cdc_cli changefeed create --sink-uri="$SINK_URI" --config="$CUR/conf/changefeed.toml"
case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
esac

## Create tables concurrently to generate batch create table DDLs. In changefeed.toml, we filter out test.t_1.
for ((i = 1; i <= 100; i++)); do
mysql -h ${UP_TIDB_HOST} -P ${UP_TIDB_PORT} -u root -D "test" -e "create table t_$i (a int primary key , b int)" &
done

check_table_exists test.t_100 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

for ((i = 1; i <= 100; i++)); do
mysql -h ${UP_TIDB_HOST} -P ${UP_TIDB_PORT} -u root -D "test" -e "insert into t_$i values(1,2)"
done

run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}

# sync_diff can't check a non-existent table, so we first check that the expected tables are created downstream
check_table_exists batch_add_table.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
# check that the DDL of the filtered table test.t_1 is skipped
check_table_not_exists test.t_1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

cleanup_process $CDC_BINARY
Expand All @@ -56,9 +68,6 @@ function run_without_fast_create_table() {

run_sql "set global tidb_enable_fast_create_table=off" ${UP_TIDB_HOST} ${UP_TIDB_PORT}

run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql_file $CUR/data/prepare.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

TOPIC_NAME="ticdc-batch-add-table-test-$RANDOM"
Expand All @@ -78,7 +87,9 @@ function run_without_fast_create_table() {
pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
esac

run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}

# sync_diff can't check non-exist table, so we check expected tables are created in downstream first
check_table_exists batch_add_table.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
Expand Down

0 comments on commit 269d2a9

Please sign in to comment.