In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import numpy as np

In [2]:
conf = SparkConf().setAppName('SparkApp').setMaster('local')
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

In [3]:
txt_file = sc.textFile("./data/sample.txt")

In [4]:
print(txt_file.collect())

['Master Chief Petty Officer John-117 or Master Chief is a fictional character and the protagonist in the Halo multimedia franchise', 'Master Chief is a playable character in the series of science fiction first-person shooter video games Halo', 'Master Chief is a towering supersoldier known as a Spartan raised and trained from childhood for combat']


#### Q1: Convert all words in a rdd to lowercase and split the lines of a document using space.

In [5]:
splitted_line_rdd = txt_file.map(lambda x: x.lower().split())

words_rdd = splitted_line_rdd.flatMap(lambda data: [x for x in data])

In [6]:
print(words_rdd.collect()) # printing all words
print("\nSize :: %d" % words_rdd.count())

['master', 'chief', 'petty', 'officer', 'john-117', 'or', 'master', 'chief', 'is', 'a', 'fictional', 'character', 'and', 'the', 'protagonist', 'in', 'the', 'halo', 'multimedia', 'franchise', 'master', 'chief', 'is', 'a', 'playable', 'character', 'in', 'the', 'series', 'of', 'science', 'fiction', 'first-person', 'shooter', 'video', 'games', 'halo', 'master', 'chief', 'is', 'a', 'towering', 'supersoldier', 'known', 'as', 'a', 'spartan', 'raised', 'and', 'trained', 'from', 'childhood', 'for', 'combat']

Size :: 54


#### Q2: Next, I want to remove the words, which are not necessary to analyze this text. We call these words as “stop words”; Stop words do not add much value in a text. For example, “is”, “am”, “are” and “the” are few examples of stop words.

In [7]:
stop_words_list = sc.textFile("./data/stop_words.txt").collect()
no_stopwords_rdd = words_rdd.filter(lambda x: x not in stop_words_list)

In [8]:
print(no_stopwords_rdd.collect())
print("\nSize :: %d" % no_stopwords_rdd.count())

['master', 'chief', 'petty', 'officer', 'john-117', 'master', 'chief', 'fictional', 'character', 'protagonist', 'halo', 'multimedia', 'franchise', 'master', 'chief', 'playable', 'character', 'series', 'science', 'fiction', 'first-person', 'shooter', 'video', 'games', 'halo', 'master', 'chief', 'towering', 'supersoldier', 'known', 'spartan', 'raised', 'trained', 'childhood', 'combat']

Size :: 35


#### Q3: After getting the results into `no_stopwords_rdd`, we want to group the words in `no_stopwords_rdd` based on which letters they start with. For example, suppose I want to group each word of `no_stopwords_rdd` based on first 3 characters.

In [9]:
grouped_rdd = no_stopwords_rdd.groupBy(lambda x: x[:3])

print(" :: Groups and words ::")
{k: list(v) for (k,v) in grouped_rdd.collect()}

 :: Groups and words ::


{'mas': ['master', 'master', 'master', 'master'],
 'chi': ['chief', 'chief', 'chief', 'chief', 'childhood'],
 'pet': ['petty'],
 'off': ['officer'],
 'joh': ['john-117'],
 'fic': ['fictional', 'fiction'],
 'cha': ['character', 'character'],
 'pro': ['protagonist'],
 'hal': ['halo', 'halo'],
 'mul': ['multimedia'],
 'fra': ['franchise'],
 'pla': ['playable'],
 'ser': ['series'],
 'sci': ['science'],
 'fir': ['first-person'],
 'sho': ['shooter'],
 'vid': ['video'],
 'gam': ['games'],
 'tow': ['towering'],
 'sup': ['supersoldier'],
 'kno': ['known'],
 'spa': ['spartan'],
 'rai': ['raised'],
 'tra': ['trained'],
 'com': ['combat']}

#### Q4: What if we want to calculate how many times each word is coming in corpus ?

In [10]:
print(" :: Groups and number of words ::")
{k: len(list(v)) for (k,v) in grouped_rdd.collect()}

 :: Groups and number of words ::


{'mas': 4,
 'chi': 5,
 'pet': 1,
 'off': 1,
 'joh': 1,
 'fic': 2,
 'cha': 2,
 'pro': 1,
 'hal': 2,
 'mul': 1,
 'fra': 1,
 'pla': 1,
 'ser': 1,
 'sci': 1,
 'fir': 1,
 'sho': 1,
 'vid': 1,
 'gam': 1,
 'tow': 1,
 'sup': 1,
 'kno': 1,
 'spa': 1,
 'rai': 1,
 'tra': 1,
 'com': 1}

#### Q5: How do I perform a task (say count the words ‘Chief’ and ‘Master’ in `no_stopwords_rdd`) separatly on each partition and get the output of the task performed in these partition ?

In [11]:
word_list = ['master', 'character']

def count_words(part_iter):
    counts = np.zeros(len(word_list), dtype=int)
    for i in part_iter:
        for j,k in enumerate(word_list):
            if i == k:
                print(i, k, j)
                counts[j] += 1
    return counts

In [12]:
no_stopwords_rdd.mapPartitions(count_words).glom().collect()

[[4, 2]]

#### Q6: What if I want to work with samples instead of full data ?

In [13]:
sample_30_perc1 = no_stopwords_rdd.sample(False, 0.3, seed=42)
print(sample_30_perc1.collect())

['master', 'chief', 'officer', 'john-117', 'fictional', 'halo', 'chief', 'series', 'science', 'trained', 'childhood', 'combat']


#### Q7: What if I want to create a RDD which contains all the elements (a.k.a. union) of two RDDs ?

In [14]:
sample_30_perc1 = no_stopwords_rdd.sample(False, 0.1, seed=42)
sample_30_perc2 = no_stopwords_rdd.sample(False, 0.1, seed=43)
print('RDD1 ::', sample_30_perc1.collect())
print('RDD2 ::', sample_30_perc2.collect())
print('RDD1 U RDD1 ::',sample_30_perc1.union(sample_30_perc2).collect())

RDD1 :: ['master', 'officer', 'halo', 'series', 'science', 'trained']
RDD2 :: ['chief', 'series', 'first-person', 'games', 'known']
RDD1 U RDD1 :: ['master', 'officer', 'halo', 'series', 'science', 'trained', 'chief', 'series', 'first-person', 'games', 'known']


#### Q8: If we want to join the two pair RDDs based on their key.

In [15]:
g1 = grouped_rdd.sample(False, 0.1, seed=42)
g2 = grouped_rdd.sample(False, 0.1, seed=43)
g1.collect(), g2.collect()
print("________________________:: G1 ::________________________")
display({k: list(v) for (k,v) in g1.collect()})
print("________________________:: G2 ::________________________")
display({k: list(v) for (k,v) in g2.collect()})
print("______________________:: G1 U G2 ::_____________________")
display({k: list(v) for (k,v) in g1.union(g2).collect()})

________________________:: G1 ::________________________


{'mas': ['master', 'master', 'master', 'master'],
 'off': ['officer'],
 'fra': ['franchise'],
 'gam': ['games'],
 'tow': ['towering']}

________________________:: G2 ::________________________


{'cha': ['character', 'character'],
 'gam': ['games'],
 'kno': ['known'],
 'tra': ['trained']}

______________________:: G1 U G2 ::_____________________


{'mas': ['master', 'master', 'master', 'master'],
 'off': ['officer'],
 'fra': ['franchise'],
 'gam': ['games'],
 'tow': ['towering'],
 'cha': ['character', 'character'],
 'kno': ['known'],
 'tra': ['trained']}

#### Q9: How to calculate distinct elements in a RDD ?

In [16]:
no_stopwords_rdd.distinct().count()

27

#### Q10: What if I want to reduce the number of partition of a RDD and get the result in a new RDD?

In [17]:
num_rdd = sc.textFile('./data/nums.txt', 4)

print('Initial #Partitions :: ', num_rdd.getNumPartitions())
num_rdd2 = num_rdd.coalesce(1)
print('After coalesce #Partitions :: ', num_rdd2.getNumPartitions())

Initial #Partitions ::  4
After coalesce #Partitions ::  1


#### Q11: How do I find out number of parition in RDD ?

In [18]:
num_rdd.getNumPartitions()

4

#### Q13: Count the number of elements in RDD.

In [19]:
num_rdd.count()

30

#### Q14: Find the maximum, minimum, sum, variance and standard deviation of “num_rdd”.

In [20]:
num_rdd = num_rdd.map(lambda x: int(x))
print('Max : {0}\nMin : {1}\nSum : {2}\nVariance : {3}\nStandard Deviation : {4}'.\
      format(num_rdd.max(), num_rdd.min(), num_rdd.sum(), num_rdd.variance(),
             num_rdd.stdev()))

Max : 10000
Min : 1
Sum : 18787
Variance : 3975026.778888888
Standard Deviation : 1993.7469194681878
