Skip to content

Commit

Permalink
apache#3323 Keep cursor ordering of buffer access for group by query
Browse files Browse the repository at this point in the history
  • Loading branch information
navis committed Aug 13, 2020
1 parent 40883ec commit f047664
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public BulkSequence(Sequence<Object[]> sequence, List<ValueDesc> types)

public BulkSequence(Sequence<Object[]> sequence, List<ValueDesc> types, final int max)
{
Preconditions.checkArgument(max < 0xffff); // see TimestampRLE
Preconditions.checkArgument(max < 0xffff);
this.max = max;
this.sequence = sequence;
this.category = new int[types.size()];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.druid.java.util.common.guava.Accumulator;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Yielder;
import org.apache.commons.lang.mutable.MutableLong;
import org.joda.time.DateTimeZone;

import java.io.IOException;
Expand Down Expand Up @@ -192,6 +193,18 @@ public void serialize(BytesRef utf8, final JsonGenerator jgen, SerializerProvide
}
}
);
addSerializer(
MutableLong.class,
new JsonSerializer<MutableLong>()
{
@Override
public void serialize(MutableLong value, JsonGenerator jsonGenerator, SerializerProvider provider)
throws IOException
{
jsonGenerator.writeNumber(value.longValue());
}
}
);

addSerializer(HostAndPort.class, ToStringSerializer.instance);
addDeserializer(HostAndPort.class, HostAndPortDeserializer.std);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.druid.cache.Cache;
import io.druid.collections.StupidPool;
import io.druid.common.guava.GuavaUtils;
import io.druid.common.guava.IntArray;
import io.druid.common.utils.Sequences;
import io.druid.common.utils.StringUtils;
import io.druid.data.Pair;
Expand Down Expand Up @@ -70,6 +69,7 @@
import io.druid.segment.VirtualColumns;
import io.druid.segment.column.Column;
import io.druid.segment.data.IndexedInts;
import org.apache.commons.lang.mutable.MutableLong;
import org.joda.time.DateTime;

import java.io.Closeable;
Expand Down Expand Up @@ -147,8 +147,41 @@ public Sequence<Rowboat> apply(final Cursor cursor)
}

private static final int DEFAULT_INITIAL_CAPACITY = 1 << 10;
private static final int BUFFER_POS = 2;

public static class RowIterator implements CloseableIterator<Map.Entry<IntArray, int[]>>
private static class KeyValue
{
private final int[] array;

public KeyValue(int[] array)
{
this.array = array;
}

@Override
public int hashCode()
{
int result = 1;
for (int i = BUFFER_POS; i < array.length; i++) {
result = 31 * result + array[i];
}
return result;
}

@Override
public boolean equals(Object obj)
{
final int[] other = ((KeyValue) obj).array;
for (int i = BUFFER_POS; i < array.length; i++) {
if (array[i] != other[i]) {
return false;
}
}
return true;
}
}

public static class RowIterator implements CloseableIterator<KeyValue>
{
private final Cursor cursor;
private final RowUpdater rowUpdater;
Expand All @@ -169,7 +202,7 @@ public static class RowIterator implements CloseableIterator<Map.Entry<IntArray,

private int counter;
private List<int[]> unprocessedKeys;
private Iterator<Map.Entry<IntArray, int[]>> delegate;
private Iterator<KeyValue> delegate;

public RowIterator(
final GroupByQuery query,
Expand Down Expand Up @@ -208,11 +241,11 @@ public RowIterator(
this.rowUpdater = new RowUpdater()
{
@Override
protected List<int[]> updateValues(DimensionSelector[] dimensions)
protected List<int[]> updateValues(final DimensionSelector[] dimensions)
{
final int[] key = new int[dimensions.length];
for (int i = 0; i < key.length; i++) {
key[i] = dimensions[i].getRow().get(0);
final int[] key = new int[BUFFER_POS + dimensions.length];
for (int i = 0; i < dimensions.length; i++) {
key[BUFFER_POS + i] = dimensions[i].getRow().get(0);
}
return update(key);
}
Expand All @@ -236,52 +269,56 @@ protected List<int[]> updateValues(DimensionSelector[] dimensions)

public Sequence<Object[]> asArray()
{
return Sequences.once(GuavaUtils.map(this, new Function<Map.Entry<IntArray, int[]>, Object[]>()
return Sequences.once(GuavaUtils.map(this, new Function<KeyValue, Object[]>()
{
private final int numColumns = dimensions.length + aggregators.length + 1;
private final DateTime timestamp = fixedTimestamp != null ? fixedTimestamp : cursor.getTime();

@Override
public Object[] apply(final Map.Entry<IntArray, int[]> input)
public Object[] apply(final KeyValue input)
{
final Object[] array = new Object[numColumns];
final int[] array = input.array;

int i = 1;
final int[] keyArray = input.getKey().array();
int i = 0;
final Object[] row = new Object[numColumns];

row[i++] = new MutableLong(timestamp.getMillis());
for (int x = 0; x < dimensions.length; x++) {
if (useRawUTF8 && dimensions[x] instanceof WithRawAccess) {
array[i++] = UTF8Bytes.of(((WithRawAccess) dimensions[x]).lookupRaw(keyArray[x]));
row[i++] = UTF8Bytes.of(((WithRawAccess) dimensions[x]).lookupRaw(array[x + BUFFER_POS]));
} else {
array[i++] = StringUtils.emptyToNull(dimensions[x].lookupName(keyArray[x]));
row[i++] = StringUtils.emptyToNull(dimensions[x].lookupName(array[x + BUFFER_POS]));
}
}

final int[] position = input.getValue();
final int position0 = array[0];
final int position1 = array[1];
for (int x = 0; x < aggregators.length; x++) {
array[i++] = aggregators[x].get(metricValues[position[0]], position[1] + increments[x]);
row[i++] = aggregators[x].get(metricValues[position0], position1 + increments[x]);
}

array[0] = timestamp.getMillis();
return array;
return row;
}
}));
}

public Sequence<Rowboat> asRowboat()
{
return Sequences.once(GuavaUtils.map(this, new Function<Map.Entry<IntArray, int[]>, Rowboat>()
return Sequences.once(GuavaUtils.map(this, new Function<KeyValue, Rowboat>()
{
private final DateTime timestamp = fixedTimestamp != null ? fixedTimestamp : cursor.getTime();

@Override
public Rowboat apply(final Map.Entry<IntArray, int[]> input)
public Rowboat apply(final KeyValue input)
{
final int[][] dims = new int[][]{input.getKey().array()};
final int[] array = input.array;
final int[][] dims = new int[][]{Arrays.copyOfRange(array, BUFFER_POS, array.length)};
final Object[] metrics = new Object[aggregators.length];

final int[] position = input.getValue();
final int position0 = array[0];
final int position1 = array[1];
for (int i = 0; i < aggregators.length; i++) {
metrics[i] = aggregators[i].get(metricValues[position[0]], position[1] + increments[i]);
metrics[i] = aggregators[i].get(metricValues[position0], position1 + increments[i]);
}
return new Rowboat(timestamp.getMillis(), dims, metrics, -1);
}
Expand Down Expand Up @@ -340,7 +377,7 @@ protected void nextIteration(long start, List<int[]> unprocessedKeys)
}

@Override
public Map.Entry<IntArray, int[]> next()
public KeyValue next()
{
return delegate.next();
}
Expand All @@ -357,40 +394,41 @@ public void close() throws IOException
rowUpdater.close();
}

private class RowUpdater implements java.util.function.Function<IntArray, int[]>, Closeable
private class RowUpdater implements java.util.function.Function<KeyValue, KeyValue>, Closeable
{
private int nextIndex;
private int endPosition;
private int maxPosition;

private final Map<IntArray, int[]> positions = Maps.newHashMapWithExpectedSize(DEFAULT_INITIAL_CAPACITY);
private final List<KeyValue> ordering = Lists.newArrayListWithCapacity(DEFAULT_INITIAL_CAPACITY);
private final Map<KeyValue, KeyValue> positions = Maps.newHashMapWithExpectedSize(DEFAULT_INITIAL_CAPACITY);

private int getNumRows()
{
return positions.size();
}

protected List<int[]> updateValues(DimensionSelector[] dimensions)
protected List<int[]> updateValues(final DimensionSelector[] dimensions)
{
return updateValues(new int[dimensions.length], 0, dimensions);
return updateValues(new int[BUFFER_POS + dimensions.length], 0, dimensions);
}

private List<int[]> updateValues(final int[] key, final int index, final DimensionSelector[] dims)
{
if (index < key.length) {
if (index < dims.length) {
final IndexedInts row = dims[index].getRow();
final int size = row.size();
if (size == 0) {
// warn: changed semantic.. (we added null and proceeded before)
return null;
} else if (size == 1) {
key[index] = row.get(0);
key[BUFFER_POS + index] = row.get(0);
return updateValues(key, index + 1, dims);
}
List<int[]> retVal = null;
for (int i = 0; i < size; i++) {
final int[] newKey = Arrays.copyOf(key, key.length);
newKey[index] = row.get(i);
newKey[BUFFER_POS + index] = row.get(i);
List<int[]> unaggregatedBuffers = updateValues(newKey, index + 1, dims);
if (unaggregatedBuffers != null) {
if (retVal == null) {
Expand All @@ -406,63 +444,63 @@ private List<int[]> updateValues(final int[] key, final int index, final Dimensi
}
}

protected final List<int[]> update(int[] key)
protected final List<int[]> update(final int[] key)
{
final IntArray wrapper = new IntArray(key);
final int[] position;
if (hasReserve()) {
position = positions.computeIfAbsent(wrapper, this);
} else {
position = positions.get(wrapper);
if (position == null) {
return Lists.newArrayList(key); // buffer full
}
final KeyValue position = positions.computeIfAbsent(new KeyValue(key), this);
if (position == null) {
return Lists.newArrayList(key); // buffer full
}
final int position0 = position.array[0];
final int position1 = position.array[1];
for (int i = 0; i < aggregators.length; i++) {
aggregators[i].aggregate(metricValues[position[0]], position[1] + increments[i]);
aggregators[i].aggregate(metricValues[position0], position1 + increments[i]);
}
return null;
}

@Override
public int[] apply(IntArray wrapper)
public final KeyValue apply(final KeyValue array)
{
final int[] position = allocate();
final int[] assigned = assign(array.array);
if (assigned == null) {
return null;
}
final int position0 = assigned[0];
final int position1 = assigned[1];
for (int i = 0; i < aggregators.length; i++) {
aggregators[i].init(metricValues[position[0]], position[1] + increments[i]);
aggregators[i].init(metricValues[position0], position1 + increments[i]);
}
return position;
}

private boolean hasReserve()
{
return nextIndex < metricValues.length || endPosition <= maxPosition;
ordering.add(array);
return array;
}

private int[] allocate()
private int[] assign(int[] array)
{
if (nextIndex == 0 || endPosition > maxPosition) {
return Preconditions.checkNotNull(nextBuffer(), "buffer overflow");
return nextPage(array);
} else {
array[0] = nextIndex - 1;
array[1] = endPosition;
endPosition += increment;
return array;
}
final int[] allocated = new int[]{nextIndex - 1, endPosition};
endPosition += increment;
return allocated;
}

private int[] nextBuffer()
private int[] nextPage(int[] array)
{
if (nextIndex >= metricValues.length) {
return null;
}
metricValues[nextIndex] = resources.register(bufferPool.take()).get();
endPosition = increment;
maxPosition = metricValues[nextIndex].remaining() - increment;
return new int[]{nextIndex++, 0};
array[0] = nextIndex++;
return array;
}

private Iterator<Map.Entry<IntArray, int[]>> flush()
private Iterator<KeyValue> flush()
{
return GuavaUtils.withResource(positions.entrySet().iterator(), this);
return GuavaUtils.withResource(ordering.iterator(), this);
}

@Override
Expand All @@ -474,6 +512,7 @@ public void close() throws IOException
}
Arrays.fill(metricValues, null);
positions.clear();
ordering.clear();
resources.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class TimeComparator<T> implements Comparator<T>
@Override
public int compare(T left, T right)
{
return Longs.compare((Long) accessor.get(left), (Long) accessor.get(right));
return Longs.compare(((Number) accessor.get(left)).longValue(), ((Number) accessor.get(right)).longValue());
}
}

Expand Down

0 comments on commit f047664

Please sign in to comment.