Skip to content

Commit

Permalink
GEOMESA-3357 Converters - allow for inspection of conversion errors
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz committed Apr 29, 2024
1 parent c9d7e5a commit f351c5c
Show file tree
Hide file tree
Showing 13 changed files with 118 additions and 34 deletions.
2 changes: 1 addition & 1 deletion docs/user/cli/ingest.rst
Expand Up @@ -68,7 +68,7 @@ satisfaction, then used for the entire data set with a distributed ingest.
See :ref:`cli_converter_conf` for more details on specifying the converter.

The ``converter-error-mode`` argument may be used to override the error mode defined in the converter. It must be
one of ``skip-bad-records`` or ``raise-errors``.
one of ``log-errors`` or ``raise-errors``.

If the ``--feature-name`` is specified and the schema already exists, then ``--spec`` is not required. Likewise,
if a converter is not defined, the schema will be inferred alongside the converter. Otherwise, ``--spec`` may be
Expand Down
2 changes: 1 addition & 1 deletion docs/user/convert/parquet.rst
Expand Up @@ -161,7 +161,7 @@ The following SimpleFeatureType and converter would be sufficient to parse the r
],
"options" : {
"encoding" : "UTF-8",
"error-mode" : "skip-bad-records",
"error-mode" : "log-errors",
"parse-mode" : "incremental",
"validators" : [ "index" ]
}
Expand Down
10 changes: 6 additions & 4 deletions docs/user/convert/parsing_and_validation.rst
Expand Up @@ -111,14 +111,16 @@ For more details on implementing a service provider, see the
Error Mode
~~~~~~~~~~

There are two types of modes for handling errors:
There are three modes for handling errors:

* ``skip-bad-records``
* ``log-errors``
* ``raise-errors``
* ``return-errors``

``raise-errors`` mode will throw an IOException if bad data is detected based on parsing or validation. This can
be especially useful when first developing and testing a converter definition. ``skip-bad-records`` mode will
still provide debug level logging but will not throw an exception. To configure the
be especially useful when first developing and testing a converter definition. ``log-errors`` mode will
still provide debug level logging but will not throw an exception. ``return-errors`` will expose error details through
the evaluation context, and is generally only useful when using converters programmatically. To configure the
error mode add the following option to your converter's typesafe config:

::
Expand Down
2 changes: 1 addition & 1 deletion docs/user/nifi/converters.rst
Expand Up @@ -19,7 +19,7 @@ specifying the input source:
+-------------------------------+-----------------------------------------------------------------------------------------+
| ``ConverterSpec`` | Converter specification string. Overridden by ConverterName if both are set. |
+-------------------------------+-----------------------------------------------------------------------------------------+
| ``ConverterErrorMode`` | Override the converter error mode (``skip-bad-records`` or ``raise-errors``) |
| ``ConverterErrorMode`` | Override the converter error mode (``log-errors`` or ``raise-errors``) |
+-------------------------------+-----------------------------------------------------------------------------------------+
| ``ConverterMetricReporters`` | Override the converter metrics reporters (see below) |
+-------------------------------+-----------------------------------------------------------------------------------------+
Expand Down
Expand Up @@ -58,6 +58,14 @@ trait EvaluationContext {
*/
def failure: com.codahale.metrics.Counter

/**
* Access to any errors that have occurred - note that errors will generally only be kept if the converter
* error mode is set to `ReturnErrors`
*
* @return
*/
def errors: java.util.Queue[EvaluationError]

/**
* Gets a references to a field's value
*
Expand Down Expand Up @@ -189,7 +197,8 @@ object EvaluationContext extends LazyLogging {
val cache: Map[String, EnrichmentCache],
val metrics: ConverterMetrics,
val success: Counter,
val failure: Counter
val failure: Counter,
val errors: java.util.Queue[EvaluationError] = new java.util.ArrayDeque[EvaluationError]()
) extends EvaluationContext {

// holder for results from evaluating each row
Expand Down
Expand Up @@ -33,12 +33,27 @@ object Modes {
type LineMode = LineMode.Value

object ErrorMode extends Enumeration with Modes {

type Mode = Modes.ErrorMode
val SkipBadRecords: ErrorMode = Value("skip-bad-records")
val RaiseErrors : ErrorMode = Value("raise-errors")
val LogErrors : ErrorMode = Value("log-errors")
val ReturnErrors : ErrorMode = Value("return-errors")
def Default : ErrorMode = apply()

override protected val defaultValue: ErrorMode = SkipBadRecords
def apply(mode: String): ErrorMode = {
values.find(_.toString.equalsIgnoreCase(mode)) match {
case Some(m) => m
case None =>
if ("skip-bad-records".equalsIgnoreCase(mode)) {
LogErrors
} else {
throw new IllegalArgumentException(
s"Invalid error mode '$mode'. Valid values are ${values.mkString("'", "', '", "'")}")
}
}
}

override protected val defaultValue: ErrorMode = LogErrors
override val systemProperty: SystemProperty =
SystemProperty("geomesa.converter.error.mode.default", defaultValue.toString)
}
Expand Down
Expand Up @@ -82,7 +82,7 @@ abstract class AbstractCompositeConverter[T <: AnyRef](
}

val hist = predicates.head.context.metrics.histogram("parse.nanos")
new ErrorHandlingIterator(parse(is, ec), errorMode, ec.failure, hist).flatMap(eval)
new ErrorHandlingIterator(parse(is, ec), errorMode, ec, hist).flatMap(eval)
}

override def close(): Unit = CloseWithLogging(delegates.map(_._2))
Expand All @@ -97,6 +97,7 @@ object AbstractCompositeConverter {
override def accessor(name: String): FieldAccessor = throw new NotImplementedError()
override def evaluate(args: Array[AnyRef]): Either[EvaluationError, Array[AnyRef]] =
throw new NotImplementedError()
override def errors: java.util.Queue[EvaluationError] = throw new NotImplementedError()
}

object CompositeEvaluationContext {
Expand Down
Expand Up @@ -148,7 +148,7 @@ abstract class AbstractConverter[T, C <: ConverterConfig, F <: Field, O <: Conve

override def process(is: InputStream, ec: EvaluationContext): CloseableIterator[SimpleFeature] = {
val hist = ec.metrics.histogram("parse.nanos")
val converted = convert(new ErrorHandlingIterator(parse(is, ec), options.errorMode, ec.failure, hist), ec)
val converted = convert(new ErrorHandlingIterator(parse(is, ec), options.errorMode, ec, hist), ec)
options.parseMode match {
case ParseMode.Incremental => converted
case ParseMode.Batch => CloseableIterator((new ListBuffer() ++= converted).iterator, converted.close())
Expand Down Expand Up @@ -254,14 +254,13 @@ abstract class AbstractConverter[T, C <: ConverterConfig, F <: Field, O <: Conve

case Left(error) =>
ec.failure.inc()
if (options.errorMode == ErrorMode.RaiseErrors) {
throw new IOException(errorMessage(error, rawValues, verbose = true), error.e)
}
// SkipBadRecords
if (logger.underlying.isDebugEnabled) {
logger.debug(errorMessage(error, rawValues, verbose = true), error.e)
} else if (logger.underlying.isInfoEnabled) {
logger.info(errorMessage(error, rawValues, verbose = false))
def msg(verbose: Boolean): String = errorMessage(error, rawValues, verbose)
options.errorMode match {
case ErrorMode.LogErrors if logger.underlying.isDebugEnabled => logger.debug(msg(verbose = true), error.e)
case ErrorMode.LogErrors if logger.underlying.isInfoEnabled => logger.info(msg(verbose = false))
case ErrorMode.ReturnErrors => ec.errors.add(error.copy(e = new IOException(msg(verbose = true), error.e)))
case ErrorMode.RaiseErrors => throw new IOException(msg(verbose = true), error.e)
case _ => // no-op
}
CloseableIterator.empty
}
Expand Down
Expand Up @@ -358,10 +358,11 @@ object AbstractConverterFactory extends LazyLogging {
}
}

def parse[T](key: String, values: Iterable[T]): Either[ConfigReaderFailures, T] = {
def parse[T](key: String, values: Iterable[T], fallback: Map[String, T] = Map.empty[String, T]): Either[ConfigReaderFailures, T] = {
cur.atKey(key).right.flatMap { value =>
value.asString.right.flatMap { string =>
values.find(_.toString.equalsIgnoreCase(string)) match {
val fb = fallback.collectFirst { case (k, v) if k.equalsIgnoreCase(string) => v }
values.find(_.toString.equalsIgnoreCase(string)).orElse(fb) match {
case Some(v) => Right(v)
case None => value.failed(CannotConvert(value.valueOpt.map(_.toString).orNull, values.head.getClass.getSimpleName, s"Must be one of: ${values.mkString(", ")}"))
}
Expand All @@ -377,7 +378,7 @@ object AbstractConverterFactory extends LazyLogging {
validators <- cur.atKey("validators").right.flatMap(_.asListCursor).right.flatMap(mergeValidators).right
reporters <- parseReporters(cur.atKeyOrUndefined("reporters")).right
parseMode <- parse("parse-mode", ParseMode.values).right
errorMode <- parse("error-mode", ErrorMode.values).right
errorMode <- parse("error-mode", ErrorMode.values, Map("skip-bad-records" -> ErrorMode.LogErrors)).right
encoding <- cur.atKey("encoding").right.flatMap(_.asString).right.map(Charset.forName).right
options <- decodeOptions(cur, validators, reporters, parseMode, errorMode, encoding).right
} yield {
Expand Down
Expand Up @@ -8,13 +8,16 @@

package org.locationtech.geomesa

import com.codahale.metrics.{Counter, Histogram}
import com.codahale.metrics.Histogram
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import org.locationtech.geomesa.convert.EvaluationContext
import org.locationtech.geomesa.convert.EvaluationContext.EvaluationError
import org.locationtech.geomesa.convert.Modes.{ErrorMode, ParseMode}
import org.locationtech.geomesa.convert2.transforms.Expression
import org.locationtech.geomesa.utils.collection.CloseableIterator

import java.io.IOException
import java.nio.charset.Charset
import scala.util.control.NonFatal
import scala.util.{Failure, Try}
Expand Down Expand Up @@ -82,13 +85,13 @@ package object convert2 {
*
* @param underlying wrapped iterator
* @param mode error mode
* @param counter counter for failures
* @param ec EvaluationContext for tracking errors
* @param times histogram for tracking convert times
*/
class ErrorHandlingIterator[T](
underlying: CloseableIterator[T],
mode: ErrorMode,
counter: Counter,
ec: EvaluationContext,
times: Histogram
) extends CloseableIterator[T] with LazyLogging {

Expand All @@ -109,10 +112,11 @@ package object convert2 {
}
} catch {
case NonFatal(e) =>
counter.inc()
ec.failure.inc()
mode match {
case ErrorMode.SkipBadRecords => logger.warn("Failed parsing input: ", e)
case ErrorMode.RaiseErrors => throw e
case ErrorMode.LogErrors => logger.warn("Failed parsing input: ", e)
case ErrorMode.ReturnErrors => ec.errors.add(EvaluationError(null, ec.line, new IOException("Failed parsing input: ", e)))
case ErrorMode.RaiseErrors => throw new IOException("Failed parsing input: ", e)
}
error = true
false // usually parsing can't continue if there is an exception in the underlying read
Expand Down
Expand Up @@ -1378,6 +1378,59 @@ class JsonConverterTest extends Specification {
}
}

"return errors in the evaluation context" >> {
val jsonStr =
"""{ id: 1, number: 123, color: "red", lat: 1, lon: 1 }
|{ id: 2, number: 456, color: "blue", lat: 200, lon: 2 }
|{ id: 3, number: 789, color: "green", lat: 3 }
|{ id: 4, number: 321, color: "yellow", lat: 4, lon: 4 }
""".stripMargin

val parserConf = ConfigFactory.parseString(
"""
| {
| type = "json"
| id-field = "$id"
| options = {
| error-mode: "return-errors"
| }
| fields = [
| { name = "id", json-type = "integer", path = "$.id", transform = "toString($0)" }
| { name = "number", json-type = "integer", path = "$.number", }
| { name = "color", json-type = "string", path = "$.color", transform = "trim($0)" }
| { name = "lat", json-type = "double", path = "$.lat", }
| { name = "lon", json-type = "double", path = "$.lon", }
| { name = "geom", transform = "point($lon, $lat)" }
| ]
| }
""".stripMargin)

WithClose(SimpleFeatureConverter(sft, parserConf)) { converter =>
val ec = converter.createEvaluationContext()
val features = WithClose(converter.process(new ByteArrayInputStream(jsonStr.getBytes(StandardCharsets.UTF_8)), ec))(_.toList)
features must haveLength(2)
features(0).getID mustEqual "1"
features(0).getAttribute("number").asInstanceOf[Integer] mustEqual 123
features(0).getAttribute("color").asInstanceOf[String] mustEqual "red"
features(0).getDefaultGeometry mustEqual WKTUtils.read("POINT (1 1)")
features(1).getID mustEqual "4"
features(1).getAttribute("number").asInstanceOf[Integer] mustEqual 321
features(1).getAttribute("color").asInstanceOf[String] mustEqual "yellow"
features(1).getDefaultGeometry mustEqual WKTUtils.read("POINT (4 4)")

ec.errors.size() mustEqual 2
val e1 = ec.errors.poll()
val e2 = ec.errors.poll()
e1.field must beNull
e1.e.getCause must not(beNull)
e1.e.getCause.getMessage must contain("Validation error")
e2.field mustEqual "geom"
e2.e.getCause must not(beNull)
e2.e.getCause.getMessage must contain("Invalid point")
ec.errors.size() mustEqual 0
}
}

"infer schema from geojson files" >> {
val json =
"""{
Expand Down
Expand Up @@ -148,7 +148,7 @@ trait ConverterConfigParam {
@Parameter(names = Array("-C", "--converter"), description = "GeoMesa converter specification as a config string, file name, or name of an available converter")
var config: String = _

@Parameter(names = Array("--converter-error-mode"), description = "Override the converter error mode - 'skip-bad-records' or 'raise-errors'", converter = classOf[ErrorModeConverter])
@Parameter(names = Array("--converter-error-mode"), description = "Override the converter error mode - 'log-errors' or 'raise-errors'", converter = classOf[ErrorModeConverter])
var errorMode: ErrorMode = _
}

Expand Down
Expand Up @@ -21,7 +21,7 @@ import org.locationtech.geomesa.utils.text.Suffixes.Memory
import java.util.Date
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal
import scala.util.{Failure, Success}
import scala.util.{Failure, Success, Try}

object ParameterConverters {

Expand Down Expand Up @@ -106,9 +106,9 @@ object ParameterConverters {

class ErrorModeConverter(name: String) extends BaseConverter[ErrorMode](name) {
override def convert(value: String): ErrorMode = {
ErrorMode.values.find(_.toString.equalsIgnoreCase(value)).getOrElse {
throw new ParameterException(s"Invalid error mode '$value'. Valid values are " +
ErrorMode.values.map(_.toString).mkString("'", "', '", "'"))
Try(ErrorMode(value)) match {
case Success(m) => m
case Failure(e) => throw new ParameterException(e.getMessage)
}
}
}
Expand Down

0 comments on commit f351c5c

Please sign in to comment.