airframe-parquet: Scala 3 support (#2211)
- AnyCodec now accepts a knownSurfaces parameter to resolve a Surface from Class[_] at runtime
- Add Resource.use for the loan pattern
- Parquet.newRecordWriter accepts a knownSurfaces parameter to support nested objects (see the usage sketch below)
xerial committed Jun 1, 2022
1 parent 90931d9 commit 8ef9535
Showing 13 changed files with 108 additions and 35 deletions.
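
A minimal sketch of the new flow, using illustrative `MyEntry`/`Nested` case classes that mirror the docs and tests in this commit (not part of the diff itself):

```scala
import wvlet.airframe.parquet.Parquet
import wvlet.airframe.surface.Surface

case class MyEntry(id: Int, name: String)
case class Nested(id: Int, entry: MyEntry)

// Registering Surface.of[MyEntry] lets the record writer resolve a codec
// for nested MyEntry values from their runtime Class[_]
val writer = Parquet.newRecordWriter(
  path = "nested.parquet",
  schema = Parquet.toParquetSchema(Surface.of[Nested]),
  knownSurfaces = Seq(Surface.of[MyEntry])
)
writer.write(Map("id" -> 1, "entry" -> MyEntry(1, "leo")))
writer.close()
```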
@@ -41,6 +41,7 @@ trait MessageCodec[A] extends LogSupport {
} catch {
case e: Throwable => throw unpackError(e)
}
// TODO: Check v.isNull
v.getError match {
case Some(err) =>
throw unpackError(err)
@@ -22,6 +22,7 @@ import wvlet.airframe.msgpack.spi._
import wvlet.airframe.surface.{Primitive, Surface}
import wvlet.airframe.ulid.ULID

import java.util.concurrent.ConcurrentHashMap
import scala.util.Try

/**
@@ -46,7 +47,7 @@ object PrimitiveCodec {
// JSON types
Surface.of[JSONValue] -> JSONValueCodec,
Surface.of[Json] -> RawJsonCodec,
Surface.of[Any] -> AnyCodec
Surface.of[Any] -> AnyCodec.default
)

val primitiveArrayCodec = Map(
@@ -872,7 +873,7 @@ object PrimitiveCodec {
override def pack(p: Packer, v: Array[Any]): Unit = {
p.packArrayHeader(v.length)
v.foreach { x =>
AnyCodec.pack(p, x)
AnyCodec.default.pack(p, x)
}
}
override def unpack(
@@ -884,7 +885,7 @@
val b = Array.newBuilder[Any]
b.sizeHint(len)
(0 until len).foreach { i =>
AnyCodec.unpack(u, v)
AnyCodec.default.unpack(u, v)
if (v.isNull) {
b += null // or report error?
} else {
@@ -909,14 +910,21 @@
}
}

object AnyCodec {
val default: AnyCodec = new AnyCodec()
}

/**
* Codec for Any values. This only supports very basic types to enable packing/unpacking collections like Seq[Any],
* Map[Any, Any] with ease.
*
* Another option to implement AnyCodec is packing pairs of (type, value), but we will not take this approach as this
* will require many bytes to fully encode type names.
*/
object AnyCodec extends MessageCodec[Any] {
class AnyCodec(knownSurfaces: Seq[Surface] = Seq.empty) extends MessageCodec[Any] {

private val knownSurfaceTable = knownSurfaces.map(s => s.rawType -> s).toMap[Class[_], Surface]

override def pack(p: Packer, v: Any): Unit = {
v match {
case null => p.packNil
@@ -982,12 +990,18 @@
ThrowableCodec.pack(p, v)
case _ =>
val cl = v.getClass
wvlet.airframe.codec.Compat.codecOfClass(cl) match {
case Some(codec) =>
codec.asInstanceOf[MessageCodec[Any]].pack(p, v)
knownSurfaceTable.get(cl) match {
case Some(surface) =>
val codec = MessageCodec.ofSurface(surface).asInstanceOf[MessageCodec[Any]]
codec.pack(p, v)
case None =>
// Pack as a string for unknown types
StringCodec.pack(p, v.toString)
wvlet.airframe.codec.Compat.codecOfClass(cl) match {
case Some(codec) =>
codec.asInstanceOf[MessageCodec[Any]].pack(p, v)
case None =>
// Pack as a string for unknown types
StringCodec.pack(p, v.toString)
}
}
}
}
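
A short, hedged sketch of the change above: a class registered via `knownSurfaces` is packed through `MessageCodec.ofSurface`, while an unregistered class still goes through `Compat.codecOfClass` and, failing that, is packed as its `toString`. The `Point` class is illustrative:

```scala
import wvlet.airframe.codec.PrimitiveCodec.AnyCodec
import wvlet.airframe.surface.Surface

case class Point(x: Int, y: Int) // illustrative type, unknown to the default codec

// Default behavior: if Compat.codecOfClass cannot provide a codec for Point
// (e.g. on Scala 3), the value is packed as the string "Point(1,2)"
val asString = AnyCodec.default.toMsgPack(Point(1, 2))

// With the Surface registered, Point is packed via MessageCodec.ofSurface(Surface.of[Point])
val codec        = new AnyCodec(knownSurfaces = Seq(Surface.of[Point]))
val asStructured = codec.toMsgPack(Point(1, 2))
```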
@@ -23,7 +23,8 @@ object ScalaStandardCodec {
case class OptionCodec[A](elementCodec: MessageCodec[A]) extends MessageCodec[Option[A]] {
override def pack(p: Packer, v: Option[A]): Unit = {
v match {
case None => p.packNil
case None =>
p.packNil
case Some(x) =>
elementCodec.pack(p, x)
}
@@ -24,6 +24,20 @@ trait Resource[A] extends AutoCloseable {
def get: A
def close(): Unit

/**
* Use the resource within a limited scope. After exiting the scope, the resource will be closed
* @param body
* @tparam U
* @return
*/
def use[U](body: A => U): U = {
try {
body(get)
} finally {
close()
}
}

/**
* Wrap a Future with this resource. After the future completes, the resource will be closed
*/
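
A hedged sketch of the loan pattern enabled by the new `Resource.use`; it assumes `get` and `close()` are the only abstract members of `Resource[A]` and builds the resource by hand rather than through any factory helper:

```scala
import wvlet.airframe.control.Resource
import java.io.{BufferedReader, FileReader}

// Hand-rolled Resource wrapping a reader (illustrative; no factory method is assumed)
val readerResource: Resource[BufferedReader] = new Resource[BufferedReader] {
  private val reader               = new BufferedReader(new FileReader("data.txt"))
  override def get: BufferedReader = reader
  override def close(): Unit       = reader.close()
}

// Loan pattern: the body runs with the resource, and close() is always called afterwards
val firstLine: String = readerResource.use { reader => reader.readLine() }
```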
@@ -16,18 +16,21 @@ object Parquet extends ParquetCompat with LogSupport {
* Create a Parquet writer that accepts records represented in Map, Array, JSON, MsgPack, etc.
* @param path
* @param schema
* @param knownSurfaces
surfaces of objects that will be used for writing records
* @param hadoopConf
* @param config
* @return
*/
def newRecordWriter(
path: String,
schema: MessageType,
knownSurfaces: Seq[Surface] = Seq.empty,
hadoopConf: Configuration = new Configuration(),
config: ParquetWriterAdapter.RecordWriterBuilder => ParquetWriterAdapter.RecordWriterBuilder =
identity[ParquetWriterAdapter.RecordWriterBuilder](_)
): ParquetWriter[Any] = {
val b = ParquetWriterAdapter.recordWriterBuilder(path, schema, hadoopConf)
val b = ParquetWriterAdapter.recordWriterBuilder(path, schema, knownSurfaces, hadoopConf)
val builder = config(b)
builder.build()
}
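
The `config` hook above exposes the underlying `RecordWriterBuilder` (a standard `ParquetWriter.Builder`), so Parquet options such as the compression codec can still be overridden; a sketch with an illustrative `Item` class:

```scala
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import wvlet.airframe.parquet.Parquet
import wvlet.airframe.surface.Surface

case class Item(id: Int, name: String) // illustrative

// Replace the default Snappy compression with GZIP through the builder hook
val gzipWriter = Parquet.newRecordWriter(
  path = "items.parquet",
  schema = Parquet.toParquetSchema(Surface.of[Item]),
  config = _.withCompressionCodec(CompressionCodecName.GZIP)
)
gzipWriter.write(Map("id" -> 1, "name" -> "leo"))
gzipWriter.close()
```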
@@ -19,7 +19,7 @@ import org.apache.parquet.schema.{MessageType, Type}
import wvlet.airframe.codec.{JSONCodec, MessageCodec, MessageCodecException}
import wvlet.airframe.codec.PrimitiveCodec.{AnyCodec, ValueCodec}
import wvlet.airframe.json.JSONParseException
import wvlet.airframe.msgpack.spi.Value.{ArrayValue, BinaryValue, MapValue, StringValue}
import wvlet.airframe.msgpack.spi.Value.{ArrayValue, BinaryValue, MapValue, NilValue, StringValue}
import wvlet.airframe.msgpack.spi.{Code, MessagePack, MsgPack, Value}
import wvlet.airframe.surface.{CName, Parameter, Surface}
import wvlet.log.LogSupport
@@ -111,7 +111,7 @@ class ParquetSeqWriter(elementCodec: ParquetWriteCodec) extends ParquetWriteCode
}
case _ =>
// Write unknown value as binary
val msgpack = AnyCodec.toMsgPack(v)
val msgpack = AnyCodec.default.toMsgPack(v)
recordConsumer.addBinary(Binary.fromConstantByteArray(msgpack))
}
}
@@ -185,6 +185,8 @@ case class ParquetObjectWriter(paramWriters: Seq[ParquetFieldWriter], params: Se
parquetCodecTable.get(columnName) match {
case Some(parameterCodec) =>
v match {
case NilValue =>
// skip
case m: MapValue if m.isEmpty =>
// skip
case a: ArrayValue if a.isEmpty =>
@@ -15,27 +15,28 @@ package wvlet.airframe.parquet

import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.MessageType
import wvlet.airframe.codec.PrimitiveCodec.ValueCodec
import wvlet.airframe.codec.PrimitiveCodec.{AnyCodec, ValueCodec}
import wvlet.airframe.codec.{MessageCodec, MessageCodecException}
import wvlet.airframe.surface.Surface
import wvlet.log.LogSupport

/**
* Adjust any input objects into the shape of the Parquet schema
*
* @param schema
*/
class ParquetRecordWriter(schema: MessageType) extends LogSupport {
class ParquetRecordWriter(schema: MessageType, knownSurfaces: Seq[Surface] = Seq.empty) extends LogSupport {
private val parquetCodec: ParquetWriteCodec = {
val surface = ParquetSchema.buildSurfaceFromParquetSchema(schema)
ParquetWriteCodec.parquetCodecOf(schema, surface, ValueCodec).asRoot
}

private val anyCodec = MessageCodec.of[Any]
private val codec = new AnyCodec(knownSurfaces)

def pack(obj: Any, recordConsumer: RecordConsumer): Unit = {
val msgpack =
try {
anyCodec.toMsgPack(obj)
codec.toMsgPack(obj)
} catch {
case e: MessageCodecException =>
throw new IllegalArgumentException(s"Cannot convert the input into MsgPack: ${obj}", e)
@@ -19,7 +19,15 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.{MessageType, Type}
import org.apache.parquet.schema.Type.Repetition
import wvlet.airframe.codec.MessageCodec
import wvlet.airframe.codec.PrimitiveCodec.{BooleanCodec, DoubleCodec, FloatCodec, IntCodec, LongCodec, StringCodec}
import wvlet.airframe.codec.PrimitiveCodec.{
BooleanCodec,
DoubleCodec,
FloatCodec,
IntCodec,
LongCodec,
StringCodec,
ValueCodec
}
import wvlet.airframe.msgpack.spi.MsgPack
import wvlet.airframe.surface.Surface
import wvlet.log.LogSupport
@@ -47,18 +47,23 @@ object ParquetWriterAdapter extends LogSupport {
}
}

class RecordWriterBuilder(schema: MessageType, file: OutputFile)
class RecordWriterBuilder(schema: MessageType, file: OutputFile, knownSurfaces: Seq[Surface])
extends ParquetWriter.Builder[Any, RecordWriterBuilder](file: OutputFile) {
override def self(): RecordWriterBuilder = this
override def getWriteSupport(conf: Configuration): WriteSupport[Any] = {
new ParquetRecordWriterSupportAdapter(schema)
new ParquetRecordWriterSupportAdapter(schema, knownSurfaces)
}
}

def recordWriterBuilder(path: String, schema: MessageType, conf: Configuration): RecordWriterBuilder = {
def recordWriterBuilder(
path: String,
schema: MessageType,
knownSurfaces: Seq[Surface],
conf: Configuration
): RecordWriterBuilder = {
val fsPath = new Path(path)
val file = HadoopOutputFile.fromPath(fsPath, conf)
val b = new RecordWriterBuilder(schema, file).withConf(conf)
val b = new RecordWriterBuilder(schema, file, knownSurfaces).withConf(conf)
// Use snappy by default
b.withCompressionCodec(CompressionCodecName.SNAPPY)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
@@ -90,7 +95,9 @@ class ParquetWriteSupportAdapter[A](surface: Surface) extends WriteSupport[A] wi
}
}

class ParquetRecordWriterSupportAdapter(schema: MessageType) extends WriteSupport[Any] with LogSupport {
class ParquetRecordWriterSupportAdapter(schema: MessageType, knownSurfaces: Seq[Surface])
extends WriteSupport[Any]
with LogSupport {
private var recordConsumer: RecordConsumer = null

override def init(configuration: Configuration): WriteContext = {
@@ -102,7 +109,7 @@ class ParquetRecordWriterSupportAdapter(schema: MessageType) extends WriteSuppor
this.recordConsumer = recordConsumer
}

private val codec = new ParquetRecordWriter(schema)
private val codec = new ParquetRecordWriter(schema, knownSurfaces)

override def write(record: Any): Unit = {
require(recordConsumer != null)
@@ -42,6 +42,7 @@ class RecordBuilderImpl extends RecordBuilder with LogSupport {
}
}
}

def toMap: Map[String, Any] = {
holder
}
@@ -14,6 +14,7 @@
package wvlet.airframe.parquet

import wvlet.airframe.control.Control.withResource
import wvlet.airframe.control.Resource
import wvlet.airframe.msgpack.spi.{Value, ValueFactory}
import wvlet.airframe.surface.Surface
import wvlet.airframe.ulid.ULID
@@ -42,7 +43,7 @@ object NestedRecordWriteTest extends AirSpec {
debug(s"write target schema: ${schema}")

IOUtil.withTempFile("target/tmp-nested-record", ".parquet") { file =>
withResource(Parquet.newRecordWriter(file.getPath, schema)) { writer =>
withResource(Parquet.newRecordWriter(file.getPath, schema, knownSurfaces = Seq(Surface.of[ColStats]))) { writer =>
writer.write(m1)
writer.write(m2)
}
@@ -109,7 +110,13 @@ object NestedRecordWriteTest extends AirSpec {
test("write records with Option/Seq using record writer") {
val p0 = Partition(id = ULID.newULID, metadata = Map("xxx" -> "yyy"))
IOUtil.withTempFile("target/tmp-nested-opt-record", ".parquet") { file =>
withResource(Parquet.newRecordWriter(file.getPath, Parquet.toParquetSchema(Surface.of[Partition]))) { writer =>
withResource(
Parquet.newRecordWriter(
file.getPath,
Parquet.toParquetSchema(Surface.of[Partition]),
knownSurfaces = Seq(Surface.of[Partition])
)
) { writer =>
writer.write(p0)
}
withResource(Parquet.newReader[Partition](file.getPath)) { reader =>
@@ -35,7 +35,7 @@ object ParquetRecordWriterTest extends AirSpec {
writer.write(Array(2, "yui"))
writer.write("""{"id":3, "name":"aina"}""")
writer.write("""[4, "ruri"]""")
writer.write(AnyCodec.toMsgPack(Map("id" -> 5, "name" -> "xxx")))
writer.write(AnyCodec.default.toMsgPack(Map("id" -> 5, "name" -> "xxx")))
}

withResource(Parquet.newReader[Map[String, Any]](file.getPath)) { reader =>
@@ -86,10 +86,11 @@

test("write records with Option") {
IOUtil.withTempFile("target/tmp-record-opt", ".parquet") { file =>
withResource(Parquet.newRecordWriter(file.getPath, schema2)) { writer =>
writer.write(RecordOpt(1, Some(1)))
writer.write(RecordOpt(2, None))
writer.write("""{"id":"3"}""")
withResource(Parquet.newRecordWriter(file.getPath, schema2, knownSurfaces = Seq(Surface.of[RecordOpt]))) {
writer =>
writer.write(RecordOpt(1, Some(1)))
writer.write(RecordOpt(2, None))
writer.write("""{"id":"3"}""")
}

withResource(Parquet.newReader[Map[String, Any]](file.getPath)) { reader =>
21 changes: 17 additions & 4 deletions docs/airframe-parquet.md
@@ -66,7 +66,7 @@ val schema = new MessageType(
Types.optional(PrimitiveTypeName.BINARY).as(stringType).named("name")
)
// Create a record writer for the given schema
val recordWriter = Parquet.newRecordWriter(path = "record.parquet", schema)
val recordWriter = Parquet.newRecordWriter(path = "record.parquet", schema = schema)
// Write a record using Map (column name -> value)
recordWriter.write(Map("id" -> 1, "name" -> "leo"))
// Write a record using JSON object
@@ -75,10 +75,23 @@ recordWriter.write("""{"id":2, "name":"yui"}""")
recordWriter.write(Seq(3, "aina"))
// Write a record using JSON array
recordWriter.write("""[4, "xxx"]""")
// You can use case classes as input as well
recordWriter.write(MyEntry(5, "yyy"))

recordWriter.close()


// In case you need to write dynamic records containing case classes,
// register the Surfaces of these classes
case class Nested(id:Int, entry:MyEntry)
val nestedRecordWriter = Parquet.newRecordWriter(
path = "nested.parquet",
// You can build a Parquet schema matching the Surface
schema = Parquet.toParquetSchema(Surface.of[Nested]),
knownSurfaces = Seq(Surface.of[MyEntry]) // required to serialize MyEntry
)

// Write dynamic records
nestedRecordWriter.write(Map("id" -> 1, "entry" -> MyEntry(1, "yyy")))
nestedRecordWriter.write(Map("id" -> 2, "entry" -> MyEntry(2, "zzz")))
nestedRecordWriter.close()
```

### Using with AWS S3
