# "Apache Spark Transformations"
> "Apache Spark Transformations"

- toc: true
- branch: master
- badges: true
- comments: true
- categories: [computing]
- tags: [spark]

![Apache_Spark_logo svg](https://user-images.githubusercontent.com/8268939/79700662-3c5bdc80-824c-11ea-85aa-4559e7028a8d.png)

## aggregate

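aggregate can return a type different from the element type of the RDD it is called on. It takes two functions: the first folds the elements of each partition into an accumulator ((U, T) => U), and the second merges the accumulators of all partitions into the final result ((U, U) => U). The result should not depend on the order in which elements and partition results are combined, so both functions need to behave commutatively and associatively. You must also supply an initial (zero) value. It should be the neutral element of both functions, because it is applied once per partition and once more in the final merge.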

In [0]:
val inputRdd = sc.parallelize(Array(1,2,3,4,5,6,7))
val output = inputRdd.aggregate(0)((x,y) => x+y, (u,v) => u+v);

inputRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:27
output: Int = 28
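
The example above keeps the result type equal to the element type. As a minimal sketch of the "different type" case, the snippet below accumulates a (sum, count) pair over an RDD of Int and derives the average from it. It assumes the `sc` SparkContext provided by spark-shell, as in the other cells; the names `nums`, `sum`, `count` and `average` are only illustrative.

```scala
// Assumes the `sc` SparkContext provided by spark-shell.
val nums = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7))

// Element type T is Int; accumulator type U is (Int, Int) = (running sum, running count).
val (sum, count) = nums.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),   // fold one element into a partition's accumulator
  (a, b)   => (a._1 + b._1, a._2 + b._2)  // merge the accumulators of two partitions
)

val average = sum.toDouble / count  // 28 / 7 = 4.0
```

The zero value (0, 0) is neutral for both functions, so it does not matter how many partitions the data is spread over.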


## cartesian

Return the Cartesian product of this RDD and another one. The result contains all pairs (a, b) where a belongs to rdd1 and b belongs to rdd2.

In [0]:
val rdd1 = sc.parallelize(1 to 2)
val rdd2 = sc.parallelize(3 to 5)
val output = rdd1.cartesian(rdd2).collect()

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:25
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:26
output: Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (2,3), (2,4), (2,5))


## countByValue

Return the count of each unique value in this RDD as a local map of (value, count) pairs. Be careful with this on large data, because the whole map is sent to the driver.

In [0]:
val inputRdd = sc.parallelize(1 to 10)
val output = inputRdd.countByValue()

inputRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:27
output: scala.collection.Map[Int,Long] = Map(5 -> 1, 10 -> 1, 1 -> 1, 6 -> 1, 9 -> 1, 2 -> 1, 7 -> 1, 3 -> 1, 8 -> 1, 4 -> 1)
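
When the number of distinct values may be too large for the driver, one common alternative is to keep the counts distributed. The sketch below does this with map and reduceByKey; it assumes the spark-shell `sc`, and the sample data and the names `values` and `counts` are made up for illustration.

```scala
// Assumes the `sc` SparkContext provided by spark-shell.
val values = sc.parallelize(Seq("a", "b", "a", "c", "a"))

// Same counts as countByValue, but the result stays an RDD[(String, Long)]
// on the cluster instead of becoming a local Map on the driver.
val counts = values.map(v => (v, 1L)).reduceByKey(_ + _)

// Collect only once the result is known to be small.
counts.collect()
```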


## collect

Return an array that contains all of the elements in this RDD. Use it only when the RDD is small enough to fit in the driver's memory.

In [0]:
val output = sc.parallelize(1 to 10, 2).collect()

output: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)


## distinct

Return a new RDD containing the distinct elements in this RDD. This transformation triggers a shuffle.

In [0]:
val data = Seq(1,1,2,2,3,4)
val inputRdd = sc.parallelize(data)
val output = inputRdd.distinct();
output.collect()

data: Seq[Int] = List(1, 1, 2, 2, 3, 4)
inputRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:30
output: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at distinct at <console>:31
res0: Array[Int] = Array(1, 2, 3, 4)


## filter

Return a new RDD containing only the elements that satisfy a predicate.

In [0]:
val data = Seq(1, 2, 3, 4)
val inputRdd = sc.parallelize(data)
val filterOutput = inputRdd.filter( s => s>2 )
filterOutput.collect();

data: Seq[Int] = List(1, 2, 3, 4)
inputRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:29
filterOutput: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[15] at filter at <console>:30
res1: Array[Int] = Array(3, 4)


## first

Return the first element in this RDD.

In [0]:
val output = sc.parallelize(1 to 10).first();

output: Int = 1


## flatMap

Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results. The function you supply maps each input element A to a collection of results (A ⇒ Iterable[B]); flatMap is that map followed by a flatten.

In [0]:
val data = Seq(1, 2, 3, 4)
val inputRdd = sc.parallelize(data)
val output = inputRdd.flatMap( s => List(s,s+1,s+2));
output.collect();

data: Seq[Int] = List(1, 2, 3, 4)
inputRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:31
output: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[18] at flatMap at <console>:32
res2: Array[Int] = Array(1, 2, 3, 2, 3, 4, 3, 4, 5, 4, 5, 6)


## glom

Return an RDD created by coalescing all elements within each partition into an array.

In [0]:
val inputRdd = sc.parallelize(1 to 10, 2)
val output = inputRdd.glom().collect();

inputRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at <console>:27
output: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5), Array(6, 7, 8, 9, 10))


## groupBy

- Return an RDD of grouped items.
- Each group consists of a key and a sequence of elements mapping to that key.
- Ordering of elements within each group is not guaranteed and may differ between executions.
- A shuffle happens. For aggregations it is better to use reduceByKey than groupBy, because groupBy does not combine values within each partition first (the grouping happens entirely in the reduce phase), which results in high network traffic.

In [0]:
val rdd1 = sc.parallelize(Array(2,3,4,1,3,4))
val output = rdd1.groupBy(x => x).collect()

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:27
output: Array[(Int, Iterable[Int])] = Array((1,CompactBuffer(1)), (2,CompactBuffer(2)), (3,CompactBuffer(3, 3)), (4,CompactBuffer(4, 4)))
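
To make the groupBy versus reduceByKey point concrete, here is a small sketch that computes the same per-key sum both ways. It assumes the spark-shell `sc`; the pair data and the names `pairs`, `viaGroup` and `viaReduce` are illustrative only.

```scala
// Assumes the `sc` SparkContext provided by spark-shell.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))

// groupBy: every (key, value) record is shuffled, and the sum is computed afterwards.
val viaGroup = pairs.groupBy(_._1).mapValues(group => group.map(_._2).sum)

// reduceByKey: partial sums are computed inside each partition before the shuffle,
// so only one record per key and partition crosses the network.
val viaReduce = pairs.reduceByKey(_ + _)

viaGroup.collect()   // contains (a,4) and (b,6); order may vary
viaReduce.collect()  // same pairs
```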


## intersection

- Return the intersection of this RDD and another one.
- The result does not contain duplicates, even if the input RDDs do.
- It is implemented with map, cogroup and filter in the background.
- It performs a shuffle internally.

In [0]:
val output = sc.parallelize(Array(1,2,3,4))
               .intersection(sc.parallelize(Array(3,4,5,6)))
               .collect()

output: Array[Int] = Array(3, 4)
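
The map, cogroup, filter pipeline mentioned above can be sketched by hand as follows. This is an illustration of the idea and not Spark's actual source; it assumes the spark-shell `sc`, and the names `left`, `right` and `common` are hypothetical.

```scala
// Assumes the `sc` SparkContext provided by spark-shell.
val left  = sc.parallelize(Array(1, 2, 3, 4, 4))
val right = sc.parallelize(Array(3, 4, 5, 6))

val common = left.map(v => (v, 1))        // map: turn each element into a key with a dummy value
  .cogroup(right.map(v => (v, 1)))        // cogroup: one record per key with the values from both sides
  .filter { case (_, (l, r)) => l.nonEmpty && r.nonEmpty }  // filter: keep keys present on both sides
  .keys                                   // drop the dummy values again

common.collect().sorted  // Array(3, 4), the duplicate 4 appears only once
```

Because cogroup produces a single record per key, duplicates disappear without a separate distinct step.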


## map

Return a new RDD by applying a function to all elements of this RDD.

In [0]:
val data = Seq(1, 2, 3, 4)
val inputRdd = sc.parallelize(data)
val output = inputRdd.map( s => s+1 ) // applying an anonymous function to the RDD elements
output.collect();  

data: Seq[Int] = List(1, 2, 3, 4)
inputRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at parallelize at <console>:32
output: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[33] at map at <console>:33
res3: Array[Int] = Array(2, 3, 4, 5)


## mapPartitions

Return a new RDD by applying a function to each partition of this RDD.

In [0]:
val rdd1 = sc.parallelize(1 to 20,3)
val output = rdd1.mapPartitions(x => x).collect();

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:27
output: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
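
The identity function above does not show that the function really sees a whole partition at once. As a small sketch, the snippet below emits one sum per partition instead; it assumes the spark-shell `sc`, and the names `nums` and `partitionSums` are illustrative.

```scala
// Assumes the `sc` SparkContext provided by spark-shell.
val nums = sc.parallelize(1 to 20, 3)

// The function receives an Iterator over one partition and must return an Iterator.
// Here each partition is reduced to a single element: its sum.
val partitionSums = nums.mapPartitions(itr => Iterator(itr.sum)).collect()

partitionSums.length  // 3, one value per partition
partitionSums.sum     // 210, the sum of 1 to 20
```

This pattern is useful when some setup work should be done once per partition rather than once per element.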


## mapPartitionsWithIndex

Similar to mapPartitions, but the function also receives the index of the partition it is processing.

In [0]:
val inputRdd = sc.parallelize(1 to 10, 2)
val output = inputRdd.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s))).collect()

inputRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at parallelize at <console>:27
output: Array[(Int, Int)] = Array((0,1), (0,2), (0,3), (0,4), (0,5), (1,6), (1,7), (1,8), (1,9), (1,10))


## randomSplit

Randomly splits this RDD with the provided weights. The weights give the expected fraction of elements in each output RDD, and they are normalized if they do not sum to 1. Because the split is random, the actual sizes only approximate the weights, as the example shows.

In [0]:
val inputRdd = sc.parallelize(1 to 10) 
val output = inputRdd.randomSplit(Array(0.5,0.5)) // returns an array of RDDs
output(0).collect() // rdd in 0th location 
output(1).collect() // rdd in 1st location 

inputRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at parallelize at <console>:28
output: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[41] at randomSplit at <console>:29, MapPartitionsRDD[42] at randomSplit at <console>:29)
res4: Array[Int] = Array(2, 4, 7, 9)
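
randomSplit also accepts a seed, which makes a split repeatable across runs. The sketch below uses this for a train/test style split; it assumes the spark-shell `sc`, and the 0.8/0.2 weights, the seed value 42 and the names `train` and `test` are arbitrary choices for illustration.

```scala
// Assumes the `sc` SparkContext provided by spark-shell.
val data = sc.parallelize(1 to 100)

// Pattern-match the returned Array[RDD[Int]] into two named RDDs.
val Array(train, test) = data.randomSplit(Array(0.8, 0.2), seed = 42L)

// The individual sizes only approximate 80 and 20,
// but every element lands in exactly one of the two splits.
val total = train.count() + test.count()  // 100
```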


## reduce

Reduces the elements of this RDD using the given binary function. The function has to be commutative and associative.

In [0]:
val output = sc.parallelize(1 to 5).reduce((u,v) => u + v)

output: Int = 15


## repartition

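Return a new RDD that has exactly the number of partitions passed to this method. It can increase or decrease the number of partitions, and it uses a shuffle to redistribute the data.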

In [0]:
val data = Seq(1, 2, 3, 4) 
val inputRdd = sc.parallelize(data,2) 
inputRdd.partitions; // Get the array of partitions of this RDD 
inputRdd.repartition(1) 

data: Seq[Int] = List(1, 2, 3, 4)
inputRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[44] at parallelize at <console>:30
res5: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[48] at repartition at <console>:34
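
The REPL output above does not show the partition counts themselves. A short sketch that makes them visible with getNumPartitions, assuming the spark-shell `sc` (the names `rdd` and `single` are illustrative):

```scala
// Assumes the `sc` SparkContext provided by spark-shell.
val rdd = sc.parallelize(Seq(1, 2, 3, 4), 2)
val before = rdd.getNumPartitions     // 2

// repartition returns a new RDD; the original is left unchanged.
val single = rdd.repartition(1)
val after = single.getNumPartitions   // 1
val original = rdd.getNumPartitions   // still 2
```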


## sample

Return a sampled subset of this RDD. The arguments are withReplacement, the fraction of elements to sample, and a seed.

In [0]:
val inputRdd = sc.parallelize(1 to 10, 3) 

inputRdd.sample(true, 0.3, 0).collect()   

inputRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[49] at parallelize at <console>:27
res6: Array[Int] = Array(9, 10)


## sortBy

Return this RDD sorted by the given key function. Because this is not a pair RDD, you pass a function that produces the sort key for each element, followed by a boolean for the order (true for ascending, false for descending).

In [0]:
val output  = sc.parallelize(Array(3,4,2,1))
                .sortBy(x => x,false) // false = descending order
                .collect()

output: Array[Int] = Array(4, 3, 2, 1)


## subtract

Return an RDD with the elements of this RDD that are not in the other RDD.

In [0]:
val output = sc.parallelize(1 to 10).subtract(sc.parallelize(5 to 15))
output.collect()

output: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[62] at subtract at <console>:27
res7: Array[Int] = Array(1, 2, 3, 4)


## take

Take the first num elements of the RDD. It works by first scanning one partition, and then uses the results from that partition to estimate the number of additional partitions needed to satisfy the limit.

In [0]:
val output = sc.parallelize(1 to 10).take(2)

output: Array[Int] = Array(1, 2)


## takeSample

Return a fixed-size sampled subset of this RDD in an array.

sample | takeSample
-------|-----------
Returns an RDD | Returns an Array
Size of the result is approximate (an expected fraction of the RDD) | Returns a fixed-size sampled subset
Sample size is given as a Double fraction | Sample size is given as an Int

In [0]:
val inputRdd = sc.parallelize(1 to 10, 3) 
inputRdd.takeSample(true, 3, 0);   

inputRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[64] at parallelize at <console>:27
res8: Array[Int] = Array(7, 7, 4)


## toLocalIterator

Return a Scala iterator that contains all of the elements in this RDD.

In [0]:
val output = sc.parallelize(1 to 5, 1).toLocalIterator

while (output.hasNext) {
  println(output.next)
}

1
2
3
4
5


output: Iterator[Int] = empty iterator


## union

- Return the union of this RDD and another one.
- Identical elements appear multiple times in the result.
- Use distinct to eliminate them.
- ++ can be used instead of union.

In [0]:
val a = sc.parallelize(1 to 10, 1)
val b = sc.parallelize(10 to 20, 1)
a.union(b).collect();
    
a.union(b).distinct().collect();

a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[67] at parallelize at <console>:25
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[68] at parallelize at <console>:26
res10: Array[Int] = Array(4, 16, 14, 6, 8, 12, 18, 20, 10, 2, 13, 19, 15, 11, 1, 17, 3, 7, 9, 5)
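
As a small sketch of the ++ shorthand and of the duplicate handling, the snippet below uses two overlapping ranges. It assumes the spark-shell `sc`; the names `xs` and `ys` are illustrative.

```scala
// Assumes the `sc` SparkContext provided by spark-shell.
val xs = sc.parallelize(1 to 3)
val ys = sc.parallelize(3 to 5)

val viaUnion = xs.union(ys).collect()  // Array(1, 2, 3, 3, 4, 5), the 3 appears twice
val viaPlus  = (xs ++ ys).collect()    // same result, ++ is an alias for union

// distinct removes the duplicate; sort locally because distinct does not preserve order.
val deduped = (xs ++ ys).distinct().collect().sorted  // Array(1, 2, 3, 4, 5)
```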
