GEOMESA-3259 FSDS - Add support for GeoParquet #3064

Open · wants to merge 5 commits into main
--- ParquetConverterFactory.scala ---
@@ -16,6 +16,7 @@ import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.{MessageType, OriginalType, Type}
import org.geotools.api.feature.`type`.AttributeDescriptor
import org.geotools.api.feature.simple.SimpleFeatureType
import org.locationtech.geomesa.convert.EvaluationContext
import org.locationtech.geomesa.convert2.AbstractConverter.{BasicConfig, BasicField, BasicOptions}
@@ -63,6 +64,7 @@ class ParquetConverterFactory
// note: get the path as a URI so that we handle local files appropriately
val filePath = new Path(PathUtils.getUrl(p).toURI)
val footer = ParquetFileReader.readFooter(new Configuration(), filePath, ParquetMetadataConverter.NO_FILTER)
val parquetSchemaVersion = footer.getFileMetaData.getKeyValueMetaData.getOrDefault("geomesa.parquet.version", "0").toInt
val (schema, fields, id) = SimpleFeatureParquetSchema.read(footer.getFileMetaData) match {
case Some(parquet) =>
// this is a geomesa encoded parquet file
@@ -71,16 +73,7 @@
// note: parquet converter stores the generic record under index 0
val path = s"avroPath($$0, '/$name')"
// some types need a function applied to the underlying avro value
val expression = ObjectType.selectType(descriptor) match {
case Seq(ObjectType.GEOMETRY, ObjectType.POINT) => s"parquetPoint($$0, '/$name')"
case Seq(ObjectType.GEOMETRY, ObjectType.MULTIPOINT) => s"parquetMultiPoint($$0, '/$name')"
case Seq(ObjectType.GEOMETRY, ObjectType.LINESTRING) => s"parquetLineString($$0, '/$name')"
case Seq(ObjectType.GEOMETRY, ObjectType.MULTILINESTRING) => s"parquetMultiLineString($$0, '/$name')"
case Seq(ObjectType.GEOMETRY, ObjectType.POLYGON) => s"parquetPolygon($$0, '/$name')"
case Seq(ObjectType.GEOMETRY, ObjectType.MULTIPOLYGON) => s"parquetMultiPolygon($$0, '/$name')"
case Seq(ObjectType.UUID) => s"avroBinaryUuid($path)"
case _ => path
}
val expression = computeTransformFunction(name, path, descriptor, parquetSchemaVersion)
BasicField(descriptor.getLocalName, Some(Expression(expression)))
}
val id = Expression(s"avroPath($$0, '/${SimpleFeatureParquetSchema.FeatureIdField}')")
@@ -115,6 +108,41 @@ class ParquetConverterFactory
}
}
}

private def computeTransformFunction(name: String, path: String, descriptor: AttributeDescriptor, schemaVersion: Int): String = {
def expressionV2(name: String, path: String, descriptor: AttributeDescriptor): String = {
ObjectType.selectType(descriptor) match {
case Seq(ObjectType.GEOMETRY, ObjectType.POINT) => s"point(avroPath($$0, '/$name'))"
case Seq(ObjectType.GEOMETRY, ObjectType.MULTIPOINT) => s"multipoint(avroPath($$0, '/$name'))"
case Seq(ObjectType.GEOMETRY, ObjectType.LINESTRING) => s"linestring(avroPath($$0, '/$name'))"
case Seq(ObjectType.GEOMETRY, ObjectType.MULTILINESTRING) => s"multilinestring(avroPath($$0, '/$name'))"
case Seq(ObjectType.GEOMETRY, ObjectType.POLYGON) => s"polygon(avroPath($$0, '/$name'))"
case Seq(ObjectType.GEOMETRY, ObjectType.MULTIPOLYGON) => s"multipolygon(avroPath($$0, '/$name'))"
case Seq(ObjectType.UUID) => s"avroBinaryUuid($path)"
case _ => path
}
}

def expressionV0V1(name: String, path: String, descriptor: AttributeDescriptor): String = {
ObjectType.selectType(descriptor) match {
case Seq(ObjectType.GEOMETRY, ObjectType.POINT) => s"parquetPoint($$0, '/$name')"
case Seq(ObjectType.GEOMETRY, ObjectType.MULTIPOINT) => s"parquetMultiPoint($$0, '/$name')"
case Seq(ObjectType.GEOMETRY, ObjectType.LINESTRING) => s"parquetLineString($$0, '/$name')"
case Seq(ObjectType.GEOMETRY, ObjectType.MULTILINESTRING) => s"parquetMultiLineString($$0, '/$name')"
case Seq(ObjectType.GEOMETRY, ObjectType.POLYGON) => s"parquetPolygon($$0, '/$name')"
case Seq(ObjectType.GEOMETRY, ObjectType.MULTIPOLYGON) => s"parquetMultiPolygon($$0, '/$name')"
case Seq(ObjectType.UUID) => s"avroBinaryUuid($path)"
case _ => path
}
}

schemaVersion match {
case 2 => expressionV2(name, path, descriptor)
case 1 => expressionV0V1(name, path, descriptor)
case 0 => expressionV0V1(name, path, descriptor)
case v => throw new IllegalArgumentException(s"Unknown SimpleFeatureParquetSchema version: $v")
}
}
}

object ParquetConverterFactory {
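To make the version dispatch concrete: a minimal, runnable sketch (illustrative names; not the PR's code) of the expressions generated for a Point attribute under each schema version. The mapping of v2 to point(avroPath(...)) and v0/v1 to parquetPoint(...) is taken from the expressions in the diff above.

object TransformExpressionSketch {
  def pointExpression(name: String, schemaVersion: Int): String = schemaVersion match {
    case 2     => s"point(avroPath($$0, '/$name'))"  // GeoParquet-era encoding
    case 0 | 1 => s"parquetPoint($$0, '/$name')"     // legacy geomesa encoding
    case v     => throw new IllegalArgumentException(s"Unknown SimpleFeatureParquetSchema version: $v")
  }

  def main(args: Array[String]): Unit = {
    println(pointExpression("position", 2)) // point(avroPath($0, '/position'))
    println(pointExpression("position", 0)) // parquetPoint($0, '/position')
  }
}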
--- ParquetFunctionFactory.scala ---
@@ -14,9 +14,15 @@ import org.locationtech.geomesa.convert.avro.AvroPath
import org.locationtech.geomesa.convert2.transforms.Expression.LiteralString
import org.locationtech.geomesa.convert2.transforms.TransformerFunction.NamedTransformerFunction
import org.locationtech.geomesa.convert2.transforms.{Expression, TransformerFunction, TransformerFunctionFactory}
import org.locationtech.geomesa.fs.storage.parquet.io.{SimpleFeatureParquetSchema, SimpleFeatureReadSupport}
import org.locationtech.geomesa.fs.storage.parquet.io.{SimpleFeatureParquetSchemaV1, SimpleFeatureReadSupport}
import org.locationtech.jts.geom._

/**
* When parsing geometries from a GeoParquet file, the GeometryFunctionFactory class provides equivalent functionality.
*
* This class is kept for backwards compatibility with files written in the older Parquet formats (schema versions 0 and 1).
*/
@Deprecated
class ParquetFunctionFactory extends TransformerFunctionFactory {

override def functions: Seq[TransformerFunction] = geometries
@@ -42,7 +48,7 @@ class ParquetFunctionFactory extends TransformerFunctionFactory {
abstract class ParquetGeometryFn[T <: Geometry, U](name: String, path: AvroPath)
extends NamedTransformerFunction(Seq(name), pure = true) {

import SimpleFeatureParquetSchema.{GeometryColumnX, GeometryColumnY}
import SimpleFeatureParquetSchemaV1.{GeometryColumnX, GeometryColumnY}

override def apply(args: Array[AnyRef]): AnyRef = {
path.eval(args(0).asInstanceOf[GenericRecord]).collect {
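As background for the deprecation: the legacy functions rebuild geometries from the V1 column layout, where a point is stored as separate x/y doubles (per SimpleFeatureParquetSchemaV1.GeometryColumnX/GeometryColumnY). A minimal sketch of that reconstruction, with illustrative names (not the PR's code):

import org.locationtech.jts.geom.{Coordinate, GeometryFactory}

object V1PointDecodeSketch {
  private val geometryFactory = new GeometryFactory()

  // rebuild a JTS Point from the separate x/y doubles of the V1 column layout
  def decodePoint(x: Double, y: Double) =
    geometryFactory.createPoint(new Coordinate(x, y))

  def main(args: Array[String]): Unit =
    println(decodePoint(-100.2365, 23.0)) // POINT (-100.2365 23)
}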
--- (binary file not shown) ---
--- ParquetConverterTest.scala ---
@@ -8,7 +8,8 @@

package org.locationtech.geomesa.convert.parquet

import com.typesafe.config.ConfigFactory
import com.typesafe.config.{Config, ConfigFactory}
import org.geotools.api.feature.simple.SimpleFeatureType
import org.geotools.util.factory.Hints
import org.junit.runner.RunWith
import org.locationtech.geomesa.convert.EvaluationContext
@@ -33,6 +34,39 @@ class ParquetConverterTest extends Specification {
sequential

"ParquetConverter" should {
"parse a geoparquet file" in {
val conf = ConfigFactory.parseString(
"""
| {
| type = "parquet",
| id-field = "avroPath($0, '/__fid__')",
| fields = [
| { name = "name", transform = "avroPath($0, '/name')" },
| { name = "age", transform = "avroPath($0, '/age')" },
| { name = "dtg", transform = "avroPath($0, '/dtg')" },
| { name = "position", transform = "point(avroPath($0, '/position'))" },
| ]
| }
""".stripMargin)

val sft = SimpleFeatureTypes.createType("test", "name:String,age:Int,dtg:Date,*position:Point:srid=4326")

val file = getClass.getClassLoader.getResource("example-geo.parquet")
val path = new File(file.toURI).getAbsolutePath

val res = WithClose(SimpleFeatureConverter(sft, conf)) { converter =>
val ec = converter.createEvaluationContext(EvaluationContext.inputFileParam(path))
WithClose(converter.process(file.openStream(), ec))(_.toList)
}

res must haveLength(3)
res.map(_.getID) mustEqual Seq("1", "2", "3")
res.map(_.getAttribute("name")) mustEqual Seq("first", null, "third")
res.map(_.getAttribute("age")) mustEqual Seq(100, 200, 300)
res.map(_.getAttribute("dtg")) mustEqual Seq("2017-01-01", "2017-01-02", "2017-01-03").map(FastConverter.convert(_, classOf[Date]))
res.map(_.getAttribute("position")) mustEqual Seq("POINT (25.236263 27.436734)", "POINT (67.2363 55.236)", "POINT (73.0 73.0)").map(FastConverter.convert(_, classOf[Point]))
}

"parse a parquet file" in {
val conf = ConfigFactory.parseString(
"""
@@ -68,6 +102,33 @@
res.map(_.getAttribute("geom")) mustEqual Seq("POINT (-100.2365 23)", "POINT (40.232 -53.2356)", "POINT (3 -62.23)").map(FastConverter.convert(_, classOf[Point]))
}

"infer a converter from a geomesa geoparquet file" >> {
val file = getClass.getClassLoader.getResource("example-geo.parquet")
val path = new File(file.toURI).getAbsolutePath

val factory = new ParquetConverterFactory()
val inferred: Option[(SimpleFeatureType, Config)] = factory.infer(file.openStream(), path = Some(path))
inferred must beSome

val (sft, config) = inferred.get

sft.getAttributeDescriptors.asScala.map(_.getLocalName) mustEqual Seq("name", "age", "dtg", "position")
sft.getAttributeDescriptors.asScala.map(_.getType.getBinding) mustEqual
Seq(classOf[String], classOf[java.lang.Integer], classOf[Date], classOf[Point])

val res = WithClose(SimpleFeatureConverter(sft, config)) { converter =>
val ec = converter.createEvaluationContext(EvaluationContext.inputFileParam(path))
WithClose(converter.process(file.openStream(), ec))(_.toList)
}

res must haveLength(3)
res.map(_.getID) mustEqual Seq("1", "2", "3")
res.map(_.getAttribute("name")) mustEqual Seq("first", null, "third")
res.map(_.getAttribute("age")) mustEqual Seq(100, 200, 300)
res.map(_.getAttribute("dtg")) mustEqual Seq("2017-01-01", "2017-01-02", "2017-01-03").map(FastConverter.convert(_, classOf[Date]))
res.map(_.getAttribute("position")) mustEqual Seq("POINT (25.236263 27.436734)", "POINT (67.2363 55.236)", "POINT (73.0 73.0)").map(FastConverter.convert(_, classOf[Point]))
}

"infer a converter from a geomesa parquet file" >> {
val file = getClass.getClassLoader.getResource("example.parquet")
val path = new File(file.toURI).getAbsolutePath
--- AbstractFileSystemStorage.scala ---
@@ -19,7 +19,7 @@ import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.{FileSystemUpda
import org.locationtech.geomesa.fs.storage.api.StorageMetadata.StorageFileAction.StorageFileAction
import org.locationtech.geomesa.fs.storage.api.StorageMetadata._
import org.locationtech.geomesa.fs.storage.api._
import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.{FileSystemPathReader, MetadataObserver, WriterConfig}
import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.{FileSystemPathReader, WriterConfig}
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.CompositeObserver
import org.locationtech.geomesa.fs.storage.common.observer.{FileSystemObserver, FileSystemObserverFactory}
import org.locationtech.geomesa.fs.storage.common.utils.StorageUtils.FileType
@@ -67,11 +67,13 @@ abstract class AbstractFileSystemStorage(
/**
* Create a writer for the given file
*
* @param partition the partition that the file belongs to
* @param action whether to append or modify
* @param file file to write to
* @param observer optional observer to report stats on the data written
* @return a writer for the given file
*/
protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter
protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: Option[FileSystemObserver]): FileSystemWriter

/**
* Create a path reader with the given filter and transform
@@ -234,11 +236,11 @@ abstract class AbstractFileSystemStorage(
def pathAndObserver: WriterConfig = {
val path = StorageUtils.nextFile(context.root, partition, metadata.leafStorage, extension, fileType)
PathCache.register(context.fc, path)
val updateObserver = new UpdateObserver(partition, path, action)
val observer = if (observers.isEmpty) { updateObserver } else {
new CompositeObserver(observers.map(_.apply(path)).+:(updateObserver))
val observer = if (observers.isEmpty) { None } else {
val compositeObserver = new CompositeObserver(observers.map(_.apply(path)))
Some(compositeObserver)
}
WriterConfig(path, observer)
WriterConfig(partition, action, path, observer)
}

targetSize(targetFileSize) match {
@@ -247,7 +249,7 @@
}
}

private def createWriter(config: WriterConfig): FileSystemWriter = createWriter(config.path, config.observer)
private def createWriter(config: WriterConfig): FileSystemWriter = createWriter(config.partition, config.action, config.path, config.observer)

/**
* Writes files up to a given size, then starts a new file
@@ -350,10 +352,10 @@ abstract class AbstractFileSystemStorage(
* @param file file being written
* @param action whether to append or modify
*/
class UpdateObserver(partition: String, file: Path, action: StorageFileAction) extends MetadataObserver {
override protected def onClose(bounds: Envelope, count: Long): Unit = {
protected class FileBasedMetadataCallback(partition: String, action: StorageFileAction, file: Path) extends ((Envelope, Long) => Unit) {
override def apply(env: Envelope, count: Long): Unit = {
val files = Seq(StorageFile(file.getName, System.currentTimeMillis(), action))
metadata.addPartition(PartitionMetadata(partition, files, PartitionBounds(bounds), count))
metadata.addPartition(PartitionMetadata(partition, files, PartitionBounds(env), count))
}
}
}
@@ -391,5 +393,5 @@ object AbstractFileSystemStorage {
protected def onClose(bounds: Envelope, count: Long): Unit
}

private case class WriterConfig(path: Path, observer: FileSystemObserver)
private case class WriterConfig(partition: String, action: StorageFileAction, path: Path, observer: Option[FileSystemObserver])
}
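The callback introduced above is an (Envelope, Long) => Unit applied when a writer closes. A minimal sketch of that contract with an illustrative implementation (names here are not from the diff):

import org.locationtech.jts.geom.Envelope

// invoked once per closed file, with the bounds and count of features written
final class LoggingMetadataCallback(partition: String) extends ((Envelope, Long) => Unit) {
  override def apply(bounds: Envelope, count: Long): Unit =
    println(s"closed file in partition '$partition': count=$count, bounds=$bounds")
}

// Usage:
//   val callback = new LoggingMetadataCallback("2024/01")
//   callback(new Envelope(-180, 180, -90, 90), 42L)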
--- FileSystemObserver.scala ---
@@ -13,4 +13,4 @@ import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter
/**
* Marker trait for writer hooks
*/
trait FileSystemObserver extends FileSystemWriter
trait FileSystemObserver extends FileSystemWriter
--- ConverterStorage.scala ---
@@ -13,6 +13,7 @@ import org.geotools.api.feature.simple.SimpleFeatureType
import org.geotools.api.filter.Filter
import org.locationtech.geomesa.convert2.SimpleFeatureConverter
import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter
import org.locationtech.geomesa.fs.storage.api.StorageMetadata.StorageFileAction.StorageFileAction
import org.locationtech.geomesa.fs.storage.api.StorageMetadata.{StorageFile, StorageFilePath}
import org.locationtech.geomesa.fs.storage.api._
import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage
@@ -30,7 +31,7 @@ class ConverterStorage(context: FileSystemContext, metadata: StorageMetadata, co
// actually need to be closed, and since they will only open a single connection per converter, the
// impact should be low

override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter =
override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: Option[FileSystemObserver]): FileSystemWriter =
throw new NotImplementedError()

override protected def createReader(
--- OrcFileSystemStorage.scala ---
@@ -16,13 +16,15 @@ import org.geotools.api.feature.simple.SimpleFeatureType
import org.geotools.api.filter.Filter
import org.locationtech.geomesa.filter.factory.FastFilterFactory
import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter
import org.locationtech.geomesa.fs.storage.api.StorageMetadata.StorageFileAction.StorageFileAction
import org.locationtech.geomesa.fs.storage.api._
import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage
import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.FileSystemPathReader
import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.{FileSystemPathReader, MetadataObserver}
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.CompositeObserver
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver
import org.locationtech.geomesa.utils.geotools.ObjectType
import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType
import org.locationtech.jts.geom.Geometry
import org.locationtech.jts.geom.{Envelope, Geometry}

/**
* Orc implementation of FileSystemStorage
@@ -32,8 +34,20 @@ import org.locationtech.jts.geom.Geometry
class OrcFileSystemStorage(context: FileSystemContext, metadata: StorageMetadata)
extends AbstractFileSystemStorage(context, metadata, OrcFileSystemStorage.FileExtension) {

override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter =
new OrcFileSystemWriter(metadata.sft, context.conf, file, observer)
private class SingleGeometryObserver(partition: String, action: StorageFileAction, file: Path) extends MetadataObserver {
override protected def onClose(bounds: Envelope, count: Long): Unit = new FileBasedMetadataCallback(partition, action, file)(bounds, count)
}

override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: Option[FileSystemObserver]): FileSystemWriter = {
val singleGeometryObserver = new SingleGeometryObserver(partition, action, file)

observer match {
case Some(o) =>
val compositeObserver = new CompositeObserver(Seq(singleGeometryObserver, o))
new OrcFileSystemWriter(metadata.sft, context.conf, file, Some(compositeObserver))
case None => new OrcFileSystemWriter(metadata.sft, context.conf, file, Some(singleGeometryObserver))
}
}

override protected def createReader(
filter: Option[Filter],
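The Option handling above follows a simple composition pattern: the mandatory metadata observer is always installed, and a caller-supplied observer is merged in rather than replacing it. A self-contained sketch of the pattern (all names illustrative, not geomesa APIs):

object ObserverCompositionSketch {
  trait Observer { def write(record: String): Unit }

  final class Composite(members: Seq[Observer]) extends Observer {
    override def write(record: String): Unit = members.foreach(_.write(record))
  }

  // the required observer is always present; an optional second observer
  // is combined with it rather than substituted for it
  def combine(required: Observer, optional: Option[Observer]): Observer =
    optional.fold(required)(o => new Composite(Seq(required, o)))
}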
--- OrcFileSystemWriter.scala ---
@@ -24,7 +24,7 @@ class OrcFileSystemWriter(
sft: SimpleFeatureType,
config: Configuration,
file: Path,
observer: FileSystemObserver = NoOpObserver
observer: Option[FileSystemObserver] = None
) extends FileSystemWriter {

private val schema = OrcFileSystemStorage.createTypeDescription(sft)
@@ -34,6 +34,7 @@
private val batch = schema.createRowBatch()

private val attributeWriter = OrcAttributeWriter(sft, batch)
private val observerVal = observer.getOrElse(NoOpObserver)

override def write(sf: SimpleFeature): Unit = {
attributeWriter.apply(sf, batch.size)
@@ -43,19 +44,19 @@
writer.addRowBatch(batch)
batch.reset()
}
observer.write(sf)
observerVal.write(sf)
}

override def flush(): Unit = {
flushBatch()
observer.flush()
observerVal.flush()
}

override def close(): Unit = {
try { flushBatch() } catch {
case NonFatal(e) => CloseQuietly(Seq(writer, observer)).foreach(e.addSuppressed); throw e
case NonFatal(e) => CloseQuietly(Seq(writer, observerVal)).foreach(e.addSuppressed); throw e
}
CloseQuietly.raise(Seq(writer, observer))
CloseQuietly.raise(Seq(writer, observerVal))
}

private def flushBatch(): Unit = {
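The observer.getOrElse(NoOpObserver) resolution above is a null-object pattern: the Option is resolved once at construction, so write/flush/close stay free of per-call Option checks. A minimal sketch (illustrative names, not the PR's code):

object NoOpResolutionSketch {
  trait Observer { def write(record: String): Unit; def flush(): Unit }

  object NoOp extends Observer {
    override def write(record: String): Unit = ()
    override def flush(): Unit = ()
  }

  final class Writer(observer: Option[Observer]) {
    private val obs = observer.getOrElse(NoOp) // resolved once at construction
    def write(record: String): Unit = obs.write(record) // no per-call Option handling
    def flush(): Unit = obs.flush()
  }
}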