From 5256c2637bef44792d927ae48726d82e1e00b008 Mon Sep 17 00:00:00 2001 From: Stanley Shyiko Date: Tue, 20 Sep 2016 16:05:46 -0700 Subject: [PATCH 01/21] Updated release notes (0.4.2) --- changelog.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/changelog.md b/changelog.md index 9feac23..1a653c2 100644 --- a/changelog.md +++ b/changelog.md @@ -16,6 +16,11 @@ This is **BACKWARD-INCOMPATIBLE** change. - BINARY/VARBINARY deserialization ([#56](https://github.com/shyiko/mysql-binlog-connector-java/issues/56)). This is **BACKWARD-INCOMPATIBLE** change as CHAR/VARCHAR/BINARY/VARBINARY are now returned as `byte[]` (which you can obviously convert to String with `new String(byte[], Charset)` if needed). +## [0.4.2](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.4.1...0.4.2) - 2016-09-20 + +### Fixed + - A race condition that could result in duplicate events to be emitted on reconnect ([#113](https://github.com/shyiko/mysql-binlog-connector-java/issues/113)). + ## [0.4.1](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.4.0...0.4.1) - 2016-08-31 ### Fixed From 75f0a45ffb0e8d1fc49d819296f509df4cc9de7a Mon Sep 17 00:00:00 2001 From: Stanley Shyiko Date: Tue, 20 Sep 2016 16:13:34 -0700 Subject: [PATCH 02/21] Updated refs to latest 0.4.2 release --- readme.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/readme.md b/readme.md index b1ef8a9..a8877eb 100644 --- a/readme.md +++ b/readme.md @@ -1,4 +1,4 @@ -# mysql-binlog-connector-java [![Build Status](https://travis-ci.org/shyiko/mysql-binlog-connector-java.svg?branch=master)](https://travis-ci.org/shyiko/mysql-binlog-connector-java) [![Coverage Status](https://coveralls.io/repos/shyiko/mysql-binlog-connector-java/badge.svg?branch=master)](https://coveralls.io/r/shyiko/mysql-binlog-connector-java?branch=master) [![Maven 
Central](http://img.shields.io/badge/maven_central-0.4.1-blue.svg?style=flat)](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.github.shyiko%22%20AND%20a%3A%22mysql-binlog-connector-java%22) +# mysql-binlog-connector-java [![Build Status](https://travis-ci.org/shyiko/mysql-binlog-connector-java.svg?branch=master)](https://travis-ci.org/shyiko/mysql-binlog-connector-java) [![Coverage Status](https://coveralls.io/repos/shyiko/mysql-binlog-connector-java/badge.svg?branch=master)](https://coveralls.io/r/shyiko/mysql-binlog-connector-java?branch=master) [![Maven Central](http://img.shields.io/badge/maven_central-0.4.2-blue.svg?style=flat)](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.github.shyiko%22%20AND%20a%3A%22mysql-binlog-connector-java%22) MySQL Binary Log connector. @@ -28,7 +28,7 @@ Get the latest JAR(s) from [here](http://search.maven.org/#search%7Cga%7C1%7Cg%3 com.github.shyiko mysql-binlog-connector-java - 0.4.1 + 0.4.2 ``` @@ -115,7 +115,7 @@ mBeanServer.registerMBean(stats, statsObjectName); #### Using SSL -> Introduced in 0.4.1. +> Introduced in 0.4.0. TLSv1.1 & TLSv1.2 require [JDK 7](http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6916074)+. Prior to MySQL 5.7.10, MySQL supported only TLSv1 From 984f116f89a207b21c58a5873776ae4948ad6a14 Mon Sep 17 00:00:00 2001 From: Brandon Seibel Date: Wed, 28 Sep 2016 15:57:05 -0400 Subject: [PATCH 03/21] ByteArrayInputStream peek may be wrong after EOF When we eventually reach EOF on an ever growing binlog, nextEvent cannot read a new event because peek is -1, but this peek is saved from the previous event reaching EOF and may be out of sync with the underlying InputStream. 
This change makes peek() always check the underlying stream if it had just EOF'd (peek == -1) --- .../github/shyiko/mysql/binlog/io/ByteArrayInputStream.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java b/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java index 350b870..b19eff2 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java @@ -171,7 +171,7 @@ public int available() throws IOException { } public int peek() throws IOException { - if (peek == null) { + if (peek == null || peek == -1) { peek = readWithinBlockBoundaries(); } return peek; @@ -180,7 +180,7 @@ public int peek() throws IOException { @Override public int read() throws IOException { int result; - if (peek == null) { + if (peek == null || peek == -1) { result = readWithinBlockBoundaries(); } else { result = peek; From 911304a1be7fdfe5fc0662ba1ce9aec397af1910 Mon Sep 17 00:00:00 2001 From: Stanley Shyiko Date: Wed, 28 Sep 2016 14:53:55 -0700 Subject: [PATCH 04/21] Updated Travis CI configuration --- .travis.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index b051d18..e45adfe 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,5 +2,5 @@ language: java script: 'if [[ $TRAVIS_REPO_SLUG == "shyiko/mysql-binlog-connector-java" && $TRAVIS_BRANCH == "master" && $TRAVIS_PULL_REQUEST == "false" ]]; then mvn -P with-sources-and-javadocs -Ddeploy=snapshot -DskipTests=true -s .travis.settings.xml; fi' env: global: - - secure: QI6w4d/W9Sjckgcdwc/K5Rb+UFa6PxDbhiilv70fEZCXg0P4GMBFGD/zbpsfTlLbA9rgKBotrSm7EXRtQM94dluW0kS9jQiJfw33l9WbfMAJL4797xq+7ekvIvPBf7xVfwGsAcCcB5DcPBTKroX2s8tJEY3CQTienkW4pT7/zh0= - - secure: 
b/7SnPmS2BUrzc167F5sbI8ILNBqo3ODz9ZtuOOcXQ/9uJVgaLrB3KmKAHgIsjOiZ/PPp7qGNL4jW9CD7b7LAldMyIcszdKptc/fr2yHZkM17IuQJtL5tjBNlLkqeApW37ddPp0mp5IhhqzZMZ8Zu8/2vlyNJrVrYgnIXuoLLRE= + - secure: WLU2kkL9WSeS1LGUOr5UZyv8G1TnUuXOviLuAAcvv+JW+CFcoQwnDlCC7SJJOqGixKpg8+04zr5CHaVU2QmG4E/+XohuPqH23rdfBi9Ra4BubXjQvV3siSQXEAIjZWubyJ7/4EYzBK821FnlnthnabFsow9enBv6MT5NmscOy+0= + - secure: ivE8LTu0CrgvNH6Yjise5lLaZx5kpG9M76CuRK2APUEbAC0QOPWWudsx2yuDmn2X6lIXFYIU5WUadDE6n9LYAfbHdV0wRUD1Qimgt7oDto5vnWZIZrbWwyqSqK7RCxw503ofbHXVPRyabLRYACZV+7T+K2aDJw8V/cIITIAC7gI= From 5744bdd0436e1f22f7977887a25bb53f7d39a096 Mon Sep 17 00:00:00 2001 From: Stanley Shyiko Date: Tue, 4 Oct 2016 09:34:44 -0700 Subject: [PATCH 05/21] Added MySQL@5.7.15 Vagrant box --- .../vagrantfile | 6 +++++ .../vagrant/mysql-5.7.15-sandbox/vagrantfile | 22 +++++++++++++++++++ 2 files changed, 28 insertions(+) create mode 100644 supplement/vagrant/mysql-5.7.15-sandbox-prepackaged/vagrantfile create mode 100644 supplement/vagrant/mysql-5.7.15-sandbox/vagrantfile diff --git a/supplement/vagrant/mysql-5.7.15-sandbox-prepackaged/vagrantfile b/supplement/vagrant/mysql-5.7.15-sandbox-prepackaged/vagrantfile new file mode 100644 index 0000000..7a644a5 --- /dev/null +++ b/supplement/vagrant/mysql-5.7.15-sandbox-prepackaged/vagrantfile @@ -0,0 +1,6 @@ +Vagrant.configure("2") do |config| + config.vm.box = 'shyiko/mysql-sandbox-prepackaged' + config.vm.box_version = '5.7.15' + config.vm.network :forwarded_port, guest: 33061, host: 33061 + config.vm.network :forwarded_port, guest: 33062, host: 33062 +end diff --git a/supplement/vagrant/mysql-5.7.15-sandbox/vagrantfile b/supplement/vagrant/mysql-5.7.15-sandbox/vagrantfile new file mode 100644 index 0000000..07cfd30 --- /dev/null +++ b/supplement/vagrant/mysql-5.7.15-sandbox/vagrantfile @@ -0,0 +1,22 @@ +Vagrant.configure("2") do |config| + config.vm.box = 'lucid32' + config.vm.box_url = 'http://files.vagrantup.com/lucid32.box' + config.vm.provision :shell, :inline => %Q( + sed -i.bak -r 
's/(us.)?(archive|security).ubuntu.com/old-releases.ubuntu.com/g' /etc/apt/sources.list + apt-get update && apt-get install -y make libaio1 # libaio1 required by mysql + echo 'Downloading MySQL distribution ...' + wget --progress=dot:mega --content-disposition \ + http://cdn.mysql.com/Downloads/MySQL-5.7/mysql-5.7.15-linux-glibc2.5-i686.tar.gz \ + 2>&1 | grep --line-buffered -o '[0-9]*%' + wget -O - https://github.com/datacharmer/mysql-sandbox/releases/download/3.1.13/MySQL-Sandbox-3.1.13.tar.gz | tar xzv + (cd MySQL-Sandbox-3.1.13 && perl Makefile.PL && make && make install) + su -c "make_replication_sandbox ~/mysql-5.7.15-linux-glibc2.5-i686.tar.gz \ + --remote_access='%' --how_many_slaves=1 --sandbox_base_port=33061 \ + --master_options='-c binlog_format=ROW' \ + --slave_options='-c binlog_format=ROW -c log-slave-updates=TRUE'" vagrant + rm -f *.tar.gz + sed -i -e "s/exit\ 0/\\/home\\/vagrant\\/sandboxes\\/rsandbox_mysql-5_7_15\\/restart_all; exit 0/g" /etc/rc.local + ) + config.vm.network :forwarded_port, guest: 33061, host: 33061 + config.vm.network :forwarded_port, guest: 33062, host: 33062 +end From bbba2c7c3761b071803665d85ecfce88f97ce962 Mon Sep 17 00:00:00 2001 From: Randall Hauch Date: Mon, 3 Oct 2016 17:18:58 -0500 Subject: [PATCH 06/21] Added support for MySQL JSON type Added support for the JSON type added to MySQL 5.7. This required adding a new `ColumnType` and a new method to deserialize the JSON value, although the deserialization simply returns the `byte[]` containing the internal binary representation of the JSON value. In order to be useful, this binary representation needs to be parsed into something more useful. 
A new `JsonBinary` class was created that can perform this parsing and delegate to a `JsonFormatter` implementation when JSON objects or arrays are started/ended, when name-value pairs are encountered in an object, when values are encountered in an array, and when scalar values are encountered (outside of a JSON object or array, which is not really standard JSON). Since many clients will simply want to obtain a JSON string representation of the JSON value, a `JsonStringFormatter` class and a `JsonBinary.parseAsString` utility method were added to conveniently obtain the JSON string representation. If this is not sufficient, clients can either subclass `JsonStringFormatter` to customize the behavior or implement their own `JsonFormatter` with completely custom behavior. Quite a few integration test methods were added to insert various JSON values into a table, read the corresponding `WriteRowsEventData` event for the INSERT, extract the binary JSON value from the event, and then compare the string representation of the JSON value to expected forms. Note that MySQL does allow some values within JSON objects and arrays to be of types other than the booleans, numbers, and strings defined by the JSON spec. For example, MySQL `BLOB`, `DATE`, `TIME`, `DATETIME`, and `TIMESTAMP` values can be used and are encoded as "opaque types"; the `JsonBinary` parser knows how to interpret these opaque forms and calls the appropriate methods on the `JsonFormatter`. The `JsonStringFormatter` simply encodes the temporal types as formatted dates, times, or timestamps, while all other opaque values are represented as Base64-encoded strings. Also, MySQL does allow scalar values to be used in `JSON` columns outside of JSON objects and arrays. This does not adhere to the JSON spec, and as such the `JsonStringFormatter` simply returns the string representations of these scalar values. (Any other logic can be customized.)
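
For illustration only (this is not code from the patch): a minimal, self-contained sketch of what "parsing the binary representation" means for a few top-level scalar values, following the grammar from MySQL's json_binary.h that is quoted in the `JsonBinary` Javadoc. The class `BinaryJsonScalarSketch` and all of its methods are hypothetical names chosen for this example; the real parsing lives in `JsonBinary` and also covers objects, arrays, and opaque values. The sketch assumes a top-level literal is stored as a single byte after the type byte.

```java
import java.nio.charset.StandardCharsets;

// Illustration only: a hypothetical helper, NOT the JsonBinary class from this patch.
final class BinaryJsonScalarSketch {

    /** Reads n bytes starting at offset as a little-endian unsigned value. */
    static long readLittleEndian(byte[] b, int offset, int n) {
        long result = 0;
        for (int i = 0; i < n; i++) {
            result |= (long) (b[offset + i] & 0xFF) << (8 * i);
        }
        return result;
    }

    /**
     * Reads the variable-length data-length field: 7 payload bits per byte,
     * least significant group first, high bit set when another byte follows.
     * Returns {length, offset of the first data byte}.
     */
    static int[] readDataLength(byte[] b, int offset) {
        int length = 0;
        int shift = 0;
        int pos = offset;
        while (true) {
            if (shift > 28) {
                throw new IllegalArgumentException("data-length field is too long");
            }
            int current = b[pos++] & 0xFF;
            length |= (current & 0x7F) << shift;
            if ((current & 0x80) == 0) {
                return new int[] {length, pos};
            }
            shift += 7;
        }
    }

    /** Maps the literal byte to its JSON text (0x00 null, 0x01 true, 0x02 false). */
    static String decodeLiteral(int code) {
        switch (code) {
            case 0x00: return "null";
            case 0x01: return "true";
            case 0x02: return "false";
            default: throw new IllegalArgumentException("Unknown literal " + code);
        }
    }

    /** Wraps a string in double quotes; only backslash and quote are escaped here. */
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Decodes a top-level scalar (type byte followed by the value) into JSON text. */
    static String decodeScalar(byte[] doc) {
        int type = doc[0] & 0xFF;
        switch (type) {
            case 0x04: // literal
                return decodeLiteral(doc[1] & 0xFF);
            case 0x05: // int16, little-endian, signed
                return Integer.toString((short) readLittleEndian(doc, 1, 2));
            case 0x07: // int32, little-endian, signed
                return Integer.toString((int) readLittleEndian(doc, 1, 4));
            case 0x0c: // utf8mb4 string: data-length, then the bytes
                int[] lengthAndStart = readDataLength(doc, 1);
                return quote(new String(doc, lengthAndStart[1], lengthAndStart[0],
                    StandardCharsets.UTF_8));
            default:
                throw new IllegalArgumentException("Type not handled by this sketch: " + type);
        }
    }

    public static void main(String[] args) {
        System.out.println(decodeScalar(new byte[] {0x04, 0x01}));                     // true
        System.out.println(decodeScalar(new byte[] {0x05, (byte) 0xFE, (byte) 0xFF})); // -2
        System.out.println(decodeScalar(new byte[] {0x0c, 0x02, 'h', 'i'}));           // "hi"
    }
}
```

In real client code the expected entry point is `JsonBinary.parseAsString(byte[])`, applied to the `byte[]` that `deserializeJson` returns; the sketch above only shows why the raw bytes are not directly usable as JSON text.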
--- .gitignore | 6 +- pom.xml | 4 +- .../AbstractRowsEventDataDeserializer.java | 27 +- .../event/deserialization/ColumnType.java | 1 + .../shyiko/mysql/binlog/json/JsonBinary.java | 1012 +++++++++++++++++ .../mysql/binlog/json/JsonFormatter.java | 176 +++ .../binlog/json/JsonStringFormatter.java | 325 ++++++ .../shyiko/mysql/binlog/json/ValueType.java | 99 ++ .../BinaryLogClientIntegrationTest.java | 66 +- .../json/JsonBinaryValueIntegrationTest.java | 333 ++++++ supplement/codequality/checkstyle.xml | 2 +- 11 files changed, 2036 insertions(+), 15 deletions(-) create mode 100644 src/main/java/com/github/shyiko/mysql/binlog/json/JsonBinary.java create mode 100644 src/main/java/com/github/shyiko/mysql/binlog/json/JsonFormatter.java create mode 100644 src/main/java/com/github/shyiko/mysql/binlog/json/JsonStringFormatter.java create mode 100644 src/main/java/com/github/shyiko/mysql/binlog/json/ValueType.java create mode 100644 src/test/java/com/github/shyiko/mysql/binlog/json/JsonBinaryValueIntegrationTest.java diff --git a/.gitignore b/.gitignore index 681a4aa..ae7668d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,8 @@ .idea .iml .DS_Store -target \ No newline at end of file +target +.classpath +.project +.settings +.vagrant \ No newline at end of file diff --git a/pom.xml b/pom.xml index 45831fc..d77e5ae 100644 --- a/pom.xml +++ b/pom.xml @@ -45,7 +45,7 @@ UTF-8 UTF-8 vagrant - ${basedir}/supplement/vagrant/mysql-5.6.12-sandbox-prepackaged + ${basedir}/supplement/vagrant/mysql-5.7.15-sandbox-prepackaged @@ -207,6 +207,8 @@ -Dvagrant.integration.box=supplement/vagrant/mysql-5.5.27-sandbox-prepackaged mvn -P coverage verify \ -Dvagrant.integration.box=supplement/vagrant/mysql-5.6.12-sandbox-prepackaged + mvn -P coverage verify \ + -Dvagrant.integration.box=supplement/vagrant/mysql-5.7.15-sandbox-prepackaged # submit coverage report to coveralls mvn -P coverage coveralls:jacoco -DrepoToken=<coveralls.io> diff --git 
a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java index 688db5f..61146c4 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java @@ -15,10 +15,6 @@ */ package com.github.shyiko.mysql.binlog.event.deserialization; -import com.github.shyiko.mysql.binlog.event.EventData; -import com.github.shyiko.mysql.binlog.event.TableMapEventData; -import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; - import java.io.IOException; import java.io.Serializable; import java.math.BigDecimal; @@ -27,6 +23,10 @@ import java.util.Map; import java.util.TimeZone; +import com.github.shyiko.mysql.binlog.event.EventData; +import com.github.shyiko.mysql.binlog.event.TableMapEventData; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; + /** * Whole class is basically a mix of open-replicator's * AbstractRowEventParser and MySQLUtils. Main purpose here is to ease rows deserialization.

@@ -170,6 +170,8 @@ protected Serializable deserializeCell(ColumnType type, int meta, int length, By return deserializeSet(length, inputStream); case GEOMETRY: return deserializeGeometry(meta, inputStream); + case JSON: + return deserializeJson(meta, inputStream); default: throw new IOException("Unsupported type " + type); } @@ -329,6 +331,21 @@ protected byte[] deserializeGeometry(int meta, ByteArrayInputStream inputStream) return inputStream.read(dataLength); } + /** + * Deserialize the {@code JSON} value on the input stream, and return MySQL's internal binary representation + * of the JSON value. See {@link com.github.shyiko.mysql.binlog.json.JsonBinary} for a utility to parse this + * binary representation into something more useful, including a string representation. + * + * @param meta the number of bytes in which the length of the JSON value is found first on the input stream + * @param inputStream the stream containing the JSON value + * @return the MySQL internal binary representation of the JSON value; may be null + * @throws IOException if there is a problem reading the input stream + */ + protected byte[] deserializeJson(int meta, ByteArrayInputStream inputStream) throws IOException { + int blobLength = inputStream.readInteger(4); + return inputStream.read(blobLength); + } + // checkstyle, please ignore ParameterNumber for the next line private static Long asUnixTime(int year, int month, int day, int hour, int minute, int second, int millis) { // https://dev.mysql.com/doc/refman/5.0/en/datetime.html @@ -376,7 +393,7 @@ private static int[] split(long value, int divider, int length) { /** * see mysql/strings/decimal.c */ - private static BigDecimal asBigDecimal(int precision, int scale, byte[] value) { + public static BigDecimal asBigDecimal(int precision, int scale, byte[] value) { boolean positive = (value[0] & 0x80) == 0x80; value[0] ^= 0x80; if (!positive) { diff --git 
a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/ColumnType.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/ColumnType.java index f5f6938..cc8c941 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/ColumnType.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/ColumnType.java @@ -45,6 +45,7 @@ public enum ColumnType { TIMESTAMP_V2(17), DATETIME_V2(18), TIME_V2(19), + JSON(245), NEWDECIMAL(246), ENUM(247), SET(248), diff --git a/src/main/java/com/github/shyiko/mysql/binlog/json/JsonBinary.java b/src/main/java/com/github/shyiko/mysql/binlog/json/JsonBinary.java new file mode 100644 index 0000000..a719afd --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/json/JsonBinary.java @@ -0,0 +1,1012 @@ +/* + * Copyright 2013 Stanley Shyiko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.github.shyiko.mysql.binlog.json; + +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.charset.Charset; + +import com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.ColumnType; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; + +/** + * Utility to parse the binary-encoded value of a MySQL {@code JSON} type, translating the encoded representation into + * method calls on a supplied {@link JsonFormatter} implementation. + * + *

Binary Format

+ * + * Each JSON value (scalar, object or array) has a one byte type identifier followed by the actual value. + * + *

Scalar

+ * + * The binary value may contain a single scalar that is one of: + *
    + *
  • null
  • + *
  • boolean
  • + *
  • int16
  • + *
  • int32
  • + *
  • int64
  • + *
  • uint16
  • + *
  • uint32
  • + *
  • uint64
  • + *
  • double
  • + *
  • string
  • + *
  • {@code DATE} as a string of the form {@code YYYY-MM-DD} where {@code YYYY} can be positive or negative
  • + *
  • {@code TIME} as a string of the form {@code HH:MM:SS} where {@code HH} can be positive or negative
  • + *
  • {@code DATETIME} as a string of the form {@code YYYY-MM-DD HH:mm:SS.ssssss} where {@code YYYY} can be positive or + * negative
  • + *
  • {@code TIMESTAMP} as the number of microseconds past epoch (January 1, 1970), or if negative the number of + * microseconds before epoch (January 1, 1970)
  • + *
  • any other MySQL value encoded as an opaque binary value
  • + *
+ * + *

JSON Object

+ * + * If the value is a JSON object, its binary representation will have a header that contains: + *
    + *
  • the member count
  • + *
  • the size of the binary value in bytes
  • + *
  • a list of pointers to each key
  • + *
  • a list of pointers to each value
  • + *
+ * + * The actual keys and values will come after the header, in the same order as in the header. + * + *

JSON Array

+ * + * If the value is a JSON array, the binary representation will have a header with + *
    + *
  • the element count
  • + *
  • the size of the binary value in bytes
  • + *
  • a list of pointers to each value
  • + *
+ * followed by the actual values, in the same order as in the header. + * + *

Grammar

+ * The grammar of the binary representation of JSON values is defined in the MySQL codebase in the + * json_binary.h file: + *

+ * + *

+ *   doc ::= type value
+ *   type ::=
+ *       0x00 |       // small JSON object
+ *       0x01 |       // large JSON object
+ *       0x02 |       // small JSON array
+ *       0x03 |       // large JSON array
+ *       0x04 |       // literal (true/false/null)
+ *       0x05 |       // int16
+ *       0x06 |       // uint16
+ *       0x07 |       // int32
+ *       0x08 |       // uint32
+ *       0x09 |       // int64
+ *       0x0a |       // uint64
+ *       0x0b |       // double
+ *       0x0c |       // utf8mb4 string
+ *       0x0f         // custom data (any MySQL data type)
+ *   value ::=
+ *       object  |
+ *       array   |
+ *       literal |
+ *       number  |
+ *       string  |
+ *       custom-data
+ *   object ::= element-count size key-entry* value-entry* key* value*
+ *   array ::= element-count size value-entry* value*
+ *   // number of members in object or number of elements in array
+ *   element-count ::=
+ *       uint16 |  // if used in small JSON object/array
+ *       uint32    // if used in large JSON object/array
+ *   // number of bytes in the binary representation of the object or array
+ *   size ::=
+ *       uint16 |  // if used in small JSON object/array
+ *       uint32    // if used in large JSON object/array
+ *   key-entry ::= key-offset key-length
+ *   key-offset ::=
+ *       uint16 |  // if used in small JSON object
+ *       uint32    // if used in large JSON object
+ *   key-length ::= uint16    // key length must be less than 64KB
+ *   value-entry ::= type offset-or-inlined-value
+ *   // This field holds either the offset to where the value is stored,
+ *   // or the value itself if it is small enough to be inlined (that is,
+ *   // if it is a JSON literal or a small enough [u]int).
+ *   offset-or-inlined-value ::=
+ *       uint16 |   // if used in small JSON object/array
+ *       uint32     // if used in large JSON object/array
+ *   key ::= utf8mb4-data
+ *   literal ::=
+ *       0x00 |   // JSON null literal
+ *       0x01 |   // JSON true literal
+ *       0x02 |   // JSON false literal
+ *   number ::=  ....  // little-endian format for [u]int(16|32|64), whereas
+ *                     // double is stored in a platform-independent, eight-byte
+ *                     // format using float8store()
+ *   string ::= data-length utf8mb4-data
+ *   custom-data ::= custom-type data-length binary-data
+ *   custom-type ::= uint8   // type identifier that matches the
+ *                           // internal enum_field_types enum
+ *   data-length ::= uint8*  // If the high bit of a byte is 1, the length
+ *                           // field is continued in the next byte,
+ *                           // otherwise it is the last byte of the length
+ *                           // field. So we need 1 byte to represent
+ *                           // lengths up to 127, 2 bytes to represent
+ *                           // lengths up to 16383, and so on...
+ * 
+ * + * @author Randall Hauch + */ +public class JsonBinary { + + private static final Charset UTF_8 = Charset.forName("UTF-8"); + + /** + * Parse the MySQL binary representation of a {@code JSON} value and return the JSON string representation. + *

+ * This method is equivalent to {@link #parse(byte[], JsonFormatter)} using the {@link JsonStringFormatter}. + * + * @param bytes the binary representation; may not be null + * @return the JSON string representation; never null + * @throws IOException if there is a problem reading or processing the binary representation + */ + public static String parseAsString(byte[] bytes) throws IOException { + JsonStringFormatter handler = new JsonStringFormatter(); + parse(bytes, handler); + return handler.getString(); + } + + /** + * Parse the MySQL binary representation of a {@code JSON} value and call the supplied {@link JsonFormatter} + * for the various components of the value. + * + * @param bytes the binary representation; may not be null + * @param formatter the formatter that will be called as the binary representation is parsed; may not be null + * @throws IOException if there is a problem reading or processing the binary representation + */ + public static void parse(byte[] bytes, JsonFormatter formatter) throws IOException { + new JsonBinary(bytes).parse(formatter); + } + + private final ByteArrayInputStream reader; + + public JsonBinary(byte[] bytes) { + this(new ByteArrayInputStream(bytes)); + } + + public JsonBinary(ByteArrayInputStream contents) { + this.reader = contents; + } + + @Override + public String toString() { + return asString(); + } + + public String asString() { + JsonStringFormatter handler = new JsonStringFormatter(); + try { + parse(handler); + } catch (IOException e) { + throw new RuntimeException(e); + } + return handler.toString(); + } + + public void parse(JsonFormatter formatter) throws IOException { + parse(readValueType(), formatter); + } + + protected void parse(ValueType type, JsonFormatter formatter) throws IOException { + switch (type) { + case SMALL_DOCUMENT: + parseObject(true, formatter); + break; + case LARGE_DOCUMENT: + parseObject(false, formatter); + break; + case SMALL_ARRAY: + parseArray(true, formatter); + break; + case 
LARGE_ARRAY: + parseArray(false, formatter); + break; + case LITERAL: + parseBoolean(formatter); + break; + case INT16: + parseInt16(formatter); + break; + case UINT16: + parseUInt16(formatter); + break; + case INT32: + parseInt32(formatter); + break; + case UINT32: + parseUInt32(formatter); + break; + case INT64: + parseInt64(formatter); + break; + case UINT64: + parseUInt64(formatter); + break; + case DOUBLE: + parseDouble(formatter); + break; + case STRING: + parseString(formatter); + break; + case CUSTOM: + parseOpaque(formatter); + break; + default: + throw new IOException("Unknown type value '" + asHex(type.code()) + + "' in first byte of a JSON value"); + } + } + + /** + * Parse a JSON object. + *

+ * The grammar of the binary representation of JSON objects is defined in the MySQL code base in the + * json_binary.h file: + *

Grammar

+ * + *
+     *   value ::=
+     *       object  |
+     *       array   |
+     *       literal |
+     *       number  |
+     *       string  |
+     *       custom-data
+     *   object ::= element-count size key-entry* value-entry* key* value*
+     *   // number of members in object or number of elements in array
+     *   element-count ::=
+     *       uint16 |  // if used in small JSON object/array
+     *       uint32    // if used in large JSON object/array
+     *   // number of bytes in the binary representation of the object or array
+     *   size ::=
+     *       uint16 |  // if used in small JSON object/array
+     *       uint32    // if used in large JSON object/array
+     *   key-entry ::= key-offset key-length
+     *   key-offset ::=
+     *       uint16 |  // if used in small JSON object
+     *       uint32    // if used in large JSON object
+     *   key-length ::= uint16    // key length must be less than 64KB
+     *   value-entry ::= type offset-or-inlined-value
+     *   // This field holds either the offset to where the value is stored,
+     *   // or the value itself if it is small enough to be inlined (that is,
+     *   // if it is a JSON literal or a small enough [u]int).
+     *   offset-or-inlined-value ::=
+     *       uint16 |   // if used in small JSON object/array
+     *       uint32     // if used in large JSON object/array
+     *   key ::= utf8mb4-data
+     *   literal ::=
+     *       0x00 |   // JSON null literal
+     *       0x01 |   // JSON true literal
+     *       0x02 |   // JSON false literal
+     *   number ::=  ....  // little-endian format for [u]int(16|32|64), whereas
+     *                     // double is stored in a platform-independent, eight-byte
+     *                     // format using float8store()
+     *   string ::= data-length utf8mb4-data
+     *   custom-data ::= custom-type data-length binary-data
+     *   custom-type ::= uint8   // type identifier that matches the
+     *                           // internal enum_field_types enum
+     *   data-length ::= uint8*  // If the high bit of a byte is 1, the length
+     *                           // field is continued in the next byte,
+     *                           // otherwise it is the last byte of the length
+     *                           // field. So we need 1 byte to represent
+     *                           // lengths up to 127, 2 bytes to represent
+     *                           // lengths up to 16383, and so on...
+     * 
+ * + * @param isSmall {@code true} if the object being read is "small", or {@code false} otherwise + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseObject(boolean isSmall, JsonFormatter formatter) + throws IOException { + // Read the header ... + int numElements = readUnsignedIndex(Integer.MAX_VALUE, isSmall, "number of elements in"); + int numBytes = readUnsignedIndex(Integer.MAX_VALUE, isSmall, "size of"); + + // Read each key-entry, consisting of the offset and length of each key ... + int[] keyLengths = new int[numElements]; + for (int i = 0; i != numElements; ++i) { + readUnsignedIndex(numBytes, isSmall, "key offset in"); // unused + keyLengths[i] = readUInt16(); + } + + // Read each value-entry ... + ValueEntry[] entries = new ValueEntry[numElements]; + for (int i = 0; i != numElements; ++i) { + // Parse the value ... + ValueType type = readValueType(); + switch (type) { + case LITERAL: + entries[i] = new ValueEntry(type).setValue(readLiteral()); + break; + case INT16: + case UINT16: + // The "offset" is actually the value ... + int value = readUnsignedIndex(Integer.MAX_VALUE, isSmall, "value offset in"); + entries[i] = new ValueEntry(type).setValue(value); + break; + case INT32: + case UINT32: + if (!isSmall) { + // The value should be large enough to handle the actual value ... + value = readUnsignedIndex(Integer.MAX_VALUE, isSmall, "value offset in"); + entries[i] = new ValueEntry(type).setValue(value); + break; + } + default: + // It is an offset, not a value ...
+ int offset = readUnsignedIndex(Integer.MAX_VALUE, isSmall, "value offset in"); + if (offset >= numBytes) { + throw new IOException("The offset for the value in the JSON binary document is " + + offset + + ", which is larger than the binary form of the JSON document (" + + numBytes + " bytes)"); + } + entries[i] = new ValueEntry(type, offset); + } + } + + // Read each key ... + String[] keys = new String[numElements]; + for (int i = 0; i != numElements; ++i) { + keys[i] = reader.readString(keyLengths[i]); + } + + // Now parse the values ... + formatter.beginObject(numElements); + for (int i = 0; i != numElements; ++i) { + if (i != 0) { + formatter.nextEntry(); + } + formatter.name(keys[i]); + ValueEntry entry = entries[i]; + if (entry.resolved) { + Object value = entry.value; + if (value == null) { + formatter.valueNull(); + } else if (value instanceof Boolean) { + formatter.value(((Boolean) value).booleanValue()); + } else if (value instanceof Integer) { + formatter.value(((Integer) value).intValue()); + } + } else { + // Parse the value ... + parse(entry.type, formatter); + } + } + formatter.endObject(); + } + + /** + * Parse a JSON array. + *

+ * The grammar of the binary representation of JSON arrays is defined in the MySQL code base in the + * json_binary.h file: + *

Grammar

+ * + *
+     *   value ::=
+     *       object  |
+     *       array   |
+     *       literal |
+     *       number  |
+     *       string  |
+     *       custom-data
+     *   array ::= element-count size value-entry* value*
+     *   // number of members in object or number of elements in array
+     *   element-count ::=
+     *       uint16 |  // if used in small JSON object/array
+     *       uint32    // if used in large JSON object/array
+     *   // number of bytes in the binary representation of the object or array
+     *   size ::=
+     *       uint16 |  // if used in small JSON object/array
+     *       uint32    // if used in large JSON object/array
+     *   value-entry ::= type offset-or-inlined-value
+     *   // This field holds either the offset to where the value is stored,
+     *   // or the value itself if it is small enough to be inlined (that is,
+     *   // if it is a JSON literal or a small enough [u]int).
+     *   offset-or-inlined-value ::=
+     *       uint16 |   // if used in small JSON object/array
+     *       uint32     // if used in large JSON object/array
+     *   key ::= utf8mb4-data
+     *   literal ::=
+     *       0x00 |   // JSON null literal
+     *       0x01 |   // JSON true literal
+     *       0x02 |   // JSON false literal
+     *   number ::=  ....  // little-endian format for [u]int(16|32|64), whereas
+     *                     // double is stored in a platform-independent, eight-byte
+     *                     // format using float8store()
+     *   string ::= data-length utf8mb4-data
+     *   custom-data ::= custom-type data-length binary-data
+     *   custom-type ::= uint8   // type identifier that matches the
+     *                           // internal enum_field_types enum
+     *   data-length ::= uint8*  // If the high bit of a byte is 1, the length
+     *                           // field is continued in the next byte,
+     *                           // otherwise it is the last byte of the length
+     *                           // field. So we need 1 byte to represent
+     *                           // lengths up to 127, 2 bytes to represent
+     *                           // lengths up to 16383, and so on...
+     * 
+     *
+     * @param isSmall {@code true} if the array being read is "small", or {@code false} otherwise
+     * @param formatter the formatter to be notified of the parsed value; may not be null
+     * @throws IOException if there is a problem reading the JSON value
+     */
+    // checkstyle, please ignore MethodLength for the next line
+    protected void parseArray(boolean isSmall, JsonFormatter formatter)
+        throws IOException {
+        // Read the header ...
+        int numElements = readUnsignedIndex(Integer.MAX_VALUE, isSmall, "number of elements in");
+        int numBytes = readUnsignedIndex(Integer.MAX_VALUE, isSmall, "size of");
+
+        // Read each value-entry ...
+        ValueEntry[] entries = new ValueEntry[numElements];
+        for (int i = 0; i != numElements; ++i) {
+            // Parse the value ...
+            ValueType type = readValueType();
+            switch (type) {
+                case LITERAL:
+                    entries[i] = new ValueEntry(type).setValue(readLiteral());
+                    break;
+                case INT16:
+                case UINT16:
+                    // The "offset" is actually the value ...
+                    int value = readUnsignedIndex(Integer.MAX_VALUE, isSmall, "value offset in");
+                    entries[i] = new ValueEntry(type).setValue(value);
+                    break;
+                case INT32:
+                case UINT32:
+                    if (!isSmall) {
+                        // The value should be large enough to handle the actual value ...
+                        value = readUnsignedIndex(Integer.MAX_VALUE, isSmall, "value offset in");
+                        entries[i] = new ValueEntry(type).setValue(value);
+                        break;
+                    }
+                    // In a small document the entry holds an offset, so fall through ...
+                default:
+                    // It is an offset, not a value ...
+                    int offset = readUnsignedIndex(Integer.MAX_VALUE, isSmall, "value offset in");
+                    if (offset >= numBytes) {
+                        throw new IOException("The offset for the value in the JSON binary document is " +
+                            offset +
+                            ", which is larger than the binary form of the JSON document (" +
+                            numBytes + " bytes)");
+                    }
+                    entries[i] = new ValueEntry(type, offset);
+            }
+        }
+
+        // Now parse the values ...
+ formatter.beginArray(numElements); + for (int i = 0; i != numElements; ++i) { + if (i != 0) { + formatter.nextEntry(); + } + ValueEntry entry = entries[i]; + if (entry.resolved) { + Object value = entry.value; + if (value == null) { + formatter.valueNull(); + } else if (value instanceof Boolean) { + formatter.value(((Boolean) value).booleanValue()); + } else if (value instanceof Integer) { + formatter.value(((Integer) value).intValue()); + } + } else { + // Parse the value ... + parse(entry.type, formatter); + } + } + formatter.endArray(); + } + + /** + * Parse a literal value that is either null, {@code true}, or {@code false}. + * + * @param offset the offset at which the value is to be read; may not be null + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseBoolean(JsonFormatter formatter) throws IOException { + Boolean literal = readLiteral(); + if (literal == null) { + formatter.valueNull(); + } else { + formatter.value(literal.booleanValue()); + } + } + + /** + * Parse a 2 byte integer value. + * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseInt16(JsonFormatter formatter) throws IOException { + int value = readInt16(); + formatter.value(value); + } + + /** + * Parse a 2 byte unsigned integer value. + * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseUInt16(JsonFormatter formatter) throws IOException { + int value = readUInt16(); + formatter.value(value); + } + + /** + * Parse a 4 byte integer value. 
+ * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseInt32(JsonFormatter formatter) throws IOException { + int value = readInt32(); + formatter.value(value); + } + + /** + * Parse a 4 byte unsigned integer value. + * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseUInt32(JsonFormatter formatter) throws IOException { + long value = readUInt32(); + formatter.value(value); + } + + /** + * Parse a 8 byte integer value. + * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseInt64(JsonFormatter formatter) throws IOException { + long value = readInt64(); + formatter.value(value); + } + + /** + * Parse a 8 byte unsigned integer value. + * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseUInt64(JsonFormatter formatter) throws IOException { + BigInteger value = readUInt64(); + formatter.value(value); + } + + /** + * Parse a 8 byte double value. + * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseDouble(JsonFormatter formatter) throws IOException { + long rawValue = readInt64(); + double value = Double.longBitsToDouble(rawValue); + formatter.value(value); + } + + /** + * Parse the length and value of a string stored in MySQL's "utf8mb" character set (which equates to Java's + * {@link StandardCharsets#UTF_8} character set. 
The length is a {@link #readVariableInt(ByteBuffer) + * variable length integer} length of the string. + * + * @param offset the offset at which the value is to be read; may not be null + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseString(JsonFormatter formatter) throws IOException { + int length = readVariableInt(); + byte[] content = new byte[length]; + reader.read(content); + String value = new String(content, UTF_8); + formatter.value(value); + } + + /** + * Parse an opaque type. Specific types such as {@link #parseDate(Offset, formatter) DATE}, + * {@link #parseTime(Offset, formatter) TIME}, and {@link #parseDatetime(Offset, formatter) DATETIME} values are + * stored as opaque types, though they are to be unpacked. {@link #parseTimestamp(Offset, formatter) TIMESTAMP} + * are also stored as opaque types, but + * converted by MySQL to + * {@code DATETIME} prior to storage. + * Other MySQL types are stored as opaque types and passed on to the formatter as opaque values. + *

+ * See the + * MySQL source code for the logic used in this method. + *

+ *

Grammar

+ * + *
+     *   custom-data ::= custom-type data-length binary-data
+     *   custom-type ::= uint8   // type identifier that matches the
+     *                           // internal enum_field_types enum
+     *   data-length ::= uint8*  // If the high bit of a byte is 1, the length
+     *                           // field is continued in the next byte,
+     *                           // otherwise it is the last byte of the length
+     *                           // field. So we need 1 byte to represent
+     *                           // lengths up to 127, 2 bytes to represent
+     *                           // lengths up to 16383, and so on...
+     * 
+ * + * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseOpaque(JsonFormatter formatter) throws IOException { + // Read the custom type, which should be a standard ColumnType ... + int customType = reader.read(); + ColumnType type = ColumnType.byCode(customType); + if (type == null) { + throw new IOException("Unknown type '" + asHex(customType) + + "' in first byte of a JSON opaque value"); + } + // Read the data length ... + int length = readVariableInt(); + + switch (type) { + case DECIMAL: + case NEWDECIMAL: + // See 'Json_decimal::convert_from_binary' + // https://github.com/mysql/mysql-server/blob/5.7/sql/json_dom.cc#L1625 + parseDecimal(length, formatter); + break; + + // All dates and times are in one of these types + // See 'Json_datetime::to_packed' for details + // https://github.com/mysql/mysql-server/blob/5.7/sql/json_dom.cc#L1681 + // which calls 'TIME_to_longlong_packed' + // https://github.com/mysql/mysql-server/blob/5.7/sql-common/my_time.c#L2005 + // + // and 'Json_datetime::from_packed' + // https://github.com/mysql/mysql-server/blob/5.7/sql/json_dom.cc#L1688 + // which calls 'TIME_from_longlong_packed' + // https://github.com/mysql/mysql-server/blob/5.7/sql/sql_time.cc#L1624 + case DATE: + parseDate(formatter); + break; + case TIME: + case TIME_V2: + parseTime(formatter); + break; + case DATETIME: + case DATETIME_V2: + case TIMESTAMP: + case TIMESTAMP_V2: + parseDatetime(formatter); + break; + default: + parseOpaqueValue(type, length, formatter); + } + } + + /** + * Parse a {@code DATE} value, which is stored using the same format as {@code DATETIME}: + * 5 bytes + fractional-seconds storage. However, the hour, minute, second, and fractional seconds are ignored. + *

+ * The non-fractional part is 40 bits: + * + *

+     *  1 bit  sign           (1= non-negative, 0= negative)
+     *  17 bits year*13+month  (year 0-9999, month 0-12)
+     *   5 bits day            (0-31)
+     *   5 bits hour           (0-23)
+     *   6 bits minute         (0-59)
+     *   6 bits second         (0-59)
+     * 
+ * + * The fractional part is typically dependent upon the fsp (i.e., fractional seconds part) defined by + * a column, but in the case of JSON it is always 3 bytes. + *

+ * The format of all temporal values is outlined in the MySQL documentation, + * although since the MySQL {@code JSON} type is only available in 5.7, only version 2 of the date-time formats + * are necessary. + * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseDate(JsonFormatter formatter) throws IOException { + long raw = readInt64(); + long value = raw >> 24; + int yearMonth = (int) (value >> 22) % (1 << 17); // 17 bits starting at 22nd + int year = yearMonth / 13; + int month = yearMonth % 13; + int day = (int) (value >> 17) % (1 << 5); // 5 bits starting at 17th + formatter.valueDate(year, month, day); + } + + /** + * Parse a {@code TIME} value, which is stored using the same format as {@code DATETIME}: + * 5 bytes + fractional-seconds storage. However, the year, month, and day values are ignored + *

+ * The non-fractional part is 40 bits: + * + *

+     *  1 bit  sign           (1= non-negative, 0= negative)
+     *  17 bits year*13+month  (year 0-9999, month 0-12)
+     *   5 bits day            (0-31)
+     *   5 bits hour           (0-23)
+     *   6 bits minute         (0-59)
+     *   6 bits second         (0-59)
+     * 
+ * + * The fractional part is typically dependent upon the fsp (i.e., fractional seconds part) defined by + * a column, but in the case of JSON it is always 3 bytes. + *

+ * The format of all temporal values is outlined in the MySQL documentation, + * although since the MySQL {@code JSON} type is only available in 5.7, only version 2 of the date-time formats + * are necessary. + * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseTime(JsonFormatter formatter) throws IOException { + long raw = readInt64(); + long value = raw >> 24; + boolean negative = value < 0L; + int hour = (int) (value >> 12) % (1 << 10); // 10 bits starting at 12th + int min = (int) (value >> 6) % (1 << 6); // 6 bits starting at 6th + int sec = (int) value % (1 << 6); // 6 bits starting at 0th + if (negative) { + hour *= -1; + } + int microSeconds = (int) (raw % (1 << 24)); + formatter.valueTime(hour, min, sec, microSeconds); + } + + /** + * Parse a {@code DATETIME} value, which is stored as 5 bytes + fractional-seconds storage. + *

+ * The non-fractional part is 40 bits: + * + *

+     *  1 bit  sign           (1= non-negative, 0= negative)
+     *  17 bits year*13+month  (year 0-9999, month 0-12)
+     *   5 bits day            (0-31)
+     *   5 bits hour           (0-23)
+     *   6 bits minute         (0-59)
+     *   6 bits second         (0-59)
+     * 
+ * + * The sign bit is always 1. A value of 0 (negative) is reserved. The fractional part is typically dependent upon + * the fsp (i.e., fractional seconds part) defined by a column, but in the case of JSON it is always 3 bytes. + * Unlike the documentation, however, the 8 byte value is in little-endian form. + *

+ * The format of all temporal values is outlined in the MySQL documentation, + * although since the MySQL {@code JSON} type is only available in 5.7, only version 2 of the date-time formats + * are necessary. + * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseDatetime(JsonFormatter formatter) throws IOException { + long raw = readInt64(); + long value = raw >> 24; + int yearMonth = (int) (value >> 22) % (1 << 17); // 17 bits starting at 22nd + int year = yearMonth / 13; + int month = yearMonth % 13; + int day = (int) (value >> 17) % (1 << 5); // 5 bits starting at 17th + int hour = (int) (value >> 12) % (1 << 5); // 5 bits starting at 12th + int min = (int) (value >> 6) % (1 << 6); // 6 bits starting at 6th + int sec = (int) (value % (1 << 6)); // 6 bits starting at 0th + int microSeconds = (int) (raw % (1 << 24)); + formatter.valueDatetime(year, month, day, hour, min, sec, microSeconds); + } + + /** + * Parse a {@code DECIMAL} value. The first two bytes are the precision and scale, followed by the binary + * representation of the decimal itself. + * + * @param length the length of the complete binary representation + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseDecimal(int length, JsonFormatter formatter) throws IOException { + // First two bytes are the precision and scale ... 
+ int precision = reader.read(); + int scale = reader.read(); + + // Followed by the binary representation (see `my_decimal_get_binary_size`) + int decimalLength = length - 2; + BigDecimal dec = AbstractRowsEventDataDeserializer.asBigDecimal(precision, scale, reader.read(decimalLength)); + formatter.value(dec); + } + + protected void parseOpaqueValue(ColumnType type, int length, JsonFormatter formatter) + throws IOException { + byte[] rawValue = new byte[length]; + reader.read(rawValue); + formatter.valueOpaque(type, rawValue); + } + + protected int readFractionalSecondsInMicroseconds() throws IOException { + return (int) readBigEndianLong(3); + } + + protected long readBigEndianLong(int numBytes) throws IOException { + byte[] bytes = reader.read(numBytes); + long result = 0; + for (int i = 0; i != numBytes; i++) { + int b = bytes[i] & 0xFF; + result = (result << 8) | b; + } + return result; + } + + protected int readUnsignedIndex(int maxValue, boolean isSmall, String desc) throws IOException { + long result = isSmall ? 
readUInt16() : readUInt32();
+        if (result >= maxValue) {
+            throw new IOException("The " + desc + " the JSON document is " + result +
+                " and is too big for the binary form of the document (" + maxValue + ")");
+        }
+        if (result > Integer.MAX_VALUE) {
+            throw new IOException("The " + desc + " the JSON document is " + result + " and is too big to be used");
+        }
+        return (int) result;
+    }
+
+    protected int readInt16() throws IOException {
+        int b1 = reader.read() & 0xFF;
+        int b2 = reader.read();
+        return (short) (b2 << 8 | b1);
+    }
+
+    protected int readUInt16() throws IOException {
+        int b1 = reader.read() & 0xFF;
+        int b2 = reader.read() & 0xFF;
+        return (b2 << 8 | b1) & 0xFFFF;
+    }
+
+    protected int readInt24() throws IOException {
+        int b1 = reader.read() & 0xFF;
+        int b2 = reader.read() & 0xFF;
+        int b3 = reader.read();
+        return b3 << 16 | b2 << 8 | b1;
+    }
+
+    protected int readInt32() throws IOException {
+        int b1 = reader.read() & 0xFF;
+        int b2 = reader.read() & 0xFF;
+        int b3 = reader.read() & 0xFF;
+        int b4 = reader.read();
+        return b4 << 24 | b3 << 16 | b2 << 8 | b1;
+    }
+
+    protected long readUInt32() throws IOException {
+        int b1 = reader.read() & 0xFF;
+        int b2 = reader.read() & 0xFF;
+        int b3 = reader.read() & 0xFF;
+        int b4 = reader.read() & 0xFF;
+        // The mask must be a long literal; an int 0xFFFFFFFF widens to -1L and masks nothing ...
+        return (long) ((b4 << 24) | (b3 << 16) | (b2 << 8) | b1) & 0xFFFFFFFFL;
+    }
+
+    protected long readInt64() throws IOException {
+        int b1 = reader.read() & 0xFF;
+        int b2 = reader.read() & 0xFF;
+        int b3 = reader.read() & 0xFF;
+        long b4 = reader.read() & 0xFF;
+        long b5 = reader.read() & 0xFF;
+        long b6 = reader.read() & 0xFF;
+        long b7 = reader.read() & 0xFF;
+        long b8 = reader.read();
+        return b8 << 56 | (b7 << 48) | (b6 << 40) | (b5 << 32) |
+            (b4 << 24) | (b3 << 16) | (b2 << 8) | b1;
+    }
+
+    protected BigInteger readUInt64() throws IOException {
+        byte[] bigEndian = new byte[8];
+        for (int i = 8; i != 0; --i) {
+            bigEndian[i - 1] = (byte) (reader.read() & 0xFF);
+        }
+        return new BigInteger(1,
bigEndian); + } + + /** + * Read a variable-length integer value. + *

+     * If the high bit of a byte is 1, the length field is continued in the next byte, otherwise it is the last
+     * byte of the length field. So we need 1 byte to represent lengths up to 127, 2 bytes to represent lengths up
+     * to 16383, and so on... The least-significant 7 bits are stored first.
+     *
+     * @return the integer value
+     * @throws IOException if there is a problem reading the value
+     */
+    protected int readVariableInt() throws IOException {
+        byte b = 0;
+        int length = 0;
+        int shift = 0;
+        do {
+            b = (byte) reader.read();
+            // Each byte contributes 7 bits, least-significant group first ...
+            length |= (b & 0x7F) << shift;
+            shift += 7;
+        } while (b < 0);
+        return length;
+    }
+
+    protected Boolean readLiteral() throws IOException {
+        byte b = (byte) reader.read();
+        if (b == 0x00) {
+            return null;
+        } else if (b == 0x01) {
+            return Boolean.TRUE;
+        } else if (b == 0x02) {
+            return Boolean.FALSE;
+        }
+        throw new IOException("Unexpected value '" + asHex(b) + "' for literal");
+    }
+
+    protected ValueType readValueType() throws IOException {
+        byte b = (byte) reader.read();
+        return ValueType.byCode(b);
+    }
+
+    protected static String asHex(byte b) {
+        return String.format("%02X ", b);
+    }
+
+    protected static String asHex(int value) {
+        return Integer.toHexString(value);
+    }
+
+    /**
+     * Class used internally to hold value entry information.
+ */ + protected static final class ValueEntry { + protected final ValueType type; + protected final int index; + protected Object value = null; + protected boolean resolved = false; + + public ValueEntry(ValueType type) { + this.type = type; + index = 0; + } + + public ValueEntry(ValueType type, int index) { + this.type = type; + this.index = index; + } + + public ValueEntry setValue(Object value) { + this.value = value; + this.resolved = true; + return this; + } + } +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/json/JsonFormatter.java b/src/main/java/com/github/shyiko/mysql/binlog/json/JsonFormatter.java new file mode 100644 index 0000000..df622b0 --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/json/JsonFormatter.java @@ -0,0 +1,176 @@ +/* + * Copyright 2013 Stanley Shyiko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.shyiko.mysql.binlog.json; + +import java.math.BigDecimal; +import java.math.BigInteger; + +import com.github.shyiko.mysql.binlog.event.deserialization.ColumnType; + +/** + * Handle the various actions involved when {@link JsonBinaryValue#parse(byte[], JsonHandler) parsing} a JSON binary + * value. + * + * @author Randall Hauch + */ +public interface JsonFormatter { + + /** + * Prepare to receive the name-value pairs in a JSON object. 
+ * + * @param numElements the number of name-value pairs (or elements) + */ + void beginObject(int numElements); + + /** + * Prepare to receive the value pairs that in a JSON array. + * + * @param numElements the number of array elements + */ + void beginArray(int numElements); + + /** + * Complete the previously-started JSON object. + */ + void endObject(); + + /** + * Complete the previously-started JSON array. + */ + void endArray(); + + /** + * Receive the name of an element in a JSON object. + * + * @param name the element's name; never null + */ + void name(String name); + + /** + * Receive the string value of an element in a JSON object. + * + * @param value the element's value; never null + */ + void value(String value); + + /** + * Receive the integer value of an element in a JSON object. + * + * @param value the element's value + */ + void value(int value); + + /** + * Receive the long value of an element in a JSON object. + * + * @param value the element's value + */ + void value(long value); + + /** + * Receive the double value of an element in a JSON object. + * + * @param value the element's value + */ + void value(double value); + + /** + * Receive the {@link BigInteger} value of an element in a JSON object. + * + * @param value the element's value; never null + */ + void value(BigInteger value); + + /** + * Receive the {@link BigDecimal} value of an element in a JSON object. + * + * @param value the element's value; never null + */ + void value(BigDecimal value); + + /** + * Receive the boolean value of an element in a JSON object. + * + * @param value the element's value + */ + void value(boolean value); + + /** + * Receive a null value of an element in a JSON object. + */ + void valueNull(); + + /** + * Receive the year value of an element in a JSON object. + * + * @param year the year number that makes up the element's value + */ + void valueYear(int year); + + /** + * Receive the date value of an element in a JSON object. 
+ * + * @param year the positive or negative year in the element's date value + * @param month the month (0-12) in the element's date value + * @param day the day of the month (0-31) in the element's date value + */ + void valueDate(int year, int month, int day); + + /** + * Receive the date and time value of an element in a JSON object. + * + * @param year the positive or negative year in the element's date value + * @param month the month (0-12) in the element's date value + * @param day the day of the month (0-31) in the element's date value + * @param hour the hour of the day (0-24) in the element's time value + * @param min the minutes of the hour (0-60) in the element's time value + * @param sec the seconds of the minute (0-60) in the element's time value + * @param microSeconds the number of microseconds in the element's time value + */ + // checkstyle, please ignore ParameterNumber for the next line + void valueDatetime(int year, int month, int day, int hour, int min, int sec, int microSeconds); + + /** + * Receive the time value of an element in a JSON object. + * + * @param hour the hour of the day (0-24) in the element's time value + * @param min the minutes of the hour (0-60) in the element's time value + * @param sec the seconds of the minute (0-60) in the element's time value + * @param microSeconds the number of microseconds in the element's time value + */ + void valueTime(int hour, int min, int sec, int microSeconds); + + /** + * Receive the timestamp value of an element in a JSON object. + * + * @param secondsPastEpoch the number of seconds past epoch (January 1, 1970) in the element's timestamp value + * @param microSeconds the number of microseconds in the element's time value + */ + void valueTimestamp(long secondsPastEpoch, int microSeconds); + + /** + * Receive an opaque value of an element in a JSON object. 
+ * + * @param type the column type for the value; may not be null + * @param value the binary representation for the element's value + */ + void valueOpaque(ColumnType type, byte[] value); + + /** + * Called after an entry signaling that another entry will be signaled. + */ + void nextEntry(); +} \ No newline at end of file diff --git a/src/main/java/com/github/shyiko/mysql/binlog/json/JsonStringFormatter.java b/src/main/java/com/github/shyiko/mysql/binlog/json/JsonStringFormatter.java new file mode 100644 index 0000000..28eb25a --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/json/JsonStringFormatter.java @@ -0,0 +1,325 @@ +/* + * Copyright 2013 Stanley Shyiko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.shyiko.mysql.binlog.json; + +import java.math.BigDecimal; +import java.math.BigInteger; + +import com.github.shyiko.mysql.binlog.event.deserialization.ColumnType; + +/** + * A {@link JsonValueHandler} implementation that creates a JSON string representation. + * + * @author Randall Hauch + */ +public class JsonStringFormatter implements JsonFormatter { + + /** + * Value used for lookup tables to indicate that matching characters + * do not need to be escaped. + */ + public static final int ESCAPE_NONE = 0; + + /** + * Value used for lookup tables to indicate that matching characters + * are to be escaped using standard escaping; for JSON this means + * (for example) using "backslash - u" escape method. 
+ */ + public static final int ESCAPE_GENERIC = -1; + + /** + * A lookup table that determines which of the first 128 Unicode code points (single-byte UTF-8 characters) + * must be escaped. A value of '0' means no escaping is required; positive values must be escaped with a + * preceding backslash; and negative values that generic escaping (e.g., {@code \\uXXXX}). + */ + protected static final int[] ESCAPES; + static { + int[] escape = new int[128]; + // Generic escape for control characters ... + for (int i = 0; i < 32; ++i) { + escape[i] = ESCAPE_GENERIC; + } + // Backslash escape for other specific characters ... + escape['"'] = '"'; + escape['\\'] = '\\'; + // Escaping of slash is optional, so let's not add it + escape[0x08] = 'b'; + escape[0x09] = 't'; + escape[0x0C] = 'f'; + escape[0x0A] = 'n'; + escape[0x0D] = 'r'; + ESCAPES = escape; + } + + protected static final char[] HEX_CODES = "0123456789ABCDEF".toCharArray(); + + private final StringBuilder sb = new StringBuilder(); + private final int[] escapes = ESCAPES; + + @Override + public String toString() { + return getString(); + } + + public String getString() { + return sb.toString(); + } + + @Override + public void beginObject(int numElements) { + sb.append('{'); + } + + @Override + public void beginArray(int numElements) { + sb.append('['); + } + + @Override + public void endObject() { + sb.append('}'); + } + + @Override + public void endArray() { + sb.append(']'); + } + + @Override + public void name(String name) { + sb.append('"'); + appendString(name); + sb.append("\":"); + } + + @Override + public void value(String value) { + sb.append('"'); + appendString(value); + sb.append('"'); + } + + @Override + public void value(int value) { + sb.append(Integer.toString(value)); + } + + @Override + public void value(long value) { + sb.append(Long.toString(value)); + } + + @Override + public void value(double value) { + // Double's toString method will result in scientific notation and loss of precision + 
String str = Double.toString(value); + if (str.contains("E")) { + value(new BigDecimal(value)); + } else { + sb.append(str); + } + } + + @Override + public void value(BigInteger value) { + // Using the BigInteger.toString() method will result in scientific notation, so instead ... + value(new BigDecimal(value)); + } + + @Override + public void value(BigDecimal value) { + // Using the BigInteger.toString() method will result in scientific notation, so instead ... + sb.append(value.toPlainString()); + } + + @Override + public void value(boolean value) { + sb.append(Boolean.toString(value)); + } + + @Override + public void valueNull() { + sb.append("null"); + } + + @Override + public void valueYear(int year) { + sb.append(year); + } + + @Override + public void valueDate(int year, int month, int day) { + sb.append('"'); + appendDate(year, month, day); + sb.append('"'); + } + + @Override + // checkstyle, please ignore ParameterNumber for the next line + public void valueDatetime(int year, int month, int day, int hour, int min, int sec, int microSeconds) { + sb.append('"'); + appendDate(year, month, day); + sb.append(' '); + appendTime(hour, min, sec, microSeconds); + sb.append('"'); + } + + @Override + public void valueTime(int hour, int min, int sec, int microSeconds) { + sb.append('"'); + if (hour < 0) { + sb.append('-'); + hour = Math.abs(hour); + } + appendTime(hour, min, sec, microSeconds); + sb.append('"'); + } + + @Override + public void valueTimestamp(long secondsPastEpoch, int microSeconds) { + sb.append(secondsPastEpoch); + appendSixDigitUnsignedInt(microSeconds, false); + } + + @Override + public void valueOpaque(ColumnType type, byte[] value) { + sb.append('"'); + sb.append(javax.xml.bind.DatatypeConverter.printBase64Binary(value)); + sb.append('"'); + } + + @Override + public void nextEntry() { + sb.append(','); + } + + /** + * Append a string by escaping any characters that must be escaped. 
+ * + * @param original the string to be written; may not be null + */ + protected void appendString(String original) { + for (int i = 0, len = original.length(); i < len; ++i) { + char c = original.charAt(i); + int ch = c; + if (ch >= escapes.length || escapes[ch] == 0) { + sb.append(c); + continue; + } + int escape = escapes[ch]; + if (escape > 0) { // 2-char escape, fine + sb.append('\\'); + sb.append((char) escape); + } else { + unicodeEscape(ch); + } + } + } + + /** + * Append a generic Unicode escape (e.g., {@code \\uXXXX}) for the given character. + * + * @param charToEscape the character to escape + */ + private void unicodeEscape(int charToEscape) { + sb.append('\\'); + sb.append('u'); + if (charToEscape > 0xFF) { + int hi = (charToEscape >> 8) & 0xFF; + sb.append(HEX_CODES[hi >> 4]); + sb.append(HEX_CODES[hi & 0xF]); + charToEscape &= 0xFF; + } else { + sb.append('0'); + sb.append('0'); + } + // We know it's a control char, so only the last 2 chars are non-0 + sb.append(HEX_CODES[charToEscape >> 4]); + sb.append(HEX_CODES[charToEscape & 0xF]); + } + + protected void appendTwoDigitUnsignedInt(int value) { + assert value >= 0; + assert value < 100; + if (value < 10) { + sb.append("0").append(value); + } else { + sb.append(value); + } + } + + protected void appendFourDigitUnsignedInt(int value) { + if (value < 10) { + sb.append("000").append(value); + } else if (value < 100) { + sb.append("00").append(value); + } else if (value < 1000) { + sb.append("0").append(value); + } else { + sb.append(value); + } + } + + protected void appendSixDigitUnsignedInt(int value, boolean trimTrailingZeros) { + assert value > 0; + assert value < 1000000; + // Add prefixes if necessary ... + if (value < 10) { + sb.append("00000"); + } else if (value < 100) { + sb.append("0000"); + } else if (value < 1000) { + sb.append("000"); + } else if (value < 10000) { + sb.append("00"); + } else if (value < 100000) { + sb.append("0"); + } + if (trimTrailingZeros) { + // Remove any trailing 0's ... 
+ for (int i = 0; i != 6; ++i) { + if (value % 10 == 0) { + value /= 10; + } + } + sb.append(value); + } + } + + protected void appendDate(int year, int month, int day) { + if (year < 0) { + sb.append('-'); + year = Math.abs(year); + } + appendFourDigitUnsignedInt(year); + sb.append('-'); + appendTwoDigitUnsignedInt(month); + sb.append('-'); + appendTwoDigitUnsignedInt(day); + } + + protected void appendTime(int hour, int min, int sec, int microSeconds) { + appendTwoDigitUnsignedInt(hour); + sb.append(':'); + appendTwoDigitUnsignedInt(min); + sb.append(':'); + appendTwoDigitUnsignedInt(sec); + if (microSeconds != 0) { + sb.append('.'); + appendSixDigitUnsignedInt(microSeconds, true); + } + } +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/json/ValueType.java b/src/main/java/com/github/shyiko/mysql/binlog/json/ValueType.java new file mode 100644 index 0000000..3e72b44 --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/json/ValueType.java @@ -0,0 +1,99 @@ +/* + * Copyright 2013 Stanley Shyiko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.shyiko.mysql.binlog.json; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * The set of values that can be used within a MySQL JSON value. + *

+ * These values are defined in the MySQL codebase in the + * json_binary.h file, and are: + * + *

+ *   type ::=
+ *       0x00 |       // small JSON object
+ *       0x01 |       // large JSON object
+ *       0x02 |       // small JSON array
+ *       0x03 |       // large JSON array
+ *       0x04 |       // literal (true/false/null)
+ *       0x05 |       // int16
+ *       0x06 |       // uint16
+ *       0x07 |       // int32
+ *       0x08 |       // uint32
+ *       0x09 |       // int64
+ *       0x0a |       // uint64
+ *       0x0b |       // double
+ *       0x0c |       // utf8mb4 string
+ *       0x0f         // custom data (any MySQL data type)
+ * 
+ * + * @author Randall Hauch + */ +public enum ValueType { + + SMALL_DOCUMENT(0x00), + LARGE_DOCUMENT(0x01), + SMALL_ARRAY(0x02), + LARGE_ARRAY(0x03), + LITERAL(0x04), + INT16(0x05), + UINT16(0x06), + INT32(0x07), + UINT32(0x08), + INT64(0x09), + UINT64(0x0a), + DOUBLE(0x0b), + STRING(0x0c), + CUSTOM(0x0f); + + private static final Map TYPE_BY_CODE; + + static { + TYPE_BY_CODE = new HashMap(); + for (ValueType type : values()) { + TYPE_BY_CODE.put(type.code, type); + } + } + + /** + * Get the {@link ValueType} for the given binary type code. + * + * @param code the type code + * @return the {@link ValueType}; never null + * @throws IOException if the supplied type code is not known + */ + public static ValueType byCode(int code) throws IOException { + ValueType result = TYPE_BY_CODE.get(code); + if (result == null) { + throw new IOException("Unknown value type code '" + String.format("%02X ", code) + "'"); + } + return result; + } + + private final int code; + + private ValueType(int code) { + this.code = code; + } + + protected int code() { + return this.code; + } + +} diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java index ddddc18..3e0c2b1 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java @@ -49,6 +49,7 @@ import java.net.SocketException; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.AbstractMap; @@ -847,15 +848,18 @@ public void execute(Statement statement) throws SQLException { } } - private static final class MySQLConnection implements Closeable { + /** + * Representation of a MySQL connection. 
+ */ + public static final class MySQLConnection implements Closeable { - private String hostname; - private int port; - private String username; - private String password; + private final String hostname; + private final int port; + private final String username; + private final String password; private Connection connection; - private MySQLConnection(String hostname, int port, String username, String password) + public MySQLConnection(String hostname, int port, String username, String password) throws ClassNotFoundException, SQLException { this.hostname = hostname; this.port = port; @@ -873,6 +877,22 @@ public void execute(Statement statement) throws SQLException { }); } + public String hostname() { + return hostname; + } + + public int port() { + return port; + } + + public String username() { + return username; + } + + public String password() { + return password; + } + public void execute(Callback callback) throws SQLException { connection.setAutoCommit(false); Statement statement = connection.createStatement(); @@ -884,6 +904,33 @@ public void execute(Callback callback) throws SQLException { } } + public void execute(final String...statements) throws SQLException { + execute(new Callback() { + @Override + public void execute(Statement statement) throws SQLException { + for (String command : statements) { + statement.execute(command); + } + } + }); + } + + public void query(String sql, Callback callback) throws SQLException { + connection.setAutoCommit(false); + Statement statement = connection.createStatement(); + try { + ResultSet rs = statement.executeQuery(sql); + try { + callback.execute(rs); + connection.commit(); + } finally { + rs.close(); + } + } finally { + statement.close(); + } + } + @Override public void close() throws IOException { try { @@ -894,7 +941,12 @@ public void close() throws IOException { } } - private interface Callback { + /** + * Callback used in the {@link MySQLConnection#execute(Callback)} method. 
+ * + * @param <T> the type of argument + */ + public interface Callback<T> { void execute(T obj) throws SQLException; } diff --git a/src/test/java/com/github/shyiko/mysql/binlog/json/JsonBinaryValueIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/json/JsonBinaryValueIntegrationTest.java new file mode 100644 index 0000000..062314c --- /dev/null +++ b/src/test/java/com/github/shyiko/mysql/binlog/json/JsonBinaryValueIntegrationTest.java @@ -0,0 +1,333 @@ +/* + * Copyright 2013 Stanley Shyiko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.github.shyiko.mysql.binlog.json; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +import java.io.Serializable; +import java.sql.SQLSyntaxErrorException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.ResourceBundle; +import java.util.TimeZone; +import java.util.concurrent.TimeUnit; + +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.github.shyiko.mysql.binlog.BinaryLogClientIntegrationTest.MySQLConnection; +import com.github.shyiko.mysql.binlog.CapturingEventListener; +import com.github.shyiko.mysql.binlog.CountDownEventListener; +import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; + +/** + * @author Randall Hauch + */ +public class JsonBinaryValueIntegrationTest { + + private static final long DEFAULT_TIMEOUT = TimeUnit.SECONDS.toMillis(3); + + private MySQLConnection master; + private BinaryLogClient client; + private CountDownEventListener eventListener; + private Map jsonValuesByKey; + + @BeforeClass + public void setUp() throws Exception { + TimeZone.setDefault(TimeZone.getTimeZone("GMT")); + ResourceBundle bundle = ResourceBundle.getBundle("jdbc"); + String prefix = "jdbc.mysql.replication."; + master = new MySQLConnection(bundle.getString(prefix + "master.hostname"), + Integer.parseInt(bundle.getString(prefix + "master.port")), + bundle.getString(prefix + "master.username"), bundle.getString(prefix + "master.password")); + client = new BinaryLogClient(master.hostname(), master.port(), master.username(), master.password()); + client.setServerId(client.getServerId() - 1); // avoid clashes between BinaryLogClient instances + client.setKeepAlive(false); + // Uncomment the next line for detailed 
traces of the events ... + // client.registerEventListener(new TraceEventListener()); + client.registerEventListener(eventListener = new CountDownEventListener()); + // client.registerLifecycleListener(new TraceLifecycleListener()); + client.connect(DEFAULT_TIMEOUT); + try { + master.execute("drop database if exists json_test", + "create database json_test", + "use json_test", + "create table t1 (i INT, j JSON)"); + eventListener.waitFor(EventType.QUERY, 3, DEFAULT_TIMEOUT); + eventListener.reset(); + } catch (SQLSyntaxErrorException e) { + // Skip the tests altogether since MySQL is pre 5.7 + throw new org.testng.SkipException("JSON data type is not supported by current version of MySQL"); + } + + // Insert values into the t1 table ... + CapturingEventListener capturingEventListener = new CapturingEventListener(); + client.registerEventListener(capturingEventListener); + master.execute("INSERT INTO t1 VALUES (0, NULL);", + "INSERT INTO t1 VALUES (1, '{\"a\": 2}');", + "INSERT INTO t1 VALUES (2, '[1,2]');", + // checkstyle, please ignore LineLength for the next line + "INSERT INTO t1 VALUES (3, '{\"a\":\"b\", \"c\":\"d\",\"ab\":\"abc\", \"bc\": [\"x\", \"y\"]}');", + "INSERT INTO t1 VALUES (4, '[\"here\", [\"I\", \"am\"], \"!!!\"]');", + "INSERT INTO t1 VALUES (5, '\"scalar string\"');", + "INSERT INTO t1 VALUES (6, 'true');", + "INSERT INTO t1 VALUES (7, 'false');", + "INSERT INTO t1 VALUES (8, 'null');", + "INSERT INTO t1 VALUES (9, '-1');", + "INSERT INTO t1 VALUES (10, CAST(CAST(1 AS UNSIGNED) AS JSON));", + "INSERT INTO t1 VALUES (11, '32767');", + "INSERT INTO t1 VALUES (12, '32768');", + "INSERT INTO t1 VALUES (13, '-32768');", + "INSERT INTO t1 VALUES (14, '-32769');", + "INSERT INTO t1 VALUES (15, '2147483647');", + "INSERT INTO t1 VALUES (16, '2147483648');", + "INSERT INTO t1 VALUES (17, '-2147483648');", + "INSERT INTO t1 VALUES (18, '-2147483649');", + "INSERT INTO t1 VALUES (19, '18446744073709551615');", + "INSERT INTO t1 VALUES (20, 
'18446744073709551616');", + "INSERT INTO t1 VALUES (21, '3.14');", + "INSERT INTO t1 VALUES (22, '{}');", + "INSERT INTO t1 VALUES (23, '[]');", + "INSERT INTO t1 VALUES (24, CAST(CAST('2015-01-15 23:24:25' AS DATETIME) AS JSON));", + "INSERT INTO t1 VALUES (25, CAST(CAST('23:24:25' AS TIME) AS JSON));", + "INSERT INTO t1 VALUES (125, CAST(CAST('23:24:25.12' AS TIME(3)) AS JSON));", + "INSERT INTO t1 VALUES (225, CAST(CAST('23:24:25.0237' AS TIME(3)) AS JSON));", + "INSERT INTO t1 VALUES (26, CAST(CAST('2015-01-15' AS DATE) AS JSON));", + "INSERT INTO t1 VALUES (27, CAST(TIMESTAMP'2015-01-15 23:24:25' AS JSON));", + "INSERT INTO t1 VALUES (127, CAST(TIMESTAMP'2015-01-15 23:24:25.12' AS JSON));", + "INSERT INTO t1 VALUES (227, CAST(TIMESTAMP'2015-01-15 23:24:25.0237' AS JSON));", + "INSERT INTO t1 VALUES (327, CAST(UNIX_TIMESTAMP('2015-01-15 23:24:25') AS JSON));", + "INSERT INTO t1 VALUES (28, CAST(ST_GeomFromText('POINT(1 1)') AS JSON));", + // auto-convert to utf8mb4 + "INSERT INTO t1 VALUES (29, CAST('[]' AS CHAR CHARACTER SET 'ascii'));", + "INSERT INTO t1 VALUES (30, CAST(x'cafe' AS JSON));", + "INSERT INTO t1 VALUES (31, CAST(x'cafebabe' AS JSON));", + // # Maximum allowed key length is 64k-1 + "INSERT INTO t1 VALUES (100, CONCAT('{\"', REPEAT('a', 64 * 1024 - 1), '\":123}'));"); + + // Wait for the inserts to appear ... + eventListener.waitFor(WriteRowsEventData.class, 37, DEFAULT_TIMEOUT); + + jsonValuesByKey = new HashMap(); + List events = capturingEventListener.getEvents(WriteRowsEventData.class); + for (WriteRowsEventData event : events) { + List writtenRows = event.getRows(); + for (Serializable[] row : writtenRows) { + assertEquals(row.length, 2); + // Read the values ... 
+ Integer rowNum = (Integer) row[0]; + byte[] jsonBinary = (byte[]) row[1]; + assertNotNull(rowNum); + assertTrue(jsonBinary == null || jsonBinary instanceof byte[]); + jsonValuesByKey.put(rowNum, jsonBinary); + } + } + } + + @Test + public void testNullJsonValue() throws Exception { + assertJson(0, null); + } + + @Test + public void testSimpleJsonObject() throws Exception { + assertJson(1, "{\"a\":2}"); + } + + @Test + public void testMultiLevelJsonObject() throws Exception { + assertJson(3, "{\"a\":\"b\",\"c\":\"d\",\"ab\":\"abc\",\"bc\":[\"x\",\"y\"]}"); + } + + @Test + public void testSimpleJsonArray() throws Exception { + assertJson(2, "[1,2]"); + } + + @Test + public void testMultiLevelJsonArray() throws Exception { + assertJson(4, "[\"here\",[\"I\",\"am\"],\"!!!\"]"); + } + + @Test + public void testScalarString() throws Exception { + assertJson(5, "\"scalar string\""); + } + + @Test + public void testScalarBooleanTrue() throws Exception { + assertJson(6, "true"); + } + + @Test + public void testScalarBooleanFalse() throws Exception { + assertJson(7, "false"); + } + + @Test + public void testScalarNull() throws Exception { + assertJson(8, "null"); + } + + @Test + public void testScalarNegativeInteger() throws Exception { + assertJson(9, "-1"); + } + + @Test + public void testScalarUnsignedInteger() throws Exception { + assertJson(10, "1"); + } + + @Test + public void testScalarMaxPositiveInt16() throws Exception { + assertJson(11, "32767"); + } + + @Test + public void testScalarInt32() throws Exception { + assertJson(12, "32768"); + } + + @Test + public void testScalarNegativeInt16() throws Exception { + assertJson(13, "-32768"); + } + + @Test + public void testScalarNegativeInt32() throws Exception { + assertJson(14, "-32769"); + } + + @Test + public void testScalarMaxPositiveInt32() throws Exception { + assertJson(15, "2147483647"); + } + + @Test + public void testScalarPositiveInt64() throws Exception { + assertJson(16, "2147483648"); + } + + @Test + 
public void testScalarMaxNegativeInt32() throws Exception { + assertJson(17, "-2147483648"); + } + + @Test + public void testScalarNegativeInt64() throws Exception { + assertJson(18, "-2147483649"); + } + + @Test + public void testScalarUInt64() throws Exception { + assertJson(19, "18446744073709551615"); + } + + @Test + public void testScalarBeyondUInt64() throws Exception { + assertJson(20, "18446744073709551616"); + } + + @Test + public void testScalarPi() throws Exception { + assertJson(21, "3.14"); + } + + @Test + public void testEmptyObject() throws Exception { + assertJson(22, "{}"); + } + + @Test + public void testEmptyArray() throws Exception { + assertJson(23, "[]"); + } + + @Test + public void testScalarDateTime() throws Exception { + assertJson(24, "\"2015-01-15 23:24:25\""); + } + + @Test + public void testScalarTime() throws Exception { + assertJson(25, "\"23:24:25\""); + assertJson(125, "\"23:24:25.12\""); + assertJson(225, "\"23:24:25.024\""); + } + + @Test + public void testScalarDate() throws Exception { + assertJson(26, "\"2015-01-15\""); + } + + @Test + public void testScalarTimestamp() throws Exception { + // Timestamp literals are interpreted by MySQL as DATETIME values + assertJson(27, "\"2015-01-15 23:24:25\""); + assertJson(127, "\"2015-01-15 23:24:25.12\""); + assertJson(227, "\"2015-01-15 23:24:25.0237\""); + // The UNIX_TIMESTAMP(ts) function returns the number of seconds past epoch for the given ts + assertJson(327, "1421364265"); + } + + @Test + public void testScalarGeometry() throws Exception { + assertJson(28, "{\"type\":\"Point\",\"coordinates\":[1.0,1.0]}"); + } + + @Test + public void testScalarStringWithCharsetConversion() throws Exception { + assertJson(29, "[]"); + } + + @Test + public void testScalarBinaryAsBase64() throws Exception { + assertJson(30, "\"yv4=\""); + assertJson(31, "\"yv66vg==\""); + } + + protected void assertJson(int i, String expected) throws Exception { + byte[] b = jsonForId(i); + String json = b != null 
? JsonBinary.parseAsString(b) : null; + assertEquals(json, expected); + } + + /** + * Get the binary representation of the JSON value that corresponds to the specified row number. + * + * @param i the row number; should be unique for an insert to work + * @return the binary representation of the JSON value as read from the {@link WriteRowsEventData} event; + * may be null if the JSON value is null + */ + protected byte[] jsonForId(int i) throws Exception { + return jsonValuesByKey.get(new Integer(i)); + } + + @AfterClass(alwaysRun = true) + public void tearDown() throws Exception { + if (master != null) { + master.execute("drop database if exists json_test"); + master.close(); + } + } +} diff --git a/supplement/codequality/checkstyle.xml b/supplement/codequality/checkstyle.xml index d5ad630..21fbe8e 100644 --- a/supplement/codequality/checkstyle.xml +++ b/supplement/codequality/checkstyle.xml @@ -72,7 +72,7 @@ - + From 09471b1b21f386a3dd978a8a6c73ab18b8f0e9ae Mon Sep 17 00:00:00 2001 From: Stanley Shyiko Date: Thu, 6 Oct 2016 10:04:28 -0700 Subject: [PATCH 07/21] Moved **.mysql.binlog.json to **.mysql.binlog.event.deserialization.json --- .../AbstractRowsEventDataDeserializer.java | 4 +- .../deserialization}/json/JsonBinary.java | 124 +++++++++--------- .../deserialization}/json/JsonFormatter.java | 9 +- .../json/JsonStringFormatter.java | 20 +-- .../deserialization}/json/ValueType.java | 68 ++++------ .../json/JsonBinaryValueIntegrationTest.java | 11 +- 6 files changed, 108 insertions(+), 128 deletions(-) rename src/main/java/com/github/shyiko/mysql/binlog/{ => event/deserialization}/json/JsonBinary.java (91%) rename src/main/java/com/github/shyiko/mysql/binlog/{ => event/deserialization}/json/JsonFormatter.java (96%) rename src/main/java/com/github/shyiko/mysql/binlog/{ => event/deserialization}/json/JsonStringFormatter.java (94%) rename src/main/java/com/github/shyiko/mysql/binlog/{ => event/deserialization}/json/ValueType.java (56%) rename 
src/test/java/com/github/shyiko/mysql/binlog/{ => event/deserialization}/json/JsonBinaryValueIntegrationTest.java (96%) diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java index 61146c4..2d4f0b6 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java @@ -333,8 +333,8 @@ protected byte[] deserializeGeometry(int meta, ByteArrayInputStream inputStream) /** * Deserialize the {@code JSON} value on the input stream, and return MySQL's internal binary representation - * of the JSON value. See {@link com.github.shyiko.mysql.binlog.json.JsonBinary} for a utility to parse this - * binary representation into something more useful, including a string representation. + * of the JSON value. See {@link com.github.shyiko.mysql.binlog.event.deserialization.json.JsonBinary} for + * a utility to parse this binary representation into something more useful, including a string representation. 
* * @param meta the number of bytes in which the length of the JSON value is found first on the input stream * @param inputStream the stream containing the JSON value diff --git a/src/main/java/com/github/shyiko/mysql/binlog/json/JsonBinary.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinary.java similarity index 91% rename from src/main/java/com/github/shyiko/mysql/binlog/json/JsonBinary.java rename to src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinary.java index a719afd..d5824a3 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/json/JsonBinary.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinary.java @@ -1,5 +1,5 @@ /* - * Copyright 2013 Stanley Shyiko + * Copyright 2016 Stanley Shyiko * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.github.shyiko.mysql.binlog.json; +package com.github.shyiko.mysql.binlog.event.deserialization.json; import java.io.IOException; import java.math.BigDecimal; @@ -26,7 +26,7 @@ /** * Utility to parse the binary-encoded value of a MySQL {@code JSON} type, translating the encoded representation into - * method calls on a supplied {@link JsonHandler} implementation. + * method calls on a supplied {@link JsonFormatter} implementation. * *

Binary Format

* @@ -85,20 +85,20 @@ *
  *   doc ::= type value
  *   type ::=
- *       0x00 |       // small JSON object
- *       0x01 |       // large JSON object
- *       0x02 |       // small JSON array
- *       0x03 |       // large JSON array
- *       0x04 |       // literal (true/false/null)
- *       0x05 |       // int16
- *       0x06 |       // uint16
- *       0x07 |       // int32
- *       0x08 |       // uint32
- *       0x09 |       // int64
- *       0x0a |       // uint64
- *       0x0b |       // double
- *       0x0c |       // utf8mb4 string
- *       0x0f         // custom data (any MySQL data type)
+ *       0x00 |  // small JSON object
+ *       0x01 |  // large JSON object
+ *       0x02 |  // small JSON array
+ *       0x03 |  // large JSON array
+ *       0x04 |  // literal (true/false/null)
+ *       0x05 |  // int16
+ *       0x06 |  // uint16
+ *       0x07 |  // int32
+ *       0x08 |  // uint32
+ *       0x09 |  // int64
+ *       0x0a |  // uint64
+ *       0x0b |  // double
+ *       0x0c |  // utf8mb4 string
+ *       0x0f    // custom data (any MySQL data type)
  *   value ::=
  *       object  |
  *       array   |
@@ -193,17 +193,17 @@ public JsonBinary(ByteArrayInputStream contents) {
 
     @Override
     public String toString() {
-        return asString();
+        return getString();
     }
 
-    public String asString() {
+    public String getString() {
         JsonStringFormatter handler = new JsonStringFormatter();
         try {
             parse(handler);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
-        return handler.toString();
+        return handler.getString();
     }
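The `getString()` method above creates a `JsonStringFormatter`, passes it to `parse(...)`, and returns whatever text the formatter accumulated. A minimal, self-contained sketch of that callback pattern follows. The names `MiniFormatter`, `MiniStringFormatter` and `MiniFormatterDemo` are hypothetical and not part of this patch; the callback set is cut down to a handful of methods, and only quotes and backslashes are escaped.

```java
/** Hypothetical, cut-down stand-in for the callbacks a formatter receives. */
interface MiniFormatter {
    void beginObject();
    void name(String name);
    void value(int value);
    void value(String value);
    void nextEntry();
    void endObject();
}

/** Accumulates JSON text in a StringBuilder, one callback at a time. */
final class MiniStringFormatter implements MiniFormatter {

    private final StringBuilder sb = new StringBuilder();

    public String getString() {
        return sb.toString();
    }

    @Override
    public void beginObject() {
        sb.append('{');
    }

    @Override
    public void name(String name) {
        appendQuoted(name);
        sb.append(':');
    }

    @Override
    public void value(int value) {
        sb.append(value);
    }

    @Override
    public void value(String value) {
        appendQuoted(value);
    }

    @Override
    public void nextEntry() {
        sb.append(',');
    }

    @Override
    public void endObject() {
        sb.append('}');
    }

    // Simplification (assumption): only quotes and backslashes are escaped here.
    private void appendQuoted(String s) {
        sb.append('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '"' || c == '\\') {
                sb.append('\\');
            }
            sb.append(c);
        }
        sb.append('"');
    }
}

final class MiniFormatterDemo {

    /** Plays the role of the parser: emits the callbacks for {"a":2,"c":"d"}. */
    static String render() {
        MiniStringFormatter formatter = new MiniStringFormatter();
        formatter.beginObject();
        formatter.name("a");
        formatter.value(2);
        formatter.nextEntry();
        formatter.name("c");
        formatter.value("d");
        formatter.endObject();
        return formatter.getString();
    }

    public static void main(String[] args) {
        System.out.println(render());
    }
}
```

Keeping the parser unaware of the output format is what would let a caller swap in a different formatter (for example, one that builds a tree instead of a string) without touching the parsing code.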
 
     public void parse(JsonFormatter formatter) throws IOException {
@@ -255,8 +255,8 @@ protected void parse(ValueType type, JsonFormatter formatter) throws IOException
                 parseOpaque(formatter);
                 break;
             default:
-                throw new IOException("Unknown type value '" + asHex(type.code()) +
-                        "' in first byte of a JSON value");
+                throw new IOException("Unknown type value '" + asHex(type.getCode()) +
+                    "' in first byte of a JSON value");
         }
     }
 
@@ -318,21 +318,20 @@ protected void parse(ValueType type, JsonFormatter formatter) throws IOException
      *                           // lengths up to 16383, and so on...
      * 
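The variable-length size field mentioned above (1 byte for lengths up to 127, 2 bytes for lengths up to 16383, and so on) can be sketched as follows. This is an illustration only: `VariableLengthInt` and `decode` are hypothetical names, not the patch's `readVariableInt()`, and the sketch assumes the low 7 bits of each byte carry data with the least-significant group first, while a set high bit means another byte follows.

```java
/** Hypothetical helper, not part of the patch: decodes a variable-length integer. */
final class VariableLengthInt {

    private VariableLengthInt() {
    }

    /**
     * Decodes the integer that starts at {@code offset}.
     * Assumption: 7 data bits per byte, least-significant group first,
     * high bit set when another byte follows, at most 5 bytes in total.
     */
    static int decode(byte[] data, int offset) {
        long result = 0;
        for (int i = 0; i < 5; i++) {
            if (offset + i >= data.length) {
                throw new IllegalArgumentException("Truncated variable-length integer");
            }
            int b = data[offset + i] & 0xFF;
            // Place the 7 payload bits at their position in the result
            result |= (long) (b & 0x7F) << (7 * i);
            if ((b & 0x80) == 0) {
                // High bit clear: this was the last byte of the field
                if (result > Integer.MAX_VALUE) {
                    throw new IllegalArgumentException("Value does not fit in an int: " + result);
                }
                return (int) result;
            }
        }
        throw new IllegalArgumentException("Variable-length integer is longer than 5 bytes");
    }

    public static void main(String[] args) {
        System.out.println(decode(new byte[] {0x7F}, 0));               // 127
        System.out.println(decode(new byte[] {(byte) 0x80, 0x01}, 0));  // 128
        System.out.println(decode(new byte[] {(byte) 0xFF, 0x7F}, 0));  // 16383
    }
}
```

The 5-byte cap reflects that 5 groups of 7 bits are enough to cover a 32-bit length; the explicit range check guards against inputs whose fifth byte would overflow an `int`.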
* - * @param offset the offset at which the value is to be read; may not be null - * @param isSmall {@code true} if the object being read is "small", or {@code false} otherwise + * @param small {@code true} if the object being read is "small", or {@code false} otherwise * @param formatter the formatter to be notified of the parsed value; may not be null * @throws IOException if there is a problem reading the JSON value */ - protected void parseObject(boolean isSmall, JsonFormatter formatter) + protected void parseObject(boolean small, JsonFormatter formatter) throws IOException { // Read the header ... - int numElements = readUnsignedIndex(Integer.MAX_VALUE, isSmall, "number of elements in"); - int numBytes = readUnsignedIndex(Integer.MAX_VALUE, isSmall, "size of"); + int numElements = readUnsignedIndex(Integer.MAX_VALUE, small, "number of elements in"); + int numBytes = readUnsignedIndex(Integer.MAX_VALUE, small, "size of"); // Read each key-entry, consisting of the offset and length of each key ... int[] keyLengths = new int[numElements]; for (int i = 0; i != numElements; ++i) { - readUnsignedIndex(numBytes, isSmall, "key offset in"); // unused + readUnsignedIndex(numBytes, small, "key offset in"); // unused keyLengths[i] = readUInt16(); } @@ -348,19 +347,19 @@ protected void parseObject(boolean isSmall, JsonFormatter formatter) case INT16: case UINT16: // The "offset" is actually the value ... - int value = readUnsignedIndex(Integer.MAX_VALUE, isSmall, "value offset in"); + int value = readUnsignedIndex(Integer.MAX_VALUE, small, "value offset in"); entries[i] = new ValueEntry(type).setValue(value); break; case INT32: case UINT32: - if (!isSmall) { + if (!small) { // The value should be large enough to handle the actual value ... 
- value = readUnsignedIndex(Integer.MAX_VALUE, isSmall, "value offset in"); + value = readUnsignedIndex(Integer.MAX_VALUE, small, "value offset in"); entries[i] = new ValueEntry(type).setValue(value); } default: // It is an offset, not a value ... - int offset = readUnsignedIndex(Integer.MAX_VALUE, isSmall, "value offset in"); + int offset = readUnsignedIndex(Integer.MAX_VALUE, small, "value offset in"); if (offset >= numBytes) { throw new IOException("The offset for the value in the JSON binary document is " + offset + @@ -390,9 +389,9 @@ protected void parseObject(boolean isSmall, JsonFormatter formatter) if (value == null) { formatter.valueNull(); } else if (value instanceof Boolean) { - formatter.value(((Boolean) value).booleanValue()); + formatter.value((Boolean) value); } else if (value instanceof Integer) { - formatter.value(((Integer) value).intValue()); + formatter.value((Integer) value); } } else { // Parse the value ... @@ -455,17 +454,16 @@ protected void parseObject(boolean isSmall, JsonFormatter formatter) * // lengths up to 16383, and so on... * * - * @param offset the offset at which the value is to be read; may not be null - * @param isSmall {@code true} if the object being read is "small", or {@code false} otherwise + * @param small {@code true} if the object being read is "small", or {@code false} otherwise * @param formatter the formatter to be notified of the parsed value; may not be null * @throws IOException if there is a problem reading the JSON value */ // checkstyle, please ignore MethodLength for the next line - protected void parseArray(boolean isSmall, JsonFormatter formatter) + protected void parseArray(boolean small, JsonFormatter formatter) throws IOException { // Read the header ... 
- int numElements = readUnsignedIndex(Integer.MAX_VALUE, isSmall, "number of elements in"); - int numBytes = readUnsignedIndex(Integer.MAX_VALUE, isSmall, "size of"); + int numElements = readUnsignedIndex(Integer.MAX_VALUE, small, "number of elements in"); + int numBytes = readUnsignedIndex(Integer.MAX_VALUE, small, "size of"); // Read each key value value-entry ValueEntry[] entries = new ValueEntry[numElements]; @@ -479,19 +477,19 @@ protected void parseArray(boolean isSmall, JsonFormatter formatter) case INT16: case UINT16: // The "offset" is actually the value ... - int value = readUnsignedIndex(Integer.MAX_VALUE, isSmall, "value offset in"); + int value = readUnsignedIndex(Integer.MAX_VALUE, small, "value offset in"); entries[i] = new ValueEntry(type).setValue(value); break; case INT32: case UINT32: - if (!isSmall) { + if (!small) { // The value should be large enough to handle the actual value ... - value = readUnsignedIndex(Integer.MAX_VALUE, isSmall, "value offset in"); + value = readUnsignedIndex(Integer.MAX_VALUE, small, "value offset in"); entries[i] = new ValueEntry(type).setValue(value); } default: // It is an offset, not a value ... - int offset = readUnsignedIndex(Integer.MAX_VALUE, isSmall, "value offset in"); + int offset = readUnsignedIndex(Integer.MAX_VALUE, small, "value offset in"); if (offset >= numBytes) { throw new IOException("The offset for the value in the JSON binary document is " + offset + @@ -514,9 +512,9 @@ protected void parseArray(boolean isSmall, JsonFormatter formatter) if (value == null) { formatter.valueNull(); } else if (value instanceof Boolean) { - formatter.value(((Boolean) value).booleanValue()); + formatter.value((Boolean) value); } else if (value instanceof Integer) { - formatter.value(((Integer) value).intValue()); + formatter.value((Integer) value); } } else { // Parse the value ... 
@@ -529,7 +527,6 @@ protected void parseArray(boolean isSmall, JsonFormatter formatter) /** * Parse a literal value that is either null, {@code true}, or {@code false}. * - * @param offset the offset at which the value is to be read; may not be null * @param formatter the formatter to be notified of the parsed value; may not be null * @throws IOException if there is a problem reading the JSON value */ @@ -538,7 +535,7 @@ protected void parseBoolean(JsonFormatter formatter) throws IOException { if (literal == null) { formatter.valueNull(); } else { - formatter.value(literal.booleanValue()); + formatter.value(literal); } } @@ -622,26 +619,21 @@ protected void parseDouble(JsonFormatter formatter) throws IOException { /** * Parse the length and value of a string stored in MySQL's "utf8mb" character set (which equates to Java's - * {@link StandardCharsets#UTF_8} character set. The length is a {@link #readVariableInt(ByteBuffer) - * variable length integer} length of the string. + * UTF-8 character set. The length is a {@link #readVariableInt() variable length integer} length of the string. * - * @param offset the offset at which the value is to be read; may not be null * @param formatter the formatter to be notified of the parsed value; may not be null * @throws IOException if there is a problem reading the JSON value */ protected void parseString(JsonFormatter formatter) throws IOException { int length = readVariableInt(); - byte[] content = new byte[length]; - reader.read(content); - String value = new String(content, UTF_8); + String value = new String(reader.read(length), UTF_8); formatter.value(value); } /** - * Parse an opaque type. Specific types such as {@link #parseDate(Offset, formatter) DATE}, - * {@link #parseTime(Offset, formatter) TIME}, and {@link #parseDatetime(Offset, formatter) DATETIME} values are - * stored as opaque types, though they are to be unpacked. 
{@link #parseTimestamp(Offset, formatter) TIMESTAMP} - * are also stored as opaque types, but + * Parse an opaque type. Specific types such as {@link #parseDate(JsonFormatter) DATE}, + * {@link #parseTime(JsonFormatter) TIME}, and {@link #parseDatetime(JsonFormatter) DATETIME} values are + * stored as opaque types, though they are to be unpacked. TIMESTAMPs are also stored as opaque types, but * converted by MySQL to * {@code DATETIME} prior to storage. * Other MySQL types are stored as opaque types and passed on to the formatter as opaque values. @@ -852,9 +844,7 @@ protected void parseDecimal(int length, JsonFormatter formatter) throws IOExcept protected void parseOpaqueValue(ColumnType type, int length, JsonFormatter formatter) throws IOException { - byte[] rawValue = new byte[length]; - reader.read(rawValue); - formatter.valueOpaque(type, rawValue); + formatter.valueOpaque(type, reader.read(length)); } protected int readFractionalSecondsInMicroseconds() throws IOException { @@ -946,7 +936,6 @@ protected BigInteger readUInt64() throws IOException { * byte of the length field. So we need 1 byte to represent lengths up to 127, 2 bytes to represent lengths up * to 16383, and so on... * - * @param bytes the binary input with the value * @return the integer value */ protected int readVariableInt() throws IOException { @@ -973,7 +962,11 @@ protected Boolean readLiteral() throws IOException { protected ValueType readValueType() throws IOException { byte b = (byte) reader.read(); - return ValueType.byCode(b); + ValueType result = ValueType.byCode(b); + if (result == null) { + throw new IOException("Unknown value type code '" + String.format("%02X", (int) b) + "'"); + } + return result; } protected static String asHex(byte b) { @@ -988,14 +981,15 @@ protected static String asHex(int value) { * Class used internally to hold value entry information. 
*/ protected static final class ValueEntry { + protected final ValueType type; protected final int index; - protected Object value = null; - protected boolean resolved = false; + protected Object value; + protected boolean resolved; public ValueEntry(ValueType type) { this.type = type; - index = 0; + this.index = 0; } public ValueEntry(ValueType type, int index) { diff --git a/src/main/java/com/github/shyiko/mysql/binlog/json/JsonFormatter.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonFormatter.java similarity index 96% rename from src/main/java/com/github/shyiko/mysql/binlog/json/JsonFormatter.java rename to src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonFormatter.java index df622b0..2888182 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/json/JsonFormatter.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonFormatter.java @@ -1,5 +1,5 @@ /* - * Copyright 2013 Stanley Shyiko + * Copyright 2016 Stanley Shyiko * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.github.shyiko.mysql.binlog.json; +package com.github.shyiko.mysql.binlog.event.deserialization.json; import java.math.BigDecimal; import java.math.BigInteger; @@ -21,7 +21,7 @@ import com.github.shyiko.mysql.binlog.event.deserialization.ColumnType; /** - * Handle the various actions involved when {@link JsonBinaryValue#parse(byte[], JsonHandler) parsing} a JSON binary + * Handle the various actions involved when {@link JsonBinary#parse(byte[], JsonFormatter)} a JSON binary * value. * * @author Randall Hauch @@ -173,4 +173,5 @@ public interface JsonFormatter { * Called after an entry signaling that another entry will be signaled. 
*/ void nextEntry(); -} \ No newline at end of file + +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/json/JsonStringFormatter.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java similarity index 94% rename from src/main/java/com/github/shyiko/mysql/binlog/json/JsonStringFormatter.java rename to src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java index 28eb25a..e2f657c 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/json/JsonStringFormatter.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java @@ -1,5 +1,5 @@ /* - * Copyright 2013 Stanley Shyiko + * Copyright 2016 Stanley Shyiko * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.github.shyiko.mysql.binlog.json; +package com.github.shyiko.mysql.binlog.event.deserialization.json; import java.math.BigDecimal; import java.math.BigInteger; @@ -21,7 +21,7 @@ import com.github.shyiko.mysql.binlog.event.deserialization.ColumnType; /** - * A {@link JsonValueHandler} implementation that creates a JSON string representation. + * A {@link JsonFormatter} implementation that creates a JSON string representation. * * @author Randall Hauch */ @@ -31,21 +31,22 @@ public class JsonStringFormatter implements JsonFormatter { * Value used for lookup tables to indicate that matching characters * do not need to be escaped. */ - public static final int ESCAPE_NONE = 0; + private static final int ESCAPE_NONE = 0; /** * Value used for lookup tables to indicate that matching characters * are to be escaped using standard escaping; for JSON this means * (for example) using "backslash - u" escape method. 
*/ - public static final int ESCAPE_GENERIC = -1; + private static final int ESCAPE_GENERIC = -1; /** * A lookup table that determines which of the first 128 Unicode code points (single-byte UTF-8 characters) * must be escaped. A value of '0' means no escaping is required; positive values must be escaped with a * preceding backslash; and negative values that generic escaping (e.g., {@code \\uXXXX}). */ - protected static final int[] ESCAPES; + private static final int[] ESCAPES; + static { int[] escape = new int[128]; // Generic escape for control characters ... @@ -64,10 +65,9 @@ public class JsonStringFormatter implements JsonFormatter { ESCAPES = escape; } - protected static final char[] HEX_CODES = "0123456789ABCDEF".toCharArray(); + private static final char[] HEX_CODES = "0123456789ABCDEF".toCharArray(); private final StringBuilder sb = new StringBuilder(); - private final int[] escapes = ESCAPES; @Override public String toString() { @@ -215,11 +215,11 @@ protected void appendString(String original) { for (int i = 0, len = original.length(); i < len; ++i) { char c = original.charAt(i); int ch = c; - if (ch < 0 || escapes[ch] == 0) { + if (ch < 0 || ESCAPES[ch] == 0) { sb.append(c); continue; } - int escape = escapes[ch]; + int escape = ESCAPES[ch]; if (escape > 0) { // 2-char escape, fine sb.append('\\'); sb.append(c); diff --git a/src/main/java/com/github/shyiko/mysql/binlog/json/ValueType.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/ValueType.java similarity index 56% rename from src/main/java/com/github/shyiko/mysql/binlog/json/ValueType.java rename to src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/ValueType.java index 3e72b44..816bc60 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/json/ValueType.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/ValueType.java @@ -1,5 +1,5 @@ /* - * Copyright 2013 Stanley Shyiko + * Copyright 2016 Stanley Shyiko * * 
Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,9 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.github.shyiko.mysql.binlog.json; +package com.github.shyiko.mysql.binlog.event.deserialization.json; -import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -27,20 +26,20 @@ * *
  *   type ::=
- *       0x00 |       // small JSON object
- *       0x01 |       // large JSON object
- *       0x02 |       // small JSON array
- *       0x03 |       // large JSON array
- *       0x04 |       // literal (true/false/null)
- *       0x05 |       // int16
- *       0x06 |       // uint16
- *       0x07 |       // int32
- *       0x08 |       // uint32
- *       0x09 |       // int64
- *       0x0a |       // uint64
- *       0x0b |       // double
- *       0x0c |       // utf8mb4 string
- *       0x0f         // custom data (any MySQL data type)
+ *       0x00 |  // small JSON object
+ *       0x01 |  // large JSON object
+ *       0x02 |  // small JSON array
+ *       0x03 |  // large JSON array
+ *       0x04 |  // literal (true/false/null)
+ *       0x05 |  // int16
+ *       0x06 |  // uint16
+ *       0x07 |  // int32
+ *       0x08 |  // uint32
+ *       0x09 |  // int64
+ *       0x0a |  // uint64
+ *       0x0b |  // double
+ *       0x0c |  // utf8mb4 string
+ *       0x0f    // custom data (any MySQL data type)
  * 
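The type codes listed above lend themselves to a simple lookup table. The following is a minimal sketch, not the patched class itself: `TypeCode` and `TypeCodeDemo.read` are hypothetical names, and only a handful of codes are included. It illustrates the pattern this patch moves to, where the lookup returns `null` for an unknown code and the caller is the one that raises the `IOException`.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the ValueType enum; only a few of the codes are listed.
enum TypeCode {
    SMALL_DOCUMENT(0x00), LITERAL(0x04), INT16(0x05), STRING(0x0c), CUSTOM(0x0f);

    private static final Map<Integer, TypeCode> BY_CODE = new HashMap<Integer, TypeCode>();

    static {
        // Build the code -> constant index once, when the enum is loaded.
        for (TypeCode t : values()) {
            BY_CODE.put(t.code, t);
        }
    }

    private final int code;

    TypeCode(int code) {
        this.code = code;
    }

    int getCode() {
        return code;
    }

    // Returns null for an unknown code; the caller decides how to report it.
    static TypeCode byCode(int code) {
        return BY_CODE.get(code);
    }
}

class TypeCodeDemo {
    // Caller-side check: an unknown type code is turned into an IOException here.
    static TypeCode read(byte b) throws IOException {
        TypeCode result = TypeCode.byCode(b);
        if (result == null) {
            throw new IOException("Unknown value type code '" + String.format("%02X", (int) b) + "'");
        }
        return result;
    }
}
```

Keeping the lookup free of checked exceptions lets it be used where a missing entry is not an error, while the reader still fails fast on malformed input.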
* * @author Randall Hauch @@ -62,6 +61,16 @@ public enum ValueType { STRING(0x0c), CUSTOM(0x0f); + private final int code; + + ValueType(int code) { + this.code = code; + } + + public int getCode() { + return this.code; + } + private static final Map TYPE_BY_CODE; static { @@ -71,29 +80,8 @@ public enum ValueType { } } - /** - * Get the {@link ValueType} for the given binary type code. - * - * @param code the type code - * @return the {@link ValueType}; never null - * @throws IOException if the supplied type code is not known - */ - public static ValueType byCode(int code) throws IOException { - ValueType result = TYPE_BY_CODE.get(code); - if (result == null) { - throw new IOException("Unknown value type code '" + String.format("%02X ", code) + "'"); - } - return result; - } - - private final int code; - - private ValueType(int code) { - this.code = code; - } - - protected int code() { - return this.code; + public static ValueType byCode(int code) { + return TYPE_BY_CODE.get(code); } } diff --git a/src/test/java/com/github/shyiko/mysql/binlog/json/JsonBinaryValueIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java similarity index 96% rename from src/test/java/com/github/shyiko/mysql/binlog/json/JsonBinaryValueIntegrationTest.java rename to src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java index 062314c..251a13b 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/json/JsonBinaryValueIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java @@ -13,11 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package com.github.shyiko.mysql.binlog.json; +package com.github.shyiko.mysql.binlog.event.deserialization.json; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; import java.io.Serializable; import java.sql.SQLSyntaxErrorException; @@ -47,8 +46,6 @@ public class JsonBinaryValueIntegrationTest { private static final long DEFAULT_TIMEOUT = TimeUnit.SECONDS.toMillis(3); private MySQLConnection master; - private BinaryLogClient client; - private CountDownEventListener eventListener; private Map jsonValuesByKey; @BeforeClass @@ -59,11 +56,12 @@ public void setUp() throws Exception { master = new MySQLConnection(bundle.getString(prefix + "master.hostname"), Integer.parseInt(bundle.getString(prefix + "master.port")), bundle.getString(prefix + "master.username"), bundle.getString(prefix + "master.password")); - client = new BinaryLogClient(master.hostname(), master.port(), master.username(), master.password()); + BinaryLogClient client = new BinaryLogClient(master.hostname(), master.port(), master.username(), master.password()); client.setServerId(client.getServerId() - 1); // avoid clashes between BinaryLogClient instances client.setKeepAlive(false); // Uncomment the next line for detailed traces of the events ... 
// client.registerEventListener(new TraceEventListener()); + CountDownEventListener eventListener; client.registerEventListener(eventListener = new CountDownEventListener()); // client.registerLifecycleListener(new TraceLifecycleListener()); client.connect(DEFAULT_TIMEOUT); @@ -137,7 +135,6 @@ public void setUp() throws Exception { Integer rowNum = (Integer) row[0]; byte[] jsonBinary = (byte[]) row[1]; assertNotNull(rowNum); - assertTrue(jsonBinary == null || jsonBinary instanceof byte[]); jsonValuesByKey.put(rowNum, jsonBinary); } } @@ -320,7 +317,7 @@ protected void assertJson(int i, String expected) throws Exception { * may be null if the JSON value is null */ protected byte[] jsonForId(int i) throws Exception { - return jsonValuesByKey.get(new Integer(i)); + return jsonValuesByKey.get(i); } @AfterClass(alwaysRun = true) From 8f9132ee773317e00313204beeae8ddcaa43c1b4 Mon Sep 17 00:00:00 2001 From: Stanley Shyiko Date: Thu, 6 Oct 2016 10:21:27 -0700 Subject: [PATCH 08/21] Fixed DATE deserialization test on MySQL 5.7 --- .../binlog/BinaryLogClientIntegrationTest.java | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java index 3e0c2b1..8ede146 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java @@ -254,9 +254,21 @@ public void testDeserializationOfNEWDECIMAL() throws Exception { public void testDeserializationOfDATE() throws Exception { assertEquals(writeAndCaptureRow("date", "'1989-03-21'"), new Serializable[]{ generateTime(1989, 3, 21, 0, 0, 0, 0)}); - assertEquals(writeAndCaptureRow("date", "'0000-03-21'"), new Serializable[]{null}); - assertEquals(writeAndCaptureRow("date", "'1989-00-21'"), new Serializable[]{null}); - 
assertEquals(writeAndCaptureRow("date", "'1989-03-00'"), new Serializable[]{null}); + final boolean[] noZeroInDate = new boolean[1]; + master.query("select @@sql_mode;", new Callback() { + + @Override + public void execute(ResultSet rs) throws SQLException { + // NO_ZERO_IN_DATE is turned on by default in MySQL 5.7 + // https://github.com/shyiko/mysql-binlog-connector-java/pull/119#issuecomment-251870581 + noZeroInDate[0] = rs.next() && rs.getString(1).contains("NO_ZERO_IN_DATE"); + } + }); + if (!noZeroInDate[0]) { + assertEquals(writeAndCaptureRow("date", "'0000-03-21'"), new Serializable[]{null}); + assertEquals(writeAndCaptureRow("date", "'1989-00-21'"), new Serializable[]{null}); + assertEquals(writeAndCaptureRow("date", "'1989-03-00'"), new Serializable[]{null}); + } } @Test From c8e81c879710dc19941d952f9031b0a98f8b7c02 Mon Sep 17 00:00:00 2001 From: Stanley Shyiko Date: Thu, 6 Oct 2016 10:47:01 -0700 Subject: [PATCH 09/21] Fixed checkstyle violation --- .../deserialization/json/JsonBinaryValueIntegrationTest.java | 3 ++- supplement/codequality/checkstyle.xml | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java index 251a13b..104c8fd 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java @@ -56,7 +56,8 @@ public void setUp() throws Exception { master = new MySQLConnection(bundle.getString(prefix + "master.hostname"), Integer.parseInt(bundle.getString(prefix + "master.port")), bundle.getString(prefix + "master.username"), bundle.getString(prefix + "master.password")); - BinaryLogClient client = new BinaryLogClient(master.hostname(), master.port(), 
master.username(), master.password()); + BinaryLogClient client = new BinaryLogClient(master.hostname(), master.port(), master.username(), + master.password()); client.setServerId(client.getServerId() - 1); // avoid clashes between BinaryLogClient instances client.setKeepAlive(false); // Uncomment the next line for detailed traces of the events ... diff --git a/supplement/codequality/checkstyle.xml b/supplement/codequality/checkstyle.xml index 21fbe8e..d74512a 100644 --- a/supplement/codequality/checkstyle.xml +++ b/supplement/codequality/checkstyle.xml @@ -72,7 +72,7 @@
- + From 3527e991a40d85f1b946b009a9ee948c6bdfe200 Mon Sep 17 00:00:00 2001 From: Stanley Shyiko Date: Thu, 6 Oct 2016 11:15:36 -0700 Subject: [PATCH 10/21] Added 0.5.0 release notes --- changelog.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/changelog.md b/changelog.md index 1a653c2..76dbff4 100644 --- a/changelog.md +++ b/changelog.md @@ -2,7 +2,7 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). -## [Unreleased](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.4.1...HEAD) +## [Unreleased](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.5.0...HEAD) ### Added - A way to control Socket i/s buffering (using BinaryLogClient::setSocketFactory()). @@ -16,6 +16,11 @@ This is **BACKWARD-INCOMPATIBLE** change. - BINARY/VARBINARY deserialization ([#56](https://github.com/shyiko/mysql-binlog-connector-java/issues/56)). This is **BACKWARD-INCOMPATIBLE** change as CHAR/VARCHAR/BINARY/VARBINARY are now returned as `byte[]` (which you can obviously convert to String with `new String(byte[], Charset)` if needed). +## [0.5.0](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.4.2...0.5.0) - 2016-10-06 + +### Added + - JSON support ([#119](https://github.com/shyiko/mysql-binlog-connector-java/pull/119)) (thanks to [@rhauch](https://github.com/rhauch)). 
+ ## [0.4.2](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.4.1...0.4.2) - 2016-09-20 ### Fixed From fb379adeec1648249b25705a6d08974db5533db4 Mon Sep 17 00:00:00 2001 From: Stanley Shyiko Date: Thu, 6 Oct 2016 11:17:50 -0700 Subject: [PATCH 11/21] Updated refs to latest 0.5.0 release --- readme.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/readme.md b/readme.md index a8877eb..51bdd41 100644 --- a/readme.md +++ b/readme.md @@ -1,4 +1,4 @@ -# mysql-binlog-connector-java [![Build Status](https://travis-ci.org/shyiko/mysql-binlog-connector-java.svg?branch=master)](https://travis-ci.org/shyiko/mysql-binlog-connector-java) [![Coverage Status](https://coveralls.io/repos/shyiko/mysql-binlog-connector-java/badge.svg?branch=master)](https://coveralls.io/r/shyiko/mysql-binlog-connector-java?branch=master) [![Maven Central](http://img.shields.io/badge/maven_central-0.4.2-blue.svg?style=flat)](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.github.shyiko%22%20AND%20a%3A%22mysql-binlog-connector-java%22) +# mysql-binlog-connector-java [![Build Status](https://travis-ci.org/shyiko/mysql-binlog-connector-java.svg?branch=master)](https://travis-ci.org/shyiko/mysql-binlog-connector-java) [![Coverage Status](https://coveralls.io/repos/shyiko/mysql-binlog-connector-java/badge.svg?branch=master)](https://coveralls.io/r/shyiko/mysql-binlog-connector-java?branch=master) [![Maven Central](http://img.shields.io/badge/maven_central-0.5.0-blue.svg?style=flat)](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.github.shyiko%22%20AND%20a%3A%22mysql-binlog-connector-java%22) MySQL Binary Log connector. 
@@ -28,7 +28,7 @@ Get the latest JAR(s) from [here](http://search.maven.org/#search%7Cga%7C1%7Cg%3 com.github.shyiko mysql-binlog-connector-java - 0.4.2 + 0.5.0 ``` From 78513a1b11b859a6fbfe71cde082296254534ab9 Mon Sep 17 00:00:00 2001 From: Randall Hauch Date: Fri, 14 Oct 2016 11:43:47 -0500 Subject: [PATCH 12/21] Changed the client so that GTIDs are updated before calling the event listeners Previously, GTIDs were updated at the same time the binlog filename and position were updated: after the event listeners were called. However, this means that the GTID set was not yet updated when the event listeners were called, meaning that the event listeners would see the old GTID set if they asked for it. With this change, the GTID set is updated before the event listeners are called, and the binlog filename and position (which reference the _next_ event) are updated after the event listeners are called. This ensures that the event listeners see the event's correct GTID set and binlog filename and position. --- .../shyiko/mysql/binlog/BinaryLogClient.java | 65 ++++++++++++------- 1 file changed, 40 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index 730d5bc..b9e88b5 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -745,8 +745,11 @@ private void listenForEventPackets() throws IOException { continue; } if (isConnected()) { + // Update the GTID before the event listeners, and the binlog filename/position after the + // listeners (since the binlog filename/position point to the *next* event) ... 
+ boolean updatedGtid = updateGtid(event); notifyEventListeners(event); - updatePosition(event); + updatePosition(event, updatedGtid); } } } catch (Exception e) { @@ -780,40 +783,52 @@ private byte[] readPacketSplitInChunks(ByteArrayInputStream inputStream, int pac return result; } - private void updatePosition(Event event) { + private boolean updateGtid(Event event) { EventHeader eventHeader = event.getHeader(); EventType eventType = eventHeader.getEventType(); - if (gtidSet != null && eventType == EventType.XID) { - advanceGTID(); - } else - if (gtidSet != null && eventType == EventType.QUERY) { - QueryEventData queryEventData = getInternalEventData(event); - String query = queryEventData.getSql(); - if ("COMMIT".equals(query) || "ROLLBACK".equals(query) || - (previousEvent != null && previousEvent.getHeader().getEventType() == EventType.GTID && - !"BEGIN".equals(query))) { + if (gtidSet != null) { + if (eventType == EventType.XID) { advanceGTID(); + return true; } - } else - if (gtidSet != null && eventType == EventType.GTID) { - if (previousGtidEvent != null) { - if (advanceGTID()) { - if (logger.isLoggable(Level.WARNING)) { - logger.log(Level.WARNING, "GtidSet wasn't synchronized before GTID. " + - "Please submit a bug report to https://github.com/shyiko/mysql-binlog-connector-java"); + if (eventType == EventType.QUERY) { + QueryEventData queryEventData = getInternalEventData(event); + String query = queryEventData.getSql(); + if ("COMMIT".equals(query) || "ROLLBACK".equals(query) || + (previousEvent != null && previousEvent.getHeader().getEventType() == EventType.GTID && + !"BEGIN".equals(query))) { + advanceGTID(); + } + return true; + } + if (eventType == EventType.GTID) { + if (previousGtidEvent != null) { + if (advanceGTID()) { + if (logger.isLoggable(Level.WARNING)) { + logger.log(Level.WARNING, "GtidSet wasn't synchronized before GTID. 
" + + "Please submit a bug report to https://github.com/shyiko/mysql-binlog-connector-java"); + } } } + previousGtidEvent = event; + return true; } - previousGtidEvent = event; - } else - if (eventType == EventType.ROTATE) { + } + return false; + } + + private void updatePosition(Event event, boolean updatedGtid) { + EventHeader eventHeader = event.getHeader(); + EventType eventType = eventHeader.getEventType(); + if (updatedGtid) { + // the GTID was previously updated based upon this event + } else if (eventType == EventType.ROTATE) { RotateEventData rotateEventData = getInternalEventData(event); binlogFilename = rotateEventData.getBinlogFilename(); binlogPosition = rotateEventData.getBinlogPosition(); - } else - // do not update binlogPosition on TABLE_MAP so that in case of reconnect (using a different instance of - // client) table mapping cache could be reconstructed before hitting row mutation event - if (eventType != EventType.TABLE_MAP && eventHeader instanceof EventHeaderV4) { + } else if (eventType != EventType.TABLE_MAP && eventHeader instanceof EventHeaderV4) { + // do not update binlogPosition on TABLE_MAP so that in case of reconnect (using a different instance of + // client) table mapping cache could be reconstructed before hitting row mutation event EventHeaderV4 trackableEventHeader = (EventHeaderV4) eventHeader; long nextBinlogPosition = trackableEventHeader.getNextPosition(); if (nextBinlogPosition > 0) { From 3f3f595989162e7813d10ee68cc42e41ca5e6fb0 Mon Sep 17 00:00:00 2001 From: Stanley Shyiko Date: Tue, 18 Oct 2016 13:51:11 -0700 Subject: [PATCH 13/21] Added 0.5.1 release notes --- changelog.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/changelog.md b/changelog.md index 76dbff4..20b6906 100644 --- a/changelog.md +++ b/changelog.md @@ -2,7 +2,7 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). 
-## [Unreleased](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.5.0...HEAD) +## [Unreleased](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.5.1...HEAD) ### Added - A way to control Socket i/s buffering (using BinaryLogClient::setSocketFactory()). @@ -16,6 +16,13 @@ This is **BACKWARD-INCOMPATIBLE** change. - BINARY/VARBINARY deserialization ([#56](https://github.com/shyiko/mysql-binlog-connector-java/issues/56)). This is **BACKWARD-INCOMPATIBLE** change as CHAR/VARCHAR/BINARY/VARBINARY are now returned as `byte[]` (which you can obviously convert to String with `new String(byte[], Charset)` if needed). +## [0.5.1](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.5.0...0.5.1) - 2016-10-18 + +### Fixed + - ROWS_QUERY event deserialization ([#124](https://github.com/shyiko/mysql-binlog-connector-java/issues/124)). + - JSON length determination. + - GTID sync (GtidSet is now updated before BinaryLogClient.EventListener|s are notified). + ## [0.5.0](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.4.2...0.5.0) - 2016-10-06 ### Added From 09092f1d4e289bfdfff298154e6d427c2869f607 Mon Sep 17 00:00:00 2001 From: Stanley Shyiko Date: Tue, 18 Oct 2016 13:52:26 -0700 Subject: [PATCH 14/21] Updated refs to latest 0.5.1 release --- readme.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/readme.md b/readme.md index 51bdd41..ce8db96 100644 --- a/readme.md +++ b/readme.md @@ -1,4 +1,4 @@ -# mysql-binlog-connector-java [![Build Status](https://travis-ci.org/shyiko/mysql-binlog-connector-java.svg?branch=master)](https://travis-ci.org/shyiko/mysql-binlog-connector-java) [![Coverage Status](https://coveralls.io/repos/shyiko/mysql-binlog-connector-java/badge.svg?branch=master)](https://coveralls.io/r/shyiko/mysql-binlog-connector-java?branch=master) [![Maven 
Central](http://img.shields.io/badge/maven_central-0.5.0-blue.svg?style=flat)](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.github.shyiko%22%20AND%20a%3A%22mysql-binlog-connector-java%22) +# mysql-binlog-connector-java [![Build Status](https://travis-ci.org/shyiko/mysql-binlog-connector-java.svg?branch=master)](https://travis-ci.org/shyiko/mysql-binlog-connector-java) [![Coverage Status](https://coveralls.io/repos/shyiko/mysql-binlog-connector-java/badge.svg?branch=master)](https://coveralls.io/r/shyiko/mysql-binlog-connector-java?branch=master) [![Maven Central](http://img.shields.io/badge/maven_central-0.5.1-blue.svg?style=flat)](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.github.shyiko%22%20AND%20a%3A%22mysql-binlog-connector-java%22) MySQL Binary Log connector. @@ -28,7 +28,7 @@ Get the latest JAR(s) from [here](http://search.maven.org/#search%7Cga%7C1%7Cg%3 com.github.shyiko mysql-binlog-connector-java - 0.5.0 + 0.5.1 ``` From 87e68b50ad8c8dee22c4ec9faa61ca7fb2e91baf Mon Sep 17 00:00:00 2001 From: Stanley Shyiko Date: Sun, 23 Oct 2016 12:39:19 -0700 Subject: [PATCH 15/21] Updated "real-world applications" section of readme.md --- readme.md | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/readme.md b/readme.md index ce8db96..9a0f5a1 100644 --- a/readme.md +++ b/readme.md @@ -181,10 +181,13 @@ For the insight into the internals of MySQL look [here](https://dev.mysql.com/do ## Real-world applications Some of the OSS built on top of mysql-binlog-conector-java: -[debezium](https://github.com/debezium/debezium) (distributed platform for change data capture), -[mardambey/mypipe](https://github.com/mardambey/mypipe) (MySQL to Apache Kafka replicator), -[ngocdaothanh/mydit](https://github.com/ngocdaothanh/mydit) (MySQL to MongoDB replicator), -[shyiko/rook](https://github.com/shyiko/rook) (generic Change Data Capture (CDC) toolkit). 
+* [debezium](https://github.com/debezium/debezium) A low latency data streaming platform for change data capture (CDC).
+* [mavenlink/changestream](https://github.com/mavenlink/changestream) - A stream of changes for MySQL built on Akka.
+* [mardambey/mypipe](https://github.com/mardambey/mypipe) MySQL binary log consumer with the ability to act on changed rows and publish changes to different systems with emphasis on Apache Kafka.
+* [ngocdaothanh/mydit](https://github.com/ngocdaothanh/mydit) MySQL to MongoDB data replicator.
+* [sharetribe/dumpr](https://github.com/sharetribe/dumpr) A Clojure library for live replicating data from a MySQL database.
+* [shyiko/rook](https://github.com/shyiko/rook) Generic Change Data Capture (CDC) toolkit.
+* [twingly/ecco](https://github.com/twingly/ecco) MySQL replication binlog parser in JRuby.
 It's also used [on a large scale](https://twitter.com/atwinmutt/status/626816601078300672) in MailChimp. You can read about it [here](http://devs.mailchimp.com/blog/powering-mailchimp-pro-reporting/).

From 6bb1ce407a305854a4b343595b90585220b0d676 Mon Sep 17 00:00:00 2001
From: Stanley Shyiko
Date: Wed, 2 Nov 2016 22:32:11 -0700
Subject: [PATCH 16/21] Added link to streamsets/datacollector

---
 readme.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/readme.md b/readme.md
index 9a0f5a1..263713d 100644
--- a/readme.md
+++ b/readme.md
@@ -180,13 +180,14 @@ For the insight into the internals of MySQL look [here](https://dev.mysql.com/do
 ## Real-world applications
-Some of the OSS built on top of mysql-binlog-conector-java:
+Some of the OSS using / built on top of mysql-binlog-conector-java:
 * [debezium](https://github.com/debezium/debezium) A low latency data streaming platform for change data capture (CDC).
 * [mavenlink/changestream](https://github.com/mavenlink/changestream) - A stream of changes for MySQL built on Akka.
 * [mardambey/mypipe](https://github.com/mardambey/mypipe) MySQL binary log consumer with the ability to act on changed rows and publish changes to different systems with emphasis on Apache Kafka.
 * [ngocdaothanh/mydit](https://github.com/ngocdaothanh/mydit) MySQL to MongoDB data replicator.
 * [sharetribe/dumpr](https://github.com/sharetribe/dumpr) A Clojure library for live replicating data from a MySQL database.
 * [shyiko/rook](https://github.com/shyiko/rook) Generic Change Data Capture (CDC) toolkit.
+* [streamsets/datacollector](https://github.com/streamsets/datacollector) Continuous big data ingestion infrastructure.
 * [twingly/ecco](https://github.com/twingly/ecco) MySQL replication binlog parser in JRuby.
 It's also used [on a large scale](https://twitter.com/atwinmutt/status/626816601078300672) in MailChimp. You can read about it [here](http://devs.mailchimp.com/blog/powering-mailchimp-pro-reporting/).

From 3080b8d0b0e49fff71cb75cc6e92dd53bac9839e Mon Sep 17 00:00:00 2001
From: Stanley Shyiko
Date: Fri, 11 Nov 2016 11:21:40 -0800
Subject: [PATCH 17/21] Added .jabbarc

---
 .jabbarc | 2 ++
 1 file changed, 2 insertions(+)
 create mode 100644 .jabbarc

diff --git a/.jabbarc b/.jabbarc
new file mode 100644
index 0000000..cd4ca1f
--- /dev/null
+++ b/.jabbarc
@@ -0,0 +1,2 @@
+# https://github.com/shyiko/jabba
+jdk: 1.6

From 89725d240625083f318acd1405fc48a69a0bee51 Mon Sep 17 00:00:00 2001
From: Stanley Shyiko
Date: Sat, 19 Nov 2016 21:37:48 -0800
Subject: [PATCH 18/21] Added 0.5.2 release notes

---
 changelog.md | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/changelog.md b/changelog.md
index 20b6906..51e907e 100644
--- a/changelog.md
+++ b/changelog.md
@@ -2,7 +2,7 @@
 All notable changes to this project will be documented in this file.
 This project adheres to [Semantic Versioning](http://semver.org/).
-## [Unreleased](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.5.1...HEAD)
+## [Unreleased](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.5.2...HEAD)
 ### Added
  - A way to control Socket i/s buffering (using BinaryLogClient::setSocketFactory()).
@@ -16,6 +16,11 @@ This is **BACKWARD-INCOMPATIBLE** change.
  - BINARY/VARBINARY deserialization ([#56](https://github.com/shyiko/mysql-binlog-connector-java/issues/56)).
 This is **BACKWARD-INCOMPATIBLE** change as CHAR/VARCHAR/BINARY/VARBINARY are now returned as `byte[]` (which you can obviously convert to String with `new String(byte[], Charset)` if needed).
+## [0.5.2](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.5.1...0.5.2) - 2016-11-19
+
+### Fixed
+ - (JSON) deserialization of null/true/false/(u)int(16|32)/variable-length data types ([#129](https://github.com/shyiko/mysql-binlog-connector-java/issues/129)).
+
 ## [0.5.1](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.5.0...0.5.1) - 2016-10-18
 ### Fixed

From ad0f727fdcc92a4b539a2a2ab0f446a364af7b99 Mon Sep 17 00:00:00 2001
From: Stanley Shyiko
Date: Sat, 19 Nov 2016 21:39:03 -0800
Subject: [PATCH 19/21] Updated refs to latest (0.5.2) release

---
 readme.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/readme.md b/readme.md
index 263713d..22e8e4b 100644
--- a/readme.md
+++ b/readme.md
@@ -1,4 +1,4 @@
-# mysql-binlog-connector-java [![Build Status](https://travis-ci.org/shyiko/mysql-binlog-connector-java.svg?branch=master)](https://travis-ci.org/shyiko/mysql-binlog-connector-java) [![Coverage Status](https://coveralls.io/repos/shyiko/mysql-binlog-connector-java/badge.svg?branch=master)](https://coveralls.io/r/shyiko/mysql-binlog-connector-java?branch=master) [![Maven Central](http://img.shields.io/badge/maven_central-0.5.1-blue.svg?style=flat)](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.github.shyiko%22%20AND%20a%3A%22mysql-binlog-connector-java%22)
+# mysql-binlog-connector-java [![Build Status](https://travis-ci.org/shyiko/mysql-binlog-connector-java.svg?branch=master)](https://travis-ci.org/shyiko/mysql-binlog-connector-java) [![Coverage Status](https://coveralls.io/repos/shyiko/mysql-binlog-connector-java/badge.svg?branch=master)](https://coveralls.io/r/shyiko/mysql-binlog-connector-java?branch=master) [![Maven Central](http://img.shields.io/badge/maven_central-0.5.2-blue.svg?style=flat)](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.github.shyiko%22%20AND%20a%3A%22mysql-binlog-connector-java%22)
 MySQL Binary Log connector.
@@ -28,7 +28,7 @@ Get the latest JAR(s) from [here](http://search.maven.org/#search%7Cga%7C1%7Cg%3
 com.github.shyiko
 mysql-binlog-connector-java
- 0.5.1
+ 0.5.2
 ```

From d63f5811dd4f0cfd5c80a4042cf119d4a7e610a6 Mon Sep 17 00:00:00 2001
From: Stanley Shyiko
Date: Sun, 27 Nov 2016 22:31:38 -0800
Subject: [PATCH 20/21] Added 0.6.0 release notes

---
 changelog.md | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/changelog.md b/changelog.md
index 51e907e..c74ea18 100644
--- a/changelog.md
+++ b/changelog.md
@@ -2,7 +2,7 @@
 All notable changes to this project will be documented in this file.
 This project adheres to [Semantic Versioning](http://semver.org/).
-## [Unreleased](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.5.2...HEAD)
+## [Unreleased](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.6.0...HEAD)
 ### Added
  - A way to control Socket i/s buffering (using BinaryLogClient::setSocketFactory()).
@@ -16,6 +16,11 @@ This is **BACKWARD-INCOMPATIBLE** change.
  - BINARY/VARBINARY deserialization ([#56](https://github.com/shyiko/mysql-binlog-connector-java/issues/56)).
 This is **BACKWARD-INCOMPATIBLE** change as CHAR/VARCHAR/BINARY/VARBINARY are now returned as `byte[]` (which you can obviously convert to String with `new String(byte[], Charset)` if needed).
+## [0.6.0](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.5.2...0.6.0) - 2016-11-27
+
+### Added
+ - EventDeserializer compatibility modes to mimic upcoming 1.0.0 event deserialization behavior ([#131](https://github.com/shyiko/mysql-binlog-connector-java/pull/131)).
+
 ## [0.5.2](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.5.1...0.5.2) - 2016-11-19
 ### Fixed

From d625c53f0c826710f83ac394339bdc7d8649fd5d Mon Sep 17 00:00:00 2001
From: Stanley Shyiko
Date: Sun, 27 Nov 2016 22:33:23 -0800
Subject: [PATCH 21/21] Updated refs to the latest 0.6.0 release (readme.md)

---
 readme.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/readme.md b/readme.md
index 22e8e4b..989f32d 100644
--- a/readme.md
+++ b/readme.md
@@ -1,4 +1,4 @@
-# mysql-binlog-connector-java [![Build Status](https://travis-ci.org/shyiko/mysql-binlog-connector-java.svg?branch=master)](https://travis-ci.org/shyiko/mysql-binlog-connector-java) [![Coverage Status](https://coveralls.io/repos/shyiko/mysql-binlog-connector-java/badge.svg?branch=master)](https://coveralls.io/r/shyiko/mysql-binlog-connector-java?branch=master) [![Maven Central](http://img.shields.io/badge/maven_central-0.5.2-blue.svg?style=flat)](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.github.shyiko%22%20AND%20a%3A%22mysql-binlog-connector-java%22)
+# mysql-binlog-connector-java [![Build Status](https://travis-ci.org/shyiko/mysql-binlog-connector-java.svg?branch=master)](https://travis-ci.org/shyiko/mysql-binlog-connector-java) [![Coverage Status](https://coveralls.io/repos/shyiko/mysql-binlog-connector-java/badge.svg?branch=master)](https://coveralls.io/r/shyiko/mysql-binlog-connector-java?branch=master) [![Maven Central](http://img.shields.io/badge/maven_central-0.6.0-blue.svg?style=flat)](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.github.shyiko%22%20AND%20a%3A%22mysql-binlog-connector-java%22)
 MySQL Binary Log connector.
@@ -28,7 +28,7 @@ Get the latest JAR(s) from [here](http://search.maven.org/#search%7Cga%7C1%7Cg%3
 com.github.shyiko
 mysql-binlog-connector-java
- 0.5.2
+ 0.6.0
 ```