Skip to content

Commit

Permalink
Fix for java gzip compression
Browse files Browse the repository at this point in the history
Gzip compression reads bytes ahead. That was causing serialization
bugs when more data available after a object serialized with
compression enabled.

As fix, we are rewinding the stream to correct position so that
next data in the stream can be read correctly.
Thanks @jerrinot for the fix.

fixes hazelcast/hazelcast-enterprise#1750
fixes hazelcast#12104
  • Loading branch information
sancar committed Jan 9, 2018
1 parent 173734d commit 951ad92
Show file tree
Hide file tree
Showing 2 changed files with 222 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Date;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.Inflater;

import static com.hazelcast.internal.serialization.impl.SerializationConstants.JAVA_DEFAULT_TYPE_BIG_DECIMAL;
import static com.hazelcast.internal.serialization.impl.SerializationConstants.JAVA_DEFAULT_TYPE_BIG_INTEGER;
Expand All @@ -43,6 +44,7 @@
import static com.hazelcast.internal.serialization.impl.SerializationConstants.JAVA_DEFAULT_TYPE_EXTERNALIZABLE;
import static com.hazelcast.internal.serialization.impl.SerializationConstants.JAVA_DEFAULT_TYPE_SERIALIZABLE;
import static com.hazelcast.nio.IOUtil.newObjectInputStream;
import static java.lang.Math.max;


public final class JavaDefaultSerializers {
Expand All @@ -67,12 +69,17 @@ public Externalizable read(final ObjectDataInput in) throws IOException {
final Externalizable ds = ClassLoaderUtil.newInstance(in.getClassLoader(), className);
final ObjectInputStream objectInputStream;
final InputStream inputStream = (InputStream) in;
ExtendedGZipInputStream gZipInputStream = null;
if (gzipEnabled) {
objectInputStream = newObjectInputStream(in.getClassLoader(), new GZIPInputStream(inputStream));
gZipInputStream = new ExtendedGZipInputStream(inputStream);
objectInputStream = newObjectInputStream(in.getClassLoader(), gZipInputStream);
} else {
objectInputStream = newObjectInputStream(in.getClassLoader(), inputStream);
}
ds.readExternal(objectInputStream);
if (gzipEnabled) {
rewindStream(in, gZipInputStream);
}
return ds;
} catch (final Exception e) {
throw new HazelcastSerializationException("Problem while reading Externalizable class: "
Expand Down Expand Up @@ -190,6 +197,7 @@ public void write(final ObjectDataOutput out, final Class obj) throws IOExceptio

public static final class JavaSerializer extends SingletonSerializer<Object> {


private final boolean shared;
private final boolean gzipEnabled;

Expand All @@ -207,8 +215,10 @@ public int getTypeId() {
public Object read(final ObjectDataInput in) throws IOException {
final ObjectInputStream objectInputStream;
final InputStream inputStream = (InputStream) in;
ExtendedGZipInputStream gZipInputStream = null;
if (gzipEnabled) {
objectInputStream = newObjectInputStream(in.getClassLoader(), new GZIPInputStream(inputStream));
gZipInputStream = new ExtendedGZipInputStream(inputStream);
objectInputStream = newObjectInputStream(in.getClassLoader(), gZipInputStream);
} else {
objectInputStream = newObjectInputStream(in.getClassLoader(), inputStream);
}
Expand All @@ -220,6 +230,9 @@ public Object read(final ObjectDataInput in) throws IOException {
} else {
result = objectInputStream.readUnshared();
}
if (gzipEnabled) {
rewindStream(in, gZipInputStream);
}
} catch (ClassNotFoundException e) {
throw new HazelcastSerializationException(e);
}
Expand Down Expand Up @@ -290,4 +303,36 @@ public void destroy() {
private JavaDefaultSerializers() {
}

/**
* Utility class to access internal inflater of GZIPInputStream
*/
private static class ExtendedGZipInputStream extends GZIPInputStream {

static final int GZIP_TRAILER_SIZE = 8;

ExtendedGZipInputStream(InputStream in) throws IOException {
super(in);
}

Inflater getInflater() {
return inf;
}
}

/**
* Gzip input stream reads more bytes in the stream than it actually needs.
* This method adjust the position so that, next data in the stream can be read correctly
*/
private static void rewindStream(ObjectDataInput in, ExtendedGZipInputStream gZipInputStream) {
Inflater inflater = gZipInputStream.getInflater();
int remaining = inflater.getRemaining();
if (in instanceof ByteArrayObjectDataInput) {
ByteArrayObjectDataInput baodi = (ByteArrayObjectDataInput) in;
int position = baodi.position();
int rewindBack = max(0, remaining - ExtendedGZipInputStream.GZIP_TRAILER_SIZE);
int newPosition = position - rewindBack;
baodi.position(newPosition);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Copyright (c) 2008-2017, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.internal.serialization.impl;

import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.serialization.SerializationService;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.util.ArrayList;

import static org.junit.Assert.assertEquals;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class})
public class CompressionTest {

public static class SampleSerializable implements Serializable {

private int x;

public SampleSerializable() {
}

public SampleSerializable(int x) {
this.x = x;
}

public int getX() {
return x;
}

public void setX(int x) {
this.x = x;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

SampleSerializable that = (SampleSerializable) o;

return x == that.x;
}

@Override
public int hashCode() {
return x;
}
}

public static class SampleExternalizable implements Externalizable {

private int x;

public SampleExternalizable() {
}

public SampleExternalizable(int x) {
this.x = x;
}

public int getX() {
return x;
}

public void setX(int x) {
this.x = x;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

SampleExternalizable that = (SampleExternalizable) o;

return x == that.x;
}

@Override
public int hashCode() {
return x;
}

@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeInt(x);
}

@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
x = in.readInt();
}
}


@Test
public void testCompression_serializable() {
DefaultSerializationServiceBuilder defaultSerializationServiceBuilder = new DefaultSerializationServiceBuilder();
SerializationService ss = defaultSerializationServiceBuilder.setEnableCompression(true).build();

SampleSerializable expected = new SampleSerializable(5);
Data data = ss.toData(expected);
SampleSerializable result = ss.toObject(data);

assertEquals(expected, result);
}

@Test
public void testCompression_serializable_withArrayList() {
DefaultSerializationServiceBuilder defaultSerializationServiceBuilder = new DefaultSerializationServiceBuilder();
SerializationService ss = defaultSerializationServiceBuilder.setEnableCompression(true).build();

ArrayList<SampleSerializable> expected = new ArrayList<SampleSerializable>();
for (int i = 0; i < 10; i++) {
expected.add(new SampleSerializable(i));
}
Data data = ss.toData(expected);
ArrayList<SampleSerializable> result = ss.toObject(data);

assertEquals(expected, result);
}

@Test
public void testCompression_externalizable() {
DefaultSerializationServiceBuilder defaultSerializationServiceBuilder = new DefaultSerializationServiceBuilder();
SerializationService ss = defaultSerializationServiceBuilder.setEnableCompression(true).build();

SampleExternalizable expected = new SampleExternalizable(5);
Data data = ss.toData(expected);
SampleExternalizable result = ss.toObject(data);

assertEquals(expected, result);
}

@Test
public void testCompression_externalizable_withArrayList() {
DefaultSerializationServiceBuilder defaultSerializationServiceBuilder = new DefaultSerializationServiceBuilder();
SerializationService ss = defaultSerializationServiceBuilder.setEnableCompression(true).build();

ArrayList<SampleExternalizable> expected = new ArrayList<SampleExternalizable>();
for (int i = 0; i < 10; i++) {
expected.add(new SampleExternalizable(i));
}
Data data = ss.toData(expected);
ArrayList<SampleExternalizable> result = ss.toObject(data);

assertEquals(expected, result);
}

}

0 comments on commit 951ad92

Please sign in to comment.