Skip to content

Commit

Permalink
Merge 84db66d into 86a12bc
Browse files Browse the repository at this point in the history
  • Loading branch information
fil512 committed Feb 3, 2019
2 parents 86a12bc + 84db66d commit b7b8d8f
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 21 deletions.
Expand Up @@ -26,6 +26,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.Subscription;
Expand Down Expand Up @@ -357,4 +358,20 @@ public CanonicalEventDefinition() {

}

@Override
public String toString() {
return new ToStringBuilder(this)
.append("myIdElement", myIdElement)
.append("myStatus", myStatus)
.append("myCriteriaString", myCriteriaString)
.append("myEndpointUrl", myEndpointUrl)
.append("myPayloadString", myPayloadString)
// .append("myHeaders", myHeaders)
.append("myChannelType", myChannelType)
// .append("myTrigger", myTrigger)
// .append("myEmailDetails", myEmailDetails)
// .append("myRestHookDetails", myRestHookDetails)
// .append("myChannelExtensions", myChannelExtensions)
.toString();
}
}
Expand Up @@ -9,9 +9,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -30,6 +30,7 @@
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber;
import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -61,15 +62,35 @@ public void handleMessage(Message<?> theMessage) throws MessagingException {
}

public void updateSubscriptionRegistryAndPerformMatching(ResourceModifiedMessage theResourceModifiedMessage) {
IBaseResource resource = theResourceModifiedMessage.getNewPayload(myFhirContext);
RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(resource);

if (resourceDef.getName().equals(ResourceTypeEnum.SUBSCRIPTION.getCode())) {
String status = mySubscriptionCanonicalizer.getSubscriptionStatus(resource);
if (SubscriptionConstants.ACTIVE_STATUS.equals(status)) {
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(resource);
}
switch (theResourceModifiedMessage.getOperationType()) {
case DELETE:
if (isSubscription(theResourceModifiedMessage)) {
mySubscriptionRegistry.unregisterSubscription(theResourceModifiedMessage.getId(myFhirContext));
}
return;
case CREATE:
case UPDATE:
if (isSubscription(theResourceModifiedMessage)) {
registerActiveSubscription(theResourceModifiedMessage.getNewPayload(myFhirContext));
}
break;
default:
break;
}

mySubscriptionMatchingSubscriber.matchActiveSubscriptionsAndDeliver(theResourceModifiedMessage);
}

private boolean isSubscription(ResourceModifiedMessage theResourceModifiedMessage) {
IIdType id = theResourceModifiedMessage.getId(myFhirContext);
RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(id.getResourceType());
return resourceDef.getName().equals(ResourceTypeEnum.SUBSCRIPTION.getCode());
}

private void registerActiveSubscription(IBaseResource theSubscription) {
String status = mySubscriptionCanonicalizer.getSubscriptionStatus(theSubscription);
if (SubscriptionConstants.ACTIVE_STATUS.equals(status)) {
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(theSubscription);
}
}
}
Expand Up @@ -85,6 +85,9 @@ public void beforeReset() {
@After
public void cleanup() {
myInterceptorRegistry.clearAnonymousHookForUnitTest();
mySubscriptionMatchingPost.clear();
mySubscriptionActivatedPost.clear();
ourObservationListener.clear();
}

public <T extends IBaseResource> T sendResource(T theResource) throws InterruptedException {
Expand Down Expand Up @@ -184,5 +187,7 @@ public void awaitExpected() throws InterruptedException {
public void expectNothing() {
updateLatch.expectNothing();
}

public void clear() { updateLatch.clear();}
}
}
Expand Up @@ -32,6 +32,7 @@ public void activeSubscriptionIsRegistered() {
ResourceModifiedMessage message = new ResourceModifiedMessage(myFhirContext, subscription, ResourceModifiedMessage.OperationTypeEnum.CREATE);
ResourceModifiedJsonMessage jsonMessage = new ResourceModifiedJsonMessage(message);
myStandaloneSubscriptionMessageHandler.handleMessage(jsonMessage);
Mockito.verify(mySubscriptionRegistry, never()).unregisterSubscription(any());
Mockito.verify(mySubscriptionRegistry).registerSubscriptionUnlessAlreadyRegistered(any());
Mockito.verify(mySubscriptionMatchingSubscriber).matchActiveSubscriptionsAndDeliver(any());
}
Expand All @@ -42,7 +43,19 @@ public void requestedSubscriptionNotRegistered() {
ResourceModifiedMessage message = new ResourceModifiedMessage(myFhirContext, subscription, ResourceModifiedMessage.OperationTypeEnum.CREATE);
ResourceModifiedJsonMessage jsonMessage = new ResourceModifiedJsonMessage(message);
myStandaloneSubscriptionMessageHandler.handleMessage(jsonMessage);
Mockito.verify(mySubscriptionRegistry, never()).unregisterSubscription(any());
Mockito.verify(mySubscriptionRegistry, never()).registerSubscriptionUnlessAlreadyRegistered(any());
Mockito.verify(mySubscriptionMatchingSubscriber).matchActiveSubscriptionsAndDeliver(any());
}

@Test
public void deleteSubscription() {
Subscription subscription = makeSubscriptionWithStatus("testCriteria", "testPayload", "testEndpoint", Subscription.SubscriptionStatus.REQUESTED);
ResourceModifiedMessage message = new ResourceModifiedMessage(myFhirContext, subscription, ResourceModifiedMessage.OperationTypeEnum.DELETE);
ResourceModifiedJsonMessage jsonMessage = new ResourceModifiedJsonMessage(message);
myStandaloneSubscriptionMessageHandler.handleMessage(jsonMessage);
Mockito.verify(mySubscriptionRegistry).unregisterSubscription(any());
Mockito.verify(mySubscriptionRegistry, never()).registerSubscriptionUnlessAlreadyRegistered(any());
Mockito.verify(mySubscriptionMatchingSubscriber, never()).matchActiveSubscriptionsAndDeliver(any());
}
}
@@ -1,6 +1,5 @@
package ca.uhn.fhir.jpa.subscription.module.standalone;

import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
Expand All @@ -9,17 +8,12 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;

public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscribableChannelDstu3Test {
@Test
public void testSubscriptionLoaderFhirClient() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
myInterceptorRegistry.registerAnonymousHookForUnitTest(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, t-> latch.countDown());

String payload = "application/fhir+json";

String criteria1 = "Observation?code=SNOMED-CT|" + myCode + "&_format=xml";
Expand All @@ -33,7 +27,6 @@ public void testSubscriptionLoaderFhirClient() throws InterruptedException {
initSubscriptionLoader(bundle);

sendObservation(myCode, "SNOMED-CT");
latch.await(10, TimeUnit.SECONDS);

waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
Expand All @@ -42,9 +35,6 @@ public void testSubscriptionLoaderFhirClient() throws InterruptedException {

@Test
public void testSubscriptionLoaderFhirClientSubscriptionNotActive() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
myInterceptorRegistry.registerAnonymousHookForUnitTest(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, t-> latch.countDown());

String payload = "application/fhir+json";

String criteria1 = "Observation?code=SNOMED-CT|" + myCode + "&_format=xml";
Expand All @@ -58,7 +48,6 @@ public void testSubscriptionLoaderFhirClientSubscriptionNotActive() throws Inter
initSubscriptionLoader(bundle);

sendObservation(myCode, "SNOMED-CT");
latch.await(10, TimeUnit.SECONDS);

waitForSize(0, ourCreatedObservations);
waitForSize(0, ourUpdatedObservations);
Expand Down

0 comments on commit b7b8d8f

Please sign in to comment.