Skip to content

Commit

Permalink
detect maria's goofy JSON==text+constraint columns as JSON
Browse files Browse the repository at this point in the history
  • Loading branch information
osheroff committed Oct 10, 2022
1 parent 5823423 commit 1121800
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 6 deletions.
53 changes: 49 additions & 4 deletions src/main/java/com/zendesk/maxwell/schema/SchemaCapturer.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import com.zendesk.maxwell.CaseSensitivity;
import com.zendesk.maxwell.schema.columndef.ColumnDef;
import com.zendesk.maxwell.schema.columndef.JsonColumnDef;
import com.zendesk.maxwell.schema.columndef.StringColumnDef;
import com.zendesk.maxwell.schema.ddl.InvalidSchemaError;
import com.zendesk.maxwell.util.Sql;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -131,7 +134,14 @@ public Schema capture() throws SQLException {
}
LOGGER.debug("{} database schemas captured!", size);

return new Schema(databases, captureDefaultCharset(), this.sensitivity);

Schema s = new Schema(databases, captureDefaultCharset(), this.sensitivity);
try {
detectMariaDBJSON(s);
} catch ( InvalidSchemaError e ) {
e.printStackTrace();
}
return s;
}

private String captureDefaultCharset() throws SQLException {
Expand Down Expand Up @@ -162,15 +172,19 @@ private void captureDatabase(Database db) throws SQLException {


private boolean isMySQLAtLeast56() throws SQLException {
java.sql.DatabaseMetaData meta = connection.getMetaData();
if ( meta.getDatabaseProductVersion().toLowerCase().contains("maria")) {
if ( isMariaDB() )
return true;
}

java.sql.DatabaseMetaData meta = connection.getMetaData();
int major = meta.getDatabaseMajorVersion();
int minor = meta.getDatabaseMinorVersion();
return ((major == 5 && minor >= 6) || major > 5);
}

private boolean isMariaDB() throws SQLException {
java.sql.DatabaseMetaData meta = connection.getMetaData();
return meta.getDatabaseProductVersion().toLowerCase().contains("maria");
}

private void captureTables(Database db, HashMap<String, Table> tables) throws SQLException {
columnPreparedStatement.setString(1, db.getName());
Expand Down Expand Up @@ -281,4 +295,35 @@ public void close() throws SQLException {
}
}

private void detectMariaDBJSON(Schema schema) throws SQLException, InvalidSchemaError {
String checkConstraintSQL = "SELECT CONSTRAINT_SCHEMA, TABLE_NAME, CONSTRAINT_NAME, CHECK_CLAUSE " +
"from INFORMATION_SCHEMA.CHECK_CONSTRAINTS where LEVEL='column' and CHECK_CLAUSE LIKE 'json_valid(%)'";

String regex = "json_valid\\(`(.*)`\\)";
Pattern pattern = Pattern.compile(regex);

try (
PreparedStatement statement = connection.prepareStatement(checkConstraintSQL);
ResultSet rs = statement.executeQuery()
) {
while ( rs.next() ) {
String checkClause = rs.getString("CHECK_CLAUSE");
Matcher m = pattern.matcher(checkClause);
if ( m.find() ) {
String column = m.group(1);
Database d = schema.findDatabaseOrThrow(rs.getString("CONSTRAINT_SCHEMA"));
Table t = d.findTable(rs.getString("TABLE_NAME"));
short i = t.findColumnIndex(column);
if ( i >= 0 ) {
ColumnDef cd = t.findColumn(i);
if ( cd instanceof StringColumnDef ) {
t.replaceColumn(i, JsonColumnDef.create(cd.getName(), "json", i));
}
}
}
}
}

}

}
9 changes: 7 additions & 2 deletions src/main/java/com/zendesk/maxwell/schema/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,19 @@ public String getName() {
return this.name;
}

public int findColumnIndex(String name) {
return columns.indexOf(name);
public short findColumnIndex(String name) {
return (short) columns.indexOf(name);
}


public ColumnDef findColumn(String name) {
return columns.findByName(name);
}

public ColumnDef findColumn(int index) {
return columns.get(index);
}

@JsonIgnore
public int getPKIndex() {
return this.pkIndex;
Expand Down

0 comments on commit 1121800

Please sign in to comment.