
lightning: optimize local-backend duplicate detection, resolution and record policy #41629

Open
4 tasks done
sleepymole opened this issue Feb 21, 2023 · 0 comments
Assignees
Labels
component/lightning This issue is related to Lightning of TiDB. type/enhancement

Comments

@sleepymole
Contributor

sleepymole commented Feb 21, 2023

Motivation

Currently, the workflow of duplicate detection is as follows:

  1. While sending kv pairs to TiKV, detect conflicts and record them in a local Pebble DB.
  2. After the data is imported, read the conflicts from the local Pebble DB and write them to the table lightning_task_info.conflict_error_v1.
  3. Detect conflicts on the TiKV side and record them in the table lightning_task_info.conflict_error_v1 as well.
  4. Read the conflict records from lightning_task_info.conflict_error_v1 and resolve all conflicts by deleting every related kv pair.

This workflow has several problems:

  • If there are too many conflicts, lightning becomes very slow, because the conflict records are written to TiDB through the SQL interface.
  • duplicate-resolution only supports removing all duplicate records, but sometimes users want to keep one of them.
  • The duplicate detection option of local-backend is inconsistent with the option of tidb-backend, which is confusing to users.
  • Checksum and analyze are skipped once any conflict is detected, which reduces the reliability of the import.

To address these problems, we want to improve duplicate detection in the following ways:

  • Reuse the on-duplicate option of tidb-backend, and deprecate the duplicate-resolution option of local-backend.
  • Implement replace and ignore strategies for local-backend duplicate detection; the default strategy is replace.
  • Do not store all conflict records in TiDB; support a max-error-records setting for local-backend duplicate detection.
  • Perform checksum and analyze whether or not conflicts are found.
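As a rough illustration only, the proposed options might surface in the lightning configuration roughly like this. The key names and sections below are assumptions based on the existing tidb-backend on-duplicate convention, not the final design:

```toml
[tikv-importer]
backend = "local"
# Assumed: local-backend reuses the tidb-backend on-duplicate option.
# "replace" keeps one row per conflicting key; "ignore" keeps the row
# that was written first. (Illustrative names; may differ in the end.)
on-duplicate = "replace"

[lightning]
# Assumed: cap on how many conflict records are recorded in TiDB,
# so a conflict-heavy import does not flood the SQL interface.
max-error-records = 100
```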

Detailed design

To address the above problems, we are introducing a new design in which duplicate detection is decoupled from the import process. Detection runs before the import: it reads all data from the source and detects duplicate records. After detection completes, the import process starts and automatically skips the duplicate records during encoding.

ExternalSorter

// ExternalSorter is an interface for sorting key-value pairs in external storage.
// The key-value pairs are sorted by key; duplicate keys are automatically removed.
type ExternalSorter interface {
	// NewWriter creates a new writer for writing key-value pairs before sorting.
	// Multiple writers can be opened and used concurrently.
	NewWriter(ctx context.Context) (Writer, error)
	// Sort sorts the key-value pairs written by the writer.
	// It should be called after all open writers are closed.
	Sort(ctx context.Context) error
	// IsSorted returns true if the key-value pairs are sorted and iterators are ready to be created.
	IsSorted() bool
	// NewIterator creates a new iterator for iterating over the key-value pairs after sorting.
	// Multiple iterators can be opened and used concurrently.
	NewIterator(ctx context.Context) (Iterator, error)
	// Close releases all resources held by the sorter. It will not clean up the external storage,
	// so the sorter can recover from a crash.
	Close() error
	// CloseAndCleanup closes the external sorter and cleans up all resources created by the sorter.
	CloseAndCleanup() error
}

Pseudocode for duplicate detection

resultWriter, _ := resultSorter.NewWriter(ctx)
defer resultWriter.Close()

workingSorter.Sort(ctx)
it, _ := workingSorter.NewIterator(ctx)
defer it.Close()

var numDups int64
var lastKey, lastKeyID []byte
for it.Seek(nil); it.Valid(); it.Next() {
	key, keyID, err := decodeInternalKey(it.UnsafeKey())
	if err != nil {
		return 0, err
	}
	if bytes.Equal(key, lastKey) {
		// Duplicate key found: write both conflicting entries
		// to resultWriter so they can be resolved later.
		numDups++
	}
	lastKey = append(lastKey[:0], key...)
	lastKeyID = append(lastKeyID[:0], keyID...)
}
resultSorter.Sort(ctx)
return numDups, nil
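After detection, the import phase can consult the recorded conflict keys and skip the corresponding rows during encoding. A hedged sketch of that skip step, where the conflictSet type and map-based lookup are assumptions (the real implementation would stream keys from the result sorter's iterator):

```go
package main

import "fmt"

// conflictSet holds the keys recorded by duplicate detection.
// A map stands in for streaming from the result sorter here.
type conflictSet map[string]struct{}

// shouldSkip reports whether an encoded key was flagged as a duplicate.
func (s conflictSet) shouldSkip(key []byte) bool {
	_, ok := s[string(key)]
	return ok
}

func main() {
	// Assume detection flagged "k2" as a duplicate key.
	conflicts := conflictSet{"k2": {}}
	rows := [][2]string{{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}}
	for _, r := range rows {
		if conflicts.shouldSkip([]byte(r[0])) {
			continue // duplicate detected earlier; do not import
		}
		fmt.Printf("import %s=%s\n", r[0], r[1])
	}
	// Prints:
	// import k1=v1
	// import k3=v3
}
```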

Development tasks

  • Add ExternalSorter interface and implement DiskSorter
  • Implement basic logic of duplicate detection
  • Implement replace and ignore policies for local-backend
  • Add max-error-records option for local-backend
@sleepymole sleepymole added the component/lightning This issue is related to Lightning of TiDB. label Feb 21, 2023
@sleepymole sleepymole changed the title lightning: optimize duplicate detection for non-incremental import lightning: optimize duplicate detection, resolution and record policy Feb 27, 2023
@sleepymole sleepymole changed the title lightning: optimize duplicate detection, resolution and record policy lightning: optimize local-backend duplicate detection, resolution and record policy Feb 27, 2023
@sleepymole sleepymole self-assigned this Feb 27, 2023
@sleepymole sleepymole reopened this Mar 7, 2023
ti-chi-bot bot pushed a commit that referenced this issue Apr 27, 2023
ti-chi-bot bot pushed a commit that referenced this issue May 15, 2023
ti-chi-bot bot pushed a commit that referenced this issue Jun 9, 2023