Skip to content

Commit

Permalink
Merge branch 'master' of github.com:jamesagnew/hapi-fhir
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesagnew committed Oct 17, 2018
2 parents d129d43 + 8130700 commit aa1f624
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 13 deletions.
Expand Up @@ -195,6 +195,9 @@ public void runDeliveryPass() {
return;
}

String activeJobIds = myActiveJobs.stream().map(t->t.getJobId()).collect(Collectors.joining(", "));
ourLog.info("Starting pass: currently have {} active job IDs: {}", myActiveJobs.size(), activeJobIds);

SubscriptionTriggeringJobDetails activeJob = myActiveJobs.get(0);

runJob(activeJob);
Expand All @@ -219,6 +222,7 @@ public void runDeliveryPass() {

private void runJob(SubscriptionTriggeringJobDetails theJobDetails) {
StopWatch sw = new StopWatch();
ourLog.info("Starting pass of subscription triggering job {}", theJobDetails.getJobId());

// Submit individual resources
int totalSubmitted = 0;
Expand All @@ -244,14 +248,13 @@ private void runJob(SubscriptionTriggeringJobDetails theJobDetails) {
theJobDetails.setCurrentSearchUuid(search.getUuid());
theJobDetails.setCurrentSearchResourceType(resourceType);
theJobDetails.setCurrentSearchCount(params.getCount());
theJobDetails.setCurrentSearchCount(null);
theJobDetails.setCurrentSearchLastUploadedIndex(-1);
}

// If we have an active search going, submit resources from it
if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted < myMaxSubmitPerPass) {
int fromIndex = 0;
if (theJobDetails.getCurrentSearchLastUploadedIndex() != null) {
fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
}
int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;

IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType());

Expand All @@ -260,26 +263,28 @@ private void runJob(SubscriptionTriggeringJobDetails theJobDetails) {
if (theJobDetails.getCurrentSearchCount() != null) {
toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount());
}
ourLog.info("Triggering job[{}] submitting up to {} resources for search {}", theJobDetails.getJobId(), maxQuerySize, theJobDetails.getCurrentSearchUuid());
ourLog.info("Triggering job[{}] search {} requesting resources {} - {}", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
List<Long> resourceIds = mySearchCoordinatorSvc.getResources(theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);

ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
for (Long next : resourceIds) {
IBaseResource nextResource = resourceDao.readByPid(next);
submitResource(theJobDetails.getSubscriptionId(), nextResource);
totalSubmitted++;
theJobDetails.setCurrentSearchLastUploadedIndex(toIndex - 1);
theJobDetails.setCurrentSearchLastUploadedIndex(theJobDetails.getCurrentSearchLastUploadedIndex()+1);
}

int expectedCount = toIndex - fromIndex;
if (resourceIds.size() < expectedCount || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) {
ourLog.info("Triggering job[{}] search {} has completed", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid());
if (resourceIds.size() == 0 || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) {
ourLog.info("Triggering job[{}] search {} has completed ", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid());
theJobDetails.setCurrentSearchResourceType(null);
theJobDetails.setCurrentSearchUuid(null);
theJobDetails.setCurrentSearchLastUploadedIndex(null);
theJobDetails.setCurrentSearchLastUploadedIndex(-1);
theJobDetails.setCurrentSearchCount(null);
}
}

ourLog.info("Subscription trigger job[{}] triggered {} resources in {} ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted, TimeUnit.SECONDS));
ourLog.info("Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted, TimeUnit.SECONDS));
}

private void submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
Expand Down Expand Up @@ -325,7 +330,7 @@ private static class SubscriptionTriggeringJobDetails {
private String myCurrentSearchUuid;
private Integer myCurrentSearchCount;
private String myCurrentSearchResourceType;
private Integer myCurrentSearchLastUploadedIndex;
private int myCurrentSearchLastUploadedIndex;

public Integer getCurrentSearchCount() {
return myCurrentSearchCount;
Expand Down Expand Up @@ -383,11 +388,11 @@ public void setCurrentSearchUuid(String theCurrentSearchUuid) {
myCurrentSearchUuid = theCurrentSearchUuid;
}

public Integer getCurrentSearchLastUploadedIndex() {
public int getCurrentSearchLastUploadedIndex() {
return myCurrentSearchLastUploadedIndex;
}

public void setCurrentSearchLastUploadedIndex(Integer theCurrentSearchLastUploadedIndex) {
public void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) {
myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex;
}
}
Expand Down
Expand Up @@ -71,6 +71,8 @@ public void afterUnregisterRestHookListener() {

ourSubscriptionTriggeringProvider.cancelAll();
ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(null);

myDaoConfig.setSearchPreFetchThresholds(new DaoConfig().getSearchPreFetchThresholds());
}

@Before
Expand Down Expand Up @@ -169,6 +171,8 @@ public void testTriggerResourceToSpecificSubscription() throws Exception {

@Test
public void testTriggerUsingMultipleSearches() throws Exception {
myDaoConfig.setSearchPreFetchThresholds(Lists.newArrayList(13, 22, 100));

String payload = "application/fhir+json";
IdType sub1id = createSubscription("Observation?", payload, ourListenerServerBase).getIdElement();
IdType sub2id = createSubscription("Patient?", payload, ourListenerServerBase).getIdElement();
Expand Down Expand Up @@ -216,6 +220,8 @@ public void testTriggerUsingMultipleSearches() throws Exception {
responseValue = response.getParameter().get(0).getValue().primitiveValue();
assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));

// Thread.sleep(1000000000);

waitForSize(51, ourUpdatedObservations);
waitForSize(0, ourCreatedObservations);
waitForSize(0, ourCreatedPatients);
Expand Down

0 comments on commit aa1f624

Please sign in to comment.