/
task.go
111 lines (93 loc) · 2.97 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package framework
import (
"context"
"database/sql"
"strings"
_ "github.com/go-sql-driver/mysql" // imported for side effects
"github.com/pingcap/log"
"go.uber.org/zap"
)
// Task represents a single test case.
type Task interface {
// Name returns a string naming the task; presumably it identifies the
// test case in logs or reports — confirm against callers.
Name() string
// GetCDCProfile returns the CDCProfile of the task, i.e. the command line
// arguments used to create the changefeed.
GetCDCProfile() *CDCProfile
// Prepare receives the TaskContext and reports failure through its error
// result; presumably it is invoked before Run — TODO confirm with the runner.
Prepare(taskContext *TaskContext) error
// Run receives the TaskContext and reports failure through its error result.
Run(taskContext *TaskContext) error
}
// TaskContext is passed to the test case to provide basic utilities for testing.
type TaskContext struct {
// Upstream is the database handle CreateDB executes against first; it is
// also handed to the SQLHelper built by SQLHelper().
Upstream *sql.DB
// Downstream is the database handle CreateDB executes against second; it
// is also handed to the SQLHelper built by SQLHelper().
Downstream *sql.DB
// Env is the Environment associated with the task; its contract is not
// visible in this part of the file.
Env Environment
// WaitForReady presumably blocks until the environment is usable and
// returns an error otherwise — TODO confirm with the code that sets it.
WaitForReady func() error
// Ctx is the context passed to the ExecContext calls in CreateDB and
// forwarded to the SQLHelper built by SQLHelper().
Ctx context.Context
}
// CDCProfile represents the command line arguments used to create the changefeed.
// Its String method renders the fields as "cli changefeed create" flags.
type CDCProfile struct {
// PDUri is rendered as the --pd flag; String falls back to
// http://127.0.0.1:2379 when it is empty.
PDUri string
// SinkURI is rendered as the double-quoted --sink-uri flag; String calls
// log.Fatal when it is empty.
SinkURI string
// ConfigFile is rendered as the --config flag and is omitted when empty.
ConfigFile string
// Opts holds key/value pairs, each rendered as its own --opts="key=value" flag.
Opts map[string]string
}
// CreateDB runs a "create database" statement for name, first against the
// upstream and then against the downstream. It stops at the first failure,
// logs it as a warning and returns the error unchanged; the downstream is
// therefore not touched when the upstream statement fails.
//
// name is concatenated into the statement as-is, without quoting or escaping.
func (c *TaskContext) CreateDB(name string) error {
	// Both sides execute the same statement and log the same field.
	query := "create database " + name
	dbField := zap.String("db", name)

	log.Debug("Creating database in upstream", dbField)
	if _, err := c.Upstream.ExecContext(c.Ctx, query); err != nil {
		log.Warn("Failed to create database in upstream", dbField, zap.Error(err))
		return err
	}
	log.Debug("Successfully created database in upstream", dbField)

	log.Debug("Creating database in downstream", dbField)
	if _, err := c.Downstream.ExecContext(c.Ctx, query); err != nil {
		log.Warn("Failed to create database in downstream", dbField, zap.Error(err))
		return err
	}
	log.Debug("Successfully created database in downstream", dbField)

	return nil
}
// SQLHelper builds a new SQLHelper that shares this context's upstream and
// downstream database handles as well as its Ctx.
func (c *TaskContext) SQLHelper() *SQLHelper {
	helper := new(SQLHelper)
	helper.upstream = c.Upstream
	helper.downstream = c.Downstream
	helper.ctx = c.Ctx
	return helper
}
// String renders the profile as the argument string of a
// "cli changefeed create" invocation.
//
// Rendering rules:
//   - --pd uses PDUri, or http://127.0.0.1:2379 when PDUri is empty. The
//     default is applied to the output only; the receiver is not modified.
//   - --sink-uri uses SinkURI wrapped in double quotes. An empty SinkURI is
//     reported through log.Fatal.
//   - --config is emitted only when ConfigFile is non-empty.
//   - every entry of Opts becomes its own --opts="key=value" flag. The flags
//     follow Go's map iteration order, which is unspecified, so their
//     relative order may differ between calls.
//
// Values are concatenated as-is; no shell escaping is applied.
func (p *CDCProfile) String() string {
	var builder strings.Builder
	builder.WriteString("cli changefeed create ")

	// Resolve the default into a local so that formatting the profile never
	// writes to it.
	pdURI := p.PDUri
	if pdURI == "" {
		pdURI = "http://127.0.0.1:2379"
	}
	builder.WriteString("--pd=" + pdURI + " ")

	if p.SinkURI == "" {
		log.Fatal("SinkURI cannot be empty!")
	}
	builder.WriteString("--sink-uri=\"" + p.SinkURI + "\" ")

	if p.ConfigFile != "" {
		builder.WriteString("--config=" + p.ConfigFile + " ")
	}

	// len of a nil map is 0, so this single check covers nil and empty alike.
	if len(p.Opts) == 0 {
		return builder.String()
	}

	for k, v := range p.Opts {
		builder.WriteString("--opts=\"" + k + "=" + v + "\" ")
	}

	// NOTE(review): the log-level flag is appended only when at least one
	// option is present; confirm whether profiles without options are meant
	// to omit it.
	builder.WriteString(" --log-level debug")
	return builder.String()
}