Skip to content

Commit

Permalink
ticdc: change the calculation of first sync point ts (#11253)
Browse files Browse the repository at this point in the history
close #11212
  • Loading branch information
hongyunyan committed Jun 13, 2024
1 parent 269d2a9 commit fa1a2b3
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 8 deletions.
10 changes: 9 additions & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,15 @@ LOOP2:

c.barriers = newBarriers()
if util.GetOrZero(cfInfo.Config.EnableSyncPoint) {
c.barriers.Update(syncPointBarrier, c.resolvedTs)
// firstSyncPointStartTs = k * syncPointInterval,
// which >= startTs, and choose the minimal k
syncPointInterval := util.GetOrZero(cfInfo.Config.SyncPointInterval)
k := oracle.GetTimeFromTS(c.resolvedTs).Sub(time.Unix(0, 0)) / syncPointInterval
if oracle.GetTimeFromTS(c.resolvedTs).Sub(time.Unix(0, 0))%syncPointInterval != 0 || oracle.ExtractLogical(c.resolvedTs) != 0 {
k += 1
}
firstSyncPointTs := oracle.GoTimeToTS(time.Unix(0, 0).Add(k * syncPointInterval))
c.barriers.Update(syncPointBarrier, firstSyncPointTs)
}
c.barriers.Update(finishBarrier, cfInfo.GetTargetTs())

Expand Down
6 changes: 0 additions & 6 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,13 +684,7 @@ func TestBarrierAdvance(t *testing.T) {
require.Nil(t, err)
err = cf.handleBarrier(ctx, state.Info, state.Status, barrier)

nextSyncPointTs := oracle.GoTimeToTS(
oracle.GetTimeFromTS(state.Status.CheckpointTs + 10).
Add(util.GetOrZero(changefeedInfo.Config.SyncPointInterval)),
)

require.Nil(t, err)
require.Equal(t, nextSyncPointTs, barrier.GlobalBarrierTs)
require.Less(t, state.Status.CheckpointTs+10, barrier.GlobalBarrierTs)
require.Less(t, barrier.GlobalBarrierTs, cf.ddlManager.ddlResolvedTs)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests/run_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ group_num=${group#G}
# Other tests that only support mysql: batch_update_to_no_batch ddl_reentrant
# changefeed_fast_fail changefeed_resume_with_checkpoint_ts sequence
# multi_cdc_cluster capture_suicide_while_balance_table
mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint hang_sink_suicide server_config_compatibility changefeed_dup_error_restart"
mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint syncpoint_check_ts hang_sink_suicide server_config_compatibility changefeed_dup_error_restart"
mysql_only_http="http_api http_api_tls api_v2 http_api_tls_with_user_auth cli_tls_with_auth"
mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
enable-sync-point = true
sync-point-interval = "30s"
72 changes: 72 additions & 0 deletions tests/integration_tests/syncpoint_check_ts/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#!/bin/bash

# [DISCRIPTION]:
# The test is used to check the first sync point ts value when we set different SyncPointInterval
# [STEP]:
# 1. Create changefeed with --sync-point --sync-interval=30s
# 2. After test, get fisrt syncpoints from tidb_cdc.syncpoint_v1
# 3. check the first syncpoint ts value

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

CDC_COUNT=3
DB_COUNT=4

function run() {
if [ "$SINK_TYPE" != "mysql" ]; then
echo "kafka downstream isn't support syncpoint record"
return
fi

rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR
run_sql "SET GLOBAL tidb_enable_external_ts_read = on;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

# this test contains `set global tidb_external_ts = ?` , which requires super privilege, so we
# can't use the normal user
SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config="$CUR/conf/changefeed.toml"

run_sql "SET GLOBAL tidb_enable_external_ts_read = off;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
sleep 60

run_sql "SELECT primary_ts, secondary_ts FROM tidb_cdc.syncpoint_v1 order by primary_ts limit 1;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
echo "____________________________________"
cat "$OUT_DIR/sql_res.$TEST_NAME.txt"
primary_ts=($(grep primary_ts $OUT_DIR/sql_res.$TEST_NAME.txt | awk -F ": " '{print $2}'))
echo "primary_ts is " $primary_ts "start_ts is " $start_ts

shifted_primary_ts=$(($primary_ts >> 18))
if [ $(($shifted_primary_ts % 30000)) -eq 0 ]; then
pre_primary_ts=$(($primary_ts >> 18 - 30000))
pre_primary_ts=$(($pre_primary_ts << 18))
if [ $pre_primary_ts -lt $start_ts ]; then
echo "check successfully"
else
echo "primary ts is not correct, there should be a smaller primary ts as the first sync point ts"
exit 1
fi
else
echo "primary ts is not correct, the difference between it and the time.Unix(0,0) should be the integer multiples of the syncPointInterval "
exit 1
fi

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"

0 comments on commit fa1a2b3

Please sign in to comment.