diff --git a/.gitignore b/.gitignore index 681a4aa..ae7668d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,8 @@ .idea .iml .DS_Store -target \ No newline at end of file +target +.classpath +.project +.settings +.vagrant \ No newline at end of file diff --git a/.jabbarc b/.jabbarc new file mode 100644 index 0000000..cd4ca1f --- /dev/null +++ b/.jabbarc @@ -0,0 +1,2 @@ +# https://github.com/shyiko/jabba +jdk: 1.6 diff --git a/.travis.yml b/.travis.yml index b051d18..e45adfe 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,5 +2,5 @@ language: java script: 'if [[ $TRAVIS_REPO_SLUG == "shyiko/mysql-binlog-connector-java" && $TRAVIS_BRANCH == "master" && $TRAVIS_PULL_REQUEST == "false" ]]; then mvn -P with-sources-and-javadocs -Ddeploy=snapshot -DskipTests=true -s .travis.settings.xml; fi' env: global: - - secure: QI6w4d/W9Sjckgcdwc/K5Rb+UFa6PxDbhiilv70fEZCXg0P4GMBFGD/zbpsfTlLbA9rgKBotrSm7EXRtQM94dluW0kS9jQiJfw33l9WbfMAJL4797xq+7ekvIvPBf7xVfwGsAcCcB5DcPBTKroX2s8tJEY3CQTienkW4pT7/zh0= - - secure: b/7SnPmS2BUrzc167F5sbI8ILNBqo3ODz9ZtuOOcXQ/9uJVgaLrB3KmKAHgIsjOiZ/PPp7qGNL4jW9CD7b7LAldMyIcszdKptc/fr2yHZkM17IuQJtL5tjBNlLkqeApW37ddPp0mp5IhhqzZMZ8Zu8/2vlyNJrVrYgnIXuoLLRE= + - secure: WLU2kkL9WSeS1LGUOr5UZyv8G1TnUuXOviLuAAcvv+JW+CFcoQwnDlCC7SJJOqGixKpg8+04zr5CHaVU2QmG4E/+XohuPqH23rdfBi9Ra4BubXjQvV3siSQXEAIjZWubyJ7/4EYzBK821FnlnthnabFsow9enBv6MT5NmscOy+0= + - secure: ivE8LTu0CrgvNH6Yjise5lLaZx5kpG9M76CuRK2APUEbAC0QOPWWudsx2yuDmn2X6lIXFYIU5WUadDE6n9LYAfbHdV0wRUD1Qimgt7oDto5vnWZIZrbWwyqSqK7RCxw503ofbHXVPRyabLRYACZV+7T+K2aDJw8V/cIITIAC7gI= diff --git a/changelog.md b/changelog.md index 9feac23..c74ea18 100644 --- a/changelog.md +++ b/changelog.md @@ -2,7 +2,7 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). -## [Unreleased](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.4.1...HEAD) +## [Unreleased](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.6.0...HEAD) ### Added - A way to control Socket i/s buffering (using BinaryLogClient::setSocketFactory()). @@ -16,6 +16,33 @@ This is **BACKWARD-INCOMPATIBLE** change. - BINARY/VARBINARY deserialization ([#56](https://github.com/shyiko/mysql-binlog-connector-java/issues/56)). This is **BACKWARD-INCOMPATIBLE** change as CHAR/VARCHAR/BINARY/VARBINARY are now returned as `byte[]` (which you can obviously convert to String with `new String(byte[], Charset)` if needed). +## [0.6.0](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.5.2...0.6.0) - 2016-11-27 + +### Added + - EventDeserializer compatibility modes to mimic upcoming 1.0.0 event deserialization behavior ([#131](https://github.com/shyiko/mysql-binlog-connector-java/pull/131)). + +## [0.5.2](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.5.1...0.5.2) - 2016-11-19 + +### Fixed + - (JSON) deserialization of null/true/false/(u)int(16|32)/variable-length data types ([#129](https://github.com/shyiko/mysql-binlog-connector-java/issues/129)). + +## [0.5.1](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.5.0...0.5.1) - 2016-10-18 + +### Fixed + - ROWS_QUERY event deserialization ([#124](https://github.com/shyiko/mysql-binlog-connector-java/issues/124)). + - JSON length determination. + - GTID sync (GtidSet is now updated before BinaryLogClient.EventListener|s are notified). + +## [0.5.0](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.4.2...0.5.0) - 2016-10-06 + +### Added + - JSON support ([#119](https://github.com/shyiko/mysql-binlog-connector-java/pull/119)) (thanks to [@rhauch](https://github.com/rhauch)). + +## [0.4.2](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.4.1...0.4.2) - 2016-09-20 + +### Fixed + - A race condition that could result in duplicate events to be emitted on reconnect ([#113](https://github.com/shyiko/mysql-binlog-connector-java/issues/113)). + ## [0.4.1](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.4.0...0.4.1) - 2016-08-31 ### Fixed diff --git a/pom.xml b/pom.xml index 45831fc..d77e5ae 100644 --- a/pom.xml +++ b/pom.xml @@ -45,7 +45,7 @@ UTF-8 UTF-8 vagrant - ${basedir}/supplement/vagrant/mysql-5.6.12-sandbox-prepackaged + ${basedir}/supplement/vagrant/mysql-5.7.15-sandbox-prepackaged @@ -207,6 +207,8 @@ -Dvagrant.integration.box=supplement/vagrant/mysql-5.5.27-sandbox-prepackaged mvn -P coverage verify \ -Dvagrant.integration.box=supplement/vagrant/mysql-5.6.12-sandbox-prepackaged + mvn -P coverage verify \ + -Dvagrant.integration.box=supplement/vagrant/mysql-5.7.15-sandbox-prepackaged # submit coverage report to coveralls mvn -P coverage coveralls:jacoco -DrepoToken=<coveralls.io> diff --git a/readme.md b/readme.md index b1ef8a9..989f32d 100644 --- a/readme.md +++ b/readme.md @@ -1,4 +1,4 @@ -# mysql-binlog-connector-java [![Build Status](https://travis-ci.org/shyiko/mysql-binlog-connector-java.svg?branch=master)](https://travis-ci.org/shyiko/mysql-binlog-connector-java) [![Coverage Status](https://coveralls.io/repos/shyiko/mysql-binlog-connector-java/badge.svg?branch=master)](https://coveralls.io/r/shyiko/mysql-binlog-connector-java?branch=master) [![Maven Central](http://img.shields.io/badge/maven_central-0.4.1-blue.svg?style=flat)](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.github.shyiko%22%20AND%20a%3A%22mysql-binlog-connector-java%22) +# mysql-binlog-connector-java [![Build Status](https://travis-ci.org/shyiko/mysql-binlog-connector-java.svg?branch=master)](https://travis-ci.org/shyiko/mysql-binlog-connector-java) [![Coverage Status](https://coveralls.io/repos/shyiko/mysql-binlog-connector-java/badge.svg?branch=master)](https://coveralls.io/r/shyiko/mysql-binlog-connector-java?branch=master) [![Maven Central](http://img.shields.io/badge/maven_central-0.6.0-blue.svg?style=flat)](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.github.shyiko%22%20AND%20a%3A%22mysql-binlog-connector-java%22) MySQL Binary Log connector. @@ -28,7 +28,7 @@ Get the latest JAR(s) from [here](http://search.maven.org/#search%7Cga%7C1%7Cg%3 com.github.shyiko mysql-binlog-connector-java - 0.4.1 + 0.6.0 ``` @@ -115,7 +115,7 @@ mBeanServer.registerMBean(stats, statsObjectName); #### Using SSL -> Introduced in 0.4.1. +> Introduced in 0.4.0. TLSv1.1 & TLSv1.2 require [JDK 7](http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6916074)+. Prior to MySQL 5.7.10, MySQL supported only TLSv1 @@ -180,11 +180,15 @@ For the insight into the internals of MySQL look [here](https://dev.mysql.com/do ## Real-world applications -Some of the OSS built on top of mysql-binlog-conector-java: -[debezium](https://github.com/debezium/debezium) (distributed platform for change data capture), -[mardambey/mypipe](https://github.com/mardambey/mypipe) (MySQL to Apache Kafka replicator), -[ngocdaothanh/mydit](https://github.com/ngocdaothanh/mydit) (MySQL to MongoDB replicator), -[shyiko/rook](https://github.com/shyiko/rook) (generic Change Data Capture (CDC) toolkit). +Some of the OSS using / built on top of mysql-binlog-conector-java: +* [debezium](https://github.com/debezium/debezium) A low latency data streaming platform for change data capture (CDC). +* [mavenlink/changestream](https://github.com/mavenlink/changestream) - A stream of changes for MySQL built on Akka. +* [mardambey/mypipe](https://github.com/mardambey/mypipe) MySQL binary log consumer with the ability to act on changed rows and publish changes to different systems with emphasis on Apache Kafka. +* [ngocdaothanh/mydit](https://github.com/ngocdaothanh/mydit) MySQL to MongoDB data replicator. +* [sharetribe/dumpr](https://github.com/sharetribe/dumpr) A Clojure library for live replicating data from a MySQL database. +* [shyiko/rook](https://github.com/shyiko/rook) Generic Change Data Capture (CDC) toolkit. +* [streamsets/datacollector](https://github.com/streamsets/datacollector) Continuous big data ingestion infrastructure. +* [twingly/ecco](https://github.com/twingly/ecco) MySQL replication binlog parser in JRuby. It's also used [on a large scale](https://twitter.com/atwinmutt/status/626816601078300672) in MailChimp. You can read about it [here](http://devs.mailchimp.com/blog/powering-mailchimp-pro-reporting/). diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index 730d5bc..b9e88b5 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -745,8 +745,11 @@ private void listenForEventPackets() throws IOException { continue; } if (isConnected()) { + // Update the GTID before the event listeners, and the binlog filename/position after the + // listeners (since the binlog filename/position point to the *next* event) ... + boolean updatedGtid = updateGtid(event); notifyEventListeners(event); - updatePosition(event); + updatePosition(event, updatedGtid); } } } catch (Exception e) { @@ -780,40 +783,52 @@ private byte[] readPacketSplitInChunks(ByteArrayInputStream inputStream, int pac return result; } - private void updatePosition(Event event) { + private boolean updateGtid(Event event) { EventHeader eventHeader = event.getHeader(); EventType eventType = eventHeader.getEventType(); - if (gtidSet != null && eventType == EventType.XID) { - advanceGTID(); - } else - if (gtidSet != null && eventType == EventType.QUERY) { - QueryEventData queryEventData = getInternalEventData(event); - String query = queryEventData.getSql(); - if ("COMMIT".equals(query) || "ROLLBACK".equals(query) || - (previousEvent != null && previousEvent.getHeader().getEventType() == EventType.GTID && - !"BEGIN".equals(query))) { + if (gtidSet != null) { + if (eventType == EventType.XID) { advanceGTID(); + return true; } - } else - if (gtidSet != null && eventType == EventType.GTID) { - if (previousGtidEvent != null) { - if (advanceGTID()) { - if (logger.isLoggable(Level.WARNING)) { - logger.log(Level.WARNING, "GtidSet wasn't synchronized before GTID. " + - "Please submit a bug report to https://github.com/shyiko/mysql-binlog-connector-java"); + if (eventType == EventType.QUERY) { + QueryEventData queryEventData = getInternalEventData(event); + String query = queryEventData.getSql(); + if ("COMMIT".equals(query) || "ROLLBACK".equals(query) || + (previousEvent != null && previousEvent.getHeader().getEventType() == EventType.GTID && + !"BEGIN".equals(query))) { + advanceGTID(); + } + return true; + } + if (eventType == EventType.GTID) { + if (previousGtidEvent != null) { + if (advanceGTID()) { + if (logger.isLoggable(Level.WARNING)) { + logger.log(Level.WARNING, "GtidSet wasn't synchronized before GTID. " + + "Please submit a bug report to https://github.com/shyiko/mysql-binlog-connector-java"); + } } } + previousGtidEvent = event; + return true; } - previousGtidEvent = event; - } else - if (eventType == EventType.ROTATE) { + } + return false; + } + + private void updatePosition(Event event, boolean updatedGtid) { + EventHeader eventHeader = event.getHeader(); + EventType eventType = eventHeader.getEventType(); + if (updatedGtid) { + // the GTID was previously updated based upon this event + } else if (eventType == EventType.ROTATE) { RotateEventData rotateEventData = getInternalEventData(event); binlogFilename = rotateEventData.getBinlogFilename(); binlogPosition = rotateEventData.getBinlogPosition(); - } else - // do not update binlogPosition on TABLE_MAP so that in case of reconnect (using a different instance of - // client) table mapping cache could be reconstructed before hitting row mutation event - if (eventType != EventType.TABLE_MAP && eventHeader instanceof EventHeaderV4) { + } else if (eventType != EventType.TABLE_MAP && eventHeader instanceof EventHeaderV4) { + // do not update binlogPosition on TABLE_MAP so that in case of reconnect (using a different instance of + // client) table mapping cache could be reconstructed before hitting row mutation event EventHeaderV4 trackableEventHeader = (EventHeaderV4) eventHeader; long nextBinlogPosition = trackableEventHeader.getNextPosition(); if (nextBinlogPosition > 0) { diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java index 688db5f..2d4f0b6 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java @@ -15,10 +15,6 @@ */ package com.github.shyiko.mysql.binlog.event.deserialization; -import com.github.shyiko.mysql.binlog.event.EventData; -import com.github.shyiko.mysql.binlog.event.TableMapEventData; -import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; - import java.io.IOException; import java.io.Serializable; import java.math.BigDecimal; @@ -27,6 +23,10 @@ import java.util.Map; import java.util.TimeZone; +import com.github.shyiko.mysql.binlog.event.EventData; +import com.github.shyiko.mysql.binlog.event.TableMapEventData; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; + /** * Whole class is basically a mix of open-replicator's * AbstractRowEventParser and MySQLUtils. Main purpose here is to ease rows deserialization.

@@ -170,6 +170,8 @@ protected Serializable deserializeCell(ColumnType type, int meta, int length, By return deserializeSet(length, inputStream); case GEOMETRY: return deserializeGeometry(meta, inputStream); + case JSON: + return deserializeJson(meta, inputStream); default: throw new IOException("Unsupported type " + type); } @@ -329,6 +331,21 @@ protected byte[] deserializeGeometry(int meta, ByteArrayInputStream inputStream) return inputStream.read(dataLength); } + /** + * Deserialize the {@code JSON} value on the input stream, and return MySQL's internal binary representation + * of the JSON value. See {@link com.github.shyiko.mysql.binlog.event.deserialization.json.JsonBinary} for + * a utility to parse this binary representation into something more useful, including a string representation. + * + * @param meta the number of bytes in which the length of the JSON value is found first on the input stream + * @param inputStream the stream containing the JSON value + * @return the MySQL internal binary representation of the JSON value; may be null + * @throws IOException if there is a problem reading the input stream + */ + protected byte[] deserializeJson(int meta, ByteArrayInputStream inputStream) throws IOException { + int blobLength = inputStream.readInteger(4); + return inputStream.read(blobLength); + } + // checkstyle, please ignore ParameterNumber for the next line private static Long asUnixTime(int year, int month, int day, int hour, int minute, int second, int millis) { // https://dev.mysql.com/doc/refman/5.0/en/datetime.html @@ -376,7 +393,7 @@ private static int[] split(long value, int divider, int length) { /** * see mysql/strings/decimal.c */ - private static BigDecimal asBigDecimal(int precision, int scale, byte[] value) { + public static BigDecimal asBigDecimal(int precision, int scale, byte[] value) { boolean positive = (value[0] & 0x80) == 0x80; value[0] ^= 0x80; if (!positive) { diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/ColumnType.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/ColumnType.java index f5f6938..cc8c941 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/ColumnType.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/ColumnType.java @@ -45,6 +45,7 @@ public enum ColumnType { TIMESTAMP_V2(17), DATETIME_V2(18), TIME_V2(19), + JSON(245), NEWDECIMAL(246), ENUM(247), SET(248), diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinary.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinary.java new file mode 100644 index 0000000..d5824a3 --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinary.java @@ -0,0 +1,1006 @@ +/* + * Copyright 2016 Stanley Shyiko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.shyiko.mysql.binlog.event.deserialization.json; + +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.charset.Charset; + +import com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.ColumnType; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; + +/** + * Utility to parse the binary-encoded value of a MySQL {@code JSON} type, translating the encoded representation into + * method calls on a supplied {@link JsonFormatter} implementation. + * + *

Binary Format

+ * + * Each JSON value (scalar, object or array) has a one byte type identifier followed by the actual value. + * + *

Scalar

+ * + * The binary value may contain a single scalar that is one of: + * + * + *

JSON Object

+ * + * If the value is a JSON object, its binary representation will have a header that contains: + * + * + * The actual keys and values will come after the header, in the same order as in the header. + * + *

JSON Array

+ * + * If the value is a JSON array, the binary representation will have a header with + * + * followed by the actual values, in the same order as in the header. + * + *

Grammar

+ * The grammar of the binary representation of JSON objects are defined in the MySQL codebase in the + * json_binary.h file: + *

+ * + *

+ *   doc ::= type value
+ *   type ::=
+ *       0x00 |  // small JSON object
+ *       0x01 |  // large JSON object
+ *       0x02 |  // small JSON array
+ *       0x03 |  // large JSON array
+ *       0x04 |  // literal (true/false/null)
+ *       0x05 |  // int16
+ *       0x06 |  // uint16
+ *       0x07 |  // int32
+ *       0x08 |  // uint32
+ *       0x09 |  // int64
+ *       0x0a |  // uint64
+ *       0x0b |  // double
+ *       0x0c |  // utf8mb4 string
+ *       0x0f    // custom data (any MySQL data type)
+ *   value ::=
+ *       object  |
+ *       array   |
+ *       literal |
+ *       number  |
+ *       string  |
+ *       custom-data
+ *   object ::= element-count size key-entry* value-entry* key* value*
+ *   array ::= element-count size value-entry* value*
+ *   // number of members in object or number of elements in array
+ *   element-count ::=
+ *       uint16 |  // if used in small JSON object/array
+ *       uint32    // if used in large JSON object/array
+ *   // number of bytes in the binary representation of the object or array
+ *   size ::=
+ *       uint16 |  // if used in small JSON object/array
+ *       uint32    // if used in large JSON object/array
+ *   key-entry ::= key-offset key-length
+ *   key-offset ::=
+ *       uint16 |  // if used in small JSON object
+ *       uint32    // if used in large JSON object
+ *   key-length ::= uint16    // key length must be less than 64KB
+ *   value-entry ::= type offset-or-inlined-value
+ *   // This field holds either the offset to where the value is stored,
+ *   // or the value itself if it is small enough to be inlined (that is,
+ *   // if it is a JSON literal or a small enough [u]int).
+ *   offset-or-inlined-value ::=
+ *       uint16 |   // if used in small JSON object/array
+ *       uint32     // if used in large JSON object/array
+ *   key ::= utf8mb4-data
+ *   literal ::=
+ *       0x00 |   // JSON null literal
+ *       0x01 |   // JSON true literal
+ *       0x02 |   // JSON false literal
+ *   number ::=  ....  // little-endian format for [u]int(16|32|64), whereas
+ *                     // double is stored in a platform-independent, eight-byte
+ *                     // format using float8store()
+ *   string ::= data-length utf8mb4-data
+ *   custom-data ::= custom-type data-length binary-data
+ *   custom-type ::= uint8   // type identifier that matches the
+ *                           // internal enum_field_types enum
+ *   data-length ::= uint8*  // If the high bit of a byte is 1, the length
+ *                           // field is continued in the next byte,
+ *                           // otherwise it is the last byte of the length
+ *                           // field. So we need 1 byte to represent
+ *                           // lengths up to 127, 2 bytes to represent
+ *                           // lengths up to 16383, and so on...
+ * 
+ * + * @author Randall Hauch + */ +public class JsonBinary { + + private static final Charset UTF_8 = Charset.forName("UTF-8"); + + /** + * Parse the MySQL binary representation of a {@code JSON} value and return the JSON string representation. + *

+ * This method is equivalent to {@link #parse(byte[], JsonFormatter)} using the {@link JsonStringFormatter}. + * + * @param bytes the binary representation; may not be null + * @return the JSON string representation; never null + * @throws IOException if there is a problem reading or processing the binary representation + */ + public static String parseAsString(byte[] bytes) throws IOException { + JsonStringFormatter handler = new JsonStringFormatter(); + parse(bytes, handler); + return handler.getString(); + } + + /** + * Parse the MySQL binary representation of a {@code JSON} value and call the supplied {@link JsonFormatter} + * for the various components of the value. + * + * @param bytes the binary representation; may not be null + * @param formatter the formatter that will be called as the binary representation is parsed; may not be null + * @throws IOException if there is a problem reading or processing the binary representation + */ + public static void parse(byte[] bytes, JsonFormatter formatter) throws IOException { + new JsonBinary(bytes).parse(formatter); + } + + private final ByteArrayInputStream reader; + + public JsonBinary(byte[] bytes) { + this(new ByteArrayInputStream(bytes)); + } + + public JsonBinary(ByteArrayInputStream contents) { + this.reader = contents; + } + + @Override + public String toString() { + return getString(); + } + + public String getString() { + JsonStringFormatter handler = new JsonStringFormatter(); + try { + parse(handler); + } catch (IOException e) { + throw new RuntimeException(e); + } + return handler.getString(); + } + + public void parse(JsonFormatter formatter) throws IOException { + parse(readValueType(), formatter); + } + + protected void parse(ValueType type, JsonFormatter formatter) throws IOException { + switch (type) { + case SMALL_DOCUMENT: + parseObject(true, formatter); + break; + case LARGE_DOCUMENT: + parseObject(false, formatter); + break; + case SMALL_ARRAY: + parseArray(true, formatter); + break; + case LARGE_ARRAY: + parseArray(false, formatter); + break; + case LITERAL: + parseBoolean(formatter); + break; + case INT16: + parseInt16(formatter); + break; + case UINT16: + parseUInt16(formatter); + break; + case INT32: + parseInt32(formatter); + break; + case UINT32: + parseUInt32(formatter); + break; + case INT64: + parseInt64(formatter); + break; + case UINT64: + parseUInt64(formatter); + break; + case DOUBLE: + parseDouble(formatter); + break; + case STRING: + parseString(formatter); + break; + case CUSTOM: + parseOpaque(formatter); + break; + default: + throw new IOException("Unknown type value '" + asHex(type.getCode()) + + "' in first byte of a JSON value"); + } + } + + /** + * Parse a JSON object. + *

+ * The grammar of the binary representation of JSON objects are defined in the MySQL code base in the + * json_binary.h file: + *

Grammar

+ * + *

Grammar

+ * + *
+     *   value ::=
+     *       object  |
+     *       array   |
+     *       literal |
+     *       number  |
+     *       string  |
+     *       custom-data
+     *   object ::= element-count size key-entry* value-entry* key* value*
+     *   // number of members in object or number of elements in array
+     *   element-count ::=
+     *       uint16 |  // if used in small JSON object/array
+     *       uint32    // if used in large JSON object/array
+     *   // number of bytes in the binary representation of the object or array
+     *   size ::=
+     *       uint16 |  // if used in small JSON object/array
+     *       uint32    // if used in large JSON object/array
+     *   key-entry ::= key-offset key-length
+     *   key-offset ::=
+     *       uint16 |  // if used in small JSON object
+     *       uint32    // if used in large JSON object
+     *   key-length ::= uint16    // key length must be less than 64KB
+     *   value-entry ::= type offset-or-inlined-value
+     *   // This field holds either the offset to where the value is stored,
+     *   // or the value itself if it is small enough to be inlined (that is,
+     *   // if it is a JSON literal or a small enough [u]int).
+     *   offset-or-inlined-value ::=
+     *       uint16 |   // if used in small JSON object/array
+     *       uint32     // if used in large JSON object/array
+     *   key ::= utf8mb4-data
+     *   literal ::=
+     *       0x00 |   // JSON null literal
+     *       0x01 |   // JSON true literal
+     *       0x02 |   // JSON false literal
+     *   number ::=  ....  // little-endian format for [u]int(16|32|64), whereas
+     *                     // double is stored in a platform-independent, eight-byte
+     *                     // format using float8store()
+     *   string ::= data-length utf8mb4-data
+     *   custom-data ::= custom-type data-length binary-data
+     *   custom-type ::= uint8   // type identifier that matches the
+     *                           // internal enum_field_types enum
+     *   data-length ::= uint8*  // If the high bit of a byte is 1, the length
+     *                           // field is continued in the next byte,
+     *                           // otherwise it is the last byte of the length
+     *                           // field. So we need 1 byte to represent
+     *                           // lengths up to 127, 2 bytes to represent
+     *                           // lengths up to 16383, and so on...
+     * 
+ * + * @param small {@code true} if the object being read is "small", or {@code false} otherwise + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseObject(boolean small, JsonFormatter formatter) + throws IOException { + // Read the header ... + int numElements = readUnsignedIndex(Integer.MAX_VALUE, small, "number of elements in"); + int numBytes = readUnsignedIndex(Integer.MAX_VALUE, small, "size of"); + + // Read each key-entry, consisting of the offset and length of each key ... + int[] keyLengths = new int[numElements]; + for (int i = 0; i != numElements; ++i) { + readUnsignedIndex(numBytes, small, "key offset in"); // unused + keyLengths[i] = readUInt16(); + } + + // Read each key value value-entry + ValueEntry[] entries = new ValueEntry[numElements]; + for (int i = 0; i != numElements; ++i) { + // Parse the value ... + ValueType type = readValueType(); + switch (type) { + case LITERAL: + entries[i] = new ValueEntry(type).setValue(readLiteral()); + break; + case INT16: + case UINT16: + // The "offset" is actually the value ... + int value = readUnsignedIndex(Integer.MAX_VALUE, small, "value offset in"); + entries[i] = new ValueEntry(type).setValue(value); + break; + case INT32: + case UINT32: + if (!small) { + // The value should be large enough to handle the actual value ... + value = readUnsignedIndex(Integer.MAX_VALUE, small, "value offset in"); + entries[i] = new ValueEntry(type).setValue(value); + } + default: + // It is an offset, not a value ... + int offset = readUnsignedIndex(Integer.MAX_VALUE, small, "value offset in"); + if (offset >= numBytes) { + throw new IOException("The offset for the value in the JSON binary document is " + + offset + + ", which is larger than the binary form of the JSON document (" + + numBytes + " bytes)"); + } + entries[i] = new ValueEntry(type, offset); + } + } + + // Read each key ... + String[] keys = new String[numElements]; + for (int i = 0; i != numElements; ++i) { + keys[i] = reader.readString(keyLengths[i]); + } + + // Now parse the values ... + formatter.beginObject(numElements); + for (int i = 0; i != numElements; ++i) { + if (i != 0) { + formatter.nextEntry(); + } + formatter.name(keys[i]); + ValueEntry entry = entries[i]; + if (entry.resolved) { + Object value = entry.value; + if (value == null) { + formatter.valueNull(); + } else if (value instanceof Boolean) { + formatter.value((Boolean) value); + } else if (value instanceof Integer) { + formatter.value((Integer) value); + } + } else { + // Parse the value ... + parse(entry.type, formatter); + } + } + formatter.endObject(); + } + + /** + * Parse a JSON array. + *

+ * The grammar of the binary representation of JSON objects are defined in the MySQL code base in the + * json_binary.h file, and are: + *

Grammar

+ * + *

Grammar

+ * + *
+     *   value ::=
+     *       object  |
+     *       array   |
+     *       literal |
+     *       number  |
+     *       string  |
+     *       custom-data
+     *   array ::= element-count size value-entry* value*
+     *   // number of members in object or number of elements in array
+     *   element-count ::=
+     *       uint16 |  // if used in small JSON object/array
+     *       uint32    // if used in large JSON object/array
+     *   // number of bytes in the binary representation of the object or array
+     *   size ::=
+     *       uint16 |  // if used in small JSON object/array
+     *       uint32    // if used in large JSON object/array
+     *   value-entry ::= type offset-or-inlined-value
+     *   // This field holds either the offset to where the value is stored,
+     *   // or the value itself if it is small enough to be inlined (that is,
+     *   // if it is a JSON literal or a small enough [u]int).
+     *   offset-or-inlined-value ::=
+     *       uint16 |   // if used in small JSON object/array
+     *       uint32     // if used in large JSON object/array
+     *   key ::= utf8mb4-data
+     *   literal ::=
+     *       0x00 |   // JSON null literal
+     *       0x01 |   // JSON true literal
+     *       0x02 |   // JSON false literal
+     *   number ::=  ....  // little-endian format for [u]int(16|32|64), whereas
+     *                     // double is stored in a platform-independent, eight-byte
+     *                     // format using float8store()
+     *   string ::= data-length utf8mb4-data
+     *   custom-data ::= custom-type data-length binary-data
+     *   custom-type ::= uint8   // type identifier that matches the
+     *                           // internal enum_field_types enum
+     *   data-length ::= uint8*  // If the high bit of a byte is 1, the length
+     *                           // field is continued in the next byte,
+     *                           // otherwise it is the last byte of the length
+     *                           // field. So we need 1 byte to represent
+     *                           // lengths up to 127, 2 bytes to represent
+     *                           // lengths up to 16383, and so on...
+     * 
+ * + * @param small {@code true} if the object being read is "small", or {@code false} otherwise + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + // checkstyle, please ignore MethodLength for the next line + protected void parseArray(boolean small, JsonFormatter formatter) + throws IOException { + // Read the header ... + int numElements = readUnsignedIndex(Integer.MAX_VALUE, small, "number of elements in"); + int numBytes = readUnsignedIndex(Integer.MAX_VALUE, small, "size of"); + + // Read each key value value-entry + ValueEntry[] entries = new ValueEntry[numElements]; + for (int i = 0; i != numElements; ++i) { + // Parse the value ... + ValueType type = readValueType(); + switch (type) { + case LITERAL: + entries[i] = new ValueEntry(type).setValue(readLiteral()); + break; + case INT16: + case UINT16: + // The "offset" is actually the value ... + int value = readUnsignedIndex(Integer.MAX_VALUE, small, "value offset in"); + entries[i] = new ValueEntry(type).setValue(value); + break; + case INT32: + case UINT32: + if (!small) { + // The value should be large enough to handle the actual value ... + value = readUnsignedIndex(Integer.MAX_VALUE, small, "value offset in"); + entries[i] = new ValueEntry(type).setValue(value); + } + default: + // It is an offset, not a value ... + int offset = readUnsignedIndex(Integer.MAX_VALUE, small, "value offset in"); + if (offset >= numBytes) { + throw new IOException("The offset for the value in the JSON binary document is " + + offset + + ", which is larger than the binary form of the JSON document (" + + numBytes + " bytes)"); + } + entries[i] = new ValueEntry(type, offset); + } + } + + // Now parse the values ... + formatter.beginArray(numElements); + for (int i = 0; i != numElements; ++i) { + if (i != 0) { + formatter.nextEntry(); + } + ValueEntry entry = entries[i]; + if (entry.resolved) { + Object value = entry.value; + if (value == null) { + formatter.valueNull(); + } else if (value instanceof Boolean) { + formatter.value((Boolean) value); + } else if (value instanceof Integer) { + formatter.value((Integer) value); + } + } else { + // Parse the value ... + parse(entry.type, formatter); + } + } + formatter.endArray(); + } + + /** + * Parse a literal value that is either null, {@code true}, or {@code false}. + * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseBoolean(JsonFormatter formatter) throws IOException { + Boolean literal = readLiteral(); + if (literal == null) { + formatter.valueNull(); + } else { + formatter.value(literal); + } + } + + /** + * Parse a 2 byte integer value. + * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseInt16(JsonFormatter formatter) throws IOException { + int value = readInt16(); + formatter.value(value); + } + + /** + * Parse a 2 byte unsigned integer value. + * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseUInt16(JsonFormatter formatter) throws IOException { + int value = readUInt16(); + formatter.value(value); + } + + /** + * Parse a 4 byte integer value. + * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseInt32(JsonFormatter formatter) throws IOException { + int value = readInt32(); + formatter.value(value); + } + + /** + * Parse a 4 byte unsigned integer value. + * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseUInt32(JsonFormatter formatter) throws IOException { + long value = readUInt32(); + formatter.value(value); + } + + /** + * Parse a 8 byte integer value. + * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseInt64(JsonFormatter formatter) throws IOException { + long value = readInt64(); + formatter.value(value); + } + + /** + * Parse a 8 byte unsigned integer value. + * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseUInt64(JsonFormatter formatter) throws IOException { + BigInteger value = readUInt64(); + formatter.value(value); + } + + /** + * Parse a 8 byte double value. + * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseDouble(JsonFormatter formatter) throws IOException { + long rawValue = readInt64(); + double value = Double.longBitsToDouble(rawValue); + formatter.value(value); + } + + /** + * Parse the length and value of a string stored in MySQL's "utf8mb" character set (which equates to Java's + * UTF-8 character set. The length is a {@link #readVariableInt() variable length integer} length of the string. + * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseString(JsonFormatter formatter) throws IOException { + int length = readVariableInt(); + String value = new String(reader.read(length), UTF_8); + formatter.value(value); + } + + /** + * Parse an opaque type. Specific types such as {@link #parseDate(JsonFormatter) DATE}, + * {@link #parseTime(JsonFormatter) TIME}, and {@link #parseDatetime(JsonFormatter) DATETIME} values are + * stored as opaque types, though they are to be unpacked. TIMESTAMPs are also stored as opaque types, but + * converted by MySQL to + * {@code DATETIME} prior to storage. + * Other MySQL types are stored as opaque types and passed on to the formatter as opaque values. + *

+ * See the + * MySQL source code for the logic used in this method. + *

+ *

Grammar

+ * + *
+     *   custom-data ::= custom-type data-length binary-data
+     *   custom-type ::= uint8   // type identifier that matches the
+     *                           // internal enum_field_types enum
+     *   data-length ::= uint8*  // If the high bit of a byte is 1, the length
+     *                           // field is continued in the next byte,
+     *                           // otherwise it is the last byte of the length
+     *                           // field. So we need 1 byte to represent
+     *                           // lengths up to 127, 2 bytes to represent
+     *                           // lengths up to 16383, and so on...
+     * 
+ * + * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseOpaque(JsonFormatter formatter) throws IOException { + // Read the custom type, which should be a standard ColumnType ... + int customType = reader.read(); + ColumnType type = ColumnType.byCode(customType); + if (type == null) { + throw new IOException("Unknown type '" + asHex(customType) + + "' in first byte of a JSON opaque value"); + } + // Read the data length ... + int length = readVariableInt(); + + switch (type) { + case DECIMAL: + case NEWDECIMAL: + // See 'Json_decimal::convert_from_binary' + // https://github.com/mysql/mysql-server/blob/5.7/sql/json_dom.cc#L1625 + parseDecimal(length, formatter); + break; + + // All dates and times are in one of these types + // See 'Json_datetime::to_packed' for details + // https://github.com/mysql/mysql-server/blob/5.7/sql/json_dom.cc#L1681 + // which calls 'TIME_to_longlong_packed' + // https://github.com/mysql/mysql-server/blob/5.7/sql-common/my_time.c#L2005 + // + // and 'Json_datetime::from_packed' + // https://github.com/mysql/mysql-server/blob/5.7/sql/json_dom.cc#L1688 + // which calls 'TIME_from_longlong_packed' + // https://github.com/mysql/mysql-server/blob/5.7/sql/sql_time.cc#L1624 + case DATE: + parseDate(formatter); + break; + case TIME: + case TIME_V2: + parseTime(formatter); + break; + case DATETIME: + case DATETIME_V2: + case TIMESTAMP: + case TIMESTAMP_V2: + parseDatetime(formatter); + break; + default: + parseOpaqueValue(type, length, formatter); + } + } + + /** + * Parse a {@code DATE} value, which is stored using the same format as {@code DATETIME}: + * 5 bytes + fractional-seconds storage. However, the hour, minute, second, and fractional seconds are ignored. + *

+ * The non-fractional part is 40 bits: + * + *

+     *  1 bit  sign           (1= non-negative, 0= negative)
+     *  17 bits year*13+month  (year 0-9999, month 0-12)
+     *   5 bits day            (0-31)
+     *   5 bits hour           (0-23)
+     *   6 bits minute         (0-59)
+     *   6 bits second         (0-59)
+     * 
+ * + * The fractional part is typically dependent upon the fsp (i.e., fractional seconds part) defined by + * a column, but in the case of JSON it is always 3 bytes. + *

+ * The format of all temporal values is outlined in the MySQL documentation, + * although since the MySQL {@code JSON} type is only available in 5.7, only version 2 of the date-time formats + * are necessary. + * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseDate(JsonFormatter formatter) throws IOException { + long raw = readInt64(); + long value = raw >> 24; + int yearMonth = (int) (value >> 22) % (1 << 17); // 17 bits starting at 22nd + int year = yearMonth / 13; + int month = yearMonth % 13; + int day = (int) (value >> 17) % (1 << 5); // 5 bits starting at 17th + formatter.valueDate(year, month, day); + } + + /** + * Parse a {@code TIME} value, which is stored using the same format as {@code DATETIME}: + * 5 bytes + fractional-seconds storage. However, the year, month, and day values are ignored + *

+ * The non-fractional part is 40 bits: + * + *

+     *  1 bit  sign           (1= non-negative, 0= negative)
+     *  17 bits year*13+month  (year 0-9999, month 0-12)
+     *   5 bits day            (0-31)
+     *   5 bits hour           (0-23)
+     *   6 bits minute         (0-59)
+     *   6 bits second         (0-59)
+     * 
+ * + * The fractional part is typically dependent upon the fsp (i.e., fractional seconds part) defined by + * a column, but in the case of JSON it is always 3 bytes. + *

+ * The format of all temporal values is outlined in the MySQL documentation, + * although since the MySQL {@code JSON} type is only available in 5.7, only version 2 of the date-time formats + * are necessary. + * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseTime(JsonFormatter formatter) throws IOException { + long raw = readInt64(); + long value = raw >> 24; + boolean negative = value < 0L; + int hour = (int) (value >> 12) % (1 << 10); // 10 bits starting at 12th + int min = (int) (value >> 6) % (1 << 6); // 6 bits starting at 6th + int sec = (int) value % (1 << 6); // 6 bits starting at 0th + if (negative) { + hour *= -1; + } + int microSeconds = (int) (raw % (1 << 24)); + formatter.valueTime(hour, min, sec, microSeconds); + } + + /** + * Parse a {@code DATETIME} value, which is stored as 5 bytes + fractional-seconds storage. + *

+ * The non-fractional part is 40 bits: + * + *

+     *  1 bit  sign           (1= non-negative, 0= negative)
+     *  17 bits year*13+month  (year 0-9999, month 0-12)
+     *   5 bits day            (0-31)
+     *   5 bits hour           (0-23)
+     *   6 bits minute         (0-59)
+     *   6 bits second         (0-59)
+     * 
+ * + * The sign bit is always 1. A value of 0 (negative) is reserved. The fractional part is typically dependent upon + * the fsp (i.e., fractional seconds part) defined by a column, but in the case of JSON it is always 3 bytes. + * Unlike the documentation, however, the 8 byte value is in little-endian form. + *

+ * The format of all temporal values is outlined in the MySQL documentation, + * although since the MySQL {@code JSON} type is only available in 5.7, only version 2 of the date-time formats + * are necessary. + * + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseDatetime(JsonFormatter formatter) throws IOException { + long raw = readInt64(); + long value = raw >> 24; + int yearMonth = (int) (value >> 22) % (1 << 17); // 17 bits starting at 22nd + int year = yearMonth / 13; + int month = yearMonth % 13; + int day = (int) (value >> 17) % (1 << 5); // 5 bits starting at 17th + int hour = (int) (value >> 12) % (1 << 5); // 5 bits starting at 12th + int min = (int) (value >> 6) % (1 << 6); // 6 bits starting at 6th + int sec = (int) (value % (1 << 6)); // 6 bits starting at 0th + int microSeconds = (int) (raw % (1 << 24)); + formatter.valueDatetime(year, month, day, hour, min, sec, microSeconds); + } + + /** + * Parse a {@code DECIMAL} value. The first two bytes are the precision and scale, followed by the binary + * representation of the decimal itself. + * + * @param length the length of the complete binary representation + * @param formatter the formatter to be notified of the parsed value; may not be null + * @throws IOException if there is a problem reading the JSON value + */ + protected void parseDecimal(int length, JsonFormatter formatter) throws IOException { + // First two bytes are the precision and scale ... + int precision = reader.read(); + int scale = reader.read(); + + // Followed by the binary representation (see `my_decimal_get_binary_size`) + int decimalLength = length - 2; + BigDecimal dec = AbstractRowsEventDataDeserializer.asBigDecimal(precision, scale, reader.read(decimalLength)); + formatter.value(dec); + } + + protected void parseOpaqueValue(ColumnType type, int length, JsonFormatter formatter) + throws IOException { + formatter.valueOpaque(type, reader.read(length)); + } + + protected int readFractionalSecondsInMicroseconds() throws IOException { + return (int) readBigEndianLong(3); + } + + protected long readBigEndianLong(int numBytes) throws IOException { + byte[] bytes = reader.read(numBytes); + long result = 0; + for (int i = 0; i != numBytes; i++) { + int b = bytes[i] & 0xFF; + result = (result << 8) | b; + } + return result; + } + + protected int readUnsignedIndex(int maxValue, boolean isSmall, String desc) throws IOException { + long result = isSmall ? readUInt16() : readUInt32(); + if (result >= maxValue) { + throw new IOException("The " + desc + " the JSON document is " + result + + " and is too big for the binary form of the document (" + maxValue + ")"); + } + if (result > Integer.MAX_VALUE) { + throw new IOException("The " + desc + " the JSON document is " + result + " and is too big to be used"); + } + return (int) result; + } + + protected int readInt16() throws IOException { + int b1 = reader.read() & 0xFF; + int b2 = reader.read(); + return (short) (b2 << 8 | b1); + } + + protected int readUInt16() throws IOException { + int b1 = reader.read() & 0xFF; + int b2 = reader.read() & 0xFF; + return (b2 << 8 | b1) & 0xFFFF; + } + + protected int readInt24() throws IOException { + int b1 = reader.read() & 0xFF; + int b2 = reader.read() & 0xFF; + int b3 = reader.read(); + return b3 << 16 | b2 << 8 | b1; + } + + protected int readInt32() throws IOException { + int b1 = reader.read() & 0xFF; + int b2 = reader.read() & 0xFF; + int b3 = reader.read() & 0xFF; + int b4 = reader.read(); + return b4 << 24 | b3 << 16 | b2 << 8 | b1; + } + + protected long readUInt32() throws IOException { + int b1 = reader.read() & 0xFF; + int b2 = reader.read() & 0xFF; + int b3 = reader.read() & 0xFF; + int b4 = reader.read() & 0xFF; + return (long) ((b4 << 24) | (b3 << 16) | (b2 << 8) | b1) & 0xFFFFFFFF; + } + + protected long readInt64() throws IOException { + int b1 = reader.read() & 0xFF; + int b2 = reader.read() & 0xFF; + int b3 = reader.read() & 0xFF; + long b4 = reader.read() & 0xFF; + long b5 = reader.read() & 0xFF; + long b6 = reader.read() & 0xFF; + long b7 = reader.read() & 0xFF; + long b8 = reader.read(); + return b8 << 56 | (b7 << 48) | (b6 << 40) | (b5 << 32) | + (b4 << 24) | (b3 << 16) | (b2 << 8) | b1; + } + + protected BigInteger readUInt64() throws IOException { + byte[] bigEndian = new byte[8]; + for (int i = 8; i != 0; --i) { + bigEndian[i - 1] = (byte) (reader.read() & 0xFF); + } + return new BigInteger(1, bigEndian); + } + + /** + * Read a variable-length integer value. + *

+ * If the high bit of a byte is 1, the length field is continued in the next byte, otherwise it is the last + * byte of the length field. So we need 1 byte to represent lengths up to 127, 2 bytes to represent lengths up + * to 16383, and so on... + * + * @return the integer value + */ + protected int readVariableInt() throws IOException { + byte b = 0; + int length = 0; + do { + b = (byte) reader.read(); + length = (length << 7) + (b & 0x7F); + } while (b < 0); + return length; + } + + protected Boolean readLiteral() throws IOException { + byte b = (byte) reader.read(); + if (b == 0x00) { + return null; + } else if (b == 0x01) { + return Boolean.TRUE; + } else if (b == 0x02) { + return Boolean.FALSE; + } + throw new IOException("Unexpected value '" + asHex(b) + "' for literal"); + } + + protected ValueType readValueType() throws IOException { + byte b = (byte) reader.read(); + ValueType result = ValueType.byCode(b); + if (result == null) { + throw new IOException("Unknown value type code '" + String.format("%02X", (int) b) + "'"); + } + return result; + } + + protected static String asHex(byte b) { + return String.format("%02X ", b); + } + + protected static String asHex(int value) { + return Integer.toHexString(value); + } + + /** + * Class used internally to hold value entry information. + */ + protected static final class ValueEntry { + + protected final ValueType type; + protected final int index; + protected Object value; + protected boolean resolved; + + public ValueEntry(ValueType type) { + this.type = type; + this.index = 0; + } + + public ValueEntry(ValueType type, int index) { + this.type = type; + this.index = index; + } + + public ValueEntry setValue(Object value) { + this.value = value; + this.resolved = true; + return this; + } + } +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonFormatter.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonFormatter.java new file mode 100644 index 0000000..2888182 --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonFormatter.java @@ -0,0 +1,177 @@ +/* + * Copyright 2016 Stanley Shyiko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.shyiko.mysql.binlog.event.deserialization.json; + +import java.math.BigDecimal; +import java.math.BigInteger; + +import com.github.shyiko.mysql.binlog.event.deserialization.ColumnType; + +/** + * Handle the various actions involved when {@link JsonBinary#parse(byte[], JsonFormatter)} a JSON binary + * value. + * + * @author Randall Hauch + */ +public interface JsonFormatter { + + /** + * Prepare to receive the name-value pairs in a JSON object. + * + * @param numElements the number of name-value pairs (or elements) + */ + void beginObject(int numElements); + + /** + * Prepare to receive the value pairs that in a JSON array. + * + * @param numElements the number of array elements + */ + void beginArray(int numElements); + + /** + * Complete the previously-started JSON object. + */ + void endObject(); + + /** + * Complete the previously-started JSON array. + */ + void endArray(); + + /** + * Receive the name of an element in a JSON object. + * + * @param name the element's name; never null + */ + void name(String name); + + /** + * Receive the string value of an element in a JSON object. + * + * @param value the element's value; never null + */ + void value(String value); + + /** + * Receive the integer value of an element in a JSON object. + * + * @param value the element's value + */ + void value(int value); + + /** + * Receive the long value of an element in a JSON object. + * + * @param value the element's value + */ + void value(long value); + + /** + * Receive the double value of an element in a JSON object. + * + * @param value the element's value + */ + void value(double value); + + /** + * Receive the {@link BigInteger} value of an element in a JSON object. + * + * @param value the element's value; never null + */ + void value(BigInteger value); + + /** + * Receive the {@link BigDecimal} value of an element in a JSON object. + * + * @param value the element's value; never null + */ + void value(BigDecimal value); + + /** + * Receive the boolean value of an element in a JSON object. + * + * @param value the element's value + */ + void value(boolean value); + + /** + * Receive a null value of an element in a JSON object. + */ + void valueNull(); + + /** + * Receive the year value of an element in a JSON object. + * + * @param year the year number that makes up the element's value + */ + void valueYear(int year); + + /** + * Receive the date value of an element in a JSON object. + * + * @param year the positive or negative year in the element's date value + * @param month the month (0-12) in the element's date value + * @param day the day of the month (0-31) in the element's date value + */ + void valueDate(int year, int month, int day); + + /** + * Receive the date and time value of an element in a JSON object. + * + * @param year the positive or negative year in the element's date value + * @param month the month (0-12) in the element's date value + * @param day the day of the month (0-31) in the element's date value + * @param hour the hour of the day (0-24) in the element's time value + * @param min the minutes of the hour (0-60) in the element's time value + * @param sec the seconds of the minute (0-60) in the element's time value + * @param microSeconds the number of microseconds in the element's time value + */ + // checkstyle, please ignore ParameterNumber for the next line + void valueDatetime(int year, int month, int day, int hour, int min, int sec, int microSeconds); + + /** + * Receive the time value of an element in a JSON object. + * + * @param hour the hour of the day (0-24) in the element's time value + * @param min the minutes of the hour (0-60) in the element's time value + * @param sec the seconds of the minute (0-60) in the element's time value + * @param microSeconds the number of microseconds in the element's time value + */ + void valueTime(int hour, int min, int sec, int microSeconds); + + /** + * Receive the timestamp value of an element in a JSON object. + * + * @param secondsPastEpoch the number of seconds past epoch (January 1, 1970) in the element's timestamp value + * @param microSeconds the number of microseconds in the element's time value + */ + void valueTimestamp(long secondsPastEpoch, int microSeconds); + + /** + * Receive an opaque value of an element in a JSON object. + * + * @param type the column type for the value; may not be null + * @param value the binary representation for the element's value + */ + void valueOpaque(ColumnType type, byte[] value); + + /** + * Called after an entry signaling that another entry will be signaled. + */ + void nextEntry(); + +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java new file mode 100644 index 0000000..e2f657c --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java @@ -0,0 +1,325 @@ +/* + * Copyright 2016 Stanley Shyiko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.shyiko.mysql.binlog.event.deserialization.json; + +import java.math.BigDecimal; +import java.math.BigInteger; + +import com.github.shyiko.mysql.binlog.event.deserialization.ColumnType; + +/** + * A {@link JsonFormatter} implementation that creates a JSON string representation. + * + * @author Randall Hauch + */ +public class JsonStringFormatter implements JsonFormatter { + + /** + * Value used for lookup tables to indicate that matching characters + * do not need to be escaped. + */ + private static final int ESCAPE_NONE = 0; + + /** + * Value used for lookup tables to indicate that matching characters + * are to be escaped using standard escaping; for JSON this means + * (for example) using "backslash - u" escape method. + */ + private static final int ESCAPE_GENERIC = -1; + + /** + * A lookup table that determines which of the first 128 Unicode code points (single-byte UTF-8 characters) + * must be escaped. A value of '0' means no escaping is required; positive values must be escaped with a + * preceding backslash; and negative values that generic escaping (e.g., {@code \\uXXXX}). + */ + private static final int[] ESCAPES; + + static { + int[] escape = new int[128]; + // Generic escape for control characters ... + for (int i = 0; i < 32; ++i) { + escape[i] = ESCAPE_GENERIC; + } + // Backslash escape for other specific characters ... + escape['"'] = '"'; + escape['\\'] = '\\'; + // Escaping of slash is optional, so let's not add it + escape[0x08] = 'b'; + escape[0x09] = 't'; + escape[0x0C] = 'f'; + escape[0x0A] = 'n'; + escape[0x0D] = 'r'; + ESCAPES = escape; + } + + private static final char[] HEX_CODES = "0123456789ABCDEF".toCharArray(); + + private final StringBuilder sb = new StringBuilder(); + + @Override + public String toString() { + return getString(); + } + + public String getString() { + return sb.toString(); + } + + @Override + public void beginObject(int numElements) { + sb.append('{'); + } + + @Override + public void beginArray(int numElements) { + sb.append('['); + } + + @Override + public void endObject() { + sb.append('}'); + } + + @Override + public void endArray() { + sb.append(']'); + } + + @Override + public void name(String name) { + sb.append('"'); + appendString(name); + sb.append("\":"); + } + + @Override + public void value(String value) { + sb.append('"'); + appendString(value); + sb.append('"'); + } + + @Override + public void value(int value) { + sb.append(Integer.toString(value)); + } + + @Override + public void value(long value) { + sb.append(Long.toString(value)); + } + + @Override + public void value(double value) { + // Double's toString method will result in scientific notation and loss of precision + String str = Double.toString(value); + if (str.contains("E")) { + value(new BigDecimal(value)); + } else { + sb.append(str); + } + } + + @Override + public void value(BigInteger value) { + // Using the BigInteger.toString() method will result in scientific notation, so instead ... + value(new BigDecimal(value)); + } + + @Override + public void value(BigDecimal value) { + // Using the BigInteger.toString() method will result in scientific notation, so instead ... + sb.append(value.toPlainString()); + } + + @Override + public void value(boolean value) { + sb.append(Boolean.toString(value)); + } + + @Override + public void valueNull() { + sb.append("null"); + } + + @Override + public void valueYear(int year) { + sb.append(year); + } + + @Override + public void valueDate(int year, int month, int day) { + sb.append('"'); + appendDate(year, month, day); + sb.append('"'); + } + + @Override + // checkstyle, please ignore ParameterNumber for the next line + public void valueDatetime(int year, int month, int day, int hour, int min, int sec, int microSeconds) { + sb.append('"'); + appendDate(year, month, day); + sb.append(' '); + appendTime(hour, min, sec, microSeconds); + sb.append('"'); + } + + @Override + public void valueTime(int hour, int min, int sec, int microSeconds) { + sb.append('"'); + if (hour < 0) { + sb.append('-'); + hour = Math.abs(hour); + } + appendTime(hour, min, sec, microSeconds); + sb.append('"'); + } + + @Override + public void valueTimestamp(long secondsPastEpoch, int microSeconds) { + sb.append(secondsPastEpoch); + appendSixDigitUnsignedInt(microSeconds, false); + } + + @Override + public void valueOpaque(ColumnType type, byte[] value) { + sb.append('"'); + sb.append(javax.xml.bind.DatatypeConverter.printBase64Binary(value)); + sb.append('"'); + } + + @Override + public void nextEntry() { + sb.append(','); + } + + /** + * Append a string by escaping any characters that must be escaped. + * + * @param original the string to be written; may not be null + */ + protected void appendString(String original) { + for (int i = 0, len = original.length(); i < len; ++i) { + char c = original.charAt(i); + int ch = c; + if (ch < 0 || ESCAPES[ch] == 0) { + sb.append(c); + continue; + } + int escape = ESCAPES[ch]; + if (escape > 0) { // 2-char escape, fine + sb.append('\\'); + sb.append(c); + } else { + unicodeEscape(ch); + } + } + } + + /** + * Append a generic Unicode escape (e.g., {@code \\uXXXX}) for given character. + * + * @param charToEscape the character to escape + */ + private void unicodeEscape(int charToEscape) { + sb.append('\\'); + sb.append('u'); + if (charToEscape > 0xFF) { + int hi = (charToEscape >> 8) & 0xFF; + sb.append(HEX_CODES[hi >> 4]); + sb.append(HEX_CODES[hi & 0xF]); + charToEscape &= 0xFF; + } else { + sb.append('0'); + sb.append('0'); + } + // We know it's a control char, so only the last 2 chars are non-0 + sb.append(HEX_CODES[charToEscape >> 4]); + sb.append(HEX_CODES[charToEscape & 0xF]); + } + + protected void appendTwoDigitUnsignedInt(int value) { + assert value >= 0; + assert value < 100; + if (value < 10) { + sb.append("0").append(value); + } else { + sb.append(value); + } + } + + protected void appendFourDigitUnsignedInt(int value) { + if (value < 10) { + sb.append("000").append(value); + } else if (value < 100) { + sb.append("00").append(value); + } else if (value < 1000) { + sb.append("0").append(value); + } else { + sb.append(value); + } + } + + protected void appendSixDigitUnsignedInt(int value, boolean trimTrailingZeros) { + assert value > 0; + assert value < 1000000; + // Add prefixes if necessary ... + if (value < 10) { + sb.append("00000"); + } else if (value < 100) { + sb.append("0000"); + } else if (value < 1000) { + sb.append("000"); + } else if (value < 10000) { + sb.append("00"); + } else if (value < 100000) { + sb.append("0"); + } + if (trimTrailingZeros) { + // Remove any trailing 0's ... + for (int i = 0; i != 6; ++i) { + if (value % 10 == 0) { + value /= 10; + } + } + sb.append(value); + } + } + + protected void appendDate(int year, int month, int day) { + if (year < 0) { + sb.append('-'); + year = Math.abs(year); + } + appendFourDigitUnsignedInt(year); + sb.append('-'); + appendTwoDigitUnsignedInt(month); + sb.append('-'); + appendTwoDigitUnsignedInt(day); + } + + protected void appendTime(int hour, int min, int sec, int microSeconds) { + appendTwoDigitUnsignedInt(hour); + sb.append(':'); + appendTwoDigitUnsignedInt(min); + sb.append(':'); + appendTwoDigitUnsignedInt(sec); + if (microSeconds != 0) { + sb.append('.'); + appendSixDigitUnsignedInt(microSeconds, true); + } + } +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/ValueType.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/ValueType.java new file mode 100644 index 0000000..816bc60 --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/ValueType.java @@ -0,0 +1,87 @@ +/* + * Copyright 2016 Stanley Shyiko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.shyiko.mysql.binlog.event.deserialization.json; + +import java.util.HashMap; +import java.util.Map; + +/** + * The set of values that can be used within a MySQL JSON value. + *

+ * These values are defined in the MySQL codebase in the + * json_binary.h file, and are: + * + *

+ *   type ::=
+ *       0x00 |  // small JSON object
+ *       0x01 |  // large JSON object
+ *       0x02 |  // small JSON array
+ *       0x03 |  // large JSON array
+ *       0x04 |  // literal (true/false/null)
+ *       0x05 |  // int16
+ *       0x06 |  // uint16
+ *       0x07 |  // int32
+ *       0x08 |  // uint32
+ *       0x09 |  // int64
+ *       0x0a |  // uint64
+ *       0x0b |  // double
+ *       0x0c |  // utf8mb4 string
+ *       0x0f    // custom data (any MySQL data type)
+ * 
+ * + * @author Randall Hauch + */ +public enum ValueType { + + SMALL_DOCUMENT(0x00), + LARGE_DOCUMENT(0x01), + SMALL_ARRAY(0x02), + LARGE_ARRAY(0x03), + LITERAL(0x04), + INT16(0x05), + UINT16(0x06), + INT32(0x07), + UINT32(0x08), + INT64(0x09), + UINT64(0x0a), + DOUBLE(0x0b), + STRING(0x0c), + CUSTOM(0x0f); + + private final int code; + + ValueType(int code) { + this.code = code; + } + + public int getCode() { + return this.code; + } + + private static final Map TYPE_BY_CODE; + + static { + TYPE_BY_CODE = new HashMap(); + for (ValueType type : values()) { + TYPE_BY_CODE.put(type.code, type); + } + } + + public static ValueType byCode(int code) { + return TYPE_BY_CODE.get(code); + } + +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java b/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java index 350b870..b19eff2 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java @@ -171,7 +171,7 @@ public int available() throws IOException { } public int peek() throws IOException { - if (peek == null) { + if (peek == null || peek == -1) { peek = readWithinBlockBoundaries(); } return peek; @@ -180,7 +180,7 @@ public int peek() throws IOException { @Override public int read() throws IOException { int result; - if (peek == null) { + if (peek == null || peek == -1) { result = readWithinBlockBoundaries(); } else { result = peek; diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java index ddddc18..8ede146 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java @@ -49,6 +49,7 @@ import java.net.SocketException; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.AbstractMap; @@ -253,9 +254,21 @@ public void testDeserializationOfNEWDECIMAL() throws Exception { public void testDeserializationOfDATE() throws Exception { assertEquals(writeAndCaptureRow("date", "'1989-03-21'"), new Serializable[]{ generateTime(1989, 3, 21, 0, 0, 0, 0)}); - assertEquals(writeAndCaptureRow("date", "'0000-03-21'"), new Serializable[]{null}); - assertEquals(writeAndCaptureRow("date", "'1989-00-21'"), new Serializable[]{null}); - assertEquals(writeAndCaptureRow("date", "'1989-03-00'"), new Serializable[]{null}); + final boolean[] noZeroInDate = new boolean[1]; + master.query("select @@sql_mode;", new Callback() { + + @Override + public void execute(ResultSet rs) throws SQLException { + // NO_ZERO_IN_DATE is turned on by default in MySQL 5.7 + // https://github.com/shyiko/mysql-binlog-connector-java/pull/119#issuecomment-251870581 + noZeroInDate[0] = rs.next() && rs.getString(1).contains("NO_ZERO_IN_DATE"); + } + }); + if (!noZeroInDate[0]) { + assertEquals(writeAndCaptureRow("date", "'0000-03-21'"), new Serializable[]{null}); + assertEquals(writeAndCaptureRow("date", "'1989-00-21'"), new Serializable[]{null}); + assertEquals(writeAndCaptureRow("date", "'1989-03-00'"), new Serializable[]{null}); + } } @Test @@ -847,15 +860,18 @@ public void execute(Statement statement) throws SQLException { } } - private static final class MySQLConnection implements Closeable { + /** + * Representation of a MySQL connection. + */ + public static final class MySQLConnection implements Closeable { - private String hostname; - private int port; - private String username; - private String password; + private final String hostname; + private final int port; + private final String username; + private final String password; private Connection connection; - private MySQLConnection(String hostname, int port, String username, String password) + public MySQLConnection(String hostname, int port, String username, String password) throws ClassNotFoundException, SQLException { this.hostname = hostname; this.port = port; @@ -873,6 +889,22 @@ public void execute(Statement statement) throws SQLException { }); } + public String hostname() { + return hostname; + } + + public int port() { + return port; + } + + public String username() { + return username; + } + + public String password() { + return password; + } + public void execute(Callback callback) throws SQLException { connection.setAutoCommit(false); Statement statement = connection.createStatement(); @@ -884,6 +916,33 @@ public void execute(Callback callback) throws SQLException { } } + public void execute(final String...statements) throws SQLException { + execute(new Callback() { + @Override + public void execute(Statement statement) throws SQLException { + for (String command : statements) { + statement.execute(command); + } + } + }); + } + + public void query(String sql, Callback callback) throws SQLException { + connection.setAutoCommit(false); + Statement statement = connection.createStatement(); + try { + ResultSet rs = statement.executeQuery(sql); + try { + callback.execute(rs); + connection.commit(); + } finally { + rs.close(); + } + } finally { + statement.close(); + } + } + @Override public void close() throws IOException { try { @@ -894,7 +953,12 @@ public void close() throws IOException { } } - private interface Callback { + /** + * Callback used in the {@link MySQLConnection#execute(Callback)} method. + * + * @param the type of argument + */ + public interface Callback { void execute(T obj) throws SQLException; } diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java new file mode 100644 index 0000000..104c8fd --- /dev/null +++ b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java @@ -0,0 +1,331 @@ +/* + * Copyright 2013 Stanley Shyiko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.shyiko.mysql.binlog.event.deserialization.json; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +import java.io.Serializable; +import java.sql.SQLSyntaxErrorException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.ResourceBundle; +import java.util.TimeZone; +import java.util.concurrent.TimeUnit; + +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.github.shyiko.mysql.binlog.BinaryLogClientIntegrationTest.MySQLConnection; +import com.github.shyiko.mysql.binlog.CapturingEventListener; +import com.github.shyiko.mysql.binlog.CountDownEventListener; +import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; + +/** + * @author Randall Hauch + */ +public class JsonBinaryValueIntegrationTest { + + private static final long DEFAULT_TIMEOUT = TimeUnit.SECONDS.toMillis(3); + + private MySQLConnection master; + private Map jsonValuesByKey; + + @BeforeClass + public void setUp() throws Exception { + TimeZone.setDefault(TimeZone.getTimeZone("GMT")); + ResourceBundle bundle = ResourceBundle.getBundle("jdbc"); + String prefix = "jdbc.mysql.replication."; + master = new MySQLConnection(bundle.getString(prefix + "master.hostname"), + Integer.parseInt(bundle.getString(prefix + "master.port")), + bundle.getString(prefix + "master.username"), bundle.getString(prefix + "master.password")); + BinaryLogClient client = new BinaryLogClient(master.hostname(), master.port(), master.username(), + master.password()); + client.setServerId(client.getServerId() - 1); // avoid clashes between BinaryLogClient instances + client.setKeepAlive(false); + // Uncomment the next line for detailed traces of the events ... + // client.registerEventListener(new TraceEventListener()); + CountDownEventListener eventListener; + client.registerEventListener(eventListener = new CountDownEventListener()); + // client.registerLifecycleListener(new TraceLifecycleListener()); + client.connect(DEFAULT_TIMEOUT); + try { + master.execute("drop database if exists json_test", + "create database json_test", + "use json_test", + "create table t1 (i INT, j JSON)"); + eventListener.waitFor(EventType.QUERY, 3, DEFAULT_TIMEOUT); + eventListener.reset(); + } catch (SQLSyntaxErrorException e) { + // Skip the tests altogether since MySQL is pre 5.7 + throw new org.testng.SkipException("JSON data type is not supported by current version of MySQL"); + } + + // Insert values into the t1 table ... + CapturingEventListener capturingEventListener = new CapturingEventListener(); + client.registerEventListener(capturingEventListener); + master.execute("INSERT INTO t1 VALUES (0, NULL);", + "INSERT INTO t1 VALUES (1, '{\"a\": 2}');", + "INSERT INTO t1 VALUES (2, '[1,2]');", + // checkstyle, please ignore LineLength for the next line + "INSERT INTO t1 VALUES (3, '{\"a\":\"b\", \"c\":\"d\",\"ab\":\"abc\", \"bc\": [\"x\", \"y\"]}');", + "INSERT INTO t1 VALUES (4, '[\"here\", [\"I\", \"am\"], \"!!!\"]');", + "INSERT INTO t1 VALUES (5, '\"scalar string\"');", + "INSERT INTO t1 VALUES (6, 'true');", + "INSERT INTO t1 VALUES (7, 'false');", + "INSERT INTO t1 VALUES (8, 'null');", + "INSERT INTO t1 VALUES (9, '-1');", + "INSERT INTO t1 VALUES (10, CAST(CAST(1 AS UNSIGNED) AS JSON));", + "INSERT INTO t1 VALUES (11, '32767');", + "INSERT INTO t1 VALUES (12, '32768');", + "INSERT INTO t1 VALUES (13, '-32768');", + "INSERT INTO t1 VALUES (14, '-32769');", + "INSERT INTO t1 VALUES (15, '2147483647');", + "INSERT INTO t1 VALUES (16, '2147483648');", + "INSERT INTO t1 VALUES (17, '-2147483648');", + "INSERT INTO t1 VALUES (18, '-2147483649');", + "INSERT INTO t1 VALUES (19, '18446744073709551615');", + "INSERT INTO t1 VALUES (20, '18446744073709551616');", + "INSERT INTO t1 VALUES (21, '3.14');", + "INSERT INTO t1 VALUES (22, '{}');", + "INSERT INTO t1 VALUES (23, '[]');", + "INSERT INTO t1 VALUES (24, CAST(CAST('2015-01-15 23:24:25' AS DATETIME) AS JSON));", + "INSERT INTO t1 VALUES (25, CAST(CAST('23:24:25' AS TIME) AS JSON));", + "INSERT INTO t1 VALUES (125, CAST(CAST('23:24:25.12' AS TIME(3)) AS JSON));", + "INSERT INTO t1 VALUES (225, CAST(CAST('23:24:25.0237' AS TIME(3)) AS JSON));", + "INSERT INTO t1 VALUES (26, CAST(CAST('2015-01-15' AS DATE) AS JSON));", + "INSERT INTO t1 VALUES (27, CAST(TIMESTAMP'2015-01-15 23:24:25' AS JSON));", + "INSERT INTO t1 VALUES (127, CAST(TIMESTAMP'2015-01-15 23:24:25.12' AS JSON));", + "INSERT INTO t1 VALUES (227, CAST(TIMESTAMP'2015-01-15 23:24:25.0237' AS JSON));", + "INSERT INTO t1 VALUES (327, CAST(UNIX_TIMESTAMP('2015-01-15 23:24:25') AS JSON));", + "INSERT INTO t1 VALUES (28, CAST(ST_GeomFromText('POINT(1 1)') AS JSON));", + // auto-convert to utf8mb4 + "INSERT INTO t1 VALUES (29, CAST('[]' AS CHAR CHARACTER SET 'ascii'));", + "INSERT INTO t1 VALUES (30, CAST(x'cafe' AS JSON));", + "INSERT INTO t1 VALUES (31, CAST(x'cafebabe' AS JSON));", + // # Maximum allowed key length is 64k-1 + "INSERT INTO t1 VALUES (100, CONCAT('{\"', REPEAT('a', 64 * 1024 - 1), '\":123}'));"); + + // Wait for the inserts to appear ... + eventListener.waitFor(WriteRowsEventData.class, 37, DEFAULT_TIMEOUT); + + jsonValuesByKey = new HashMap(); + List events = capturingEventListener.getEvents(WriteRowsEventData.class); + for (WriteRowsEventData event : events) { + List writtenRows = event.getRows(); + for (Serializable[] row : writtenRows) { + assertEquals(row.length, 2); + // Read the values ... + Integer rowNum = (Integer) row[0]; + byte[] jsonBinary = (byte[]) row[1]; + assertNotNull(rowNum); + jsonValuesByKey.put(rowNum, jsonBinary); + } + } + } + + @Test + public void testNullJsonValue() throws Exception { + assertJson(0, null); + } + + @Test + public void testSimpleJsonObject() throws Exception { + assertJson(1, "{\"a\":2}"); + } + + @Test + public void testMultiLevelJsonObject() throws Exception { + assertJson(3, "{\"a\":\"b\",\"c\":\"d\",\"ab\":\"abc\",\"bc\":[\"x\",\"y\"]}"); + } + + @Test + public void testSimpleJsonArray() throws Exception { + assertJson(2, "[1,2]"); + } + + @Test + public void testMultiLevelJsonArray() throws Exception { + assertJson(4, "[\"here\",[\"I\",\"am\"],\"!!!\"]"); + } + + @Test + public void testScalarString() throws Exception { + assertJson(5, "\"scalar string\""); + } + + @Test + public void testScalarBooleanTrue() throws Exception { + assertJson(6, "true"); + } + + @Test + public void testScalarBooleanFalse() throws Exception { + assertJson(7, "false"); + } + + @Test + public void testScalarNull() throws Exception { + assertJson(8, "null"); + } + + @Test + public void testScalarNegativeInteger() throws Exception { + assertJson(9, "-1"); + } + + @Test + public void testScalarUnsignedInteger() throws Exception { + assertJson(10, "1"); + } + + @Test + public void testScalarMaxPositiveInt16() throws Exception { + assertJson(11, "32767"); + } + + @Test + public void testScalarInt32() throws Exception { + assertJson(12, "32768"); + } + + @Test + public void testScalarNegativeInt16() throws Exception { + assertJson(13, "-32768"); + } + + @Test + public void testScalarNegativeInt32() throws Exception { + assertJson(14, "-32769"); + } + + @Test + public void testScalarMaxPositiveInt32() throws Exception { + assertJson(15, "2147483647"); + } + + @Test + public void testScalarPositiveInt64() throws Exception { + assertJson(16, "2147483648"); + } + + @Test + public void testScalarMaxNegativeInt32() throws Exception { + assertJson(17, "-2147483648"); + } + + @Test + public void testScalarNegativeInt64() throws Exception { + assertJson(18, "-2147483649"); + } + + @Test + public void testScalarUInt64() throws Exception { + assertJson(19, "18446744073709551615"); + } + + @Test + public void testScalarBeyondUInt64() throws Exception { + assertJson(20, "18446744073709551616"); + } + + @Test + public void testScalarPi() throws Exception { + assertJson(21, "3.14"); + } + + @Test + public void testEmptyObject() throws Exception { + assertJson(22, "{}"); + } + + @Test + public void testEmptyArray() throws Exception { + assertJson(23, "[]"); + } + + @Test + public void testScalarDateTime() throws Exception { + assertJson(24, "\"2015-01-15 23:24:25\""); + } + + @Test + public void testScalarTime() throws Exception { + assertJson(25, "\"23:24:25\""); + assertJson(125, "\"23:24:25.12\""); + assertJson(225, "\"23:24:25.024\""); + } + + @Test + public void testScalarDate() throws Exception { + assertJson(26, "\"2015-01-15\""); + } + + @Test + public void testScalarTimestamp() throws Exception { + // Timestamp literals are interpreted by MySQL as DATETIME values + assertJson(27, "\"2015-01-15 23:24:25\""); + assertJson(127, "\"2015-01-15 23:24:25.12\""); + assertJson(227, "\"2015-01-15 23:24:25.0237\""); + // The UNIX_TIMESTAMP(ts) function returns the number of seconds past epoch for the given ts + assertJson(327, "1421364265"); + } + + @Test + public void testScalarGeometry() throws Exception { + assertJson(28, "{\"type\":\"Point\",\"coordinates\":[1.0,1.0]}"); + } + + @Test + public void testScalarStringWithCharsetConversion() throws Exception { + assertJson(29, "[]"); + } + + @Test + public void testScalarBinaryAsBase64() throws Exception { + assertJson(30, "\"yv4=\""); + assertJson(31, "\"yv66vg==\""); + } + + protected void assertJson(int i, String expected) throws Exception { + byte[] b = jsonForId(i); + String json = b != null ? JsonBinary.parseAsString(b) : null; + assertEquals(json, expected); + } + + /** + * Get the binary representation of the JSON value that corresponds to the specified row number. + * + * @param i the row number; should be unique for an insert to work + * @return the binary representation of the JSON value as read from the {@link WriteRowsEventData} event; + * may be null if the JSON value is null + */ + protected byte[] jsonForId(int i) throws Exception { + return jsonValuesByKey.get(i); + } + + @AfterClass(alwaysRun = true) + public void tearDown() throws Exception { + if (master != null) { + master.execute("drop database if exists json_test"); + master.close(); + } + } +} diff --git a/supplement/codequality/checkstyle.xml b/supplement/codequality/checkstyle.xml index d5ad630..d74512a 100644 --- a/supplement/codequality/checkstyle.xml +++ b/supplement/codequality/checkstyle.xml @@ -72,7 +72,7 @@ - + diff --git a/supplement/vagrant/mysql-5.7.15-sandbox-prepackaged/vagrantfile b/supplement/vagrant/mysql-5.7.15-sandbox-prepackaged/vagrantfile new file mode 100644 index 0000000..7a644a5 --- /dev/null +++ b/supplement/vagrant/mysql-5.7.15-sandbox-prepackaged/vagrantfile @@ -0,0 +1,6 @@ +Vagrant.configure("2") do |config| + config.vm.box = 'shyiko/mysql-sandbox-prepackaged' + config.vm.box_version = '5.7.15' + config.vm.network :forwarded_port, guest: 33061, host: 33061 + config.vm.network :forwarded_port, guest: 33062, host: 33062 +end diff --git a/supplement/vagrant/mysql-5.7.15-sandbox/vagrantfile b/supplement/vagrant/mysql-5.7.15-sandbox/vagrantfile new file mode 100644 index 0000000..07cfd30 --- /dev/null +++ b/supplement/vagrant/mysql-5.7.15-sandbox/vagrantfile @@ -0,0 +1,22 @@ +Vagrant.configure("2") do |config| + config.vm.box = 'lucid32' + config.vm.box_url = 'http://files.vagrantup.com/lucid32.box' + config.vm.provision :shell, :inline => %Q( + sed -i.bak -r 's/(us.)?(archive|security).ubuntu.com/old-releases.ubuntu.com/g' /etc/apt/sources.list + apt-get update && apt-get install -y make libaio1 # libaio1 required by mysql + echo 'Downloading MySQL distribution ...' + wget --progress=dot:mega --content-disposition \ + http://cdn.mysql.com/Downloads/MySQL-5.7/mysql-5.7.15-linux-glibc2.5-i686.tar.gz \ + 2>&1 | grep --line-buffered -o '[0-9]*%' + wget -O - https://github.com/datacharmer/mysql-sandbox/releases/download/3.1.13/MySQL-Sandbox-3.1.13.tar.gz | tar xzv + (cd MySQL-Sandbox-3.1.13 && perl Makefile.PL && make && make install) + su -c "make_replication_sandbox ~/mysql-5.7.15-linux-glibc2.5-i686.tar.gz \ + --remote_access='%' --how_many_slaves=1 --sandbox_base_port=33061 \ + --master_options='-c binlog_format=ROW' \ + --slave_options='-c binlog_format=ROW -c log-slave-updates=TRUE'" vagrant + rm -f *.tar.gz + sed -i -e "s/exit\ 0/\\/home\\/vagrant\\/sandboxes\\/rsandbox_mysql-5_7_15\\/restart_all; exit 0/g" /etc/rc.local + ) + config.vm.network :forwarded_port, guest: 33061, host: 33061 + config.vm.network :forwarded_port, guest: 33062, host: 33062 +end