diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/TableMapEventData.java b/src/main/java/com/github/shyiko/mysql/binlog/event/TableMapEventData.java
index 59255d31..edda8911 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/event/TableMapEventData.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/event/TableMapEventData.java
@@ -28,6 +28,7 @@ public class TableMapEventData implements EventData {
private byte[] columnTypes;
private int[] columnMetadata;
private BitSet columnNullability;
+ private TableMapEventMetadata eventMetadata;
public long getTableId() {
return tableId;
@@ -77,6 +78,10 @@ public void setColumnNullability(BitSet columnNullability) {
this.columnNullability = columnNullability;
}
+ public TableMapEventMetadata getEventMetadata() { return eventMetadata; }
+
+ public void setEventMetadata(TableMapEventMetadata eventMetadata) { this.eventMetadata = eventMetadata; }
+
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
@@ -93,6 +98,7 @@ public String toString() {
sb.append(i == 0 ? "" : ", ").append(columnMetadata[i]);
}
sb.append(", columnNullability=").append(columnNullability);
+ sb.append(", eventMetadata=").append(eventMetadata);
sb.append('}');
return sb.toString();
}
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/TableMapEventMetadata.java b/src/main/java/com/github/shyiko/mysql/binlog/event/TableMapEventMetadata.java
new file mode 100644
index 00000000..66f030ec
--- /dev/null
+++ b/src/main/java/com/github/shyiko/mysql/binlog/event/TableMapEventMetadata.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2013 Stanley Shyiko
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.shyiko.mysql.binlog.event;
+
+import java.io.Serializable;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Ahmed Abdul Hamid
+ */
+public class TableMapEventMetadata implements EventData {
+
+ private BitSet signedness;
+ private DefaultCharset defaultCharset;
+ private List columnCharsets;
+ private List columnNames;
+ private List setStrValues;
+ private List enumStrValues;
+ private List geometryTypes;
+ private List simplePrimaryKeys;
+ private Map primaryKeysWithPrefix;
+ private DefaultCharset enumAndSetDefaultCharset;
+ private List enumAndSetColumnCharsets;
+
+ public BitSet getSignedness() {
+ return signedness;
+ }
+
+ public void setSignedness(BitSet signedness) {
+ this.signedness = signedness;
+ }
+
+ public DefaultCharset getDefaultCharset() {
+ return defaultCharset;
+ }
+
+ public void setDefaultCharset(DefaultCharset defaultCharset) {
+ this.defaultCharset = defaultCharset;
+ }
+
+ public List getColumnCharsets() {
+ return columnCharsets;
+ }
+
+ public void setColumnCharsets(List columnCharsets) {
+ this.columnCharsets = columnCharsets;
+ }
+
+ public List getColumnNames() {
+ return columnNames;
+ }
+
+ public void setColumnNames(List columnNames) {
+ this.columnNames = columnNames;
+ }
+
+ public List getSetStrValues() {
+ return setStrValues;
+ }
+
+ public void setSetStrValues(List setStrValues) {
+ this.setStrValues = setStrValues;
+ }
+
+ public List getEnumStrValues() {
+ return enumStrValues;
+ }
+
+ public void setEnumStrValues(List enumStrValues) {
+ this.enumStrValues = enumStrValues;
+ }
+
+ public List getGeometryTypes() {
+ return geometryTypes;
+ }
+
+ public void setGeometryTypes(List geometryTypes) {
+ this.geometryTypes = geometryTypes;
+ }
+
+ public List getSimplePrimaryKeys() {
+ return simplePrimaryKeys;
+ }
+
+ public void setSimplePrimaryKeys(List simplePrimaryKeys) {
+ this.simplePrimaryKeys = simplePrimaryKeys;
+ }
+
+ public Map getPrimaryKeysWithPrefix() {
+ return primaryKeysWithPrefix;
+ }
+
+ public void setPrimaryKeysWithPrefix(Map primaryKeysWithPrefix) {
+ this.primaryKeysWithPrefix = primaryKeysWithPrefix;
+ }
+
+ public DefaultCharset getEnumAndSetDefaultCharset() {
+ return enumAndSetDefaultCharset;
+ }
+
+ public void setEnumAndSetDefaultCharset(DefaultCharset enumAndSetDefaultCharset) {
+ this.enumAndSetDefaultCharset = enumAndSetDefaultCharset;
+ }
+
+ public List getEnumAndSetColumnCharsets() {
+ return enumAndSetColumnCharsets;
+ }
+
+ public void setEnumAndSetColumnCharsets(List enumAndSetColumnCharsets) {
+ this.enumAndSetColumnCharsets = enumAndSetColumnCharsets;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("TableMapEventMetadata");
+ sb.append("{signedness=").append(signedness);
+ sb.append(", defaultCharset=").append(defaultCharset == null ? "null" : defaultCharset);
+
+ sb.append(", columnCharsets=").append(columnCharsets == null ? "null" : "");
+ appendList(sb, columnCharsets);
+
+ sb.append(", columnNames=").append(columnNames == null ? "null" : "");
+ appendList(sb, columnNames);
+
+ sb.append(", setStrValues=").append(setStrValues == null ? "null" : "");
+ for (int i = 0; setStrValues != null && i < setStrValues.size(); ++i) {
+ sb.append(i == 0 ? "" : ", ").append(join(", ", setStrValues.get(i)));
+ }
+
+ sb.append(", enumStrValues=").append(enumStrValues == null ? "null" : "");
+ for (int i = 0; enumStrValues != null && i < enumStrValues.size(); ++i) {
+ sb.append(i == 0 ? "" : ", ").append(join(", ", enumStrValues.get(i)));
+ }
+
+ sb.append(", geometryTypes=").append(geometryTypes == null ? "null" : "");
+ appendList(sb, geometryTypes);
+
+ sb.append(", simplePrimaryKeys=").append(simplePrimaryKeys == null ? "null" : "");
+ appendList(sb, simplePrimaryKeys);
+
+ sb.append(", primaryKeysWithPrefix=").append(primaryKeysWithPrefix == null ? "null" : "");
+ appendMap(sb, primaryKeysWithPrefix);
+
+ sb.append(", enumAndSetDefaultCharset=").append(enumAndSetDefaultCharset == null ? "null" :
+ enumAndSetDefaultCharset);
+
+ sb.append(", enumAndSetColumnCharsets=").append(enumAndSetColumnCharsets == null ? "null" : "");
+ appendList(sb, enumAndSetColumnCharsets);
+
+ sb.append('}');
+ return sb.toString();
+ }
+
+ private static String join(CharSequence delimiter, CharSequence... elements) {
+ if (elements == null || elements.length == 0) {
+ return "";
+ }
+
+ final StringBuilder sb = new StringBuilder();
+ sb.append(elements[0]);
+
+ for (int i = 1; i < elements.length; ++i) {
+ sb.append(delimiter).append(elements[i]);
+ }
+ return sb.toString();
+ }
+
+ private static void appendList(StringBuilder sb, List> elements) {
+ if (elements == null) {
+ return;
+ }
+
+ for (int i = 0; i < elements.size(); ++i) {
+ sb.append(i == 0 ? "" : ", ").append(elements.get(i));
+ }
+ }
+
+ private static void appendMap(StringBuilder sb, Map, ?> map) {
+ if (map == null) {
+ return;
+ }
+
+ int entryCount = 0;
+ for (Map.Entry, ?> entry : map.entrySet()) {
+ sb.append(entryCount++ == 0 ? "" : ", ").append(entry.getKey() + ": " + entry.getValue());
+ }
+ }
+
+ /**
+ * @author Ahmed Abdul Hamid
+ */
+ public static class DefaultCharset implements Serializable {
+ private int defaultCharsetCollation;
+ private Map charsetCollations;
+
+ public void setDefaultCharsetCollation(int defaultCharsetCollation) {
+ this.defaultCharsetCollation = defaultCharsetCollation;
+ }
+
+ public int getDefaultCharsetCollation() {
+ return defaultCharsetCollation;
+ }
+
+ public void setCharsetCollations(Map charsetCollations) {
+ this.charsetCollations = charsetCollations;
+ }
+
+ public Map getCharsetCollations() {
+ return charsetCollations;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append(defaultCharsetCollation);
+ sb.append(", charsetCollations=") .append(charsetCollations == null ? "null" : "");
+ appendMap(sb, charsetCollations);
+ return sb.toString();
+ }
+ }
+}
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TableMapEventDataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TableMapEventDataDeserializer.java
index 45589469..31825e08 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TableMapEventDataDeserializer.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TableMapEventDataDeserializer.java
@@ -25,6 +25,8 @@
*/
public class TableMapEventDataDeserializer implements EventDataDeserializer {
+ private final TableMapEventMetadataDeserializer metadataDeserializer = new TableMapEventMetadataDeserializer();
+
@Override
public TableMapEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
TableMapEventData eventData = new TableMapEventData();
@@ -38,6 +40,8 @@ public TableMapEventData deserialize(ByteArrayInputStream inputStream) throws IO
inputStream.readPackedInteger(); // metadata length
eventData.setColumnMetadata(readMetadata(inputStream, eventData.getColumnTypes()));
eventData.setColumnNullability(inputStream.readBitSet(numberOfColumns, true));
+ eventData.setEventMetadata(metadataDeserializer.deserialize(
+ new ByteArrayInputStream(inputStream.read(inputStream.available())), numberOfColumns));
return eventData;
}
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TableMapEventMetadataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TableMapEventMetadataDeserializer.java
new file mode 100644
index 00000000..a4978817
--- /dev/null
+++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TableMapEventMetadataDeserializer.java
@@ -0,0 +1,188 @@
+/*
+ * Copyright 2013 Stanley Shyiko
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.shyiko.mysql.binlog.event.deserialization;
+
+import com.github.shyiko.mysql.binlog.event.TableMapEventMetadata;
+import com.github.shyiko.mysql.binlog.event.TableMapEventMetadata.DefaultCharset;
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Ahmed Abdul Hamid
+ */
+public class TableMapEventMetadataDeserializer {
+
+ public TableMapEventMetadata deserialize(ByteArrayInputStream inputStream, int numberOfColumns) throws IOException {
+ int remainingBytes = inputStream.available();
+ if (remainingBytes <= 0) {
+ return null;
+ }
+
+ TableMapEventMetadata result = new TableMapEventMetadata();
+
+ for (; remainingBytes > 0; inputStream.enterBlock(remainingBytes)) {
+ MetadataFieldType fieldType = MetadataFieldType.byCode(inputStream.readInteger(1));
+ int fieldLength = inputStream.readPackedInteger();
+
+ remainingBytes = inputStream.available();
+ inputStream.enterBlock(fieldLength);
+
+ switch (fieldType) {
+ case SIGNEDNESS:
+ result.setSignedness(readSignedness(inputStream, numberOfColumns));
+ break;
+ case DEFAULT_CHARSET:
+ result.setDefaultCharset(readDefaultCharset(inputStream));
+ break;
+ case COLUMN_CHARSET:
+ result.setColumnCharsets(readIntegers(inputStream));
+ break;
+ case COLUMN_NAME:
+ result.setColumnNames(readColumnNames(inputStream));
+ break;
+ case SET_STR_VALUE:
+ result.setSetStrValues(readTypeValues(inputStream));
+ break;
+ case ENUM_STR_VALUE:
+ result.setEnumStrValues(readTypeValues(inputStream));
+ break;
+ case GEOMETRY_TYPE:
+ result.setGeometryTypes(readIntegers(inputStream));
+ break;
+ case SIMPLE_PRIMARY_KEY:
+ result.setSimplePrimaryKeys(readIntegers(inputStream));
+ break;
+ case PRIMARY_KEY_WITH_PREFIX:
+ result.setPrimaryKeysWithPrefix(readIntegerPairs(inputStream));
+ break;
+ case ENUM_AND_SET_DEFAULT_CHARSET:
+ result.setEnumAndSetDefaultCharset(readDefaultCharset(inputStream));
+ break;
+ case ENUM_AND_SET_COLUMN_CHARSET:
+ result.setEnumAndSetColumnCharsets(readIntegers(inputStream));
+ break;
+ default:
+ inputStream.enterBlock(remainingBytes);
+ throw new IOException("Unsupported table metadata field type " + fieldType);
+ }
+ remainingBytes -= fieldLength;
+ }
+ return result;
+ }
+
+ private static BitSet readSignedness(ByteArrayInputStream inputStream, int length) throws IOException {
+ BitSet result = new BitSet();
+ // according to MySQL internals the amount of storage required for N columns is INT((N+7)/8) bytes
+ byte[] bytes = inputStream.read((length + 7) >> 3);
+ for (int i = 0; i < length; ++i) {
+ if ((bytes[i >> 3] & (1 << (7 - (i % 8)))) != 0) {
+ result.set(i);
+ }
+ }
+ return result;
+ }
+
+ private static DefaultCharset readDefaultCharset(ByteArrayInputStream inputStream) throws IOException {
+ TableMapEventMetadata.DefaultCharset result = new TableMapEventMetadata.DefaultCharset();
+ result.setDefaultCharsetCollation(inputStream.readPackedInteger());
+ Map charsetCollations = readIntegerPairs(inputStream);
+ if (!charsetCollations.isEmpty()) {
+ result.setCharsetCollations(charsetCollations);
+ }
+ return result;
+ }
+
+ private static List readIntegers(ByteArrayInputStream inputStream) throws IOException {
+ List result = new ArrayList();
+ while (inputStream.available() > 0) {
+ result.add(inputStream.readPackedInteger());
+ }
+ return result;
+ }
+
+ private static List readColumnNames(ByteArrayInputStream inputStream) throws IOException {
+ List columnNames = new ArrayList();
+ while (inputStream.available() > 0) {
+ columnNames.add(inputStream.readLengthEncodedString());
+ }
+ return columnNames;
+ }
+
+ private static List readTypeValues(ByteArrayInputStream inputStream) throws IOException {
+ List result = new ArrayList();
+ while (inputStream.available() > 0) {
+ List typeValues = new ArrayList();
+ int valuesCount = inputStream.readPackedInteger();
+ for (int i = 0; i < valuesCount; ++i) {
+ typeValues.add(inputStream.readLengthEncodedString());
+ }
+ result.add(typeValues.toArray(new String[typeValues.size()]));
+ }
+ return result;
+ }
+
+ private static Map readIntegerPairs(ByteArrayInputStream inputStream) throws IOException {
+ Map result = new LinkedHashMap();
+ while (inputStream.available() > 0) {
+ int columnIndex = inputStream.readPackedInteger();
+ int columnCharset = inputStream.readPackedInteger();
+ result.put(columnIndex, columnCharset);
+ }
+ return result;
+ }
+
+ private enum MetadataFieldType {
+ SIGNEDNESS(1), // Signedness of numeric colums
+ DEFAULT_CHARSET(2), // Charsets of character columns
+ COLUMN_CHARSET(3), // Charsets of character columns
+ COLUMN_NAME(4), // Names of columns
+ SET_STR_VALUE(5), // The string values of SET columns
+ ENUM_STR_VALUE(6), // The string values is ENUM columns
+ GEOMETRY_TYPE(7), // The real type of geometry columns
+ SIMPLE_PRIMARY_KEY(8), // The primary key without any prefix
+ PRIMARY_KEY_WITH_PREFIX(9), // The primary key with some prefix
+ ENUM_AND_SET_DEFAULT_CHARSET(10), // Charsets of ENUM and SET columns
+ ENUM_AND_SET_COLUMN_CHARSET(11); // Charsets of ENUM and SET columns
+
+ private final int code;
+
+ private MetadataFieldType(int code) {
+ this.code = code;
+ }
+
+ public int getCode() { return code; }
+
+ private static final Map INDEX_BY_CODE;
+
+ static {
+ INDEX_BY_CODE = new HashMap();
+ for (MetadataFieldType fieldType : values()) {
+ INDEX_BY_CODE.put(fieldType.code, fieldType);
+ }
+ }
+
+ public static MetadataFieldType byCode(int code) {
+ return INDEX_BY_CODE.get(code);
+ }
+ }
+}