Skip to content

Commit

Permalink
Add virtual column types, holder serde, and safety features. (apache#3823)
Browse files Browse the repository at this point in the history

* Add virtual column types, holder serde, and safety features.

Virtual columns:
- add long, float, dimension selectors
- put cache IDs in VirtualColumnCacheHelper
- adjust serde so VirtualColumns can be the holder object for Jackson
- add fail-fast validation for cycle detection and duplicates
- add expression virtual column in core

Storage adapters:
- move virtual column hooks before checking base columns, to prevent surprises
  when a new base column is added that happens to have the same name as a
  virtual column.

* Fix ExtractionDimensionSpecs with virtual dimensions.

* Fix unused imports.

* CR comments

* Merge one more time, with feeling.
  • Loading branch information
gianm authored and jon-wei committed Jan 27, 2017
1 parent ac84a3e commit d3a3b7b
Show file tree
Hide file tree
Showing 32 changed files with 1,622 additions and 201 deletions.
22 changes: 0 additions & 22 deletions common/src/main/java/io/druid/math/expr/Evals.java
Expand Up @@ -20,7 +20,6 @@
package io.druid.math.expr;

import com.google.common.base.Strings;
import io.druid.common.guava.GuavaUtils;
import io.druid.java.util.common.logger.Logger;

import java.util.Arrays;
Expand All @@ -32,27 +31,6 @@ public class Evals
{
private static final Logger log = new Logger(Evals.class);

/**
 * Coerces an arbitrary value to a {@link Number}.
 *
 * Null becomes a long zero; numeric values pass through unchanged; anything
 * else is converted through its string form, preferring a long when the text
 * parses as one and falling back to a double otherwise (which may throw
 * {@link NumberFormatException} for non-numeric text).
 *
 * @param value value to coerce, may be null
 *
 * @return the coerced number, never null
 */
public static Number toNumber(Object value)
{
  if (value == null) {
    return 0L;
  }
  if (value instanceof Number) {
    return (Number) value;
  }
  final String asString = String.valueOf(value);
  final Long asLong = GuavaUtils.tryParseLong(asString);
  // Prefer the integral form; only fall back to floating point when the text
  // is not a valid long.
  return asLong != null ? asLong : Double.valueOf(asString);
}

/**
 * Returns whether the given expression node is a constant.
 *
 * @param expr expression to inspect, may be null (null is not constant)
 */
public static boolean isConstant(Expr expr)
{
  // Constants are modeled as ConstantExpr nodes in the expression tree;
  // isInstance, like instanceof, yields false for null.
  return ConstantExpr.class.isInstance(expr);
}

public static boolean isAllConstants(Expr... exprs)
{
return isAllConstants(Arrays.asList(exprs));
Expand Down
Expand Up @@ -22,22 +22,27 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.metamx.common.StringUtils;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.filter.DimFilterUtils;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.virtual.VirtualColumnCacheHelper;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
*/
public class MapVirtualColumn implements VirtualColumn
{
private static final byte VC_TYPE_ID = 0x00;

private final String outputName;
private final String keyDimension;
private final String valueDimension;
Expand All @@ -59,13 +64,14 @@ public MapVirtualColumn(
}

@Override
public ObjectColumnSelector init(String dimension, ColumnSelectorFactory factory)
public ObjectColumnSelector makeObjectColumnSelector(String dimension, ColumnSelectorFactory factory)
{
final DimensionSelector keySelector = factory.makeDimensionSelector(DefaultDimensionSpec.of(keyDimension));
final DimensionSelector valueSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of(valueDimension));

int index = dimension.indexOf('.');
if (index < 0) {
final String subColumnName = VirtualColumns.splitColumnName(dimension).rhs;

if (subColumnName == null) {
return new ObjectColumnSelector<Map>()
{
@Override
Expand Down Expand Up @@ -97,7 +103,7 @@ public Map get()

IdLookup keyIdLookup = keySelector.idLookup();
if (keyIdLookup != null) {
final int keyId = keyIdLookup.lookupId(dimension.substring(index + 1));
final int keyId = keyIdLookup.lookupId(subColumnName);
if (keyId < 0) {
return NullStringObjectColumnSelector.instance();
}
Expand Down Expand Up @@ -127,7 +133,6 @@ public String get()
}
};
} else {
final String key = dimension.substring(index + 1);
return new ObjectColumnSelector<String>()
{
@Override
Expand All @@ -146,7 +151,7 @@ public String get()
}
final int limit = Math.min(keyIndices.size(), valueIndices.size());
for (int i = 0; i < limit; i++) {
if (Objects.equals(keySelector.lookupName(keyIndices.get(i)), key)) {
if (Objects.equals(keySelector.lookupName(keyIndices.get(i)), subColumnName)) {
return valueSelector.lookupName(valueIndices.get(i));
}
}
Expand All @@ -156,6 +161,38 @@ public String get()
}
}

@Override
public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec, ColumnSelectorFactory factory)
{
  // Could probably do something useful here if the column name is dot-style. But for now just return nothing.
  // Returning null signals that this virtual column does not provide a dimension selector.
  return null;
}

@Override
public FloatColumnSelector makeFloatColumnSelector(String columnName, ColumnSelectorFactory factory)
{
  // Map entries are strings, not numbers; null signals "no float selector available".
  return null;
}

@Override
public LongColumnSelector makeLongColumnSelector(String columnName, ColumnSelectorFactory factory)
{
  // Map entries are strings, not numbers; null signals "no long selector available".
  return null;
}

/**
 * Reports column capabilities: dot-notation access ("output.key") reads a
 * single map entry, which is a string; the bare output name yields the whole
 * map, reported as a complex type.
 */
@Override
public ColumnCapabilities capabilities(String columnName)
{
  final ValueType type;
  if (columnName.indexOf('.') >= 0) {
    type = ValueType.STRING;
  } else {
    type = ValueType.COMPLEX;
  }
  return new ColumnCapabilitiesImpl().setType(type);
}

@Override
public List<String> requiredColumns()
{
  // Both the key dimension and the value dimension must exist in the base
  // table for this virtual column to be computable.
  return ImmutableList.of(keyDimension, valueDimension);
}

@Override
public boolean usesDotNotation()
{
Expand All @@ -170,7 +207,7 @@ public byte[] getCacheKey()
byte[] output = StringUtils.toUtf8(outputName);

return ByteBuffer.allocate(3 + key.length + value.length + output.length)
.put(VC_TYPE_ID)
.put(VirtualColumnCacheHelper.CACHE_TYPE_ID_MAP)
.put(key).put(DimFilterUtils.STRING_SEPARATOR)
.put(value).put(DimFilterUtils.STRING_SEPARATOR)
.put(output)
Expand Down
Expand Up @@ -36,6 +36,7 @@
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.serde.ComplexMetricSerde;
import io.druid.segment.serde.ComplexMetrics;
import io.druid.segment.VirtualColumns;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
Expand Down Expand Up @@ -87,6 +88,7 @@ public InputRow get()

Aggregator agg = aggFactory.factorize(
IncrementalIndex.makeColumnSelectorFactory(
VirtualColumns.EMPTY,
aggFactory,
supplier,
true
Expand Down
15 changes: 13 additions & 2 deletions processing/src/main/java/io/druid/query/Druids.java
Expand Up @@ -55,6 +55,7 @@
import io.druid.query.timeboundary.TimeBoundaryResultValue;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.segment.VirtualColumn;
import io.druid.segment.VirtualColumns;
import org.joda.time.DateTime;
import org.joda.time.Interval;

Expand Down Expand Up @@ -1104,7 +1105,7 @@ public static class SelectQueryBuilder
private QueryGranularity granularity;
private List<DimensionSpec> dimensions;
private List<String> metrics;
private List<VirtualColumn> virtualColumns;
private VirtualColumns virtualColumns;
private PagingSpec pagingSpec;

public SelectQueryBuilder()
Expand Down Expand Up @@ -1233,12 +1234,22 @@ public SelectQueryBuilder metrics(List<String> m)
return this;
}

public SelectQueryBuilder virtualColumns(List<VirtualColumn> vcs)
public SelectQueryBuilder virtualColumns(VirtualColumns vcs)
{
virtualColumns = vcs;
return this;
}

/**
 * Convenience overload: wraps the list in a {@link VirtualColumns} holder and
 * delegates to the canonical setter.
 */
public SelectQueryBuilder virtualColumns(List<VirtualColumn> vcs)
{
  final VirtualColumns holder = VirtualColumns.create(vcs);
  return virtualColumns(holder);
}

/**
 * Convenience varargs overload; delegates to the list overload, which wraps
 * the columns in a {@link VirtualColumns} holder.
 */
public SelectQueryBuilder virtualColumns(VirtualColumn... vcs)
{
  return virtualColumns(Arrays.asList(vcs));
}

public SelectQueryBuilder pagingSpec(PagingSpec p)
{
pagingSpec = p;
Expand Down
Expand Up @@ -58,7 +58,7 @@ public double getOffset()
public String apply(Object value)
{
if (value instanceof Number) {
return bucket((Double) value);
return bucket(((Number) value).doubleValue());
} else if (value instanceof String) {
return apply(value);
}
Expand Down
10 changes: 5 additions & 5 deletions processing/src/main/java/io/druid/query/select/SelectQuery.java
Expand Up @@ -31,7 +31,7 @@
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.filter.DimFilter;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.segment.VirtualColumn;
import io.druid.segment.VirtualColumns;

import java.util.List;
import java.util.Map;
Expand All @@ -46,7 +46,7 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
private final QueryGranularity granularity;
private final List<DimensionSpec> dimensions;
private final List<String> metrics;
private final List<VirtualColumn> virtualColumns;
private final VirtualColumns virtualColumns;
private final PagingSpec pagingSpec;

@JsonCreator
Expand All @@ -58,7 +58,7 @@ public SelectQuery(
@JsonProperty("granularity") QueryGranularity granularity,
@JsonProperty("dimensions") List<DimensionSpec> dimensions,
@JsonProperty("metrics") List<String> metrics,
@JsonProperty("virtualColumns") List<VirtualColumn> virtualColumns,
@JsonProperty("virtualColumns") VirtualColumns virtualColumns,
@JsonProperty("pagingSpec") PagingSpec pagingSpec,
@JsonProperty("context") Map<String, Object> context
)
Expand All @@ -67,7 +67,7 @@ public SelectQuery(
this.dimFilter = dimFilter;
this.granularity = granularity;
this.dimensions = dimensions;
this.virtualColumns = virtualColumns;
this.virtualColumns = virtualColumns == null ? VirtualColumns.EMPTY : virtualColumns;
this.metrics = metrics;
this.pagingSpec = pagingSpec;

Expand Down Expand Up @@ -134,7 +134,7 @@ public List<String> getMetrics()
}

@JsonProperty
public VirtualColumns getVirtualColumns()
{
  // Serialized under "virtualColumns"; never null — the constructor
  // substitutes VirtualColumns.EMPTY when the property is absent.
  return virtualColumns;
}
Expand Down
Expand Up @@ -43,7 +43,6 @@
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
import io.druid.segment.VirtualColumns;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ValueType;
Expand Down Expand Up @@ -161,7 +160,7 @@ public Sequence<Result<SelectResultValue>> process(final SelectQuery query, fina
adapter,
query.getQuerySegmentSpec().getIntervals(),
filter,
VirtualColumns.valueOf(query.getVirtualColumns()),
query.getVirtualColumns(),
query.isDescending(),
query.getGranularity(),
new Function<Cursor, Result<SelectResultValue>>()
Expand Down
Expand Up @@ -49,7 +49,6 @@
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.filter.DimFilter;
import io.druid.timeline.DataSegmentUtils;
import io.druid.segment.VirtualColumn;
import io.druid.timeline.LogicalSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
Expand Down Expand Up @@ -192,20 +191,7 @@ public byte[] computeCacheKey(SelectQuery query)
++index;
}

List<VirtualColumn> virtualColumns = query.getVirtualColumns();
if (virtualColumns == null) {
virtualColumns = Collections.emptyList();
}

final byte[][] virtualColumnsBytes = new byte[virtualColumns.size()][];
int virtualColumnsBytesSize = 0;
index = 0;
for (VirtualColumn vc : virtualColumns) {
virtualColumnsBytes[index] = vc.getCacheKey();
virtualColumnsBytesSize += virtualColumnsBytes[index].length;
++index;
}

final byte[] virtualColumnsCacheKey = query.getVirtualColumns().getCacheKey();
final ByteBuffer queryCacheKey = ByteBuffer
.allocate(
1
Expand All @@ -214,7 +200,7 @@ public byte[] computeCacheKey(SelectQuery query)
+ query.getPagingSpec().getCacheKey().length
+ dimensionsBytesSize
+ metricBytesSize
+ virtualColumnsBytesSize
+ virtualColumnsCacheKey.length
)
.put(SELECT_QUERY)
.put(granularityBytes)
Expand All @@ -229,9 +215,7 @@ public byte[] computeCacheKey(SelectQuery query)
queryCacheKey.put(metricByte);
}

for (byte[] vcByte : virtualColumnsBytes) {
queryCacheKey.put(vcByte);
}
queryCacheKey.put(virtualColumnsCacheKey);

return queryCacheKey.array();
}
Expand Down
Expand Up @@ -22,14 +22,29 @@
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.column.ColumnCapabilities;

import javax.annotation.Nullable;

/**
* Factory class for MetricSelectors
*/
public interface ColumnSelectorFactory
{
  // NOTE: the flattened diff duplicated the pre-change declarations above the
  // post-change ones; an interface cannot declare the same members twice, so
  // only the final form is kept here.
  DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec);

  FloatColumnSelector makeFloatColumnSelector(String columnName);

  LongColumnSelector makeLongColumnSelector(String columnName);

  /**
   * Returns an object selector for the column, or null when one cannot be provided.
   */
  @Nullable
  ObjectColumnSelector makeObjectColumnSelector(String columnName);

  /**
   * Returns capabilities of a particular column, if known. May be null if the column doesn't exist, or if
   * the column does exist but the capabilities are unknown. The latter is possible with dynamically discovered
   * columns.
   *
   * @param column column name
   *
   * @return capabilities, or null
   */
  @Nullable
  ColumnCapabilities getColumnCapabilities(String column);
}
Expand Up @@ -30,6 +30,18 @@

public class NullDimensionSelector implements DimensionSelector, IdLookup
{
// Single shared instance: the selector is stateless, so one object suffices.
private static final NullDimensionSelector INSTANCE = new NullDimensionSelector();

// Private so all callers go through instance().
private NullDimensionSelector()
{
  // Singleton.
}

/**
 * Returns the shared singleton instance of this selector.
 */
public static NullDimensionSelector instance()
{
  return INSTANCE;
}

@Override
public IndexedInts getRow()
{
Expand Down

0 comments on commit d3a3b7b

Please sign in to comment.