support TiDB bidirectional input #253

Open · wants to merge 2 commits into master
2 changes: 1 addition & 1 deletion Dockerfile.test.gravity
@@ -1,4 +1,4 @@
-FROM golang:1.11.4
+FROM golang:1.13.3

WORKDIR /gravity
12 changes: 9 additions & 3 deletions Makefile
@@ -27,6 +27,7 @@ dev-down:

go-test:
go test -failfast -race ./integration_test
+cd pkg/registry/test_data && make build
go test -timeout 10m -coverprofile=cover.out $(TEST_DIRS) && go tool cover -func=cover.out | tail -n 1

test-local:
@@ -49,8 +50,6 @@ run-dev:

build:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/gravity cmd/gravity/main.go
-#$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/padder cmd/padder/main.go
-

build-linux:
GOARCH=amd64 GOOS=linux $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/gravity-linux-amd64 cmd/gravity/main.go
@@ -74,14 +73,21 @@ proto:
@ which protoc >/dev/null || brew install protobuf
@ which protoc-gen-gofast >/dev/null || go get github.com/gogo/protobuf/protoc-gen-gofast

-protoc -I=protocol/msgpb -I=${GOPATH}/src -I=${GOPATH}/src/github.com/gogo/protobuf/protobuf --gofast_out=\
+protoc -I=protocol/msgpb -I=${GOPATH}/src -I=${GOPATH}/src/github.com/gogo/protobuf/protobuf --gofast_out=.\
plugins=grpc,\
Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,\
Mgoogle/protobuf/struct.proto=github.com/gogo/protobuf/types,\
Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,\
Mgoogle/protobuf/wrappers.proto=github.com/gogo/protobuf/types:./pkg/protocol/msgpb \
protocol/msgpb/message.proto

+protoc -I=protocol/tidb -I=${GOPATH}/src -I=${GOPATH}/src/github.com/gogo/protobuf/protobuf --gofast_out=.\
+Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,\
+Mgoogle/protobuf/struct.proto=github.com/gogo/protobuf/types,\
+Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,\
+Mgoogle/protobuf/wrappers.proto=github.com/gogo/protobuf/types:./pkg/protocol/tidb \
+protocol/tidb/tidb.proto


mock:
mockgen -destination ./mock/binlog_checker/mock.go github.com/moiot/gravity/pkg/inputs/helper/binlog_checker BinlogChecker
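As a reference for the new proto target, here is a minimal sketch of decoding a drainer payload with the package generated into ./pkg/protocol/tidb by "make proto". The Binlog message name is an assumption borrowed from TiDB's slave binlog protocol; adjust it to whatever protocol/tidb/tidb.proto actually declares.

package main

import (
	"fmt"
	"io/ioutil"
	"log"

	"github.com/gogo/protobuf/proto"
	tidb "github.com/moiot/gravity/pkg/protocol/tidb" // generated by the proto target above
)

func main() {
	// In practice the payload is the value of a Kafka message written by drainer;
	// a file stands in for it here.
	payload, err := ioutil.ReadFile("binlog.bin")
	if err != nil {
		log.Fatal(err)
	}

	// Binlog is assumed to be the top-level message of protocol/tidb/tidb.proto,
	// mirroring TiDB's slave binlog protocol.
	var b tidb.Binlog
	if err := proto.Unmarshal(payload, &b); err != nil {
		log.Fatal(err)
	}
	fmt.Println(b.String())
}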
91 changes: 89 additions & 2 deletions docker-compose-gravity-test.yml
@@ -1,7 +1,7 @@
version: '3.2'
services:
  source-db:
-    image: mysql:5.7.18
+    image: mysql:5.7
    container_name: source-db-test
    environment:
      - MYSQL_ALLOW_EMPTY_PASSWORD=yes
@@ -14,7 +14,7 @@ services:
      - ./mycnf:/etc/mysql/conf.d

  target-db:
-    image: mysql:5.7.18
+    image: mysql:5.7
    container_name: target-db-test
    environment:
      - MYSQL_ALLOW_EMPTY_PASSWORD=yes
@@ -62,13 +62,99 @@ services:
      - "ES_JAVA_OPTS=-Xms750m -Xmx750m"
    logging:
      driver: none

+  pd:
+    image: pingcap/pd:latest
+    ports:
+      - "2379"
+    volumes:
+      - ./integration_test/config/pd.toml:/pd.toml:ro
+    command:
+      - --name=pd
+      - --client-urls=http://0.0.0.0:2379
+      - --peer-urls=http://0.0.0.0:2380
+      - --advertise-client-urls=http://pd:2379
+      - --advertise-peer-urls=http://pd:2380
+      - --initial-cluster=pd=http://pd:2380
+      - --data-dir=/data/pd
+      - --config=/pd.toml
+    restart: on-failure
+
+  tikv:
+    image: pingcap/tikv:latest
+    volumes:
+      - ./integration_test/config/tikv.toml:/tikv.toml:ro
+    command:
+      - --addr=0.0.0.0:20160
+      - --advertise-addr=tikv:20160
+      - --data-dir=/data/tikv
+      - --pd=pd:2379
+      - --config=/tikv.toml
+    depends_on:
+      - "pd"
+    restart: on-failure
+
+  pump:
+    image: pingcap/tidb-binlog:latest
+    logging:
+      driver: none
+    volumes:
+      - ./integration_test/config/pump.toml:/pump.toml:ro
+    command:
+      - /pump
+      - --addr=0.0.0.0:8250
+      - --advertise-addr=pump:8250
+      - --data-dir=/data/pump
+      - --node-id=pump
+      - --pd-urls=http://pd:2379
+      - --config=/pump.toml
+    depends_on:
+      - "pd"
+    restart: on-failure
+
+  drainer:
+    image: pingcap/tidb-binlog:latest
+    logging:
+      driver: none
+    volumes:
+      - ./integration_test/config/drainer.toml:/drainer.toml:ro
+    command:
+      - /drainer
+      - --addr=drainer:8249
+      - --data-dir=/data/data.drainer
+      - --pd-urls=http://pd:2379
+      - --config=/drainer.toml
+      - --initial-commit-ts=0
+      - --dest-db-type=kafka
+    depends_on:
+      - "pd"
+      - "kafka"
+    restart: on-failure
+
+  tidb:
+    image: pingcap/tidb:latest
+    ports:
+      - "4000:4000"
+      - "10080:10080"
+    volumes:
+      - ./integration_test/config/tidb.toml:/tidb.toml:ro
+    command:
+      - --store=tikv
+      - --path=pd:2379
+      - --config=/tidb.toml
+      - --enable-binlog=true
+    depends_on:
+      - "tikv"
+      - "pump"
+    restart: on-failure
+
  gravity-test:
    build:
      context: ./
      dockerfile: Dockerfile.test.gravity
    depends_on:
      - mongo
+      - tidb
    environment:
      - MONGO_HOST=mongo
      - KAFKA_BROKER=kafka:9092
@@ -78,4 +164,5 @@ services:
      "--", "./wait-for-it.sh", "mongo:27017", "-t", "0",
      "--", "./wait-for-it.sh", "kafka:9092", "-t", "0",
      "--", "./wait-for-it.sh", "elasticsearch:9200", "-t", "0",
+      "--", "./wait-for-it.sh", "tidb:4000", "-t", "0",
      "--", "make", "go-test"]
58 changes: 58 additions & 0 deletions docs/2.0/example-tidb2tidb.toml
@@ -0,0 +1,58 @@
name = "tidb2tidbDemo"
version = "1.0"

[input]
type = "tidbkafka"
mode = "stream"

[input.config]
ignore-bidirectional-data = true

[input.config.position-repo]
type = "mysql-repo"
[input.config.position-repo.config.source]
host = "127.0.0.1"
username = "root"
password = ""
port = 4000

[input.config.source-db]
host = "127.0.0.1"
username = "root"
password = ""
port = 4000

[input.config.source-kafka]
topics = ["obinlog"]
consume-from = "oldest"
group-id = "tidb2tidbDemo"
[input.config.source-kafka.brokers]
broker-addrs = ["localhost:9092", "localhost:9093", "localhost:9094"]

[output]
type = "mysql"

[output.config]
enable-ddl = true

[output.config.target]
host = "127.0.0.1"
username = "root"
password = ""
port = 4000
max-open = 30 # optional, maximum number of open connections
max-idle = 30 # optional, suggested to be the same as max-open

# The definition of the routing rule
[[output.config.routes]]
match-schema = "test"
match-table = "t"
target-table = "t2"

[scheduler]
type = "batch-table-scheduler"
[scheduler.config]
nr-worker = 30
batch-size = 1000
queue-size = 1024
sliding-window-size = 16384
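For context, the [input.config.source-kafka] section points the input at the topic drainer publishes to. Below is a minimal sketch of consuming that topic, assuming the Shopify/sarama client; the actual tidbkafka input presumably decodes each payload with the generated pkg/protocol/tidb types and consumes all partitions, not just partition 0 as here.

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// Broker list and topic mirror the example config above.
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// consume-from = "oldest" corresponds to sarama.OffsetOldest.
	pc, err := consumer.ConsumePartition("obinlog", 0, sarama.OffsetOldest)
	if err != nil {
		log.Fatal(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		// Each message value is a protobuf-encoded TiDB binlog event.
		log.Printf("offset=%d bytes=%d", msg.Offset, len(msg.Value))
	}
}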
101 changes: 101 additions & 0 deletions integration_test/config/drainer.toml
@@ -0,0 +1,101 @@
# drainer Configuration.

# addr (i.e. 'host:port') to listen on for drainer connections;
# this addr will be registered into etcd
# addr = "127.0.0.1:8249"

# the interval (in seconds) at which drainer detects the pumps' status
detect-interval = 10

# drainer metadata directory path
data-dir = "data.drainer"

# a comma separated list of PD endpoints
pd-urls = "http://127.0.0.1:2379"

# Use the specified compressor to compress payload between pump and drainer
compressor = ""

#[security]
# Path of file that contains list of trusted SSL CAs for connection with cluster components.
# ssl-ca = "/path/to/ca.pem"
# Path of file that contains X509 certificate in PEM format for connection with cluster components.
# ssl-cert = "/path/to/pump.pem"
# Path of file that contains X509 key in PEM format for connection with cluster components.
# ssl-key = "/path/to/pump-key.pem"

# syncer Configuration.
[syncer]

# Assume the upstream sql-mode.
# If this is set, the same sql-mode is used to parse DDL statements, and the same sql-mode is set downstream when db-type is mysql.
# If this is not set, no sql-mode is set.
# sql-mode = "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION"

# number of binlog events in a transaction batch
txn-batch = 20

# worker count for executing binlogs
# if the latency between drainer and the downstream (mysql or tidb) is too high, you may want to increase this
# to get higher throughput via more concurrent writes to the downstream
worker-count = 16

#disable-dispatch = false

# safe mode splits each update into a delete and an insert
safe-mode = false

# downstream storage, equal to --dest-db-type
# valid values are "mysql", "file", "tidb", "flash", "kafka"
db-type = "kafka"

# do not sync these schemas
ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql"

## replicate-do-db takes priority over replicate-do-table if they share the same db name,
## and regular expressions are supported; a value starting with '~' is treated as a regex.
#
#replicate-do-db = ["~^b.*","s1"]

#[[syncer.replicate-do-table]]
#db-name ="test"
#tbl-name = "log"

#[[syncer.replicate-do-table]]
#db-name ="test"
#tbl-name = "~^a.*"

# disable sync these table
#[[syncer.ignore-table]]
#db-name = "test"
#tbl-name = "log"

# the downstream mysql protocol database
#[syncer.to]
#host = "127.0.0.1"
#user = "root"
#password = ""
#port = 3306

[syncer.to.checkpoint]
# uncomment this to change the database used to save the checkpoint when the downstream is mysql or tidb
#schema = "tidb_binlog"

# Uncomment this if you want to use file as db-type.
#[syncer.to]
# directory to save binlog files; defaults to data-dir (where the checkpoint file is saved) if not configured.
# dir = "data.drainer"


# when db-type is kafka, you can uncomment this to configure the downstream kafka; it defaults to the global kafka config
[syncer.to]
# only one of zookeeper-addrs and kafka-addrs needs to be configured; the kafka addresses are fetched from zookeeper if zookeeper-addrs is configured.
# zookeeper-addrs = "127.0.0.1:2181"
kafka-addrs = "kafka:9092"
kafka-version = "5.1.0"
kafka-max-messages = 1024
#
#
# the topic name drainer pushes messages to; the default name is <cluster-id>_obinlog.
# be careful not to use the same name when running multiple drainer instances
topic-name = "obinlog"