Skip to content

Commit

Permalink
Initial version of Spring transaction management for datastore abstra…
Browse files Browse the repository at this point in the history
…ction. Still not working correctly for Redis read queries however. Need to investigate using second Redis connection for reads whilst a transaction is active.
  • Loading branch information
graemerocher committed Aug 20, 2010
1 parent a13b9d0 commit ee55717
Show file tree
Hide file tree
Showing 19 changed files with 853 additions and 46 deletions.
Expand Up @@ -9,7 +9,7 @@
import org.springframework.datastore.keyvalue.mapping.KeyValue;
import org.springframework.datastore.mapping.MappingContext;
import org.springframework.datastore.mapping.PersistentEntity;
import org.springframework.datastore.tx.Transaction;
import org.springframework.datastore.transactions.Transaction;

import java.util.*;

Expand Down Expand Up @@ -88,7 +88,7 @@ public boolean isConnected() {
*
* @return a started transaction
*/
public Transaction beginTransaction() {
protected Transaction beginTransactionInternal() {
AppEngineTransaction engineTransaction = new AppEngineTransaction(DatastoreServiceFactory.getDatastoreService().beginTransaction());
this.transaction = engineTransaction;
return engineTransaction;
Expand Down
@@ -1,9 +1,11 @@
package org.springframework.datastore.appengine;

import com.google.appengine.api.datastore.Transaction;

/**
* @author Guillaume Laforge
*/
public class AppEngineTransaction implements org.springframework.datastore.tx.Transaction<com.google.appengine.api.datastore.Transaction> {
public class AppEngineTransaction implements org.springframework.datastore.transactions.Transaction<Transaction> {
private com.google.appengine.api.datastore.Transaction transaction;

public AppEngineTransaction(com.google.appengine.api.datastore.Transaction transaction) {
Expand All @@ -21,4 +23,12 @@ public void rollback() {
public com.google.appengine.api.datastore.Transaction getNativeTransaction() {
return transaction;
}

public boolean isActive() {
return transaction.isActive();
}

public void setTimeout(int timeout) {
throw new UnsupportedOperationException("Transaction timeouts not supported on AppEngine");
}
}
Expand Up @@ -3,7 +3,7 @@
import com.google.appengine.api.datastore.*;
import org.springframework.datastore.appengine.testsupport.AppEngineDatastoreTestCase;
import org.springframework.datastore.keyvalue.KeyValueSession;
import org.springframework.datastore.tx.Transaction;
import org.springframework.datastore.transactions.Transaction;
import org.junit.Before;

import java.util.HashMap;
Expand Down Expand Up @@ -105,7 +105,7 @@ public void testTransactionRollback() {
AppEngineDatastore engineDatastore = new AppEngineDatastore();
KeyValueSession connection = (KeyValueSession) engineDatastore.connect(new HashMap<String, String>());

org.springframework.datastore.tx.Transaction transaction = connection.beginTransaction();
Transaction transaction = connection.beginTransaction();

// add a new person in the store
Key keyGuillaume = (Key) connection.store("persons", personOne);
Expand Down
Expand Up @@ -21,7 +21,7 @@
import org.springframework.datastore.engine.Persister;
import org.springframework.datastore.mapping.MappingContext;
import org.springframework.datastore.mapping.PersistentEntity;
import org.springframework.datastore.tx.Transaction;
import org.springframework.datastore.transactions.Transaction;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.transaction.TransactionSystemException;

Expand Down Expand Up @@ -65,7 +65,7 @@ public void disconnect() {
}
}

public Transaction beginTransaction() {
protected Transaction beginTransactionInternal() {
throw new TransactionSystemException("Transactions are not supported by Cassandra");
}

Expand Down
1 change: 1 addition & 0 deletions core/build.gradle
Expand Up @@ -2,6 +2,7 @@ dependencies {

compile group: 'org.codehaus.groovy', name: 'groovy', version: '1.7.2'
compile 'javax.persistence:persistence-api:1.0'
compile 'javax.transaction:jta:1.1'
compile 'commons-beanutils:commons-beanutils:1.8.0'
compile 'commons-collections:commons-collections:3.2'

Expand Down
1 change: 1 addition & 0 deletions core/core.iml
Expand Up @@ -24,6 +24,7 @@
<orderEntry type="library" name="Spring Beans" level="project" />
<orderEntry type="library" name="Spring AOP" level="project" />
<orderEntry type="library" name="CGLib" level="project" />
<orderEntry type="library" name="JTA" level="project" />
</component>
</module>

Expand Up @@ -21,6 +21,8 @@
import org.springframework.datastore.mapping.MappingContext;
import org.springframework.datastore.mapping.PersistentEntity;
import org.springframework.datastore.query.Query;
import org.springframework.datastore.transactions.Transaction;
import org.springframework.transaction.NoTransactionException;

import java.io.Serializable;
import java.util.*;
Expand All @@ -41,6 +43,7 @@ public abstract class AbstractSession<N> implements Session {
private Map<String, String> connectionDetails;
protected List<EntityInterceptor> interceptors = new ArrayList<EntityInterceptor>();
protected ConcurrentLinkedQueue lockedObjects = new ConcurrentLinkedQueue();
private Transaction transaction;

public AbstractSession(Map<String, String> connectionDetails, MappingContext mappingContext) {
super();
Expand Down Expand Up @@ -198,4 +201,16 @@ public Query createQuery(Class type) {
}
throw new NonPersistentTypeException("Cannot create query. The class ["+type+"] is not a known persistent type.");
}

public final Transaction beginTransaction() {
this.transaction = beginTransactionInternal();
return this.transaction;
}

protected abstract Transaction beginTransactionInternal();

public Transaction getTransaction() {
if(transaction == null) throw new NoTransactionException("Transaction not started. Call beginTransaction() first");
return this.transaction;
}
}
@@ -0,0 +1,263 @@
/* Copyright (C) 2010 SpringSource
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.datastore.core;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.NamedThreadLocal;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.datastore.transactions.SessionHolder;
import org.springframework.datastore.transactions.support.SpringSessionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;

import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/**
* Helper class for obtaining Datastore sessions. Based on similar work
* for Hibernate such as SessionFactoryUtils
*
* @author Juergen Hoeller
* @author Graeme Rocher
*/
public abstract class DatastoreUtils {

public static final Log logger = LogFactory.getLog(DatastoreUtils.class);
private static final ThreadLocal<Map<Datastore, Set<Session>>> deferredCloseHolder =
new NamedThreadLocal<Map<Datastore, Set<Session>>>("Datastore Sessions registered for deferred close");

/**
* Get a Datastore Session for the given Datastore. Is aware of and will
* return any existing corresponding Session bound to the current thread, for
* example when using {@link org.springframework.datastore.transactions.DatastoreTransactionManager}. Will create a new
* Session otherwise, if "allowCreate" is <code>true</code>.
* <p>This is the <code>getSession</code> method used by typical data access code,
* in combination with <code>releaseSession</code> called when done with
* the Session.
*
* @param datastore Datastore to create the session with
* @param allowCreate whether a non-transactional Session should be created
* when no transactional Session can be found for the current thread
* @return the Datastore Session
* @throws org.springframework.dao.DataAccessResourceFailureException if the Session couldn't be created
* @throws IllegalStateException if no thread-bound Session found and
* "allowCreate" is <code>false</code>
*/
public static Session getSession(Datastore datastore, boolean allowCreate)
throws DataAccessResourceFailureException, IllegalStateException {

try {
return doGetSession(datastore, allowCreate);
}
catch (Exception ex) {
throw new DataAccessResourceFailureException("Could not open Datastore Session", ex);
}
}

/**
* Get a Datastore Session for the given Datastore. Is aware of and will
* return any existing corresponding Session bound to the current thread, for
* example when using {@link org.springframework.datastore.transactions.DatastoreTransactionManager}. Will create a new
* Session otherwise, if "allowCreate" is <code>true</code>.
*
* @param datastore Datastore to create the session with
* Session on transaction synchronization (may be <code>null</code>)
* @param allowCreate whether a non-transactional Session should be created
* when no transactional Session can be found for the current thread
* @return the Datastore Session
* @throws IllegalStateException if no thread-bound Session found and
* "allowCreate" is <code>false</code>
*/
private static Session doGetSession(
Datastore datastore, boolean allowCreate){

Assert.notNull(datastore, "No Datastore specified");

SessionHolder sessionHolder = (SessionHolder) TransactionSynchronizationManager.getResource(datastore);

if (sessionHolder != null && !sessionHolder.isEmpty()) {
// pre-bound Datastore Session
Session session;
if (TransactionSynchronizationManager.isSynchronizationActive() &&
sessionHolder.doesNotHoldNonDefaultSession()) {
// Spring transaction management is active ->
// register pre-bound Session with it for transactional flushing.
session = sessionHolder.getSession();
if (session != null && !sessionHolder.isSynchronizedWithTransaction()) {
logger.debug("Registering Spring transaction synchronization for existing Datastore Session");
TransactionSynchronizationManager.registerSynchronization(
new SpringSessionSynchronization(sessionHolder, datastore, false));
sessionHolder.setSynchronizedWithTransaction(true);

}
if (session != null) {
return session;
}
}
}




logger.debug("Opening Datastore Session");
Session session = datastore.connect();

// Use same Session for further Datastore actions within the transaction.
// Thread object will get removed by synchronization at transaction completion.
if (TransactionSynchronizationManager.isSynchronizationActive()) {
// We're within a Spring-managed transaction, possibly from JtaTransactionManager.
logger.debug("Registering Spring transaction synchronization for new Datastore Session");
SessionHolder holderToUse = sessionHolder;
if (holderToUse == null) {
holderToUse = new SessionHolder(session);
}
else {
holderToUse.addSession(session);
}
TransactionSynchronizationManager.registerSynchronization(
new SpringSessionSynchronization(holderToUse, datastore, true));
holderToUse.setSynchronizedWithTransaction(true);
if (holderToUse != sessionHolder) {
TransactionSynchronizationManager.bindResource(datastore, holderToUse);
}
}

// Check whether we are allowed to return the Session.
if (!allowCreate && !isSessionTransactional(session, datastore)) {
closeSession(session);
throw new IllegalStateException("No Datastore Session bound to thread, " +
"and configuration does not allow creation of non-transactional one here");
}

return session;
}

/**
* Return whether the given Datastore Session is transactional, that is,
* bound to the current thread by Spring's transaction facilities.
* @param session the Datastore Session to check
* @param datastore Datastore that the Session was created with
* (may be <code>null</code>)
* @return whether the Session is transactional
*/
public static boolean isSessionTransactional(Session session, Datastore datastore) {
if (datastore == null) {
return false;
}
SessionHolder sessionHolder =
(SessionHolder) TransactionSynchronizationManager.getResource(datastore);
return (sessionHolder != null && sessionHolder.containsSession(session));
}

/**
* Perform actual closing of the Session,
* catching and logging any cleanup exceptions thrown.
* @param session The Session instance
*/
public static void closeSession(Session session) {
if (session != null) {
logger.debug("Closing Datastore Session");
try {
session.disconnect();
}
catch (Throwable ex) {
logger.debug("Unexpected exception on closing Datastore Session", ex);
}
}
}


/**
* Close the given Session, created via the given factory,
* if it is not managed externally (i.e. not bound to the thread).
* @param session the Datastore Session to close (may be <code>null</code>)
* @param datastore Datastore that the Session was created with
* (may be <code>null</code>)
*/
public static void releaseSession(Session session, Datastore datastore) {
if (session == null) {
return;
}
// Only close non-transactional Sessions.
if (!isSessionTransactional(session, datastore)) {
closeSessionOrRegisterDeferredClose(session, datastore);
}
}

/**
* Process all Datastore Sessions that have been registered for deferred close
* for the given SessionFactory.
* @param datastore the Datastore to process deferred close for
* @see #initDeferredClose
* @see #releaseSession
*/
public static void processDeferredClose(Datastore datastore) {
Assert.notNull(datastore, "No Datastore specified");
Map<Datastore, Set<Session>> holderMap = deferredCloseHolder.get();
if (holderMap == null || !holderMap.containsKey(datastore)) {
throw new IllegalStateException("Deferred close not active for Datastore [" + datastore + "]");
}
logger.debug("Processing deferred close of Datastore Sessions");
Set<Session> sessions = holderMap.remove(datastore);
for (Session session : sessions) {
closeSession(session);
}
if (holderMap.isEmpty()) {
deferredCloseHolder.set(null);
}
}

/**
* Initialize deferred close for the current thread and the given Datastore.
* Sessions will not be actually closed on close calls then, but rather at a
* {@link #processDeferredClose} call at a finishing point (like request completion).
*
* @param datastore the Datastore to initialize deferred close for
* @see #processDeferredClose
* @see #releaseSession
*/
public static void initDeferredClose(Datastore datastore) {
Assert.notNull(datastore, "No Datastore specified");
logger.debug("Initializing deferred close of Datastore Sessions");
Map<Datastore, Set<Session>> holderMap = deferredCloseHolder.get();
if (holderMap == null) {
holderMap = new HashMap<Datastore, Set<Session>>();
deferredCloseHolder.set(holderMap);
}
holderMap.put(datastore, new LinkedHashSet<Session>(4));
}
/**
* Close the given Session or register it for deferred close.
* @param session the Datastore Session to close
* @param datastore Datastore that the Session was created with
* (may be <code>null</code>)
* @see #initDeferredClose
* @see #processDeferredClose
*/
public static void closeSessionOrRegisterDeferredClose(Session session, Datastore datastore) {
Map<Datastore, Set<Session>> holderMap = deferredCloseHolder.get();
if (holderMap != null && datastore != null && holderMap.containsKey(datastore)) {
logger.debug("Registering Datastore Session for deferred close");
Set<Session> sessions = holderMap.get(datastore);
sessions.add(session);
}
else {
closeSession(session);
}
}
}

0 comments on commit ee55717

Please sign in to comment.