Commit: improve terasort performance

Lv, Qi committed Apr 23, 2015
1 parent 7d08778, commit 0e11415
Showing 3 changed files with 43 additions and 20 deletions.
8 changes: 7 additions & 1 deletion bin/build-all.sh
@@ -31,6 +31,12 @@ cp target/*-jar-with-dependencies.jar jars && \
 mvn clean package -D MR1 -D spark1.2 && \
 cp jars/*.jar target/ && \
 rm -rf jars
+
+result=$?
 cd $CURDIR
 
-echo "Build all done!"
+if [ $result -ne 0 ]; then
+    echo "Build failed, please check!"
+else
+    echo "Build all done!"
+fi
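
The script now captures the exit status of the whole && build chain in result immediately, before cd $CURDIR can clobber $?, and branches on it instead of always printing success. Purely as an illustration of that capture-then-branch pattern, here is a minimal Scala sketch using scala.sys.process; the object name is hypothetical and it assumes mvn is on the PATH:

import scala.sys.process._

// Hypothetical illustration of the capture-then-branch pattern in build-all.sh.
object BuildCheck {
  def main(args: Array[String]): Unit = {
    // Run the build and grab its exit code at once, the analogue of result=$?.
    val result = Seq("mvn", "clean", "package").!  // '!' blocks and returns the exit code
    if (result != 0) println("Build failed, please check!")
    else println("Build all done!")
  }
}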
JavaTeraSort.java
@@ -17,16 +17,20 @@
 
 package com.intel.sparkbench.terasort;
 
+import com.google.common.collect.Ordering;
 import com.intel.sparkbench.IOCommon;
 import org.apache.hadoop.examples.terasort.TeraInputFormat;
 import org.apache.hadoop.examples.terasort.TeraOutputFormat;
 import org.apache.hadoop.io.Text;
+import org.apache.spark.SerializableWritable;
 import scala.Tuple2;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.PairFunction;
 
+import java.io.Serializable;
+import java.util.Comparator;
 import java.util.regex.Pattern;
 
 public final class JavaTeraSort {
@@ -44,19 +48,37 @@ public static void main(String[] args) throws Exception {
         JavaPairRDD<Text, Text> lines = ctx.newAPIHadoopFile(args[0], TeraInputFormat.class, Text.class, Text.class, ctx.hadoopConfiguration());
         Integer parallel = sparkConf.getInt("spark.default.parallelism", ctx.defaultParallelism());
         Integer reducer = Integer.parseInt(IOCommon.getProperty("hibench.default.shuffle.parallelism").get());
-        JavaPairRDD<String, String> words = lines.mapToPair(new PairFunction<Tuple2<Text, Text>, String, String>() {
+        JavaPairRDD<byte[], byte[]> words = lines.mapToPair(new PairFunction<Tuple2<Text, Text>, byte[], byte[]>() {
             @Override
-            public Tuple2<String, String> call(Tuple2<Text, Text> e) throws Exception {
-                return new Tuple2<String, String>(e._1().toString(), e._2().toString());
+            public Tuple2<byte[], byte[]> call(Tuple2<Text, Text> e) throws Exception {
+                return new Tuple2<byte[], byte[]>(e._1().getBytes(), e._2().getBytes());
             }
         });
 
+        abstract class InlineComparator implements Comparator<byte[]>, Serializable {
+        }
+
-        JavaPairRDD<String, String> sorted = words.sortByKey(true, reducer);
-
-        JavaPairRDD<Text, Text> result = sorted.mapToPair(new PairFunction<Tuple2<String, String>, Text, Text>() {
+        JavaPairRDD<byte[], byte[]> sorted = words.sortByKey(new InlineComparator() {
+            @Override
+            public int compare(byte[] o1, byte[] o2) {
+                int i;
+                if (o1 == o2) return 0;
+                if (o1 == null) return -1;
+                if (o2 == null) return 1;
+                for (i = 0; i < (o1.length < o2.length ? o1.length : o2.length); i++) {
+                    if (o1[i] < o2[i]) return -1;
+                    else if (o1[i] > o2[i]) return 1;
+                }
+                return o1.length - o2.length;
+            }
+        }, true, reducer);
+
+        JavaPairRDD<Text, Text> result = sorted.mapToPair(new PairFunction<Tuple2<byte[], byte[]>, Text, Text>() {
             @Override
-            public Tuple2<Text, Text> call(Tuple2<String, String> e) throws Exception {
-                return new Tuple2<Text, Text>(new Text(e._1()), new Text(e._2()));
+            public Tuple2<Text, Text> call(Tuple2<byte[], byte[]> e) throws Exception {
+                return new Tuple2<Text, Text>(new Text(e._1()),
+                        new Text(e._2()));
             }
         });
         result.saveAsNewAPIHadoopFile(args[1], Text.class, Text.class, TeraOutputFormat.class);
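The substance of the Java change: keys and values are kept as raw byte[] instead of being decoded into Strings, so the shuffle sort compares bytes directly through the anonymous InlineComparator, presumably avoiding the Text-to-String decode and String comparison overhead that the commit title targets. As a quick sanity check of that comparison rule, a minimal, self-contained Scala sketch (object name illustrative; note that JVM bytes are signed, so under this rule bytes 0x80-0xFF order before 0x00-0x7F, unlike an unsigned memcmp):

// Element-wise comparison with length as the tiebreaker, mirroring InlineComparator above.
object ByteKeyCompare {
  def compare(o1: Array[Byte], o2: Array[Byte]): Int = {
    if (o1 eq o2) return 0
    if (o1 == null) return -1
    if (o2 == null) return 1
    val n = math.min(o1.length, o2.length)
    var i = 0
    while (i < n) {
      if (o1(i) < o2(i)) return -1       // signed byte comparison, as in the Java code
      else if (o1(i) > o2(i)) return 1
      i += 1
    }
    o1.length - o2.length                // on a common prefix, the shorter key sorts first
  }

  def main(args: Array[String]): Unit = {
    println(compare("abc".getBytes, "abd".getBytes))  // -1: differs at the third byte
    println(compare("ab".getBytes, "abc".getBytes))   // -1: common prefix, shorter first
    println(compare("abc".getBytes, "abc".getBytes))  // 0: equal contents and length
  }
}
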
ScalaTeraSort.scala
@@ -30,6 +30,7 @@ object ScalaTeraSort {
   implicit def rddToSampledOrderedRDDFunctions[K: Ordering : ClassTag, V: ClassTag]
       (rdd: RDD[(K, V)]) = new ConfigurableOrderedRDDFunctions[K, V, (K, V)](rdd)
 
+  implicit def ArrayByteOrdering: Ordering[Array[Byte]] = Ordering.fromLessThan { case (a, b) => a.compareTo(b) < 0 }
   def main(args: Array[String]) {
     if (args.length != 2) {
       System.err.println(
@@ -42,25 +43,19 @@ object ScalaTeraSort {
     val io = new IOCommon(sc)
 
     //val file = io.load[String](args(0), Some("Text"))
-    val file = sc.newAPIHadoopFile[Text, Text, TeraInputFormat](args(0))
+    val data = sc.newAPIHadoopFile[Text, Text, TeraInputFormat](args(0)).map { case (k, v) => (k.getBytes, v.getBytes) }
     val parallel = sc.getConf.getInt("spark.default.parallelism", sc.defaultParallelism)
     val reducer = IOCommon.getProperty("hibench.default.shuffle.parallelism")
-      .getOrElse((parallel / 2).toString).toInt
-    val data = file.map { item =>
-      (item._1.toString, item._2.toString)
-      // (line.substring(0, 10), line.substring(10))
-    }
+      .getOrElse((parallel / 2).toString).toInt
 
     val partitioner = new BaseRangePartitioner(partitions = reducer, rdd = data)
-    val sorted_data = data.sortByKeyWithPartitioner(partitioner = partitioner)
-    //.map{case (k, v) => k + v}
-
-    val seq = sorted_data
-      .map { case (k, v) => (new Text(k), new Text(v)) }
+    val ordered_data = new ConfigurableOrderedRDDFunctions[Array[Byte], Array[Byte], (Array[Byte], Array[Byte])](data)
+    val sorted_data = ordered_data.sortByKeyWithPartitioner(partitioner = partitioner).map { case (k, v) => (new Text(k), new Text(v)) }
 
-    seq.saveAsNewAPIHadoopFile[TeraOutputFormat](args(1))
+    sorted_data.saveAsNewAPIHadoopFile[TeraOutputFormat](args(1))
     //io.save(args(1), sorted_data)
 
 
     sc.stop()
   }
 }
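
In the Scala version the same byte-level sort is driven by the new implicit ArrayByteOrdering, routed through ConfigurableOrderedRDDFunctions and BaseRangePartitioner. As a rough, self-contained sketch of how an Ordering[Array[Byte]] feeds an ordinary sortByKey (names, the local master, and the toy data are all illustrative, and the ordering is written out element-wise rather than via compareTo):

import org.apache.spark.{SparkConf, SparkContext}

object ByteKeySortSketch {
  // Explicit element-wise ordering over byte arrays; length breaks ties on a common prefix.
  implicit val arrayByteOrdering: Ordering[Array[Byte]] = new Ordering[Array[Byte]] {
    def compare(a: Array[Byte], b: Array[Byte]): Int = {
      val n = math.min(a.length, b.length)
      var i = 0
      while (i < n) {
        if (a(i) != b(i)) return a(i) - b(i)  // signed byte difference, widened to Int
        i += 1
      }
      a.length - b.length
    }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ByteKeySortSketch").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq("banana", "apple", "cherry")).map(s => (s.getBytes, s))
    // sortByKey resolves the implicit Ordering[Array[Byte]] from the enclosing scope.
    pairs.sortByKey().collect().foreach { case (_, v) => println(v) }  // apple, banana, cherry
    sc.stop()
  }
}

Sorting on raw bytes skips per-record String decoding and keeps each comparison a cheap array scan, which is presumably the performance win the commit title refers to.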
