Skip to content

Commit

Permalink
GEOMESA-3347 Type inference for xml and (non-geo) json (#3088)
Browse files Browse the repository at this point in the history
* Make converter inference method use a Try
* Change all float inference to doubles
  * floats cause rounding errors when used as doubles in geometries
* Remove creating points from consecutive number fields
* Improve error messages in CLI tools
  • Loading branch information
elahrvivaz committed Apr 8, 2024
1 parent dad109f commit c89b94c
Show file tree
Hide file tree
Showing 33 changed files with 1,195 additions and 531 deletions.
Expand Up @@ -23,6 +23,7 @@ import org.junit.runner.RunWith
import org.locationtech.geomesa.accumulo.TestWithFeatureType
import org.locationtech.geomesa.accumulo.data.AccumuloDataStoreParams
import org.locationtech.geomesa.arrow.io.SimpleFeatureArrowFileReader
import org.locationtech.geomesa.convert.EvaluationContext
import org.locationtech.geomesa.convert.text.DelimitedTextConverter
import org.locationtech.geomesa.convert2.SimpleFeatureConverter
import org.locationtech.geomesa.features.ScalaSimpleFeature
Expand All @@ -41,6 +42,7 @@ import java.io.{File, FileInputStream, FileWriter}
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.util.{Collections, Date}
import scala.util.{Failure, Success}

@RunWith(classOf[JUnitRunner])
class AccumuloExportCommandTest extends TestWithFeatureType {
Expand Down Expand Up @@ -194,9 +196,9 @@ class AccumuloExportCommandTest extends TestWithFeatureType {
DelimitedTextConverter.magicParsing(sft.getTypeName, new FileInputStream(file)).toList

def readJson(file: String, sft: SimpleFeatureType): Seq[SimpleFeature] = {
val converter = SimpleFeatureConverter.infer(() => new FileInputStream(file), None, Some(file)) match {
case None => ko(s"could not create converter from $file"); null: SimpleFeatureConverter
case Some((s, c)) => SimpleFeatureConverter(s, c)
val converter = SimpleFeatureConverter.infer(() => new FileInputStream(file), None, EvaluationContext.inputFileParam(file)) match {
case Failure(_) => ko(s"could not create converter from $file"); null: SimpleFeatureConverter
case Success((s, c)) => SimpleFeatureConverter(s, c)
}
val result = Seq.newBuilder[SimpleFeature]
val names = sft.getAttributeDescriptors.asScala.map(_.getLocalName)
Expand Down
Expand Up @@ -6,7 +6,7 @@
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.arrow.tools.export
package org.locationtech.geomesa.arrow.tools.`export`

import com.beust.jcommander.Parameters
import org.locationtech.geomesa.arrow.data.ArrowDataStore
Expand Down
Expand Up @@ -10,27 +10,29 @@ package org.locationtech.geomesa.convert.all

import com.typesafe.config.Config
import org.geotools.api.feature.simple.SimpleFeatureType
import org.locationtech.geomesa.convert.EvaluationContext
import org.locationtech.geomesa.convert.avro.AvroConverterFactory
import org.locationtech.geomesa.convert.json.JsonConverterFactory
import org.locationtech.geomesa.convert.parquet.ParquetConverterFactory
import org.locationtech.geomesa.convert.shp.ShapefileConverterFactory
import org.locationtech.geomesa.convert.text.DelimitedTextConverterFactory
import org.locationtech.geomesa.convert2.SimpleFeatureConverter
import org.locationtech.geomesa.convert.xml.XmlConverterFactory
import org.locationtech.geomesa.convert2.{SimpleFeatureConverter, SimpleFeatureConverterFactory}

import java.io.InputStream
import java.util.Locale
import scala.util.Try

object TypeAwareInference {

import org.locationtech.geomesa.convert2.SimpleFeatureConverter.factories

private val mappings = Map[String, Any => Boolean](
"avro" -> (_.isInstanceOf[AvroConverterFactory]),
"json" -> (_.isInstanceOf[JsonConverterFactory]),
"csv" -> (_.isInstanceOf[DelimitedTextConverterFactory]),
"tsv" -> (_.isInstanceOf[DelimitedTextConverterFactory]),
"parquet" -> (_.isInstanceOf[ParquetConverterFactory]),
"shp" -> (_.isInstanceOf[ShapefileConverterFactory])
private val mappings = Map[String, Class[_ <: SimpleFeatureConverterFactory]](
"avro" -> classOf[AvroConverterFactory],
"json" -> classOf[JsonConverterFactory],
"csv" -> classOf[DelimitedTextConverterFactory],
"tsv" -> classOf[DelimitedTextConverterFactory],
"parquet" -> classOf[ParquetConverterFactory],
"shp" -> classOf[ShapefileConverterFactory],
"xml" -> classOf[XmlConverterFactory]
)

/**
Expand All @@ -42,12 +44,32 @@ object TypeAwareInference {
* @param path file path, if known
* @return
*/
@deprecated("replaced with `infer(String, () => InputStream, Option[SimpleFeatureType], Map[String, AnyRef])`")
def infer(
    format: String,
    is: () => InputStream,
    sft: Option[SimpleFeatureType],
    path: Option[String]): Option[(SimpleFeatureType, Config)] = {
  // bridge to the hints-based overload: the optional file path becomes an input-file hint
  // (or no hints at all), and the resulting Try is collapsed back to an Option so that
  // callers of this legacy signature still see None when inference fails
  infer(format, is, sft, path.map(EvaluationContext.inputFileParam).getOrElse(Map.empty)).toOption
}


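/**
* Infer a converter based on a data sample. If the format is recognized, converter factories for that
* format are tried first, followed by all other available factories.
*
* @param format data format (e.g. csv, avro, json, etc), matched case-insensitively
* @param is function that opens the input stream to sample
* @param sft simple feature type, if known
* @param hints implementation specific hints about the input
* @return the inferred simple feature type and converter config, or a failure if no converter could be inferred
*/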
def infer(
format: String,
is: () => InputStream,
sft: Option[SimpleFeatureType],
path: Option[String]): Option[(SimpleFeatureType, Config)] = {
val opt = mappings.get(format.toLowerCase(Locale.US)).flatMap(check => factories.find(check.apply))
opt.flatMap(_.infer(is.apply, sft, path)).orElse(SimpleFeatureConverter.infer(is, sft, path))
hints: Map[String, AnyRef]): Try[(SimpleFeatureType, Config)] = {
val priority = mappings.get(format.toLowerCase(Locale.US)).map { clas =>
Ordering.by[SimpleFeatureConverterFactory, Boolean](f => !clas.isAssignableFrom(f.getClass)) // note: false sorts first
}
SimpleFeatureConverter.infer(is, sft, hints, priority)
}
}
Expand Up @@ -9,6 +9,7 @@
package org.locationtech.geomesa.convert.avro.registry

import com.typesafe.config.Config
import org.geotools.api.feature.simple.SimpleFeatureType
import org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.AvroSchemaRegistryConfig
import org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverterFactory.AvroSchemaRegistryConfigConvert
import org.locationtech.geomesa.convert2.AbstractConverter.{BasicField, BasicOptions}
Expand All @@ -18,9 +19,18 @@ import org.locationtech.geomesa.convert2.transforms.Expression
import pureconfig.ConfigObjectCursor
import pureconfig.error.ConfigReaderFailures

import java.io.InputStream
import scala.util.{Failure, Try}

class AvroSchemaRegistryConverterFactory
extends AbstractConverterFactory[AvroSchemaRegistryConverter, AvroSchemaRegistryConfig, BasicField, BasicOptions](
"avro-schema-registry", AvroSchemaRegistryConfigConvert, BasicFieldConvert, BasicOptionsConvert)
"avro-schema-registry", AvroSchemaRegistryConfigConvert, BasicFieldConvert, BasicOptionsConvert) {

// type inference is not implemented for this converter: every call reports a failure
// wrapping a NotImplementedError, regardless of the input
override def infer(
    is: InputStream,
    sft: Option[SimpleFeatureType],
    hints: Map[String, AnyRef]): Try[(SimpleFeatureType, Config)] = {
  val unsupported = new NotImplementedError()
  Failure(unsupported)
}
}

object AvroSchemaRegistryConverterFactory {

Expand Down
Expand Up @@ -13,11 +13,12 @@ import org.apache.avro.Schema
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.geotools.api.feature.simple.SimpleFeatureType
import org.locationtech.geomesa.convert.EvaluationContext
import org.locationtech.geomesa.convert.avro.AvroConverter._
import org.locationtech.geomesa.convert.avro.AvroConverterFactory.AvroConfigConvert
import org.locationtech.geomesa.convert2.AbstractConverter.{BasicField, BasicOptions}
import org.locationtech.geomesa.convert2.AbstractConverterFactory.{BasicFieldConvert, BasicOptionsConvert, ConverterConfigConvert, OptionConvert}
import org.locationtech.geomesa.convert2.TypeInference.{FunctionTransform, InferredType}
import org.locationtech.geomesa.convert2.TypeInference.{FunctionTransform, InferredType, Namer}
import org.locationtech.geomesa.convert2.transforms.Expression
import org.locationtech.geomesa.convert2.{AbstractConverterFactory, TypeInference}
import org.locationtech.geomesa.features.avro.io.AvroDataFile
Expand All @@ -30,7 +31,7 @@ import pureconfig.ConfigObjectCursor
import pureconfig.error.{ConfigReaderFailures, FailureReason}

import java.io.InputStream
import scala.util.control.NonFatal
import scala.util.Try

class AvroConverterFactory extends AbstractConverterFactory[AvroConverter, AvroConfig, BasicField, BasicOptions](
"avro", AvroConfigConvert, BasicFieldConvert, BasicOptionsConvert) {
Expand All @@ -50,8 +51,14 @@ class AvroConverterFactory extends AbstractConverterFactory[AvroConverter, AvroC
override def infer(
is: InputStream,
sft: Option[SimpleFeatureType],
path: Option[String]): Option[(SimpleFeatureType, Config)] = {
try {
path: Option[String]): Option[(SimpleFeatureType, Config)] =
infer(is, sft, path.map(EvaluationContext.inputFileParam).getOrElse(Map.empty)).toOption

override def infer(
is: InputStream,
sft: Option[SimpleFeatureType],
hints: Map[String, AnyRef]): Try[(SimpleFeatureType, Config)] = {
Try {
WithClose(new DataFileStream[GenericRecord](is, new GenericDatumReader[GenericRecord]())) { dfs =>
val (schema, id, fields, userData) = if (AvroDataFile.canParse(dfs)) {
// this is a file written in the geomesa avro format
Expand Down Expand Up @@ -142,12 +149,8 @@ class AvroConverterFactory extends AbstractConverterFactory[AvroConverter, AvroC
.withFallback(optsConvert.to(BasicOptions.default))
.toConfig

Some((schema, config))
(schema, config)
}
} catch {
case NonFatal(e) =>
logger.debug(s"Could not infer Avro converter from input:", e)
None
}
}
}
Expand All @@ -163,18 +166,12 @@ object AvroConverterFactory {
* @return
*/
def schemaTypes(schema: Schema): Seq[InferredType] = {
val uniqueNames = scala.collection.mutable.HashSet.empty[String]
val namer = new Namer()
val types = scala.collection.mutable.ArrayBuffer.empty[InferredType]

def mapField(field: Schema.Field, path: String = ""): Unit = {
// get a valid attribute name
val base = s"${field.name().replaceAll("[^A-Za-z0-9]+", "_")}"
var name = base
var i = 0
while (!uniqueNames.add(name)) {
name = s"${base}_$i"
i += 1
}
val name = namer(field.name())

// checks for nested array/map types we can handle
def isSimpleArray: Boolean = isSimple(field.schema().getElementType)
Expand Down Expand Up @@ -202,7 +199,7 @@ object AvroConverterFactory {
schema.getFields.asScala.foreach(mapField(_))

// check if we can derive a geometry field
TypeInference.deriveGeometry(types.toSeq).foreach(g => types += g)
TypeInference.deriveGeometry(types.toSeq, namer).foreach(g => types += g)

types.toSeq
}
Expand Down
Expand Up @@ -195,9 +195,9 @@ class AvroConverterTest extends Specification with AvroUtils with LazyLogging {

val bytes = out.toByteArray

val inferred = new AvroConverterFactory().infer(new ByteArrayInputStream(bytes))
val inferred = new AvroConverterFactory().infer(new ByteArrayInputStream(bytes), None, Map.empty[String, AnyRef])

inferred must beSome
inferred must beASuccessfulTry
inferred.get._1 mustEqual sft

logger.trace(inferred.get._2.root().render(ConfigRenderOptions.concise().setFormatted(true)))
Expand All @@ -224,9 +224,9 @@ class AvroConverterTest extends Specification with AvroUtils with LazyLogging {
val bytes = out.toByteArray

val updated = SimpleFeatureTypes.createType("test", "name:String,age:Int,dtg:Date,*geom:Point:srid=4326,tag:String")
val inferred = new AvroConverterFactory().infer(new ByteArrayInputStream(bytes), Some(updated))
val inferred = new AvroConverterFactory().infer(new ByteArrayInputStream(bytes), Some(updated), Map.empty[String, AnyRef])

inferred must beSome
inferred must beASuccessfulTry
inferred.get._1 mustEqual sft

logger.trace(inferred.get._2.root().render(ConfigRenderOptions.concise().setFormatted(true)))
Expand Down Expand Up @@ -289,9 +289,9 @@ class AvroConverterTest extends Specification with AvroUtils with LazyLogging {

val bytes = out.toByteArray

val inferred = new AvroConverterFactory().infer(new ByteArrayInputStream(bytes))
val inferred = new AvroConverterFactory().infer(new ByteArrayInputStream(bytes), None, Map.empty[String, AnyRef])

inferred must beSome
inferred must beASuccessfulTry

val expectedSft = SimpleFeatureTypes.createType(inferred.get._1.getTypeName,
"lat:Double,lon:Double,label:String,list:List[String],map:Map[String,Int],age:Int,weight:Float,*geom:Point:srid=4326")
Expand Down
Expand Up @@ -19,6 +19,7 @@ import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypeLoader
import org.locationtech.geomesa.utils.io.WithClose

import java.io.{Closeable, InputStream}
import scala.util.{Failure, Try}

/**
* Converts input streams into simple features. SimpleFeatureConverters should be thread-safe. However,
Expand Down Expand Up @@ -128,12 +129,62 @@ object SimpleFeatureConverter extends StrictLogging {
* @param sft simple feature type, if known
* @return
*/
@deprecated("replaced with `infer(() => InputStream, Option[SimpleFeatureType], Map[String, Any])`")
def infer(
is: () => InputStream,
sft: Option[SimpleFeatureType],
path: Option[String] = None): Option[(SimpleFeatureType, Config)] = {
factories.foldLeft[Option[(SimpleFeatureType, Config)]](None) { (res, f) =>
res.orElse(WithClose(is())(in => f.infer(in, sft, path)))
val hints = path match {
case None => Map.empty[String, AnyRef]
case Some(p) => Map(EvaluationContext.InputFilePathKey -> p)
}
infer(is, sft, hints).toOption
}

/**
 * Infer a converter based on a data sample, without imposing any ordering on the
 * converter factories that are tried.
 *
 * @param is function that opens the input stream to sample
 * @param sft simple feature type, if known
 * @param hints implementation specific hints
 * @return the inferred simple feature type and converter config, or a failure
 */
def infer(
    is: () => InputStream,
    sft: Option[SimpleFeatureType],
    hints: Map[String, AnyRef]): Try[(SimpleFeatureType, Config)] = {
  // no explicit priority: factories are tried in their existing order
  val unordered: Option[Ordering[SimpleFeatureConverterFactory]] = None
  infer(is, sft, hints, unordered)
}

/**
 * Infer a converter based on a data sample, trying each available converter factory in turn.
 *
 * A new input stream is opened (and closed again) for every factory that is tried. Each failed
 * attempt is wrapped in an exception naming the factory that produced it.
 *
 * @param is function that opens the input stream to sample
 * @param sft simple feature type, if known
 * @param hints implementation specific hints
 * @param priority optional ordering used to sort the factories before they are tried
 * @return the inferred simple feature type and converter config, or a failure
 */
def infer(
    is: () => InputStream,
    sft: Option[SimpleFeatureType],
    hints: Map[String, AnyRef],
    priority: Option[Ordering[SimpleFeatureConverterFactory]]): Try[(SimpleFeatureType, Config)] = {
  if (factories.nonEmpty) {
    val ordered = priority match {
      case Some(ordering) => factories.sorted(ordering)
      case None           => factories
    }
    // lazily evaluated: a factory is only invoked when the iterator is advanced
    val attempts = ordered.iterator.map { factory =>
      WithClose(is()) { stream =>
        val inferred =
          try { factory.infer(stream, sft, hints) } catch {
            // in case a particular converter's dependencies aren't on the classpath
            case e: NoClassDefFoundError => Failure(e)
          }
        inferred match {
          case Failure(cause) =>
            // tag the failure with the factory it came from
            Failure(new RuntimeException(s"${factory.getClass.getSimpleName}: could not infer a converter:", cause))
          case success => success
        }
      }
    }
    multiTry(attempts, new RuntimeException("Unable to infer a converter"))
  } else {
    Failure(new RuntimeException("There are no converters available on the classpath"))
  }
}
}
Expand Up @@ -11,8 +11,10 @@ package org.locationtech.geomesa.convert2
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import org.geotools.api.feature.simple.SimpleFeatureType
import org.locationtech.geomesa.convert.EvaluationContext

import java.io.InputStream
import scala.util.{Failure, Success, Try}

trait SimpleFeatureConverterFactory extends LazyLogging {

Expand All @@ -33,8 +35,30 @@ trait SimpleFeatureConverterFactory extends LazyLogging {
* @param path file path, if there is a file available
* @return
*/
@deprecated("replaced with `infer(InputStream, Option[SimpleFeatureType], Map[String, AnyRef])`")
def infer(
    is: InputStream,
    sft: Option[SimpleFeatureType] = None,
    path: Option[String] = None): Option[(SimpleFeatureType, Config)] = {
  // default implementation: nothing is inferred unless an implementing class overrides this
  None
}

/**
 * Infer a configuration and simple feature type from an input stream, if possible.
 *
 * For back-compatibility, this default implementation forwards to the deprecated path-based
 * `infer` method, passing along the input file path hint (if present). Implementing classes
 * should override it.
 *
 * @param is input
 * @param sft simple feature type, if known ahead of time
 * @param hints implementation specific hints about the input
 * @return the inferred simple feature type and config, or a failure if nothing could be inferred
 */
def infer(
    is: InputStream,
    sft: Option[SimpleFeatureType],
    hints: Map[String, AnyRef]): Try[(SimpleFeatureType, Config)] = {
  // the deprecated method only understands a file path, so pull that out of the hints
  val path: Option[String] = hints.get(EvaluationContext.InputFilePathKey).map(_.toString)
  val legacy: Option[(SimpleFeatureType, Config)] = infer(is, sft, path)
  legacy.fold[Try[(SimpleFeatureType, Config)]](
    Failure(new RuntimeException("Could not infer converter from input data"))
  )(result => Success(result))
}
}

0 comments on commit c89b94c

Please sign in to comment.