Skip to content

Commit

Permalink
Interceptor cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesagnew committed Jan 20, 2019
1 parent b878925 commit c3c7d15
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 37 deletions.
Expand Up @@ -2,6 +2,8 @@

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.entity.TermConcept;
import ca.uhn.fhir.jpa.model.interceptor.api.IInterceptorRegistry;
import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.provider.SystemProviderDstu2Test;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
Expand Down Expand Up @@ -51,6 +53,7 @@
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -85,6 +88,8 @@ public abstract class BaseJpaTest {
protected IRequestOperationCallback myRequestOperationCallback = mock(IRequestOperationCallback.class);
@Autowired
protected DatabaseBackedPagingProvider myDatabaseBackedPagingProvider;
@Autowired
protected IInterceptorRegistry myInterceptorRegistry;

@After
public void afterPerformCleanup() {
Expand Down Expand Up @@ -129,6 +134,12 @@ public void beforeInitMocks() {
when(mySrd.getHeaders(eq(JpaConstants.HEADER_META_SNAPSHOT_MODE))).thenReturn(new ArrayList<>());
}

protected CountDownLatch registerLatchHookInterceptor(int theCount, Pointcut theLatchPointcut) {
CountDownLatch deliveryLatch = new CountDownLatch(theCount);
myInterceptorRegistry.registerAnonymousHookForUnitTest(theLatchPointcut, Integer.MAX_VALUE, t -> deliveryLatch.countDown());
return deliveryLatch;
}

protected abstract FhirContext getContext();

protected abstract PlatformTransactionManager getTxManager();
Expand Down
Expand Up @@ -3,6 +3,7 @@
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.config.StoppableSubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.model.interceptor.api.Hook;
import ca.uhn.fhir.jpa.model.interceptor.api.IInterceptorRegistry;
import ca.uhn.fhir.jpa.model.interceptor.api.Interceptor;
import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test;
Expand All @@ -22,9 +23,11 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.*;

/**
* Test the rest-hook subscriptions
Expand Down Expand Up @@ -66,35 +69,43 @@ public void before() throws Exception {
public void testBeforeRestHookDelivery_ModifyResourceId() throws Exception {
ourNextModifyResourceId = true;

// Create a subscription
CountDownLatch registerLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED);
createSubscription("Observation?status=final", "application/fhir+json");
waitForActivatedSubscriptionCount(1);
registerLatch.await(10, TimeUnit.SECONDS);

// Creating a matching resource
CountDownLatch deliveryLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY);
sendObservation();
deliveryLatch.await(10, TimeUnit.SECONDS);

waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(0, ourCreatedObservations.size());
assertEquals(1, ourUpdatedObservations.size());
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
assertEquals("Observation/A", ourUpdatedObservations.get(0).getId());
// TODO: JA a latch would be even better but we'd need to allow customizable orders since the ad-hoc ones run first
waitForTrue(() -> ourHitBeforeRestHookDelivery);
waitForTrue(() -> ourHitAfterRestHookDelivery);
assertTrue(ourHitBeforeRestHookDelivery);
assertTrue(ourHitAfterRestHookDelivery);
}

@Test
public void testBeforeRestHookDelivery_AddHeader() throws Exception {
ourNextAddHeader = true;

// Create a subscription
CountDownLatch registerLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED);
createSubscription("Observation?status=final", "application/fhir+json");
waitForActivatedSubscriptionCount(1);
registerLatch.await(10, TimeUnit.SECONDS);

// Creating a matching resource
CountDownLatch deliveryLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY);
sendObservation();
deliveryLatch.await(10, TimeUnit.SECONDS);

waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(0, ourCreatedObservations.size());
assertEquals(1, ourUpdatedObservations.size());
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
// TODO: JA a latch would be even better but we'd need to allow customizable orders since the ad-hoc ones run first
waitForTrue(() -> ourHitBeforeRestHookDelivery);
waitForTrue(() -> ourHitAfterRestHookDelivery);
assertTrue(ourHitBeforeRestHookDelivery);
assertTrue(ourHitAfterRestHookDelivery);
assertThat(ourHeaders, hasItem("X-Foo: Bar"));
}

Expand All @@ -103,8 +114,10 @@ public void testBeforeRestHookDelivery_AddHeader() throws Exception {
public void testBeforeRestHookDelivery_AbortDelivery() throws Exception {
ourNextBeforeRestHookDeliveryReturn = false;

// Create a subscription
CountDownLatch registerLatch = registerLatchHookInterceptor(1, Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED);
createSubscription("Observation?status=final", "application/fhir+json");
waitForActivatedSubscriptionCount(1);
registerLatch.await(10, TimeUnit.SECONDS);

sendObservation();

Expand Down
Expand Up @@ -24,9 +24,14 @@

public interface IInterceptorRegistry {

int DEFAULT_ORDER = 0;

@VisibleForTesting
void registerAnonymousHookForUnitTest(Pointcut thePointcut, IAnonymousLambdaHook theHook);

@VisibleForTesting
void registerAnonymousHookForUnitTest(Pointcut thePointcut, int theOrder, IAnonymousLambdaHook theHook);

@VisibleForTesting
void clearAnonymousHookForUnitTest();

Expand Down
Expand Up @@ -67,9 +67,23 @@ public enum Pointcut {
* <li>ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage</li>
* </ul>
*/
SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED("ResourceModifiedMessage")
SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED("ResourceModifiedMessage"),

;

/**
* Invoked immediately after an active subscription is "registered". In HAPI FHIR, when
* a subscription
* <p>
* Hooks may make changes to the canonicalized subscription and this will have an effect
* on processing across this server. Note however that timing issues may occur, since the
* subscription is already technically live by the time this hook is called.
* </p>
* Hooks may accept the following parameters:
* <ul>
* <li>ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription</li>
* </ul>
*/
SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED("CanonicalSubscription");

private final List<String> myParameterTypes;

Expand Down
Expand Up @@ -9,9 +9,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -66,10 +66,15 @@ public List<Object> getGlobalInterceptorsForUnitTest() {
@Override
@VisibleForTesting
public void registerAnonymousHookForUnitTest(Pointcut thePointcut, IAnonymousLambdaHook theHook) {
registerAnonymousHookForUnitTest(thePointcut, DEFAULT_ORDER, theHook);
}

@Override
public void registerAnonymousHookForUnitTest(Pointcut thePointcut, int theOrder, IAnonymousLambdaHook theHook) {
Validate.notNull(thePointcut);
Validate.notNull(theHook);

myAnonymousInvokers.put(thePointcut, new AnonymousLambdaInvoker(theHook));
myAnonymousInvokers.put(thePointcut, new AnonymousLambdaInvoker(theHook, theOrder));
}

@Override
Expand All @@ -88,22 +93,40 @@ public void start() {
myGlobalInterceptors.add(nextGlobalInterceptor);
}

// Sort them
sortByOrderAnnotation(myGlobalInterceptors);

// Pull out the hook methods
for (Object nextInterceptor : myGlobalInterceptors) {

int typeOrder = DEFAULT_ORDER;
Order typeOrderAnnotation = AnnotationUtils.findAnnotation(nextInterceptor.getClass(), Order.class);
if (typeOrderAnnotation != null) {
typeOrder = typeOrderAnnotation.value();
}

for (Method nextMethod : nextInterceptor.getClass().getDeclaredMethods()) {
Hook hook = AnnotationUtils.findAnnotation(nextMethod, Hook.class);

if (hook != null) {
HookInvoker invoker = new HookInvoker(hook, nextInterceptor, nextMethod);

int methodOrder = typeOrder;
Order methodOrderAnnotation = AnnotationUtils.findAnnotation(nextMethod, Order.class);
if (methodOrderAnnotation != null) {
methodOrder = methodOrderAnnotation.value();
}

HookInvoker invoker = new HookInvoker(hook, nextInterceptor, nextMethod, methodOrder);
for (Pointcut nextPointcut : hook.value()) {
myInvokers.put(nextPointcut, invoker);
}
}
}
}

// Sort everything by declared order
sortByOrderAnnotation(myGlobalInterceptors);
for (Pointcut nextPointcut : myInvokers.keys()) {
List<BaseInvoker> nextInvokerList = myInvokers.get(nextPointcut);
nextInvokerList.sort(Comparator.naturalOrder());
}
}

private void sortByOrderAnnotation(List<Object> theObjects) {
Expand All @@ -130,10 +153,16 @@ public void setApplicationContext(@Nonnull ApplicationContext theApplicationCont
public boolean callHooks(Pointcut thePointcut, HookParams theParams) {
assert haveAppropriateParams(thePointcut, theParams);

// Anonymous hooks first
List<BaseInvoker> invokers = ListUtils.union(
myAnonymousInvokers.get(thePointcut),
myInvokers.get(thePointcut));
List<BaseInvoker> globalInvokers = myInvokers.get(thePointcut);
List<BaseInvoker> anonymousInvokers = myAnonymousInvokers.get(thePointcut);

List<BaseInvoker> invokers = globalInvokers;
if (anonymousInvokers.isEmpty() == false) {
invokers = ListUtils.union(
anonymousInvokers,
globalInvokers);
invokers.sort(Comparator.naturalOrder());
}

/*
* Call each hook in order
Expand Down Expand Up @@ -167,14 +196,27 @@ public boolean callHooks(Pointcut thePointcut, Object... theParams) {
return callHooks(thePointcut, new HookParams(theParams));
}

private abstract class BaseInvoker {
private abstract class BaseInvoker implements Comparable<BaseInvoker> {

private final int myOrder;

protected BaseInvoker(int theOrder) {
myOrder = theOrder;
}

abstract boolean invoke(HookParams theParams);

@Override
public int compareTo(BaseInvoker o) {
return myOrder - o.myOrder;
}
}

private class AnonymousLambdaInvoker extends BaseInvoker {
private final IAnonymousLambdaHook myHook;

public AnonymousLambdaInvoker(IAnonymousLambdaHook theHook) {
public AnonymousLambdaInvoker(IAnonymousLambdaHook theHook, int theOrder) {
super(theOrder);
myHook = theHook;
}

Expand All @@ -196,7 +238,8 @@ private class HookInvoker extends BaseInvoker {
/**
* Constructor
*/
private HookInvoker(Hook theHook, @Nonnull Object theInterceptor, @Nonnull Method theHookMethod) {
private HookInvoker(Hook theHook, @Nonnull Object theInterceptor, @Nonnull Method theHookMethod, int theOrder) {
super(theOrder);
myInterceptor = theInterceptor;
myParameterTypes = theHookMethod.getParameterTypes();
myMethod = theHookMethod;
Expand Down
Expand Up @@ -9,9 +9,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -21,6 +21,8 @@
*/

import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.model.interceptor.api.IInterceptorRegistry;
import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseResource;
Expand All @@ -37,7 +39,6 @@
import java.util.Optional;

/**
*
* Cache of active subscriptions. When a new subscription is added to the cache, a new Spring Channel is created
* and a new MessageHandler for that subscription is subscribed to that channel. These subscriptions, channels, and
* handlers are all caches in this registry so they can be removed it the subscription is deleted.
Expand All @@ -47,7 +48,7 @@
@Component
public class SubscriptionRegistry {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionRegistry.class);

private final ActiveSubscriptionCache myActiveSubscriptionCache = new ActiveSubscriptionCache();
@Autowired
SubscriptionCanonicalizer<IBaseResource> mySubscriptionCanonicalizer;
@Autowired
Expand All @@ -56,8 +57,8 @@ public class SubscriptionRegistry {
SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory;
@Autowired
ModelConfig myModelConfig;

private final ActiveSubscriptionCache myActiveSubscriptionCache = new ActiveSubscriptionCache();
@Autowired
private IInterceptorRegistry myInterceptorRegistry;

public ActiveSubscription get(String theIdPart) {
return myActiveSubscriptionCache.get(theIdPart);
Expand Down Expand Up @@ -98,6 +99,9 @@ private CanonicalSubscription registerSubscription(IIdType theId, IBaseResource

myActiveSubscriptionCache.put(subscriptionId, activeSubscription);

// Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED
myInterceptorRegistry.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, canonicalized);

return canonicalized;
}

Expand Down

0 comments on commit c3c7d15

Please sign in to comment.