### 1. Getting Spark Context

In [265]:
import findspark
findspark.init('/usr/local/spark')

In [266]:
import pyspark
sc = pyspark.SparkContext(appName="Spark Work")

In [267]:
sc

### 2. RDD Creation

In [None]:
lines = sc.textFile("war_and_peace.txt")
type(lines)

In [None]:
lines

### 3. RDD Transformations and Actions

In [None]:
# How many null lines are there?
null_lines = lines.filter(lambda line : len(line) == 0)

In [None]:
type(null_lines)

In [None]:
null_lines.count()

In [None]:
# Create an RDD with out null lines
no_null_lines = lines.filter(lambda line : len(line) > 0)

In [57]:
no_null_lines.getNumPartitions()

1

In [58]:
no_null_lines.count()

50902

In [None]:
no_null_lines.take(10)

In [None]:
# Working with partitions

def count_in_partition(collection):
    yield sum(1 for i in collection)

partitions = lines.mapPartitions(count_in_partition).collect()

In [None]:
print(partitions)

In [None]:
lines.take(10)

In [None]:
# Analysis 1

TT = (("Red", 1), ("Green",2), ("Blue",3))
L = map(lambda i : len(i[0]), TT)

In [None]:
print(list(L))

In [None]:
sum(map(lambda i : len(i[0]), TT))

In [None]:
# Analysis 2


In [None]:
list(1 for i in range(10))

In [None]:
sum(1 for i in range(10))

In [None]:
# Analysis 3 - Generators in python
def getnumbers(n):
    return list(range(n))

L = getnumbers(5)
print(L)

In [None]:
print(L) # L is still in memory, accessible all the time

In [None]:
def getnumbers(n):
    for i in range(n):
        yield (i + 1)

In [None]:
L = getnumbers(5) # generator
for i in L:
    print(i)

In [None]:
for i in L:
    print(i)

In [None]:
L = getnumbers(5) # generator

In [None]:
next(L)

In [None]:
next(L)

In [None]:
for i in L:
    print(i)

In [50]:
# HOw many words are there?
words = no_null_lines.flatMap(lambda line : line.split())
words.count()

562613

In [51]:
words.take(10)

['The',
 'Project',
 'Gutenberg',
 'EBook',
 'of',
 'War',
 'and',
 'Peace,',
 'by',
 'Leo']

In [52]:
# How to convert all the words to lower case?
lowercase_words = words.map(lambda word : word.lower())
lowercase_words.take(10)

['the',
 'project',
 'gutenberg',
 'ebook',
 'of',
 'war',
 'and',
 'peace,',
 'by',
 'leo']

In [54]:
# How to create a key, value pair
paired = lowercase_words.map(lambda word : (word, 1))
paired.take(50)

[('the', 1),
 ('project', 1),
 ('gutenberg', 1),
 ('ebook', 1),
 ('of', 1),
 ('war', 1),
 ('and', 1),
 ('peace,', 1),
 ('by', 1),
 ('leo', 1),
 ('tolstoy', 1),
 ('this', 1),
 ('ebook', 1),
 ('is', 1),
 ('for', 1),
 ('the', 1),
 ('use', 1),
 ('of', 1),
 ('anyone', 1),
 ('anywhere', 1),
 ('at', 1),
 ('no', 1),
 ('cost', 1),
 ('and', 1),
 ('with', 1),
 ('almost', 1),
 ('no', 1),
 ('restrictions', 1),
 ('whatsoever.', 1),
 ('you', 1),
 ('may', 1),
 ('copy', 1),
 ('it,', 1),
 ('give', 1),
 ('it', 1),
 ('away', 1),
 ('or', 1),
 ('re-use', 1),
 ('it', 1),
 ('under', 1),
 ('the', 1),
 ('terms', 1),
 ('of', 1),
 ('the', 1),
 ('project', 1),
 ('gutenberg', 1),
 ('license', 1),
 ('included', 1),
 ('with', 1),
 ('this', 1)]

In [55]:
# Getting the word counts
wordcounts = paired.reduceByKey(lambda a, b : a + b)
wordcounts.take(20)

[('battle--', 1),
 ('century)', 1),
 ("lannes'", 1),
 ('"pierre,', 1),
 ('snap.', 1),
 ('circle', 34),
 ('spectators.', 1),
 ('alexander--though', 1),
 ('brigade--gave', 1),
 ('well--if', 1),
 ('dream)', 3),
 ('rustle,', 1),
 ('passing:', 2),
 ('now"', 1),
 ('evenly.', 1),
 ('reminiscence', 1),
 ('something?"', 1),
 ('review', 12),
 ('habitual,', 1),
 ('plainly:', 3)]

In [56]:
for word, count in wordcounts.take(20):
    print(word, ' ---> ', count)

battle--  --->  1
century)  --->  1
lannes'  --->  1
"pierre,  --->  1
snap.  --->  1
circle  --->  34
spectators.  --->  1
alexander--though  --->  1
brigade--gave  --->  1
well--if  --->  1
dream)  --->  3
rustle,  --->  1
passing:  --->  2
now"  --->  1
evenly.  --->  1
reminiscence  --->  1
something?"  --->  1
review  --->  12
habitual,  --->  1
plainly:  --->  3


### 4. Creating RDD using Existing Data

In [134]:
import random
r = tuple(random.randint(1, 100) for i in range(100))

In [135]:
len(r)

100

In [208]:
rdd = sc.parallelize(r, 10)

In [137]:
rdd

ParallelCollectionRDD[98] at parallelize at PythonRDD.scala:498

In [91]:
rdd.getNumPartitions()

10

In [92]:
rdd.mapPartitions(count_in_partition).collect()

[10, 10, 10, 10, 10, 10, 10, 10, 10, 10]

In [209]:
rdd.take(10)

[57, 49, 86, 9, 84, 80, 80, 38, 55, 15]

In [210]:
rdd.glom().collect()

[[57, 49, 86, 9, 84, 80, 80, 38, 55, 15],
 [19, 66, 77, 99, 31, 80, 66, 1, 70, 82],
 [14, 42, 49, 39, 6, 37, 79, 31, 19, 19],
 [67, 86, 95, 16, 47, 24, 36, 21, 31, 95],
 [18, 67, 37, 90, 75, 74, 40, 68, 1, 77],
 [88, 85, 2, 66, 42, 57, 13, 9, 100, 100],
 [49, 26, 63, 46, 81, 99, 35, 93, 85, 56],
 [61, 1, 77, 45, 98, 63, 42, 40, 52, 39],
 [10, 64, 64, 38, 85, 1, 64, 98, 5, 25],
 [42, 65, 27, 96, 98, 98, 94, 69, 30, 35]]

In [101]:
rdd.glom().collect()[1]

[61, 66, 81, 40, 40, 78, 11, 92, 42, 72]

In [99]:
rdd2 = rdd.repartition(20)

In [100]:
rdd2.glom().collect()

[[],
 [65, 43, 92, 57, 5, 83, 5, 70, 99, 78],
 [],
 [68, 33, 43, 29, 7, 12, 4, 16, 60, 39],
 [],
 [36, 74, 40, 63, 39, 76, 1, 20, 92, 70],
 [61, 66, 81, 40, 40, 78, 11, 92, 42, 72],
 [],
 [57, 32, 53, 94, 88, 87, 35, 87, 38, 36],
 [69, 73, 49, 14, 27, 91, 50, 30, 50, 49],
 [49, 35, 59, 56, 59, 26, 69, 89, 3, 99],
 [],
 [79, 79, 88, 51, 56, 27, 100, 99, 76, 69],
 [],
 [],
 [49, 84, 32, 58, 51, 89, 95, 8, 2, 62],
 [],
 [95, 22, 7, 6, 69, 34, 99, 58, 29, 76],
 [],
 []]

In [102]:
rdd3 = rdd.reduce(lambda x, y : x + y)

In [104]:
rdd3

5404

In [107]:
rdd.take(10)

[65, 43, 92, 57, 5, 83, 5, 70, 99, 78]

In [111]:
def sumvalues(collection):
    yield sum(n for n in collection)
    
rdd4 = rdd.mapPartitions(sumvalues)

In [113]:
rdd4.collect()

[597, 583, 502, 530, 311, 607, 724, 495, 511, 544]

In [211]:
rdd5 = rdd.map(lambda a : a * 2)

In [212]:
rdd.glom().collect()

[[57, 49, 86, 9, 84, 80, 80, 38, 55, 15],
 [19, 66, 77, 99, 31, 80, 66, 1, 70, 82],
 [14, 42, 49, 39, 6, 37, 79, 31, 19, 19],
 [67, 86, 95, 16, 47, 24, 36, 21, 31, 95],
 [18, 67, 37, 90, 75, 74, 40, 68, 1, 77],
 [88, 85, 2, 66, 42, 57, 13, 9, 100, 100],
 [49, 26, 63, 46, 81, 99, 35, 93, 85, 56],
 [61, 1, 77, 45, 98, 63, 42, 40, 52, 39],
 [10, 64, 64, 38, 85, 1, 64, 98, 5, 25],
 [42, 65, 27, 96, 98, 98, 94, 69, 30, 35]]

In [214]:
rdd5.glom().collect()

[[114, 98, 172, 18, 168, 160, 160, 76, 110, 30],
 [38, 132, 154, 198, 62, 160, 132, 2, 140, 164],
 [28, 84, 98, 78, 12, 74, 158, 62, 38, 38],
 [134, 172, 190, 32, 94, 48, 72, 42, 62, 190],
 [36, 134, 74, 180, 150, 148, 80, 136, 2, 154],
 [176, 170, 4, 132, 84, 114, 26, 18, 200, 200],
 [98, 52, 126, 92, 162, 198, 70, 186, 170, 112],
 [122, 2, 154, 90, 196, 126, 84, 80, 104, 78],
 [20, 128, 128, 76, 170, 2, 128, 196, 10, 50],
 [84, 130, 54, 192, 196, 196, 188, 138, 60, 70]]

In [216]:
rdd11 = rdd.map(lambda a : (a, a*2, a**2))
rdd11.glom().collect()

[[(57, 114, 3249),
  (49, 98, 2401),
  (86, 172, 7396),
  (9, 18, 81),
  (84, 168, 7056),
  (80, 160, 6400),
  (80, 160, 6400),
  (38, 76, 1444),
  (55, 110, 3025),
  (15, 30, 225)],
 [(19, 38, 361),
  (66, 132, 4356),
  (77, 154, 5929),
  (99, 198, 9801),
  (31, 62, 961),
  (80, 160, 6400),
  (66, 132, 4356),
  (1, 2, 1),
  (70, 140, 4900),
  (82, 164, 6724)],
 [(14, 28, 196),
  (42, 84, 1764),
  (49, 98, 2401),
  (39, 78, 1521),
  (6, 12, 36),
  (37, 74, 1369),
  (79, 158, 6241),
  (31, 62, 961),
  (19, 38, 361),
  (19, 38, 361)],
 [(67, 134, 4489),
  (86, 172, 7396),
  (95, 190, 9025),
  (16, 32, 256),
  (47, 94, 2209),
  (24, 48, 576),
  (36, 72, 1296),
  (21, 42, 441),
  (31, 62, 961),
  (95, 190, 9025)],
 [(18, 36, 324),
  (67, 134, 4489),
  (37, 74, 1369),
  (90, 180, 8100),
  (75, 150, 5625),
  (74, 148, 5476),
  (40, 80, 1600),
  (68, 136, 4624),
  (1, 2, 1),
  (77, 154, 5929)],
 [(88, 176, 7744),
  (85, 170, 7225),
  (2, 4, 4),
  (66, 132, 4356),
  (42, 84, 1764),
  (57, 114,

In [218]:
rdd12 = rdd.flatMap(lambda a : (a, a*2, a**2))
rdd12.glom().collect()

[[57,
  114,
  3249,
  49,
  98,
  2401,
  86,
  172,
  7396,
  9,
  18,
  81,
  84,
  168,
  7056,
  80,
  160,
  6400,
  80,
  160,
  6400,
  38,
  76,
  1444,
  55,
  110,
  3025,
  15,
  30,
  225],
 [19,
  38,
  361,
  66,
  132,
  4356,
  77,
  154,
  5929,
  99,
  198,
  9801,
  31,
  62,
  961,
  80,
  160,
  6400,
  66,
  132,
  4356,
  1,
  2,
  1,
  70,
  140,
  4900,
  82,
  164,
  6724],
 [14,
  28,
  196,
  42,
  84,
  1764,
  49,
  98,
  2401,
  39,
  78,
  1521,
  6,
  12,
  36,
  37,
  74,
  1369,
  79,
  158,
  6241,
  31,
  62,
  961,
  19,
  38,
  361,
  19,
  38,
  361],
 [67,
  134,
  4489,
  86,
  172,
  7396,
  95,
  190,
  9025,
  16,
  32,
  256,
  47,
  94,
  2209,
  24,
  48,
  576,
  36,
  72,
  1296,
  21,
  42,
  441,
  31,
  62,
  961,
  95,
  190,
  9025],
 [18,
  36,
  324,
  67,
  134,
  4489,
  37,
  74,
  1369,
  90,
  180,
  8100,
  75,
  150,
  5625,
  74,
  148,
  5476,
  40,
  80,
  1600,
  68,
  136,
  4624,
  1,
  2,
  1,
  77,
  154,
  5929],

#### The reason for flattening

In [219]:
LL = ["mary had a little lamb", "twinkle twinkle little stars", "ba ba black sheep"]

In [222]:
# Get the word count
LLW = [item.split() for item in LL]
LLW 

[['mary', 'had', 'a', 'little', 'lamb'],
 ['twinkle', 'twinkle', 'little', 'stars'],
 ['ba', 'ba', 'black', 'sheep']]

In [225]:
LLWF = [item for collection in LLW for item in collection]
LLWF

['mary',
 'had',
 'a',
 'little',
 'lamb',
 'twinkle',
 'twinkle',
 'little',
 'stars',
 'ba',
 'ba',
 'black',
 'sheep']

In [226]:
from collections import Counter
Counter(LLWF)

Counter({'a': 1,
         'ba': 2,
         'black': 1,
         'had': 1,
         'lamb': 1,
         'little': 2,
         'mary': 1,
         'sheep': 1,
         'stars': 1,
         'twinkle': 2})

### 5. Using the DAG for planning and creating a spark application

In [203]:
rdd =  sc.textFile("war_and_peace.txt")

In [206]:
# Stage - 1: Transformations
no_null_lines = rdd.filter(lambda line : len(line) > 0)
words = no_null_lines.map(lambda line : line.split())
words.glom().take(5)

[[['The',
   'Project',
   'Gutenberg',
   'EBook',
   'of',
   'War',
   'and',
   'Peace,',
   'by',
   'Leo',
   'Tolstoy'],
  ['This',
   'eBook',
   'is',
   'for',
   'the',
   'use',
   'of',
   'anyone',
   'anywhere',
   'at',
   'no',
   'cost',
   'and',
   'with',
   'almost'],
  ['no',
   'restrictions',
   'whatsoever.',
   'You',
   'may',
   'copy',
   'it,',
   'give',
   'it',
   'away',
   'or',
   're-use',
   'it'],
  ['under',
   'the',
   'terms',
   'of',
   'the',
   'Project',
   'Gutenberg',
   'License',
   'included',
   'with',
   'this'],
  ['eBook', 'or', 'online', 'at', 'www.gutenberg.org'],
  ['Title:', 'War', 'and', 'Peace'],
  ['Author:', 'Leo', 'Tolstoy'],
  ['Translators:', 'Louise', 'and', 'Aylmer', 'Maude'],
  ['Posting', 'Date:', 'January', '10,', '2009', '[EBook', '#2600]'],
  ['Last', 'Updated:', 'March', '15,', '2013'],
  ['Language:', 'English'],
  ['***',
   'START',
   'OF',
   'THIS',
   'PROJECT',
   'GUTENBERG',
   'EBOOK',
   'WAR',
  

In [207]:
# Stage - 1: Transformations
no_null_lines = rdd.filter(lambda line : len(line) > 0)
words = no_null_lines.flatMap(lambda line : line.split())
words.glom().take(5)

[['The',
  'Project',
  'Gutenberg',
  'EBook',
  'of',
  'War',
  'and',
  'Peace,',
  'by',
  'Leo',
  'Tolstoy',
  'This',
  'eBook',
  'is',
  'for',
  'the',
  'use',
  'of',
  'anyone',
  'anywhere',
  'at',
  'no',
  'cost',
  'and',
  'with',
  'almost',
  'no',
  'restrictions',
  'whatsoever.',
  'You',
  'may',
  'copy',
  'it,',
  'give',
  'it',
  'away',
  'or',
  're-use',
  'it',
  'under',
  'the',
  'terms',
  'of',
  'the',
  'Project',
  'Gutenberg',
  'License',
  'included',
  'with',
  'this',
  'eBook',
  'or',
  'online',
  'at',
  'www.gutenberg.org',
  'Title:',
  'War',
  'and',
  'Peace',
  'Author:',
  'Leo',
  'Tolstoy',
  'Translators:',
  'Louise',
  'and',
  'Aylmer',
  'Maude',
  'Posting',
  'Date:',
  'January',
  '10,',
  '2009',
  '[EBook',
  '#2600]',
  'Last',
  'Updated:',
  'March',
  '15,',
  '2013',
  'Language:',
  'English',
  '***',
  'START',
  'OF',
  'THIS',
  'PROJECT',
  'GUTENBERG',
  'EBOOK',
  'WAR',
  'AND',
  'PEACE',
  '***',
 

In [None]:
uppercase_words = words.map(lambda word : word.upper())
paired = uppercase_words.map(lambda word : (word, 1))

In [148]:
# Stage - 2: Actions
wordcounts = paired.reduceByKey(lambda a, b : a + b)
processed_data = list(wordcounts.collect())

In [149]:
processed_data[1:10]

[('HIM--PEOPLE', 1),
 ('ALTERNATIVE', 1),
 ('EMITTED', 4),
 ('ARMFELDTS,', 1),
 ('LIBRARY,', 2),
 ('VAIN', 6),
 ('TOCCHI!\'"', 1),
 ('ARTLESS', 2),
 ('TRAGIC', 4)]

#### Difference between map() and flaMap()

In [189]:
rdd7 = sc.parallelize([2, 3, 4])
rdd7.flatMap(lambda x : range(1, x)).collect()

[1, 1, 2, 1, 2, 3]

In [190]:
rdd7.map(lambda x : list(range(1, x))).collect()

[[1], [1, 2], [1, 2, 3]]

In [191]:
data = ["Project Gutenberg’s",
        "Alice’s Adventures in Wonderland",
        "Project Gutenberg’s",
        "Adventures in Wonderland",
        "Project Gutenberg’s"]
rdd8=sc.parallelize(data)
for element in rdd8.collect():
    print(element)

print('-'*30)
#Flatmap    
rdd9=rdd8.map(lambda x: x.split(" "))
for element in rdd9.collect():
    print(element)

Project Gutenberg’s
Alice’s Adventures in Wonderland
Project Gutenberg’s
Adventures in Wonderland
Project Gutenberg’s
------------------------------
['Project', 'Gutenberg’s']
['Alice’s', 'Adventures', 'in', 'Wonderland']
['Project', 'Gutenberg’s']
['Adventures', 'in', 'Wonderland']
['Project', 'Gutenberg’s']


In [192]:
data = ["Project Gutenberg’s",
        "Alice’s Adventures in Wonderland",
        "Project Gutenberg’s",
        "Adventures in Wonderland",
        "Project Gutenberg’s"]
rdd8=sc.parallelize(data)
for element in rdd8.collect():
    print(element)
    
print('-'*30)

#Flatmap    
rdd9=rdd8.flatMap(lambda x: x.split(" "))
for element in rdd9.collect():
    print(element)

Project Gutenberg’s
Alice’s Adventures in Wonderland
Project Gutenberg’s
Adventures in Wonderland
Project Gutenberg’s
------------------------------
Project
Gutenberg’s
Alice’s
Adventures
in
Wonderland
Project
Gutenberg’s
Adventures
in
Wonderland
Project
Gutenberg’s


In [193]:
data = ["Project Gutenberg’s",
        "Alice’s Adventures in Wonderland",
        "Project Gutenberg’s",
        "Adventures in Wonderland",
        "Project Gutenberg’s"]
rdd8=sc.parallelize(data, 4)
for element in rdd8.collect():
    print(element)
    
print('-'*30)

#Flatmap    
rdd9=rdd8.map(lambda x: x.split(" "))
for element in rdd9.collect():
    print(element)

Project Gutenberg’s
Alice’s Adventures in Wonderland
Project Gutenberg’s
Adventures in Wonderland
Project Gutenberg’s
------------------------------
['Project', 'Gutenberg’s']
['Alice’s', 'Adventures', 'in', 'Wonderland']
['Project', 'Gutenberg’s']
['Adventures', 'in', 'Wonderland']
['Project', 'Gutenberg’s']


In [194]:
rdd8.glom().collect()

[['Project Gutenberg’s'],
 ['Alice’s Adventures in Wonderland'],
 ['Project Gutenberg’s'],
 ['Adventures in Wonderland', 'Project Gutenberg’s']]

In [196]:
rdd9.glom().collect()

[[['Project', 'Gutenberg’s']],
 [['Alice’s', 'Adventures', 'in', 'Wonderland']],
 [['Project', 'Gutenberg’s']],
 [['Adventures', 'in', 'Wonderland'], ['Project', 'Gutenberg’s']]]

In [197]:
data = ["Project Gutenberg’s",
        "Alice’s Adventures in Wonderland",
        "Project Gutenberg’s",
        "Adventures in Wonderland",
        "Project Gutenberg’s"]
rdd8=sc.parallelize(data, 4)
for element in rdd8.collect():
    print(element)
    
print('-'*30)

#Flatmap    
rdd9=rdd8.flatMap(lambda x: x.split(" "))
for element in rdd9.collect():
    print(element)

Project Gutenberg’s
Alice’s Adventures in Wonderland
Project Gutenberg’s
Adventures in Wonderland
Project Gutenberg’s
------------------------------
Project
Gutenberg’s
Alice’s
Adventures
in
Wonderland
Project
Gutenberg’s
Adventures
in
Wonderland
Project
Gutenberg’s


In [198]:
rdd8.glom().collect()

[['Project Gutenberg’s'],
 ['Alice’s Adventures in Wonderland'],
 ['Project Gutenberg’s'],
 ['Adventures in Wonderland', 'Project Gutenberg’s']]

In [199]:
rdd9.glom().collect()

[['Project', 'Gutenberg’s'],
 ['Alice’s', 'Adventures', 'in', 'Wonderland'],
 ['Project', 'Gutenberg’s'],
 ['Adventures', 'in', 'Wonderland', 'Project', 'Gutenberg’s']]

In [200]:
rdd10 = rdd9.repartition(1)

In [201]:
rdd10.collect()

['Project',
 'Gutenberg’s',
 'Alice’s',
 'Adventures',
 'in',
 'Wonderland',
 'Project',
 'Gutenberg’s',
 'Adventures',
 'in',
 'Wonderland',
 'Project',
 'Gutenberg’s']

In [202]:
text = "mary had a little lamb"
text.split()

['mary', 'had', 'a', 'little', 'lamb']

### 7. RDD Persistance

In [227]:
lines

war_and_peace.txt MapPartitionsRDD[5] at textFile at NativeMethodAccessorImpl.java:0

In [228]:
lines.persist()

war_and_peace.txt MapPartitionsRDD[5] at textFile at NativeMethodAccessorImpl.java:0

In [229]:
#lines.cache()

In [230]:
lines.unpersist()

war_and_peace.txt MapPartitionsRDD[5] at textFile at NativeMethodAccessorImpl.java:0

### 8. RDD Accumulator

In [268]:
rdd13 = sc.textFile('war_and_peace.txt', 4)

In [269]:
# To count the number of blank lines in the the text

In [279]:
# Create an accumulator variable initialized with 0
blankLines = sc.accumulator(0)

In [280]:
# lambda line : line.split(' ')
def split2words(line):
    global blankLines
    if(len(line) == 0):
        blankLines += 1
    return line.split(' ')

In [281]:
words = rdd13.flatMap(split2words)

In [282]:
words.take(5)

['The', 'Project', 'Gutenberg', 'EBook', 'of']

In [283]:
words.getNumPartitions()

4

In [285]:
# Check the number of blank lines
blankLines.value

0

In [286]:
# Let's call an action, remember unless an action is taken the values will not be updated
words.count()

575680

In [287]:
blankLines.value

12975

### 9. Broadcasting values

In [301]:
states = {"NY": "New York", "CA" : "California", "FL": "Florida", "TX": "Texas"}

In [289]:
data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]

In [291]:
rdd = sc.parallelize(data, 4)

In [293]:
broadcastStates = sc.broadcast(states)
broadcastStates.value

{'CA': 'California', 'FL': 'Florida', 'NY': 'New York'}

In [303]:
def state_convert(code):
    return broadcastStates.value[code]

In [304]:
rdd2 = rdd.map(lambda x : (x[0], x[1], x[2], state_convert(x[3]))).collect()

In [305]:
rdd2

[('James', 'Smith', 'USA', 'California'),
 ('Michael', 'Rose', 'USA', 'New York'),
 ('Robert', 'Williams', 'USA', 'California'),
 ('Maria', 'Jones', 'USA', 'Florida')]

In [302]:
broadcastStates.value

{'CA': 'California', 'FL': 'Florida', 'NY': 'New York'}

### 10. Stopping the SparkContext

In [306]:
sc.stop()