Skip to content

Commit

Permalink
Merge pull request #1169 from jamesagnew/subscription-startup-sideeff…
Browse files Browse the repository at this point in the history
…ects

Subscription startup side-effects
  • Loading branch information
jamesagnew committed Jan 16, 2019
2 parents a2d0168 + 66dc7f8 commit 584179b
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 89 deletions.
Expand Up @@ -143,7 +143,6 @@ public class DaoConfig {
private boolean myDisableHashBasedSearches;
private boolean myEnableInMemorySubscriptionMatching = true;
private ClientIdStrategyEnum myResourceClientIdStrategy = ClientIdStrategyEnum.ALPHANUMERIC;
private boolean mySubscriptionMatchingEnabled = true;

/**
* Constructor
Expand Down Expand Up @@ -530,7 +529,7 @@ public void setInterceptors(List<IServerInterceptor> theInterceptors) {
* This may be used to optionally register server interceptors directly against the DAOs.
*/
public void setInterceptors(IServerInterceptor... theInterceptor) {
setInterceptors(new ArrayList<IServerInterceptor>());
setInterceptors(new ArrayList<>());
if (theInterceptor != null && theInterceptor.length != 0) {
getInterceptors().addAll(Arrays.asList(theInterceptor));
}
Expand Down Expand Up @@ -1308,8 +1307,7 @@ public List<Integer> getSearchPreFetchThresholds() {
public void setSearchPreFetchThresholds(List<Integer> thePreFetchThresholds) {
Validate.isTrue(thePreFetchThresholds.size() > 0, "thePreFetchThresholds must not be empty");
int last = 0;
for (Integer nextInteger : thePreFetchThresholds) {
int nextInt = nextInteger.intValue();
for (Integer nextInt : thePreFetchThresholds) {
Validate.isTrue(nextInt > 0 || nextInt == -1, nextInt + " is not a valid prefetch threshold");
Validate.isTrue(nextInt != last, "Prefetch thresholds must be sequential");
Validate.isTrue(nextInt > last || nextInt == -1, "Prefetch thresholds must be sequential");
Expand Down Expand Up @@ -1398,7 +1396,7 @@ public void setEnableInMemorySubscriptionMatching(boolean theEnableInMemorySubsc
*/

public boolean isSubscriptionMatchingEnabled() {
return mySubscriptionMatchingEnabled;
return myModelConfig.isSubscriptionMatchingEnabled();
}

/**
Expand All @@ -1407,9 +1405,8 @@ public boolean isSubscriptionMatchingEnabled() {
* @since 3.7.0
*/


public void setSubscriptionMatchingEnabled(boolean theSubscriptionMatchingEnabled) {
mySubscriptionMatchingEnabled = theSubscriptionMatchingEnabled;
myModelConfig.setSubscriptionMatchingEnabled(theSubscriptionMatchingEnabled);
}

public ModelConfig getModelConfig() {
Expand Down
@@ -0,0 +1,7 @@
package ca.uhn.fhir.jpa.subscription;

import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;

public interface IResourceModifiedConsumer {
void submitResourceModified(ResourceModifiedMessage theMsg);
}
Expand Up @@ -37,50 +37,41 @@
public class SubscriptionInterceptorLoader {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionInterceptorLoader.class);

// TODO KHS these beans are late loaded because we don't want to run their @PostConstruct and @Scheduled method if they're
// not required. Recommend removing @PostConstruct from these classes and instead call those methods in register interceptors below.
// @Schedule will be tricker to resolve

@Autowired
private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
@Autowired
private SubscriptionActivatingInterceptor mySubscriptionActivatingInterceptor;

@Autowired
DaoConfig myDaoConfig;
@Autowired
private ApplicationContext myAppicationContext;
@Autowired
private SubscriptionRegistry mySubscriptionRegistry;
@Autowired
private ApplicationContext myAppicationContext;

public void registerInterceptors() {
Set<Subscription.SubscriptionChannelType> supportedSubscriptionTypes = myDaoConfig.getSupportedSubscriptionTypes();

if (!supportedSubscriptionTypes.isEmpty()) {
loadSubscriptions();
if (mySubscriptionActivatingInterceptor == null) {
mySubscriptionActivatingInterceptor = myAppicationContext.getBean(SubscriptionActivatingInterceptor.class);
}
ourLog.info("Registering subscription activating interceptor");
myDaoConfig.registerInterceptor(mySubscriptionActivatingInterceptor);
}
if (myDaoConfig.isSubscriptionMatchingEnabled()) {
if (mySubscriptionMatcherInterceptor == null) {
mySubscriptionMatcherInterceptor = myAppicationContext.getBean(SubscriptionMatcherInterceptor.class);
}
mySubscriptionMatcherInterceptor.start();
ourLog.info("Registering subscription matcher interceptor");
myDaoConfig.registerInterceptor(mySubscriptionMatcherInterceptor);

}
}

private void loadSubscriptions() {
ourLog.info("Loading subscriptions into the SubscriptionRegistry...");
// Load subscriptions into the SubscriptionRegistry
// Activate scheduled subscription loads into the SubscriptionRegistry
myAppicationContext.getBean(SubscriptionLoader.class);
ourLog.info("...{} subscriptions loaded", mySubscriptionRegistry.size());
}

@VisibleForTesting
public void unregisterInterceptorsForUnitTest() {
void unregisterInterceptorsForUnitTest() {
myDaoConfig.unregisterInterceptor(mySubscriptionActivatingInterceptor);
myDaoConfig.unregisterInterceptor(mySubscriptionMatcherInterceptor);
}
Expand Down
Expand Up @@ -9,6 +9,7 @@
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.interceptor.ServerOperationInterceptorAdapter;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -17,7 +18,6 @@
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

/*-
Expand All @@ -42,12 +42,12 @@

@Component
@Lazy
public class SubscriptionMatcherInterceptor extends ServerOperationInterceptorAdapter {
public class SubscriptionMatcherInterceptor extends ServerOperationInterceptorAdapter implements IResourceModifiedConsumer {
private Logger ourLog = LoggerFactory.getLogger(SubscriptionMatcherInterceptor.class);

public static final String SUBSCRIPTION_MATCHING_CHANNEL_NAME = "subscription-matching";
public static final String SUBSCRIPTION_STATUS = "Subscription.status";
public static final String SUBSCRIPTION_TYPE = "Subscription.channel.type";
private static final String SUBSCRIPTION_MATCHING_CHANNEL_NAME = "subscription-matching";
static final String SUBSCRIPTION_STATUS = "Subscription.status";
static final String SUBSCRIPTION_TYPE = "Subscription.channel.type";
private SubscribableChannel myProcessingChannel;

@Autowired
Expand All @@ -64,7 +64,6 @@ public SubscriptionMatcherInterceptor() {
super();
}

@PostConstruct
public void start() {
if (myProcessingChannel == null) {
myProcessingChannel = mySubscriptionChannelFactory.newMatchingChannel(SUBSCRIPTION_MATCHING_CHANNEL_NAME);
Expand All @@ -77,7 +76,10 @@ public void start() {
@SuppressWarnings("unused")
@PreDestroy
public void preDestroy() {
myProcessingChannel.unsubscribe(mySubscriptionMatchingSubscriber);

if (myProcessingChannel != null) {
myProcessingChannel.unsubscribe(mySubscriptionMatchingSubscriber);
}
}

@Override
Expand All @@ -100,8 +102,9 @@ private void submitResourceModified(IBaseResource theNewResource, ResourceModifi
submitResourceModified(msg);
}

protected void sendToProcessingChannel(final ResourceModifiedMessage theMessage) {
private void sendToProcessingChannel(final ResourceModifiedMessage theMessage) {
ourLog.trace("Sending resource modified message to processing channel");
Validate.notNull(myProcessingChannel, "A SubscriptionMatcherInterceptor has been registered without calling start() on it.");
myProcessingChannel.send(new ResourceModifiedJsonMessage(theMessage));
}

Expand All @@ -117,7 +120,7 @@ public void submitResourceModified(final ResourceModifiedMessage theMsg) {
}

@VisibleForTesting
public LinkedBlockingQueueSubscribableChannel getProcessingChannelForUnitTest() {
LinkedBlockingQueueSubscribableChannel getProcessingChannelForUnitTest() {
return (LinkedBlockingQueueSubscribableChannel) myProcessingChannel;
}
}
Expand Up @@ -55,15 +55,15 @@
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.stream.Collectors;

Expand All @@ -72,10 +72,10 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;

@Service
public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc, ApplicationContextAware {
public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringProvider.class);

public static final int DEFAULT_MAX_SUBMIT = 10000;
private static final int DEFAULT_MAX_SUBMIT = 10000;

@Autowired
private FhirContext myFhirContext;
Expand All @@ -88,11 +88,10 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
@Autowired
private MatchUrlService myMatchUrlService;
@Autowired
private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
private IResourceModifiedConsumer myResourceModifiedConsumer;

private final List<SubscriptionTriggeringJobDetails> myActiveJobs = new ArrayList<>();
private int myMaxSubmitPerPass = DEFAULT_MAX_SUBMIT;
private ApplicationContext myAppCtx;
private ExecutorService myExecutorService;

@Override
Expand All @@ -105,7 +104,7 @@ public IBaseParameters triggerSubscription(List<UriParam> theResourceIds, List<S
if (theSubscriptionId != null) {
IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getSubscriptionDao();
IIdType subscriptionId = theSubscriptionId;
if (subscriptionId.hasResourceType() == false) {
if (!subscriptionId.hasResourceType()) {
subscriptionId = subscriptionId.withResourceType(ResourceTypeEnum.SUBSCRIPTION.getCode());
}
subscriptionDao.read(subscriptionId);
Expand All @@ -128,7 +127,7 @@ public IBaseParameters triggerSubscription(List<UriParam> theResourceIds, List<S

// Search URLs must be valid
for (StringParam next : searchUrls) {
if (next.getValue().contains("?") == false) {
if (!next.getValue().contains("?")) {
throw new InvalidRequestException("Search URL is not valid (must be in the form \"[resource type]?[optional params]\")");
}
}
Expand Down Expand Up @@ -163,7 +162,7 @@ public void runDeliveryPass() {
return;
}

String activeJobIds = myActiveJobs.stream().map(t -> t.getJobId()).collect(Collectors.joining(", "));
String activeJobIds = myActiveJobs.stream().map(SubscriptionTriggeringJobDetails::getJobId).collect(Collectors.joining(", "));
ourLog.info("Starting pass: currently have {} active job IDs: {}", myActiveJobs.size(), activeJobIds);

SubscriptionTriggeringJobDetails activeJob = myActiveJobs.get(0);
Expand Down Expand Up @@ -290,7 +289,7 @@ private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List<Pair<String, Fu

private Future<Void> submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger);
IFhirResourceDao<? extends IBaseResource> dao = myDaoRegistry.getResourceDao(resourceId.getResourceType());
IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceId.getResourceType());
IBaseResource resourceToTrigger = dao.read(resourceId);

return submitResource(theSubscriptionId, resourceToTrigger);
Expand All @@ -306,7 +305,7 @@ private Future<Void> submitResource(String theSubscriptionId, IBaseResource theR
return myExecutorService.submit(() -> {
for (int i = 0; ; i++) {
try {
mySubscriptionMatcherInterceptor.submitResourceModified(msg);
myResourceModifiedConsumer.submitResourceModified(msg);
break;
} catch (Exception e) {
if (i >= 3) {
Expand All @@ -329,11 +328,6 @@ public void cancelAll() {
}
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
myAppCtx = applicationContext;
}

/**
* Sets the maximum number of resources that will be submitted in a single pass
*/
Expand All @@ -346,7 +340,6 @@ public void setMaxSubmitPerPass(Integer theMaxSubmitPerPass) {
myMaxSubmitPerPass = maxSubmitPerPass;
}

@SuppressWarnings("unchecked")
@PostConstruct
public void start() {
LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(1000);
Expand Down Expand Up @@ -393,67 +386,67 @@ private static class SubscriptionTriggeringJobDetails {
private String myCurrentSearchResourceType;
private int myCurrentSearchLastUploadedIndex;

public Integer getCurrentSearchCount() {
Integer getCurrentSearchCount() {
return myCurrentSearchCount;
}

public void setCurrentSearchCount(Integer theCurrentSearchCount) {
void setCurrentSearchCount(Integer theCurrentSearchCount) {
myCurrentSearchCount = theCurrentSearchCount;
}

public String getCurrentSearchResourceType() {
String getCurrentSearchResourceType() {
return myCurrentSearchResourceType;
}

public void setCurrentSearchResourceType(String theCurrentSearchResourceType) {
void setCurrentSearchResourceType(String theCurrentSearchResourceType) {
myCurrentSearchResourceType = theCurrentSearchResourceType;
}

public String getJobId() {
String getJobId() {
return myJobId;
}

public void setJobId(String theJobId) {
void setJobId(String theJobId) {
myJobId = theJobId;
}

public String getSubscriptionId() {
String getSubscriptionId() {
return mySubscriptionId;
}

public void setSubscriptionId(String theSubscriptionId) {
void setSubscriptionId(String theSubscriptionId) {
mySubscriptionId = theSubscriptionId;
}

public List<String> getRemainingResourceIds() {
List<String> getRemainingResourceIds() {
return myRemainingResourceIds;
}

public void setRemainingResourceIds(List<String> theRemainingResourceIds) {
void setRemainingResourceIds(List<String> theRemainingResourceIds) {
myRemainingResourceIds = theRemainingResourceIds;
}

public List<String> getRemainingSearchUrls() {
List<String> getRemainingSearchUrls() {
return myRemainingSearchUrls;
}

public void setRemainingSearchUrls(List<String> theRemainingSearchUrls) {
void setRemainingSearchUrls(List<String> theRemainingSearchUrls) {
myRemainingSearchUrls = theRemainingSearchUrls;
}

public String getCurrentSearchUuid() {
String getCurrentSearchUuid() {
return myCurrentSearchUuid;
}

public void setCurrentSearchUuid(String theCurrentSearchUuid) {
void setCurrentSearchUuid(String theCurrentSearchUuid) {
myCurrentSearchUuid = theCurrentSearchUuid;
}

public int getCurrentSearchLastUploadedIndex() {
int getCurrentSearchLastUploadedIndex() {
return myCurrentSearchLastUploadedIndex;
}

public void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) {
void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) {
myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex;
}
}
Expand Down

0 comments on commit 584179b

Please sign in to comment.