-
Notifications
You must be signed in to change notification settings - Fork 7
/
ResourcePipeline.java
92 lines (78 loc) · 3.31 KB
/
ResourcePipeline.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package org.miracum.etl.fhirgateway.processors;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.time.Duration;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.Observation;
import org.miracum.etl.fhirgateway.stores.FhirResourceRepository;
import org.miracum.etl.fhirgateway.stores.FhirServerResourceRepository;
import org.miracum.etl.fhirgateway.stores.PostgresFhirResourceRepository;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Runs an incoming FHIR {@link Bundle} through the gateway's processing steps: pseudonymization,
 * optional LOINC harmonization of {@link Observation} resources, and persistence to whichever
 * stores are enabled via configuration.
 */
@Component
public class ResourcePipeline {

  /** Measures the wall-clock duration of a complete {@link #process(Bundle)} invocation. */
  private static final Timer PIPELINE_DURATION_TIMER =
      Timer.builder("fhirgateway.pipeline.duration")
          .description("Total resource pipeline processing duration.")
          .minimumExpectedValue(Duration.ofMillis(50))
          .maximumExpectedValue(Duration.ofSeconds(5))
          .publishPercentileHistogram()
          .register(Metrics.globalRegistry);

  private final FhirResourceRepository psqlStore;
  private final FhirResourceRepository fhirStore;
  private final LoincHarmonizer loincHarmonizer;
  private final IPseudonymizer pseudonymizer;
  private final boolean isLoincHarmonizationEnabled;
  private final boolean isFhirServerEnabled;
  private final boolean isPsqlEnabled;

  /**
   * Creates the pipeline from its collaborators and feature toggles.
   *
   * @param psqlStore repository used when {@code services.psql.enabled} is true
   * @param fhirStore repository used when {@code services.fhirServer.enabled} is true
   * @param loincHarmonizer processor applied to each {@link Observation} when harmonization is
   *     enabled
   * @param pseudonymizer processor applied to every bundle before any other step
   * @param isLoincHarmonizationEnabled value of {@code services.loinc.conversions.enabled}
   * @param isFhirServerEnabled value of {@code services.fhirServer.enabled}
   * @param isPsqlEnabled value of {@code services.psql.enabled}
   */
  public ResourcePipeline(
      PostgresFhirResourceRepository psqlStore,
      FhirServerResourceRepository fhirStore,
      LoincHarmonizer loincHarmonizer,
      IPseudonymizer pseudonymizer,
      @Value("${services.loinc.conversions.enabled}") boolean isLoincHarmonizationEnabled,
      @Value("${services.fhirServer.enabled}") boolean isFhirServerEnabled,
      @Value("${services.psql.enabled}") boolean isPsqlEnabled) {
    this.psqlStore = psqlStore;
    this.fhirStore = fhirStore;
    this.loincHarmonizer = loincHarmonizer;
    this.pseudonymizer = pseudonymizer;
    this.isLoincHarmonizationEnabled = isLoincHarmonizationEnabled;
    this.isFhirServerEnabled = isFhirServerEnabled;
    this.isPsqlEnabled = isPsqlEnabled;
  }

  /**
   * Saves the bundle to every enabled store: the FHIR server store first, then the PostgreSQL
   * store.
   *
   * @param bundle the already processed bundle to persist
   */
  private void saveToStores(Bundle bundle) {
    if (isFhirServerEnabled) {
      this.fhirStore.save(bundle);
    }

    if (isPsqlEnabled) {
      this.psqlStore.save(bundle);
    }
  }

  /**
   * Replaces every {@link Observation} entry of the bundle with the result of the LOINC
   * harmonizer. While an observation is being processed its id is exposed in the logging context
   * under {@code resourceId}.
   *
   * @param bundle the bundle whose entries are updated in place
   */
  private void harmonizeObservations(Bundle bundle) {
    for (var entry : bundle.getEntry()) {
      var resource = entry.getResource();
      if (resource instanceof Observation) {
        try (var ignoredResourceId = MDC.putCloseable("resourceId", resource.getId())) {
          var obs = loincHarmonizer.process((Observation) resource);
          entry.setResource(obs);
        }
      }
    }
  }

  /**
   * Pseudonymizes the bundle, optionally harmonizes its observations, stores the result in the
   * enabled stores and returns it. The whole run is recorded by the pipeline duration timer.
   *
   * <p>The bundle id is exposed in the logging context under {@code bundleId} for the duration of
   * the call only; it is removed again when processing finishes, whether normally or with an
   * exception, so it cannot leak into log statements of unrelated work executed later on the same
   * thread.
   *
   * @param bundle the bundle to process
   * @return the pseudonymized (and, if enabled, harmonized) bundle that was handed to the stores
   */
  public Bundle process(Bundle bundle) {
    try (var ignoredBundleId = MDC.putCloseable("bundleId", bundle.getId())) {
      return PIPELINE_DURATION_TIMER.record(
          () -> {
            // pseudonymization should be the first task to ensure all other processors only
            // ever work with de-identified data.
            var pseudonymized = pseudonymizer.process(bundle);

            // this logic may be refactored and cleaned up by creating a genuine pipeline class
            // with optionally added stages. A base for this would be an abstract
            // ResourceProcessor.
            if (this.isLoincHarmonizationEnabled) {
              harmonizeObservations(pseudonymized);
            }

            saveToStores(pseudonymized);

            return pseudonymized;
          });
    }
  }
}