Convert to Java Strings only for sketch metric columns of StringType #45

Merged · 1 commit · Sep 27, 2023
src/main/java/com/rovio/ingest/TaskDataWriter.java (10 changes: 4 additions & 6 deletions)

```diff
@@ -53,16 +53,13 @@
 import org.apache.spark.sql.connector.write.WriterCommitMessage;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StringType;
-import org.apache.spark.unsafe.types.UTF8String;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.File;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.time.LocalDate;
 import java.time.ZoneOffset;
 import java.util.Collections;
@@ -99,6 +96,7 @@ class TaskDataWriter implements DataWriter<InternalRow> {
         this.segmentSpec = segmentSpec;
         this.maxRows = context.getSegmentMaxRows();
         this.dataSchema = segmentSpec.getDataSchema();
+
         this.tuningConfig = getTuningConfig(context);
         DataSegmentPusher segmentPusher = SegmentStorageUpdater.createPusher(context);
         // Similar code for creating a basePersistDirectory was removed in https://github.com/apache/druid/pull/13040
@@ -214,9 +212,9 @@ private Map<String, Object> parse(InternalRow record) {
             } else {
                 DataType sqlType = field.getSqlType();
                 Object value = record.get(field.getOrdinal(), sqlType);
-                if (sqlType == DataTypes.StringType && value instanceof UTF8String) {
-                    // Convert to String as Spark return UTF8String which is not compatible with Druid sketches.
-                    value = new String(((UTF8String) value).getBytes(), StandardCharsets.UTF_8);
+                if (value != null && segmentSpec.getComplexMetricColumns().contains(columnName) && sqlType == DataTypes.StringType) {
+                    // Convert to Java String as Spark return UTF8String which is not compatible with Druid sketches.
+                    value = value.toString();
```
**Collaborator:** What is this change about (this line)?

**Collaborator (Author):** Calling `toString()` on a `UTF8String` object is the same as the previous code. I changed it to `toString()` to simplify the code and avoid checking whether the object is an instance of `UTF8String`.
```diff
                 }
                 map.put(columnName, value);
             }
```
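For context, a minimal standalone sketch of the equivalence described above, assuming only Spark's `UTF8String` API (the class name is hypothetical and this snippet is not part of the PR):

```java
import org.apache.spark.unsafe.types.UTF8String;

import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of the PR.
public class Utf8StringEquivalenceDemo {
    public static void main(String[] args) {
        UTF8String value = UTF8String.fromString("päivää");

        // Old approach in TaskDataWriter: copy the backing bytes and decode them as UTF-8.
        String viaBytes = new String(value.getBytes(), StandardCharsets.UTF_8);

        // New approach: UTF8String.toString() performs the same UTF-8 decoding.
        String viaToString = value.toString();

        System.out.println(viaBytes.equals(viaToString)); // true
    }
}
```

The behavioral change is therefore not the conversion itself but its scope: instead of converting every `StringType` value, the writer now converts only values destined for complex (sketch) metric columns, and skips nulls explicitly.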
src/main/java/com/rovio/ingest/model/SegmentSpec.java (14 changes: 14 additions & 0 deletions)

```diff
@@ -30,6 +30,7 @@
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
@@ -39,9 +40,11 @@

 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;

 import static com.rovio.ingest.DataSegmentCommitMessage.MAPPER;
@@ -64,6 +67,8 @@ public class SegmentSpec implements Serializable {
     private final String metricsSpec;
     private final String transformSpec;

+    private final Set<String> complexMetricColumns;
+
     private SegmentSpec(String dataSource, String timeColumn, String segmentGranularity, String queryGranularity,
                         List<Field> fields, Field partitionTime, Field partitionNum, boolean rollup,
                         String dimensionsSpec, String metricsSpec, String transformSpec) {
@@ -78,6 +83,11 @@
         this.dimensionsSpec = dimensionsSpec;
         this.metricsSpec = metricsSpec;
         this.transformSpec = transformSpec;
+        this.complexMetricColumns = Arrays
+                .stream(getAggregators())
+                .filter(aggregatorFactory -> aggregatorFactory.getIntermediateType().is(ValueType.COMPLEX))
+                .flatMap((AggregatorFactory aggregatorFactory) -> aggregatorFactory.requiredFields().stream())
+                .collect(Collectors.toSet());
     }

     public static SegmentSpec from(String datasource, String timeColumn, List<String> excludedDimensions,
@@ -255,4 +265,8 @@ private AggregatorFactory[] getAggregators() {

         return builder.build().toArray(new AggregatorFactory[0]);
     }
+
+    public Set<String> getComplexMetricColumns() {
+        return complexMetricColumns;
+    }
 }
```
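To illustrate what the new set contains, here is a small standalone sketch that runs the same stream pipeline over a hand-built aggregator array. It is not part of the PR, the class name is hypothetical, and it assumes Druid's two-argument `LongSumAggregatorFactory` constructor and the six-argument `SketchMergeAggregatorFactory` constructor from the datasketches extension:

```java
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory;
import org.apache.druid.segment.column.ValueType;

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical demo class, not part of the PR.
public class ComplexMetricColumnsDemo {
    public static void main(String[] args) {
        AggregatorFactory[] aggregators = {
                // Plain long sum: intermediate type is LONG, so it is filtered out.
                new LongSumAggregatorFactory("metric1", "metric1"),
                // Theta sketch: intermediate type is COMPLEX, so its input column is kept.
                new SketchMergeAggregatorFactory("user_id_theta", "user_id", null, null, null, null)
        };

        // Same pipeline as the new SegmentSpec constructor.
        Set<String> complexMetricColumns = Arrays.stream(aggregators)
                .filter(af -> af.getIntermediateType().is(ValueType.COMPLEX))
                .flatMap(af -> af.requiredFields().stream())
                .collect(Collectors.toSet());

        System.out.println(complexMetricColumns); // [user_id]
    }
}
```

The new assertions in SegmentSpecTest below verify the same behavior through the public `getComplexMetricColumns()` accessor.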
src/test/java/com/rovio/ingest/SegmentSpecTest.java (7 changes: 4 additions & 3 deletions)

```diff
@@ -22,7 +22,6 @@
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
-import org.apache.druid.query.aggregation.datasketches.hll.HllSketchAggregatorFactory;
 import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory;
 import org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory;
 import org.apache.druid.segment.column.ValueType;
@@ -266,9 +265,8 @@ public void shouldFailWhenMetricsSpecIsInvalidJson() {
                 .add("city", DataTypes.StringType)
                 .add("metric1", DataTypes.LongType)
                 .add("metric2", DataTypes.DoubleType);
-        SegmentSpec spec = SegmentSpec.from("temp", "__time", Collections.emptyList(), "DAY", "DAY", schema, true, "{}");
         assertThrows(IllegalArgumentException.class,
-                spec::getDataSchema);
+                () -> SegmentSpec.from("temp", "__time", Collections.emptyList(), "DAY", "DAY", schema, true, "{}"));
     }
@@ -331,6 +329,9 @@ public void shouldSupportMetricsSpecAsJson() {
         assertTrue(Arrays.stream(spec.getDataSchema().getAggregators()).anyMatch(f -> f instanceof HllSketchBuildAggregatorFactory && f.getName().equals("user_id_hll") && Objects.equals(((HllSketchBuildAggregatorFactory) f).getFieldName(), "user_id")));
         assertTrue(Arrays.stream(spec.getDataSchema().getAggregators()).anyMatch(f -> f instanceof SketchMergeAggregatorFactory && f.getName().equals("user_id_theta") && Objects.equals(((SketchMergeAggregatorFactory) f).getFieldName(), "user_id")));

+        assertEquals(1, spec.getComplexMetricColumns().size());
+        assertTrue(spec.getComplexMetricColumns().contains("user_id"));
+
         List<DimensionSchema> dimensions = spec.getDataSchema().getDimensionsSpec().getDimensions();
         assertEquals(2, dimensions.size());
         // user_id is used in sketch aggregator, so it should be excluded from dimensions.
```