# Chapter 12. Resilient Distributed Datasets

- There are times when the higher-level APIs are not enough to solve the business or engineering problem at hand.
- For those cases, you might need to use Spark's lower-level APIs, specifically RDDs, the SparkContext, and distributed shared variables such as accumulators and broadcast variables.

# What are the Low-Level APIs?
- Manipulating distributed data (RDDs)
- Distributing and manipulating distributed shared variables (broadcast variables and accumulators)

## When to Use the Low-Level APIs?
- You need some functionality that you cannot find in the higher-level APIs; for example, if you need very tight control over physical data placement across the cluster.
- You need to maintain some legacy codebase written using RDDs.
- You need to do some custom shared variable manipulation. We will discuss shared variables more in Chapter 14.

## How to Use the Low-Level APIs?
- SparkContext is the entry point for low-level API functionality.
    - You can access it through SparkSession

In [3]:
spark.sparkContext

res2: org.apache.spark.SparkContext = org.apache.spark.SparkContext@2b86fe63


# About RDDs
- Virtually all Spark code you run, whether it uses DataFrames or Datasets, compiles down to an RDD.
- RDD represents an immutable, partitioned collection of records that can be operated on in parallel.
- In RDDs the records are just Java, Scala, or Python objects of the programmer's choosing
    - You can store anything you want in these objects, in any format you want and this gives you great power, but not without potential issues.

## Types of RDDs
- As a user, you will likely only be creating two types of RDDs: the "generic" RDD type or a key-value RDD that provides additional functions, such as aggregating by key.
- Internally, each RDD is characterized by five main properties:
    - A list of partitions
    - A function for computing each split
    - A list of dependencies on other RDDs
    - Optionally, a `Partitioner` for key-value RDDs (e.g., to say that the RDD is hash-partitioned)
    - Optionally, a list of preferred locations on which to compute each split (e.g., block locations for a Hadoop Distributed File System file)
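Most of these properties can be inspected directly on an RDD. The following is a minimal sketch, assuming a local Spark session; the `pairs` and `summed` names and the sample data are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

// In a notebook or spark-shell, `spark` already exists and getOrCreate() reuses it.
val spark = SparkSession.builder().master("local[2]").appName("rdd-properties").getOrCreate()
val sc = spark.sparkContext

// Hypothetical sample data: a small key-value RDD with two partitions.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
// reduceByKey with an explicit partition count yields a hash-partitioned child RDD.
val summed = pairs.reduceByKey(_ + _, 2)

// 1. The list of partitions.
val numPartitions = summed.partitions.length
// 2. The per-partition compute function (RDD.compute) needs a TaskContext,
//    so it is not called directly here.
// 3. The dependencies on parent RDDs (here, one shuffle dependency on `pairs`).
val dependencies = summed.dependencies
// 4. The optional Partitioner: defined for `summed`, absent for `pairs`.
val childPartitioner = summed.partitioner
val parentPartitioner = pairs.partitioner
// 5. The optional preferred locations for one partition.
val preferred = pairs.preferredLocations(pairs.partitions(0))
```

Calling `summed.toDebugString` prints the same lineage in a readable form, which can be a quicker way to see the dependencies.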

- There is no concept of "rows" in RDDs; individual records are just raw Java/Scala/Python objects, and you manipulate those manually instead of tapping into the repository of functions that you have in the Structured APIs.

- For Scala and Java, the performance is for the most part the same; the large costs are incurred in manipulating the raw objects.
- Python, however, can lose a substantial amount of performance when using RDDs.
    - Running Python RDDs equates to running Python user-defined functions (UDFs) row by row.
    - This causes a high overhead for Python RDD manipulations.


## When to Use RDDs?
- In general, you should not manually create RDDs unless you have a very, very specific reason for doing so.
    - For the vast majority of use cases, DataFrames will be more efficient, more stable, and more expressive than RDDs.
- The most likely reason to use RDDs is that you need fine-grained control over the physical distribution of data (custom partitioning of data).

## Datasets and RDDs of Case Classes
- What is the difference between RDDs of Case Classes and Datasets?
    - The difference is that Datasets can still take advantage of the wealth of functions and optimizations that the Structured APIs have to offer.

# Creating RDDs

## Interoperating Between DataFrames, Datasets, and RDDs
- One of the easiest ways to get RDDs is from an existing DataFrame or Dataset.
    - Converting these to an RDD is simple: just use the `rdd` method on either of these data types.

In [4]:
//converts a Dataset[Long] to RDD[Long]
spark.range(500).rdd

res3: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[5] at rdd at <console>:26


In [5]:
//create DataFrame or Dataset from an RDD
spark.range(10).rdd.toDF()

res4: org.apache.spark.sql.DataFrame = [value: bigint]


## From a Local Collection
- To create an RDD from a collection, you will need to use the `parallelize` method on a `SparkContext`.
    - This turns a single node collection into a parallel collection

In [6]:
val myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
val words = spark.sparkContext.parallelize(myCollection, 2)

myCollection: Array[String] = Array(Spark, The, Definitive, Guide, :, Big, Data, Processing, Made, Simple)
words: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[12] at parallelize at <console>:24


In [7]:
words.setName("myWords")
words.name

res5: String = myWords


## From Data Sources
- It's often preferable to use the Data Sources APIs.

In [8]:
spark.sparkContext.textFile("/some/path/withTextFiles")

res6: org.apache.spark.rdd.RDD[String] = /some/path/withTextFiles MapPartitionsRDD[14] at textFile at <console>:25


- This creates an RDD for which each record in the RDD represents a line in that text file or files.
- Alternatively, you can read in data for which each text file should become a single record.
    - The use case here is when each file consists of a large JSON object or some other document that you will operate on as a single unit:

In [9]:
spark.sparkContext.wholeTextFiles("/some/path/withTextFiles")

res7: org.apache.spark.rdd.RDD[(String, String)] = /some/path/withTextFiles MapPartitionsRDD[16] at wholeTextFiles at <console>:25
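To make the difference between `textFile` and `wholeTextFiles` concrete, here is a small sketch that writes two throwaway files to a temporary directory and reads them both ways. The directory, file names, and file contents are hypothetical, and a local Spark session is assumed.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// In a notebook or spark-shell, `spark` already exists and getOrCreate() reuses it.
val spark = SparkSession.builder().master("local[2]").appName("text-files").getOrCreate()
val sc = spark.sparkContext

// Hypothetical input: two small text files in a fresh temporary directory.
val dir = Files.createTempDirectory("text-files")
Files.write(dir.resolve("a.txt"), "line one\nline two\n".getBytes(StandardCharsets.UTF_8))
Files.write(dir.resolve("b.txt"), "line three\n".getBytes(StandardCharsets.UTF_8))
val inputPath = dir.toUri.toString

// textFile: one record per line, across all files in the directory.
val lineCount = sc.textFile(inputPath).count()

// wholeTextFiles: one (path, content) record per file.
val fileRecords = sc.wholeTextFiles(inputPath).collect()
val fileCount = fileRecords.length
```

In the second case the first element of each pair is the file's path and the second is its entire content as a single string.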


# Manipulating RDDs

# Transformations
- Just as you do with DataFrames and Datasets, you specify transformations on one RDD to create another.

## distinct

In [10]:
words.distinct().count()

res8: Long = 10


## filter

In [11]:
def startsWithS(individual:String) ={
    individual.startsWith("S")
}

startsWithS: (individual: String)Boolean


In [12]:
words.filter(word => startsWithS(word)).collect()

java.lang.InternalError:  java.lang.IllegalAccessException: final field has no write access: $Lambda$3732/0x0000000801ff2898.arg$1/putField, from class java.lang.Object (module java.base)

## map

In [13]:
val words2 = words.map(word => (word, word(0), word.startsWith("S")))

words2: org.apache.spark.rdd.RDD[(String, Char, Boolean)] = MapPartitionsRDD[20] at map at <console>:24


In [14]:
words2.filter(record => record._3).take(5)

res10: Array[(String, Char, Boolean)] = Array((Spark,S,true), (Simple,S,true))


## flatMap

In [15]:
words.flatMap(word => word.toSeq).take(5)

res11: Array[Char] = Array(S, p, a, r, k)


## sort

In [16]:
words.sortBy(word => word.length() * -1).take(2)

res12: Array[String] = Array(Definitive, Processing)


## Random Splits

In [17]:
val fiftyFiftySplit = words.randomSplit(Array[Double](0.5, 0.5))

fiftyFiftySplit: Array[org.apache.spark.rdd.RDD[String]] = Array(MapPartitionsRDD[28] at randomSplit at <console>:24, MapPartitionsRDD[29] at randomSplit at <console>:24)


# Actions
- Actions either collect data to the driver or write to an external data source.

## reduce

In [18]:
spark.sparkContext.parallelize(1 to 20).reduce(_ + _)

res13: Int = 210


In [19]:
// Keep the longer of the two words; on a tie, the right-hand word is kept.
def wordLengthReducer(leftWord: String, rightWord: String): String = {
    if (leftWord.length > rightWord.length) leftWord
    else rightWord
}

words.reduce(wordLengthReducer)

java.lang.InternalError:  java.lang.IllegalAccessException: final field has no write access: $Lambda$3924/0x0000000802095050.arg$1/putField, from class java.lang.Object (module java.base)

## count

In [20]:
words.count()

res15: Long = 10


## countApprox

In [21]:
val confidence = 0.95
val timeoutMilliseconds = 400
words.countApprox(timeoutMilliseconds, confidence)

confidence: Double = 0.95
timeoutMilliseconds: Int = 400
res16: org.apache.spark.partial.PartialResult[org.apache.spark.partial.BoundedDouble] = (final: [10.000, 10.000])


## countApproxDistinct

In [22]:
words.countApproxDistinct(0.05)

res17: Long = 10


In [23]:
words.countApproxDistinct(4, 10)

res18: Long = 10


## countByValue

In [24]:
words.countByValue()

res19: scala.collection.Map[String,Long] = Map(Definitive -> 1, Simple -> 1, Processing -> 1, The -> 1, Spark -> 1, Made -> 1, Guide -> 1, Big -> 1, : -> 1, Data -> 1)


## countByValueApprox

In [25]:
words.countByValueApprox(1000, 0.95)

res20: org.apache.spark.partial.PartialResult[scala.collection.Map[String,org.apache.spark.partial.BoundedDouble]] = (final: Map(Definitive -> [1.000, 1.000], Simple -> [1.000, 1.000], Processing -> [1.000, 1.000], The -> [1.000, 1.000], Spark -> [1.000, 1.000], Made -> [1.000, 1.000], Guide -> [1.000, 1.000], Big -> [1.000, 1.000], : -> [1.000, 1.000], Data -> [1.000, 1.000]))


## max and min

In [26]:
spark.sparkContext.parallelize(1 to 20).max()
spark.sparkContext.parallelize(1 to 20).min()

res21: Int = 1


## take

In [27]:
words.take(5)
words.takeOrdered(5)
words.top(5)
val withReplacement = true
val numberToTake = 6
val randomSeed = 100L
words.takeSample(withReplacement, numberToTake, randomSeed)

withReplacement: Boolean = true
numberToTake: Int = 6
randomSeed: Long = 100
res22: Array[String] = Array(Guide, Spark, :, Simple, Simple, Spark)


In [28]:
words.take(5)

res23: Array[String] = Array(Spark, The, Definitive, Guide, :)


In [29]:
words.takeOrdered(5)

res24: Array[String] = Array(:, Big, Data, Definitive, Guide)


In [30]:
words.top(5)

res25: Array[String] = Array(The, Spark, Simple, Processing, Made)


# Saving Files

- With RDDs, you cannot actually "save" to a data source in the conventional sense.
    - You must iterate over the partitions in order to save the contents of each partition to some external database.

## saveAsTextFile

In [31]:
words.saveAsTextFile("file:/tmp/bookTitle")

org.apache.hadoop.mapred.FileAlreadyExistsException:  Output directory file:/tmp/bookTitle already exists

In [32]:
import org.apache.hadoop.io.compress.BZip2Codec
words.saveAsTextFile("file:/tmp/bookTitleCompressed", classOf[BZip2Codec])

org.apache.hadoop.mapred.FileAlreadyExistsException:  Output directory file:/tmp/bookTitleCompressed already exists

## SequenceFiles

- A `sequenceFile` is a flat file consisting of binary key-value pairs.
- SequenceFiles are used extensively in MapReduce as input/output formats.

In [33]:
words.saveAsObjectFile("/tmp/my/sequenceFilePath")

org.apache.hadoop.mapred.FileAlreadyExistsException:  Output directory file:/tmp/my/sequenceFilePath already exists

## Hadoop Files

# Caching

In [34]:
words.cache()

res29: words.type = myWords ParallelCollectionRDD[12] at parallelize at <console>:24


In [35]:
words.getStorageLevel

res30: org.apache.spark.storage.StorageLevel = StorageLevel(memory, deserialized, 1 replicas)


# Checkpointing

- Checkpointing is the act of saving an RDD to disk so that future references to this RDD point to those intermediate partitions on disk rather than recomputing the RDD from its original source.
- This is similar to caching, except that the data is stored only on disk, not in memory.

In [36]:
spark.sparkContext.setCheckpointDir("/some/path/for/checkpointing")
words.checkpoint()

java.io.FileNotFoundException:  File /some/path/for/checkpointing/b8240637-20c3-4744-bd76-f6a3a4aef510 does not exist
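As a self-contained illustration of checkpointing, the sketch below points the checkpoint directory at a fresh temporary directory and checks the RDD's state before and after the first action. The directory and the `letters` RDD are hypothetical, and a local Spark session is assumed.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// In a notebook or spark-shell, `spark` already exists and getOrCreate() reuses it.
val spark = SparkSession.builder().master("local[2]").appName("rdd-checkpoint").getOrCreate()
val sc = spark.sparkContext

// Hypothetical location: a fresh temporary directory on the local filesystem.
val checkpointDir = Files.createTempDirectory("rdd-checkpoint").toUri.toString
sc.setCheckpointDir(checkpointDir)

val letters = sc.parallelize(Seq("a", "b", "c", "d"), 2).map(_.toUpperCase)

// checkpoint() only marks the RDD; it must be called before any job runs on it.
letters.checkpoint()
val checkpointedBefore = letters.isCheckpointed

// The first action runs the job and then writes the checkpoint files.
val total = letters.count()
val checkpointedAfter = letters.isCheckpointed
val checkpointFile = letters.getCheckpointFile // Option[String]
```

On a real cluster the checkpoint directory would normally live on a shared filesystem such as HDFS so that every executor can reach it.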

# Pipe RDDs to System Commands
- The `pipe` method is probably one of Spark's more interesting methods.
- With pipe, you can return an RDD created by piping elements to a forked external process.
    - The resulting RDD is computed by executing the given process once per partition.
    - All elements of each input partition are written to a process's stdin as lines of input separated by a newline.
    - The resulting partition consists of the process's stdout output, with each line of stdout resulting in one element of the output partition.
    - A process is invoked even for empty partitions.
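The last point can be seen with a one-element RDD spread over two partitions, so that one partition is empty. This is a sketch that assumes a Unix-like machine with `wc` on the path and a local Spark session; the `oneWord` RDD is made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

// In a notebook or spark-shell, `spark` already exists and getOrCreate() reuses it.
val spark = SparkSession.builder().master("local[2]").appName("rdd-pipe").getOrCreate()
val sc = spark.sparkContext

// One element over two partitions leaves one of the partitions empty.
val oneWord = sc.parallelize(Seq("Spark"), 2)

// Number of elements in each partition, computed inside Spark.
val partitionSizes = oneWord.glom().map(_.length).collect()

// `wc -l` runs once per partition, including the empty one, and emits one
// line per process; trim() removes the padding some platforms add.
val lineCounts = oneWord.pipe("wc -l").collect().map(_.trim.toInt)
```

Each partition contributes exactly one output element here because `wc -l` prints a single line, and the counts line up with the partition sizes.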

In [37]:
words.pipe("wc -l").collect()

res32: Array[String] = Array("       5", "       5")


## mapPartitions


In [38]:
words.mapPartitions(part => Iterator[Int](1)).sum()

res33: Double = 2.0


In [39]:
def indexedFunc(partitionIndex:Int, withinPartIterator: Iterator[String]) = {
    withinPartIterator.toList.map(
        value => s"Partition: $partitionIndex => $value").iterator
}
words.mapPartitionsWithIndex(indexedFunc).collect()

java.lang.InternalError:  java.lang.IllegalAccessException: final field has no write access: $Lambda$4253/0x000000080216ce48.arg$1/putField, from class java.lang.Object (module java.base)

## foreachPartition

In [41]:
words.foreachPartition { iter =>
    import java.io._
    import scala.util.Random
    val randomFileName = new Random().nextInt()
    val pw = new PrintWriter(new File(s"/tmp/random-file-${randomFileName}.txt"))
    while (iter.hasNext) {
        pw.write(iter.next())
    }
    pw.close()
}

## glom
- `glom` takes every partition in your dataset and converts it to an array.

In [42]:
spark.sparkContext.parallelize(Seq("Hello", "World"), 2).glom()

res36: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[49] at glom at <console>:25
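The cell above only builds the RDD. To see the per-partition arrays, the result has to be collected to the driver, which is reasonable only for small data. A minimal sketch, assuming a local Spark session:

```scala
import org.apache.spark.sql.SparkSession

// In a notebook or spark-shell, `spark` already exists and getOrCreate() reuses it.
val spark = SparkSession.builder().master("local[2]").appName("rdd-glom").getOrCreate()
val sc = spark.sparkContext

// Two elements over two partitions: each partition becomes a one-element array.
val arrays = sc.parallelize(Seq("Hello", "World"), 2).glom().collect()

// Convert to lists so the nested structure prints and compares by value.
val asLists = arrays.map(_.toList).toList
// asLists == List(List("Hello"), List("World"))
```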
