# Basic Spark Commands

In [1]:
sc

## Simple RDD Operations
- *sc.parallelize(data)* 
create an RDD from data
- *rdd.count()* 
count number of elements in an rdd
- *rdd.filter(func)* 
create a new rdd from existing rdd and keep only those elements that func is true

In [2]:
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
n = rdd.count()
print('count = {0}'.format(n))
l = rdd.collect()
print(l)

count = 5
[1, 2, 3, 4, 5]


In [4]:
l = rdd.take(3)
print(l)

[1, 2, 3]


In [5]:
f_rdd = rdd.filter(lambda d: d > 2)
for d in f_rdd.collect():
    print(d)
print('filter count = {0}'.format(f_rdd.count()))

3
4
5
filter count = 3


## RDD Operations - map and reduce
- *rdd.map(func)*
create a new rdd by performing function func on each element in an rdd
- *rdd.reduce(func)*
aggregate all elements in an rdd using function func

In [6]:
data = ['line 1', '2', 'more lines', 'last line']

In [7]:
lines = sc.parallelize(data)

In [8]:
print(lines.collect())

['line 1', '2', 'more lines', 'last line']


In [9]:
lineLengths = lines.map(lambda line: len(line))
print(lineLengths.collect())

[6, 1, 10, 9]


In [10]:
totalLength = lineLengths.reduce(lambda a, b: a+b)
print(totalLength)

26


In [12]:
data = (1,2,3,4)
rdd = sc.parallelize(data)
rdd2 = rdd.map(lambda x: x*2)
print(rdd2.collect())
sum_val = rdd2.reduce(lambda a, b: a+b)
print('sum = {0}'.format(sum_val))
mul_val = rdd2.reduce(lambda a, b: a*b)
print('mul = {0}'.format(mul_val))

[2, 4, 6, 8]
sum = 20
mul = 384


## RDD Operations - aggregate

In [17]:
rdd.aggregate((0, 0),
              lambda acc, value: (acc[0]+value, acc[1]+1), 
              lambda acc1, acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1]))

(10, 4)

In [18]:
lines.aggregate(("", 0),
                lambda a, s: (a[0]+s, a[1]+len(s)),
                lambda a, b: (a[0]+b[0], a[1]+b[1]))

('line 12more lineslast line', 26)

# Working with Text

In [21]:
sw = sc.textFile('star-wars.txt')
print('Total = {0} lines'.format(sw.count()))
for line in sw.take(10):
    print('{0}: [{1}]'.format(len(line), line))

Total = 6783 lines
0: []
1: [ ]
59: [                               STAR WARS: THE FORCE AWAKENS]
0: []
25: [                         ]
0: []
25: [                         ]
0: []
49: [                                       Written by]
0: []


In [23]:
nb_lines = sw.filter(lambda line: len(line) > 0)
print('Non blank line = {0} lines'.format(nb_lines.count()))
all_lowers = nb_lines.map(lambda line: line.lower())
for line in all_lowers.take(10):
    print('{0}: [{1}]'.format(len(line), line))

Non blank line = 4834 lines
1: [ ]
59: [                               star wars: the force awakens]
25: [                         ]
25: [                         ]
49: [                                       written by]
25: [                         ]
66: [                      lawrence kasdan, j.j. abrams & michael arndt]
22: [                      ]
25: [                         ]
66: [                       based on characters created by george lucas]


In [24]:
words = all_lowers.flatMap(lambda line: line.split())
for w in words.take(5):
    print(w)

star
wars:
the
force
awakens


In [27]:
sw = sc.textFile('star-wars.txt')
words = sw.flatMap(lambda line: line.split())
mappers = words.map(lambda word: (word, 1))
counts = mappers.reduceByKey(lambda x, y: x+y)
for wc in counts.take(10):
    print(wc)

('STAR', 29)
('Abrams', 1)
('Michael', 1)
('Based', 1)
('characters', 1)
('created', 2)
('Lucas', 1)
('long', 11)
('ago', 1)
('in', 200)
