
siddv29/cfs

cfs - Cassandra Fast Full Table Scan

You can use this utility to scan an entire table in a Cassandra database very quickly.

All you need to provide is:

  • Table identifier (keyspace.table)
  • Cluster node IP
  • Queue (LinkedBlockingQueue<Row>) into which the scanned rows are delivered
  • PrintStream for log output
  • Configurable options (default values in parentheses)
    • Username (none)
    • Password (none)
    • Consistency level (LOCAL_ONE)
    • Number of threads (16)
    • DC-aware option (all nodes considered)
    • Column names to dump (*, i.e. all columns)
    • Fetch size per page (5000)

How fast are we talking about?

Rows           Threads   Time: Total ((Setup) + Effective)   Cluster
~229 million   128       134 ((18) + 116)                    1 DC, 6 nodes in the DC, RF 3
~765 million   128       487 ((16) + 471)                    3 DCs, 3 nodes in the concerned DC, RF 1

The time column is TotalTime ((SetupTime) + EffectiveTime), where:
TotalTime = total time taken by the script to dump the data.
SetupTime = time taken to instantiate the cluster, create sessions, etc.
Thus, the EffectiveTime to dump the data is TotalTime - SetupTime.
All times are in seconds.
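To put these figures in perspective, the effective throughput is the row count divided by EffectiveTime. The small Java sketch below just does that arithmetic on the table's numbers; the class and method names are illustrative, and because the row counts are approximate ("~"), so are the resulting rates.

```java
public class Throughput {
    // EffectiveTime = TotalTime - SetupTime, all in seconds (as defined above).
    static long effectiveSeconds(long totalSeconds, long setupSeconds) {
        return totalSeconds - setupSeconds;
    }

    // Average rows dumped per second during the effective (post-setup) window.
    static double rowsPerSecond(long rows, long effectiveSeconds) {
        return (double) rows / effectiveSeconds;
    }

    public static void main(String[] args) {
        long e1 = effectiveSeconds(134, 18); // 116
        long e2 = effectiveSeconds(487, 16); // 471
        // Row counts are the approximate figures from the table.
        System.out.printf("~229M rows: %d s effective, ~%.2f million rows/s%n",
                e1, rowsPerSecond(229_000_000L, e1) / 1e6);
        System.out.printf("~765M rows: %d s effective, ~%.2f million rows/s%n",
                e2, rowsPerSecond(765_000_000L, e2) / 1e6);
    }
}
```

That works out to roughly 1.97 million rows per second for the first run and 1.62 million rows per second for the second.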

How to use CFS?

Download

You can download the jar from here.
Alternatively, download the source and run mvn clean install to build cfs-1.0-SNAPSHOT-full.jar.

Sample Program

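package com.cassandra.utility.trial;

import com.cassandra.utility.method1.CassandraFastFullTableScan;
import com.cassandra.utility.method1.Options;
import com.cassandra.utility.method1.RowTerminal;
import com.datastax.driver.core.Row;
import java.io.File;
import java.io.PrintStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {
    public static void main(String... args) throws Exception {
        // CFS pushes every scanned row into this queue.
        LinkedBlockingQueue<Row> queue = new LinkedBlockingQueue<>();

        // Arguments: "keyspace.table", a cluster node IP, the output queue,
        // options (here only credentials) and a stream for the CFS log.
        CassandraFastFullTableScan cfs =
                new CassandraFastFullTableScan("mykeyspace.table_name",
                        "10.41.55.111", queue,
                        new Options().setUsername("cassandra").setPassword("cassandra"),
                        new PrintStream(new File("/tmp/cfs_round_1.log")));

        // start() launches the scan and returns a latch that is released when it finishes.
        CountDownLatch countDownLatch = cfs.start();

        new NotifyWhenCFSFinished(countDownLatch).start();

        Row row;
        long counter = 0; // long, in case the table holds more than Integer.MAX_VALUE rows
        // take() blocks until a row is available; a RowTerminal marks the end of the scan.
        while (!((row = queue.take()) instanceof RowTerminal)) {
            System.out.println(++counter + ":" + row);
            /*
              you can use row.getString("column1") and so on
            */
        }
    }

    // Prints a message once the latch returned by cfs.start() is released.
    static class NotifyWhenCFSFinished extends Thread {
        private final CountDownLatch latch;

        public NotifyWhenCFSFinished(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {
            System.out.println("Waiting for CFS to complete");
            try {
                latch.await();
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of silently ignoring it,
                // and do not report completion that has not happened.
                Thread.currentThread().interrupt();
                return;
            }
            System.out.println("CFS completed");
        }
    }
}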
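The sample relies on a producer/consumer handshake: rows arrive in the queue, a RowTerminal marks the end, and the latch returned by start() signals completion. The self-contained sketch below shows one way such a handshake can be built with only the JDK. It is not CFS's code: rows are plain Strings, the sentinel END is a hypothetical stand-in for RowTerminal, and the worker threads stand in for whatever CFS does internally to fetch rows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class PoisonPillDemo {
    // Hypothetical sentinel playing the role of RowTerminal; compared by identity.
    static final String END = new String("END");

    static List<String> run(int workers, int rowsPerWorker) throws InterruptedException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        CountDownLatch latch = new CountDownLatch(workers);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            final int id = w;
            pool.submit(() -> {
                try {
                    for (int r = 0; r < rowsPerWorker; r++) {
                        queue.put("worker" + id + "-row" + r); // stand-in for one fetched row
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown(); // this worker is finished
                }
            });
        }
        // Once every worker has counted down, enqueue the sentinel exactly once.
        Thread closer = new Thread(() -> {
            try {
                latch.await();
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        closer.start();

        // Consumer: same loop shape as the sample, stopping at the sentinel.
        List<String> consumed = new ArrayList<>();
        String row;
        while ((row = queue.take()) != END) {
            consumed.add(row);
        }
        pool.shutdown();
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(4, 1000).size());
    }
}
```

Because each worker counts down only after its last put, and the sentinel is enqueued only after the latch opens, the sentinel is always the last element the consumer sees.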

Explanation

Traditional Cassandra Scan

CFS
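A common way to speed up a full table scan, and the direction the "finer-grained token range" item under Upcoming features points to, is to split the partitioner's token ring into sub-ranges and scan each one with its own query in parallel, instead of paging through a single SELECT * via one coordinator. The sketch below is not CFS's implementation. It assumes the table uses Murmur3Partitioner (tokens are signed 64-bit values) and a hypothetical partition key column named id, and it only builds the CQL strings, leaving execution to a driver and a thread pool.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public class TokenRangeSplitter {
    // Assumption: Murmur3Partitioner, whose tokens span the signed 64-bit range.
    static final BigInteger MIN = BigInteger.valueOf(Long.MIN_VALUE);
    static final BigInteger MAX = BigInteger.valueOf(Long.MAX_VALUE);

    // Returns n+1 evenly spaced boundaries from Long.MIN_VALUE to Long.MAX_VALUE.
    // BigInteger avoids overflow, since MAX - MIN does not fit in a long.
    static long[] split(int n) {
        if (n <= 0) throw new IllegalArgumentException("n must be positive");
        BigInteger span = MAX.subtract(MIN);
        long[] bounds = new long[n + 1];
        for (int i = 0; i <= n; i++) {
            bounds[i] = MIN.add(span.multiply(BigInteger.valueOf(i))
                    .divide(BigInteger.valueOf(n))).longValueExact();
        }
        return bounds;
    }

    // One CQL query per sub-range. The first range is closed at the bottom so
    // that, together, the ranges cover the whole ring without overlapping.
    static List<String> queries(String table, String partitionKey, int n) {
        long[] b = split(n);
        List<String> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            String lower = (i == 0) ? ">=" : ">";
            out.add(String.format("SELECT * FROM %s WHERE token(%s) %s %d AND token(%s) <= %d",
                    table, partitionKey, lower, b[i], partitionKey, b[i + 1]));
        }
        return out;
    }

    public static void main(String[] args) {
        // "id" is a hypothetical partition key column.
        queries("mykeyspace.table_name", "id", 4).forEach(System.out::println);
    }
}
```

Each generated query could then be handed to its own worker thread (compare the "Number of threads" option), and splitting into more sub-ranges than threads is one way to get the finer-grained parallelism mentioned below.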

Upcoming features

  • Finer-grained token ranges for more parallelism, if needed.
  • A custom load-balancing policy for a better choice of coordinator (a whitelist alone does not help).
  • And a few more.
To work on these features or request new ones, see the Contributing section.

Contributing

If this interests you, please go through the code.
If you would like to extend it, drop a mail at sidd.verma29.list@gmail.com
This was put together quickly. I would love to collaborate on it to add functionality, and a lot more.
P.S. I strongly believe that, with a right-sized cluster and a few tweaks, the effective dump time for hundreds of millions of rows can be brought under 10 seconds.