diff --git a/datastream-common/src/main/java/com/linkedin/datastream/common/databases/dbreader/DatabaseChunkedReader.java b/datastream-common/src/main/java/com/linkedin/datastream/common/databases/dbreader/DatabaseChunkedReader.java index 68591a34c..a85330546 100644 --- a/datastream-common/src/main/java/com/linkedin/datastream/common/databases/dbreader/DatabaseChunkedReader.java +++ b/datastream-common/src/main/java/com/linkedin/datastream/common/databases/dbreader/DatabaseChunkedReader.java @@ -16,7 +16,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import javax.sql.DataSource; import org.apache.avro.Schema; import org.apache.commons.lang.StringUtils; @@ -99,7 +98,7 @@ public class DatabaseChunkedReader implements Closeable { /** * Create a DatabaseChunkedReader instance * @param props Configuration - * @param source JDBC DataSource object to use for connecting + * @param connection JDBC DataSource Connection object * @param sourceQuery Query to execute on the source in chunked mode. The query cannot be any SQL query and * needs to follow specific rules. The query should only involve one table, as specified * by the 'table' parameter, should query all unique key columns and in the order specified @@ -111,7 +110,7 @@ public class DatabaseChunkedReader implements Closeable { * @param databaseSource DatabaseSource implementation to query table metadata needed for constructing the chunk query * @param id Name to identify the reader instance in logs */ - public DatabaseChunkedReader(Properties props, DataSource source, String sourceQuery, String db, String table, + public DatabaseChunkedReader(Properties props, Connection connection, String sourceQuery, String db, String table, DatabaseSource databaseSource, String id) throws SQLException { _databaseChunkedReaderConfig = new DatabaseChunkedReaderConfig(props); _sourceQuery = sourceQuery; @@ -121,8 +120,7 @@ public DatabaseChunkedReader(Properties props, DataSource source, String sourceQ _fetchSize = _databaseChunkedReaderConfig.getFetchSize(); _queryTimeoutSecs = _databaseChunkedReaderConfig.getQueryTimeout(); _rowCountLimit = _databaseChunkedReaderConfig.getRowCountLimit(); - _connection = source.getConnection(); - Validate.notNull(_connection, "getConnection returned null for source" + source); + _connection = connection; _chunkedQueryManager = _databaseChunkedReaderConfig.getChunkedQueryManager(); _skipBadMessagesEnabled = _databaseChunkedReaderConfig.getShouldSkipBadMessage(); diff --git a/datastream-common/src/test/java/com/linkedin/datastream/common/databases/dbreader/TestDatabaseChunkedReader.java b/datastream-common/src/test/java/com/linkedin/datastream/common/databases/dbreader/TestDatabaseChunkedReader.java index 640764489..fdb0b6878 100644 --- a/datastream-common/src/test/java/com/linkedin/datastream/common/databases/dbreader/TestDatabaseChunkedReader.java +++ b/datastream-common/src/test/java/com/linkedin/datastream/common/databases/dbreader/TestDatabaseChunkedReader.java @@ -233,7 +233,7 @@ public void testRowCount() throws Exception { for (int i = 0; i < numPartitions; i++) { try (DatabaseChunkedReader reader = - new DatabaseChunkedReader(props, mockSources.get(i), "TEST_DB", TEST_SOURCE_QUERY, + new DatabaseChunkedReader(props, mockSources.get(i).getConnection(), "TEST_DB", TEST_SOURCE_QUERY, TEST_COMPOSITE_KEY_TABLE, mockDBSource, "testRowCount_" + i)) { reader.subscribe(Collections.singletonList(0), null); for (DatabaseRow row = reader.poll(); row != null; row = reader.poll()) { @@ -286,7 +286,7 @@ void testSkipBadMessages() throws SQLException, SchemaGenerationException { int count = 0; DatabaseChunkedReader reader = - new DatabaseChunkedReader(props, mockDs, TEST_SIMPLE_QUERY, "TEST_DB", TEST_SIMPLE_KEY_TABLE, mockDBSource, readerId); + new DatabaseChunkedReader(props, mockDs.getConnection(), TEST_SIMPLE_QUERY, "TEST_DB", TEST_SIMPLE_KEY_TABLE, mockDBSource, readerId); reader.subscribe(Collections.singletonList(0), null); for (DatabaseRow row = reader.poll(); row != null; row = reader.poll()) { Assert.assertEquals(row, new DatabaseRow(Collections.singletonList(field))); @@ -323,7 +323,7 @@ public void testCheckpointedChunkedReader() throws SQLException, SchemaGeneratio Mockito.when(mockRs.next()).thenReturn(false); //No results for the query. Test to ensure the first query is a chunked query Mockito.when(mockStmt.executeQuery()).thenReturn(mockRs); DatabaseChunkedReader reader = - new DatabaseChunkedReader(props, mockDs, TEST_SIMPLE_QUERY, "TEST_DB", TEST_SIMPLE_KEY_TABLE, mockDBSource, + new DatabaseChunkedReader(props, mockDs.getConnection(), TEST_SIMPLE_QUERY, "TEST_DB", TEST_SIMPLE_KEY_TABLE, mockDBSource, "TEST_CHECKPOINT"); Map checkpoint = new HashMap<>(); checkpoint.put("key1", 99); diff --git a/gradle/maven.gradle b/gradle/maven.gradle index 76f598ce5..fc6aca855 100644 --- a/gradle/maven.gradle +++ b/gradle/maven.gradle @@ -1,5 +1,5 @@ allprojects { - version = "1.2.0" + version = "2.0.0" } subprojects {