## 安裝 PySpark

In [0]:
! apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [0]:
! wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz

In [0]:
! tar xf spark-2.4.5-bin-hadoop2.7.tgz

In [0]:
! pip install -q findspark

In [0]:
import os

# Point Spark at the JDK installed by apt and at the Spark distribution
# extracted by the `tar` cell into /content.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-1.8.0-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.5-bin-hadoop2.7",
})

In [0]:
import findspark

# findspark.init() locates Spark via SPARK_HOME and makes `pyspark`
# importable, so it has to run before the pyspark import below.
findspark.init()

from pyspark.sql import SparkSession

# Local-mode session using all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [8]:
from pyspark import SparkContext

# getOrCreate() returns the already-running context (the one behind `spark`)
# instead of starting a second one.
sc = SparkContext.getOrCreate()

data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
rdd  # only shows the RDD's description; rdd.collect() materializes the values

ParallelCollectionRDD[1] at parallelize at PythonRDD.scala:195

In [9]:
rdd.collect()

[1, 2, 3, 4, 5]

## 範例一：計算各評價的數量

In [10]:
! wget https://raw.githubusercontent.com/ywchiu/taipeifubon/master/data/u.data

--2020-05-25 05:56:02--  https://raw.githubusercontent.com/ywchiu/taipeifubon/master/data/u.data
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1979173 (1.9M) [text/plain]
Saving to: ‘u.data’


2020-05-25 05:56:03 (9.31 MB/s) - ‘u.data’ saved [1979173/1979173]



In [11]:
! head u.data

196	242	3	881250949
186	302	3	891717742
22	377	1	878887116
244	51	2	880606923
166	346	1	886397596
298	474	4	884182806
115	265	2	881171488
253	465	5	891628467
305	451	3	886324817
6	86	3	883603013


In [0]:
lines = sc.textFile('u.data')

In [14]:
lines

u.data MapPartitionsRDD[5] at textFile at NativeMethodAccessorImpl.java:0

In [15]:
lines.take(3)

['196\t242\t3\t881250949', '186\t302\t3\t891717742', '22\t377\t1\t878887116']

In [0]:
# Columns are whitespace-separated; keep column index 2 (the rating) as a string.
ratings = lines.map(lambda line: line.split()[2])


In [17]:
ratings.take(3)

['3', '3', '1']

In [0]:
result = ratings.countByValue()

In [20]:
result

defaultdict(int, {'1': 6110, '2': 11370, '3': 27145, '4': 34174, '5': 21201})

In [0]:
import collections
from collections import Counter

# Wrap the defaultdict returned by countByValue() in a Counter, which adds
# helpers such as most_common().
c = collections.Counter(result)

In [25]:
c

Counter({'1': 6110, '2': 11370, '3': 27145, '4': 34174, '5': 21201})

## 氣溫資料轉換

In [26]:
! wget https://raw.githubusercontent.com/ywchiu/taipeifubon/master/data/1800.csv

--2020-05-25 06:07:59--  https://raw.githubusercontent.com/ywchiu/taipeifubon/master/data/1800.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 62728 (61K) [text/plain]
Saving to: ‘1800.csv’


2020-05-25 06:08:00 (2.08 MB/s) - ‘1800.csv’ saved [62728/62728]



In [27]:
! head 1800.csv

ITE00100554,18000101,TMAX,-75,,,E,
ITE00100554,18000101,TMIN,-148,,,E,
GM000010962,18000101,PRCP,0,,,E,
EZE00100082,18000101,TMAX,-86,,,E,
EZE00100082,18000101,TMIN,-135,,,E,
ITE00100554,18000102,TMAX,-60,,I,E,
ITE00100554,18000102,TMIN,-125,,,E,
GM000010962,18000102,PRCP,0,,,E,
EZE00100082,18000102,TMAX,-44,,,E,
EZE00100082,18000102,TMIN,-130,,,E,


In [0]:
def parseLine(line):
    """Parse one comma-separated record into (stationID, entryType, temperature).

    Column 3 is multiplied by 0.1 and then converted Celsius -> Fahrenheit,
    i.e. the raw value is presumably tenths of a degree Celsius.
    The conversion is applied to every entry type, so non-temperature rows
    (e.g. PRCP) also get a meaningless value; callers filter on entryType.
    """
    fields = line.split(',')
    station, kind, raw_value = fields[0], fields[2], fields[3]
    fahrenheit = float(raw_value) * 0.1 * (9.0 / 5.0) + 32.0
    return (station, kind, fahrenheit)


In [0]:
rdd = sc.textFile('1800.csv')
rdd2 = rdd.map(parseLine)

In [31]:
rdd2.take(3)

[('ITE00100554', 'TMAX', 18.5),
 ('ITE00100554', 'TMIN', 5.359999999999999),
 ('GM000010962', 'PRCP', 32.0)]

In [33]:
# Keep only TMIN records. Compare the entry type exactly (a substring test
# would also accept any type that merely contains "TMIN"), and bind only
# minTemps so that rdd2 still holds the unfiltered parsed records.
minTemps = rdd.map(parseLine).filter(lambda rec: rec[1] == "TMIN")
minTemps.take(3)

[('ITE00100554', 'TMIN', 5.359999999999999),
 ('EZE00100082', 'TMIN', 7.699999999999999),
 ('ITE00100554', 'TMIN', 9.5)]

In [0]:
# (stationID, entryType, temperature) -> (stationID, temperature)
stationTemps = minTemps.map(lambda rec: (rec[0], rec[2]))

In [35]:
stationTemps.take(3)

[('ITE00100554', 5.359999999999999),
 ('EZE00100082', 7.699999999999999),
 ('ITE00100554', 9.5)]

In [0]:
# Lowest temperature per station; the built-in min is already a two-argument reducer.
# NOTE: this rebinds minTemps (previously the filtered 3-tuples), so the
# stationTemps cell cannot be re-run after this one.
minTemps = stationTemps.reduceByKey(min)


In [0]:
results = minTemps.collect()


In [38]:
# Unpack into dedicated names: reusing `result` as the loop variable would
# overwrite the countByValue() output from the ratings example.
for station, temp in results:
    print("{}\t{:.2f}F".format(station, temp))


ITE00100554	5.36F
EZE00100082	7.70F


## 加總顧客消費

In [39]:
! wget https://raw.githubusercontent.com/ywchiu/taipeifubon/master/data/customer-orders.csv

--2020-05-25 06:17:34--  https://raw.githubusercontent.com/ywchiu/taipeifubon/master/data/customer-orders.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 146855 (143K) [text/plain]
Saving to: ‘customer-orders.csv’


2020-05-25 06:17:35 (3.34 MB/s) - ‘customer-orders.csv’ saved [146855/146855]



In [40]:
! head customer-orders.csv

44,8602,37.19
35,5368,65.89
2,3391,40.64
47,6694,14.98
29,680,13.08
91,8900,24.59
70,3959,68.68
85,1733,28.53
53,9900,83.55
14,1505,4.32


In [0]:
rdd = sc.textFile("customer-orders.csv")


In [0]:
def extractCustomerPricePairs(line):
    """Turn one comma-separated order line into (customerID, amount).

    Column 0 is parsed as an int customer ID, column 2 as a float amount;
    column 1 (presumably an item/order ID) is ignored.
    """
    cols = line.split(',')
    customer_id = int(cols[0])
    amount = float(cols[2])
    return customer_id, amount


In [0]:
mapped_rdd = rdd.map(extractCustomerPricePairs)


In [0]:
# Sum the order amounts for each customer ID.
totalByCustomer = mapped_rdd.reduceByKey(lambda acc, amount: acc + amount)


In [0]:
# Customers ordered by total spend, largest first. ascending=False states the
# intent directly instead of negating the key (which only works for numbers).
ret = totalByCustomer.sortBy(lambda kv: kv[1], ascending=False)

In [50]:
ret.take(5)

[(68, 6375.450000000001),
 (73, 6206.200000000001),
 (39, 6193.110000000001),
 (54, 6065.390000000001),
 (71, 5995.660000000002)]

## 找出最受歡迎電影

In [51]:
! head u.data

196	242	3	881250949
186	302	3	891717742
22	377	1	878887116
244	51	2	880606923
166	346	1	886397596
298	474	4	884182806
115	265	2	881171488
253	465	5	891628467
305	451	3	886324817
6	86	3	883603013


In [0]:
rdd = sc.textFile('u.data')

In [53]:
rdd.take(3)

['196\t242\t3\t881250949', '186\t302\t3\t891717742', '22\t377\t1\t878887116']

In [0]:
# Emit (movieID, 1) per rating line; column 1 holds the movie ID that is
# later looked up in u.item.
rdd2 = rdd.map(lambda line: (line.split()[1], 1))

In [57]:
rdd2.take(5)

[('242', 1), ('302', 1), ('377', 1), ('51', 1), ('346', 1)]

In [0]:
# Number of ratings per movie: add up the 1s emitted for each movie ID.
rdd3 = rdd2.reduceByKey(lambda total, one: total + one)

In [59]:
rdd3.take(3)

[('346', 126), ('474', 194), ('265', 227)]

In [0]:
# Movies ordered by number of ratings, most-rated first (explicit
# ascending=False instead of a negated sort key).
ret = rdd3.sortBy(lambda kv: kv[1], ascending=False)

In [61]:
ret.take(3)

[('50', 583), ('258', 509), ('100', 508)]

In [62]:
! wget https://raw.githubusercontent.com/ywchiu/taipeifubon/master/data/u.item

--2020-05-25 06:27:28--  https://raw.githubusercontent.com/ywchiu/taipeifubon/master/data/u.item
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 236347 (231K) [text/plain]
Saving to: ‘u.item’


2020-05-25 06:27:33 (4.39 MB/s) - ‘u.item’ saved [236347/236347]



In [63]:
! head u.item

﻿1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
2|GoldenEye (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?GoldenEye%20(1995)|0|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
3|Four Rooms (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Four%20Rooms%20(1995)|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
4|Get Shorty (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Get%20Shorty%20(1995)|0|1|0|0|0|1|0|0|1|0|0|0|0|0|0|0|0|0|0
5|Copycat (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Copycat%20(1995)|0|0|0|0|0|0|1|0|1|0|0|0|0|0|0|0|1|0|0
6|Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)|01-Jan-1995||http://us.imdb.com/Title?Yao+a+yao+yao+dao+waipo+qiao+(1995)|0|0|0|0|0|0|0|0|1|0|0|0|0|0|0|0|0|0|0
7|Twelve Monkeys (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Twelve%20Monkeys%20(1995)|0|0|0|0|0|0|0|0|1|0|0|0|0|0|0|1|0|0|0
8|Babe (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Babe%20(1995)|0|0|0|0|

In [0]:
# Build {movieID: title} from u.item ('|'-separated: ID, title, ...).
# This copy of u.item begins with a UTF-8 byte-order mark (visible in the
# `head u.item` output). Decoded as latin-1 it becomes 'ï»¿' and gets glued
# to the first ID, so movie '1' would never be found; strip it explicitly.
_BOM_CHARS = '\ufeff' + '\ufeff'.encode('utf-8').decode('latin1')

moviedic = {}
with open('u.item', 'r', encoding='latin1') as f:
    for line in f:  # stream the file instead of materializing readlines()
        fields = line.strip().split('|')
        if len(fields) < 2:  # skip blank or truncated lines
            continue
        movie_id = fields[0].strip().lstrip(_BOM_CHARS)
        moviedic[movie_id] = fields[1]

In [69]:
ret.take(3)

[('50', 583), ('258', 509), ('100', 508)]

In [70]:
ret.map(lambda e: (moviedic.get(e[0]), e[1]) ).take(3)

[('Star Wars (1977)', 583), ('Contact (1997)', 509), ('Fargo (1996)', 508)]

In [0]:
nameDict = sc.broadcast(moviedic)

In [0]:
#nameDict.value

In [75]:
ret.map(lambda e: (nameDict.value.get(e[0]), e[1]) ).take(3)

[('Star Wars (1977)', 583), ('Contact (1997)', 509), ('Fargo (1996)', 508)]

## 小任務
- 請使用 Spark 找出 u.data 中評分最高的電影，並從 u.item 對應出電影名稱與評分。