diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/Jdbc.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/Jdbc.java new file mode 100644 index 00000000000..ca7e716355c --- /dev/null +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/Jdbc.java @@ -0,0 +1,146 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.jdbc.dsl; + +import javax.sql.DataSource; + +import org.springframework.integration.jdbc.StoredProcExecutor; +import org.springframework.jdbc.core.JdbcOperations; +import org.springframework.jdbc.core.JdbcTemplate; + +/** + * Factory class for JDBC components. + * + * @author Jiandong Ma + * + * @since 7.0 + */ +public final class Jdbc { + + /** + * The factory to produce a {@link JdbcInboundChannelAdapterSpec}. + * @param dataSource the {@link DataSource} to build on + * @param selectQuery the select query to build on + * @return the {@link JdbcInboundChannelAdapterSpec} instance + */ + public static JdbcInboundChannelAdapterSpec inboundAdapter(DataSource dataSource, String selectQuery) { + return inboundAdapter(new JdbcTemplate(dataSource), selectQuery); + } + + /** + * The factory to produce a {@link JdbcInboundChannelAdapterSpec}. + * @param jdbcOperations the {@link JdbcOperations} to build on + * @param selectQuery the select query to build on + * @return the {@link JdbcInboundChannelAdapterSpec} instance + */ + public static JdbcInboundChannelAdapterSpec inboundAdapter(JdbcOperations jdbcOperations, String selectQuery) { + return new JdbcInboundChannelAdapterSpec(jdbcOperations, selectQuery); + } + + /** + * The factory to produce a {@link JdbcOutboundChannelAdapterSpec}. + * @param dataSource the {@link DataSource} to build on + * @param updateQuery the update query to build on + * @return the {@link JdbcOutboundChannelAdapterSpec} instance + */ + public static JdbcOutboundChannelAdapterSpec outboundAdapter(DataSource dataSource, String updateQuery) { + return outboundAdapter(new JdbcTemplate(dataSource), updateQuery); + } + + /** + * The factory to produce a {@link JdbcOutboundChannelAdapterSpec}. + * @param jdbcOperations the {@link JdbcOperations} to build on + * @param updateQuery the update query to build on + * @return the {@link JdbcOutboundChannelAdapterSpec} instance + */ + public static JdbcOutboundChannelAdapterSpec outboundAdapter(JdbcOperations jdbcOperations, String updateQuery) { + return new JdbcOutboundChannelAdapterSpec(jdbcOperations, updateQuery); + } + + /** + * The factory to produce a {@link JdbcOutboundGatewaySpec}. + * @param dataSource the {@link DataSource} to build on + * @param updateQuery the update query to build on + * @return the {@link JdbcOutboundGatewaySpec} instance + */ + public static JdbcOutboundGatewaySpec outboundGateway(DataSource dataSource, String updateQuery) { + return outboundGateway(new JdbcTemplate(dataSource), updateQuery); + } + + /** + * The factory to produce a {@link JdbcOutboundGatewaySpec}. + * @param dataSource the {@link DataSource} to build on + * @param updateQuery the update query to build on + * @param selectQuery the select query to build on + * @return the {@link JdbcOutboundGatewaySpec} instance + */ + public static JdbcOutboundGatewaySpec outboundGateway(DataSource dataSource, String updateQuery, String selectQuery) { + return outboundGateway(new JdbcTemplate(dataSource), updateQuery, selectQuery); + } + + /** + * The factory to produce a {@link JdbcOutboundGatewaySpec}. + * @param jdbcOperations the {@link JdbcOperations} to build on + * @param updateQuery the update query to build on + * @return the {@link JdbcOutboundGatewaySpec} instance + */ + public static JdbcOutboundGatewaySpec outboundGateway(JdbcOperations jdbcOperations, String updateQuery) { + return outboundGateway(jdbcOperations, updateQuery, null); + } + + /** + * The factory to produce a {@link JdbcOutboundGatewaySpec}. + * @param jdbcOperations the {@link JdbcOperations} to build on + * @param updateQuery the update query to build on + * @param selectQuery the select query to build on + * @return the {@link JdbcOutboundGatewaySpec} instance + */ + public static JdbcOutboundGatewaySpec outboundGateway(JdbcOperations jdbcOperations, String updateQuery, String selectQuery) { + return new JdbcOutboundGatewaySpec(jdbcOperations, updateQuery, selectQuery); + } + + /** + * The factory to produce a {@link JdbcStoredProcInboundChannelAdapterSpec}. + * @param dataSource the {@link DataSource} to build on + * @return the {@link JdbcStoredProcInboundChannelAdapterSpec} instance + */ + public static JdbcStoredProcInboundChannelAdapterSpec storedProcInboundAdapter(DataSource dataSource) { + return new JdbcStoredProcInboundChannelAdapterSpec(new StoredProcExecutor(dataSource)); + } + + /** + * The factory to produce a {@link JdbcStoredProcOutboundChannelAdapterSpec}. + * @param dataSource the {@link DataSource} to build on + * @return the {@link JdbcStoredProcOutboundChannelAdapterSpec} instance + */ + public static JdbcStoredProcOutboundChannelAdapterSpec storedProcOutboundAdapter(DataSource dataSource) { + return new JdbcStoredProcOutboundChannelAdapterSpec(new StoredProcExecutor(dataSource)); + } + + /** + * The factory to produce a {@link JdbcStoredProcOutboundGatewaySpec}. + * @param dataSource the {@link DataSource} to build on + * @return the {@link JdbcStoredProcOutboundGatewaySpec} instance + */ + public static JdbcStoredProcOutboundGatewaySpec storedProcOutboundGateway(DataSource dataSource) { + return new JdbcStoredProcOutboundGatewaySpec(new StoredProcExecutor(dataSource)); + } + + private Jdbc() { + } + +} diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/JdbcInboundChannelAdapterSpec.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/JdbcInboundChannelAdapterSpec.java new file mode 100644 index 00000000000..611f5a95b25 --- /dev/null +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/JdbcInboundChannelAdapterSpec.java @@ -0,0 +1,99 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.jdbc.dsl; + +import org.springframework.integration.dsl.MessageSourceSpec; +import org.springframework.integration.jdbc.JdbcPollingChannelAdapter; +import org.springframework.integration.jdbc.SqlParameterSourceFactory; +import org.springframework.jdbc.core.JdbcOperations; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.jdbc.core.namedparam.SqlParameterSource; + +/** + * A {@link MessageSourceSpec} for a {@link JdbcInboundChannelAdapterSpec}. + * + * @author Jiandong Ma + * + * @since 7.0 + */ +public class JdbcInboundChannelAdapterSpec extends MessageSourceSpec { + + protected JdbcInboundChannelAdapterSpec(JdbcOperations jdbcOperations, String selectQuery) { + this.target = new JdbcPollingChannelAdapter(jdbcOperations, selectQuery); + } + + /** + * @param rowMapper the rowMapper + * @return the spec + * @see JdbcPollingChannelAdapter#setRowMapper(RowMapper) + */ + public JdbcInboundChannelAdapterSpec rowMapper(RowMapper rowMapper) { + this.target.setRowMapper(rowMapper); + return _this(); + } + + /** + * @param updateSql the updateSql + * @return the spec + * @see JdbcPollingChannelAdapter#setUpdateSql(String) + */ + public JdbcInboundChannelAdapterSpec updateSql(String updateSql) { + this.target.setUpdateSql(updateSql); + return _this(); + } + + /** + * @param updatePerRow the updatePerRow + * @return the spec + * @see JdbcPollingChannelAdapter#setUpdatePerRow(boolean) + */ + public JdbcInboundChannelAdapterSpec updatePerRow(boolean updatePerRow) { + this.target.setUpdatePerRow(updatePerRow); + return _this(); + } + + /** + * @param sqlParameterSourceFactory the sqlParameterSourceFactory + * @return the spec + * @see JdbcPollingChannelAdapter#setUpdateSqlParameterSourceFactory(SqlParameterSourceFactory) + */ + public JdbcInboundChannelAdapterSpec updateSqlParameterSourceFactory(SqlParameterSourceFactory sqlParameterSourceFactory) { + this.target.setUpdateSqlParameterSourceFactory(sqlParameterSourceFactory); + return _this(); + } + + /** + * @param sqlQueryParameterSource the sqlQueryParameterSource + * @return the spec + * @see JdbcPollingChannelAdapter#setSelectSqlParameterSource(SqlParameterSource) + */ + public JdbcInboundChannelAdapterSpec selectSqlParameterSource(SqlParameterSource sqlQueryParameterSource) { + this.target.setSelectSqlParameterSource(sqlQueryParameterSource); + return _this(); + } + + /** + * @param maxRows the maxRows + * @return the spec + * @see JdbcPollingChannelAdapter#setMaxRows(int) + */ + public JdbcInboundChannelAdapterSpec maxRows(int maxRows) { + this.target.setMaxRows(maxRows); + return _this(); + } + +} diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/JdbcOutboundChannelAdapterSpec.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/JdbcOutboundChannelAdapterSpec.java new file mode 100644 index 00000000000..547bb1026db --- /dev/null +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/JdbcOutboundChannelAdapterSpec.java @@ -0,0 +1,78 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.jdbc.dsl; + +import org.springframework.integration.dsl.MessageHandlerSpec; +import org.springframework.integration.jdbc.JdbcMessageHandler; +import org.springframework.integration.jdbc.MessagePreparedStatementSetter; +import org.springframework.integration.jdbc.SqlParameterSourceFactory; +import org.springframework.jdbc.core.JdbcOperations; + +/** + * A {@link MessageHandlerSpec} for a {@link JdbcOutboundChannelAdapterSpec}. + * + * @author Jiandong Ma + * + * @since 7.0 + */ +public class JdbcOutboundChannelAdapterSpec extends MessageHandlerSpec { + + protected JdbcOutboundChannelAdapterSpec(JdbcOperations jdbcOperations, String updateQuery) { + this.target = new JdbcMessageHandler(jdbcOperations, updateQuery); + } + + /** + * @param keysGenerated the keysGenerated + * @return the spec + * @see JdbcMessageHandler#setKeysGenerated(boolean) + */ + public JdbcOutboundChannelAdapterSpec keysGenerated(boolean keysGenerated) { + this.target.setKeysGenerated(keysGenerated); + return _this(); + } + + /** + * @param sqlParameterSourceFactory the sqlParameterSourceFactory + * @return the spec + * @see JdbcMessageHandler#setSqlParameterSourceFactory(SqlParameterSourceFactory) + */ + public JdbcOutboundChannelAdapterSpec sqlParameterSourceFactory(SqlParameterSourceFactory sqlParameterSourceFactory) { + this.target.setSqlParameterSourceFactory(sqlParameterSourceFactory); + return _this(); + } + + /** + * @param usePayloadAsParameterSource the usePayloadAsParameterSource + * @return the spec + * @see JdbcMessageHandler#setUsePayloadAsParameterSource(boolean) + */ + public JdbcOutboundChannelAdapterSpec usePayloadAsParameterSource(boolean usePayloadAsParameterSource) { + this.target.setUsePayloadAsParameterSource(usePayloadAsParameterSource); + return _this(); + } + + /** + * @param preparedStatementSetter the preparedStatementSetter + * @return the spec + * @see JdbcMessageHandler#setPreparedStatementSetter(MessagePreparedStatementSetter) + */ + public JdbcOutboundChannelAdapterSpec preparedStatementSetter(MessagePreparedStatementSetter preparedStatementSetter) { + this.target.setPreparedStatementSetter(preparedStatementSetter); + return _this(); + } + +} diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/JdbcOutboundGatewaySpec.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/JdbcOutboundGatewaySpec.java new file mode 100644 index 00000000000..385f606cbb5 --- /dev/null +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/JdbcOutboundGatewaySpec.java @@ -0,0 +1,99 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.jdbc.dsl; + +import org.springframework.integration.dsl.MessageHandlerSpec; +import org.springframework.integration.jdbc.JdbcOutboundGateway; +import org.springframework.integration.jdbc.MessagePreparedStatementSetter; +import org.springframework.integration.jdbc.SqlParameterSourceFactory; +import org.springframework.jdbc.core.JdbcOperations; +import org.springframework.jdbc.core.RowMapper; + +/** + * A {@link MessageHandlerSpec} for a {@link JdbcOutboundGatewaySpec}. + * + * @author Jiandong Ma + * + * @since 7.0 + */ +public class JdbcOutboundGatewaySpec extends MessageHandlerSpec { + + protected JdbcOutboundGatewaySpec(JdbcOperations jdbcOperations, String updateQuery, String selectQuery) { + this.target = new JdbcOutboundGateway(jdbcOperations, updateQuery, selectQuery); + } + + /** + * @param maxRows the maxRows + * @return the spec + * @see JdbcOutboundGateway#setMaxRows(Integer) + */ + public JdbcOutboundGatewaySpec maxRows(Integer maxRows) { + this.target.setMaxRows(maxRows); + return _this(); + } + + /** + * @param keysGenerated the keysGenerated + * @return the spec + * @see JdbcOutboundGateway#setKeysGenerated(boolean) + */ + public JdbcOutboundGatewaySpec keysGenerated(boolean keysGenerated) { + this.target.setKeysGenerated(keysGenerated); + return _this(); + } + + /** + * @param sqlParameterSourceFactory the sqlParameterSourceFactory + * @return the spec + * @see JdbcOutboundGateway#setRequestSqlParameterSourceFactory(SqlParameterSourceFactory) + */ + public JdbcOutboundGatewaySpec requestSqlParameterSourceFactory(SqlParameterSourceFactory sqlParameterSourceFactory) { + this.target.setRequestSqlParameterSourceFactory(sqlParameterSourceFactory); + return _this(); + } + + /** + * @param preparedStatementSetter the preparedStatementSetter + * @return the spec + * @see JdbcOutboundGateway#setRequestPreparedStatementSetter(MessagePreparedStatementSetter) + */ + public JdbcOutboundGatewaySpec requestPreparedStatementSetter(MessagePreparedStatementSetter preparedStatementSetter) { + this.target.setRequestPreparedStatementSetter(preparedStatementSetter); + return _this(); + } + + /** + * @param sqlParameterSourceFactory the sqlQueryParameterSource + * @return the spec + * @see JdbcOutboundGateway#setReplySqlParameterSourceFactory(SqlParameterSourceFactory) + */ + public JdbcOutboundGatewaySpec replySqlParameterSourceFactory(SqlParameterSourceFactory sqlParameterSourceFactory) { + this.target.setReplySqlParameterSourceFactory(sqlParameterSourceFactory); + return _this(); + } + + /** + * @param rowMapper the rowMapper + * @return the spec + * @see JdbcOutboundGateway#setRowMapper(RowMapper) + */ + public JdbcOutboundGatewaySpec rowMapper(RowMapper rowMapper) { + this.target.setRowMapper(rowMapper); + return _this(); + } + +} diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/JdbcStoredProcInboundChannelAdapterSpec.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/JdbcStoredProcInboundChannelAdapterSpec.java new file mode 100644 index 00000000000..30a6a308076 --- /dev/null +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/JdbcStoredProcInboundChannelAdapterSpec.java @@ -0,0 +1,75 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.jdbc.dsl; + +import java.util.Collections; +import java.util.Map; +import java.util.function.Consumer; + +import org.springframework.integration.dsl.ComponentsRegistration; +import org.springframework.integration.dsl.MessageSourceSpec; +import org.springframework.integration.jdbc.StoredProcExecutor; +import org.springframework.integration.jdbc.StoredProcPollingChannelAdapter; +import org.springframework.util.Assert; + +/** + * A {@link MessageSourceSpec} for a {@link JdbcStoredProcInboundChannelAdapterSpec}. + * + * @author Jiandong Ma + * + * @since 7.0 + */ +public class JdbcStoredProcInboundChannelAdapterSpec + extends MessageSourceSpec + implements ComponentsRegistration { + + private final StoredProcExecutor storedProcExecutor; + + private final StoredProcExecutorConfigurer storedProcExecutorConfigurer; + + protected JdbcStoredProcInboundChannelAdapterSpec(StoredProcExecutor storedProcExecutor) { + this.storedProcExecutor = storedProcExecutor; + this.storedProcExecutorConfigurer = new StoredProcExecutorConfigurer(this.storedProcExecutor); + this.target = new StoredProcPollingChannelAdapter(this.storedProcExecutor); + } + + /** + * Configure the storedProcExecutor through storedProcExecutorConfigurer by invoking the {@link Consumer} callback + * @param configurer the configurer. + * @return the spec + */ + public JdbcStoredProcInboundChannelAdapterSpec configurerStoredProcExecutor(Consumer configurer) { + Assert.notNull(configurer, "'configurer' must not be null"); + configurer.accept(this.storedProcExecutorConfigurer); + return _this(); + } + + /** + * @param expectSingleResult the expectSingleResult + * @return the spec + * @see StoredProcPollingChannelAdapter#setExpectSingleResult(boolean) + */ + public JdbcStoredProcInboundChannelAdapterSpec expectSingleResult(boolean expectSingleResult) { + this.target.setExpectSingleResult(expectSingleResult); + return _this(); + } + + @Override + public Map getComponentsToRegister() { + return Collections.singletonMap(this.storedProcExecutor, null); + } +} diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/JdbcStoredProcOutboundChannelAdapterSpec.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/JdbcStoredProcOutboundChannelAdapterSpec.java new file mode 100644 index 00000000000..d00bce59a02 --- /dev/null +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/JdbcStoredProcOutboundChannelAdapterSpec.java @@ -0,0 +1,65 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.jdbc.dsl; + +import java.util.Collections; +import java.util.Map; +import java.util.function.Consumer; + +import org.springframework.integration.dsl.ComponentsRegistration; +import org.springframework.integration.dsl.MessageHandlerSpec; +import org.springframework.integration.jdbc.StoredProcExecutor; +import org.springframework.integration.jdbc.StoredProcMessageHandler; +import org.springframework.util.Assert; + +/** + * A {@link MessageHandlerSpec} for a {@link JdbcStoredProcOutboundChannelAdapterSpec}. + * + * @author Jiandong Ma + * + * @since 7.0 + */ +public class JdbcStoredProcOutboundChannelAdapterSpec + extends MessageHandlerSpec + implements ComponentsRegistration { + + private final StoredProcExecutor storedProcExecutor; + + private final StoredProcExecutorConfigurer storedProcExecutorConfigurer; + + protected JdbcStoredProcOutboundChannelAdapterSpec(StoredProcExecutor storedProcExecutor) { + this.storedProcExecutor = storedProcExecutor; + this.storedProcExecutorConfigurer = new StoredProcExecutorConfigurer(this.storedProcExecutor); + this.target = new StoredProcMessageHandler(this.storedProcExecutor); + } + + /** + * Configure the storedProcExecutor through storedProcExecutorConfigurer by invoking the {@link Consumer} callback + * @param configurer the configurer. + * @return the spec + */ + public JdbcStoredProcOutboundChannelAdapterSpec configurerStoredProcExecutor(Consumer configurer) { + Assert.notNull(configurer, "'configurer' must not be null"); + configurer.accept(this.storedProcExecutorConfigurer); + return _this(); + } + + @Override + public Map getComponentsToRegister() { + return Collections.singletonMap(this.storedProcExecutor, null); + } +} diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/JdbcStoredProcOutboundGatewaySpec.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/JdbcStoredProcOutboundGatewaySpec.java new file mode 100644 index 00000000000..dd7e97d9002 --- /dev/null +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/JdbcStoredProcOutboundGatewaySpec.java @@ -0,0 +1,85 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.jdbc.dsl; + +import java.util.Collections; +import java.util.Map; +import java.util.function.Consumer; + +import org.springframework.integration.dsl.ComponentsRegistration; +import org.springframework.integration.dsl.MessageHandlerSpec; +import org.springframework.integration.jdbc.StoredProcExecutor; +import org.springframework.integration.jdbc.StoredProcOutboundGateway; +import org.springframework.util.Assert; + +/** + * A {@link MessageHandlerSpec} for a {@link JdbcStoredProcOutboundGatewaySpec}. + * + * @author Jiandong Ma + * + * @since 7.0 + */ +public class JdbcStoredProcOutboundGatewaySpec + extends MessageHandlerSpec + implements ComponentsRegistration { + + private final StoredProcExecutor storedProcExecutor; + + private final StoredProcExecutorConfigurer storedProcExecutorConfigurer; + + protected JdbcStoredProcOutboundGatewaySpec(StoredProcExecutor storedProcExecutor) { + this.storedProcExecutor = storedProcExecutor; + this.storedProcExecutorConfigurer = new StoredProcExecutorConfigurer(this.storedProcExecutor); + this.target = new StoredProcOutboundGateway(this.storedProcExecutor); + } + + /** + * Configure the storedProcExecutor through storedProcExecutorConfigurer by invoking the {@link Consumer} callback + * @param configurer the configurer. + * @return the spec + */ + public JdbcStoredProcOutboundGatewaySpec configurerStoredProcExecutor(Consumer configurer) { + Assert.notNull(configurer, "'configurer' must not be null"); + configurer.accept(this.storedProcExecutorConfigurer); + return _this(); + } + + /** + * @param requiresReply the requiresReply + * @return the spec + * @see StoredProcOutboundGateway#setRequiresReply(boolean) + */ + public JdbcStoredProcOutboundGatewaySpec requiresReply(boolean requiresReply) { + this.target.setRequiresReply(requiresReply); + return _this(); + } + + /** + * @param expectSingleResult the expectSingleResult + * @return the spec + * @see StoredProcOutboundGateway#setExpectSingleResult(boolean) + */ + public JdbcStoredProcOutboundGatewaySpec expectSingleResult(boolean expectSingleResult) { + this.target.setExpectSingleResult(expectSingleResult); + return _this(); + } + + @Override + public Map getComponentsToRegister() { + return Collections.singletonMap(this.storedProcExecutor, null); + } +} diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/StoredProcExecutorConfigurer.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/StoredProcExecutorConfigurer.java new file mode 100644 index 00000000000..509419899a4 --- /dev/null +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/dsl/StoredProcExecutorConfigurer.java @@ -0,0 +1,163 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.jdbc.dsl; + +import java.util.List; +import java.util.Map; + +import org.springframework.expression.Expression; +import org.springframework.integration.jdbc.SqlParameterSourceFactory; +import org.springframework.integration.jdbc.StoredProcExecutor; +import org.springframework.integration.jdbc.storedproc.ProcedureParameter; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.jdbc.core.SqlParameter; + +/** + * A {@link StoredProcExecutor} configurer. + * + * @author Jiandong Ma + * + * @since 7.0 + */ +public class StoredProcExecutorConfigurer { + + private final StoredProcExecutor storedProcExecutor; + + protected StoredProcExecutorConfigurer(StoredProcExecutor storedProcExecutor) { + this.storedProcExecutor = storedProcExecutor; + } + + /** + * @param ignoreColumnMetaData the ignoreColumnMetaData + * @return the storedProcExecutor + * @see StoredProcExecutor#setIgnoreColumnMetaData(boolean) + */ + public StoredProcExecutorConfigurer ignoreColumnMetaData(boolean ignoreColumnMetaData) { + this.storedProcExecutor.setIgnoreColumnMetaData(ignoreColumnMetaData); + return this; + } + + /** + * @param procedureParameters the procedureParameters + * @return the storedProcExecutor + * @see StoredProcExecutor#setProcedureParameters(List) + */ + public StoredProcExecutorConfigurer procedureParameters(List procedureParameters) { + this.storedProcExecutor.setProcedureParameters(procedureParameters); + return this; + } + + /** + * @param sqlParameters the sqlParameters + * @return the storedProcExecutor + * @see StoredProcExecutor#setSqlParameters(List) + */ + public StoredProcExecutorConfigurer sqlParameters(List sqlParameters) { + this.storedProcExecutor.setSqlParameters(sqlParameters); + return this; + } + + /** + * @param sqlParameterSourceFactory the sqlParameterSourceFactory + * @return the storedProcExecutor + * @see StoredProcExecutor#setSqlParameterSourceFactory(SqlParameterSourceFactory) + */ + public StoredProcExecutorConfigurer sqlParameterSourceFactory(SqlParameterSourceFactory sqlParameterSourceFactory) { + this.storedProcExecutor.setSqlParameterSourceFactory(sqlParameterSourceFactory); + return this; + } + + /** + * @param storedProcedureName the storedProcedureName + * @return the storedProcExecutor + * @see StoredProcExecutor#setStoredProcedureName(String) + */ + public StoredProcExecutorConfigurer storedProcedureName(String storedProcedureName) { + this.storedProcExecutor.setStoredProcedureName(storedProcedureName); + return this; + } + + /** + * @param storedProcedureNameExpression the storedProcedureNameExpression + * @return the storedProcExecutor + * @see StoredProcExecutor#setStoredProcedureNameExpression(Expression) + */ + public StoredProcExecutorConfigurer storedProcedureNameExpression(Expression storedProcedureNameExpression) { + this.storedProcExecutor.setStoredProcedureNameExpression(storedProcedureNameExpression); + return this; + } + + /** + * @param usePayloadAsParameterSource the usePayloadAsParameterSource + * @return the storedProcExecutor + * @see StoredProcExecutor#setUsePayloadAsParameterSource(boolean) + */ + public StoredProcExecutorConfigurer usePayloadAsParameterSource(boolean usePayloadAsParameterSource) { + this.storedProcExecutor.setUsePayloadAsParameterSource(usePayloadAsParameterSource); + return this; + } + + /** + * @param isFunction the isFunction + * @return the storedProcExecutor + * @see StoredProcExecutor#setIsFunction(boolean) + */ + public StoredProcExecutorConfigurer isFunction(boolean isFunction) { + this.storedProcExecutor.setIsFunction(isFunction); + return this; + } + + /** + * @param returnValueRequired the returnValueRequired + * @return the storedProcExecutor + * @see StoredProcExecutor#setReturnValueRequired(boolean) + */ + public StoredProcExecutorConfigurer returnValueRequired(boolean returnValueRequired) { + this.storedProcExecutor.setReturnValueRequired(returnValueRequired); + return this; + } + + /** + * @param skipUndeclaredResults the skipUndeclaredResults + * @return the storedProcExecutor + * @see StoredProcExecutor#setSkipUndeclaredResults(boolean) + */ + public StoredProcExecutorConfigurer skipUndeclaredResults(boolean skipUndeclaredResults) { + this.storedProcExecutor.setSkipUndeclaredResults(skipUndeclaredResults); + return this; + } + + /** + * @param returningResultSetRowMappers the returningResultSetRowMappers + * @return the storedProcExecutor + * @see StoredProcExecutor#setReturningResultSetRowMappers(Map) + */ + public StoredProcExecutorConfigurer returningResultSetRowMappers(Map> returningResultSetRowMappers) { + this.storedProcExecutor.setReturningResultSetRowMappers(returningResultSetRowMappers); + return this; + } + + /** + * @param jdbcCallOperationsCacheSize the jdbcCallOperationsCacheSize + * @return the storedProcExecutor + * @see StoredProcExecutor#setJdbcCallOperationsCacheSize(int) + */ + public StoredProcExecutorConfigurer jdbcCallOperationsCacheSize(int jdbcCallOperationsCacheSize) { + this.storedProcExecutor.setJdbcCallOperationsCacheSize(jdbcCallOperationsCacheSize); + return this; + } +} diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/dsl/JdbcTest.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/dsl/JdbcTest.java new file mode 100644 index 00000000000..2e2eee6fcde --- /dev/null +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/dsl/JdbcTest.java @@ -0,0 +1,377 @@ +/* + * Copyright 2016-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.jdbc.dsl; + +import java.sql.CallableStatement; +import java.sql.SQLException; +import java.sql.Types; +import java.util.List; +import java.util.Map; + +import javax.sql.DataSource; + +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.expression.ValueExpression; +import org.springframework.integration.jdbc.BeanPropertySqlParameterSourceFactory; +import org.springframework.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory; +import org.springframework.integration.jdbc.config.JdbcTypesEnum; +import org.springframework.integration.jdbc.storedproc.ClobSqlReturnType; +import org.springframework.integration.jdbc.storedproc.PrimeMapper; +import org.springframework.integration.jdbc.storedproc.ProcedureParameter; +import org.springframework.integration.jdbc.storedproc.User; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.integration.support.json.Jackson2JsonMessageParser; +import org.springframework.integration.support.json.JsonInboundMessageMapper; +import org.springframework.integration.support.json.JsonOutboundMessageMapper; +import org.springframework.integration.test.util.OnlyOnceTrigger; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.jdbc.core.SqlOutParameter; +import org.springframework.jdbc.core.SqlParameter; +import org.springframework.jdbc.core.SqlReturnType; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.annotation.Transactional; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +/** + * @author Jiandong Ma + * + * @since 7.0 + */ +@SpringJUnitConfig +@DirtiesContext +class JdbcTest { + + @Autowired + private JdbcTemplate h2JdbcTemplate; + + @Autowired + private JdbcTemplate derbyJdbcTemplate; + + @Autowired + private QueueChannel inboundFlowPollerChannel; + + @Autowired + @Qualifier("outboundFlow.input") + private MessageChannel outboundFlowInputChannel; + + @Autowired + @Qualifier("outboundGateway.input") + private MessageChannel outboundGatewayInputChannel; + + @Autowired + private QueueChannel outboundGatewayReplyChannel; + + @Autowired + @Qualifier("outboundGatewayNoSelectQuery.input") + private MessageChannel outboundGatewayNoSelectQueryInputChannel; + + @Autowired + private QueueChannel outboundGatewayNoSelectQueryReplyChannel; + + @Autowired + private QueueChannel storedProcInboundPollerChannel; + + @Autowired + @Qualifier("storedProcOutboundAdapter.input") + private MessageChannel storedProcOutboundFlowInputChannel; + + @Autowired + @Qualifier("storedProcOutboundGateway.input") + private MessageChannel storedProcOutboundGatewayInputChannel; + + @Autowired + private QueueChannel storedProcOutboundGatewayReplyChannel; + + @Autowired + private SqlReturnType clobSqlReturnType; + + @Test + void testInboundFlow() { + Message message = this.inboundFlowPollerChannel.receive(10_000); + List rows = (List) message.getPayload(); + assertThat(rows.size()).isEqualTo(2); + assertThat(rows.get(0) instanceof Inbound).isTrue(); + Inbound item = (Inbound) rows.get(0); + assertThat(item.id()).isEqualTo(1); + assertThat(item.status()).isEqualTo(2); + + Integer countOfStatusTwo = h2JdbcTemplate.queryForObject("select count(*) from inbound where status = 2", Integer.class); + assertThat(countOfStatusTwo).isEqualTo(0); + + Integer countOfStatusTen = h2JdbcTemplate.queryForObject("select count(*) from inbound where status = 10", Integer.class); + assertThat(countOfStatusTen).isEqualTo(2); + } + + record Inbound(int id, int status) { + } + + @Test + void testOutboundFlow() { + outboundFlowInputChannel.send(new GenericMessage<>("foo")); + Map map = h2JdbcTemplate.queryForMap("select * from outbound where id=?", 1); + assertThat(map.get("name")).isEqualTo("foo"); + } + + @Test + void testOutboundGateway() { + outboundGatewayInputChannel.send(new GenericMessage<>(10)); + Message message = outboundGatewayReplyChannel.receive(10_000); + assertThat(message).isNotNull(); + List payload = (List) message.getPayload(); + assertThat(payload).hasSize(1); + Object item = payload.get(0); + assertThat(item).isInstanceOf(Map.class); + assertThat(((Map) item).get("status")).isEqualTo(10); + } + + @Test + void testOutboundGatewayNoSelectQuery() { + outboundGatewayNoSelectQueryInputChannel.send(new GenericMessage<>(10)); + Message message = outboundGatewayNoSelectQueryReplyChannel.receive(10_000); + assertThat(message).isNotNull(); + Object payload = message.getPayload(); + assertThat(payload).isInstanceOf(Map.class); + assertThat(((Map) payload).get("UPDATED")).isEqualTo(1); + } + + @Test + void testStoredProcInboundFlow() { + Message message = this.storedProcInboundPollerChannel.receive(10_000); + assertThat(message).isNotNull(); + Object payload = message.getPayload(); + assertThat(payload).isNotNull(); + assertThat(payload).isInstanceOf(List.class); + + List primeNumbers = (List) payload; + + assertThat(primeNumbers.size() == 4).isTrue(); + } + + @Test + void testStoredProcOutboundFlow() { + storedProcOutboundFlowInputChannel.send(MessageBuilder.withPayload(new User("username", "password", "email")).build()); + Map map = this.derbyJdbcTemplate.queryForMap("SELECT * FROM USERS WHERE USERNAME=?", "username"); + assertThat(map.get("USERNAME")).as("Wrong username").isEqualTo("username"); + assertThat(map.get("PASSWORD")).as("Wrong password").isEqualTo("password"); + assertThat(map.get("EMAIL")).as("Wrong email").isEqualTo("email"); + } + + @Test + @Transactional(transactionManager = "derbyTransactionManager") + void testStoredProcOutboundGateway() throws SQLException { + Mockito.reset(this.clobSqlReturnType); + Message testMessage = MessageBuilder.withPayload("TEST").setHeader("FOO", "BAR").build(); + String messageId = testMessage.getHeaders().getId().toString(); + String jsonMessage = new JsonOutboundMessageMapper().fromMessage(testMessage); + this.derbyJdbcTemplate.update("INSERT INTO json_message VALUES (?,?)", messageId, jsonMessage); + + this.storedProcOutboundGatewayInputChannel.send(new GenericMessage<>(messageId)); + Message resultMessage = this.storedProcOutboundGatewayReplyChannel.receive(10_000); + + assertThat(resultMessage).isNotNull(); + Object resultPayload = resultMessage.getPayload(); + assertThat(resultPayload instanceof String).isTrue(); + Message message = new JsonInboundMessageMapper(String.class, new Jackson2JsonMessageParser()) + .toMessage((String) resultPayload); + assertThat(message.getPayload()).isEqualTo(testMessage.getPayload()); + assertThat(message.getHeaders().get("FOO")).isEqualTo(testMessage.getHeaders().get("FOO")); + Mockito.verify(clobSqlReturnType).getTypeValue(Mockito.any(CallableStatement.class), + Mockito.eq(2), Mockito.eq(JdbcTypesEnum.CLOB.getCode()), Mockito.eq(null)); + + } + + @Configuration + @EnableIntegration + public static class Config { + + @Bean + public IntegrationFlow inboundFlow(DataSource h2DataSource) { + var sqlParameterSourceFactory = new ExpressionEvaluatingSqlParameterSourceFactory(); + sqlParameterSourceFactory.setBeanFactory(mock()); + return IntegrationFlow.from(Jdbc.inboundAdapter(h2DataSource, "select * from inbound") + .maxRows(2) + .rowMapper((RowMapper) (rs, rowNum) -> new Inbound(rs.getInt(1), rs.getInt(2))) + .updateSql("update inbound set status = 10 where id in (:id)") + .updatePerRow(false) + .updateSqlParameterSourceFactory(sqlParameterSourceFactory) + .selectSqlParameterSource(null), + e -> e.poller(p -> p.trigger(new OnlyOnceTrigger()))) + .channel(c -> c.queue("inboundFlowPollerChannel")) + .get(); + } + + @Bean + public IntegrationFlow outboundFlow(DataSource h2DataSource) { + return flow -> flow + .handle(Jdbc.outboundAdapter(h2DataSource, "insert into outbound (id, status, name) values (1, 0, ?)") + .preparedStatementSetter((ps, requestMessage) -> { + ps.setObject(1, requestMessage.getPayload()); + }) + .usePayloadAsParameterSource(false) + .sqlParameterSourceFactory(null) + .keysGenerated(false) + ); + } + + @Bean + public IntegrationFlow outboundGateway(DataSource h2DataSource) { + return flow -> flow + .handle(Jdbc.outboundGateway(h2DataSource, + "update outbound_gateway set status = :payload where id = 1", + "select * from outbound_gateway where id = 1") + .keysGenerated(false) + .requestSqlParameterSourceFactory(new BeanPropertySqlParameterSourceFactory()) + .requestPreparedStatementSetter(null) + .replySqlParameterSourceFactory(new ExpressionEvaluatingSqlParameterSourceFactory()) + .rowMapper(null) + .maxRows(0) + ) + .channel(c -> c.queue("outboundGatewayReplyChannel")); + } + + @Bean + public IntegrationFlow outboundGatewayNoSelectQuery(DataSource h2DataSource) { + return flow -> flow + .handle(Jdbc.outboundGateway(h2DataSource, + "update outbound_gateway set status = :payload where id = 2") + ) + .channel(c -> c.queue("outboundGatewayNoSelectQueryReplyChannel")); + } + + @Bean + public IntegrationFlow storedProcInboundFlow(DataSource h2DataSource) { + return IntegrationFlow.from(Jdbc.storedProcInboundAdapter(h2DataSource) + .expectSingleResult(true) + .configurerStoredProcExecutor(configurer -> configurer + .ignoreColumnMetaData(true) + .isFunction(false) + .storedProcedureName("GET_PRIME_NUMBERS") + .procedureParameters(List.of( + new ProcedureParameter("beginRange", 1, null), + new ProcedureParameter("endRange", 10, null) + )) + .sqlParameters(List.of( + new SqlParameter("beginRange", Types.INTEGER), + new SqlParameter("endRange", Types.INTEGER) + )) + .returningResultSetRowMappers(Map.of("out", new PrimeMapper())) + ), + e -> e.poller(p -> p.trigger(new OnlyOnceTrigger()))) + .channel(c -> c.queue("storedProcInboundPollerChannel")) + .get(); + } + + @Bean + public IntegrationFlow storedProcOutboundAdapter(DataSource derbyDataSource) { + return flow -> flow + .handle(Jdbc.storedProcOutboundAdapter(derbyDataSource) + .configurerStoredProcExecutor(configurer -> configurer + .storedProcedureName("CREATE_USER") + .sqlParameterSourceFactory(new BeanPropertySqlParameterSourceFactory()) + .usePayloadAsParameterSource(true) + ) + ); + } + + @Bean + public IntegrationFlow storedProcOutboundGateway(DataSource derbyDataSource) throws Exception { + + return flow -> flow + .handle(Jdbc.storedProcOutboundGateway(derbyDataSource) + .requiresReply(true) + .expectSingleResult(true) + .configurerStoredProcExecutor(configurer -> configurer + .storedProcedureNameExpression(new ValueExpression<>("GET_MESSAGE")) + .ignoreColumnMetaData(false) + .isFunction(false) + .procedureParameters(List.of( + new ProcedureParameter("message_id", null, "payload") + )) + .sqlParameters(List.of( + new SqlParameter("message_id", Types.VARCHAR), + new SqlOutParameter("message_json", Types.CLOB, null, clobSqlReturnType()) + )) + .returnValueRequired(false) + .skipUndeclaredResults(true) + .jdbcCallOperationsCacheSize(10) + )) + .channel(c -> c.queue("storedProcOutboundGatewayReplyChannel")); + } + + @Bean + public ClobSqlReturnType clobSqlReturnType() { + return Mockito.spy(new ClobSqlReturnType()); + } + + @Bean + public DataSource h2DataSource() { + return new EmbeddedDatabaseBuilder() + .setType(EmbeddedDatabaseType.H2) + .addScripts("classpath:dsl-h2.sql", "classpath:h2-stored-procedures.sql") + .build(); + } + + @Bean + public DataSource derbyDataSource() { + return new EmbeddedDatabaseBuilder() + .setType(EmbeddedDatabaseType.DERBY) + .addScripts("classpath:derby-stored-procedures.sql") + .build(); + } + + @Bean + public JdbcTemplate h2JdbcTemplate(DataSource h2DataSource) { + return new JdbcTemplate(h2DataSource); + } + + @Bean + public JdbcTemplate derbyJdbcTemplate(DataSource derbyDataSource) { + return new JdbcTemplate(derbyDataSource); + } + + @Bean + public PlatformTransactionManager h2TransactionManager() { + return new DataSourceTransactionManager(h2DataSource()); + } + + @Bean + public PlatformTransactionManager derbyTransactionManager() { + return new DataSourceTransactionManager(derbyDataSource()); + } + } + +} diff --git a/spring-integration-jdbc/src/test/resources/dsl-h2.sql b/spring-integration-jdbc/src/test/resources/dsl-h2.sql new file mode 100644 index 00000000000..f42fb973cc4 --- /dev/null +++ b/spring-integration-jdbc/src/test/resources/dsl-h2.sql @@ -0,0 +1,10 @@ +create table inbound(id int,status int); +insert into inbound values(1,2); +insert into inbound values(2,2); + +create table outbound(id varchar(100),status int,name varchar(20)); + +create table outbound_gateway(id int,status int); +insert into outbound_gateway values(1,2); +insert into outbound_gateway values(2,2); +