Skip to content

Commit

Permalink
Streams Compression
Browse files Browse the repository at this point in the history
patch by Pavel Yaskevich; reviewed by Jonathan Ellis for CASSANDRA-3015

git-svn-id: https://svn.apache.org/repos/asf/cassandra/trunk@1164689 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
xedin committed Sep 2, 2011
1 parent 19b8a60 commit 8eb0a93
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Expand Up @@ -58,6 +58,7 @@
* make the repair of a range repair all replica (CASSANDRA-2610)
* expose the ability to repair the first range (as returned by the
partitioner) of a node (CASSANDRA-2606)
* Streams Compression (CASSANDRA-3015)

0.8.5
* fix NPE when encryption_options is unspecified (CASSANDRA-3007)
Expand Down
Binary file added lib/compress-lzf-0.8.4.jar
Binary file not shown.
11 changes: 11 additions & 0 deletions lib/licenses/compress-lzf-0.8.4.txt
@@ -0,0 +1,11 @@
Copyright 2009-2010 Ning, Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
the License.
3 changes: 2 additions & 1 deletion src/java/org/apache/cassandra/net/IncomingTcpConnection.java
Expand Up @@ -22,6 +22,7 @@


import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;

import org.apache.cassandra.gms.Gossiper;
Expand Down Expand Up @@ -78,7 +79,7 @@ public void run()
else
{
// streaming connections are per-session and have a fixed version. we can't do anything with a new-version stream connection, so drop it.
logger.error("Received untranslated stream from newer protcol version. Terminating connection!");
logger.error("Received untranslated stream from newer protocol version. Terminating connection!");
}
// We are done with this connection....
return;
Expand Down
15 changes: 10 additions & 5 deletions src/java/org/apache/cassandra/streaming/FileStreamTask.java
Expand Up @@ -18,9 +18,9 @@

package org.apache.cassandra.streaming;

import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
Expand All @@ -40,6 +40,8 @@
import org.apache.cassandra.utils.Throttle;
import org.apache.cassandra.utils.WrappedRunnable;

import com.ning.compress.lzf.LZFOutputStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -58,7 +60,7 @@ public class FileStreamTask extends WrappedRunnable
// communication socket
private Socket socket;
// socket's output stream
private DataOutputStream output;
private OutputStream output;
// system encryption options if any
private final EncryptionOptions encryptionOptions;
// allocate buffer to use for transfers only once
Expand Down Expand Up @@ -119,7 +121,7 @@ public void runMayThrow() throws IOException
private void stream() throws IOException
{
ByteBuffer HeaderBuffer = MessagingService.instance().constructStreamHeader(header, false, Gossiper.instance.getVersion(to));
// write header
// write header (this should not be compressed for compatibility with other messages)
output.write(ByteBufferUtil.getArray(HeaderBuffer));

if (header.file == null)
Expand All @@ -129,6 +131,9 @@ private void stream() throws IOException
? CompressedRandomAccessReader.open(header.file.getFilename(), true)
: RandomAccessReader.open(new File(header.file.getFilename()), CHUNK_SIZE, true);

// setting up data compression stream
output = new LZFOutputStream(output);

try
{
// stream each of the required sections of the file
Expand Down Expand Up @@ -234,12 +239,12 @@ protected void bind() throws IOException
protected void connect() throws IOException
{
socket.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort()));
output = new DataOutputStream(socket.getOutputStream());
output = socket.getOutputStream();
}

protected void close() throws IOException
{
socket.close();
output.close();
}

public String toString()
Expand Down
Expand Up @@ -23,9 +23,6 @@
import java.net.Socket;
import java.util.Collections;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
Expand All @@ -41,6 +38,11 @@
import org.apache.cassandra.utils.BytesReadTracker;
import org.apache.cassandra.utils.Pair;

import com.ning.compress.lzf.LZFInputStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IncomingStreamReader
{
private static final Logger logger = LoggerFactory.getLogger(IncomingStreamReader.class);
Expand Down Expand Up @@ -79,7 +81,7 @@ public void read() throws IOException
assert remoteFile.estimatedKeys > 0;
SSTableReader reader = null;
logger.debug("Estimated keys {}", remoteFile.estimatedKeys);
DataInputStream dis = new DataInputStream(socket.getInputStream());
DataInputStream dis = new DataInputStream(new LZFInputStream(socket.getInputStream()));
try
{
reader = streamIn(dis, localFile, remoteFile);
Expand Down

0 comments on commit 8eb0a93

Please sign in to comment.