diff --git a/api/src/main/java/com/lantanagroup/link/api/ReportGenerator.java b/api/src/main/java/com/lantanagroup/link/api/ReportGenerator.java index 05915e2f9..7bc375f8b 100644 --- a/api/src/main/java/com/lantanagroup/link/api/ReportGenerator.java +++ b/api/src/main/java/com/lantanagroup/link/api/ReportGenerator.java @@ -61,13 +61,11 @@ public void generate(QueryPhase queryPhase) throws ExecutionException, Interrupt MeasureServiceWrapper measureServiceWrapper = new MeasureServiceWrapper(measureContext.getReportDefBundle(), config.getTerminologyService()); measureServiceWrapper.preCompile(); logger.info("Patient list is : " + measureContext.getPatientsOfInterest(queryPhase).size()); - ForkJoinPool forkJoinPool = config.getMeasureEvaluationThreads() != null - ? new ForkJoinPool(config.getMeasureEvaluationThreads()) - : ForkJoinPool.commonPool(); + ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); AtomicInteger progress = new AtomicInteger(0); List pois = measureContext.getPatientsOfInterest(queryPhase); - try { + forkJoinPool.submit(() -> pois.parallelStream().forEach(patient -> { if (StringUtils.isEmpty(patient.getId())) { logger.error("Patient {} has no ID; cannot generate measure report", patient); @@ -87,11 +85,6 @@ public void generate(QueryPhase queryPhase) throws ExecutionException, Interrupt } })) .get(); - } finally { - if (forkJoinPool != null) { - forkJoinPool.shutdown(); - } - } } private MeasureReport generate(MeasureServiceWrapper measureServiceWrapper, PatientOfInterestModel patient) { diff --git a/core/src/main/java/com/lantanagroup/link/config/api/ApiConfig.java b/core/src/main/java/com/lantanagroup/link/config/api/ApiConfig.java index 27aab7eb2..92b3e535c 100644 --- a/core/src/main/java/com/lantanagroup/link/config/api/ApiConfig.java +++ b/core/src/main/java/com/lantanagroup/link/config/api/ApiConfig.java @@ -148,11 +148,6 @@ public class ApiConfig { @NotNull private String debugPath; - /** - * api.measure-evaluation-threads
The number of threads to use for patient measure report generation. - */ - private Integer measureEvaluationThreads; - /** * api.skip-query
Whether to skip the query phase of report generation; useful if patient data bundles have already been stored. */ diff --git a/core/src/main/java/com/lantanagroup/link/db/model/tenant/FhirQuery.java b/core/src/main/java/com/lantanagroup/link/db/model/tenant/FhirQuery.java index 6c45c8711..bcfebd654 100644 --- a/core/src/main/java/com/lantanagroup/link/db/model/tenant/FhirQuery.java +++ b/core/src/main/java/com/lantanagroup/link/db/model/tenant/FhirQuery.java @@ -19,12 +19,6 @@ public class FhirQuery { */ private String authClass; - /** - * The number of patients to query for in parallel using separate threads. Should not be greater than the number of - * cores available to the installation. - */ - private int parallelPatients = 10; - /** * Configuration used by BasicAuth implementation */ diff --git a/query/src/main/java/com/lantanagroup/link/query/uscore/PatientScoop.java b/query/src/main/java/com/lantanagroup/link/query/uscore/PatientScoop.java index 199cac83d..174bee1b8 100644 --- a/query/src/main/java/com/lantanagroup/link/query/uscore/PatientScoop.java +++ b/query/src/main/java/com/lantanagroup/link/query/uscore/PatientScoop.java @@ -24,6 +24,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static ca.uhn.fhir.rest.api.Constants.HEADER_REQUEST_ID; @@ -104,9 +105,9 @@ private PatientData loadSupplementalPatientData(ReportCriteria criteria, ReportC public void loadInitialPatientData(ReportCriteria criteria, ReportContext context, List patientsOfInterest) { // first get the patients and store them in the patientMap Map patientMap = new ConcurrentHashMap<>(); - int threshold = this.tenantService.getConfig().getFhirQuery().getParallelPatients(); - ForkJoinPool patientDataFork = new ForkJoinPool(threshold); - ForkJoinPool patientFork = new ForkJoinPool(threshold); + ForkJoinPool patientDataFork = ForkJoinPool.commonPool(); + ForkJoinPool patientFork = ForkJoinPool.commonPool(); + AtomicInteger progress = new AtomicInteger(0); try { patientFork.submit(() -> patientsOfInterest.parallelStream().map(poi -> { @@ -117,7 +118,6 @@ public void loadInitialPatientData(ReportCriteria criteria, ReportContext contex UUID queryId = UUID.randomUUID(); if (poi.getReference() != null) { String id = poi.getReference(); - if (id.indexOf("/") > 0) { id = id.substring(id.indexOf("/") + 1); } @@ -162,23 +162,21 @@ public void loadInitialPatientData(ReportCriteria criteria, ReportContext contex } } catch (Exception e) { logger.error("Unable to retrieve patient with identifier " + Helper.sanitizeString(poi.toString()), e); + } finally { + int completed = progress.incrementAndGet(); + double percent = Math.round((completed * 100.0) / patientsOfInterest.size()); + logger.info("Progress ({}%) for Initial Patient Data {} is {} of {}", String.format("%.2f", percent), context.getMasterIdentifierValue(), completed, patientsOfInterest.size()); } - return null; }).collect(Collectors.toList())).get(); } catch (Exception e) { logger.error("Error retrieving Patient resources: {}", e.getMessage(), e); return; - } finally { - if (patientFork != null) { - patientFork.shutdown(); - } } try { // loop through the patient ids to retrieve the patientData using each patient. List patients = new ArrayList<>(patientMap.values()); - logger.info(String.format("Throttling patient query load to " + threshold + " at a time")); patientDataFork.submit(() -> patients.parallelStream().map(patient -> { logger.debug(String.format("Beginning to load data for patient with logical ID %s", patient.getIdElement().getIdPart())); @@ -198,19 +196,14 @@ public void loadInitialPatientData(ReportCriteria criteria, ReportContext contex }).collect(Collectors.toList())).get(); } catch (Exception e) { logger.error("Error scooping data for patients {}", e.getMessage(), e); - } finally { - if (patientDataFork != null) { - patientDataFork.shutdown(); - } } } public void loadSupplementalPatientData(ReportCriteria criteria, ReportContext context, List patientsOfInterest) { - int threshold = this.tenantService.getConfig().getFhirQuery().getParallelPatients(); - ForkJoinPool patientDataFork = new ForkJoinPool(threshold); - try { - logger.info(String.format("Throttling patient query load to " + threshold + " at a time")); + ForkJoinPool patientDataFork = ForkJoinPool.commonPool(); + AtomicInteger progress = new AtomicInteger(0); + try { patientDataFork.submit(() -> patientsOfInterest.parallelStream().map(poi -> { logger.debug(String.format("Continuing to load data for patient with logical ID %s", poi.getId())); @@ -222,6 +215,10 @@ public void loadSupplementalPatientData(ReportCriteria criteria, ReportContext c } catch (Exception ex) { logger.error("Error loading patient data for patient {}: {}", poi.getId(), ex.getMessage(), ex); return null; + } finally { + int completed = progress.incrementAndGet(); + double percent = Math.round((completed * 100.0) / patientsOfInterest.size()); + logger.info("Progress ({}%) for Supplemental Patient Data {} is {} of {}", String.format("%.2f", percent), context.getMasterIdentifierValue(), completed, patientsOfInterest.size()); } this.storePatientData(criteria, context, poi.getId(), patientData.getBundle()); @@ -230,10 +227,6 @@ public void loadSupplementalPatientData(ReportCriteria criteria, ReportContext c }).collect(Collectors.toList())).get(); } catch (Exception e) { logger.error("Error scooping data for patients {}", e.getMessage(), e); - } finally { - if (patientDataFork != null) { - patientDataFork.shutdown(); - } } }