airframe-parquet: Support column projection (#1617)
* airframe-parquet: Support column projection

* Add doc
xerial committed May 5, 2021
1 parent 8381f44 commit 15878d2
Showing 3 changed files with 59 additions and 9 deletions.
@@ -26,7 +26,7 @@ import org.apache.parquet.schema.MessageType
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import wvlet.airframe.codec.MessageCodec
 import wvlet.airframe.codec.PrimitiveCodec.ValueCodec
-import wvlet.airframe.surface.Surface
+import wvlet.airframe.surface.{CName, Surface}
 import wvlet.log.LogSupport
 
 import java.util
@@ -51,11 +51,19 @@ object AirframeParquetReader {
 }
 
 class AirframeParquetReadSupport[A](surface: Surface) extends ReadSupport[A] {
-  private val schema = Parquet.toParquetSchema(surface)
-
   override def init(context: InitContext): ReadSupport.ReadContext = {
-    // do nothing
-    new ReadContext(context.getFileSchema)
+    val parquetFileSchema = context.getFileSchema
+    val targetColumns     = surface.params.map(p => CName(p.name).canonicalName).toSet
+    if (targetColumns.isEmpty) {
+      // e.g., Json, Map where all parameters need to be extracted
+      new ReadContext(parquetFileSchema)
+    } else {
+      // Pruning columns
+      val projectedColumns =
+        parquetFileSchema.getFields.asScala.filter(f => targetColumns.contains(CName(f.getName).canonicalName))
+      val resultSchema = new MessageType(surface.fullName, projectedColumns.asJava)
+      new ReadContext(resultSchema)
+    }
   }
 
   override def prepareForRead(
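For readers skimming the hunk above: `init` derives the set of requested columns from the model class's Surface parameters (normalized with `CName` canonicalization so that differing naming styles still match) and narrows the file schema to just those fields. Below is a minimal, self-contained sketch of the same pruning step using only parquet-mr's schema API; the `SampleRecord` schema, the column names, and the plain `toLowerCase` normalization are illustrative stand-ins, not the library's actual code:

```scala
import org.apache.parquet.schema.{MessageType, Types}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
import scala.jdk.CollectionConverters._

object SchemaPruningSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative file schema with three columns (stand-in for context.getFileSchema)
    val fileSchema: MessageType = Types
      .buildMessage()
      .required(INT32).named("p1")
      .required(BINARY).named("p2")
      .required(INT64).named("p3")
      .named("SampleRecord")

    // Columns requested by the reader's model class; in the commit this set
    // comes from surface.params with CName canonicalization
    val targetColumns = Set("p1", "p3")

    // Keep only the requested fields. Field types are copied from the file
    // schema, so the projected schema stays compatible with the on-disk layout.
    val projectedColumns = fileSchema.getFields.asScala
      .filter(f => targetColumns.contains(f.getName.toLowerCase))
    val resultSchema = new MessageType("SampleRecordProjection", projectedColumns.asJava)

    println(resultSchema) // prints a message type containing only p1 and p3
  }
}
```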
@@ -68,8 +76,9 @@ class AirframeParquetReadSupport[A](surface: Surface) extends ReadSupport[A] {
   }
 }
 
-class AirframeParquetRecordMaterializer[A](surface: Surface, parquetSchema: MessageType) extends RecordMaterializer[A] {
-  private val recordConverter = new ParquetRecordConverter[A](surface, parquetSchema)
+class AirframeParquetRecordMaterializer[A](surface: Surface, projectedSchema: MessageType)
+    extends RecordMaterializer[A] {
+  private val recordConverter = new ParquetRecordConverter[A](surface, projectedSchema)
 
   override def getCurrentRecord: A = recordConverter.currentRecord
 
@@ -118,13 +127,13 @@ object ParquetRecordConverter {
 
 }
 
-class ParquetRecordConverter[A](surface: Surface, parquetSchema: MessageType) extends GroupConverter with LogSupport {
+class ParquetRecordConverter[A](surface: Surface, projectedSchema: MessageType) extends GroupConverter with LogSupport {
   private val codec = MessageCodec.ofSurface(surface)
   private val recordHolder = Map.newBuilder[String, Any]
 
   import ParquetRecordConverter._
 
-  private val converters: Seq[Converter] = parquetSchema.getFields.asScala.map { f =>
+  private val converters: Seq[Converter] = projectedSchema.getFields.asScala.map { f =>
     val cv: Converter = f match {
       case p if p.isPrimitive =>
         p.asPrimitiveType().getPrimitiveTypeName match {
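The converter hunk is truncated above, but the shape of the mechanism is standard parquet-mr: a `GroupConverter` hands out one `Converter` per projected field via `getConverter(fieldIndex)`, and `PrimitiveConverter` callbacks receive the decoded values. A hedged sketch of that dispatch pattern, independent of airframe's actual converter classes:

```scala
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}

// Illustrative converter that collects primitive values into a map keyed by
// field name. This mirrors the dispatch structure of ParquetRecordConverter,
// not its exact code.
class SketchRecordConverter(fieldNames: Seq[String]) extends GroupConverter {
  private val holder = scala.collection.mutable.Map.empty[String, Any]

  // One converter per field, in the projected schema's field order
  private val converters: Seq[Converter] = fieldNames.map { name =>
    new PrimitiveConverter {
      override def addInt(v: Int): Unit         = holder += name -> v
      override def addLong(v: Long): Unit       = holder += name -> v
      override def addBoolean(v: Boolean): Unit = holder += name -> v
      override def addBinary(v: Binary): Unit   = holder += name -> v.toStringUsingUTF8
    }
  }

  override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex)
  override def start(): Unit = holder.clear()
  override def end(): Unit   = () // a real implementation would materialize the record here
}
```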
@@ -134,4 +134,25 @@ object ParquetTest extends AirSpec {
       }
     }
   }
+
+  case class SampleRecord(p1: Int, p2: String, p3: Long)
+  private val r1 = SampleRecord(0, "a", 1L)
+  private val r2 = SampleRecord(1, "b", 2L)
+  case class SampleRecordProjection(p1: Int, p3: Long)
+
+  test("column pruning") {
+    IOUtil.withTempFile("target/tmp-column", ".parquet") { file =>
+      withResource(Parquet.newWriter[SampleRecord](path = file.getPath)) { writer =>
+        writer.write(r1)
+        writer.write(r2)
+      }
+
+      withResource(Parquet.newReader[SampleRecordProjection](path = file.getPath)) { reader =>
+        reader.read() shouldBe SampleRecordProjection(r1.p1, r1.p3)
+        reader.read() shouldBe SampleRecordProjection(r2.p1, r2.p3)
+        reader.read() shouldBe null
+      }
+    }
+  }
+
 }
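The test above also pins down the reader's end-of-input convention: `read()` returns `null` once all records are consumed. A small usage sketch built on that convention (the file path is hypothetical; `withResource` is airframe-control's loan-pattern helper, as used in the test):

```scala
import wvlet.airframe.control.Control.withResource
import wvlet.airframe.parquet.Parquet

object ReadAllSketch {
  case class SampleRecordProjection(p1: Int, p3: Long)

  def main(args: Array[String]): Unit = {
    // Drain every record; read() returns null at end of input, per the test above
    withResource(Parquet.newReader[SampleRecordProjection](path = "record.parquet")) { reader =>
      Iterator
        .continually(reader.read())
        .takeWhile(_ != null)
        .foreach(println)
    }
  }
}
```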
20 changes: 20 additions & 0 deletions docs/airframe-parquet.md
@@ -54,6 +54,26 @@ jsonReader.read() // null
 jsonReader.close()
 ```
 
+## Column Projection
+
+If you need to read only a subset of columns, use a model class that has a subset of the parameters of the original model class. The Parquet reader will then read only the column blocks of the columns that appear in the model class parameters:
+
+```scala
+case class MyRecord(p1: Int, p2: String, p3: Boolean)
+
+val writer = Parquet.newWriter[MyRecord](path = "record.parquet")
+writer.write(...)
+writer.close()
+
+case class MyRecordProjection(p1: Int, p3: Boolean)
+val reader = Parquet.newReader[MyRecordProjection](path = "record.parquet")
+
+// Only the p1 and p3 columns will be read from the Parquet file
+reader.read() // MyRecordProjection(p1, p3)
+reader.close()
+```
+
 ## Applying Row Group Filter
 
 Parquet can skip reading records by using row group filters.
