## Optimizing PySpark and PySpark Streaming

- PageRank Algorithm
- KNN (cartesian, broadcast)
- Streaming

### PageRank Algorithm Using PySpark

In [1]:
from pyspark.sql import SparkSession
# One SparkSession (and its underlying SparkContext) for the PageRank and KNN
# sections; getOrCreate() returns an existing session if one is already running.
spark = (
    SparkSession.builder
    .appName('PageRank')
    .getOrCreate()
)

In [3]:
# Adjacency list: [page, [pages it links to]].
pageLinks = [
    ['a', ['b', 'c', 'd']],
    ['b', ['d', 'c']],
    ['c', ['b']],
    ['d', ['a', 'c']],
]
# Initial rank of every page.
pageRanks = [
    ['a', 1],
    ['b', 1],
    ['c', 1],
    ['d', 1],
]

numIter = 20

# Joining paired RDDs is expensive in any distributed system. partitionBy()
# places records with the same key in the same partition so that later joins
# can avoid a shuffle.
# Use PySpark's default partition function (portable_hash) rather than the
# builtin ``hash``: join() and reduceByKey() partition with portable_hash, and
# PySpark treats two partitioners as equal only when both the partition count
# and the partition function match. With ``hash`` the layouts never matched, so
# the pre-partitioning could not save a shuffle.
pageRanksRDD = spark.sparkContext.parallelize(pageRanks, 2).partitionBy(2).persist()
pageLinksRDD = spark.sparkContext.parallelize(pageLinks, 2).partitionBy(2).persist()
# Damping factor used in the update (1 - s) + s * incoming_rank.
s = 0.85

In [4]:
def rankContribution(uris, rank):
    """Split a page's rank evenly across the pages it links to.

    Args:
        uris: Sequence of the outgoing link targets of one page.
        rank: The page's current rank (any value ``float()`` accepts).

    Returns:
        A list of ``(uri, share)`` pairs, one per entry in *uris*, where
        ``share == float(rank) / len(uris)``. A page with no outgoing links
        (a dangling page) contributes nothing: an empty list is returned
        instead of raising ``ZeroDivisionError``.
    """
    number_of_uris = len(uris)
    if number_of_uris == 0:
        # Dividing first would raise before the (empty) loop could run.
        return []
    share = float(rank) / number_of_uris
    return [(uri, share) for uri in uris]

for _ in range(numIter):
    # Pair each page's outgoing links with its current rank: (page, (links, rank)).
    linksRank = pageLinksRDD.join(pageRanksRDD)
    # Every page sends rank / out_degree to each page it links to.
    contributedRDD = linksRank.flatMap(lambda kv: rankContribution(*kv[1]))
    # Total incoming contribution per page. Pages that receive no contribution
    # at all do not appear in the result.
    sumRanks = contributedRDD.reduceByKey(lambda v1, v2: v1 + v2)
    # Damped update. mapValues (unlike map) keeps the hash partitioner produced
    # by reduceByKey. The next iteration's join can then reuse that layout
    # instead of reshuffling the ranks whenever pageLinksRDD is partitioned the
    # same way.
    pageRanksRDD = sumRanks.mapValues(lambda rank: (1 - s) + s * rank)

pageRanksRDD.collect()

[('b', 1.357243795127983),
 ('a', 0.521726802480915),
 ('c', 1.246378102436009),
 ('d', 0.8746512999550939)]

### Implement the K-Nearest Neighbors (KNN) algorithm using PySpark

In [5]:
# Euclidean distance between two points given as tuples.
def distanceBetweenTuples(data1, data2):
    """Return the Euclidean distance between *data1* and *data2*.

    Coordinates are compared position by position over the length of
    *data1*; *data2* must have at least that many coordinates.
    """
    total = 0.0
    for idx, left in enumerate(data1):
        total += (left - data2[idx]) ** 2
    return total ** 0.5

# Two sample 3-D points to sanity-check the distance helper.
pythonTuple1, pythonTuple2 = (1.2, 3.4, 3.2), (2.4, 2.2, 4.2)
distanceBetweenTuples(pythonTuple1, pythonTuple2)

1.9697715603592207

In [6]:
# Labelled training points: ((x, y, z), group).
data_list = [
    ((3.09, 1.97, 3.73), 'group1'),
    ((2.96, 2.15, 4.16), 'group1'),
    ((2.87, 1.93, 4.39), 'group1'),
    ((3.02, 1.55, 4.43), 'group1'),
    ((1.80, 3.65, 2.08), 'group2'),
    ((1.36, 4.43, 1.95), 'group2'),
    ((1.71, 4.35, 1.94), 'group2'),
    ((1.03, 3.75, 2.12), 'group2'),
    ((2.30, 3.59, 1.99), 'group2'),
]
rdd = spark.sparkContext.parallelize(data_list, 2)

# The single query point to classify, held in its own one-partition RDD.
new_record = [(2.5, 1.7, 4.2)]
new_rdd = spark.sparkContext.parallelize(new_record, 1)

# Cartesian product: pair every training point with the query point.
cartesian_rdd = rdd.cartesian(new_rdd)
cartesian_rdd.take(5)

[(((3.09, 1.97, 3.73), 'group1'), (2.5, 1.7, 4.2)),
 (((2.96, 2.15, 4.16), 'group1'), (2.5, 1.7, 4.2)),
 (((2.87, 1.93, 4.39), 'group1'), (2.5, 1.7, 4.2)),
 (((3.02, 1.55, 4.43), 'group1'), (2.5, 1.7, 4.2)),
 (((1.8, 3.65, 2.08), 'group2'), (2.5, 1.7, 4.2))]

In [9]:
K = 5  # number of nearest neighbours that vote on the label

# Each cartesian element is ((point, group), query); keep (group, distance).
distance_rdd = cartesian_rdd.map(
    lambda pair: (pair[0][1], distanceBetweenTuples(pair[0][0], pair[1]))
)

# Preview the first K distances (in RDD order, not sorted).
distance_rdd.take(K)

[('group1', 0.8011866199581719),
 ('group1', 0.6447480127925947),
 ('group1', 0.47528938553264566),
 ('group1', 0.5880476171195661),
 ('group2', 2.9642705679475347)]

In [10]:
# The K (group, distance) pairs with the smallest distance, nearest first.
new_rdd_class = distance_rdd.takeOrdered(K, key=lambda entry: entry[1])

In [11]:
# Show the K nearest (group, distance) pairs, sorted by ascending distance.
new_rdd_class

[('group1', 0.47528938553264566),
 ('group1', 0.5880476171195661),
 ('group1', 0.6447480127925947),
 ('group1', 0.8011866199581719),
 ('group2', 2.9148241799463652)]

In [12]:
from collections import Counter

# Labels of the K nearest neighbours; the most frequent label is the prediction.
ourClassesGroup = [group for group, _distance in new_rdd_class]
# Counter counts in one pass (max(..., key=list.count) rescans the list per
# element). Ties still resolve to the label that occurs first, as before.
Counter(ourClassesGroup).most_common(1)[0][0]

'group1'

### KNN: Broadcasting the Query Record vs. the Cartesian Product

In [16]:
# Broadcast the query point once to every executor instead of pairing it with
# each training record through a cartesian product.
new_record = [(2.5, 1.7, 4.2)]
broadCasted_record = spark.sparkContext.broadcast(new_record)
# Display the broadcast list and the single query tuple it holds.
(broadCasted_record.value, broadCasted_record.value[0])

([(2.5, 1.7, 4.2)], (2.5, 1.7, 4.2))

In [18]:
# (group, distance-to-query) for every training point. The query tuple is read
# from the broadcast variable inside the task, so no cartesian RDD is needed.
distance_rdd2 = rdd.map(
    lambda item: (item[1], distanceBetweenTuples(item[0], broadCasted_record.value[0]))
)
distance_rdd2.take(5)

[('group1', 0.8011866199581719),
 ('group1', 0.6447480127925947),
 ('group1', 0.47528938553264566),
 ('group1', 0.5880476171195661),
 ('group2', 2.9642705679475347)]

In [19]:
# K nearest neighbours from the broadcast variant, nearest first.
distance_rdd2.takeOrdered(K, key=lambda item: item[1])

[('group1', 0.47528938553264566),
 ('group1', 0.5880476171195661),
 ('group1', 0.6447480127925947),
 ('group1', 0.8011866199581719),
 ('group2', 2.9148241799463652)]

### PySpark Streaming

### Reading Numbers from a Netcat Server and Summing Each Line

In [3]:
# Convert a line of text to numbers and sum them.
def stringToNumberSum(data):
    """Parse one line of whitespace-separated numbers and return their sum.

    Args:
        data: A single text line, e.g. ``"1 2.5 3"``.

    Returns:
        The float sum of all numbers on the line, or ``None`` when the line
        is empty or contains only whitespace.

    Raises:
        ValueError: If a token on the line is not a valid number.
    """
    # split() with no argument splits on any run of whitespace (spaces, tabs)
    # and never yields empty tokens. split(' ') produced '' for repeated
    # spaces, and float('') crashed the streaming job.
    tokens = data.split()
    if not tokens:
        return None
    return sum(float(token) for token in tokens)

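Start a netcat server on the port that `socketTextStream` connects to below (9981), then type space-separated numbers into it:
```
[xxxx@wjh]$nc -lk 9981
```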

In [1]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

from pyspark import SparkConf

# Reuse the SparkContext that is already running, if any (for example the one
# behind a SparkSession created earlier). Constructing a second SparkContext
# in the same process raises an error. If none exists, a new one is created
# with this app name.
sc = SparkContext.getOrCreate(SparkConf().setAppName("read_streaming_data"))
# Micro-batch interval of 1 second.
ssc = StreamingContext(sc, 1)

# DStream of text lines received over TCP from localhost:9981.
# NOTE(review): a socket receiver permanently occupies one core, so a local
# master needs at least two cores (e.g. local[2]) for batches to be
# processed. Confirm the master setting.
consoleStreamingData = ssc.socketTextStream(
    hostname='localhost',
    port=9981,
)

In [4]:
# Sum each incoming line. stringToNumberSum returns None for blank lines;
# drop those rather than printing "None" for every empty input line.
sumedData = (
    consoleStreamingData
    .map(stringToNumberSum)
    .filter(lambda total: total is not None)
)

# Print the first few elements of every batch on the driver.
sumedData.pprint()

ssc.start()
try:
    # Let the stream run for up to 30 seconds.
    ssc.awaitTerminationOrTimeout(30)
finally:
    # awaitTerminationOrTimeout only waits. Without an explicit stop, the
    # receiver and batch jobs keep running after the timeout or an interrupt.
    # Keep the SparkContext alive so other cells can still use it.
    ssc.stop(stopSparkContext=False)

-------------------------------------------
Time: 2019-12-02 12:09:47
-------------------------------------------

-------------------------------------------
Time: 2019-12-02 12:09:48
-------------------------------------------

-------------------------------------------
Time: 2019-12-02 12:09:49
-------------------------------------------

-------------------------------------------
Time: 2019-12-02 12:09:50
-------------------------------------------

-------------------------------------------
Time: 2019-12-02 12:09:51
-------------------------------------------

-------------------------------------------
Time: 2019-12-02 12:09:52
-------------------------------------------

-------------------------------------------
Time: 2019-12-02 12:09:53
-------------------------------------------

-------------------------------------------
Time: 2019-12-02 12:09:54
-------------------------------------------



KeyboardInterrupt: 