# Readme

This notebook follows the blog post [Using Spark's cache for correctness, not just performance](http://www.spark.tc/using-sparks-cache-for-correctness-not-just-performance/) and tries out the code below.  
You can try this notebook on IBM **Bluemix** using [IBM Starter for Apache Spark](https://console.ng.bluemix.net/catalog/apache-spark-starter/).  

In [7]:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.util.Random

// start with a sequence of 10,000 zeros
val zeros = Seq.fill(10000)(0)

// create an RDD from the sequence, and replace every zero with a random integer
val randomRDD = sc.parallelize(zeros).map(x=>Random.nextInt())

// filter out all non-positive values, roughly half the set
val filteredRDD = randomRDD.filter(x=>x>0)

// count the number of elements that remain, twice
val count1 = filteredRDD.count()
val count2 = filteredRDD.count()

println(s"Surprise! count1: ${count1}, count2: ${count2}")

Surprise! count1: 4863, count2: 4953


## You would expect `count1` and `count2` to be equal, right?  

Well, that rarely happens. `map` and `filter` are **transformations**, which Spark evaluates lazily, and `count` is an
**action**, which means that every time `count` is called, the entire pipeline of transformations is executed again.
Each run calls `Random.nextInt()` afresh, so `randomRDD` changes and so does `filteredRDD`.  
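
The recomputation can be made visible without a cluster. The following is a minimal sketch in plain Scala, not Spark code: `LazyPipeline`, `fromSeq`, and `persisted` are hypothetical names invented for this illustration, and the class only imitates the idea that a pipeline stores a recipe and re-runs it on every action. A counter records how often the `map` step actually executes.

```scala
import scala.util.Random

// Toy stand-in for an RDD (hypothetical, NOT Spark's API): it stores only a
// recipe for producing its elements and re-runs that recipe on every action.
final class LazyPipeline[A](compute: () => Iterator[A]) {
  // Transformations: build a new recipe, execute nothing yet.
  def map[B](f: A => B): LazyPipeline[B] =
    new LazyPipeline(() => compute().map(f))

  def filter(p: A => Boolean): LazyPipeline[A] =
    new LazyPipeline(() => compute().filter(p))

  // Action: runs the whole recipe from the start.
  def count(): Long = compute().size.toLong

  // Rough analogue of persist(): the recipe runs once, on the first action,
  // and later actions read the stored result.
  def persisted(): LazyPipeline[A] = {
    lazy val stored = compute().toVector
    new LazyPipeline(() => stored.iterator)
  }
}

object LazyPipeline {
  def fromSeq[A](xs: Seq[A]): LazyPipeline[A] =
    new LazyPipeline(() => xs.iterator)
}

object RecomputeDemo {
  def main(args: Array[String]): Unit = {
    var mapCalls = 0 // how many times the map function has run

    val filtered = LazyPipeline
      .fromSeq(Seq.fill(10000)(0))
      .map { _ => mapCalls += 1; Random.nextInt() }
      .filter(_ > 0)

    val (c1, c2) = (filtered.count(), filtered.count())
    println(s"uncached: count1=$c1, count2=$c2, map ran $mapCalls times")

    mapCalls = 0
    val cached = filtered.persisted()
    val (c3, c4) = (cached.count(), cached.count())
    println(s"cached: count1=$c3, count2=$c4, map ran $mapCalls times")
  }
}
```

In the uncached case the `map` function runs 20,000 times for two counts, once per element per action, which is why the two counts usually differ. In the cached case it runs 10,000 times in total and both counts are identical.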

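## So what?  

In situations like this, you should `cache` (or `persist`) the RDD, so that the first action stores the computed values and later actions reuse them instead of re-running the pipeline. This is the point of [Using Spark's cache for correctness, not just performance](http://www.spark.tc/using-sparks-cache-for-correctness-not-just-performance/).  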

In [8]:
import org.apache.spark.storage.StorageLevel
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.util.Random

// start with a sequence of 10,000 zeros
val zeros = Seq.fill(10000)(0)

// create an RDD from the sequence, and replace every zero with a random integer
val randomRDD = sc.parallelize(zeros).map(x=>Random.nextInt())

// filter out all non-positive values, roughly half the set, and persist the result
val filteredRDD = randomRDD.filter(x=>x>0).persist(StorageLevel.MEMORY_AND_DISK)

// count the number of elements that remain, twice
val count1 = filteredRDD.count()
val count2 = filteredRDD.count()

println(s"No alarms and no surprises! count1: ${count1}, count2: ${count2}")

No alarms and no surprises! count1: 5035, count2: 5035
