----------
## Notebook setup

When using PySpark kernel notebooks on HDInsight, you do not need to create a SparkContext or a SparkSession yourself. A SparkSession, which wraps a SparkContext, is created automatically when you run the first code cell, and its startup progress is printed. The contexts are available under the following variable names:
- SparkSession (spark)
- SparkContext (sc)

To run the cells below, place the cursor in the cell and then press **SHIFT + ENTER**.

## How do I make an RDD?

RDDs can be created from stable storage or by transforming other RDDs. Run the cells below to create RDDs from the sample data files available in the storage container associated with your Spark cluster. One such sample data file is available on the cluster at `wasb:///example/data/fruits.txt`.  The /// notation reads data from the default container.

In [1]:
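from pyspark import SparkContext
# Reuse the existing context: on HDInsight `sc` already exists, and
# constructing a second SparkContext() raises an error.
sc = SparkContext.getOrCreate()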

In [2]:
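A = sc.parallelize(xrange(1, 100))
t = 50
B = A.filter(lambda x: x < t)
# print B.cache()
print B.count()   # 49: t is 50 when this job runs
t = 10
C = B.filter(lambda x: x > t)
# Lazy evaluation: C's job re-runs B's filter with the current t (10),
# so it keeps x < 10 and x > 10, which is empty.
print C.count()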

49
0


In [4]:
fruits = sc.textFile('wasb:///example/data/fruits.txt')
yellowThings = sc.textFile('wasb:///example/data/yellowthings.txt')
fruits.collect()

In [6]:
# In local mode:
fruits = sc.textFile('../data/fruits.txt')
yellowThings = sc.textFile('../data/yellowthings.txt')
print fruits.collect()
print yellowThings.collect()

In [None]:
# You can also read from other containers.
# The 'cluster' container under the storage account 'msbd' has been made public.
# Use the following format to read data from a public container
# The file can also be accessed from the web at: 
# https://msbd.blob.core.windows.net/cluster/data/course.txt

txtfile = sc.textFile('wasb://cluster@msbd.blob.core.windows.net/data/course.txt')
txtfile.collect()

----------

##  RDD operations

In [None]:
# map: reverse each string
fruitsReversed = fruits.map(lambda fruit: fruit[::-1])

In [None]:
fruitsReversed.unpersist()
# try changing the file and re-executing, with and without fruitsReversed.cache()
fruitsReversed.collect()

In [None]:
# filter
shortFruits = fruits.filter(lambda fruit: len(fruit) <= 5)
shortFruits.collect()

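### map vs. flatMap
`map` turns an RDD of N elements into another RDD of N elements. `flatMap` first maps each of the N elements to a collection (possibly empty) and then merges those N collections into a single RDD. Compared with `map`, `flatMap` adds this flattening step: `[["a", "b", "c"], [], ["d"]] => ["a", "b", "c", "d"]`.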
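To make the difference concrete, here is a plain-Python sketch (no Spark) that mimics the two operations on an ordinary list. `spark_like_map` and `spark_like_flat_map` are hypothetical helpers, not PySpark APIs. They model only the shape of the result, not Spark's lazy, distributed execution.

```python
from itertools import chain

def spark_like_map(func, elements):
    """One output element per input element, like RDD.map."""
    return [func(e) for e in elements]

def spark_like_flat_map(func, elements):
    """Each input yields an iterable; the iterables are flattened, like RDD.flatMap."""
    return list(chain.from_iterable(func(e) for e in elements))

words = ["abc", "", "d"]
mapped = spark_like_map(list, words)       # [['a', 'b', 'c'], [], ['d']]
flat = spark_like_flat_map(list, words)    # ['a', 'b', 'c', 'd']
print(mapped)
print(flat)
```

The `map` result always has as many elements as the input, while the `flatMap` result can be longer or shorter.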

In [None]:
# flatMap
characters = fruits.flatMap(lambda fruit: list(fruit))
characters.collect()

In [None]:
# union
fruitsAndYellowThings = fruits.union(yellowThings)
fruitsAndYellowThings.collect()

In [None]:
# intersection
yellowFruits = fruits.intersection(yellowThings)
yellowFruits.collect()

In [None]:
# distinct
distinctFruitsAndYellowThings = fruitsAndYellowThings.distinct()
distinctFruitsAndYellowThings.collect()

### RDD actions
The following are examples of some common actions. For a detailed list, see [RDD Actions](https://spark.apache.org/docs/2.0.0/programming-guide.html#actions).

Run the actions below to understand this better. Place the cursor in the cell and press **SHIFT + ENTER**.

In [None]:
# collect
fruitsArray = fruits.collect()
yellowThingsArray = yellowThings.collect()
fruitsArray

In [None]:
# count
numFruits = fruits.count()
numFruits

In [None]:
# take
first3Fruits = fruits.take(3)
first3Fruits

In [None]:
# reduce
letterSet = fruits.map(lambda fruit: set(fruit)).reduce(lambda x, y: x.union(y))
letterSet

In [None]:
letterSet = fruits.flatMap(lambda fruit: list(fruit)).distinct().collect()
letterSet

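### Closures

- Broadcast variables: cache a read-only variable on every node of the cluster, for example to distribute a read-only copy of some data to each node.
- Accumulators: support only an add operation; used for counting and summing.
- In the driver program, calling SparkContext's `parallelize` method on an existing collection creates a parallelized collection: the elements of the collection are copied into a distributed dataset that can be operated on in parallel.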

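Shared variables  
Normally, when a function passed to a Spark operation (such as `map` or `reduce`) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to them on the remote machines are propagated back to the driver program. Supporting general read-write variables shared across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.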

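- Broadcast variables
Broadcast variables let the programmer keep a read-only variable cached on each machine rather than shipping a copy of it with every task. They can be used, for example, to give every node a copy of a large input dataset efficiently. Spark also tries to distribute broadcast variables with efficient broadcast algorithms to reduce communication cost.

A broadcast variable is created from a variable `v` by calling `SparkContext.broadcast(v)`. The broadcast variable is a wrapper around `v`, and its value is accessed through its `value` attribute.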

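- Accumulators are variables that can only be "added" to through an associative operation, so they can be supported efficiently in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of type Int and Double, and developers can add support for new types.

An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks running on the cluster can then add to it with `+=` (or `add`), but they cannot read its value. Only the driver program can read the accumulator's value, through its `value` attribute.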
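The following plain-Python model sketches the accumulator contract described above. Each task adds into its own zero-valued local copy, and the framework then merges the partial results into the driver's accumulator, which only the driver reads. The `LocalAccumulator` class and the `run_tasks` function are hypothetical. They model the idea only and are not how PySpark implements accumulators internally.

```python
class LocalAccumulator:
    """Hypothetical model: tasks may only add; the driver reads `value`."""

    def __init__(self, initial):
        self._value = initial

    def add(self, amount):
        self._value += amount

    @property
    def value(self):
        return self._value


def run_tasks(partitions, accum):
    """Run one 'task' per partition on a private zero-valued copy, then merge."""
    for partition in partitions:
        task_copy = LocalAccumulator(0)   # each task starts from zero
        for x in partition:
            task_copy.add(x)              # tasks can only add
        accum.add(task_copy.value)        # partial results are merged into the driver's copy


accum = LocalAccumulator(0)
run_tasks([[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]], accum)
print(accum.value)  # 45, the same as sum(range(10))
```

The merge order does not matter because addition is associative and commutative. This is why accumulators are restricted to operations of that kind.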

In [8]:
counter = 0
rdd = sc.parallelize(xrange(5,10))

# Wrong: don't do this! Each task updates its own copy of counter, not the driver's.
def increment_counter(x):
    global counter
    counter += x
#     return counter
rdd.foreach(increment_counter)
print rdd.count()
print rdd.collect()
print counter

5
[5, 6, 7, 8, 9]
0


In [29]:
rdd = sc.parallelize(xrange(10))
print(rdd)
accum = sc.accumulator(0)

def g(x):
    global accum
    accum += x
#     accum.add(x)

a = rdd.foreach(g)

print accum.value

PythonRDD[68] at RDD at PythonRDD.scala:48
45


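- When running in distributed mode, use accumulators for global aggregates such as counters and sums.

Here, the variables inside the closure sent to each executor are copies of the driver's variables. So when `counter` is referenced inside `foreach`, it is no longer the `counter` on the driver node. There is still a `counter` in the driver's memory, but it is not visible to the executors. Each executor updates only the copy of `counter` in its serialized closure. The final value of `counter` on the driver is therefore still 0, because every update was applied to a copy inside a serialized closure.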
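A rough way to see why `counter` stays 0 without a cluster: Spark serializes the function together with the variables it references and ships those bytes to the executors. The sketch below imitates that with `pickle`, using a dict as the captured state. `ship_to_executor` and `run_task` are hypothetical stand-ins, not Spark APIs.

```python
import pickle

driver_state = {"counter": 0}

def ship_to_executor(state):
    """Hypothetical stand-in for serializing a closure and sending it to an executor."""
    return pickle.loads(pickle.dumps(state))   # the executor gets an independent copy

def run_task(partition, state):
    executor_state = ship_to_executor(state)
    for x in partition:
        executor_state["counter"] += x          # updates only the executor's copy
    return executor_state["counter"]

task_results = [run_task(p, driver_state) for p in ([5, 6], [7, 8, 9])]
print(driver_state["counter"])   # 0: the driver's copy was never touched
print(task_results)              # [11, 24]: the work happened, but only on the copies
```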

In [31]:
accum = sc.accumulator(0)
def f(x):
    accum.add(x)
sc.parallelize([1,2,3,4]).foreach(lambda x:f(x))
accum.value  # foreach is an action, so f ran on every element

10

In [38]:
accum = sc.accumulator(0)
def f(x):
    accum.add(x)
print sc.parallelize([1,2,3,4]).map(lambda x:f(x)).collect()
accum.value  # collect() is an action, so the function passed to map did run: 10

[None, None, None, None]


10

In [None]:
def f(x):
    return (x, 1)
rdd.map(f)  # same as rdd.map(lambda x: (x, 1)); lazy, only returns a new RDD

In [24]:
accum = sc.accumulator(0)
def f(x):
    accum.add(x)
    return accum
print sc.parallelize([1,2,3,4]).map(lambda x:f(x)).collect()
accum.value  # 10: collect() ran f on every element; the Accumulator objects returned to the driver print as value=0

[Accumulator<id=15, value=0>, Accumulator<id=15, value=0>, Accumulator<id=15, value=0>, Accumulator<id=15, value=0>]


10

In [8]:
rdd = sc.parallelize(xrange(10))
accum = sc.accumulator(0)

def g(x):
    global accum
    accum += x
    return x * x

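a = rdd.map(g)  # lazy: g has not run yet
print accum.value
print rdd.reduce(lambda x, y: x+y)
# a.cache()  # if a is cached, the second a.count() reuses the cached data, so g does not re-run and accum stays at 45
tmp = a.count()  # only now does g run and update accum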
print accum.value
print rdd.reduce(lambda x, y: x+y)

tmp = a.count()
print accum.value
print rdd.reduce(lambda x, y: x+y)


0
45
45
45
90
45
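The values printed for `accum` above (0, then 45, then 90) follow from two facts: `map` is lazy, and every action recomputes an uncached RDD. This plain-Python sketch models that behavior with a hypothetical `LazyMapped` class whose `count()` re-applies the function each time unless it is cached. It is an analogy, not Spark code.

```python
class LazyMapped:
    """Hypothetical model of rdd.map(func): func only runs when an action is called."""

    def __init__(self, source, func):
        self.source = source
        self.func = func
        self.use_cache = False
        self.cached = None

    def cache(self):
        self.use_cache = True           # like rdd.cache(): only marks, computes nothing
        return self

    def _compute(self):
        if self.cached is not None:
            return self.cached          # reuse stored results; func is not re-applied
        result = [self.func(x) for x in self.source]   # recompute from the source
        if self.use_cache:
            self.cached = result
        return result

    def count(self):                    # an "action"
        return len(self._compute())


total = 0

def g(x):
    global total
    total += x                          # side effect, like accum += x
    return x * x

a = LazyMapped(range(10), g)
before = total        # 0: nothing has run
a.count()
after_one = total     # 45
a.count()
after_two = total     # 90: the second action re-ran g on every element

total = 0
b = LazyMapped(range(10), g).cache()
b.count()
b.count()
cached_total = total  # 45: the second count reused the cached results
print(before, after_one, after_two, cached_total)
```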


In [27]:
from operator import add

rdd = sc.parallelize(xrange(10))
print rdd.collect()
print rdd.sum()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
45


In [30]:
A = sc.parallelize(xrange(10))

print A.collect()

x = 5
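B = A.filter(lambda z: z < x)  # x is read when the closure is serialized for a job, not here
#B.cache()
print B.take(10)
print B.collect()
x = 3  # later jobs may ship the new value to the workers
print B.count()
print B.take(10)
print B.collect()  # still filters with x = 5, unlike count() and take() above
# Likely cause: collect() reuses the JVM-side RDD that PySpark built (serializing the lambda
# with x = 5) the first time B was collected, while count() and take() wrap B in a new
# pipelined RDD that re-serializes the lambda with x = 3. Avoid mutating captured variables.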

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]
3
[0, 1, 2]
[0, 1, 2, 3, 4]
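Plain Python lambdas show a related effect. A lambda looks up a free variable when it is called, not when it is defined. In PySpark, the value that reaches the workers is whatever the variable holds when the closure is serialized, and as the cell above shows, that moment is not always obvious. Binding the value as a default argument freezes it at definition time, which is a common way to avoid such surprises:

```python
x = 5
late = lambda z: z < x          # reads x when called
early = lambda z, x=x: z < x    # x is frozen at 5 here
x = 3

data = list(range(10))
late_result = [z for z in data if late(z)]
early_result = [z for z in data if early(z)]
print(late_result)    # [0, 1, 2]: uses the current x (3)
print(early_result)   # [0, 1, 2, 3, 4]: uses the bound x (5)
```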


In [34]:
# RDD variables are references: rebinding A to a new RDD does not affect B
A = sc.parallelize(xrange(10))
B = A.map(lambda x: x*2)
A = B.map(lambda x: x+1)
A.take(10)

[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]

In [53]:
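# Linear-time selection: find the element of rank k (0-indexed) in A

data = [34, 67, 21, 56, 47, 89, 12, 44, 74, 43, 26]
A = sc.parallelize(data,2)
k = 4
while True:
    x = A.first()
    # Bind the current pivot as a default argument. With a plain `lambda z: z < x`,
    # the filters already in A's lineage would read the reassigned global x when a
    # later job runs, so A would no longer hold the previous A1 or A2.
    A1 = A.filter(lambda z, x=x: z < x)
    A2 = A.filter(lambda z, x=x: z > x)
    print A.take(10)
    mid = A1.count()
    print 'left mid,k:',mid,k
    if mid == k:
        print x
        break
    if k < mid:  # the answer is in the left part
        A = A1
    else:        # the answer is in the right part: skip the left part and the pivot
        A = A2
        k = k - mid - 1
#     A.cache()


[34, 67, 21, 56, 47, 89, 12, 44, 74, 43]
left mid,k: 3 4
[67, 56, 47, 89, 44, 74, 43]
left mid,k: 4 0
[56, 47, 44, 43]
left mid,k: 3 0
[47, 44, 43]
left mid,k: 2 0
[44, 43]
left mid,k: 1 0
[43]
left mid,k: 0 0
43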


In [None]:
sorted(data)
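For comparison, here is the same pivot-and-recurse selection on a plain Python list (no Spark). Like the cell above, it uses the first element as the pivot. It also assumes distinct values, because elements equal to the pivot are dropped. With a first-element pivot, the expected running time is linear only for randomly ordered input; the worst case is quadratic. `select_kth` is a name introduced here for illustration.

```python
def select_kth(values, k):
    """Return the element of rank k (0-indexed) among distinct values."""
    while True:
        pivot = values[0]
        left = [z for z in values if z < pivot]
        right = [z for z in values if z > pivot]
        if len(left) == k:
            return pivot
        if k < len(left):
            values = left              # the answer is among the smaller elements
        else:
            k = k - len(left) - 1      # skip the left part and the pivot
            values = right

data = [34, 67, 21, 56, 47, 89, 12, 44, 74, 43, 26]
print(select_kth(data, 4))   # 43, the same as sorted(data)[4]
```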

### Computing Pi using Monte Carlo simulation

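### Monte Carlo method: estimating an area by random sampling
- A square contains an inscribed circle, and the ratio of their areas is π/4. Generate 10,000 random points (10,000 coordinate pairs (x, y)) inside the square, and use each point's distance to the center to decide whether it falls inside the circle. If the points are uniformly distributed, about π/4 of them should fall inside the circle, so multiplying that fraction by 4 estimates π. A simulation of 30,000 random points in an R script gave an estimate within 0.07% of the true value.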
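The area argument can be written out for a circle of radius r inscribed in a square of side 2r. Here n points are sampled uniformly, and hits is the number that fall inside the circle:

```latex
\frac{\text{area of circle}}{\text{area of square}} = \frac{\pi r^2}{(2r)^2} = \frac{\pi}{4}
\qquad\Longrightarrow\qquad
\hat{\pi} = 4 \cdot \frac{\text{hits}}{n}
```

The code below samples the unit square and tests x² + y² < 1, which corresponds to a quarter circle of radius 1 inside a 1 × 1 square. That setup gives the same π/4 ratio.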

In [67]:
# From the official spark examples.

import sys
import random

partitions = 10  # number of partitions (parallel tasks), not necessarily workers
n = 100000 * partitions

def f(_):
    x = random.random()
    y = random.random()
    return 1 if x ** 2 + y ** 2 < 1 else 0

count = sc.parallelize(xrange(1, n + 1), partitions) \
          .map(f).reduce(lambda a,b: a+b)  # xrange is lazy: the n numbers are not built as a list in the driver
print(count)

print "Pi is roughly", 4.0 * count / n

7858300
Pi is roughly 3.14332


In [55]:
x = random.random()
x

0.9957222540657606

### Workers may start from the same random seed and then generate identical numbers, so give each partition its own seed

In [62]:
a = sc.parallelize(xrange(0,20),4)
print a.collect()
print a.glom().collect()
# a.map(lambda x: random.random()).glom().collect() # partitions may all produce the same random numbers

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11, 12, 13, 14], [15, 16, 17, 18, 19]]


`mapPartitionsWithIndex(f)` calls `f` with two arguments: the index of the partition and an iterator over that partition's elements.
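To see what the function receives, here is a plain-Python imitation of `glom()` and `mapPartitionsWithIndex` on an in-memory list. The even-split rule in the hypothetical `split_into_partitions` helper is an assumption. It happens to match the `glom()` output above for 20 elements in 4 partitions, but Spark may partition other inputs differently.

```python
def split_into_partitions(values, num_partitions):
    """Hypothetical even split, mirroring the glom() output above for 20 items / 4 partitions."""
    n = len(values)
    return [values[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]

def map_partitions_with_index(func, partitions):
    """Call func(index, iterator) once per partition and concatenate what it yields."""
    out = []
    for index, part in enumerate(partitions):
        out.extend(func(index, iter(part)))
    return out

def tag_with_partition(index, it):
    for x in it:
        yield (index, x)

parts = split_into_partitions(list(range(20)), 4)
tagged = map_partitions_with_index(tag_with_partition, parts)
print(parts)
print(tagged[:6])   # [(0, 0), (0, 1), (0, 2), (0, 3), (0, 4), (1, 5)]
```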

In [73]:
# Correct version

partitions = 1000
n = 100000 * partitions

def f(index, it):
    random.seed(index + 987236)
    for i in it:
        x = random.random()
        y = random.random()
        yield 1 if x ** 2 + y ** 2 < 1 else 0

count = sc.parallelize(xrange(1, n + 1), partitions) \
          .mapPartitionsWithIndex(f).reduce(lambda a,b: a+b)  # f runs once per partition, with its own seed

print "Pi is roughly", 4.0 * count / n

Pi is roughly 3.14152072
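The same idea works without Spark. Each simulated partition gets its own `random.Random` instance, seeded from its index to mirror the `index + 987236` seed above, so partitions draw different but reproducible streams. The partition and sample counts here are small, arbitrary choices that keep the example fast, and `count_hits` is a hypothetical helper.

```python
import random

def count_hits(index, samples, base_seed=987236):
    """Count points in the unit square that fall inside the quarter circle."""
    rng = random.Random(index + base_seed)   # private generator per partition
    hits = 0
    for _ in range(samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:
            hits += 1
    return hits

partitions = 8
samples_per_partition = 20000
n = partitions * samples_per_partition
count = sum(count_hits(i, samples_per_partition) for i in range(partitions))
pi_estimate = 4.0 * count / n
print("Pi is roughly", pi_estimate)
```

Using a separate `Random` object per partition, rather than reseeding the shared module-level generator, keeps each partition's stream independent of what other code does with `random`.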


### Key-Value Pairs  
- useful operations 

In [75]:
# reduceByKey
numFruitsByLength = fruits.map(lambda fruit: (len(fruit), 1)).reduceByKey(lambda x, y: x + y)
numFruitsByLength.collect()
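`reduceByKey` merges all values that share a key, using the given function. Here is a plain-Python sketch of those semantics on a list of pairs. The helper `reduce_by_key` and the sample fruit names are illustrative and are not the contents of `fruits.txt`. The result is sorted here only to make it easy to read, because Spark does not guarantee an output order.

```python
def reduce_by_key(func, pairs):
    """Combine values that share a key using func, like RDD.reduceByKey."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return sorted(result.items())

fruits = ["apple", "fig", "kiwi", "pear", "plum", "mango"]
by_length = reduce_by_key(lambda x, y: x + y, [(len(f), 1) for f in fruits])
print(by_length)   # [(3, 1), (4, 3), (5, 2)]
```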

In [28]:
from operator import add

lines = sc.textFile('/Users/lucas/Desktop/workspace/temp')
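counts = lines.flatMap(lambda x: x.split('\n')) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(add)
# Keep counts as an RDD so that the sortByKey/sortBy cells below can use it.
# textFile already yields one element per line, so split('\n') leaves each line whole
# and this counts repeated lines; use x.split() instead to count words.
counts.sortBy(lambda x: x[1], False).take(10)
# counts.collect()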

[(u'', 10),
 (u'                sum = sum + 1', 3),
 (u'        number = arrays[x]', 2),
 (u'import sys', 2),
 (u'def identify(a, b):', 2),
 (u'    _b = b', 2),
 (u'        count = count + 1', 2),
 (u'            if x == y:', 2),
 (u'    number = a * 10 ** count + _b', 2),
 (u'    else:', 2)]

In [54]:
t = 50
A = sc.parallelize(range(1,100))
B = A.filter(lambda x:x<t)
B.cache()
t = 10
C = B.filter(lambda x:x>t).collect()

### sortByKey

In [None]:
counts.map(lambda (k, v): (v, k)).sortByKey(False).take(20)

### sortBy

In [10]:
counts.sortBy(lambda x: x[1], False).take(4)

[(u'', 10),
 (u'                sum = sum + 1', 3),
 (u'        number = arrays[x]', 2),
 (u'import sys', 2)]

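### Join: within each key, a Cartesian product of the pairs (n*m results for a key with n pairs on one side and m on the other)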
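Here is a plain-Python sketch of that behavior, using a hypothetical `inner_join` helper. A key with n records on the left and m on the right contributes n*m output pairs, and keys present on only one side are dropped, as in `RDD.join`.

```python
from collections import defaultdict

def inner_join(left, right):
    """Pair every left value with every right value that shares its key."""
    right_by_key = defaultdict(list)
    for k, v in right:
        right_by_key[k].append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in right_by_key.get(k, [])]

left = [("a", 1), ("a", 2), ("b", 3), ("c", 4)]
right = [("a", "x"), ("a", "y"), ("a", "z"), ("b", "w")]
joined = inner_join(left, right)
print(len(joined))   # 7: 2*3 pairs for "a", 1*1 for "b", none for "c"
```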

### PMI  
- Tuples are better suited than lists to key-value operations (for example, tuples are hashable and can be used as keys).
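One concrete reason is hashability. In PySpark, operations such as `reduceByKey` hash the keys to partition and merge them, so keys must be hashable. Tuples are hashable, but lists are not. A quick plain-Python check:

```python
pair_counts = {}
pair = ("spark", "rdd")                  # tuple: hashable, usable as a key
pair_counts[pair] = pair_counts.get(pair, 0) + 1

try:
    pair_counts[["spark", "rdd"]] = 1    # list: unhashable
    list_key_ok = True
except TypeError:
    list_key_ok = False

print(pair_counts, list_key_ok)   # {('spark', 'rdd'): 1} False
```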

In [None]:
lines = sc.textFile('', 8)  # set the input path; 8 is the minimum number of partitions
lines.count()
lines.take(5)

### K-means clustering

In [None]:
import numpy as np

def parseVector(line):
    return np.array([float(x) for x in line.split(' ')])

def closestPoint(p, centers):
    bestIndex = 0
    closest = float("+inf")
    for i in range(len(centers)):
        tempDist = np.sum((p - centers[i]) ** 2)
        if tempDist < closest:
            closest = tempDist
            bestIndex = i
    return bestIndex  # the index of the nearest center serves as the point's cluster label

# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/kmeans_data.txt
lines = sc.textFile('../data/kmeans_data.txt', 5)  

# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/kmeans_bigdata.txt
# lines = sc.textFile('../data/kmeans_bigdata.txt', 5)  
# lines is an RDD of strings
K = 3
convergeDist = 0.01  # convergence threshold
# terminate algorithm when the total distance from old center to new centers is less than this value

data = lines.map(parseVector).cache() # data is an RDD of arrays

kCenters = data.takeSample(False, K, 1)  # initial centers as a list of arrays
tempDist = 1.0  # total distance from old centers to new centers

while tempDist > convergeDist:
    closest = data.map(lambda p: (closestPoint(p, kCenters), (p, 1)))  # tuples keep the key-value structure
    # for each point in data, find its closest center
    # closest is an RDD of tuples (index of closest center, (point, 1))
        
    pointStats = closest.reduceByKey(lambda p1, p2: (p1[0] + p2[0], p1[1] + p2[1]))
    # pointStats is an RDD of tuples (index of center,(array of sums of coordinates, total number of points assigned))
    
    newCenters = pointStats.map(lambda st: (st[0], st[1][0] / st[1][1])).collect()
    # compute the new centers
    
    tempDist = sum(np.sum((kCenters[i] - p) ** 2) for (i, p) in newCenters)
    # compute the total distance from old centers to new centers
    
    for (i, p) in newCenters:
        kCenters[i] = p
        
print "Final centers: ", kCenters
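The loop above alternates between an assignment step (`closestPoint`) and an update step (per-cluster sums and counts, then averages). Here is a small pure-Python version of the same iteration. It uses 2-D tuples instead of NumPy arrays, and fixed initial centers chosen for illustration instead of `takeSample`. As in the Spark version, a center that receives no points keeps its previous position.

```python
def closest_point(p, centers):
    """Index of the center with the smallest squared distance to p."""
    dists = [sum((a - b) ** 2 for a, b in zip(p, c)) for c in centers]
    return dists.index(min(dists))

def kmeans(points, centers, converge_dist=0.01):
    centers = [tuple(c) for c in centers]
    temp_dist = float("inf")
    while temp_dist > converge_dist:
        # (sum of coordinates, count) per center, like reduceByKey on (index, (p, 1))
        stats = {}
        for p in points:
            i = closest_point(p, centers)
            s, n = stats.get(i, ((0.0,) * len(p), 0))
            stats[i] = (tuple(a + b for a, b in zip(s, p)), n + 1)
        new_centers = {i: tuple(a / n for a in s) for i, (s, n) in stats.items()}
        # total squared movement of the centers in this iteration
        temp_dist = sum(sum((a - b) ** 2 for a, b in zip(centers[i], c))
                        for i, c in new_centers.items())
        for i, c in new_centers.items():
            centers[i] = c
    return centers

points = [(0.0, 0.0), (0.1, 0.1), (0.2, 0.2), (9.0, 9.0), (9.1, 9.1), (9.2, 9.2)]
final = kmeans(points, [(0.0, 0.0), (9.0, 9.0)])
print(final)   # approximately [(0.1, 0.1), (9.1, 9.1)]
```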


### PageRank

In [39]:
import re
from operator import add

def computeContribs(urls, rank):
    # Calculates URL contributions to the rank of other URLs.
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)

def parseNeighbors(urls):
    # Parses a urls pair string into urls pair.
    parts = re.split(r'\s+', urls)
    return parts[0], parts[1]

# Loads in input file. It should be in format of:
#     URL         neighbor URL
#     URL         neighbor URL
#     URL         neighbor URL
#     ...

# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/*
lines = sc.textFile("../data/pagerank_data.txt", 2)
# lines = sc.textFile("../data/dblp.in", 5)

numOfIterations = 10

# Loads all URLs from input file and initialize their neighbors. 
links = lines.map(lambda urls: parseNeighbors(urls)) \
             .groupByKey() # (1,(2,3)) (2,(4,1))

# Loads all URLs with other URL(s) link to from input file 
# and initialize ranks of them to one.
ranks = links.mapValues(lambda neighbors: 1.0) # (1,1.0) ....

# Calculates and updates URL ranks continuously using PageRank algorithm.
for iteration in range(numOfIterations):
    # Calculates URL contributions to the rank of other URLs.
    contribs = links.join(ranks) \
                    .flatMap(lambda url_urls_rank:
                             computeContribs(url_urls_rank[1][0],
                                             url_urls_rank[1][1]))
    # After the join, each element in the RDD is of the form
    # (url, (list of neighbor urls, rank))
    
    # Re-calculates URL ranks based on neighbor contributions.
    ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
    # ranks = contribs.reduceByKey(add).map(lambda (url, rank): (url, rank * 0.85 + 0.15))

print ranks.top(5, lambda x: x[1])
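Here is a pure-Python version of the same update, `rank = 0.15 + 0.85 * (sum of contributions)`, on a tiny hand-made graph. The edges are made up for illustration and are not taken from `pagerank_data.txt`. Like the Spark cell, this version does not normalize the ranks to sum to 1, and a page with no incoming links would drop out of `ranks`. In this graph every page has both incoming and outgoing links, so the total rank stays equal to the number of pages.

```python
from collections import defaultdict

def pagerank(edges, iterations=10):
    links = defaultdict(list)
    for src, dst in edges:
        links[src].append(dst)                 # like groupByKey
    ranks = {url: 1.0 for url in links}        # like mapValues(lambda neighbors: 1.0)
    for _ in range(iterations):
        contribs = defaultdict(float)
        for url, neighbors in links.items():
            if url in ranks:                   # join keeps URLs present on both sides
                for neighbor in neighbors:
                    contribs[neighbor] += ranks[url] / len(neighbors)
        ranks = {url: 0.15 + 0.85 * c for url, c in contribs.items()}
    return ranks

edges = [("1", "2"), ("1", "3"), ("2", "3"), ("3", "1"), ("4", "1"), ("1", "4")]
ranks = pagerank(edges)
print(sorted(ranks.items(), key=lambda kv: -kv[1]))
```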

### Join vs. Broadcast Variables

In [3]:
products = sc.parallelize([(1, "Apple"), (2, "Orange"), (3, "TV"), (5, "Computer")])
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])

print trans.join(products).collect()


[(1, ((134, 'OK'), 'Apple')), (1, ((135, 'OK'), 'Apple')), (1, ((45, 'OK'), 'Apple')), (2, ((53, 'OK'), 'Orange')), (3, ((34, 'OK'), 'TV')), (5, ((162, 'Error'), 'Computer'))]


In [1]:
products = {1: "Apple", 2: "Orange", 3: "TV", 5: "Computer"}
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])

broadcasted_products = sc.broadcast(products)

def f(x):
    return (x[0], broadcasted_products.value[x[0]], x[1])

results = trans.map(f)
# results = trans.map(lambda x: (x[0], products[x[0]], x[1]))
print results.collect()
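Both cells enrich each transaction with its product name. They differ in how the product table reaches the data. Here is a plain-Python sketch of the two strategies on the same sample records. The shuffle-style join groups both sides by key, while the broadcast-style version gives every "task" the same small read-only dict. Splitting the transactions into two partitions is an arbitrary choice for illustration. Both helpers are hypothetical and return the `(key, (record, name))` shape so their results can be compared.

```python
from collections import defaultdict

products = {1: "Apple", 2: "Orange", 3: "TV", 5: "Computer"}
trans = [(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")),
         (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))]

def shuffle_join(left, right_pairs):
    """Group the right side by key (the 'shuffle'), then pair matching records."""
    right_by_key = defaultdict(list)
    for k, v in right_pairs:
        right_by_key[k].append(v)
    return [(k, (v, name)) for k, v in left for name in right_by_key[k]]

def broadcast_lookup(partitions, table):
    """Each task reads the same small read-only table; trans is never regrouped."""
    return [(k, (v, table[k])) for part in partitions for k, v in part]

joined = shuffle_join(trans, products.items())
looked_up = broadcast_lookup([trans[:3], trans[3:]], products)
print(sorted(joined) == sorted(looked_up))   # True
```

Unlike the join, the lookup raises `KeyError` for a transaction whose key is missing from the table, just as `broadcasted_products.value[x[0]]` would.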


In [1]:
a = 'asd'
a[::-1]

'dsa'