## PySpark Demo - (In Jupyter Notebook)

In [1]:
import pyspark

# Reuse the running SparkContext when this cell is executed again: calling the
# SparkContext constructor a second time in the same kernel raises
# "ValueError: Cannot run multiple SparkContexts at once".
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setMaster('local[*]'))

# Smoke test: distribute a small list and bring it back to the driver.
rdd = sc.parallelize([1,2,3,4])
rdd.collect()

[1, 2, 3, 4]

## PySpark Demo - (In Zeppelin)

In [None]:
%pyspark

# `sc` is not created in this cell; presumably the Zeppelin %pyspark
# interpreter provides it - confirm for your installation.
rdd = sc.parallelize([1,2,3,4])
rdd.collect()

## Iterative Programming

In [4]:
# Input data
a = [1, 2, 3, 4, 5, 6, 7, 8]

# Walk the list one element at a time and keep only the even numbers
b = []
for ele in a:
    if ele % 2 != 0:
        continue  # odd value: nothing to keep
    b.append(ele)

# Add up the even numbers that were collected
c = sum(b)
c

20

## Functional Programming

In [9]:
import numpy as np

a = np.array([1, 2, 3, 4, 5, 6, 7, 8])


def f(ele):
    """Return whether `ele` is even (element-wise when `ele` is an array)."""
    remainder = ele % 2
    return remainder == 0


# Select the even entries through the boolean mask, then add them up.
even_mask = f(a)
sum(a[even_mask])


20

## PySpark 語法

In [10]:
# Distribute five integers over 4 partitions; the bare last line shows the RDD's repr.
rdd = sc.parallelize([1, 2, 3, 4, 5], numSlices=4)
rdd

ParallelCollectionRDD[1] at parallelize at PythonRDD.scala:175

In [11]:
# Action: bring every element of the RDD back to the driver as a Python list.
rdd.collect()

[1, 2, 3, 4, 5]

## Get Data From File

### Zeppelin

In [None]:
%pyspark 
# Read a text file through an explicit file: URI and preview its first three lines.
lines = sc.textFile('file:/tmp/trump.txt') 
lines.take(3)

### Jupyter Notebook

In [12]:
# Read the text file (path given without a scheme) as an RDD of lines
# and preview the first three.
lines = sc.textFile('trump.txt')
lines.take(3)

['Chief Justice Roberts, President Carter, President Clinton, President Bush, fellow Americans and people of the world – thank you.',
 'We the citizens of America have now joined a great national effort to rebuild our county and restore its promise for all our people.',
 '']

## Python Lambda

In [14]:
def addNum(a, b):
    """Return `a + b`, written as a regular named function."""
    total = a + b
    return total


addNum(2, 3)  # result is discarded: a cell only displays its last expression

# The same operation as an anonymous function bound to a name.
addNum2 = lambda a, b: a + b
addNum2(3, 4)

7

In [17]:
# Square a number with a one-argument lambda.
exp = lambda e: pow(e, 2)
exp(4)

16

## PySpark Transformation

In [18]:
# map() builds a new RDD description; the bare `a` below shows its repr, not the data.
rdd = sc.parallelize([1, 2, 3, 4])
double = lambda x: x * 2
a = rdd.map(double)
a

PythonRDD[6] at RDD at PythonRDD.scala:48

In [19]:
# Action: run the pending transformation and return the results to the driver.
a.collect()

[2, 4, 6, 8]

In [24]:
# map() with a predicate yields one boolean per element (nothing is removed).
rdd = sc.parallelize([1, 2, 3, 4])
is_even = lambda x: x % 2 == 0
a = rdd.map(is_even)
a.collect()

[False, True, False, True]

In [25]:
# filter() keeps only the elements for which the predicate is true.
a = rdd.filter(lambda value: value % 2 == 0)
a.collect()

[2, 4]

In [23]:
import numpy as np

# NumPy counterpart of the RDD filter: index the array with a boolean mask.
a = np.array([1, 2, 3, 4])
keep_even = (a % 2 == 0)
a[keep_even]

array([2, 4])

In [26]:
# distinct() removes duplicate elements (the value 2 appears twice in the input).
# NOTE(review): the recorded result is not in input order - do not rely on ordering.
rdd = sc.parallelize([1, 4, 2,2,3]) 
a   = rdd.distinct()
a.collect()

[4, 1, 2, 3]

In [29]:
# map(): exactly one output item per input item, so each result is itself a list.
rdd = sc.parallelize([1, 2, 3])
with_plus_five = lambda x: [x, x + 5]
a = rdd.map(with_plus_five)
a.collect()

[[1, 6], [2, 7], [3, 8]]

In [30]:
# flatMap(): the list returned for each input item is flattened into the result.
rdd = sc.parallelize([1, 2, 3])
with_plus_five = lambda x: [x, x + 5]
a = rdd.flatMap(with_plus_five)
a.collect()

[1, 6, 2, 7, 3, 8]

## Spark Action

In [31]:
# reduce() combines the elements two at a time with the given function (here: product).
rdd=sc.parallelize([1,2,3]) 
rdd.reduce(lambda a,b:a*b)

# Illustration of the pairwise combination:
#  1   2   3
#    2          (1 * 2)
#       6       (2 * 3)

6

In [32]:
# Action: return only the first two elements to the driver.
rdd.take(2)

[1, 2]

In [33]:
# Action: return all elements to the driver.
rdd.collect()

[1, 2, 3]

In [34]:
# takeOrdered() sorts ascending by the key; negating the value gives the 3 largest.
rdd = sc.parallelize([5, 3, 1, 2])
rdd.takeOrdered(3, key=lambda s: -s)

[5, 3, 2]

## Key-Value Pair

In [36]:
# Values sharing a key are merged with the given function (here: addition).
# RDD: [(1,2), (3,4), (3,6)] → [(1,2), (3,10)]
rdd = sc.parallelize([(1, 2), (3, 4), (3, 6)])
a = rdd.reduceByKey(lambda left, right: left + right)
a.collect()


[(1, 2), (3, 10)]

In [40]:
# Order the (key, value) pairs by key.
# RDD: [(1,'a'), (2,'c'), (1,'b')] → [(1,'a'), (1,'b'), (2,'c')]
pairs = [(1, 'a'), (2, 'c'), (1, 'b')]
rdd2 = sc.parallelize(pairs)
a = rdd2.sortByKey()
a.collect()

[(1, 'a'), (1, 'b'), (2, 'c')]

In [44]:
rdd2 = sc.parallelize([(1,'a'), (2,'c'), (1,'b')])
# groupByKey() returns a new RDD, so its result must be assigned; otherwise the
# collect() below would run on whatever `a` an earlier cell left behind.
a = rdd2.groupByKey()
# Each key is paired with an iterable of its values (shown as ResultIterable objects).
a.collect()

[(1, <pyspark.resultiterable.ResultIterable at 0x7f3ae4e73390>),
 (2, <pyspark.resultiterable.ResultIterable at 0x7f3ae4e73518>)]

## Broadcast

In [45]:
# Wrap a list in a broadcast variable created through the SparkContext.
broadcastVar = sc.broadcast([1, 2, 3])

In [47]:
# .value gives back the object that was broadcast.
broadcastVar.value

[1, 2, 3]

In [48]:
# Accumulator starting at 0; every RDD element is added to it by foreach().
accum = sc.accumulator(0)
rdd = sc.parallelize([1, 2, 3, 4])


def f(x):
    """Add one RDD element to the module-level accumulator."""
    accum.add(x)


rdd.foreach(f)
# Read the accumulated total.
accum.value

10

## 電影分析

In [50]:
# Load the ratings file as an RDD of raw text lines and preview five of them.
lines = sc.textFile("u.data") 
lines.take(5)

['196\t242\t3\t881250949',
 '186\t302\t3\t891717742',
 '22\t377\t1\t878887116',
 '244\t51\t2\t880606923',
 '166\t346\t1\t886397596']

In [53]:
def to_movie_count_pair(line):
    """Turn one whitespace-separated record into (int(second field), 1)."""
    fields = line.split()
    return (int(fields[1]), 1)


movies = lines.map(to_movie_count_pair)
movies.take(3)

[(242, 1), (302, 1), (377, 1)]

In [54]:
# Add up the 1s per movie id to get the number of ratings for each movie.
movieCounts = movies.reduceByKey(lambda left, right: left + right)

In [55]:
# Preview three (movie id, count) pairs.
movieCounts.take(3)

[(242, 117), (302, 297), (346, 126)]

In [56]:
# Sort by the negated count so the most-rated movies come first.
res = movieCounts.sortBy(lambda pair: -pair[1])

In [57]:
# Show the first five pairs of the sorted result.
res.take(5)

[(50, 583), (258, 509), (100, 508), (181, 507), (294, 485)]

In [75]:
def loadMovieNames(path='u.item', encoding='utf-8'):
    """Build a ``{movie_id: title}`` lookup from a pipe-delimited file.

    Args:
        path: File to read. Defaults to ``'u.item'``, so existing calls
            without arguments behave as before.
        encoding: Text encoding used to decode the file. Defaults to
            ``'utf-8'``.

    Returns:
        dict mapping ``int(first field)`` to the second field of each line.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if the first field of a non-blank line is not an integer.
        IndexError: if a non-blank line contains no ``'|'`` separator.
    """
    movieNames = {}
    with open(path, 'r', encoding=encoding) as f:
        # Iterate over the file object directly instead of loading every
        # line into memory with readlines().
        for line in f:
            if not line.strip():
                # A blank (e.g. trailing) line would otherwise crash in int('').
                continue
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

In [76]:
# Broadcast the id -> title dictionary; it is read later through nameDict.value.
nameDict = sc.broadcast(loadMovieNames())

In [77]:
# Most-rated movies first (sort by negated count).
res = movieCounts.sortBy(lambda pair: -pair[1])


def attach_title(pair):
    """Replace the movie id of a (movie id, count) pair by its title from nameDict."""
    movie_id, count = pair
    return (nameDict.value.get(movie_id), count)


res2 = res.map(attach_title)
res2.take(10)

[('Star Wars (1977)', 583),
 ('Contact (1997)', 509),
 ('Fargo (1996)', 508),
 ('Return of the Jedi (1983)', 507),
 ('Liar Liar (1997)', 485),
 ('English Patient, The (1996)', 481),
 ('Scream (1996)', 478),
 ('Toy Story (1995)', 452),
 ('Air Force One (1997)', 431),
 ('Independence Day (ID4) (1996)', 429)]

## Spark SQL and DataFrame

In [78]:
from pyspark.sql import SQLContext
# Build the SQL entry point on top of the existing SparkContext; the cells
# below use it for createDataFrame() and sql().
sqlContext = SQLContext(sc)

In [79]:
# Load the ratings text file, asking for at least 4 partitions.
data_file = 'ratings.txt'
raw_data = sc.textFile(data_file, minPartitions=4)

In [80]:
# Preview the first three lines; the first one is the header.
raw_data.take(3)

['userid::itemid::rating', '0::0::4', '0::1::5']

In [81]:
# Keep the first line (the column header) so it can be filtered out afterwards.
header = raw_data.first()
header

'userid::itemid::rating'

In [82]:
def is_not_header(line):
    """Return True for every line that differs from the captured header line."""
    return line != header


skip_data = raw_data.filter(is_not_header)

In [83]:
# Confirm that the header line is gone.
skip_data.take(3)

['0::0::4', '0::1::5', '0::7495::3']

In [84]:
# Split every record on the "::" delimiter into a list of string fields.
csv_data = skip_data.map(lambda record: record.split("::"))
csv_data.take(3)

[['0', '0', '4'], ['0', '1', '5'], ['0', '7495', '3']]

In [85]:
from pyspark.sql import Row

In [87]:
def to_rating_row(p):
    """Convert a [userid, itemid, rating] list of strings to a Row; rating becomes int."""
    return Row(userid=p[0], itemid=p[1], rating=int(p[2]))


row_data = csv_data.map(to_rating_row)
row_data.take(3)

[Row(itemid='0', rating=4, userid='0'),
 Row(itemid='1', rating=5, userid='0'),
 Row(itemid='7495', rating=3, userid='0')]

In [89]:
import pandas

# Two sample rating records, one dict per row.
a = [
    {'userid': 0, 'itemid': 0, 'rating': 4},
    {'userid': 0, 'itemid': 1, 'rating': 5},
]

# pandas DataFrame built from the list of records
pandas_df = pandas.DataFrame(a)
pandas_df

Unnamed: 0,itemid,rating,userid
0,0,4,0
1,1,5,0


In [90]:
# Spark DataFrame built from the RDD of Row objects.
df = sqlContext.createDataFrame(row_data) 

In [91]:
# take() returns the first rows as a list of Row objects.
df.take(3)

[Row(itemid='0', rating=4, userid='0'),
 Row(itemid='1', rating=5, userid='0'),
 Row(itemid='7495', rating=3, userid='0')]

In [92]:
# show() prints the first rows as a text table instead of returning them.
df.show(3)

+------+------+------+
|itemid|rating|userid|
+------+------+------+
|     0|     4|     0|
|     1|     5|     0|
|  7495|     3|     0|
+------+------+------+
only showing top 3 rows



In [93]:
# Average rating per user through the DataFrame API.
(
    df.select("userid", "rating")
      .groupBy("userid")
      .avg()
      .show()
)

+------+------------------+
|userid|       avg(rating)|
+------+------------------+
|   296|3.7885714285714287|
|   467| 3.257575757575758|
|   675| 3.193661971830986|
|   691|3.6065573770491803|
|   829|3.9507042253521125|
|  1090| 3.398876404494382|
|  1159|               4.4|
|  1436| 4.101167315175097|
|  1512| 3.466666666666667|
|  1572|             3.895|
|  2069|3.8285714285714287|
|  2088|3.7857142857142856|
|  2136|               3.6|
|  2162| 2.857142857142857|
|  2294|               3.0|
|  2904| 4.222222222222222|
|  3210| 4.229166666666667|
|  3414| 4.416666666666667|
|  3606|               4.0|
|  3959| 4.538461538461538|
+------+------------------+
only showing top 20 rows



In [97]:
# Make the DataFrame queryable from SQL under the name "ratings".
# createOrReplaceTempView supersedes registerTempTable, which has been
# deprecated since Spark 2.0; re-running the cell simply replaces the view.
df.createOrReplaceTempView("ratings")

In [94]:
# Print the column names and types of the DataFrame.
df.printSchema()

root
 |-- itemid: string (nullable = true)
 |-- rating: long (nullable = true)
 |-- userid: string (nullable = true)



In [100]:
# Same per-user average as the DataFrame version, expressed in SQL
# against the "ratings" table.
avg_rating_sql = """
    SELECT userid,AVG(rating) FROM ratings GROUP BY userid
"""
ratings_data = sqlContext.sql(avg_rating_sql)
ratings_data.show()

+------+------------------+
|userid|       avg(rating)|
+------+------------------+
|   296|3.7885714285714287|
|   467| 3.257575757575758|
|   675| 3.193661971830986|
|   691|3.6065573770491803|
|   829|3.9507042253521125|
|  1090| 3.398876404494382|
|  1159|               4.4|
|  1436| 4.101167315175097|
|  1512| 3.466666666666667|
|  1572|             3.895|
|  2069|3.8285714285714287|
|  2088|3.7857142857142856|
|  2136|               3.6|
|  2162| 2.857142857142857|
|  2294|               3.0|
|  2904| 4.222222222222222|
|  3210| 4.229166666666667|
|  3414| 4.416666666666667|
|  3606|               4.0|
|  3959| 4.538461538461538|
+------+------------------+
only showing top 20 rows



In [101]:
# A local-file URI needs three slashes ("file:///tmp/..."). With only two,
# as in 'file://tmp/u.data', "tmp" is parsed as the URI host and the path
# that is actually looked up becomes /u.data.
ratings = sc.textFile('file:///tmp/u.data', 4)
# Split each record on whitespace into its fields.
ratings_data = ratings.map(lambda l: l.split())
# Keep the first three fields; only the rating is converted to int.
ratings_row_data = ratings_data.map(lambda p:
    Row(userid=p[0], movieid=p[1], rating=int(p[2])))

ratings_row_data.take(4)

[Row(movieid='242', rating=3, userid='196'),
 Row(movieid='302', rating=3, userid='186'),
 Row(movieid='377', rating=1, userid='22'),
 Row(movieid='51', rating=2, userid='244')]

In [102]:
# Build a DataFrame from the rating rows and expose it to SQL as "ratings".
# NOTE: this rebinds `df` and replaces any earlier "ratings" view.
df = sqlContext.createDataFrame(ratings_row_data)
# createOrReplaceTempView supersedes registerTempTable (deprecated since Spark 2.0).
df.createOrReplaceTempView("ratings")

In [104]:
# Ten movie ids with the largest number of ratings.
top_rated_sql = """
    SELECT movieid, count(1) FROM ratings GROUP BY movieid ORDER BY COUNT(1) DESC LIMIT 10
"""
ratings_data = sqlContext.sql(top_rated_sql)
ratings_data.show()

+-------+--------+
|movieid|count(1)|
+-------+--------+
|     50|     583|
|    258|     509|
|    100|     508|
|    181|     507|
|    294|     485|
|    286|     481|
|    288|     478|
|      1|     452|
|    300|     431|
|    121|     429|
+-------+--------+



In [105]:
# A local-file URI needs three slashes ("file:///tmp/..."). With only two,
# as in 'file://tmp/u.item', "tmp" is parsed as the URI host and the path
# that is actually looked up becomes /u.item.
movies = sc.textFile('file:///tmp/u.item', 4)
# Records are pipe-delimited.
movies_data = movies.map(lambda l: l.split('|'))
# Keep only the first field (id) and the second field (title) of each record.
movies_row_data = movies_data.map(lambda p:
    Row(movieid=p[0], moviename=p[1]))
movies_row_data.take(4)

[Row(movieid='1', moviename='Toy Story (1995)'),
 Row(movieid='2', moviename='GoldenEye (1995)'),
 Row(movieid='3', moviename='Four Rooms (1995)'),
 Row(movieid='4', moviename='Get Shorty (1995)')]

In [106]:
# Build a DataFrame from the movie rows and expose it to SQL as "movies".
movies_df = sqlContext.createDataFrame(movies_row_data)
# createOrReplaceTempView supersedes registerTempTable (deprecated since Spark 2.0).
movies_df.createOrReplaceTempView("movies")

In [113]:
# Join movies with ratings and rank the titles by average rating.
# The query keeps 10 rows (LIMIT 10) but only 5 are printed by show(5).
# NOTE(review): ranking by average alone favours titles with very few ratings
# (see the cnt column) - consider a minimum-count filter.
best_movies = sqlContext.sql("""
SELECT moviename,avg(rating) as avg_rating, count(1) as cnt 
FROM movies INNER JOIN ratings ON ratings.movieid = movies.movieid 
GROUP BY moviename 
ORDER by AVG(rating) DESC LIMIT 10
""") 
best_movies.show(5)

+--------------------+----------+---+
|           moviename|avg_rating|cnt|
+--------------------+----------+---+
|Someone Else's Am...|       5.0|  1|
|Saint of Fort Was...|       5.0|  2|
|Aiqing wansui (1994)|       5.0|  1|
|Marlene Dietrich:...|       5.0|  1|
|     Star Kid (1997)|       5.0|  3|
+--------------------+----------+---+
only showing top 5 rows



In [114]:
# Same join as the previous query, but ranked by the number of ratings (cnt).
# The query keeps 10 rows (LIMIT 10) but only 5 are printed by show(5).
best_movies = sqlContext.sql("""
SELECT moviename,avg(rating) as avg_rating, count(1) as cnt 
FROM movies INNER JOIN ratings ON ratings.movieid = movies.movieid 
GROUP BY moviename 
ORDER by cnt DESC LIMIT 10
""") 
best_movies.show(5)

+--------------------+------------------+---+
|           moviename|        avg_rating|cnt|
+--------------------+------------------+---+
|    Star Wars (1977)|4.3584905660377355|583|
|      Contact (1997)|3.8035363457760316|509|
|        Fargo (1996)| 4.155511811023622|508|
|Return of the Jed...| 4.007889546351085|507|
|    Liar Liar (1997)| 3.156701030927835|485|
+--------------------+------------------+---+
only showing top 5 rows



## ALS

In [115]:
rawData = sc.textFile("u.data")
rawData.first()  # read the first record (not displayed: it is not the cell's last line)

# Split every record on whitespace into its fields: user id, movie id, rating, ...
rawRatings = rawData.map(lambda record: record.split())
rawRatings.take(3)

[['196', '242', '3', '881250949'],
 ['186', '302', '3', '891717742'],
 ['22', '377', '1', '878887116']]

In [118]:
from pyspark.sql import Row

# Convert each list of string fields into a typed Row.
# (No backslash continuations are needed inside parentheses.)
ratingsRDD = rawRatings.map(
    lambda p: Row(userId=int(p[0]),
                  movieId=int(p[1]),
                  rating=float(p[2]),
                  timestamp=int(p[3])))

# Build the DataFrame
ratings = sqlContext.createDataFrame(ratingsRDD)

# Split the data into training and test sets (80% / 20%).
# A fixed seed makes the split reproducible across runs.
(training, test) = ratings.randomSplit([0.8, 0.2], seed=42)
                                                     

In [119]:
from pyspark.ml.recommendation import ALS

# Configure the ALS recommender on the userId / movieId / rating columns.
# A fixed seed makes the factor initialisation, and therefore the fitted
# model, reproducible between runs.
als = ALS(rank=50, maxIter=10, regParam=0.01,
          userCol="userId", itemCol="movieId",
          ratingCol="rating", seed=42)
# Fit on the training split only.
model = als.fit(training)

In [120]:
# Number of rows in the model's user-factor table.
model.userFactors.count()

943

In [121]:
# Number of rows in the model's item-factor table.
model.itemFactors.count()

1648

In [122]:
# Ask the model for 10 recommended movies for every user.
userRecs = model.recommendForAllUsers(10)

In [123]:
# Each row holds a userId and its list of (movieId, rating) recommendations.
userRecs.take(5)

[Row(userId=471, recommendations=[Row(movieId=945, rating=5.86926794052124), Row(movieId=1197, rating=5.369887351989746), Row(movieId=210, rating=5.217687606811523), Row(movieId=253, rating=5.048292636871338), Row(movieId=478, rating=5.041204452514648), Row(movieId=464, rating=5.041119575500488), Row(movieId=102, rating=5.018511772155762), Row(movieId=422, rating=5.004819869995117), Row(movieId=932, rating=4.989136219024658), Row(movieId=140, rating=4.965829849243164)]),
 Row(userId=463, recommendations=[Row(movieId=317, rating=5.898265361785889), Row(movieId=1142, rating=5.89493989944458), Row(movieId=48, rating=5.513393402099609), Row(movieId=45, rating=5.482431411743164), Row(movieId=750, rating=5.397836685180664), Row(movieId=87, rating=5.2709784507751465), Row(movieId=213, rating=5.268646717071533), Row(movieId=715, rating=5.204608917236328), Row(movieId=408, rating=5.200106620788574), Row(movieId=124, rating=5.070901870727539)]),
 Row(userId=833, recommendations=[Row(movieId=896,

In [124]:
# Ask the model for 10 recommended users for every movie.
movieRecs = model.recommendForAllItems(10)

In [125]:
# Each row holds a movieId and its list of (userId, rating) recommendations.
movieRecs.take(3)

[Row(movieId=1580, recommendations=[Row(userId=405, rating=1.0204888582229614), Row(userId=190, rating=0.4247187376022339), Row(userId=703, rating=0.34964510798454285), Row(userId=432, rating=0.3292597532272339), Row(userId=797, rating=0.324725866317749), Row(userId=264, rating=0.3212633728981018), Row(userId=486, rating=0.3129093050956726), Row(userId=509, rating=0.3122008740901947), Row(userId=104, rating=0.31171172857284546), Row(userId=158, rating=0.3071918785572052)]),
 Row(movieId=471, recommendations=[Row(userId=236, rating=5.755523681640625), Row(userId=579, rating=5.606522560119629), Row(userId=534, rating=5.512883186340332), Row(userId=848, rating=5.4829182624816895), Row(userId=684, rating=5.445981979370117), Row(userId=9, rating=5.373197078704834), Row(userId=416, rating=5.370927333831787), Row(userId=398, rating=5.352773666381836), Row(userId=863, rating=5.301808834075928), Row(userId=532, rating=5.287174701690674)]),
 Row(movieId=1591, recommendations=[Row(userId=519, rat