Skip to content
This repository has been archived by the owner on Jan 21, 2022. It is now read-only.

Commit

Permalink
Add JDBC batch insert into sink
Browse files Browse the repository at this point in the history
* Use the `groupTimeout` functionality on the aggregator instead of a `MessageGroupStoreReaper`

added a paragraph describing how to activate batch inserts

fixed code formatting and removed unused imports

faster BatchInsertionTimeoutTests

code style / formatting fixes

* Polishing poms, code style
* Update Copyright
* Optimize `idle-timeout` logic
* remove explicit `toSink` channel definition
  • Loading branch information
Oliver Flasch authored and artembilan committed Jan 2, 2019
1 parent d4d228d commit 6b63922
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 50 deletions.
5 changes: 3 additions & 2 deletions jdbc-app-dependencies/pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>jdbc-app-dependencies</artifactId>
Expand All @@ -12,7 +13,7 @@
<artifactId>spring-cloud-dependencies-parent</artifactId>
<groupId>org.springframework.cloud</groupId>
<version>2.1.0.RC3</version>
<relativePath />
<relativePath/>
</parent>

<dependencyManagement>
Expand Down
12 changes: 9 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>jdbc-app-starters-build</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
Expand Down Expand Up @@ -31,7 +32,8 @@
</dependency>
</dependencies>
</dependencyManagement>
<profiles>

<profiles>
<profile>
<id>spring</id>
<repositories>
Expand Down Expand Up @@ -77,7 +79,11 @@
</repository>
</repositories>
<pluginRepositories>
<pluginRepository><id>spring-releases></id><name>Spring Releases</name><url>http://repo.spring.io/libs-release</url></pluginRepository>
<pluginRepository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>http://repo.spring.io/libs-release</url>
</pluginRepository>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
Expand Down
7 changes: 7 additions & 0 deletions spring-cloud-starter-stream-sink-jdbc/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ $$jdbc.columns$$:: $$The comma separated colon-based pairs of column names and S
Names are used at initialization time to issue the DDL.$$ *($$String$$, default: `$$payload:payload.toString()$$`)*
$$jdbc.initialize$$:: $$'true', 'false' or the location of a custom initialization script for the table.$$ *($$String$$, default: `$$false$$`)*
$$jdbc.table-name$$:: $$The name of the table to write into.$$ *($$String$$, default: `$$messages$$`)*
$$jdbc.batch-size$$:: $$Threshold in number of messages when data will be flushed to database table.$$ *($$Integer$$, default: `$$1$$`)*
$$jdbc.idle-timeout$$:: $$Idle timeout in milliseconds when data is automatically flushed to database table.$$ *($$Long$$, default: `$$-1$$`)*
$$spring.datasource.data$$:: $$Data (DML) script resource references.$$ *($$List<String>$$, default: `$$<none>$$`)*
$$spring.datasource.driver-class-name$$:: $$Fully qualified name of the JDBC driver. Auto-detected based on the URL by default.$$ *($$String$$, default: `$$<none>$$`)*
$$spring.datasource.initialization-mode$$:: $$Initialize the datasource using available DDL and DML scripts.$$ *($$DataSourceInitializationMode$$, default: `$$embedded$$`, possible values: `ALWAYS`,`EMBEDDED`,`NEVER`)*
Expand All @@ -52,6 +54,11 @@ So, we can insert it into the table with `name`, `city` and `street` structure u
--jdbc.columns=name,city:address.city,street:address.street
```

This sink supports batch inserts, as far as supported by the underlying JDBC driver.
Batch inserts are configured via the `batch-size` and `idle-timeout` properties:
Incoming messages are aggregated until `batch-size` messages are present, then inserted as a batch.
If `idle-timeout` milliseconds pass with no new messages, the aggregated batch is inserted even if it is smaller than `batch-size`, capping maximum latency.

NOTE: The module also uses Spring Boot's http://docs.spring.io/spring-boot/docs/current/reference/html/boot-features-sql.html#boot-features-configure-datasource[DataSource support] for configuring the database connection, so properties like `spring.datasource.url` _etc._ apply.

== Build
Expand Down
10 changes: 4 additions & 6 deletions spring-cloud-starter-stream-sink-jdbc/pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<artifactId>spring-cloud-starter-stream-sink-jdbc</artifactId>
Expand All @@ -21,10 +22,6 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>jdbc-app-starters-common</artifactId>
</dependency>
<dependency>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
Expand Down Expand Up @@ -84,7 +81,8 @@
<jdbc-sink/>
</generatedApps>
<additionalAppProperties>
<additionalAppProperty>spring.cloud.stream.bindings.input.contentType=text/plain</additionalAppProperty>
<additionalAppProperty>spring.cloud.stream.bindings.input.contentType=text/plain
</additionalAppProperty>
</additionalAppProperties>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 the original author or authors.
* Copyright 2015-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
Expand All @@ -36,22 +37,31 @@
import org.springframework.cloud.stream.app.jdbc.ShorthandMapConverter;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.ResourceLoader;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.EvaluationException;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.SpelParseException;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.aggregator.DefaultAggregatingMessageGroupProcessor;
import org.springframework.integration.aggregator.ExpressionEvaluatingCorrelationStrategy;
import org.springframework.integration.aggregator.MessageCountReleaseStrategy;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.AggregatorFactoryBean;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.integration.jdbc.SqlParameterSourceFactory;
import org.springframework.integration.json.JsonPropertyAccessor;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;

Expand All @@ -63,6 +73,8 @@
* @author Eric Bottard
* @author Thomas Risberg
* @author Robert St. John
* @author Oliver Flasch
* @author Artem Bilan
*/
@EnableBinding(Sink.class)
@EnableConfigurationProperties(JdbcSinkProperties.class)
Expand All @@ -83,16 +95,36 @@ public class JdbcSinkConfiguration {
private EvaluationContext evaluationContext;

@Bean
@ServiceActivator(autoStartup = "true", inputChannel = Sink.INPUT)
@Primary
@ServiceActivator(inputChannel = Sink.INPUT)
FactoryBean<MessageHandler> aggregatorFactoryBean(MessageGroupStore messageGroupStore) {
AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
aggregatorFactoryBean
.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("payload.getClass().name"));
aggregatorFactoryBean.setReleaseStrategy(new MessageCountReleaseStrategy(this.properties.getBatchSize()));
if (this.properties.getIdleTimeout() >= 0) {
aggregatorFactoryBean.setGroupTimeoutExpression(new ValueExpression<>(this.properties.getIdleTimeout()));
}
aggregatorFactoryBean.setMessageStore(messageGroupStore);
aggregatorFactoryBean.setProcessorBean(new DefaultAggregatingMessageGroupProcessor());
aggregatorFactoryBean.setExpireGroupsUponCompletion(true);
aggregatorFactoryBean.setSendPartialResultOnExpiry(true);
aggregatorFactoryBean.setOutputChannelName("toSink");
return aggregatorFactoryBean;
}

@Bean
@ServiceActivator(inputChannel = "toSink")
public JdbcMessageHandler jdbcMessageHandler(DataSource dataSource) {
final MultiValueMap<String, Expression> columnExpressionVariations = new LinkedMultiValueMap<>();
for (Map.Entry<String, String> entry : properties.getColumnsMap().entrySet()) {
for (Map.Entry<String, String> entry : this.properties.getColumnsMap().entrySet()) {
String value = entry.getValue();
columnExpressionVariations.add(entry.getKey(), spelExpressionParser.parseExpression(value));
columnExpressionVariations.add(entry.getKey(), this.spelExpressionParser.parseExpression(value));
if (!value.startsWith("payload")) {
String qualified = "payload." + value;
try {
columnExpressionVariations.add(entry.getKey(), spelExpressionParser.parseExpression(qualified));
columnExpressionVariations.add(entry.getKey(),
this.spelExpressionParser.parseExpression(qualified));
}
catch (SpelParseException e) {
logger.info("failed to parse qualified fallback expression " + qualified +
Expand All @@ -101,9 +133,9 @@ public JdbcMessageHandler jdbcMessageHandler(DataSource dataSource) {
}
}
JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(dataSource,
generateSql(properties.getTableName(), columnExpressionVariations.keySet()));
SqlParameterSourceFactory parameterSourceFactory = new ParameterFactory(
columnExpressionVariations, evaluationContext);
generateSql(this.properties.getTableName(), columnExpressionVariations.keySet()));
SqlParameterSourceFactory parameterSourceFactory =
new ParameterFactory(columnExpressionVariations, this.evaluationContext);
jdbcMessageHandler.setSqlParameterSourceFactory(parameterSourceFactory);
return jdbcMessageHandler;
}
Expand All @@ -117,23 +149,32 @@ public DataSourceInitializer nonBootDataSourceInitializer(DataSource dataSource,
databasePopulator.setIgnoreFailedDrops(true);
dataSourceInitializer.setDatabasePopulator(databasePopulator);
if ("true".equals(properties.getInitialize())) {
databasePopulator.addScript(new DefaultInitializationScriptResource(properties.getTableName(),
properties.getColumnsMap().keySet()));
databasePopulator.addScript(
new DefaultInitializationScriptResource(this.properties.getTableName(),
this.properties.getColumnsMap().keySet()));
}
else {
databasePopulator.addScript(resourceLoader.getResource(properties.getInitialize()));
databasePopulator.addScript(resourceLoader.getResource(this.properties.getInitialize()));
}
return dataSourceInitializer;
}

@Bean
MessageGroupStore messageGroupStore() {
SimpleMessageStore messageGroupStore = new SimpleMessageStore();
messageGroupStore.setTimeoutOnIdle(true);
messageGroupStore.setCopyOnGet(false);
return messageGroupStore;
}

@Bean
public static ShorthandMapConverter shorthandMapConverter() {
return new ShorthandMapConverter();
}

@PostConstruct
public void afterPropertiesSet() {
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(beanFactory);
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(this.beanFactory);
}

private String generateSql(String tableName, Set<String> columns) {
Expand Down Expand Up @@ -172,8 +213,9 @@ public SqlParameterSource createParameterSource(Object o) {
}
Message<?> message = (Message<?>) o;
MapSqlParameterSource parameterSource = new MapSqlParameterSource();
for (String key : columnExpressions.keySet()) {
List<Expression> spels = columnExpressions.get(key);
for (Map.Entry<String, List<Expression>> entry : this.columnExpressions.entrySet()) {
String key = entry.getKey();
List<Expression> spels = entry.getValue();
Object value = NOT_SET;
EvaluationException lastException = null;
for (Expression spel : spels) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 the original author or authors.
* Copyright 2015-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,12 +22,12 @@
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.stream.app.jdbc.ShorthandMapConverter;


/**
* Holds configuration properties for the Jdbc Sink module.
*
* @author Eric Bottard
* @author Artem Bilan
* @author Oliver Flasch
*/
@ConfigurationProperties("jdbc")
public class JdbcSinkProperties {
Expand All @@ -51,6 +51,16 @@ public class JdbcSinkProperties {
*/
private String initialize = "false";

/**
* Threshold in number of messages when data will be flushed to database table.
*/
private int batchSize = 1;

/**
* Idle timeout in milliseconds when data is automatically flushed to database table.
*/
private long idleTimeout = -1L;

private Map<String, String> columnsMap;

public String getTableName() {
Expand All @@ -77,6 +87,22 @@ public void setInitialize(String initialize) {
this.initialize = initialize;
}

public int getBatchSize() {
return this.batchSize;
}

public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}

public long getIdleTimeout() {
return this.idleTimeout;
}

public void setIdleTimeout(long idleTimeout) {
this.idleTimeout = idleTimeout;
}

Map<String, String> getColumnsMap() {
if (this.columnsMap == null) {
this.columnsMap = this.shorthandMapConverter.convert(this.columns);
Expand Down
Loading

0 comments on commit 6b63922

Please sign in to comment.