-
Notifications
You must be signed in to change notification settings - Fork 282
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support write TiCDC log to S3. #826
Conversation
This reverts commit 16baa26.
cdc/processor.go
Outdated
@@ -1024,6 +1024,11 @@ func runProcessor( | |||
cancel() | |||
return nil, errors.Trace(err) | |||
} | |||
err = sink.Initialize(ctx, nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems initialize
does nothing in processor, is it better to remove it?
cdc/sink/cdclog/file.go
Outdated
} | ||
data, err := row.ToProtoBuf().Marshal() | ||
if err != nil { | ||
errCh <- err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to return when an error happens?
cdc/sink/cdclog/file.go
Outdated
if err != nil { | ||
errCh <- err | ||
} | ||
rowDatas = append(rowDatas, append(data, '\n')...) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is \n
used as row delimiter, what happens when data contains \n
@@ -442,6 +442,7 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr | |||
Table: &model.TableName{ | |||
Schema: schemaName, | |||
Table: tableName, | |||
TableID: row.PhysicalTableID, | |||
Partition: partitionID, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can remove the Partition
now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about remove the Partition in another PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
/run-integration-tests |
/lgtm |
cdc/sink/cdclog/file.go
Outdated
if err != nil { | ||
return err | ||
} | ||
file, err := os.OpenFile(filepath.Join(tableDir, fileName), os.O_CREATE|os.O_WRONLY|os.O_APPEND, defaultFileMode) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new file has the same file name with the rotated file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, when batch events flushed, it will create new file name here, but not every new file name will be used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
eh... maybe we should use the first event' commit ts as the filename
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also need test for file sink, we can do it in another PR
/run-integration-tests |
/run-integration-tests |
/run-integration-tests |
/merge |
/run-all-tests |
@3pointer merge failed. |
/run-all-tests |
What problem does this PR solve?
part of #768, support incremental log and write log data to s3 directly.
What is changed and how it works?
Define cdclog with three kind of events.
cdclog
ddlog
log.meta
Since s3 doesn't allow append-like operation. there are three different treatments for different events
Check List
Tests
Side effects
Related changes
tidb-cdc/cdc-ansible
Release note