Skip to content

Commit

Permalink
fix: continuous integration (#773)
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar committed Sep 15, 2021
1 parent 920aa06 commit e2ed4ea
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 20 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
strategy:
matrix:
jdk: [3-openjdk-16-slim, 3-jdk-14, 3-jdk-8-slim]
influxdb: [1.1, 1.6, 1.8, 2.0]
influxdb: ['1.1', '1.6', '1.8', '2.0']

steps:
- name: Checkout
Expand All @@ -23,6 +23,9 @@ jobs:
run: '["${{ secrets.DOCKER_REGISTRY_TOKEN }}" == ""] && echo "::set-output name=is_fork_pr::true" || echo "::set-output name=is_fork_pr::false"'

- name: Build project
env:
MAVEN_JAVA_VERSION: "${{ matrix.jdk }}"
INFLUXDB_VERSION: "${{ matrix.influxdb }}"
run: ./compile-and-test.sh

- name: codecov
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/org/influxdb/impl/BatchProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -46,6 +47,7 @@ public final class BatchProcessor {
private final BatchWriter batchWriter;
private boolean dropActionsOnQueueExhaustion;
Consumer<Point> droppedActionHandler;
Supplier<Double> randomSupplier;

/**
* The Builder to create a BatchProcessor instance.
Expand Down Expand Up @@ -318,20 +320,21 @@ public static Builder builder(final InfluxDB influxDB) {
} else {
this.queue = new LinkedBlockingQueue<>();
}
this.randomSupplier = Math::random;

Runnable flushRunnable = new Runnable() {
@Override
public void run() {
// write doesn't throw any exceptions
write();
int jitterInterval = (int) (Math.random() * BatchProcessor.this.jitterInterval);
int jitterInterval = (int) (randomSupplier.get() * BatchProcessor.this.jitterInterval);
BatchProcessor.this.scheduler.schedule(this,
BatchProcessor.this.flushInterval + jitterInterval, BatchProcessor.this.flushIntervalUnit);
}
};
// Flush at specified Rate
this.scheduler.schedule(flushRunnable,
this.flushInterval + (int) (Math.random() * BatchProcessor.this.jitterInterval),
this.flushInterval + (int) (randomSupplier.get() * BatchProcessor.this.jitterInterval),
this.flushIntervalUnit);
}

Expand Down
43 changes: 29 additions & 14 deletions src/test/java/org/influxdb/BatchOptionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,29 @@
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.jetbrains.annotations.NotNull;
import org.influxdb.impl.BatchProcessor;
import org.influxdb.impl.BatchProcessorTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import static org.mockito.Mockito.*;

import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;


@RunWith(JUnitPlatform.class)
Expand Down Expand Up @@ -234,15 +237,20 @@ public void testFlushDuration() throws InterruptedException {
* @throws InterruptedException
*/
@Test
public void testJitterDuration() throws InterruptedException {
public void testJitterDuration() throws Exception {

String dbName = "write_unittest_" + System.currentTimeMillis();
try {
BatchOptions options = BatchOptions.DEFAULTS.flushDuration(100).jitterDuration(500);
// prepare points before start BatchProcessor
List<Point> points = prepareSomePoints(0, 19);
BatchOptions options = BatchOptions.DEFAULTS.flushDuration(100).jitterDuration(1000);
influxDB.query(new Query("CREATE DATABASE " + dbName));
influxDB.setDatabase(dbName);
influxDB.enableBatch(options);
write20Points(influxDB);
BatchProcessor batchProcessor = BatchProcessorTest.getPrivateField(influxDB, "batchProcessor");
// random always return 1.0 to be sure that first query is null
BatchProcessorTest.setPrivateField(batchProcessor, "randomSupplier", (Supplier<Double>) () -> 1.0);
points.forEach(influxDB::write);

Thread.sleep(100);

Expand All @@ -251,7 +259,7 @@ public void testJitterDuration() throws InterruptedException {
Assertions.assertNull(result.getResults().get(0).getError());

//wait for at least one flush
Thread.sleep(1000);
Thread.sleep(1500);
result = influxDB.query(new Query("select * from weather", dbName));
Assertions.assertEquals(20, result.getResults().get(0).getSeries().get(0).getValues().size());
}
Expand Down Expand Up @@ -596,6 +604,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {

verify(spy, times(2)).write(any(BatchPoints.class));

Thread.sleep(1_500);
QueryResult result = influxDB.query(new Query("select * from m0", dbName));
Assertions.assertNotNull(result.getResults().get(0).getSeries());
Assertions.assertEquals(200, result.getResults().get(0).getSeries().get(0).getValues().size());
Expand Down Expand Up @@ -668,14 +677,7 @@ void writeSomePoints(InfluxDB influxDB, String measurement, int firstIndex, int
}

void writeSomePoints(InfluxDB influxDB, int firstIndex, int lastIndex) {
for (int i = firstIndex; i <= lastIndex; i++) {
Point point = Point.measurement("weather")
.time(i,TimeUnit.HOURS)
.addField("temperature", (double) i)
.addField("humidity", (double) (i) * 1.1)
.addField("uv_index", "moderate").build();
influxDB.write(point);
}
prepareSomePoints(firstIndex, lastIndex).forEach(influxDB::write);
}

void write20Points(InfluxDB influxDB) {
Expand All @@ -686,6 +688,19 @@ void writeSomePoints(InfluxDB influxDB, int n) {
writeSomePoints(influxDB, 0, n - 1);
}

List<Point> prepareSomePoints(int firstIndex, int lastIndex) {
List<Point> points = new ArrayList<>();
for (int i = firstIndex; i <= lastIndex; i++) {
Point point = Point.measurement("weather")
.time(i, TimeUnit.HOURS)
.addField("temperature", (double) i)
.addField("humidity", (double) (i) * 1.1)
.addField("uv_index", "moderate").build();
points.add(point);
}
return points;
}

private BatchPoints createBatchPoints(String dbName, String measurement, int n) {
BatchPoints batchPoints = BatchPoints.database(dbName).build();
for (int i = 1; i <= n; i++) {
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/org/influxdb/InfluxDBTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1330,7 +1330,7 @@ public void testChunkingQueryPost() throws InterruptedException {
batchPoints.point(point3);
this.influxDB.write(batchPoints);

CountDownLatch countDownLatch = new CountDownLatch(3);
CountDownLatch countDownLatch = new CountDownLatch(2);

Thread.sleep(2000);
Query query = new Query("SELECT * FROM disk", dbName, true);
Expand Down
17 changes: 15 additions & 2 deletions src/test/java/org/influxdb/impl/BatchProcessorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -226,14 +226,27 @@ public void precision() throws Exception {
}
}

@Test
@SuppressWarnings("unchecked")
public void randomSupplier() {
InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);
BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE)
.interval(1, TimeUnit.NANOSECONDS).build();

Double random = batchProcessor.randomSupplier.get();
assertTrue(random >= 0);
assertTrue(random < 1);
Assertions.assertNotEquals(random, batchProcessor.randomSupplier.get());
}

@SuppressWarnings("unchecked")
static <T> T getPrivateField(final Object obj, final String name) throws Exception {
public static <T> T getPrivateField(final Object obj, final String name) throws Exception {
Field field = obj.getClass().getDeclaredField(name);
field.setAccessible(true);
return (T) field.get(obj);
}

static void setPrivateField(final Object obj, final String name, final Object value) throws Exception {
public static void setPrivateField(final Object obj, final String name, final Object value) throws Exception {
Field field = obj.getClass().getDeclaredField(name);
field.setAccessible(true);
field.set(obj, value);
Expand Down

0 comments on commit e2ed4ea

Please sign in to comment.