Skip to content

Commit

Permalink
[euphoria-flink] register types to kryo
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik committed Sep 13, 2018
1 parent 43f50d8 commit 0071e4a
Showing 1 changed file with 37 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,22 @@
import com.esotericsoftware.kryo.Kryo;
import cz.seznam.euphoria.core.executor.io.SerializerFactory;
import java.io.OutputStream;
import java.util.HashSet;
import org.apache.flink.api.common.ExecutionConfig;
import org.objenesis.strategy.StdInstantiatorStrategy;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig.SerializableSerializer;

public class FlinkSerializerFactory implements SerializerFactory {

static class FlinkSerializerAdapter implements Serializer {

private final Map<Class<?>, ExecutionConfig.SerializableSerializer<?>> flinkSerializers;
private final Kryo kryo;

// the class that we will serialize
Class clz;
// serializer for the class
com.esotericsoftware.kryo.Serializer serializer;

FlinkSerializerAdapter(
Map<Class<?>, ExecutionConfig.SerializableSerializer<?>> flinkSerializers,
Kryo kryo) {
this.flinkSerializers = flinkSerializers;
FlinkSerializerAdapter(Kryo kryo) {
this.kryo = kryo;
}

Expand All @@ -51,25 +45,7 @@ public SerializerFactory.Serializer.Output newOutput(OutputStream os) {
@SuppressWarnings("unchecked")
@Override
public void writeObject(Object element) {
if (clz == null) {
clz = (Class) element.getClass();
ExecutionConfig.SerializableSerializer<?> flinkSerializer = flinkSerializers.get(clz);
if (flinkSerializer == null) {
serializer = null;
} else {
serializer = flinkSerializer.getSerializer();
}
} else if (element.getClass() != clz) {
throw new IllegalArgumentException(
"Use only single class as a storage type, got " + clz
+ " and " + element.getClass());
}

if (serializer == null) {
kryo.writeObject(output, element);
} else {
serializer.write(kryo, output, element);
}
kryo.writeClassAndObject(output, element);
}

@Override
Expand All @@ -91,9 +67,7 @@ public SerializerFactory.Serializer.Input newInput(java.io.InputStream is) {
@SuppressWarnings("unchecked")
@Override
public Object readObject() {
return (serializer == null)
? kryo.readObject(input, clz)
: serializer.read(kryo, input, clz);
return kryo.readClassAndObject(input);
}

@Override
Expand All @@ -109,25 +83,51 @@ public void close() {
}
}

private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> serializers;
private final Map<Class<?>, ExecutionConfig.SerializableSerializer<?>> serializers;
private final Set<Class<?>> registeredClasses;
private transient Kryo kryo;

public FlinkSerializerFactory(ExecutionConfig conf) {
this.serializers = conf.getDefaultKryoSerializers();
serializers = instantiateSerializers(
conf.getRegisteredTypesWithKryoSerializerClasses());
serializers.putAll(instantiateSerializers(
conf.getDefaultKryoSerializerClasses()));
serializers.putAll(conf.getDefaultKryoSerializers());
serializers.putAll(conf.getRegisteredTypesWithKryoSerializers());
registeredClasses = new HashSet<>(conf.getRegisteredKryoTypes());
registeredClasses.addAll(conf.getRegisteredPojoTypes());
}

@Override
public Serializer newSerializer() {
return new FlinkSerializerAdapter(serializers, initKryo());
return new FlinkSerializerAdapter(initKryo());
}

private Kryo initKryo() {
if (this.kryo == null) {
// FIXME: how to get to the kryo instance in flink?
this.kryo = new Kryo();
((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
serializers.forEach((k, v) -> kryo.addDefaultSerializer(k, v.getSerializer()));
registeredClasses.forEach(kryo::register);
}
return this.kryo;
}

@SuppressWarnings("unchecked")
private Map<Class<?>, SerializableSerializer<?>> instantiateSerializers(
Map<Class<?>, Class<? extends com.esotericsoftware.kryo.Serializer<?>>> serializers) {

return serializers.entrySet().stream().collect(
Collectors.toMap(
Map.Entry::getKey,
e -> {
try {
return new SerializableSerializer(e.getValue().newInstance());
} catch (InstantiationException | IllegalAccessException ex) {
throw new RuntimeException(ex);
}
}));
}

}

0 comments on commit 0071e4a

Please sign in to comment.