Skip to content

Commit

Permalink
[FLINK-6025] [core] Add Flink's own JavaSerializer for Kryo serializa…
Browse files Browse the repository at this point in the history
…tion

This commit adds a reimplemented JavaSerializer to be registered with
Kryo. This is due to a know issue with Kryo's JavaSerializer that may
use the wrong classloader for deserialzation.

Instead of registering Kryo's JavaSerializer for Throwables, it is now
changed to register the reimplemented JavaSerializer. Users who bump
into ClassNotFoundExceptions if they are using Kryo's JavaSerializer for
their own types are also recommended to change to Flink's JavaSerializer.

This closes apache#3517.
  • Loading branch information
tzulitai authored and p16i committed Apr 16, 2017
1 parent 07b9152 commit cf9f139
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 5 deletions.
12 changes: 12 additions & 0 deletions docs/dev/custom_serializers.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,16 @@ For Google Protobuf you need the following Maven dependency:

Please adjust the versions of both libraries as needed.

### Issue with using Kryo's `JavaSerializer`

If you register Kryo's `JavaSerializer` for your custom type, you may
encounter `ClassNotFoundException`s even though your custom type class is
included in the submitted user code jar. This is due to a know issue with
Kryo's `JavaSerializer`, which may incorrectly use the wrong classloader.

In this case, you should use `org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer`
instead to resolve the issue. This is a reimplemented `JavaSerializer` in Flink
that makes sure the user code classloader is used.

Please refer to [FLINK-6025](https://issues.apache.org/jira/browse/FLINK-6025)
for more details.
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.java.typeutils.runtime.kryo;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.util.ObjectMap;
import org.apache.flink.util.InstantiationUtil;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/**
* This is a reimplementation of Kryo's {@link com.esotericsoftware.kryo.serializers.JavaSerializer},
* that additionally makes sure the {@link ObjectInputStream} used for deserialization specifically uses Kryo's
* registered classloader.
*
* Flink maintains this reimplementation due to a known issue with Kryo's {@code JavaSerializer}, in which the wrong
* classloader may be used for deserialization, leading to {@link ClassNotFoundException}s.
*
* @see <a href="https://issues.apache.org/jira/browse/FLINK-6025">FLINK-6025</a>
* @see <a href="https://github.com/EsotericSoftware/kryo/pull/483">Known issue with Kryo's JavaSerializer</a>
*
* @param <T> The type to be serialized.
*/
public class JavaSerializer<T> extends Serializer<T> {

public JavaSerializer() {}

@SuppressWarnings("unchecked")
@Override
public void write(Kryo kryo, Output output, T o) {
try {
ObjectMap graphContext = kryo.getGraphContext();
ObjectOutputStream objectStream = (ObjectOutputStream)graphContext.get(this);
if (objectStream == null) {
objectStream = new ObjectOutputStream(output);
graphContext.put(this, objectStream);
}
objectStream.writeObject(o);
objectStream.flush();
} catch (Exception ex) {
throw new KryoException("Error during Java serialization.", ex);
}
}

@SuppressWarnings("unchecked")
@Override
public T read(Kryo kryo, Input input, Class aClass) {
try {
ObjectMap graphContext = kryo.getGraphContext();
ObjectInputStream objectStream = (ObjectInputStream)graphContext.get(this);
if (objectStream == null) {
// make sure we use Kryo's classloader
objectStream = new InstantiationUtil.ClassLoaderObjectInputStream(input, kryo.getClassLoader());
graphContext.put(this, objectStream);
}
return (T) objectStream.readObject();
} catch (Exception ex) {
throw new KryoException("Error during Java deserialization.", ex);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;

import org.apache.avro.generic.GenericData;

Expand Down Expand Up @@ -130,7 +129,7 @@ public boolean isImmutableType() {

@Override
public KryoSerializer<T> duplicate() {
return new KryoSerializer<T>(this);
return new KryoSerializer<>(this);
}

@Override
Expand Down Expand Up @@ -331,6 +330,8 @@ private void checkKryoInitialized() {
kryo.setReferences(true);

// Throwable and all subclasses should be serialized via java serialization
// Note: the registered JavaSerializer is Flink's own implementation, and not Kryo's.
// This is due to a know issue with Kryo's JavaSerializer. See FLINK-6025 for details.
kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());

// Add default serializers first, so that they type registrations without a serializer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@
public final class InstantiationUtil {

/**
* A custom ObjectInputStream that can also load user-code using a
* user-code ClassLoader.
*
* A custom ObjectInputStream that can load classes using a specific ClassLoader.
*/
public static class ClassLoaderObjectInputStream extends ObjectInputStream {

Expand Down

0 comments on commit cf9f139

Please sign in to comment.