Skip to content

Commit

Permalink
#35: Move configuration to InfluxDBOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar committed Jul 1, 2019
1 parent 7f1cf5e commit 43ac89b
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public WriteReactiveApi getWriteReactiveApi(@Nonnull final WriteOptions writeOpt

Arguments.checkNotNull(writeOptions, "WriteOptions");

return new WriteReactiveApiImpl(writeOptions, retrofit.create(WriteService.class));
return new WriteReactiveApiImpl(writeOptions, retrofit.create(WriteService.class), options);
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import javax.annotation.Nonnull;

import org.influxdata.Arguments;
import org.influxdata.client.InfluxDBClientOptions;
import org.influxdata.client.WriteOptions;
import org.influxdata.client.domain.WritePrecision;
import org.influxdata.client.internal.AbstractWriteClient;
Expand All @@ -44,9 +45,10 @@
public class WriteReactiveApiImpl extends AbstractWriteClient implements WriteReactiveApi {

WriteReactiveApiImpl(@Nonnull final WriteOptions writeOptions,
@Nonnull final WriteService service) {
@Nonnull final WriteService service,
@Nonnull final InfluxDBClientOptions options) {

super(writeOptions, writeOptions.getWriteScheduler(), service);
super(writeOptions, options, writeOptions.getWriteScheduler(), service);
}

@Override
Expand Down
8 changes: 4 additions & 4 deletions client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -537,12 +537,12 @@ influx2.tags.sensor-version = ${version}
##### Via API

```java
WriteOptions writeOptions = WriteOptions.builder()
.batchSize(10_000)
.flushInterval(500)
InfluxDBClientOptions options = InfluxDBClientOptions.builder()
.url(url)
.authenticateToken(token)
.addDefaultTag("id", "132-987-655")
.addDefaultTag("customer", "California Miner")
.addDefaultTag("hostname", "${env.hostname}")
.addDefaultTag("hostnamer", "${env.hostname}")
.addDefaultTag("sensor-version", "${version}")
.build();
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.influxdata.Arguments;
import org.influxdata.LogLevel;
import org.influxdata.client.write.PointSettings;
import org.influxdata.exceptions.InfluxException;

import okhttp3.HttpUrl;
Expand All @@ -57,6 +58,7 @@ public final class InfluxDBClientOptions {

private String org;
private String bucket;
private final PointSettings pointSettings;

private InfluxDBClientOptions(@Nonnull final InfluxDBClientOptions.Builder builder) {

Expand All @@ -72,6 +74,7 @@ private InfluxDBClientOptions(@Nonnull final InfluxDBClientOptions.Builder build

this.org = builder.org;
this.bucket = builder.bucket;
this.pointSettings = builder.pointSettings;
}

/**
Expand Down Expand Up @@ -172,6 +175,17 @@ public String getBucket() {
return bucket;
}

/**
* Default tags that will be use for writes by Point and POJO.
*
* @return default tags
* @see InfluxDBClientOptions.Builder#addDefaultTag(String, String)
*/
@Nonnull
public PointSettings getPointSettings() {
return pointSettings;
}

/**
* Creates a builder instance.
*
Expand Down Expand Up @@ -201,6 +215,8 @@ public static class Builder {
private String org;
private String bucket;

private PointSettings pointSettings = new PointSettings();

/**
* Set the url to connect to InfluxDB.
*
Expand Down Expand Up @@ -314,6 +330,31 @@ public InfluxDBClientOptions.Builder bucket(@Nullable final String bucket) {
return this;
}

/**
* Add default tag that will be use for writes by Point and POJO.
* <p>
* The expressions can be:
* <ul>
* <li>"California Miner" - static value</li>
* <li>"${version}" - system property</li>
* <li>"${env.hostname}" - environment property</li>
* </ul>
*
* @param key the tag name
* @param expression the tag value expression
* @return this
*/
@Nonnull
public InfluxDBClientOptions.Builder addDefaultTag(@Nonnull final String key,
@Nullable final String expression) {

Arguments.checkNotNull(key, "tagName");

pointSettings.addDefaultTag(key, expression);

return this;
}

/**
* Configure Builder via connection string.
*
Expand Down
41 changes: 0 additions & 41 deletions client/src/main/java/org/influxdata/client/WriteOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@
package org.influxdata.client;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import javax.annotation.concurrent.ThreadSafe;

import org.influxdata.Arguments;
import org.influxdata.client.write.PointSettings;

import io.reactivex.BackpressureOverflowStrategy;
import io.reactivex.Scheduler;
Expand Down Expand Up @@ -72,7 +70,6 @@ public final class WriteOptions {
private final int bufferLimit;
private final Scheduler writeScheduler;
private final BackpressureOverflowStrategy backpressureStrategy;
private final PointSettings pointSettings;

/**
* @return the number of data point to collect in batch
Expand Down Expand Up @@ -137,17 +134,6 @@ public BackpressureOverflowStrategy getBackpressureStrategy() {
return backpressureStrategy;
}

/**
* Default tags that will be use for writes by Point and POJO.
*
* @return default tags
* @see WriteOptions.Builder#addDefaultTag(String, String)
*/
@Nonnull
public PointSettings getPointSettings() {
return pointSettings;
}

private WriteOptions(@Nonnull final Builder builder) {

Arguments.checkNotNull(builder, "WriteOptions.Builder");
Expand All @@ -159,7 +145,6 @@ private WriteOptions(@Nonnull final Builder builder) {
bufferLimit = builder.bufferLimit;
writeScheduler = builder.writeScheduler;
backpressureStrategy = builder.backpressureStrategy;
pointSettings = builder.pointSettings;
}

/**
Expand All @@ -185,7 +170,6 @@ public static class Builder {
private int bufferLimit = DEFAULT_BUFFER_LIMIT;
private Scheduler writeScheduler = Schedulers.newThread();
private BackpressureOverflowStrategy backpressureStrategy = BackpressureOverflowStrategy.DROP_OLDEST;
private PointSettings pointSettings = new PointSettings();

/**
* Set the number of data point to collect in batch.
Expand Down Expand Up @@ -290,31 +274,6 @@ public Builder backpressureStrategy(@Nonnull final BackpressureOverflowStrategy
return this;
}

/**
* Add default tag that will be use for writes by Point and POJO.
* <p>
* The expressions can be:
* <ul>
* <li>"California Miner" - static value</li>
* <li>"${version}" - system property</li>
* <li>"${env.hostname}" - environment property</li>
* </ul>
*
* @param key the tag name
* @param expression the tag value expression
* @return this
*/
@Nonnull
public Builder addDefaultTag(@Nonnull final String key,
@Nullable final String expression) {

Arguments.checkNotNull(key, "tagName");

pointSettings.addDefaultTag(key, expression);

return this;
}

/**
* Build an instance of WriteOptions.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import javax.annotation.Nullable;

import org.influxdata.Arguments;
import org.influxdata.client.InfluxDBClientOptions;
import org.influxdata.client.WriteOptions;
import org.influxdata.client.domain.WritePrecision;
import org.influxdata.client.service.WriteService;
Expand Down Expand Up @@ -65,6 +66,7 @@ public abstract class AbstractWriteClient extends AbstractRestClient {
private static final List<Integer> ABLE_TO_RETRY_ERRORS = Arrays.asList(429, 503);

private final WriteOptions writeOptions;
protected final InfluxDBClientOptions options;

private final PublishProcessor<AbstractWriteClient.BatchWriteItem> processor;
private final PublishProcessor<Flowable<BatchWriteItem>> flushPublisher;
Expand All @@ -74,10 +76,14 @@ public abstract class AbstractWriteClient extends AbstractRestClient {
private final WriteService service;

public AbstractWriteClient(@Nonnull final WriteOptions writeOptions,
@Nonnull final InfluxDBClientOptions options,
@Nonnull final Scheduler processorScheduler,
@Nonnull final WriteService service) {

Arguments.checkNotNull(options, "options");

this.writeOptions = writeOptions;
this.options = options;
this.service = service;

this.flushPublisher = PublishProcessor.create();
Expand Down Expand Up @@ -281,7 +287,7 @@ public BatchWriteDataPoint(@Nonnull final Point point) {
@Override
public String toLineProtocol() {

return point.toLineProtocol(writeOptions.getPointSettings());
return point.toLineProtocol(options.getPointSettings());
}
}

Expand All @@ -304,7 +310,7 @@ public String toLineProtocol() {
return null;
}

return measurementMapper.toPoint(measurement, precision).toLineProtocol(writeOptions.getPointSettings());
return measurementMapper.toPoint(measurement, precision).toLineProtocol(options.getPointSettings());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,11 @@
*/
final class WriteApiImpl extends AbstractWriteClient implements WriteApi {

private final InfluxDBClientOptions options;

WriteApiImpl(@Nonnull final WriteOptions writeOptions,
@Nonnull final WriteService service,
@Nonnull final InfluxDBClientOptions options) {

super(writeOptions, writeOptions.getWriteScheduler(), service);

Arguments.checkNotNull(options, "options");

this.options = options;
super(writeOptions, options, writeOptions.getWriteScheduler(), service);
}

@Override
Expand Down
28 changes: 21 additions & 7 deletions client/src/test/java/org/influxdata/client/ITWriteQueryApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class ITWriteQueryApi extends AbstractITClientTest {

private Bucket bucket;
private Organization organization;
private String token;

@BeforeEach
void setUp() throws Exception {
Expand Down Expand Up @@ -90,7 +91,7 @@ void setUp() throws Exception {
Authorization authorization = influxDBClient.getAuthorizationsApi()
.createAuthorization(organization, Arrays.asList(readBucket, writeBucket));

String token = authorization.getToken();
token = authorization.getToken();

influxDBClient.close();
influxDBClient = InfluxDBClientFactory.create(influxDB_URL, token.toCharArray());
Expand Down Expand Up @@ -528,19 +529,24 @@ void recovery() {
}

@Test
void defaultTagsPoint() {
void defaultTagsPoint() throws Exception {

influxDBClient.close();

System.setProperty("mine-sensor.version", "1.23a");
String envKey = (String) System.getenv().keySet().toArray()[5];

WriteOptions writeOptions = WriteOptions.builder()
InfluxDBClientOptions options = InfluxDBClientOptions.builder().url(influxDB_URL)
.authenticateToken(token.toCharArray())
.addDefaultTag("id", "132-987-655")
.addDefaultTag("customer", "California Miner")
.addDefaultTag("env-var", "${env." + envKey + "}")
.addDefaultTag("sensor-version", "${mine-sensor.version}")
.build();

writeApi = influxDBClient.getWriteApi(writeOptions);
influxDBClient = InfluxDBClientFactory.create(options);

writeApi = influxDBClient.getWriteApi();
WriteEventListener<WriteSuccessEvent> listener = new WriteEventListener<>();
writeApi.listenEvents(WriteSuccessEvent.class, listener);

Expand All @@ -551,6 +557,7 @@ void defaultTagsPoint() {
writeApi.writePoint(bucket.getName(), organization.getId(), point);
waitToCallback(listener.countDownLatch, 10);

queryApi = influxDBClient.getQueryApi();
List<FluxTable> query = queryApi.query("from(bucket:\"" + bucket.getName() + "\") |> range(start: 1970-01-01T00:00:00.000000001Z) |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")", organization.getId());

Assertions.assertThat(query).hasSize(1);
Expand All @@ -565,19 +572,25 @@ void defaultTagsPoint() {
}

@Test
void defaultTagsMeasurement() {
void defaultTagsMeasurement() throws Exception {

influxDBClient.close();

System.setProperty("mine-sensor.version", "1.23a");
String envKey = (String) System.getenv().keySet().toArray()[5];

WriteOptions writeOptions = WriteOptions.builder()
InfluxDBClientOptions options = InfluxDBClientOptions.builder()
.url(influxDB_URL)
.authenticateToken(token.toCharArray())
.addDefaultTag("id", "132-987-655")
.addDefaultTag("customer", "California Miner")
.addDefaultTag("env-var", "${env." + envKey + "}")
.addDefaultTag("sensor-version", "${mine-sensor.version}")
.build();

writeApi = influxDBClient.getWriteApi(writeOptions);
influxDBClient = InfluxDBClientFactory.create(options);

writeApi = influxDBClient.getWriteApi();
WriteEventListener<WriteSuccessEvent> listener = new WriteEventListener<>();
writeApi.listenEvents(WriteSuccessEvent.class, listener);

Expand All @@ -588,6 +601,7 @@ void defaultTagsMeasurement() {
writeApi.writeMeasurement(bucket.getName(), organization.getId(), WritePrecision.NS, measurement);
waitToCallback(listener.countDownLatch, 10);

queryApi = influxDBClient.getQueryApi();
List<FluxTable> query = queryApi.query("from(bucket:\"" + bucket.getName() + "\") |> range(start: 1970-01-01T00:00:00.000000001Z) |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")", organization.getId());

Assertions.assertThat(query).hasSize(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void defaultTagsNull() {

Map<String, String> defaultTags = defaults.getDefaultTags();

Assertions.assertThat(defaultTags).isNull();
Assertions.assertThat(defaultTags).isNotNull();
}

@Test
Expand Down

0 comments on commit 43ac89b

Please sign in to comment.