
Use reflection over Jobs to find serialized classes #1654

Merged
merged 8 commits into twitter:develop from jcdavis:job-type-reflection on Apr 7, 2017

Conversation


@jcdavis jcdavis commented Mar 15, 2017

Using scala reflection, we can look at the container types of TypedPipe/Grouped
etc to identify classes being serialized and automatically assign them
compact cascading tokens instead of writing full names.

A few followups probably needed on this first pass:

  1. Is the correct copyright stanza at the top just to copy and paste, changing the year?
  2. Does my use of a boolean flag to disable make sense? This should definitely be disableable in the event of issues, but checking for the flag's existence rather than some boolean value seems a little hacky.
  3. (Most importantly) What is the proper way of determining whether a field's class in a Job is a typed scalding container? The current solution of listing a bunch of base types mostly works, but it is obviously a little hacky and possibly incomplete. Is it worth having a marker trait (e.g. TypedContainer for TypedPipe, etc.)? That doesn't truly solve the coverage issue, though.
  4. Is it worth recursively examining more than just Tuple2? There isn't much cost in adding additional tokens to the cascading tokens config, even if they aren't used.
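For concreteness, the field-reflection idea can be sketched as a toy, stand-alone example (`Container`, `Record`, and `ExampleJob` are stand-ins invented here so the sketch compiles without scalding; the real code inspects TypedPipe, Grouped, etc.):

```scala
import scala.reflect.runtime.universe._

// Stand-ins so the sketch compiles without scalding on the classpath
class Container[T]
case class Record(x: Int, s: String)
class ExampleJob {
  val pipe: Container[Record] = new Container[Record]
}

object FieldTypeFinder {
  private val mirror = runtimeMirror(getClass.getClassLoader)

  // Collect the full names of the type arguments of any val whose outer
  // type is in the given set of container type names
  def containedTypeNames(clazz: Class[_], containerNames: Set[String]): Set[String] = {
    val tpe = mirror.classSymbol(clazz).toType
    tpe.members.collect {
      case m if m.isTerm && m.asTerm.isVal => m.typeSignature
    }.collect {
      case TypeRef(_, sym, args) if containerNames(sym.fullName) =>
        args.map(_.typeSymbol.fullName)
    }.flatten.toSet
  }
}
```

Calling `containedTypeNames(classOf[ExampleJob], Set(typeOf[Container[_]].typeSymbol.fullName))` should surface `Record`'s full name, which is the kind of class the PR then hands to the cascading token machinery.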

Using scala reflection, we can look at the types of TypedPipe/Grouped
etc to identify classes being serialized and automatically assign them
compact cascading tokens instead of writing full names.
* Note: this is not guaranteed to find every used type. E.g., it can't find types used in a step that isn't
* referred to in a field.
*/
def findUsedClasses(jobClazz: Class[_ <: Job]): Set[Class[_]] = {
Contributor Author:

either this method or the object should be private[scalding]

Reviewer:

why do we need jobClass: Class[_ <: Job] rather than outerClass: Class[_]? Why does Job matter?

For instance, I'd like to use this with Execution as well, and we might want to pass an ExecutionApp to get the same result (if you can find items).

Reviewer:

Also, could we possibly also walk the methods and look at the input and return types? This could cover all the cases perhaps?

Contributor Author:

It's not really job-related specifically; that's just what I started with to self-limit usage. I'll change it.

I passed on methods for this PR to simplify things, but could probably add them. For methods, we might care about ones that return A in addition to TypedPipe[A] etc., so you might need to be a little smarter about what you consider/ignore.

}
}

private def getClassOpt(name: String): Option[Class[_]] = {
Contributor Author:

Could avoid loading the classes if I refactored CascadingTokenUpdater.updater to take just the names instead of the classes, since it only calls .getName
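A plausible body for the helper above (a sketch; the actual PR code may differ) is just a guarded Class.forName:

```scala
import scala.util.Try

object ClassLookup {
  // Resolve a class name to a Class[_] if it is on the classpath, None otherwise
  def getClassOpt(name: String): Option[Class[_]] =
    Try(Class.forName(name)).toOption
}

// ClassLookup.getClassOpt("java.lang.String").isDefined == true
// ClassLookup.getClassOpt("no.such.Clazz").isEmpty == true
```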

Jackson Davis added 2 commits March 15, 2017 18:09
PSA: stringToTermName is deprecated in 2.11,
so we might need to revert some of this for 2.12 support
def setSerialization(
kryo: Either[(Class[_ <: KryoInstantiator], KryoInstantiator), Class[_ <: KryoInstantiator]],
userHadoop: Seq[Class[_ <: HSerialization[_]]] = Nil,
additionalSerializedClasses: Set[Class[_]] = Set.empty): Config = {

Reviewer:

can we remove the additionalSerializedClasses here? I think callers should just do:

.setSerialization( ... )
.addCascadingClassSerializationTokens(additionalSerializedClasses)

But another concern is that sadly both cascading and kryo need to give tokens to classes. Can we make a kryo registrar:
https://github.com/twitter/chill/blob/develop/chill-java/src/main/java/com/twitter/chill/IKryoRegistrar.java

that registers any classes not already added? Then it can be combined with the KryoInstantiator:
https://github.com/twitter/chill/blob/develop/chill-java/src/main/java/com/twitter/chill/KryoInstantiator.java#L98

Contributor Author:

I thought about doing Kryo as well, but I'm not super familiar with things there, so I've punted for this PR.

My main confusion/concern is: how does the KryoRegistrar know the job class to inspect (Hadoop config? I don't see the full scalding job name anywhere), or does it parse the cascading token config to find classes? And does this get added to the chill KryoHadoop, or do we add a new instantiator for this? (And then what happens if someone extends KryoHadoop)

@@ -185,9 +185,13 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable {

val init = base ++ modeConf

val usedClasses: Set[Class[_]] = if (args.boolean("scalding.nojobclassreflection")) Set.empty else {

Reviewer:

can we make this a private def?


private val baseContainers = List(
classOf[Execution[_]],
classOf[TypedPipe[_]],
classOf[TypedSink[_]],

Reviewer:

do we not want TypedSource[_]?

Contributor Author:

I'll add

case universe.TypeRef(_, _, args) =>
args.flatMap { generic =>
//If the wrapped type is a Tuple2, recurse into its types
if (generic.typeSymbol.fullName == "scala.Tuple2") {

Reviewer:

why not .fullName.startsWith("scala.Tuple") to support all tuples?

Contributor Author:

No reason beyond reducing the scope, I'll add
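The generalized recursion being agreed on here could look roughly like the following stand-alone sketch, using the same startsWith check the reviewer suggests (`flattenTuples` is an illustrative name, not the PR's):

```scala
import scala.reflect.runtime.universe._

// Recursively expand tuple types into their components, so e.g.
// (A, (B, C)) contributes A, B and C rather than the Tuple2 itself
def flattenTuples(tpe: Type): List[Type] = tpe match {
  case TypeRef(_, sym, args) if sym.fullName.startsWith("scala.Tuple") =>
    args.flatMap(flattenTuples)
  case other => List(other)
}
```

For instance, `flattenTuples(typeOf[(Int, (String, Long))])` yields the Int, String and Long types.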



import scala.reflect.runtime.universe

object JobClassFinder {

Reviewer:

I don't think Job has anything really to do with this. Can we just say ReferencedClassFinder or something? Also, we might add a method like:

object ReferencedClassFinder {
  /**
   * Add the given type, as well as all referenced types to the cascading tokens list.
   * note, for maximal efficiency, you should also register those types with the kryo
   * instantiator being used.
   */
  def addCascadingTokensFrom(c: Class[_], config: Config): Config
}

@jcdavis (Contributor Author) commented Mar 17, 2017

A small wrinkle here we have to think about: if we have a TypedPipe of a type that has a cascading protected token (so <128, e.g. BytesWritable), we will assign a token in the config, which will cause an IllegalStateException when initializing the map in TupleSerialization.

There are a few options here, of varying degrees of complication/hackiness:

  1. Explicitly blacklist classes known to have private cascading tokens. AFAICT, those are only BigDecimal (125), Array[Byte] (126), and BytesWritable (127).
  2. Construct a TupleSerialization.SerializationElementReader in CascadingTokenUpdater.update, and then only tokenize classes in the config which don't already have a token, by checking getTokenFor.
  3. Replicate the logic in TupleSerialization.initTokenMaps, which looks over all the Hadoop serializations for the SerializationToken annotation to determine which classes are tokenized. This has the benefit that it could be done in Config.getCascadingSerializationTokens, which is a little cleaner.

Both 2 and 3 require the Hadoop serializations config to have been set, which seems like a reasonable requirement based on how the Scalding config is init'ed. 2 and 3 could also protect against either class or token conflicts with other user-defined serializations that have the SerializationToken annotation set, although the token conflict issue is already a concern regardless of this PR (and given the brittleness of explicitly assigned tokens, I'm not sure it's a thing that actually gets used?)
Thoughts?

@johnynek (Collaborator):

2 sounds pretty good to me.

@jcdavis (Contributor Author) commented Mar 23, 2017

TupleSerialization.getTokenFor isn't public, so that strategy didn't work.

Instead, I've replicated the logic of looking over the serializations for the SerializationToken annotation, which avoids re-assigning tokens.
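The resulting token assignment can be sketched stand-alone (`assignTokens` and its parameter names are illustrative; the 128 floor reflects the reserved range, below 128, mentioned earlier in the thread):

```scala
// Assign fresh cascading tokens to candidate class names, skipping names
// already tokenized in the config and names claimed by SerializationToken
// annotations on the configured hadoop serializations
def assignTokens(
  existing: Map[Int, String],       // token -> class name already in the config
  fromSerializations: Set[String],  // names claimed via SerializationToken
  candidates: Set[String]): Map[Int, String] = {
  // Start above both the reserved range (< 128) and any existing token
  val first = math.max(128, existing.keys.foldLeft(127)(math.max) + 1)
  val fresh = (candidates -- fromSerializations -- existing.values).toList.sorted
  existing ++ fresh.zipWithIndex.map { case (name, i) => (first + i) -> name }
}
```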

@johnynek (Collaborator) left a comment:

Looks good to me.

@johnynek (Collaborator):

What do you think @piyushnarang ?

CascadingTokenUpdater.update(config, findReferencedClasses(c) + c)
}
/**
* Reflect over a scalding to try and identify types it uses so they can be tokenized by cascading.
Collaborator:

Reflect over a scalding container type?

@@ -197,6 +198,10 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable {
.toMap.toMap[AnyRef, AnyRef] // linter:ignore the second one is to lift from String -> AnyRef
}

def reflectedClasses: Set[Class[_]] = if (args.boolean("scalding.nojobclassreflection")) Set.empty else {
Collaborator:

could we add this to Config.scala: https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/Config.scala#L431
with details on what users get when they turn this on / off?

Contributor Author:

👍

Contributor Author:

Actually, now that I look at it, I'm not sure that makes sense? The Config params are for stuff that goes into the hadoop config, whereas this is just being read from the Args. Other things read from args in the Job (e.g. scalding.nocounters or scalding.flowstats) aren't referenced in the Config.

Agreed I should add documentation somewhere, just not sure Config is the right place.

Collaborator:

How about we add it to the companion Args object? We don't need to refactor the other Args in this review, but it seems nicer than having them scattered in a bunch of places, right?

Contributor Author:

Args companion object seems pretty reasonable, I'll add it there

case class C3(c: Int)
case class C4(d: Int)

trait TraitType {
Collaborator:

do we need to add some tests for:

  1. Java types?
  2. Thrift / protobuf types?
  3. Testing that if we have some primitive types (like Int) referenced, we don't end up adding their tokens? (Or is that the same scenario as the BytesWritable case?)

@jcdavis (Contributor Author) Mar 24, 2017:

1 and 2 should be fine, at least based on my testing.

Primitives seem to work, but setSerialization filters out primitives and array types:

    val kryoClasses = withKryo.getKryoRegisteredClasses
      .filterNot(_.isPrimitive) // Cascading handles primitives and arrays
      .filterNot(_.isArray)

I'll add those filters to findReferencedClasses

Collaborator:

could we add tests for these scenarios?

Contributor Author:

Yea I'll add some tests

Contributor:

also, let's have tests for InnerClasses (Case classes in Object)

// We don't want to assign tokens to classes already in the map
- val newClasses: Iterable[String] = clazzes.map { _.getName } -- toks.values
+ val newClasses: Iterable[String] = clazzes.map { _.getName } -- fromSerializations -- toks.values
Collaborator:

should we be checking if the number of these tokens exceeds what cascading supports? (max int?). Wondering if there are some really complex types that end up referencing a large graph of types.

Contributor Author:

TupleSerialization writes out the token as a varint, so there shouldn't be any concerns about assigning too many tokens

@johnynek (Collaborator):

@jcdavis one last thing.

here:
https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/serialization/KryoHadoop.scala#L32

you could get the registered class list back out of the config. With that list we could make sure at the end of all the registrations that everything named is registered on the resulting kryo. This would make sure that all classes use tokens at the hadoop, cascading and kryo levels (the tokens don't need to match with Kryo, but it would be nice to not write the class names on nested structures).

using code similar to here:
https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/Config.scala#L163

you check if each item in the list has a registration, if not, add it.

@isnotinvain (Contributor):

Sort of related question: Can we re-use some of this to make JobTests try to serialize everything that's going to need serializing, to catch those sorts of errors in unit tests? That'd be super cool.

@jcdavis (Contributor Author) commented Mar 25, 2017

@johnynek I have already been testing that for us locally, but for some reason I thought KryoHadoop was in chill, so I was going to have to PR it there, deal with some versioning hell, etc. This makes it much easier :)

My version locally uses a different config that has classes that are definitely not registered yet, which makes things simpler. I'll do some more looking at the Kryo API, but it seems like I can just call .register(clazz)? That's a no-op if the class is already registered, and if not it will use the appropriate default serializer.

My one concern is that if someone overrides KryoHadoop with their own instantiator (as we do), they might register default serializers after classes have already been registered. E.g., for Thrift:

class MyKryoHadoop(config: Config) extends KryoHadoop(config) {
  override def newKryo(): Kryo = {
    val kryo = super.newKryo()
    kryo.addDefaultSerializer(classOf[TBase[_, _]], new TBaseSerializer)
    ...

So if a thrift type were in the tokenized list, it wouldn't be registered with the TBaseSerializer, which might be worse than writing out the full string name but using an optimized serializer.

@jcdavis (Contributor Author) commented Mar 27, 2017

Some more thought on this: what if KryoHadoop added a new method, e.g. def registerKryoSerializers(kryo: Kryo): Unit = {}, with the intention that it should be overridden instead of newKryo? Then the registering of classes could happen at the very end, after the call, to ensure any custom default serializers have been properly configured.

@johnynek (Collaborator):

@jcdavis what you are describing is related to what chill calls an IKryoRegistrar:
https://github.com/twitter/chill/blob/develop/chill-java/src/main/java/com/twitter/chill/IKryoRegistrar.java#L23

The problem is that newKryo is implementing a class method, so it has to be there. The better approach is probably to invert it:

add

def registrar: IKryoRegistrar =
  // This could even be on the companion so people could access the default registrar
  new IKryoRegistrar {
    def apply(k: Kryo) = {
      // add all the registrations here
    }
  }

def newKryo = {
  val k = // make the kryo with no registrations
  registrar(k) // register, here is where people can override
  // now make sure all the classes are registered.
}

what do you think of that?
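Made concrete, the inversion sketched above could look like the following stand-alone code (the Kryo class here is a minimal stand-in for com.esotericsoftware.kryo.Kryo, and InstantiatorSketch for KryoHadoop; all names are invented for illustration):

```scala
// Stand-in for the real Kryo, just enough to show the registration order
class Kryo {
  private var registered = Set.empty[String]
  def register(c: Class[_]): Unit = registered += c.getName
  def isRegistered(c: Class[_]): Boolean = registered(c.getName)
}

abstract class InstantiatorSketch(tokenizedClasses: Set[Class[_]]) {
  // Override point: subclasses add custom serializers/registrations here
  def registrar: Kryo => Unit = { _ => () }

  final def newKryo(): Kryo = {
    val k = new Kryo
    registrar(k) // user registrations run first...
    // ...then back-fill any tokenized class that is still unregistered
    tokenizedClasses.foreach { c => if (!k.isRegistered(c)) k.register(c) }
    k
  }
}
```

The design point is the ordering: because the back-fill runs after `registrar`, a subclass's custom default serializers are in place before the tokenized classes are registered.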

@johnynek (Collaborator):

@isnotinvain I don't follow exactly what you mean. This tells us the types in play, but it does not tell us a way to get instances of them, so I still don't see a way to test serialization better with this. If the user does .runHadoop it should serialize the data the user sets up. Did I misunderstand?

@dieu (Contributor) commented Mar 28, 2017

@jcdavis / @johnynek what about Execution? Do we want to turn it on for ExecutionApp?

@johnynek (Collaborator):

I would love to turn it on for Execution. Should be doable, but I have not looked at it yet. Any thoughts?

@dieu (Contributor) commented Mar 28, 2017

@jcdavis / @johnynek the problem is that the current implementation is based on finding fields, but in Execution we don't have fields; the whole logic is inside ExecutionApp.job.

The only option I can think of is using a macro to inspect the method body.

@piyushnarang (Collaborator):

I'm not sure if adding support for Executions is something we need to tackle as part of this PR. It seems like we could keep this iterative and first push out the Job based functionality and then followup with a different review for Execution support.
For the current review, what are the major items left to tackle? I was hoping to get some more tests in (which I commented on earlier) and @dieu also had some test scenarios in mind (I'll let him chime in with them). Not sure what other items there are. Trying to get a sense of roughly how long this might take, given that we're waiting on this PR for the 0.17.0 release (#1641) and there's a bunch of other items waiting on that as well (like Storehaus 2.12 and Summingbird 2.12).

@jcdavis (Contributor Author) commented Mar 28, 2017

I think the last thing I'm working on is adding support for registering these classes in KryoHadoop. I'm not too familiar with Execution so punting on that for this PR makes sense.

@johnynek (Collaborator):

+1 to separate into a second PR.

relatedly, maybe #1658 could be useful for execution. I think some function like addCascadingTokens(classOf[MyObjectThatHasTheMethods], conf) could just work currently, but is a little manual.

@piyushnarang (Collaborator):

@jcdavis sounds good. Let's also add some tests around the scenarios I mentioned if possible. We've been hit a few times lately by bugs that would've been caught with more unit tests but instead broke prod jobs, with all the resultant pain.

Also added a new customRegistrar method so users can add additional default
serializers before the tokenized classes are registered
@jcdavis (Contributor Author) commented Apr 4, 2017

Sorry for the delay on this one, I spent most of last week firefighting so I haven't been able to make progress until today.

As discussed, I've added support in KryoHadoop for registering cascading tokenized classes which haven't been registered yet.

Also, per @dieu, it turns out that inner classes don't work currently. The root of the issue is that Java's class names use $ for inner classes, whereas Type.fullName returns . all the way through (e.g. it sees com.twitter.scalding.ReferencedClassFinder.C5 instead of ...$C5). This doesn't cause any issues because we catch the ClassNotFoundException and silently ignore it, but it does mean such classes aren't tokenized. I've added a case to the test to check that things still work.

This is almost certainly fixable, but given this PR is already pretty lengthy I would suggest passing on that case for now.
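For reference, one likely shape for the deferred inner-class fix (a sketch, not part of this PR) is to retry Class.forName while swapping '.' separators for '$' from the right:

```scala
import scala.annotation.tailrec
import scala.util.Try

// Scala reflection's Type.fullName uses '.' throughout, but JVM binary names
// use '$' for nested classes; retry with '$' substituted right-to-left
@tailrec
def classForScalaName(name: String): Option[Class[_]] =
  Try(Class.forName(name)).toOption match {
    case found @ Some(_) => found
    case None =>
      val i = name.lastIndexOf('.')
      if (i < 0) None
      else classForScalaName(name.substring(0, i) + "$" + name.substring(i + 1))
  }
```

For example, classForScalaName("java.util.Map.Entry") resolves java.util.Map$Entry, while a name with no resolvable variant eventually returns None.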

@jcdavis (Contributor Author) commented Apr 4, 2017

FWIW, just the cascading tokenization has improved our jobs' CPU usage by as much as 20%. That plus these kryo changes and better kryo registration on our end (not using the default serializer unnecessarily) seems to be worth another ~20% on top (so a 30-35% CPU reduction overall), plus an 8-10% drop in bytes written to the network.

As always with hadoop benchmarks, YMMV. We were doing about as badly as one could (no manual registration of any sort + missing serializers), so I suspect other folks might not see such big wins.

@isnotinvain (Contributor):

@johnynek oh ok, I guess I misunderstood. I think it'd be nice if JobTest could trigger all the serialization that's going to happen in production so that we can make sure it works. But you're right, that's at the instance level and this is just scanning for classes.

@piyushnarang (Collaborator):

@jcdavis checking in, were you planning on adding more tests here? (Think you added tests for the inner classes, not sure if you were planning to add others for the other scenarios).
@johnynek / @isnotinvain / @dieu - are you guys good with these changes?

@johnynek (Collaborator) commented Apr 7, 2017

I say we merge. Users can disable it if they have a problem, and we can add tests later. Let's get scalding 0.17 out.

@piyushnarang (Collaborator):

Sounds good to me. We can follow up with more tests if needed.

@johnynek johnynek merged commit 68519c1 into twitter:develop Apr 7, 2017
@jcdavis jcdavis deleted the job-type-reflection branch May 2, 2017 14:50