Skip to content

Commit

Permalink
Fix PartitionParserTests
Browse files Browse the repository at this point in the history
Some tests in PartitionParserTests were
failing intermittently due to the usage
of non-synchronized shared state between
concurrent threads.

This commit updates the test code to use
`AtomicInteger` instead of `int` for the
state shared between concurrent threads.

(cherry picked from commit 98fba4a)
  • Loading branch information
fmbenhassine committed Feb 18, 2021
1 parent af3eff3 commit b96ac3c
Showing 1 changed file with 10 additions and 9 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2013 the original author or authors.
* Copyright 2013-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.batch.api.BatchProperty;
Expand Down Expand Up @@ -53,7 +54,7 @@ public class PartitionParserTests extends AbstractJsrTestCase {

@Before
public void before() {
MyBatchlet.processed = 0;
MyBatchlet.processed = new AtomicInteger(0);
MyBatchlet.threadNames = Collections.synchronizedSet(new HashSet<>());
MyBatchlet.artifactNames = Collections.synchronizedSet(new HashSet<>());
PartitionCollector.artifactNames = Collections.synchronizedSet(new HashSet<>());
Expand All @@ -64,7 +65,7 @@ public void testBatchletNoProperties() throws Exception {
BatchStatus curBatchStatus = runJob("partitionParserTestsBatchlet", new Properties(), TIMEOUT).getBatchStatus();

assertEquals(BatchStatus.COMPLETED, curBatchStatus);
assertEquals(10, MyBatchlet.processed);
assertEquals(10, MyBatchlet.processed.get());
assertEquals(10, MyBatchlet.threadNames.size());
}

Expand All @@ -88,7 +89,7 @@ public void testFullPartitionConfiguration() throws Exception {
assertTrue(execution.getExitStatus().endsWith("BPSC_APSC"));
assertEquals(3, countMatches(execution.getExitStatus(), caPattern));
assertEquals(3, countMatches(execution.getExitStatus(), asPattern));
assertEquals(3, MyBatchlet.processed);
assertEquals(3, MyBatchlet.processed.get());
assertEquals(3, MyBatchlet.threadNames.size());
}

Expand All @@ -101,7 +102,7 @@ public void testFullPartitionConfigurationWithProperties() throws Exception {
assertTrue(execution.getExitStatus().endsWith("BPSC_APSC"));
assertEquals(3, countMatches(execution.getExitStatus(), caPattern));
assertEquals(3, countMatches(execution.getExitStatus(), asPattern));
assertEquals(3, MyBatchlet.processed);
assertEquals(3, MyBatchlet.processed.get());
assertEquals(3, MyBatchlet.threadNames.size());
assertEquals(MyBatchlet.artifactNames.iterator().next(), "batchlet");
assertEquals(PartitionMapper.name, "mapper");
Expand All @@ -120,7 +121,7 @@ public void testFullPartitionConfigurationWithMapperSuppliedProperties() throws
assertTrue(execution.getExitStatus().endsWith("BPSC_APSC"));
assertEquals(3, countMatches(execution.getExitStatus(), caPattern));
assertEquals(3, countMatches(execution.getExitStatus(), asPattern));
assertEquals(3, MyBatchlet.processed);
assertEquals(3, MyBatchlet.processed.get());
assertEquals(3, MyBatchlet.threadNames.size());

assertEquals(MyBatchlet.artifactNames.size(), 3);
Expand All @@ -145,7 +146,7 @@ public void testFullPartitionConfigurationWithHardcodedProperties() throws Excep
assertTrue(execution.getExitStatus().endsWith("BPSC_APSC"));
assertEquals(3, countMatches(execution.getExitStatus(), caPattern));
assertEquals(3, countMatches(execution.getExitStatus(), asPattern));
assertEquals(3, MyBatchlet.processed);
assertEquals(3, MyBatchlet.processed.get());
assertEquals(3, MyBatchlet.threadNames.size());

assertEquals(MyBatchlet.artifactNames.size(), 3);
Expand Down Expand Up @@ -292,7 +293,7 @@ public PartitionPlan mapPartitions() throws Exception {

public static class MyBatchlet implements Batchlet {

protected static int processed = 0;
protected static AtomicInteger processed = new AtomicInteger(0);;
protected static Set<String> threadNames = Collections.synchronizedSet(new HashSet<>());
protected static Set<String> artifactNames = Collections.synchronizedSet(new HashSet<>());

Expand All @@ -310,7 +311,7 @@ public static class MyBatchlet implements Batchlet {
public String process() throws Exception {
artifactNames.add(artifactName);
threadNames.add(Thread.currentThread().getName());
processed++;
processed.incrementAndGet();

stepContext.setExitStatus("bad step exit status");
jobContext.setExitStatus("bad job exit status");
Expand Down

0 comments on commit b96ac3c

Please sign in to comment.