Typed Parquet Tuple #1198

Merged
merged 8 commits into twitter:develop from JiJiTang:develop on May 14, 2015

6 participants

@JiJiTang
Contributor
  • Used as a sink to write tuples with primitive fields and also collection types (List, Set, Map)
  • Create a Parquet schema generation macro
@colinmarc

Hey! I'm just a rando, but I work on very similar stuff, and I have some rando thoughts:

This is a really cool approach, because it allows you to load parquet directly into tuples/case classes. However (if I'm reading this correctly), the problem with this PR is that it doesn't actually materialize the tuples/case classes for you, because it's using ParquetTupleScheme and you already have a cascading tuple of the fields from that. It also doesn't support nested schemas (I think) for the same reason.

You could bypass this and make the code simpler if you put this macro more or less directly into a ParquetTypedScheme; rather than reading the records into cascading tuples and then materializing them into case classes/scala tuples, you use the macro to materialize the case classes/scala tuples directly off of the parquet reader. Then you'd end up with an interface like:

case class Foo(a: String, b: Option[Double])
ParquetTypedSource[Foo].map { f => (f.a, f.b) } // type checking against your parquet schema!

Anyway, just a suggestion!

@JiJiTang
Contributor

Hello @colinmarc
Thank you for this comment; you are totally right. At the beginning I also wanted to generate cascading fields directly from case classes using macros (defined in the scalding-macros module), but that would create a direct dependency on that module from scalding-parquet. So I've defined the fields as method parameters and let users decide whether or not to use scalding-macros. Perhaps I will make all these parameters implicit (I will try to make a commit tonight). You are also right about why it doesn't support nested schemas: a pull request would need to be made in parquet-mr before ParquetTupleScheme can support nested schemas. As you can see, TupleWriteSupport only supports primitive types today.

@julienledem julienledem and 1 other commented on an outdated diff Feb 17, 2015
...er/scalding/parquet/tuple/macros/MacroUnitTests.scala
+ | required binary b.a1.y;
+ | required int32 b.a2.x;
+ | required binary b.a2.y;
+ | required binary b.y;
+ | required int32 c.x;
+ | required binary c.y;
+ | required int32 d.a1.x;
+ | required binary d.a1.y;
+ | required int32 d.a2.x;
+ | required binary d.a2.y;
+ | required binary d.y;
+ | required int32 e.a1.x;
+ | required binary e.a1.y;
+ | required int32 e.a2.x;
+ | required binary e.a2.y;
+ | required binary e.y;
@julienledem
julienledem Feb 17, 2015 Contributor

I'm curious, why do you use "." to capture nesting instead of actually nesting groups?

@JiJiTang
JiJiTang Feb 17, 2015 Contributor

@julienledem Hi Julien, I totally agree; it would be better to use nested groups. The reason I made the generated schema flat is that TupleWriteSupport only supports primitive types right now:

      if (field.isPrimitive()) {
        writePrimitive(record, field.asPrimitiveType());
      } else {
        throw new UnsupportedOperationException("Complex type not implemented");
      }
@julienledem
Contributor

Nice approach

@ianoc ianoc commented on an outdated diff Feb 17, 2015
...er/scalding/parquet/tuple/macros/MacroImplicits.scala
@@ -0,0 +1,10 @@
+package com.twitter.scalding.parquet.tuple.macros
+
+import scala.language.experimental.macros
+
+import com.twitter.scalding.parquet.tuple.macros.impl.SchemaProviderImpl
+import _root_.parquet.schema.MessageType
+
+object MacroImplicits {
+ implicit def materializeCaseClassTypeDescriptor[T]: MessageType = macro SchemaProviderImpl.toParquetSchemaImp[T]
@ianoc
ianoc Feb 17, 2015 Collaborator

comment

@ianoc ianoc commented on an outdated diff Feb 17, 2015
...witter/scalding/parquet/tuple/TypedParquetTuple.scala
+ def parquetSchema: Option[String]
+
+ override def hdfsScheme = {
+ val scheme = parquetSchema match {
+ case Some(messageType) => new ParquetTupleScheme(sourceFields, sinkFields, messageType)
+ case _ =>
+ withFilter match {
+ case Some(filterPredicate) => new ParquetTupleScheme(filterPredicate, sourceFields)
+ case _ => new ParquetTupleScheme(sourceFields)
+ }
+ }
+ HadoopSchemeInstance(scheme.asInstanceOf[Scheme[_, _, _, _, _]])
+ }
+}
+
+class TypedFixedPathParquetTuple[T](paths: Seq[String],
@ianoc
ianoc Feb 17, 2015 Collaborator

Doing tuple converters/setters like this when you have macros in the fray is dangerous; you could get one macro-generated and the other not, making them incompatible.

@ianoc ianoc and 1 other commented on an outdated diff Feb 17, 2015
...om/twitter/scalding/parquet/tuple/macros/Macros.scala
@@ -0,0 +1,9 @@
+package com.twitter.scalding.parquet.tuple.macros
+
+import scala.language.experimental.macros
+
+import com.twitter.scalding.parquet.tuple.macros.impl.SchemaProviderImpl
+
+object Macros {
+ def caseClassParquetSchema[T]: _root_.parquet.schema.MessageType = macro SchemaProviderImpl.toParquetSchemaImp[T]
@ianoc
ianoc Feb 17, 2015 Collaborator

Do you need macros and macro implicits?

@JiJiTang
JiJiTang Feb 27, 2015 Contributor

Hi @ianoc, you are right, there's no need for the macro implicits. I've kept Macros and deleted the macro implicits in my latest commits.

@ianoc ianoc commented on an outdated diff Feb 17, 2015
...ng/parquet/tuple/macros/impl/SchemaProviderImpl.scala
+ val fieldName = accessorMethod.name.toTermName.toString
+ val fieldType = accessorMethod.returnType
+ matchField(fieldType, outerName, fieldName, false)
+ }.toList
+ }
+
+ def expandCaseClass(outerTpe: Type, outerName: String, isOption: Boolean): Tree = {
+ val expanded = expandMethod(outerTpe, outerName, isOption)
+ if (expanded.isEmpty) c.abort(c.enclosingPosition, s"Case class $outerTpe has no primitive types we were able to extract")
+ val messageTypeName = s"${outerTpe}".split("\\.").last
+ q"""new _root_.parquet.schema.MessageType($messageTypeName,
+ _root_.scala.Array.apply[parquet.schema.Type](..$expanded):_*)
+ """
+ }
+
+ c.Expr[parquet.schema.MessageType](expandCaseClass(T.tpe, "", isOption = false))
@ianoc
ianoc Feb 17, 2015 Collaborator

You should just import this; you've used it both with and without _root_, so importing it at the top is safer.

@ianoc ianoc commented on an outdated diff Feb 17, 2015
...ng/parquet/tuple/macros/impl/SchemaProviderImpl.scala
+ List(q"""new _root_.parquet.schema.PrimitiveType(${getRepetition(isOption)}, _root_.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64, $parquetFieldName)""")
+ case tpe if tpe =:= typeOf[Float] =>
+ List(q"""new _root_.parquet.schema.PrimitiveType(${getRepetition(isOption)}, _root_.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT, $parquetFieldName)""")
+ case tpe if tpe =:= typeOf[Double] =>
+ List(q"""new _root_.parquet.schema.PrimitiveType(${getRepetition(isOption)}, _root_.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE, $parquetFieldName)""")
+ case tpe if tpe.erasure =:= typeOf[Option[Any]] && isOption =>
+ c.abort(c.enclosingPosition, s"Nested options do not make sense being mapped onto a tuple fields in cascading.")
+ case tpe if tpe.erasure =:= typeOf[Option[Any]] =>
+ val innerType = tpe.asInstanceOf[TypeRefApi].args.head
+ matchField(innerType, outerName, fieldName, true)
+ case tpe if IsCaseClassImpl.isCaseClassType(c)(tpe) => expandMethod(tpe, s"$parquetFieldName.", isOption = false)
+ case _ => c.abort(c.enclosingPosition, s"Case class $T is not pure primitives or nested case classes")
+ }
+ }
+
+ def expandMethod(outerTpe: Type, outerName: String, isOption: Boolean): List[Tree] = {
@ianoc
ianoc Feb 17, 2015 Collaborator

drop the braces for a single statement

@ianoc ianoc and 1 other commented on an outdated diff Feb 17, 2015
...ng/parquet/tuple/macros/impl/SchemaProviderImpl.scala
+ implicit val extractorLiftable = new Liftable[Extractor] {
+ def apply(b: Extractor): Tree = b.toTree
+ }
+
+ lazy val REPETITION_REQUIRED = q"_root_.parquet.schema.Type.Repetition.REQUIRED"
+ lazy val REPETITION_OPTIONAL = q"_root_.parquet.schema.Type.Repetition.OPTIONAL"
+
+ def getRepetition(isOption: Boolean): Tree = {
+ if (isOption) REPETITION_OPTIONAL else REPETITION_REQUIRED
+ }
+
+ def matchField(fieldType: Type, outerName: String, fieldName: String, isOption: Boolean): List[Tree] = {
+ val parquetFieldName = s"$outerName$fieldName"
+ fieldType match {
+ case tpe if tpe =:= typeOf[String] =>
+ List(q"""new _root_.parquet.schema.PrimitiveType(${getRepetition(isOption)}, _root_.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY, $parquetFieldName)""")
@ianoc
ianoc Feb 17, 2015 Collaborator

These are mostly repetition; can we not make a helper function here?

@JiJiTang
JiJiTang Feb 27, 2015 Contributor

Hi @ianoc, refactoring done, please check my latest commits.
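
A sketch of the helper being suggested, in quasiquote style (hypothetical names, not the committed code):

def primitiveType(typeName: TermName, isOption: Boolean, parquetFieldName: String): List[Tree] =
  List(q"""new _root_.parquet.schema.PrimitiveType(
    ${getRepetition(isOption)},
    _root_.parquet.schema.PrimitiveType.PrimitiveTypeName.$typeName,
    $parquetFieldName)""")

// each repeated match arm then collapses to one line, e.g.:
// case tpe if tpe =:= typeOf[String] => primitiveType(TermName("BINARY"), isOption, parquetFieldName)
// case tpe if tpe =:= typeOf[Double] => primitiveType(TermName("DOUBLE"), isOption, parquetFieldName)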

@ianoc ianoc and 1 other commented on an outdated diff Feb 17, 2015
...ng/parquet/tuple/macros/impl/SchemaProviderImpl.scala
+ if (!IsCaseClassImpl.isCaseClassType(c)(T.tpe))
+ c.abort(c.enclosingPosition, s"""We cannot enforce ${T.tpe} is a case class, either it is not a case class or this macro call is possibly enclosed in a class.
+ This will mean the macro is operating on a non-resolved type.""")
+
+ case class Extractor(tpe: Type, toTree: Tree)
+ case class Builder(toTree: Tree = q"")
+
+ implicit val builderLiftable = new Liftable[Builder] {
+ def apply(b: Builder): Tree = b.toTree
+ }
+
+ implicit val extractorLiftable = new Liftable[Extractor] {
+ def apply(b: Extractor): Tree = b.toTree
+ }
+
+ lazy val REPETITION_REQUIRED = q"_root_.parquet.schema.Type.Repetition.REQUIRED"
@ianoc
ianoc Feb 17, 2015 Collaborator

why is it lazy?

@JiJiTang
JiJiTang Feb 27, 2015 Contributor

No reason to make this lazy; a bad habit of mine, I'm ashamed.

@ianoc ianoc and 1 other commented on an outdated diff Feb 17, 2015
...er/scalding/parquet/tuple/TypedParquetTupleTest.scala
+ val tempCsv = createTempDir("parquet_tuple_test_csv_")
+ val readJobArgs = buildJobArgs(Array("--input", tempParquet, "--output", tempCsv, "--hdfs"))
+ val readSuccess = new ReadFromTypedParquetTupleJob(readJobArgs).run
+ readSuccess shouldEqual true
+
+ //check data correctness
+ val csv = ScalaSource.fromFile(new java.io.File(tempCsv, "part-00000"))
+ csv.getLines().toList shouldEqual Seq("A,1,4.0", "B,2,3.0", "A,2,5.0")
+
+ //clean temporary files generated by the jobs
+ deletePath(tempCsv)
+ deletePath(tempParquet)
+ }
+ }
+
+ def buildJobArgs(argArray: Array[String]): Args = {
@ianoc
ianoc Feb 17, 2015 Collaborator

You should use the platform test framework for these; none of this code seems needed/ideal to replicate here.

@JiJiTang
JiJiTang Feb 27, 2015 Contributor

Refactoring done; I've used the Hadoop platform test framework in the latest commits.
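
For reference, a rough sketch of that style, assuming scalding's HadoopPlatformJobTest harness (the job, source, and expected values here are hypothetical stand-ins for the PR's tests):

HadoopPlatformJobTest(new WriteToTypedParquetTupleJob(_), cluster)
  .arg("output", "output1")
  .sink[SampleClassB](TypedParquet[SampleClassB](Seq("output1"))) { out =>
    // compare what the job wrote against the expected test data
    out.toSet shouldBe expectedValues.toSet
  }
  .run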

@ianoc ianoc and 1 other commented on an outdated diff Feb 17, 2015
...er/scalding/parquet/tuple/macros/MacroUnitTests.scala
+ | required binary d.y;
+ | required int32 e.a1.x;
+ | required binary e.a1.y;
+ | required int32 e.a2.x;
+ | required binary e.a2.y;
+ | required binary e.y;
+ |}
+ """.stripMargin)
+ schema shouldEqual expectedSchema
+ }
+
+ "Generate parquet schema for SampleClassD" in {
+ val schema = Macros.caseClassParquetSchema[SampleClassD]
+ val expectedSchema = MessageTypeParser.parseMessageType("""
+ |message SampleClassD {
+ | required binary a;
@ianoc
ianoc Feb 17, 2015 Collaborator

Does this cover all the data types? Are strings always binary? How do the parquet schema types compare against this set?

@JiJiTang
JiJiTang Feb 17, 2015 Contributor

Hi @ianoc, the current version of TupleWriteSupport only supports primitive types, and in its implementation Strings are mapped to binary.

@ianoc
ianoc Feb 17, 2015 Collaborator

Great, thanks. Looks like you might want to include Long and Short here for completeness?

@JiJiTang
JiJiTang Feb 17, 2015 Contributor

@ianoc, yes, exactly. You are right that this is not clear; I will add a test with a case class that contains all the primitive types to make it clear.
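
Such a test might look like this (hypothetical class name; the expected mappings follow TupleWriteSupport, where String maps to binary and Short, having no 16-bit parquet physical type, maps to int32):

case class SampleClassPrimitives(a: Boolean, b: Short, c: Int, d: Long, e: Float, f: Double, g: String)

"Generate parquet schema for SampleClassPrimitives" in {
  val schema = Macros.caseClassParquetSchema[SampleClassPrimitives]
  val expectedSchema = MessageTypeParser.parseMessageType("""
    |message SampleClassPrimitives {
    |  required boolean a;
    |  required int32 b;
    |  required int32 c;
    |  required int64 d;
    |  required float e;
    |  required double f;
    |  required binary g;
    |}
  """.stripMargin)
  schema shouldEqual expectedSchema
}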

@JiJiTang
JiJiTang Feb 17, 2015 Contributor

@ianoc Thank you so much for this detailed review. I will come with commits to fix all the problems you've mentioned.

@ianoc
ianoc Feb 17, 2015 Collaborator

Thank you for this PR, some great work. Nice new macro

@colinmarc

@JiJiTang just to clarify - I actually meant you could replace ParquetTupleScheme with a macro, a lot like this one. Rather than just generating a schema from a case class, you could also generate a parquet RecordMaterializer implementation that reads values directly into case class instances.

Then you wouldn't be limited to primitive types, and you'd be able to support nested structs. And you wouldn't have to define setters/getters for those case classes.
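
A minimal sketch of that idea (names hypothetical): parquet-mr asks a RecordMaterializer for a root converter and for the finished record, and a macro could generate both for a case class.

import parquet.io.api.{ GroupConverter, RecordMaterializer }

// root: the macro-generated converter tree that fills in the case class fields
// result: produces the fully built case class once a record has been consumed
class CaseClassRecordMaterializer[T](root: GroupConverter, result: () => T)
    extends RecordMaterializer[T] {
  override def getCurrentRecord: T = result()
  override def getRootConverter: GroupConverter = root
}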

@JiJiTang
Contributor

@colinmarc Good point, and thank you very much for the suggestion. It's trivial to write a macro to generate a RecordMaterializer (as you said, this would support nested case classes). But I'm not sure we should do this in Scalding, since it means bypassing the current ParquetTupleScheme implementation in parquet-mr entirely. Plus, wouldn't it close the door on users providing their own customized parquet schema? Perhaps I'm misunderstanding you; let me think about it.

@JiJiTang JiJiTang added a commit to JiJiTang/scalding that referenced this pull request Feb 23, 2015
@JiJiTang JiJiTang Typed Parquet Tuple #1198
   *Macro support nested group type
   *Add r/w support nested case classes
   *Tests + refacto
b8ad5b5
@JiJiTang
Contributor

Hi @julienledem @ianoc, this weekend I added code that supports nested case class read/write with nested parquet message types, plus some macro facilities to generate the read/write support classes. Tuple setters and converters no longer need to be defined, and the tuple Materializer is now generated (directly by manipulating the case class). The generated schema will look like this:

message SampleClassC {
  required group a {
    required int32 x;
    required binary y;
  }
  required group b {
    required group a {
      required int32 x;
      required binary y;
    }
    required binary y;
  }
}
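
Case classes of this shape produce that schema (reconstructed from the message type above):

case class SampleClassA(x: Int, y: String)
case class SampleClassB(a: SampleClassA, y: String)
case class SampleClassC(a: SampleClassA, b: SampleClassB)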

Please check the unit tests for details.
Sorry this commit contains so many changes, and thank you guys very much for taking the time to review the code. Please let me know your thoughts.

And hi @colinmarc, I tried to define macros that generate all of the read/write support classes, but I cannot generate the WriteSupport or ReadSupport classes themselves: macros don't allow expanding classes at the top level, so all the generated classes would be inner classes that cannot be instantiated via Java reflection in ParquetInputFormat without providing parent class instances. I've tried to simplify the definition of a TypedParquet source as much as possible. Please check the committed unit tests for details, and thanks a lot.

@JiJiTang JiJiTang added a commit to JiJiTang/scalding that referenced this pull request Feb 24, 2015
@JiJiTang JiJiTang Typed Parquet Tuple #1198
   *Macro support nested group type
   *Add r/w support nested case classes
   *Tests + refacto
6f0d55d
@JiJiTang JiJiTang added a commit to JiJiTang/scalding that referenced this pull request Feb 24, 2015
@JiJiTang JiJiTang Typed Parquet Tuple #1198
   *Macro support nested group type
   *Add r/w support nested case classes
   *Tests + refacto
9abf725
@JiJiTang JiJiTang added a commit to JiJiTang/scalding that referenced this pull request Feb 24, 2015
@JiJiTang JiJiTang Typed Parquet Tuple #1198
   *Macro support nested group type
   *Add r/w support nested case classes
   *Tests + refacto
916d943
@JiJiTang JiJiTang added a commit to JiJiTang/scalding that referenced this pull request Feb 25, 2015
@JiJiTang JiJiTang Typed Parquet Tuple #1198
   *Macro write support improvement
bf940e0
@JiJiTang JiJiTang added a commit to JiJiTang/scalding that referenced this pull request Feb 25, 2015
@JiJiTang JiJiTang Typed Parquet Tuple #1198
   *Macro write support improvement
4b67ac4
@JiJiTang JiJiTang added a commit to JiJiTang/scalding that referenced this pull request Feb 25, 2015
@JiJiTang JiJiTang Typed Parquet Tuple #1198
   *Macro write support improvement
cd7bd87
@JiJiTang JiJiTang added a commit to JiJiTang/scalding that referenced this pull request Feb 26, 2015
@JiJiTang JiJiTang Typed Parquet Tuple #1198
   *Macro write support improvement
2559402
@JiJiTang JiJiTang added a commit to JiJiTang/scalding that referenced this pull request Feb 27, 2015
@JiJiTang JiJiTang Typed Parquet Tuple #1198
   *Macro write support improvement
1d992c3
@JiJiTang JiJiTang added a commit to JiJiTang/scalding that referenced this pull request Feb 27, 2015
@JiJiTang JiJiTang Typed Parquet Tuple #1198
   *Macro write support improvement

(cherry picked from commit 1d992c3)
70eb188
@JiJiTang JiJiTang added a commit to JiJiTang/scalding that referenced this pull request Feb 27, 2015
@JiJiTang JiJiTang Typed Parquet Tuple #1198
   *Macro write support improvement
5eabae1
@colinmarc colinmarc commented on an outdated diff Mar 20, 2015
...com/twitter/scalding/parquet/tuple/TypedParquet.scala
+/**
+ * Typed parquet tuple
+ * @author Jian Tang
+ */
+object TypedParquet {
+ /**
+ * Create readable typed parquet source.
+ * Here is an example:
+ *
+ * case class SampleClassB(string: String, int: Int, double: Option[Double], a: SampleClassA)
+ *
+ * class ReadSupport extends ParquetReadSupport[SampleClassB] {
+ * import com.twitter.scalding.parquet.tuple.macros.Macros._
+ * override val tupleConverter: ParquetTupleConverter = caseClassParquetTupleConverter[SampleClassB]
+ * override val rootSchema: String = caseClassParquetSchema[SampleClassB]
+ * }
@colinmarc
colinmarc Mar 20, 2015

You can call a macro inside a macro, so you could actually have the ReadSupport types generated as well.

@colinmarc
colinmarc Mar 20, 2015

Oh, hm, I just saw your comment about this, sorry.

@JiJiTang
Contributor
JiJiTang commented Apr 7, 2015

Could someone please have a look at this PR?

@julienledem julienledem and 1 other commented on an outdated diff Apr 7, 2015
...om/twitter/scalding/parquet/tuple/macros/Macros.scala
@@ -0,0 +1,50 @@
+package com.twitter.scalding.parquet.tuple.macros
+
+import com.twitter.scalding.parquet.tuple.macros.impl.{ ParquetSchemaProvider, ParquetTupleConverterProvider, WriteSupportProvider }
+import com.twitter.scalding.parquet.tuple.scheme.ParquetTupleConverter
+import parquet.io.api.RecordConsumer
+import parquet.schema.MessageType
+
+import scala.language.experimental.macros
+
+object Macros {
@julienledem
julienledem Apr 7, 2015 Contributor

None of these macros depends on scalding; they could live in the parquet project.
You could use them with Spark, for example.

@JiJiTang
JiJiTang Apr 12, 2015 Contributor

Hi @julienledem, I will submit a PR to the parquet project once it reaches a release, and then a PR in scalding to delete these macros. For now, could we let these macros stay in Scalding?

@julienledem julienledem commented on an outdated diff Apr 7, 2015
...tuple/macros/impl/ParquetTupleConverterProvider.scala
+ List(createConverter(q"new _root_.com.twitter.scalding.parquet.tuple.scheme.ShortConverter($idx, this, $isOption)"))
+ case tpe if tpe =:= typeOf[Int] =>
+ List(createConverter(q"new _root_.com.twitter.scalding.parquet.tuple.scheme.IntConverter($idx, this, $isOption)"))
+ case tpe if tpe =:= typeOf[Long] =>
+ List(createConverter(q"new _root_.com.twitter.scalding.parquet.tuple.scheme.LongConverter($idx, this, $isOption)"))
+ case tpe if tpe =:= typeOf[Float] =>
+ List(createConverter(q"new _root_.com.twitter.scalding.parquet.tuple.scheme.FloatConverter($idx, this, $isOption)"))
+ case tpe if tpe =:= typeOf[Double] =>
+ List(createConverter(q"new _root_.com.twitter.scalding.parquet.tuple.scheme.DoubleConverter($idx, this, $isOption)"))
+ case tpe if tpe.erasure =:= typeOf[Option[Any]] =>
+ val innerType = tpe.asInstanceOf[TypeRefApi].args.head
+ matchField(idx, innerType, isOption = true)
+ case tpe if IsCaseClassImpl.isCaseClassType(ctx)(tpe) =>
+ val innerConverterTrees = buildConverterBody(tpe, expandMethod(tpe))
+ List(createConverter(buildGroupConverter(tpe, q"Option(this)", isOption, idx, innerConverterTrees)))
+ case _ => ctx.abort(ctx.enclosingPosition, s"Case class $T is not pure primitives or nested case classes")
@julienledem
julienledem Apr 7, 2015 Contributor

You should mention in the doc that the support for lists is not implemented yet.

@ianoc ianoc commented on an outdated diff Apr 7, 2015
project/Build.scala
@@ -296,9 +296,12 @@ object ScaldingBuild extends Build {
exclude("com.twitter.elephantbird", "elephant-bird-core"),
"org.apache.thrift" % "libthrift" % "0.7.0",
"org.slf4j" % "slf4j-api" % slf4jVersion,
- "org.apache.hadoop" % "hadoop-core" % hadoopVersion % "provided"
- )
- ).dependsOn(scaldingCore)
+ "org.apache.hadoop" % "hadoop-core" % hadoopVersion % "provided",
+ "org.scala-lang" % "scala-reflect" % scalaVersion,
+ "com.twitter" %% "bijection-macros" % bijectionVersion
+ ) ++ (if(isScala210x(scalaVersion)) Seq("org.scalamacros" %% "quasiquotes" % "2.0.1") else Seq())
@ianoc
ianoc Apr 7, 2015 Collaborator

Can you lift the quasiquotes version to the top of the file, here and in the other usages?
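
In sbt terms, the suggested lift would look roughly like this (a sketch against project/Build.scala):

// defined once near the other version vals at the top of the file
val quasiquotesVersion = "2.0.1"

// each module's dependency list then references it:
// ++ (if (isScala210x(scalaVersion)) Seq("org.scalamacros" %% "quasiquotes" % quasiquotesVersion) else Seq())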

@julienledem julienledem commented on an outdated diff Apr 7, 2015
...tuple/macros/impl/ParquetTupleConverterProvider.scala
+ }
+
+ def matchField(idx: Int, fieldType: Type, isOption: Boolean): List[Tree] = {
+
+ def createConverter(converter: Tree): Tree = q"if($idx == i) return $converter"
+
+ fieldType match {
+ case tpe if tpe =:= typeOf[String] =>
+ List(createConverter(q"new _root_.com.twitter.scalding.parquet.tuple.scheme.StringConverter($idx, this, $isOption)"))
+ case tpe if tpe =:= typeOf[Boolean] =>
+ List(createConverter(q"new _root_.com.twitter.scalding.parquet.tuple.scheme.BooleanConverter($idx, this, $isOption)"))
+ case tpe if tpe =:= typeOf[Short] =>
+ List(createConverter(q"new _root_.com.twitter.scalding.parquet.tuple.scheme.ShortConverter($idx, this, $isOption)"))
+ case tpe if tpe =:= typeOf[Int] =>
+ List(createConverter(q"new _root_.com.twitter.scalding.parquet.tuple.scheme.IntConverter($idx, this, $isOption)"))
+ case tpe if tpe =:= typeOf[Long] =>
@julienledem
julienledem Apr 7, 2015 Contributor

and Byte?

@julienledem julienledem and 1 other commented on an outdated diff Apr 7, 2015
.../parquet/tuple/macros/impl/WriteSupportProvider.scala
+ either it is not a case class or this macro call is possibly enclosed in a class.
+ This will mean the macro is operating on a non-resolved type.""")
+
+ def matchField(idx: Int, fieldType: Type, fValue: Tree, groupName: TermName): (Int, Tree) = {
+ def writePrimitiveField(wTree: Tree) =
+ (idx + 1, q"""rc.startField($groupName.getFieldName($idx), $idx)
+ $wTree
+ rc.endField($groupName.getFieldName($idx), $idx)""")
+
+ def writeGroupField(subTree: Tree) =
+ q"""rc.startGroup()
+ rc.startField($groupName.getFieldName($idx), $idx)
+ $subTree
+ rc.endField($groupName.getFieldName($idx), $idx)
+ rc.endGroup()
+ """
@julienledem
julienledem Apr 7, 2015 Contributor

Am I missing something or should the startField/endField calls be around the startGroup/endGroup and not the other way around?

@JiJiTang
JiJiTang Apr 12, 2015 Contributor

fixed in the latest commits
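
For reference, the corrected shape (as it appears in the later WriteSupportProvider diff below): startField/endField bracket startGroup/endGroup, per parquet's RecordConsumer contract.

def writeGroupField(subTree: Tree) =
  q"""rc.startField($groupName.getFieldName($idx), $idx)
      rc.startGroup()
      $subTree
      rc.endGroup()
      rc.endField($groupName.getFieldName($idx), $idx)
   """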

@ianoc
Collaborator
ianoc commented Apr 7, 2015

Sorry about the delay looking at this. Looks good to me. @johnynek any thoughts before it's good to go?

@julienledem julienledem and 1 other commented on an outdated diff Apr 7, 2015
...tuple/macros/impl/ParquetTupleConverterProvider.scala
+ if (!IsCaseClassImpl.isCaseClassType(ctx)(T.tpe))
+ ctx.abort(ctx.enclosingPosition,
+ s"""We cannot enforce ${T.tpe} is a case class,
+ either it is not a case class or this macro call is possibly enclosed in a class.
+ This will mean the macro is operating on a non-resolved type.""")
+
+ def buildGroupConverter(tpe: Type, parentTree: Tree, isOption: Boolean, idx: Int, converterBodyTree: Tree): Tree = {
+ q"""new _root_.com.twitter.scalding.parquet.tuple.scheme.ParquetTupleConverter($parentTree, $isOption, $idx){
+ override def newConverter(i: Int): _root_.parquet.io.api.Converter = {
+ $converterBodyTree
+ throw new RuntimeException("invalid index: " + i)
+ }
+
+ override def createValue(): Any = {
+ if(fieldValues.isEmpty) null
+ else classOf[$tpe].getConstructors()(0).newInstance(fieldValues.toSeq.map(_.asInstanceOf[AnyRef]): _*)
@julienledem
julienledem Apr 7, 2015 Contributor

We should be able to do better than creating through reflection; after all, it's a macro.

@JiJiTang
JiJiTang Apr 12, 2015 Contributor

Fixed in the latest commit
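
A sketch of the macro-side fix (tree names hypothetical): since the macro statically knows the case class type, it can splice a direct constructor call instead of reflective instantiation.

// fieldValueTrees: one tree per constructor argument, assembled by the macro
def buildCreateValue(tpe: Type, fieldValueTrees: List[Tree]): Tree =
  q"""override def createValue(): Any =
        if (fieldValues.isEmpty) null
        else new $tpe(..$fieldValueTrees)"""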

@julienledem julienledem and 1 other commented on an outdated diff Apr 7, 2015
...ding/parquet/tuple/scheme/ParquetTupleConverter.scala
+
+import parquet.io.api.{ Binary, Converter, GroupConverter, PrimitiveConverter }
+
+import scala.collection.mutable
+import scala.util.Try
+
+/**
+ * Parquet tuple converter used to create user defined tuple value from parquet column values
+ * @param parent parent parquet tuple converter
+ * @param isOption is the field optional
+ * @param outerIndex field index in parent tuple schema
+ */
+abstract class ParquetTupleConverter(val parent: Option[ParquetTupleConverter] = None, val isOption: Boolean = false,
+ val outerIndex: Int = -1) extends GroupConverter {
+ var converters: Map[Int, Converter] = Map()
+ val fieldValues: mutable.ArrayBuffer[Any] = mutable.ArrayBuffer()
@julienledem
julienledem Apr 7, 2015 Contributor

The downside of this is boxing primitives.
Temporarily storing the value before creating the case class instance could be delegated to the PrimitiveConverter children; each can have a field of the right type. There's no need to store it as an Option, since we will know to wrap it as Option(value) at the time the case class instance is created.

@JiJiTang
JiJiTang Apr 12, 2015 Contributor

Hi @julienledem , thanks a lot for your good advice, this is improved in the latest commit, please check.

@JiJiTang JiJiTang added a commit to JiJiTang/scalding that referenced this pull request Apr 12, 2015
@JiJiTang JiJiTang Typed Parquet Tuple #1198
   *Add Byte type support
893a086
@JiJiTang JiJiTang added a commit to JiJiTang/scalding that referenced this pull request Apr 12, 2015
@JiJiTang JiJiTang Typed Parquet Tuple #1198
   *Improve tuple converter macro(delete unnecessary boxing)
b13934e
@JiJiTang
Contributor

Hi @ianoc @julienledem, thank you both so much for the review. Please check my latest two commits and tell me if there's anything not ready to go (the code is rebased so that it can be merged).

JiJiTang added some commits Feb 14, 2015
@JiJiTang JiJiTang Typed Parquet Tuple
  * Used as sink to write tuple with primitive fields
  * Create Parquet schema generation macro
aa64868
@JiJiTang JiJiTang Typed Parquet Tuple
   *Refacto unit test by using platform test
   *Refacto macro code

(cherry picked from commit 40cd1eb)
97c1980
@JiJiTang JiJiTang Typed Parquet Tuple #1198
   *Macro support nested group type
   *Add r/w support nested case classes
   *Tests + refacto
8dbef39
@JiJiTang JiJiTang Typed Parquet Tuple #1198
   *Macro write support improvement
904858a
@JiJiTang JiJiTang Typed Parquet Tuple #1198
   *Add Byte type support
2782288
@JiJiTang JiJiTang added a commit to JiJiTang/scalding that referenced this pull request Apr 12, 2015
@JiJiTang JiJiTang Typed Parquet Tuple #1198
   *Improve tuple converter macro(delete unnecessary boxing)
97dd384
@JiJiTang JiJiTang added a commit to JiJiTang/scalding that referenced this pull request Apr 12, 2015
@JiJiTang JiJiTang Typed Parquet Tuple #1198
   *Improve tuple converter macro(delete unnecessary boxing)
f9af7de
@johnynek johnynek commented on the diff Apr 13, 2015
...ding/parquet/tuple/scheme/ParquetTupleConverter.scala
@@ -0,0 +1,142 @@
+package com.twitter.scalding.parquet.tuple.scheme
+
+import parquet.io.api.{ Binary, Converter, GroupConverter, PrimitiveConverter }
+import scala.util.Try
+
+trait TupleFieldConverter[+T] extends Converter {
@johnynek
johnynek Apr 13, 2015 Collaborator

What does this type do? It is unclear to me. Why use currentValue and hasValue, not Option[T]?

@johnynek
johnynek Apr 13, 2015 Collaborator

I guess I'm asking two things: 1) can you add a comment to this type? 2) why not use Option rather than two methods?

@JiJiTang
JiJiTang Apr 14, 2015 Contributor

Hi @johnynek, thanks a lot for the review. This trait is here to model a parquet tuple field reading converter, since a field in a parquet tuple can be REQUIRED or OPTIONAL. Not using Option[T] avoids the Option boxing overhead for the fields that are required (advice from @julienledem). hasValue tells whether an optional field has a value when reading from parquet. Optional fields are boxed into Option at the moment the parent tuple is created, since in the macro implementation we know whether or not a field is an Option type.
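
A sketch of that shape (constructor details elided): the converter keeps an unboxed field of the right type, and hasValue lets the parent decide whether to wrap the value in Option when building the tuple.

class IntConverter extends PrimitiveConverter with TupleFieldConverter[Int] {
  private var value: Int = 0          // unboxed storage, no Option allocation
  private var valueSet: Boolean = false

  override def currentValue: Int = value
  override def hasValue: Boolean = valueSet

  // called by parquet for each INT32 column value
  override def addInt(v: Int): Unit = {
    value = v
    valueSet = true
  }
}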

@JiJiTang
JiJiTang Apr 16, 2015 Contributor

Hi @johnynek , I've added some comments for the trait. Is that ok for you? Please let me know your thoughts.

@JiJiTang JiJiTang Typed Parquet Tuple #1198
   *Improve tuple converter macro(delete unnecessary boxing)
f7ad7a7
@JiJiTang JiJiTang added a commit to JiJiTang/scalding that referenced this pull request Apr 15, 2015
@JiJiTang JiJiTang Typed Parquet Tuple #1198
   *Macro support list collection type
428c07b
@JiJiTang JiJiTang Improve tuple converter macro
   *use two different classes for modeling required and optional field converter
   *delete unnecessary class cast, type all the field converter class
5f06144
@JiJiTang JiJiTang added a commit to JiJiTang/scalding that referenced this pull request Apr 21, 2015
@JiJiTang JiJiTang Typed Parquet Tuple #1198
    * Add macro support for collection types(LIST, SET, MAP)
80996b5
@JiJiTang JiJiTang Typed Parquet Tuple #1198
    * Add macro support for collection types(LIST, SET, MAP)
536bd0c
@JiJiTang
Contributor

Hi guys, I have added a commit to make the macros support collection field types (LIST, SET, MAP); please review my latest commit.
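
For example, the macros now handle case classes along these lines (hypothetical class name):

case class SampleClassWithCollections(a: List[Int], b: Set[String], c: Map[Int, String])

// schema, converter, and write support are generated the same way as before:
val schema = Macros.caseClassParquetSchema[SampleClassWithCollections]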

@JiJiTang
Contributor
JiJiTang commented May 8, 2015

Could someone please check the latest commits for handling collection types?

@ianoc ianoc merged commit 019ec80 into twitter:develop May 14, 2015

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
@ianoc
Collaborator
ianoc commented May 14, 2015

@JiJiTang sorry about the delay getting this merged. The latest commits look ok to me; I think anything more to be done should happen in a follow-up PR. This was huge. It's great work and an awesome addition. Many thanks.

@JiJiTang
Contributor

Hi @ianoc, thank you so much for the code review and for merging this PR. Many thanks also to @julienledem @colinmarc @johnynek for your reviews and good advice. I will follow this PR for future updates.

@johnynek johnynek commented on the diff Jun 10, 2015
.../parquet/tuple/macros/impl/WriteSupportProvider.scala
+ q"""rc.startField($groupName.getFieldName($idx), $idx)
+ rc.startGroup()
+ $subTree
+ rc.endGroup()
+ rc.endField($groupName.getFieldName($idx), $idx)
+ """
+ def writeCollectionField(elementGroupName: TermName, subTree: Tree) =
+ writeGroupField(q"""if(!$fValue.isEmpty) {
+ val $elementGroupName = $groupName.getType($idx).asGroupType.getType(0).asGroupType
+ $subTree
+ }
+ """)
+
+ fieldType match {
+ case tpe if tpe =:= typeOf[String] =>
+ writePrimitiveField(q"rc.addBinary(Binary.fromString($fValue))")
@johnynek
johnynek Jun 10, 2015 Collaborator

This macro is not hygienic; we need the fully qualified path here.

@JiJiTang
JiJiTang Jun 11, 2015 Contributor

Hi @johnynek, this is fixed in commit JiJiTang@60e7390, merged with PR #1303.
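
Spelled out, the hygienic form follows the _root_ convention used elsewhere in this PR, so the generated tree does not depend on an import at the expansion site:

case tpe if tpe =:= typeOf[String] =>
  writePrimitiveField(q"rc.addBinary(_root_.parquet.io.api.Binary.fromString($fValue))")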

@coveralls

Coverage Status

Changes unknown when pulling 536bd0c on JiJiTang:develop into twitter:develop.
