
Commit

apache#1936 Exception thrown in processing row in index task should not be regarded as parsing fail
navis committed Apr 23, 2019
1 parent 26d96b6 commit e146f91
Showing 9 changed files with 82 additions and 70 deletions.
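At a glance: only the parse call now sits inside the mapper's try/catch, so exceptions thrown later while processing a row (in innerMap) are no longer misreported as parsing failures. Parse failures are counted as err-row-num and logged via handleInvalidRow, and abort the task unless ignoreInvalidRows is set; rows that parse to null and rows whose timestamp falls outside the condensed bucket intervals are tracked separately as null-row-num and oob-row-num. The INVALID_ROW_COUNTER enum and the unused Object value argument of innerMap are removed. A rough sketch of the resulting classification, using simplified stand-in types rather than the real Druid classes (only the counter names and the ordering of the checks follow the commit):

// Sketch only: Row, Parser and the min/max timestamps are simplified stand-ins.
import java.util.concurrent.atomic.AtomicLong;

class RowClassificationSketch
{
  interface Row { long getTimestampFromEpoch(); }
  interface Parser { Row parse(Object value); }

  final AtomicLong indexedRows = new AtomicLong(); // "indexed-row-num"
  final AtomicLong nullRows = new AtomicLong();    // "null-row-num"
  final AtomicLong oobRows = new AtomicLong();     // "oob-row-num"
  final AtomicLong errRows = new AtomicLong();     // "err-row-num"

  void map(Object value, Parser parser, boolean ignoreInvalidRows, long minTs, long maxTs)
  {
    final Row row;
    try {
      row = parser.parse(value);        // only parsing is guarded by the try/catch
    }
    catch (RuntimeException e) {
      errRows.incrementAndGet();        // parse failure: counted and logged...
      if (ignoreInvalidRows) {
        return;                         // ...and skipped only when explicitly allowed
      }
      throw e;
    }
    if (row == null) {
      nullRows.incrementAndGet();       // parsed, but produced no row
    } else if (row.getTimestampFromEpoch() < minTs || row.getTimestampFromEpoch() >= maxTs) {
      oobRows.incrementAndGet();        // outside the configured bucket intervals
    } else {
      indexedRows.incrementAndGet();    // handed to innerMap(inputRow, context); errors here
    }                                   // now propagate instead of counting as parse failures
  }
}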
29 changes: 21 additions & 8 deletions common/src/main/java/io/druid/common/utils/JodaUtils.java
@@ -153,19 +153,32 @@ public static Interval umbrellaInterval(Iterable<Interval> intervals)
return new Interval(startDate, endDate);
}

-  public static boolean overlaps(final Interval i, Iterable<Interval> intervals)
+  public static boolean contains(final long timestamp, final Iterable<Interval> intervals)
   {
     return Iterables.any(
         intervals, new Predicate<Interval>()
-        {
-          @Override
-          public boolean apply(Interval input)
-          {
-            return input.overlaps(i);
-          }
-        }
+        {
+          @Override
+          public boolean apply(Interval input)
+          {
+            return input.contains(timestamp);
+          }
+        }
     );
   }

+  public static boolean overlaps(final Interval i, Iterable<Interval> intervals)
+  {
+    return Iterables.any(
+        intervals, new Predicate<Interval>()
+        {
+          @Override
+          public boolean apply(Interval input)
+          {
+            return input.overlaps(i);
+          }
+        }
+    );
+  }

public static List<Interval> overlapping(final Interval interval, Iterable<Interval> intervals)
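For reference, a small stand-alone usage sketch of the new JodaUtils.contains together with condenseIntervals, which the mapper below uses to pre-compute its interval list; the class name and dates are made up for illustration:

import com.google.common.collect.ImmutableList;
import io.druid.common.utils.JodaUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;

import java.util.List;

public class JodaUtilsContainsExample
{
  public static void main(String[] args)
  {
    // Adjacent buckets condense into one interval, keeping the per-row membership test cheap.
    List<Interval> condensed = JodaUtils.condenseIntervals(
        ImmutableList.of(
            new Interval(new DateTime("2019-04-01", DateTimeZone.UTC), new DateTime("2019-04-02", DateTimeZone.UTC)),
            new Interval(new DateTime("2019-04-02", DateTimeZone.UTC), new DateTime("2019-04-03", DateTimeZone.UTC))
        )
    );
    long timestamp = new DateTime("2019-04-01T12:00:00Z").getMillis();
    System.out.println(JodaUtils.contains(timestamp, condensed)); // prints true
  }
}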
@@ -241,7 +241,6 @@ protected void setup(Context context)
   @Override
   protected void innerMap(
       InputRow inputRow,
-      Object value,
       Context context
   ) throws IOException, InterruptedException
{
@@ -267,7 +267,6 @@ protected void setup(Context context)
   @Override
   protected void innerMap(
       InputRow inputRow,
-      Object value,
       Context context
   ) throws IOException, InterruptedException
{
@@ -348,7 +347,6 @@ protected void setup(Context context)
   @Override
   protected void innerMap(
       InputRow inputRow,
-      Object value,
       Context context
   ) throws IOException, InterruptedException
{
@@ -127,11 +127,6 @@ public void configure(Binder binder)
INDEX_MERGER_V9 = injector.getInstance(IndexMergerV9.class);
}

-  public enum IndexJobCounters
-  {
-    INVALID_ROW_COUNTER
-  }
-
public static HadoopDruidIndexerConfig fromSpec(HadoopIngestionSpec spec)
{
return new HadoopDruidIndexerConfig(spec);
@@ -19,8 +19,10 @@

package io.druid.indexer;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
import io.druid.data.ParserInitializationFail;
import io.druid.data.ParsingFail;
import io.druid.data.input.InputRow;
@@ -30,9 +32,11 @@
import io.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import java.io.IOException;
import java.util.List;
import java.util.SortedSet;

public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<Object, Object, KEYOUT, VALUEOUT>
{
@@ -43,11 +47,12 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
protected HadoopDruidIndexerConfig config;
private InputRowParser parser;
protected GranularitySpec granularitySpec;
+  private List<Interval> intervals;

   private Counter indexedRows;
-  private Counter invalidRows;
   private Counter oobRows;
   private Counter errRows;
+  private Counter nullRows;

private boolean oobLogged;

@@ -59,10 +64,15 @@ protected void setup(Context context)
parser = config.getParser();
granularitySpec = config.getGranularitySpec();

+    Optional<SortedSet<Interval>> buckets = granularitySpec.bucketIntervals();
+    if (buckets.isPresent()) {
+      intervals = JodaUtils.condenseIntervals(buckets.get());
+    }
+
     indexedRows = context.getCounter("druid.internal", "indexed-row-num");
-    invalidRows = context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER);
     oobRows = context.getCounter("druid.internal", "oob-row-num");
     errRows = context.getCounter("druid.internal", "err-row-num");
+    nullRows = context.getCounter("druid.internal", "null-row-num");

setupHadoopAwareParser(parser, new MapperContext(context));
}
@@ -90,58 +100,58 @@ public InputRowParser getParser()
   @Override
   protected void map(Object key, Object value, Context context) throws IOException, InterruptedException
   {
+    final InputRow inputRow;
     try {
-      final InputRow inputRow = parseInputRow(value, parser);
-      if (inputRow == null) {
-        return;
-      }
-      if (!granularitySpec.bucketIntervals().isPresent()
-          || granularitySpec.bucketInterval(new DateTime(inputRow.getTimestampFromEpoch()))
-                            .isPresent()) {
-        indexedRows.increment(1);
-        innerMap(inputRow, value, context);
-      } else {
-        oobRows.increment(1);
-        if (!oobLogged) {
-          log.info("Out of bound row [%s]. will be ignored in next", inputRow);
-          oobLogged = true;
-        }
-      }
+      inputRow = parseInputRow(value, parser);
     }
     catch (Throwable e) {
-      errRows.increment(1);
       Throwables.propagateIfInstanceOf(e, Error.class);
-      Throwables.propagateIfInstanceOf(e, ParserInitializationFail.class); // invalid configuration, etc.. fail early

+      handleInvalidRow(value, e);
+
       if (config.isIgnoreInvalidRows()) {
-        handelInvalidRow(value, e);
        return; // we're ignoring this invalid row
       }
-      if (e instanceof ParsingFail) {
-        Object target = ((ParsingFail) e).getInput();
-        e = e.getCause() == null ? e : e.getCause();
-        log.info(e, "Ignoring invalid row due to parsing fail of %s", target == null ? value : target);
-      }
+      if (e instanceof IOException) {
+        throw (IOException) e;
+      }
+      if (e instanceof InterruptedException) {
+        throw (InterruptedException) e;
+      }
-      Throwables.propagateIfInstanceOf(e, IOException.class);
-      Throwables.propagateIfInstanceOf(e, InterruptedException.class);
       throw Throwables.propagate(e);
     }
+    process(inputRow, context);
   }

-  private void handelInvalidRow(Object value, Throwable e)
+  private void process(InputRow inputRow, Context context) throws IOException, InterruptedException
   {
-    invalidRows.increment(1);
-    if (invalidRows.getValue() <= INVALID_LOG_THRESHOLD) {
+    if (inputRow == null) {
+      nullRows.increment(1);
+      return;
+    }
+    if (intervals == null || JodaUtils.contains(inputRow.getTimestampFromEpoch(), intervals)) {
+      indexedRows.increment(1);
+      innerMap(inputRow, context);
+    } else {
+      oobRows.increment(1);
+      if (!oobLogged) {
+        log.info("Out of bound row [%s]. will be ignored in next", inputRow);
+        oobLogged = true;
+      }
+    }
+  }
+
+  private void handleInvalidRow(Object value, Throwable e)
+  {
+    errRows.increment(1);
+    if (e instanceof ParsingFail) {
+      value = ((ParsingFail) e).getInput() == null ? value : ((ParsingFail) e).getInput();
+      e = e.getCause() == null ? e : e.getCause();
+    }
+    if (errRows.getValue() <= INVALID_LOG_THRESHOLD) {
       log.warn(
           e,
-          "Ignoring invalid [%d]th row [%s] due to parsing error.. %s", invalidRows.getValue(), value,
-          invalidRows.getValue() == INVALID_LOG_THRESHOLD ? "will not be logged further" : ""
+          "Invalid row [%s] ([%d]th) due to parsing error.. %s", errRows.getValue(), value,
+          errRows.getValue() == INVALID_LOG_THRESHOLD ? "will not be logged further" : ""
       );
     }
+    if (e instanceof ParserInitializationFail) {
+      throw (ParserInitializationFail) e; // invalid configuration, etc.. fail early
+    }
   }

@SuppressWarnings("unchecked")
@@ -150,6 +160,6 @@ private InputRow parseInputRow(Object value, InputRowParser parser)
return value instanceof InputRow ? (InputRow) value : parser.parse(value);
}

-  protected abstract void innerMap(InputRow inputRow, Object value, Context context)
+  protected abstract void innerMap(InputRow inputRow, Context context)
       throws IOException, InterruptedException;
}
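Because the abstract innerMap no longer receives the raw Object value, subclasses work purely on the parsed InputRow. A minimal hypothetical subclass, just to show the new contract (the class name and counter names are illustrative, not from the commit):

import io.druid.data.input.InputRow;
import io.druid.indexer.HadoopDruidIndexerMapper;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

public class RowCountingMapper extends HadoopDruidIndexerMapper<NullWritable, NullWritable>
{
  @Override
  protected void innerMap(InputRow inputRow, Context context) throws IOException, InterruptedException
  {
    // Rows reaching innerMap have already been parsed, null-checked and interval-filtered by map().
    context.getCounter("example", "rows-seen").increment(1);
  }
}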
@@ -300,7 +300,6 @@ protected void setup(Context context)
   @Override
   protected void innerMap(
       InputRow inputRow,
-      Object value,
       Context context
   ) throws IOException, InterruptedException
{
@@ -22,33 +22,25 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.hadoop.mapreduce.Counters;

-import static io.druid.indexer.HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER;
-
/**
*/
public class IndexGeneratorStats
{
-  private long invalidRowCount = 0;
private long indexedRows = 0;
private long oobRows = 0;
private long errRows = 0;
+  private long nullRows = 0;
private long flushedIndex = 0;

public void setStats(Counters counters)
{
-    this.invalidRowCount = counters.findCounter(INVALID_ROW_COUNTER).getValue();
this.indexedRows = counters.findCounter("druid.internal", "indexed-row-num").getValue();
this.oobRows = counters.findCounter("druid.internal", "oob-row-num").getValue();
this.errRows = counters.findCounter("druid.internal", "err-row-num").getValue();
+    this.nullRows = counters.findCounter("druid.internal", "null-row-num").getValue();
this.flushedIndex = counters.findCounter("druid.internal", "index-flush-count").getValue();
}

-  @JsonProperty
-  public long getInvalidRowCount()
-  {
-    return invalidRowCount;
-  }
-
@JsonProperty
public long getIndexedRows()
{
@@ -67,6 +59,12 @@ public long getErrRows()
return errRows;
}

+  @JsonProperty
+  public long getNullRows()
+  {
+    return nullRows;
+  }
+
@JsonProperty
public long getFlushedIndex()
{
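The reworked stats object simply mirrors the mapper-side counters. A rough sketch of populating and serializing it (the counter group and names come from the diff; the example assumes IndexGeneratorStats is importable from io.druid.indexer and Jackson is on the classpath):

import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexer.IndexGeneratorStats;
import org.apache.hadoop.mapreduce.Counters;

public class IndexGeneratorStatsExample
{
  public static void main(String[] args) throws Exception
  {
    // Counter values here are made up for illustration.
    Counters counters = new Counters();
    counters.findCounter("druid.internal", "indexed-row-num").increment(120);
    counters.findCounter("druid.internal", "oob-row-num").increment(3);
    counters.findCounter("druid.internal", "err-row-num").increment(1);
    counters.findCounter("druid.internal", "null-row-num").increment(2);
    counters.findCounter("druid.internal", "index-flush-count").increment(4);

    IndexGeneratorStats stats = new IndexGeneratorStats();
    stats.setStats(counters);

    // Serializes the @JsonProperty getters, e.g. {"indexedRows":120,"oobRows":3,"errRows":1,...}
    System.out.println(new ObjectMapper().writeValueAsString(stats));
  }
}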
@@ -197,7 +197,7 @@ public void progress()

@Override
   protected void innerMap(
-      InputRow row, Object value, Context context
+      InputRow row, Context context
   ) throws IOException, InterruptedException
{
if (interval == null) {
@@ -247,7 +247,7 @@ public void progress()
}

@Override
-  protected void innerMap(InputRow row, Object value, Context context)
+  protected void innerMap(InputRow row, Context context)
       throws IOException, InterruptedException
{
// not null only with HadoopCombineInputFormat