Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Copyright 2012-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.boot.jta.narayana;

import java.sql.SQLException;

import javax.sql.XAConnection;
import javax.sql.XADataSource;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
* Manager able to connect and disconnect when needed for the required operations. If SQL connection is already available, it
* will simply executed requested operation. If SQL connection is not available, it will create it, execute a requested
* operation, and close the connection.
*
* @author Gytis Trikleris
*/
class ConnectionManager {

private static final Log logger = LogFactory.getLog(DataSourceXAResourceRecoveryHelper.class);

private final XADataSource dataSource;

private final String user;

private final String password;

private XAConnection connection;

private XAResource resource;

/**
* Create a new {@link ConnectionManager} instance.
* @param dataSource DataSource to be used when handling connections.
* @param user Username with which connection should be created.
* @param password Password of the user.
*/
ConnectionManager(XADataSource dataSource, String user, String password) {
this.dataSource = dataSource;
this.user = user;
this.password = password;
}

/**
* Invoke {@link XAResourceConsumer} accept method before making sure that SQL connection is available. Current connection
* is used if one is available. If connection is not available, new connection is created before the accept call and closed
* after it.
*
* @param consumer {@link XAResourceConsumer} to be executed.
* @throws XAException if connection cannot be created or exception thrown by the consumer.
*/
void connectAndAccept(XAResourceConsumer consumer) throws XAException {
if (isConnected()) {
consumer.accept(this.resource);
return;
}

connect();
try {
consumer.accept(this.resource);
}
finally {
disconnect();
}
}

/**
* Invoke {@link XAResourceFunction} apply method before making sure that SQL connection is available. Current connection is
* used if one is available. If connection is not available, new connection is created before the apply call and closed
* after it.
*
* @param function {@link XAResourceFunction} to be executed.
* @param <T> Return type of the {@link XAResourceFunction}.
* @return The result of {@link XAResourceFunction}.
* @throws XAException if connection cannot be created or exception thrown by the function.
*/
<T> T connectAndApply(XAResourceFunction<T> function) throws XAException {
if (isConnected()) {
return function.apply(this.resource);
}

connect();
try {
return function.apply(this.resource);
}
finally {
disconnect();
}
}

/**
* Create SQL connection if one is not available.
*
* @throws XAException if connection cannot be created.
*/
public void connect() throws XAException {
if (isConnected()) {
return;
}

try {
this.connection = getXaConnection();
this.resource = this.connection.getXAResource();
}
catch (SQLException e) {
if (this.connection != null) {
try {
this.connection.close();
}
catch (SQLException ignore) {
}
}
logger.warn("Failed to create connection", e);
throw new XAException(XAException.XAER_RMFAIL);
}
}

/**
* Close current SQL connection.
*/
public void disconnect() {
if (!isConnected()) {
return;
}

try {
this.connection.close();
}
catch (SQLException e) {
logger.warn("Failed to close connection", e);
}
finally {
this.connection = null;
this.resource = null;
}
}

/**
* Check if SQL connection is active.
*
* @return {@code true} if SQL connection is active.
*/
public boolean isConnected() {
return this.connection != null && this.resource != null;
}

private XAConnection getXaConnection() throws SQLException {
if (this.user == null && this.password == null) {
return this.dataSource.getXAConnection();
}
return this.dataSource.getXAConnection(this.user, this.password);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,11 @@

package org.springframework.boot.jta.narayana;

import java.sql.SQLException;

import javax.sql.XAConnection;
import javax.sql.XADataSource;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

import com.arjuna.ats.jta.recovery.XAResourceRecoveryHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.util.Assert;

Expand All @@ -42,39 +36,15 @@ public class DataSourceXAResourceRecoveryHelper

private static final XAResource[] NO_XA_RESOURCES = {};

private static final Log logger = LogFactory
.getLog(DataSourceXAResourceRecoveryHelper.class);

private final XADataSource xaDataSource;

private final String user;

private final String password;

private XAConnection xaConnection;

private XAResource delegate;
private final ConnectionManager connectionManager;

/**
* Create a new {@link DataSourceXAResourceRecoveryHelper} instance.
* @param xaDataSource the XA data source
* @param connectionManager SQL connection manager.
*/
public DataSourceXAResourceRecoveryHelper(XADataSource xaDataSource) {
this(xaDataSource, null, null);
}

/**
* Create a new {@link DataSourceXAResourceRecoveryHelper} instance.
* @param xaDataSource the XA data source
* @param user the database user or {@code null}
* @param password the database password or {@code null}
*/
public DataSourceXAResourceRecoveryHelper(XADataSource xaDataSource, String user,
String password) {
Assert.notNull(xaDataSource, "XADataSource must not be null");
this.xaDataSource = xaDataSource;
this.user = user;
this.password = password;
public DataSourceXAResourceRecoveryHelper(ConnectionManager connectionManager) {
Assert.notNull(connectionManager, "ConnectionManager must not be null");
this.connectionManager = connectionManager;
}

@Override
Expand All @@ -84,107 +54,73 @@ public boolean initialise(String properties) {

@Override
public XAResource[] getXAResources() {
if (connect()) {
return new XAResource[] { this };
}
return NO_XA_RESOURCES;
}

private boolean connect() {
if (this.delegate == null) {
if (!this.connectionManager.isConnected()) {
try {
this.xaConnection = getXaConnection();
this.delegate = this.xaConnection.getXAResource();
this.connectionManager.connect();
}
catch (SQLException ex) {
logger.warn("Failed to create connection", ex);
return false;
catch (XAException ignored) {
return NO_XA_RESOURCES;
}
}
return true;
}

private XAConnection getXaConnection() throws SQLException {
if (this.user == null && this.password == null) {
return this.xaDataSource.getXAConnection();
}
return this.xaDataSource.getXAConnection(this.user, this.password);
return new XAResource[] { this };
}

@Override
public Xid[] recover(int flag) throws XAException {
public Xid[] recover(final int flag) throws XAException {
try {
return getDelegate(true).recover(flag);
return this.connectionManager.connectAndApply(delegate -> delegate.recover(flag));
}
finally {
if (flag == XAResource.TMENDRSCAN) {
disconnect();
this.connectionManager.disconnect();
}
}
}

private void disconnect() throws XAException {
try {
this.xaConnection.close();
}
catch (SQLException e) {
logger.warn("Failed to close connection", e);
}
finally {
this.xaConnection = null;
this.delegate = null;
}
}

@Override
public void start(Xid xid, int flags) throws XAException {
getDelegate(true).start(xid, flags);
public void start(final Xid xid, final int flags) throws XAException {
this.connectionManager.connectAndAccept(delegate -> delegate.start(xid, flags));
}

@Override
public void end(Xid xid, int flags) throws XAException {
getDelegate(true).end(xid, flags);
public void end(final Xid xid, final int flags) throws XAException {
this.connectionManager.connectAndAccept(delegate -> delegate.end(xid, flags));
}

@Override
public int prepare(Xid xid) throws XAException {
return getDelegate(true).prepare(xid);
public int prepare(final Xid xid) throws XAException {
return this.connectionManager.connectAndApply(delegate -> delegate.prepare(xid));
}

@Override
public void commit(Xid xid, boolean onePhase) throws XAException {
getDelegate(true).commit(xid, onePhase);
public void commit(final Xid xid, final boolean onePhase) throws XAException {
this.connectionManager.connectAndAccept(delegate -> delegate.commit(xid, onePhase));
}

@Override
public void rollback(Xid xid) throws XAException {
getDelegate(true).rollback(xid);
public void rollback(final Xid xid) throws XAException {
this.connectionManager.connectAndAccept(delegate -> delegate.rollback(xid));
}

@Override
public boolean isSameRM(XAResource xaResource) throws XAException {
return getDelegate(true).isSameRM(xaResource);
public boolean isSameRM(final XAResource xaResource) throws XAException {
return this.connectionManager.connectAndApply(delegate -> delegate.isSameRM(xaResource));
}

@Override
public void forget(Xid xid) throws XAException {
getDelegate(true).forget(xid);
public void forget(final Xid xid) throws XAException {
this.connectionManager.connectAndAccept(delegate -> delegate.forget(xid));
}

@Override
public int getTransactionTimeout() throws XAException {
return getDelegate(true).getTransactionTimeout();
return this.connectionManager.connectAndApply(XAResource::getTransactionTimeout);
}

@Override
public boolean setTransactionTimeout(int seconds) throws XAException {
return getDelegate(true).setTransactionTimeout(seconds);
}

private XAResource getDelegate(boolean required) {
Assert.state(this.delegate != null || !required,
"Connection has not been opened");
return this.delegate;
public boolean setTransactionTimeout(final int seconds) throws XAException {
return this.connectionManager.connectAndApply(delegate -> delegate.setTransactionTimeout(seconds));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,11 @@ public NarayanaXADataSourceWrapper(NarayanaRecoveryManagerBean recoveryManager,

@Override
public DataSource wrapDataSource(XADataSource dataSource) {
XAResourceRecoveryHelper recoveryHelper = getRecoveryHelper(dataSource);
ConnectionManager connectionManager = new ConnectionManager(dataSource, this.properties.getRecoveryDbUser(),
this.properties.getRecoveryDbPass());
XAResourceRecoveryHelper recoveryHelper = new DataSourceXAResourceRecoveryHelper(connectionManager);
this.recoveryManager.registerXAResourceRecoveryHelper(recoveryHelper);
return new NarayanaDataSourceBean(dataSource);
}

private XAResourceRecoveryHelper getRecoveryHelper(XADataSource dataSource) {
if (this.properties.getRecoveryDbUser() == null
&& this.properties.getRecoveryDbPass() == null) {
return new DataSourceXAResourceRecoveryHelper(dataSource);
}
return new DataSourceXAResourceRecoveryHelper(dataSource,
this.properties.getRecoveryDbUser(), this.properties.getRecoveryDbPass());
}

}
Loading