In [1]:
import pyspark

## Introduction of pyspark
Anay spark program start with SpartContext, initialized with SparkConf object, 
SparkConf contains all parameters.
getOrCreate will create a new session or use existed session.
SparkSession provide uniform enter point for all API. Like Streaming/SQL/hive..now SparkSession used as enter point of DataFrame and DataSet API(combination of SQL+Hive)

In [1]:
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf = SparkConf().setAppName("miniProject").setMaster('local[*]')
sc=SparkContext.getOrCreate(conf)

### Initialize RDD
for a existed list, use sc.parallelize to create RDD. During operating, the elements in list will be automatically partitioned, and sent to differnt machines.

In [6]:
#1.
rdd = sc.parallelize([1,2,3,4,5])
rdd
#2.Initialized setting by me as 2k, because my laptop is 2-core.
rdd.getNumPartitions()
#3.How is partition?
rdd.glom().collect()
'''
Carefully use collect(), it will exceed the memory
'''

[[1, 2], [3, 4, 5]]

In [19]:
#As suggested by pyspark, install psutil
pip install psutil

SyntaxError: invalid syntax (<ipython-input-19-60b01ccb9234>, line 2)

In [9]:
#4.Another way to create RDD
#Read file in rdd. Each line is an item, be careful aout the path
#1.get work directory
import os
cwd = os.getcwd()
cwd
# 'C:\\Users\\Tianyi Fang\\Desktop\\case\\Untitled Folder'

#2.get the file path
rdd1 = sc.textFile(cwd +"\\20170303_1_play.log.fn")
#rdd_whole_files = sc.wholeTextFiles(cwd + filename)
rdd1

#3.first() will return the first item in rdd
rdd1.first()


'154750654 \tar \t10740680 \t0 \t胸是软绵绵的 \tMC马克 \t76 \t0 \t0 \t 20170303_1_play.log'

### RDD Transformation
#### 1.Basic operations
- map(): do the same operation for every item in RDD
- flatmap():do the same operation for every item in RDD, get a list, then put all results in that list into a new list with flat method
- filter(): filter rows based on conditions
- distinct():
- sample():
- sortBy():
- collect(): transform all item into a py.list, be caucious when dealing with big data

In [14]:
numbersRDD = sc.parallelize(range(1,10+1))
print(numbersRDD.collect())
mapRDD = numbersRDD.map(lambda x: x**2)
print(mapRDD.collect())
filterRDD = numbersRDD.filter(lambda x: x%2 ==0)
print(filterRDD.collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
[2, 4, 6, 8, 10]


In [17]:
sentenceRDD = sc.parallelize(['Hello world!', 'I am learning Spark'])
wordRDD = sentenceRDD.flatMap(lambda sentence: sentence.split(" "))
print(wordRDD.collect(), wordRDD.count())
wordRDD2 = sentenceRDD.map(lambda sentence: sentence.split(" "))
print(wordRDD2.collect(), wordRDD2.count())
'''
Here flatMap represents operations in Python as:
l = ['Hello world!', 'I am learning Spark']
ll = []
for sentence in l:
    ll = ll + sentence.split(" ")
ll
'''

['Hello', 'world!', 'I', 'am', 'learning', 'Spark'] 6
[['Hello', 'world!'], ['I', 'am', 'learning', 'Spark']] 2


#### 2.Use in series

In [18]:
def doubleIfOdd(x):
    if x%2 ==1:
        return 2*x
    else:
        return x
resultRDD = (numbersRDD
            .map(doubleIfOdd)
            .filter(lambda x: x >6)
            .distinct())
resultRDD.collect()

[10, 14, 8, 18]

#### 3.Paired RDDs-tuple(k-v)
- reduceByKey(): excute reduce for all items with same key
- groupByKey(): return RDD with tuple(key, listOfValues)
- sortByKey(): 
- countByKey(): 
- collectAsMap(): same as collect, BUT return dictionary of k-v

In [3]:
rdd=sc.parallelize(["Hello hello", "Hello New York", "York says hello"])
resultRDD=(rdd
    .flatMap(lambda sentence:sentence.split(" "))  
    .map(lambda word:word.lower())                 
    .map(lambda word:(word, 1))   #return(word, its count)                  
    .reduceByKey(lambda x, y: x + y))  #for the items under same key, add them
print(resultRDD.collect())
print(resultRDD.collectAsMap())  #collectAsMap类似collect,以k-v字典的形式返回

resultRDD.sortByKey(ascending=True).take(2) 


#find top 2 word with most frequency
print(resultRDD
      .sortBy(lambda x: x[1], ascending=False)
      .take(2))

[('new', 1), ('york', 2), ('says', 1), ('hello', 4)]
{'new': 1, 'york': 2, 'says': 1, 'hello': 4}
[('hello', 4), ('york', 2)]


## Operations between RDDs
#### 1.RDD1*RDD2 => RDD3
- rdd1.union(rdd2): rdd1 ∪ rdd2 return all items(rows)
- rdd1.intersection(rdd2): rdd1 ∩ rdd2
- rdd1.substract(rdd2): rdd1 - rdd1 ∩ rdd2
- rdd1.cartesian(rdd2): rdd1 和 rdd2中所有的元素笛卡尔乘积（正交和）

#### 2. SQL

In [2]:
#RDD1 Home of different people
homesRDD = sc.parallelize([
        ('Brussels', 'John'),
        ('Brussels', 'Jack'),
        ('Leuven', 'Jane'),
        ('Antwerp', 'Jill'),
    ])

#RDD2 Quality of life index for various cities
lifeQualityRDD = sc.parallelize([
        ('Brussels', 10),
        ('Antwerp', 7),
        ('RestOfFlanders', 5),
    ])

In [4]:
homesRDD.join(lifeQualityRDD).collect()   #join

[('Brussels', ('John', 10)),
 ('Brussels', ('Jack', 10)),
 ('Antwerp', ('Jill', 7))]

In [5]:
homesRDD.leftOuterJoin(lifeQualityRDD).collect()   #leftOuterJoin

[('Brussels', ('John', 10)),
 ('Brussels', ('Jack', 10)),
 ('Antwerp', ('Jill', 7)),
 ('Leuven', ('Jane', None))]

In [7]:
print(homesRDD.cogroup(lifeQualityRDD).collect())   #cogroup
'''
that is hard to read
'''
print(homesRDD
.cogroup(lifeQualityRDD)
.map(lambda x:(x[0], (list(x[1][0]), list(x[1][1]))))
.collect())

[('Brussels', (<pyspark.resultiterable.ResultIterable object at 0x03055330>, <pyspark.resultiterable.ResultIterable object at 0x03055170>)), ('Antwerp', (<pyspark.resultiterable.ResultIterable object at 0x030554B0>, <pyspark.resultiterable.ResultIterable object at 0x03055370>)), ('RestOfFlanders', (<pyspark.resultiterable.ResultIterable object at 0x03055450>, <pyspark.resultiterable.ResultIterable object at 0x030554D0>)), ('Leuven', (<pyspark.resultiterable.ResultIterable object at 0x030555B0>, <pyspark.resultiterable.ResultIterable object at 0x03055570>))]
[('Brussels', (['John', 'Jack'], [10])), ('Antwerp', (['Jill'], [7])), ('RestOfFlanders', ([], [5])), ('Leuven', (['Jane'], []))]


## Actions
#### Spark is always lazy about excution. It will not immediately excute your order, but wait unitl the results are needed(see action), since there may be a series of transformation. In that case, interval storage and deliery are not needed.
#### Common Actions
- collect(): calculate all items and return all results to driver end，then collect() show results in Python list
- first(): return first item in Py.list
- take(n): return first items in Py.list
- count(): count # of items in RDD
- top(n): return first item in Py.list, ordered
- reduce(): aggregation for items :aggregate in each parition, then do it in whole

In [8]:
rdd = sc.parallelize(range(1,10+1))
rdd.reduce(lambda x, y: x + y)  #reduce(): 对RDD中的items做聚合

55

#### cache()
save interval results temperoraily, save time and space of re-calculating, which is especially important for many ML algorithms, since they always process iterations during training. 

In [2]:
#be sure to install numpy first
import numpy as np
numbersRDD = sc.parallelize(np.linspace(1.0, 10.0, 10))
squaresRDD = numbersRDD.map(lambda x: x**2)

squaresRDD.cache()  # Preserve the actual items of this RDD in memory

avg = squaresRDD.reduce(lambda x, y: x + y) / squaresRDD.count()
print(avg)

38.5
