### Getting a SparkContext

In [1]:
# Must run before `import pyspark` in the next cell: findspark.init() presumably
# locates the local Spark installation and makes pyspark importable.
# NOTE(review): confirm SPARK_HOME (or an equivalent) is set in this environment.
import findspark
findspark.init()

In [2]:
import pyspark

# getOrCreate() hands back the already-active SparkContext instead of failing
# when one exists, so this cell can be re-run safely (only one SparkContext may
# be active at a time, which is why the notebook calls sc.stop() before creating
# another one later). NOTE: if a context is already active, its original app
# name is kept and this conf is ignored.
app_conf = pyspark.SparkConf().setAppName("Word Count Application")
sc = pyspark.SparkContext.getOrCreate(conf=app_conf)

In [3]:
# Display the SparkContext created in the previous cell.
sc

### RDD Creation

In [4]:
# Build an RDD whose elements are the lines of the text file
# (relative path: presumably resolved against the notebook's working directory).
lines = sc.textFile(name="war_and_peace.txt")
type(lines)

pyspark.rdd.RDD

In [5]:
# Display the RDD object itself (its repr names the source file), not its contents.
lines

war_and_peace.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

### RDD Actions

In [6]:
# Action: return the first element of the RDD, i.e. the first line of the file.
lines.first()

'The Project Gutenberg EBook of War and Peace, by Leo Tolstoy'

In [7]:
# Action: number of lines in the file, empty lines included
# (null_lines.count() + noNullLines.count() below add up to this value).
lines.count()

63877

In [9]:
# Action: return the first 5 lines as a Python list (note the empty line '').
lines.take(5)

['The Project Gutenberg EBook of War and Peace, by Leo Tolstoy',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with almost',
 'no restrictions whatsoever.  You may copy it, give it away or re-use it',
 'under the terms of the Project Gutenberg License included with this']

In [10]:
# How many partitions the file was split into (1 in the run shown).
lines.getNumPartitions()

1

### RDD Transformations

##### How many empty (null) lines are there?

In [11]:
# Keep only the empty lines: an empty string is falsy.
null_lines = lines.filter(lambda text: not text)
type(null_lines)

pyspark.rdd.PipelinedRDD

In [12]:
# Number of empty lines; together with noNullLines.count() it adds up to lines.count().
null_lines.count()

12975

##### Create an RDD without null lines

In [13]:
# Complement of null_lines: keep every line that is not the empty string.
noNullLines = lines.filter(lambda text: text != "")

In [14]:
# Number of non-empty lines (lines.count() minus null_lines.count()).
noNullLines.count()

50902

In [15]:
# First 10 non-empty lines; unlike lines.take(5) above, no '' entries appear.
noNullLines.take(10)

['The Project Gutenberg EBook of War and Peace, by Leo Tolstoy',
 'This eBook is for the use of anyone anywhere at no cost and with almost',
 'no restrictions whatsoever.  You may copy it, give it away or re-use it',
 'under the terms of the Project Gutenberg License included with this',
 'eBook or online at www.gutenberg.org',
 'Title: War and Peace',
 'Author: Leo Tolstoy',
 'Translators: Louise and Aylmer Maude',
 'Posting Date: January 10, 2009 [EBook #2600]',
 'Last Updated: March 15, 2013']

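##### Working with RDD partitions

`mapPartitions` passes each partition to a function as an iterator. Before using it, the cells below contrast a list, which can be iterated any number of times, with a generator, which is exhausted after a single pass.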

In [16]:
def getvalues():
    """Return the integers 0 through 4 as a fully built list.

    Because the whole list exists up front, it can be iterated repeatedly.
    """
    return [value for value in range(5)]

In [17]:
# A list is materialized immediately and can be displayed and iterated repeatedly.
r = getvalues()
r

[0, 1, 2, 3, 4]

In [18]:
# First pass over the list.
for i in r:
    print(i, end=' ')

0 1 2 3 4 

In [19]:
# Second pass prints the same values: a list can be traversed any number of times.
for i in r:
    print(i, end=' ')

0 1 2 3 4 

In [21]:
def getvalues_y():
    """Generator counterpart of getvalues(): produce 0 through 4 one at a time.

    Each value is produced only when requested (via next() or a for loop),
    and the generator is exhausted after one full pass.
    """
    value = 0
    while value < 5:
        yield value
        value += 1

In [23]:
# Calling a generator function returns a generator object; values are pulled later.
r = getvalues_y()
r

<generator object getvalues_y at 0x7f66362f9db0>

In [24]:
# Pull the first value from the generator.
next(r)

0

In [25]:
# Pull the next value: the generator resumes where it stopped.
next(r)

1

In [26]:
# The loop continues from where next() left off, so only 2 3 4 remain.
for i in r:
    print(i, end=' ')

2 3 4 

In [27]:
# Prints nothing: the generator is already exhausted.
for i in r:
    print(i, end=' ')

In [28]:
# A new call creates a fresh generator that starts again from 0.
r = getvalues_y()

In [29]:
# Consume the fresh generator completely.
for i in r:
    print(i, end=' ')

0 1 2 3 4 

In [30]:
# Again prints nothing: a generator can be consumed only once.
for i in r:
    print(i, end=' ')

In [63]:
def count_in_partition(iterator):
    """Yield how many elements one partition contains.

    Used with ``rdd.mapPartitions`` below, which hands each partition to this
    function as an iterator; yielding a single value gives one count per
    partition in the collected result.

    :param iterator: iterator over the elements of one partition (consumed).
    :yields: the number of elements seen, exactly once.
    """
    total = 0
    for _ in iterator:
        total += 1
    yield total

In [64]:
# One element count per partition of `lines`, gathered into a local list.
partitions = (
    lines
    .mapPartitions(count_in_partition)
    .collect()
)

In [65]:
# collect() returned a plain Python list holding one count per partition
# (a single partition here, so a single count equal to lines.count()).
type(partitions), partitions

(list, [63877])

##### How many words are there?

In [35]:
# Split every non-empty line on whitespace and flatten the pieces into one RDD of words.
words = noNullLines.flatMap(lambda text: text.split())

In [36]:
# flatMap returns another RDD (a PipelinedRDD), not a Python list.
type(words)

pyspark.rdd.PipelinedRDD

In [37]:
# Total number of whitespace-separated words in the non-empty lines.
words.count()

562613

In [38]:
# First five words, still in their original case.
words.take(5)

['The', 'Project', 'Gutenberg', 'EBook', 'of']

##### Convert all the words to lowercase

In [39]:
# One-to-one mapping: each word becomes its lower-case form.
lowercase_words = words.map(lambda token: token.lower())

In [40]:
# map is one-to-one, so this matches words.count().
lowercase_words.count()

562613

In [41]:
# First five lower-cased words.
lowercase_words.take(5)

['the', 'project', 'gutenberg', 'ebook', 'of']

##### Create (key, value) pairs for the reduction

In [42]:
# Turn each word into a (word, 1) pair so counts can be summed per key.
paired = lowercase_words.map(lambda token: (token, 1))

In [43]:
# One (word, 1) pair per word, so the count is unchanged.
paired.count()

562613

In [44]:
# First five (word, 1) pairs.
paired.take(5)

[('the', 1), ('project', 1), ('gutenberg', 1), ('ebook', 1), ('of', 1)]

##### Understanding Python's version of reduce

In [45]:
from functools import reduce

In [46]:
# Fold the list left to right with addition: ((1 + 2) + 3) + 4.
L = [1, 2, 3, 4]
reduce(lambda acc, item: acc + item, L)

10

##### A similar process can be used in PySpark

In [47]:
# Like functools.reduce, but applied separately to the values of each key (word).
wordcounts = paired.reduceByKey(lambda left, right: left + right)

In [49]:
# Fetch just five (word, count) pairs; their order is not sorted by word or count.
count_list = wordcounts.take(num=5)

In [50]:
# take() returns an ordinary Python list.
type(count_list)

list

##### Print the list

In [52]:
# Each entry is a (word, count) tuple; print one per line.
for word_count in count_list:
    print(word_count)

('battle--', 1)
('century)', 1)
("lannes'", 1)
('"pierre,', 1)
('snap.', 1)


##### Creating an RDD from existing data

In [53]:
import random

# Fixed seed so "Restart & Run All" reproduces the same sample. The output shown
# below was produced without a seed, so its exact values will differ.
RANDOM_SEED = 42
# A private generator keeps the global `random` module state untouched.
rng = random.Random(RANDOM_SEED)
rn = [rng.randint(1, 100) for _ in range(100)]  # 100 integers in [1, 100]


In [54]:
# Display the 100 random integers generated above.
rn

[35,
 39,
 5,
 11,
 86,
 95,
 14,
 12,
 74,
 63,
 61,
 31,
 63,
 84,
 100,
 95,
 90,
 94,
 9,
 74,
 23,
 99,
 39,
 4,
 67,
 35,
 88,
 96,
 68,
 10,
 73,
 70,
 14,
 98,
 22,
 39,
 31,
 62,
 79,
 88,
 85,
 85,
 1,
 85,
 63,
 99,
 58,
 9,
 64,
 74,
 9,
 97,
 66,
 72,
 58,
 22,
 73,
 53,
 20,
 90,
 2,
 3,
 86,
 45,
 97,
 13,
 60,
 46,
 41,
 92,
 6,
 17,
 5,
 22,
 84,
 62,
 47,
 12,
 46,
 1,
 74,
 31,
 23,
 21,
 94,
 32,
 65,
 69,
 82,
 66,
 59,
 92,
 28,
 36,
 73,
 61,
 47,
 63,
 84,
 12]

In [55]:
# Distribute the local list as an RDD with the default number of partitions
# (1 in the run shown, see getNumPartitions() below).
rn_rdd = sc.parallelize(rn)

In [56]:
# parallelize() yields a plain RDD here rather than a PipelinedRDD.
type(rn_rdd)

pyspark.rdd.RDD

In [57]:
# Default partition count for this context (1 in the run shown).
rn_rdd.getNumPartitions()

1

In [58]:
# Re-create the RDD, explicitly asking for 4 slices (partitions).
rn_rdd = sc.parallelize(rn, numSlices=4)

In [59]:
# Now 4, as requested via parallelize's second argument.
rn_rdd.getNumPartitions()

4

In [66]:
# Element count of each of the 4 partitions, gathered into a local list.
partitions_rn_rdd = (
    rn_rdd
    .mapPartitions(count_in_partition)
    .collect()
)

In [67]:
# 100 elements spread evenly over 4 partitions: 25 each in the run shown.
partitions_rn_rdd

[25, 25, 25, 25]

##### Stop the SparkContext

In [68]:
# Stop this context so a new one can be created in the DAG section below.
sc.stop()

### Using a DAG to build a Spark application

In [93]:
# Getting the SparkContext. getOrCreate() reuses an already-active context
# instead of failing, so this cell can be re-run. After the sc.stop() above it
# creates a fresh context with this app name. NOTE: if a context is still
# active, that context (and its original app name) is returned instead.
dag_conf = pyspark.SparkConf().setAppName("Word Count Application using DAG")
sc = pyspark.SparkContext.getOrCreate(conf=dag_conf)

In [94]:
# Load the text file as an RDD of lines, asking for at least 4 partitions.
rdd = sc.textFile("war_and_peace.txt", minPartitions=4)

In [95]:
# Stage 1 processing: drop empty lines, split into words, upper-case them,
# and pair every word with a count of 1.
no_null_lines = rdd.filter(lambda text: text != "")
words = no_null_lines.flatMap(lambda text: text.split())
uppercase_words = words.map(lambda token: token.upper())
paired = uppercase_words.map(lambda token: (token, 1))

In [106]:
# Stage 2 processing: sum the 1s emitted in stage 1 for each word, then
# collect the (word, count) pairs into a local Python list.
wordcounts = paired.reduceByKey(lambda a, b : a + b)
# collect() already returns a new Python list (see the type check on
# `partitions` earlier), so the former `list(processed_data[:])`, which made
# two extra copies, is unnecessary.
processed_data = wordcounts.collect()


[('"\'COME', 1),
 ('"\'DIEU', 1),
 ('"\'DIO', 1),
 ('"\'FROM', 1),
 ('"\'GRANT', 1),
 ('"\'I', 4),
 ('"\'NO', 1),
 ('"\'NOW', 1),
 ('"\'RUSSIA', 1),
 ('"\'SERGEY', 1),
 ('"\'THE', 1),
 ('"\'TO', 1),
 ('"\'TOLD', 1),
 ('"\'WHAT', 1),
 ('"\'YOU', 1),
 ('"-SO', 1),
 ('"...OF', 1),
 ('"5"', 1),
 ('"800', 2),
 ('"A', 81),
 ('"A-TU!"', 3),
 ('"AAH!"', 1),
 ('"ABOUT', 7),
 ('"ABOUT..."', 1),
 ('"ABSTIENS-TOI"', 1),
 ('"ACCORDING', 1),
 ('"ADELE', 1),
 ('"ADIEU,', 2),
 ('"ADJUTANT!"', 1),
 ('"ADMIRABLE!"', 1),
 ('"ADORABLE!', 1),
 ('"ADORED', 1),
 ('"ADVISERS"', 1),
 ('"AFRAID', 3),
 ('"AFTER', 9),
 ('"AGAIN!"', 1),
 ('"AGAIN,', 1),
 ('"AGAINST', 1),
 ('"AH', 5),
 ('"AH!', 14),
 ('"AH!"', 6),
 ('"AH!...', 1),
 ('"AH!..."', 1),
 ('"AH,', 111),
 ('"AH...', 1),
 ('"AH?', 1),
 ('"AHAHAH!"', 1),
 ('"ALEXANDER,', 1),
 ('"ALINE,"', 1),
 ('"ALL', 56),
 ('"ALLEY!', 1),
 ('"ALLOW', 9),
 ('"ALMOST', 1),
 ('"ALONE', 1),
 ('"ALPATYCH!"', 1),
 ('"ALPATYCH,', 1),
 ('"ALTOGETHER', 1),
 ('"ALWAYS', 6),
 ('"AM'

In [87]:
# Release the SparkContext.
# processed_data is already a local Python list, so the post-processing cells
# below only use plain Python and do not need Spark.
sc.stop()

In [107]:
# Post processing: order the (word, count) pairs by count, most frequent first,
# and show the top five.
processed_data = sorted(processed_data, key=lambda word_count: word_count[1], reverse=True)
print(processed_data[:5])

[('THE', 34093), ('AND', 21335), ('TO', 16426), ('OF', 14791), ('A', 10357)]


In [111]:
# The 20 most frequent (word, count) pairs. This assumes the post-processing
# sort above has run. Display just the words.
top_pairs = processed_data[:20]
[word for word, count in top_pairs]

['THE',
 'AND',
 'TO',
 'OF',
 'A',
 'HE',
 'IN',
 'HIS',
 'THAT',
 'WAS',
 'WITH',
 'HAD',
 'AT',
 'NOT',
 'HER',
 'AS',
 'IT',
 'ON',
 'BUT',
 'FOR']

In [112]:
# The matching counts for the top 20 words, in the same order.
[count for word, count in top_pairs]

[34093,
 21335,
 16426,
 14791,
 10357,
 9294,
 8551,
 7932,
 7402,
 7201,
 5604,
 5334,
 4482,
 4460,
 3963,
 3904,
 3888,
 3656,
 3614,
 3365]

In [None]:

# Bar chart of the 20 most frequent words (x: word, y: count).
# NOTE(review): `plot` is not defined or imported anywhere in this notebook,
# so this line raises NameError as written (the cell was never executed: In [None]).
# It was presumably meant to be matplotlib.pyplot, e.g. `import matplotlib.pyplot as plt`
# then `plt.bar(...)`. Confirm matplotlib is installed (pandas is not, see below).
plot.bar([k for k,v in top_pairs], [v for k,v in top_pairs], color='b')

In [113]:
import pandas as pd

ImportError: No module named 'pandas'

ImportError: No module named 'nltk'