## Basic RDD OPERATIONS - map, flatMap etc.

In [17]:
val rddlist = sc.parallelize(0 to 10)

rddlist: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <console>:25


In [3]:
rddlist.collect()

res1: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)


In [5]:
val rddText = sc.textFile("RDD.txt")

rddText: org.apache.spark.rdd.RDD[String] = RDD.txt MapPartitionsRDD[2] at textFile at <console>:25


In [6]:
rddText.take(5)

res3: Array[String] = Array(0,1,2,3,4,5,6,7,8,9)


In [19]:
val rddlist2 = sc.parallelize(List((0 to 4).toList, (5 to 9).toList))
rddlist2.collect()

rddlist2: org.apache.spark.rdd.RDD[List[Int]] = ParallelCollectionRDD[7] at parallelize at <console>:27
res14: Array[List[Int]] = Array(List(0, 1, 2, 3, 4), List(5, 6, 7, 8, 9))


In [20]:
rddlist2.flatMap(x => x.map(_ + 1)).collect()

res15: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)


> **TODO**: W RDDlist2 zastąp liczby parzyste literą *P* a nieparzyste *N*

In [23]:
rddlist2.map(x => x.map{case x if x % 2 == 0 => "P"; case _ => "N"}).collect()

res18: Array[List[String]] = Array(List(P, N, P, N, P), List(N, P, N, P, N))


In [29]:
rddlist2.flatMap(x => x.map(x  => x)).collect()

res22: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)


##### mapValues(func) 
Example from lecture -> [("A",["Adam","Ada","Adrian"]),("B",["Bonifacy","Barnaba"])]

Note! mapValues deprecated in Scala, therefore regular map used

In [2]:
val mapa = List(("A", List("Adam", "Ada", "Adrian")), ("B", List("Bonifacy", "Barnaba")))
val rddpair = sc.parallelize(mapa)

rddpair.collect()

mapa: List[(String, List[String])] = List((A,List(Adam, Ada, Adrian)), (B,List(Bonifacy, Barnaba)))
rddpair: org.apache.spark.rdd.RDD[(String, List[String])] = ParallelCollectionRDD[0] at parallelize at <console>:26
res1: Array[(String, List[String])] = Array((A,List(Adam, Ada, Adrian)), (B,List(Bonifacy, Barnaba)))


#### Return tuple of key and length of of the list

In [39]:
rddpair.map { case (k,v) => (k, v.length)}.collect()

res27: Array[(String, Int)] = Array((A,3), (B,2))


> **TODO**: Zmodyfikuj wartości w RDDpair tak aby zawierały nie imiona a liczby liter w imionach, następnie zsumuj liczby liter

In [40]:
rddpair.map { case (k,v) => (k, v.map(y => y.length))}.collect()

res28: Array[(String, List[Int])] = Array((A,List(4, 3, 6)), (B,List(8, 7)))


In [41]:
rddpair.map { case (k,v) => (k, v.map(y => y.length).sum)}.collect()

res29: Array[(String, Int)] = Array((A,13), (B,15))


> **TODO**: Na podstawie RDDpair stwórz RDD o następującej strukturze: (litera, (imię, liczba liter))

In [45]:
rddpair.map { case (k,v) => (k, v.map(x => (x, x.length)) )}.collect().foreach(println)

(A,List((Adam,4), (Ada,3), (Adrian,6)))
(B,List((Bonifacy,8), (Barnaba,7)))


##### join(RDD), union, distinct, groupby

In [14]:
rddpair.flatMapValues(x => x).map{ case (k,v) => (k,(v,v.length))}.collect().foreach(println)

(A,(Adam,4))
(A,(Ada,3))
(A,(Adrian,6))
(B,(Bonifacy,8))
(B,(Barnaba,7))


In [16]:
rddpair.join(rddpair.flatMapValues(x => x)).collect().foreach(println)

(A,(List(Adam, Ada, Adrian),Adam))
(A,(List(Adam, Ada, Adrian),Ada))
(A,(List(Adam, Ada, Adrian),Adrian))
(B,(List(Bonifacy, Barnaba),Bonifacy))
(B,(List(Bonifacy, Barnaba),Barnaba))


In [19]:
rddlist.groupBy(_ % 2).mapValues(_.toList).collect()

res16: Array[(Int, List[Int])] = Array((0,List(0, 2, 4, 6, 8, 10)), (1,List(1, 3, 5, 7, 9)))


In [21]:
val names = List(("Adam", 4), ("Ada", 3), ("Adrian", 6), ("Bonifacy", 8), ("Barnaba", 7))
val rddpair2 = sc.parallelize(names)

names: List[(String, Int)] = List((Adam,4), (Ada,3), (Adrian,6), (Bonifacy,8), (Barnaba,7))
rddpair2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[22] at parallelize at <console>:26


In [25]:
rddpair2.groupBy{ case (x,y) if y % 2 == 0 => "Even"; case _ => "Odd"}.collect()

res19: Array[(String, Iterable[(String, Int)])] = Array((Even,CompactBuffer((Adam,4), (Adrian,6), (Bonifacy,8))), (Odd,CompactBuffer((Ada,3), (Barnaba,7))))


> **TODO**: Pogrupuj RDDpair2 ze względu na pierwszą literę imienia

In [28]:
rddpair2.groupBy(_._1.charAt(0)).collect()

res22: Array[(Char, Iterable[(String, Int)])] = Array((A,CompactBuffer((Adam,4), (Ada,3), (Adrian,6))), (B,CompactBuffer((Bonifacy,8), (Barnaba,7))))


##### groupByKey(), reduceByKey(func), aggregateByKey(zeroValue, seqFunc, combFunc)

> **TODO**: Na podstawie RDDpair stwórz RDD o następującej strukturze: (litera, (imię, l. liter)), następnie pogrupuj je po literze (zamień nowe wartości na listę tak aby były czytelne po użyciu `collect`)

In [33]:
rddpair.flatMapValues(x => x).map{ case (k,v) => (k,(v,v.length))}.groupByKey.mapValues(_.toList)
.collect().foreach(println)

(A,List((Adam,4), (Ada,3), (Adrian,6)))
(B,List((Bonifacy,8), (Barnaba,7)))


In [64]:
val rddpair3 = rddpair2.groupBy(_._1.charAt(0)).values.map(_.toList).flatMap(x=> x).map(x=> (x._1.charAt(0), x._2))
.collect()

rddpair3: Array[(Char, Int)] = Array((A,4), (A,3), (A,6), (B,8), (B,7))


>##### **TODO**: Uzyskaj iloczyn dla każdego klucza w RDDpair3

In [76]:
rddpair3.groupBy(_._1).map{case (k,v) => (k, v.map(_._2).product)}

res39: scala.collection.immutable.Map[Char,Int] = Map(A -> 72, B -> 56)


In [2]:
val rddpair4 = sc.parallelize(List(("A", "Adam"),("A", "Ada"),("A", "Adrian"),("B", "Bonifacy"),("B", "Barnaba")))

rddpair4: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:25


Rozbudowana wersja `reduceByKey` pozwalająca na zwrócenie wartości o innym typie niż oryginalne. W związku z tym konieczne jest podanie trzech parametrów:
- zeroValue - domyślna wartość neutralna dla agregacji (dodawanie: 0, mnożenie: 1, tworzenie zbioru unikatowych wartości: pusty zbiór, itd.),
- seqFunc - funkcja agregująca wartości w oryginalnym RDD, przyjmuje dwa parametry, gdzie drugi jest włączany (dodawany itp.) do pierwszego
- combFunc - funkcja łącząca wartości uzyskane z pierwszej funkcji dla kluczy

In [10]:
val initiaValue = 0
val addToCounts = (n:Int, x: String) => n + x.length
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2

initiaValue: Int = 0
addToCounts: (Int, String) => Int = $Lambda$2278/0x00000008409f5040@2fa16cb3
sumPartitionCounts: (Int, Int) => Int = $Lambda$2279/0x0000000840502040@41e2ed85


In [12]:
rddpair4.aggregateByKey(initiaValue)(addToCounts, sumPartitionCounts).collect()

res3: Array[(String, Int)] = Array((A,13), (B,15))
