Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
*: addressed comments
Browse files Browse the repository at this point in the history
  • Loading branch information
kennytm committed Jan 2, 2019
1 parent c7f0a37 commit 294f228
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 14 deletions.
8 changes: 6 additions & 2 deletions cmd/tidb-lightning-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,12 @@ func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tableName s

for _, table := range targetTables {
for engineID := 0; engineID < table.EnginesCount; engineID++ {
-if closedEngine, err := importer.UnsafeCloseEngine(ctx, table.TableName, engineID); err == nil {
-	fmt.Fprintln(os.Stderr, "Cleaning up engine:", table.TableName, engineID)
+fmt.Fprintln(os.Stderr, "Closing and cleaning up engine:", table.TableName, engineID)
+closedEngine, err := importer.UnsafeCloseEngine(ctx, table.TableName, engineID)
+if err != nil {
+	fmt.Fprintln(os.Stderr, "* Encountered error while closing engine:", err)
+	lastErr = err
+} else {
closedEngine.Cleanup(ctx)
}
}
Expand Down
20 changes: 12 additions & 8 deletions lightning/kv/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,23 @@ Usual workflow:
2. For each table,
-	a. Create an `OpenedEngine` via `importer.OpenEngine()`
-	b. For each chunk,
-		i. Create a `WriteStream` via `engine.NewWriteStream()`
-		ii. Deliver data into the stream via `stream.Put()`
-		iii. Close the stream via `stream.Close()`
-	c. When all chunks are written, obtain a `ClosedEngine` via `engine.CloseEngine()`
-	d. Import data via `engine.Import()`
-	e. Cleanup via `engine.Cleanup()`
+	i. Split into multiple "batches" consisting of data files with roughly equal total size.
+	ii. For each batch,
+		a. Create an `OpenedEngine` via `importer.OpenEngine()`
+		b. For each chunk,
+			i. Create a `WriteStream` via `engine.NewWriteStream()`
+			ii. Deliver data into the stream via `stream.Put()`
+			iii. Close the stream via `stream.Close()`
+		c. When all chunks are written, obtain a `ClosedEngine` via `engine.CloseEngine()`
+		d. Import data via `engine.Import()`
+		e. Cleanup via `engine.Cleanup()`
3. Close the connection via `importer.Close()`
Expand Down
7 changes: 3 additions & 4 deletions lightning/mydump/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,9 @@ func MakeTableRegions(meta *MDTableMeta, columns int, batchSize int64) ([]*Table
rowIDMax := prevRowIDMax + dataFileSize/(int64(columns)+2)
filesRegions = append(filesRegions, &TableRegion{
EngineID: curEngineID,

DB: meta.DB,
Table: meta.Name,
File: dataFile,
DB: meta.DB,
Table: meta.Name,
File: dataFile,
Chunk: Chunk{
Offset: 0,
EndOffset: dataFileSize,
Expand Down
1 change: 1 addition & 0 deletions tests/checkpoint_engines/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ level = "info"

[checkpoint]
enable = true
driver = "file"

[tikv-importer]
addr = "127.0.0.1:8808"
Expand Down
29 changes: 29 additions & 0 deletions tests/checkpoint_engines/mysql.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[lightning]
table-concurrency = 1
check-requirements = false
file = "/tmp/lightning_test_result/lightning-checkpoint-engines.log"
level = "info"

[checkpoint]
enable = true
driver = "mysql"

[tikv-importer]
addr = "127.0.0.1:8808"

[mydumper]
data-source-dir = "tests/checkpoint_engines/data"
batch-size = 1 # force creating a new batch for every file.

[tidb]
host = "127.0.0.1"
port = 4000
user = "root"
status-port = 10080
pd-addr = "127.0.0.1:2379"
log-level = "error"

[post-restore]
checksum = true
compact = false
analyze = false
23 changes: 23 additions & 0 deletions tests/checkpoint_engines/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,26 @@ check_contains 'sum(c): 10'
run_sql 'SELECT count(*), sum(c) FROM cpeng.b'
check_contains 'count(*): 4'
check_contains 'sum(c): 46'

# Now, try again with MySQL checkpoints

run_sql 'DROP DATABASE cpeng;'

set +e
for i in $(seq 4); do
echo "******** Importing Table Now (step $i/4) ********"
run_lightning mysql 2> /dev/null
[ $? -ne 0 ] || exit 1
done
set -e

echo "******** Verify checkpoint no-op ********"
run_lightning mysql

run_sql 'SELECT count(*), sum(c) FROM cpeng.a'
check_contains 'count(*): 4'
check_contains 'sum(c): 10'

run_sql 'SELECT count(*), sum(c) FROM cpeng.b'
check_contains 'count(*): 4'
check_contains 'sum(c): 46'

0 comments on commit 294f228

Please sign in to comment.