diff --git a/CHANGES.txt b/CHANGES.txt index d61609c9..7c12a5fd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -58,6 +58,7 @@ * make the repair of a range repair all replica (CASSANDRA-2610) * expose the ability to repair the first range (as returned by the partitioner) of a node (CASSANDRA-2606) + * Streams Compression (CASSANDRA-3015) 0.8.5 * fix NPE when encryption_options is unspecified (CASSANDRA-3007) diff --git a/lib/compress-lzf-0.8.4.jar b/lib/compress-lzf-0.8.4.jar new file mode 100644 index 00000000..a712c248 Binary files /dev/null and b/lib/compress-lzf-0.8.4.jar differ diff --git a/lib/licenses/compress-lzf-0.8.4.txt b/lib/licenses/compress-lzf-0.8.4.txt new file mode 100755 index 00000000..1e22f87c --- /dev/null +++ b/lib/licenses/compress-lzf-0.8.4.txt @@ -0,0 +1,11 @@ +Copyright 2009-2010 Ning, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); you may not +use this file except in compliance with the License. You may obtain a copy of +the License at http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS,WITHOUT +WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +License for the specific language governing permissions and limitations under +the License. \ No newline at end of file diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java index d4c1244a..86ffbe14 100644 --- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java @@ -22,6 +22,7 @@ import java.io.*; +import java.net.InetSocketAddress; import java.net.Socket; import org.apache.cassandra.gms.Gossiper; @@ -78,7 +79,7 @@ public void run() else { // streaming connections are per-session and have a fixed version. we can't do anything with a new-version stream connection, so drop it. - logger.error("Received untranslated stream from newer protcol version. Terminating connection!"); + logger.error("Received untranslated stream from newer protocol version. Terminating connection!"); } // We are done with this connection.... return; diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java index a6611815..6e6e63d5 100644 --- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java +++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java @@ -18,9 +18,9 @@ package org.apache.cassandra.streaming; -import java.io.DataOutputStream; import java.io.File; import java.io.IOException; +import java.io.OutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; @@ -40,6 +40,8 @@ import org.apache.cassandra.utils.Throttle; import org.apache.cassandra.utils.WrappedRunnable; +import com.ning.compress.lzf.LZFOutputStream; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +60,7 @@ public class FileStreamTask extends WrappedRunnable // communication socket private Socket socket; // socket's output stream - private DataOutputStream output; + private OutputStream output; // system encryption options if any private final EncryptionOptions encryptionOptions; // allocate buffer to use for transfers only once @@ -119,7 +121,7 @@ public void runMayThrow() throws IOException private void stream() throws IOException { ByteBuffer HeaderBuffer = MessagingService.instance().constructStreamHeader(header, false, Gossiper.instance.getVersion(to)); - // write header + // write header (this should not be compressed for compatibility with other messages) output.write(ByteBufferUtil.getArray(HeaderBuffer)); if (header.file == null) @@ -129,6 +131,9 @@ private void stream() throws IOException ? CompressedRandomAccessReader.open(header.file.getFilename(), true) : RandomAccessReader.open(new File(header.file.getFilename()), CHUNK_SIZE, true); + // setting up data compression stream + output = new LZFOutputStream(output); + try { // stream each of the required sections of the file @@ -234,12 +239,12 @@ protected void bind() throws IOException protected void connect() throws IOException { socket.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort())); - output = new DataOutputStream(socket.getOutputStream()); + output = socket.getOutputStream(); } protected void close() throws IOException { - socket.close(); + output.close(); } public String toString() diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java index 2688792b..a598587e 100644 --- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java @@ -23,9 +23,6 @@ import java.net.Socket; import java.util.Collections; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.ColumnFamilyStore; @@ -41,6 +38,11 @@ import org.apache.cassandra.utils.BytesReadTracker; import org.apache.cassandra.utils.Pair; +import com.ning.compress.lzf.LZFInputStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class IncomingStreamReader { private static final Logger logger = LoggerFactory.getLogger(IncomingStreamReader.class); @@ -79,7 +81,7 @@ public void read() throws IOException assert remoteFile.estimatedKeys > 0; SSTableReader reader = null; logger.debug("Estimated keys {}", remoteFile.estimatedKeys); - DataInputStream dis = new DataInputStream(socket.getInputStream()); + DataInputStream dis = new DataInputStream(new LZFInputStream(socket.getInputStream())); try { reader = streamIn(dis, localFile, remoteFile);