Skip to content
Merged

1 #1

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
5256c26
Updated release notes (0.4.2)
shyiko Sep 20, 2016
75f0a45
Updated refs to latest 0.4.2 release
shyiko Sep 20, 2016
984f116
ByteArrayInputStream peek may be wrong after EOF
Sep 28, 2016
a8dc3f7
Merge pull request #117 from bseibel/master
shyiko Sep 28, 2016
911304a
Updated Travis CI configuration
shyiko Sep 28, 2016
5744bdd
Added MySQL@5.7.15 Vagrant box
shyiko Oct 4, 2016
bbba2c7
Added support for MySQL JSON type
rhauch Oct 3, 2016
a2afe53
Merge pull request #119 from rhauch/issue-115
shyiko Oct 6, 2016
09471b1
Moved **.mysql.binlog.json to **.mysql.binlog.event.deserialization.json
shyiko Oct 6, 2016
8f9132e
Fixed DATE deserialization test on MySQL 5.7
shyiko Oct 6, 2016
c8e81c8
Fixed checkstyle violation
shyiko Oct 6, 2016
3527e99
Added 0.5.0 release notes
shyiko Oct 6, 2016
fb379ad
Updated refs to latest 0.5.0 release
shyiko Oct 6, 2016
78513a1
Changed the client so that GTIDs are updated before calling the event…
rhauch Oct 14, 2016
bfacb94
Merge pull request #125 from rhauch/issue-122
shyiko Oct 14, 2016
3f3f595
Added 0.5.1 release notes
shyiko Oct 18, 2016
09092f1
Updated refs to latest 0.5.1 release
shyiko Oct 18, 2016
87e68b5
Updated "real-world applications" section of readme.md
shyiko Oct 23, 2016
6bb1ce4
Added link to streamsets/datacollector
shyiko Nov 3, 2016
3080b8d
Added .jabbarc
shyiko Nov 11, 2016
89725d2
Added 0.5.2 release notes
shyiko Nov 20, 2016
ad0f727
Updated refs to latest (0.5.2) release
shyiko Nov 20, 2016
d63f581
Added 0.6.0 release notes
shyiko Nov 28, 2016
d625c53
Updated refs to the latest 0.6.0 release (readme.md)
shyiko Nov 28, 2016
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
.idea
.iml
.DS_Store
target
target
.classpath
.project
.settings
.vagrant
2 changes: 2 additions & 0 deletions .jabbarc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# https://github.com/shyiko/jabba
jdk: 1.6
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ language: java
script: 'if [[ $TRAVIS_REPO_SLUG == "shyiko/mysql-binlog-connector-java" && $TRAVIS_BRANCH == "master" && $TRAVIS_PULL_REQUEST == "false" ]]; then mvn -P with-sources-and-javadocs -Ddeploy=snapshot -DskipTests=true -s .travis.settings.xml; fi'
env:
global:
- secure: QI6w4d/W9Sjckgcdwc/K5Rb+UFa6PxDbhiilv70fEZCXg0P4GMBFGD/zbpsfTlLbA9rgKBotrSm7EXRtQM94dluW0kS9jQiJfw33l9WbfMAJL4797xq+7ekvIvPBf7xVfwGsAcCcB5DcPBTKroX2s8tJEY3CQTienkW4pT7/zh0=
- secure: b/7SnPmS2BUrzc167F5sbI8ILNBqo3ODz9ZtuOOcXQ/9uJVgaLrB3KmKAHgIsjOiZ/PPp7qGNL4jW9CD7b7LAldMyIcszdKptc/fr2yHZkM17IuQJtL5tjBNlLkqeApW37ddPp0mp5IhhqzZMZ8Zu8/2vlyNJrVrYgnIXuoLLRE=
- secure: WLU2kkL9WSeS1LGUOr5UZyv8G1TnUuXOviLuAAcvv+JW+CFcoQwnDlCC7SJJOqGixKpg8+04zr5CHaVU2QmG4E/+XohuPqH23rdfBi9Ra4BubXjQvV3siSQXEAIjZWubyJ7/4EYzBK821FnlnthnabFsow9enBv6MT5NmscOy+0=
- secure: ivE8LTu0CrgvNH6Yjise5lLaZx5kpG9M76CuRK2APUEbAC0QOPWWudsx2yuDmn2X6lIXFYIU5WUadDE6n9LYAfbHdV0wRUD1Qimgt7oDto5vnWZIZrbWwyqSqK7RCxw503ofbHXVPRyabLRYACZV+7T+K2aDJw8V/cIITIAC7gI=
29 changes: 28 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.4.1...HEAD)
## [Unreleased](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.6.0...HEAD)

### Added
- A way to control Socket i/s buffering (using BinaryLogClient::setSocketFactory()).
Expand All @@ -16,6 +16,33 @@ This is **BACKWARD-INCOMPATIBLE** change.
- BINARY/VARBINARY deserialization ([#56](https://github.com/shyiko/mysql-binlog-connector-java/issues/56)).
This is **BACKWARD-INCOMPATIBLE** change as CHAR/VARCHAR/BINARY/VARBINARY are now returned as `byte[]` (which you can obviously convert to String with `new String(byte[], Charset)` if needed).

## [0.6.0](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.5.2...0.6.0) - 2016-11-27

### Added
- EventDeserializer compatibility modes to mimic upcoming 1.0.0 event deserialization behavior ([#131](https://github.com/shyiko/mysql-binlog-connector-java/pull/131)).

## [0.5.2](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.5.1...0.5.2) - 2016-11-19

### Fixed
- (JSON) deserialization of null/true/false/(u)int(16|32)/variable-length data types ([#129](https://github.com/shyiko/mysql-binlog-connector-java/issues/129)).

## [0.5.1](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.5.0...0.5.1) - 2016-10-18

### Fixed
- ROWS_QUERY event deserialization ([#124](https://github.com/shyiko/mysql-binlog-connector-java/issues/124)).
- JSON length determination.
- GTID sync (GtidSet is now updated before BinaryLogClient.EventListener|s are notified).

## [0.5.0](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.4.2...0.5.0) - 2016-10-06

### Added
- JSON support ([#119](https://github.com/shyiko/mysql-binlog-connector-java/pull/119)) (thanks to [@rhauch](https://github.com/rhauch)).

## [0.4.2](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.4.1...0.4.2) - 2016-09-20

### Fixed
- A race condition that could result in duplicate events to be emitted on reconnect ([#113](https://github.com/shyiko/mysql-binlog-connector-java/issues/113)).

## [0.4.1](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.4.0...0.4.1) - 2016-08-31

### Fixed
Expand Down
4 changes: 3 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<vagrant.bin>vagrant</vagrant.bin>
<vagrant.integration.box>${basedir}/supplement/vagrant/mysql-5.6.12-sandbox-prepackaged</vagrant.integration.box>
<vagrant.integration.box>${basedir}/supplement/vagrant/mysql-5.7.15-sandbox-prepackaged</vagrant.integration.box>
</properties>

<dependencies>
Expand Down Expand Up @@ -207,6 +207,8 @@
-Dvagrant.integration.box=supplement/vagrant/mysql-5.5.27-sandbox-prepackaged
mvn -P coverage verify \
-Dvagrant.integration.box=supplement/vagrant/mysql-5.6.12-sandbox-prepackaged
mvn -P coverage verify \
-Dvagrant.integration.box=supplement/vagrant/mysql-5.7.15-sandbox-prepackaged

# submit coverage report to coveralls
mvn -P coverage coveralls:jacoco -DrepoToken=&lt;coveralls.io&gt;
Expand Down
20 changes: 12 additions & 8 deletions readme.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# mysql-binlog-connector-java [![Build Status](https://travis-ci.org/shyiko/mysql-binlog-connector-java.svg?branch=master)](https://travis-ci.org/shyiko/mysql-binlog-connector-java) [![Coverage Status](https://coveralls.io/repos/shyiko/mysql-binlog-connector-java/badge.svg?branch=master)](https://coveralls.io/r/shyiko/mysql-binlog-connector-java?branch=master) [![Maven Central](http://img.shields.io/badge/maven_central-0.4.1-blue.svg?style=flat)](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.github.shyiko%22%20AND%20a%3A%22mysql-binlog-connector-java%22)
# mysql-binlog-connector-java [![Build Status](https://travis-ci.org/shyiko/mysql-binlog-connector-java.svg?branch=master)](https://travis-ci.org/shyiko/mysql-binlog-connector-java) [![Coverage Status](https://coveralls.io/repos/shyiko/mysql-binlog-connector-java/badge.svg?branch=master)](https://coveralls.io/r/shyiko/mysql-binlog-connector-java?branch=master) [![Maven Central](http://img.shields.io/badge/maven_central-0.6.0-blue.svg?style=flat)](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.github.shyiko%22%20AND%20a%3A%22mysql-binlog-connector-java%22)

MySQL Binary Log connector.

Expand Down Expand Up @@ -28,7 +28,7 @@ Get the latest JAR(s) from [here](http://search.maven.org/#search%7Cga%7C1%7Cg%3
<dependency>
<groupId>com.github.shyiko</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.4.1</version>
<version>0.6.0</version>
</dependency>
```

Expand Down Expand Up @@ -115,7 +115,7 @@ mBeanServer.registerMBean(stats, statsObjectName);

#### Using SSL

> Introduced in 0.4.1.
> Introduced in 0.4.0.

TLSv1.1 & TLSv1.2 require [JDK 7](http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6916074)+.
Prior to MySQL 5.7.10, MySQL supported only TLSv1
Expand Down Expand Up @@ -180,11 +180,15 @@ For the insight into the internals of MySQL look [here](https://dev.mysql.com/do

## Real-world applications

Some of the OSS built on top of mysql-binlog-conector-java:
[debezium](https://github.com/debezium/debezium) (distributed platform for change data capture),
[mardambey/mypipe](https://github.com/mardambey/mypipe) (MySQL to Apache Kafka replicator),
[ngocdaothanh/mydit](https://github.com/ngocdaothanh/mydit) (MySQL to MongoDB replicator),
[shyiko/rook](https://github.com/shyiko/rook) (generic Change Data Capture (CDC) toolkit).
Some of the OSS using / built on top of mysql-binlog-conector-java:
* [debezium](https://github.com/debezium/debezium) A low latency data streaming platform for change data capture (CDC).
* [mavenlink/changestream](https://github.com/mavenlink/changestream) - A stream of changes for MySQL built on Akka.
* [mardambey/mypipe](https://github.com/mardambey/mypipe) MySQL binary log consumer with the ability to act on changed rows and publish changes to different systems with emphasis on Apache Kafka.
* [ngocdaothanh/mydit](https://github.com/ngocdaothanh/mydit) MySQL to MongoDB data replicator.
* [sharetribe/dumpr](https://github.com/sharetribe/dumpr) A Clojure library for live replicating data from a MySQL database.
* [shyiko/rook](https://github.com/shyiko/rook) Generic Change Data Capture (CDC) toolkit.
* [streamsets/datacollector](https://github.com/streamsets/datacollector) Continuous big data ingestion infrastructure.
* [twingly/ecco](https://github.com/twingly/ecco) MySQL replication binlog parser in JRuby.

It's also used [on a large scale](https://twitter.com/atwinmutt/status/626816601078300672) in MailChimp. You can read about it [here](http://devs.mailchimp.com/blog/powering-mailchimp-pro-reporting/).

Expand Down
65 changes: 40 additions & 25 deletions src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -745,8 +745,11 @@ private void listenForEventPackets() throws IOException {
continue;
}
if (isConnected()) {
// Update the GTID before the event listeners, and the binlog filename/position after the
// listeners (since the binlog filename/position point to the *next* event) ...
boolean updatedGtid = updateGtid(event);
notifyEventListeners(event);
updatePosition(event);
updatePosition(event, updatedGtid);
}
}
} catch (Exception e) {
Expand Down Expand Up @@ -780,40 +783,52 @@ private byte[] readPacketSplitInChunks(ByteArrayInputStream inputStream, int pac
return result;
}

private void updatePosition(Event event) {
private boolean updateGtid(Event event) {
EventHeader eventHeader = event.getHeader();
EventType eventType = eventHeader.getEventType();
if (gtidSet != null && eventType == EventType.XID) {
advanceGTID();
} else
if (gtidSet != null && eventType == EventType.QUERY) {
QueryEventData queryEventData = getInternalEventData(event);
String query = queryEventData.getSql();
if ("COMMIT".equals(query) || "ROLLBACK".equals(query) ||
(previousEvent != null && previousEvent.getHeader().getEventType() == EventType.GTID &&
!"BEGIN".equals(query))) {
if (gtidSet != null) {
if (eventType == EventType.XID) {
advanceGTID();
return true;
}
} else
if (gtidSet != null && eventType == EventType.GTID) {
if (previousGtidEvent != null) {
if (advanceGTID()) {
if (logger.isLoggable(Level.WARNING)) {
logger.log(Level.WARNING, "GtidSet wasn't synchronized before GTID. " +
"Please submit a bug report to https://github.com/shyiko/mysql-binlog-connector-java");
if (eventType == EventType.QUERY) {
QueryEventData queryEventData = getInternalEventData(event);
String query = queryEventData.getSql();
if ("COMMIT".equals(query) || "ROLLBACK".equals(query) ||
(previousEvent != null && previousEvent.getHeader().getEventType() == EventType.GTID &&
!"BEGIN".equals(query))) {
advanceGTID();
}
return true;
}
if (eventType == EventType.GTID) {
if (previousGtidEvent != null) {
if (advanceGTID()) {
if (logger.isLoggable(Level.WARNING)) {
logger.log(Level.WARNING, "GtidSet wasn't synchronized before GTID. " +
"Please submit a bug report to https://github.com/shyiko/mysql-binlog-connector-java");
}
}
}
previousGtidEvent = event;
return true;
}
previousGtidEvent = event;
} else
if (eventType == EventType.ROTATE) {
}
return false;
}

private void updatePosition(Event event, boolean updatedGtid) {
EventHeader eventHeader = event.getHeader();
EventType eventType = eventHeader.getEventType();
if (updatedGtid) {
// the GTID was previously updated based upon this event
} else if (eventType == EventType.ROTATE) {
RotateEventData rotateEventData = getInternalEventData(event);
binlogFilename = rotateEventData.getBinlogFilename();
binlogPosition = rotateEventData.getBinlogPosition();
} else
// do not update binlogPosition on TABLE_MAP so that in case of reconnect (using a different instance of
// client) table mapping cache could be reconstructed before hitting row mutation event
if (eventType != EventType.TABLE_MAP && eventHeader instanceof EventHeaderV4) {
} else if (eventType != EventType.TABLE_MAP && eventHeader instanceof EventHeaderV4) {
// do not update binlogPosition on TABLE_MAP so that in case of reconnect (using a different instance of
// client) table mapping cache could be reconstructed before hitting row mutation event
EventHeaderV4 trackableEventHeader = (EventHeaderV4) eventHeader;
long nextBinlogPosition = trackableEventHeader.getNextPosition();
if (nextBinlogPosition > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
*/
package com.github.shyiko.mysql.binlog.event.deserialization;

import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;

import java.io.IOException;
import java.io.Serializable;
import java.math.BigDecimal;
Expand All @@ -27,6 +23,10 @@
import java.util.Map;
import java.util.TimeZone;

import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;

/**
* Whole class is basically a mix of <a href="https://code.google.com/p/open-replicator">open-replicator</a>'s
* AbstractRowEventParser and MySQLUtils. Main purpose here is to ease rows deserialization.<p>
Expand Down Expand Up @@ -170,6 +170,8 @@ protected Serializable deserializeCell(ColumnType type, int meta, int length, By
return deserializeSet(length, inputStream);
case GEOMETRY:
return deserializeGeometry(meta, inputStream);
case JSON:
return deserializeJson(meta, inputStream);
default:
throw new IOException("Unsupported type " + type);
}
Expand Down Expand Up @@ -329,6 +331,21 @@ protected byte[] deserializeGeometry(int meta, ByteArrayInputStream inputStream)
return inputStream.read(dataLength);
}

/**
* Deserialize the {@code JSON} value on the input stream, and return MySQL's internal binary representation
* of the JSON value. See {@link com.github.shyiko.mysql.binlog.event.deserialization.json.JsonBinary} for
* a utility to parse this binary representation into something more useful, including a string representation.
*
* @param meta the number of bytes in which the length of the JSON value is found first on the input stream
* @param inputStream the stream containing the JSON value
* @return the MySQL internal binary representation of the JSON value; may be null
* @throws IOException if there is a problem reading the input stream
*/
protected byte[] deserializeJson(int meta, ByteArrayInputStream inputStream) throws IOException {
int blobLength = inputStream.readInteger(4);
return inputStream.read(blobLength);
}

// checkstyle, please ignore ParameterNumber for the next line
private static Long asUnixTime(int year, int month, int day, int hour, int minute, int second, int millis) {
// https://dev.mysql.com/doc/refman/5.0/en/datetime.html
Expand Down Expand Up @@ -376,7 +393,7 @@ private static int[] split(long value, int divider, int length) {
/**
* see mysql/strings/decimal.c
*/
private static BigDecimal asBigDecimal(int precision, int scale, byte[] value) {
public static BigDecimal asBigDecimal(int precision, int scale, byte[] value) {
boolean positive = (value[0] & 0x80) == 0x80;
value[0] ^= 0x80;
if (!positive) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public enum ColumnType {
TIMESTAMP_V2(17),
DATETIME_V2(18),
TIME_V2(19),
JSON(245),
NEWDECIMAL(246),
ENUM(247),
SET(248),
Expand Down
Loading