
Another take on serialization of functions #27

Closed
wants to merge 1 commit

Conversation


@dajac dajac commented Nov 5, 2014

First of all, I would like to thank you for Flambo. It is a really good library for working with Spark!

I have been using it for a few weeks and noticed that our big jobs often fail with java.lang.OutOfMemoryError: PermGen space in Spark's executors. This happens because 'serialize-fn' uses eval to deserialize functions, which creates a lot of classes in Spark's executors; essentially, the classes are re-created for every task.
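
To illustrate the mechanism (a generic sketch, not Flambo's actual code): every call to eval on a function form compiles a fresh anonymous class, so repeating the deserialization per task keeps loading new classes into PermGen.

```clojure
;; Generic illustration (not Flambo's code): eval compiles a new class each time,
;; even for an identical function form.
(def form '(fn [x] (inc x)))

(= (class (eval form)) (class (eval form)))
;; => false -- two distinct anonymous fn classes were loaded
```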

I looked more deeply at how things are serialized in Flambo/Spark and tried another approach with this patch. I don't expect this PR to be merged into Flambo, but I would like your opinion on it.

In short, this PR changes the way functions are serialized in Flambo. Instead of using "serialize-fn", it relies on out-of-the-box Java serialization to serialize functions along with their bindings, and it also ensures that namespaces are loaded in the executors. This approach works well but requires AOT compilation of the namespaces used in the executors, so it is not very convenient in the REPL. I have also noticed that my jobs run about 2x faster with this mechanism.
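
A rough sketch of the idea (illustrative only, not the actual PR code): AOT-compiled Clojure functions are java.io.Serializable, so an executor that has the compiled classes on its classpath and has loaded the namespace can deserialize a function object, closures included, with plain Java serialization.

```clojure
;; Sketch only: a hypothetical namespace compiled ahead of time (listed under :aot
;; in project.clj), so its fn classes exist on the executor classpath.
(ns my.job.fns)

(defn make-adder [n]
  ;; Clojure fns extend clojure.lang.AFunction, which implements
  ;; java.io.Serializable, so the closure over n survives a plain Java
  ;; serialization round trip -- provided the executor has this compiled
  ;; class available and the namespace loaded.
  (fn [x] (+ x n)))
```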

In my opinion, the best solution would be to support both approaches: the ability to define functions without AOT for development, and AOT for running apps in production.

WDYT?

PS: Midje tests cannot be AOT-compiled, so I switched them to clojure.test.

…lar java serialization. With this change, all functions used in Spark jobs must be compiled (aot).
@sorenmacbeth
Owner

flambo's serialization uses kryo, and the deserialization is memoized, so eval doesn't get called every time. I'm not convinced that it's the serialization that causes your PermGen errors.

As for your speed tests, were these run on a cluster (where serialization/deserialization needs to happen)? Did you disable kryo as the default serializer when you ran them? I suspect that your Java classes are actually being serialized by kryo, because you wouldn't be able to serialize and deserialize Clojure's native data structures otherwise.

Also, the current approach requires AOT anyway, so there is nothing really different there.

I don't believe that these changes are functional.

@sorenmacbeth
Owner

A final point: your function serializers won't capture any closures over them, which does work with the current approach.

@sorenmacbeth
Owner

I suspect the PermGen overflows may be caused by this bug:

http://dev.clojure.org/jira/browse/CLJ-1152

@sorenmacbeth
Owner

I did a little digging as well, and I do see what you mean about eval creating a new class even when deserialize is memoized, since it's the actual call to eval that creates it. I also no longer think the above bug is the issue.

That said, I still think the current method of serialization is preferable.

@dajac
Author

dajac commented Nov 9, 2014

Thank you for your comments. Here are a few answers to your questions/remarks.

First of all, it is pretty easy to reproduce the PermGen issue. Just run a job with a few tasks and profile one of the executors with YourKit. You'll see that new classes are generated for every task being executed. As you said, deserialization is memoized, but that only works within the context of a single task. Every new task carries a new reference to a byte array, so deserialization is reapplied.
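
Here is a sketch of why per-task memoization doesn't help (a generic illustration, not Flambo's actual deserializer): memoize keys on its argument, and equality on two distinct byte arrays is reference-based, so a fresh byte array per task never hits the cache.

```clojure
;; Generic sketch: memoization keyed on a byte array misses for every new array,
;; even when the contents are identical.
(def deserialize*
  (memoize (fn [^bytes b] (eval (read-string (String. b "UTF-8"))))))

(let [a (.getBytes "(fn [x] (inc x))")
      b (.getBytes "(fn [x] (inc x))")]
  [(= a b) (java.util.Arrays/equals a b)])
;; => [false true] -- same contents, different array objects, so the cache misses
```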

I use the default config defined in Flambo: spark.serializer is set to org.apache.spark.serializer.KryoSerializer, so Clojure data structures are serialized correctly, and spark.closure.serializer is left at its default value, org.apache.spark.serializer.JavaSerializer.
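
For reference, a sketch of those settings using flambo's conf helpers (assuming flambo.conf's spark-conf and set functions; the property names are the standard Spark ones):

```clojure
(require '[flambo.conf :as conf])

(def c
  (-> (conf/spark-conf)
      ;; data serialization (Flambo's default)
      (conf/set "spark.serializer" "org.apache.spark.serializer.KryoSerializer")
      ;; closure serialization, left at Spark's default
      (conf/set "spark.closure.serializer" "org.apache.spark.serializer.JavaSerializer")))
```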

I have switched all our jobs to this implementation and haven't noticed any issue so far. All closures are serialized correctly and I no longer see PermGen errors. Our typical jobs have thousands of tasks and process up to 10TB of data. And yes, the performance gain was measured on our typical jobs running on a cluster, where serialization/deserialization happens.

Do you have an example of a closure that would not be serialized correctly by this approach?

That said, I think it would be good to find a way to avoid creating classes during the execution of a job. I'm happy to discuss this further to find a better way.

@sorenmacbeth
Owner

(let [y 10] (sfn/fn [x] (+ x y)))

Flambo's current serializer will correctly capture and serialize the closed-over y binding.

As a bare minimum requirement, I want flambo's serializer to use kryo directly and not java serialization. kryo is more efficient and just generally better.

The reason a new class is created actually has nothing to do with eval per se, but with the fact that a new fn is created during the call to eval. This is happening here:

https://github.com/sorenmacbeth/serializable-fn/blob/master/src/clj/serializable/fn.clj#L180

I can't think of another approach that avoids creating and evaling a new fn. As far as PermGen goes, JDK 8 is supposed to make that a non-issue. There are also some settings you can pass to the JVM that remove or reduce most of the problems. Perhaps we can add a section to the README.
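
Something along these lines could go in that README section (standard pre-JDK 8 HotSpot flags; the size value is only illustrative):

```
-XX:MaxPermSize=512m
-XX:+UseConcMarkSweepGC
-XX:+CMSClassUnloadingEnabled
```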

@dajac
Author

dajac commented Nov 9, 2014

Your example also works with this PR. I agree that kryo is better than Java serialization, but I still see a performance improvement with this PR. I already use JVM options for unloading classes, yet I still hit this PermGen issue in some of my jobs.

@dajac
Author

dajac commented Nov 9, 2014

To be clear, Kryo is still used to serialize the data structures processed by the job, but plain Java serialization is used to serialize the functions packed into the Spark tasks sent to executors. Spark tasks are serialized with Java serialization anyway.

@sorenmacbeth
Owner

Does my example work only locally, or also on a cluster where the function is serialized in one JVM instance and then deserialized in another?

@sorenmacbeth
Owner

I understand the data structures are still serialized using kryo, but I'd like the functions to be serialized with kryo as well and not java serialization.

@dajac
Author

dajac commented Nov 10, 2014

Yes, your example works on a cluster where the function is serialized in one JVM instance and then deserialized in another. As I said, I have switched all my jobs to this implementation and everything works the same. The only difference I have noticed is that tests must be AOT-compiled as well.

@chrisbetz

Ok, I checked dajac's changes and
a) they work fine
b) they are about twice as fast (which comes not from the serialization itself, but from avoiding reflection in flambo.function/call; see the sketch below)
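
For context, a generic illustration of the reflection cost (not the actual flambo.function/call code):

```clojure
(set! *warn-on-reflection* true)

(defn len-reflective [s]
  (.length s))        ; reflection warning: the type of s is unknown at compile time

(defn len-hinted [^String s]
  (.length s))        ; direct method call, no reflection
```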

I added it to my fork (thanks for pointing out the problem and providing a solution, dajac), so maybe you want to check it out.

I added some more changes to my fork (including changes that break the API), so I won't open a pull request for that. More details (and the rationale) are in the README.

So, thanks again to dajac, and happy hacking!
