## RDD Advanced

In [1]:
import findspark
findspark.init('/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/lib/spark/')


In [2]:
import pyspark
sc = pyspark.SparkContext(appName="myAppName")

In [19]:
sc

<pyspark.context.SparkContext at 0x7fd3a07e77d0>

### RDD Basic

In [7]:
import collections
lines = sc.textFile("file:///root/Desktop/u.data")
ratings = lines.map(lambda x: x.split()[2])
result = ratings.countByValue()

sortedResults = collections.OrderedDict(sorted(result.items()))
for key, value in sortedResults.iteritems():
    print "%s %i" % (key, value)

1 6110
2 11370
3 27145
4 34174
5 21201


In [8]:
def parseLine(line):
    fields = line.split(',')
    age = int(fields[2])
    numFriends = int(fields[3])
    return (age, numFriends)

lines = sc.textFile("file:///root/Desktop/fakefriends.csv")
rdd = lines.map(parseLine)
totalsByAge = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
averagesByAge = totalsByAge.mapValues(lambda x: x[0] / x[1])
results = averagesByAge.collect()
for result in results:
    print result

(18, 343)
(19, 213)
(20, 165)
(21, 350)
(22, 206)
(23, 246)
(24, 233)
(25, 197)
(26, 242)
(27, 228)
(28, 209)
(29, 215)
(30, 235)
(31, 267)
(32, 207)
(33, 325)
(34, 245)
(35, 211)
(36, 246)
(37, 249)
(38, 193)
(39, 169)
(40, 250)
(41, 268)
(42, 303)
(43, 230)
(44, 282)
(45, 309)
(46, 223)
(47, 233)
(48, 281)
(49, 184)
(50, 254)
(51, 302)
(52, 340)
(53, 222)
(54, 278)
(55, 295)
(56, 306)
(57, 258)
(58, 116)
(59, 220)
(60, 202)
(61, 256)
(62, 220)
(63, 384)
(64, 281)
(65, 298)
(66, 276)
(67, 214)
(68, 269)
(69, 235)


### 過濾RDD

In [9]:
def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temperature)

lines = sc.textFile("file:///root/Desktop/1800.csv")
parsedLines = lines.map(parseLine)
minTemps = parsedLines.filter(lambda x: "TMIN" in x[1])
stationTemps = minTemps.map(lambda x: (x[0], x[2]))
minTemps = stationTemps.reduceByKey(lambda x, y: min(x,y))
results = minTemps.collect()

for result in results:
    print result[0] + "\t{:.2f}F".format(result[1])

ITE00100554	5.36F
EZE00100082	7.70F


def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temperature)

lines = sc.textFile("file:///root/Desktop/1800.csv")
parsedLines = lines.map(parseLine)
maxTemps = parsedLines.filter(lambda x: "TMAX" in x[1])
stationTemps = maxTemps.map(lambda x: (x[0], x[2]))
maxTemps = stationTemps.reduceByKey(lambda x, y: max(x,y))
results = maxTemps.collect()

for result in results:
    print result[0] + "\t{:.2f}F".format(result[1])


### Map v.s. Flatmap

In [7]:
input = sc.textFile("file:///root/Desktop/cnn.txt")
words = input.map(lambda x: x.upper())
print words.collect()



In [9]:
input = sc.textFile("file:///root/Desktop/cnn.txt")
words = input.flatMap(lambda x: x.split())
print words.collect()



In [13]:
input = sc.textFile("file:///root/Desktop/cnn.txt")
words = input.flatMap(lambda x: x.split())
wordCounts = words.countByValue()

print wordCounts



### Data Sampling

In [20]:
data_file = "file:///root/Desktop/kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)


In [21]:
raw_data_sample = raw_data.sample(False, 0.1, 1234)
sample_size = raw_data_sample.count()
total_size = raw_data.count()
print sample_size
print total_size

49493
494021


### Reduce By Key

In [15]:
def extractCustomerPricePairs(line):
    fields = line.split(',')
    return (int(fields[0]), float(fields[2]))

input = sc.textFile("file:///root/Desktop/customer-orders.csv")
mappedInput = input.map(extractCustomerPricePairs)
totalByCustomer = mappedInput.reduceByKey(lambda x, y: x + y)

results = totalByCustomer.collect();
for result in results:
    print result

(0, 5524.949999999998)
(1, 4958.600000000001)
(2, 5994.59)
(3, 4659.63)
(4, 4815.050000000002)
(5, 4561.069999999999)
(6, 5397.879999999998)
(7, 4755.070000000001)
(8, 5517.240000000001)
(9, 5322.649999999999)
(10, 4819.700000000001)
(11, 5152.290000000002)
(12, 4664.589999999998)
(13, 4367.62)
(14, 4735.030000000001)
(15, 5413.510000000001)
(16, 4979.06)
(17, 5032.679999999999)
(18, 4921.27)
(19, 5059.4299999999985)
(20, 4836.859999999999)
(21, 4707.41)
(22, 5019.449999999999)
(23, 4042.6499999999987)
(24, 5259.920000000003)
(25, 5057.610000000001)
(26, 5250.4)
(27, 4915.889999999999)
(28, 5000.709999999998)
(29, 5032.529999999999)
(30, 4990.72)
(31, 4765.05)
(32, 5496.050000000004)
(33, 5254.659999999998)
(34, 5330.8)
(35, 5155.419999999999)
(36, 4278.049999999997)
(37, 4735.200000000002)
(38, 4898.460000000002)
(39, 6193.109999999999)
(40, 5186.429999999999)
(41, 5637.62)
(42, 5696.840000000003)
(43, 5368.83)
(44, 4756.8899999999985)
(45, 3309.38)
(46, 5963.109999999999)
(47, 4316.2

### Broadcast

In [16]:
lines = sc.textFile("file:///root/Desktop/u.data")
movies = lines.map(lambda x: (int(x.split()[1]), 1))
movieCounts = movies.reduceByKey(lambda x, y: x + y)

flipped = movieCounts.map( lambda (x, y) : (y, x) )
sortedMovies = flipped.sortByKey()

results = sortedMovies.collect()

for result in results:
    print result

(1, 599)
(1, 677)
(1, 711)
(1, 814)
(1, 830)
(1, 852)
(1, 857)
(1, 1122)
(1, 1130)
(1, 1156)
(1, 1201)
(1, 1235)
(1, 1236)
(1, 1309)
(1, 1310)
(1, 1320)
(1, 1325)
(1, 1329)
(1, 1339)
(1, 1340)
(1, 1341)
(1, 1343)
(1, 1348)
(1, 1349)
(1, 1352)
(1, 1363)
(1, 1364)
(1, 1366)
(1, 1373)
(1, 1414)
(1, 1447)
(1, 1452)
(1, 1453)
(1, 1457)
(1, 1458)
(1, 1460)
(1, 1461)
(1, 1476)
(1, 1482)
(1, 1486)
(1, 1492)
(1, 1493)
(1, 1494)
(1, 1498)
(1, 1505)
(1, 1507)
(1, 1510)
(1, 1515)
(1, 1520)
(1, 1525)
(1, 1526)
(1, 1533)
(1, 1536)
(1, 1543)
(1, 1546)
(1, 1548)
(1, 1557)
(1, 1559)
(1, 1561)
(1, 1562)
(1, 1563)
(1, 1564)
(1, 1565)
(1, 1566)
(1, 1567)
(1, 1568)
(1, 1569)
(1, 1570)
(1, 1571)
(1, 1572)
(1, 1574)
(1, 1575)
(1, 1576)
(1, 1577)
(1, 1579)
(1, 1580)
(1, 1581)
(1, 1582)
(1, 1583)
(1, 1584)
(1, 1586)
(1, 1587)
(1, 1593)
(1, 1595)
(1, 1596)
(1, 1599)
(1, 1601)
(1, 1603)
(1, 1604)
(1, 1606)
(1, 1613)
(1, 1614)
(1, 1616)
(1, 1618)
(1, 1619)
(1, 1621)
(1, 1624)
(1, 1625)
(1, 1626)
(1, 1627)
(1, 163

In [19]:
def loadMovieNames():
    movieNames = {}
    with open("/root/Desktop/u.item") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

nameDict = sc.broadcast(loadMovieNames())

lines = sc.textFile("file:///root/Desktop/u.data")
movies = lines.map(lambda x: (int(x.split()[1]), 1))
movieCounts = movies.reduceByKey(lambda x, y: x + y)

flipped = movieCounts.map( lambda (x, y) : (y, x))
sortedMovies = flipped.sortByKey()

sortedMoviesWithNames = sortedMovies.map(lambda (count, movie) : (nameDict.value[movie], count))
results = sortedMoviesWithNames.collect()

for result in results:
    print result

('Police Story 4: Project S (Chao ji ji hua) (1993)', 1)
('Fire on the Mountain (1996)', 1)
('Substance of Fire, The (1996)', 1)
('Great Day in Harlem, A (1994)', 1)
('Power 98 (1995)', 1)
('Bloody Child, The (1996)', 1)
('Paris Was a Woman (1995)', 1)
('They Made Me a Criminal (1939)', 1)
("Jupiter's Wife (1994)", 1)
('Cyclo (1995)', 1)
('Marlene Dietrich: Shadow and Light (1996) ', 1)
('Big Bang Theory, The (1994)', 1)
('Other Voices, Other Rooms (1997)', 1)
('Very Natural Thing, A (1974)', 1)
('Walk in the Sun, A (1945)', 1)
('Homage (1995)', 1)
('August (1996)', 1)
('Low Life, The (1994)', 1)
('Stefano Quantestorie (1993)', 1)
('Crude Oasis, The (1995)', 1)
('Hedd Wyn (1992)', 1)
('Lotto Land (1995)', 1)
('Every Other Weekend (1990)', 1)
('Mille bolle blu (1993)', 1)
('Shadow of Angels (Schatten der Engel) (1976)', 1)
('Leopard Son, The (1996)', 1)
('Bird of Prey (1996)', 1)
('JLG/JLG - autoportrait de d\xe9cembre (1994)', 1)
('Good Morning (1971)', 1)
('Coldblooded (1995)', 1)
('C

### Spark SQL

In [4]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [5]:
data_file = "file:///root/Desktop/ratings.txt"
raw_data = sc.textFile(data_file)

In [6]:
header = raw_data.first()
skip_data = raw_data.filter(lambda line: line != header)

In [7]:
from pyspark.sql import Row

csv_data = skip_data.map(lambda l: l.split("::"))
row_data = csv_data.map(lambda p: Row(
    userid=p[0], 
    itemid=p[1],
    rating=int(p[2])
    )
)

In [8]:
df = sqlContext.createDataFrame(row_data)
df.registerTempTable("ratings")

In [16]:
df.take(5)

[Row(itemid=u'0', rating=4, userid=u'0'),
 Row(itemid=u'1', rating=5, userid=u'0'),
 Row(itemid=u'7495', rating=3, userid=u'0'),
 Row(itemid=u'7496', rating=5, userid=u'0'),
 Row(itemid=u'7497', rating=5, userid=u'0')]

In [17]:
df.show(5)

+------+------+------+
|itemid|rating|userid|
+------+------+------+
|     0|     4|     0|
|     1|     5|     0|
|  7495|     3|     0|
|  7496|     5|     0|
|  7497|     5|     0|
+------+------+------+
only showing top 5 rows



In [26]:
df.toPandas().head()

Unnamed: 0,itemid,rating,userid
0,0,4,0
1,1,5,0
2,7495,3,0
3,7496,5,0
4,7497,5,0


In [11]:
ratings_data = sqlContext.sql("""
    SELECT userid,avg(rating) from ratings group by userid
""")
ratings_data.show()

+------+------------------+
|userid|               _c1|
+------+------------------+
|   286| 3.741496598639456|
|   330|3.4833333333333334|
|   448|3.7277486910994764|
|   781|            3.6125|
|   899| 4.141414141414141|
|   943|3.4838709677419355|
|  1098|               3.4|
|  1142|4.4772727272727275|
|  1304|              3.92|
|  1593|3.2674418604651163|
|  1755| 3.466666666666667|
|  1917|3.8333333333333335|
|  2033|              4.25|
|  2484|             3.125|
|  2646|              2.75|
|  2808|              4.25|
|  3375|               4.1|
|  3537|               3.5|
|  3870| 4.230769230769231|
|  3988|               4.0|
+------+------------------+
only showing top 20 rows



In [12]:
ratings_out = ratings_data.map(lambda p: "userid: {}, mean rating: {}".format(p.userid, p._c1))
for ele in ratings_out.collect():
    print ele

userid: 286, mean rating: 3.74149659864
userid: 330, mean rating: 3.48333333333
userid: 448, mean rating: 3.7277486911
userid: 781, mean rating: 3.6125
userid: 899, mean rating: 4.14141414141
userid: 943, mean rating: 3.48387096774
userid: 1098, mean rating: 3.4
userid: 1142, mean rating: 4.47727272727
userid: 1304, mean rating: 3.92
userid: 1593, mean rating: 3.26744186047
userid: 1755, mean rating: 3.46666666667
userid: 1917, mean rating: 3.83333333333
userid: 2033, mean rating: 4.25
userid: 2484, mean rating: 3.125
userid: 2646, mean rating: 2.75
userid: 2808, mean rating: 4.25
userid: 3375, mean rating: 4.1
userid: 3537, mean rating: 3.5
userid: 3870, mean rating: 4.23076923077
userid: 3988, mean rating: 4.0
userid: 4266, mean rating: 4.6
userid: 4310, mean rating: 4.44444444444
userid: 4428, mean rating: 3.6
userid: 4761, mean rating: 4.29411764706
userid: 4879, mean rating: 3.93333333333
userid: 4923, mean rating: 4.2
userid: 5157, mean rating: 4.0
userid: 5201, mean rating: 4.09

In [13]:
df.printSchema()

root
 |-- itemid: string (nullable = true)
 |-- rating: long (nullable = true)
 |-- userid: string (nullable = true)



In [15]:
df.select("userid", "rating").groupBy("userid").avg().show()

+------+------------------+
|userid|       avg(rating)|
+------+------------------+
|   286| 3.741496598639456|
|   330|3.4833333333333334|
|   448|3.7277486910994764|
|   781|            3.6125|
|   899| 4.141414141414141|
|   943|3.4838709677419355|
|  1098|               3.4|
|  1142|4.4772727272727275|
|  1304|              3.92|
|  1593|3.2674418604651163|
|  1755| 3.466666666666667|
|  1917|3.8333333333333335|
|  2033|              4.25|
|  2484|             3.125|
|  2646|              2.75|
|  2808|              4.25|
|  3375|               4.1|
|  3537|               3.5|
|  3870| 4.230769230769231|
|  3988|               4.0|
+------+------------------+
only showing top 20 rows



### SQL Join

In [22]:
x = sc.parallelize([("a", 1), ("b", 4)]) 
y = sc.parallelize([("a", 2), ("a", 3)]) 
sorted(x.join(y).collect()) 


[('a', (1, 2)), ('a', (1, 3))]

In [23]:
x = sc.parallelize([("a", 1), ("b", 4)]) 
y = sc.parallelize([("a", 2)]) 
sorted(x.leftOuterJoin(y).collect()) 

[('a', (1, 2)), ('b', (4, None))]

In [24]:
x = sc.parallelize([("a", 1), ("b", 4)]) 
y = sc.parallelize([("a", 2)]) 
sorted(y.rightOuterJoin(x).collect())

[('a', (2, 1)), ('b', (None, 4))]

In [25]:
x = sc.parallelize([("a", 1), ("b", 4)]) 
y = sc.parallelize([("a", 2), ("c", 8)]) 
sorted(x.fullOuterJoin(y).collect()) 

[('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]