Skip to content

Commit

Permalink
Make sure a transaction is active when activating subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesagnew committed Oct 17, 2017
1 parent 0c464bf commit 8b7723b
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecut
}

if (mySubscriptionActivatingSubscriber == null) {
mySubscriptionActivatingSubscriber = new SubscriptionActivatingSubscriber(getSubscriptionDao(), getChannelType(), this);
mySubscriptionActivatingSubscriber = new SubscriptionActivatingSubscriber(getSubscriptionDao(), getChannelType(), this, myTxManager);
}

registerSubscriptionCheckingSubscriber();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,32 +27,34 @@
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.Subscription;
import org.hl7.fhir.utilities.ucum.Canonical;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import java.util.concurrent.ConcurrentHashMap;
import org.springframework.transaction.support.TransactionTemplate;

@SuppressWarnings("unchecked")
public class SubscriptionActivatingSubscriber {
private final IFhirResourceDao mySubscriptionDao;
private final BaseSubscriptionInterceptor mySubscriptionInterceptor;
private final PlatformTransactionManager myTransactionManager;
private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class);
private FhirContext myCtx;
private Subscription.SubscriptionChannelType myChannelType;

/**
* Constructor
*/
public SubscriptionActivatingSubscriber(IFhirResourceDao<? extends IBaseResource> theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor) {
public SubscriptionActivatingSubscriber(IFhirResourceDao<? extends IBaseResource> theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor, PlatformTransactionManager theTransactionManager) {
mySubscriptionDao = theSubscriptionDao;
mySubscriptionInterceptor = theSubscriptionInterceptor;
myChannelType = theChannelType;
myCtx = theSubscriptionDao.getContext();
myTransactionManager = theTransactionManager;
}

public void activateAndRegisterSubscriptionIfRequired(final IBaseResource theSubscription) {
Expand Down Expand Up @@ -90,7 +92,7 @@ public void afterCommit() {
}


public void handleMessage(RestOperationTypeEnum theOperationType, IIdType theId, IBaseResource theSubscription) throws MessagingException {
public void handleMessage(RestOperationTypeEnum theOperationType, IIdType theId, final IBaseResource theSubscription) throws MessagingException {

switch (theOperationType) {
case DELETE:
Expand All @@ -101,7 +103,13 @@ public void handleMessage(RestOperationTypeEnum theOperationType, IIdType theId,
if (!theId.getResourceType().equals("Subscription")) {
return;
}
activateAndRegisterSubscriptionIfRequired(theSubscription);
TransactionTemplate txTemplate = new TransactionTemplate(myTransactionManager);
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
activateAndRegisterSubscriptionIfRequired(theSubscription);
}
});
break;
default:
break;
Expand Down

0 comments on commit 8b7723b

Please sign in to comment.