Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dep(*): remove usage of tidb-tools/pkg/{dbutil, utils} (#11122) #11154

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/tiflow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
package main

import (
"github.com/pingcap/tidb/pkg/util/collate"
"github.com/pingcap/tiflow/engine/pkg/cmd"
)

func main() {
collate.SetNewCollationEnabledForTest(false)
cmd.Run()
}
22 changes: 16 additions & 6 deletions dm/ctl/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package common

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"os"
Expand All @@ -29,8 +30,8 @@ import (
"github.com/golang/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
toolutils "github.com/pingcap/tidb-tools/pkg/utils"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/dm/config/security"
"github.com/pingcap/tiflow/dm/pb"
Expand All @@ -43,6 +44,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
)

Expand All @@ -57,7 +59,7 @@ var (
// CtlClient used to get master client for dmctl.
type CtlClient struct {
mu sync.RWMutex
tls *toolutils.TLS
tlsConfig *tls.Config
conn *grpc.ClientConn
MasterClient pb.MasterClient // exposed to be used in test
EtcdClient *clientv3.Client // exposed to be used in export config
Expand All @@ -77,9 +79,13 @@ func (c *CtlClient) updateMasterClient() error {
}

endpoints := c.EtcdClient.Endpoints()
grpcTLS := grpc.WithInsecure()
if c.tlsConfig != nil {
grpcTLS = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConfig))
}
for _, endpoint := range endpoints {
//nolint:staticcheck
conn, err = grpc.Dial(utils.UnwrapScheme(endpoint), c.tls.ToGRPCDialOption(), grpc.WithBackoffMaxDelay(3*time.Second), grpc.WithBlock(), grpc.WithTimeout(3*time.Second))
conn, err = grpc.Dial(utils.UnwrapScheme(endpoint), grpcTLS, grpc.WithBackoffMaxDelay(3*time.Second), grpc.WithBlock(), grpc.WithTimeout(3*time.Second))
if err == nil {
c.conn = conn
c.MasterClient = pb.NewMasterClient(conn)
Expand Down Expand Up @@ -158,7 +164,11 @@ func InitUtils(cfg *Config) error {

// InitClient initializes dm-master client.
func InitClient(addr string, securityCfg security.Security) error {
tls, err := toolutils.NewTLS(securityCfg.SSLCA, securityCfg.SSLCert, securityCfg.SSLKey, "", securityCfg.CertAllowedCN)
tlsConfig, err := util.NewTLSConfig(
util.WithCAPath(securityCfg.SSLCA),
util.WithCertAndKeyPath(securityCfg.SSLCert, securityCfg.SSLKey),
util.WithVerifyCommonName(securityCfg.CertAllowedCN),
)
if err != nil {
return terror.ErrCtlInvalidTLSCfg.Delegate(err)
}
Expand All @@ -169,14 +179,14 @@ func InitClient(addr string, securityCfg security.Security) error {
DialTimeout: dialTimeout,
DialKeepAliveTime: keepaliveTime,
DialKeepAliveTimeout: keepaliveTimeout,
TLS: tls.TLSConfig(),
TLS: tlsConfig,
})
if err != nil {
return err
}

GlobalCtlClient = &CtlClient{
tls: tls,
tlsConfig: tlsConfig,
EtcdClient: etcdClient,
}

Expand Down
15 changes: 12 additions & 3 deletions dm/master/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ import (
"time"

"github.com/pingcap/failpoint"
toolutils "github.com/pingcap/tidb-tools/pkg/utils"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tiflow/dm/master/metrics"
"github.com/pingcap/tiflow/dm/pb"
"github.com/pingcap/tiflow/dm/pkg/log"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

const (
Expand Down Expand Up @@ -114,14 +115,22 @@ func (s *Server) electionNotify(ctx context.Context) {
func (s *Server) createLeaderClient(leaderAddr string) {
s.closeLeaderClient()

tls, err := toolutils.NewTLS(s.cfg.SSLCA, s.cfg.SSLCert, s.cfg.SSLKey, s.cfg.AdvertiseAddr, s.cfg.CertAllowedCN)
tlsConfig, err := util.NewTLSConfig(
util.WithCAPath(s.cfg.SSLCA),
util.WithCertAndKeyPath(s.cfg.SSLCert, s.cfg.SSLKey),
util.WithVerifyCommonName(s.cfg.CertAllowedCN),
)
if err != nil {
log.L().Error("can't create grpc connection with leader, can't forward request to leader", zap.String("leader", leaderAddr), zap.Error(err))
return
}
grpcTLS := grpc.WithInsecure()
if tlsConfig != nil {
grpcTLS = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
}

//nolint:staticcheck
conn, err := grpc.Dial(leaderAddr, tls.ToGRPCDialOption(), grpc.WithBackoffMaxDelay(3*time.Second))
conn, err := grpc.Dial(leaderAddr, grpcTLS, grpc.WithBackoffMaxDelay(3*time.Second))
if err != nil {
log.L().Error("can't create grpc connection with leader, can't forward request to leader", zap.String("leader", leaderAddr), zap.Error(err))
return
Expand Down
7 changes: 5 additions & 2 deletions dm/master/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"strings"
"time"

toolutils "github.com/pingcap/tidb-tools/pkg/utils"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tiflow/dm/pkg/etcdutil"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/terror"
Expand Down Expand Up @@ -123,7 +123,10 @@ func prepareJoinEtcd(cfg *Config) error {
return nil
}

tlsCfg, err := toolutils.ToTLSConfig(cfg.SSLCA, cfg.SSLCert, cfg.SSLKey)
tlsCfg, err := util.NewTLSConfig(
util.WithCAPath(cfg.SSLCA),
util.WithCertAndKeyPath(cfg.SSLCert, cfg.SSLKey),
)
if err != nil {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, "generate tls config")
}
Expand Down
2 changes: 1 addition & 1 deletion dm/master/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"context"
"time"

cpu "github.com/pingcap/tidb-tools/pkg/utils"
cpu "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tiflow/engine/pkg/promutil"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down
56 changes: 36 additions & 20 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
toolutils "github.com/pingcap/tidb-tools/pkg/utils"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/dbutil"
"github.com/pingcap/tiflow/dm/checker"
dmcommon "github.com/pingcap/tiflow/dm/common"
Expand Down Expand Up @@ -61,6 +61,11 @@ import (
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/grpc"
<<<<<<< HEAD
=======
"google.golang.org/grpc/credentials"
"google.golang.org/protobuf/types/known/emptypb"
>>>>>>> de06be9be5 (dep(*): remove usage of tidb-tools/pkg/{dbutil, utils} (#11122))
)

const (
Expand Down Expand Up @@ -174,18 +179,20 @@ func (s *Server) Start(ctx context.Context) (err error) {
return
}

tls, err := toolutils.NewTLS(s.cfg.SSLCA, s.cfg.SSLCert, s.cfg.SSLKey, s.cfg.AdvertiseAddr, s.cfg.CertAllowedCN)
tlsConfig, err := util.NewTLSConfig(
util.WithCAPath(s.cfg.SSLCA),
util.WithCertAndKeyPath(s.cfg.SSLCert, s.cfg.SSLKey),
util.WithVerifyCommonName(s.cfg.CertAllowedCN),
)
if err != nil {
return terror.ErrMasterTLSConfigNotValid.Delegate(err)
}

// tls2 is used for grpc client in grpc gateway
tls2, err := toolutils.NewTLS(s.cfg.SSLCA, s.cfg.SSLCert, s.cfg.SSLKey, s.cfg.AdvertiseAddr, s.cfg.CertAllowedCN)
if err != nil {
return terror.ErrMasterTLSConfigNotValid.Delegate(err)
grpcTLS := grpc.WithInsecure()
if tlsConfig != nil {
grpcTLS = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
}

apiHandler, err := getHTTPAPIHandler(ctx, s.cfg.AdvertiseAddr, tls2.ToGRPCDialOption())
apiHandler, err := getHTTPAPIHandler(ctx, s.cfg.AdvertiseAddr, grpcTLS)
if err != nil {
return
}
Expand All @@ -206,18 +213,13 @@ func (s *Server) Start(ctx context.Context) (err error) {
"/debug/": getDebugHandler(),
}
if s.cfg.OpenAPI {
// tls3 is used to openapi reverse proxy
tls3, err1 := toolutils.NewTLS(s.cfg.SSLCA, s.cfg.SSLCert, s.cfg.SSLKey, s.cfg.AdvertiseAddr, s.cfg.CertAllowedCN)
if err1 != nil {
return terror.ErrMasterTLSConfigNotValid.Delegate(err1)
}
if initOpenAPIErr := s.InitOpenAPIHandles(tls3.TLSConfig()); initOpenAPIErr != nil {
if initOpenAPIErr := s.InitOpenAPIHandles(tlsConfig); initOpenAPIErr != nil {
return terror.ErrOpenAPICommonError.Delegate(initOpenAPIErr)
}

const dashboardPrefix = "/dashboard/"
scheme := "http://"
if tls3.TLSConfig() != nil {
if tlsConfig != nil {
scheme = "https://"
}
log.L().Info("Web UI enabled", zap.String("dashboard", scheme+s.cfg.AdvertiseAddr+dashboardPrefix))
Expand All @@ -238,7 +240,7 @@ func (s *Server) Start(ctx context.Context) (err error) {

// create an etcd client used in the whole server instance.
// NOTE: we only use the local member's address now, but we can use all endpoints of the cluster if needed.
s.etcdClient, err = etcdutil.CreateClient([]string{withHost(s.cfg.AdvertiseAddr)}, tls.TLSConfig())
s.etcdClient, err = etcdutil.CreateClient([]string{withHost(s.cfg.AdvertiseAddr)}, tlsConfig)
if err != nil {
return
}
Expand Down Expand Up @@ -2153,11 +2155,15 @@ func (s *Server) listMemberMaster(ctx context.Context, names []string) (*pb.Memb

client := &http.Client{}
if len(s.cfg.SSLCA) != 0 {
inner, err := toolutils.ToTLSConfigWithVerify(s.cfg.SSLCA, s.cfg.SSLCert, s.cfg.SSLKey, s.cfg.CertAllowedCN)
tlsConfig, err := util.NewTLSConfig(
util.WithCAPath(s.cfg.SSLCA),
util.WithCertAndKeyPath(s.cfg.SSLCert, s.cfg.SSLKey),
util.WithVerifyCommonName(s.cfg.CertAllowedCN),
)
if err != nil {
return resp, err
}
client = toolutils.ClientWithTLS(inner)
client = util.ClientWithTLS(tlsConfig)
}
client.Timeout = 1 * time.Second

Expand Down Expand Up @@ -2402,15 +2408,25 @@ func (s *Server) createMasterClientByName(ctx context.Context, name string) (pb.
if len(clientURLs) == 0 {
return nil, nil, errors.New("master not found")
}
tls, err := toolutils.NewTLS(s.cfg.SSLCA, s.cfg.SSLCert, s.cfg.SSLKey, s.cfg.AdvertiseAddr, s.cfg.CertAllowedCN)

tlsConfig, err := util.NewTLSConfig(
util.WithCAPath(s.cfg.SSLCA),
util.WithCertAndKeyPath(s.cfg.SSLCert, s.cfg.SSLKey),
util.WithVerifyCommonName(s.cfg.CertAllowedCN),
)
if err != nil {
return nil, nil, err
}

grpcTLS := grpc.WithInsecure()
if tlsConfig != nil {
grpcTLS = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
}

var conn *grpc.ClientConn
for _, clientURL := range clientURLs {
//nolint:staticcheck
conn, err = grpc.Dial(clientURL, tls.ToGRPCDialOption(), grpc.WithBackoffMaxDelay(3*time.Second))
conn, err = grpc.Dial(clientURL, grpcTLS, grpc.WithBackoffMaxDelay(3*time.Second))
if err == nil {
masterClient := pb.NewMasterClient(conn)
return masterClient, conn, nil
Expand Down
10 changes: 5 additions & 5 deletions dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
toolutils "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util"
tidbmock "github.com/pingcap/tidb/pkg/util/mock"
"github.com/pingcap/tiflow/dm/checker"
common2 "github.com/pingcap/tiflow/dm/common"
Expand Down Expand Up @@ -1635,12 +1635,12 @@ func (t *testMasterSuite) testNormalServerLifecycle(cfg *Config, checkLogic func

func (t *testMasterSuite) testHTTPInterface(url string, contain []byte) {
// we use HTTPS in some test cases.
tlsConfig, err := toolutils.NewTLSConfig(
toolutils.WithCAPath(pwd+"/tls_for_test/ca.pem"),
toolutils.WithCertAndKeyPath(pwd+"/tls_for_test/dm.pem", pwd+"/tls_for_test/dm.key"),
tlsConfig, err := util.NewTLSConfig(
util.WithCAPath(pwd+"/tls_for_test/ca.pem"),
util.WithCertAndKeyPath(pwd+"/tls_for_test/dm.pem", pwd+"/tls_for_test/dm.key"),
)
require.NoError(t.T(), err)
cli := toolutils.ClientWithTLS(tlsConfig)
cli := util.ClientWithTLS(tlsConfig)

// nolint:noctx
resp, err := cli.Get(url)
Expand Down
15 changes: 12 additions & 3 deletions dm/master/workerrpc/rawgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ import (
"context"
"time"

toolutils "github.com/pingcap/tidb-tools/pkg/utils"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tiflow/dm/config/security"
"github.com/pingcap/tiflow/dm/pb"
"github.com/pingcap/tiflow/dm/pkg/terror"
"go.uber.org/atomic"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
)

// GRPCClient stores raw grpc connection and worker client.
Expand All @@ -43,12 +44,20 @@ func NewGRPCClientWrap(conn *grpc.ClientConn, client pb.WorkerClient) (*GRPCClie

// NewGRPCClient initializes a new grpc client from worker address.
func NewGRPCClient(addr string, securityCfg security.Security) (*GRPCClient, error) {
tls, err := toolutils.NewTLS(securityCfg.SSLCA, securityCfg.SSLCert, securityCfg.SSLKey, addr, securityCfg.CertAllowedCN)
tlsConfig, err := util.NewTLSConfig(
util.WithCAPath(securityCfg.SSLCA),
util.WithCertAndKeyPath(securityCfg.SSLCert, securityCfg.SSLKey),
util.WithVerifyCommonName(securityCfg.CertAllowedCN),
)
if err != nil {
return nil, terror.ErrMasterGRPCCreateConn.Delegate(err)
}
grpcTLS := grpc.WithInsecure()
if tlsConfig != nil {
grpcTLS = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
}

conn, err := grpc.Dial(addr, tls.ToGRPCDialOption(),
conn, err := grpc.Dial(addr, grpcTLS,
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{
BaseDelay: 100 * time.Millisecond,
Expand Down
4 changes: 2 additions & 2 deletions dm/pkg/checker/mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import (
"database/sql"
"fmt"

toolsutils "github.com/pingcap/tidb-tools/pkg/utils"
"github.com/pingcap/tidb/pkg/util/dbutil"
"github.com/pingcap/tiflow/dm/pkg/conn"
"github.com/pingcap/tiflow/pkg/errors"
)

// MySQLVersionChecker checks mysql/mariadb/rds,... version.
Expand Down Expand Up @@ -135,7 +135,7 @@ func (pc *MySQLServerIDChecker) Check(ctx context.Context) *Result {

serverID, err := dbutil.ShowServerID(ctx, pc.db)
if err != nil {
if toolsutils.OriginError(err) != sql.ErrNoRows {
if errors.OriginError(err) != sql.ErrNoRows {
markCheckError(result, err)
return result
}
Expand Down
4 changes: 2 additions & 2 deletions dm/pkg/checker/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ import (

"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-tools/pkg/utils"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/format"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/terror"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -132,7 +132,7 @@ func IsTiDBFromVersion(version string) bool {
func markCheckError(result *Result, err error) {
if err != nil {
state := StateFailure
if utils.OriginError(err) == context.Canceled {
if cerrors.OriginError(err) == context.Canceled {
state = StateWarning
}
// `StateWarning` can't cover `StateFailure`.
Expand Down
Loading
Loading