Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: change flashback grammar to flashback cluster to timestamp #37815

Merged
merged 4 commits into from Sep 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 5 additions & 5 deletions ddl/cluster_test.go
Expand Up @@ -150,7 +150,7 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) {
ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
require.NoError(t, err)

tk.MustGetErrCode(fmt.Sprintf("flashback cluster as of timestamp '%s'", oracle.GetTimeFromTS(ts)), errno.ErrCancelledDDLJob)
tk.MustGetErrCode(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)), errno.ErrCancelledDDLJob)
dom.DDL().SetHook(originHook)

finishValue, err := infosync.GetPDScheduleConfig(context.Background())
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestGlobalVariablesOnFlashback(t *testing.T) {
tk.MustExec("set global tidb_gc_enable = on")
tk.MustExec("set global tidb_super_read_only = off")

tk.MustExec(fmt.Sprintf("flashback cluster as of timestamp '%s'", oracle.GetTimeFromTS(ts)))
tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)))
rs, err := tk.Exec("show variables like 'tidb_super_read_only'")
require.NoError(t, err)
require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off)
Expand All @@ -214,7 +214,7 @@ func TestGlobalVariablesOnFlashback(t *testing.T) {

ts, err = tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
require.NoError(t, err)
tk.MustExec(fmt.Sprintf("flashback cluster as of timestamp '%s'", oracle.GetTimeFromTS(ts)))
tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)))
rs, err = tk.Exec("show variables like 'tidb_super_read_only'")
require.NoError(t, err)
require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.On)
Expand Down Expand Up @@ -251,15 +251,15 @@ func TestCancelFlashbackCluster(t *testing.T) {
return job.SchemaState == model.StateWriteOnly
})
dom.DDL().SetHook(hook)
tk.MustGetErrCode(fmt.Sprintf("flashback cluster as of timestamp '%s'", oracle.GetTimeFromTS(ts)), errno.ErrCancelledDDLJob)
tk.MustGetErrCode(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)), errno.ErrCancelledDDLJob)
hook.MustCancelDone(t)

// Try canceled on StateWriteReorganization, cancel failed
hook = newCancelJobHook(t, store, dom, func(job *model.Job) bool {
return job.SchemaState == model.StateWriteReorganization
})
dom.DDL().SetHook(hook)
tk.MustExec(fmt.Sprintf("flashback cluster as of timestamp '%s'", oracle.GetTimeFromTS(ts)))
tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)))
hook.MustCancelFailed(t)

dom.DDL().SetHook(originHook)
Expand Down
2 changes: 1 addition & 1 deletion executor/ddl.go
Expand Up @@ -537,7 +537,7 @@ func (e *DDLExec) executeFlashBackCluster(ctx context.Context, s *ast.FlashBackC
return errors.Errorf("not support flash back cluster with TiFlash stores")
}

flashbackTS, err := staleread.CalculateAsOfTsExpr(e.ctx, &s.AsOf)
flashbackTS, err := staleread.CalculateAsOfTsExpr(e.ctx, s.FlashbackTS)
if err != nil {
return err
}
Expand Down
20 changes: 10 additions & 10 deletions executor/recover_test.go
Expand Up @@ -298,7 +298,7 @@ func TestRecoverClusterMeetError(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustContainErrMsg(fmt.Sprintf("flashback cluster as of timestamp '%s'", time.Now().Add(30*time.Second)), "Not support flashback cluster in non-TiKV env")
tk.MustContainErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", time.Now().Add(30*time.Second)), "Not support flashback cluster in non-TiKV env")

ts, _ := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
flashbackTs := oracle.GetTimeFromTS(ts)
Expand All @@ -311,8 +311,8 @@ func TestRecoverClusterMeetError(t *testing.T) {
fmt.Sprintf("return(%v)", injectSafeTS)))

// Get GC safe point error.
tk.MustContainErrMsg(fmt.Sprintf("flashback cluster as of timestamp '%s'", time.Now().Add(30*time.Second)), "cannot set flashback timestamp to future time")
tk.MustContainErrMsg(fmt.Sprintf("flashback cluster as of timestamp '%s'", time.Now().Add(0-30*time.Second)), "can not get 'tikv_gc_safe_point'")
tk.MustContainErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", time.Now().Add(30*time.Second)), "cannot set flashback timestamp to future time")
tk.MustContainErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", time.Now().Add(0-30*time.Second)), "can not get 'tikv_gc_safe_point'")

timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
defer resetGC()
Expand All @@ -321,19 +321,19 @@ func TestRecoverClusterMeetError(t *testing.T) {
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))

// out of GC safe point range.
tk.MustGetErrCode(fmt.Sprintf("flashback cluster as of timestamp '%s'", time.Now().Add(0-60*60*60*time.Second)), int(variable.ErrSnapshotTooOld.Code()))
tk.MustGetErrCode(fmt.Sprintf("flashback cluster to timestamp '%s'", time.Now().Add(0-60*60*60*time.Second)), int(variable.ErrSnapshotTooOld.Code()))

// Flashback without super privilege.
tk.MustExec("CREATE USER 'testflashback'@'localhost';")
newTk := testkit.NewTestKit(t, store)
require.NoError(t, newTk.Session().Auth(&auth.UserIdentity{Username: "testflashback", Hostname: "localhost"}, nil, nil))
newTk.MustGetErrCode(fmt.Sprintf("flashback cluster as of timestamp '%s'", time.Now().Add(0-30*time.Second)), int(core.ErrSpecificAccessDenied.Code()))
newTk.MustGetErrCode(fmt.Sprintf("flashback cluster to timestamp '%s'", time.Now().Add(0-30*time.Second)), int(core.ErrSpecificAccessDenied.Code()))
tk.MustExec("drop user 'testflashback'@'localhost';")

// Flashback failed because of ddl history.
tk.MustExec("use test;")
tk.MustExec("create table t(a int);")
tk.MustContainErrMsg(fmt.Sprintf("flashback cluster as of timestamp '%s'", flashbackTs), "schema version not same, have done ddl during [flashbackTS, now)")
tk.MustContainErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs), "schema version not same, have done ddl during [flashbackTS, now)")

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
Expand All @@ -356,7 +356,7 @@ func TestRecoverClusterWithTiFlash(t *testing.T) {
// Set GC safe point
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))

tk.MustContainErrMsg(fmt.Sprintf("flashback cluster as of timestamp '%s'", time.Now().Add(0-30*time.Second)),
tk.MustContainErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", time.Now().Add(0-30*time.Second)),
"not support flash back cluster with TiFlash stores")

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
Expand Down Expand Up @@ -387,20 +387,20 @@ func TestFlashbackWithSafeTs(t *testing.T) {
}{
{
name: "5 seconds ago to now, safeTS 5 secs ago",
sql: fmt.Sprintf("flashback cluster as of timestamp '%s'", flashbackTs),
sql: fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs),
injectSafeTS: oracle.GoTimeToTS(flashbackTs),
compareWithSafeTS: 0,
},
{
name: "10 seconds ago to now, safeTS 5 secs ago",
// Add flashbackTs.Add(-500*time.Millisecond) to avoid flashback time range overlapped.
sql: fmt.Sprintf("flashback cluster as of timestamp '%s'", flashbackTs.Add(-500*time.Millisecond)),
sql: fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs.Add(-500*time.Millisecond)),
injectSafeTS: oracle.GoTimeToTS(flashbackTs.Add(10 * time.Second)),
compareWithSafeTS: -1,
},
{
name: "5 seconds ago to now, safeTS 10 secs ago",
sql: fmt.Sprintf("flashback cluster as of timestamp '%s'", flashbackTs),
sql: fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs),
injectSafeTS: oracle.GoTimeToTS(flashbackTs.Add(-10 * time.Second)),
compareWithSafeTS: 1,
},
Expand Down
13 changes: 6 additions & 7 deletions parser/ast/ddl.go
Expand Up @@ -4006,14 +4006,14 @@ func (n *RecoverTableStmt) Accept(v Visitor) (Node, bool) {
type FlashBackClusterStmt struct {
ddlNode

AsOf AsOfClause
FlashbackTS ExprNode
}

// Restore implements Node interface
func (n *FlashBackClusterStmt) Restore(ctx *format.RestoreCtx) error {
ctx.WriteKeyWord("FLASHBACK CLUSTER ")
if err := n.AsOf.Restore(ctx); err != nil {
return errors.Annotate(err, "An error occurred while splicing FlashBackClusterStmt.Asof")
ctx.WriteKeyWord("FLASHBACK CLUSTER TO TIMESTAMP ")
if err := n.FlashbackTS.Restore(ctx); err != nil {
return errors.Annotate(err, "An error occurred while splicing FlashBackClusterStmt.FlashbackTS")
}
return nil
}
Expand All @@ -4024,13 +4024,12 @@ func (n *FlashBackClusterStmt) Accept(v Visitor) (Node, bool) {
if skipChildren {
return v.Leave(newNode)
}

n = newNode.(*FlashBackClusterStmt)
node, ok := n.AsOf.Accept(v)
node, ok := n.FlashbackTS.Accept(v)
if !ok {
return n, false
}
n.AsOf = *node.(*AsOfClause)
n.FlashbackTS = node.(ExprNode)
return v.Leave(n)
}

Expand Down
10 changes: 4 additions & 6 deletions parser/parser.go
Expand Up @@ -1479,7 +1479,7 @@ var (
57376: 646, // character (658x)
57473: 647, // match (650x)
57437: 648, // index (646x)
57542: 649, // to (568x)
57542: 649, // to (569x)
57360: 650, // all (554x)
46: 651, // '.' (549x)
57362: 652, // analyze (533x)
Expand All @@ -1502,7 +1502,7 @@ var (
57373: 669, // cascade (484x)
57503: 670, // read (484x)
57513: 671, // restrict (484x)
57347: 672, // asof (483x)
57347: 672, // asof (482x)
57383: 673, // create (480x)
57422: 674, // foreign (480x)
57424: 675, // fulltext (480x)
Expand Down Expand Up @@ -11246,7 +11246,7 @@ var (
{2382, 2382},
{563: 6766, 659: 6767, 1134: 6775},
{2383, 2383},
{672: 6782},
{649: 6782},
// 4245
{2: 3129, 2961, 2996, 2841, 2877, 2998, 2768, 10: 2814, 2769, 2900, 3015, 3008, 3365, 3360, 2880, 3164, 2882, 2856, 2800, 2803, 2792, 2825, 2884, 2885, 2992, 2879, 3016, 3121, 3120, 2767, 2878, 2881, 2892, 2832, 2836, 2888, 3001, 2847, 2928, 2765, 2766, 2927, 3000, 2764, 3013, 2973, 50: 3084, 2846, 2849, 3067, 3064, 3056, 3068, 3071, 3072, 3069, 3073, 3074, 3070, 3063, 3075, 3058, 3059, 3062, 3065, 3066, 3076, 3368, 2914, 2850, 3043, 3042, 3044, 3039, 3038, 3045, 3040, 3041, 2842, 2958, 3028, 3092, 3026, 3093, 3133, 3027, 2854, 2922, 3216, 3220, 3208, 3219, 3221, 3211, 3217, 3218, 3222, 3215, 2783, 2917, 3369, 3362, 3358, 2777, 3381, 3025, 3014, 2812, 3364, 3379, 3380, 3378, 3374, 3017, 3018, 3019, 3020, 3021, 3022, 3024, 3370, 2855, 2851, 2943, 2947, 2948, 2949, 2950, 2938, 2967, 3010, 2969, 2827, 2785, 2968, 2939, 3089, 2919, 2959, 2822, 2875, 3034, 2896, 2786, 2791, 2802, 2817, 3357, 2826, 3029, 2899, 2844, 2941, 2858, 2866, 2772, 2918, 2801, 2821, 3196, 2831, 3078, 3168, 2955, 2864, 3372, 2894, 3166, 2835, 2843, 2865, 3079, 2776, 2794, 3361, 2815, 2807, 2893, 2828, 3032, 3048, 2976, 3085, 3086, 3050, 2913, 3087, 3006, 3163, 3114, 3046, 2845, 2946, 3367, 3366, 3004, 2903, 2761, 2787, 2908, 2798, 2799, 2910, 2806, 2816, 2819, 3057, 2869, 2971, 3165, 2937, 2906, 2966, 3009, 2895, 3031, 3116, 2853, 3126, 3127, 3005, 3095, 3054, 3096, 2915, 2977, 2775, 3144, 3097, 3100, 2781, 3080, 3101, 3377, 2788, 2979, 3146, 3103, 2975, 2796, 3105, 2988, 3012, 2999, 2797, 3150, 3107, 3136, 3007, 2810, 3037, 3203, 3363, 2820, 2823, 2989, 3035, 3155, 3030, 3156, 2983, 3109, 3108, 3033, 3090, 2920, 3382, 3110, 3111, 2924, 2981, 3112, 3088, 2839, 2840, 2954, 3060, 2956, 3169, 3113, 3002, 3003, 2944, 2848, 2985, 3117, 2763, 3178, 2984, 3185, 3186, 3187, 3188, 3190, 3189, 3191, 3192, 3193, 3128, 2861, 2986, 3213, 3212, 2867, 2758, 2759, 3036, 3053, 2770, 3055, 3081, 2762, 2773, 2774, 3098, 3099, 2778, 2965, 2779, 2780, 2952, 3091, 3373, 3102, 2897, 2784, 2789, 2790, 3104, 3106, 2909, 3151, 2911, 2804, 2805, 2921, 2809, 2972, 3197, 2811, 2982, 2916, 2890, 3123, 2990, 3011, 2974, 2905, 3157, 2960, 2978, 3023, 2902, 2991, 2883, 3047, 2886, 2887, 3383, 2923, 2830, 2852, 3130, 3198, 2833, 2994, 2997, 3049, 3083, 3131, 3094, 2933, 2934, 2940, 3161, 3134, 3162, 3135, 3061, 3137, 2964, 2901, 3115, 2995, 2953, 3122, 3119, 3118, 3170, 2980, 3082, 2993, 3182, 3125, 2962, 2857, 3206, 3194, 2862, 2891, 2898, 2963, 3132, 2970, 3386, 2872, 3139, 3140, 3359, 3141, 3142, 3143, 3199, 3145, 3147, 3148, 3149, 2808, 2957, 3200, 2926, 3152, 2813, 3207, 3387, 3154, 3392, 3391, 3384, 3209, 3210, 3159, 3158, 2829, 3160, 3167, 2932, 2837, 2838, 3077, 2951, 3375, 3376, 3385, 2945, 2873, 2987, 2904, 2907, 3201, 3174, 3175, 3176, 3177, 3202, 3388, 3172, 3173, 2925, 3124, 3389, 3390, 3195, 3179, 3180, 3181, 3214, 3371, 661: 3914, 2756, 2757, 2755, 736: 6778},
{2386, 2386, 649: 6780, 1217: 6779},
Expand Down Expand Up @@ -12531,9 +12531,7 @@ yynewstate:
case 144:
{
parser.yyVAL.statement = &ast.FlashBackClusterStmt{
AsOf: ast.AsOfClause{
TsExpr: ast.NewValueExpr(yyS[yypt-0].ident, "", ""),
},
FlashbackTS: ast.NewValueExpr(yyS[yypt-0].ident, "", ""),
}
}
case 145:
Expand Down
6 changes: 2 additions & 4 deletions parser/parser.y
Expand Up @@ -2573,12 +2573,10 @@ RecoverTableStmt:
*
*******************************************************************/
FlashbackClusterStmt:
"FLASHBACK" "CLUSTER" asof "TIMESTAMP" stringLit
"FLASHBACK" "CLUSTER" "TO" "TIMESTAMP" stringLit
{
$$ = &ast.FlashBackClusterStmt{
AsOf: ast.AsOfClause{
TsExpr: ast.NewValueExpr($5, "", ""),
},
FlashbackTS: ast.NewValueExpr($5, "", ""),
}
}

Expand Down
6 changes: 3 additions & 3 deletions parser/parser_test.go
Expand Up @@ -3236,9 +3236,9 @@ func TestDDL(t *testing.T) {
{"flashback table t TO t1", true, "FLASHBACK TABLE `t` TO `t1`"},

// for flashback cluster
{"flashback cluster as of timestamp '2021-05-26 16:45:26'", true, "FLASHBACK CLUSTER AS OF TIMESTAMP '2021-05-26 16:45:26'"},
{"flashback cluster as of timestamp TIDB_BOUNDED_STALENESS(DATE_SUB(NOW(), INTERVAL 3 SECOND), NOW())", false, ""},
{"flashback cluster as of timestamp DATE_SUB(NOW(), INTERVAL 3 SECOND)", false, ""},
{"flashback cluster to timestamp '2021-05-26 16:45:26'", true, "FLASHBACK CLUSTER TO TIMESTAMP '2021-05-26 16:45:26'"},
{"flashback cluster to timestamp TIDB_BOUNDED_STALENESS(DATE_SUB(NOW(), INTERVAL 3 SECOND), NOW())", false, ""},
{"flashback cluster to timestamp DATE_SUB(NOW(), INTERVAL 3 SECOND)", false, ""},

// for remove partitioning
{"alter table t remove partitioning", true, "ALTER TABLE `t` REMOVE PARTITIONING"},
Expand Down
2 changes: 1 addition & 1 deletion planner/core/planbuilder.go
Expand Up @@ -3213,7 +3213,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan,
case *ast.BeginStmt:
readTS := b.ctx.GetSessionVars().TxnReadTS.PeakTxnReadTS()
if raw.AsOf != nil {
startTS, err := staleread.CalculateAsOfTsExpr(b.ctx, raw.AsOf)
startTS, err := staleread.CalculateAsOfTsExpr(b.ctx, raw.AsOf.TsExpr)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion sessiontxn/staleread/processor.go
Expand Up @@ -243,7 +243,7 @@ func parseAndValidateAsOf(sctx sessionctx.Context, asOf *ast.AsOfClause) (uint64
return 0, nil
}

ts, err := CalculateAsOfTsExpr(sctx, asOf)
ts, err := CalculateAsOfTsExpr(sctx, asOf.TsExpr)
if err != nil {
return 0, err
}
Expand Down
4 changes: 2 additions & 2 deletions sessiontxn/staleread/util.go
Expand Up @@ -26,8 +26,8 @@ import (
)

// CalculateAsOfTsExpr calculates the TsExpr of AsOfClause to get a StartTS.
func CalculateAsOfTsExpr(sctx sessionctx.Context, asOfClause *ast.AsOfClause) (uint64, error) {
tsVal, err := expression.EvalAstExpr(sctx, asOfClause.TsExpr)
func CalculateAsOfTsExpr(sctx sessionctx.Context, tsExpr ast.ExprNode) (uint64, error) {
tsVal, err := expression.EvalAstExpr(sctx, tsExpr)
if err != nil {
return 0, err
}
Expand Down
2 changes: 1 addition & 1 deletion tests/realtikvtest/brietest/flashback_test.go
Expand Up @@ -80,7 +80,7 @@ func TestFlashback(t *testing.T) {
fmt.Sprintf("return(%v)", injectSafeTS)))

tk.MustExec("insert t values (4), (5), (6)")
tk.MustExec(fmt.Sprintf("flashback cluster as of timestamp '%s'", oracle.GetTimeFromTS(ts)))
tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)))

tk.MustExec("admin check table t")
require.Equal(t, tk.MustQuery("select max(a) from t").Rows()[0][0], "3")
Expand Down