Skip to content

Commit

Permalink
Write directly to S3 (pingcap#155)
Browse files Browse the repository at this point in the history
* Write directly to S3

* Use aws CLI from docker instead

* Re-implement on top of github.com/pingcap/br/pkg/storage

* Bump br dependency

* Move initialization of ExternalStorage to adjustConfig

* Use failpoint

* No need to munge the path

* Remove unused

Co-authored-by: kennytm <kennytm@gmail.com>

* Storage flags

* Remove TODO

Co-authored-by: kennytm <kennytm@gmail.com>

Co-authored-by: Chunzhu Li <lichunzhu@stu.xjtu.edu.cn>
Co-authored-by: kennytm <kennytm@gmail.com>
  • Loading branch information
3 people committed Sep 28, 2020
1 parent 1aa8d94 commit c27b688
Show file tree
Hide file tree
Showing 17 changed files with 751 additions and 139 deletions.
16 changes: 13 additions & 3 deletions dumpling/cmd/dumpling/main.go
Expand Up @@ -24,12 +24,14 @@ import (
"time"

"github.com/docker/go-units"
"github.com/pingcap/dumpling/v4/cli"
"github.com/pingcap/dumpling/v4/export"
"github.com/pingcap/dumpling/v4/log"
"github.com/pingcap/br/pkg/storage"
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
"github.com/spf13/pflag"
"go.uber.org/zap"

"github.com/pingcap/dumpling/v4/cli"
"github.com/pingcap/dumpling/v4/export"
"github.com/pingcap/dumpling/v4/log"
)

var (
Expand Down Expand Up @@ -124,6 +126,8 @@ func main() {
pflag.StringVar(&outputFilenameFormat, "output-filename-template", "", "The output filename template (without file extension)")
pflag.BoolVar(&completeInsert, "complete-insert", false, "Use complete INSERT statements that include column names")

storage.DefineFlags(pflag.CommandLine)

printVersion := pflag.BoolP("version", "V", false, "Print Dumpling version")

pflag.Parse()
Expand Down Expand Up @@ -207,6 +211,12 @@ func main() {
conf.OutputFileTemplate = tmpl
conf.CompleteInsert = completeInsert

err = conf.ParseFromFlags(pflag.CommandLine)
if err != nil {
fmt.Println(err.Error())
os.Exit(2)
}

err = export.Dump(context.Background(), conf)
if err != nil {
log.Error("dump failed error stack info", zap.Error(err))
Expand Down
17 changes: 7 additions & 10 deletions dumpling/go.mod
Expand Up @@ -6,24 +6,22 @@ require (
github.com/DATA-DOG/go-sqlmock v1.4.1
github.com/coreos/go-semver v0.3.0
github.com/docker/go-units v0.4.0
github.com/dustin/go-humanize v0.0.0-20180421182945-02af3965c54e // indirect
github.com/go-sql-driver/mysql v1.5.0
github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff // indirect
github.com/gorilla/websocket v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.14.3 // indirect
github.com/json-iterator/go v1.1.9 // indirect
github.com/pingcap/br v0.0.0-20200925095602-bf9cc603382e
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20200820035142-66eb5bf1d1cd
github.com/pingcap/errors v0.11.5-0.20200902104258-eba4f1d8f6de
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad
github.com/pingcap/tidb-tools v4.0.5-0.20200817064459-ba61a7376547+incompatible
github.com/pingcap/kvproto v0.0.0-20200910095337-6b893f12be43
github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463
github.com/pingcap/tidb-tools v4.0.5-0.20200820092506-34ea90c93237+incompatible
github.com/pkg/errors v0.9.1
github.com/soheilhy/cmux v0.1.4
github.com/spf13/pflag v1.0.3
github.com/spf13/pflag v1.0.5
github.com/tikv/pd v1.1.0-beta.0.20200825070655-6b09f3acbb1f
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/zap v1.15.0
go.uber.org/zap v1.16.0
golang.org/x/net v0.0.0-20200822124328-c89045814202 // indirect
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
golang.org/x/sys v0.0.0-20200824131525-c12d262b63d8 // indirect
Expand All @@ -32,5 +30,4 @@ require (
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect
honnef.co/go/tools v0.0.1-2020.1.4 // indirect
)
502 changes: 502 additions & 0 deletions dumpling/go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions dumpling/tests/_utils/run_dumpling
Expand Up @@ -3,6 +3,7 @@
set -e

echo "[$(date)] Executing bin/dumpling..."
echo "$DUMPLING_OUTPUT_DIR"

bin/dumpling -u "$DUMPLING_TEST_USER" -h 127.0.0.1 \
-P "$DUMPLING_TEST_PORT" -B "$DUMPLING_TEST_DATABASE" \
Expand Down
52 changes: 52 additions & 0 deletions dumpling/tests/s3/run.sh
@@ -0,0 +1,52 @@
#!/bin/sh

# Integration test setup: start a mock S3 server (motoserver) in docker and
# make sure it is removed again when the script exits.

set -eux

echo "starting motoserver, host output dir is ${DUMPLING_OUTPUT_DIR}"
mkdir -p "${DUMPLING_OUTPUT_DIR}"
ls "${DUMPLING_OUTPUT_DIR}"

# cleanup removes the mock S3 container. It is registered as the EXIT trap
# *before* the container is started so that a failure during startup cannot
# leak the container. `|| true` keeps the trap from failing when the container
# was never created.
cleanup() {
    echo "Stopping motoserver"
    docker rm -f dumpling_test_s3 || true
}
trap cleanup EXIT

docker run --name dumpling_test_s3 -d \
    -p 5000:5000 \
    motoserver/moto

# Wait for motoserver to start up. Poll the endpoint instead of sleeping for a
# fixed time: curl exits non-zero until the server accepts connections and
# answers with some HTTP response.
attempts=0
until curl -s -o /dev/null http://localhost:5000/; do
    attempts=$((attempts + 1))
    if [ "$attempts" -ge 30 ]; then
        echo "motoserver did not become ready in time" >&2
        exit 1
    fi
    sleep 1
done

# awslocal runs the AWS CLI from the amazon/aws-cli docker image against the
# mock S3 endpoint on localhost:5000, forwarding all arguments to the CLI.
# No -i/-t flags: the command needs neither stdin nor a terminal, and
# requesting a TTY makes `docker run` fail when the test is executed
# non-interactively (e.g. in CI).
awslocal() {
    docker run --rm --net=host -e PAGER=cat -e AWS_ACCESS_KEY_ID=foo -e AWS_SECRET_ACCESS_KEY=foo amazon/aws-cli --endpoint http://localhost:5000 "$@"
}
# create the bucket dumpling will write into
awslocal s3api create-bucket --bucket mybucket

DB_NAME="s3"
TABLE_NAME="t"

# drop database on mysql
run_sql "drop database if exists \`$DB_NAME\`;"

# build data on mysql
run_sql "create database $DB_NAME;"
run_sql "create table $DB_NAME.$TABLE_NAME (a int(255));"

# insert 100 records
run_sql "insert into $DB_NAME.$TABLE_NAME values $(seq -s, 100 | sed 's/,*$//g' | sed "s/[0-9]*/('1')/g");"

# run dumpling!
# Remember the local directory first: DUMPLING_OUTPUT_DIR is re-pointed at the
# S3 bucket for the dump itself.
HOST_DIR=${DUMPLING_OUTPUT_DIR}
export DUMPLING_OUTPUT_DIR=s3://mybucket/dump
export DUMPLING_TEST_DATABASE=$DB_NAME
export AWS_REGION=us-east-1
export AWS_ACCESS_KEY_ID=testid
export AWS_SECRET_ACCESS_KEY=testkey
run_dumpling --s3.endpoint=http://localhost:5000
ls "${HOST_DIR}"

# Download the dumped objects from the mock S3 server. -f makes curl exit
# non-zero on an HTTP error (e.g. 404 for a missing object); without it curl
# would save the error body to the file and the existence checks below would
# pass even though nothing was uploaded.
curl -f -o "${HOST_DIR}/s3-schema-create.sql" http://localhost:5000/mybucket/dump/s3-schema-create.sql
curl -f -o "${HOST_DIR}/s3.t-schema.sql" http://localhost:5000/mybucket/dump/s3.t-schema.sql
curl -f -o "${HOST_DIR}/s3.t.0.sql" http://localhost:5000/mybucket/dump/s3.t.0.sql

file_should_exist "$HOST_DIR/s3-schema-create.sql"
file_should_exist "$HOST_DIR/s3.t-schema.sql"
file_should_exist "$HOST_DIR/s3.t.0.sql"
17 changes: 16 additions & 1 deletion dumpling/v4/export/config.go
@@ -1,6 +1,7 @@
package export

import (
"context"
"encoding/json"
"fmt"
"regexp"
Expand All @@ -9,12 +10,16 @@ import (
"time"

"github.com/coreos/go-semver/semver"
"github.com/pingcap/dumpling/v4/log"
"github.com/pingcap/br/pkg/storage"
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
"go.uber.org/zap"

"github.com/pingcap/dumpling/v4/log"
)

type Config struct {
storage.BackendOptions

Databases []string
Host string
User string
Expand Down Expand Up @@ -62,6 +67,8 @@ type Config struct {
SessionParams map[string]interface{}

PosAfterConnect bool

ExternalStorage storage.ExternalStorage `json:"-"`
}

func DefaultConfig() *Config {
Expand Down Expand Up @@ -117,6 +124,14 @@ func (conf *Config) GetDSN(db string) string {
return dsn
}

// createExternalStorage builds the storage that dump output is written to.
// It parses conf.OutputDirPath together with the embedded BackendOptions into
// a storage backend and creates the corresponding ExternalStorage.
// Any error from parsing or creation is returned unchanged.
func (conf *Config) createExternalStorage(ctx context.Context) (storage.ExternalStorage, error) {
	backend, err := storage.ParseBackend(conf.OutputDirPath, &conf.BackendOptions)
	if err != nil {
		return nil, err
	}
	// NOTE(review): the trailing `false` is presumably the "send credentials"
	// switch of storage.Create — confirm against the br storage package.
	return storage.Create(ctx, backend, false)
}

const (
UnspecifiedSize = 0
DefaultTiDBMemQuotaQuery = 32 * (1 << 30)
Expand Down
18 changes: 18 additions & 0 deletions dumpling/v4/export/config_test.go
@@ -0,0 +1,18 @@
package export

import (
"context"

. "github.com/pingcap/check"
)

// Register the suite with gocheck.
var _ = Suite(&testConfigSuite{})

// testConfigSuite groups the tests for Config.
type testConfigSuite struct{}

// TestCreateExternalStorage checks that the default configuration yields an
// external storage without error and that its URI matches "file:.*".
func (s *testConfigSuite) TestCreateExternalStorage(c *C) {
	ctx := context.Background()
	store, err := DefaultConfig().createExternalStorage(ctx)
	c.Assert(err, IsNil)

	uri := store.URI()
	c.Assert(uri, Matches, "file:.*")
}
6 changes: 3 additions & 3 deletions dumpling/v4/export/dump.go
Expand Up @@ -17,7 +17,7 @@ import (
)

func Dump(pCtx context.Context, conf *Config) (err error) {
if err = adjustConfig(conf); err != nil {
if err = adjustConfig(pCtx, conf); err != nil {
return withStack(err)
}

Expand Down Expand Up @@ -114,9 +114,9 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
defer newPool.Close()
}

m := newGlobalMetadata(conf.OutputDirPath)
m := newGlobalMetadata(conf.ExternalStorage)
// write metadata even if dump failed
defer m.writeGlobalMetaData()
defer m.writeGlobalMetaData(ctx)

// for consistency lock, we should lock tables at first to get the tables we want to lock & dump
// for consistency lock, record meta pos before lock tables because other tables may still be modified while locking tables
Expand Down
19 changes: 10 additions & 9 deletions dumpling/v4/export/metadata.go
Expand Up @@ -6,15 +6,16 @@ import (
"database/sql"
"errors"
"fmt"
"path"
"strings"
"time"

"github.com/pingcap/br/pkg/storage"
)

type globalMetadata struct {
buffer bytes.Buffer

filePath string
storage storage.ExternalStorage
}

const (
Expand All @@ -28,10 +29,10 @@ const (
mariadbShowMasterStatusFieldNum = 4
)

func newGlobalMetadata(outputDir string) *globalMetadata {
func newGlobalMetadata(s storage.ExternalStorage) *globalMetadata {
return &globalMetadata{
filePath: path.Join(outputDir, metadataPath),
buffer: bytes.Buffer{},
storage: s,
buffer: bytes.Buffer{},
}
}

Expand Down Expand Up @@ -189,12 +190,12 @@ func (m *globalMetadata) recordGlobalMetaData(db *sql.Conn, serverType ServerTyp
})
}

func (m *globalMetadata) writeGlobalMetaData() error {
fileWriter, tearDown, err := buildFileWriter(m.filePath)
func (m *globalMetadata) writeGlobalMetaData(ctx context.Context) error {
fileWriter, tearDown, err := buildFileWriter(ctx, m.storage, metadataPath)
if err != nil {
return err
}
defer tearDown()
defer tearDown(ctx)

return write(fileWriter, m.String())
return write(ctx, fileWriter, m.String())
}
33 changes: 14 additions & 19 deletions dumpling/v4/export/metadata_test.go
Expand Up @@ -3,9 +3,9 @@ package export
import (
"context"
"fmt"
"path"

"github.com/DATA-DOG/go-sqlmock"
"github.com/pingcap/br/pkg/storage"
. "github.com/pingcap/check"
)

Expand All @@ -30,10 +30,8 @@ func (s *testMetaDataSuite) TestMysqlMetaData(c *C) {
mock.ExpectQuery("SHOW SLAVE STATUS").WillReturnRows(
sqlmock.NewRows([]string{"exec_master_log_pos", "relay_master_log_file", "master_host", "Executed_Gtid_Set", "Seconds_Behind_Master"}))

testFilePath := "/test"
m := newGlobalMetadata(testFilePath)
m := newGlobalMetadata(s.createStorage(c))
c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false), IsNil)
c.Assert(m.filePath, Equals, path.Join(testFilePath, metadataPath))

c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+
"\tLog: ON.000001\n"+
Expand All @@ -42,6 +40,13 @@ func (s *testMetaDataSuite) TestMysqlMetaData(c *C) {
c.Assert(mock.ExpectationsWereMet(), IsNil)
}

// createStorage returns an ExternalStorage backed by a local directory
// obtained from c.MkDir(), for tests that need a globalMetadata target.
// Both the backend parsing and the storage creation are asserted to succeed,
// so a broken setup fails here rather than surfacing later as a nil storage.
func (s *testMetaDataSuite) createStorage(c *C) storage.ExternalStorage {
	backend, err := storage.ParseBackend("file:///"+c.MkDir(), nil)
	c.Assert(err, IsNil)
	testLoc, err := storage.Create(context.Background(), backend, true)
	c.Assert(err, IsNil)
	return testLoc
}

func (s *testMetaDataSuite) TestMetaDataAfterConn(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
Expand All @@ -63,11 +68,9 @@ func (s *testMetaDataSuite) TestMetaDataAfterConn(c *C) {
sqlmock.NewRows([]string{"exec_master_log_pos", "relay_master_log_file", "master_host", "Executed_Gtid_Set", "Seconds_Behind_Master"}))
mock.ExpectQuery("SHOW MASTER STATUS").WillReturnRows(rows2)

testFilePath := "/test"
m := newGlobalMetadata(testFilePath)
m := newGlobalMetadata(s.createStorage(c))
c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false), IsNil)
c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, true), IsNil)
c.Assert(m.filePath, Equals, path.Join(testFilePath, metadataPath))

c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+
"\tLog: ON.000001\n"+
Expand Down Expand Up @@ -98,10 +101,8 @@ func (s *testMetaDataSuite) TestMysqlWithFollowersMetaData(c *C) {
mock.ExpectQuery("SELECT @@default_master_connection").WillReturnError(fmt.Errorf("mock error"))
mock.ExpectQuery("SHOW SLAVE STATUS").WillReturnRows(followerRows)

testFilePath := "/test"
m := newGlobalMetadata(testFilePath)
m := newGlobalMetadata(s.createStorage(c))
c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false), IsNil)
c.Assert(m.filePath, Equals, path.Join(testFilePath, metadataPath))

c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+
"\tLog: ON.000001\n"+
Expand Down Expand Up @@ -131,10 +132,8 @@ func (s *testMetaDataSuite) TestMysqlWithNullFollowersMetaData(c *C) {
mock.ExpectQuery("SELECT @@default_master_connection").WillReturnError(fmt.Errorf("mock error"))
mock.ExpectQuery("SHOW SLAVE STATUS").WillReturnRows(sqlmock.NewRows([]string{"SQL_Remaining_Delay"}).AddRow(nil))

testFilePath := "/test"
m := newGlobalMetadata(testFilePath)
m := newGlobalMetadata(s.createStorage(c))
c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false), IsNil)
c.Assert(m.filePath, Equals, path.Join(testFilePath, metadataPath))

c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+
"\tLog: ON.000001\n"+
Expand All @@ -160,10 +159,8 @@ func (s *testMetaDataSuite) TestMariaDBMetaData(c *C) {
AddRow(gtidSet)
mock.ExpectQuery("SELECT @@global.gtid_binlog_pos").WillReturnRows(rows)
mock.ExpectQuery("SHOW SLAVE STATUS").WillReturnRows(rows)
testFilePath := "/test"
m := newGlobalMetadata(testFilePath)
m := newGlobalMetadata(s.createStorage(c))
c.Assert(m.recordGlobalMetaData(conn, ServerTypeMariaDB, false), IsNil)
c.Assert(m.filePath, Equals, path.Join(testFilePath, metadataPath))

c.Assert(mock.ExpectationsWereMet(), IsNil)
}
Expand All @@ -189,10 +186,8 @@ func (s *testMetaDataSuite) TestMariaDBWithFollowersMetaData(c *C) {
AddRow("connection_1"))
mock.ExpectQuery("SHOW ALL SLAVES STATUS").WillReturnRows(followerRows)

testFilePath := "/test"
m := newGlobalMetadata(testFilePath)
m := newGlobalMetadata(s.createStorage(c))
c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false), IsNil)
c.Assert(m.filePath, Equals, path.Join(testFilePath, metadataPath))

c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+
"\tLog: ON.000001\n"+
Expand Down

0 comments on commit c27b688

Please sign in to comment.