Skip to content

Commit

Permalink
Represent histogram value count as long
Browse files Browse the repository at this point in the history
Histograms currently use integers to store the count of each value,
which can overflow. Switch to using long integers to avoid this.

TDigestState was updated to use long for centroid value count in elastic#99491

Fixes elastic#99820
  • Loading branch information
kkrik-es committed Sep 26, 2023
1 parent 137bb45 commit 0c253ce
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 18 deletions.
4 changes: 2 additions & 2 deletions docs/reference/mapping/types/histogram.asciidoc
Expand Up @@ -10,7 +10,7 @@ This data is defined using two paired arrays:

* A `values` array of <<number, `double`>> numbers, representing the buckets for
the histogram. These values must be provided in ascending order.
* A corresponding `counts` array of <<number, `integer`>> numbers, representing how
* A corresponding `counts` array of <<number, `long`>> numbers, representing how
many values fall into each bucket. These numbers must be positive or zero.

Because the elements in the `values` array correspond to the elements in the
Expand Down Expand Up @@ -138,5 +138,5 @@ PUT my-index-000001/_doc/2
<1> Values for each bucket. Values in the array are treated as doubles and must be given in
increasing order. For <<search-aggregations-metrics-percentile-aggregation-approximation, T-Digest>>
histograms this value represents the mean value. In the case of HDR histograms, this represents the value iterated to.
<2> Count for each bucket. Values in the arrays are treated as integers and must be positive or zero.
<2> Count for each bucket. Values in the arrays are treated as long integers and must be positive or zero.
Negative values will be rejected. The relation between a bucket and a count is given by the position in the array.
Expand Up @@ -145,6 +145,7 @@ static TransportVersion def(int id) {
public static final TransportVersion WAIT_FOR_CLUSTER_STATE_IN_RECOVERY_ADDED = def(8_502_00_0);
public static final TransportVersion RECOVERY_COMMIT_TOO_NEW_EXCEPTION_ADDED = def(8_503_00_0);
public static final TransportVersion NODE_INFO_COMPONENT_VERSIONS_ADDED = def(8_504_00_0);
public static final TransportVersion LONG_COUNT_IN_HISTOGRAM_ADDED = def(8_505_00_0);
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
Expand Up @@ -32,6 +32,6 @@ public abstract class HistogramValue {
* The current count of the histogram
* @return the current count of the histogram
*/
public abstract int count();
public abstract long count();

}
Expand Up @@ -88,7 +88,7 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
double previousKey = Double.NEGATIVE_INFINITY;
while (sketch.next()) {
final double value = sketch.value();
final int count = sketch.count();
final long count = sketch.count();

double key = Math.floor((value - offset) / interval);
assert key >= previousKey;
Expand Down
Expand Up @@ -134,15 +134,15 @@ public void collect(int doc, long bucket) throws IOException {
previousValue = value;
// Collecting the bucket automatically increments the count by the docCountProvider,
// account for that here
final int count = sketch.count() - docCountProvider.getDocCount(doc);
final long count = sketch.count() - docCountProvider.getDocCount(doc);
lo = HistoBackedRangeAggregator.this.collect(sub, doc, value, bucket, lo, count);
}
}
}
};
}

abstract int collect(LeafBucketCollector sub, int doc, double value, long owningBucketOrdinal, int lowBound, int count)
abstract int collect(LeafBucketCollector sub, int doc, double value, long owningBucketOrdinal, int lowBound, long count)
throws IOException;

private static class NoOverlap extends HistoBackedRangeAggregator {
Expand Down Expand Up @@ -178,7 +178,7 @@ private NoOverlap(
}

@Override
public int collect(LeafBucketCollector sub, int doc, double value, long owningBucketOrdinal, int lowBound, int count)
public int collect(LeafBucketCollector sub, int doc, double value, long owningBucketOrdinal, int lowBound, long count)
throws IOException {
int lo = lowBound, hi = ranges.length - 1;
while (lo <= hi) {
Expand Down Expand Up @@ -240,7 +240,7 @@ private static class Overlap extends HistoBackedRangeAggregator {
}

@Override
public int collect(LeafBucketCollector sub, int doc, double value, long owningBucketOrdinal, int lowBound, int count)
public int collect(LeafBucketCollector sub, int doc, double value, long owningBucketOrdinal, int lowBound, long count)
throws IOException {
int lo = lowBound, hi = ranges.length - 1; // all candidates are between these indexes
int mid = (lo + hi) >>> 1;
Expand Down
Expand Up @@ -15,6 +15,7 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.search.SortField;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.Explicit;
import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
Expand Down Expand Up @@ -295,7 +296,7 @@ public void parse(DocumentParserContext context) throws IOException {
return;
}
ArrayList<Double> values = null;
ArrayList<Integer> counts = null;
ArrayList<Long> counts = null;
// should be an object
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, context.parser());
subParser = new XContentSubParser(context.parser());
Expand Down Expand Up @@ -343,7 +344,7 @@ public void parse(DocumentParserContext context) throws IOException {
while (token != XContentParser.Token.END_ARRAY) {
// should be a number
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, subParser);
counts.add(subParser.intValue());
counts.add(subParser.longValue());
token = subParser.nextToken();
}
} else {
Expand Down Expand Up @@ -385,15 +386,19 @@ public void parse(DocumentParserContext context) throws IOException {
}
BytesStreamOutput streamOutput = new BytesStreamOutput();
for (int i = 0; i < values.size(); i++) {
int count = counts.get(i);
long count = counts.get(i);
if (count < 0) {
throw new DocumentParsingException(
subParser.getTokenLocation(),
"error parsing field [" + name() + "], [" + COUNTS_FIELD + "] elements must be >= 0 but got " + counts.get(i)
);
} else if (count > 0) {
// we do not add elements with count == 0
streamOutput.writeVInt(count);
if (streamOutput.getTransportVersion().onOrAfter(TransportVersions.LONG_COUNT_IN_HISTOGRAM_ADDED)) {
streamOutput.writeVLong(count);
} else {
streamOutput.writeVInt(Math.toIntExact(count));
}
streamOutput.writeLong(Double.doubleToRawLongBits(values.get(i)));
}
}
Expand Down Expand Up @@ -431,7 +436,7 @@ public void parse(DocumentParserContext context) throws IOException {
/** re-usable {@link HistogramValue} implementation */
private static class InternalHistogramValue extends HistogramValue {
double value;
int count;
long count;
boolean isExhausted;
final ByteArrayStreamInput streamInput;

Expand All @@ -450,7 +455,11 @@ void reset(BytesRef bytesRef) {
@Override
public boolean next() throws IOException {
if (streamInput.available() > 0) {
count = streamInput.readVInt();
if (streamInput.getTransportVersion().onOrAfter(TransportVersions.LONG_COUNT_IN_HISTOGRAM_ADDED)) {
count = streamInput.readVLong();
} else {
count = streamInput.readVInt();
}
value = Double.longBitsToDouble(streamInput.readLong());
return true;
}
Expand All @@ -467,7 +476,7 @@ public double value() {
}

@Override
public int count() {
public long count() {
if (isExhausted) {
throw new IllegalArgumentException("histogram already exhausted");
}
Expand Down
Expand Up @@ -281,8 +281,8 @@ public void testCountIsLong() throws Exception {
.field("values", new double[] { 2, 2, 3 })
.endObject()
);
Exception e = expectThrows(DocumentParsingException.class, () -> mapper.parse(source));
assertThat(e.getCause().getMessage(), containsString(" out of range of int"));
ParsedDocument doc = mapper.parse(source);
assertThat(doc.rootDoc().getField("field"), notNullValue());
}

public void testValuesNotInOrder() throws Exception {
Expand Down
Expand Up @@ -158,7 +158,7 @@ public void write(XContentBuilder builder) throws IOException {
if (isEmpty() == false) {
final HistogramValue histogramValue = (HistogramValue) label.get();
final List<Double> values = new ArrayList<>();
final List<Integer> counts = new ArrayList<>();
final List<Long> counts = new ArrayList<>();
while (histogramValue.next()) {
values.add(histogramValue.value());
counts.add(histogramValue.count());
Expand Down
Expand Up @@ -251,3 +251,58 @@ histogram with synthetic source and zero counts:
latency:
values: [0.2, 0.4]
counts: [7, 6]


---
# Verifies that histogram `counts` entries larger than Integer.MAX_VALUE
# (e.g. 1000000000000) are indexed, aggregated, and returned correctly
# after the switch from int to long value counts.
histogram with large count values:
  - skip:
      version: " - 8.10.99"
      reason: Support for `long` values was introduced in 8.11.0

  - do:
      indices.create:
        index: histo_large_count
        body:
          mappings:
            properties:
              latency:
                type: histogram

  - do:
      bulk:
        index: histo_large_count
        refresh: true
        body:
          - '{"index": {}}'
          - '{"latency": {"values" : [0.1, 0.2, 0.3, 0.4, 0.5], "counts" : [0, 1000000000000, 10, 1000, 1000000]}}'

  # Histogram aggregation: interval 0.3 yields two buckets.
  # Bucket [0.0, 0.3): values 0.1, 0.2 -> counts 0 + 1000000000000.
  # Bucket [0.3, 0.6): values 0.3, 0.4, 0.5 -> counts 10 + 1000 + 1000000 = 1001010.
  - do:
      search:
        index: histo_large_count
        body:
          size: 0
          aggs:
            histo:
              histogram:
                field: latency
                interval: 0.3

  - length: { aggregations.histo.buckets: 2 }
  - match: { aggregations.histo.buckets.0.key: 0.0 }
  - match: { aggregations.histo.buckets.0.doc_count: 1000000000000 }
  - match: { aggregations.histo.buckets.1.key: 0.3 }
  - match: { aggregations.histo.buckets.1.doc_count: 1001010 }

  # Percentiles aggregation: the default 7 percents (1, 5, 25, 50, 75, 95, 99).
  # The huge count at value 0.2 dominates, so the low percentiles all land on 0.2.
  - do:
      search:
        index: histo_large_count
        body:
          size: 0
          aggs:
            percent:
              percentiles:
                field: latency

  - length: { aggregations.percent.values: 7 }
  - match: { aggregations.percent.values.1\.0: 0.2 }
  - match: { aggregations.percent.values.5\.0: 0.2 }
  - match: { aggregations.percent.values.25\.0: 0.2 }

0 comments on commit 0c253ce

Please sign in to comment.