![](imgs/logo.png)

# Przetwarzanie Big Data z użyciem Apache Spark

Autor notebooka: Jakub Nowacki.

## Więcej niż MapReduce

In [3]:
import pyspark
sc = pyspark.SparkContext(appName="beyondMapReduce")

## Akumulatory

Akumulatory to zmienne tylko do zapisu dla workerów, które służą do zliczania wystąpień.

In [3]:
acc = sc.accumulator(0)

In [4]:
rdd = sc.textFile("data/titus_andronicus.txt")

In [5]:
rdd.foreach(lambda x: acc.add(1))

In [6]:
# Wartość dostępna jest jako
acc.value

3255

In [7]:
# Akumulatory można opakowywać w funkję, jeżeli zachodzi taka potrzeba
def function_with_acc(x):
    acc.add(1)

In [8]:
rdd.foreach(function_with_acc)

In [9]:
acc.value

6510

In [10]:
# Zainicjujmy kolejny akumulator
acc2 = sc.accumulator(0)

In [14]:
# ... i związaną funkcję
def split_and_countspaces(x):
    acc2.add(x.count(" "))
    return x.split()

In [15]:
words = rdd.flatMap(split_and_countspaces)

### Exercise 

* Jaka jest wartość `acc2`?
* Pobierz 15 wyrazów: jaka jest teraz wartość `acc2`?
* Pobierz jeszcze raz i sprawdź wartość.
* Użyj `words.cache()`, pobierz 15 wyrazów i sprawdź `acc2.value`.
* Weż pierwsze 15 wyrazów i sprawdź `acc2.value`.

## Broadcasts

Zmienne tylko do odczytu pozwalające przekazywać argumenty do zadań. 

In [21]:
# Zmiennych tych używa się do przekazywania argumentów; wartości te mogą być dość duże
br = sc.broadcast(["Marcus", "Titus", "hate", "die"])

In [22]:
br.value

['Marcus', 'Titus', 'hate', 'die']

In [23]:
def line_of_love(line):
    t = 0
    for word in br.value:
        if word in line:
            t += 1
    return t > 1

In [24]:
rdd.filter(line_of_love).take(5)

[u'    Let Marcus, Lucius, or thyself, old Titus,',
 u"  TITUS. Marcus, my brother! 'Tis sad Titus calls."]

In [25]:
rdd.filter(line_of_love).takeSample(False, 5)

[u'    Let Marcus, Lucius, or thyself, old Titus,',
 u"  TITUS. Marcus, my brother! 'Tis sad Titus calls."]

## Operacje na zbiorach

In [None]:
rdd1 = sc.parallelize(["cat", "dog", "python"])
rdd2 = sc.parallelize(["magpie", "python", "elephant"])

In [None]:
# Konkatenacja a nie suma zbiorów
rdd1.union(rdd2).collect()

### Zadanie

* Żeby otrzymać prawdziwą sumę zbiorów z `union` użyj `.distinct()`.

In [None]:
# Część wspólna zbiorów
rdd1.intersection(rdd2).collect()

## Złączenie zbiorów

Łączenie zbiorów (join) działa podobnie jak w SQL ale odbywa się na parze klucz wartość w formie `(key, value)`.

In [3]:
species = sc.parallelize([("Max", "dog"), ("Sasha", "cat"), ("Alice", "snake")])
ages = sc.parallelize([("Alice", 3), ("Max", 11), ("Ulrich", 2), ("Karl", 31)])

In [8]:
# inner join
#species.join(ages) \
#  .collect()
    
species.join(ages).map(lambda v: v[1][0]).distinct().collect()

['snake', 'dog']

### Zadanie

* Wypisz zbiór wierząt ze znanym wiekiem.

In [None]:
# Podpowiedź
z = ("a", ("b", "c"))
print(z[1])
print(z[1][0])

In [None]:
# left outer join
species.leftOuterJoin(ages) \
  .collect()

In [None]:
# right outer join
species.rightOuterJoin(ages) \
  .collect()

In [None]:
# full outer join
species.fullOuterJoin(ages) \
  .collect()

In [None]:
# iloczyn kartezjański
species.cartesian(ages) \
  .collect()

### Zadanie

* ★ Zamień produkt kartezjański w wewnętrzne złączenie (inner join).
* ★ Wypisz wszystkie możliwe pary zwierząt, np. `[("Max", "Alice"), ... ("Ulrich", "Karl")]`.

## Grupowanie (po kluczu)

In [4]:
cities = sc.parallelize(["Paris, France", "Lyon, France", "Warsaw, Poland", "London, United Kingdom"])

In [5]:
cities \
  .groupBy(lambda x: x.split(", ")[1]) \
  .map(lambda x: (x[0], list(x[1]))) \
  .collect()

[('United Kingdom', ['London, United Kingdom']),
 ('Poland', ['Warsaw, Poland']),
 ('France', ['Paris, France', 'Lyon, France'])]

### Zadanie

* Zamiast `.map(lambda x: (x[0], list(x[1])))` użyj `.mapValues(some_function)`. 

In [6]:
# Podpowiedź
sc.parallelize([("a", 3), ("b", -2)]) \
  .mapValues(lambda x: x*x) \
  .collect()

[('a', 9), ('b', 4)]

### Więcej zadań

Wracamy do Titus Andronicus:

* Wyczyść dane przez usuniecie niechcianych znaków
* Policz i wyświetl 20 najczęstrzych:
    * słów zaczynających się zawsze wielką literą,
    * słów nigdy nie zaczynających się od wielkiej litery.
* Policz częstotliwość (i.e. $p_{word}=n_{word}/n$ wszystkich słów (ignorując wielkość liter).
* Pobierz słownik z [tej strony](http://www.math.sjsu.edu/~foster/dictionary.txt); policz ile słów ze słownika występuje w dziele (słownik ma tylko wyrazy o małych literach).
* ★ Pobierz dzieło nie napisane przez Shakespeare [Gutenberg Project](http://www.gutenberg.org/); znajdź słowa charakterystyczne dla Shakespeare'a, tj. które nie występują w innym dziele.
* ★ Dla wszystkich słów w lini policz

$$\frac{p_{pair}}{p_{word1}p_{word2}} = \frac{n_{pair} n}{n_{word1}n_{word2}},$$

tylko dla par występujących więcej niż 5 razy; pokaż 20 najwyższych wartości.

In [7]:
lines = sc.textFile("data/titus_andronicus.txt")

In [17]:
import re
words = lines.flatMap(lambda l: re.findall('[\w]+', l)) \
            .map(lambda w: (w, 1))

In [18]:
print(words.toDebugString())
words.take(10)

(2) PythonRDD[42] at RDD at PythonRDD.scala:43 []
 |  data/titus_andronicus.txt MapPartitionsRDD[9] at textFile at NativeMethodAccessorImpl.java:-2 []
 |  data/titus_andronicus.txt HadoopRDD[8] at textFile at NativeMethodAccessorImpl.java:-2 []


[(u'This', 1),
 (u'Etext', 1),
 (u'file', 1),
 (u'is', 1),
 (u'presented', 1),
 (u'by', 1),
 (u'Project', 1),
 (u'Gutenberg', 1),
 (u'in', 1),
 (u'cooperation', 1)]

In [21]:
count = words.reduceByKey(lambda x,y: x+y).sortByKey()
count.top(10, lambda p: p[1])

[(u'the', 633),
 (u'and', 568),
 (u'I', 442),
 (u'to', 427),
 (u'of', 352),
 (u'And', 308),
 (u'my', 273),
 (u'a', 270),
 (u'in', 267),
 (u'with', 257)]

In [43]:
from sets import Set
#słów zaczynających się zawsze wielką literą,
startUpper = count.filter(lambda p: p[0].istitle())
#startUpper.top(20, lambda p: p[1])
lowerSet = Set()
count.filter(lambda p: p[0].islower()).foreach(lambda p: lowerSet.add(p[0]))
print lowerSet
#onlyStartUpper = startUpper.filter(lambda p: )

Set([])


In [35]:
#słów nigdy nie zaczynających się od wielkiej litery.
out = count.filter(lambda p: p[0].islower())
out.top(20, lambda p: p[1])

[(u'the', 633),
 (u'and', 568),
 (u'to', 427),
 (u'of', 352),
 (u'my', 273),
 (u'a', 270),
 (u'in', 267),
 (u'with', 257),
 (u'd', 232),
 (u'you', 230),
 (u'that', 205),
 (u's', 197),
 (u'me', 195),
 (u'thy', 189),
 (u'this', 184),
 (u'for', 172),
 (u'not', 168),
 (u'is', 164),
 (u'thou', 157),
 (u'his', 156)]