# Lecture 10: Code

## Functional Primer

In [None]:
from functools import reduce

In [None]:
# Sample input for the map/filter/reduce cells: five odd integers.
data = [1, 3, 5, 7, 9]

In [None]:
# f: unary function, squares its argument.
f = lambda x: x*x
# p: predicate, true when the argument is greater than 5.
p = lambda x: x > 5
# g: binary function, adds its two arguments.
g = lambda x,y: x+y

In [None]:
# Square of 4.
f(4)

In [None]:
# 3 is not greater than 5, so the predicate is false.
p(3)

In [None]:
# 7 is greater than 5, so the predicate is true.
p(7)

In [None]:
# Check that g gives the same result under both groupings (associativity)
# for these inputs.
g(4,g(2,6)) == g(g(4,2),6)

In [None]:
# Apply f to every element of data; list() materialises the lazy map object.
list(map(f, data))

In [None]:
# Keep only the elements of data for which p is true.
list(filter(p, data))

In [None]:
# Combine the elements of data pairwise with g into a single value (their sum).
reduce(g, data)

## PATHS

Set the paths below to match your environment before running the rest of the notebook.

In [None]:
# Dataset locations are passed to sc.textFile(), so they are written as URIs.
ENWIKI_FULL="file:/usr/local/cs/EDAN95/datasets/wikipedia/enwiki"
ENWIKI_1P="file:/usr/local/cs/EDAN95/datasets/wikipedia/enwiki_1p"
# SPARK_HOME is joined with os.path.join() and appended to sys.path further
# down, so it must be a plain directory path, not a "file:" URI.
SPARK_HOME="/usr/local/spark"

## Spark

In [None]:
import os
import sys

In [None]:
# Export SPARK_HOME into this process's environment; the next cell reads it
# back to build the import paths.
os.environ["SPARK_HOME"] = SPARK_HOME

In [None]:
import glob

# Make the PySpark sources shipped with the Spark installation importable.
spark_python = os.path.join(os.environ["SPARK_HOME"], "python")
sys.path.append(spark_python)

# The bundled py4j archive is versioned and differs between Spark releases,
# so look it up instead of hard-coding the name. Fall back to the 0.10.7 name
# when nothing matches, which keeps the previous behaviour.
py4j_archives = sorted(glob.glob(os.path.join(spark_python, "lib", "py4j-*-src.zip")))
if py4j_archives:
    py4j_zip = py4j_archives[-1]
else:
    py4j_zip = os.path.join(spark_python, "lib", "py4j-0.10.7-src.zip")
sys.path.append(py4j_zip)

In [None]:
import pyspark

Set the **JVM** maximum heap size to 8 GB for both the executors and the driver; cached and intermediate data are stored in the JVM.

In [None]:
# Give the executors and the driver 8 GB of memory each.
config = pyspark.SparkConf()
config.set('spark.executor.memory', '8g')
config.set('spark.driver.memory', '8g')

In [None]:
# getOrCreate() returns the already running context if there is one, so this
# cell can be re-run without failing because a SparkContext already exists.
sc = pyspark.SparkContext.getOrCreate(conf=config)

# Section A

First program

In [None]:
# Read the enwiki_1p dataset as an RDD with one element per text line,
# asking for at least 4 partitions.
lines = sc.textFile(ENWIKI_1P, 4)

In [None]:
# Action: number of lines in the RDD.
lines.count()

In [None]:
# Transformation: keep the lines containing the substring "Python".
pythonLines = lines.filter(lambda line : "Python" in line)

In [None]:
# Action: return the first line that passed the filter.
pythonLines.first()

collect()

In [None]:
# Distribute a small local list as an RDD, then bring its elements back to
# the driver as a Python list.
rdd = sc.parallelize([1, 2, 3])
rdd.collect()

map()

In [None]:
# Input RDD for the map() examples.
rdd = sc.parallelize([1, 2, 3, 4])

In [None]:
# No action is called here, so this displays the resulting RDD object
# rather than the doubled values.
rdd.map(lambda x: x * 2)

In [None]:
# collect() is an action: it runs the map and returns the doubled values.
rdd.map(lambda x: x * 2).collect()

filter()

In [None]:
# Input RDD for the filter() example.
rdd = sc.parallelize([1, 2, 3, 4])

In [None]:
# Keep the even elements only.
rdd.filter(lambda x: x % 2 == 0).collect()

flatMap()

In [None]:
# Input RDD for the map() versus flatMap() comparison.
rdd = sc.parallelize([1, 2, 3])

In [None]:
# map() keeps one output element per input element, so the result is a list
# of two-element lists.
rdd.map(lambda x: [x, x * 2]).collect()

In [None]:
# flatMap() flattens the returned lists, so the result is one flat list.
rdd.flatMap(lambda x: [x, x * 2]).collect()

count()

In [None]:
# Input RDD for the count() example.
rdd = sc.parallelize([1, 2, 3, 4])

In [None]:
# Number of elements in the RDD.
rdd.count()

# Section B

reduce()

In [None]:
# Input RDD for the reduce() examples.
rdd = sc.parallelize([1,2,3])

In [None]:
# Sum the elements with an explicit two-argument lambda.
rdd.reduce(lambda x, y: y+x)

In [None]:
# operator.add is the function form of +, so this is the same sum without a lambda.
from operator import add
rdd.reduce(add)

take(), first(), takeSample(), takeOrdered(), top()

In [None]:
# The first 5 lines of the RDD.
lines.take(5)

In [None]:
# 5 randomly chosen lines, drawn without replacement (first argument False).
lines.takeSample(False, 5)

In [None]:
# The 10 smallest lines in ascending order (plain string comparison).
lines.takeOrdered(10)

In [None]:
# The 10 largest lines in descending order (plain string comparison).
lines.top(10)

sample()

In [None]:
# fraction is the expected share of lines kept, not an exact size, and no
# seed is given, so the count varies from run to run.
lines.sample(withReplacement=False, fraction=0.01).count()

In [None]:
# Same with an expected 10% of the lines.
lines.sample(withReplacement=False, fraction=0.1).count()

In [None]:
# Same with an expected 50% of the lines.
lines.sample(withReplacement=False, fraction=0.5).count()

In [None]:
# Total number of lines, for comparison with the sampled counts above.
lines.count()

Building a pipeline of RDD operations

In [None]:
# Upper-case every line, keep those mentioning "LTH" (in any original
# casing), and count them.
lines.map(str.upper).filter(lambda text: "LTH" in text).count()

# Section C

Working with key/value pairs

In [None]:
# A pair RDD is simply an RDD of (key, value) tuples.
pairs = sc.parallelize([("a", 2), ("b", 6)]) 

In [None]:
# Return the pairs as a list of tuples.
pairs.collect()

In [None]:
# Return the pairs as a Python dict.
pairs.collectAsMap()

In [None]:
# The same pairs, this time built from a dict's (key, value) items.
pairs = sc.parallelize({"a":2, "b":6}.items())

In [None]:
# Return the dict-built pairs as a list of tuples.
pairs.collect()

In [None]:
# Return the dict-built pairs as a Python dict.
pairs.collectAsMap()

reduceByKey()

In [None]:
# Pair RDD in which the key "b" occurs twice.
rdd = sc.parallelize([("a", 2), ("b", 4), ("b", 6)])

In [None]:
# Merge the values of each key with the given function: here, sum per key.
rdd.reduceByKey(lambda x, y: x + y).collect()

In [None]:
import regex as re

In [None]:
# \p{Ll} is the Unicode "lowercase letter" property class, supported by the
# third-party regex module imported as re. The pattern is a raw string so
# that the backslash reaches the regex engine untouched instead of
# triggering Python's invalid-escape-sequence warning.
m = re.compile(r"\p{Ll}+")

Count words, keeping only the tokens that fully match the regex (lowercase letters only)

In [None]:
# Split every line on whitespace, keep the tokens that consist solely of
# lowercase letters, count each distinct token, and show the 20 most
# frequent (token, count) pairs.
(
    lines
    .flatMap(lambda paragraph: paragraph.split())
    .filter(lambda token: m.fullmatch(token) is not None)
    .map(lambda word: (word, 1))
    .reduceByKey(add)
    .top(20, key=lambda pair: pair[1])
)

In [None]:
# NOTE(review): this RDD is not used before `rdd` is re-created identically
# in the mapPartitions() cell further down; the cell looks redundant.
rdd = sc.parallelize([0,1,2,3,4],2)

# Section D

In [None]:
# Number of partitions the RDD is currently split into.
lines.getNumPartitions()

In [None]:
# repartition() returns a new RDD with the requested number of partitions (64).
lines.repartition(64).getNumPartitions()

In [None]:
# With shuffle=False coalesce() can only merge partitions, so asking for 48
# leaves an RDD that already has fewer partitions unchanged.
lines.coalesce(48,shuffle=False).getNumPartitions()

mapPartitions()

In [None]:
# Five elements spread over 2 partitions.
rdd = sc.parallelize([0,1,2,3,4],2)

In [None]:
# The function is called once per partition with an iterator over that
# partition's elements; here it emits one sum per partition.
rdd.mapPartitions(lambda partition: [sum(partition)]).collect()

zipWithIndex()

In [None]:
# Values in descending order, so that they are easy to tell apart from the indices.
rdd = sc.parallelize([4,3,2,1,0],2)

In [None]:
# Pair every element with its position in the RDD: (element, index).
rdd.zipWithIndex().collect()

fold()

In [None]:
# Input RDD for the fold() example.
rdd = sc.parallelize([1, 2, 3, 4, 5])

In [None]:
from operator import add

In [None]:
# Like reduce(), but with an explicit zero value (0 for addition) that is
# used as the starting value when aggregating.
rdd.fold(0, add)

# Sampling

In [70]:
# Read the full corpus. The path constant defined in the PATHS section is
# ENWIKI_FULL; the previous name ENWIKI is not defined anywhere.
lines_full = sc.textFile(ENWIKI_FULL)

In [82]:
def show_number(n):
    """Print ``n`` using a comma as the thousands separator."""
    print(format(n, ","))

In [83]:
# Number of lines in the full corpus, printed with thousands separators.
show_number(lines_full.count())

35,650,731


Count the total number of words in this corpus

In [84]:
# Sum the whitespace-separated token counts of the lines inside each
# partition, then add up the per-partition totals.
show_number(
    lines_full
    .mapPartitions(lambda part: [sum(len(text.split()) for text in part)])
    .sum()
)

2,436,444,385


In [78]:
# Number of top-ranked words kept from each sample in the cells below.
N=250

In [72]:
def top_words(rdd, n, fraction=None):
    """Return the ``n`` most frequent whitespace-separated tokens of ``rdd``.

    Args:
        rdd: RDD of text lines.
        n: number of (token, count) pairs to return.
        fraction: when given, the tokens are counted on a random sample of
            the lines (drawn without replacement, with ``fraction`` as the
            expected share of lines kept) instead of on every line.

    Returns:
        A list of ``(token, count)`` tuples, highest count first.
    """
    if fraction is not None:
        rdd = rdd.sample(False, fraction)
    return (rdd
            .flatMap(lambda ln: ln.split())
            .map(lambda w: (w, 1))
            .reduceByKey(add)
            .top(n, key=lambda tup: tup[1]))


# The same word-count pipeline on growing samples of the corpus, and finally
# on the whole corpus (no sampling step).
data_1p = top_words(lines_full, N, 0.01)
data_2p = top_words(lines_full, N, 0.02)
data_5p = top_words(lines_full, N, 0.05)
data_10p = top_words(lines_full, N, 0.10)
data_100p = top_words(lines_full, N)

In [77]:
from html import escape

from IPython.display import HTML

# Render the top words of every sample side by side, one table row per rank.
output = ["<table>", "<thead>", "<tr>"]
output.extend(["<th>Rank</th>", "<th>1%</th>", "<th>2%</th>", "<th>5%</th>", "<th>10%</th>", "<th>100%</th>"])
output.extend(["</tr>", "</thead>", "<tbody>"])
for rank, row in enumerate(zip(data_1p, data_2p, data_5p, data_10p, data_100p), start=1):
    output.append("<tr>")
    output.append("<td>%d.</td>" % rank)

    for word, count in row:
        # Tokens come straight from the corpus and may contain <, > or &,
        # which would otherwise be interpreted as markup.
        output.append("<td>%s = %d</td>" % (escape(word), count))

    output.append("</tr>")
output.append("</tbody>")
output.append("</table>")

# Last expression of the cell, so the notebook displays the rendered table.
HTML("".join(output))

Rank,1%,2%,5%,10%,100%
1.0,the = 1295777,the = 2576230,the = 6450018,the = 12906596,the = 129146052
2.0,", = 1216636",", = 2422925",", = 6051002",", = 12116073",", = 121262253"
3.0,. = 981308,. = 1952881,. = 4893649,. = 9792863,. = 97981464
4.0,of = 710110,of = 1411854,of = 3533316,of = 7077155,of = 70762800
5.0,and = 612371,and = 1217615,and = 3043562,and = 6088457,and = 60937731
6.0,in = 534006,in = 1063723,in = 2657982,in = 5324402,in = 53276318
7.0,to = 434632,to = 864914,to = 2159499,to = 4323734,to = 43255228
8.0,a = 421894,a = 841339,a = 2104875,a = 4210952,a = 42109642
9.0,was = 261481,was = 521288,was = 1307646,was = 2609620,was = 26166800
10.0,The = 218004,The = 433546,The = 1084871,The = 2170256,The = 21719562
