Skip to content

Commit

Permalink
Merge pull request #18 from LakshanSS/siddhi-4.x.x
Browse files Browse the repository at this point in the history
Add support for sqlserver and postgresql in listening mode
  • Loading branch information
mohanvive committed May 15, 2019
2 parents e2ae032 + 54d6aad commit 98b018d
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 7 deletions.
8 changes: 8 additions & 0 deletions component/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,18 @@
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-sqlserver</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-oracle</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.siddhi</groupId>
<artifactId>siddhi-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,7 @@
"\ndefine stream inputStream (name string);",
description = "In this example, the CDC source polls the 'students' table for inserts " +
"and updates. The polling column is a timestamp field."
),

)
}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@ public class CDCSourceConstants {
public static final String CONNECTOR_CLASS = "connector.class";
public static final String DATABASE_PORT = "database.port";
public static final String TABLE_WHITELIST = "table.whitelist";
public static final String DATABASE_DBNAME = "database.dbname";
public static final String DATABASE_HOSTNAME = "database.hostname";
public static final String DATABASE_USER = "database.user";
public static final String DATABASE_PASSWORD = "database.password";
public static final String OFFSET_STORAGE = "offset.storage";
public static final String CDC_SOURCE_OBJECT = "cdc.source.object";
public static final String DATABASE_HISTORY = "database.history";
public static final String MYSQL_CONNECTOR_CLASS = "io.debezium.connector.mysql.MySqlConnector";
public static final String POSTGRESQL_CONNECTOR_CLASS = "io.debezium.connector.postgresql.PostgresConnector";
public static final String SQLSERVER_CONNECTOR_CLASS = "io.debezium.connector.sqlserver.SqlServerConnector";
public static final String BEFORE_PREFIX = "before_";
public static final String CACHE_OBJECT = "cacheObj";
public static final int DEFAULT_SERVER_ID = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,62 @@ public static Map<String, Object> getConfigMap(String username, String password,

//Add other MySQL specific details to configMap.
configMap.put(CDCSourceConstants.CONNECTOR_CLASS, CDCSourceConstants.MYSQL_CONNECTOR_CLASS);
break;
}
case "postgresql": {
//Extract url details
String regex = "jdbc:postgresql://(\\w*|[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}):" +
"(\\d++)/(\\w*)";
Pattern p = Pattern.compile(regex);
Matcher matcher = p.matcher(url);
if (matcher.find()) {
host = matcher.group(1);
port = Integer.parseInt(matcher.group(2));
database = matcher.group(3);
} else {
throw new WrongConfigurationException("Invalid JDBC url: " + url + " received for stream: " +
siddhiStreamName + ". Expected url format: jdbc:postgresql://<host>:<port>/" +
"<database_name>");
}

//Add extracted url details to configMap.
configMap.put(CDCSourceConstants.DATABASE_HOSTNAME, host);
configMap.put(CDCSourceConstants.DATABASE_PORT, port);
configMap.put(CDCSourceConstants.DATABASE_DBNAME, database);
configMap.put(CDCSourceConstants.TABLE_WHITELIST, tableName);

//Add other PostgreSQL specific details to configMap.
configMap.put(CDCSourceConstants.CONNECTOR_CLASS, CDCSourceConstants.POSTGRESQL_CONNECTOR_CLASS);
break;
}
case "sqlserver": {
//Extract url details
String regex = "jdbc:sqlserver://(\\w*|[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}):" +
"(\\d++);databaseName=(\\w*)";
Pattern p = Pattern.compile(regex);
Matcher matcher = p.matcher(url);
if (matcher.find()) {
host = matcher.group(1);
port = Integer.parseInt(matcher.group(2));
database = matcher.group(3);
} else {
throw new WrongConfigurationException("Invalid JDBC url: " + url + " received for stream: " +
siddhiStreamName + ". Expected url format: jdbc:sqlserver://<host>:<port>;" +
"databaseName=<database_name>");
}
//Add extracted url details to configMap.
configMap.put(CDCSourceConstants.DATABASE_HOSTNAME, host);
configMap.put(CDCSourceConstants.DATABASE_PORT, port);
configMap.put(CDCSourceConstants.TABLE_WHITELIST, tableName);
configMap.put(CDCSourceConstants.DATABASE_DBNAME, database);

//Add other SQLServer specific details to configMap.
configMap.put(CDCSourceConstants.CONNECTOR_CLASS, CDCSourceConstants.SQLSERVER_CONNECTOR_CLASS);
break;
}
default: {
throw new WrongConfigurationException("Unsupported schema. Expected schema: mysql, Found: "
+ splittedURL[1]);
throw new WrongConfigurationException("Unsupported schema. Expected schema: mysql or postgresql" +
"or sqlserver, Found: " + splittedURL[1]);
}
}

Expand Down
18 changes: 15 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@
<artifactId>debezium-connector-oracle</artifactId>
<version>${debezium-connector-oracle.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<version>${debezium-connector-postgres.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-sqlserver</artifactId>
<version>${debezium-connector-sqlserver.version}</version>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
Expand Down Expand Up @@ -191,9 +201,11 @@
<siddhi-map-keyvalue.version>1.0.13</siddhi-map-keyvalue.version>
<jacoco.plugin.version>0.7.9</jacoco.plugin.version>
<mysql-binlog-connector-java.version>0.13.0</mysql-binlog-connector-java.version>
<debezium-connector-mysql.version>0.8.3.Final</debezium-connector-mysql.version>
<debezium-embedded.version>0.8.3.Final</debezium-embedded.version>
<debezium-connector-oracle.version>0.8.3.Final</debezium-connector-oracle.version>
<debezium-connector-mysql.version>0.9.5.Final</debezium-connector-mysql.version>
<debezium-embedded.version>0.9.5.Final</debezium-embedded.version>
<debezium-connector-oracle.version>0.9.5.Final</debezium-connector-oracle.version>
<debezium-connector-postgres.version>0.9.5.Final</debezium-connector-postgres.version>
<debezium-connector-sqlserver.version>0.9.5.Final</debezium-connector-sqlserver.version>
<hikari.version>3.2.0</hikari.version>
<org.testng.version>6.11</org.testng.version>
<siddhi-store-rdbms.version>4.0.39</siddhi-store-rdbms.version>
Expand Down

0 comments on commit 98b018d

Please sign in to comment.