# Spark Basics 2

---

In this notebook we continue to preview and practice the Spark features covered in the coming weeks.
We study RDDs that resemble last week's RDDs, except that each element consists of a key and a value. A key/value RDD enables new operations, such as aggregating and grouping data by key and grouping two different RDDs together. **Such an RDD is called a key/value RDD or pair RDD.**

**In Python, each element of a pair RDD is a pair tuple.**

* Key / Value RDD in Python

* function

### Import pyspark & create the SparkContext (covered last week)

In [0]:
import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext(master="local[*]")
sc
#sc.stop()

### Spark UI (once the SparkContext is created, reachable at 192.168.99.100:4040 after a little setup)

![spark_ui](https://docs.google.com/uc?export=download&id=1H77zTnNWYtKkRIXDmGYUlE4YgPm4Wxvr)
![executors](https://docs.google.com/uc?export=download&id=1jjNhSKwyUjLpa1_nMfkk79BlINhukySF)

The port must be mapped between the host OS and the Docker container.

- - -

**(If you use Kitematic)**

Kitematic -> stop the container you created -> Settings -> Hostname/Ports

Set Configure Ports as shown below, then save.
![kit](https://docs.google.com/uc?export=download&id=1gFXz5el_8nNxiIGNZV4KA4rrBZm0uaUo)

- - -

**(If you use CMD or the Docker Quickstart Terminal)**

``docker stop running_container``

``docker commit running_container new_container``

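``docker run -p 4040:4040 new_container``

Here ``running_container`` and ``new_container`` are placeholder names: ``docker commit`` saves the stopped container as an image named ``new_container``, and ``docker run -p 4040:4040`` starts a new container from that image with port 4040 published to the host.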


## Creating a Pair RDD

We create a pair RDD with parallelize, which we practiced earlier. Just remember that a pair RDD is built from tuples.

* Python tuple

    1. Almost the same as a list, but **immutable**

    2. **Supports slicing and indexing, but not append, pop, remove, and so on. In short, you can read values but cannot add, modify, or delete them**
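The tuple properties listed above can be checked in plain Python, without Spark. The snippet below is a minimal sketch; the variable names are chosen only for illustration.

```python
# A pair is just a 2-tuple: index 0 is the key, index 1 is the value.
pair = (1, 2)
key, value = pair            # unpacking reads the same items as indexing
first_two = (1, 2, 3)[:2]    # slicing returns a new tuple

# Tuples have no append/pop/remove, and item assignment raises TypeError.
try:
    pair[1] = 99
except TypeError as err:
    error_message = str(err)

# "Changing" a pair therefore means building a new tuple.
updated = (pair[0], pair[1] + 1)

print(key, value, first_two, updated)
print(error_message)
```

This is also why transformations such as map return new pairs instead of modifying existing ones.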

In [0]:
pair_rdd = sc.parallelize([(1, 2), (3, 4)])
print(pair_rdd.collect())

[(1, 2), (3, 4)]


**You can create a pair RDD as above, but you can also create one from an existing RDD with map().**

If you don't remember ```transformation``` and ```action```, review them in the **HW1 material**!

In [0]:
regular_rdd = sc.parallelize(range(1,7)) # create a regular RDD
pair_rdd = regular_rdd.map(lambda x : (x, x ** x)) # transformation using map

print(pair_rdd.collect())
print(pair_rdd.take(4))

[(1, 1), (2, 4), (3, 27), (4, 256), (5, 3125), (6, 46656)]
[(1, 1), (2, 4), (3, 27), (4, 256)]


**The cell above uses take.**

``pair_rdd.take(4)`` fetches 4 values from the RDD (a kind of action).

**It is similar to collect, but returns only as many elements as you ask for.**



## Exercise 1

---

We use the reduced dataset (about 10%) provided for KDD Cup 1999. It contains about 500,000 **network interactions**.

``[[0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.], [...] ,[...]]``

Each element is a ``list`` whose fields are separated by ``,``. The last value of each element (normal.) is the ``network interaction type (Key)``, and the remaining values are the ``Value``.

* input : regular_RDD

* Map each comma separated row of this dataset into a list and create a key/value pair RDD with key as x(Network interaction type) and value as x where x is a list. Print the first row of your newly created pair RDD.


Expected output :



```
[('normal.', ['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.']),
 ('normal.',  ['0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.'])]
```
 
**Write ``kdd_func`` so that it produces the ``Expected output`` above.**


* KDD Cup 1999 data sample (``494,021`` rows in total, with many columns separated by ``,``; ``the last column is the network interaction type``)

![image](https://docs.google.com/uc?export=download&id=1r8QwJptLBWyR-01mpRs7vGLemgb2zGz_)

In [0]:
# Code to download the KDD Cup 1999 data
import urllib.request

f = urllib.request.urlretrieve ("https://docs.google.com/uc?export=download&id=1b-cfzMvUaiZQx9gR2KXXmwuOzd9OHs6-", "kddcup.data_10_percent_corrected")
data_file = "./kddcup.data_10_percent_corrected"

raw_data = sc.textFile(data_file) # a text file can be loaded directly into an RDD like this

In [0]:
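def kdd_func(regular_RDD):
    # Write your code here.
    # regular_RDD is one comma-separated row (a string) of the dataset.
    net_type = regular_RDD.split(",")[-1]  # the last field is the interaction type

    return (net_type, [regular_RDD])

raw_data.map(kdd_func).take(2)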

## Key-Value Transformations

(1) **reduceByKey(func): applies reduce to the values that share the same key**

* It works like reduce, but because a dataset can contain a very large number of keys, it is a ``Transformation`` rather than an ``Action``.

In [0]:
RDD_test = sc.parallelize([(1, 2), (2, 4), (2, 6)])

print("Original RDD : ", RDD_test.collect())
print("After transformation : ", RDD_test.reduceByKey(lambda a, b: a + b).collect()) # a transformation, not an action

Original RDD :  [(1, 2), (2, 4), (2, 6)]
After transformation :  [(1, 2), (2, 10)]
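To make the per-key semantics concrete, here is a plain-Python sketch that mimics what reduceByKey computes on a small list. `reduce_by_key` is a hypothetical local helper, not part of pyspark, and it ignores partitioning: Spark reduces within each partition first and then merges the partial results, which is why `func` should be associative and commutative.

```python
from collections import defaultdict
from functools import reduce


def reduce_by_key(pairs, func):
    """Hypothetical local stand-in for RDD.reduceByKey (no Spark involved)."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    # Fold the values of each key with func, one key at a time.
    return [(key, reduce(func, values)) for key, values in grouped.items()]


pairs = [(1, 2), (2, 4), (2, 6)]
result = reduce_by_key(pairs, lambda a, b: a + b)
print(result)
```

The result has one element per distinct key, so it can still be very large. That is the reason Spark returns it as a new RDD instead of sending it to the driver.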


(2) **sortByKey(): sorts by key in ascending order**

In [0]:
RDD_test= sc.parallelize([(2,261234125), (1,444), (3,6323)])
print("Original RDD :", RDD_test.collect())
print("After transformation : ", RDD_test.sortByKey().collect())

Original RDD : [(2, 261234125), (1, 444), (3, 6323)]
After transformation :  [(1, 444), (2, 261234125), (3, 6323)]


(3) **mapValues(func): applies func to each value without changing the key**

In [0]:
RDD_test = sc.parallelize([(1,2), (2,4), (2,6)])
print("Original RDD :", RDD_test.collect())
print("After transformation : ", RDD_test.mapValues(lambda x: x**9).collect())

Original RDD : [(1, 2), (2, 4), (2, 6)]
After transformation :  [(1, 512), (2, 262144), (2, 10077696)]


(4) **groupByKey() : Returns a new RDD of `(key,<iterator>)` pairs where the iterator iterates over the values associated with the key.**

In [0]:
rdd = sc.parallelize([(1,2), (2,4), (2,6)])
print("Original RDD :", rdd.collect())
print("After transformation : ", rdd.groupByKey().mapValues(lambda x: [a for a in x]).collect())

Original RDD : [(1, 2), (2, 4), (2, 6)]
After transformation :  [(1, [2]), (2, [4, 6])]
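The grouping step itself can be sketched in plain Python. `group_by_key` below is a hypothetical helper written for illustration. It returns lists directly, whereas Spark returns an iterable per key, which is why the cell above converts it with mapValues before printing.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Hypothetical local stand-in for RDD.groupByKey (no Spark involved)."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)  # collect every value seen for this key
    return list(grouped.items())


pairs = [(1, 2), (2, 4), (2, 6)]
grouped_pairs = group_by_key(pairs)
print(grouped_pairs)
```

Unlike reduceByKey, nothing is combined here: every value is kept, so the grouped result holds as many values as the input.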


(5) **flatMapValues(func)**:

``func`` takes ``a single value`` as input and returns an ``iterator`` over whatever values you want to generate. ``flatMapValues`` is a method for key/value RDDs: it applies ``func`` to the value of each element and produces new values through the ``iterator``. Each produced value is then paired with the original key to form a key/value element.

In [0]:
RDD = sc.parallelize([(1,2), (2,4), (2,6)])
print("Original RDD :", RDD.collect())
# for each value x, the lambda returns an iterable that produces x, x+1, x+2, x+3
print("After transformation : ", RDD.flatMapValues(lambda x: range(x,x+4)).collect())

Original RDD : [(1, 2), (2, 4), (2, 6)]
After transformation :  [(1, 2), (1, 3), (1, 4), (1, 5), (2, 4), (2, 5), (2, 6), (2, 7), (2, 6), (2, 7), (2, 8), (2, 9)]
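The expansion can be written as a nested loop in plain Python. The sketch below uses a hypothetical helper, `flat_map_values`, and a shorter range than the cell above to keep the output small.

```python
def flat_map_values(pairs, func):
    """Hypothetical local stand-in for RDD.flatMapValues (no Spark involved)."""
    # Each value expands into several values; the key is repeated for each one.
    return [(key, new_value) for key, value in pairs for new_value in func(value)]


pairs = [(1, 2), (2, 4), (2, 6)]
expanded = flat_map_values(pairs, lambda x: range(x, x + 2))
print(expanded)
```

Each input pair contributes as many output pairs as its iterable yields, and keys are never merged, so duplicates such as `(2, 6)` from different inputs can appear when the ranges overlap.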


---
## Exercise 2


Continue with the RDD created in the last exercise. Print the top 5 network interaction types (with their total durations) that have the largest total durations. Duration is the first column of x, i.e. x[0].

Expected output: 

`
[('normal.', 21075991), ('portsweep.', 1991911), ('warezclient.', 627563), ('buffer_overflow.', 2751), ('multihop.', 1288)]
`

In [0]:
# Write your code here



- - -

## Exercise 3-4: Finding n-grams in ``Moby Dick``


Unigrams, bigrams, and in general n-grams are 1,2 or n words that appear consecutively in a single sentence. Consider the sentence:

    "to know you is to love you."

This sentence contains:

    Unigrams(1-grams): to(2 times), know(1 time), you(2 times), is(1 time), love(1 time)
    Bigrams(2-grams): "to know","know you","you is", "is to","to love", "love you" (all 1 time)
    Trigrams(3-grams): "to know you", "know you is", "you is to", "is to love", "to love you" (all 1 time)
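The counts listed for this sentence can be reproduced in plain Python. This is only a sketch of the definition, not the Spark solution the exercises ask for. `ngrams_of` is a hypothetical helper, and it represents each n-gram as a tuple of words, as in the sample output further down.

```python
from collections import Counter


def ngrams_of(words, n):
    """Hypothetical helper: every run of n consecutive words, as a tuple."""
    return [tuple(words[i:i + n]) for i in range(len(words) - n + 1)]


words = "to know you is to love you".split()
unigram_counts = Counter(ngrams_of(words, 1))
bigrams = ngrams_of(words, 2)
trigrams = ngrams_of(words, 3)

print(unigram_counts[("to",)], unigram_counts[("you",)])
print(bigrams)
print(len(trigrams))
```

A sentence of 7 words yields 7 - n + 1 n-grams, which matches the 6 bigrams and 5 trigrams listed.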

**Exercise 3**: 
* Count the occurrences of each word and of each 1-5 gram (**use first() to process only the first sentence**)
* Complete the ngram function

**Exercise 4**
* List the 5 most common elements for each order (**1 - 5 grams**). For each element, list the sequence of words and the number of occurrences.
* Use the ngram function from Exercise 3 to print the top 5 keywords (use printOutput)

- - -


**Sample output**

**Exercise 3 output :**

```
###gram  1  :  [(('the',), 1), (('project',), 1), (('gutenberg',), 1), (('ebook',), 1), (('of',), 1), (('moby',), 1), (('dick',), 1), (('or',), 1), (('the',), 1), (('whale',), 1), (('by',), 1), (('herman',), 1), (('melville',), 1)] 

###gram  2  :  [(('the', 'project'), 1), (('project', 'gutenberg'), 1), (('gutenberg', 'ebook'), 1), (('ebook', 'of'), 1), (('of', 'moby'), 1), (('moby', 'dick'), 1), (('dick', 'or'), 1), (('or', 'the'), 1), (('the', 'whale'), 1), (('whale', 'by'), 1), (('by', 'herman'), 1), (('herman', 'melville'), 1)] 

###gram  3  :  [(('the', 'project', 'gutenberg'), 1), (('project', 'gutenberg', 'ebook'), 1), (('gutenberg', 'ebook', 'of'), 1), (('ebook', 'of', 'moby'), 1), (('of', 'moby', 'dick'), 1), (('moby', 'dick', 'or'), 1), (('dick', 'or', 'the'), 1), (('or', 'the', 'whale'), 1), (('the', 'whale', 'by'), 1), (('whale', 'by', 'herman'), 1), (('by', 'herman', 'melville'), 1)] 

###gram  4  :  [(('the', 'project', 'gutenberg', 'ebook'), 1), (('project', 'gutenberg', 'ebook', 'of'), 1), (('gutenberg', 'ebook', 'of', 'moby'), 1), (('ebook', 'of', 'moby', 'dick'), 1), (('of', 'moby', 'dick', 'or'), 1), (('moby', 'dick', 'or', 'the'), 1), (('dick', 'or', 'the', 'whale'), 1), (('or', 'the', 'whale', 'by'), 1), (('the', 'whale', 'by', 'herman'), 1), (('whale', 'by', 'herman', 'melville'), 1)] 

###gram  5  :  [(('the', 'project', 'gutenberg', 'ebook', 'of'), 1), (('project', 'gutenberg', 'ebook', 'of', 'moby'), 1), (('gutenberg', 'ebook', 'of', 'moby', 'dick'), 1), (('ebook', 'of', 'moby', 'dick', 'or'), 1), (('of', 'moby', 'dick', 'or', 'the'), 1), (('moby', 'dick', 'or', 'the', 'whale'), 1), (('dick', 'or', 'the', 'whale', 'by'), 1), (('or', 'the', 'whale', 'by', 'herman'), 1), (('the', 'whale', 'by', 'herman', 'melville'), 1)]
```



- - -

**Exercise 4 output** :


![result](https://docs.google.com/uc?export=download&id=1esaB2-j5IPkN1sjt-IvWFkDQEdUP-S4J)

---

**Moby-Dick.txt sample (a text of 22,108 rows)**

![image](https://docs.google.com/uc?export=download&id=1DvXEa10-b4WywIyfboimPzHbpCkJTD2j)

In [0]:
# Moby-dick download

import urllib.request
import re

def removePunctuation(text):
    text = re.sub("[^0-9a-zA-Z ]", " ", text)
    return text

f = urllib.request.urlretrieve ("https://docs.google.com/uc?export=download&id=1BYRYdnYg5QHtpC9AViDexoiLdjM0cz4M", "Moby-Dick.txt")
data_file = "./Moby-Dick.txt"
raw_data = sc.textFile(data_file).map(removePunctuation).map(lambda x:x.lower())

In [0]:
raw_data.first()

'the project gutenberg ebook of moby dick  or the whale  by herman melville'

## Exercise 3 - Write the ngram function

In [0]:
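def ngram(sentence):
    # Write your code here.
    # The call below passes only the list of words, so the n-gram order
    # is presumably read from the loop variable `gram`.
    pass  # placeholder so the cell parses; replace with your implementation

# Print the results
for gram in range(1, 6):
    print("###gram ", gram, " : ", ngram(raw_data.map(lambda x:x.split()).first()), "\n") # first() returns the first element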

## Exercise 4 - Write the output using ngram

In [0]:
def printOutput(n,freq_ngramRDD): 
    top=freq_ngramRDD.take(5) # the RDD is already sorted, so take(5) returns the top 5
    print('\n============ %d most frequent %d-grams'%(5,n))
    print('\nrank\tcount\tngram')
    for i in range(5):
        print('%d\t%d: \t"%s"'%(i+1,top[i][0], ' '.join(top[i][1]))) # ' '.join(tuple) ---> tuple to string
        
for gram in range(1,6):
    ngrams = raw_data.map(lambda x:x.split()).flatMap(ngram)
    
    # Write your code here