Skip to content

Commit

Permalink
Fixed - codec provided in Redisson configuration should be used for a…
Browse files Browse the repository at this point in the history
…ttribute messages serialization. #1905
  • Loading branch information
Nikita Koksharov committed Mar 6, 2019
1 parent d8ae21e commit ee51283
Show file tree
Hide file tree
Showing 20 changed files with 203 additions and 109 deletions.
Expand Up @@ -15,13 +15,15 @@
*/ */
package org.redisson.tomcat; package org.redisson.tomcat;


import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable; import java.io.Serializable;


import org.apache.catalina.util.CustomObjectInputStream; import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;


/** /**
* *
Expand Down Expand Up @@ -50,22 +52,29 @@ public String getNodeId() {
return nodeId; return nodeId;
} }


protected byte[] toByteArray(Object value) throws IOException { protected byte[] toByteArray(Encoder encoder, Object value) throws IOException {
if (value == null) { if (value == null) {
return null; return null;
} }
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(bos); ByteBuf buf = encoder.encode(value);
out.writeObject(value); try {
out.flush(); return ByteBufUtil.getBytes(buf);
return bos.toByteArray(); } finally {
buf.release();
}
} }


protected Object toObject(ClassLoader classLoader, byte[] value) throws IOException, ClassNotFoundException { protected Object toObject(Decoder<?> decoder, byte[] value) throws IOException, ClassNotFoundException {
if (value == null) { if (value == null) {
return null; return null;
} }
CustomObjectInputStream in = new CustomObjectInputStream(new ByteArrayInputStream(value), classLoader);
return in.readObject(); ByteBuf buf = Unpooled.wrappedBuffer(value);
try {
return decoder.decode(buf, null);
} finally {
buf.release();
}
} }
} }
Expand Up @@ -17,6 +17,9 @@


import java.io.IOException; import java.io.IOException;


import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;

/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
Expand All @@ -30,18 +33,18 @@ public class AttributeUpdateMessage extends AttributeMessage {
public AttributeUpdateMessage() { public AttributeUpdateMessage() {
} }


public AttributeUpdateMessage(String nodeId, String sessionId, String name, Object value) throws IOException { public AttributeUpdateMessage(String nodeId, String sessionId, String name, Object value, Encoder encoder) throws IOException {
super(nodeId, sessionId); super(nodeId, sessionId);
this.name = name; this.name = name;
this.value = toByteArray(value); this.value = toByteArray(encoder, value);
} }


public String getName() { public String getName() {
return name; return name;
} }


public Object getValue(ClassLoader classLoader) throws IOException, ClassNotFoundException { public Object getValue(Decoder<?> decoder) throws IOException, ClassNotFoundException {
return toObject(classLoader, value); return toObject(decoder, value);
} }


} }
Expand Up @@ -20,6 +20,9 @@
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;


import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;

/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
Expand All @@ -32,25 +35,25 @@ public class AttributesPutAllMessage extends AttributeMessage {
public AttributesPutAllMessage() { public AttributesPutAllMessage() {
} }


public AttributesPutAllMessage(String nodeId, String sessionId, Map<String, Object> attrs) throws IOException { public AttributesPutAllMessage(String nodeId, String sessionId, Map<String, Object> attrs, Encoder encoder) throws IOException {
super(nodeId, sessionId); super(nodeId, sessionId);
if (attrs != null) { if (attrs != null) {
this.attrs = new HashMap<String, byte[]>(); this.attrs = new HashMap<String, byte[]>();
for (Entry<String, Object> entry: attrs.entrySet()) { for (Entry<String, Object> entry: attrs.entrySet()) {
this.attrs.put(entry.getKey(), toByteArray(entry.getValue())); this.attrs.put(entry.getKey(), toByteArray(encoder, entry.getValue()));
} }
} else { } else {
this.attrs = null; this.attrs = null;
} }
} }


public Map<String, Object> getAttrs(ClassLoader classLoader) throws IOException, ClassNotFoundException { public Map<String, Object> getAttrs(Decoder<?> decoder) throws IOException, ClassNotFoundException {
if (attrs == null) { if (attrs == null) {
return null; return null;
} }
Map<String, Object> result = new HashMap<String, Object>(); Map<String, Object> result = new HashMap<String, Object>();
for (Entry<String, byte[]> entry: attrs.entrySet()) { for (Entry<String, byte[]> entry: attrs.entrySet()) {
result.put(entry.getKey(), toObject(classLoader, entry.getValue())); result.put(entry.getKey(), toObject(decoder, entry.getValue()));
} }
return result; return result;
} }
Expand Down
Expand Up @@ -187,7 +187,7 @@ protected AttributesPutAllMessage createPutAllMessage(Map<String, Object> newMap
map.put(entry.getKey(), entry.getValue()); map.put(entry.getKey(), entry.getValue());
} }
try { try {
return new AttributesPutAllMessage(redissonManager.getNodeId(), getId(), map); return new AttributesPutAllMessage(redissonManager.getNodeId(), getId(), map, this.map.getCodec().getMapValueEncoder());
} catch (IOException e) { } catch (IOException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
Expand All @@ -207,7 +207,7 @@ private void fastPut(String name, Object value) {
map.fastPut(name, value); map.fastPut(name, value);
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY) {
try { try {
topic.publish(new AttributeUpdateMessage(redissonManager.getNodeId(), getId(), name, value)); topic.publish(new AttributeUpdateMessage(redissonManager.getNodeId(), getId(), name, value, this.map.getCodec().getMapValueEncoder()));
} catch (IOException e) { } catch (IOException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
Expand Down
Expand Up @@ -235,6 +235,16 @@ public void start() throws LifecycleException {
applicationClassLoader = getClass().getClassLoader(); applicationClassLoader = getClass().getClassLoader();
} }


Codec codec = redisson.getConfig().getCodec();
Codec codecToUse;
try {
codecToUse = codec.getClass()
.getConstructor(ClassLoader.class, codec.getClass())
.newInstance(applicationClassLoader, codec);
} catch (Exception e) {
throw new LifecycleException(e);
}

if (updateMode == UpdateMode.AFTER_REQUEST) { if (updateMode == UpdateMode.AFTER_REQUEST) {
getEngine().getPipeline().addValve(new UpdateValve(this)); getEngine().getPipeline().addValve(new UpdateValve(this));
} }
Expand All @@ -259,14 +269,14 @@ public void onMessage(CharSequence channel, AttributeMessage msg) {


if (msg instanceof AttributesPutAllMessage) { if (msg instanceof AttributesPutAllMessage) {
AttributesPutAllMessage m = (AttributesPutAllMessage) msg; AttributesPutAllMessage m = (AttributesPutAllMessage) msg;
for (Entry<String, Object> entry : m.getAttrs(applicationClassLoader).entrySet()) { for (Entry<String, Object> entry : m.getAttrs(codecToUse.getMapValueDecoder()).entrySet()) {
session.superSetAttribute(entry.getKey(), entry.getValue(), true); session.superSetAttribute(entry.getKey(), entry.getValue(), true);
} }
} }


if (msg instanceof AttributeUpdateMessage) { if (msg instanceof AttributeUpdateMessage) {
AttributeUpdateMessage m = (AttributeUpdateMessage)msg; AttributeUpdateMessage m = (AttributeUpdateMessage)msg;
session.superSetAttribute(m.getName(), m.getValue(applicationClassLoader), true); session.superSetAttribute(m.getName(), m.getValue(codecToUse.getMapValueDecoder()), true);
} }
} }
} catch (Exception e) { } catch (Exception e) {
Expand Down Expand Up @@ -294,15 +304,6 @@ protected RedissonClient buildClient() throws LifecycleException {
} }


try { try {
try {
Config c = new Config(config);
Codec codec = c.getCodec().getClass().getConstructor(ClassLoader.class)
.newInstance(Thread.currentThread().getContextClassLoader());
config.setCodec(codec);
} catch (Exception e) {
throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e);
}

return Redisson.create(config); return Redisson.create(config);
} catch (Exception e) { } catch (Exception e) {
throw new LifecycleException(e); throw new LifecycleException(e);
Expand Down
Expand Up @@ -15,13 +15,15 @@
*/ */
package org.redisson.tomcat; package org.redisson.tomcat;


import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable; import java.io.Serializable;


import org.apache.catalina.util.CustomObjectInputStream; import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;


/** /**
* *
Expand Down Expand Up @@ -50,22 +52,29 @@ public String getNodeId() {
return nodeId; return nodeId;
} }


protected byte[] toByteArray(Object value) throws IOException { protected byte[] toByteArray(Encoder encoder, Object value) throws IOException {
if (value == null) { if (value == null) {
return null; return null;
} }
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(bos); ByteBuf buf = encoder.encode(value);
out.writeObject(value); try {
out.flush(); return ByteBufUtil.getBytes(buf);
return bos.toByteArray(); } finally {
buf.release();
}
} }


protected Object toObject(ClassLoader classLoader, byte[] value) throws IOException, ClassNotFoundException { protected Object toObject(Decoder<?> decoder, byte[] value) throws IOException, ClassNotFoundException {
if (value == null) { if (value == null) {
return null; return null;
} }
CustomObjectInputStream in = new CustomObjectInputStream(new ByteArrayInputStream(value), classLoader);
return in.readObject(); ByteBuf buf = Unpooled.wrappedBuffer(value);
try {
return decoder.decode(buf, null);
} finally {
buf.release();
}
} }
} }
Expand Up @@ -17,6 +17,9 @@


import java.io.IOException; import java.io.IOException;


import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;

/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
Expand All @@ -30,18 +33,18 @@ public class AttributeUpdateMessage extends AttributeMessage {
public AttributeUpdateMessage() { public AttributeUpdateMessage() {
} }


public AttributeUpdateMessage(String nodeId, String sessionId, String name, Object value) throws IOException { public AttributeUpdateMessage(String nodeId, String sessionId, String name, Object value, Encoder encoder) throws IOException {
super(nodeId, sessionId); super(nodeId, sessionId);
this.name = name; this.name = name;
this.value = toByteArray(value); this.value = toByteArray(encoder, value);
} }


public String getName() { public String getName() {
return name; return name;
} }


public Object getValue(ClassLoader classLoader) throws IOException, ClassNotFoundException { public Object getValue(Decoder<?> decoder) throws IOException, ClassNotFoundException {
return toObject(classLoader, value); return toObject(decoder, value);
} }


} }
Expand Up @@ -20,6 +20,9 @@
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;


import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;

/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
Expand All @@ -32,25 +35,25 @@ public class AttributesPutAllMessage extends AttributeMessage {
public AttributesPutAllMessage() { public AttributesPutAllMessage() {
} }


public AttributesPutAllMessage(String nodeId, String sessionId, Map<String, Object> attrs) throws IOException { public AttributesPutAllMessage(String nodeId, String sessionId, Map<String, Object> attrs, Encoder encoder) throws IOException {
super(nodeId, sessionId); super(nodeId, sessionId);
if (attrs != null) { if (attrs != null) {
this.attrs = new HashMap<String, byte[]>(); this.attrs = new HashMap<String, byte[]>();
for (Entry<String, Object> entry: attrs.entrySet()) { for (Entry<String, Object> entry: attrs.entrySet()) {
this.attrs.put(entry.getKey(), toByteArray(entry.getValue())); this.attrs.put(entry.getKey(), toByteArray(encoder, entry.getValue()));
} }
} else { } else {
this.attrs = null; this.attrs = null;
} }
} }


public Map<String, Object> getAttrs(ClassLoader classLoader) throws IOException, ClassNotFoundException { public Map<String, Object> getAttrs(Decoder<?> decoder) throws IOException, ClassNotFoundException {
if (attrs == null) { if (attrs == null) {
return null; return null;
} }
Map<String, Object> result = new HashMap<String, Object>(); Map<String, Object> result = new HashMap<String, Object>();
for (Entry<String, byte[]> entry: attrs.entrySet()) { for (Entry<String, byte[]> entry: attrs.entrySet()) {
result.put(entry.getKey(), toObject(classLoader, entry.getValue())); result.put(entry.getKey(), toObject(decoder, entry.getValue()));
} }
return result; return result;
} }
Expand Down
Expand Up @@ -187,7 +187,7 @@ protected AttributesPutAllMessage createPutAllMessage(Map<String, Object> newMap
map.put(entry.getKey(), entry.getValue()); map.put(entry.getKey(), entry.getValue());
} }
try { try {
return new AttributesPutAllMessage(redissonManager.getNodeId(), getId(), map); return new AttributesPutAllMessage(redissonManager.getNodeId(), getId(), map, this.map.getCodec().getMapValueEncoder());
} catch (IOException e) { } catch (IOException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
Expand All @@ -207,7 +207,7 @@ private void fastPut(String name, Object value) {
map.fastPut(name, value); map.fastPut(name, value);
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY) {
try { try {
topic.publish(new AttributeUpdateMessage(redissonManager.getNodeId(), getId(), name, value)); topic.publish(new AttributeUpdateMessage(redissonManager.getNodeId(), getId(), name, value, this.map.getCodec().getMapValueEncoder()));
} catch (IOException e) { } catch (IOException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
Expand Down
Expand Up @@ -36,6 +36,7 @@
import org.redisson.api.RTopic; import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener; import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.codec.CompositeCodec; import org.redisson.codec.CompositeCodec;
import org.redisson.config.Config; import org.redisson.config.Config;
Expand Down Expand Up @@ -214,6 +215,16 @@ protected void startInternal() throws LifecycleException {
applicationClassLoader = getClass().getClassLoader(); applicationClassLoader = getClass().getClassLoader();
} }


Codec codec = redisson.getConfig().getCodec();
Codec codecToUse;
try {
codecToUse = codec.getClass()
.getConstructor(ClassLoader.class, codec.getClass())
.newInstance(applicationClassLoader, codec);
} catch (Exception e) {
throw new LifecycleException(e);
}

if (updateMode == UpdateMode.AFTER_REQUEST) { if (updateMode == UpdateMode.AFTER_REQUEST) {
getEngine().getPipeline().addValve(new UpdateValve(this)); getEngine().getPipeline().addValve(new UpdateValve(this));
} }
Expand All @@ -238,14 +249,14 @@ public void onMessage(CharSequence channel, AttributeMessage msg) {


if (msg instanceof AttributesPutAllMessage) { if (msg instanceof AttributesPutAllMessage) {
AttributesPutAllMessage m = (AttributesPutAllMessage) msg; AttributesPutAllMessage m = (AttributesPutAllMessage) msg;
for (Entry<String, Object> entry : m.getAttrs(applicationClassLoader).entrySet()) { for (Entry<String, Object> entry : m.getAttrs(codecToUse.getMapValueDecoder()).entrySet()) {
session.superSetAttribute(entry.getKey(), entry.getValue(), true); session.superSetAttribute(entry.getKey(), entry.getValue(), true);
} }
} }


if (msg instanceof AttributeUpdateMessage) { if (msg instanceof AttributeUpdateMessage) {
AttributeUpdateMessage m = (AttributeUpdateMessage)msg; AttributeUpdateMessage m = (AttributeUpdateMessage)msg;
session.superSetAttribute(m.getName(), m.getValue(applicationClassLoader), true); session.superSetAttribute(m.getName(), m.getValue(codecToUse.getMapValueDecoder()), true);
} }
} }
} catch (Exception e) { } catch (Exception e) {
Expand Down

0 comments on commit ee51283

Please sign in to comment.