Commit ec47028
Fixed failing tests -> allow usage of all types of segment spec
justinborromeo committed Mar 25, 2019
1 parent 8b3b6b5 commit ec47028
Showing 2 changed files with 21 additions and 23 deletions.
ScanQueryRunnerFactory.java
@@ -38,8 +38,6 @@
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryToolChest;
-import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
 import org.apache.druid.segment.Segment;
 import org.joda.time.Interval;

@@ -67,6 +65,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
   @Inject
   public ScanQueryRunnerFactory(
       ScanQueryQueryToolChest toolChest,
+
       ScanQueryEngine engine,
       ScanQueryConfig scanQueryConfig
   )
@@ -92,12 +91,12 @@ public QueryRunner<ScanResultValue> mergeRunners(
     return (queryPlus, responseContext) -> {
       ScanQuery query = (ScanQuery) queryPlus.getQuery();
 
-      List<SegmentDescriptor> descriptorsOrdered =
-          ((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors(); // Ascending time order
+      List<Interval> intervalsOrdered =
+          query.getQuerySegmentSpec().getIntervals(); // Ascending time order
       List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners); // Ascending time order by default
 
       if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
-        descriptorsOrdered = Lists.reverse(descriptorsOrdered);
+        intervalsOrdered = Lists.reverse(intervalsOrdered);
         queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
       }
 
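The heart of the fix is in this hunk: getDescriptors() exists only on MultipleSpecificSegmentSpec, so the old cast threw a ClassCastException whenever a query arrived with any other spec type, while getIntervals() is declared on the QuerySegmentSpec interface itself and works for all of them. Below is a minimal sketch of the distinction; the spec classes are real Druid types, but the demo class and the choice of MultipleIntervalSegmentSpec as the example spec are illustrative assumptions, not part of this commit.

import java.util.Collections;
import java.util.List;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.joda.time.Interval;

class SegmentSpecDemo
{
  // getIntervals() is part of the QuerySegmentSpec interface, so this helper
  // accepts any spec implementation without casting.
  static List<Interval> intervalsOf(QuerySegmentSpec spec)
  {
    return spec.getIntervals();
  }

  public static void main(String[] args)
  {
    // A MultipleIntervalSegmentSpec is one of the spec types the removed cast
    // to MultipleSpecificSegmentSpec would have rejected at runtime.
    QuerySegmentSpec spec = new MultipleIntervalSegmentSpec(
        Collections.singletonList(
            new Interval(DateTimes.of("2010-01-01"), DateTimes.of("2019-01-01"))
        )
    );
    System.out.println(intervalsOf(spec)); // prints the single interval
  }
}

The trade-off is that a plain Interval no longer carries the partition number and version that a SegmentDescriptor did, which is why the hunk below regroups runners by interval alone.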
@@ -127,31 +126,31 @@ public QueryRunner<ScanResultValue> mergeRunners(
                 input -> input.run(queryPlus, responseContext)
             )),
             query,
-            descriptorsOrdered
+            intervalsOrdered
         );
       } else {
         Preconditions.checkState(
-            descriptorsOrdered.size() == queryRunnersOrdered.size(),
-            "Number of segment descriptors does not equal number of "
+            intervalsOrdered.size() == queryRunnersOrdered.size(),
+            "Number of intervals from the query segment spec does not equal number of "
             + "query runners...something went wrong!"
         );
 
         // Combine the two lists of segment descriptors and query runners into a single list of
         // segment descriptors - query runner pairs. This makes it easier to use stream operators.
-        List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>> descriptorsAndRunnersOrdered = new ArrayList<>();
+        List<Pair<Interval, QueryRunner<ScanResultValue>>> intervalsAndRunnersOrdered = new ArrayList<>();
         for (int i = 0; i < queryRunnersOrdered.size(); i++) {
-          descriptorsAndRunnersOrdered.add(new Pair<>(descriptorsOrdered.get(i), queryRunnersOrdered.get(i)));
+          intervalsAndRunnersOrdered.add(new Pair<>(intervalsOrdered.get(i), queryRunnersOrdered.get(i)));
         }
 
         // Group the list of pairs by interval. The LinkedHashMap will have an interval paired with a list of all the
         // query runners for that segment
-        LinkedHashMap<Interval, List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
-            descriptorsAndRunnersOrdered.stream()
-                                        .collect(Collectors.groupingBy(
-                                            x -> x.lhs.getInterval(),
-                                            LinkedHashMap::new,
-                                            Collectors.toList()
-                                        ));
+        LinkedHashMap<Interval, List<Pair<Interval, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
+            intervalsAndRunnersOrdered.stream()
+                                      .collect(Collectors.groupingBy(
+                                          x -> x.lhs,
+                                          LinkedHashMap::new,
+                                          Collectors.toList()
+                                      ));
 
         // Find the segment with the largest numbers of partitions. This will be used to compare with the
         // maxSegmentPartitionsOrderedInMemory limit to determine if the query is at risk of consuming too much memory.
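One detail worth calling out in the hunk above: Collectors.groupingBy is handed LinkedHashMap::new as the map factory, so the grouped map iterates in the stream's encounter order, preserving the time ordering established earlier; the default HashMap would not guarantee that. A self-contained sketch of this collector overload, with toy strings standing in for the interval/runner pairs:

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.stream.Collectors;

class GroupingDemo
{
  public static void main(String[] args)
  {
    List<String> runners = Arrays.asList("day1-part0", "day1-part1", "day2-part0");

    // Group by the "dayN" prefix. LinkedHashMap::new preserves encounter
    // order, so iterating `grouped` always yields day1 before day2.
    LinkedHashMap<String, List<String>> grouped =
        runners.stream()
               .collect(Collectors.groupingBy(
                   s -> s.substring(0, 4),
                   LinkedHashMap::new,
                   Collectors.toList()
               ));

    System.out.println(grouped); // {day1=[day1-part0, day1-part1], day2=[day2-part0]}
  }
}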
@@ -224,7 +223,7 @@ public QueryRunner<ScanResultValue> mergeRunners(
   Sequence<ScanResultValue> sortAndLimitScanResultValuesPriorityQueue(
       Sequence<ScanResultValue> inputSequence,
       ScanQuery scanQuery,
-      List<SegmentDescriptor> descriptorsOrdered
+      List<Interval> intervalsOrdered
   )
   {
     Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery);
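As its name and the comparator on the line above suggest, sortAndLimitScanResultValuesPriorityQueue orders scan results by timestamp via a priority queue and truncates at the query limit. Most of the method body is elided in this view, so the following is a generic sketch of the bounded-priority-queue (top-N) pattern the name refers to, not Druid's actual implementation:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class TopNDemo
{
  // Keep only the `limit` smallest values seen so far. The heap is a max-heap
  // (reversed comparator) so the worst retained value sits at the head and
  // can be evicted in O(log limit).
  static List<Integer> smallestN(Iterable<Integer> input, int limit)
  {
    PriorityQueue<Integer> heap = new PriorityQueue<>(Comparator.reverseOrder());
    for (int value : input) {
      heap.offer(value);
      if (heap.size() > limit) {
        heap.poll(); // drop the largest retained value
      }
    }
    List<Integer> result = new ArrayList<>(heap);
    result.sort(Comparator.naturalOrder()); // heap iteration order is unspecified
    return result;
  }

  public static void main(String[] args)
  {
    System.out.println(smallestN(Arrays.asList(5, 1, 4, 2, 3), 3)); // [1, 2, 3]
  }
}

Capping the heap at the limit keeps memory proportional to the limit rather than to the number of rows scanned.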
@@ -267,9 +266,9 @@ public ScanResultValue accumulate(ScanResultValue accumulated, ScanResultValue i
         // Finish scanning the interval containing the limit row
         if (numRowsScanned > limit && finalInterval == null) {
           long timestampOfLimitRow = next.getFirstEventTimestamp(scanQuery.getResultFormat());
-          for (SegmentDescriptor descriptor : descriptorsOrdered) {
-            if (descriptor.getInterval().contains(timestampOfLimitRow)) {
-              finalInterval = descriptor.getInterval();
+          for (Interval interval : intervalsOrdered) {
+            if (interval.contains(timestampOfLimitRow)) {
+              finalInterval = interval;
             }
           }
           if (finalInterval == null) {
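The containment check relies on Joda-Time's Interval.contains(long), which treats intervals as half-open: the start millisecond is inside, the end millisecond is not. A quick, self-contained illustration (the dates are arbitrary):

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;

class IntervalContainsDemo
{
  public static void main(String[] args)
  {
    Interval day = new Interval(
        new DateTime("2019-03-25T00:00:00Z", DateTimeZone.UTC),
        new DateTime("2019-03-26T00:00:00Z", DateTimeZone.UTC)
    );

    System.out.println(day.contains(day.getStartMillis()));   // true:  start is inclusive
    System.out.println(day.contains(day.getEndMillis() - 1)); // true:  last millisecond inside
    System.out.println(day.contains(day.getEndMillis()));     // false: end is exclusive
  }
}

Because intervals are end-exclusive, a limit row stamped exactly at one interval's end is found in the next interval of the ordered list rather than the current one.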
ScanQueryRunnerFactory's unit test (file name not shown in this view)
@@ -26,7 +26,6 @@
 import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.QueryRunnerTestHelper;
-import org.apache.druid.query.SegmentDescriptor;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Test;
@@ -124,7 +123,7 @@ public void testSortAndLimitScanResultValues()
         factory.sortAndLimitScanResultValuesPriorityQueue(
             inputSequence,
             query,
-            ImmutableList.of(new SegmentDescriptor(new Interval(DateTimes.of("2010-01-01"), DateTimes.of("2019-01-01").plusHours(1)), "1", 0))
+            ImmutableList.of(new Interval(DateTimes.of("2010-01-01"), DateTimes.of("2019-01-01").plusHours(1)))
         ).toList();
 
     // check each scan result value has one event
