Skip to content

Commit

Permalink
Add socket timeout config to FluencyBuilderForFluentd
Browse files Browse the repository at this point in the history
  • Loading branch information
komamitsu committed Feb 16, 2019
1 parent 07bec33 commit dd3d47d
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 16 deletions.
29 changes: 21 additions & 8 deletions README.md
Expand Up @@ -66,6 +66,8 @@ dependencies {
// - Max retry of sending events is 8 (by default)
// - Max wait until all buffers are flushed is 10 seconds (by default)
// - Max wait until the flusher is terminated is 10 seconds (by default)
// - Socket connection timeout is 5000 ms (by default)
// - Socket read timeout is 5000 ms (by default)
Fluency fluency = new FluencyBuilderForFluentd().build();
```

Expand Down Expand Up @@ -101,21 +103,32 @@ builder.setFileBackupDir(System.getProperty("java.io.tmpdir"));
Fluency fluency = builder.build();
```

##### Buffer configuration
##### Buffer configuration for high throughput data ingestion with high latency

```java
// Single Fluentd(xxx.xxx.xxx.xxx:24224)
// - Initial chunk buffer size = 4MB
// - Threshold chunk buffer size to flush = 16MB
// - Initial chunk buffer size = 16MB
// - Threshold chunk buffer size to flush = 64MB
// Keep this value (BufferRetentionSize) between `Initial chunk buffer size` and `Max total buffer size`
// - Max total buffer size = 256MB
// - Max total buffer size = 1024MB
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setBufferChunkInitialSize(4 * 1024 * 1024);
builder.setBufferChunkRetentionSize(16 * 1024 * 1024);
builder.setMaxBufferSize(256 * 1024 * 1024L);
builder.setBufferChunkInitialSize(16 * 1024 * 1024);
builder.setBufferChunkRetentionSize(64 * 1024 * 1024);
builder.setMaxBufferSize(1024 * 1024 * 1024L);
Fluency fluency = builder.build("xxx.xxx.xxx.xxx", 24224);
```

##### Socket configuration
```java
// Single Fluentd(localhost:24224)
// - Socket connection timeout is 15000 ms
// - Socket read timeout is 10000 ms
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setConnectionTimeoutMilli(15000);
builder.setReadTimeoutMilli(10000);
Fluency fluency = builder.build();
```

##### Waits on close sequence

```java
Expand Down Expand Up @@ -292,7 +305,7 @@ dependencies {
Fluency fluency = new FluencyBuilderForTreasureData().build(yourApiKey);
```

##### For high throughput data ingestion with high latency
##### Buffer configuration for high throughput data ingestion with high latency

```java
// Initial chunk buffer size = 32MB
Expand Down
Expand Up @@ -42,6 +42,8 @@ public class FluencyBuilderForFluentd
private Integer senderMaxRetryCount;
private boolean ackResponseMode;
private boolean sslEnabled;
private Integer connectionTimeoutMilli;
private Integer readTimeoutMilli;

public Integer getSenderMaxRetryCount()
{
Expand Down Expand Up @@ -73,6 +75,26 @@ public void setSslEnabled(boolean sslEnabled)
this.sslEnabled = sslEnabled;
}

public Integer getConnectionTimeoutMilli()
{
return connectionTimeoutMilli;
}

public void setConnectionTimeoutMilli(Integer connectionTimeoutMilli)
{
this.connectionTimeoutMilli = connectionTimeoutMilli;
}

public Integer getReadTimeoutMilli()
{
return readTimeoutMilli;
}

public void setReadTimeoutMilli(Integer readTimeoutMilli)
{
this.readTimeoutMilli = readTimeoutMilli;
}

public Fluency build(String host, int port)
{
return buildFromIngester(
Expand Down Expand Up @@ -132,6 +154,12 @@ private FluentdSender createBaseSender(String host, Integer port, boolean withHe
SSLHeartbeater heartbeater = new SSLHeartbeater(hbConfig);
failureDetector = new FailureDetector(new PhiAccrualFailureDetectStrategy(), heartbeater);
}
if (connectionTimeoutMilli != null) {
senderConfig.setConnectionTimeoutMilli(connectionTimeoutMilli);
}
if (readTimeoutMilli != null) {
senderConfig.setReadTimeoutMilli(readTimeoutMilli);
}
return new SSLSender(senderConfig, failureDetector);
}
else {
Expand All @@ -150,6 +178,12 @@ private FluentdSender createBaseSender(String host, Integer port, boolean withHe
TCPHeartbeater heartbeater = new TCPHeartbeater(hbConfig);
failureDetector = new FailureDetector(new PhiAccrualFailureDetectStrategy(), heartbeater);
}
if (connectionTimeoutMilli != null) {
senderConfig.setConnectionTimeoutMilli(connectionTimeoutMilli);
}
if (readTimeoutMilli != null) {
senderConfig.setReadTimeoutMilli(readTimeoutMilli);
}
return new TCPSender(senderConfig, failureDetector);
}
}
Expand Down
Expand Up @@ -204,6 +204,8 @@ public void buildWithComplexConfig()
builder.setBufferChunkRetentionTimeMillis(19 * 1000);
builder.setJvmHeapBufferMode(true);
builder.setSenderMaxRetryCount(99);
builder.setConnectionTimeoutMilli(12345);
builder.setReadTimeoutMilli(9876);
builder.setAckResponseMode(true);
builder.setWaitUntilBufferFlushed(42);
builder.setWaitUntilFlusherTerminated(24);
Expand Down Expand Up @@ -248,8 +250,8 @@ public void buildWithComplexConfig()
TCPSender sender = (TCPSender) multiSender.getSenders().get(0);
assertThat(sender.getHost(), is("333.333.333.333"));
assertThat(sender.getPort(), is(11111));
assertThat(sender.getConnectionTimeoutMilli(), is(5000));
assertThat(sender.getReadTimeoutMilli(), is(5000));
assertThat(sender.getConnectionTimeoutMilli(), is(12345));
assertThat(sender.getReadTimeoutMilli(), is(9876));

FailureDetector failureDetector = sender.getFailureDetector();
assertThat(failureDetector.getFailureIntervalMillis(), is(3 * 1000));
Expand All @@ -265,8 +267,9 @@ public void buildWithComplexConfig()
TCPSender sender = (TCPSender) multiSender.getSenders().get(1);
assertThat(sender.getHost(), is("444.444.444.444"));
assertThat(sender.getPort(), is(22222));
assertThat(sender.getConnectionTimeoutMilli(), is(5000));
assertThat(sender.getReadTimeoutMilli(), is(5000));
assertThat(sender.getConnectionTimeoutMilli(), is(12345));
assertThat(sender.getReadTimeoutMilli(), is(9876));


FailureDetector failureDetector = sender.getFailureDetector();
assertThat(failureDetector.getFailureIntervalMillis(), is(3 * 1000));
Expand Down Expand Up @@ -294,6 +297,8 @@ public void buildWithSslAndComplexConfig()
builder.setBufferChunkRetentionSize(13 * 1024 * 1024);
builder.setJvmHeapBufferMode(true);
builder.setSenderMaxRetryCount(99);
builder.setConnectionTimeoutMilli(12345);
builder.setReadTimeoutMilli(9876);
builder.setAckResponseMode(true);
builder.setWaitUntilBufferFlushed(42);
builder.setWaitUntilFlusherTerminated(24);
Expand All @@ -320,8 +325,8 @@ public void buildWithSslAndComplexConfig()
SSLSender sender = (SSLSender) multiSender.getSenders().get(0);
assertThat(sender.getHost(), is("333.333.333.333"));
assertThat(sender.getPort(), is(11111));
assertThat(sender.getConnectionTimeoutMilli(), is(5000));
assertThat(sender.getReadTimeoutMilli(), is(5000));
assertThat(sender.getConnectionTimeoutMilli(), is(12345));
assertThat(sender.getReadTimeoutMilli(), is(9876));

FailureDetector failureDetector = sender.getFailureDetector();
assertThat(failureDetector.getFailureIntervalMillis(), is(3 * 1000));
Expand All @@ -337,8 +342,8 @@ public void buildWithSslAndComplexConfig()
SSLSender sender = (SSLSender) multiSender.getSenders().get(1);
assertThat(sender.getHost(), is("444.444.444.444"));
assertThat(sender.getPort(), is(22222));
assertThat(sender.getConnectionTimeoutMilli(), is(5000));
assertThat(sender.getReadTimeoutMilli(), is(5000));
assertThat(sender.getConnectionTimeoutMilli(), is(12345));
assertThat(sender.getReadTimeoutMilli(), is(9876));

FailureDetector failureDetector = sender.getFailureDetector();
assertThat(failureDetector.getFailureIntervalMillis(), is(3 * 1000));
Expand Down

0 comments on commit dd3d47d

Please sign in to comment.