diff --git a/geomesa-convert/geomesa-convert-parquet/src/main/scala/org/locationtech/geomesa/convert/parquet/ParquetConverterFactory.scala b/geomesa-convert/geomesa-convert-parquet/src/main/scala/org/locationtech/geomesa/convert/parquet/ParquetConverterFactory.scala index db819e9427b5..e6b789b328a2 100644 --- a/geomesa-convert/geomesa-convert-parquet/src/main/scala/org/locationtech/geomesa/convert/parquet/ParquetConverterFactory.scala +++ b/geomesa-convert/geomesa-convert-parquet/src/main/scala/org/locationtech/geomesa/convert/parquet/ParquetConverterFactory.scala @@ -16,6 +16,7 @@ import org.apache.parquet.hadoop.ParquetFileReader import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.Type.Repetition import org.apache.parquet.schema.{MessageType, OriginalType, Type} +import org.geotools.api.feature.`type`.AttributeDescriptor import org.geotools.api.feature.simple.SimpleFeatureType import org.locationtech.geomesa.convert.EvaluationContext import org.locationtech.geomesa.convert2.AbstractConverter.{BasicConfig, BasicField, BasicOptions} @@ -63,6 +64,7 @@ class ParquetConverterFactory // note: get the path as a URI so that we handle local files appropriately val filePath = new Path(PathUtils.getUrl(p).toURI) val footer = ParquetFileReader.readFooter(new Configuration(), filePath, ParquetMetadataConverter.NO_FILTER) + val parquetSchemaVersion = footer.getFileMetaData.getKeyValueMetaData.getOrDefault("geomesa.parquet.version", "0").toInt val (schema, fields, id) = SimpleFeatureParquetSchema.read(footer.getFileMetaData) match { case Some(parquet) => // this is a geomesa encoded parquet file @@ -71,16 +73,7 @@ class ParquetConverterFactory // note: parquet converter stores the generic record under index 0 val path = s"avroPath($$0, '/$name')" // some types need a function applied to the underlying avro value - val expression = ObjectType.selectType(descriptor) match { - case Seq(ObjectType.GEOMETRY, ObjectType.POINT) => s"parquetPoint($$0, '/$name')" - case Seq(ObjectType.GEOMETRY, ObjectType.MULTIPOINT) => s"parquetMultiPoint($$0, '/$name')" - case Seq(ObjectType.GEOMETRY, ObjectType.LINESTRING) => s"parquetLineString($$0, '/$name')" - case Seq(ObjectType.GEOMETRY, ObjectType.MULTILINESTRING) => s"parquetMultiLineString($$0, '/$name')" - case Seq(ObjectType.GEOMETRY, ObjectType.POLYGON) => s"parquetPolygon($$0, '/$name')" - case Seq(ObjectType.GEOMETRY, ObjectType.MULTIPOLYGON) => s"parquetMultiPolygon($$0, '/$name')" - case Seq(ObjectType.UUID) => s"avroBinaryUuid($path)" - case _ => path - } + val expression = computeTransformFunction(name, path, descriptor, parquetSchemaVersion) BasicField(descriptor.getLocalName, Some(Expression(expression))) } val id = Expression(s"avroPath($$0, '/${SimpleFeatureParquetSchema.FeatureIdField}')") @@ -115,6 +108,41 @@ class ParquetConverterFactory } } } + + private def computeTransformFunction(name: String, path: String, descriptor: AttributeDescriptor, schemaVersion: Int): String = { + def expressionV2(name: String, path: String, descriptor: AttributeDescriptor): String = { + ObjectType.selectType(descriptor) match { + case Seq(ObjectType.GEOMETRY, ObjectType.POINT) => s"point(avroPath($$0, '/$name'))" + case Seq(ObjectType.GEOMETRY, ObjectType.MULTIPOINT) => s"multipoint(avroPath($$0, '/$name'))" + case Seq(ObjectType.GEOMETRY, ObjectType.LINESTRING) => s"linestring(avroPath($$0, '/$name'))" + case Seq(ObjectType.GEOMETRY, ObjectType.MULTILINESTRING) => s"multilinestring(avroPath($$0, '/$name'))" + case 
Seq(ObjectType.GEOMETRY, ObjectType.POLYGON) => s"polygon(avroPath($$0, '/$name'))" + case Seq(ObjectType.GEOMETRY, ObjectType.MULTIPOLYGON) => s"multipolygon(avroPath($$0, '/$name'))" + case Seq(ObjectType.UUID) => s"avroBinaryUuid($path)" + case _ => path + } + } + + def expressionV0V1(name: String, path: String, descriptor: AttributeDescriptor): String = { + ObjectType.selectType(descriptor) match { + case Seq(ObjectType.GEOMETRY, ObjectType.POINT) => s"parquetPoint($$0, '/$name')" + case Seq(ObjectType.GEOMETRY, ObjectType.MULTIPOINT) => s"parquetMultiPoint($$0, '/$name')" + case Seq(ObjectType.GEOMETRY, ObjectType.LINESTRING) => s"parquetLineString($$0, '/$name')" + case Seq(ObjectType.GEOMETRY, ObjectType.MULTILINESTRING) => s"parquetMultiLineString($$0, '/$name')" + case Seq(ObjectType.GEOMETRY, ObjectType.POLYGON) => s"parquetPolygon($$0, '/$name')" + case Seq(ObjectType.GEOMETRY, ObjectType.MULTIPOLYGON) => s"parquetMultiPolygon($$0, '/$name')" + case Seq(ObjectType.UUID) => s"avroBinaryUuid($path)" + case _ => path + } + } + + schemaVersion match { + case 2 => expressionV2(name, path, descriptor) + case 1 => expressionV0V1(name, path, descriptor) + case 0 => expressionV0V1(name, path, descriptor) + case v => throw new IllegalArgumentException(s"Unknown SimpleFeatureParquetSchema version: $v") + } + } } object ParquetConverterFactory { diff --git a/geomesa-convert/geomesa-convert-parquet/src/main/scala/org/locationtech/geomesa/convert/parquet/ParquetFunctionFactory.scala b/geomesa-convert/geomesa-convert-parquet/src/main/scala/org/locationtech/geomesa/convert/parquet/ParquetFunctionFactory.scala index c4b6406179ac..7b7fb35f2771 100644 --- a/geomesa-convert/geomesa-convert-parquet/src/main/scala/org/locationtech/geomesa/convert/parquet/ParquetFunctionFactory.scala +++ b/geomesa-convert/geomesa-convert-parquet/src/main/scala/org/locationtech/geomesa/convert/parquet/ParquetFunctionFactory.scala @@ -14,9 +14,15 @@ import org.locationtech.geomesa.convert.avro.AvroPath import org.locationtech.geomesa.convert2.transforms.Expression.LiteralString import org.locationtech.geomesa.convert2.transforms.TransformerFunction.NamedTransformerFunction import org.locationtech.geomesa.convert2.transforms.{Expression, TransformerFunction, TransformerFunctionFactory} -import org.locationtech.geomesa.fs.storage.parquet.io.{SimpleFeatureParquetSchema, SimpleFeatureReadSupport} +import org.locationtech.geomesa.fs.storage.parquet.io.{SimpleFeatureParquetSchemaV1, SimpleFeatureReadSupport} import org.locationtech.jts.geom._ +/** + * For parsing geometries from a GeoParquet file, the GeometryFunctionFactory class provides equivalent functionality. + * + * This class is kept for backwards compatibility with older Parquet file formats. 
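+ *
+ * For example (the '/position' path is illustrative), a point transform that reads a
+ * version 0/1 file as "parquetPoint($0, '/position')" would instead read a version 2
+ * (GeoParquet) file as "point(avroPath($0, '/position'))", decoding the WKB-encoded
+ * geometry column with the generic geometry functions.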
+ */ +@Deprecated class ParquetFunctionFactory extends TransformerFunctionFactory { override def functions: Seq[TransformerFunction] = geometries @@ -42,7 +48,7 @@ class ParquetFunctionFactory extends TransformerFunctionFactory { abstract class ParquetGeometryFn[T <: Geometry, U](name: String, path: AvroPath) extends NamedTransformerFunction(Seq(name), pure = true) { - import SimpleFeatureParquetSchema.{GeometryColumnX, GeometryColumnY} + import SimpleFeatureParquetSchemaV1.{GeometryColumnX, GeometryColumnY} override def apply(args: Array[AnyRef]): AnyRef = { path.eval(args(0).asInstanceOf[GenericRecord]).collect { diff --git a/geomesa-convert/geomesa-convert-parquet/src/test/resources/example-geo.parquet b/geomesa-convert/geomesa-convert-parquet/src/test/resources/example-geo.parquet new file mode 100644 index 000000000000..e3c5feae01f8 Binary files /dev/null and b/geomesa-convert/geomesa-convert-parquet/src/test/resources/example-geo.parquet differ diff --git a/geomesa-convert/geomesa-convert-parquet/src/test/scala/org/locationtech/geomesa/convert/parquet/ParquetConverterTest.scala b/geomesa-convert/geomesa-convert-parquet/src/test/scala/org/locationtech/geomesa/convert/parquet/ParquetConverterTest.scala index 13770f978867..8c5d4ea0ccfa 100644 --- a/geomesa-convert/geomesa-convert-parquet/src/test/scala/org/locationtech/geomesa/convert/parquet/ParquetConverterTest.scala +++ b/geomesa-convert/geomesa-convert-parquet/src/test/scala/org/locationtech/geomesa/convert/parquet/ParquetConverterTest.scala @@ -8,7 +8,8 @@ package org.locationtech.geomesa.convert.parquet -import com.typesafe.config.ConfigFactory +import com.typesafe.config.{Config, ConfigFactory} +import org.geotools.api.feature.simple.SimpleFeatureType import org.geotools.util.factory.Hints import org.junit.runner.RunWith import org.locationtech.geomesa.convert.EvaluationContext @@ -33,6 +34,39 @@ class ParquetConverterTest extends Specification { sequential "ParquetConverter" should { + "parse a geoparquet file" in { + val conf = ConfigFactory.parseString( + """ + | { + | type = "parquet", + | id-field = "avroPath($0, '/__fid__')", + | fields = [ + | { name = "name", transform = "avroPath($0, '/name')" }, + | { name = "age", transform = "avroPath($0, '/age')" }, + | { name = "dtg", transform = "avroPath($0, '/dtg')" }, + | { name = "position", transform = "point(avroPath($0, '/position'))" }, + | ] + | } + """.stripMargin) + + val sft = SimpleFeatureTypes.createType("test", "name:String,age:Int,dtg:Date,*position:Point:srid=4326") + + val file = getClass.getClassLoader.getResource("example-geo.parquet") + val path = new File(file.toURI).getAbsolutePath + + val res = WithClose(SimpleFeatureConverter(sft, conf)) { converter => + val ec = converter.createEvaluationContext(EvaluationContext.inputFileParam(path)) + WithClose(converter.process(file.openStream(), ec))(_.toList) + } + + res must haveLength(3) + res.map(_.getID) mustEqual Seq("1", "2", "3") + res.map(_.getAttribute("name")) mustEqual Seq("first", null, "third") + res.map(_.getAttribute("age")) mustEqual Seq(100, 200, 300) + res.map(_.getAttribute("dtg")) mustEqual Seq("2017-01-01", "2017-01-02", "2017-01-03").map(FastConverter.convert(_, classOf[Date])) + res.map(_.getAttribute("position")) mustEqual Seq("POINT (25.236263 27.436734)", "POINT (67.2363 55.236)", "POINT (73.0 73.0)").map(FastConverter.convert(_, classOf[Point])) + } + "parse a parquet file" in { val conf = ConfigFactory.parseString( """ @@ -68,6 +102,33 @@ class ParquetConverterTest extends Specification { 
res.map(_.getAttribute("geom")) mustEqual Seq("POINT (-100.2365 23)", "POINT (40.232 -53.2356)", "POINT (3 -62.23)").map(FastConverter.convert(_, classOf[Point])) } + "infer a converter from a geomesa geoparquet file" >> { + val file = getClass.getClassLoader.getResource("example-geo.parquet") + val path = new File(file.toURI).getAbsolutePath + + val factory = new ParquetConverterFactory() + val inferred: Option[(SimpleFeatureType, Config)] = factory.infer(file.openStream(), path = Some(path)) + inferred must beSome + + val (sft, config) = inferred.get + + sft.getAttributeDescriptors.asScala.map(_.getLocalName) mustEqual Seq("name", "age", "dtg", "position") + sft.getAttributeDescriptors.asScala.map(_.getType.getBinding) mustEqual + Seq(classOf[String], classOf[java.lang.Integer], classOf[Date], classOf[Point]) + + val res = WithClose(SimpleFeatureConverter(sft, config)) { converter => + val ec = converter.createEvaluationContext(EvaluationContext.inputFileParam(path)) + WithClose(converter.process(file.openStream(), ec))(_.toList) + } + + res must haveLength(3) + res.map(_.getID) mustEqual Seq("1", "2", "3") + res.map(_.getAttribute("name")) mustEqual Seq("first", null, "third") + res.map(_.getAttribute("age")) mustEqual Seq(100, 200, 300) + res.map(_.getAttribute("dtg")) mustEqual Seq("2017-01-01", "2017-01-02", "2017-01-03").map(FastConverter.convert(_, classOf[Date])) + res.map(_.getAttribute("position")) mustEqual Seq("POINT (25.236263 27.436734)", "POINT (67.2363 55.236)", "POINT (73.0 73.0)").map(FastConverter.convert(_, classOf[Point])) + } + "infer a converter from a geomesa parquet file" >> { val file = getClass.getClassLoader.getResource("example.parquet") val path = new File(file.toURI).getAbsolutePath diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala index d8cc86722ff1..60d5bb557f5b 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala @@ -19,7 +19,7 @@ import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.{FileSystemUpda import org.locationtech.geomesa.fs.storage.api.StorageMetadata.StorageFileAction.StorageFileAction import org.locationtech.geomesa.fs.storage.api.StorageMetadata._ import org.locationtech.geomesa.fs.storage.api._ -import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.{FileSystemPathReader, MetadataObserver, WriterConfig} +import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.{FileSystemPathReader, WriterConfig} import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.CompositeObserver import org.locationtech.geomesa.fs.storage.common.observer.{FileSystemObserver, FileSystemObserverFactory} import org.locationtech.geomesa.fs.storage.common.utils.StorageUtils.FileType @@ -67,11 +67,13 @@ abstract class AbstractFileSystemStorage( /** * Create a writer for the given file * + * @param partition the partition that the file belongs to + * @param action whether to append or modify * @param file file to write to * @param observer observer to report stats on the data written * 
@return */ - protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter + protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: Option[FileSystemObserver]): FileSystemWriter /** * Create a path reader with the given filter and transform @@ -234,11 +236,11 @@ abstract class AbstractFileSystemStorage( def pathAndObserver: WriterConfig = { val path = StorageUtils.nextFile(context.root, partition, metadata.leafStorage, extension, fileType) PathCache.register(context.fc, path) - val updateObserver = new UpdateObserver(partition, path, action) - val observer = if (observers.isEmpty) { updateObserver } else { - new CompositeObserver(observers.map(_.apply(path)).+:(updateObserver)) + val observer = if (observers.isEmpty) { None } else { + val compositeObserver = new CompositeObserver(observers.map(_.apply(path))) + Some(compositeObserver) } - WriterConfig(path, observer) + WriterConfig(partition, action, path, observer) } targetSize(targetFileSize) match { @@ -247,7 +249,7 @@ abstract class AbstractFileSystemStorage( } } - private def createWriter(config: WriterConfig): FileSystemWriter = createWriter(config.path, config.observer) + private def createWriter(config: WriterConfig): FileSystemWriter = createWriter(config.partition, config.action, config.path, config.observer) /** * Writes files up to a given size, then starts a new file @@ -350,10 +352,10 @@ abstract class AbstractFileSystemStorage( * @param file file being written * @param action file type */ - class UpdateObserver(partition: String, file: Path, action: StorageFileAction) extends MetadataObserver { - override protected def onClose(bounds: Envelope, count: Long): Unit = { + protected class FileBasedMetadataCallback(partition: String, action: StorageFileAction, file: Path) extends ((Envelope, Long) => Unit) { + override def apply(env: Envelope, count: Long): Unit = { val files = Seq(StorageFile(file.getName, System.currentTimeMillis(), action)) - metadata.addPartition(PartitionMetadata(partition, files, PartitionBounds(bounds), count)) + metadata.addPartition(PartitionMetadata(partition, files, PartitionBounds(env), count)) } } } @@ -391,5 +393,5 @@ object AbstractFileSystemStorage { protected def onClose(bounds: Envelope, count: Long): Unit } - private case class WriterConfig(path: Path, observer: FileSystemObserver) + private case class WriterConfig(partition: String, action: StorageFileAction, path: Path, observer: Option[FileSystemObserver]) } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/observer/FileSystemObserver.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/observer/FileSystemObserver.scala index 1e04f4570dc6..fb6f6bb02e82 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/observer/FileSystemObserver.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/observer/FileSystemObserver.scala @@ -13,4 +13,4 @@ import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWrite /** * Marker trait for writer hooks */ -trait FileSystemObserver extends FileSystemWriter +trait FileSystemObserver extends FileSystemWriter \ No newline at end of file diff --git 
a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala index 4e92c5fddb80..55ee96f880a1 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala @@ -13,6 +13,7 @@ import org.geotools.api.feature.simple.SimpleFeatureType import org.geotools.api.filter.Filter import org.locationtech.geomesa.convert2.SimpleFeatureConverter import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter +import org.locationtech.geomesa.fs.storage.api.StorageMetadata.StorageFileAction.StorageFileAction import org.locationtech.geomesa.fs.storage.api.StorageMetadata.{StorageFile, StorageFilePath} import org.locationtech.geomesa.fs.storage.api._ import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage @@ -30,7 +31,7 @@ class ConverterStorage(context: FileSystemContext, metadata: StorageMetadata, co // actually need to be closed, and since they will only open a single connection per converter, the // impact should be low - override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter = + override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: Option[FileSystemObserver]): FileSystemWriter = throw new NotImplementedError() override protected def createReader( diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorage.scala index a69b92690162..0b5b5f421a5c 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorage.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorage.scala @@ -16,13 +16,15 @@ import org.geotools.api.feature.simple.SimpleFeatureType import org.geotools.api.filter.Filter import org.locationtech.geomesa.filter.factory.FastFilterFactory import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter +import org.locationtech.geomesa.fs.storage.api.StorageMetadata.StorageFileAction.StorageFileAction import org.locationtech.geomesa.fs.storage.api._ import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage -import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.FileSystemPathReader +import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.{FileSystemPathReader, MetadataObserver} +import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.CompositeObserver import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver import org.locationtech.geomesa.utils.geotools.ObjectType import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType -import org.locationtech.jts.geom.Geometry +import org.locationtech.jts.geom.{Envelope, Geometry} /** * Orc implementation of FileSystemStorage @@ -32,8 +34,20 @@ import 
org.locationtech.jts.geom.Geometry class OrcFileSystemStorage(context: FileSystemContext, metadata: StorageMetadata) extends AbstractFileSystemStorage(context, metadata, OrcFileSystemStorage.FileExtension) { - override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter = - new OrcFileSystemWriter(metadata.sft, context.conf, file, observer) + private class SingleGeometryObserver(partition: String, action: StorageFileAction, file: Path) extends MetadataObserver { + override protected def onClose(bounds: Envelope, count: Long): Unit = new FileBasedMetadataCallback(partition, action, file)(bounds, count) + } + + override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: Option[FileSystemObserver]): FileSystemWriter = { + val singleGeometryObserver = new SingleGeometryObserver(partition, action, file) + + observer match { + case Some(_) => + val compositeObserver = new CompositeObserver(Seq(singleGeometryObserver, observer.get)) + new OrcFileSystemWriter(metadata.sft, context.conf, file, Some(compositeObserver)) + case None => new OrcFileSystemWriter(metadata.sft, context.conf, file, Some(singleGeometryObserver)) + } + } override protected def createReader( filter: Option[Filter], diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemWriter.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemWriter.scala index 58f4ae822638..21a7156041a0 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemWriter.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemWriter.scala @@ -24,7 +24,7 @@ class OrcFileSystemWriter( sft: SimpleFeatureType, config: Configuration, file: Path, - observer: FileSystemObserver = NoOpObserver + observer: Option[FileSystemObserver] = None ) extends FileSystemWriter { private val schema = OrcFileSystemStorage.createTypeDescription(sft) @@ -34,6 +34,7 @@ class OrcFileSystemWriter( private val batch = schema.createRowBatch() private val attributeWriter = OrcAttributeWriter(sft, batch) + private val observerVal = observer.getOrElse(NoOpObserver) override def write(sf: SimpleFeature): Unit = { attributeWriter.apply(sf, batch.size) @@ -43,19 +44,19 @@ class OrcFileSystemWriter( writer.addRowBatch(batch) batch.reset() } - observer.write(sf) + observerVal.write(sf) } override def flush(): Unit = { flushBatch() - observer.flush() + observerVal.flush() } override def close(): Unit = { try { flushBatch() } catch { - case NonFatal(e) => CloseQuietly(Seq(writer, observer)).foreach(e.addSuppressed); throw e + case NonFatal(e) => CloseQuietly(Seq(writer, observerVal)).foreach(e.addSuppressed); throw e } - CloseQuietly.raise(Seq(writer, observer)) + CloseQuietly.raise(Seq(writer, observerVal)) } private def flushBatch(): Unit = { diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/test/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorageTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/test/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorageTest.scala index 18e5608ac4fa..3ff6531abd7e 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/test/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorageTest.scala +++ 
b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/test/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorageTest.scala @@ -19,7 +19,7 @@ import org.geotools.util.factory.Hints import org.junit.runner.RunWith import org.locationtech.geomesa.features.ScalaSimpleFeature import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter -import org.locationtech.geomesa.fs.storage.api.StorageMetadata.{PartitionMetadata, StorageFile} +import org.locationtech.geomesa.fs.storage.api.StorageMetadata.{PartitionBounds, PartitionMetadata, StorageFile} import org.locationtech.geomesa.fs.storage.api.{FileSystemContext, FileSystemStorage, Metadata, NamedOptions} import org.locationtech.geomesa.fs.storage.common.StorageKeys import org.locationtech.geomesa.fs.storage.common.metadata.FileBasedMetadataFactory @@ -27,12 +27,14 @@ import org.locationtech.geomesa.fs.storage.common.partitions.DateTimeScheme import org.locationtech.geomesa.fs.storage.common.utils.PathCache import org.locationtech.geomesa.utils.collection.SelfClosingIterator import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes +import org.locationtech.jts.geom.Envelope import org.specs2.matcher.MatchResult import org.specs2.mutable.Specification import org.specs2.runner.JUnitRunner import java.nio.file.Files import java.util.UUID +import scala.collection.mutable @RunWith(classOf[JUnitRunner]) class OrcFileSystemStorageTest extends Specification with LazyLogging { @@ -44,7 +46,54 @@ class OrcFileSystemStorageTest extends Specification with LazyLogging { // 8 bits resolution creates 3 partitions with our test data val scheme = NamedOptions("z2-8bits") - "OrcFileSystemWriter" should { + "OrcFileSystemStorage" should { + "contain partition metadata with correct bounds" in { + val sft = SimpleFeatureTypes.createType("orc-test", "*geom:Point:srid=4326,name:String,age:Int,dtg:Date") + + val features = (0 until 10).map { i => + val sf = new ScalaSimpleFeature(sft, i.toString) + sf.getUserData.put(Hints.USE_PROVIDED_FID, java.lang.Boolean.TRUE) + sf.setAttribute(1, s"name$i") + sf.setAttribute(2, s"$i") + sf.setAttribute(3, f"2014-01-${i + 1}%02dT00:00:01.000Z") + sf.setAttribute(0, s"POINT(4$i 5$i)") + sf + } + + withTestDir { dir => + val context = FileSystemContext(FileContext.getFileContext(dir.toUri), config, dir) + val metadata = + new FileBasedMetadataFactory() + .create(context, Map.empty, Metadata(sft, "orc", scheme, leafStorage = true)) + val storage = new OrcFileSystemStorageFactory().apply(context, metadata) + + storage must not(beNull) + + val writers = scala.collection.mutable.Map.empty[String, FileSystemWriter] + + val expectedBounds = new mutable.HashMap[String, Envelope]() + features.foreach { f => + val partition = storage.metadata.scheme.getPartitionName(f) + val writer = writers.getOrElseUpdate(partition, storage.getWriter(partition)) + writer.write(f) + + val env = expectedBounds.getOrElse(partition, new Envelope) + env.expandToInclude(f.getBounds.asInstanceOf[Envelope]) + expectedBounds.put(partition, env) + } + + writers.foreach(_._2.close()) + + logger.debug(s"wrote to ${writers.size} partitions for ${features.length} features") + + val partitions = storage.getPartitions.map(_.name) + partitions must haveLength(writers.size) + + storage.getPartitions.foreach(partition => partition.bounds mustEqual PartitionBounds(expectedBounds(partition.name))) + } + ok + } + "read and write features" in { val sft = SimpleFeatureTypes.createType("orc-test", 
"*geom:Point:srid=4326,name:String,age:Int,dtg:Date") diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/pom.xml b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/pom.xml index b0878ece8024..35e43e3f1f4f 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/pom.xml +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/pom.xml @@ -51,6 +51,11 @@ org.locationtech.geomesa geomesa-index-api_${scala.binary.version} + + com.networknt + json-schema-validator + test + diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/FilterConverter.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/FilterConverter.scala index f5dd2c3248d9..22626c608e9f 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/FilterConverter.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/FilterConverter.scala @@ -23,14 +23,18 @@ import scala.reflect.ClassTag object FilterConverter { - def convert(sft: SimpleFeatureType, filter: Filter): (Option[FilterPredicate], Option[Filter]) = { - if (filter == Filter.INCLUDE) { (None, None) } else { - FilterHelper.propertyNames(filter).foldLeft((Option.empty[FilterPredicate], Option(filter)))(reduce(sft)) + def convert(sft: SimpleFeatureType, filter: Filter): (Int => (Option[FilterPredicate], Option[Filter])) = { + if (filter == Filter.INCLUDE) { _ => (None, None) } else { + val propertyNames = FilterHelper.propertyNames(filter) + lazy val v1 = propertyNames.foldLeft((Option.empty[FilterPredicate], Option(filter)))(reduce(sft, 1)) + lazy val v2 = propertyNames.foldLeft((Option.empty[FilterPredicate], Option(filter)))(reduce(sft, 2)) + i => if (i == 2) { v2 } else { v1 } } } private def reduce( - sft: SimpleFeatureType + sft: SimpleFeatureType, + version: Int )(result: (Option[FilterPredicate], Option[Filter]), name: String): (Option[FilterPredicate], Option[Filter]) = { val (parquet, geotools) = result @@ -44,7 +48,12 @@ object FilterConverter { val (predicate, remaining): (Option[FilterPredicate], Option[Filter]) = bindings.head match { // note: non-points use repeated values, which aren't supported in parquet predicates - case ObjectType.GEOMETRY if bindings.last == ObjectType.POINT => spatial(sft, name, filter, col) + case ObjectType.GEOMETRY if bindings.last == ObjectType.POINT => { + version match { + case 2 => spatial(filter) + case _ => spatialV0V1(sft, name, filter, col) + } + } case ObjectType.DATE => temporal(sft, name, filter, FilterApi.longColumn(col)) case ObjectType.STRING => attribute(sft, name, filter, FilterApi.binaryColumn(col), Binary.fromString) case ObjectType.INT => attribute(sft, name, filter, FilterApi.intColumn(col), identity[java.lang.Integer]) @@ -58,7 +67,10 @@ object FilterConverter { ((predicate.toSeq ++ parquet).reduceLeftOption(FilterApi.and), remaining) } - private def spatial( + private def spatial(filter: Filter): (Option[FilterPredicate], Option[Filter]) = (None, Some(filter)) + + // Backwards-compatible method for old parquet files + private def spatialV0V1( sft: SimpleFeatureType, name: String, filter: Filter, diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/ParquetFileSystemStorage.scala 
b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/ParquetFileSystemStorage.scala index 64a640167dd8..08329a2434ef 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/ParquetFileSystemStorage.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/ParquetFileSystemStorage.scala @@ -18,6 +18,7 @@ import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} import org.geotools.api.filter.Filter import org.locationtech.geomesa.filter.factory.FastFilterFactory import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter +import org.locationtech.geomesa.fs.storage.api.StorageMetadata.StorageFileAction.StorageFileAction import org.locationtech.geomesa.fs.storage.api._ import org.locationtech.geomesa.fs.storage.common.{AbstractFileSystemStorage, FileValidationEnabled} import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.FileSystemPathReader @@ -26,6 +27,7 @@ import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.NoOpObserver import org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorage.ParquetFileSystemWriter import org.locationtech.geomesa.utils.io.CloseQuietly +import org.locationtech.jts.geom.Envelope /** * @@ -35,10 +37,10 @@ import org.locationtech.geomesa.utils.io.CloseQuietly class ParquetFileSystemStorage(context: FileSystemContext, metadata: StorageMetadata) extends AbstractFileSystemStorage(context, metadata, ParquetFileSystemStorage.FileExtension) { - override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter = { + override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: Option[FileSystemObserver]): FileSystemWriter = { val sftConf = new Configuration(context.conf) StorageConfiguration.setSft(sftConf, metadata.sft) - new ParquetFileSystemWriter(metadata.sft, file, sftConf, observer) + new ParquetFileSystemWriter(metadata.sft, file, sftConf, observer, new FileBasedMetadataCallback(partition, action, file)) } override protected def createReader( @@ -74,18 +76,21 @@ object ParquetFileSystemStorage extends LazyLogging { sft: SimpleFeatureType, file: Path, conf: Configuration, - observer: FileSystemObserver = NoOpObserver + observer: Option[FileSystemObserver] = None, + callback: (Envelope, Long) => Unit = ((_, _) => {}) ) extends FileSystemWriter { - private val writer = SimpleFeatureParquetWriter.builder(file, conf).build() + private val writer = SimpleFeatureParquetWriter.builder(file, conf, callback).build() + private val observerVal = observer.getOrElse(NoOpObserver) override def write(f: SimpleFeature): Unit = { writer.write(f) - observer.write(f) + observerVal.write(f) } - override def flush(): Unit = observer.flush() + override def flush(): Unit = observerVal.flush() override def close(): Unit = { - CloseQuietly(Seq(writer, observer)).foreach(e => throw e) + CloseQuietly(Seq(writer, observerVal)).foreach(e => throw e) + if (FileValidationEnabled.get.toBoolean) { validateParquetFile(file) } @@ -102,9 +107,9 @@ object ParquetFileSystemStorage extends LazyLogging { // Process the record record = reader.read() } - logger.debug(s"${file} is a valid Parquet file") + logger.debug(s"'$file' is a valid Parquet file") } 
catch { - case e: Exception => throw new RuntimeException(s"Unable to validate ${file}: File may be corrupted", e) + case e: Exception => throw new RuntimeException(s"Unable to validate '$file': File may be corrupted", e) } finally { reader.close() } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/SimpleFeatureParquetWriter.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/SimpleFeatureParquetWriter.scala index adfa0325d3f0..79588db8c8e9 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/SimpleFeatureParquetWriter.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/SimpleFeatureParquetWriter.scala @@ -17,13 +17,14 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter} import org.geotools.api.feature.simple.SimpleFeature import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureWriteSupport +import org.locationtech.jts.geom.Envelope object SimpleFeatureParquetWriter extends LazyLogging { - def builder(file: Path, conf: Configuration): Builder = { + def builder(file: Path, conf: Configuration, callback: (Envelope, Long) => Unit = ((_, _) => {})): Builder = { val codec = CompressionCodecName.fromConf(conf.get("parquet.compression", "SNAPPY")) logger.debug(s"Using Parquet Compression codec ${codec.name()}") - new Builder(file) + new Builder(file, callback) .withConf(conf) .withCompressionCodec(codec) .withDictionaryEncoding(true) @@ -36,10 +37,10 @@ object SimpleFeatureParquetWriter extends LazyLogging { .withRowGroupSize(8*1024*1024) } - class Builder private [SimpleFeatureParquetWriter] (file: Path) + class Builder private [SimpleFeatureParquetWriter] (file: Path, callback: (Envelope, Long) => Unit) extends ParquetWriter.Builder[SimpleFeature, Builder](file) { override def self(): Builder = this override protected def getWriteSupport(conf: Configuration): WriteSupport[SimpleFeature] = - new SimpleFeatureWriteSupport + new SimpleFeatureWriteSupport(callback) } } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetMetadataBuilder.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetMetadataBuilder.scala new file mode 100644 index 000000000000..88528378e98a --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetMetadataBuilder.scala @@ -0,0 +1,81 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. 
+ ***********************************************************************/
+
+package org.locationtech.geomesa.fs.storage.parquet.io
+
+import org.geotools.api.feature.`type`.GeometryDescriptor
+import org.geotools.api.feature.simple.SimpleFeatureType
+import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration
+import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureParquetSchema.{Encoding, GeoParquetSchemaKey, SchemaVersionKey}
+import org.locationtech.geomesa.utils.geotools.{ObjectType, SimpleFeatureTypes}
+import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType
+import org.locationtech.geomesa.utils.text.StringSerialization.alphaNumericSafeString
+import org.locationtech.jts.geom.Envelope
+
+import scala.collection.JavaConverters._
+
+class SimpleFeatureParquetMetadataBuilder(sft: SimpleFeatureType, schemaVersion: Integer) {
+
+  private var geoParquetMetadata: String = null
+
+  /**
+   * See https://geoparquet.org/releases/v1.0.0/schema.json
+   *
+   * @param envs bounding envelope for each geometry column, in attribute order
+   * @return this builder, for chaining
+   */
+  def withGeoParquetMetadata(envs: Array[Envelope]): SimpleFeatureParquetMetadataBuilder = {
+    val geomField = sft.getGeomField
+
+    if (geomField != null) {
+      val primaryColumn = alphaNumericSafeString(geomField)
+      val columns = {
+        val geometryDescriptors = sft.getAttributeDescriptors.toArray.collect { case gd: GeometryDescriptor => gd }
+        geometryDescriptors.indices.map(i => columnMetadata(geometryDescriptors(i), envs(i))).mkString(",")
+      }
+
+      geoParquetMetadata = s"""{"version":"1.0.0","primary_column":"$primaryColumn","columns":{$columns}}"""
+    }
+
+    this
+  }
+
+  private def columnMetadata(geom: GeometryDescriptor, bbox: Envelope): String = {
+    // TODO "Z" for 3d, minz/maxz for bbox
+    val geomTypes = {
+      val types = ObjectType.selectType(geom).last match {
+        case ObjectType.POINT => """"Point""""
+        case ObjectType.LINESTRING => """"LineString""""
+        case ObjectType.POLYGON => """"Polygon""""
+        case ObjectType.MULTILINESTRING => """"MultiLineString""""
+        case ObjectType.MULTIPOLYGON => """"MultiPolygon""""
+        case ObjectType.MULTIPOINT => """"MultiPoint""""
+        case ObjectType.GEOMETRY_COLLECTION => """"GeometryCollection""""
+        case ObjectType.GEOMETRY => null
+      }
+      Seq(types).filter(_ != null)
+    }
+    // note: don't provide crs, as default is EPSG:4326 with longitude first, which is our default/only crs
+
+    def stringify(geomName: String, encoding: String, geometryTypes: Seq[String], bbox: Envelope): String = {
+      val bboxString = s"[${bbox.getMinX}, ${bbox.getMinY}, ${bbox.getMaxX}, ${bbox.getMaxY}]"
+      s""""$geomName":{"encoding":"$encoding","geometry_types":[${geometryTypes.mkString(",")}],"bbox":$bboxString}"""
+    }
+
+    val geomName = alphaNumericSafeString(geom.getLocalName)
+    stringify(geomName, Encoding, geomTypes, bbox)
+  }
+
+  def build(): java.util.Map[String, String] = {
+    // note: drop the 'geo' entry when no geometry metadata has been set, rather than writing a null value
+    Map(
+      StorageConfiguration.SftNameKey -> sft.getTypeName,
+      StorageConfiguration.SftSpecKey -> SimpleFeatureTypes.encodeType(sft, includeUserData = true),
+      SchemaVersionKey -> schemaVersion.toString,
+      GeoParquetSchemaKey -> geoParquetMetadata
+    ).filter(_._2 != null).asJava
+  }
+}
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetSchema.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetSchema.scala
index d016d4581bd1..db5b7c920c5d 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetSchema.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetSchema.scala
@@ -18,8 +18,8 @@ import org.apache.parquet.schema.Types.BasePrimitiveBuilder
 import org.apache.parquet.schema._
 import org.geotools.api.feature.`type`.AttributeDescriptor
 import org.geotools.api.feature.simple.SimpleFeatureType
-import org.locationtech.geomesa.features.serialization.TwkbSerialization.GeometryBytes
 import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration
+import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureParquetSchema.CurrentSchemaVersion
 import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType
 import org.locationtech.geomesa.utils.geotools.{ObjectType, SimpleFeatureTypes}
 import org.locationtech.geomesa.utils.text.StringSerialization
@@ -30,20 +30,12 @@
  * @param sft simple feature type
  * @param schema parquet message schema
+ * @param version geomesa parquet schema version of the file
  */
-case class SimpleFeatureParquetSchema(sft: SimpleFeatureType, schema: MessageType) {
-
-  import SimpleFeatureParquetSchema.{CurrentSchemaVersion, SchemaVersionKey}
-
-  import scala.collection.JavaConverters._
+case class SimpleFeatureParquetSchema(sft: SimpleFeatureType, schema: MessageType, version: Integer = CurrentSchemaVersion) {

   /**
-   * Parquet file metadata
+   * Parquet file metadata builder
    */
-  lazy val metadata: java.util.Map[String, String] = Map(
-    StorageConfiguration.SftNameKey -> sft.getTypeName,
-    StorageConfiguration.SftSpecKey -> SimpleFeatureTypes.encodeType(sft, includeUserData = true),
-    SchemaVersionKey -> CurrentSchemaVersion.toString // note: this may not be entirely accurate, but we don't write older versions
-  ).asJava
+  val metadata = new SimpleFeatureParquetMetadataBuilder(sft, version)

   /**
    * Gets the name of the parquet field for the given simple feature type attribute
@@ -56,16 +48,18 @@
 object SimpleFeatureParquetSchema {

+  import StringSerialization.alphaNumericSafeString
+
   import scala.collection.JavaConverters._

   val FeatureIdField = "__fid__"
   val SchemaVersionKey = "geomesa.parquet.version"

-  val CurrentSchemaVersion = 1
+  val CurrentSchemaVersion = 2

-  val GeometryColumnX = "x"
-  val GeometryColumnY = "y"
+  val Encoding = "WKB"
+  val GeoParquetSchemaKey = "geo"

   /**
    * Extract the simple feature type from a parquet read context.
The read context @@ -80,7 +74,11 @@ object SimpleFeatureParquetSchema { context.getKeyValueMetadata.asScala.foreach { case (k, v) => if (!v.isEmpty) { metadata.put(k, v.iterator.next) }} val conf = context.getConfiguration // copy in the sft from the conf - overwrite the file level metadata as this has our transform schema - Seq(StorageConfiguration.SftNameKey, StorageConfiguration.SftSpecKey, SchemaVersionKey).foreach { key => + Seq( + StorageConfiguration.SftNameKey, + StorageConfiguration.SftSpecKey, + SchemaVersionKey, + GeoParquetSchemaKey).foreach { key => val value = conf.get(key) if (value != null) { metadata.put(key, value) @@ -128,11 +126,16 @@ object SimpleFeatureParquetSchema { spec <- Option(metadata.get(StorageConfiguration.SftSpecKey)) } yield { val sft = SimpleFeatureTypes.createType(name, spec) - Option(metadata.get(SchemaVersionKey)).map(_.toInt).getOrElse(0) match { - case 1 => new SimpleFeatureParquetSchema(sft, schema(sft)) - case 0 => new SimpleFeatureParquetSchema(sft, SimpleFeatureParquetSchemaV0(sft)) + + val schemaVersion = Option(metadata.get(SchemaVersionKey)).map(_.toInt).getOrElse(0) + val messageType = schemaVersion match { + case 2 => schema(sft) + case 1 => SimpleFeatureParquetSchemaV1(sft) + case 0 => SimpleFeatureParquetSchemaV0(sft) case v => throw new IllegalArgumentException(s"Unknown SimpleFeatureParquetSchema version: $v") } + + SimpleFeatureParquetSchema(sft, messageType, schemaVersion) } } @@ -147,7 +150,7 @@ object SimpleFeatureParquetSchema { // note: id field goes at the end of the record val fields = sft.getAttributeDescriptors.asScala.map(schema) :+ id // ensure that we use a valid name - for avro conversion, especially, names are very limited - new MessageType(StringSerialization.alphaNumericSafeString(sft.getTypeName), fields.asJava) + new MessageType(alphaNumericSafeString(sft.getTypeName), fields.asJava) } /** @@ -159,58 +162,11 @@ object SimpleFeatureParquetSchema { private def schema(descriptor: AttributeDescriptor): Type = { val bindings = ObjectType.selectType(descriptor) val builder = bindings.head match { - case ObjectType.GEOMETRY => geometry(bindings(1)) case ObjectType.LIST => Binding(bindings(1)).list() case ObjectType.MAP => Binding(bindings(1)).key(bindings(2)) case p => Binding(p).primitive() } - builder.named(StringSerialization.alphaNumericSafeString(descriptor.getLocalName)) - } - - /** - * Create a builder for a parquet geometry field - * - * @param binding geometry type - * @return - */ - private def geometry(binding: ObjectType): Types.Builder[_, _ <: Type] = { - def group: Types.GroupBuilder[GroupType] = Types.buildGroup(Repetition.OPTIONAL) - binding match { - case ObjectType.POINT => - group.id(GeometryBytes.TwkbPoint) - .required(PrimitiveTypeName.DOUBLE).named(GeometryColumnX) - .required(PrimitiveTypeName.DOUBLE).named(GeometryColumnY) - - case ObjectType.LINESTRING => - group.id(GeometryBytes.TwkbLineString) - .repeated(PrimitiveTypeName.DOUBLE).named(GeometryColumnX) - .repeated(PrimitiveTypeName.DOUBLE).named(GeometryColumnY) - - case ObjectType.MULTIPOINT => - group.id(GeometryBytes.TwkbMultiPoint) - .repeated(PrimitiveTypeName.DOUBLE).named(GeometryColumnX) - .repeated(PrimitiveTypeName.DOUBLE).named(GeometryColumnY) - - case ObjectType.POLYGON => - group.id(GeometryBytes.TwkbPolygon) - .requiredList().element(PrimitiveTypeName.DOUBLE, Repetition.REPEATED).named(GeometryColumnX) - .requiredList().element(PrimitiveTypeName.DOUBLE, Repetition.REPEATED).named(GeometryColumnY) - - case ObjectType.MULTILINESTRING 
=> - group.id(GeometryBytes.TwkbMultiLineString) - .requiredList().element(PrimitiveTypeName.DOUBLE, Repetition.REPEATED).named(GeometryColumnX) - .requiredList().element(PrimitiveTypeName.DOUBLE, Repetition.REPEATED).named(GeometryColumnY) - - case ObjectType.MULTIPOLYGON => - group.id(GeometryBytes.TwkbMultiPolygon) - .requiredList().requiredListElement().element(PrimitiveTypeName.DOUBLE, Repetition.REPEATED).named(GeometryColumnX) - .requiredList().requiredListElement().element(PrimitiveTypeName.DOUBLE, Repetition.REPEATED).named(GeometryColumnY) - - case ObjectType.GEOMETRY => - Types.primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL) - - case _ => throw new NotImplementedError(s"No mapping defined for geometry type $binding") - } + builder.named(alphaNumericSafeString(descriptor.getLocalName)) } /** @@ -250,6 +206,7 @@ object SimpleFeatureParquetSchema { ObjectType.FLOAT -> new Binding(PrimitiveTypeName.FLOAT), ObjectType.BOOLEAN -> new Binding(PrimitiveTypeName.BOOLEAN), ObjectType.BYTES -> new Binding(PrimitiveTypeName.BINARY), + ObjectType.GEOMETRY -> new Binding(PrimitiveTypeName.BINARY), ObjectType.UUID -> new Binding(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, None, Some(16)) ) diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetSchemaV0.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetSchemaV0.scala index d4b43b1a6068..4c6b1a7d779c 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetSchemaV0.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetSchemaV0.scala @@ -42,8 +42,8 @@ object SimpleFeatureParquetSchemaV0 { val builder = bindings.head match { case ObjectType.GEOMETRY => Types.buildGroup(Repetition.REQUIRED) - .primitive(PrimitiveTypeName.DOUBLE, Repetition.REQUIRED).named(SimpleFeatureParquetSchema.GeometryColumnX) - .primitive(PrimitiveTypeName.DOUBLE, Repetition.REQUIRED).named(SimpleFeatureParquetSchema.GeometryColumnY) + .primitive(PrimitiveTypeName.DOUBLE, Repetition.REQUIRED).named(SimpleFeatureParquetSchemaV1.GeometryColumnX) + .primitive(PrimitiveTypeName.DOUBLE, Repetition.REQUIRED).named(SimpleFeatureParquetSchemaV1.GeometryColumnY) case ObjectType.DATE => Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL) case ObjectType.STRING => Types.primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL).as(OriginalType.UTF8) diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetSchemaV1.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetSchemaV1.scala new file mode 100644 index 000000000000..48dd86e57498 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetSchemaV1.scala @@ -0,0 +1,108 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. 
This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + + +package org.locationtech.geomesa.fs.storage.parquet.io + +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName +import org.apache.parquet.schema.Type.Repetition +import org.apache.parquet.schema._ +import org.locationtech.geomesa.features.serialization.TwkbSerialization.GeometryBytes +import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureParquetSchema.Binding +import org.locationtech.geomesa.utils.geotools.ObjectType +import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType +import org.locationtech.geomesa.utils.text.StringSerialization +import org.geotools.api.feature.`type`.AttributeDescriptor +import org.geotools.api.feature.simple.SimpleFeatureType + +object SimpleFeatureParquetSchemaV1 { + + import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureParquetSchema.FeatureIdField + + import scala.collection.JavaConverters._ + + val GeometryColumnX = "x" + val GeometryColumnY = "y" + + /** + * Get the message type for a simple feature type + * + * @param sft simple feature type + * @return + */ + def apply(sft: SimpleFeatureType): MessageType = { + val id = Types.required(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(FeatureIdField) + // note: id field goes at the end of the record + val fields = sft.getAttributeDescriptors.asScala.map(schema) :+ id + // ensure that we use a valid name - for avro conversion, especially, names are very limited + new MessageType(StringSerialization.alphaNumericSafeString(sft.getTypeName), fields.asJava) + } + + /** + * Create a parquet field type from an attribute descriptor + * + * @param descriptor descriptor + * @return + */ + private def schema(descriptor: AttributeDescriptor): Type = { + val bindings = ObjectType.selectType(descriptor) + val builder = bindings.head match { + case ObjectType.GEOMETRY => geometry(bindings(1)) + case ObjectType.LIST => Binding(bindings(1)).list() + case ObjectType.MAP => Binding(bindings(1)).key(bindings(2)) + case p => Binding(p).primitive() + } + builder.named(StringSerialization.alphaNumericSafeString(descriptor.getLocalName)) + } + + /** + * Create a builder for a parquet geometry field + * + * @param binding geometry type + * @return + */ + private def geometry(binding: ObjectType): Types.Builder[_, _ <: Type] = { + def group: Types.GroupBuilder[GroupType] = Types.buildGroup(Repetition.OPTIONAL) + binding match { + case ObjectType.POINT => + group.id(GeometryBytes.TwkbPoint) + .required(PrimitiveTypeName.DOUBLE).named(GeometryColumnX) + .required(PrimitiveTypeName.DOUBLE).named(GeometryColumnY) + + case ObjectType.LINESTRING => + group.id(GeometryBytes.TwkbLineString) + .repeated(PrimitiveTypeName.DOUBLE).named(GeometryColumnX) + .repeated(PrimitiveTypeName.DOUBLE).named(GeometryColumnY) + + case ObjectType.MULTIPOINT => + group.id(GeometryBytes.TwkbMultiPoint) + .repeated(PrimitiveTypeName.DOUBLE).named(GeometryColumnX) + .repeated(PrimitiveTypeName.DOUBLE).named(GeometryColumnY) + + case ObjectType.POLYGON => + group.id(GeometryBytes.TwkbPolygon) + .requiredList().element(PrimitiveTypeName.DOUBLE, Repetition.REPEATED).named(GeometryColumnX) + .requiredList().element(PrimitiveTypeName.DOUBLE, Repetition.REPEATED).named(GeometryColumnY) + + case 
ObjectType.MULTILINESTRING => + group.id(GeometryBytes.TwkbMultiLineString) + .requiredList().element(PrimitiveTypeName.DOUBLE, Repetition.REPEATED).named(GeometryColumnX) + .requiredList().element(PrimitiveTypeName.DOUBLE, Repetition.REPEATED).named(GeometryColumnY) + + case ObjectType.MULTIPOLYGON => + group.id(GeometryBytes.TwkbMultiPolygon) + .requiredList().requiredListElement().element(PrimitiveTypeName.DOUBLE, Repetition.REPEATED).named(GeometryColumnX) + .requiredList().requiredListElement().element(PrimitiveTypeName.DOUBLE, Repetition.REPEATED).named(GeometryColumnY) + + case ObjectType.GEOMETRY => + Types.primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL) + + case _ => throw new NotImplementedError(s"No mapping defined for geometry type $binding") + } + } +} diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureReadSupport.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureReadSupport.scala index f9f2285feda9..c2a67cb1a842 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureReadSupport.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureReadSupport.scala @@ -29,23 +29,22 @@ import scala.collection.mutable.ArrayBuffer class SimpleFeatureReadSupport extends ReadSupport[SimpleFeature] { - private var schema: SimpleFeatureParquetSchema = _ + private var schema: SimpleFeatureParquetSchema = null override def init(context: InitContext): ReadContext = { schema = SimpleFeatureParquetSchema.read(context).getOrElse { throw new IllegalArgumentException("Could not extract SimpleFeatureType from read context") } + // ensure that our read schema matches the geomesa parquet version - new ReadContext(schema.schema, schema.metadata) + new ReadContext(schema.schema, schema.metadata.build()) } override def prepareForRead( configuration: Configuration, keyValueMetaData: java.util.Map[String, String], fileSchema: MessageType, - readContext: ReadSupport.ReadContext): RecordMaterializer[SimpleFeature] = { - new SimpleFeatureRecordMaterializer(schema) - } + readContext: ReadSupport.ReadContext): RecordMaterializer[SimpleFeature] = new SimpleFeatureRecordMaterializer(schema) } object SimpleFeatureReadSupport { @@ -86,7 +85,7 @@ object SimpleFeatureReadSupport { class SimpleFeatureRecordMaterializer(schema: SimpleFeatureParquetSchema) extends RecordMaterializer[SimpleFeature] { - private val converter = new SimpleFeatureGroupConverter(schema.sft) + private val converter = new SimpleFeatureGroupConverter(schema.sft, schema.version.toInt) override def getRootConverter: GroupConverter = converter override def getCurrentRecord: SimpleFeature = converter.materialize } @@ -107,7 +106,7 @@ object SimpleFeatureReadSupport { * which will mean they are only converted and then added to simple features if a * record passes the parquet filters and needs to be materialized. 
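+ *
+ * Note: geometry conversion is schema-version dependent. As a sketch of the dispatch
+ * below: version 2 (GeoParquet) files store each geometry as a single WKB-encoded
+ * binary column, decoded with GeometryWkbConverter, while version 0/1 files use the
+ * per-geometry-type x/y coordinate converters (PointConverter, LineStringConverter, etc.).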
 */
-  class SimpleFeatureGroupConverter(sft: SimpleFeatureType) extends GroupConverter with Settable {
+  class SimpleFeatureGroupConverter(sft: SimpleFeatureType, schemaVersion: Int) extends GroupConverter with Settable {

     // temp placeholders
     private var id: Binary = _
@@ -128,7 +127,7 @@ object SimpleFeatureReadSupport {
     }

     protected def attribute(i: Int): Converter =
-      SimpleFeatureReadSupport.attribute(ObjectType.selectType(sft.getDescriptor(i)), i, this)
+      SimpleFeatureReadSupport.attribute(ObjectType.selectType(sft.getDescriptor(i)), schemaVersion, i, this)

     override def start(): Unit = {
       id = null
@@ -148,9 +147,9 @@
   // unless a record is materialized so we can likely speed this up by not creating any of
   // the true SFT types until a record passes a filter in the SimpleFeatureRecordMaterializer
-  private def attribute(bindings: Seq[ObjectType], i: Int, callback: Settable): Converter = {
+  private def attribute(bindings: Seq[ObjectType], schemaVersion: Int, i: Int, callback: Settable): Converter = {
     bindings.head match {
-      case ObjectType.GEOMETRY => geometry(bindings.last, i, callback)
+      case ObjectType.GEOMETRY => geometry(schemaVersion, bindings.last, i, callback)
       case ObjectType.DATE => new DateConverter(i, callback)
       case ObjectType.STRING => new StringConverter(i, callback)
       case ObjectType.INT => new IntConverter(i, callback)
@@ -159,14 +158,23 @@
       case ObjectType.FLOAT => new FloatConverter(i, callback)
       case ObjectType.BOOLEAN => new BooleanConverter(i, callback)
       case ObjectType.BYTES => new BytesConverter(i, callback)
-      case ObjectType.LIST => new ListConverter(bindings(1), i, callback)
-      case ObjectType.MAP => new MapConverter(bindings(1), bindings(2), i, callback)
+      case ObjectType.LIST => new ListConverter(schemaVersion, bindings(1), i, callback)
+      case ObjectType.MAP => new MapConverter(schemaVersion, bindings(1), bindings(2), i, callback)
       case ObjectType.UUID => new UuidConverter(i, callback)
       case _ => throw new IllegalArgumentException(s"Can't deserialize field of type ${bindings.head}")
     }
   }

-  private def geometry(binding: ObjectType, i: Int, callback: Settable): Converter = {
+  // version 2 stores all geometries as WKB; versions 0 and 1 use the split x/y encoding
+  private def geometry(schemaVersion: Int, binding: ObjectType, i: Int, callback: Settable): Converter = {
+    schemaVersion match {
+      case 2 => new GeometryWkbConverter(i, callback)
+      case 1 => geometryV0V1(binding, i, callback)
+      case 0 => geometryV0V1(binding, i, callback)
+      case v => throw new IllegalArgumentException(s"Unknown SimpleFeatureParquetSchema version: $v")
+    }
+  }
+
+  private def geometryV0V1(binding: ObjectType, i: Int, callback: Settable): Converter = {
     binding match {
       case ObjectType.POINT => new PointConverter(i, callback)
       case ObjectType.LINESTRING => new LineStringConverter(i, callback)
@@ -217,12 +225,12 @@ object SimpleFeatureReadSupport {
     override def addBinary(value: Binary): Unit = callback.set(index, value.getBytes)
   }

-  class ListConverter(binding: ObjectType, index: Int, callback: Settable) extends GroupConverter {
+  class ListConverter(schemaVersion: Int, binding: ObjectType, index: Int, callback: Settable) extends GroupConverter {

     private var list: java.util.List[AnyRef] = _

     private val group: GroupConverter = new GroupConverter {
-      private val converter = attribute(Seq(binding), 0, (value: AnyRef) => list.add(value))
+      private val converter = attribute(Seq(binding), schemaVersion, 0, (value: AnyRef) => list.add(value))
       override def getConverter(fieldIndex: Int): Converter = converter // better only be 
one field (0) override def start(): Unit = {} override def end(): Unit = {} @@ -233,7 +241,7 @@ object SimpleFeatureReadSupport { override def end(): Unit = callback.set(index, list) } - class MapConverter(keyBinding: ObjectType, valueBinding: ObjectType, index: Int, callback: Settable) + class MapConverter(schemaVersion: Int, keyBinding: ObjectType, valueBinding: ObjectType, index: Int, callback: Settable) extends GroupConverter { private var map: java.util.Map[AnyRef, AnyRef] = _ @@ -241,8 +249,8 @@ object SimpleFeatureReadSupport { private val group: GroupConverter = new GroupConverter { private var k: AnyRef = _ private var v: AnyRef = _ - private val keyConverter = attribute(Seq(keyBinding), 0, (value: AnyRef) => k = value) - private val valueConverter = attribute(Seq(valueBinding), 1, (value: AnyRef) => v = value) + private val keyConverter = attribute(Seq(keyBinding), schemaVersion, 0, (value: AnyRef) => k = value) + private val valueConverter = attribute(Seq(valueBinding), schemaVersion, 1, (value: AnyRef) => v = value) override def getConverter(fieldIndex: Int): Converter = if (fieldIndex == 0) { keyConverter } else { valueConverter } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureWriteSupport.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureWriteSupport.scala index 445723b57fd9..794b0e657215 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureWriteSupport.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureWriteSupport.scala @@ -10,10 +10,11 @@ package org.locationtech.geomesa.fs.storage.parquet.io import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.hadoop.api.WriteSupport.WriteContext +import org.apache.parquet.hadoop.api.WriteSupport.{FinalizedWriteContext, WriteContext} import org.apache.parquet.io.api.{Binary, RecordConsumer} -import org.geotools.api.feature.`type`.AttributeDescriptor +import org.geotools.api.feature.`type`.{AttributeDescriptor, GeometryDescriptor} import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} +import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.MetadataObserver import org.locationtech.geomesa.utils.geotools.ObjectType import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType import org.locationtech.geomesa.utils.text.WKBUtils @@ -22,28 +23,92 @@ import org.locationtech.jts.geom._ import java.nio.ByteBuffer import java.util.{Date, UUID} -class SimpleFeatureWriteSupport extends WriteSupport[SimpleFeature] { +class SimpleFeatureWriteSupport(callback: (Envelope, Long) => Unit = ((_, _) => {})) extends WriteSupport[SimpleFeature] { + private class MultipleGeometriesObserver extends MetadataObserver { + private var count: Long = 0L + private var numGeoms: Int = 0 // number of geometries in the file + private var bounds: Array[Envelope] = new Array[Envelope](0) + + override def write(feature: SimpleFeature): Unit = { + // Update internal count/bounds/etc + count += 1L + + // Initialize a bounding box for each geometry if we haven't already done so + if (bounds.isEmpty) { + val sft = feature.getFeatureType + val geometryDescriptors = 
sft.getAttributeDescriptors.toArray.collect {case gd: GeometryDescriptor => gd} + numGeoms = geometryDescriptors.length + bounds = geometryDescriptors.map(_ => new Envelope) + } + + val envelopes = feature.getAttributes.toArray.collect { + case geom: Geometry => geom.getEnvelopeInternal + } + + // Expand the bounding box for each geometry + (0 until numGeoms).foreach(i => bounds(i).expandToInclude(envelopes(i))) + } + + def getBoundingBoxes: Array[Envelope] = bounds + + override def close(): Unit = { + // Merge all the envelopes into one + val mergedBounds = new Envelope() + for (b <- bounds) { + mergedBounds.expandToInclude(b) + } + + onClose(mergedBounds, count) + } + + // Invokes the callback function that adds metadata to the storage partition + override protected def onClose(bounds: Envelope, count: Long): Unit = callback(bounds, count) + } + + private val observer = new MultipleGeometriesObserver private var writer: SimpleFeatureWriteSupport.SimpleFeatureWriter = _ private var consumer: RecordConsumer = _ + private var schema: SimpleFeatureParquetSchema = _ override val getName: String = "SimpleFeatureWriteSupport" + // Need a no-arg constructor because Apache Parquet can't instantiate the callback arg for the MapReduce compaction job + // Also, the compaction job doesn't write or calculate bounds anyway + def this() = this( (_, _) => {} ) + // called once override def init(conf: Configuration): WriteContext = { - val schema = SimpleFeatureParquetSchema.write(conf).getOrElse { + schema = SimpleFeatureParquetSchema.write(conf).getOrElse { throw new IllegalArgumentException("Could not extract SimpleFeatureType from write context") } this.writer = SimpleFeatureWriteSupport.SimpleFeatureWriter(schema.sft) - new WriteContext(schema.schema, schema.metadata) + new WriteContext(schema.schema, schema.metadata.build()) + } + + // called once at the end after all SimpleFeatures are written to the file + override def finalizeWrite(): FinalizedWriteContext = { + // Get the bounding boxes that span each geometry type + val bboxes = observer.getBoundingBoxes + observer.close() + + // Omit GeoParquet metadata if the SFT has no geometries + if (bboxes.isEmpty) { + new FinalizedWriteContext(schema.metadata.build()) + } else { + new FinalizedWriteContext(schema.metadata.withGeoParquetMetadata(bboxes).build()) + } } // called per block override def prepareForWrite(recordConsumer: RecordConsumer): Unit = consumer = recordConsumer // called per row - override def write(record: SimpleFeature): Unit = writer.write(consumer, record) + override def write(record: SimpleFeature): Unit = { + writer.write(consumer, record) + observer.write(record) + } } object SimpleFeatureWriteSupport { @@ -78,7 +143,7 @@ object SimpleFeatureWriteSupport { def attribute(name: String, index: Int, bindings: Seq[ObjectType]): AttributeWriter[_] = { bindings.head match { - case ObjectType.GEOMETRY => geometry(name, index, bindings.last) + case ObjectType.GEOMETRY => new GeometryWkbAttributeWriter(name, index) // TODO support z/m case ObjectType.DATE => new DateWriter(name, index) case ObjectType.STRING => new StringWriter(name, index) case ObjectType.INT => new IntegerWriter(name, index) @@ -94,20 +159,6 @@ object SimpleFeatureWriteSupport { } } - // TODO support z/m - private def geometry(name: String, index: Int, binding: ObjectType): AttributeWriter[_] = { - binding match { - case ObjectType.POINT => new PointAttributeWriter(name, index) - case ObjectType.LINESTRING => new LineStringAttributeWriter(name, index) - case 
ObjectType.POLYGON => new PolygonAttributeWriter(name, index) - case ObjectType.MULTIPOINT => new MultiPointAttributeWriter(name, index) - case ObjectType.MULTILINESTRING => new MultiLineStringAttributeWriter(name, index) - case ObjectType.MULTIPOLYGON => new MultiPolygonAttributeWriter(name, index) - case ObjectType.GEOMETRY => new GeometryWkbAttributeWriter(name, index) - case _ => throw new IllegalArgumentException(s"Can't serialize field '$name' of type $binding") - } - } - /** * Writes a simple feature attribute to a Parquet file */ @@ -235,189 +286,6 @@ object SimpleFeatureWriteSupport { } } - class PointAttributeWriter(name: String, index: Int) extends AttributeWriter[Point](name, index) { - override def write(consumer: RecordConsumer, value: Point): Unit = { - consumer.startGroup() - consumer.startField(SimpleFeatureParquetSchema.GeometryColumnX, 0) - consumer.addDouble(value.getX) - consumer.endField(SimpleFeatureParquetSchema.GeometryColumnX, 0) - consumer.startField(SimpleFeatureParquetSchema.GeometryColumnY, 1) - consumer.addDouble(value.getY) - consumer.endField(SimpleFeatureParquetSchema.GeometryColumnY, 1) - consumer.endGroup() - } - } - - class LineStringAttributeWriter(name: String, index: Int) extends AttributeWriter[LineString](name, index) { - override def write(consumer: RecordConsumer, value: LineString): Unit = { - consumer.startGroup() - consumer.startField(SimpleFeatureParquetSchema.GeometryColumnX, 0) - var i = 0 - while (i < value.getNumPoints) { - consumer.addDouble(value.getCoordinateN(i).x) - i += 1 - } - consumer.endField(SimpleFeatureParquetSchema.GeometryColumnX, 0) - consumer.startField(SimpleFeatureParquetSchema.GeometryColumnY, 1) - i = 0 - while (i < value.getNumPoints) { - consumer.addDouble(value.getCoordinateN(i).y) - i += 1 - } - consumer.endField(SimpleFeatureParquetSchema.GeometryColumnY, 1) - consumer.endGroup() - } - } - - class MultiPointAttributeWriter(name: String, index: Int) extends AttributeWriter[MultiPoint](name, index) { - override def write(consumer: RecordConsumer, value: MultiPoint): Unit = { - consumer.startGroup() - consumer.startField(SimpleFeatureParquetSchema.GeometryColumnX, 0) - var i = 0 - while (i < value.getNumPoints) { - consumer.addDouble(value.getGeometryN(i).asInstanceOf[Point].getX) - i += 1 - } - consumer.endField(SimpleFeatureParquetSchema.GeometryColumnX, 0) - consumer.startField(SimpleFeatureParquetSchema.GeometryColumnY, 1) - i = 0 - while (i < value.getNumPoints) { - consumer.addDouble(value.getGeometryN(i).asInstanceOf[Point].getY) - i += 1 - } - consumer.endField(SimpleFeatureParquetSchema.GeometryColumnY, 1) - consumer.endGroup() - } - } - - abstract class AbstractLinesWriter[T <: Geometry](name: String, index: Int) - extends AttributeWriter[T](name, index) { - - protected def lines(value: T): Seq[LineString] - - override def write(consumer: RecordConsumer, value: T): Unit = { - val lines = this.lines(value) - consumer.startGroup() - - consumer.startField(SimpleFeatureParquetSchema.GeometryColumnX, 0) - consumer.startGroup() - consumer.startField("list", 0) - lines.foreach { line => - consumer.startGroup() - writeLineStringX(consumer, line) - consumer.endGroup() - } - consumer.endField("list", 0) - consumer.endGroup() - consumer.endField(SimpleFeatureParquetSchema.GeometryColumnX, 0) - - consumer.startField(SimpleFeatureParquetSchema.GeometryColumnY, 1) - consumer.startGroup() - consumer.startField("list", 0) - lines.foreach { line => - consumer.startGroup() - writeLineStringY(consumer, line) - 
consumer.endGroup() - } - consumer.endField("list", 0) - consumer.endGroup() - consumer.endField(SimpleFeatureParquetSchema.GeometryColumnY, 1) - - consumer.endGroup() - } - } - - class PolygonAttributeWriter(name: String, index: Int) extends AbstractLinesWriter[Polygon](name, index) { - override protected def lines(value: Polygon): Seq[LineString] = - Seq.tabulate(value.getNumInteriorRing + 1) { i => - if (i == 0) { value.getExteriorRing } else { value.getInteriorRingN(i - 1) } - } - } - - class MultiLineStringAttributeWriter(name: String, index: Int) - extends AbstractLinesWriter[MultiLineString](name, index) { - override protected def lines(value: MultiLineString): Seq[LineString] = - Seq.tabulate(value.getNumGeometries)(i => value.getGeometryN(i).asInstanceOf[LineString]) - } - - class MultiPolygonAttributeWriter(name: String, index: Int) extends AttributeWriter[MultiPolygon](name, index) { - override def write(consumer: RecordConsumer, value: MultiPolygon): Unit = { - val polys = Seq.tabulate(value.getNumGeometries) { i => - val poly = value.getGeometryN(i).asInstanceOf[Polygon] - Seq.tabulate(poly.getNumInteriorRing + 1) { i => - if (i == 0) { poly.getExteriorRing } else { poly.getInteriorRingN(i - 1) } - } - } - consumer.startGroup() - - consumer.startField(SimpleFeatureParquetSchema.GeometryColumnX, 0) - consumer.startGroup() - consumer.startField("list", 0) - polys.foreach { lines => - consumer.startGroup() - consumer.startField("element", 0) - consumer.startGroup() - consumer.startField("list", 0) - lines.foreach { line => - consumer.startGroup() - writeLineStringX(consumer, line) - consumer.endGroup() - } - consumer.endField("list", 0) - consumer.endGroup() - consumer.endField("element", 0) - consumer.endGroup() - } - consumer.endField("list", 0) - consumer.endGroup() - consumer.endField(SimpleFeatureParquetSchema.GeometryColumnX, 0) - - consumer.startField(SimpleFeatureParquetSchema.GeometryColumnY, 1) - consumer.startGroup() - consumer.startField("list", 0) - polys.foreach { lines => - consumer.startGroup() - consumer.startField("element", 0) - consumer.startGroup() - consumer.startField("list", 0) - lines.foreach { line => - consumer.startGroup() - writeLineStringY(consumer, line) - consumer.endGroup() - } - consumer.endField("list", 0) - consumer.endGroup() - consumer.endField("element", 0) - consumer.endGroup() - } - consumer.endField("list", 0) - consumer.endGroup() - consumer.endField(SimpleFeatureParquetSchema.GeometryColumnY, 1) - - consumer.endGroup() - } - } - - private def writeLineStringX(consumer: RecordConsumer, ring: LineString): Unit = { - consumer.startField("element", 0) - var i = 0 - while (i < ring.getNumPoints) { - consumer.addDouble(ring.getCoordinateN(i).x) - i += 1 - } - consumer.endField("element", 0) - } - - private def writeLineStringY(consumer: RecordConsumer, ring: LineString): Unit = { - consumer.startField("element", 0) - var i = 0 - while (i < ring.getNumPoints) { - consumer.addDouble(ring.getCoordinateN(i).y) - i += 1 - } - consumer.endField("element", 0) - } - class GeometryWkbAttributeWriter(name: String, index: Int) extends AttributeWriter[Geometry](name, index) { override protected def write(consumer: RecordConsumer, value: Geometry): Unit = consumer.addBinary(Binary.fromConstantByteArray(WKBUtils.write(value))) diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/parquet.scala 
b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/parquet.scala index 236d9b596655..1fa69bdfb7f2 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/parquet.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/parquet.scala @@ -93,7 +93,7 @@ package object parquet { def apply(sft: SimpleFeatureType, filter: Option[Filter]): ReadFilter = { val (parquet, residual) = filter match { case None | Some(Filter.INCLUDE) => (None, None) - case Some(f) => FilterConverter.convert(sft, f) + case Some(f) => FilterConverter.convert(sft, f)(2) } ReadFilter(parquet, residual) } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/resources/geoparquet-metadata-schema.json b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/resources/geoparquet-metadata-schema.json new file mode 100644 index 000000000000..b4160908d376 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/resources/geoparquet-metadata-schema.json @@ -0,0 +1,81 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "GeoParquet", + "description": "Parquet metadata included in the geo field.", + "type": "object", + "required": ["version", "primary_column", "columns"], + "properties": { + "version": { + "type": "string", + "const": "1.0.0" + }, + "primary_column": { + "type": "string", + "minLength": 1 + }, + "columns": { + "type": "object", + "minProperties": 1, + "patternProperties": { + ".+": { + "type": "object", + "required": ["encoding", "geometry_types"], + "properties": { + "encoding": { + "type": "string", + "const": "WKB" + }, + "geometry_types": { + "type": "array", + "uniqueItems": true, + "items": { + "type": "string", + "pattern": "^(GeometryCollection|(Multi)?(Point|LineString|Polygon))( Z)?$" + } + }, + "crs": { + "oneOf": [ + { + "$ref": "https://proj.org/schemas/v0.5/projjson.schema.json" + }, + { + "type": "null" + } + ] + }, + "edges": { + "type": "string", + "enum": ["planar", "spherical"] + }, + "orientation": { + "type": "string", + "const": "counterclockwise" + }, + "bbox": { + "type": "array", + "items": { + "type": "number" + }, + "oneOf": [ + { + "description": "2D bbox consisting of (xmin, ymin, xmax, ymax)", + "minItems": 4, + "maxItems": 4 + }, + { + "description": "3D bbox consisting of (xmin, ymin, zmin, xmax, ymax, zmax)", + "minItems": 6, + "maxItems": 6 + } + ] + }, + "epoch": { + "type": "number" + } + } + } + }, + "additionalProperties": false + } + } +} \ No newline at end of file diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/FilterConverterTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/FilterConverterTest.scala index 00aaee1da92c..1ace4b87cfb6 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/FilterConverterTest.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/FilterConverterTest.scala @@ -27,8 +27,8 @@ class FilterConverterTest extends Specification with AllExpectations { val sft = SimpleFeatureTypes.createType("test", "name:String,age:Int,dtg:Date,*geom:Point:srid=4326") - def convert(filter: String): (Option[FilterPredicate], 
Option[Filter]) = - FilterConverter.convert(sft, ECQL.toFilter(filter)) + def convert(filter: String, version: Int = 2): (Option[FilterPredicate], Option[Filter]) = + FilterConverter.convert(sft, ECQL.toFilter(filter))(version) def flatten(and: Operators.And): Seq[FilterPredicate] = { val remaining = scala.collection.mutable.Queue[FilterPredicate](and) @@ -43,8 +43,9 @@ class FilterConverterTest extends Specification with AllExpectations { } "FilterConverter" should { - "convert geo filter to min/max x/y" >> { - val (pFilter, gFilter) = convert("bbox(geom, -24.0, -25.0, -18.0, -19.0)") + "convert geo filter to min/max x/y, for old parquet files" >> { + val (pFilter, gFilter) = convert("bbox(geom, -24.0, -25.0, -18.0, -19.0)", 1) + gFilter must beNone pFilter must beSome(beAnInstanceOf[Operators.And]) val clauses = flatten(pFilter.get.asInstanceOf[Operators.And]) @@ -69,6 +70,13 @@ class FilterConverterTest extends Specification with AllExpectations { ymax.map(_.getValue.doubleValue()) must beSome(-19.0) } + "put bounding box in the post-read filter, for geoparquet files" >> { + val (pFilter, gFilter) = convert("bbox(geom, -24.0, -25.0, -18.0, -19.0)") + + pFilter must beNone + gFilter must beSome(beAnInstanceOf[Filter]) + } + "convert dtg ranges to long ranges" >> { val (pFilter, gFilter) = convert("dtg BETWEEN '2017-01-01T00:00:00.000Z' AND '2017-01-05T00:00:00.000Z'") gFilter must beNone diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetReadWriteTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetReadWriteTest.scala index e12397c28ac1..3e162866b596 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetReadWriteTest.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetReadWriteTest.scala @@ -10,6 +10,8 @@ package org.locationtech.geomesa.parquet +import com.fasterxml.jackson.databind.ObjectMapper +import com.networknt.schema.{JsonSchemaFactory, SpecVersion, ValidationMessage} import com.typesafe.scalalogging.LazyLogging import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -21,19 +23,24 @@ import org.geotools.data.DataUtilities import org.geotools.filter.text.ecql.ECQL import org.junit.runner.RunWith import org.locationtech.geomesa.features.ScalaSimpleFeature -import org.locationtech.geomesa.fs.storage.common.FileValidationEnabled +import org.locationtech.geomesa.filter.FilterHelper import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration import org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorage.{ParquetCompressionOpt, validateParquetFile} +import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureParquetSchema.GeoParquetSchemaKey import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureReadSupport import org.locationtech.geomesa.fs.storage.parquet.{FilterConverter, SimpleFeatureParquetWriter} import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes +import org.locationtech.geomesa.utils.index.BucketIndex import org.locationtech.geomesa.utils.io.WithClose +import org.locationtech.geomesa.utils.text.WKTUtils +import org.locationtech.jts.geom.{Coordinate, Envelope, Geometry, GeometryFactory} import org.specs2.mutable.Specification import org.specs2.runner.JUnitRunner import org.specs2.specification.AllExpectations 
-import java.io.RandomAccessFile -import java.nio.file.Files +import java.io.{File, RandomAccessFile} +import java.nio.file.{Files, Paths} +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @RunWith(classOf[JUnitRunner]) @@ -51,7 +58,7 @@ class ParquetReadWriteTest extends Specification with AllExpectations with LazyL lazy val f = Files.createTempFile("geomesa", ".parquet") - val sft = SimpleFeatureTypes.createType("test", "name:String,age:Int,dtg:Date,*position:Point:srid=4326") + val sft = SimpleFeatureTypes.createType("test", "name:String,age:Int,dtg:Date,*position:Point:srid=4326,poly:Polygon") val nameAndGeom = SimpleFeatureTypes.createType("test", "name:String,*position:Point:srid=4326") val sftConf = { @@ -62,11 +69,61 @@ class ParquetReadWriteTest extends Specification with AllExpectations with LazyL c } - val features = Seq( - ScalaSimpleFeature.create(sft, "1", "first", 100, "2017-01-01T00:00:00Z", "POINT (25.236263 27.436734)"), - ScalaSimpleFeature.create(sft, "2", null, 200, "2017-01-02T00:00:00Z", "POINT (67.2363 55.236)"), - ScalaSimpleFeature.create(sft, "3", "third", 300, "2017-01-03T00:00:00Z", "POINT (73.0 73.0)") - ) + val points = { + val gf = new GeometryFactory + Seq( + gf.createPoint(new Coordinate(25.236263, 27.436734)), + gf.createPoint(new Coordinate(67.2363, 55.236)), + gf.createPoint(new Coordinate(73.0, 73.0)), + ) + } + + val polygons = { + val gf = new GeometryFactory + Seq( + gf.createPolygon(Array( + new Coordinate(0, 0), + new Coordinate(0, 1), + new Coordinate(1, 1), + new Coordinate(1, 0), + new Coordinate(0, 0), + )), + gf.createPolygon(Array( + new Coordinate(10, 10), + new Coordinate(10, 15), + new Coordinate(15, 15), + new Coordinate(15, 10), + new Coordinate(10, 10), + )), + gf.createPolygon(Array( + new Coordinate(30, 30), + new Coordinate(30, 35), + new Coordinate(35, 35), + new Coordinate(35, 30), + new Coordinate(30, 30), + )), + ) + } + + val pointsBboxString = { + val bbox = new Envelope + points.indices.foreach(i => bbox.expandToInclude(points(i).getEnvelopeInternal)) + s"[${bbox.getMinX}, ${bbox.getMinY}, ${bbox.getMaxX}, ${bbox.getMaxY}]" + } + + val polygonsBboxString = { + val bbox = new Envelope + polygons.indices.foreach(i => bbox.expandToInclude(polygons(i).getEnvelopeInternal)) + s"[${bbox.getMinX}, ${bbox.getMinY}, ${bbox.getMaxX}, ${bbox.getMaxY}]" + } + + val features = { + Seq( + ScalaSimpleFeature.create(sft, "1", "first", 100, "2017-01-01T00:00:00Z", WKTUtils.write(points.head), WKTUtils.write(polygons.head)), + ScalaSimpleFeature.create(sft, "2", null, 200, "2017-01-02T00:00:00Z", WKTUtils.write(points(1)), WKTUtils.write(polygons(1))), + ScalaSimpleFeature.create(sft, "3", "third", 300, "2017-01-03T00:00:00Z", WKTUtils.write(points(2)), WKTUtils.write(polygons(2))) + ) + } def readFile(filter: FilterCompat.Filter = FilterCompat.NOOP, conf: Configuration = sftConf): Seq[SimpleFeature] = { val builder = ParquetReader.builder[SimpleFeature](new SimpleFeatureReadSupport, new Path(f.toUri)) @@ -82,11 +139,45 @@ class ParquetReadWriteTest extends Specification with AllExpectations with LazyL } def readFile(geoFilter: org.geotools.api.filter.Filter, tsft: SimpleFeatureType): Seq[SimpleFeature] = { - val pFilter = FilterConverter.convert(tsft, geoFilter)._1.map(FilterCompat.get).getOrElse { - ko(s"Couldn't extract a filter from ${ECQL.toCQL(geoFilter)}") - FilterCompat.NOOP + val pFilter = FilterConverter.convert(tsft, geoFilter)(2)._1.map(FilterCompat.get).getOrElse(FilterCompat.NOOP) + val conf = 
transformConf(tsft)
+
+    val geomAttributeName = tsft.getGeometryDescriptor.getName.toString
+    val geoms = FilterHelper.extractGeometries(geoFilter, geomAttributeName).values
+
+    val builder = ParquetReader.builder[SimpleFeature](new SimpleFeatureReadSupport, new Path(f.toUri))
+    val result = ArrayBuffer.empty[SimpleFeature]
+    val index = new BucketIndex[SimpleFeature]
+
+    WithClose(builder.withFilter(pFilter).withConf(conf).build()) { reader =>
+      var sf = reader.read()
+      while (sf != null) {
+        result += sf
+        index.insert(sf.getAttribute(geomAttributeName).asInstanceOf[Geometry], sf.getID, sf)
+        sf = reader.read()
+      }
     }
-    readFile(pFilter, transformConf(tsft))
+
+    if (geoms.nonEmpty) {
+      index.query(geoms.head.getEnvelopeInternal).toSeq
+    } else {
+      result.toSeq
+    }
+  }
+
+  // Helper method that validates the file metadata against the GeoParquet metadata json schema
+  def validateMetadata(metadataString: String): mutable.Set[ValidationMessage] = {
+    val schema = {
+      // https://geoparquet.org/releases/v1.0.0/schema.json
+      val schemaFile = new File(getClass.getClassLoader.getResource("geoparquet-metadata-schema.json").toURI)
+      val schemaReader = scala.io.Source.fromFile(schemaFile)
+      val schemaString = schemaReader.mkString
+      schemaReader.close()
+      // the schema file declares json-schema draft-07, so validate with the V7 flag
+      JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V7).getSchema(schemaString)
+    }
+    val metadata = new ObjectMapper().readTree(metadataString)
+
+    schema.validate(metadata).asScala
   }

   "SimpleFeatureParquetWriter" should {
@@ -107,18 +198,58 @@
       // Validate the file
       validateParquetFile(filePath) must throwA[RuntimeException].like {
-        case e => e.getMessage mustEqual s"Unable to validate ${filePath}: File may be corrupted"
+        case e => e.getMessage mustEqual s"Unable to validate '${filePath}': File may be corrupted"
       }
     }

-    "write parquet files" >> {
-      WithClose(SimpleFeatureParquetWriter.builder(new Path(f.toUri), sftConf).build()) { writer =>
+    "write geoparquet files" >> {
+      val writer = SimpleFeatureParquetWriter.builder(new Path(f.toUri), sftConf).build()
+      WithClose(writer) { writer =>
         features.foreach(writer.write)
       }
+
       Files.size(f) must beGreaterThan(0L)
+
+      // Check that the GeoParquet metadata is valid json
+      val metadata = writer.getFooter.getFileMetaData.getKeyValueMetaData.get(GeoParquetSchemaKey)
+      validateMetadata(metadata) must beEmpty
+
+      // Check that the GeoParquet metadata contains the correct bounding box for each geometry
+      metadata.contains(pointsBboxString) must beTrue
+      metadata.contains(polygonsBboxString) must beTrue
     }

-    "read parquet files" >> {
+    "write parquet files with no geometries" >> {
+      val f = Files.createTempFile("geomesa", ".parquet")
+      val sft = SimpleFeatureTypes.createType("test", "name:String,age:Int,dtg:Date")
+      val sftConf = {
+        val c = new Configuration()
+        StorageConfiguration.setSft(c, sft)
+        // Use GZIP in tests but snappy in prod due to license issues
+        c.set(ParquetCompressionOpt, CompressionCodecName.GZIP.toString)
+        c
+      }
+
+      val features = {
+        Seq(
+          ScalaSimpleFeature.create(sft, "1", "first", 100, "2017-01-01T00:00:00Z"),
+          ScalaSimpleFeature.create(sft, "2", null, 200, "2017-01-02T00:00:00Z"),
+          ScalaSimpleFeature.create(sft, "3", "third", 300, "2017-01-03T00:00:00Z")
+        )
+      }
+
+      val writer = SimpleFeatureParquetWriter.builder(new Path(f.toUri), sftConf).build()
+      WithClose(writer) { writer =>
+        features.foreach(writer.write)
+      }
+
+      Files.size(f) must beGreaterThan(0L)
+
+      val metadata = 
writer.getFooter.getFileMetaData.getKeyValueMetaData.get(GeoParquetSchemaKey) + metadata must beNull + } + + "read geoparquet files" >> { val result = readFile(FilterCompat.NOOP, sftConf) result mustEqual features } @@ -183,5 +314,8 @@ class ParquetReadWriteTest extends Specification with AllExpectations with LazyL step { Files.deleteIfExists(f) + + val crcFilePath = Paths.get(s"${f.getParent}/.${f.getFileName}.crc") + Files.deleteIfExists(crcFilePath) } } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetStorageTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetStorageTest.scala index 995482f535ba..f73d2354dbc0 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetStorageTest.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetStorageTest.scala @@ -20,12 +20,14 @@ import org.geotools.util.factory.Hints import org.junit.runner.RunWith import org.locationtech.geomesa.features.ScalaSimpleFeature import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter +import org.locationtech.geomesa.fs.storage.api.StorageMetadata.PartitionBounds import org.locationtech.geomesa.fs.storage.api._ import org.locationtech.geomesa.fs.storage.common.StorageKeys import org.locationtech.geomesa.fs.storage.common.metadata.FileBasedMetadataFactory import org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorageFactory import org.locationtech.geomesa.utils.collection.SelfClosingIterator import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes +import org.locationtech.jts.geom.Envelope import org.specs2.matcher.MatchResult import org.specs2.mutable.Specification import org.specs2.runner.JUnitRunner @@ -33,6 +35,7 @@ import org.specs2.specification.AllExpectations import java.nio.file.Files import java.util.UUID +import scala.collection.mutable @RunWith(classOf[JUnitRunner]) class ParquetStorageTest extends Specification with AllExpectations with LazyLogging { @@ -48,6 +51,53 @@ class ParquetStorageTest extends Specification with AllExpectations with LazyLog val scheme = NamedOptions("z2-8bits") "ParquetFileSystemStorage" should { + "contain partition metadata with correct bounds" in { + val sft = SimpleFeatureTypes.createType("parquet-test", "*geom:Point:srid=4326,name:String,age:Int,dtg:Date") + + val features = (0 until 10).map { i => + val sf = new ScalaSimpleFeature(sft, i.toString) + sf.getUserData.put(Hints.USE_PROVIDED_FID, java.lang.Boolean.TRUE) + sf.setAttribute(1, s"name$i") + sf.setAttribute(2, s"$i") + sf.setAttribute(3, f"2014-01-${i + 1}%02dT00:00:01.000Z") + sf.setAttribute(0, s"POINT(4$i 5$i)") + sf + } + + withTestDir { dir => + val context = FileSystemContext(FileContext.getFileContext(dir.toUri), config, dir) + val metadata = + new FileBasedMetadataFactory() + .create(context, Map.empty, Metadata(sft, "parquet", scheme, leafStorage = true)) + val storage = new ParquetFileSystemStorageFactory().apply(context, metadata) + + storage must not(beNull) + + val writers = scala.collection.mutable.Map.empty[String, FileSystemWriter] + + val expectedBounds = new mutable.HashMap[String, Envelope]() + features.foreach { f => + val partition = storage.metadata.scheme.getPartitionName(f) + val writer = writers.getOrElseUpdate(partition, storage.getWriter(partition)) + writer.write(f) 
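+
+          // accumulate the expected envelope for each partition as features are written,
+          // so the partition metadata bounds can be verified after the writers are closed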
+ + val env = expectedBounds.getOrElse(partition, new Envelope) + env.expandToInclude(f.getBounds.asInstanceOf[Envelope]) + expectedBounds.put(partition, env) + } + + writers.foreach(_._2.close()) + + logger.debug(s"wrote to ${writers.size} partitions for ${features.length} features") + + val partitions = storage.getPartitions.map(_.name) + partitions must haveLength(writers.size) + + storage.getPartitions.foreach(partition => partition.bounds mustEqual PartitionBounds(expectedBounds(partition.name))) + } + ok + } + "read and write features" in { val sft = SimpleFeatureTypes.createType("parquet-test", "*geom:Point:srid=4326,name:String,age:Int,dtg:Date") @@ -330,7 +380,7 @@ class ParquetStorageTest extends Specification with AllExpectations with LazyLog } // note: this is somewhat of a magic number, in that it works the first time through with no remainder - val targetSize = 2100L + val targetSize = 1850L withTestDir { dir => val context = FileSystemContext(FileContext.getFileContext(dir.toUri), config, dir) diff --git a/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/CompactCommandTest.scala b/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/CompactCommandTest.scala index 3ea889ef558b..55e76cc5ff7b 100644 --- a/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/CompactCommandTest.scala +++ b/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/CompactCommandTest.scala @@ -8,22 +8,29 @@ package org.locationtech.geomesa.fs.tools.ingest +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.parquet.format.converter.ParquetMetadataConverter +import org.apache.parquet.hadoop.ParquetFileReader import org.geotools.api.data.{DataStoreFinder, Query, Transaction} -import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} +import org.geotools.api.feature.simple.SimpleFeatureType +import org.geotools.util.factory.Hints import org.junit.runner.RunWith import org.locationtech.geomesa.features.ScalaSimpleFeature import org.locationtech.geomesa.fs.HadoopSharedCluster import org.locationtech.geomesa.fs.data.FileSystemDataStore +import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureParquetSchema.GeoParquetSchemaKey import org.locationtech.geomesa.fs.tools.compact.FsCompactCommand import org.locationtech.geomesa.tools.DistributedRunParam.RunModes import org.locationtech.geomesa.utils.geotools.{FeatureUtils, SimpleFeatureTypes} import org.locationtech.geomesa.utils.io.WithClose -import org.locationtech.geomesa.utils.text.WKTUtils import org.locationtech.jts.geom._ -import org.specs2.matcher.MatchResult import org.specs2.mutable.Specification import org.specs2.runner.JUnitRunner +import java.nio.file.Files +import scala.collection.mutable + @RunWith(classOf[JUnitRunner]) class CompactCommandTest extends Specification { @@ -35,25 +42,34 @@ class CompactCommandTest extends Specification { val encodings = Seq("parquet", "orc") - val pt = WKTUtils.read("POINT(0 0)") - val line = WKTUtils.read("LINESTRING(0 0, 1 1, 4 4)") - val polygon = WKTUtils.read("POLYGON((10 10, 10 20, 20 20, 20 10, 10 10), (11 11, 19 11, 19 19, 11 19, 11 11))") - val mpt = WKTUtils.read("MULTIPOINT((0 0), (1 1))") - val mline = WKTUtils.read("MULTILINESTRING ((0 0, 1 1), (2 2, 3 3))") - val mpolygon = WKTUtils.read("MULTIPOLYGON(((0 0, 1 0, 1 1, 0 0)), ((10 10, 10 20, 20 20, 20 10, 10 10), (11 11, 19 11, 19 19, 11 19, 11 11)))") - - val 
sfts = encodings.map { name =>
-    val sft = SimpleFeatureTypes.createType(name,
-      "name:String,age:Int,dtg:Date," +
-        "*geom:MultiLineString:srid=4326,pt:Point,line:LineString," +
+  val tempDir: java.nio.file.Path = Files.createTempDirectory("compactCommand")
+
+  val sfts: java.util.Map[String, SimpleFeatureType] = {
+    def createSft(encoding: String): SimpleFeatureType = {
+      val sft = SimpleFeatureTypes.createType(encoding,
+        "name:String,age:Int,dtg:Date," +
+          "*geom:MultiLineString:srid=4326,pt:Point,line:LineString," +
         "poly:Polygon,mpt:MultiPoint,mline:MultiLineString,mpoly:MultiPolygon")
-    sft.setEncoding(name)
-    sft.setScheme("daily")
-    sft
+      sft.setEncoding(encoding)
+      sft.setScheme("daily")
+      sft
+    }
+
+    encodings.map(e => e -> createSft(e)).toMap.asJava
   }

   val numFeatures = 10000
-  val targetFileSize = 8000L // kind of a magic number, in that it divides up the features into files fairly evenly with no remainder
+
+  // kind of a magic number, in that it divides up the features into files fairly evenly with no remainder
+  val targetFileSize: java.util.Map[String, Long] = Map[String, Long](
+    "parquet" -> 15000L,
+    "orc" -> 14000L
+  ).asJava

   lazy val path = s"${HadoopSharedCluster.Container.getHdfsUrl}/${getClass.getSimpleName}/"
@@ -65,46 +81,97 @@
     DataStoreFinder.getDataStore(dsParams.asJava).asInstanceOf[FileSystemDataStore]
   }

+  // A map between partition name and a set of bounding boxes of each file in that partition
+  val partitionBoundingBoxes = new mutable.HashMap[String, mutable.Set[Envelope]] with mutable.MultiMap[String, Envelope]
+
   def features(sft: SimpleFeatureType): Seq[ScalaSimpleFeature] = {
-    Seq.tabulate(numFeatures) { i =>
-      ScalaSimpleFeature.create(sft,
-        s"$i", s"test$i", 100 + i, s"2017-06-0${5 + (i % 3)}T04:03:02.0001Z", s"MULTILINESTRING((0 0, 10 10.${i % 10}))",
-        pt, line, polygon, mpt, mline, mpolygon)
+    (0 until numFeatures).map { i =>
+      val sf = new ScalaSimpleFeature(sft, i.toString)
+      sf.getUserData.put(Hints.USE_PROVIDED_FID, java.lang.Boolean.TRUE)
+      sf.setAttribute(0, s"name${i % 10}")
+      sf.setAttribute(1, s"${i % 10}")
+      sf.setAttribute(2, f"2014-01-${i % 10 + 1}%02dT00:00:01.000Z")
+      sf.setAttribute(3, s"MULTILINESTRING((0 0, 10 10.${i % 10}))")
+      sf.setAttribute(4, s"POINT(4${i % 10} 5${i % 10})")
+      sf.setAttribute(5, s"LINESTRING(0 0, $i $i, 4 4)")
+      sf.setAttribute(6, s"POLYGON((${i % 10} ${i % 10}, ${i % 10} ${i % 20}, ${i % 20} ${i % 20}, ${i % 20} ${i % 10}, ${i % 10} ${i % 10}), (${i % 11} ${i % 11}, ${i % 19} ${i % 11}, ${i % 19} ${i % 19}, ${i % 11} ${i % 19}, ${i % 11} ${i % 11}))")
+      sf.setAttribute(7, s"MULTIPOINT((0 0), ($i $i))")
+      sf.setAttribute(8, s"MULTILINESTRING ((0 0, ${(i+1) % 10} ${(i+1) % 10}), (${(2*i+1) % 10} ${(2*i+1) % 10}, ${(3*i+1) % 10} ${(3*i+1) % 10}))")
+      sf.setAttribute(9, s"MULTIPOLYGON(((0 0, 1 0, 1 1, 0 0)), ((10 10, 10 20, 20 20, 20 10, 10 10), (11 11, 19 11, 19 19, 11 19, 11 11)))")
+      sf
     }
   }

+  // Helper for extracting a bounding box from GeoParquet metadata
+  def getBoundingBoxFromGeoParquetFile(path: Path): Envelope = {
+    val conf = new Configuration()
+    val footer = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER)
+    val metadata = footer.getFileMetaData.getKeyValueMetaData.get(GeoParquetSchemaKey)
+
+    // assumes the bbox is serialized as '"bbox":[xmin, ymin, xmax, ymax]', with no space after the colon
+    val start = metadata.indexOf("bbox") + 7
+    val end = metadata.indexOf("]", start)
+    val coordinates = 
metadata.substring(start, end).split(',').map(_.trim.toDouble)
+
+    // GeoParquet serializes the bbox as [xmin, ymin, xmax, ymax]
+    val xmin = coordinates(0)
+    val ymin = coordinates(1)
+    val xmax = coordinates(2)
+    val ymax = coordinates(3)
+    // note: the JTS Envelope constructor takes (x1, x2, y1, y2)
+    new Envelope(xmin, xmax, ymin, ymax)
+  }
+
+  val numFilesPerPartition = 2
+
   step {
-    sfts.foreach { sft =>
+    encodings.foreach(encoding => {
+      val sft = sfts.get(encoding)
       ds.createSchema(sft)
-      // create 2 files per partition
-      features(sft).grouped(numFeatures / 2).foreach { feats =>
-        WithClose(ds.getFeatureWriterAppend(sft.getTypeName, Transaction.AUTO_COMMIT)) { writer =>
+
+      features(sft).grouped(numFeatures / numFilesPerPartition).foreach { feats =>
+        val writer = ds.getFeatureWriterAppend(sft.getTypeName, Transaction.AUTO_COMMIT)
+        WithClose(writer) { writer =>
           feats.foreach(FeatureUtils.write(writer, _, useProvidedFid = true))
         }
       }
-    }
+    })
   }

   "Compaction command" >> {
     "Before compacting should be multiple files per partition" in {
-      foreach(sfts) { sft =>
+      foreach(encodings) { encoding =>
+        val sft = sfts.get(encoding)
         val fs = ds.getFeatureSource(sft.getTypeName)
         WithClose(fs.getFeatures.features) { iter =>
           while (iter.hasNext) {
             val feat = iter.next
             feat.getDefaultGeometry.asInstanceOf[MultiLineString].isEmpty mustEqual false
-            featureMustHaveProperGeometries(feat)
           }
         }

         fs.getCount(Query.ALL) mustEqual numFeatures
-        ds.storage(sft.getTypeName).metadata.getPartitions().map(_.files.size) mustEqual Seq.fill(3)(2)
+
+        val partitions = ds.storage(sft.getTypeName).metadata.getPartitions()
+
+        // For parquet files, get bounding boxes from each file in each partition
+        if (encoding == "parquet") {
+          val partitionNames = partitions.map(_.name)
+          partitionNames.foreach(partitionName => {
+            val filePaths = ds.storage(sft.getTypeName).getFilePaths(partitionName)
+            filePaths.foreach(path => {
+              val filepath = path.path
+              val bbox = getBoundingBoxFromGeoParquetFile(filepath)
+              partitionBoundingBoxes.addBinding(partitionName, bbox)
            })
+          })
+        }
+
+        partitions.map(_.files.size) mustEqual Seq.fill(10)(numFilesPerPartition)
       }
     }

     "Compaction command should run successfully" in {
-      foreach(sfts) { sft =>
+      foreach(encodings) { encoding =>
+        val sft = sfts.get(encoding)
         val command = new FsCompactCommand()
         command.params.featureName = sft.getTypeName
         command.params.path = path
@@ -116,43 +183,63 @@
       }
     }

     "After compacting should be one file per partition" in {
-      foreach(sfts) { sft =>
+      foreach(encodings) { encoding =>
+        val sft = sfts.get(encoding)
         val fs = ds.getFeatureSource(sft.getTypeName)
         WithClose(fs.getFeatures.features) { iter =>
           while (iter.hasNext) {
             val feat = iter.next
             feat.getDefaultGeometry.asInstanceOf[MultiLineString].isEmpty mustEqual false
-            featureMustHaveProperGeometries(feat)
           }
         }

         fs.getCount(Query.ALL) mustEqual numFeatures
-        ds.storage(sft.getTypeName).metadata.getPartitions().map(_.files.size) mustEqual Seq.fill(3)(1)
+
+        val partitions = ds.storage(sft.getTypeName).metadata.getPartitions()
+
+        // For parquet files, check that the union of bounding boxes of the 2 files before
+        // compaction is the same as the bounding box of the 1 file after compaction
+        if (encoding == "parquet") {
+          val partitionNames = partitions.map(_.name)
+          partitionNames.foreach(partitionName => {
+            val filePaths = ds.storage(sft.getTypeName).getFilePaths(partitionName).map(_.path)
+            filePaths.foreach(path => {
+              // in each partition, assert that the union of the per-file bounding boxes
+              // recorded before compaction matches the bbox of the compacted file
+              val bboxesUnion = new Envelope
+              partitionBoundingBoxes(partitionName).foreach(bbox => bboxesUnion.expandToInclude(bbox))
+              val metadataBbox = 
getBoundingBoxFromGeoParquetFile(path)
+              bboxesUnion mustEqual metadataBbox
+            })
+          })
+        }
+
+        partitions.map(_.files.size) mustEqual Seq.fill(10)(1)
      }
    }

    "Compaction command should run successfully with target file size" in {
-      foreach(sfts) { sft =>
+      foreach(encodings) { encoding =>
+        val sft = sfts.get(encoding)
         val command = new FsCompactCommand()
         command.params.featureName = sft.getTypeName
         command.params.path = path
         command.params.runMode = RunModes.Distributed.toString
-        command.params.targetFileSize = targetFileSize
+        command.params.targetFileSize = targetFileSize.get(encoding)
         // invoke on our existing store so the cached metadata gets updated
         command.compact(ds) must not(throwAn[Exception])
       }
     }

     "After compacting with target file size should be multiple files per partition" in {
-      foreach(sfts) { sft =>
+      foreach(encodings) { encoding =>
+        val sft = sfts.get(encoding)
         val fs = ds.getFeatureSource(sft.getTypeName)
         WithClose(fs.getFeatures.features) { iter =>
           while (iter.hasNext) {
             val feat = iter.next
             feat.getDefaultGeometry.asInstanceOf[MultiLineString].isEmpty mustEqual false
-            featureMustHaveProperGeometries(feat)
           }
         }
@@ -162,21 +249,14 @@
           partition.files.size must beGreaterThan(1)
           val sizes = storage.getFilePaths(partition.name).map(p => storage.context.fc.getFileStatus(p.path).getLen)
           // hard to get very close with 2 different formats and small files...
-          foreach(sizes)(_ must beCloseTo(targetFileSize, 4000))
+          foreach(sizes)(_ must beCloseTo(targetFileSize.get(encoding), 12000L))
         }
       }
     }
   }

-  def featureMustHaveProperGeometries(sf: SimpleFeature): MatchResult[Any] = {
-    sf.getAttribute("pt") mustEqual pt
-    sf.getAttribute("line") mustEqual line
-    sf.getAttribute("poly") mustEqual polygon
-    sf.getAttribute("mpt") mustEqual mpt
-    sf.getAttribute("mline") mustEqual mline
-    sf.getAttribute("mpoly") mustEqual mpolygon
-  }
-
   step {
     ds.dispose()
   }
diff --git a/pom.xml b/pom.xml
index a268cbc5d7c7..36a1a40cacc6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1381,6 +1381,12 @@
                 <artifactId>json-path</artifactId>
                 <version>${json.path.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.networknt</groupId>
+                <artifactId>json-schema-validator</artifactId>
+                <version>1.4.0</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.avro</groupId>
                 <artifactId>avro</artifactId>
@@ -3665,6 +3671,10 @@
                     <enabled>true</enabled>
                 </snapshots>
             </repository>
+            <repository>
+                <id>jitpack.io</id>
+                <url>https://jitpack.io</url>
+            </repository>