
Commit

Merge pull request #706 from lantanagroup/LNK-1851-use-common-pool-reports

Lnk 1851 use common pool reports
edward-miller-lcg committed Feb 27, 2024
2 parents c686349 + a6a390b commit 26f6602
Showing 4 changed files with 17 additions and 42 deletions.
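Taken together, these changes replace the per-task ForkJoinPools that were sized from configuration (api.measure-evaluation-threads for measure evaluation, parallelPatients for patient queries) with the JVM-wide ForkJoinPool.commonPool(), drop the explicit shutdown() calls that a dedicated pool requires, and add AtomicInteger-based progress logging. A minimal sketch of the before/after pattern, with hypothetical names (the real code iterates over patients of interest):

    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.atomic.AtomicInteger;

    class CommonPoolSketch {
        // Before: a dedicated pool sized from configuration, shut down in a finally block.
        static void oldPattern(List<String> ids, Integer configuredThreads)
                throws ExecutionException, InterruptedException {
            ForkJoinPool pool = configuredThreads != null
                    ? new ForkJoinPool(configuredThreads)
                    : ForkJoinPool.commonPool();
            try {
                pool.submit(() -> ids.parallelStream().forEach(CommonPoolSketch::process)).get();
            } finally {
                pool.shutdown(); // needed for a dedicated pool; has no effect on the common pool
            }
        }

        // After: the shared common pool, no shutdown, plus a progress counter.
        static void newPattern(List<String> ids) throws ExecutionException, InterruptedException {
            ForkJoinPool pool = ForkJoinPool.commonPool();
            AtomicInteger progress = new AtomicInteger(0);
            pool.submit(() -> ids.parallelStream().forEach(id -> {
                process(id);
                System.out.printf("Progress: %d of %d%n", progress.incrementAndGet(), ids.size());
            })).get();
        }

        static void process(String id) { /* per-patient work */ }
    }

Because the common pool is statically constructed and shared across the JVM, shutting it down has no effect, which is why the finally blocks below are simply removed.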
11 changes: 2 additions & 9 deletions api/src/main/java/com/lantanagroup/link/api/ReportGenerator.java
@@ -61,13 +61,11 @@ public void generate(QueryPhase queryPhase) throws ExecutionException, Interrupt
MeasureServiceWrapper measureServiceWrapper = new MeasureServiceWrapper(measureContext.getReportDefBundle(), config.getTerminologyService());
measureServiceWrapper.preCompile();
logger.info("Patient list is : " + measureContext.getPatientsOfInterest(queryPhase).size());
ForkJoinPool forkJoinPool = config.getMeasureEvaluationThreads() != null
? new ForkJoinPool(config.getMeasureEvaluationThreads())
: ForkJoinPool.commonPool();
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
AtomicInteger progress = new AtomicInteger(0);
List<PatientOfInterestModel> pois = measureContext.getPatientsOfInterest(queryPhase);

try {

forkJoinPool.submit(() -> pois.parallelStream().forEach(patient -> {
if (StringUtils.isEmpty(patient.getId())) {
logger.error("Patient {} has no ID; cannot generate measure report", patient);
@@ -87,11 +85,6 @@ public void generate(QueryPhase queryPhase) throws ExecutionException, Interrupt
}
}))
.get();
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
}

private MeasureReport generate(MeasureServiceWrapper measureServiceWrapper, PatientOfInterestModel patient) {
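In generate(), the parallel stream is still wrapped in forkJoinPool.submit(...).get(), but the pool is now the common pool. Parallel streams execute on the common pool by default, so the wrapper's main remaining effect is to block until every patient has been processed, and since the common pool cannot be shut down, the old finally block is no longer needed. A standalone demo (not project code) showing where parallel-stream work actually runs:

    import java.util.List;
    import java.util.concurrent.ForkJoinPool;

    class CommonPoolStreamDemo {
        public static void main(String[] args) {
            List.of("a", "b", "c", "d").parallelStream()
                    .forEach(x -> System.out.println(x + " ran on " + Thread.currentThread().getName()));
            // Thread names are typically ForkJoinPool.commonPool-worker-N,
            // though some elements may run on the calling thread itself.
            System.out.println("Targeted parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        }
    }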
@@ -148,11 +148,6 @@ public class ApiConfig {
@NotNull
private String debugPath;

/**
* <strong>api.measure-evaluation-threads</strong><br>The number of threads to use for patient measure report generation.
*/
private Integer measureEvaluationThreads;

/**
* <strong>api.skip-query</strong><br>Whether to skip the query phase of report generation; useful if patient data bundles have already been stored.
*/
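With api.measure-evaluation-threads gone from ApiConfig, the measure evaluation thread count is no longer a per-deployment setting; it follows the common pool, whose target parallelism defaults to one less than the number of available processors (minimum one) and can still be tuned JVM-wide through the standard java.util.concurrent.ForkJoinPool.common.parallelism system property. An illustrative check (not project code):

    import java.util.concurrent.ForkJoinPool;

    class PoolSizeCheck {
        public static void main(String[] args) {
            // Override globally with: -Djava.util.concurrent.ForkJoinPool.common.parallelism=<n>
            System.out.println("Available processors: " + Runtime.getRuntime().availableProcessors());
            System.out.println("Common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        }
    }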
@@ -19,12 +19,6 @@ public class FhirQuery {
*/
private String authClass;

/**
* The number of patients to query for in parallel using separate threads. Should not be greater than the number of
* cores available to the installation.
*/
private int parallelPatients = 10;

/**
* Configuration used by BasicAuth implementation
*/
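The removed parallelPatients setting is what sized the two ForkJoinPools in the patient query code shown in the next file (loadInitialPatientData / loadSupplementalPatientData). After this change, patientFork and patientDataFork both call ForkJoinPool.commonPool(), which always returns the same JVM-wide instance, so patient lookup, patient data queries, and measure evaluation now share one set of worker threads instead of each getting its own bounded pool. For illustration (not project code):

    import java.util.concurrent.ForkJoinPool;

    class SharedPoolCheck {
        public static void main(String[] args) {
            ForkJoinPool patientFork = ForkJoinPool.commonPool();
            ForkJoinPool patientDataFork = ForkJoinPool.commonPool();
            System.out.println(patientFork == patientDataFork); // true: a single shared instance
        }
    }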
@@ -24,6 +24,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static ca.uhn.fhir.rest.api.Constants.HEADER_REQUEST_ID;
@@ -104,9 +105,9 @@ private PatientData loadSupplementalPatientData(ReportCriteria criteria, ReportC
public void loadInitialPatientData(ReportCriteria criteria, ReportContext context, List<PatientOfInterestModel> patientsOfInterest) {
// first get the patients and store them in the patientMap
Map<String, Patient> patientMap = new ConcurrentHashMap<>();
int threshold = this.tenantService.getConfig().getFhirQuery().getParallelPatients();
ForkJoinPool patientDataFork = new ForkJoinPool(threshold);
ForkJoinPool patientFork = new ForkJoinPool(threshold);
ForkJoinPool patientDataFork = ForkJoinPool.commonPool();
ForkJoinPool patientFork = ForkJoinPool.commonPool();
AtomicInteger progress = new AtomicInteger(0);

try {
patientFork.submit(() -> patientsOfInterest.parallelStream().map(poi -> {
@@ -117,7 +118,6 @@ public void loadInitialPatientData(ReportCriteria criteria, ReportContext contex
UUID queryId = UUID.randomUUID();
if (poi.getReference() != null) {
String id = poi.getReference();

if (id.indexOf("/") > 0) {
id = id.substring(id.indexOf("/") + 1);
}
@@ -162,23 +162,21 @@ public void loadInitialPatientData(ReportCriteria criteria, ReportContex
}
} catch (Exception e) {
logger.error("Unable to retrieve patient with identifier " + Helper.sanitizeString(poi.toString()), e);
} finally {
int completed = progress.incrementAndGet();
double percent = Math.round((completed * 100.0) / patientsOfInterest.size());
logger.info("Progress ({}%) for Initial Patient Data {} is {} of {}", String.format("%.2f", percent), context.getMasterIdentifierValue(), completed, patientsOfInterest.size());
}

return null;
}).collect(Collectors.toList())).get();
} catch (Exception e) {
logger.error("Error retrieving Patient resources: {}", e.getMessage(), e);
return;
} finally {
if (patientFork != null) {
patientFork.shutdown();
}
}

try {
// loop through the patient ids to retrieve the patientData using each patient.
List<Patient> patients = new ArrayList<>(patientMap.values());
logger.info(String.format("Throttling patient query load to " + threshold + " at a time"));

patientDataFork.submit(() -> patients.parallelStream().map(patient -> {
logger.debug(String.format("Beginning to load data for patient with logical ID %s", patient.getIdElement().getIdPart()));
@@ -198,19 +196,14 @@ public void loadInitialPatientData(ReportCriteria criteria, ReportContex
}).collect(Collectors.toList())).get();
} catch (Exception e) {
logger.error("Error scooping data for patients {}", e.getMessage(), e);
} finally {
if (patientDataFork != null) {
patientDataFork.shutdown();
}
}
}

public void loadSupplementalPatientData(ReportCriteria criteria, ReportContext context, List<PatientOfInterestModel> patientsOfInterest) {
int threshold = this.tenantService.getConfig().getFhirQuery().getParallelPatients();
ForkJoinPool patientDataFork = new ForkJoinPool(threshold);
try {
logger.info(String.format("Throttling patient query load to " + threshold + " at a time"));
ForkJoinPool patientDataFork = ForkJoinPool.commonPool();
AtomicInteger progress = new AtomicInteger(0);

try {
patientDataFork.submit(() -> patientsOfInterest.parallelStream().map(poi -> {
logger.debug(String.format("Continuing to load data for patient with logical ID %s", poi.getId()));

@@ -222,6 +215,10 @@
} catch (Exception ex) {
logger.error("Error loading patient data for patient {}: {}", poi.getId(), ex.getMessage(), ex);
return null;
} finally {
int completed = progress.incrementAndGet();
double percent = Math.round((completed * 100.0) / patientsOfInterest.size());
logger.info("Progress ({}%) for Supplemental Patient Data {} is {} of {}", String.format("%.2f", percent), context.getMasterIdentifierValue(), completed, patientsOfInterest.size());
}

this.storePatientData(criteria, context, poi.getId(), patientData.getBundle());
@@ -230,10 +227,6 @@ public void loadSupplementalPatientData(ReportCriteria criteria, ReportContext c
}).collect(Collectors.toList())).get();
} catch (Exception e) {
logger.error("Error scooping data for patients {}", e.getMessage(), e);
} finally {
if (patientDataFork != null) {
patientDataFork.shutdown();
}
}
}

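Alongside the pool change, loadInitialPatientData and loadSupplementalPatientData gain an AtomicInteger counter that is incremented in a finally block for each patient and logged as a completion percentage (ReportGenerator.generate() declares a similar counter, though its use falls outside the visible hunks). A condensed sketch of that pattern with hypothetical names, mirroring the finally blocks added above:

    import java.util.List;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.stream.Collectors;

    class ProgressLoggingSketch {
        static void loadAll(List<String> patientIds) throws Exception {
            ForkJoinPool pool = ForkJoinPool.commonPool();
            AtomicInteger progress = new AtomicInteger(0);
            pool.submit(() -> patientIds.parallelStream().map(id -> {
                try {
                    return load(id);                 // per-patient query
                } catch (Exception e) {
                    System.err.println("Failed to load " + id + ": " + e.getMessage());
                    return null;                     // one failure does not abort the other patients
                } finally {
                    int completed = progress.incrementAndGet();
                    double percent = (completed * 100.0) / patientIds.size();
                    System.out.printf("Progress (%.2f%%): %d of %d%n", percent, completed, patientIds.size());
                }
            }).collect(Collectors.toList())).get();
        }

        static String load(String id) { return id; } // stand-in for the real FHIR query
    }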

