Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 109 additions & 7 deletions src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.QueryEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.RotateEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.maria.BinlogCheckpointDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.maria.GtidDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.maria.GtidListDeserializer;
import com.github.shyiko.mysql.binlog.event.maria.Gtid;
import com.github.shyiko.mysql.binlog.event.maria.MariaGtidEventData;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientMXBean;
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
Expand All @@ -46,13 +51,7 @@
import com.github.shyiko.mysql.binlog.network.protocol.Packet;
import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel;
import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket;
import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.Command;
import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogGtidCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.PingCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.SSLRequestCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.*;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
Expand Down Expand Up @@ -158,6 +157,11 @@ public X509Certificate[] getAcceptedIssuers() {
private Event previousEvent;
private Event previousGtidEvent;

// MariaDB
private Gtid mariaGtid;
private String gtid;
private boolean mariaDB;

/**
* Alias for BinaryLogClient("localhost", 3306, <no schema> = null, username, password).
* @see BinaryLogClient#BinaryLogClient(String, int, String, String, String)
Expand Down Expand Up @@ -834,6 +838,9 @@ private void updatePosition(Event event, boolean updatedGtid) {
if (nextBinlogPosition > 0) {
binlogPosition = nextBinlogPosition;
}
} else
if (eventType == EventType.MARIA_GTID_EVENT){
updateMariaGTID(event);
}
previousEvent = event;
}
Expand Down Expand Up @@ -1020,6 +1027,101 @@ private void disconnectChannel() throws IOException {
}
}

/**
* @return Note that this value changes with each received GTID event (provided client is in GTID mode).
*/
public String getGtid() {
synchronized (gtidSetAccessLock) {
if (gtidSet != null) {
return gtidSet.toString();
}
return gtid;
}
}

/**
* @param gtid For MySQL this is GTID set format, for MariaDB the format is domainId-serverId-sequenceNumber(can be an empty string).
* <p>NOTE #1: Any value but null will switch BinaryLogClient into a GTID mode (in which case GTID set will be
* updated with each incoming GTID event) as well as set binlogFilename to "" (empty string) (meaning
* BinaryLogClient will request events "outside of the set" <u>starting from the oldest known binlog</u>).
* <p>NOTE #2: {@link #setBinlogFilename(String)} and {@link #setBinlogPosition(long)} can be used to specify the
* exact position from which MySQL server should start streaming events (taking into account GTID set).
* @see #getGtid()
*/
public BinaryLogClient setGtid(String gtid) {
if (gtid != null && this.binlogFilename == null) {
this.binlogFilename = "";
}
synchronized (gtidSetAccessLock) {
this.gtid = gtid;
}
return this;
}

// region MariaDB
public boolean isMariaDB() {
return mariaDB;
}

private void updateMariaGTID(Event event) {
EventHeader eventHeader = event.getHeader();
if (eventHeader.getEventType() == EventType.MARIA_GTID_EVENT) {
synchronized (gtidSetAccessLock) {
if (mariaGtid != null) {
MariaGtidEventData eventData = event.getData();
mariaGtid.setDomainId(eventData.getDomainId());
mariaGtid.setSequenceNumber(eventData.getSequenceNumber());
gtid = mariaGtid.toString();
}
}
}
}

private DumpBinaryLogCommand requestMariaBinaryLogStream() throws IOException {
if ("gtid_current_pos".equals(gtid) || "".equals(gtid)||gtid==null) {
channel.write(new QueryCommand("select @@gtid_current_pos"));
ResultSetRowPacket[] rs = readResultSet();
gtid = rs[0].getValue(0);
logger.fine("Use server current gtid position "+gtid);
}

// update server id
channel.write(new QueryCommand("SHOW VARIABLES LIKE 'SERVER_ID'"));
ResultSetRowPacket[] rs = readResultSet();
long serverId = Long.parseLong(rs[0].getValue(1));
// If we got multi gtid, chose the gtid for current server
String[] split = gtid.split(",");
for (String s : split) {
Gtid g = new Gtid(s);
if (g.getServerId() == serverId) {
mariaGtid = g;
gtid = mariaGtid.toString();
logger.fine("Chose gtid "+gtid+" for this server");
}
}

// set up gtid
channel.write(new QueryCommand("SET @mariadb_slave_capability = 4"));// support GTID
channel.read();// ignore
channel.write(new QueryCommand("SET @slave_connect_state = '" + gtid + "'"));
channel.read();// ignore
channel.write(new QueryCommand("SET @slave_gtid_strict_mode = 0"));
channel.read();// ignore
channel.write(new QueryCommand("SET @slave_gtid_ignore_duplicates = 0"));
channel.read();// ignore
// Register First
Command command = new RegisterSlaveCommand(serverId, "", "", "", 0, 0, 0);
channel.write(command);
channel.read();// ignore

// MariaDB Event
eventDeserializer.setEventDataDeserializer(EventType.MARIA_GTID_EVENT, new GtidDeserializer());
eventDeserializer.setEventDataDeserializer(EventType.MARIA_GTID_LIST_EVENT, new GtidListDeserializer());
eventDeserializer.setEventDataDeserializer(EventType.MARIA_BINLOG_CHECKPOINT_EVENT, new BinlogCheckpointDeserializer());

return new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition);
}
// endregion
/**
* {@link BinaryLogClient}'s event listener.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,49 @@ public enum EventType {
*/
GTID,
ANONYMOUS_GTID,
PREVIOUS_GTIDS;
PREVIOUS_GTIDS,

/* MARIA_EVENT_BEGIN */

MARIA_ANNOTATE_ROWS_EVENT(160),
/**
* Binlog checkpoint event. Used for XA crash recovery on the master, not used
* in replication.
* A binlog checkpoint event specifies a binlog file such that XA crash
* recovery can start from that file - and it is guaranteed to find all XIDs
* that are prepared in storage engines but not yet committed.
*/
MARIA_BINLOG_CHECKPOINT_EVENT(161),
/**
* Gtid event. For global transaction ID, used to start a new event group,
* instead of the old BEGIN query event, and also to mark stand-alone
* events.
*/
MARIA_GTID_EVENT(162),
/**
* Gtid list event. Logged at the start of every binlog, to record the
* current replication state. This consists of the last GTID seen for
* each replication domain.
*/
MARIA_GTID_LIST_EVENT(163),
;

final int v;

EventType()
{
v = ordinal();
}

EventType(int n)
{
v = n;
}

public int get()
{
return v;
}

public static boolean isRowMutation(EventType eventType) {
return EventType.isWrite(eventType) ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
*/
public class EventDeserializer {

private final EventHeaderDeserializer eventHeaderDeserializer;
private final EventDataDeserializer defaultEventDataDeserializer;
private final Map<EventType, EventDataDeserializer> eventDataDeserializers;
protected final EventHeaderDeserializer eventHeaderDeserializer;
protected final EventDataDeserializer defaultEventDataDeserializer;
protected final Map<EventType, EventDataDeserializer> eventDataDeserializers;

private int checksumLength;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,25 @@
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
* @author <a href="mailto:stanley.shyiko@gmail.com">Stanley Shyiko</a>
*/
public class EventHeaderV4Deserializer implements EventHeaderDeserializer<EventHeaderV4> {

private static final EventType[] EVENT_TYPES = EventType.values();

private static final Map<Integer,EventType> EVENT_TYPES;

static {
// Mutable map but will never update, so it's safe.
EVENT_TYPES = new HashMap<Integer, EventType>();
for (EventType type : EventType.values())
{
EVENT_TYPES.put(type.get(),type);
}
}
@Override
public EventHeaderV4 deserialize(ByteArrayInputStream inputStream) throws IOException {
EventHeaderV4 header = new EventHeaderV4();
Expand All @@ -41,10 +52,11 @@ public EventHeaderV4 deserialize(ByteArrayInputStream inputStream) throws IOExce
}

private static EventType getEventType(int ordinal) throws IOException {
if (ordinal >= EVENT_TYPES.length) {
EventType type = EVENT_TYPES.get(ordinal);
if (type == null) {
throw new IOException("Unknown event type " + ordinal);
}
return EVENT_TYPES[ordinal];
return type;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.github.shyiko.mysql.binlog.event.deserialization.maria;

import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.maria.BinlogCheckpointEventData;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;

import java.io.IOException;

/**
* @author <a href="http://github.com/wenerme">wener</a>
*/
public class BinlogCheckpointDeserializer implements EventDataDeserializer<BinlogCheckpointEventData> {
@Override
public BinlogCheckpointEventData deserialize(ByteArrayInputStream is) throws IOException {
BinlogCheckpointEventData e = new BinlogCheckpointEventData();
e.setBinlogFilename(is.readString(is.readInteger(4)));
return e;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.github.shyiko.mysql.binlog.event.deserialization.maria;

import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.maria.MariaGtidEventData;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;

import java.io.IOException;

/**
* @author <a href="http://github.com/wenerme">wener</a>
*/
public class GtidDeserializer implements EventDataDeserializer<MariaGtidEventData> {
@Override
public MariaGtidEventData deserialize(ByteArrayInputStream is) throws IOException {
MariaGtidEventData e = new MariaGtidEventData();
e.setSequenceNumber(is.readLong(8));
e.setDomainId(is.readLong(4));
e.setFlags(is.readInteger(1));

// reserved
long n = 6 + ((e.getFlags() & MariaGtidEventData.FL_GROUP_COMMIT_ID) > 0 ? 2 : 0);
long skip = is.skip(n);
assert n == skip;
return e;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.github.shyiko.mysql.binlog.event.deserialization.maria;

import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.maria.Gtid;
import com.github.shyiko.mysql.binlog.event.maria.GtidListEventData;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;

import java.io.IOException;

/**
* @author <a href="http://github.com/wenerme">wener</a>
*/
public class GtidListDeserializer implements EventDataDeserializer<GtidListEventData> {
@Override
public GtidListEventData deserialize(ByteArrayInputStream is) throws IOException {
GtidListEventData e = new GtidListEventData();
e.getList().clear();
long count = is.readLong(4);
e.setFlag((int) (count >> 28));// higher 4 bit
count = count & 0x1fffffff;// lower 28 bit

for (long i = 0; i < count; i++) {
e.getList().add(new Gtid(is.readLong(4), is.readLong(4), is.readLong(8)));
}
return e;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.github.shyiko.mysql.binlog.event.maria;

import com.github.shyiko.mysql.binlog.event.EventData;

/**
* @author <a href="http://github.com/wenerme">wener</a>
*/
public class BinlogCheckpointEventData implements EventData {
private String binlogFilename;

public String getBinlogFilename() {
return binlogFilename;
}

public BinlogCheckpointEventData setBinlogFilename(String binlogFilename) {
this.binlogFilename = binlogFilename;
return this;
}

@Override
public String toString() {
return "BinlogCheckpointEventData{" +
"binlogFilename='" + binlogFilename + '\'' +
'}';
}
}
Loading