From ff4df2f4423059bf796456237d496ebf3becc3fa Mon Sep 17 00:00:00 2001 From: Alex Levenson Date: Mon, 22 Sep 2014 19:09:27 -0700 Subject: [PATCH 1/5] Add parquet-scrooge sources --- project/Build.scala | 12 +++ scalding-parquet-scrooge/README.md | 3 + .../parquet/scrooge/ParquetScrooge.scala | 30 +++++++ .../parquet/scrooge/ParquetScroogeTests.scala | 68 +++++++++++++++ scalding-parquet/README.md | 6 +- .../parquet/thrift/ParquetThrift.scala | 19 ++-- .../parquet/ParquetSourcesTests.scala | 87 ++++++++++--------- 7 files changed, 175 insertions(+), 50 deletions(-) create mode 100644 scalding-parquet-scrooge/README.md create mode 100644 scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/ParquetScrooge.scala create mode 100644 scalding-parquet-scrooge/src/test/scala/com/twitter/scalding/parquet/scrooge/ParquetScroogeTests.scala diff --git a/project/Build.scala b/project/Build.scala index 8311075156..9c836141fe 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -266,6 +266,18 @@ object ScaldingBuild extends Build { "org.scala-tools.testing" %% "specs" % "1.6.9" % "test" ) ).dependsOn(scaldingCore) + + lazy val scaldingParquetScrooge = module("parquet-scrooge").settings( + libraryDependencies ++= Seq( + "com.twitter" % "parquet-cascading" % "1.6.0rc2", + "com.twitter" %% "parquet-scrooge" % "1.6.0rc2", + "org.slf4j" % "slf4j-api" % slf4jVersion, + "org.apache.hadoop" % "hadoop-core" % hadoopVersion % "provided", + "org.slf4j" % "slf4j-log4j12" % slf4jVersion % "test", + "org.scalacheck" %% "scalacheck" % "1.10.0" % "test", + "org.scala-tools.testing" %% "specs" % "1.6.9" % "test" + ) + ).dependsOn(scaldingCore, scaldingParquet % "compile->compile;test->test") lazy val scaldingHRaven = module("hraven").settings( libraryDependencies ++= Seq( diff --git a/scalding-parquet-scrooge/README.md b/scalding-parquet-scrooge/README.md new file mode 100644 index 0000000000..1dc0007f6c --- /dev/null +++ b/scalding-parquet-scrooge/README.md @@ 
-0,0 +1,3 @@ +# Parquet-Scrooge support for Scalding + +This module has sources for reading scrooge-generated thrift structs. See the scalding-parquet module for reading apache-thrift (TBase) generated thrift structs. diff --git a/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/ParquetScrooge.scala b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/ParquetScrooge.scala new file mode 100644 index 0000000000..532a97342b --- /dev/null +++ b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/ParquetScrooge.scala @@ -0,0 +1,30 @@ +package com.twitter.scalding.parquet.scrooge + +import _root_.parquet.scrooge.ParquetScroogeScheme +import cascading.scheme.Scheme +import com.twitter.scalding._ +import com.twitter.scalding.parquet.thrift.ParquetThriftBase +import com.twitter.scalding.source.{ DailySuffixSource, HourlySuffixSource } +import com.twitter.scrooge.ThriftStruct + +trait ParquetScrooge[T <: ThriftStruct] extends ParquetThriftBase[T] { + + override def hdfsScheme = { + val scheme = new ParquetScroogeScheme[T](this.config) + HadoopSchemeInstance(scheme.asInstanceOf[Scheme[_, _, _, _, _]]) + } + +} + +class DailySuffixParquetScrooge[T <: ThriftStruct]( + path: String, + dateRange: DateRange)(implicit override val mf: Manifest[T]) + extends DailySuffixSource(path, dateRange) with ParquetScrooge[T] + +class HourlySuffixParquetScrooge[T <: ThriftStruct]( + path: String, + dateRange: DateRange)(implicit override val mf: Manifest[T]) + extends HourlySuffixSource(path, dateRange) with ParquetScrooge[T] + +class FixedPathParquetScrooge[T <: ThriftStruct](paths: String*)(implicit override val mf: Manifest[T]) + extends FixedPathSource(paths: _*) with ParquetScrooge[T] diff --git a/scalding-parquet-scrooge/src/test/scala/com/twitter/scalding/parquet/scrooge/ParquetScroogeTests.scala b/scalding-parquet-scrooge/src/test/scala/com/twitter/scalding/parquet/scrooge/ParquetScroogeTests.scala new file 
mode 100644 index 0000000000..381f107301 --- /dev/null +++ b/scalding-parquet-scrooge/src/test/scala/com/twitter/scalding/parquet/scrooge/ParquetScroogeTests.scala @@ -0,0 +1,68 @@ +package com.twitter.scalding.parquet.scrooge + +import com.twitter.scalding.parquet.ParquetSourcesTestsBase +import com.twitter.scrooge.ThriftStruct +import org.apache.thrift.protocol.TProtocol +import parquet.filter2.predicate.FilterPredicate + +class ParquetScroogeTests extends ParquetSourcesTestsBase { + + "DailySuffixParquetScrooge" should { + val default = new DailySuffixParquetScrooge[MockThriftStruct](path, dateRange) + + testDefaultFilter(default) + + testReturnProvidedFilter( + new DailySuffixParquetScrooge[MockThriftStruct](path, dateRange) { + override val withFilter: Option[FilterPredicate] = Some(filter1) + }) + + testDefaultColumns(default) + + testReturnProvidedColumns( + new DailySuffixParquetScrooge[MockThriftStruct](path, dateRange) { + override def withColumns: Set[String] = columnStrings + }) + } + + "HourlySuffixParquetScrooge" should { + val default = new HourlySuffixParquetScrooge[MockThriftStruct](path, dateRange) + + testDefaultFilter(default) + + testReturnProvidedFilter( + new HourlySuffixParquetScrooge[MockThriftStruct](path, dateRange) { + override val withFilter: Option[FilterPredicate] = Some(filter1) + }) + + testDefaultColumns(default) + + testReturnProvidedColumns( + new HourlySuffixParquetScrooge[MockThriftStruct](path, dateRange) { + override def withColumns: Set[String] = columnStrings + }) + } + + "FixedPathParquetScrooge" should { + val default = new FixedPathParquetScrooge[MockThriftStruct](path, path, path) + + testDefaultFilter(default) + + testReturnProvidedFilter( + new FixedPathParquetScrooge[MockThriftStruct](path, path, path) { + override val withFilter: Option[FilterPredicate] = Some(filter1) + }) + + testDefaultColumns(default) + + testReturnProvidedColumns( + new FixedPathParquetScrooge[MockThriftStruct](path, path, path) { + override 
def withColumns: Set[String] = columnStrings + }) + } + +} + +class MockThriftStruct extends ThriftStruct { + override def write(oprot: TProtocol): Unit = () +} \ No newline at end of file diff --git a/scalding-parquet/README.md b/scalding-parquet/README.md index 5435375136..95200de7f2 100644 --- a/scalding-parquet/README.md +++ b/scalding-parquet/README.md @@ -1,5 +1,7 @@ # Parquet support for Scalding The implementation is ported from code used by Twitter internally written by Sam Ritchie, Ian O'Connell, Oscar Boykin, Tianshuo Deng -## Use com.twitter.scalding.parquet.thrift for reading Thrift records -## Use com.twitter.tuple for reading Tuple records \ No newline at end of file +## Use com.twitter.scalding.parquet.thrift for reading apache Thrift (TBase) records +## Use com.twitter.scalding.parquet.scrooge for reading scrooge Thrift (ThriftStruct) records + Located in the scalding-parquet-scrooge module +## Use com.twitter.scalding.parquet.tuple for reading Tuple records \ No newline at end of file diff --git a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/ParquetThrift.scala b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/ParquetThrift.scala index c37c81aca7..bdd9578e1a 100644 --- a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/ParquetThrift.scala +++ b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/ParquetThrift.scala @@ -28,12 +28,11 @@ object ParquetThrift extends Serializable { type ThriftBase = TBase[_ <: TBase[_, _], _ <: TFieldIdEnum] } -trait ParquetThrift[T <: ParquetThrift.ThriftBase] extends FileSource - with SingleMappable[T] with TypedSink[T] with LocalTapSource with HasFilterPredicate with HasColumnProjection { +trait ParquetThriftBase[T] extends FileSource with SingleMappable[T] with TypedSink[T] with LocalTapSource with HasFilterPredicate with HasColumnProjection { def mf: Manifest[T] - override def hdfsScheme = { + def config: ParquetValueScheme.Config[T] = 
{ val config = new ParquetValueScheme.Config[T].withRecordClass(mf.erasure.asInstanceOf[Class[T]]) @@ -47,15 +46,23 @@ trait ParquetThrift[T <: ParquetThrift.ThriftBase] extends FileSource case None => configWithFp } - val scheme = new ParquetTBaseScheme[T](configWithProjection) - - HadoopSchemeInstance(scheme.asInstanceOf[Scheme[_, _, _, _, _]]) + // TODO: remove asInstanceOf after the fix ships in parquet-mr + configWithProjection.asInstanceOf[ParquetValueScheme.Config[T]] } override def setter[U <: T] = TupleSetter.asSubSetter[T, U](TupleSetter.singleSetter[T]) } +trait ParquetThrift[T <: ParquetThrift.ThriftBase] extends ParquetThriftBase[T] { + + override def hdfsScheme = { + val scheme = new ParquetTBaseScheme[T](this.config) + HadoopSchemeInstance(scheme.asInstanceOf[Scheme[_, _, _, _, _]]) + } + +} + /** * When Using these sources or creating subclasses of them, you can * provide a filter predicate and / or a set of fields (columns) to keep (project). diff --git a/scalding-parquet/src/test/scala/com/twitter/scalding/parquet/ParquetSourcesTests.scala b/scalding-parquet/src/test/scala/com/twitter/scalding/parquet/ParquetSourcesTests.scala index 4510af2503..4ecb729faa 100644 --- a/scalding-parquet/src/test/scala/com/twitter/scalding/parquet/ParquetSourcesTests.scala +++ b/scalding-parquet/src/test/scala/com/twitter/scalding/parquet/ParquetSourcesTests.scala @@ -1,17 +1,17 @@ package com.twitter.scalding.parquet import cascading.tuple.Fields -import com.twitter.scalding.parquet.thrift.{DailySuffixParquetThrift, FixedPathParquetThrift, HourlySuffixParquetThrift} -import com.twitter.scalding.parquet.tuple.{DailySuffixParquetTuple, FixedPathParquetTuple, HourlySuffixParquetTuple} -import com.twitter.scalding.{DateRange, RichDate, Source} -import java.lang.{Integer => JInt} +import com.twitter.scalding.parquet.thrift.{ DailySuffixParquetThrift, FixedPathParquetThrift, HourlySuffixParquetThrift } +import com.twitter.scalding.parquet.tuple.{ DailySuffixParquetTuple, 
FixedPathParquetTuple, HourlySuffixParquetTuple } +import com.twitter.scalding.{ DateRange, RichDate, Source } +import java.lang.{ Integer => JInt } import org.apache.thrift.protocol.TProtocol -import org.apache.thrift.{TBase, TFieldIdEnum} +import org.apache.thrift.{ TBase, TFieldIdEnum } import org.specs.Specification import parquet.filter2.predicate.FilterApi._ -import parquet.filter2.predicate.{FilterApi, FilterPredicate} +import parquet.filter2.predicate.{ FilterApi, FilterPredicate } -class ParquetSourcesTests extends Specification { +abstract class ParquetSourcesTestsBase extends Specification { val dateRange = DateRange(RichDate(0L), RichDate(0L)) val path = "/a/path" @@ -24,6 +24,44 @@ class ParquetSourcesTests extends Specification { val columnStrings = Set("a", "b", "c") + def testDefaultFilter[S <: Source with HasFilterPredicate](src: S) = { + "default to no filter predicate" in { + src.withFilter must be equalTo None + } + } + + def testReturnProvidedFilter[S <: Source with HasFilterPredicate](src: S) = { + "return the provided filter" in { + src.withFilter must be equalTo Some(filter1) + } + } + + def testDefaultColumns[S <: Source with HasColumnProjection](src: S) = { + + "default to no column projection" in { + src.columnGlobs must beEmpty + src.globsInParquetStringFormat must be equalTo None + } + } + + def testReturnProvidedColumns[S <: Source with HasColumnProjection](src: S) = { + "return the provided columns" in { + src.columnGlobs must be equalTo columns + } + + "correctly format globs into parquet's expected format" in { + verifyParquetStringFormat(src.globsInParquetStringFormat.get, Set("a", "b", "c")) + } + } + + private def verifyParquetStringFormat(s: String, expected: Set[String]) = { + s.split(";").toSet must be equalTo expected + } + +} + +class ParquetSourcesTests extends ParquetSourcesTestsBase { + "DailySuffixParquetThrift" should { val default = new DailySuffixParquetThrift[MockTBase](path, dateRange) @@ -110,41 +148,6 @@ class 
ParquetSourcesTests extends Specification { override val withFilter: Option[FilterPredicate] = Some(filter1) }) } - - def testDefaultFilter[S <: Source with HasFilterPredicate](src: S) = { - "default to no filter predicate" in { - src.withFilter must be equalTo None - } - } - - def testReturnProvidedFilter[S <: Source with HasFilterPredicate](src: S) = { - "return the provided filter" in { - src.withFilter must be equalTo Some(filter1) - } - } - - def testDefaultColumns[S <: Source with HasColumnProjection](src: S) = { - - "default to no column projection" in { - src.columnGlobs must beEmpty - src.globsInParquetStringFormat must be equalTo None - } - } - - def testReturnProvidedColumns[S <: Source with HasColumnProjection](src: S) = { - "return the provided columns" in { - src.columnGlobs must be equalTo columns - } - - "correctly format globs into parquet's expected format" in { - verifyParquetStringFormat(src.globsInParquetStringFormat.get, Set("a", "b", "c")) - } - } - - private def verifyParquetStringFormat(s: String, expected: Set[String]) = { - s.split(";").toSet must be equalTo expected - } - } class MockTBase extends TBase[MockTBase, TFieldIdEnum] { From 364c9f0561371234e9bc9a1b6ab6b75c9189ed02 Mon Sep 17 00:00:00 2001 From: Alex Levenson Date: Tue, 23 Sep 2014 14:31:57 -0700 Subject: [PATCH 2/5] Add parquet scrooge module to aggregate project so that its tests are run on the CI --- project/Build.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/project/Build.scala b/project/Build.scala index 9c836141fe..9bd7696198 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -156,6 +156,7 @@ object ScaldingBuild extends Build { scaldingCommons, scaldingAvro, scaldingParquet, + scaldingParquetScrooge, scaldingHRaven, scaldingRepl, scaldingJson, From 17f7af3aa0cc63547d9a0ca29b867fb1f4cce4fe Mon Sep 17 00:00:00 2001 From: Alex Levenson Date: Tue, 23 Sep 2014 19:37:06 -0700 Subject: [PATCH 3/5] Make parquet-scrooge 2.10 only --- project/Build.scala | 34 
+++++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/project/Build.scala b/project/Build.scala index 9bd7696198..0dd6cf95a9 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -12,6 +12,12 @@ import com.typesafe.sbt.SbtScalariform._ import scala.collection.JavaConverters._ object ScaldingBuild extends Build { + + def scalaBinaryVersion(scalaVersion: String) = scalaVersion match { + case version if version startsWith "2.9" => "2.9" + case version if version startsWith "2.10" => "2.10" + } + val printDependencyClasspath = taskKey[Unit]("Prints location of the dependencies") val sharedSettings = Project.defaultSettings ++ assemblySettings ++ scalariformSettings ++ Seq( @@ -268,16 +274,26 @@ object ScaldingBuild extends Build { ) ).dependsOn(scaldingCore) + def scaldingParquetScroogeDeps(version: String) = { + if (scalaBinaryVersion(version) == "2.9") + Seq() + else + Seq( + "com.twitter" % "parquet-cascading" % "1.6.0rc2", + "com.twitter" %% "parquet-scrooge" % "1.6.0rc2", + "org.slf4j" % "slf4j-api" % slf4jVersion, + "org.apache.hadoop" % "hadoop-core" % hadoopVersion % "provided", + "org.slf4j" % "slf4j-log4j12" % slf4jVersion % "test", + "org.scalacheck" %% "scalacheck" % "1.10.0" % "test", + "org.scala-tools.testing" %% "specs" % "1.6.9" % "test" + ) + } + lazy val scaldingParquetScrooge = module("parquet-scrooge").settings( - libraryDependencies ++= Seq( - "com.twitter" % "parquet-cascading" % "1.6.0rc2", - "com.twitter" %% "parquet-scrooge" % "1.6.0rc2", - "org.slf4j" % "slf4j-api" % slf4jVersion, - "org.apache.hadoop" % "hadoop-core" % hadoopVersion % "provided", - "org.slf4j" % "slf4j-log4j12" % slf4jVersion % "test", - "org.scalacheck" %% "scalacheck" % "1.10.0" % "test", - "org.scala-tools.testing" %% "specs" % "1.6.9" % "test" - ) + skip in compile := scalaBinaryVersion(scalaVersion.value) == "2.9", + skip in test := scalaBinaryVersion(scalaVersion.value) == "2.9", + publishArtifact := 
!(scalaBinaryVersion(scalaVersion.value) == "2.9"), + libraryDependencies ++= scaldingParquetScroogeDeps(scalaVersion.value) ).dependsOn(scaldingCore, scaldingParquet % "compile->compile;test->test") lazy val scaldingHRaven = module("hraven").settings( From f2d0fbe4f211fa3c1f77843b0c0aad0934056367 Mon Sep 17 00:00:00 2001 From: Alex Levenson Date: Tue, 23 Sep 2014 19:58:04 -0700 Subject: [PATCH 4/5] Add parquet-scrooge to travis.yml --- .travis.yml | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 40eb70c11a..eacad17bd5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -39,7 +39,7 @@ matrix: - scala: 2.10.4 env: BUILD="base" TEST_TARGET="scalding-hadoop-test" script: "scripts/run_test.sh" - + - scala: 2.10.4 env: BUILD="base" TEST_TARGET="scalding-hraven" script: "scripts/run_test.sh" @@ -56,6 +56,10 @@ matrix: env: BUILD="base" TEST_TARGET="scalding-parquet" script: "scripts/run_test.sh" + - scala: 2.10.4 + env: BUILD="base" TEST_TARGET="scalding-parquet-scrooge" + script: "scripts/run_test.sh" + - scala: 2.10.4 env: BUILD="base" TEST_TARGET="scalding-repl" script: "scripts/run_test.sh" @@ -100,6 +104,10 @@ matrix: env: BUILD="base" TEST_TARGET="scalding-parquet" script: "scripts/run_test.sh" + - scala: 2.9.3 + env: BUILD="base" TEST_TARGET="scalding-parquet-scrooge" + script: "scripts/run_test.sh" + - scala: 2.9.3 env: BUILD="base" TEST_TARGET="scalding-repl" script: "scripts/run_test.sh" From 5d9905120b7ca6d4ce800963155a99685fb8cb5e Mon Sep 17 00:00:00 2001 From: Alex Levenson Date: Tue, 23 Sep 2014 19:59:56 -0700 Subject: [PATCH 5/5] rm scala 2.9 parquet-scrooge tests from travis.yml --- .travis.yml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index eacad17bd5..20ac631d63 100644 --- a/.travis.yml +++ b/.travis.yml @@ -104,10 +104,6 @@ matrix: env: BUILD="base" TEST_TARGET="scalding-parquet" script: "scripts/run_test.sh" - - scala: 2.9.3 - env: BUILD="base" 
TEST_TARGET="scalding-parquet-scrooge" - script: "scripts/run_test.sh" - - scala: 2.9.3 env: BUILD="base" TEST_TARGET="scalding-repl" script: "scripts/run_test.sh"