Skip to content

Commit

Permalink
Merge 23c1caf into 13c8c18
Browse files Browse the repository at this point in the history
  • Loading branch information
komamitsu committed Mar 21, 2019
2 parents 13c8c18 + 23c1caf commit fb90290
Show file tree
Hide file tree
Showing 32 changed files with 932 additions and 99 deletions.
1 change: 0 additions & 1 deletion findbugs-exclude.xml
Expand Up @@ -8,7 +8,6 @@
<Bug pattern="EI_EXPOSE_REP, EI_EXPOSE_REP2"/>
</Match>
<Match>
<Class name="org.komamitsu.fluency.treasuredata.FluencyBuilderForTreasureDataTest"/>
<Bug pattern="CNT_ROUGH_CONSTANT_VALUE"/>
</Match>
</FindBugsFilter>
Expand Up @@ -20,6 +20,9 @@
import org.komamitsu.fluency.EventTime;
import org.komamitsu.fluency.ingester.Ingester;
import org.komamitsu.fluency.recordformat.RecordFormatter;
import org.komamitsu.fluency.validation.Validatable;
import org.komamitsu.fluency.validation.annotation.DecimalMin;
import org.komamitsu.fluency.validation.annotation.Min;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -58,6 +61,7 @@ public Buffer(RecordFormatter recordFormatter)

public Buffer(final Config config, RecordFormatter recordFormatter)
{
config.validateValues();
this.config = config;

if (config.getFileBackupDir() != null) {
Expand All @@ -69,10 +73,6 @@ public Buffer(final Config config, RecordFormatter recordFormatter)

this.recordFormatter = recordFormatter;

if (config.getChunkInitialSize() > config.getChunkRetentionSize()) {
LOG.warn("Initial Buffer Chunk Size ({}) shouldn't be more than Buffer Chunk Retention Size ({}) for better performance.",
config.getChunkInitialSize(), config.getChunkRetentionSize());
}
bufferPool = new BufferPool(
config.getChunkInitialSize(), config.getMaxBufferSize(), config.jvmHeapBufferMode);

Expand Down Expand Up @@ -514,14 +514,17 @@ public String toString()
}

public static class Config
implements Validatable
{
private long maxBufferSize = 512 * 1024 * 1024;
private String fileBackupDir;
private String fileBackupPrefix; // Mainly for testing

private int chunkInitialSize = 1024 * 1024;
@DecimalMin("1.2")
private float chunkExpandRatio = 2.0f;
private int chunkRetentionSize = 4 * 1024 * 1024;
@Min(50)
private int chunkRetentionTimeMillis = 1000;
private boolean jvmHeapBufferMode = false;

Expand Down Expand Up @@ -605,6 +608,25 @@ public void setJvmHeapBufferMode(boolean jvmHeapBufferMode)
this.jvmHeapBufferMode = jvmHeapBufferMode;
}

void validateValues()
{
validate();

if (chunkInitialSize >= chunkRetentionSize) {
throw new IllegalArgumentException(
String.format(
"Buffer Chunk Retention Size (%d) should be more than Initial Buffer Chunk Size (%d)",
chunkRetentionSize, chunkInitialSize));
}

if (chunkRetentionSize >= maxBufferSize) {
throw new IllegalArgumentException(
String.format(
"Max Total Buffer Size (%d) should be more than Buffer Chunk Retention Size (%d)",
maxBufferSize, chunkRetentionSize));
}
}

@Override
public String toString()
{
Expand Down
Expand Up @@ -19,6 +19,9 @@
import org.komamitsu.fluency.buffer.Buffer;
import org.komamitsu.fluency.ingester.Ingester;
import org.komamitsu.fluency.util.ExecutorServiceUtils;
import org.komamitsu.fluency.validation.Validatable;
import org.komamitsu.fluency.validation.annotation.Max;
import org.komamitsu.fluency.validation.annotation.Min;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -27,7 +30,6 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -45,6 +47,7 @@ public class Flusher

public Flusher(Config config, Buffer buffer, Ingester ingester)
{
config.validateValues();
this.config = config;
this.buffer = buffer;
this.ingester = ingester;
Expand Down Expand Up @@ -196,9 +199,14 @@ public String toString()
}

public static class Config
implements Validatable
{
@Min(20)
@Max(2000)
private int flushIntervalMillis = 600;
@Min(1)
private int waitUntilBufferFlushed = 60;
@Min(1)
private int waitUntilTerminated = 60;

public int getFlushIntervalMillis()
Expand Down Expand Up @@ -231,6 +239,11 @@ public void setWaitUntilTerminated(int waitUntilTerminated)
this.waitUntilTerminated = waitUntilTerminated;
}

void validateValues()
{
validate();
}

@Override
public String toString()
{
Expand Down
@@ -0,0 +1,152 @@
/*
* Copyright 2019 Mitsunori Komatsu (komamitsu)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.komamitsu.fluency.validation;

import org.komamitsu.fluency.validation.annotation.DecimalMax;
import org.komamitsu.fluency.validation.annotation.DecimalMin;
import org.komamitsu.fluency.validation.annotation.Max;
import org.komamitsu.fluency.validation.annotation.Min;

import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

public interface Validatable
{
class ValidationList {
private static class Validation
{
Class<? extends Annotation> annotationClass;
BiFunction<Annotation, Number, Boolean> isValid;
String messageTemplate;

Validation(
Class<? extends Annotation> annotationClass,
BiFunction<Annotation, Number, Boolean> isValid,
String messageTemplate)
{
this.annotationClass = annotationClass;
this.isValid = isValid;
this.messageTemplate = messageTemplate;
}
}

private static final Validation VALIDATION_MAX = new Validation(
Max.class,
(annotation, actual) -> {
Max maxAnnotation = (Max) annotation;
if (maxAnnotation.inclusive()) {
return maxAnnotation.value() >= actual.longValue();
}
else {
return maxAnnotation.value() > actual.longValue();
}
},
"This field (%s) is more than (%s)");

private static final Validation VALIDATION_MIN = new Validation(
Min.class,
(annotation, actual) -> {
Min minAnnotation = (Min) annotation;
if (minAnnotation.inclusive()) {
return minAnnotation.value() <= actual.longValue();
}
else {
return minAnnotation.value() < actual.longValue();
}
},
"This field (%s) is less than (%s)");

private static final Validation VALIDATION_DECIMAL_MAX = new Validation(
DecimalMax.class,
(annotation, actual) -> {
DecimalMax maxAnnotation = (DecimalMax) annotation;
BigDecimal limitValue = new BigDecimal(maxAnnotation.value());
BigDecimal actualValue = new BigDecimal(actual.toString());
if (maxAnnotation.inclusive()) {
return limitValue.compareTo(actualValue) >= 0;
}
else {
return limitValue.compareTo(actualValue) > 0;
}
},
"This field (%s) is more than (%s)");

private static final Validation VALIDATION_DECIMAL_MIN = new Validation(
DecimalMin.class,
(annotation, actual) -> {
DecimalMin maxAnnotation = (DecimalMin) annotation;
BigDecimal limitValue = new BigDecimal(maxAnnotation.value());
BigDecimal actualValue = new BigDecimal(actual.toString());
if (maxAnnotation.inclusive()) {
return limitValue.compareTo(actualValue) <= 0;
}
else {
return limitValue.compareTo(actualValue) < 0;
}
},
"This field (%s) is less than (%s)");

private static final List<Validation> VALIDATIONS = Arrays.asList(
VALIDATION_MAX,
VALIDATION_MIN,
VALIDATION_DECIMAL_MAX,
VALIDATION_DECIMAL_MIN);
}

default void validate()
{
Class<? extends Object> klass = getClass();
while (klass != Object.class) {
for (Field field : klass.getDeclaredFields()) {
for (ValidationList.Validation validation : ValidationList.VALIDATIONS) {
Class<? extends Annotation> annotationClass = validation.annotationClass;
if (field.isAnnotationPresent(annotationClass)) {
Annotation annotation = field.getAnnotation(annotationClass);
Object value;
try {
field.setAccessible(true);
value = field.get(this);
}
catch (IllegalAccessException e) {
throw new RuntimeException(
String.format("Failed to get a value from field (%s)", field), e);
}

if (value == null) {
break;
}

if (!(value instanceof Number)) {
throw new IllegalArgumentException(
String.format("This field has (%s), but actual field is (%s)", annotation, value.getClass()));
}

if (!validation.isValid.apply(annotation, (Number) value)) {
throw new IllegalArgumentException(
String.format(validation.messageTemplate, field, value));
}
}
}
}
klass = klass.getSuperclass();
}
}
}
@@ -0,0 +1,32 @@
/*
* Copyright 2019 Mitsunori Komatsu (komamitsu)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.komamitsu.fluency.validation.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.FIELD)
@Inherited
@Retention(RetentionPolicy.RUNTIME)
public @interface DecimalMax
{
String value();
boolean inclusive() default true;
}
@@ -0,0 +1,32 @@
/*
* Copyright 2019 Mitsunori Komatsu (komamitsu)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.komamitsu.fluency.validation.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.FIELD)
@Inherited
@Retention(RetentionPolicy.RUNTIME)
public @interface DecimalMin
{
String value();
boolean inclusive() default true;
}
@@ -0,0 +1,32 @@
/*
* Copyright 2019 Mitsunori Komatsu (komamitsu)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.komamitsu.fluency.validation.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.FIELD)
@Inherited
@Retention(RetentionPolicy.RUNTIME)
public @interface Max
{
long value();
boolean inclusive() default true;
}

0 comments on commit fb90290

Please sign in to comment.