From 3598de77bf327ac4a662a77c83d71cb1ce10bd39 Mon Sep 17 00:00:00 2001 From: "jackey.zhang" Date: Tue, 13 Jul 2021 11:39:58 +0800 Subject: [PATCH 01/17] Adding support Mariadb events, such as GTID and annotate rows event(sql) --- .../shyiko/mysql/binlog/BinaryLogClient.java | 101 +++++++---- .../shyiko/mysql/binlog/MariadbGtidSet.java | 158 ++++++++++++++++++ .../binlog/event/AnnotateRowsEventData.java | 31 ++++ .../shyiko/mysql/binlog/event/EventType.java | 119 +++++++------ .../binlog/event/MariadbGtidEventData.java | 43 +++++ .../event/MariadbGtidListEventData.java | 29 ++++ .../AnnotateRowsEventDataDeserializer.java | 24 +++ .../deserialization/EventDeserializer.java | 6 + .../EventHeaderV4Deserializer.java | 8 +- .../MariadbGtidEventDataDeserializer.java | 33 ++++ .../MariadbGtidListEventDataDeserializer.java | 39 +++++ .../command/DumpBinaryLogCommand.java | 13 +- ...BinaryLogClientMariadbIntegrationTest.java | 87 ++++++++++ 13 files changed, 605 insertions(+), 86 deletions(-) create mode 100644 src/main/java/com/github/shyiko/mysql/binlog/MariadbGtidSet.java create mode 100644 src/main/java/com/github/shyiko/mysql/binlog/event/AnnotateRowsEventData.java create mode 100644 src/main/java/com/github/shyiko/mysql/binlog/event/MariadbGtidEventData.java create mode 100644 src/main/java/com/github/shyiko/mysql/binlog/event/MariadbGtidListEventData.java create mode 100644 src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AnnotateRowsEventDataDeserializer.java create mode 100644 src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidEventDataDeserializer.java create mode 100644 src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidListEventDataDeserializer.java create mode 100644 src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientMariadbIntegrationTest.java diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index d8b8299..5ba4cb2 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -15,21 +15,9 @@ */ package com.github.shyiko.mysql.binlog; -import com.github.shyiko.mysql.binlog.event.Event; -import com.github.shyiko.mysql.binlog.event.EventHeader; -import com.github.shyiko.mysql.binlog.event.EventHeaderV4; -import com.github.shyiko.mysql.binlog.event.EventType; -import com.github.shyiko.mysql.binlog.event.GtidEventData; -import com.github.shyiko.mysql.binlog.event.QueryEventData; -import com.github.shyiko.mysql.binlog.event.RotateEventData; -import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType; -import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException; -import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializer; -import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; +import com.github.shyiko.mysql.binlog.event.*; +import com.github.shyiko.mysql.binlog.event.deserialization.*; import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.EventDataWrapper; -import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer; -import com.github.shyiko.mysql.binlog.event.deserialization.QueryEventDataDeserializer; -import com.github.shyiko.mysql.binlog.event.deserialization.RotateEventDataDeserializer; import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientMXBean; import com.github.shyiko.mysql.binlog.network.AuthenticationException; @@ -46,15 +34,7 @@ import com.github.shyiko.mysql.binlog.network.protocol.Packet; import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel; import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket; -import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateNativePasswordCommand; -import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateSHA2Command; -import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateSecurityPasswordCommand; -import com.github.shyiko.mysql.binlog.network.protocol.command.Command; -import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand; -import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogGtidCommand; -import com.github.shyiko.mysql.binlog.network.protocol.command.PingCommand; -import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand; -import com.github.shyiko.mysql.binlog.network.protocol.command.SSLRequestCommand; +import com.github.shyiko.mysql.binlog.network.protocol.command.*; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; @@ -141,6 +121,8 @@ public X509Certificate[] getAcceptedIssuers() { private boolean useBinlogFilenamePositionInGtidMode; private String gtid; private boolean tx; + private boolean isMariadb = false; + private boolean mariadbSendAnnotateRowsEvent = false; private EventDeserializer eventDeserializer = new EventDeserializer(); @@ -335,7 +317,12 @@ public void setGtidSet(String gtidSet) { this.binlogFilename = ""; } synchronized (gtidSetAccessLock) { - this.gtidSet = gtidSet != null ? new GtidSet(gtidSet) : null; + // mariadb GtidSet format will be domainId-serverId-sequence + if (gtidSet != null && !gtidSet.contains(":")) { + this.gtidSet = new MariadbGtidSet(gtidSet); + } else { + this.gtidSet = gtidSet != null ? new GtidSet(gtidSet) : null; + } } } @@ -501,6 +488,19 @@ public void setThreadFactory(ThreadFactory threadFactory) { this.threadFactory = threadFactory; } + public boolean isMariadbSendAnnotateRowsEvent() { + return mariadbSendAnnotateRowsEvent; + } + + /** + * Only in Mariadb, if set true, the Slave server connects with the BINLOG_SEND_ANNOTATE_ROWS_EVENT flag (value is 2) + * in the COM_BINLOG_DUMP Slave Registration phase + * @param mariadbSendAnnotateRowsEvent + */ + public void setMariadbSendAnnotateRowsEvent(boolean mariadbSendAnnotateRowsEvent) { + this.mariadbSendAnnotateRowsEvent = mariadbSendAnnotateRowsEvent; + } + /** * Connect to the replication stream. Note that this method blocks until disconnected. * @throws AuthenticationException if authentication fails @@ -538,10 +538,15 @@ public void connect() throws IOException, IllegalStateException { channel.authenticationComplete(); connectionId = greetingPacket.getThreadId(); + isMariadb = greetingPacket.getServerVersion().toLowerCase().contains("mariadb"); if ("".equals(binlogFilename)) { synchronized (gtidSetAccessLock) { if (gtidSet != null && "".equals(gtidSet.toString()) && gtidSetFallbackToPurged) { - gtidSet = new GtidSet(fetchGtidPurged()); + if (isMariadb) { + gtidSet = new MariadbGtidSet(fetchGtidPurged()); + } else { + gtidSet = new GtidSet(fetchGtidPurged()); + } } } } @@ -594,6 +599,11 @@ public void connect() throws IOException, IllegalStateException { if (gtidSet != null) { ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class); ensureEventDataDeserializer(EventType.QUERY, QueryEventDataDeserializer.class); + if (isMariadb) { + ensureEventDataDeserializer(EventType.ANNOTATE_ROWS, AnnotateRowsEventDataDeserializer.class); + ensureEventDataDeserializer(EventType.MARIADB_GTID, MariadbGtidEventDataDeserializer.class); + ensureEventDataDeserializer(EventType.MARIADB_GTID_LIST, MariadbGtidListEventDataDeserializer.class); + } } } listenForEventPackets(); @@ -730,10 +740,22 @@ private void requestBinaryLogStream() throws IOException { Command dumpBinaryLogCommand; synchronized (gtidSetAccessLock) { if (gtidSet != null) { - dumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId, - useBinlogFilenamePositionInGtidMode ? binlogFilename : "", - useBinlogFilenamePositionInGtidMode ? binlogPosition : 4, - gtidSet); + if (isMariadb) { + channel.write(new QueryCommand("SET @mariadb_slave_capability=4")); + checkError(channel.read()); + channel.write(new QueryCommand("SET @slave_connect_state = '" + gtidSet.toString() + "'")); + checkError(channel.read()); + channel.write(new QueryCommand("SET @slave_gtid_strict_mode = 0")); + checkError(channel.read()); + channel.write(new QueryCommand("SET @slave_gtid_ignore_duplicates = 0")); + checkError(channel.read()); + dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, "", 0L, isMariadbSendAnnotateRowsEvent()); + } else { + dumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId, + useBinlogFilenamePositionInGtidMode ? binlogFilename : "", + useBinlogFilenamePositionInGtidMode ? binlogPosition : 4, + gtidSet); + } } else { dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition); } @@ -1034,13 +1056,30 @@ private void updateGtidSet(Event event) { GtidEventData gtidEventData = (GtidEventData) EventDataWrapper.internal(event.getData()); gtid = gtidEventData.getGtid(); break; + case MARIADB_GTID: + MariadbGtidEventData mariadbGtidEventData = (MariadbGtidEventData) EventDataWrapper.internal(event.getData()); + mariadbGtidEventData.setServerId(eventHeader.getServerId()); + gtid = mariadbGtidEventData.toString(); + break; + case MARIADB_GTID_LIST: + MariadbGtidListEventData mariadbGtidListEventData = (MariadbGtidListEventData) EventDataWrapper.internal(event.getData()); + gtid = mariadbGtidListEventData.getMariaGTIDSet().toString(); + break; case XID: commitGtid(); tx = false; break; case QUERY: - QueryEventData queryEventData = (QueryEventData) EventDataWrapper.internal(event.getData()); - String sql = queryEventData.getSql(); + case ANNOTATE_ROWS: + String sql; + if (eventHeader.getEventType() == EventType.QUERY) { + QueryEventData queryEventData = (QueryEventData) EventDataWrapper.internal(event.getData()); + sql = queryEventData.getSql(); + } else { + AnnotateRowsEventData annotateRowsEventData = (AnnotateRowsEventData) EventDataWrapper.internal(event.getData()); + sql = annotateRowsEventData.getRowsQuery(); + } + if (sql == null) { break; } diff --git a/src/main/java/com/github/shyiko/mysql/binlog/MariadbGtidSet.java b/src/main/java/com/github/shyiko/mysql/binlog/MariadbGtidSet.java new file mode 100644 index 0000000..c86ee54 --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/MariadbGtidSet.java @@ -0,0 +1,158 @@ +package com.github.shyiko.mysql.binlog; + +import java.util.*; + +/** + * Mariadb Global Transaction ID + * + * @author Winger + * @see GTID for the original doc + */ +public class MariadbGtidSet extends GtidSet { + + private Map map = new HashMap<>(); + + public MariadbGtidSet() { + super(null); // + } + + public MariadbGtidSet(String gtidSet) { + super(null); + if (gtidSet != null && gtidSet.length() > 0) { + String[] gtids = gtidSet.replaceAll("\n", "").split(","); + for (String gtid : gtids) { + MariaGtid mariaGtid = MariaGtid.parse(gtid); + map.put(mariaGtid.getDomainId(), mariaGtid); + } + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + for (MariaGtid gtid : map.values()) { + if (sb.length() > 0) { + sb.append(","); + } + sb.append(gtid.toString()); + } + return sb.toString(); + } + + @Override + public Collection getUUIDSets() { + throw new UnsupportedOperationException("Mariadb gtid not support this method"); + } + + @Override + public UUIDSet getUUIDSet(String uuid) { + throw new UnsupportedOperationException("Mariadb gtid not support this method"); + } + + @Override + public UUIDSet putUUIDSet(UUIDSet uuidSet) { + throw new UnsupportedOperationException("Mariadb gtid not support this method"); + } + + @Override + public boolean add(String gtid) { + MariaGtid mariaGtid = MariaGtid.parse(gtid); + map.put(mariaGtid.getDomainId(), mariaGtid); + return true; + } + + public void add(MariaGtid gtid) { + map.put(gtid.getDomainId(), gtid); + } + + @Override + public boolean isContainedWithin(GtidSet other) { + throw new UnsupportedOperationException("Mariadb gtid not support this method"); + } + + @Override + public int hashCode() { + return map.keySet().hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj instanceof MariadbGtidSet) { + MariadbGtidSet that = (MariadbGtidSet) obj; + return this.map.equals(that.map); + } + return false; + } + + public static class MariaGtid { + + // {domainId}-{serverId}-{sequence} + private long domainId; + private long serverId; + private long sequence; + + public MariaGtid(long domainId, long serverId, long sequence) { + this.domainId = domainId; + this.serverId = serverId; + this.sequence = sequence; + } + + public MariaGtid(String gtid) { + String[] gtidArr = gtid.split("-"); + this.domainId = Long.parseLong(gtidArr[0]); + this.serverId = Long.parseLong(gtidArr[1]); + this.sequence = Long.parseLong(gtidArr[2]); + } + + public static MariaGtid parse(String gtid) { + return new MariaGtid(gtid); + } + + public long getDomainId() { + return domainId; + } + + public void setDomainId(long domainId) { + this.domainId = domainId; + } + + public long getServerId() { + return serverId; + } + + public void setServerId(long serverId) { + this.serverId = serverId; + } + + public long getSequence() { + return sequence; + } + + public void setSequence(long sequence) { + this.sequence = sequence; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MariaGtid mariaGtid = (MariaGtid) o; + return domainId == mariaGtid.domainId && + serverId == mariaGtid.serverId && + sequence == mariaGtid.sequence; + } + + @Override + public String toString() { + return String.format("%s-%s-%s", domainId, serverId, sequence); + } + } +} + diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/AnnotateRowsEventData.java b/src/main/java/com/github/shyiko/mysql/binlog/event/AnnotateRowsEventData.java new file mode 100644 index 0000000..85fa79e --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/AnnotateRowsEventData.java @@ -0,0 +1,31 @@ +package com.github.shyiko.mysql.binlog.event; + +/** + * Mariadb ANNOTATE_ROWS_EVENT events accompany row events and describe the query which caused the row event + * Enable this with --binlog-annotate-row-events (default on from MariaDB 10.2.4). + * In the binary log, each Annotate_rows event precedes the corresponding Table map event. + * Note the master server sends ANNOTATE_ROWS_EVENT events only if the Slave server connects + * with the BINLOG_SEND_ANNOTATE_ROWS_EVENT flag (value is 2) in the COM_BINLOG_DUMP Slave Registration phase. + * + * @author Winger + * @see ANNOTATE_ROWS_EVENT for the original doc + */ +public class AnnotateRowsEventData implements EventData { + + private String rowsQuery; + + public String getRowsQuery() { + return rowsQuery; + } + + public void setRowsQuery(String rowsQuery) { + this.rowsQuery = rowsQuery; + } + + @Override + public String toString() { + return "AnnotateRowsEventData{" + + "rowsQuery='" + rowsQuery + '\'' + + '}'; + } +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/EventType.java b/src/main/java/com/github/shyiko/mysql/binlog/event/EventType.java index 7f94ccf..fb1fcaa 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/EventType.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/EventType.java @@ -16,99 +16,99 @@ package com.github.shyiko.mysql.binlog.event; /** + * @author Stanley Shyiko * @see Event Meanings for the original * documentation. - * @author Stanley Shyiko */ public enum EventType { /** * Events of this event type should never occur. Not written to a binary log. */ - UNKNOWN, + UNKNOWN(0), /** * A descriptor event that is written to the beginning of the each binary log file. (In MySQL 4.0 and 4.1, * this event is written only to the first binary log file that the server creates after startup.) This event is * used in MySQL 3.23 through 4.1 and superseded in MySQL 5.0 by {@link #FORMAT_DESCRIPTION}. */ - START_V3, + START_V3(1), /** * Written when an updating statement is done. */ - QUERY, + QUERY(2), /** * Written when mysqld stops. */ - STOP, + STOP(3), /** * Written when mysqld switches to a new binary log file. This occurs when someone issues a FLUSH LOGS statement or * the current binary log file becomes larger than max_binlog_size. */ - ROTATE, + ROTATE(4), /** * Written every time a statement uses an AUTO_INCREMENT column or the LAST_INSERT_ID() function; precedes other * events for the statement. This is written only before a {@link #QUERY} and is not used in case of RBR. */ - INTVAR, + INTVAR(5), /** * Used for LOAD DATA INFILE statements in MySQL 3.23. */ - LOAD, + LOAD(6), /** * Not used. */ - SLAVE, + SLAVE(7), /** * Used for LOAD DATA INFILE statements in MySQL 4.0 and 4.1. */ - CREATE_FILE, + CREATE_FILE(8), /** * Used for LOAD DATA INFILE statements as of MySQL 4.0. */ - APPEND_BLOCK, + APPEND_BLOCK(9), /** * Used for LOAD DATA INFILE statements in 4.0 and 4.1. */ - EXEC_LOAD, + EXEC_LOAD(10), /** * Used for LOAD DATA INFILE statements as of MySQL 4.0. */ - DELETE_FILE, + DELETE_FILE(11), /** * Used for LOAD DATA INFILE statements in MySQL 4.0 and 4.1. */ - NEW_LOAD, + NEW_LOAD(12), /** * Written every time a statement uses the RAND() function; precedes other events for the statement. Indicates the * seed values to use for generating a random number with RAND() in the next statement. This is written only * before a {@link #QUERY} and is not used in case of RBR. */ - RAND, + RAND(13), /** * Written every time a statement uses a user variable; precedes other events for the statement. Indicates the * value to use for the user variable in the next statement. This is written only before a {@link #QUERY} and * is not used in case of RBR. */ - USER_VAR, + USER_VAR(14), /** * A descriptor event that is written to the beginning of the each binary log file. * This event is used as of MySQL 5.0; it supersedes {@link #START_V3}. */ - FORMAT_DESCRIPTION, + FORMAT_DESCRIPTION(15), /** * Generated for a commit of a transaction that modifies one or more tables of an XA-capable storage engine. * Normal transactions are implemented by sending a {@link #QUERY} containing a BEGIN statement and a {@link #QUERY} * containing a COMMIT statement (or a ROLLBACK statement if the transaction is rolled back). */ - XID, + XID(16), /** * Used for LOAD DATA INFILE statements as of MySQL 5.0. */ - BEGIN_LOAD_QUERY, + BEGIN_LOAD_QUERY(17), /** * Used for LOAD DATA INFILE statements as of MySQL 5.0. */ - EXECUTE_LOAD_QUERY, + EXECUTE_LOAD_QUERY(18), /** * This event precedes each row operation event. It maps a table definition to a number, where the table definition * consists of database and table names and column definitions. The purpose of this event is to enable replication @@ -117,105 +117,128 @@ public enum EventType { * of TABLE_MAP events: one per table used by events in the sequence. * Used in case of RBR. */ - TABLE_MAP, + TABLE_MAP(19), /** * Describes inserted rows (within a single table). * Used in case of RBR (5.1.0 - 5.1.15). */ - PRE_GA_WRITE_ROWS, + PRE_GA_WRITE_ROWS(20), /** * Describes updated rows (within a single table). * Used in case of RBR (5.1.0 - 5.1.15). */ - PRE_GA_UPDATE_ROWS, + PRE_GA_UPDATE_ROWS(21), /** * Describes deleted rows (within a single table). * Used in case of RBR (5.1.0 - 5.1.15). */ - PRE_GA_DELETE_ROWS, + PRE_GA_DELETE_ROWS(22), /** * Describes inserted rows (within a single table). * Used in case of RBR (5.1.16 - mysql-trunk). */ - WRITE_ROWS, + WRITE_ROWS(23), /** * Describes updated rows (within a single table). * Used in case of RBR (5.1.16 - mysql-trunk). */ - UPDATE_ROWS, + UPDATE_ROWS(24), /** * Describes deleted rows (within a single table). * Used in case of RBR (5.1.16 - mysql-trunk). */ - DELETE_ROWS, + DELETE_ROWS(25), /** * Used to log an out of the ordinary event that occurred on the master. It notifies the slave that something * happened on the master that might cause data to be in an inconsistent state. */ - INCIDENT, + INCIDENT(26), /** * Sent by a master to a slave to let the slave know that the master is still alive. Not written to a binary log. */ - HEARTBEAT, + HEARTBEAT(27), /** * In some situations, it is necessary to send over ignorable data to the slave: data that a slave can handle in * case there is code for handling it, but which can be ignored if it is not recognized. */ - IGNORABLE, + IGNORABLE(28), /** * Introduced to record the original query for rows events in RBR. */ - ROWS_QUERY, + ROWS_QUERY(29), /** * Describes inserted rows (within a single table). * Used in case of RBR (5.1.18+). */ - EXT_WRITE_ROWS, + EXT_WRITE_ROWS(30), /** * Describes updated rows (within a single table). * Used in case of RBR (5.1.18+). */ - EXT_UPDATE_ROWS, + EXT_UPDATE_ROWS(31), /** * Describes deleted rows (within a single table). * Used in case of RBR (5.1.18+). */ - EXT_DELETE_ROWS, + EXT_DELETE_ROWS(32), /** * Global Transaction Identifier. */ - GTID, - ANONYMOUS_GTID, - PREVIOUS_GTIDS, - TRANSACTION_CONTEXT, - VIEW_CHANGE, + GTID(33), + ANONYMOUS_GTID(34), + PREVIOUS_GTIDS(35), + TRANSACTION_CONTEXT(36), + VIEW_CHANGE(37), /** * Prepared XA transaction terminal event similar to XID except that it is specific to XA transaction. */ - XA_PREPARE; + XA_PREPARE(38), + + /** + * MariaDB Support Events + * + * @see Replication Protocol for the original doc. + */ + ANNOTATE_ROWS(160), // + MARIADB_GTID(162), + MARIADB_GTID_LIST(163); + + private final int eventNumber; + + EventType(int eventNumber) { + this.eventNumber = eventNumber; + } public static boolean isRowMutation(EventType eventType) { return EventType.isWrite(eventType) || - EventType.isUpdate(eventType) || - EventType.isDelete(eventType); + EventType.isUpdate(eventType) || + EventType.isDelete(eventType); } public static boolean isWrite(EventType eventType) { return eventType == PRE_GA_WRITE_ROWS || - eventType == WRITE_ROWS || - eventType == EXT_WRITE_ROWS; + eventType == WRITE_ROWS || + eventType == EXT_WRITE_ROWS; } public static boolean isUpdate(EventType eventType) { return eventType == PRE_GA_UPDATE_ROWS || - eventType == UPDATE_ROWS || - eventType == EXT_UPDATE_ROWS; + eventType == UPDATE_ROWS || + eventType == EXT_UPDATE_ROWS; } public static boolean isDelete(EventType eventType) { return eventType == PRE_GA_DELETE_ROWS || - eventType == DELETE_ROWS || - eventType == EXT_DELETE_ROWS; + eventType == DELETE_ROWS || + eventType == EXT_DELETE_ROWS; } + public static EventType byEventNumber(int num) { + for (EventType type : EventType.values()) { + if (type.eventNumber == num) { + return type; + } + } + return null; + } } diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/MariadbGtidEventData.java b/src/main/java/com/github/shyiko/mysql/binlog/event/MariadbGtidEventData.java new file mode 100644 index 0000000..54838d0 --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/MariadbGtidEventData.java @@ -0,0 +1,43 @@ +package com.github.shyiko.mysql.binlog.event; + +/** + * MariaDB and MySQL have different GTID implementations, and that these are not compatible with each other. + * + * @author Winger + * @see GTID_EVENT for the original doc + */ +public class MariadbGtidEventData implements EventData { + + private long sequence; + private long domainId; + private long serverId; + + public long getSequence() { + return sequence; + } + + public void setSequence(long sequence) { + this.sequence = sequence; + } + + public long getDomainId() { + return domainId; + } + + public void setDomainId(long domainId) { + this.domainId = domainId; + } + + public long getServerId() { + return serverId; + } + + public void setServerId(long serverId) { + this.serverId = serverId; + } + + @Override + public String toString() { + return domainId + "-" + serverId + "-" + sequence; + } +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/MariadbGtidListEventData.java b/src/main/java/com/github/shyiko/mysql/binlog/event/MariadbGtidListEventData.java new file mode 100644 index 0000000..ae6e91f --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/MariadbGtidListEventData.java @@ -0,0 +1,29 @@ +package com.github.shyiko.mysql.binlog.event; + +import com.github.shyiko.mysql.binlog.MariadbGtidSet; + +/** + * Logged in every binlog to record the current replication state + * + * @author Winger + * @see GTID_LIST_EVENT for the original doc + */ +public class MariadbGtidListEventData implements EventData { + + private MariadbGtidSet mariaGTIDSet; + + public MariadbGtidSet getMariaGTIDSet() { + return mariaGTIDSet; + } + + public void setMariaGTIDSet(MariadbGtidSet mariaGTIDSet) { + this.mariaGTIDSet = mariaGTIDSet; + } + + @Override + public String toString() { + return "MariadbGtidListEventData{" + + "mariaGTIDSet=" + mariaGTIDSet + + '}'; + } +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AnnotateRowsEventDataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AnnotateRowsEventDataDeserializer.java new file mode 100644 index 0000000..688a9b2 --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AnnotateRowsEventDataDeserializer.java @@ -0,0 +1,24 @@ +package com.github.shyiko.mysql.binlog.event.deserialization; + +import com.github.shyiko.mysql.binlog.event.AnnotateRowsEventData; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; + +import java.io.IOException; + +/** + * Mariadb ANNOTATE_ROWS_EVENT Fields + *
+ *  string The SQL statement (not null-terminated)
+ * 
+ * + * @author Winger + */ +public class AnnotateRowsEventDataDeserializer implements EventDataDeserializer { + + @Override + public AnnotateRowsEventData deserialize(ByteArrayInputStream inputStream) throws IOException { + AnnotateRowsEventData event = new AnnotateRowsEventData(); + event.setRowsQuery(inputStream.readString(inputStream.available())); + return event; + } +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java index cf93685..ffb3788 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java @@ -120,6 +120,12 @@ private void registerDefaultEventDataDeserializers() { new PreviousGtidSetDeserializer()); eventDataDeserializers.put(EventType.XA_PREPARE, new XAPrepareEventDataDeserializer()); + eventDataDeserializers.put(EventType.ANNOTATE_ROWS, + new AnnotateRowsEventDataDeserializer()); + eventDataDeserializers.put(EventType.MARIADB_GTID, + new MariadbGtidEventDataDeserializer()); + eventDataDeserializers.put(EventType.MARIADB_GTID_LIST, + new MariadbGtidListEventDataDeserializer()); } public void setEventDataDeserializer(EventType eventType, EventDataDeserializer eventDataDeserializer) { diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventHeaderV4Deserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventHeaderV4Deserializer.java index c0569f0..e953362 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventHeaderV4Deserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventHeaderV4Deserializer.java @@ -26,8 +26,6 @@ */ public class EventHeaderV4Deserializer implements EventHeaderDeserializer { - private static final EventType[] EVENT_TYPES = EventType.values(); - @Override public EventHeaderV4 deserialize(ByteArrayInputStream inputStream) throws IOException { EventHeaderV4 header = new EventHeaderV4(); @@ -41,10 +39,8 @@ public EventHeaderV4 deserialize(ByteArrayInputStream inputStream) throws IOExce } private static EventType getEventType(int ordinal) { - if (ordinal >= EVENT_TYPES.length) { - return EventType.UNKNOWN; - } - return EVENT_TYPES[ordinal]; + EventType eventType = EventType.byEventNumber(ordinal); + return eventType == null ? EventType.UNKNOWN : eventType; } } diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidEventDataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidEventDataDeserializer.java new file mode 100644 index 0000000..c57e255 --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidEventDataDeserializer.java @@ -0,0 +1,33 @@ +package com.github.shyiko.mysql.binlog.event.deserialization; + +import com.github.shyiko.mysql.binlog.event.MariadbGtidEventData; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; + +import java.io.IOException; + +/** + * Mariadb GTID_EVENT Fields + *
+ *     uint<8> GTID sequence
+ *     uint<4> Replication Domain ID
+ *     uint<1> Flags
+ *
+ * 	if flag & FL_GROUP_COMMIT_ID
+ * 	    uint<8> commit_id
+ * 	else
+ * 	    uint<6> 0
+ * 
+ * + * @author Winger + * @see GTID_EVENT for the original doc + */ +public class MariadbGtidEventDataDeserializer implements EventDataDeserializer { + @Override + public MariadbGtidEventData deserialize(ByteArrayInputStream inputStream) throws IOException { + MariadbGtidEventData event = new MariadbGtidEventData(); + event.setSequence(inputStream.readLong(8)); + event.setDomainId(inputStream.readInteger(4)); + // Flags ignore + return event; + } +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidListEventDataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidListEventDataDeserializer.java new file mode 100644 index 0000000..8a42953 --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidListEventDataDeserializer.java @@ -0,0 +1,39 @@ +package com.github.shyiko.mysql.binlog.event.deserialization; + + +import com.github.shyiko.mysql.binlog.MariadbGtidSet; +import com.github.shyiko.mysql.binlog.event.MariadbGtidListEventData; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; + +import java.io.IOException; + +/** + * Mariadb GTID_LIST_EVENT Fields + *
+ *  uint<4> Number of GTIDs
+ *  GTID[0]
+ *      uint<4> Replication Domain ID
+ *      uint<4> Server_ID
+ *      uint<8> GTID sequence ...
+ * GTID[n]
+ * 
+ * + * @author Winger + * @see GTID_EVENT for the original doc + */ +public class MariadbGtidListEventDataDeserializer implements EventDataDeserializer { + @Override + public MariadbGtidListEventData deserialize(ByteArrayInputStream inputStream) throws IOException { + MariadbGtidListEventData eventData = new MariadbGtidListEventData(); + long gtidLength = inputStream.readInteger(4); + MariadbGtidSet mariaGTIDSet = new MariadbGtidSet(); + for (int i = 0; i < gtidLength; i++) { + long domainId = inputStream.readInteger(4); + long serverId = inputStream.readInteger(4); + long sequence = inputStream.readLong(8); + mariaGTIDSet.add(new MariadbGtidSet.MariaGtid(domainId, serverId, sequence)); + } + eventData.setMariaGTIDSet(mariaGTIDSet); + return eventData; + } +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/DumpBinaryLogCommand.java b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/DumpBinaryLogCommand.java index 36216c7..b7436aa 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/DumpBinaryLogCommand.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/DumpBinaryLogCommand.java @@ -24,9 +24,11 @@ */ public class DumpBinaryLogCommand implements Command { + public static final int BINLOG_SEND_ANNOTATE_ROWS_EVENT = 2; private long serverId; private String binlogFilename; private long binlogPosition; + private boolean sendAnnotateRowsEvent; public DumpBinaryLogCommand(long serverId, String binlogFilename, long binlogPosition) { this.serverId = serverId; @@ -34,12 +36,21 @@ public DumpBinaryLogCommand(long serverId, String binlogFilename, long binlogPos this.binlogPosition = binlogPosition; } + public DumpBinaryLogCommand(long serverId, String binlogFilename, long binlogPosition, boolean sendAnnotateRowsEvent) { + this(serverId, binlogFilename, binlogPosition); + this.sendAnnotateRowsEvent = sendAnnotateRowsEvent; + } + @Override public byte[] toByteArray() throws IOException { ByteArrayOutputStream buffer = new ByteArrayOutputStream(); buffer.writeInteger(CommandType.BINLOG_DUMP.ordinal(), 1); buffer.writeLong(this.binlogPosition, 4); - buffer.writeInteger(0, 2); // flag + int binlogFlags = 0; + if (sendAnnotateRowsEvent) { + binlogFlags |= BINLOG_SEND_ANNOTATE_ROWS_EVENT; + } + buffer.writeInteger(binlogFlags, 2); // flag buffer.writeLong(this.serverId, 4); buffer.writeString(this.binlogFilename); return buffer.toByteArray(); diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientMariadbIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientMariadbIntegrationTest.java new file mode 100644 index 0000000..4b7ea9c --- /dev/null +++ b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientMariadbIntegrationTest.java @@ -0,0 +1,87 @@ +package com.github.shyiko.mysql.binlog; + +import com.github.shyiko.mysql.binlog.event.AnnotateRowsEventData; +import com.github.shyiko.mysql.binlog.event.MariadbGtidEventData; +import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; +import org.testng.annotations.Test; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.assertNotEquals; +import static org.testng.AssertJUnit.assertNotNull; + +/** + * BinaryLogClientMariadbGTIDIntegrationTest.java + * + * @author winger[jackey.zhang@woqutech.com] + * @create 2021-07-13 10:52 上午 + */ +public class BinaryLogClientMariadbIntegrationTest { + + protected BinaryLogClientIntegrationTest.MySQLConnection master; + + @Test + public void testMariadbUseGTIDAndAnnotateRowsEvent() throws Exception { + master = new BinaryLogClientIntegrationTest.MySQLConnection("127.0.0.1", 3306, "root", "123456"); + master.execute(new BinaryLogClientIntegrationTest.Callback() { + @Override + public void execute(Statement statement) throws SQLException { + statement.execute("drop database if exists mbcj_test"); + statement.execute("create database mbcj_test"); + statement.execute("use mbcj_test"); + statement.execute("CREATE TABLE if not exists foo (i int)"); + statement.execute("CREATE TABLE if not exists bar (i int)"); + } + }); + // get current gtid + final String[] currentGtidPos = new String[1]; + master.query("show global variables like 'gtid_current_pos%'", new BinaryLogClientIntegrationTest.Callback() { + + @Override + public void execute(ResultSet rs) throws SQLException { + rs.next(); + currentGtidPos[0] = rs.getString(2); + } + }); + + CountDownEventListener eventListener; + BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306, "root", "123456"); + client.setGtidSet(currentGtidPos[0]); + client.setMariadbSendAnnotateRowsEvent(true); + + EventDeserializer eventDeserializer = new EventDeserializer(); + eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY, + EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG); + client.setEventDeserializer(eventDeserializer); + client.registerEventListener(new TraceEventListener()); + client.registerLifecycleListener(new TraceLifecycleListener()); + client.registerEventListener(eventListener = new CountDownEventListener()); + + master.execute(new BinaryLogClientIntegrationTest.Callback() { + @Override + public void execute(Statement statement) throws SQLException { + statement.execute("INSERT INTO foo set i = 2"); + statement.execute("DROP TABLE IF EXISTS bar"); + } + }); + + try { + eventListener.reset(); + client.connect(); + + eventListener.waitFor(MariadbGtidEventData.class, 1, TimeUnit.SECONDS.toMillis(4)); + String gtidSet = client.getGtidSet(); + assertNotNull(gtidSet); + + eventListener.reset(); + eventListener.waitFor(AnnotateRowsEventData.class, 1, TimeUnit.SECONDS.toMillis(4)); + gtidSet = client.getGtidSet(); + assertNotEquals(currentGtidPos[0], gtidSet); + } finally { + client.disconnect(); + } + } +} From 1f2779137fa202624aed00c97564b0f1995ba721 Mon Sep 17 00:00:00 2001 From: winger Date: Thu, 15 Jul 2021 13:46:57 +0800 Subject: [PATCH 02/17] refact to support mariadb types --- .../shyiko/mysql/binlog/BinaryLogClient.java | 167 +++++++++--------- .../mysql/binlog/MariadbBinaryLogClient.java | 88 +++++++++ ...ariadbBinaryLogClientIntegrationTest.java} | 13 +- .../mysql/binlog/MariadbGtidSetTest.java | 36 ++++ ...AnnotateRowsEventDataDeserializerTest.java | 27 +++ .../MariadbGtidEventDataDeserializerTest.java | 26 +++ ...iadbGtidListEventDataDeserializerTest.java | 26 +++ 7 files changed, 288 insertions(+), 95 deletions(-) create mode 100644 src/main/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClient.java rename src/test/java/com/github/shyiko/mysql/binlog/{BinaryLogClientMariadbIntegrationTest.java => MariadbBinaryLogClientIntegrationTest.java} (89%) create mode 100644 src/test/java/com/github/shyiko/mysql/binlog/MariadbGtidSetTest.java create mode 100644 src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/AnnotateRowsEventDataDeserializerTest.java create mode 100644 src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidEventDataDeserializerTest.java create mode 100644 src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidListEventDataDeserializerTest.java diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index 5ba4cb2..62fc0ae 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -15,9 +15,24 @@ */ package com.github.shyiko.mysql.binlog; -import com.github.shyiko.mysql.binlog.event.*; -import com.github.shyiko.mysql.binlog.event.deserialization.*; +import com.github.shyiko.mysql.binlog.event.AnnotateRowsEventData; +import com.github.shyiko.mysql.binlog.event.Event; +import com.github.shyiko.mysql.binlog.event.EventHeader; +import com.github.shyiko.mysql.binlog.event.EventHeaderV4; +import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.GtidEventData; +import com.github.shyiko.mysql.binlog.event.MariadbGtidEventData; +import com.github.shyiko.mysql.binlog.event.MariadbGtidListEventData; +import com.github.shyiko.mysql.binlog.event.QueryEventData; +import com.github.shyiko.mysql.binlog.event.RotateEventData; +import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType; +import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException; +import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.EventDataWrapper; +import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.QueryEventDataDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.RotateEventDataDeserializer; import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientMXBean; import com.github.shyiko.mysql.binlog.network.AuthenticationException; @@ -34,7 +49,12 @@ import com.github.shyiko.mysql.binlog.network.protocol.Packet; import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel; import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket; -import com.github.shyiko.mysql.binlog.network.protocol.command.*; +import com.github.shyiko.mysql.binlog.network.protocol.command.Command; +import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand; +import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogGtidCommand; +import com.github.shyiko.mysql.binlog.network.protocol.command.PingCommand; +import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand; +import com.github.shyiko.mysql.binlog.network.protocol.command.SSLRequestCommand; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; @@ -115,14 +135,12 @@ public X509Certificate[] getAcceptedIssuers() { private volatile long connectionId; private SSLMode sslMode = SSLMode.DISABLED; - private GtidSet gtidSet; - private final Object gtidSetAccessLock = new Object(); + protected GtidSet gtidSet; + protected final Object gtidSetAccessLock = new Object(); private boolean gtidSetFallbackToPurged; private boolean useBinlogFilenamePositionInGtidMode; private String gtid; private boolean tx; - private boolean isMariadb = false; - private boolean mariadbSendAnnotateRowsEvent = false; private EventDeserializer eventDeserializer = new EventDeserializer(); @@ -132,7 +150,7 @@ public X509Certificate[] getAcceptedIssuers() { private SocketFactory socketFactory; private SSLSocketFactory sslSocketFactory; - private volatile PacketChannel channel; + protected volatile PacketChannel channel; private volatile boolean connected; private volatile long masterServerId = -1; @@ -317,15 +335,14 @@ public void setGtidSet(String gtidSet) { this.binlogFilename = ""; } synchronized (gtidSetAccessLock) { - // mariadb GtidSet format will be domainId-serverId-sequence - if (gtidSet != null && !gtidSet.contains(":")) { - this.gtidSet = new MariadbGtidSet(gtidSet); - } else { - this.gtidSet = gtidSet != null ? new GtidSet(gtidSet) : null; - } + this.gtidSet = gtidSet != null ? buildGtidSet(gtidSet) : null; } } + protected GtidSet buildGtidSet(String gtidSet) { + return new GtidSet(gtidSet); + } + /** * @see #setGtidSetFallbackToPurged(boolean) * @return whether gtid_purged is used as a fallback @@ -488,19 +505,6 @@ public void setThreadFactory(ThreadFactory threadFactory) { this.threadFactory = threadFactory; } - public boolean isMariadbSendAnnotateRowsEvent() { - return mariadbSendAnnotateRowsEvent; - } - - /** - * Only in Mariadb, if set true, the Slave server connects with the BINLOG_SEND_ANNOTATE_ROWS_EVENT flag (value is 2) - * in the COM_BINLOG_DUMP Slave Registration phase - * @param mariadbSendAnnotateRowsEvent - */ - public void setMariadbSendAnnotateRowsEvent(boolean mariadbSendAnnotateRowsEvent) { - this.mariadbSendAnnotateRowsEvent = mariadbSendAnnotateRowsEvent; - } - /** * Connect to the replication stream. Note that this method blocks until disconnected. * @throws AuthenticationException if authentication fails @@ -538,17 +542,8 @@ public void connect() throws IOException, IllegalStateException { channel.authenticationComplete(); connectionId = greetingPacket.getThreadId(); - isMariadb = greetingPacket.getServerVersion().toLowerCase().contains("mariadb"); if ("".equals(binlogFilename)) { - synchronized (gtidSetAccessLock) { - if (gtidSet != null && "".equals(gtidSet.toString()) && gtidSetFallbackToPurged) { - if (isMariadb) { - gtidSet = new MariadbGtidSet(fetchGtidPurged()); - } else { - gtidSet = new GtidSet(fetchGtidPurged()); - } - } - } + setupGtidSet(); } if (binlogFilename == null) { fetchBinlogFilenameAndPosition(); @@ -597,13 +592,7 @@ public void connect() throws IOException, IllegalStateException { ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class); synchronized (gtidSetAccessLock) { if (gtidSet != null) { - ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class); - ensureEventDataDeserializer(EventType.QUERY, QueryEventDataDeserializer.class); - if (isMariadb) { - ensureEventDataDeserializer(EventType.ANNOTATE_ROWS, AnnotateRowsEventDataDeserializer.class); - ensureEventDataDeserializer(EventType.MARIADB_GTID, MariadbGtidEventDataDeserializer.class); - ensureEventDataDeserializer(EventType.MARIADB_GTID_LIST, MariadbGtidListEventDataDeserializer.class); - } + ensureGtidEventDataDeserializer(); } } listenForEventPackets(); @@ -676,7 +665,7 @@ public Object call() throws Exception { }; } - private void checkError(byte[] packet) throws IOException { + protected void checkError(byte[] packet) throws IOException { if (packet[0] == (byte) 0xFF /* error */) { byte[] bytes = Arrays.copyOfRange(packet, 1, packet.length); ErrorPacket errorPacket = new ErrorPacket(bytes); @@ -720,7 +709,6 @@ private boolean tryUpgradeToSSL(GreetingPacket greetingPacket) throws IOExceptio return false; } - private void enableHeartbeat() throws IOException { channel.write(new QueryCommand("set @master_heartbeat_period=" + heartbeatInterval * 1000000)); byte[] statementResult = channel.read(); @@ -735,27 +723,15 @@ private void setMasterServerId() throws IOException { } } - private void requestBinaryLogStream() throws IOException { + protected void requestBinaryLogStream() throws IOException { long serverId = blocking ? this.serverId : 0; // http://bugs.mysql.com/bug.php?id=71178 Command dumpBinaryLogCommand; synchronized (gtidSetAccessLock) { if (gtidSet != null) { - if (isMariadb) { - channel.write(new QueryCommand("SET @mariadb_slave_capability=4")); - checkError(channel.read()); - channel.write(new QueryCommand("SET @slave_connect_state = '" + gtidSet.toString() + "'")); - checkError(channel.read()); - channel.write(new QueryCommand("SET @slave_gtid_strict_mode = 0")); - checkError(channel.read()); - channel.write(new QueryCommand("SET @slave_gtid_ignore_duplicates = 0")); - checkError(channel.read()); - dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, "", 0L, isMariadbSendAnnotateRowsEvent()); - } else { - dumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId, - useBinlogFilenamePositionInGtidMode ? binlogFilename : "", - useBinlogFilenamePositionInGtidMode ? binlogPosition : 4, - gtidSet); - } + dumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId, + useBinlogFilenamePositionInGtidMode ? binlogFilename : "", + useBinlogFilenamePositionInGtidMode ? binlogPosition : 4, + gtidSet); } else { dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition); } @@ -763,7 +739,7 @@ private void requestBinaryLogStream() throws IOException { channel.write(dumpBinaryLogCommand); } - private void ensureEventDataDeserializer(EventType eventType, + protected void ensureEventDataDeserializer(EventType eventType, Class eventDataDeserializerClass) { EventDataDeserializer eventDataDeserializer = eventDeserializer.getEventDataDeserializer(eventType); if (eventDataDeserializer.getClass() != eventDataDeserializerClass && @@ -780,6 +756,10 @@ private void ensureEventDataDeserializer(EventType eventType, } } + protected void ensureGtidEventDataDeserializer() { + ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class); + ensureEventDataDeserializer(EventType.QUERY, QueryEventDataDeserializer.class); + } private void spawnKeepAliveThread() { final ExecutorService threadExecutor = @@ -924,6 +904,14 @@ private String fetchGtidPurged() throws IOException { return ""; } + protected void setupGtidSet() throws IOException{ + synchronized (gtidSetAccessLock) { + if (gtidSet != null && "".equals(gtidSet.toString()) && gtidSetFallbackToPurged) { + gtidSet = new GtidSet(fetchGtidPurged()); + } + } + } + private void fetchBinlogFilenameAndPosition() throws IOException { ResultSetRowPacket[] resultSet; channel.write(new QueryCommand("show master status")); @@ -1025,7 +1013,7 @@ private byte[] readPacketSplitInChunks(ByteArrayInputStream inputStream, int pac return result; } - private void updateClientBinlogFilenameAndPosition(Event event) { + protected void updateClientBinlogFilenameAndPosition(Event event) { EventHeader eventHeader = event.getHeader(); EventType eventType = eventHeader.getEventType(); if (eventType == EventType.ROTATE) { @@ -1044,7 +1032,7 @@ private void updateClientBinlogFilenameAndPosition(Event event) { } } - private void updateGtidSet(Event event) { + protected void updateGtidSet(Event event) { synchronized (gtidSetAccessLock) { if (gtidSet == null) { return; @@ -1070,34 +1058,39 @@ private void updateGtidSet(Event event) { tx = false; break; case QUERY: - case ANNOTATE_ROWS: - String sql; - if (eventHeader.getEventType() == EventType.QUERY) { - QueryEventData queryEventData = (QueryEventData) EventDataWrapper.internal(event.getData()); - sql = queryEventData.getSql(); - } else { - AnnotateRowsEventData annotateRowsEventData = (AnnotateRowsEventData) EventDataWrapper.internal(event.getData()); - sql = annotateRowsEventData.getRowsQuery(); - } - + QueryEventData queryEventData = (QueryEventData) EventDataWrapper.internal(event.getData()); + String sql = queryEventData.getSql(); if (sql == null) { break; } - if ("BEGIN".equals(sql)) { - tx = true; - } else - if ("COMMIT".equals(sql) || "ROLLBACK".equals(sql)) { - commitGtid(); - tx = false; - } else - if (!tx) { - // auto-commit query, likely DDL - commitGtid(); + commitGtid(sql); + break; + case ANNOTATE_ROWS: + AnnotateRowsEventData annotateRowsEventData = (AnnotateRowsEventData) EventDeserializer.EventDataWrapper.internal(event.getData()); + sql = annotateRowsEventData.getRowsQuery(); + if (sql == null) { + break; } + commitGtid(sql); + break; default: } } + protected void commitGtid(String sql) { + if ("BEGIN".equals(sql)) { + tx = true; + } else + if ("COMMIT".equals(sql) || "ROLLBACK".equals(sql)) { + commitGtid(); + tx = false; + } else + if (!tx) { + // auto-commit query, likely DDL + commitGtid(); + } + } + private void commitGtid() { if (gtid != null) { synchronized (gtidSetAccessLock) { @@ -1308,7 +1301,7 @@ public interface LifecycleListener { /** * Default (no-op) implementation of {@link LifecycleListener}. */ - public static abstract class AbstractLifecycleListener implements LifecycleListener { + public static abstract class AbstractLifecycleListener implements LifecycleListener { public void onConnect(BinaryLogClient client) { } diff --git a/src/main/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClient.java new file mode 100644 index 0000000..3dc67fa --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClient.java @@ -0,0 +1,88 @@ +package com.github.shyiko.mysql.binlog; + +import com.github.shyiko.mysql.binlog.event.AnnotateRowsEventData; +import com.github.shyiko.mysql.binlog.event.Event; +import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.deserialization.AnnotateRowsEventDataDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.MariadbGtidEventDataDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.MariadbGtidListEventDataDeserializer; +import com.github.shyiko.mysql.binlog.network.protocol.command.Command; +import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand; +import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand; + +import java.io.IOException; + +/** + * Mariadb replication stream client. + * + * @author Winger + */ +public class MariadbBinaryLogClient extends BinaryLogClient { + + private boolean useSendAnnotateRowsEvent; + + public MariadbBinaryLogClient(String username, String password) { + super(username, password); + } + + public MariadbBinaryLogClient(String schema, String username, String password) { + super(schema, username, password); + } + + public MariadbBinaryLogClient(String hostname, int port, String username, String password) { + super(hostname, port, username, password); + } + + public MariadbBinaryLogClient(String hostname, int port, String schema, String username, String password) { + super(hostname, port, schema, username, password); + } + + @Override + protected GtidSet buildGtidSet(String gtidSet) { + return new MariadbGtidSet(gtidSet); + } + + @Override + protected void setupGtidSet() throws IOException { + //Mariadb ignore + } + + @Override + protected void requestBinaryLogStream() throws IOException { + long serverId = isBlocking() ? this.getServerId() : 0; // http://bugs.mysql.com/bug.php?id=71178 + Command dumpBinaryLogCommand; + synchronized (gtidSetAccessLock) { + if (gtidSet != null) { + channel.write(new QueryCommand("SET @mariadb_slave_capability=4")); + checkError(channel.read()); + channel.write(new QueryCommand("SET @slave_connect_state = '" + gtidSet.toString() + "'")); + checkError(channel.read()); + channel.write(new QueryCommand("SET @slave_gtid_strict_mode = 0")); + checkError(channel.read()); + channel.write(new QueryCommand("SET @slave_gtid_ignore_duplicates = 0")); + checkError(channel.read()); + dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, "", 0L, isUseSendAnnotateRowsEvent()); + + } else { + dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, getBinlogFilename(), getBinlogPosition()); + } + } + channel.write(dumpBinaryLogCommand); + } + + @Override + protected void ensureGtidEventDataDeserializer() { + ensureEventDataDeserializer(EventType.ANNOTATE_ROWS, AnnotateRowsEventDataDeserializer.class); + ensureEventDataDeserializer(EventType.MARIADB_GTID, MariadbGtidEventDataDeserializer.class); + ensureEventDataDeserializer(EventType.MARIADB_GTID_LIST, MariadbGtidListEventDataDeserializer.class); + } + + public boolean isUseSendAnnotateRowsEvent() { + return useSendAnnotateRowsEvent; + } + + public void setUseSendAnnotateRowsEvent(boolean useSendAnnotateRowsEvent) { + this.useSendAnnotateRowsEvent = useSendAnnotateRowsEvent; + } +} diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientMariadbIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClientIntegrationTest.java similarity index 89% rename from src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientMariadbIntegrationTest.java rename to src/test/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClientIntegrationTest.java index 4b7ea9c..fac8b57 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientMariadbIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClientIntegrationTest.java @@ -14,18 +14,15 @@ import static org.testng.AssertJUnit.assertNotNull; /** - * BinaryLogClientMariadbGTIDIntegrationTest.java - * - * @author winger[jackey.zhang@woqutech.com] - * @create 2021-07-13 10:52 上午 + * @author Winger */ -public class BinaryLogClientMariadbIntegrationTest { +public class MariadbBinaryLogClientIntegrationTest { protected BinaryLogClientIntegrationTest.MySQLConnection master; @Test public void testMariadbUseGTIDAndAnnotateRowsEvent() throws Exception { - master = new BinaryLogClientIntegrationTest.MySQLConnection("127.0.0.1", 3306, "root", "123456"); + master = new BinaryLogClientIntegrationTest.MySQLConnection("127.0.0.1", 3306, "root", ""); master.execute(new BinaryLogClientIntegrationTest.Callback() { @Override public void execute(Statement statement) throws SQLException { @@ -48,9 +45,9 @@ public void execute(ResultSet rs) throws SQLException { }); CountDownEventListener eventListener; - BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306, "root", "123456"); + MariadbBinaryLogClient client = new MariadbBinaryLogClient("127.0.0.1", 3306, "root", "123456"); client.setGtidSet(currentGtidPos[0]); - client.setMariadbSendAnnotateRowsEvent(true); + client.setUseSendAnnotateRowsEvent(true); EventDeserializer eventDeserializer = new EventDeserializer(); eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY, diff --git a/src/test/java/com/github/shyiko/mysql/binlog/MariadbGtidSetTest.java b/src/test/java/com/github/shyiko/mysql/binlog/MariadbGtidSetTest.java new file mode 100644 index 0000000..71d5753 --- /dev/null +++ b/src/test/java/com/github/shyiko/mysql/binlog/MariadbGtidSetTest.java @@ -0,0 +1,36 @@ +package com.github.shyiko.mysql.binlog; + +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; + +/** + * @author Winger + */ +public class MariadbGtidSetTest { + + @Test + public void testAdd() { + MariadbGtidSet gtidSet = new MariadbGtidSet("0-102-7255"); + gtidSet.add("0-102-7256"); + gtidSet.add("0-102-7257"); + gtidSet.add("0-102-7259"); + gtidSet.add("1-102-7300"); + assertNotEquals(gtidSet.toString(), "1-102-7300"); + assertNotEquals(gtidSet.toString(), "0-102-7259"); + assertEquals(gtidSet.toString(), "0-102-7259,1-102-7300"); + } + + @Test + public void testEmptySet() { + assertEquals(new MariadbGtidSet("").toString(), ""); + } + + @Test + public void testEquals() { + assertEquals(new MariadbGtidSet(""), new MariadbGtidSet(null)); + assertEquals(new MariadbGtidSet(""), new MariadbGtidSet("")); + assertEquals(new MariadbGtidSet("0-0-7404"), new MariadbGtidSet("0-0-7404")); + } +} diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/AnnotateRowsEventDataDeserializerTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/AnnotateRowsEventDataDeserializerTest.java new file mode 100644 index 0000000..e117d32 --- /dev/null +++ b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/AnnotateRowsEventDataDeserializerTest.java @@ -0,0 +1,27 @@ +package com.github.shyiko.mysql.binlog.event.deserialization; + +import com.github.shyiko.mysql.binlog.event.AnnotateRowsEventData; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; +import org.junit.Test; + +import java.io.IOException; + +import static junit.framework.Assert.assertEquals; + +/** + * @author Winger + */ +public class AnnotateRowsEventDataDeserializerTest { + + private static final byte[] DATA = {73, 78, 83, 69, 82, 84, 32, 73, 78, 84, 79, 32, 102, 111, 111, 32, 115, 101, 116, 32, 105, 32, 61, 32, 50}; + + private static final String sql = "INSERT INTO foo set i = 2"; + + @Test + public void deserialize() throws IOException { + AnnotateRowsEventDataDeserializer deserializer = new AnnotateRowsEventDataDeserializer(); + AnnotateRowsEventData eventData = deserializer.deserialize(new ByteArrayInputStream(DATA)); + + assertEquals(sql, eventData.getRowsQuery()); + } +} diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidEventDataDeserializerTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidEventDataDeserializerTest.java new file mode 100644 index 0000000..f6e703b --- /dev/null +++ b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidEventDataDeserializerTest.java @@ -0,0 +1,26 @@ +package com.github.shyiko.mysql.binlog.event.deserialization; + +import com.github.shyiko.mysql.binlog.event.MariadbGtidEventData; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; +import org.junit.Test; + +import java.io.IOException; + +import static junit.framework.Assert.assertEquals; + +/** + * @author Winger + */ +public class MariadbGtidEventDataDeserializerTest { + + private static final byte[] DATA = {-20, 28, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 0, 0, 0, 121}; + + private static final String GTID_SET = "0-0-7404"; + + @Test + public void deserialize() throws IOException { + MariadbGtidEventDataDeserializer deserializer = new MariadbGtidEventDataDeserializer(); + MariadbGtidEventData eventData = deserializer.deserialize(new ByteArrayInputStream(DATA)); + assertEquals(GTID_SET, eventData.toString()); + } +} diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidListEventDataDeserializerTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidListEventDataDeserializerTest.java new file mode 100644 index 0000000..3a6f33f --- /dev/null +++ b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidListEventDataDeserializerTest.java @@ -0,0 +1,26 @@ +package com.github.shyiko.mysql.binlog.event.deserialization; + +import com.github.shyiko.mysql.binlog.event.MariadbGtidListEventData; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; +import org.junit.Test; + +import java.io.IOException; + +import static junit.framework.Assert.assertEquals; + +/** + * @author Winger + */ +public class MariadbGtidListEventDataDeserializerTest { + + private static final byte[] DATA = {1, 0, 0, 0, 0, 0, 0, 0, 102, 0, 0, 0, 87, 28, 0, 0, 0, 0, 0, 0, 77}; + + private static final String GTID_SET_LIST = "MariadbGtidListEventData{mariaGTIDSet=0-102-7255}"; + + @Test + public void deserialize() throws IOException { + MariadbGtidListEventDataDeserializer deserializer = new MariadbGtidListEventDataDeserializer(); + MariadbGtidListEventData eventData = deserializer.deserialize(new ByteArrayInputStream(DATA)); + assertEquals(GTID_SET_LIST, eventData.toString()); + } +} From 486a447a10d378ebf34a132ce31327cdc743318d Mon Sep 17 00:00:00 2001 From: winger Date: Fri, 16 Jul 2021 11:34:08 +0800 Subject: [PATCH 03/17] add some doc informations about mariadb --- README.md | 8 ++++++++ .../com/github/shyiko/mysql/binlog/BinaryLogClient.java | 2 +- .../shyiko/mysql/binlog/MariadbBinaryLogClient.java | 3 --- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 407e27a..d17ce52 100644 --- a/README.md +++ b/README.md @@ -84,6 +84,14 @@ kick off from a specific filename or position, use `client.setBinlogFilename(fil > `client.connect()` is blocking (meaning that client will listen for events in the current thread). `client.connect(timeout)`, on the other hand, spawns a separate thread. +> Note difference between MariaDB and MySQL +``` +BinaryLogClient client = new MariadbBinaryLogClient("hostname", 3306, "username", "password"); +// ... as same as BinaryLogClient +``` +> `client.setGtidSet(gtid)` meaning that client kick off from a specific gtid, MariaDB also support. +> `client.setUseSendAnnotateRowsEvent(true)` meaning that client will send annotate rows events(describe the query which caused the row event), and 'false' by default + #### Controlling event deserialization > You might need it for several reasons: diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index 62fc0ae..b7b3b23 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -1301,7 +1301,7 @@ public interface LifecycleListener { /** * Default (no-op) implementation of {@link LifecycleListener}. */ - public static abstract class AbstractLifecycleListener implements LifecycleListener { + public static abstract class AbstractLifecycleListener implements LifecycleListener { public void onConnect(BinaryLogClient client) { } diff --git a/src/main/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClient.java index 3dc67fa..0c6740d 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClient.java @@ -1,10 +1,7 @@ package com.github.shyiko.mysql.binlog; -import com.github.shyiko.mysql.binlog.event.AnnotateRowsEventData; -import com.github.shyiko.mysql.binlog.event.Event; import com.github.shyiko.mysql.binlog.event.EventType; import com.github.shyiko.mysql.binlog.event.deserialization.AnnotateRowsEventDataDeserializer; -import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; import com.github.shyiko.mysql.binlog.event.deserialization.MariadbGtidEventDataDeserializer; import com.github.shyiko.mysql.binlog.event.deserialization.MariadbGtidListEventDataDeserializer; import com.github.shyiko.mysql.binlog.network.protocol.command.Command; From 1c214000f2f99b3b68384b0b675e2a6ec1549b13 Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Fri, 12 Aug 2022 13:29:13 -0700 Subject: [PATCH 04/17] fixup javadoc stuff --- pom.xml | 2 +- .../AnnotateRowsEventDataDeserializer.java | 2 +- .../MariadbGtidEventDataDeserializer.java | 12 ++++++------ .../MariadbGtidListEventDataDeserializer.java | 8 ++++---- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pom.xml b/pom.xml index 50c2922..5c5806e 100644 --- a/pom.xml +++ b/pom.xml @@ -165,7 +165,7 @@ org.apache.maven.plugins maven-javadoc-plugin - 3.2.0 + 3.4.0 attach-javadocs diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AnnotateRowsEventDataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AnnotateRowsEventDataDeserializer.java index 688a9b2..981d9c9 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AnnotateRowsEventDataDeserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AnnotateRowsEventDataDeserializer.java @@ -8,7 +8,7 @@ /** * Mariadb ANNOTATE_ROWS_EVENT Fields *
- *  string The SQL statement (not null-terminated)
+ *  string<EOF> The SQL statement (not null-terminated)
  * 
* * @author Winger diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidEventDataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidEventDataDeserializer.java index c57e255..4eb45f1 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidEventDataDeserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidEventDataDeserializer.java @@ -8,14 +8,14 @@ /** * Mariadb GTID_EVENT Fields *
- *     uint<8> GTID sequence
- *     uint<4> Replication Domain ID
- *     uint<1> Flags
+ *     uint8 GTID sequence
+ *     uint4 Replication Domain ID
+ *     uint1 Flags
  *
- * 	if flag & FL_GROUP_COMMIT_ID
- * 	    uint<8> commit_id
+ * 	if flag & FL_GROUP_COMMIT_ID
+ * 	    uint8 commit_id
  * 	else
- * 	    uint<6> 0
+ * 	    uint6 0
  * 
* * @author Winger diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidListEventDataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidListEventDataDeserializer.java index 8a42953..2583619 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidListEventDataDeserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidListEventDataDeserializer.java @@ -10,11 +10,11 @@ /** * Mariadb GTID_LIST_EVENT Fields *
- *  uint<4> Number of GTIDs
+ *  uint4 Number of GTIDs
  *  GTID[0]
- *      uint<4> Replication Domain ID
- *      uint<4> Server_ID
- *      uint<8> GTID sequence ...
+ *      uint4 Replication Domain ID
+ *      uint4 Server_ID
+ *      uint8 GTID sequence ...
  * GTID[n]
  * 
* From 67a840edd8692f8ec4ba8eb83d910f65bde85df8 Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Fri, 12 Aug 2022 16:33:07 -0700 Subject: [PATCH 05/17] can we build under maria? --- .circleci/config.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index 37fa840..ef8eeeb 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -56,4 +56,8 @@ workflows: name: "test-8.0" mysql: "8.0" requires: [ "build" ] + - test: + name: "test-mariadb" + mysql: "mariadb" + requires: [ "build" ] From 5d4c0fd746ef0ec25029b80177da167ee2848796 Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Fri, 12 Aug 2022 16:43:03 -0700 Subject: [PATCH 06/17] see about running tests --- .../MariadbBinaryLogClientIntegrationTest.java | 12 +++++++++--- .../shyiko/mysql/binlog/MysqlOnetimeServer.java | 9 +++++++-- .../com/github/shyiko/mysql/binlog/MysqlVersion.java | 6 ++++-- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/src/test/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClientIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClientIntegrationTest.java index fac8b57..ca5039e 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClientIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClientIntegrationTest.java @@ -3,6 +3,8 @@ import com.github.shyiko.mysql.binlog.event.AnnotateRowsEventData; import com.github.shyiko.mysql.binlog.event.MariadbGtidEventData; import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; +import org.testng.SkipException; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import java.sql.ResultSet; @@ -16,13 +18,17 @@ /** * @author Winger */ -public class MariadbBinaryLogClientIntegrationTest { +public class MariadbBinaryLogClientIntegrationTest extends BinaryLogClientIntegrationTest { + MysqlOnetimeServer primaryServer; protected BinaryLogClientIntegrationTest.MySQLConnection master; @Test public void testMariadbUseGTIDAndAnnotateRowsEvent() throws Exception { - master = new BinaryLogClientIntegrationTest.MySQLConnection("127.0.0.1", 3306, "root", ""); + if ( !mysqlVersion.isMaria ) + throw new SkipException("not maria"); + + master.execute(new BinaryLogClientIntegrationTest.Callback() { @Override public void execute(Statement statement) throws SQLException { @@ -45,7 +51,7 @@ public void execute(ResultSet rs) throws SQLException { }); CountDownEventListener eventListener; - MariadbBinaryLogClient client = new MariadbBinaryLogClient("127.0.0.1", 3306, "root", "123456"); + MariadbBinaryLogClient client = new MariadbBinaryLogClient(master.hostname(), master.port(), master.username(), master.password()); client.setGtidSet(currentGtidPos[0]); client.setUseSendAnnotateRowsEvent(true); diff --git a/src/test/java/com/github/shyiko/mysql/binlog/MysqlOnetimeServer.java b/src/test/java/com/github/shyiko/mysql/binlog/MysqlOnetimeServer.java index a3c5f51..127714b 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/MysqlOnetimeServer.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/MysqlOnetimeServer.java @@ -249,8 +249,13 @@ public void shutDown() { } public static MysqlVersion getVersion() { - String[] parts = getVersionString().split("\\."); - return new MysqlVersion(Integer.valueOf(parts[0]), Integer.valueOf(parts[1])); + String version = getVersionString(); + if ( version == "mariadb") { + return new MysqlVersion(0, 0, true); + } else { + String[] parts = version.split("\\."); + return new MysqlVersion(Integer.valueOf(parts[0]), Integer.valueOf(parts[1]), false); + } } private static String getVersionString() { diff --git a/src/test/java/com/github/shyiko/mysql/binlog/MysqlVersion.java b/src/test/java/com/github/shyiko/mysql/binlog/MysqlVersion.java index 7213a72..949873b 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/MysqlVersion.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/MysqlVersion.java @@ -5,12 +5,14 @@ import java.sql.SQLException; public class MysqlVersion { + public boolean isMaria; private final int major; private final int minor; - public MysqlVersion(int major, int minor) { + public MysqlVersion(int major, int minor, boolean isMaria) { this.major = major; this.minor = minor; + this.isMaria = isMaria; } public boolean atLeast(int major, int minor) { @@ -27,7 +29,7 @@ public boolean lessThan(int major, int minor) { public static MysqlVersion capture(Connection c) throws SQLException { DatabaseMetaData meta = c.getMetaData(); - return new MysqlVersion(meta.getDatabaseMajorVersion(), meta.getDatabaseMinorVersion()); + return new MysqlVersion(meta.getDatabaseMajorVersion(), meta.getDatabaseMinorVersion(), false); } public int getMajor() { From fbc9c73c8b8bb0d720abbafb7e80aa04d57c0d32 Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Fri, 12 Aug 2022 16:45:11 -0700 Subject: [PATCH 07/17] streamline a touch --- .circleci/config.yml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index ef8eeeb..f09dc95 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -32,9 +32,7 @@ jobs: - restore_cache: key: dependency-cache-{{ checksum "pom.xml" }} - run: - command: echo "testing under mysql $MYSQL_VERSION" - - run: - name: testit + name: "testing under version << parameters.mysql >>" command: mvn verify -Dgpg.skip - store_artifacts: path: test.log From f2ea85cf9721cc4aed54a826e31085b9bdc4b7da Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Fri, 12 Aug 2022 16:46:47 -0700 Subject: [PATCH 08/17] oh java --- .../java/com/github/shyiko/mysql/binlog/MysqlOnetimeServer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/github/shyiko/mysql/binlog/MysqlOnetimeServer.java b/src/test/java/com/github/shyiko/mysql/binlog/MysqlOnetimeServer.java index 127714b..ea043e9 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/MysqlOnetimeServer.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/MysqlOnetimeServer.java @@ -250,7 +250,7 @@ public void shutDown() { public static MysqlVersion getVersion() { String version = getVersionString(); - if ( version == "mariadb") { + if ( version.equals("mariadb") ) { return new MysqlVersion(0, 0, true); } else { String[] parts = version.split("\\."); From 85840c30fe184503d4c43931fa4656a0673e2ce7 Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Fri, 12 Aug 2022 17:29:11 -0700 Subject: [PATCH 09/17] add --debug --- .../java/com/github/shyiko/mysql/binlog/MysqlOnetimeServer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/test/java/com/github/shyiko/mysql/binlog/MysqlOnetimeServer.java b/src/test/java/com/github/shyiko/mysql/binlog/MysqlOnetimeServer.java index ea043e9..12d3cd9 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/MysqlOnetimeServer.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/MysqlOnetimeServer.java @@ -70,6 +70,7 @@ public void boot() throws Exception { ProcessBuilder pb = new ProcessBuilder( dir + "/src/test/onetimeserver", + "--debug", "--mysql-version=" + getVersionString(), "--log-slave-updates", "--log-bin=master", From 62475193129e642a9e27392904cb44a2558770fb Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Fri, 12 Aug 2022 18:39:45 -0700 Subject: [PATCH 10/17] bump From 988750eeb8f303cc265bee8ddb4e9b9426385f2d Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Fri, 12 Aug 2022 22:22:18 -0700 Subject: [PATCH 11/17] maria fixes in JSON land --- .../deserialization/json/JsonBinary.java | 10 ++++ .../mysql/binlog/CountDownEventListener.java | 16 +++++++ .../json/JsonBinaryValueIntegrationTest.java | 47 ++++++++++++++++--- 3 files changed, 66 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinary.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinary.java index b167ba7..8a2b8d8 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinary.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinary.java @@ -162,11 +162,21 @@ public class JsonBinary { * @throws IOException if there is a problem reading or processing the binary representation */ public static String parseAsString(byte[] bytes) throws IOException { + /* check for mariaDB-format JSON strings inside columns marked JSON */ + if ( isJSONString(bytes) ) { + return new String(bytes); + } JsonStringFormatter handler = new JsonStringFormatter(); parse(bytes, handler); return handler.getString(); } + private static boolean isJSONString(byte[] bytes) { + if (bytes[0] > 0x0f) + return true; + else + return false; + } /** * Parse the MySQL binary representation of a {@code JSON} value and call the supplied {@link JsonFormatter} * for the various components of the value. diff --git a/src/test/java/com/github/shyiko/mysql/binlog/CountDownEventListener.java b/src/test/java/com/github/shyiko/mysql/binlog/CountDownEventListener.java index 73a6f5b..6256efe 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/CountDownEventListener.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/CountDownEventListener.java @@ -86,6 +86,22 @@ private void waitForCounterToGetZero(String counterName, AtomicInteger counter, } } + public void waitForAtLeast(EventType eventType, int numberOfEvents, long timeoutInMilliseconds) + throws TimeoutException, InterruptedException { + AtomicInteger counter = getCounter(countersByType, eventType); + + synchronized (counter) { + if (counter.get() < numberOfEvents) { + counter.wait(timeoutInMilliseconds); + if (counter.get() < numberOfEvents) { + throw new TimeoutException("Received " + (numberOfEvents + counter.get()) + " " + + eventType.name() + " event(s) instead of expected " + numberOfEvents); + } + } + counter.set(0); + } + } + public void reset() { synchronized (countersByType) { countersByType.clear(); diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java index 8fb3917..8866c15 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java @@ -22,17 +22,20 @@ import com.github.shyiko.mysql.binlog.MysqlOnetimeServer; import com.github.shyiko.mysql.binlog.TraceEventListener; import com.github.shyiko.mysql.binlog.TraceLifecycleListener; +import com.github.shyiko.mysql.binlog.event.Event; import com.github.shyiko.mysql.binlog.event.EventData; import com.github.shyiko.mysql.binlog.event.EventType; import com.github.shyiko.mysql.binlog.event.QueryEventData; import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData; import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; import org.skyscreamer.jsonassert.JSONAssert; +import org.testng.SkipException; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import java.io.IOException; import java.io.Serializable; import java.sql.SQLException; import java.sql.SQLSyntaxErrorException; @@ -66,6 +69,8 @@ public class JsonBinaryValueIntegrationTest { private BinaryLogClient client; private CountDownEventListener eventListener; + private boolean isMaria = "mariadb".equals(System.getenv("MYSQL_VERSION")); + @BeforeClass public void setUp() throws Exception { TimeZone.setDefault(TimeZone.getTimeZone("GMT")); @@ -102,10 +107,15 @@ public void execute(Statement statement) throws SQLException { System.err.println("skipping JSON tests (pre 5.7)"); throw new org.testng.SkipException("JSON data type is not supported by current version of MySQL"); } - eventListener.waitFor(EventType.QUERY, 3, DEFAULT_TIMEOUT); + eventListener.waitForAtLeast(EventType.QUERY, 3, DEFAULT_TIMEOUT); eventListener.reset(); } + private String parseAndRemoveSpaces(byte[] jsonBinary) throws IOException { + String parsed = JsonBinary.parseAsString(jsonBinary); + return parsed.replaceAll(" ", ""); + } + @Test public void testMysql8JsonSetPartialUpdateWithHoles() throws Exception { CapturingEventListener capturingEventListener = new CapturingEventListener(); @@ -122,7 +132,7 @@ public void testMysql8JsonSetPartialUpdateWithHoles() throws Exception { List updateEvents = capturingEventListener.getEvents(UpdateRowsEventData.class); Serializable[] updateData = updateEvents.iterator().next().getRows().get(0).getValue(); - assertEquals(JsonBinary.parseAsString((byte[]) updateData[0]), json.replace("970785C8-C299", "970785C8")); + assertEquals(parseAndRemoveSpaces((byte[]) updateData[0]), json.replace("970785C8-C299", "970785C8")); } @Test @@ -141,7 +151,7 @@ public void testMysql8JsonRemovePartialUpdateWithHoles() throws Exception { List updateEvents = capturingEventListener.getEvents(UpdateRowsEventData.class); Serializable[] updateData = updateEvents.iterator().next().getRows().get(0).getValue(); - assertEquals(JsonBinary.parseAsString((byte[]) updateData[0]), json.replace("\"ab\":\"970785C8-C299\"", "")); + assertEquals(parseAndRemoveSpaces((byte[]) updateData[0]), json.replace("\"ab\":\"970785C8-C299\"", "")); client.unregisterEventListener(capturingEventListener); } @@ -162,7 +172,7 @@ public void testMysql8JsonRemovePartialUpdateWithHolesAndSparseKeys() throws Exc List updateEvents = capturingEventListener.getEvents(UpdateRowsEventData.class); Serializable[] updateData = updateEvents.iterator().next().getRows().get(0).getValue(); - assertEquals(JsonBinary.parseAsString((byte[]) updateData[0]), json.replace( + assertEquals(parseAndRemoveSpaces((byte[]) updateData[0]), json.replace( "\"17fc9889474028063990914001f6854f6b8b5784\":\"test_field_for_remove_fields_behaviour_2\",", "")); client.unregisterEventListener(capturingEventListener); @@ -184,7 +194,7 @@ public void testMysql8JsonReplacePartialUpdateWithHoles() throws Exception { List updateEvents = capturingEventListener.getEvents(UpdateRowsEventData.class); Serializable[] updateData = updateEvents.iterator().next().getRows().get(0).getValue(); - assertEquals(JsonBinary.parseAsString((byte[]) updateData[0]), json.replace("970785C8-C299", "9707")); + assertEquals(parseAndRemoveSpaces((byte[]) updateData[0]), json.replace("970785C8-C299", "9707")); client.unregisterEventListener(capturingEventListener); } @@ -207,7 +217,9 @@ public void testMysql8JsonRemoveArrayValue() throws Exception { List updateEvents = capturingEventListener.getEvents(UpdateRowsEventData.class); Serializable[] updateData = updateEvents.iterator().next().getRows().get(0).getValue(); - assertEquals(JsonBinary.parseAsString((byte[]) updateData[0]), "[\"foo\",\"baz\"]"); + String parsed = parseAndRemoveSpaces((byte[]) updateData[0]); + + assertEquals(parsed, "[\"foo\",\"baz\"]"); client.unregisterEventListener(capturingEventListener); } @@ -449,12 +461,18 @@ public void testEmptyArray() throws Exception { @Test public void testScalarDateTime() throws Exception { + if ( isMaria ) + throw new SkipException(""); + assertEquals(writeAndCaptureJSON("CAST(CAST('2015-01-15 23:24:25' AS DATETIME) AS JSON)"), "\"2015-01-15 23:24:25\""); } @Test public void testScalarTime() throws Exception { + if ( isMaria ) + throw new SkipException(""); + assertEquals(writeAndCaptureJSON("CAST(CAST('23:24:25' AS TIME) AS JSON)"), "\"23:24:25\""); assertEquals(writeAndCaptureJSON("CAST(CAST('23:24:25.12' AS TIME(3)) AS JSON)"), @@ -465,12 +483,17 @@ public void testScalarTime() throws Exception { @Test public void testScalarDate() throws Exception { + if ( isMaria ) + throw new SkipException(""); assertEquals(writeAndCaptureJSON("CAST(CAST('2015-01-15' AS DATE) AS JSON)"), "\"2015-01-15\""); } @Test public void testScalarTimestamp() throws Exception { + if ( isMaria ) + throw new SkipException(""); + // timestamp literals are interpreted by MySQL as DATETIME values assertEquals(writeAndCaptureJSON("CAST(TIMESTAMP'2015-01-15 23:24:25' AS JSON)"), "\"2015-01-15 23:24:25\""); @@ -485,6 +508,9 @@ public void testScalarTimestamp() throws Exception { @Test public void testScalarGeometry() throws Exception { + if ( isMaria ) + throw new SkipException(""); + assertEquals(writeAndCaptureJSON("CAST(ST_GeomFromText('POINT(1 1)') AS JSON)"), "{\"type\":\"Point\",\"coordinates\":[1.0,1.0]}"); } @@ -496,6 +522,9 @@ public void testScalarStringWithCharsetConversion() throws Exception { @Test public void testScalarBinaryAsBase64() throws Exception { + if ( isMaria ) + throw new SkipException(""); + assertEquals(writeAndCaptureJSON("CAST(x'cafe' AS JSON)"), "\"yv4=\""); assertEquals(writeAndCaptureJSON("CAST(x'cafebabe' AS JSON)"), "\"yv66vg==\""); } @@ -529,10 +558,14 @@ private String writeAndCaptureJSON(final String value) throws Exception { System.out.println("I am about to fail an expectation..."); assertTrue(false, "did not receive rows in json test for " + value); } - byte[] b = (byte[]) capturingEventListener.getEvents(WriteRowsEventData.class).get(0).getRows().get(0)[0]; + WriteRowsEventData e = capturingEventListener.getEvents(WriteRowsEventData.class).get(0); + Serializable[] firstRow = e.getRows().get(0); + + byte[] b = (byte[]) firstRow[0]; return b == null ? null : JsonBinary.parseAsString(b); } + @AfterMethod public void afterEachTest() throws Exception { final CountDownLatch latch = new CountDownLatch(1); From 492c6a46c512094064cb488d7bac1070e9d50ecf Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Sat, 13 Aug 2022 12:27:35 -0700 Subject: [PATCH 12/17] can maven be quiet plz? --- .circleci/config.yml | 2 +- .../deserialization/json/JsonBinaryValueIntegrationTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index f09dc95..35b8d1d 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -26,7 +26,7 @@ jobs: type: string environment: MYSQL_VERSION: "<< parameters.mysql >>" - JAVA_TOOL_OPTIONS: "-Xmx250m" + JAVA_TOOL_OPTIONS: "-Xmx250m -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn" steps: - checkout - restore_cache: diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java index 8866c15..5c268cb 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java @@ -493,7 +493,7 @@ public void testScalarDate() throws Exception { public void testScalarTimestamp() throws Exception { if ( isMaria ) throw new SkipException(""); - + // timestamp literals are interpreted by MySQL as DATETIME values assertEquals(writeAndCaptureJSON("CAST(TIMESTAMP'2015-01-15 23:24:25' AS JSON)"), "\"2015-01-15 23:24:25\""); From 7c52c0abfff4d6ca3e978faa34d2193419df6945 Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Sat, 13 Aug 2022 14:38:20 -0700 Subject: [PATCH 13/17] abstract out some tests, shuffle around maria-specific stuff --- .../shyiko/mysql/binlog/BinaryLogClient.java | 13 +- .../mysql/binlog/MariadbBinaryLogClient.java | 28 +++ .../mysql/binlog/AbstractIntegrationTest.java | 69 +++++++ .../BinaryLogClientIntegrationTest.java | 174 +----------------- ...MariadbBinaryLogClientIntegrationTest.java | 16 +- .../shyiko/mysql/binlog/MySQLConnection.java | 117 ++++++++++++ .../json/JsonBinaryValueIntegrationTest.java | 6 +- 7 files changed, 228 insertions(+), 195 deletions(-) create mode 100644 src/test/java/com/github/shyiko/mysql/binlog/AbstractIntegrationTest.java create mode 100644 src/test/java/com/github/shyiko/mysql/binlog/MySQLConnection.java diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index 1c7f74a..d3650ba 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -139,7 +139,7 @@ public X509Certificate[] getAcceptedIssuers() { protected final Object gtidSetAccessLock = new Object(); private boolean gtidSetFallbackToPurged; private boolean useBinlogFilenamePositionInGtidMode; - private String gtid; + protected String gtid; private boolean tx; private EventDeserializer eventDeserializer = new EventDeserializer(); @@ -1010,7 +1010,7 @@ private byte[] readPacketSplitInChunks(ByteArrayInputStream inputStream, int pac return result; } - protected void updateClientBinlogFilenameAndPosition(Event event) { + private void updateClientBinlogFilenameAndPosition(Event event) { EventHeader eventHeader = event.getHeader(); EventType eventType = eventHeader.getEventType(); if (eventType == EventType.ROTATE) { @@ -1041,15 +1041,6 @@ protected void updateGtidSet(Event event) { GtidEventData gtidEventData = (GtidEventData) EventDataWrapper.internal(event.getData()); gtid = gtidEventData.getGtid(); break; - case MARIADB_GTID: - MariadbGtidEventData mariadbGtidEventData = (MariadbGtidEventData) EventDataWrapper.internal(event.getData()); - mariadbGtidEventData.setServerId(eventHeader.getServerId()); - gtid = mariadbGtidEventData.toString(); - break; - case MARIADB_GTID_LIST: - MariadbGtidListEventData mariadbGtidListEventData = (MariadbGtidListEventData) EventDataWrapper.internal(event.getData()); - gtid = mariadbGtidListEventData.getMariaGTIDSet().toString(); - break; case XID: commitGtid(); tx = false; diff --git a/src/main/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClient.java index 0c6740d..aa6cc5a 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClient.java @@ -1,7 +1,12 @@ package com.github.shyiko.mysql.binlog; +import com.github.shyiko.mysql.binlog.event.Event; +import com.github.shyiko.mysql.binlog.event.EventHeader; import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.MariadbGtidEventData; +import com.github.shyiko.mysql.binlog.event.MariadbGtidListEventData; import com.github.shyiko.mysql.binlog.event.deserialization.AnnotateRowsEventDataDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; import com.github.shyiko.mysql.binlog.event.deserialization.MariadbGtidEventDataDeserializer; import com.github.shyiko.mysql.binlog.event.deserialization.MariadbGtidListEventDataDeserializer; import com.github.shyiko.mysql.binlog.network.protocol.command.Command; @@ -75,6 +80,29 @@ protected void ensureGtidEventDataDeserializer() { ensureEventDataDeserializer(EventType.MARIADB_GTID_LIST, MariadbGtidListEventDataDeserializer.class); } + @Override + protected void updateGtidSet(Event event) { + synchronized (gtidSetAccessLock) { + if (gtidSet == null) { + return; + } + } + EventHeader eventHeader = event.getHeader(); + switch(eventHeader.getEventType()) { + case MARIADB_GTID: + MariadbGtidEventData mariadbGtidEventData = (MariadbGtidEventData) EventDeserializer.EventDataWrapper.internal(event.getData()); + mariadbGtidEventData.setServerId(eventHeader.getServerId()); + gtid = mariadbGtidEventData.toString(); + break; + case MARIADB_GTID_LIST: + MariadbGtidListEventData mariadbGtidListEventData = (MariadbGtidListEventData) EventDeserializer.EventDataWrapper.internal(event.getData()); + gtid = mariadbGtidListEventData.getMariaGTIDSet().toString(); + break; + default: + super.updateGtidSet(event); + } + } + public boolean isUseSendAnnotateRowsEvent() { return useSendAnnotateRowsEvent; } diff --git a/src/test/java/com/github/shyiko/mysql/binlog/AbstractIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/AbstractIntegrationTest.java new file mode 100644 index 0000000..802052f --- /dev/null +++ b/src/test/java/com/github/shyiko/mysql/binlog/AbstractIntegrationTest.java @@ -0,0 +1,69 @@ +package com.github.shyiko.mysql.binlog; + +import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; +import org.testng.annotations.BeforeClass; + +import java.sql.SQLException; +import java.sql.Statement; +import java.util.TimeZone; + +public abstract class AbstractIntegrationTest { + protected MySQLConnection master; + protected MySQLConnection slave; + protected BinaryLogClient client; + protected CountDownEventListener eventListener; + protected MysqlVersion mysqlVersion; + + protected MysqlOnetimeServerOptions getOptions() { + MysqlOnetimeServerOptions options = new MysqlOnetimeServerOptions(); + options.fullRowMetaData = true; + return options; + } + + @BeforeClass + public void setUp() throws Exception { + TimeZone.setDefault(TimeZone.getTimeZone("GMT")); + mysqlVersion = MysqlOnetimeServer.getVersion(); + MysqlOnetimeServer masterServer = new MysqlOnetimeServer(getOptions()); + MysqlOnetimeServer slaveServer = new MysqlOnetimeServer(getOptions()); + + masterServer.boot(); + slaveServer.boot(); + slaveServer.setupSlave(masterServer.getPort()); + + master = new MySQLConnection("127.0.0.1", masterServer.getPort(), "root", ""); + slave = new MySQLConnection("127.0.0.1", slaveServer.getPort(), "root", ""); + + client = new BinaryLogClient(slave.hostname, slave.port, slave.username, slave.password); + EventDeserializer eventDeserializer = new EventDeserializer(); + eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY, + EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG); + client.setEventDeserializer(eventDeserializer); + client.setServerId(client.getServerId() - 1); // avoid clashes between BinaryLogClient instances + client.setKeepAlive(false); + client.registerEventListener(new TraceEventListener()); + client.registerEventListener(eventListener = new CountDownEventListener()); + client.registerLifecycleListener(new TraceLifecycleListener()); + client.connect(BinaryLogClientIntegrationTest.DEFAULT_TIMEOUT); + master.execute(new BinaryLogClientIntegrationTest.Callback() { + @Override + public void execute(Statement statement) throws SQLException { + statement.execute("drop database if exists mbcj_test"); + statement.execute("create database mbcj_test"); + statement.execute("use mbcj_test"); + } + }); + eventListener.waitFor(EventType.QUERY, 2, BinaryLogClientIntegrationTest.DEFAULT_TIMEOUT); + + if ( mysqlVersion.atLeast(8, 0) ) { + setupMysql8Login(master); + eventListener.waitFor(EventType.QUERY, 2, BinaryLogClientIntegrationTest.DEFAULT_TIMEOUT); + } + } + + protected void setupMysql8Login(MySQLConnection server) throws Exception { + server.execute("create user 'mysql8' IDENTIFIED WITH caching_sha2_password BY 'testpass'"); + server.execute("grant replication slave, replication client on *.* to 'mysql8'"); + } +} diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java index 7e04312..cba2057 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java @@ -35,16 +35,13 @@ import com.github.shyiko.mysql.binlog.network.SSLMode; import com.github.shyiko.mysql.binlog.network.ServerException; import com.github.shyiko.mysql.binlog.network.SocketFactory; -import com.mysql.cj.MysqlConnection; import org.mockito.InOrder; import org.testng.SkipException; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.io.Closeable; import java.io.EOFException; import java.io.FilterInputStream; import java.io.FilterOutputStream; @@ -56,8 +53,6 @@ import java.math.MathContext; import java.net.Socket; import java.net.SocketException; -import java.sql.Connection; -import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLSyntaxErrorException; @@ -97,7 +92,7 @@ /** * @author Stanley Shyiko */ -public class BinaryLogClientIntegrationTest { +public class BinaryLogClientIntegrationTest extends AbstractIntegrationTest { protected static final long DEFAULT_TIMEOUT = TimeUnit.SECONDS.toMillis(3); @@ -109,58 +104,6 @@ public class BinaryLogClientIntegrationTest { private final TimeZone timeZoneBeforeTheTest = TimeZone.getDefault(); - protected MySQLConnection master, slave; - protected BinaryLogClient client; - protected CountDownEventListener eventListener; - protected MysqlVersion mysqlVersion; - - protected MysqlOnetimeServerOptions getOptions() { - MysqlOnetimeServerOptions options = new MysqlOnetimeServerOptions(); - options.fullRowMetaData = true; - return options; - } - - @BeforeClass - public void setUp() throws Exception { - TimeZone.setDefault(TimeZone.getTimeZone("GMT")); - mysqlVersion = MysqlOnetimeServer.getVersion(); - MysqlOnetimeServer masterServer = new MysqlOnetimeServer(getOptions()); - MysqlOnetimeServer slaveServer = new MysqlOnetimeServer(getOptions()); - - masterServer.boot(); - slaveServer.boot(); - slaveServer.setupSlave(masterServer.getPort()); - - master = new MySQLConnection("127.0.0.1", masterServer.getPort(), "root", ""); - slave = new MySQLConnection("127.0.0.1", slaveServer.getPort(), "root", ""); - - client = new BinaryLogClient(slave.hostname, slave.port, slave.username, slave.password); - EventDeserializer eventDeserializer = new EventDeserializer(); - eventDeserializer.setCompatibilityMode(CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY, - CompatibilityMode.DATE_AND_TIME_AS_LONG); - client.setEventDeserializer(eventDeserializer); - client.setServerId(client.getServerId() - 1); // avoid clashes between BinaryLogClient instances - client.setKeepAlive(false); - client.registerEventListener(new TraceEventListener()); - client.registerEventListener(eventListener = new CountDownEventListener()); - client.registerLifecycleListener(new TraceLifecycleListener()); - client.connect(DEFAULT_TIMEOUT); - master.execute(new Callback() { - @Override - public void execute(Statement statement) throws SQLException { - statement.execute("drop database if exists mbcj_test"); - statement.execute("create database mbcj_test"); - statement.execute("use mbcj_test"); - } - }); - eventListener.waitFor(EventType.QUERY, 2, DEFAULT_TIMEOUT); - - if ( mysqlVersion.atLeast(8, 0) ) { - setupMysql8Login(master); - eventListener.waitFor(EventType.QUERY, 2, DEFAULT_TIMEOUT); - } - } - @BeforeMethod public void beforeEachTest() throws Exception { master.execute(new Callback() { @@ -170,7 +113,7 @@ public void execute(Statement statement) throws SQLException { statement.execute("create table bikini_bottom (name varchar(255) primary key)"); } }); - eventListener.waitFor(EventType.QUERY, 2, DEFAULT_TIMEOUT); + eventListener.waitForAtLeast(EventType.QUERY, 2, DEFAULT_TIMEOUT); eventListener.reset(); } @@ -1061,11 +1004,6 @@ public void execute(Statement statement) throws SQLException { } } - private void setupMysql8Login(MySQLConnection server) throws Exception { - server.execute("create user 'mysql8' IDENTIFIED WITH caching_sha2_password BY 'testpass'"); - server.execute("grant replication slave, replication client on *.* to 'mysql8'"); - } - @Test public void testMysql8Auth() throws Exception { if ( !mysqlVersion.atLeast(8, 0) ) @@ -1207,114 +1145,6 @@ public void execute(Statement statement) throws SQLException { } } - /** - * Representation of a MySQL connection. - */ - public static final class MySQLConnection implements Closeable { - - private final String hostname; - private final int port; - private final String username; - private final String password; - private Connection connection; - - public MySQLConnection(String hostname, int port, String username, String password) - throws ClassNotFoundException, SQLException { - this.hostname = hostname; - this.port = port; - this.username = username; - this.password = password; - Class.forName("com.mysql.jdbc.Driver"); - connect(); - } - - private void connect() throws SQLException { - this.connection = DriverManager.getConnection("jdbc:mysql://" + hostname + ":" + port + - "?serverTimezone=UTC", username, password); - execute(new Callback() { - - @Override - public void execute(Statement statement) throws SQLException { - statement.execute("SET time_zone = '+00:00'"); - } - }); - } - - public String hostname() { - return hostname; - } - - public int port() { - return port; - } - - public String username() { - return username; - } - - public String password() { - return password; - } - - public void execute(Callback callback, boolean autocommit) throws SQLException { - connection.setAutoCommit(autocommit); - Statement statement = connection.createStatement(); - try { - callback.execute(statement); - if (!autocommit) { - connection.commit(); - } - } finally { - statement.close(); - } - } - - public void execute(Callback callback) throws SQLException { - execute(callback, false); - } - - public void execute(final String...statements) throws SQLException { - execute(new Callback() { - @Override - public void execute(Statement statement) throws SQLException { - for (String command : statements) { - statement.execute(command); - } - } - }); - } - - public void query(String sql, Callback callback) throws SQLException { - connection.setAutoCommit(false); - Statement statement = connection.createStatement(); - try { - ResultSet rs = statement.executeQuery(sql); - try { - callback.execute(rs); - connection.commit(); - } finally { - rs.close(); - } - } finally { - statement.close(); - } - } - - @Override - public void close() throws IOException { - try { - connection.close(); - } catch (SQLException e) { - throw new IOException(e); - } - } - - public void reconnect() throws IOException, SQLException { - close(); - connect(); - } - } - /** * Callback used in the {@link MySQLConnection#execute(Callback)} method. * diff --git a/src/test/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClientIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClientIntegrationTest.java index ca5039e..6cf8181 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClientIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClientIntegrationTest.java @@ -4,7 +4,7 @@ import com.github.shyiko.mysql.binlog.event.MariadbGtidEventData; import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; import org.testng.SkipException; -import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.sql.ResultSet; @@ -18,17 +18,15 @@ /** * @author Winger */ -public class MariadbBinaryLogClientIntegrationTest extends BinaryLogClientIntegrationTest { - - MysqlOnetimeServer primaryServer; - protected BinaryLogClientIntegrationTest.MySQLConnection master; - - @Test - public void testMariadbUseGTIDAndAnnotateRowsEvent() throws Exception { +public class MariadbBinaryLogClientIntegrationTest extends AbstractIntegrationTest { + @BeforeMethod + public void checkMariaDB() throws Exception { if ( !mysqlVersion.isMaria ) throw new SkipException("not maria"); + } - + @Test + public void testMariadbUseGTIDAndAnnotateRowsEvent() throws Exception { master.execute(new BinaryLogClientIntegrationTest.Callback() { @Override public void execute(Statement statement) throws SQLException { diff --git a/src/test/java/com/github/shyiko/mysql/binlog/MySQLConnection.java b/src/test/java/com/github/shyiko/mysql/binlog/MySQLConnection.java new file mode 100644 index 0000000..3ba13a8 --- /dev/null +++ b/src/test/java/com/github/shyiko/mysql/binlog/MySQLConnection.java @@ -0,0 +1,117 @@ +package com.github.shyiko.mysql.binlog; + +import java.io.Closeable; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +/** + * Representation of a MySQL connection. + */ +public final class MySQLConnection implements Closeable { + + public final String hostname; + public final int port; + public final String username; + public final String password; + public Connection connection; + + public MySQLConnection(String hostname, int port, String username, String password) + throws ClassNotFoundException, SQLException { + this.hostname = hostname; + this.port = port; + this.username = username; + this.password = password; + Class.forName("com.mysql.jdbc.Driver"); + connect(); + } + + private void connect() throws SQLException { + this.connection = DriverManager.getConnection("jdbc:mysql://" + hostname + ":" + port + + "?serverTimezone=UTC", username, password); + execute(new BinaryLogClientIntegrationTest.Callback() { + + @Override + public void execute(Statement statement) throws SQLException { + statement.execute("SET time_zone = '+00:00'"); + } + }); + } + + public String hostname() { + return hostname; + } + + public int port() { + return port; + } + + public String username() { + return username; + } + + public String password() { + return password; + } + + public void execute(BinaryLogClientIntegrationTest.Callback callback, boolean autocommit) throws SQLException { + connection.setAutoCommit(autocommit); + Statement statement = connection.createStatement(); + try { + callback.execute(statement); + if ( !autocommit ) { + connection.commit(); + } + } finally { + statement.close(); + } + } + + public void execute(BinaryLogClientIntegrationTest.Callback callback) throws SQLException { + execute(callback, false); + } + + public void execute(final String... statements) throws SQLException { + execute(new BinaryLogClientIntegrationTest.Callback() { + @Override + public void execute(Statement statement) throws SQLException { + for ( String command : statements ) { + statement.execute(command); + } + } + }); + } + + public void query(String sql, BinaryLogClientIntegrationTest.Callback callback) throws SQLException { + connection.setAutoCommit(false); + Statement statement = connection.createStatement(); + try { + ResultSet rs = statement.executeQuery(sql); + try { + callback.execute(rs); + connection.commit(); + } finally { + rs.close(); + } + } finally { + statement.close(); + } + } + + @Override + public void close() throws IOException { + try { + connection.close(); + } catch ( SQLException e ) { + throw new IOException(e); + } + } + + public void reconnect() throws IOException, SQLException { + close(); + connect(); + } +} diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java index 5c268cb..0ef0cde 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java @@ -19,10 +19,10 @@ import com.github.shyiko.mysql.binlog.BinaryLogClientIntegrationTest; import com.github.shyiko.mysql.binlog.CapturingEventListener; import com.github.shyiko.mysql.binlog.CountDownEventListener; +import com.github.shyiko.mysql.binlog.MySQLConnection; import com.github.shyiko.mysql.binlog.MysqlOnetimeServer; import com.github.shyiko.mysql.binlog.TraceEventListener; import com.github.shyiko.mysql.binlog.TraceLifecycleListener; -import com.github.shyiko.mysql.binlog.event.Event; import com.github.shyiko.mysql.binlog.event.EventData; import com.github.shyiko.mysql.binlog.event.EventType; import com.github.shyiko.mysql.binlog.event.QueryEventData; @@ -65,7 +65,7 @@ public class JsonBinaryValueIntegrationTest { private final TimeZone timeZoneBeforeTheTest = TimeZone.getDefault(); - private BinaryLogClientIntegrationTest.MySQLConnection master; + private MySQLConnection master; private BinaryLogClient client; private CountDownEventListener eventListener; @@ -78,7 +78,7 @@ public void setUp() throws Exception { MysqlOnetimeServer masterServer = new MysqlOnetimeServer(); masterServer.boot(); - master = new BinaryLogClientIntegrationTest.MySQLConnection("127.0.0.1", masterServer.getPort(), "root", ""); + master = new MySQLConnection("127.0.0.1", masterServer.getPort(), "root", ""); client = new BinaryLogClient(master.hostname(), master.port(), master.username(), master.password()); client.setServerId(client.getServerId() - 1); // avoid clashes between BinaryLogClient instances From 2ae2eb801d196f86471a7998a7c7a8566b743d57 Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Sat, 13 Aug 2022 21:44:13 -0700 Subject: [PATCH 14/17] refactor: bring mariaDB back into the mainline path no need to know what sever you're connecting to. we can figure it out. Also, these tests never worked. fix them. --- .../shyiko/mysql/binlog/BinaryLogClient.java | 120 ++++++++++++++++-- .../mysql/binlog/MariadbBinaryLogClient.java | 113 ----------------- .../shyiko/mysql/binlog/MariadbGtidSet.java | 7 + ...MariadbBinaryLogClientIntegrationTest.java | 17 ++- 4 files changed, 129 insertions(+), 128 deletions(-) delete mode 100644 src/main/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClient.java diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index d3650ba..3591674 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -25,12 +25,15 @@ import com.github.shyiko.mysql.binlog.event.MariadbGtidListEventData; import com.github.shyiko.mysql.binlog.event.QueryEventData; import com.github.shyiko.mysql.binlog.event.RotateEventData; +import com.github.shyiko.mysql.binlog.event.deserialization.AnnotateRowsEventDataDeserializer; import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType; import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException; import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializer; import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.EventDataWrapper; import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.MariadbGtidEventDataDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.MariadbGtidListEventDataDeserializer; import com.github.shyiko.mysql.binlog.event.deserialization.QueryEventDataDeserializer; import com.github.shyiko.mysql.binlog.event.deserialization.RotateEventDataDeserializer; import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; @@ -71,6 +74,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Locale; import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -138,6 +142,7 @@ public X509Certificate[] getAcceptedIssuers() { protected GtidSet gtidSet; protected final Object gtidSetAccessLock = new Object(); private boolean gtidSetFallbackToPurged; + private boolean gtidEnabled = false; private boolean useBinlogFilenamePositionInGtidMode; protected String gtid; private boolean tx; @@ -168,6 +173,10 @@ public X509Certificate[] getAcceptedIssuers() { private final Lock connectLock = new ReentrantLock(); private final Lock keepAliveThreadExecutorLock = new ReentrantLock(); + private boolean useSendAnnotateRowsEvent; + + + private Boolean isMariaDB; /** * Alias for BinaryLogClient("localhost", 3306, <no schema> = null, username, password). @@ -321,7 +330,7 @@ public String getGtidSet() { } /** - * @param gtidSet GTID set (can be an empty string). + * @param gtidStr GTID set string (can be an empty string). *

NOTE #1: Any value but null will switch BinaryLogClient into a GTID mode (this will also set binlogFilename * to "" (provided it's null) forcing MySQL to send events starting from the oldest known binlog (keep in mind * that connection will fail if gtid_purged is anything but empty (unless @@ -330,19 +339,27 @@ public String getGtidSet() { * @see #getGtidSet() * @see #setGtidSetFallbackToPurged(boolean) */ - public void setGtidSet(String gtidSet) { - if (gtidSet != null && this.binlogFilename == null) { + public void setGtidSet(String gtidStr) { + if ( gtidStr == null ) + return; + + this.gtidEnabled = true; + + if (this.binlogFilename == null) { this.binlogFilename = ""; } + synchronized (gtidSetAccessLock) { - this.gtidSet = gtidSet != null ? buildGtidSet(gtidSet) : null; + if ( !gtidStr.equals("") ) { + if ( MariadbGtidSet.isMariaGtidSet(gtidStr) ) { + this.gtidSet = new MariadbGtidSet(gtidStr); + } else { + this.gtidSet = new GtidSet(gtidStr); + } + } } } - protected GtidSet buildGtidSet(String gtidSet) { - return new GtidSet(gtidSet); - } - /** * @see #setGtidSetFallbackToPurged(boolean) * @return whether gtid_purged is used as a fallback @@ -505,6 +522,21 @@ public void setThreadFactory(ThreadFactory threadFactory) { this.threadFactory = threadFactory; } + + /** + * @return true/false depending on whether we've connected to MariaDB. NULL if not connected. + */ + public Boolean getMariaDB() { + return isMariaDB; + } + + public boolean isUseSendAnnotateRowsEvent() { + return useSendAnnotateRowsEvent; + } + + public void setUseSendAnnotateRowsEvent(boolean useSendAnnotateRowsEvent) { + this.useSendAnnotateRowsEvent = useSendAnnotateRowsEvent; + } /** * Connect to the replication stream. Note that this method blocks until disconnected. * @throws AuthenticationException if authentication fails @@ -536,6 +568,7 @@ public void connect() throws IOException, IllegalStateException { } GreetingPacket greetingPacket = receiveGreeting(); + detectMariaDB(greetingPacket); tryUpgradeToSSL(greetingPacket); new Authenticator(greetingPacket, channel, schema, username, password).authenticate(); @@ -591,7 +624,7 @@ public void connect() throws IOException, IllegalStateException { } ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class); synchronized (gtidSetAccessLock) { - if (gtidSet != null) { + if (this.gtidEnabled) { ensureGtidEventDataDeserializer(); } } @@ -606,6 +639,13 @@ public void connect() throws IOException, IllegalStateException { } } + private void detectMariaDB(GreetingPacket packet) { + String serverVersion = packet.getServerVersion(); + if ( serverVersion == null ) + return; + + this.isMariaDB = serverVersion.toLowerCase().contains("mariadb"); + } /** * Apply additional options for connection before requesting binlog stream. */ @@ -725,9 +765,16 @@ private void setMasterServerId() throws IOException { protected void requestBinaryLogStream() throws IOException { long serverId = blocking ? this.serverId : 0; // http://bugs.mysql.com/bug.php?id=71178 + if ( this.isMariaDB ) + requestBinaryLogStreamMaria(serverId); + else + requestBinaryLogStreamMysql(serverId); + } + + private void requestBinaryLogStreamMysql(long serverId) throws IOException { Command dumpBinaryLogCommand; synchronized (gtidSetAccessLock) { - if (gtidSet != null) { + if (this.gtidEnabled) { dumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId, useBinlogFilenamePositionInGtidMode ? binlogFilename : "", useBinlogFilenamePositionInGtidMode ? binlogPosition : 4, @@ -737,6 +784,27 @@ protected void requestBinaryLogStream() throws IOException { } } channel.write(dumpBinaryLogCommand); + + } + + protected void requestBinaryLogStreamMaria(long serverId) throws IOException { + Command dumpBinaryLogCommand; + synchronized (gtidSetAccessLock) { + if (this.gtidEnabled) { + channel.write(new QueryCommand("SET @mariadb_slave_capability=4")); + checkError(channel.read()); + channel.write(new QueryCommand("SET @slave_connect_state = '" + gtidSet.toString() + "'")); + checkError(channel.read()); + channel.write(new QueryCommand("SET @slave_gtid_strict_mode = 0")); + checkError(channel.read()); + channel.write(new QueryCommand("SET @slave_gtid_ignore_duplicates = 0")); + checkError(channel.read()); + dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, "", 0L, isUseSendAnnotateRowsEvent()); + } else { + dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition); + } + } + channel.write(dumpBinaryLogCommand); } protected void ensureEventDataDeserializer(EventType eventType, @@ -759,6 +827,9 @@ protected void ensureEventDataDeserializer(EventType eventType, protected void ensureGtidEventDataDeserializer() { ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class); ensureEventDataDeserializer(EventType.QUERY, QueryEventDataDeserializer.class); + ensureEventDataDeserializer(EventType.ANNOTATE_ROWS, AnnotateRowsEventDataDeserializer.class); + ensureEventDataDeserializer(EventType.MARIADB_GTID, MariadbGtidEventDataDeserializer.class); + ensureEventDataDeserializer(EventType.MARIADB_GTID_LIST, MariadbGtidListEventDataDeserializer.class); } private void spawnKeepAliveThread() { @@ -902,11 +973,27 @@ private String fetchGtidPurged() throws IOException { } protected void setupGtidSet() throws IOException{ + if (!this.gtidEnabled) + return; + synchronized (gtidSetAccessLock) { - if (gtidSet != null && "".equals(gtidSet.toString()) && gtidSetFallbackToPurged) { - gtidSet = new GtidSet(fetchGtidPurged()); + if ( this.isMariaDB ) { + if ( gtidSet == null ) { + gtidSet = new MariadbGtidSet(""); + } else if ( !(gtidSet instanceof MariadbGtidSet) ) { + throw new RuntimeException("Connected to MariaDB but given a mysql GTID set!"); + } + } else { + if ( gtidSet == null && gtidSetFallbackToPurged ) { + gtidSet = new GtidSet(fetchGtidPurged()); + } else if ( gtidSet == null ){ + gtidSet = new GtidSet(""); + } else if ( gtidSet instanceof MariadbGtidSet ) { + throw new RuntimeException("Connected to Mysql but given a MariaDB GTID set!"); + } } } + } private void fetchBinlogFilenameAndPosition() throws IOException { @@ -1061,6 +1148,15 @@ protected void updateGtidSet(Event event) { } commitGtid(sql); break; + case MARIADB_GTID: + MariadbGtidEventData mariadbGtidEventData = (MariadbGtidEventData) EventDeserializer.EventDataWrapper.internal(event.getData()); + mariadbGtidEventData.setServerId(eventHeader.getServerId()); + gtid = mariadbGtidEventData.toString(); + break; + case MARIADB_GTID_LIST: + MariadbGtidListEventData mariadbGtidListEventData = (MariadbGtidListEventData) EventDeserializer.EventDataWrapper.internal(event.getData()); + gtid = mariadbGtidListEventData.getMariaGTIDSet().toString(); + break; default: } } diff --git a/src/main/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClient.java deleted file mode 100644 index aa6cc5a..0000000 --- a/src/main/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClient.java +++ /dev/null @@ -1,113 +0,0 @@ -package com.github.shyiko.mysql.binlog; - -import com.github.shyiko.mysql.binlog.event.Event; -import com.github.shyiko.mysql.binlog.event.EventHeader; -import com.github.shyiko.mysql.binlog.event.EventType; -import com.github.shyiko.mysql.binlog.event.MariadbGtidEventData; -import com.github.shyiko.mysql.binlog.event.MariadbGtidListEventData; -import com.github.shyiko.mysql.binlog.event.deserialization.AnnotateRowsEventDataDeserializer; -import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; -import com.github.shyiko.mysql.binlog.event.deserialization.MariadbGtidEventDataDeserializer; -import com.github.shyiko.mysql.binlog.event.deserialization.MariadbGtidListEventDataDeserializer; -import com.github.shyiko.mysql.binlog.network.protocol.command.Command; -import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand; -import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand; - -import java.io.IOException; - -/** - * Mariadb replication stream client. - * - * @author Winger - */ -public class MariadbBinaryLogClient extends BinaryLogClient { - - private boolean useSendAnnotateRowsEvent; - - public MariadbBinaryLogClient(String username, String password) { - super(username, password); - } - - public MariadbBinaryLogClient(String schema, String username, String password) { - super(schema, username, password); - } - - public MariadbBinaryLogClient(String hostname, int port, String username, String password) { - super(hostname, port, username, password); - } - - public MariadbBinaryLogClient(String hostname, int port, String schema, String username, String password) { - super(hostname, port, schema, username, password); - } - - @Override - protected GtidSet buildGtidSet(String gtidSet) { - return new MariadbGtidSet(gtidSet); - } - - @Override - protected void setupGtidSet() throws IOException { - //Mariadb ignore - } - - @Override - protected void requestBinaryLogStream() throws IOException { - long serverId = isBlocking() ? this.getServerId() : 0; // http://bugs.mysql.com/bug.php?id=71178 - Command dumpBinaryLogCommand; - synchronized (gtidSetAccessLock) { - if (gtidSet != null) { - channel.write(new QueryCommand("SET @mariadb_slave_capability=4")); - checkError(channel.read()); - channel.write(new QueryCommand("SET @slave_connect_state = '" + gtidSet.toString() + "'")); - checkError(channel.read()); - channel.write(new QueryCommand("SET @slave_gtid_strict_mode = 0")); - checkError(channel.read()); - channel.write(new QueryCommand("SET @slave_gtid_ignore_duplicates = 0")); - checkError(channel.read()); - dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, "", 0L, isUseSendAnnotateRowsEvent()); - - } else { - dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, getBinlogFilename(), getBinlogPosition()); - } - } - channel.write(dumpBinaryLogCommand); - } - - @Override - protected void ensureGtidEventDataDeserializer() { - ensureEventDataDeserializer(EventType.ANNOTATE_ROWS, AnnotateRowsEventDataDeserializer.class); - ensureEventDataDeserializer(EventType.MARIADB_GTID, MariadbGtidEventDataDeserializer.class); - ensureEventDataDeserializer(EventType.MARIADB_GTID_LIST, MariadbGtidListEventDataDeserializer.class); - } - - @Override - protected void updateGtidSet(Event event) { - synchronized (gtidSetAccessLock) { - if (gtidSet == null) { - return; - } - } - EventHeader eventHeader = event.getHeader(); - switch(eventHeader.getEventType()) { - case MARIADB_GTID: - MariadbGtidEventData mariadbGtidEventData = (MariadbGtidEventData) EventDeserializer.EventDataWrapper.internal(event.getData()); - mariadbGtidEventData.setServerId(eventHeader.getServerId()); - gtid = mariadbGtidEventData.toString(); - break; - case MARIADB_GTID_LIST: - MariadbGtidListEventData mariadbGtidListEventData = (MariadbGtidListEventData) EventDeserializer.EventDataWrapper.internal(event.getData()); - gtid = mariadbGtidListEventData.getMariaGTIDSet().toString(); - break; - default: - super.updateGtidSet(event); - } - } - - public boolean isUseSendAnnotateRowsEvent() { - return useSendAnnotateRowsEvent; - } - - public void setUseSendAnnotateRowsEvent(boolean useSendAnnotateRowsEvent) { - this.useSendAnnotateRowsEvent = useSendAnnotateRowsEvent; - } -} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/MariadbGtidSet.java b/src/main/java/com/github/shyiko/mysql/binlog/MariadbGtidSet.java index c86ee54..49fede5 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/MariadbGtidSet.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/MariadbGtidSet.java @@ -1,6 +1,7 @@ package com.github.shyiko.mysql.binlog; import java.util.*; +import java.util.regex.Pattern; /** * Mariadb Global Transaction ID @@ -27,6 +28,12 @@ public MariadbGtidSet(String gtidSet) { } } + static Pattern MARIA_GTID_PATTERN = Pattern.compile("^\\d+-\\d+-\\d+"); + + public static boolean isMariaGtidSet(String gtidSet) { + return MARIA_GTID_PATTERN.matcher(gtidSet).find(); + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); diff --git a/src/test/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClientIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClientIntegrationTest.java index 6cf8181..e35a1d6 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClientIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClientIntegrationTest.java @@ -19,6 +19,18 @@ * @author Winger */ public class MariadbBinaryLogClientIntegrationTest extends AbstractIntegrationTest { + @Override + protected MysqlOnetimeServerOptions getOptions() { + MysqlOnetimeServerOptions options = super.getOptions(); + if ( options.extraParams == null ) + options.extraParams = ""; + else + options.extraParams += " "; + + options.extraParams += "--binlog-annotate-row-events"; + return options; + } + @BeforeMethod public void checkMariaDB() throws Exception { if ( !mysqlVersion.isMaria ) @@ -49,7 +61,7 @@ public void execute(ResultSet rs) throws SQLException { }); CountDownEventListener eventListener; - MariadbBinaryLogClient client = new MariadbBinaryLogClient(master.hostname(), master.port(), master.username(), master.password()); + BinaryLogClient client = new BinaryLogClient(master.hostname(), master.port(), master.username(), master.password()); client.setGtidSet(currentGtidPos[0]); client.setUseSendAnnotateRowsEvent(true); @@ -71,13 +83,12 @@ public void execute(Statement statement) throws SQLException { try { eventListener.reset(); - client.connect(); + client.connect(5000); eventListener.waitFor(MariadbGtidEventData.class, 1, TimeUnit.SECONDS.toMillis(4)); String gtidSet = client.getGtidSet(); assertNotNull(gtidSet); - eventListener.reset(); eventListener.waitFor(AnnotateRowsEventData.class, 1, TimeUnit.SECONDS.toMillis(4)); gtidSet = client.getGtidSet(); assertNotEquals(currentGtidPos[0], gtidSet); From 9e1926772f35e7fbe0b8efeddbc7dae27c18cf28 Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Sun, 14 Aug 2022 04:17:50 -0700 Subject: [PATCH 15/17] update README --- README.md | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 53aa87d..761d17d 100644 --- a/README.md +++ b/README.md @@ -84,13 +84,15 @@ kick off from a specific filename or position, use `client.setBinlogFilename(fil > `client.connect()` is blocking (meaning that client will listen for events in the current thread). `client.connect(timeout)`, on the other hand, spawns a separate thread. -> Note difference between MariaDB and MySQL -``` -BinaryLogClient client = new MariadbBinaryLogClient("hostname", 3306, "username", "password"); -// ... as same as BinaryLogClient -``` -> `client.setGtidSet(gtid)` meaning that client kick off from a specific gtid, MariaDB also support. -> `client.setUseSendAnnotateRowsEvent(true)` meaning that client will send annotate rows events(describe the query which caused the row event), and 'false' by default + +#### MariaDB + +The stock BinaryLogClient works out of the box with MariaDB but there's two differences; + +One, MariaDB's GTIDs are different. They're still strings but parse differently. +Two, Maria can send the ANNOTATE_ROWS events which allows you to recover the SQL used to generate rows in row-based replication. + +See https://mariadb.com/kb/en/annotate_rows_log_event/ and `client.setUseSendAnnotateRowsEvent(true)` #### Controlling event deserialization From 420d192ad96cd9528bd282c1a337e6140d7452ef Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Sun, 14 Aug 2022 13:55:33 -0700 Subject: [PATCH 16/17] checkpoint --- pom.xml | 2 +- .../com/github/shyiko/mysql/binlog/BinaryLogClient.java | 2 +- src/main/java/com/github/shyiko/mysql/binlog/GtidSet.java | 7 +++++++ .../binlog/MariadbBinaryLogClientIntegrationTest.java | 3 +++ 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 5c5806e..6863549 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.zendesk mysql-binlog-connector-java - 0.26.1 + 0.27.0-PRE1 mysql-binlog-connector-java MySQL Binary Log connector diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index 3591674..bd21d88 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -784,7 +784,6 @@ private void requestBinaryLogStreamMysql(long serverId) throws IOException { } } channel.write(dumpBinaryLogCommand); - } protected void requestBinaryLogStreamMaria(long serverId) throws IOException { @@ -793,6 +792,7 @@ protected void requestBinaryLogStreamMaria(long serverId) throws IOException { if (this.gtidEnabled) { channel.write(new QueryCommand("SET @mariadb_slave_capability=4")); checkError(channel.read()); + logger.info(gtidSet.toString()); channel.write(new QueryCommand("SET @slave_connect_state = '" + gtidSet.toString() + "'")); checkError(channel.read()); channel.write(new QueryCommand("SET @slave_gtid_strict_mode = 0")); diff --git a/src/main/java/com/github/shyiko/mysql/binlog/GtidSet.java b/src/main/java/com/github/shyiko/mysql/binlog/GtidSet.java index 55d619e..d1d70f8 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/GtidSet.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/GtidSet.java @@ -40,6 +40,13 @@ public class GtidSet { private final Map map = new LinkedHashMap(); + public static GtidSet parse(String gtidStr) { + if ( MariadbGtidSet.isMariaGtidSet(gtidStr) ) { + return new MariadbGtidSet(gtidStr); + } else { + return new GtidSet(gtidStr); + } + } /** * @param gtidSet gtid set comprised of closed intervals (like MySQL's executed_gtid_set). */ diff --git a/src/test/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClientIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClientIntegrationTest.java index e35a1d6..c021ec4 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClientIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/MariadbBinaryLogClientIntegrationTest.java @@ -22,6 +22,9 @@ public class MariadbBinaryLogClientIntegrationTest extends AbstractIntegrationTe @Override protected MysqlOnetimeServerOptions getOptions() { MysqlOnetimeServerOptions options = super.getOptions(); + if ( !mysqlVersion.isMaria ) + return options; + if ( options.extraParams == null ) options.extraParams = ""; else From 669087892cd217fbbfc447b02af4b4ebfff38aa8 Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Sun, 21 Aug 2022 21:25:44 -0700 Subject: [PATCH 17/17] pre-3 This resolves the current/historical split and parses the flags --- pom.xml | 2 +- .../github/shyiko/mysql/binlog/GtidSet.java | 4 + .../shyiko/mysql/binlog/MariadbGtidSet.java | 87 +++++++++++++++++-- .../binlog/event/MariadbGtidEventData.java | 16 ++++ .../MariadbGtidEventDataDeserializer.java | 1 + 5 files changed, 101 insertions(+), 9 deletions(-) diff --git a/pom.xml b/pom.xml index 6863549..3e8a149 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.zendesk mysql-binlog-connector-java - 0.27.0-PRE1 + 0.27.0-PRE3 mysql-binlog-connector-java MySQL Binary Log connector diff --git a/src/main/java/com/github/shyiko/mysql/binlog/GtidSet.java b/src/main/java/com/github/shyiko/mysql/binlog/GtidSet.java index d1d70f8..fb68b54 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/GtidSet.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/GtidSet.java @@ -167,6 +167,10 @@ public String toString() { return join(gtids, ","); } + public String toSeenString() { + return this.toString(); + } + private static String join(Collection o, String delimiter) { if (o.isEmpty()) { return ""; diff --git a/src/main/java/com/github/shyiko/mysql/binlog/MariadbGtidSet.java b/src/main/java/com/github/shyiko/mysql/binlog/MariadbGtidSet.java index 49fede5..4a0bf45 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/MariadbGtidSet.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/MariadbGtidSet.java @@ -10,20 +10,36 @@ * @see GTID for the original doc */ public class MariadbGtidSet extends GtidSet { + /* + we keep two maps; one of them contains the current GTID position for + each domain. The other contains all the "seen" GTID positions for each + domain and can be used to compare against another gtid postion. + */ + protected Map positionMap = new HashMap<>(); - private Map map = new HashMap<>(); + protected Map> seenMap = new LinkedHashMap<>(); public MariadbGtidSet() { super(null); // } + /** + * Initialize a new MariaDB gtid set from a string, like: + * 0-1-24,0-555555-9709 + * DOMAIN_ID-SERVER_ID-SEQUENCE[,DOMAIN_ID-SERVER_ID-SEQUENCE] + * + * note that for duplicate domain ids it's "last one wins" for the current position + * @param gtidSet a string representing the gtid set. + */ public MariadbGtidSet(String gtidSet) { super(null); if (gtidSet != null && gtidSet.length() > 0) { String[] gtids = gtidSet.replaceAll("\n", "").split(","); for (String gtid : gtids) { MariaGtid mariaGtid = MariaGtid.parse(gtid); - map.put(mariaGtid.getDomainId(), mariaGtid); + + positionMap.put(mariaGtid.getDomainId(), mariaGtid); + addToSeenSet(mariaGtid); } } } @@ -34,10 +50,20 @@ public static boolean isMariaGtidSet(String gtidSet) { return MARIA_GTID_PATTERN.matcher(gtidSet).find(); } + private void addToSeenSet(MariaGtid gtid) { + if ( !this.seenMap.containsKey(gtid.domainId) ) { + this.seenMap.put(gtid.domainId, new LinkedHashMap<>()); + } + + LinkedHashMap domainMap = this.seenMap.get(gtid.domainId); + domainMap.put(gtid.serverId, gtid); + } + + @Override public String toString() { StringBuilder sb = new StringBuilder(); - for (MariaGtid gtid : map.values()) { + for (MariaGtid gtid : positionMap.values()) { if (sb.length() > 0) { sb.append(","); } @@ -46,6 +72,21 @@ public String toString() { return sb.toString(); } + @Override + public String toSeenString() { + StringBuilder sb = new StringBuilder(); + for (Long domainID : seenMap.keySet()) { + for( MariaGtid gtid: seenMap.get(domainID).values() ) { + if (sb.length() > 0) { + sb.append(","); + } + + sb.append(gtid.toString()); + } + } + return sb.toString(); + } + @Override public Collection getUUIDSets() { throw new UnsupportedOperationException("Mariadb gtid not support this method"); @@ -64,22 +105,52 @@ public UUIDSet putUUIDSet(UUIDSet uuidSet) { @Override public boolean add(String gtid) { MariaGtid mariaGtid = MariaGtid.parse(gtid); - map.put(mariaGtid.getDomainId(), mariaGtid); + add(mariaGtid); return true; } public void add(MariaGtid gtid) { - map.put(gtid.getDomainId(), gtid); + positionMap.put(gtid.getDomainId(), gtid); + addToSeenSet(gtid); } + /* + we're trying to ask "is this position behind the other position?" + - if we have a domain that the other doesn't, we're probably "ahead". + - the inverse is true too + */ @Override public boolean isContainedWithin(GtidSet other) { - throw new UnsupportedOperationException("Mariadb gtid not support this method"); + if (!(other instanceof MariadbGtidSet)) + return false; + + MariadbGtidSet o = (MariadbGtidSet) other; + + for ( Long domainID : this.seenMap.keySet() ) { + if ( !o.seenMap.containsKey(domainID) ) { + return false; + } + + LinkedHashMap thisDomainMap = this.seenMap.get(domainID); + LinkedHashMap otherDomainMap = o.seenMap.get(domainID); + + for ( Long serverID : thisDomainMap.keySet() ) { + if ( !otherDomainMap.containsKey(serverID)) { + return false; + } + + MariaGtid thisGtid = thisDomainMap.get(serverID); + MariaGtid otherGtid = otherDomainMap.get(serverID); + if ( thisGtid.sequence >= otherGtid.sequence ) + return false; + } + } + return true; } @Override public int hashCode() { - return map.keySet().hashCode(); + return this.seenMap.keySet().hashCode(); } @Override @@ -89,7 +160,7 @@ public boolean equals(Object obj) { } if (obj instanceof MariadbGtidSet) { MariadbGtidSet that = (MariadbGtidSet) obj; - return this.map.equals(that.map); + return this.positionMap.equals(that.positionMap); } return false; } diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/MariadbGtidEventData.java b/src/main/java/com/github/shyiko/mysql/binlog/event/MariadbGtidEventData.java index 54838d0..43f0c55 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/MariadbGtidEventData.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/MariadbGtidEventData.java @@ -7,11 +7,19 @@ * @see GTID_EVENT for the original doc */ public class MariadbGtidEventData implements EventData { + public static int FL_STANDALONE = 1; + public static int FL_GROUP_COMMIT_ID = 2; + public static int FL_TRANSACTIONAL = 4; + public static int FL_ALLOW_PARALLEL = 8; + public static int FL_WAITED = 16; + public static int FL_DDL = 32; private long sequence; private long domainId; private long serverId; + private int flags; + public long getSequence() { return sequence; } @@ -36,6 +44,14 @@ public void setServerId(long serverId) { this.serverId = serverId; } + public int getFlags() { + return flags; + } + + public void setFlags(int flags) { + this.flags = flags; + } + @Override public String toString() { return domainId + "-" + serverId + "-" + sequence; diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidEventDataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidEventDataDeserializer.java index 4eb45f1..4f619b5 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidEventDataDeserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/MariadbGtidEventDataDeserializer.java @@ -27,6 +27,7 @@ public MariadbGtidEventData deserialize(ByteArrayInputStream inputStream) throws MariadbGtidEventData event = new MariadbGtidEventData(); event.setSequence(inputStream.readLong(8)); event.setDomainId(inputStream.readInteger(4)); + event.setFlags(inputStream.readInteger(1)); // Flags ignore return event; }