# Spark in Colab


In [None]:
# Environment setup (takes about three to five minutes)
! wget -O init_env.sh https://www.dropbox.com/s/6bnwn8u2hz19s59/init_env.sh && \
bash init_env.sh

In [None]:
!which python

In [None]:
!/usr/local/bin/python -V

In [None]:
import os, sys
os.environ['SPARK_HOME'] = "/usr/local/spark"
# os.environ['PYSPARK_PYTHON'] = "/usr/bin/python"
os.environ['PYSPARK_PYTHON'] = "/usr/local/bin/python"
sys.path.append("/usr/local/spark/python/")
sys.path.append("/usr/local/spark/python/lib/pyspark.zip")
sys.path.append("/usr/local/spark/python/lib/py4j-0.10.4-src.zip")

In [None]:
from pyspark import SparkContext
from pyspark import SparkConf
sc = SparkContext()

# Apache Spark basic exercises

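Apache Spark is an open-source cluster computing framework originally developed at UC Berkeley's AMPLab. Hadoop MapReduce writes intermediate data to disk after each job. Spark instead uses in-memory computing, so data can be analyzed in memory before it is ever written to disk. For programs that run in memory, Spark can be up to 100 times faster than Hadoop MapReduce.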


## Basic RDD operations: exercises



### Generate a random integer array in plain Python (NumPy)

In [None]:
import numpy as np
random_array = np.random.randint(1000, size=1000)
print (random_array)

# Convert the data into an RDD distributed across the Spark executors

<img src="https://www.dropbox.com/s/br94ete5q3rj9w3/spark%20data%20model.png?dl=1" width="500" align="left">




___
<img src="https://www.dropbox.com/s/l2gohpmn53jkv1b/%20spark%20system%20overview.png?dl=1" width="500" align="left">

In [None]:
rdd = sc.parallelize(random_array)
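`parallelize` splits a local collection into partitions, and a task on an executor processes each partition. The plain-Python sketch below is a simplified model, not Spark code. The helper `split_into_partitions` is hypothetical. Cutting the data into contiguous, nearly equal slices is an assumption that approximates how Spark slices a local collection. You can check the partition count of a real RDD with `rdd.getNumPartitions()`.

```python
def split_into_partitions(data, num_slices):
    """Split a list into num_slices contiguous, nearly equal chunks (hypothetical helper)."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices] for i in range(num_slices)]


partitions = split_into_partitions(list(range(10)), 3)
print(partitions)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
```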

In [None]:
print(type(random_array))
print(type(rdd))

### The RDD is the core concept of Apache Spark. MapReduce provides only the Map() and Reduce() operations; RDDs instead offer two broad categories of operations: Transformations and Actions



> <img src="https://www.dropbox.com/s/omfoi3uzcgapcm4/rdd%20transformation%20concept.png?dl=1" width="500" align="left">  

> <img src="https://www.dropbox.com/s/3u8gt5376qq5vjy/spark%20core.png?dl=1" width="500" align="left">
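Transformations such as `map()` and `filter()` are lazy. They only record how a new RDD is derived from its parent. Nothing runs until an action such as `collect()`, `count()` or `reduce()` asks for a result. Python generators behave in a similar way, so the sketch below uses them as an analogy; it is plain Python, not Spark. The `calls` list exists only to show when the work actually happens.

```python
calls = []  # records every time the mapping function really executes


def doubling(x):
    calls.append(x)
    return x * 2


data = [1, 2, 3, 4]

# "Transformations": building the generator pipeline runs nothing yet
pipeline = (doubling(x) for x in data)
pipeline = (y for y in pipeline if y > 2)
print(calls)  # [] (nothing has run yet)

# "Action": consuming the pipeline forces the computation
result = list(pipeline)
print(result)  # [4, 6, 8]
print(calls)   # [1, 2, 3, 4]
```

The analogy has one limit. A generator can be consumed only once. An RDD can be reused, and Spark recomputes it from its lineage on each action unless it is cached with `cache()` or `persist()`.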

## The most basic Action operations  
### Use collect() to gather the data distributed across the machines into a single local dataset
<img src="https://www.dropbox.com/s/pjv20pl5wkevjf6/collect.png?dl=1" width="500" align="left">


In [None]:
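print(rdd.collect())  # gather every element back to the driver as a Python list
print(rdd.first())    # return only the first element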
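Conceptually, `collect()` concatenates the elements of every partition, in partition order, into one list on the driver. `first()` needs only the first element, so it can stop early. The plain-Python model below illustrates this with a hypothetical list of partitions. Because `collect()` moves the whole dataset to the driver, use it only on small RDDs; `take(n)` is a safer way to inspect large ones.

```python
from itertools import chain

# Hypothetical RDD split into three partitions (one list per partition)
partitions = [[5, 3], [], [8, 1, 9]]


def collect(parts):
    """Concatenate all partitions, in order, into one local list."""
    return list(chain.from_iterable(parts))


def first(parts):
    """Return the first element, scanning partitions only until one is found."""
    for part in parts:
        if part:
            return part[0]
    raise ValueError("RDD is empty")


print(collect(partitions))  # [5, 3, 8, 1, 9]
print(first(partitions))    # 5
```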

## Transformations: the basic idea

In [None]:
def doubling(x):
    return x*2

print(doubling(10))

In [None]:
print(rdd.take(5))
print(rdd.map(doubling).take(5))
print(rdd.map(lambda x:x+1).take(5))
print(rdd.map(lambda x:x**0.5).take(5))
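`map(f)` applies `f` to each element independently. Each partition can therefore be transformed on its own without moving data between executors. The result has the same number of elements and the same number of partitions as the input. A plain-Python sketch over a hypothetical list of partitions:

```python
def rdd_map(partitions, f):
    """Apply f element-wise inside each partition; partition boundaries are unchanged."""
    return [[f(x) for x in part] for part in partitions]


partitions = [[1, 4], [9, 16, 25]]  # hypothetical partitioned data
print(rdd_map(partitions, lambda x: x * 2))  # [[2, 8], [18, 32, 50]]
print(rdd_map(partitions, lambda x: x + 1))  # [[2, 5], [10, 17, 26]]
```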

In [None]:
def minusone(y):
    return y-1

In [None]:
rdd.map(minusone).take(5)

#### Note: using anonymous (lambda) functions  
    rdd.map(doubling)  # equivalent to rdd.map(lambda x: x*2)

In [None]:
rdd.map(lambda x: x-1).take(5)

In [None]:
rdd.map(lambda yy:0).take(2)

In [None]:
rdd.map(lambda x: x-1).map(lambda x:x-2).map(lambda x:x-3).take(2)
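Consecutive `map()` calls are narrow transformations, and Spark pipelines them within a single stage. Each element therefore passes through all three functions in one pass, without an intermediate RDD being materialized after each step. The plain-Python sketch below shows that chaining the three maps is equivalent to mapping one composed function. The `compose` helper is hypothetical.

```python
from functools import reduce


def compose(*funcs):
    """Return a function applying funcs left to right (hypothetical helper)."""
    return lambda x: reduce(lambda acc, f: f(acc), funcs, x)


data = [10, 20, 30]
steps = (lambda x: x - 1, lambda x: x - 2, lambda x: x - 3)

chained = list(map(steps[2], map(steps[1], map(steps[0], data))))
fused = list(map(compose(*steps), data))
print(chained, fused)  # [4, 14, 24] [4, 14, 24]
```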

#### Exercise: use map() to take the square root of every number 
    import math
    print(math.sqrt(5))

In [None]:
import math
rdd.map(lambda x: math.sqrt(x)).take(2)

#### Lambda functions vs. named functions

In [None]:
import math
def sqrt(x):
    return math.sqrt(x)

In [None]:
rdd.map(lambda y: math.sqrt(y)).take(2)

In [None]:
rdd.map(sqrt).take(2)

# Transformation Operators: map(), filter(), sample(), groupBy(), etc.
## (For the full list of Transformation operators, see) http://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.RDD 

How many elements does rdd contain?

In [None]:
rdd.map(lambda x:1).reduce(lambda x,y:x+y) 

In [None]:
rdd.count()
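Conceptually, `reduce()` first combines the elements inside each partition and then merges the per-partition results on the driver. This is why `map(lambda x: 1).reduce(lambda x, y: x + y)` gives the same answer as `count()`. A plain-Python model with hypothetical partitions:

```python
from functools import reduce


def rdd_reduce(partitions, f):
    """Reduce each non-empty partition locally, then reduce the partial results."""
    partials = [reduce(f, part) for part in partitions if part]
    if not partials:
        raise ValueError("cannot reduce an empty RDD")
    return reduce(f, partials)


partitions = [[7, 2, 9], [4], [], [1, 8]]  # hypothetical partitioned data
ones = [[1 for _ in part] for part in partitions]  # like map(lambda x: 1)
print(rdd_reduce(ones, lambda x, y: x + y))  # 6, the number of elements
```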

In [None]:
rdd.map(lambda x: x if x%2==0 else 0).reduce(lambda x,y:x+y)

#### Transformation Operator: use filter() to keep the even numbers and drop the odd ones

In [None]:
rdd.filter(lambda x: x%2==0).reduce(lambda x,y:x+y)
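`filter()` keeps only the elements for which the predicate returns `True`, so unlike `map()` it can change the number of elements. For a sum, there are two ways to ignore the odd numbers: filter them out, or map them to 0 as in the earlier `map(lambda x: x if x%2==0 else 0)` cell. Both give the same result, as this plain-Python check shows:

```python
from functools import reduce

data = [3, 8, 5, 6, 2, 7]

evens = list(filter(lambda x: x % 2 == 0, data))            # like rdd.filter(...)
zeroed = list(map(lambda x: x if x % 2 == 0 else 0, data))  # like rdd.map(...)

print(evens, len(evens))    # [8, 6, 2] 3
print(zeroed, len(zeroed))  # [0, 8, 0, 6, 2, 0] 6
print(reduce(lambda x, y: x + y, evens), reduce(lambda x, y: x + y, zeroed))  # 16 16
```

The filtered version leaves fewer elements for later steps to process.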

#### Transformation Operator: use sample() to draw a random subset containing roughly a given fraction of the RDD

In [None]:
subsetrdd = rdd.sample(False, 0.01)  # withReplacement=False, fraction=0.01 (about 1% of the elements)
subsetrdd.collect()
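With `withReplacement=False`, `sample()` keeps each element independently with probability `fraction`. The subset therefore has about `fraction * count()` elements, but its exact size varies from run to run. Pass the optional `seed` argument for reproducible results. The plain-Python sketch below models this as Bernoulli sampling. It is an illustration, not Spark's sampler.

```python
import random


def bernoulli_sample(data, fraction, seed=None):
    """Keep each element independently with probability `fraction`."""
    rng = random.Random(seed)
    return [x for x in data if rng.random() < fraction]


data = list(range(1000))
subset = bernoulli_sample(data, 0.01, seed=42)
print(len(subset), subset)  # roughly 10 elements; the exact count depends on the seed
```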

#### Transformation Operator: use groupBy() to group the data

In [None]:
result = subsetrdd.groupBy(lambda x: x%2==0).collect()
print(result)
print([(x, list(y)) for (x, y) in result])
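`groupBy(f)` computes the key `f(x)` for every element and collects all elements with the same key into one group, which requires a shuffle. The result is a list of `(key, iterable)` pairs, which is why the cell above converts each group with `list(y)` before printing. Below is a plain-Python equivalent using a dictionary. Here the groups appear in order of first appearance; Spark makes no ordering guarantee.

```python
from collections import defaultdict


def group_by(data, f):
    """Return (key, list_of_elements) pairs, grouping elements by f(x)."""
    groups = defaultdict(list)
    for x in data:
        groups[f(x)].append(x)
    return list(groups.items())


print(group_by([4, 7, 10, 3, 8], lambda x: x % 2 == 0))
# [(True, [4, 10, 8]), (False, [7, 3])]
```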

#### Transformation Operator: use map() to produce key-value pairs

In [None]:
keyValueRdd = rdd.map(lambda x: ('even', x) if x%2==0 else ('odd',x))


In [None]:
rdd.map(lambda x: ('even', x) if x%2==0 else ('odd',x))\
   .reduceByKey(lambda x,y: x+y)\
   .take(2)

In [None]:
def evennumber(x):
    if x%2==0:
        return x
    else:
        return 0

In [None]:
evennumber(101)

In [None]:
# Caution: x*x + y*y is not associative, so this result depends on how the data are partitioned
rdd.reduce(lambda x,y: x*x+y*y)
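The function passed to `reduce()` should be associative and commutative. Spark reduces each partition separately and then combines the partial results in an order that is not fixed. `lambda x, y: x*x + y*y` is neither associative nor commutative, so the value above depends on how the data happen to be partitioned. In the plain-Python sketch below (hypothetical partitions), the same four numbers give different results under two partitionings, while addition gives the same answer either way:

```python
from functools import reduce


def rdd_reduce(partitions, f):
    """Reduce each non-empty partition, then reduce the partial results."""
    return reduce(f, [reduce(f, part) for part in partitions if part])


square_sum = lambda x, y: x * x + y * y
add = lambda x, y: x + y

one_partition = [[1, 2, 3, 4]]
two_partitions = [[1, 2], [3, 4]]

print(rdd_reduce(one_partition, square_sum), rdd_reduce(two_partitions, square_sum))  # 1172 650
print(rdd_reduce(one_partition, add), rdd_reduce(two_partitions, add))                # 10 10
```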

In [None]:
rdd.map(evennumber).reduce(lambda x,y:x+y)

In [None]:
rdd.map(lambda x: x if x%2==0 else 0).reduce(lambda x,y:x+y)


#### Transformation Operator: use groupByKey() to group the data by key

In [None]:

keyValueRdd = rdd.map(lambda x: ('even', x) if x%2==0 else ('odd',x))

for x in keyValueRdd.groupByKey().collect():
    print (x[0], list(x[1]))


### Exercise: compute the sum of the odd numbers and the sum of the even numbers
#### groupByKey(), reduceByKey()

In [None]:
keyValueRdd.reduceByKey(lambda x,y: x+y).take(2)
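Both `groupByKey()` and `reduceByKey()` shuffle data so that all values for a key end up together. `reduceByKey()`, however, first merges the values for each key inside every partition (a map-side combine). As a result, only one partial result per key per partition crosses the network. For simple aggregations such as these sums it is therefore usually the better choice. Below is a plain-Python model of the two steps, using hypothetical partitions.

```python
def combine_locally(partition, f):
    """Merge values for each key within one partition (the map-side combine)."""
    acc = {}
    for key, value in partition:
        acc[key] = f(acc[key], value) if key in acc else value
    return acc


def reduce_by_key(partitions, f):
    """Combine per partition, then merge the partial results per key."""
    merged = {}
    for partial in (combine_locally(p, f) for p in partitions):
        for key, value in partial.items():
            merged[key] = f(merged[key], value) if key in merged else value
    return merged


partitions = [[("odd", 3), ("even", 4), ("odd", 5)], [("even", 8), ("odd", 1)]]
print([combine_locally(p, lambda x, y: x + y) for p in partitions])
# [{'odd': 8, 'even': 4}, {'even': 8, 'odd': 1}]
print(reduce_by_key(partitions, lambda x, y: x + y))  # {'odd': 9, 'even': 12}
```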

In [None]:
rdd.map(lambda x: (x,1)).reduceByKey(lambda a,b:a+b).take(5)

___
___
___


# RDD Action Operations
#### Use reduce() to compute the sum of all numbers

In [None]:
rdd.reduce(lambda x,y: x+y )

In [None]:
# Caution: x*x + y*y is not associative, so this result depends on how the data are partitioned
rdd.reduce(lambda x,y: x*x+y*y)

In [None]:
rdd.map(lambda x:1).reduce(lambda a,b:a+b)

In [None]:
# The count of odd numbers (x&1 is 1 for odd x and 0 for even x)
rdd.map(lambda x: x&1).reduce(lambda a,b: a+b)

In [None]:
# The count of even numbers ((x&1)^1 is 1 for even x and 0 for odd x)
rdd.map(lambda x: x&1^1).reduce(lambda a,b: a+b)

In [None]:
rdd.map(lambda x: ("odd",1) if x&1==1 else ("even",1)).reduceByKey(lambda x, y: x+y).collect()

In [None]:
rdd.map(lambda x:(x,1)).reduceByKey(lambda x, y: x+y).collect()

______

## Concepts
### Exercise: compute the sum of the squares of all numbers

In [None]:
rdd.map(lambda x: x*x).reduce(lambda x,y: x+y)

## Exercise: use map() and reduceByKey() to count how often each number appears

In [None]:
rdd.map(lambda x: (x,1)).reduceByKey(lambda a,b:a+b).take(5)  # reduceByKey is lazy; take() triggers the computation

_____
_____
_____

# WordCount

In [None]:
!wget -O speech "https://www.dropbox.com/s/28ljfwb1aeuyi37/speech.txt?dl=1"

In [None]:
data_file = "./speech"

In [None]:
text_file = sc.textFile(data_file)

In [None]:
print(type(text_file))

In [None]:
text_file.take(5) #get 5 lines

In [None]:
text_file.map(lambda line: line.split(" ")).count()
# output: 89 lines

In [None]:
c = text_file.flatMap(lambda line: line.split(" "))\
    .map(lambda x: (x, 1))\
    .reduceByKey(lambda a,b: a+b)\
    .map(lambda x: (x[1], x[0]))\
    .sortByKey(False)

c.take(20)
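The pipeline above has four steps:

1. `flatMap` splits every line into words.
2. `map` pairs each word with 1.
3. `reduceByKey` sums the 1s per word.
4. Swapping each pair to `(count, word)` lets `sortByKey(False)` sort by count in descending order.

Here are the same steps on a small in-memory list, using only the Python standard library. The sample lines are made up for illustration.

```python
from collections import Counter

lines = ["the cat sat", "the cat ran to the mat"]  # made-up sample text

words = [w for line in lines for w in line.split(" ")]  # flatMap
counts = Counter(words)                                  # map + reduceByKey
by_count = sorted(((n, w) for w, n in counts.items()), reverse=True)  # swap + sortByKey(False)

print(by_count[:3])  # [(3, 'the'), (2, 'cat'), (1, 'to')]
```

Here `reverse=True` also orders tied counts by word, whereas Spark's `sortByKey` leaves the order of ties unspecified. Both versions split on a single space. Consecutive spaces or empty lines therefore produce empty-string "words", which `split()` with no argument would avoid.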

In [None]:
text_file.flatMap(lambda line: line.split(" ")).count()

In [None]:
wordcountsRDD = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
wordcountsRDD.take(10)



#### Sort by word (character order of the keys)

In [None]:
text_file = sc.textFile(data_file)
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b).sortByKey()
counts.take(5)

#### Sort by frequency

In [None]:
text_file = sc.textFile(data_file)

counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)\
             .sortBy(lambda x: x[1], ascending=False)

counts.take(20)



#### Difference between map and flatMap

In [None]:
text_file = sc.textFile(data_file)
counts = text_file.map(lambda line: line.split(" "))
counts.take(1)


In [None]:
text_file = sc.textFile(data_file)
counts = text_file.flatMap(lambda line: line.split(" "))
counts.take(1)
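`map()` produces exactly one output element per input line. Each element of its result is therefore a whole list of words, and `take(1)` returns the first line's list. `flatMap()` flattens those lists, so each element is a single word and `take(1)` returns only the first word. The plain-Python equivalent:

```python
lines = ["to be or", "not to be"]

mapped = [line.split(" ") for line in lines]                  # like map()
flat_mapped = [w for line in lines for w in line.split(" ")]  # like flatMap()

print(mapped[:1])       # [['to', 'be', 'or']]
print(flat_mapped[:1])  # ['to']
print(len(mapped), len(flat_mapped))  # 2 6
```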

In [None]:
# Sum of all numbers greater than 500
rdd.map(lambda x: x if x>500 else 0).reduce(lambda x,y:x+y)


____
____
____
# pi-estimation 

In [None]:
import random

def sample(p):
    x, y = random.random(), random.random()
    return 1 if x*x + y*y < 1 else 0

count = sc.parallelize(range(0, 10000000)).map(sample) \
             .reduce(lambda a, b: a + b)
print ("Pi is roughly %f" % (4.0 * count / 10000000))
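Consider a point drawn uniformly from the unit square. It lands inside the quarter circle `x*x + y*y < 1` with probability equal to that region's area, π/4. Hence 4 × (hits / samples) approaches π as the number of samples grows. Below is a single-machine version with a fixed seed; the seed and sample size are arbitrary choices for illustration.

```python
import random


def estimate_pi(num_samples, seed=0):
    """Monte Carlo estimate of pi: 4 * fraction of points inside the quarter circle."""
    rng = random.Random(seed)
    hits = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:
            hits += 1
    return 4.0 * hits / num_samples


print("Pi is roughly %f" % estimate_pi(100000))
```

The statistical error shrinks roughly in proportion to 1/√n. This is why the next cell's ten-fold larger sample tends to give a closer estimate.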

In [None]:
count = sc.parallelize(range(0, 100000000))\
        .map(lambda p: 1 if (random.random()**2 + random.random()**2)<1 else 0) \
        .reduce(lambda a, b: a + b)
print ("Pi is roughly %f" % (4.0 * count / 100000000))

___
___
___

# Text Search Example

In [None]:
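import urllib.request
# Download the King James Bible text (urlretrieve lives in urllib.request in Python 3)
urllib.request.urlretrieve("https://www.ccel.org/ccel/bible/kjv.txt", "bible")
data_file = "./bible"
text_file = sc.textFile(data_file)
lines = text_file.map(lambda line: line)  # identity map: each element is one line of text
lines.take(20)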


___
___
___

# Filter Example

In [None]:
import urllib.request
urllib.request.urlretrieve("https://www.ccel.org/ccel/bible/kjv.txt", "bible")
data_file = "./bible"
text_file = sc.textFile(data_file)
lines = text_file.filter(lambda line: 'and' in line)  # keep lines containing the substring 'and'
lines.take(20)
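`'and' in line` is a case-sensitive substring test. The filter therefore also keeps lines containing words such as "hand" or "land", and it misses "And" at the start of a sentence. If only the whole word is wanted, one option is a case-insensitive regular expression with word boundaries. This is an assumption about the intended search, not part of the original example. The predicate could be passed to `filter()` in the same way:

```python
import re

lines = ["And God said", "the land and the sea", "his hand was raised"]

substring_hits = [line for line in lines if "and" in line]
word_pattern = re.compile(r"\band\b", re.IGNORECASE)
word_hits = [line for line in lines if word_pattern.search(line)]

print(substring_hits)  # ['the land and the sea', 'his hand was raised']
print(word_hits)       # ['And God said', 'the land and the sea']
```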