<h1 align = "center"> Spark RDD </h1>

Verify that the Spark context has been initialized.

In [1]:
sc.version

2.4.3

## Creating an RDD

Create an RDD by parallelizing a collection.

In [40]:
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
distData.collect().foreach(println)

1
2
3
4
5


data = Array(1, 2, 3, 4, 5)
distData = ParallelCollectionRDD[25] at parallelize at <console>:31


ParallelCollectionRDD[25] at parallelize at <console>:31
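
The number of partitions can also be requested explicitly through the second argument of `parallelize`. The following is a minimal sketch rather than part of the original lab: the names `ctx` and `nums`, the application name, the `local[*]` master and the partition count 2 are all assumptions chosen for illustration. Inside the notebook, `getOrCreate` simply returns the context that is already running.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the running context (the notebook's `sc`) or starts a local one.
// The app name and master are assumptions, used only if no context exists yet.
val ctx = SparkContext.getOrCreate(
  new SparkConf().setAppName("rdd-sketch").setMaster("local[*]"))

// Second argument: requested number of partitions (2 is an arbitrary choice).
val nums = ctx.parallelize(Array(1, 2, 3, 4, 5), 2)

println(nums.getNumPartitions)

// glom() turns every partition into one array, which shows how the data was split.
nums.glom().collect().foreach(part => println(part.mkString(", ")))
```
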

Create an RDD from a text file using the Spark context.

In [15]:
val logFile = sc.textFile("/resources/Prezentace/LabData/notebook.log")
logFile.take(10).foreach(println)

[I 12:09:13.491 NotebookApp] Using MathJax: /static/vendor/MathJax-2.5-latest/MathJax.js
[I 12:09:13.494 NotebookApp] Using existing profile dir: u'/home/notebook/.ipython/profile_default'
[I 12:09:13.513 NotebookApp] Writing notebook server cookie secret to /home/notebook/.ipython/profile_default/security/notebook_cookie_secret
[I 12:09:13.586 NotebookApp] Serving notebooks from local directory: /resources
[I 12:09:13.586 NotebookApp] 0 active kernels 
[I 12:09:13.586 NotebookApp] The IPython Notebook is running at: http://[all ip addresses on your system]:8888/
[I 12:09:13.586 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[W 12:09:13.586 NotebookApp] No web browser found: could not locate runnable browser.


logFile = /resources/Prezentace/LabData/notebook.log MapPartitionsRDD[19] at textFile at <console>:29


/resources/Prezentace/LabData/notebook.log MapPartitionsRDD[19] at textFile at <console>:29

Create a new RDD by transforming an existing one.

In [16]:
val error = logFile.filter(line => line.contains("ERROR"))
error.take(10).foreach(println)

       # Filter out the lines that contains INFO (or ERROR, if the particular log has it)
15/10/22 06:23:09 [ERROR] o.a.s.s.s.ReceiverTracker - Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Socket data stream had no more data
15/10/22 06:23:11 [ERROR] o.a.s.s.s.ReceiverTracker - Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:7777 - java.net.ConnectException: Connection refused
15/10/22 06:23:13 [ERROR] o.a.s.s.s.ReceiverTracker - Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:7777 - java.net.ConnectException: Connection refused
15/10/22 06:23:15 [ERROR] o.a.s.s.s.ReceiverTracker - Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:7777 - java.net.ConnectException: Connection refused
15/10/22 06:23:17 [ERROR] o.a.s.s.s.ReceiverTracker - Deregistered receiver for stream 0: Restarting receiver

error = MapPartitionsRDD[20] at filter at <console>:27


MapPartitionsRDD[20] at filter at <console>:27

In [18]:
val timesThree = distData.map(x => x * 3)
timesThree.collect().foreach(println)

3
6
9
12
15


timesThree = MapPartitionsRDD[22] at map at <console>:29


MapPartitionsRDD[22] at map at <console>:29
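
Transformations such as `filter` and `map` are lazy: they only record how the new RDD is derived, and the work runs when an action is called. The sketch below chains two transformations to illustrate this. The names `ctx`, `nums` and `evenTimesThree` and the local context settings are assumptions for illustration only.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the notebook's context if one is running (settings are assumptions).
val ctx = SparkContext.getOrCreate(
  new SparkConf().setAppName("rdd-sketch").setMaster("local[*]"))

val nums = ctx.parallelize(Array(1, 2, 3, 4, 5))

// Both calls only record the lineage; no job is started yet.
val evenTimesThree = nums.filter(x => x % 2 == 0).map(x => x * 3)

// toDebugString prints the recorded chain of parent RDDs.
println(evenTimesThree.toDebugString)

// collect() is an action, so the filter and the map are executed here.
println(evenTimesThree.collect().mkString(", "))
```
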

## RDD actions

Count the number of lines (elements) in the RDD.

In [29]:
logFile.count()

34836

Maximum value in the RDD.

In [31]:
distData.max()

5

Minimum value in the RDD.

In [33]:
distData.min()

1

Print summary statistics for the RDD.

In [34]:
distData.stats()

(count: 5, mean: 3.000000, stdev: 1.414214, max: 5.000000, min: 1.000000)
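
The object returned by `stats()` is a `StatCounter`, so individual values can be read from it instead of printing the whole summary. The sketch below assumes the same five numbers as above. The names `ctx`, `nums` and `st` are illustrative. Note that `stdev` is the population standard deviation, while `sampleStdev` divides by n - 1.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the notebook's context if one is running (settings are assumptions).
val ctx = SparkContext.getOrCreate(
  new SparkConf().setAppName("rdd-sketch").setMaster("local[*]"))

val nums = ctx.parallelize(Array(1, 2, 3, 4, 5))

// One pass over the data computes all of the statistics below.
val st = nums.stats()

println(st.count)       // number of elements
println(st.mean)        // arithmetic mean
println(st.stdev)       // population standard deviation
println(st.sampleStdev) // sample standard deviation (divides by n - 1)
```
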

Return all elements of the RDD as a collection.

In [37]:
distData.collect()

Array(1, 2, 3, 4, 5)

Return a given number of RDD elements as a collection.

In [38]:
logFile.take(10)



Sum the elements of the RDD.

In [42]:
distData.reduce((x, y) => x + y)

15

In [43]:
distData.sum()

15.0
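
The two results differ in type: `reduce` keeps the element type (`Int`), while `sum()` always returns a `Double`. They also behave differently on an empty RDD, where `reduce` throws an exception and `fold` returns its zero value. A minimal sketch of both points follows. The names `ctx`, `nums` and `empty` and the local context settings are assumptions.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Try

// Reuses the notebook's context if one is running (settings are assumptions).
val ctx = SparkContext.getOrCreate(
  new SparkConf().setAppName("rdd-sketch").setMaster("local[*]"))

val nums = ctx.parallelize(Array(1, 2, 3, 4, 5))

val total: Int = nums.reduce(_ + _)   // stays an Int
val totalAsDouble: Double = nums.sum() // always a Double

// An empty RDD: reduce has nothing to combine and fails,
// while fold falls back to the supplied zero value.
val empty = ctx.parallelize(Seq.empty[Int])
val reduceFailed = Try(empty.reduce(_ + _)).isFailure
val foldedSum = empty.fold(0)(_ + _)

println(s"$total $totalAsDouble $reduceFailed $foldedSum")
```
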

## Shared variables and caching?

## Key-value pair RDDs - Average number of friends by age

Create a new RDD from the file fakefriends-noheader.csv.

In [48]:
val friends = sc.textFile("/resources/Prezentace/LabData/fakefriends-noheader.csv")

friends = /resources/Prezentace/LabData/fakefriends-noheader.csv MapPartitionsRDD[30] at textFile at <console>:27


/resources/Prezentace/LabData/fakefriends-noheader.csv MapPartitionsRDD[30] at textFile at <console>:27

Display the first five lines.

In [49]:
friends.take(5).foreach(println)

0,Will,33,385
1,Jean-Luc,26,2
2,Hugh,55,221
3,Deanna,40,465
4,Quark,68,21


Create an RDD of (age, number of friends) tuples.

In [56]:
val rdd = friends.map(line => {
      val fields = line.split(",")
      val age = fields(2).toInt
      val numFriends = fields(3).toInt
      (age, numFriends)
})
rdd.take(15).foreach(println)

(33,385)
(26,2)
(55,221)
(40,465)
(68,21)
(59,318)
(37,220)
(54,307)
(38,380)
(27,181)
(53,191)
(57,372)
(54,253)
(56,444)
(43,49)


rdd = MapPartitionsRDD[36] at map at <console>:29


MapPartitionsRDD[36] at map at <console>:29
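
The parsing step above fails with an exception if a line is malformed. One possible way to guard against that is sketched below in plain Scala. The helper `parseLine` and the two broken sample lines are hypothetical additions, not part of the lab data.

```scala
import scala.util.Try

// Hypothetical helper: returns None for lines that cannot be parsed,
// otherwise Some((age, numFriends)).
def parseLine(line: String): Option[(Int, Int)] = {
  val fields = line.split(",")
  if (fields.length < 4) None
  else Try((fields(2).trim.toInt, fields(3).trim.toInt)).toOption
}

// The first two lines come from the file; the last two are made-up bad input.
val sample = Seq("0,Will,33,385", "1,Jean-Luc,26,2", "broken line", "2,Hugh,n/a,221")

// flatMap keeps only the lines that parsed successfully.
val parsed = sample.flatMap(line => parseLine(line))
parsed.foreach(println)
```

On the RDD, the same helper could be applied as `friends.flatMap(line => parseLine(line))`, which silently skips bad lines instead of failing the job.
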

Create key-value pairs where the key is the age and the value is a tuple (total number of friends, number of people).

In [53]:
val totalsByAge = rdd.mapValues(x => (x, 1)).reduceByKey((x,y) => (x._1 + y._1, x._2 + y._2))
totalsByAge.take(5).foreach(println)

(19,(2346,11))
(39,(1185,7))
(34,(1473,6))
(52,(3747,11))
(55,(3842,13))


totalsByAge = ShuffledRDD[34] at reduceByKey at <console>:27


ShuffledRDD[34] at reduceByKey at <console>:27
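
To see what the sum-and-count pattern does, it can help to run it on a handful of pairs. The sketch below uses made-up input (not the lab file) and also shows `aggregateByKey` as an alternative that builds the same (sum, count) tuple without the intermediate `mapValues`. The names `ctx`, `pairs`, `viaReduce` and `viaAggregate` are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the notebook's context if one is running (settings are assumptions).
val ctx = SparkContext.getOrCreate(
  new SparkConf().setAppName("rdd-sketch").setMaster("local[*]"))

// Made-up (age, numFriends) pairs for illustration.
val pairs = ctx.parallelize(Seq((33, 385), (26, 2), (33, 100), (26, 4), (55, 221)))

// Same pattern as above: attach a count of 1, then add sums and counts per key.
val viaReduce = pairs
  .mapValues(n => (n, 1))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
  .collectAsMap()

// Alternative: start from (0, 0), fold each value in, then merge partial results.
val viaAggregate = pairs
  .aggregateByKey((0, 0))(
    (acc, n) => (acc._1 + n, acc._2 + 1),
    (a, b) => (a._1 + b._1, a._2 + b._2))
  .collectAsMap()

viaReduce.toSeq.sortBy(_._1).foreach(println)
```
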

Divide the total number of friends by the number of people. Both values are Int, so the division discards the fractional part.

In [54]:
val averagesByAge = totalsByAge.mapValues(x => x._1 / x._2)

averagesByAge = MapPartitionsRDD[35] at mapValues at <console>:27


MapPartitionsRDD[35] at mapValues at <console>:27
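
Because the sum and the count are both `Int`, the division above is integer division. The plain Scala sketch below contrasts it with floating-point division, using two of the totals printed earlier. The names `totals`, `truncated` and `exact` are illustrative.

```scala
// Two (age, (totalFriends, people)) entries taken from the output above.
val totals = Seq((19, (2346, 11)), (39, (1185, 7)))

// Int / Int drops the fractional part.
val truncated = totals.map { case (age, (sum, n)) => (age, sum / n) }

// Converting one operand to Double keeps it.
val exact = totals.map { case (age, (sum, n)) => (age, sum.toDouble / n) }

truncated.foreach(println)
exact.foreach(println)
```

On the RDD the equivalent change would be `totalsByAge.mapValues(x => x._1.toDouble / x._2)`.
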

Print the results.

In [55]:
val results = averagesByAge.collect()
results.sorted.foreach(println)

(18,343)
(19,213)
(20,165)
(21,350)
(22,206)
(23,246)
(24,233)
(25,197)
(26,242)
(27,228)
(28,209)
(29,215)
(30,235)
(31,267)
(32,207)
(33,325)
(34,245)
(35,211)
(36,246)
(37,249)
(38,193)
(39,169)
(40,250)
(41,268)
(42,303)
(43,230)
(44,282)
(45,309)
(46,223)
(47,233)
(48,281)
(49,184)
(50,254)
(51,302)
(52,340)
(53,222)
(54,278)
(55,295)
(56,306)
(57,258)
(58,116)
(59,220)
(60,202)
(61,256)
(62,220)
(63,384)
(64,281)
(65,298)
(66,276)
(67,214)
(68,269)
(69,235)


results = Array((19,213), (39,169), (34,245), (52,340), (55,295), (66,276), (28,209), (29,215), (54,278), (65,298), (30,235), (35,211), (50,254), (36,246), (24,233), (64,281), (57,258), (51,302), (37,249), (45,309), (63,384), (18,343), (20,165), (38,193), (49,184), (43,230), (41,268), (61,256), (56,306), (21,350), (47,233), (53,222), (22,206), (25,197), (46,223), (48,281), (59,220), (32,207), (27,228), (62,220), (33,325), (42,303), (23,246), (40,250), (67,214), (69,235), (58,116), (44,282), (60,202), (31,267), (26,242), (68,269))


Array((19,213), (39,169), (34,245), (52,340), (55,295), (66,276), (28,209), (29,215), (54,278), (65,298), (30,235), (35,211), (50,254), (36,246), (24,233), (64,281), (57,258), (51,302), (37,249), (45,309), (63,384), (18,343), (20,165), (38,193), (49,184), (43,230), (41,268), (61,256), (56,306), (21,350), (47,233), (53,222), (22,206), (25,197), (46,223), (48,281), (59,220), (32,207), (27,228), (62,220), (33,325), (42,303), (23,246), (40,250), (67,214), (69,235), (58,116), (44,282), (60,202), (31,267), (26,242), (68,269))
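
The results above are sorted on the driver after `collect()`. Sorting can also be done by Spark before the data is collected, which matters when only the first few rows are needed. A minimal sketch follows, using four of the averages from the output. The names `ctx`, `averages`, `sortedByAge` and `topByFriends` are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the notebook's context if one is running (settings are assumptions).
val ctx = SparkContext.getOrCreate(
  new SparkConf().setAppName("rdd-sketch").setMaster("local[*]"))

// Four (age, averageFriends) pairs taken from the results above.
val averages = ctx.parallelize(Seq((19, 213), (39, 169), (34, 245), (18, 343)))

// Sort by key (age) inside Spark, then bring the ordered data to the driver.
val sortedByAge = averages.sortByKey().collect()

// Sort by value, descending, and fetch only the two highest averages.
val topByFriends = averages.sortBy(_._2, ascending = false).take(2)

sortedByAge.foreach(println)
topByFriends.foreach(println)
```
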

# Independent work