## Initialize spark context
Topics not covered with an example:
* Cache: Save RDD for later use

In [17]:
#Import `pyspark` and create `SparkContext`
from pyspark import SparkContext

#Look for a context left in the notebook namespace by an earlier run
sc = globals().get("sc")

if sc is not None:
    print("Spark context already exists, use the old one")
else:
    #No context yet: start a local one
    sc = SparkContext("local", "test app")
    print("Created spark context")

Spark context already exists, use the old one


## Create RDDs for testing

In [5]:
#Six single words followed by two multi-word entries
word_list = "alpha bravo charlie delta echo foxtrot".split()
word_list += ["alpha and bravo", "charlie and alpha"]

#Distribute the list as an RDD
word_rdd = sc.parallelize(word_list)

In [6]:
#The integers 1 through 6
num_list = list(range(1, 7))

#Distribute the list as an RDD
num_rdd = sc.parallelize(num_list)

## Read a text file

In [7]:
#Text file to load, one RDD element per line
file_name = "test-file.txt"
file_data = sc.textFile(file_name).cache()

#For each letter, count the lines that contain it ('a' first, then 'e')
num_a, num_e = (
    file_data.filter(lambda s, ch=ch: ch in s).count()
    for ch in ('a', 'e')
)

#Report both counts
msg = "Lines with a: {}, lines with e: {}".format(num_a, num_e)
print(msg)

Lines with a: 1, lines with e: 3


## Word count

In [8]:
#Number of elements in word_rdd (a multi-word entry is one element)
cnt = word_rdd.count()

#Report the count
print("Number of words in RDD: {}".format(cnt))

Number of words in RDD: 8


## All items in RDD

In [9]:
#Fetch every element of word_rdd as a list; as the cell's last expression it is displayed below
word_rdd.collect()

['alpha',
 'bravo',
 'charlie',
 'delta',
 'echo',
 'foxtrot',
 'alpha and bravo',
 'charlie and alpha']

## For each

In [10]:
def f(x):
    """Print a single RDD element.

    The text is written to the console, not to the notebook output.
    """
    print(x)
    
#For each: call f once for every element of word_rdd
word_rdd.foreach(f)

## Filter

In [11]:
#Set up the filter: keep only the elements that contain the substring 'alpha'
#(filter is a lazy transformation; the work happens in collect() below)
words_filter = word_rdd.filter(lambda x: 'alpha' in x)

#Get the list of filtered items
filtered = words_filter.collect()

#Print results (label previously misspelled as "Fitered")
print("Filtered RDD: {}".format(filtered))

Fitered RDD: ['alpha', 'alpha and bravo', 'charlie and alpha']


## Map
A new RDD is returned by applying a function to each element in the RDD.

In [12]:
#Set up the mapper: upper-case every element
#(map is a lazy transformation; the work happens in collect() below)
words_map = word_rdd.map(lambda x: x.upper())

#Map and return all items
mapping = words_map.collect()

#Print results; the result is a flat list of strings, not key/value pairs,
#so the label says "Mapped RDD" rather than "Key value pair"
print("Mapped RDD: {}".format(mapping))

Key value pair: ['ALPHA', 'BRAVO', 'CHARLIE', 'DELTA', 'ECHO', 'FOXTROT', 'ALPHA AND BRAVO', 'CHARLIE AND ALPHA']


## Reduce
**1. step**
* Take the first two items as input
* Perform the reduce function to get the output

**All other steps** 
* Take the current item and the previous output as input
* Perform the reduce function to get the output

In [13]:
#A custom reducer function that takes two numbers and sums them up
def add_reducer(a, b):
    """Return the sum of the two operands.

    a, b: the two values handed to the reducer.
    """
    total = a + b
    return total

#Collapse num_rdd to a single value by applying add_reducer pairwise
adding = num_rdd.reduce(add_reducer)

#Report the total
print("Sum of all elements: {}".format(adding))

Sum of all elements: 21


## Join

In [14]:
#Both RDDs hold (key, value) pairs over the same two keys
keys = ["spark", "hadoop"]

#First RDD: spark -> 1, hadoop -> 4
x = sc.parallelize(list(zip(keys, [1, 4])))

#Second RDD: spark -> 2, hadoop -> 5
y = sc.parallelize(list(zip(keys, [2, 5])))
del keys  #temporary only; keep the notebook namespace as before

#Join on the key; each result pairs the value from x with the value from y
joined = x.join(y)

#Bring the joined pairs back as a list
joined_items = joined.collect()

#Print results
print("Joined RDD: {}".format(joined_items))

Joined RDD: [('hadoop', (4, 5)), ('spark', (1, 2))]
