# Benchmark

The benchmark used a 1,000-node Accumulo 2.0.0 cluster (16,000 cores) and a 256-node Spark 2.4.3 cluster (4,096 cores). All nodes were [Azure D16s_v3](https://docs.microsoft.com/en-us/azure/virtual-machines/windows/sizes-general) virtual machines with 16 cores each.

All experiments use the same base dataset, a collection of tweets labeled with a sentiment value. It is known as the Sentiment140 dataset ([Go, Bhayani, & Huang, 2009](http://www-nlp.stanford.edu/courses/cs224n/2009/fp/3.pdf)). The training data consists of 1.6M tweets, each with columns for the sentiment label, tweet id, timestamp, query term, user, and text. The text is limited to 140 characters, and the uncompressed training dataset is 227 MB.

| sentiment | id | date | query_string | user | text |
| --- | --- | --- | --- | --- | --- |
|0|1467810369|Mon Apr 06 22:19:...|    NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|0|1467810672|Mon Apr 06 22:19:...|    NO_QUERY|  scotthamilton|is upset that he ...|
|0|1467810917|Mon Apr 06 22:19:...|    NO_QUERY|       mattycus|@Kenichan I dived...|

To evaluate different table sizes and the impact of splitting, the following procedure was used to generate the Accumulo tables:

- Prefix id with split keys (e.g. 0000, 0001, ..., 1023)
- Create the Accumulo table and configure its splits
- Upload the prefixed data to Accumulo using Spark and the MASC writer
- Duplicate the data using a custom Accumulo server-side iterator
- Validate data partitioning
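
The prefixing step can be sketched as a small pure function. This is a minimal illustration that mirrors the arithmetic used later in the `prepareData` cell; the name `rowPrefix` is hypothetical and not part of the benchmark code.

```scala
// Hypothetical helper (not part of the benchmark code): computes the
// zero-padded split prefix for the row at position `idx`.
def rowPrefix(idx: Int, totalRows: Int, splits: Int): String = {
  // Rows per split, rounded up so every row lands in one of `splits` buckets
  val divider = math.ceil(totalRows / splits.toDouble).toInt
  // Pad to the number of decimal digits in `splits` so that prefixes
  // sort lexicographically in the same order as numerically
  val digits = (math.log10(splits) + 1).toInt
  s"%0${digits}d-".format(idx / divider)
}
```

With 1,600,000 rows and 1,024 splits, the first row gets the prefix `0000-` and the last row gets `1023-`.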

A common machine learning scenario was evaluated using a sentiment model trained with [SparkML](https://spark.apache.org/docs/latest/ml-guide.html).
To train the classification model, we generated feature vectors from the tweet text (the `text` column). We used a feature engineering pipeline (a.k.a. featurizer) that breaks the text into tokens, splitting on whitespace and discarding capitalization and non-alphabetical characters. The pipeline consisted of:

- Regex Tokenizer
- Hashing Transformer
- Logistic Regression
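
To make the first two stages concrete, the sketch below reproduces their effect in plain Scala without Spark. It is an illustration only: the names `tokenize` and `hashFeatures` are hypothetical, and `String.hashCode` stands in for the MurmurHash3-based hashing that Spark's `HashingTF` uses, so the bucket indices will differ from SparkML's.

```scala
// Lowercase the text and keep only runs of letters, similar in spirit to a
// RegexTokenizer configured with the pattern \p{L}+ and gaps disabled.
def tokenize(text: String): List[String] =
  "\\p{L}+".r.findAllIn(text.toLowerCase).toList

// Map each token to one of `numFeatures` buckets and count occurrences.
// ASSUMPTION: String.hashCode is a stand-in; Spark's HashingTF uses MurmurHash3.
def hashFeatures(tokens: Seq[String], numFeatures: Int): Map[Int, Double] =
  tokens
    .groupBy(t => java.lang.Math.floorMod(t.hashCode, numFeatures))
    .map { case (bucket, ts) => bucket -> ts.size.toDouble }
```

For example, `tokenize("@Kenichan I dived 50%")` yields `List("kenichan", "i", "dived")`: the mention marker, digits, and percent sign are dropped and the text is lowercased. The resulting sparse counts are what the logistic regression stage consumes.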

To run the notebook, first download `protobuf-java-3.5.1.jar` from [Maven Central](https://search.maven.org/artifact/com.google.protobuf/protobuf-java/3.5.1/bundle) and place the jar file in the `/home/rba1/.m2/repository/com/google/protobuf/protobuf-java/3.5.1/` folder. Then install a Toree kernel:

```bash
JAR="file:///home/rba1/webscale-ai-test/lib/accumulo-spark-datasource-1.0.0-SNAPSHOT-shaded.jar"
jupyter toree install \
    --replace \
    --user \
    --kernel_name=accumulo \
    --spark_home=${SPARK_HOME} \
    --spark_opts="--master yarn --jars $JAR \
        --packages org.apache.spark:spark-avro_2.11:2.4.3,ml.combust.mleap:mleap-spark_2.11:0.14.0 \
        --driver-memory 16g \
        --executor-memory 12g \
        --driver-cores 4 \
        --executor-cores 4 \
        --num-executors 256"
```

In [1]:
val dataUrl = "https://cs.stanford.edu/people/alecmgo/trainingandtestdata.zip"
val dataDir = "data"
val dataFilename = "training.1600000.processed.noemoticon.csv"

val splitSizes = Array(2,     4,    12,    102,
                       16,    32,   128,   1024,
                       160,   320,   1280,  10240, 
                       1600,  3200,  12800, 
                       16384)

val dataSizes = Array("100GB", "100GB", "100GB", "100GB",
                      "1T", "1T", "1T", "1T",
                      "10T", "10T", "10T", "10T",
                      "100T", "100T", "100T",
                      "1PB")

val dataGB = Array(100, 100, 100, 100,              // 100GB
                   1024, 1024, 1024, 1024,          // 1TB
                   10240, 10240, 10240, 10240,      // 10TB
                   102400, 102400, 102400,          // 100TB
                   1024*1024)                       // 1PB

import sys.process._
import java.net.URL
import java.io._
import java.util.zip.{GZIPOutputStream, ZipFile}
import java.nio.file.{Files, Path, Paths, StandardCopyOption}
import scala.collection.JavaConverters._
import scala.io.Source
import org.apache.accumulo.core.client.Accumulo
import org.apache.hadoop.io.Text
import java.util.Calendar
import java.util.Base64
import com.google.common.io.Resources

dataUrl = https://cs.stanford.edu/people/alecmgo/trainingandtestdata.zip
dataDir = data
dataFilename = training.1600000.processed.noemoticon.csv
splitSizes = Array(2, 4, 12, 102, 16, 32, 128, 1024, 160, 320, 1280, 10240, 1600, 3200, 12800, 16384)
dataSizes = Array(100GB, 100GB, 100GB, 100GB, 1T, 1T, 1T, 1T, 10T, 10T, 10T, 10T, 100T, 100T, 100T, 1PB)
dataGB = Array(100, 100, 100, 100, 1024, 1024, 1024, 1024, 10240, 10240, 10240, 10240, 102400, 102400, 102400, 1048576)



## Download Twitter data and unzip

In [3]:
// create output directory
new File(dataDir).mkdirs

lazy val tmpZipFile = Paths.get(dataDir, "tmp.zip").toFile

new URL(dataUrl) #> tmpZipFile !!

lazy val zipFile = new ZipFile(tmpZipFile)
for (entry <- zipFile.entries.asScala)
  Files.copy(zipFile.getInputStream(entry), 
             Paths.get(dataDir).resolve(entry.getName),
             StandardCopyOption.REPLACE_EXISTING)

tmpZipFile = <lazy>
zipFile = <lazy>




<lazy>

In [4]:
def getTrainData() = Source.fromFile(new File(dataDir, dataFilename), "ISO-8859-1").getLines

val trainDataSize = getTrainData.size

// Show input data header
for (line <- getTrainData.take(5)) {
    println(line)
}

"0","1467810369","Mon Apr 06 22:19:45 PDT 2009","NO_QUERY","_TheSpecialOne_","@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D"
"0","1467810672","Mon Apr 06 22:19:49 PDT 2009","NO_QUERY","scotthamilton","is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!"
"0","1467810917","Mon Apr 06 22:19:53 PDT 2009","NO_QUERY","mattycus","@Kenichan I dived many times for the ball. Managed to save 50%  The rest go out of bounds"
"0","1467811184","Mon Apr 06 22:19:57 PDT 2009","NO_QUERY","ElleCTF","my whole body feels itchy and like its on fire "
"0","1467811193","Mon Apr 06 22:19:57 PDT 2009","NO_QUERY","Karoli","@nationwideclass no, it's not behaving at all. i'm mad. why am i here? because I can't see you all over there. "


trainDataSize = 1600000


getTrainData: ()Iterator[String]



## Prepare Twitter Data with Row ID Prefixes

With the prefixes, we can easily split the Accumulo tables that hold the replicated Twitter data, which speeds up writing data into these tables.

In [14]:
def prepareData(splits: Int): Unit = {
    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.conf.Configuration;

    val conf = new Configuration()
    val fs = FileSystem.get(conf)
    
    val divider = Math.ceil(trainDataSize / splits.toDouble).toInt

    val digits = (Math.log10(splits) + 1).toInt

    val outputFilename = s"sentiment140_prefix_${splits}.csv.gz"
    
    println(s"Creating ${outputFilename}...")
    
    val output = new PrintWriter(
        new GZIPOutputStream(
             fs.create(new org.apache.hadoop.fs.Path(outputFilename))))

    var idx = 0
    for (line <- getTrainData) {
        // it's a bit crude, but fast
        val sep = line.indexOf("\",\"") + 3
        val f0 = line.substring(0, sep)
        val f1 = line.substring(sep)

        output.print(f0)
        output.print(s"%0${digits}d-".format(idx / divider))
        output.println(f1)

        idx += 1
    }

    output.close
}

for (s <- splitSizes)
    prepareData(s)

Creating sentiment140_prefix_2.csv.gz...
Creating sentiment140_prefix_4.csv.gz...
Creating sentiment140_prefix_12.csv.gz...
Creating sentiment140_prefix_102.csv.gz...
Creating sentiment140_prefix_16.csv.gz...
Creating sentiment140_prefix_32.csv.gz...
Creating sentiment140_prefix_128.csv.gz...
Creating sentiment140_prefix_1024.csv.gz...
Creating sentiment140_prefix_160.csv.gz...
Creating sentiment140_prefix_320.csv.gz...
Creating sentiment140_prefix_1280.csv.gz...
Creating sentiment140_prefix_10240.csv.gz...
Creating sentiment140_prefix_1600.csv.gz...
Creating sentiment140_prefix_3200.csv.gz...
Creating sentiment140_prefix_12800.csv.gz...
Creating sentiment140_prefix_16384.csv.gz...


prepareData: (splits: Int)Unit


## Load data to Accumulo

In [None]:
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
conf.setAppName("AccumuloBenchmark")
conf.set("spark.driver.maxResultSize", "3g")

new SparkContext(conf)

println("Spark version %s".format(sc.version))
println("Scala %s".format(util.Properties.versionString))
println
sc.getConf.getAll.foreach(println)

In [7]:
import org.apache.spark.sql.types.{LongType, DoubleType, StringType, StructField, StructType}
import scala.collection.JavaConverters._

// client property file path
val PROPS_PATH = "/home/centos/webscale-ai-test/conf/accumulo-client.properties"

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val schema = StructType(Array(
    StructField("sentiment", StringType),
    StructField("prefix", StringType),
    StructField("date", StringType),
    StructField("query_string", StringType),
    StructField("user", StringType),
    StructField("text", StringType)
))

PROPS_PATH = /home/centos/webscale-ai-test/conf/accumulo-client.properties
sqlContext = org.apache.spark.sql.SQLContext@5b860512
schema = StructType(StructField(sentiment,StringType,true), StructField(prefix,StringType,true), StructField(date,StringType,true), StructField(query_string,StringType,true), StructField(user,StringType,true), StructField(text,StringType,true))





In [17]:
val split = 4
val df = spark.read
    .schema(schema)
    .csv(s"sentiment140_prefix_${split}.csv.gz")
df.show(10)

+---------+------------+--------------------+------------+---------------+--------------------+
|sentiment|      prefix|                date|query_string|           user|                text|
+---------+------------+--------------------+------------+---------------+--------------------+
|        0|0-1467810369|Mon Apr 06 22:19:...|    NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|        0|0-1467810672|Mon Apr 06 22:19:...|    NO_QUERY|  scotthamilton|is upset that he ...|
|        0|0-1467810917|Mon Apr 06 22:19:...|    NO_QUERY|       mattycus|@Kenichan I dived...|
|        0|0-1467811184|Mon Apr 06 22:19:...|    NO_QUERY|        ElleCTF|my whole body fee...|
|        0|0-1467811193|Mon Apr 06 22:19:...|    NO_QUERY|         Karoli|@nationwideclass ...|
|        0|0-1467811372|Mon Apr 06 22:20:...|    NO_QUERY|       joy_wolf|@Kwesidei not the...|
|        0|0-1467811592|Mon Apr 06 22:20:...|    NO_QUERY|        mybirch|         Need a hug |
|        0|0-1467811594|Mon Apr 06 22:20

split = 4
df = [sentiment: string, prefix: string ... 4 more fields]



## Create pre-split Accumulo tables and upload prefixed data

In [22]:
val client = Accumulo.newClient().from(PROPS_PATH).build()

client = org.apache.accumulo.core.clientImpl.ClientContext@559a972f



In [None]:
for ( (split, label) <- splitSizes.zip(dataSizes)) {
    val tableName = s"twitter_${split}_${label}"
    val df = spark.read
                  .schema(schema)
                  .csv(s"sentiment140_prefix_${split}.csv.gz")
                  .repartition(128)
                  .cache()

    // the prefix is separated from the id by '-' (e.g. 0-1467810369)
    val splitValues = df.selectExpr("split(prefix, '-')[0]").distinct().collect()
    val splits = new java.util.TreeSet(
                splitValues
                  .map { _(0).toString }
                  .sorted
                  .drop(1) // exclude the first partition: N partitions need only N-1 split points
                  .map(new Text(_))
                  .toSeq
                  .asJava)

    val now = Calendar.getInstance().getTime()
    
    println(s"${now} | number of splits: ${splits.size()} for table ${tableName} ${splits.first()} to ${splits.last()}")
    // delete if exists
    // if (client.tableOperations.exists(tableName))
    //    client.tableOperations.delete(tableName)
    
    // re-create
    client.tableOperations.create(tableName)
    
    // add the splits
    client.tableOperations.addSplits(tableName, splits)

    val props = Accumulo.newClientProperties().from(PROPS_PATH).build().asScala

    props.put("rowkey", "prefix")
    props.put("table", tableName)

    df.write.format("com.microsoft.accumulo").options(props).save()
}
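
The split-point logic in the loop above can be illustrated without a cluster. The sketch below is a simplified stand-in (the function name `splitPoints` is hypothetical): given the prefixes found in the data, it de-duplicates and sorts them and drops the first one. Zero-padded prefixes are assumed, so lexicographic order matches numeric order.

```scala
// Hypothetical helper: derive table split points from row-key prefixes.
def splitPoints(prefixes: Seq[String]): Seq[String] =
  prefixes.distinct.sorted.drop(1)

val exampleSplits = splitPoints(Seq("02", "00", "03", "01", "00"))
// exampleSplits == Seq("01", "02", "03")
```

Four prefixes produce three split points and therefore four tablets: rows starting with `00-` sort before `01` and fall into the first tablet, while rows starting with `03-` sort after `03` and fall into the last one.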

## Duplicate data in Accumulo
Because it is easier to parallelize and monitor, we run the duplication from the command line.

### Script for a single table

Create the following script file, `duplicate-twitter-data.sh`:

```bash
#!/bin/sh
export TABLE=$1
export DUPS=$2
export ASHELL='/opt/muchos/install/accumulo-2.0.0/bin/accumulo shell -u root -p secret'

{
printf "_\n$DUPS\n" | $ASHELL -e "setiter -n dup -class org.apache.accumulo.iterator.DuplicationIterator -p 10 -majc -t $TABLE"

$ASHELL -e "listiter -t $TABLE -majc"
time $ASHELL -e "compact -w -t $TABLE"

$ASHELL -e "deleteiter -n dup -majc -t $TABLE"
} > twitter_${TABLE}_${DUPS}.stdout.log 2>twitter_${TABLE}_${DUPS}.stderr.log
```

Generate the duplication command for every table with the following cell, then run the printed commands from a shell:

In [21]:
// generate calls to duplicate all table/settings
for ( ((split, label), size) <- splitSizes.zip(dataSizes).zip(dataGB)) {
    val dataSizeInMB = new File(dataDir, dataFilename).length / (1024 * 1024)
    
    val sourceSize = dataSizeInMB / 1024.0
    val dup = Math.ceil(size / sourceSize).toInt
    println(s"./duplicate-twitter-data.sh twitter_${split}_${label} ${dup} &")
}

./duplicate-twitter-data.sh twitter_2_100GB 452 &
./duplicate-twitter-data.sh twitter_4_100GB 452 &
./duplicate-twitter-data.sh twitter_12_100GB 452 &
./duplicate-twitter-data.sh twitter_102_100GB 452 &
./duplicate-twitter-data.sh twitter_16_1T 4620 &
./duplicate-twitter-data.sh twitter_32_1T 4620 &
./duplicate-twitter-data.sh twitter_128_1T 4620 &
./duplicate-twitter-data.sh twitter_1024_1T 4620 &
./duplicate-twitter-data.sh twitter_160_10T 46193 &
./duplicate-twitter-data.sh twitter_320_10T 46193 &
./duplicate-twitter-data.sh twitter_1280_10T 46193 &
./duplicate-twitter-data.sh twitter_10240_10T 46193 &
./duplicate-twitter-data.sh twitter_1600_100T 461928 &
./duplicate-twitter-data.sh twitter_3200_100T 461928 &
./duplicate-twitter-data.sh twitter_12800_100T 461928 &
./duplicate-twitter-data.sh twitter_16384_1PB 4730141 &
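
The duplication factors printed above follow from simple arithmetic, sketched below as a standalone function. The name `duplicationFactor` is hypothetical, and the sketch assumes the source file length truncates to 227 whole megabytes, which matches the dataset size stated earlier and the printed factors.

```scala
// Hypothetical helper mirroring the arithmetic of the cell above.
// targetGB: desired table size in gigabytes
// sourceMB: size of the source file in whole megabytes
def duplicationFactor(targetGB: Int, sourceMB: Long): Int = {
  val sourceGB = sourceMB / 1024.0
  // Round up so the duplicated table is at least as large as the target
  math.ceil(targetGB / sourceGB).toInt
}
```

For instance, `duplicationFactor(100, 227)` gives 452 and `duplicationFactor(1024 * 1024, 227)` gives 4730141, in line with the commands generated for the 100GB and 1PB tables.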


## Train Model

In [28]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

// some data prep (shuffle + label creation)
var train_df = spark.read.schema(schema).csv("sentiment140_prefix_2.csv.gz")
    .orderBy(rand(42))
    .withColumn("label", when($"sentiment".cast(IntegerType) > 0, 1.0).otherwise(0.0))
    .cache()
    
train_df.show()

+---------+------------+--------------------+------------+---------------+--------------------+-----+
|sentiment|      prefix|                date|query_string|           user|                text|label|
+---------+------------+--------------------+------------+---------------+--------------------+-----+
|        0|0-1981018684|Sun May 31 08:13:...|    NO_QUERY|     sherilynne|Drinking coffee a...|  0.0|
|        0|0-2013384590|Tue Jun 02 22:32:...|    NO_QUERY|  hannahjarin29|home. sleep. I lo...|  0.0|
|        0|0-2183989225|Mon Jun 15 15:15:...|    NO_QUERY|     ariannexxx|@wilsonswar I hop...|  0.0|
|        0|0-2192370750|Tue Jun 16 06:56:...|    NO_QUERY|     briancbray|I went from havin...|  0.0|
|        4|1-2067142866|Sun Jun 07 11:49:...|    NO_QUERY| AaronxFlavored|Deep fried Cajun ...|  1.0|
|        0|0-2017827506|Wed Jun 03 09:00:...|    NO_QUERY|        ruthijo|@westernslopetix ...|  0.0|
|        0|0-2058667150|Sat Jun 06 15:23:...|    NO_QUERY|      Tried100X|The Poli

train_df = [sentiment: string, prefix: string ... 5 more fields]


[sentiment: string, prefix: string ... 5 more fields]

### Feature Engineering and Model Training

In [29]:
import org.apache.spark.ml.feature.{CountVectorizer, RegexTokenizer, HashingTF}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.Pipeline
import scala.math.pow

val tokenizer = new RegexTokenizer()
  .setGaps(false)
  .setPattern("\\p{L}+")
  .setInputCol("text")
  .setOutputCol("words")

val hashingTF = new HashingTF()
  .setInputCol("words")
  .setOutputCol("features")
  .setNumFeatures(pow(2, 18).toInt)

val lr = new LogisticRegression()
  .setMaxIter(1)
  .setRegParam(0.2)
  .setElasticNetParam(0.0)

tokenizer = regexTok_9452d8f54c06
hashingTF = hashingTF_890807d8c228
lr = logreg_5944f231dbd7



### Fit model

In [30]:
val lr_pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

var t0 = System.nanoTime()

val lrModel = lr_pipeline.fit(train_df)

var t1 = System.nanoTime()
val train_time = (t1 - t0)*1e-9
println("Time to train lr model: " + train_time + "s")

Time to train lr model: 19.738933929s


lr_pipeline = pipeline_d15a9ccfcd10
t0 = 6688427813206030
lrModel = pipeline_d15a9ccfcd10
t1 = 6688447552139959
train_time = 19.738933929


19.738933929
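
The same `System.nanoTime()` pattern is repeated in the evaluation cells further down. As an optional convenience, it could be wrapped in a small helper; the sketch below is not part of the benchmark code, and the name `timed` is hypothetical.

```scala
// Hypothetical helper: run a block of code and return its result together
// with the elapsed wall-clock time in seconds.
def timed[A](block: => A): (A, Double) = {
  val t0 = System.nanoTime()
  val result = block
  val seconds = (System.nanoTime() - t0) * 1e-9
  (result, seconds)
}

// Usage sketch (requires the pipeline and DataFrame defined above):
//   val (model, trainTime) = timed { lr_pipeline.fit(train_df) }
```

Keep in mind that Spark transformations are lazy, so only blocks that end in an action (such as `fit` or `count`) measure the actual work.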

### Serialize Model using MLeap

In [31]:
// MLeap/Bundle.ML Serialization Libraries
import ml.combust.mleap.spark.SparkSupport._
import ml.combust.bundle.BundleFile
import org.apache.spark.ml.bundle.SparkBundleContext
import resource._
import java.io.File

val train_df_persist = train_df.persist()

// Serialize model pipeline to bundle.ml
val sbc = SparkBundleContext().withDataset(lrModel.transform(train_df_persist))

val bundleFilePath = "/tmp/twitter.model.lr.zip"
val fileObj = new File(bundleFilePath)
if (fileObj.exists()){
    fileObj.delete()
    println("Deleted an existing model bundle file.")
}
for(bf <- managed(BundleFile("jar:file:" + bundleFilePath))) {
    lrModel.writeBundle.save(bf)(sbc).get
    println("Saved model to a bundle file.")
}

Saved model to a bundle file.


train_df_persist = [sentiment: string, prefix: string ... 5 more fields]
sbc = SparkBundleContext(Some([sentiment: string, prefix: string ... 10 more fields]),BundleRegistry(scala.tools.nsc.interpreter.IMain$TranslatingClassLoader@101bd174))
bundleFilePath = /tmp/twitter.model.lr.zip
fileObj = /tmp/twitter.model.lr.zip



# Evaluate SparkML inference performance

In [None]:
lazy val numQueryThreads = "8"

for (maxPartitions <- Array("20480")) {
    for ((split, size) <- splitSizes.zip(dataSizes)) {
        val TEST_TABLE_NAME = s"twitter_${split}_${size}"

        println(s"LOG: $size with $split splits for table $TEST_TABLE_NAME using $numQueryThreads threads and $maxPartitions spark partitions")

        val props = Accumulo.newClientProperties().from(PROPS_PATH).build()
        props.put("table", TEST_TABLE_NAME)
        props.put("rowKey", "id")
        props.put("maxPartitions", maxPartitions)
        props.put("numQueryThreads", numQueryThreads)

        {
            var t0 = System.nanoTime()

            val df = spark.read
                        .format("com.microsoft.accumulo")
                        .options(props.asScala)
                        .schema(schema)
                        .load()

            val cnt = lrModel
                .transform(df)
                .filter("prediction > 0.9")
                .count()

            val time = (System.nanoTime() - t0)*1e-9

            println(s"DATA-INFER-0.9\t$size\t$split\t$numQueryThreads\t$time\t$cnt\t$maxPartitions")
        }
    }
}
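
The loop above prints one tab-separated `DATA-INFER-0.9` line per table. To collect these lines for later analysis, a parser along the following lines could be used. This is a sketch only: the case class and function names are hypothetical, and the sample line in the comment contains made-up values, not benchmark results.

```scala
// Hypothetical record for one tab-separated result line printed by the loop above
case class InferResult(size: String, splits: Int, queryThreads: Int,
                       seconds: Double, count: Long, maxPartitions: Int)

// Parse a line such as "DATA-INFER-0.9\t1T\t128\t8\t12.5\t1000\t20480"
// (made-up values); returns None for lines that do not match the layout.
def parseInferLine(line: String): Option[InferResult] =
  line.split("\t") match {
    case Array("DATA-INFER-0.9", size, splits, threads, secs, cnt, parts) =>
      scala.util.Try(
        InferResult(size, splits.toInt, threads.toInt,
                    secs.toDouble, cnt.toLong, parts.toInt)
      ).toOption
    case _ => None
  }
```

Lines that start with `LOG:` or carry malformed numbers are skipped rather than raising an exception, which keeps the parser usable on raw notebook output.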

# Evaluate Accumulo server-side inference performance

In [35]:
lazy val mleapBundle = Resources.toByteArray(new URL("file:///tmp/twitter.model.lr.zip"))
lazy val mleapBundleBase64 = Base64.getEncoder().encodeToString(mleapBundle)

mleapBundle = <lazy>
mleapBundleBase64 = <lazy>
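
The cell above turns the serialized model bundle into a Base64 string so that it can be passed as a plain string option. The round trip can be sketched with the JDK alone. The helper names are hypothetical, and how the server side decodes the option is an assumption here, not something shown in this notebook.

```scala
import java.util.Base64

// Encode raw bytes (for example, the contents of a model bundle) as a Base64 string
def encodeBundle(bytes: Array[Byte]): String =
  Base64.getEncoder.encodeToString(bytes)

// Decode the string back into the original bytes
def decodeBundle(encoded: String): Array[Byte] =
  Base64.getDecoder.decode(encoded)
```

Base64 inflates the payload by roughly one third, which is worth keeping in mind because the encoded model travels with every scan configuration.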

In [37]:
lazy val numQueryThreads = "8"

for ((split, size) <- splitSizes.zip(dataSizes)) {
{
    val TEST_TABLE_NAME = s"twitter_${split}_${size}"
    
    println(s"LOG: $size with $split splits for table $TEST_TABLE_NAME")
    
    val props = Accumulo.newClientProperties().from(PROPS_PATH).build()
    props.put("table", TEST_TABLE_NAME)
    props.put("rowKey", "id")
    // override the 200 default so it scales to the full number of executors we have
    props.put("maxPartitions", "2048")
    props.put("numQueryThreads", numQueryThreads)
        
    // count the data
    {
        var t0 = System.nanoTime()
    
        var cnt = spark.read
                    .format("com.microsoft.accumulo")
                    .options(props.asScala)
                    .schema(schema)
                    .load()
                    .count()

        val time = (System.nanoTime() - t0)*1e-9

        println(s"DATA-COUNT: $size,$split,$numQueryThreads,$time,$cnt")
    }
    
    // server-side inference with 30% data transfer
    {
        var t0 = System.nanoTime()
    
        props.put("mleap", mleapBundleBase64)
        props.put("mleapfilter", "${prediction > .9}")
        
        var cnt = spark.read
                    .format("com.microsoft.accumulo")
                    .options(props.asScala)
                    .schema(schema)
                    .load()
                    .count()

        val time = (System.nanoTime() - t0)*1e-9

        println(s"DATA-INFER-0.9: $size,$split,$numQueryThreads,$time,$cnt")
    }
    
    // server-side inference with no data transfer
    {
        var t0 = System.nanoTime()
    
        props.put("mleap", mleapBundleBase64)
        props.put("mleapfilter", "${false}")
        
        var cnt = spark.read
                    .format("com.microsoft.accumulo")
                    .options(props.asScala)
                    .schema(schema)
                    .load()
                    .count()

        val time = (System.nanoTime() - t0)*1e-9

        println(s"DATA-INFER-0.0: $size,$split,$numQueryThreads,$time,$cnt")
    }
}
} // end for: closes the loop that was left open and caused a syntax error