<a href="https://colab.research.google.com/github/villafue/Data-Science-at-Scale/blob/main/DataCamp/Big%20Data%20Fundamentals%20with%20PySpark/2%20Programming%20in%20PySpark%20RDD%E2%80%99s/2_Programming_in_PySpark_RDD%E2%80%99s.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Programming in PySpark RDD’s

The main abstraction Spark provides is the resilient distributed dataset (RDD), the fundamental data type of the engine. This chapter introduces RDDs and shows how they can be created and processed using RDD Transformations and Actions.

# Abstracting Data with RDDs

1. Introduction to PySpark RDD

In the first chapter, you learned about the different components of Spark, namely Spark Core, Spark SQL, and Spark MLlib. In this chapter, we will start with RDDs, which are Spark’s core abstraction for working with data.
2. What is RDD?

Let's get started. RDD stands for Resilient Distributed Dataset. It is simply a collection of data distributed across the cluster. The RDD is the fundamental, backbone data type in PySpark. When Spark starts processing data, it divides the data into partitions and distributes them across the cluster nodes, with each node holding a slice of the data.
3. Decomposing RDDs

Now, let's take a look at the different features of RDDs. The name RDD captures 3 important properties. Resilient means the ability to withstand failures and recompute missing or damaged partitions. Distributed means spanning the jobs across multiple nodes in the cluster for efficient computation. Dataset means a collection of partitioned data, e.g. arrays, tables, tuples or other objects.
4. Creating RDDs. How to do it?

There are three different methods for creating RDDs. You have already seen two of them in the previous chapter, even though you were not aware that you were creating RDDs. The simplest method is to take an existing collection of objects (e.g. a list, an array or a set) and pass it to SparkContext’s parallelize method. A more common way is to load data from external datasets, such as files stored in HDFS, objects in Amazon S3 buckets, or lines in a text file stored locally, using SparkContext's textFile method. Finally, RDDs can also be created from existing RDDs, which we will see in the next video.
5. Parallelized collection (parallelizing)

In the first method, RDDs are created from a list or a set using SparkContext’s parallelize method. Let's try to understand how RDDs are created using this method with a couple of examples. In the first example, an RDD named numRDD is created from a Python list containing the numbers 1, 2, 3, and 4. In the second example, an RDD named helloRDD is created from the 'hello world' string. You can confirm that the object created is an RDD using Python's type function.
6. From external datasets

Creating RDDs from external datasets is by far the most common method in PySpark. In this method, RDDs are created using SparkContext’s textFile method. In this simple example, an RDD named fileRDD is created from the lines of a README.md file stored locally on your computer. Similar to the previous method, you can confirm that the object is an RDD using the type function.
7. Understanding Partitioning in PySpark

Data partitioning is an important concept in Spark, and understanding how Spark deals with partitions allows one to control parallelism. A partition in Spark is a division of a large dataset, with the parts being stored in multiple locations across the cluster. By default, Spark partitions the data at the time of creating an RDD based on several factors, such as the available resources and the external dataset. However, this behavior can be controlled by passing a second argument: numSlices for parallelize, which sets the number of partitions, and minPartitions for textFile, which sets the minimum number of partitions to be created for the RDD. In the first example, we create an RDD named numRDD from a list of 10 integers using SparkContext's parallelize method with 6 partitions. In the second example, we create another RDD named fileRDD using SparkContext's textFile method with 6 partitions. The number of partitions in an RDD can always be found by using the getNumPartitions method.
8. Let's practice

In the next video, you'll see the final method of creating RDDs. For now, let's create some RDDs like you just learned.

# RDDs from Parallelized collections

Resilient Distributed Dataset (RDD) is the basic abstraction in Spark. It is an immutable distributed collection of objects. Since RDD is a fundamental and backbone data type in Spark, it is important that you understand how to create it. In this exercise, you'll create your first RDD in PySpark from a collection of words.

Remember you already have a SparkContext sc available in your workspace.

Instructions

1. Create an RDD named RDD from a list of words.

2. Confirm the object created is RDD.


In [None]:
# Create an RDD from a list of words
RDD = sc.parallelize(["Spark", "is", "a", "framework", "for", "Big Data processing"])

# Print out the type of the created object
print("The type of RDD is", type(RDD))

'''
<script.py> output:
    The type of RDD is <class 'pyspark.rdd.RDD'>
'''

Conclusion

Good job on creating your first RDD.

# RDDs from External Datasets

PySpark can easily create RDDs from files that are stored in external storage systems such as HDFS (Hadoop Distributed File System), Amazon S3 buckets, etc. However, the most common method of creating RDDs is from files stored in your local file system. This method takes a file path and reads it as a collection of lines. In this exercise, you'll create an RDD from the file path (file_path) with the file name README.md, which is already available in your workspace.

Remember you already have a SparkContext sc available in your workspace.

Instructions

1. Print the file_path in the PySpark shell.

2. Create an RDD named fileRDD from a file_path with the file name README.md.

3. Print the type of the fileRDD created.


In [None]:
# Print the file_path
print("The file_path is", file_path)

# Create a fileRDD from file_path
fileRDD = sc.textFile(file_path)

# Check the type of fileRDD
print("The file type of fileRDD is", type(fileRDD))

'''
<script.py> output:
    The file_path is /usr/local/share/datasets/README.md
    The file type of fileRDD is <class 'pyspark.rdd.RDD'>
'''

Conclusion

Wonderful! Now you can create RDDs from text files too!

# Partitions in your data

SparkContext's textFile() method takes an optional second argument called minPartitions for specifying the minimum number of partitions. In this exercise, you'll create an RDD named fileRDD_part with 5 partitions and then compare that with fileRDD that you created in the previous exercise. Refer to the "Understanding Partition" slide in video 2.1 to know the methods for creating and getting the number of partitions in an RDD.

Remember, you already have a SparkContext sc, file_path and fileRDD available in your workspace.

Instructions

1. Find the number of partitions in the fileRDD RDD.

2. Create an RDD named fileRDD_part from the file path but create 5 partitions.

3. Confirm the number of partitions in the new fileRDD_part RDD.


In [None]:
# Check the number of partitions in fileRDD
print("Number of partitions in fileRDD is", fileRDD.getNumPartitions())

# Create a fileRDD_part from file_path with 5 partitions
fileRDD_part = sc.textFile(file_path, minPartitions = 5)

# Check the number of partitions in fileRDD_part
print("Number of partitions in fileRDD_part is", fileRDD_part.getNumPartitions())

'''
<script.py> output:
    Number of partitions in fileRDD is 2
    Number of partitions in fileRDD_part is 5
'''

Conclusion

Excellent! Note that modifying the number of partitions may result in faster performance due to parallelization.

# Basic RDD Transformations and Actions

1. RDD operations in PySpark

In the last video, you learned how to load your data into RDDs. In this video, you'll learn about the various operations that RDDs support in PySpark.
2. Overview of PySpark operations

RDDs in PySpark support two different types of operations: Transformations and Actions. Transformations are operations on RDDs that return a new RDD, and Actions are operations that perform some computation on the RDD and return a value.
3. RDD Transformations

The most important feature that helps RDDs with fault tolerance and optimizing resource use is lazy evaluation. So what is lazy evaluation? Spark creates a graph from all the operations you perform on an RDD, and execution of the graph starts only when an action is performed on the RDD, as shown in this figure. This is called lazy evaluation in Spark. The RDD transformations we will look at in this video are map, filter, flatMap and union.
4. map() Transformation

The map transformation takes in a function and applies it to each element in the RDD, with the result of the function being the new value of each element in the resulting RDD. Say you have an input RDD with the elements 1, 2, 3, 4. In this example, the square function is applied to each element of the RDD. Let's understand this with an example. We first create an RDD using SparkContext's parallelize method on a list containing the elements 1, 2, 3, 4. Next, we apply the map transformation to square each element of the RDD.
5. filter() Transformation

The filter transformation takes in a function and returns an RDD that only has the elements that pass the condition. Suppose we have an input RDD with the numbers 1, 2, 3, 4 and we want to select the numbers greater than 2; we can apply the filter transformation. Here is an example in which we use the same RDD as before and apply the filter transformation to keep only the numbers that are greater than 2.
6. flatMap() Transformation

flatMap is similar to the map transformation, except that it can return multiple values for each element in the source RDD. A simple usage of flatMap is splitting up an input string into words. Here, you have an input RDD with two elements: "hello world" and "how are you". Applying the split function with the flatMap transformation results in 5 elements in the resulting RDD: "hello", "world", "how", "are", "you". As you can see, even though the input RDD has 2 elements, the output RDD now contains 5 elements. In this example, we create an RDD from a list containing the strings "hello world" and "how are you". Next, we apply flatMap along with the split function on the RDD to split the input strings into individual words.
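
As a rough illustration of the three transformations above, here is a plain-Python sketch that mimics what map, filter and flatMap compute on small in-memory lists. It is not Spark code: there is no cluster and no lazy evaluation, the variable names are made up for this example, and the corresponding PySpark calls appear only in the comments.

```python
from itertools import chain

numbers = [1, 2, 3, 4]

# map: exactly one output element per input element.
# PySpark counterpart (for an RDD holding these numbers): rdd.map(lambda x: x * x)
squared = [x * x for x in numbers]

# filter: keep only the elements for which the function returns True.
# PySpark counterpart: rdd.filter(lambda x: x > 2)
greater_than_two = [x for x in numbers if x > 2]

# flatMap: each input may produce several outputs, which are flattened
# into one collection instead of a list of lists.
# PySpark counterpart: rdd.flatMap(lambda line: line.split(" "))
lines = ["hello world", "how are you"]
words = list(chain.from_iterable(line.split(" ") for line in lines))

print(squared)           # [1, 4, 9, 16]
print(greater_than_two)  # [3, 4]
print(words)             # ['hello', 'world', 'how', 'are', 'you']
```

Note how map and filter never produce more elements than they receive, while flatMap turns 2 input strings into 5 words.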
7. union() Transformation

The union transformation returns the union of one RDD with another RDD. In this figure, we are filtering the inputRDD to create two RDDs, errorsRDD and warningsRDD, and then combining both RDDs using the union transformation. To illustrate this using PySpark code, let's first create an inputRDD from a local file using SparkContext's textFile method. Next, we will use two filter transformations to create the two RDDs errorsRDD and warningsRDD, and finally we will combine them both using the union transformation. So far you have seen how RDD Transformations work, but after applying Transformations, at some point you'll want to actually do something with your dataset. This is when Actions come into the picture.
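
The filter-then-union flow described above can be sketched in plain Python as follows. The log lines are hypothetical stand-ins for the contents of the local file, and list concatenation only imitates what union computes; the PySpark calls are shown in comments.

```python
# Hypothetical log lines standing in for the lines of a local file.
input_lines = [
    "error: disk full",
    "warning: low memory",
    "info: job started",
    "error: connection timed out",
]

# PySpark counterpart: errorsRDD = inputRDD.filter(lambda line: "error" in line)
errors = [line for line in input_lines if "error" in line]

# PySpark counterpart: warningsRDD = inputRDD.filter(lambda line: "warning" in line)
warnings = [line for line in input_lines if "warning" in line]

# PySpark counterpart: combinedRDD = errorsRDD.union(warningsRDD)
combined = errors + warnings

for line in combined:
    print(line)
```

Like the concatenation here, union keeps duplicates: a line that matched both filters would appear twice in the combined result.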
8. RDD Actions

Actions are the operations that are applied on RDDs to return a value after running a computation. The four basic actions that you'll learn in this lesson are collect, take, first and count.
9. collect() and take() Actions

The collect action returns the complete list of elements from the RDD, whereas take(N) returns the first N elements of the RDD. Continuing the map transformation example, executing collect returns all the elements, i.e. 1, 4, 9, 16, from the RDD_map RDD that you created earlier. Similarly, here is an example of the take(2) action, which returns the first 2 elements, i.e. 1 and 4, from the RDD_map RDD.
10. first() and count() Actions

Sometimes you just want to print the first element of the RDD. The first action returns the first element in an RDD. It is similar to take(1), except that take(1) returns a one-element list. Here is an example of the first action, which returns the first element, i.e. 1, from the RDD_map RDD. Finally, the count action is used to return the total number of rows/elements in the RDD. Here is an example of the count action that counts the number of elements in the RDD_flatmap RDD. The result here indicates that there are 5 elements in the RDD_flatmap RDD.
11. Let's practice RDD operations

It's time for you to practice RDD operations in the PySpark shell now.
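
To make the four actions concrete, the sketch below imitates them on ordinary Python lists that hold the same values as RDD_map and RDD_flatmap in the video. The list names are stand-ins chosen for this example, and the PySpark action each line corresponds to is given in the comment.

```python
rdd_map = [1, 4, 9, 16]                                # stands in for RDD_map
rdd_flatmap = ["hello", "world", "how", "are", "you"]  # stands in for RDD_flatmap

collected = list(rdd_map)   # RDD_map.collect(): every element, as a list
first_two = rdd_map[:2]     # RDD_map.take(2): the first 2 elements, as a list
first_one = rdd_map[0]      # RDD_map.first(): a single element, not a list
total = len(rdd_flatmap)    # RDD_flatmap.count(): the number of elements

print(collected)  # [1, 4, 9, 16]
print(first_two)  # [1, 4]
print(first_one)  # 1
print(total)      # 5
```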

# Map and Collect

The main method by which you can manipulate data in PySpark is using map(). The map() transformation takes in a function and applies it to each element in the RDD. It can be used to do any number of things, from fetching the website associated with each URL in our collection to just squaring the numbers. In this simple exercise, you'll use map() transformation to cube each number of the numbRDD RDD that you created earlier. Next, you'll return all the elements to a variable and finally print the output.

Remember, you already have a SparkContext sc, and numbRDD available in your workspace.

Instructions

1. Create map() transformation that cubes all of the numbers in numbRDD.

2. Collect the results in a numbers_all variable.

3. Print the output from numbers_all variable.


In [None]:
# Create map() transformation to cube numbers
cubedRDD = numbRDD.map(lambda x: x ** 3)

# Collect the results
numbers_all = cubedRDD.collect()

# Print the numbers from numbers_all
for numb in numbers_all:
    print(numb)

'''
<script.py> output:
    1
    8
    27
    64
    125
    216
    343
    512
    729
    1000
'''

Conclusion

Brilliant! collect() should only be used to retrieve results for small datasets. It shouldn’t be used on large datasets.
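
A related point from the video is lazy evaluation: the map() call in this exercise only describes the work, and nothing is computed until an action such as collect() runs. Python's built-in map happens to be lazy in a similar way, so it can serve as a loose analogy. The sketch below is plain Python, not Spark, and the cube helper and calls list are names invented for this illustration.

```python
calls = []  # records every time the function is actually executed

def cube(x):
    calls.append(x)
    return x ** 3

# Building the pipeline does no work yet, much like numbRDD.map(...).
lazy_cubes = map(cube, range(1, 11))
print(len(calls))  # 0

# Consuming the pipeline plays the role of an action such as collect().
numbers_all = list(lazy_cubes)
print(len(calls))       # 10
print(numbers_all[:3])  # [1, 8, 27]
```

The analogy stops there: Spark additionally records the whole graph of transformations, which is what lets it recompute lost partitions.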

# Filter and Count

The RDD transformation filter() returns a new RDD containing only the elements that satisfy a particular function. It is useful for filtering large datasets based on a keyword. For this exercise, you'll select the lines containing the keyword Spark from the fileRDD RDD, which consists of lines of text from the README.md file. Next, you'll count the total number of lines containing the keyword Spark and finally print the first 4 lines of the filtered RDD.

Remember, you already have a SparkContext sc, file_path and fileRDD available in your workspace.

Instructions

1. Create filter() transformation to select the lines containing the keyword Spark.

2. How many lines in fileRDD_filter contain the keyword Spark?

3. Print the first four lines of the resulting RDD.


In [None]:
# Filter the fileRDD to select lines with Spark keyword
fileRDD_filter = fileRDD.filter(lambda line: 'Spark' in line.split())

# How many lines in fileRDD_filter contain the keyword Spark?
print("The total number of lines with the keyword Spark is", fileRDD_filter.count())

# Print the first four lines of fileRDD_filter
for line in fileRDD_filter.take(4):
    print(line)

'''
<script.py> output:
    The total number of lines with the keyword Spark is 5
    Examples for Learning Spark
    Examples for the Learning Spark book. These examples require a number of libraries and as such have long build files. We have also added a stand alone example with minimal dependencies and a small build file
    These examples have been updated to run against Spark 1.3 so they may
    * Spark 1.3
'''

Conclusion

Well done! Note that the filter() operation does not mutate the existing fileRDD. Instead, it returns a pointer to an entirely new RDD.

# Pair RDDs in PySpark

1. Working with Pair RDDs in PySpark

In the last video, you were introduced to some basic RDD operations. In this video, you'll learn how to work with RDDs of key/value pairs, which are a common data type required for many operations in Spark.
2. Introduction to pair RDDs in PySpark

Most real-world datasets are generally key/value pairs. An example of this kind of dataset has the team name as the key and the list of players as the value. The typical pattern of this kind of dataset is that each row is a key that maps to one or more values. To deal with this kind of dataset, PySpark provides a special data structure called pair RDDs. In pair RDDs, the key refers to the identifier, whereas the value refers to the data.
3. Creating pair RDDs

There are a number of ways to create pair RDDs. The two most common ways are creating them from a list of key-value tuples or from a regular RDD. Irrespective of the method, the first step in creating pair RDDs is to get the data into key/value form. Here is an example of creating a pair RDD from a list of key-value tuples, with the name as the key and the age as the value, using SparkContext's parallelize method. And here is an example of creating a pair RDD from a regular RDD. In this example, a regular RDD is created from a list of strings using SparkContext's parallelize method. Next, we create a pair RDD using the map function, which returns a tuple for each string with the name as the key and the age as the value.
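
The second approach, turning plain strings into key/value tuples, can be sketched in plain Python as below. The "name age" strings are hypothetical values made up for this illustration, and converting the age to an integer is a choice of this sketch; the PySpark counterpart is shown in the comment.

```python
# Hypothetical "name age" strings; the values are invented for illustration.
my_list = ["Sam 23", "Mary 34", "Peter 25"]

# PySpark counterpart, for a regular RDD built from my_list:
#   regularRDD.map(lambda s: (s.split(" ")[0], int(s.split(" ")[1])))
pairs = [(s.split(" ")[0], int(s.split(" ")[1])) for s in my_list]

print(pairs)  # [('Sam', 23), ('Mary', 34), ('Peter', 25)]
```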
4. Transformations on pair RDDs

Pair RDDs are still RDDs and thus can use all the transformations available to regular RDDs. Since pair RDDs contain tuples, we need to pass functions that operate on key-value pairs. A few special operations are available for this kind of RDD, such as reduceByKey, groupByKey, sortByKey and join. Let's take a look at each of these four pair RDD transformations in detail now.
5. reduceByKey() transformation

The reduceByKey transformation is the most popular pair RDD transformation; it combines values with the same key using a function. reduceByKey runs several parallel operations, one for each key in the dataset. Because datasets can have very large numbers of keys, reduceByKey is not implemented as an action. Instead, it returns a new RDD consisting of each key and the reduced value for that key. Here is an example of the reduceByKey transformation that uses a function to combine all the goals scored by each of the players. The result shows the player as the key and the total number of goals scored as the value.
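
As a minimal sketch of what reduceByKey computes, the helper below folds the values of each key together with a two-argument function. It runs on an ordinary list in a single process; reduce_by_key is a name invented here, and the player names and goal counts are hypothetical.

```python
def reduce_by_key(pairs, func):
    """Combine the values of each key with func, imitating reduceByKey on a small list."""
    reduced = {}
    for key, value in pairs:
        # The first value seen for a key is kept as is; later ones are merged in.
        reduced[key] = func(reduced[key], value) if key in reduced else value
    return list(reduced.items())

# Hypothetical (player, goals) pairs.
goals = [("player_a", 23), ("player_b", 34), ("player_c", 22), ("player_a", 24)]

# PySpark counterpart: goalsRDD.reduceByKey(lambda x, y: x + y)
totals = reduce_by_key(goals, lambda x, y: x + y)

print(totals)  # [('player_a', 47), ('player_b', 34), ('player_c', 22)]
```

In Spark the merging happens per partition and then across partitions, so the order of the resulting pairs should not be relied on.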
6. sortByKey() transformation

Sorting of data is necessary for many downstream applications. We can sort a pair RDD as long as there is an ordering defined on the key. The sortByKey transformation returns an RDD sorted by key in ascending or descending order. Continuing our reduceByKey example, here is an example that sorts the data based on the number of goals scored by each player. Because sortByKey orders by key, this requires the number of goals to be the key of each pair.
7. groupByKey() transformation

A common use case of pair RDDs is grouping the data by key, for example, viewing all of the airports for a particular country together. If the data is already keyed in the way that we want, the groupByKey operation groups all the values with the same key in the pair RDD. Here is an example of the groupByKey transformation that groups all the airports for a particular country from an input list of tuples. Each tuple consists of a country code and the corresponding airport code.
8. join() transformation

The join transformation joins two pair RDDs based on their key. Let's demonstrate this with an example. First, we create two RDDs. RDD1 contains a list of tuples, each consisting of a name and an age, and RDD2 contains a list of tuples, each consisting of a name and an income. Applying the join transformation on RDD1 and RDD2 merges the two RDDs together by grouping elements with the same key. Here is an example that shows the result of the join transformation of RDD1 and RDD2.
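
The groupByKey and join transformations described above can be imitated on small lists as follows. This is a plain-Python sketch, not Spark: group_by_key and join are helper names invented here, and the airport, age and income values are illustrative only.

```python
from collections import defaultdict

def group_by_key(pairs):
    """Collect all values that share a key, imitating groupByKey."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

def join(left, right):
    """Inner join: one (key, (left_value, right_value)) per matching pair of elements."""
    right_groups = group_by_key(right)
    return [(key, (lv, rv)) for key, lv in left for rv in right_groups.get(key, [])]

# Illustrative (country code, airport code) pairs.
airports = [("US", "JFK"), ("UK", "LHR"), ("FR", "CDG"), ("US", "SFO")]
print(group_by_key(airports))  # {'US': ['JFK', 'SFO'], 'UK': ['LHR'], 'FR': ['CDG']}

# Illustrative (name, age) and (name, income) pairs.
ages = [("Sam", 23), ("Mary", 34)]
incomes = [("Sam", 2000), ("Mary", 3500), ("Alex", 1000)]
print(join(ages, incomes))  # [('Sam', (23, 2000)), ('Mary', (34, 3500))]
```

Keys present in only one input, such as "Alex" here, are dropped, which matches the inner-join behavior of join on pair RDDs.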
9. Let's practice

Now that you have learned all about pair RDDs, it's time for you to practice. 

# ReduceByKey and Collect

One of the most popular pair RDD transformations is reduceByKey() which operates on key, value (k,v) pairs and merges the values for each key. In this exercise, you'll first create a pair RDD from a list of tuples, then combine the values with the same key and finally print out the result.

Remember, you already have a SparkContext sc available in your workspace.

Instructions

1. Create a pair RDD named Rdd with tuples (1,2),(3,4),(3,6),(4,5).

2. Transform the Rdd with reduceByKey() into a pair RDD Rdd_Reduced by adding the values with the same key.

3. Collect the contents of pair RDD Rdd_Reduced and iterate to print the output.


In [None]:
# Create PairRDD Rdd with key value pairs
Rdd = sc.parallelize([(1,2),(3,4),(3,6),(4,5)])

# Apply reduceByKey() operation on Rdd
Rdd_Reduced = Rdd.reduceByKey(lambda x, y: x + y)

# Iterate over the result and print the output
for num in Rdd_Reduced.collect(): 
  print("Key {} has {} Counts".format(num[0], num[1]))

'''
<script.py> output:
    Key 1 has 2 Counts
    Key 3 has 10 Counts
    Key 4 has 5 Counts
'''

Conclusion

Good job! reduceByKey() transformation merges the values for each key using an associative reduce function.

# SortByKey and Collect

Many times it is useful to sort the pair RDD based on the key (for example word count which you'll see later in the chapter). In this exercise, you'll sort the pair RDD Rdd_Reduced that you created in the previous exercise into descending order and print the final output.

Remember, you already have a SparkContext sc and Rdd_Reduced available in your workspace.

Instructions

1. Sort the Rdd_Reduced RDD using the key in descending order.

2. Collect the contents and iterate to print the output.


In [None]:
# Sort the reduced RDD with the key by descending order
Rdd_Reduced_Sort = Rdd_Reduced.sortByKey(ascending=False)

# Iterate over the result and print the output
for num in Rdd_Reduced_Sort.collect():
  print("Key {} has {} Counts".format(num[0], num[1]))

'''
<script.py> output:
    Key 4 has 5 Counts
    Key 3 has 10 Counts
    Key 1 has 2 Counts
'''

Conclusion

Congratulations! You'll see how you can use sortByKey() with real world data at the end of this chapter.
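
Since sortByKey orders by key only, ordering by value is usually done by swapping each pair first. The plain-Python sketch below imitates both cases on a list with the same contents as Rdd_Reduced; the variable names are made up for this example.

```python
pairs = [(1, 2), (3, 10), (4, 5)]  # same contents as Rdd_Reduced

# PySpark counterpart: Rdd_Reduced.sortByKey(ascending=False)
by_key_desc = sorted(pairs, key=lambda kv: kv[0], reverse=True)

# To order by value instead, swap so that the value becomes the key, then sort by key.
# PySpark counterpart: Rdd_Reduced.map(lambda x: (x[1], x[0])).sortByKey(ascending=False)
swapped = [(value, key) for key, value in pairs]
by_value_desc = sorted(swapped, key=lambda kv: kv[0], reverse=True)

print(by_key_desc)    # [(4, 5), (3, 10), (1, 2)]
print(by_value_desc)  # [(10, 3), (5, 4), (2, 1)]
```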

# Advanced RDD Actions

1. More actions

Previously you learned about advanced RDD Transformations for key/value datasets. Similar to advanced RDD Transformations there are advanced RDD Actions which you'll see in this video.
2. reduce() action

The reduce action takes in a function that operates on two elements of the RDD and returns a new element of the same type. The function should be commutative and associative so that it can be computed correctly in parallel. A simple example of such a function is +, which we can use to sum our RDD. Here is an example of the reduce action that calculates the sum of all the elements in an RDD. In this example, the input RDD is first created using SparkContext's parallelize method on a list consisting of the numbers 1, 3, 4, 6. Executing the reduce action results in 14, which is the sum of 1, 3, 4, 6.
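
To see why the function should be commutative and associative, the sketch below reduces each partition separately and then combines the partial results, which is roughly how a parallel reduce proceeds. It is plain Python; the split into two partitions is an assumption made for illustration, and partitioned_reduce is a helper name invented here.

```python
from functools import reduce
from operator import add, sub

data = [1, 3, 4, 6]
partitions = [[1, 3], [4, 6]]  # assumed split of the data into two partitions

def partitioned_reduce(func, parts):
    """Reduce every partition on its own, then reduce the partial results."""
    partials = [reduce(func, part) for part in parts]
    return reduce(func, partials)

# Addition is commutative and associative: both routes agree.
print(reduce(add, data))                    # 14
print(partitioned_reduce(add, partitions))  # 14

# Subtraction is neither: the result depends on how the data was split.
print(reduce(sub, data))                    # -12
print(partitioned_reduce(sub, partitions))  # 0
```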
3. saveAsTextFile() action

In many cases, it is not advisable to run the collect action on RDDs because of the huge size of the data. In these cases, it’s common to write the data out to a distributed storage system such as HDFS or Amazon S3. The saveAsTextFile action can be used to save an RDD as a text file inside a particular directory. By default, saveAsTextFile saves the RDD with each partition as a separate file inside a directory. Here is an example of saveAsTextFile that saves an RDD with each partition as a separate file inside a directory. However, you can use the coalesce method to return a new RDD that is reduced into a single partition before saving. Here is an example of saveAsTextFile that saves the RDD as a single file inside a directory.
4. Action Operations on pair RDDs

Similar to pair RDD Transformations, there are also RDD Actions available for pair RDDs. Pair RDDs also gain some additional PySpark actions, especially those that take advantage of the key-value nature of the data. Let’s take a look at two pair RDD actions, countByKey and collectAsMap, in this video.
5. countByKey() action

countByKey is only available on RDDs of type (Key, Value). With the countByKey operation, we can count the number of elements for each key. Here is an example of counting the number of values for each key in the dataset. In this example, we first create a pair RDD named rdd using SparkContext's parallelize method. Since countByKey generates a dictionary, we next iterate over the dictionary to print each unique key and the number of values associated with it, as shown here. One thing to note is that countByKey should only be used on a dataset whose size is small enough to fit in memory.
6. collectAsMap() action

collectAsMap returns the key-value pairs in the RDD as a dictionary. Here is an example of collectAsMap on a pair RDD. As before, we create a pair RDD using SparkContext's parallelize method and next use the collectAsMap action. collectAsMap produces the key-value pairs in the RDD as a dictionary, which can be used for downstream analysis. Similar to countByKey, this action should only be used if the resulting data is expected to be small, as all the data is loaded into memory.
7. Let's practice

Let's practice some of these advanced Actions on some test data in the PySpark shell.

In [None]:
In [4]:
Rdd.collect()
Out[4]:
[(1, 2), (3, 4), (3, 6), (4, 5)]

In [None]:
# Transform the rdd with countByKey()
total = Rdd.countByKey()

# What is the type of total?
print("The type of total is", type(total))

# Iterate over the total and print the output
for k, v in total.items(): 
  print("key", k, "has", v, "counts")

'''
<script.py> output:
    The type of total is <class 'collections.defaultdict'>
    key 1 has 1 counts
    key 3 has 2 counts
    key 4 has 1 counts
'''

Conclusion

Good job! Remember unlike reduceByKey() and sortByKey(), countByKey() is an action and not a transformation on the pair RDD.
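
For comparison, here is a plain-Python sketch of what the two pair RDD actions from the video return for the same pairs as Rdd. It uses Counter and dict as stand-ins and does not involve Spark; the PySpark calls are named in the comments.

```python
from collections import Counter

pairs = [(1, 2), (3, 4), (3, 6), (4, 5)]  # same contents as Rdd

# countByKey(): how many elements carry each key; the values are ignored.
counts = Counter(key for key, _ in pairs)

# collectAsMap(): the pairs as a dictionary. A dictionary holds one value
# per key, so for the repeated key 3 only the last value seen is kept here.
as_map = dict(pairs)

print(dict(counts))  # {1: 1, 3: 2, 4: 1}
print(as_map)        # {1: 2, 3: 6, 4: 5}
```

Because a repeated key ends up with a single value, collectAsMap is best suited to pair RDDs whose keys are unique, for example after reduceByKey().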

# Create a base RDD and transform it

The volume of unstructured data (log lines, images, binary files) in existence is growing dramatically, and PySpark is an excellent framework for analyzing this type of data through RDDs. In this three-part exercise, you will write code that calculates the most common words in the Complete Works of William Shakespeare.

Here are the brief steps for writing the word counting program:

    * Create a base RDD from Complete_Shakespeare.txt file.
    * Use RDD transformation to create a long list of words from each element of the base RDD.
    * Remove stop words from your data.
    * Create pair RDD where each element is a pair tuple of ('w', 1)
    * Group the elements of the pair RDD by key (word) and add up their values.
    * Swap the keys (words) and values (counts) so that the key is the count and the value is the word.
    * Finally, sort the RDD by descending order and print the 10 most frequent words and their frequencies.

In this first exercise, you'll create a base RDD from Complete_Shakespeare.txt file and transform it to create a long list of words.

Remember, you already have a SparkContext sc available in your workspace. A file_path variable (which is the path to the Complete_Shakespeare.txt file) is also loaded for you.

Instructions

1. Create an RDD called baseRDD that reads lines from file_path.

2. Transform the baseRDD into a long list of words and create a new splitRDD.

3. Count the total words in splitRDD.


In [None]:
# Create a baseRDD from the file path
baseRDD = sc.textFile(file_path)

# Split the lines of baseRDD into words
splitRDD = baseRDD.flatMap(lambda x: x.split())

# Count the total number of words
print("Total number of words in splitRDD:", splitRDD.count())

'''
<script.py> output:
    Total number of words in splitRDD: 904061
'''

Conclusion

Good start! You have successfully created and transformed an RDD from unstructured data.

# Remove stop words and reduce the dataset

After splitting the lines in the file into a long list of words using flatMap() transformation, in the next step, you'll remove stop words from your data. Stop words are common words that are often uninteresting. For example "I", "the", "a" etc., are stop words. You can remove many obvious stop words with a list of your own. But for this exercise, you will just remove the stop words from a curated list stop_words provided to you in your environment.

After removing stop words, you'll next create a pair RDD where each element is a pair tuple (k, v), where k is the key and v is the value. In this example, the pair RDD is composed of (w, 1), where w is a word in the RDD and 1 is a number. Finally, you'll combine the values with the same key from the pair RDD using the reduceByKey() operation.

Remember you already have a SparkContext sc and splitRDD available in your workspace.

Instructions

1. Convert the words in splitRDD to lower case and then remove stop words from stop_words.

2. Create a pair RDD tuple containing the word and the number 1 from each word element in splitRDD.

3. Get the count of the number of occurrences of each word (word frequency) in the pair RDD using reduceByKey()


In [None]:
In [1]:
stop_words
Out[1]:

['i',
 'me',
 'my',
 'myself',
 'we',
 'our',
 'ours',
 'ourselves',
 'you',
 'your',
 'yours',
 'yourself',
 'yourselves',
 'he',
 'him',
 'his',
 'himself',
 'she',
 'her',
 'hers',
 'herself',
 'it',
 'its',
 'itself',
 'they',
 'them',
 'their',
 'theirs',
 'themselves',
 'what',
 'which',
 'who',
 'whom',
 'this',
 'that',
 'these',
 'those',
 'am',
 'is',
 'are',
 'was',
 'were',
 'be',
 'been',
 'being',
 'have',
 'has',
 'had',
 'having',
 'do',
 'does',
 'did',
 'doing',
 'a',
 'an',
 'the',
 'and',
 'but',
 'if',
 'or',
 'because',
 'as',
 'until',
 'while',
 'of',
 'at',
 'by',
 'for',
 'with',
 'about',
 'against',
 'between',
 'into',
 'through',
 'during',
 'before',
 'after',
 'above',
 'below',
 'to',
 'from',
 'up',
 'down',
 'in',
 'out',
 'on',
 'off',
 'over',
 'under',
 'again',
 'further',
 'then',
 'once',
 'here',
 'there',
 'when',
 'where',
 'why',
 'how',
 'all',
 'any',
 'both',
 'each',
 'few',
 'more',
 'most',
 'other',
 'some',
 'such',
 'no',
 'nor',
 'not',
 'only',
 'own',
 'same',
 'so',
 'than',
 'too',
 'very',
 'can',
 'will',
 'just',
 'don',
 'should',
 'now']

In [None]:
# Remove stop words: each word is lower-cased only for the comparison with
# stop_words, so the words that remain keep their original case
splitRDD_no_stop = splitRDD.filter(lambda x: x.lower() not in stop_words)

# Create a tuple of the word and 1 
splitRDD_no_stop_words = splitRDD_no_stop.map(lambda w: (w, 1))

# Count the number of occurrences of each word
resultRDD = splitRDD_no_stop_words.reduceByKey(lambda x, y: x + y)

Conclusion

Good job! You are nearly ready to print the words and their frequencies.

# Print word frequencies

After combining the values (counts) with the same key (word), you'll print the word frequencies using the take(N) action. You could have used the collect() action but as a best practice, it is not recommended as collect() returns all the elements from your RDD. You'll use take(N) instead, to return N elements from your RDD.

What if we want to return the top 10 words? For this, you'll first need to swap the keys (words) and values (counts) so that the key is the count and the value is the word. After you swap the key and value in each tuple, you'll sort the pair RDD based on the key (count) and print the top 10 words in descending order.

You already have a SparkContext sc and resultRDD available in your workspace.

Instructions

1. Print the first 10 words and their frequencies from the resultRDD.

2. Swap the keys and values in the resultRDD.

3. Sort the keys according to descending order.

4. Print the top 10 most frequent words and their frequencies.


In [None]:
# Display the first 10 words and their frequencies
for word in resultRDD.take(10):
	print(word)

# Swap the keys and values 
resultRDD_swap = resultRDD.map(lambda x: (x[1], x[0]))

# Sort the keys in descending order
resultRDD_swap_sort = resultRDD_swap.sortByKey(ascending=False)

# Show the top 10 most frequent words and their frequencies
for word in resultRDD_swap_sort.take(10):
	print("{} has {} counts".format(word[1], word[0]))
 
'''
<script.py> output:
    ('Quince', 1)
    ('Corin,', 2)
    ('circle', 10)
    ('enrooted', 1)
    ('divers', 20)
    ('Doubtless', 2)
    ('undistinguishable,', 1)
    ('widowhood,', 1)
    ('incorporate.', 1)
    ('rare,', 10)
    thou has 4247 counts
    thy has 3630 counts
    shall has 3018 counts
    good has 2046 counts
    would has 1974 counts
    Enter has 1926 counts
    thee has 1780 counts
    I'll has 1737 counts
    hath has 1614 counts
    like has 1452 counts
'''

Conclusion

Congratulations! You have successfully created a word count program using RDDs in PySpark.
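
To recap the whole pipeline in one place, the sketch below walks through the same steps on a tiny in-memory example using only plain Python. The sample lines and the two-word stop list are made up for illustration, and each step names the RDD operation it imitates in a comment; it is a model of the logic, not a replacement for the PySpark code above.

```python
from itertools import chain

# Hypothetical input; in the exercise this comes from sc.textFile(file_path).
lines = ["The quick brown fox", "jumps over the lazy dog", "the fox"]
stop_words = ["the", "over"]  # tiny stand-in for the curated stop_words list

# flatMap(lambda x: x.split()): one long list of words
words = list(chain.from_iterable(line.split() for line in lines))

# filter(lambda x: x.lower() not in stop_words): drop stop words
kept = [w for w in words if w.lower() not in stop_words]

# map(lambda w: (w, 1)): pair every word with the number 1
pairs = [(w, 1) for w in kept]

# reduceByKey(lambda x, y: x + y): add up the ones per word
counts = {}
for word, one in pairs:
    counts[word] = counts.get(word, 0) + one

# map(lambda x: (x[1], x[0])): swap so that the count becomes the key
swapped = [(count, word) for word, count in counts.items()]

# sortByKey(ascending=False) followed by take(3)
top = sorted(swapped, key=lambda kv: kv[0], reverse=True)[:3]

for count, word in top:
    print("{} has {} counts".format(word, count))
```

With this input the most frequent remaining word is "fox", which occurs twice; every other word occurs once.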