LzoGenericScheme and Source #1268

Merged
merged 13 commits on May 20, 2015

@rubanm
Collaborator
rubanm commented May 1, 2015

This adds generic Scheme/Source classes for data stored as lzo-compressed protobuf blocks. Taken from @ianoc's old branch + added changes to support writes.

Depends on twitter/elephant-bird#439 and twitter/elephant-bird#440

@rangadi rangadi commented on an outdated diff May 6, 2015
...witter/scalding/commons/source/LzoGenericScheme.scala
+ Injection.connect[Externalizer[_], Array[Byte], GZippedBase64String, String]
+ }
+}
+
+private[source] object ConfigBinaryConverterProvider {
+ val ProviderConfKey = "com.twitter.scalding.lzo.converter.provider"
+}
+
+/**
+ * Provides BinaryConverter serialized in JobConf.
+ */
+private[source] class ConfigBinaryConverterProvider[M] extends BinaryConverterProvider[M] {
+ import ConfigBinaryConverterProvider._
+ override def getConverter(conf: Configuration): BinaryConverter[M] = {
+ val data = conf.get(ProviderConfKey)
+ require(data != null, s"No data in field $ProviderConfKey")
@rangadi
rangadi May 6, 2015

"$ProviderConfKey is not set in configuration"

@rangadi rangadi commented on an outdated diff May 6, 2015
...witter/scalding/commons/source/LzoGenericScheme.scala
+ new GenericWritable(conv)
+
+ override def sourceConfInit(fp: FlowProcess[JobConf],
+ tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]],
+ conf: JobConf): Unit = {
+
+ val extern = Externalizer(conv)
+ try {
+ ExternalizerSerializer.inj.invert(ExternalizerSerializer.inj(extern)).get
+ } catch {
+ case e: Exception => throw new RuntimeException("Unable to roundtrip the BinaryConverter in the Externalizer.", e)
+ }
+
+ conf.set(ConfigBinaryConverterProvider.ProviderConfKey, ExternalizerSerializer.inj(extern))
+
+ MultiInputFormat.setClassConf(conv.getClass, conf)
@rangadi
rangadi May 6, 2015

conv.getClass would be "BinaryConverter", not the type parameter for ScroogeBinaryConverter, right? It does not matter except for stats, but just checking.

@rangadi rangadi and 1 other commented on an outdated diff May 6, 2015
...witter/scalding/commons/source/LzoGenericScheme.scala
+ MultiInputFormat.setGenericConverterClassConf(classOf[ConfigBinaryConverterProvider[_]], conf)
+
+ DelegateCombineFileInputFormat.setDelegateInputFormat(conf, classOf[MultiInputFormat[_]])
+ }
+
+ override def sinkConfInit(fp: FlowProcess[JobConf],
+ tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]],
+ conf: JobConf): Unit = {
+ val extern = Externalizer(conv)
+ try {
+ ExternalizerSerializer.inj.invert(ExternalizerSerializer.inj(extern)).get
+ } catch {
+ case e: Exception => throw new RuntimeException("Unable to roundtrip the BinaryConverter in the Externalizer.", e)
+ }
+
+ LzoGenericBlockOutputFormat.setClassConf(implicitly[ClassTag[M]].runtimeClass, conf)
@rangadi
rangadi May 6, 2015

would 'implicitly[ClassTag[M]].runtimeClass' be the thrift class? Can we use that in sourceConfInit too? This will help with stats and logging (where we would see the actual thrift class)

@rubanm
rubanm May 6, 2015 Collaborator

Fixed in the read path. Thanks.
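
To illustrate the distinction raised in this thread, here is a minimal sketch. Converter and Tweet are hypothetical stand-ins (for BinaryConverter and a Thrift class), not names from this PR. Calling getClass on a converter instance yields the converter's own concrete class, while the ClassTag's runtimeClass yields the message class, which is the more useful name for stats and logging.

```scala
import scala.reflect.{ClassTag, classTag}

// Hypothetical stand-in for elephant-bird's BinaryConverter.
trait Converter[T] { def toBytes(t: T): Array[Byte] }

// Hypothetical message type standing in for a Thrift class.
final case class Tweet(id: Long)

object ClassNames {
  // Builds an anonymous converter, similar in shape to ScroogeBinaryConverter.apply.
  def converterFor[T: ClassTag]: Converter[T] = new Converter[T] {
    def toBytes(t: T): Array[Byte] = t.toString.getBytes("UTF-8")
  }

  // The runtime class of the converter instance itself (an anonymous subclass here).
  def converterClass[T](c: Converter[T]): Class[_] = c.getClass

  // The runtime class of the message type, recovered from the ClassTag.
  def messageClass[T: ClassTag]: Class[_] = classTag[T].runtimeClass
}
```

With these definitions, ClassNames.messageClass[Tweet] is classOf[Tweet], whereas ClassNames.converterClass(ClassNames.converterFor[Tweet]) is some subclass of Converter and never Tweet.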

@rangadi
rangadi commented May 6, 2015

+1

@johnynek johnynek and 1 other commented on an outdated diff May 6, 2015
...witter/scalding/commons/source/BinaryConverters.scala
+
+/*
+ * Common BinaryConverters to be used with GenericSource / GenericScheme.
+ */
+
+case object IdentityBinaryConverter extends BinaryConverter[Array[Byte]] {
+ override def fromBytes(messageBuffer: Array[Byte]) = messageBuffer
+ override def toBytes(message: Array[Byte]) = message
+}
+
+object ScroogeBinaryConverter {
+ def apply[T <: ThriftStruct: ClassTag]: BinaryConverter[T] = {
+ val ct = implicitly[ClassTag[T]]
+ new BinaryConverter[T] {
+ val serializer = BinaryThriftStructSerializer[T] {
+ val companionClass = Class.forName(ct.runtimeClass.getName + "$")
@rubanm
rubanm May 6, 2015 Collaborator

could have re-used that code if the codec methods were public. What do you think? Or maybe add support for BinaryThriftStructSerializer there.

Not sure if we're planning a chill release. I can copy over the code for union here for now.

@johnynek johnynek commented on an outdated diff May 6, 2015
.../twitter/scalding/commons/source/LzoCodecSource.scala
@@ -30,6 +30,6 @@ object LzoCodecSource {
val hdfsPaths = paths
val localPath = { assert(paths.size == 1, "Cannot use multiple input files on local mode"); paths(0) }
val boxed = Externalizer(passedInjection)
- override def injection = boxed.get
+ override lazy val injection = boxed.get
@johnynek
johnynek May 6, 2015 Collaborator

Externalizer is already doing the equivalent of lazy, isn't it? Do we need it twice?

@johnynek johnynek commented on an outdated diff May 6, 2015
...witter/scalding/commons/source/LzoGenericScheme.scala
+import cascading.flow.FlowProcess
+
+/**
+ * Serializes BinaryConverters to JobConf.
+ */
+private[source] object ExternalizerSerializer {
+ val inj: Injection[Externalizer[_], String] = {
+ import com.twitter.bijection.Inversion.attemptWhen
+ import com.twitter.bijection.codec.Base64
+
+ implicit val baseInj = JavaSerializationInjection[Externalizer[_]]
+
+ implicit val unwrap: Injection[GZippedBase64String, String] =
+ new AbstractInjection[GZippedBase64String, String] {
+ override def apply(gzbs: GZippedBase64String) = gzbs.str
+ override def invert(str: String) = attemptWhen(str)(Base64.isBase64)(GZippedBase64String(_))
@johnynek
johnynek May 6, 2015 Collaborator

this is not super precise. It could be Base64, but not compressed, but it's fine, I think we'll catch it in the decompression Injection.
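
A small sketch of the point above, using only JDK classes rather than bijection (the object and method names are made up for illustration): a string can be valid Base64 without being gzip-compressed, and in that case the failure surfaces at the decompression step.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.Base64
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.util.Try

object Base64VsGzip {
  // Gzip the bytes, then Base64-encode the compressed result.
  def gzipBase64(bytes: Array[Byte]): String = {
    val bos = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(bos)
    try gz.write(bytes) finally gz.close()
    Base64.getEncoder.encodeToString(bos.toByteArray)
  }

  // Base64-decode, then gunzip. Any failure (bad Base64 or bad gzip) becomes a Failure.
  def gunzipBase64(str: String): Try[Array[Byte]] = Try {
    val in = new GZIPInputStream(new ByteArrayInputStream(Base64.getDecoder.decode(str)))
    try {
      val out = new ByteArrayOutputStream()
      val buf = new Array[Byte](4096)
      var n = in.read(buf)
      while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
      out.toByteArray
    } finally in.close()
  }
}
```

A string produced by gzipBase64 round-trips through gunzipBase64, while the plain Base64 encoding of the same bytes passes a Base64 check but yields a Failure because the gzip header is missing.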

@johnynek johnynek commented on an outdated diff May 6, 2015
...witter/scalding/commons/source/LzoGenericScheme.scala
+import com.twitter.elephantbird.mapreduce.io.{ BinaryConverter, GenericWritable }
+import com.twitter.elephantbird.mapreduce.input.{ BinaryConverterProvider, MultiInputFormat }
+import com.twitter.elephantbird.mapreduce.output.LzoGenericBlockOutputFormat
+import com.twitter.elephantbird.mapred.output.DeprecatedOutputFormatWrapper
+
+import org.apache.hadoop.mapred.{ JobConf, OutputCollector, RecordReader }
+import org.apache.hadoop.conf.Configuration
+
+import cascading.tap.Tap
+import cascading.flow.FlowProcess
+
+/**
+ * Serializes BinaryConverters to JobConf.
+ */
+private[source] object ExternalizerSerializer {
+ val inj: Injection[Externalizer[_], String] = {
@johnynek
johnynek May 6, 2015 Collaborator

why not make this:

def inj[T]: Injection[Externalizer[T], String] = { ...

so we don't have to deal with ugly _ issues. Should work fine, right?
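
A rough sketch of the suggested shape, under stated assumptions: Codec is a hypothetical stand-in for Injection, and Option stands in for Externalizer so the example needs only the JDK and the Scala standard library. Putting the type parameter on the method lets callers get a typed value back without a cast at each use site. The cast does not disappear; it moves into one place and remains unchecked because of erasure.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.Base64
import scala.util.Try

// Hypothetical stand-in for bijection's Injection[A, String].
trait Codec[A] {
  def apply(a: A): String
  def invert(s: String): Try[A]
}

object Codecs {
  // Type parameter on the method, in the spirit of `def inj[T]: Injection[Externalizer[T], String]`.
  def typed[T]: Codec[Option[T]] = new Codec[Option[T]] {
    def apply(a: Option[T]): String = {
      val bos = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(bos)
      try oos.writeObject(a) finally oos.close()
      Base64.getEncoder.encodeToString(bos.toByteArray)
    }

    def invert(s: String): Try[Option[T]] = Try {
      val ois = new ObjectInputStream(new ByteArrayInputStream(Base64.getDecoder.decode(s)))
      // The only cast lives here; it is unchecked, so a wrong T is not detected at this point.
      try ois.readObject().asInstanceOf[Option[T]] finally ois.close()
    }
  }
}
```

A caller can then write val back: Option[String] = Codecs.typed[String].invert(encoded).get with no asInstanceOf of its own.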

@johnynek johnynek commented on an outdated diff May 6, 2015
...witter/scalding/commons/source/LzoGenericScheme.scala
+}
+
+private[source] object ConfigBinaryConverterProvider {
+ val ProviderConfKey = "com.twitter.scalding.lzo.converter.provider"
+}
+
+/**
+ * Provides BinaryConverter serialized in JobConf.
+ */
+private[source] class ConfigBinaryConverterProvider[M] extends BinaryConverterProvider[M] {
+ import ConfigBinaryConverterProvider._
+ override def getConverter(conf: Configuration): BinaryConverter[M] = {
+ val data = conf.get(ProviderConfKey)
+ require(data != null, s"$ProviderConfKey is not set in configuration")
+
+ val extern: Externalizer[_] = ExternalizerSerializer.inj.invert(data).get
@johnynek
johnynek May 6, 2015 Collaborator

we could avoid the cast here if we put the type on inj above.

@rubanm
Collaborator
rubanm commented May 12, 2015

Merged develop. This is now green.

@johnynek addressed your comments. Could you take another look, thanks.

@johnynek johnynek commented on an outdated diff May 13, 2015
...witter/scalding/commons/source/LzoGenericSource.scala
+
+/**
+ * Generic source with an underlying GenericScheme that uses the supplied BinaryConverter.
+ */
+abstract class LzoGenericSource[T: ClassTag] extends FileSource with SingleMappable[T] with TypedSink[T] with LocalTapSource {
+ def conv: BinaryConverter[T]
+ override def setter[U <: T] = TupleSetter.asSubSetter[T, U](TupleSetter.singleSetter[T])
+ override def hdfsScheme = HadoopSchemeInstance((new LzoGenericScheme[T](conv)).asInstanceOf[Scheme[_, _, _, _, _]])
+}
+
+object LzoGenericSource {
+ def apply[T: ClassTag](passedConv: BinaryConverter[T], paths: String*) =
+ new LzoGenericSource[T] {
+ override val conv: BinaryConverter[T] = passedConv
+ override val hdfsPaths = paths
+ override val localPaths = { assert(paths.size == 1, "Cannot use multiple input files on local mode"); paths }
@johnynek
johnynek May 13, 2015 Collaborator

I don't think this is needed anymore. This was fixed. There is no need for the assert, is there?

@johnynek johnynek commented on an outdated diff May 13, 2015
...witter/scalding/commons/source/LzoGenericScheme.scala
+}
+
+private[source] object ConfigBinaryConverterProvider {
+ val ProviderConfKey = "com.twitter.scalding.lzo.converter.provider"
+}
+
+/**
+ * Provides BinaryConverter serialized in JobConf.
+ */
+private[source] class ConfigBinaryConverterProvider[M] extends BinaryConverterProvider[M] {
+ import ConfigBinaryConverterProvider._
+ override def getConverter(conf: Configuration): BinaryConverter[M] = {
+ val data = conf.get(ProviderConfKey)
+ require(data != null, s"$ProviderConfKey is not set in configuration")
+
+ val extern = ExternalizerSerializer.inj.invert(data).get
@johnynek
johnynek May 13, 2015 Collaborator

we often cache things like this if it happens a lot (and on hadoop, such things often happen a lot). Something like:

private[this] var last: Option[(String, BinaryConverter[M])] = None
last match {
  case Some((k, bc)) if k == data => bc
  case _ => // do the decode and reset last.
}
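
The pseudocode above could be filled in roughly as follows. This is a sketch only: CachedDecoder and its decode parameter are hypothetical names, the decode counter exists purely to make the caching observable, and the class is not thread-safe.

```scala
// Remembers the most recent (input, decoded value) pair and reuses it when the input repeats.
final class CachedDecoder[A](decode: String => A) {
  private[this] var last: Option[(String, A)] = None
  private[this] var decodes = 0

  // Number of times the underlying decode function has actually run.
  def decodeCount: Int = decodes

  def get(data: String): A = last match {
    // Cache hit only when the stored key equals the requested data.
    case Some((k, v)) if k == data => v
    case _ =>
      val v = decode(data)
      decodes += 1
      last = Some((data, v))
      v
  }
}
```

Repeated calls with the same string decode once; a call with a different string decodes again and replaces the cached entry.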
@rubanm
Collaborator
rubanm commented May 16, 2015

Merged in develop. This is now green again.

@ianoc
Collaborator
ianoc commented May 18, 2015

@johnynek this one look ok to you now?

@johnynek johnynek and 1 other commented on an outdated diff May 19, 2015
...witter/scalding/commons/source/LzoGenericScheme.scala
+ val ProviderConfKey = "com.twitter.scalding.lzo.converter.provider"
+}
+
+/**
+ * Provides BinaryConverter serialized in JobConf.
+ */
+private[source] class ConfigBinaryConverterProvider[M] extends BinaryConverterProvider[M] {
+ import ConfigBinaryConverterProvider._
+
+ private[this] var cached: Option[(String, BinaryConverter[M])] = None
+
+ override def getConverter(conf: Configuration): BinaryConverter[M] = {
+ val data = conf.get(ProviderConfKey)
+ require(data != null, s"$ProviderConfKey is not set in configuration")
+ cached match {
+ case Some((data, conv)) => conv
@johnynek
johnynek May 19, 2015 Collaborator

I think we need case Some((d, conv)) if d == data => conv. Matching on data, I think, will just shadow the data on line 71.

I think we need this because without it, a different conf passed could give different data.

@rubanm
rubanm May 19, 2015 Collaborator

Oops, yes that's what I meant to do. Will fix.
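
For readers less familiar with this pitfall, a minimal standalone sketch (ShadowingDemo and its method names are invented for illustration): a lowercase name in a pattern introduces a new binding, so it matches any value instead of comparing against the existing variable. A guard, or a backquoted stable identifier, performs the comparison.

```scala
object ShadowingDemo {
  // Buggy: `data` in the pattern is a fresh binding that shadows the parameter,
  // so any cached entry matches regardless of its key.
  def lookupShadowed(cached: Option[(String, Int)], data: String): Option[Int] =
    cached match {
      case Some((data, v)) => Some(v)
      case _ => None
    }

  // Fixed with a guard: bind the key to a new name and compare explicitly.
  def lookupGuarded(cached: Option[(String, Int)], data: String): Option[Int] =
    cached match {
      case Some((d, v)) if d == data => Some(v)
      case _ => None
    }

  // Equivalent fix: backquotes make the pattern compare against the existing `data`.
  def lookupStable(cached: Option[(String, Int)], data: String): Option[Int] =
    cached match {
      case Some((`data`, v)) => Some(v)
      case _ => None
    }
}
```

With cached = Some(("a", 1)) and data = "b", the shadowed version returns Some(1), while the guarded and backquoted versions return None.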

@johnynek johnynek commented on an outdated diff May 19, 2015
...witter/scalding/commons/source/LzoGenericScheme.scala
+ cached match {
+ case Some((data, conv)) => conv
+ case _ =>
+ val extern = ExternalizerSerializer.inj.invert(data).get
+ val conv = extern.get.asInstanceOf[BinaryConverter[M]]
+ cached = Some((data, conv))
+ conv
+ }
+ }
+}
+
+/**
+ * Generic scheme for data stored as lzo-compressed protobuf messages.
+ * Serialization is performed using the supplied BinaryConverter.
+ */
+class LzoGenericScheme[M: ClassTag](@transient conv: BinaryConverter[M]) extends LzoBinaryScheme[M, GenericWritable[M]] {
@johnynek
johnynek May 19, 2015 Collaborator

is the ClassTag serializable? Have we tested that this actually round-trips a java serialization (which should happen when cascading uses this on Hadoop).

I wonder if instead of keeping the ClassTag here if it is safer to just take a Class, which is serializable, and make an apply method that uses ClassTag if needed.
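
One way the suggestion could look, as a sketch rather than the code that was merged: GenericSchemeSketch is a hypothetical class, and the field is called clazz because class is a reserved word in Scala. The constructor takes a Class[M], and the companion's apply derives it from a ClassTag for callers who prefer that.

```scala
import scala.reflect.{ClassTag, classTag}

// Hypothetical scheme-like class that stores a Class[M] instead of a ClassTag.
class GenericSchemeSketch[M](val clazz: Class[M]) extends Serializable

object GenericSchemeSketch {
  // Convenience constructor: recover the Class from an implicit ClassTag.
  // The cast is unchecked; runtimeClass is typed as Class[_].
  def apply[M: ClassTag]: GenericSchemeSketch[M] =
    new GenericSchemeSketch[M](classTag[M].runtimeClass.asInstanceOf[Class[M]])
}
```

Callers holding a Class can use new GenericSchemeSketch(classOf[String]); callers with a ClassTag in scope can use GenericSchemeSketch.apply[String].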

@johnynek johnynek commented on an outdated diff May 19, 2015
...witter/scalding/commons/source/LzoGenericSource.scala
+limitations under the License.
+*/
+
+package com.twitter.scalding.commons.source
+
+import scala.reflect.ClassTag
+
+import com.twitter.elephantbird.mapreduce.io.BinaryConverter
+import com.twitter.scalding._
+
+import cascading.scheme.Scheme
+
+/**
+ * Generic source with an underlying GenericScheme that uses the supplied BinaryConverter.
+ */
+abstract class LzoGenericSource[T: ClassTag] extends FileSource with SingleMappable[T] with TypedSink[T] with LocalTapSource {
@johnynek
johnynek May 19, 2015 Collaborator

what if we just add def class: Class[T] instead of wiring to ClassTag? What do we gain by using the ClassTag here, since concrete subclasses could choose either way if we go with Class[T]?

@johnynek
Collaborator

Can we at least add tests that we can instantiate and roundtrip this with the JavaSerialization Injection?

@ianoc
Collaborator
ianoc commented May 19, 2015

BTW, this has been tested on Hadoop, and ClassTag is serializable, though TypeTag is not. I'm fine with the proposed changes too; just an FYI.
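
A round-trip check of the kind requested above can be sketched with plain JDK serialization instead of the JavaSerialization Injection. SerializationCheck and roundTrip are hypothetical helper names, not part of this PR.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object SerializationCheck {
  // Writes the value with Java serialization and reads it back.
  // Throws (for example NotSerializableException) if the value cannot round-trip.
  def roundTrip[A <: AnyRef](a: A): A = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    try oos.writeObject(a) finally oos.close()
    val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    try ois.readObject().asInstanceOf[A] finally ois.close()
  }
}
```

For example, SerializationCheck.roundTrip(scala.reflect.classTag[String]) returns a ClassTag whose runtimeClass is still classOf[String]. The same helper could be pointed at a scheme or source instance in a unit test.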

@johnynek
Collaborator

merge when green

@rubanm rubanm merged commit a86ea41 into twitter:develop May 20, 2015

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
@rubanm rubanm deleted the rubanm:rubanm/generic_lzo_scheme branch May 20, 2015
@coveralls

Coverage Status

Changes Unknown when pulling 2ec95d1 on rubanm:rubanm/generic_lzo_scheme into twitter:develop.
