diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index d4274855..3d7f3993 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -12,7 +12,7 @@ jobs: strategy: matrix: jdk: [3-openjdk-16-slim, 3-jdk-14, 3-jdk-8-slim] - influxdb: [1.1, 1.6, 1.8, 2.0] + influxdb: ['1.1', '1.6', '1.8', '2.0'] steps: - name: Checkout @@ -23,6 +23,9 @@ jobs: run: '["${{ secrets.DOCKER_REGISTRY_TOKEN }}" == ""] && echo "::set-output name=is_fork_pr::true" || echo "::set-output name=is_fork_pr::false"' - name: Build project + env: + MAVEN_JAVA_VERSION: "${{ matrix.jdk }}" + INFLUXDB_VERSION: "${{ matrix.influxdb }}" run: ./compile-and-test.sh - name: codecov diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index 4430072a..28c45b69 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -46,6 +47,7 @@ public final class BatchProcessor { private final BatchWriter batchWriter; private boolean dropActionsOnQueueExhaustion; Consumer droppedActionHandler; + Supplier randomSupplier; /** * The Builder to create a BatchProcessor instance. @@ -318,20 +320,21 @@ public static Builder builder(final InfluxDB influxDB) { } else { this.queue = new LinkedBlockingQueue<>(); } + this.randomSupplier = Math::random; Runnable flushRunnable = new Runnable() { @Override public void run() { // write doesn't throw any exceptions write(); - int jitterInterval = (int) (Math.random() * BatchProcessor.this.jitterInterval); + int jitterInterval = (int) (randomSupplier.get() * BatchProcessor.this.jitterInterval); BatchProcessor.this.scheduler.schedule(this, BatchProcessor.this.flushInterval + jitterInterval, BatchProcessor.this.flushIntervalUnit); } }; // Flush at specified Rate this.scheduler.schedule(flushRunnable, - this.flushInterval + (int) (Math.random() * BatchProcessor.this.jitterInterval), + this.flushInterval + (int) (randomSupplier.get() * BatchProcessor.this.jitterInterval), this.flushIntervalUnit); } diff --git a/src/test/java/org/influxdb/BatchOptionsTest.java b/src/test/java/org/influxdb/BatchOptionsTest.java index 351321b6..7378746c 100644 --- a/src/test/java/org/influxdb/BatchOptionsTest.java +++ b/src/test/java/org/influxdb/BatchOptionsTest.java @@ -7,13 +7,13 @@ import org.influxdb.dto.Point; import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; -import org.jetbrains.annotations.NotNull; +import org.influxdb.impl.BatchProcessor; +import org.influxdb.impl.BatchProcessorTest; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.platform.runner.JUnitPlatform; import org.junit.runner.RunWith; -import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -21,12 +21,15 @@ import java.io.IOException; import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Supplier; @RunWith(JUnitPlatform.class) @@ -234,15 +237,20 @@ public void testFlushDuration() throws InterruptedException { * @throws InterruptedException */ @Test - public void testJitterDuration() throws InterruptedException { + public void testJitterDuration() throws Exception { String dbName = "write_unittest_" + System.currentTimeMillis(); try { - BatchOptions options = BatchOptions.DEFAULTS.flushDuration(100).jitterDuration(500); + // prepare points before start BatchProcessor + List points = prepareSomePoints(0, 19); + BatchOptions options = BatchOptions.DEFAULTS.flushDuration(100).jitterDuration(1000); influxDB.query(new Query("CREATE DATABASE " + dbName)); influxDB.setDatabase(dbName); influxDB.enableBatch(options); - write20Points(influxDB); + BatchProcessor batchProcessor = BatchProcessorTest.getPrivateField(influxDB, "batchProcessor"); + // random always return 1.0 to be sure that first query is null + BatchProcessorTest.setPrivateField(batchProcessor, "randomSupplier", (Supplier) () -> 1.0); + points.forEach(influxDB::write); Thread.sleep(100); @@ -251,7 +259,7 @@ public void testJitterDuration() throws InterruptedException { Assertions.assertNull(result.getResults().get(0).getError()); //wait for at least one flush - Thread.sleep(1000); + Thread.sleep(1500); result = influxDB.query(new Query("select * from weather", dbName)); Assertions.assertEquals(20, result.getResults().get(0).getSeries().get(0).getValues().size()); } @@ -596,6 +604,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable { verify(spy, times(2)).write(any(BatchPoints.class)); + Thread.sleep(1_500); QueryResult result = influxDB.query(new Query("select * from m0", dbName)); Assertions.assertNotNull(result.getResults().get(0).getSeries()); Assertions.assertEquals(200, result.getResults().get(0).getSeries().get(0).getValues().size()); @@ -668,14 +677,7 @@ void writeSomePoints(InfluxDB influxDB, String measurement, int firstIndex, int } void writeSomePoints(InfluxDB influxDB, int firstIndex, int lastIndex) { - for (int i = firstIndex; i <= lastIndex; i++) { - Point point = Point.measurement("weather") - .time(i,TimeUnit.HOURS) - .addField("temperature", (double) i) - .addField("humidity", (double) (i) * 1.1) - .addField("uv_index", "moderate").build(); - influxDB.write(point); - } + prepareSomePoints(firstIndex, lastIndex).forEach(influxDB::write); } void write20Points(InfluxDB influxDB) { @@ -686,6 +688,19 @@ void writeSomePoints(InfluxDB influxDB, int n) { writeSomePoints(influxDB, 0, n - 1); } + List prepareSomePoints(int firstIndex, int lastIndex) { + List points = new ArrayList<>(); + for (int i = firstIndex; i <= lastIndex; i++) { + Point point = Point.measurement("weather") + .time(i, TimeUnit.HOURS) + .addField("temperature", (double) i) + .addField("humidity", (double) (i) * 1.1) + .addField("uv_index", "moderate").build(); + points.add(point); + } + return points; + } + private BatchPoints createBatchPoints(String dbName, String measurement, int n) { BatchPoints batchPoints = BatchPoints.database(dbName).build(); for (int i = 1; i <= n; i++) { diff --git a/src/test/java/org/influxdb/InfluxDBTest.java b/src/test/java/org/influxdb/InfluxDBTest.java index 60dd6665..38b5fcfa 100644 --- a/src/test/java/org/influxdb/InfluxDBTest.java +++ b/src/test/java/org/influxdb/InfluxDBTest.java @@ -1330,7 +1330,7 @@ public void testChunkingQueryPost() throws InterruptedException { batchPoints.point(point3); this.influxDB.write(batchPoints); - CountDownLatch countDownLatch = new CountDownLatch(3); + CountDownLatch countDownLatch = new CountDownLatch(2); Thread.sleep(2000); Query query = new Query("SELECT * FROM disk", dbName, true); diff --git a/src/test/java/org/influxdb/impl/BatchProcessorTest.java b/src/test/java/org/influxdb/impl/BatchProcessorTest.java index 213f6d31..df28da32 100644 --- a/src/test/java/org/influxdb/impl/BatchProcessorTest.java +++ b/src/test/java/org/influxdb/impl/BatchProcessorTest.java @@ -226,14 +226,27 @@ public void precision() throws Exception { } } + @Test + @SuppressWarnings("unchecked") + public void randomSupplier() { + InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); + BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE) + .interval(1, TimeUnit.NANOSECONDS).build(); + + Double random = batchProcessor.randomSupplier.get(); + assertTrue(random >= 0); + assertTrue(random < 1); + Assertions.assertNotEquals(random, batchProcessor.randomSupplier.get()); + } + @SuppressWarnings("unchecked") - static T getPrivateField(final Object obj, final String name) throws Exception { + public static T getPrivateField(final Object obj, final String name) throws Exception { Field field = obj.getClass().getDeclaredField(name); field.setAccessible(true); return (T) field.get(obj); } - static void setPrivateField(final Object obj, final String name, final Object value) throws Exception { + public static void setPrivateField(final Object obj, final String name, final Object value) throws Exception { Field field = obj.getClass().getDeclaredField(name); field.setAccessible(true); field.set(obj, value);