Skip to content

Commit

Permalink
[Transform] Optimize transform date histogram (elastic#54068)
Browse files Browse the repository at this point in the history
Optimize transforms that group_by on a date_histogram by injecting an additional range query. This limits the number of search and index requests and avoids unnecessary updates. Only recent buckets get rewritten.

fixes elastic#54254
  • Loading branch information
Hendrik Muhs authored and franyang committed Apr 17, 2020
1 parent 1df5c35 commit d892705
Show file tree
Hide file tree
Showing 11 changed files with 221 additions and 140 deletions.
Expand Up @@ -74,19 +74,18 @@ public static DateHistogramGroupSource randomDateHistogramGroupSource() {
dateHistogramGroupSource = new DateHistogramGroupSource(
field,
scriptConfig,
new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomPositiveTimeValue()))
new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomPositiveTimeValue())),
randomBoolean() ? randomZone() : null
);
} else {
dateHistogramGroupSource = new DateHistogramGroupSource(
field,
scriptConfig,
new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w")))
new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w"))),
randomBoolean() ? randomZone() : null
);
}

if (randomBoolean()) {
dateHistogramGroupSource.setTimeZone(randomZone());
}
return dateHistogramGroupSource;
}

Expand Down
Expand Up @@ -19,6 +19,8 @@ public interface SyncConfig extends ToXContentObject, NamedWriteable {
*/
boolean isValid();

String getField();

QueryBuilder getRangeQuery(TransformCheckpoint newCheckpoint);

QueryBuilder getRangeQuery(TransformCheckpoint oldCheckpoint, TransformCheckpoint newCheckpoint);
Expand Down
Expand Up @@ -25,7 +25,7 @@
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class TimeSyncConfig implements SyncConfig {
public class TimeSyncConfig implements SyncConfig {

public static final TimeValue DEFAULT_DELAY = TimeValue.timeValueSeconds(60);
private static final String NAME = "data_frame_transform_pivot_sync_time";
Expand All @@ -37,17 +37,18 @@ public class TimeSyncConfig implements SyncConfig {
private static final ConstructingObjectParser<TimeSyncConfig, Void> LENIENT_PARSER = createParser(true);

private static ConstructingObjectParser<TimeSyncConfig, Void> createParser(boolean lenient) {
ConstructingObjectParser<TimeSyncConfig, Void> parser = new ConstructingObjectParser<>(NAME, lenient,
args -> {
String field = (String) args[0];
TimeValue delay = (TimeValue) args[1];
return new TimeSyncConfig(field, delay);
});
ConstructingObjectParser<TimeSyncConfig, Void> parser = new ConstructingObjectParser<>(NAME, lenient, args -> {
String field = (String) args[0];
TimeValue delay = (TimeValue) args[1];
return new TimeSyncConfig(field, delay);
});
parser.declareString(constructorArg(), TransformField.FIELD);
parser.declareField(optionalConstructorArg(),
parser.declareField(
optionalConstructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), DEFAULT_DELAY, TransformField.DELAY.getPreferredName()),
TransformField.DELAY,
ObjectParser.ValueType.STRING);
ObjectParser.ValueType.STRING
);
return parser;
}

Expand All @@ -65,6 +66,7 @@ public TimeSyncConfig(StreamInput in) throws IOException {
this.delay = in.readTimeValue();
}

/**
 * @return the date field this transform synchronizes on (used to build checkpoint range queries)
 */
@Override
public String getField() {
    return this.field;
}
Expand Down Expand Up @@ -105,12 +107,11 @@ public boolean equals(Object other) {

final TimeSyncConfig that = (TimeSyncConfig) other;

return Objects.equals(this.field, that.field)
&& Objects.equals(this.delay, that.delay);
return Objects.equals(this.field, that.field) && Objects.equals(this.delay, that.delay);
}

@Override
public int hashCode(){
public int hashCode() {
return Objects.hash(field, delay);
}

Expand Down Expand Up @@ -139,7 +140,8 @@ public QueryBuilder getRangeQuery(TransformCheckpoint newCheckpoint) {

@Override
public QueryBuilder getRangeQuery(TransformCheckpoint oldCheckpoint, TransformCheckpoint newCheckpoint) {
    // Match documents in the window [old upper bound, new upper bound), in epoch millis.
    return new RangeQueryBuilder(field).gte(oldCheckpoint.getTimeUpperBound())
        .lt(newCheckpoint.getTimeUpperBound())
        .format("epoch_millis");
}
}
Expand Up @@ -7,15 +7,18 @@

import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;

Expand Down Expand Up @@ -105,6 +108,11 @@ public boolean equals(Object other) {
// Hash on the configured interval only.
public int hashCode() {
    return Objects.hash(interval);
}

/** Delegates to the wrapped interval's string form. */
@Override
public String toString() {
    return this.interval.toString();
}
}

public static class CalendarInterval implements Interval {
Expand Down Expand Up @@ -169,6 +177,11 @@ public boolean equals(Object other) {
// Hash on the configured interval only.
public int hashCode() {
    return Objects.hash(interval);
}

/** Delegates to the wrapped interval's string form. */
@Override
public String toString() {
    return this.interval.toString();
}
}

private Interval readInterval(StreamInput in) throws IOException {
Expand All @@ -195,11 +208,26 @@ private void writeInterval(Interval interval, StreamOutput out) throws IOExcepti
private static final ConstructingObjectParser<DateHistogramGroupSource, Void> LENIENT_PARSER = createParser(true);

private final Interval interval;
private ZoneId timeZone;
private final ZoneId timeZone;
private Rounding rounding;

/**
 * Creates a date_histogram group source and precomputes the {@link Rounding}
 * that aligns timestamps to bucket boundaries.
 *
 * @param field        the date field to bucket on
 * @param scriptConfig optional script configuration, passed to the base class
 * @param interval     the fixed or calendar interval of the histogram
 * @param timeZone     optional time zone applied to the rounding
 */
public DateHistogramGroupSource(String field, ScriptConfig scriptConfig, Interval interval, ZoneId timeZone) {
    super(field, scriptConfig);
    this.interval = interval;
    this.timeZone = timeZone;

    // Calendar units (e.g. "1d") resolve through DATE_FIELD_UNITS; any other
    // interval is treated as a fixed duration and parsed as a TimeValue.
    Rounding.DateTimeUnit calendarUnit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(interval.toString());
    Rounding.Builder roundingBuilder = calendarUnit != null
        ? new Rounding.Builder(calendarUnit)
        : new Rounding.Builder(TimeValue.parseTimeValue(interval.toString(), interval.getName()));
    if (timeZone != null) {
        roundingBuilder.timeZone(timeZone);
    }
    this.rounding = roundingBuilder.build();
}

public DateHistogramGroupSource(StreamInput in) throws IOException {
Expand All @@ -218,6 +246,7 @@ private static ConstructingObjectParser<DateHistogramGroupSource, Void> createPa
ScriptConfig scriptConfig = (ScriptConfig) args[1];
String fixedInterval = (String) args[2];
String calendarInterval = (String) args[3];
ZoneId zoneId = (ZoneId) args[4];

Interval interval = null;

Expand All @@ -231,15 +260,15 @@ private static ConstructingObjectParser<DateHistogramGroupSource, Void> createPa
throw new IllegalArgumentException("You must specify either fixed_interval or calendar_interval, found none");
}

return new DateHistogramGroupSource(field, scriptConfig, interval);
return new DateHistogramGroupSource(field, scriptConfig, interval, zoneId);
});

declareValuesSourceFields(parser, lenient);

parser.declareString(optionalConstructorArg(), new ParseField(FixedInterval.NAME));
parser.declareString(optionalConstructorArg(), new ParseField(CalendarInterval.NAME));

parser.declareField(DateHistogramGroupSource::setTimeZone, p -> {
parser.declareField(optionalConstructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return ZoneId.of(p.text());
} else {
Expand Down Expand Up @@ -267,8 +296,8 @@ public ZoneId getTimeZone() {
return timeZone;
}

public void setTimeZone(ZoneId timeZone) {
this.timeZone = timeZone;
public Rounding getRounding() {
return rounding;
}

@Override
Expand Down Expand Up @@ -315,9 +344,16 @@ public int hashCode() {
}

@Override
public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set<String> changedBuckets) {
// no need for an extra range filter as this is already done by checkpoints
return null;
public QueryBuilder getIncrementalBucketUpdateFilterQuery(
Set<String> changedBuckets,
String synchronizationField,
long synchronizationTimestamp
) {
if (synchronizationField != null && synchronizationField.equals(field) && synchronizationTimestamp > 0) {
return new RangeQueryBuilder(field).gte(rounding.round(synchronizationTimestamp)).format("epoch_millis");
} else {
return null;
}
}

@Override
Expand Down
Expand Up @@ -101,7 +101,11 @@ public int hashCode() {
}

@Override
public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set<String> changedBuckets) {
public QueryBuilder getIncrementalBucketUpdateFilterQuery(
Set<String> changedBuckets,
String synchronizationField,
long synchronizationTimestamp
) {
// histograms are simple and cheap, so we skip this optimization
return null;
}
Expand Down
Expand Up @@ -116,7 +116,11 @@ public void writeTo(StreamOutput out) throws IOException {

public abstract boolean supportsIncrementalBucketUpdate();

/**
 * Builds a filter query that restricts an incremental bucket update to the
 * buckets that actually changed, or returns {@code null} when no narrowing
 * applies for this group_by.
 *
 * @param changedBuckets           bucket keys that changed; may be {@code null} or empty
 * @param synchronizationField     the field the transform synchronizes on, if any
 * @param synchronizationTimestamp synchronization point in epoch milliseconds; values {@code <= 0} disable time-based narrowing
 * @return a filter query, or {@code null} if no filtering is possible
 */
public abstract QueryBuilder getIncrementalBucketUpdateFilterQuery(
    Set<String> changedBuckets,
    String synchronizationField,
    long synchronizationTimestamp
);

public String getField() {
return field;
Expand Down
Expand Up @@ -54,8 +54,15 @@ public static TermsGroupSource fromXContent(final XContentParser parser, boolean
}

@Override
public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set<String> changedBuckets) {
return new TermsQueryBuilder(field, changedBuckets);
public QueryBuilder getIncrementalBucketUpdateFilterQuery(
Set<String> changedBuckets,
String synchronizationField,
long synchronizationTimestamp
) {
if (changedBuckets != null && changedBuckets.isEmpty() == false) {
return new TermsQueryBuilder(field, changedBuckets);
}
return null;
}

@Override
Expand Down
Expand Up @@ -21,9 +21,9 @@
import java.util.Collections;
import java.util.Map;

import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests.randomTransformConfig;
import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig;
import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig;
import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests.randomTransformConfig;
import static org.hamcrest.Matchers.equalTo;

public class TransformConfigUpdateTests extends AbstractSerializingTransformTestCase<TransformConfigUpdate> {
Expand Down Expand Up @@ -184,6 +184,11 @@ public String getWriteableName() {
return "foo";
}

// Test stub: constant sync field name — presumably satisfies the new
// SyncConfig#getField contract; verify against the implemented interface.
@Override
public String getField() {
    return "foo";
}

@Override
public void writeTo(StreamOutput out) throws IOException {} // intentionally empty: this stub serializes no state

Expand Down
Expand Up @@ -10,11 +10,17 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.DateFormatters;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.test.AbstractSerializingTestCase;

import java.io.IOException;
import java.time.ZoneOffset;
import java.time.temporal.TemporalAccessor;

import static org.hamcrest.Matchers.equalTo;

public class DateHistogramGroupSourceTests extends AbstractSerializingTestCase<DateHistogramGroupSource> {

Expand All @@ -26,19 +32,20 @@ public static DateHistogramGroupSource randomDateHistogramGroupSource() {
dateHistogramGroupSource = new DateHistogramGroupSource(
field,
scriptConfig,
new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomPositiveTimeValue()))
new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomTimeValue(1, 100, "d", "h", "ms", "s", "m"))),
randomBoolean() ? randomZone() : null
);
} else {
dateHistogramGroupSource = new DateHistogramGroupSource(
field,
scriptConfig,
new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w")))
new DateHistogramGroupSource.CalendarInterval(
new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w", "M", "q", "y"))
),
randomBoolean() ? randomZone() : null
);
}

if (randomBoolean()) {
dateHistogramGroupSource.setTimeZone(randomZone());
}
return dateHistogramGroupSource;
}

Expand Down Expand Up @@ -70,4 +77,56 @@ protected Reader<DateHistogramGroupSource> instanceReader() {
return DateHistogramGroupSource::new;
}

public void testRoundingDateHistogramFixedInterval() {
    String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20);
    DateHistogramGroupSource groupSource = new DateHistogramGroupSource(
        field,
        null,
        new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval("1d")),
        null
    );

    // not meant to be complete rounding tests, see {@link RoundingTests} for more
    assertNotNull(groupSource.getRounding());

    // any timestamp inside the day rounds down to the start of that day
    long expectedBucketStart = time("2020-03-26T00:00:00.000Z");
    assertThat(groupSource.getRounding().round(time("2020-03-26T23:59:59.000Z")), equalTo(expectedBucketStart));
    assertThat(groupSource.getRounding().round(time("2020-03-26T00:00:01.000Z")), equalTo(expectedBucketStart));
}

public void testRoundingDateHistogramCalendarInterval() {
    String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20);
    DateHistogramGroupSource groupSource = new DateHistogramGroupSource(
        field,
        null,
        new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval("1w")),
        null
    );

    // not meant to be complete rounding tests, see {@link RoundingTests} for more
    assertNotNull(groupSource.getRounding());

    // any timestamp inside the week rounds down to the start of that week
    long expectedWeekStart = time("2020-03-23T00:00:00.000Z");
    assertThat(groupSource.getRounding().round(time("2020-03-26T23:59:59.000Z")), equalTo(expectedWeekStart));
    assertThat(groupSource.getRounding().round(time("2020-03-29T23:59:59.000Z")), equalTo(expectedWeekStart));
    assertThat(groupSource.getRounding().round(time("2020-03-23T00:00:01.000Z")), equalTo(expectedWeekStart));
}

/** Parses a "date_optional_time" string (UTC) into epoch milliseconds. */
private static long time(String time) {
    TemporalAccessor parsed = DateFormatter.forPattern("date_optional_time").withZone(ZoneOffset.UTC).parse(time);
    return DateFormatters.from(parsed).toInstant().toEpochMilli();
}
}

0 comments on commit d892705

Please sign in to comment.