# Chapter 5: Effective Transformations

In this chapter, we will explore advance features of RDDs in order to perform effective transformations.

## Minimizing Object Creation

Minimizing the number of objects that our program creates is a great way to optimize our calculations. We are going to visit some examples which display the same functionalities but with a different number of objects created during its execution. In particular, we are going to consider a intial data where we have the report cards of different instructors for different pandas.

In [1]:
val recordCars = sc.parallelize(Array(("instructor1", "this is a happy panda"),
                                      ("instructor1", "this is a very happy panda"),
                                      ("instructor2", "good"),
                                      ("instructor2", "happy")))

recordCars = ParallelCollectionRDD[0] at parallelize at <console>:27


ParallelCollectionRDD[0] at parallelize at <console>:27

Over that data, we want to calculate, for each instructor: the longest woard, the mentions of happy and the average words of their reporst. For doing so, we are going to use different Aggregator classes, in which the number of objects created are different.

In [2]:
case class ReportCardMetrics(longestWord: Int, happyMentions: Int, averageWords: Double)

defined class ReportCardMetrics


In [3]:
class MetricsCalculator(val totalWords: Int, 
                        val longestWord: Int,
                        val happyMentions: Int,
                        val numberReportCards: Int) extends Serializable {
    
    def seqenceOp(reportCardContent: String): MetricsCalculator = {
        
        val words = reportCardContent.split(" ")
        val tW = words.length
        val hM = words.count(w => w.toLowerCase.equals("happy"))
        val lW = words.map(w => w.length).max
        
        new MetricsCalculator(tW + totalWords, Math.max(longestWord, lW),
                             hM + happyMentions, numberReportCards + 1)
        
        
    }
    
    def compOp(other: MetricsCalculator): MetricsCalculator = {
        
        new MetricsCalculator(this.totalWords + other.totalWords,
                             Math.max(this.longestWord, other.longestWord),
                             this.happyMentions + other.happyMentions,
                             this.numberReportCards + other.numberReportCards)
        
    }
    
    
    def toReportCardMetrics = ReportCardMetrics(longestWord, happyMentions, 
                                                totalWords.toDouble/numberReportCards)
    
}

defined class MetricsCalculator


In [4]:
recordCars.aggregateByKey(zeroValue = new MetricsCalculator(0,0,0,0))(
                         seqOp = ((reportCardMetrics, reportCardText) => reportCardMetrics.seqenceOp(reportCardText)),
                         combOp = ((x, y) => x.compOp(y))).map(x => (x._1, x._2.toReportCardMetrics)).collect()

[(instructor1,ReportCardMetrics(5,2,5.5)), (instructor2,ReportCardMetrics(5,1,1.0))]

In [5]:
class MetricsCalculatorReuseObjects(var totalWords: Int, 
                                    var longestWord: Int,
                                    var happyMentions: Int,
                                    var numberReportCards: Int) extends Serializable {
    
    def seqenceOp(reportCardContent: String): this.type = {
        
        val words = reportCardContent.split(" ")
        totalWords += words.length
        happyMentions += words.count(w => w.toLowerCase.equals("happy"))
        longestWord = Math.max(longestWord, words.map(w => w.length).max)
        numberReportCards += 1
        
        val lW = words.map(w => w.length).max
        
        this
        
        
    }
    
    def compOp(other: MetricsCalculatorReuseObjects): this.type = {
        
        totalWords += other.totalWords
        longestWord = Math.max(longestWord, other.longestWord)
        happyMentions += other.happyMentions
        numberReportCards += other.numberReportCards
        
        this
        
    }
    
    
    def toReportCardMetrics = ReportCardMetrics(longestWord, happyMentions, 
                                                totalWords.toDouble/numberReportCards)
    
}

defined class MetricsCalculatorReuseObjects


In [6]:
recordCars.aggregateByKey(zeroValue = new MetricsCalculatorReuseObjects(0,0,0,0))(
                         seqOp = ((reportCardMetrics, reportCardText) => reportCardMetrics.seqenceOp(reportCardText)),
                         combOp = ((x, y) => x.compOp(y))).map(x => (x._1, x._2.toReportCardMetrics)).collect()

[(instructor1,ReportCardMetrics(5,2,5.5)), (instructor2,ReportCardMetrics(5,1,1.0))]

In [7]:
class MetricsCalculatorArrays(val totalWordsIndex:Int = 0, 
                              val longestWordIndex:Int = 1,
                              val happyMentionsIndex:Int = 2,
                              val numberReportCardsIndex:Int = 3) extends Serializable {
    
    def seqenceOp(reportCardMetrics: Array[Int], reportCardContent: String): Array[Int] = {
        
        val words = reportCardContent.split(" ")
        
        reportCardMetrics(totalWordsIndex) += words.length
        reportCardMetrics(longestWordIndex) = Math.max(reportCardMetrics(longestWordIndex), 
                                                       words.map(w => w.length).max)
        reportCardMetrics(happyMentionsIndex) += words.count(w => w.toLowerCase.equals("happy"))
        reportCardMetrics(numberReportCardsIndex) += 1
        
        reportCardMetrics
        
    }
    
    def compOp(x: Array[Int], y: Array[Int]): Array[Int] = {
        
        x(totalWordsIndex) += y(totalWordsIndex)
        x(longestWordIndex) = Math.max(x(longestWordIndex), y(longestWordIndex))
        x(happyMentionsIndex) += y(happyMentionsIndex)
        x(numberReportCardsIndex) += y(numberReportCardsIndex)
        
        x
        
    }
    
    
    def toReportCardMetrics(x: Array[Int]) = {
        ReportCardMetrics(x(longestWordIndex), 
                          x(happyMentionsIndex), 
                          x(totalWordsIndex).toDouble/x(numberReportCardsIndex))   
    }
    
}

defined class MetricsCalculatorArrays


In [8]:
recordCars.aggregateByKey(zeroValue = Array(0,0,0,0))(
                         seqOp = ((reportCardMetrics, reportCardText) => reportCardMetrics.seqenceOp(reportCardText)),
                         combOp = ((x, y) => x.compOp(y))).map(x => (x._1, x._2.toReportCardMetrics)).collect()

Name: Compile Error
Message: <console>:31: error: value seqenceOp is not a member of Array[Int]
                                seqOp = ((reportCardMetrics, reportCardText) => reportCardMetrics.seqenceOp(reportCardText)),
                                                                                                  ^
<console>:32: error: value compOp is not a member of Array[Int]
                                combOp = ((x, y) => x.compOp(y))).map(x => (x._1, x._2.toReportCardMetrics)).collect()
                                                      ^

StackTrace: 

## Set Operations

In this section, we include some examples of set operations that can be done in Spark highighting some peculiarities.

Substract example:

In [9]:
val rddA = sc.parallelize(Array(1,2,3,4,4,4,4))
val rddB = sc.parallelize(Array(3,4))
val subtraction = rddA.subtract(rddB)

rddA = ParallelCollectionRDD[5] at parallelize at <console>:27
rddB = ParallelCollectionRDD[6] at parallelize at <console>:28
subtraction = MapPartitionsRDD[10] at subtract at <console>:29


MapPartitionsRDD[10] at subtract at <console>:29

In [10]:
subtraction.collect()

[1, 2]

In [11]:
assert(subtraction.count() < rddA.count() - rddB.count())

Intersection example:

In [12]:
val intersection = rddA.intersection(rddB)
intersection.collect()

intersection = MapPartitionsRDD[16] at intersection at <console>:30


[3, 4]

In [13]:
val union = rddA.union(rddB)
union.collect()

union = UnionRDD[17] at union at <console>:30


[1, 2, 3, 4, 4, 4, 4, 3, 4]

In [14]:
assert(!rddA.collect().sorted.sameElements(union.collect().sorted))

## Reducing Setup Overhead

In [15]:
case class Panda(id: Int, zip: Int, pt: String, happy: Boolean, attributes: Array[Double])

defined class Panda


In [16]:
val pandasRDD = sc.parallelize(Seq(Panda(1, 11000, "giant", false, Array(0.2, 0.8)),
                                   Panda(2, 11000, "small", false, Array(0.3, 0.1)),
                                   Panda(3, 13000, "small", true, Array(0.9, 0.7)),
                                   Panda(4, 13000, "medium", false, Array(0.5, 0.4)),
                                   Panda(5, 18000, "medium", true, Array(0.7, 0.1)),
                                   Panda(6, 18000, "giant", true, Array(0.1, 0.7)),
                                   Panda(7, 18000, "small", true, Array(0.3, 0.9))))

pandasRDD = ParallelCollectionRDD[18] at parallelize at <console>:29


ParallelCollectionRDD[18] at parallelize at <console>:29

### Shared Variables

#### Broadcast Variables

In [17]:
val invalidPandasIds = Array(2, 7)

invalidPandasIds = Array(2, 7)


[2, 7]

In [18]:
val invalidPandasBcst = sc.broadcast(invalidPandasIds)

invalidPandasBcst = Broadcast(16)


Broadcast(16)

In [19]:
pandasRDD.filter(panda => !invalidPandasBcst.value.contains(panda.id)).collect().foreach(println)

Panda(1,11000,giant,false,[D@633c0538)
Panda(3,13000,small,true,[D@51090be8)
Panda(4,13000,medium,false,[D@268e9067)
Panda(5,18000,medium,true,[D@20ca715f)
Panda(6,18000,giant,true,[D@30125e24)


In [20]:
pandasRDD.getNumPartitions

8

In [21]:
object LazyPrng extends Serializable{
    
    import java.util.Random
    
    @transient lazy val r = new Random()
}

val bcastprng = sc.broadcast(LazyPrng)
pandasRDD.filter(x => bcastprng.value.r.nextInt(3) == 0).collect().foreach(println)

Panda(6,18000,giant,true,[D@2819dda3)


defined object LazyPrng
bcastprng = Broadcast(18)


Broadcast(18)

#### Accumulators

In [22]:
val accFuzzyNess = sc.doubleAccumulator("fuzzyNess")
val transformed = pandasRDD.map(panda => {
    accFuzzyNess.add(panda.attributes(0))
    (panda.id, panda.zip)
})
transformed.count()
println("AccuFuzzyNess: " + accFuzzyNess)

AccuFuzzyNess: DoubleAccumulator(id: 450, name: Some(fuzzyNess), value: 3.0)


accFuzzyNess = DoubleAccumulator(id: 450, name: Some(fuzzyNess), value: 3.0)
transformed = MapPartitionsRDD[21] at map at <console>:32


MapPartitionsRDD[21] at map at <console>:32

In [23]:
import org.apache.spark.util.AccumulatorV2
class MaxDoubleAccumulator extends AccumulatorV2[Double, Option[Double]] {
    
    var currentVal: Option[Double] = None
    override def isZero = currentVal.isEmpty
    
    override def reset() = {
        currentVal = None
    }
    
    def copy() = {
        
        val newCopy = new MaxDoubleAccumulator()
        newCopy.currentVal = currentVal
        newCopy
    }
    
    override def copyAndReset() = {
        
        new MaxDoubleAccumulator()
        
    }
    
    override def add(value: Double) = {
        
        // currentVal = Some(currentVal.map(acc => Math.max(acc, value)).getOrElse(value))
        currentVal = Some(5.0)
        
    }
    
    override def merge(other: AccumulatorV2[Double, Option[Double]]) = {
        other match {
            
            case otherFuzzy: MaxDoubleAccumulator => otherFuzzy.currentVal.foreach(value => add(value))
            case _ => throw new Exception("Unexpected merge with unsopported type " + other)
            
        }
        
        
    }
    
    // override def value = currentVal
    
    override def value = currentVal
    
}

defined class MaxDoubleAccumulator


In [32]:
val acc = new MaxDoubleAccumulator()
sc.register(acc, "My accumulator")
val transformed = pandasRDD.repartition(1).map(x => {acc.add(x.attributes(0).toDouble); (x.id, x.zip)})
transformed.count()
acc

acc = MaxDoubleAccumulator(id: 604, name: Some(My accumulator), value: None)
transformed = MapPartitionsRDD[33] at map at <console>:38


MaxDoubleAccumulator(id: 604, name: Some(My accumulator), value: None)

In [33]:
acc.value

None

In [34]:
transformed.collect()

[(1,11000), (2,11000), (3,13000), (4,13000), (5,18000), (6,18000), (7,18000)]