Skip to content

Commit

Permalink
[BEAM-9779] Patch HL7v2IOWriteIT Flakiness (apache#11450)
Browse files Browse the repository at this point in the history
* Patches for HL7v2IO

* Use TestPipeline in ITs
* Drop schematized data before calling message ingest (should be output only) to help pipelines that read/write from/to two HL7v2 stores
* Make HL7v2MessageCoder constructor public

* block on run

* add sleep to avoid flakiness due to asynchronous HL7v2 indexing

* E2E integration test

* fix merge issue

* reconcile double sleeping

* improve error handling

* improve error handling

* fix docs typo

* add latency distribution metrics

* remove unused imports

* ingest only data and labels

* fix comment

* call spliterator directly, use page size 1000

* output elements more eagerly in ListHL72MessageFn

* eagerly emit data from early pages

* Optimization of Listing and Stabilization of ITs

* allow HL7v2 Message listing to emit early panes rather than waiting on pagination of all list results
* add EBO on HL7v2 Message listing reaching a certain expected length in ITs to account for async indexing BEAM-9779

* revert unrelated changes

* add back test

* Add constant for HL7v2 indexing timeout minutes

* Add constant for HL7v2 indexing timeout minutes

* fix checkstyle
  • Loading branch information
Jacob Ferriero authored and yirutang committed Jul 23, 2020
1 parent 1a0b551 commit 96a6614
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 84 deletions.
Expand Up @@ -27,10 +27,8 @@

import java.io.IOException;
import java.security.SecureRandom;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.ListHL7v2MessageIDs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
Expand All @@ -39,6 +37,7 @@
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -54,6 +53,7 @@ public class HL7v2IOReadIT {
+ "_"
+ (new SecureRandom().nextInt(32))
+ "_read_it";
@Rule public transient TestPipeline pipeline = TestPipeline.create();

@BeforeClass
public static void createHL7v2tore() throws IOException {
Expand Down Expand Up @@ -86,36 +86,6 @@ public void tearDown() throws Exception {
deleteAllHL7v2Messages(this.client, healthcareDataset + "/hl7V2Stores/" + HL7V2_STORE_NAME);
}

/**
 * Lists the message IDs of the test HL7v2 store, reads them with {@code HL7v2IO.getAll()}, and
 * asserts that every test message was read, that no read failed, and that each message has a
 * non-empty name, data, and message type.
 */
@Test
public void testHL7v2IORead() throws Exception {
  final String storePath = healthcareDataset + "/hl7V2Stores/" + HL7V2_STORE_NAME;
  final long expectedCount = MESSAGES.size();

  // Should read all messages.
  Pipeline pipeline = Pipeline.create();
  HL7v2IO.Read.Result readResult =
      pipeline
          .apply(new ListHL7v2MessageIDs(Collections.singletonList(storePath)))
          .apply(HL7v2IO.getAll());

  // The coder is set before any assertion is attached to the messages collection.
  PCollection<Long> messageCount =
      readResult.getMessages().setCoder(new HL7v2MessageCoder()).apply(Count.globally());
  PAssert.thatSingleton(messageCount).isEqualTo(expectedCount);
  PAssert.that(readResult.getFailedReads()).empty();

  PAssert.that(readResult.getMessages())
      .satisfies(
          messages -> {
            messages.forEach(
                message -> {
                  assertFalse(message.getName().isEmpty());
                  assertFalse(message.getData().isEmpty());
                  assertFalse(message.getMessageType().isEmpty());
                });
            return null;
          });

  pipeline.run();
}

@Test
public void testHL7v2IO_ListHL7v2Messages() throws Exception {
// Should read all messages.
Expand Down Expand Up @@ -164,33 +134,4 @@ public void testHL7v2IO_ListHL7v2Messages_filtered() throws Exception {

pipeline.run();
}

/**
 * Lists the message IDs of the test HL7v2 store using an ADT message-type filter, reads them with
 * {@code HL7v2IO.getAll()}, and asserts that exactly {@code NUM_ADT} messages were read, that no
 * read failed, and that every message read has message type "ADT".
 */
@Test
public void testHL7v2IORead_filtered() throws Exception {
  final String adtFilter = "messageType = \"ADT\"";
  final String storePath = healthcareDataset + "/hl7V2Stores/" + HL7V2_STORE_NAME;

  // Should read only messages matching the filter.
  Pipeline pipeline = Pipeline.create();
  HL7v2IO.Read.Result readResult =
      pipeline
          .apply(new ListHL7v2MessageIDs(Collections.singletonList(storePath), adtFilter))
          .apply(HL7v2IO.getAll());

  // The coder is set before any assertion is attached to the messages collection.
  PCollection<Long> messageCount =
      readResult.getMessages().setCoder(new HL7v2MessageCoder()).apply(Count.globally());
  PAssert.thatSingleton(messageCount).isEqualTo(NUM_ADT);
  PAssert.that(readResult.getFailedReads()).empty();

  PAssert.that(readResult.getMessages())
      .satisfies(
          messages -> {
            messages.forEach(message -> assertEquals("ADT", message.getMessageType()));
            return null;
          });
  pipeline.run();
}
}
Expand Up @@ -18,19 +18,25 @@
package org.apache.beam.sdk.io.gcp.healthcare;

import com.google.api.client.util.Base64;
import com.google.api.client.util.Sleeper;
import com.google.api.services.healthcare.v1beta1.model.Message;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.HL7v2MessagePages;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;

class HL7v2IOTestUtil {
public static final long HL7V2_INDEXING_TIMEOUT_MINUTES = 10L;
/** Google Cloud Healthcare Dataset in Apache Beam integration test project. */
public static final String HEALTHCARE_DATASET_TEMPLATE =
"projects/%s/locations/us-central1/datasets/apache-beam-integration-testing";
Expand Down Expand Up @@ -81,20 +87,58 @@ class HL7v2IOTestUtil {
/**
 * Clear all messages from the HL7v2 store.
 *
 * <p>Every listed message is removed by passing its name to {@code client.deleteHL7v2Message}.
 *
 * @param client Healthcare API client used to list and delete the messages
 * @param hl7v2Store the HL7v2 store whose messages are deleted
 * @throws IOException declared by the signature; presumably raised by the delete call — confirm
 *     against HealthcareApiClient
 */
static void deleteAllHL7v2Messages(HealthcareApiClient client, String hl7v2Store)
throws IOException {
// NOTE(review): this span looks like a rendered diff holding two variants of the same loop. The
// stream-based variant directly below appears to be the one replaced by the page-based variant
// that follows it — confirm against the actual file.
for (String msgId :
client
.getHL7v2MessageStream(hl7v2Store)
.map(HL7v2Message::getName)
.collect(Collectors.toList())) {
client.deleteHL7v2Message(msgId);
// Page-based variant: iterate the store one page of listed messages at a time and delete every
// message on the page by name.
for (List<HL7v2Message> page : new HL7v2MessagePages(client, hl7v2Store)) {
for (String msgId : page.stream().map(HL7v2Message::getName).collect(Collectors.toList())) {
client.deleteHL7v2Message(msgId);
}
}
}

/**
 * Utility for waiting on HL7v2 Store indexing to be complete; see BEAM-9779.
 *
 * <p>Repeatedly lists the store and counts the messages returned across all pages, sleeping with
 * exponential backoff between attempts, until the count equals {@code expectedNumMessages}.
 *
 * <p>This method only returns normally once the expected count has been observed. Every other
 * way out of the polling loop (including a non-positive {@code timeout} or a sleep that overshoots
 * the deadline) raises {@link TimeoutException}, so a caller can never mistake a timeout for
 * successful indexing.
 *
 * @param client client used to list the messages in the store
 * @param hl7v2Store the HL7v2 store to poll
 * @param expectedNumMessages number of listed messages that signals indexing is complete
 * @param timeout maximum time to spend waiting
 * @throws InterruptedException if the sleep between attempts is interrupted
 * @throws TimeoutException if the expected count is not observed within {@code timeout}
 */
public static void waitForHL7v2Indexing(
    HealthcareApiClient client, String hl7v2Store, long expectedNumMessages, Duration timeout)
    throws InterruptedException, TimeoutException {

  Instant start = Instant.now();
  long sleepMs = 50;
  long numListedMessages = 0;
  while (new Duration(start, Instant.now()).isShorterThan(timeout)) {
    numListedMessages = 0;
    // count messages in HL7v2 Store.
    for (List<HL7v2Message> page :
        new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store)) {
      numListedMessages += page.size();
    }
    if (numListedMessages == expectedNumMessages) {
      return;
    }
    // exponential backoff.
    sleepMs *= 2;
    // stop polling if the next sleep would violate the timeout.
    if (!new Duration(start, Instant.now()).plus(sleepMs).isShorterThan(timeout)) {
      break;
    }
    Sleeper.DEFAULT.sleep(sleepMs);
  }
  // Reached whenever polling stopped without seeing the expected count: either the backoff check
  // above broke out of the loop, or the loop condition itself expired (e.g. the sleep overshot
  // the deadline, or the timeout was not positive). Previously the latter case returned silently.
  throw new TimeoutException(
      String.format(
          "Timed out waiting for %s to reach %s messages. last list request returned %s messages.",
          hl7v2Store, expectedNumMessages, numListedMessages));
}

/**
 * Populate the test messages into the HL7v2 store.
 *
 * <p>Creates every message in {@code MESSAGES} in the store, then calls
 * {@code waitForHL7v2Indexing} with an expected count of {@code MESSAGES.size()} and a timeout of
 * {@code HL7V2_INDEXING_TIMEOUT_MINUTES} minutes.
 *
 * @param client Healthcare API client used to create the messages
 * @param hl7v2Store the HL7v2 store to write to
 * @throws IOException declared by the signature; presumably raised by the create call — confirm
 * @throws InterruptedException propagated from {@code waitForHL7v2Indexing}
 * @throws TimeoutException propagated from {@code waitForHL7v2Indexing}
 */
// NOTE(review): two signatures follow; this looks like a rendered diff where the first line is
// the earlier signature and the next two lines are its replacement — confirm.
static void writeHL7v2Messages(HealthcareApiClient client, String hl7v2Store) throws IOException {
static void writeHL7v2Messages(HealthcareApiClient client, String hl7v2Store)
throws IOException, InterruptedException, TimeoutException {
for (HL7v2Message msg : MESSAGES) {
client.createHL7v2Message(hl7v2Store, msg.toModel());
}
// [BEAM-9779] HL7v2 indexing is asynchronous. Block until indexing completes to stabilize this
// IT.
HL7v2IOTestUtil.waitForHL7v2Indexing(
client,
hl7v2Store,
MESSAGES.size(),
Duration.standardMinutes(HL7V2_INDEXING_TIMEOUT_MINUTES));
}

/**
Expand Down Expand Up @@ -170,10 +214,11 @@ public void initClient() throws IOException {
public void listMessages(ProcessContext context) throws IOException {
// The input element is used as the HL7v2 store to list.
String hl7v2Store = context.element();
// Output all elements of all pages.
// NOTE(review): two listing variants follow; this looks like a rendered diff where the
// stream-based statement is the earlier version and the page-based loop is its replacement —
// confirm.
this.client
.getHL7v2MessageStream(hl7v2Store, this.filter)
.map(HL7v2Message::getName)
.forEach(context::output);
// Page-based variant: this.filter is passed to the listing, and the name of each message is
// output page by page as the pages are iterated.
HttpHealthcareApiClient.HL7v2MessagePages pages =
new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter);
for (List<HL7v2Message> page : pages) {
page.stream().map(HL7v2Message::getName).forEach(context::output);
}
}
}

Expand Down
Expand Up @@ -18,22 +18,25 @@
package org.apache.beam.sdk.io.gcp.healthcare;

import static org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.HEALTHCARE_DATASET_TEMPLATE;
import static org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.HL7V2_INDEXING_TIMEOUT_MINUTES;
import static org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.MESSAGES;
import static org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.deleteAllHL7v2Messages;
import static org.junit.Assert.assertEquals;

import com.google.api.services.healthcare.v1beta1.model.Hl7V2Store;
import java.io.IOException;
import java.security.SecureRandom;
import org.apache.beam.sdk.Pipeline;
import java.util.concurrent.TimeoutException;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -46,12 +49,15 @@ public class HL7v2IOWriteIT {
private static final String HL7V2_STORE_NAME =
"hl7v2_store_write_it_" + System.currentTimeMillis() + "_" + (new SecureRandom().nextInt(32));

@Rule public transient TestPipeline pipeline = TestPipeline.create();

/**
 * Creates the HL7v2 store named {@code HL7V2_STORE_NAME} used by this integration test.
 *
 * <p>The dataset path is built by formatting {@code HEALTHCARE_DATASET_TEMPLATE} with the project
 * taken from the testing pipeline options, and is stored in {@code healthcareDataset}.
 *
 * @throws IOException declared by the signature; presumably raised by the create call — confirm
 */
@BeforeClass
public static void createHL7v2tore() throws IOException {
String project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
healthcareDataset = String.format(HEALTHCARE_DATASET_TEMPLATE, project);
HealthcareApiClient client = new HttpHealthcareApiClient();
// NOTE(review): the next two statements look like before/after variants of the same create call
// in a rendered diff — confirm that only one of them exists in the actual file.
client.createHL7v2Store(healthcareDataset, HL7V2_STORE_NAME);
Hl7V2Store store = client.createHL7v2Store(healthcareDataset, HL7V2_STORE_NAME);
// NOTE(review): the value returned by getParserConfig() is discarded; its purpose is not clear
// from here — confirm whether this call is needed.
store.getParserConfig();
}

@AfterClass
Expand All @@ -65,7 +71,6 @@ public void setup() throws Exception {
if (client == null) {
client = new HttpHealthcareApiClient();
}
PipelineOptions options = TestPipeline.testingPipelineOptions().as(GcpOptions.class);
}

@After
Expand All @@ -74,8 +79,7 @@ public void tearDown() throws Exception {
}

@Test
public void testHL7v2IOWrite() throws IOException {
Pipeline pipeline = Pipeline.create();
public void testHL7v2IOWrite() throws Exception {
HL7v2IO.Write.Result result =
pipeline
.apply(Create.of(MESSAGES).withCoder(new HL7v2MessageCoder()))
Expand All @@ -84,10 +88,15 @@ public void testHL7v2IOWrite() throws IOException {
PAssert.that(result.getFailedInsertsWithErr()).empty();

pipeline.run().waitUntilFinish();
long numWrittenMessages =
client
.getHL7v2MessageStream(healthcareDataset + "/hl7V2Stores/" + HL7V2_STORE_NAME)
.count();
assertEquals(MESSAGES.size(), numWrittenMessages);

try {
HL7v2IOTestUtil.waitForHL7v2Indexing(
client,
healthcareDataset + "/hl7V2Stores/" + HL7V2_STORE_NAME,
MESSAGES.size(),
Duration.standardMinutes(HL7V2_INDEXING_TIMEOUT_MINUTES));
} catch (TimeoutException e) {
Assert.fail(e.getMessage());
}
}
}

0 comments on commit 96a6614

Please sign in to comment.