Permalink
Browse files

Replication binary log event checksum support.

  • Loading branch information...
Nirbhay Choubey
Nirbhay Choubey committed Feb 12, 2016
1 parent 0483482 commit f1436b547c3c5b9dfc00f5b39a110de09710c1ae
Showing with 246 additions and 28 deletions.
  1. +73 −27 binlog.go
  2. +133 −0 checksum.go
  3. +2 −0 error.go
  4. +24 −0 prot_binlog.go
  5. +14 −1 url.go
View
100 binlog.go
@@ -86,8 +86,10 @@ const (
type Binlog struct {
reader binlogReader
index binlogIndex
checksum checksumVerifier
desc eventDescription
tableMap *TableMapEvent
p properties
}
type binlogReader interface {
@@ -105,6 +107,7 @@ type eventDescription struct {
creationTime time.Time
commonHeaderLength uint8
postHeaderLength []byte
checksumAlg uint8
}
type binlogIndex struct {
@@ -147,6 +150,10 @@ func (b *Binlog) Connect(dsn string) error {
return myError(ErrScheme, p.scheme)
}
b.checksum = new(checksumOff)
b.p = p
return nil
}
@@ -166,16 +173,32 @@ func (b *Binlog) Next() bool {
return b.reader.next()
}
func (b *Binlog) RawEvent() (re RawEvent) {
var off int
func (b *Binlog) RawEvent() (re RawEvent, err error) {
var (
off int
end int
)
err = b.Error()
if err != nil {
return
}
re.body = b.reader.event()
re.header, off = parseEventHeader(re.body)
end = len(re.body)
if b.checksum.algorithm() != BINLOG_CHECKSUM_ALG_OFF {
// exclude the event checksum
end -= _BINLOG_CHECKSUM_LENGTH
}
switch re.header.type_ {
case START_EVENT_V3:
ev := new(StartEventV3)
b.parseStartEventV3(re.body[off:], ev)
b.parseStartEventV3(re.body[off:end], ev)
// now that we have parsed START_EVENT_V3, we can
// update binlog description
@@ -184,7 +207,8 @@ func (b *Binlog) RawEvent() (re RawEvent) {
b.desc.creationTime = ev.creationTime
case FORMAT_DESCRIPTION_EVENT:
ev := new(FormatDescriptionEvent)
b.parseFormatDescriptionEvent(re.body[off:], ev)
b.parseFormatDescriptionEvent(re.body[off:end], ev)
// now that we have parsed FORMAT_DESCRIPTION_EVENT, we can
// update binlog description
@@ -195,10 +219,20 @@ func (b *Binlog) RawEvent() (re RawEvent) {
// number of events
b.desc.postHeaderLength = make([]byte, len(ev.postHeaderLength))
copy(b.desc.postHeaderLength, ev.postHeaderLength)
// checksum algorithm
b.desc.checksumAlg = ev.checksumAlg
// update event checksum verifier
updateChecksumVerifier(b)
default: // do nothing
}
re.binlog = b
// verify event checksum
if b.p.binlogVerifyChecksum && !b.checksum.test(re.body) {
err = myError(ErrEventChecksumFailure)
return
}
return
}
@@ -269,6 +303,13 @@ func (re *RawEvent) Event() Event {
// move past event header, as it has already been parsed
off := 19
end := len(re.body)
if binlog.checksum.algorithm() != BINLOG_CHECKSUM_ALG_OFF {
// exclude the event checksum
end -= _BINLOG_CHECKSUM_LENGTH
}
switch re.header.type_ {
case START_EVENT_V3:
ev := new(StartEventV3)
@@ -287,7 +328,7 @@ func (re *RawEvent) Event() Event {
case QUERY_EVENT:
ev := new(QueryEvent)
ev.header = header
binlog.parseQueryEvent(buf[off:], ev)
binlog.parseQueryEvent(buf[off:end], ev)
return ev
case STOP_EVENT:
@@ -299,63 +340,63 @@ func (re *RawEvent) Event() Event {
case ROTATE_EVENT:
ev := new(RotateEvent)
ev.header = header
binlog.parseRotateEvent(buf[off:], ev)
binlog.parseRotateEvent(buf[off:end], ev)
return ev
case INTVAR_EVENT:
ev := new(IntvarEvent)
ev.header = header
binlog.parseIntvarEvent(buf[off:], ev)
binlog.parseIntvarEvent(buf[off:end], ev)
return ev
case LOAD_EVENT:
fallthrough
case NEW_LOAD_EVENT:
ev := new(LoadEvent)
ev.header = header
binlog.parseLoadEvent(buf[off:], ev)
binlog.parseLoadEvent(buf[off:end], ev)
return ev
case SLAVE_EVENT:
ev := new(SlaveEvent)
ev.header = header
binlog.parseSlaveEvent(buf[off:], ev)
binlog.parseSlaveEvent(buf[off:end], ev)
return ev
case CREATE_FILE_EVENT:
ev := new(CreateFileEvent)
ev.header = header
binlog.parseCreateFileEvent(buf[off:], ev)
binlog.parseCreateFileEvent(buf[off:end], ev)
return ev
case APPEND_BLOCK_EVENT:
ev := new(AppendBlockEvent)
ev.header = header
binlog.parseAppendBlockEvent(buf[off:], ev)
binlog.parseAppendBlockEvent(buf[off:end], ev)
return ev
case EXEC_LOAD_EVENT:
ev := new(ExecLoadEvent)
ev.header = header
binlog.parseExecLoadEvent(buf[off:], ev)
binlog.parseExecLoadEvent(buf[off:end], ev)
return ev
case DELETE_FILE_EVENT:
ev := new(DeleteFileEvent)
ev.header = header
binlog.parseDeleteFileEvent(buf[off:], ev)
binlog.parseDeleteFileEvent(buf[off:end], ev)
return ev
case RAND_EVENT:
ev := new(RandEvent)
ev.header = header
binlog.parseRandEvent(buf[off:], ev)
binlog.parseRandEvent(buf[off:end], ev)
return ev
case USER_VAR_EVENT:
ev := new(UserVarEvent)
ev.header = header
binlog.parseUserVarEvent(buf[off:], ev)
binlog.parseUserVarEvent(buf[off:end], ev)
return ev
case FORMAT_DESCRIPTION_EVENT:
@@ -379,25 +420,25 @@ func (re *RawEvent) Event() Event {
case XID_EVENT:
ev := new(XidEvent)
ev.header = header
binlog.parseXidEvent(buf[off:], ev)
binlog.parseXidEvent(buf[off:end], ev)
return ev
case BEGIN_LOAD_QUERY_EVENT:
ev := new(BeginLoadQueryEvent)
ev.header = header
binlog.parseBeginLoadQueryEvent(buf[off:], ev)
binlog.parseBeginLoadQueryEvent(buf[off:end], ev)
return ev
case EXECUTE_LOAD_QUERY_EVENT:
ev := new(ExecuteLoadQueryEvent)
ev.header = header
binlog.parseExecuteLoadQueryEvent(buf[off:], ev)
binlog.parseExecuteLoadQueryEvent(buf[off:end], ev)
return ev
case TABLE_MAP_EVENT:
ev := new(TableMapEvent)
ev.header = header
binlog.parseTableMapEvent(buf[off:], ev)
binlog.parseTableMapEvent(buf[off:end], ev)
binlog.tableMap = ev
return ev
@@ -408,13 +449,13 @@ func (re *RawEvent) Event() Event {
DELETE_ROWS_EVENT:
ev := new(RowsEvent)
ev.header = header
binlog.parseRowsEvent(buf[off:], ev)
binlog.parseRowsEvent(buf[off:end], ev)
return ev
case INCIDENT_EVENT:
ev := new(IncidentEvent)
ev.header = header
binlog.parseIncidentEvent(buf[off:], ev)
binlog.parseIncidentEvent(buf[off:end], ev)
return ev
case HEARTBEAT_LOG_EVENT:
@@ -430,31 +471,31 @@ func (re *RawEvent) Event() Event {
case ROWS_QUERY_LOG_EVENT:
ev := new(RowsQueryLogEvent)
ev.header = header
binlog.parseRowsQueryLogEvent(buf[off:], ev)
binlog.parseRowsQueryLogEvent(buf[off:end], ev)
return ev
case ANNOTATE_ROWS_EVENT:
ev := new(AnnotateRowsEvent)
ev.header = header
binlog.parseAnnotateRowsEvent(buf[off:], ev)
binlog.parseAnnotateRowsEvent(buf[off:end], ev)
return ev
case BINLOG_CHECKPOINT_EVENT:
ev := new(BinlogCheckpointEvent)
ev.header = header
binlog.parseBinlogCheckpointEvent(buf[off:], ev)
binlog.parseBinlogCheckpointEvent(buf[off:end], ev)
return ev
case GTID_EVENT:
ev := new(GtidEvent)
ev.header = header
binlog.parseGtidEvent(buf[off:], ev)
binlog.parseGtidEvent(buf[off:end], ev)
return ev
case GTID_LIST_EVENT:
ev := new(GtidListEvent)
ev.header = header
binlog.parseGtidListEvent(buf[off:], ev)
binlog.parseGtidListEvent(buf[off:end], ev)
return ev
default: // unimplemented events
@@ -650,6 +691,7 @@ type FormatDescriptionEvent struct {
creationTime time.Time
commonHeaderLength uint8
postHeaderLength []byte
checksumAlg uint8
}
func (e *FormatDescriptionEvent) Time() time.Time {
@@ -684,6 +726,10 @@ func (e *FormatDescriptionEvent) CreationTime() time.Time {
return e.creationTime
}
func (e *FormatDescriptionEvent) ChecksumAlgorithm() uint8 {
return e.checksumAlg
}
// STOP_EVENT
type StopEvent struct {
header eventHeader
Oops, something went wrong.

0 comments on commit f1436b5

Please sign in to comment.