This repository has been archived by the owner on Dec 31, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 37
Update to 0.6.7 #15
Merged
Merged
Update to 0.6.7 #15
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
732b067
add KryoSerializer, remove ClassifierResult2
088da9b
expose nupic Inference object
b98b06e
introduce NetworkInference class
cc2e554
checkpoint support
60aeed8
update version to 0.6.7
fd059b9
Keyed Stream serialization test
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
212 changes: 212 additions & 0 deletions
212
...tm-streaming-java/src/main/java/org/numenta/nupic/flink/serialization/KryoSerializer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,212 @@ | ||
package org.numenta.nupic.flink.serialization; | ||
|
||
import com.esotericsoftware.kryo.Kryo; | ||
import com.esotericsoftware.kryo.KryoException; | ||
import com.esotericsoftware.kryo.Serializer; | ||
import com.esotericsoftware.kryo.io.Input; | ||
import com.esotericsoftware.kryo.io.Output; | ||
import org.apache.flink.api.common.ExecutionConfig; | ||
import org.apache.flink.api.java.tuple.Tuple2; | ||
import org.apache.flink.streaming.util.FieldAccessor; | ||
import org.numenta.nupic.Persistable; | ||
import org.numenta.nupic.network.Network; | ||
import org.numenta.nupic.serialize.HTMObjectInput; | ||
import org.numenta.nupic.serialize.HTMObjectOutput; | ||
import org.numenta.nupic.serialize.SerialConfig; | ||
import org.numenta.nupic.serialize.SerializerCore; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.*; | ||
import java.lang.reflect.Field; | ||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
|
||
/** | ||
* Kryo serializer for HTM network and related objects. | ||
* | ||
*/ | ||
public class KryoSerializer<T extends Persistable> extends Serializer<T> implements Serializable { | ||
|
||
protected static final Logger LOGGER = LoggerFactory.getLogger(KryoSerializer.class); | ||
|
||
private final SerializerCore serializer = new SerializerCore(SerialConfig.DEFAULT_REGISTERED_TYPES); | ||
|
||
/** | ||
* Write the given instance to the given output. | ||
* | ||
* @param kryo instance of {@link Kryo} object | ||
* @param output a Kryo {@link Output} object | ||
* @param t instance to serialize | ||
*/ | ||
@Override | ||
public void write(Kryo kryo, Output output, T t) { | ||
try { | ||
preSerialize(t); | ||
|
||
try(ByteArrayOutputStream stream = new ByteArrayOutputStream(4096)) { | ||
|
||
// write the object using the HTM serializer | ||
HTMObjectOutput writer = serializer.getObjectOutput(stream); | ||
writer.writeObject(t, t.getClass()); | ||
writer.close(); | ||
|
||
// write the serialized data | ||
output.writeInt(stream.size()); | ||
stream.writeTo(output); | ||
|
||
LOGGER.debug("wrote {} bytes", stream.size()); | ||
} | ||
} | ||
catch(IOException e) { | ||
throw new KryoException(e); | ||
} | ||
} | ||
|
||
/** | ||
* Read an instance of the given class from the given input. | ||
* | ||
* @param kryo instance of {@link Kryo} object | ||
* @param input a Kryo {@link Input} | ||
* @param aClass The class of the object to be read in. | ||
* @return an instance of type <T> | ||
*/ | ||
@Override | ||
public T read(Kryo kryo, Input input, Class<T> aClass) { | ||
|
||
// read the serialized data | ||
byte[] data = new byte[input.readInt()]; | ||
input.readBytes(data); | ||
|
||
try { | ||
try(ByteArrayInputStream stream = new ByteArrayInputStream(data)) { | ||
HTMObjectInput reader = serializer.getObjectInput(stream); | ||
T t = (T) reader.readObject(aClass); | ||
|
||
postDeSerialize(t); | ||
|
||
return t; | ||
} | ||
} | ||
catch(Exception e) { | ||
throw new KryoException(e); | ||
} | ||
} | ||
|
||
/** | ||
* Copy the given instance. | ||
* @param kryo instance of {@link Kryo} object | ||
* @param original an object to copy. | ||
* @return | ||
*/ | ||
@Override | ||
public T copy(Kryo kryo, T original) { | ||
try { | ||
preSerialize(original); | ||
|
||
try(CopyStream output = new CopyStream(4096)) { | ||
HTMObjectOutput writer = serializer.getObjectOutput(output); | ||
writer.writeObject(original, original.getClass()); | ||
writer.close(); | ||
|
||
try(InputStream input = output.toInputStream()) { | ||
HTMObjectInput reader = serializer.getObjectInput(input); | ||
T t = (T) reader.readObject(original.getClass()); | ||
|
||
postDeSerialize(t); | ||
|
||
return t; | ||
} | ||
} | ||
} | ||
catch(Exception e) { | ||
throw new KryoException(e); | ||
} | ||
} | ||
|
||
static class CopyStream extends ByteArrayOutputStream { | ||
public CopyStream(int size) { super(size); } | ||
|
||
/** | ||
* Get an input stream based on the contents of this output stream. | ||
* Do not use the output stream after calling this method. | ||
* @return an {@link InputStream} | ||
*/ | ||
public InputStream toInputStream() { | ||
return new ByteArrayInputStream(this.buf, 0, this.count); | ||
} | ||
} | ||
|
||
/** | ||
* The HTM serializer handles the Persistable callbacks automatically, but | ||
* this method is for any additional actions to be taken. | ||
* @param t the instance to be serialized. | ||
*/ | ||
protected void preSerialize(T t) { | ||
} | ||
|
||
/** | ||
* The HTM serializer handles the Persistable callbacks automatically, but | ||
* this method is for any additional actions to be taken. | ||
* @param t the instance newly deserialized. | ||
*/ | ||
protected void postDeSerialize(T t) { | ||
} | ||
|
||
/** | ||
* Register the HTM types with the Kryo serializer. | ||
* @param config | ||
*/ | ||
public static void registerTypes(ExecutionConfig config) { | ||
for(Class<?> c : SerialConfig.DEFAULT_REGISTERED_TYPES) { | ||
Class<?> serializerClass = DEFAULT_SERIALIZERS.getOrDefault(c, (Class<?>) KryoSerializer.class); | ||
config.registerTypeWithKryoSerializer(c, (Class<? extends Serializer<?>>) serializerClass); | ||
} | ||
for(Class<?> c : KryoSerializer.ADDITIONAL_REGISTERED_TYPES) { | ||
Class<?> serializerClass = DEFAULT_SERIALIZERS.getOrDefault(c, (Class<?>) KryoSerializer.class); | ||
config.registerTypeWithKryoSerializer(c, (Class<? extends Serializer<?>>) serializerClass); | ||
} | ||
} | ||
|
||
static final Class<?>[] ADDITIONAL_REGISTERED_TYPES = { Network.class }; | ||
|
||
/** | ||
* A map of serializers for various classes. | ||
*/ | ||
static final Map<Class<?>,Class<?>> DEFAULT_SERIALIZERS = Stream.of( | ||
new Tuple2<>(Network.class, NetworkSerializer.class) | ||
).collect(Collectors.toMap(kv -> kv.f0, kv -> kv.f1)); | ||
|
||
|
||
public static class NetworkSerializer extends KryoSerializer<Network> { | ||
|
||
private final static Field shouldDoHaltField; | ||
|
||
static { | ||
try { | ||
shouldDoHaltField = Network.class.getDeclaredField("shouldDoHalt"); | ||
shouldDoHaltField.setAccessible(true); | ||
} catch (NoSuchFieldException e) { | ||
throw new UnsupportedOperationException("unable to locate Network::shouldDoHalt", e); | ||
} | ||
|
||
} | ||
|
||
@Override | ||
protected void preSerialize(Network network) { | ||
super.preSerialize(network); | ||
try { | ||
// issue: HTM.java #417 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the effect of not managing the |
||
shouldDoHaltField.set(network, false); | ||
} catch (IllegalAccessException e) { | ||
throw new UnsupportedOperationException("unable to set Network::shouldDoHalt", e); | ||
} | ||
} | ||
|
||
@Override | ||
protected void postDeSerialize(Network network) { | ||
super.postDeSerialize(network); | ||
} | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As far as I can tell there is no reason to register the Network object. Internally, all registering does is share the class name (string) among all instances of that particular class (which is slower). So pre-registering helps avoid this behavior, but since there is only ever one instance of the Network, I don't see how registering it will be any more efficient because there is only ever one of its type being serialized?