Skip to content

Commit

Permalink
Update the DatabaseChunkedReader to take the Connection as input rath…
Browse files Browse the repository at this point in the history
…er than the DataSource (#850)
  • Loading branch information
somandal committed Aug 19, 2021
1 parent 1cc4624 commit a23209c
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 9 deletions.
Expand Up @@ -16,7 +16,6 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.sql.DataSource;

import org.apache.avro.Schema;
import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -99,7 +98,7 @@ public class DatabaseChunkedReader implements Closeable {
/**
* Create a DatabaseChunkedReader instance
* @param props Configuration
* @param source JDBC DataSource object to use for connecting
* @param connection JDBC DataSource Connection object
* @param sourceQuery Query to execute on the source in chunked mode. The query cannot be any SQL query and
* needs to follow specific rules. The query should only involve one table, as specified
* by the 'table' parameter, should query all unique key columns and in the order specified
Expand All @@ -111,7 +110,7 @@ public class DatabaseChunkedReader implements Closeable {
* @param databaseSource DatabaseSource implementation to query table metadata needed for constructing the chunk query
* @param id Name to identify the reader instance in logs
*/
public DatabaseChunkedReader(Properties props, DataSource source, String sourceQuery, String db, String table,
public DatabaseChunkedReader(Properties props, Connection connection, String sourceQuery, String db, String table,
DatabaseSource databaseSource, String id) throws SQLException {
_databaseChunkedReaderConfig = new DatabaseChunkedReaderConfig(props);
_sourceQuery = sourceQuery;
Expand All @@ -121,8 +120,7 @@ public DatabaseChunkedReader(Properties props, DataSource source, String sourceQ
_fetchSize = _databaseChunkedReaderConfig.getFetchSize();
_queryTimeoutSecs = _databaseChunkedReaderConfig.getQueryTimeout();
_rowCountLimit = _databaseChunkedReaderConfig.getRowCountLimit();
_connection = source.getConnection();
Validate.notNull(_connection, "getConnection returned null for source" + source);
_connection = connection;
_chunkedQueryManager = _databaseChunkedReaderConfig.getChunkedQueryManager();
_skipBadMessagesEnabled = _databaseChunkedReaderConfig.getShouldSkipBadMessage();

Expand Down
Expand Up @@ -233,7 +233,7 @@ public void testRowCount() throws Exception {

for (int i = 0; i < numPartitions; i++) {
try (DatabaseChunkedReader reader =
new DatabaseChunkedReader(props, mockSources.get(i), "TEST_DB", TEST_SOURCE_QUERY,
new DatabaseChunkedReader(props, mockSources.get(i).getConnection(), "TEST_DB", TEST_SOURCE_QUERY,
TEST_COMPOSITE_KEY_TABLE, mockDBSource, "testRowCount_" + i)) {
reader.subscribe(Collections.singletonList(0), null);
for (DatabaseRow row = reader.poll(); row != null; row = reader.poll()) {
Expand Down Expand Up @@ -286,7 +286,7 @@ void testSkipBadMessages() throws SQLException, SchemaGenerationException {

int count = 0;
DatabaseChunkedReader reader =
new DatabaseChunkedReader(props, mockDs, TEST_SIMPLE_QUERY, "TEST_DB", TEST_SIMPLE_KEY_TABLE, mockDBSource, readerId);
new DatabaseChunkedReader(props, mockDs.getConnection(), TEST_SIMPLE_QUERY, "TEST_DB", TEST_SIMPLE_KEY_TABLE, mockDBSource, readerId);
reader.subscribe(Collections.singletonList(0), null);
for (DatabaseRow row = reader.poll(); row != null; row = reader.poll()) {
Assert.assertEquals(row, new DatabaseRow(Collections.singletonList(field)));
Expand Down Expand Up @@ -323,7 +323,7 @@ public void testCheckpointedChunkedReader() throws SQLException, SchemaGeneratio
Mockito.when(mockRs.next()).thenReturn(false); //No results for the query. Test to ensure the first query is a chunked query
Mockito.when(mockStmt.executeQuery()).thenReturn(mockRs);
DatabaseChunkedReader reader =
new DatabaseChunkedReader(props, mockDs, TEST_SIMPLE_QUERY, "TEST_DB", TEST_SIMPLE_KEY_TABLE, mockDBSource,
new DatabaseChunkedReader(props, mockDs.getConnection(), TEST_SIMPLE_QUERY, "TEST_DB", TEST_SIMPLE_KEY_TABLE, mockDBSource,
"TEST_CHECKPOINT");
Map<String, Object> checkpoint = new HashMap<>();
checkpoint.put("key1", 99);
Expand Down
2 changes: 1 addition & 1 deletion gradle/maven.gradle
@@ -1,5 +1,5 @@
allprojects {
version = "1.2.0"
version = "2.0.0"
}

subprojects {
Expand Down

0 comments on commit a23209c

Please sign in to comment.