# Programming in PySpark RDDs

The main abstraction Spark provides is a resilient distributed dataset (RDD), the fundamental data type of the engine. This track introduces RDDs and shows how to create them and process them with RDD transformations and actions.

## Preparing the environment

### Importing libraries

In [1]:
import glob
import shutil

from pyspark.sql.types import (_parse_datatype_string, StructType, StructField,
                               DoubleType, IntegerType, StringType)
from pyspark.sql import SparkSession

### Connect to Spark

In [2]:
spark = SparkSession.builder.getOrCreate()

# Evaluate DataFrames eagerly so they render as tables in notebooks
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

In [3]:
sc = spark.sparkContext

### Reading the data

In [4]:
fifa = spark.read.csv('data-sources/Fifa2018_dataset.csv', header=True, inferSchema=True)
# cast to integer
for col_name in ['Acceleration', 'Aggression', 'Agility', 'Balance', 'Ball control', 'Composure', 
                 'Crossing', 'Curve', 'Dribbling', 'Finishing', 'Free kick accuracy', 'GK diving', 
                 'GK handling', 'GK kicking', 'GK positioning', 'GK reflexes', 'Heading accuracy', 
                 'Interceptions', 'Jumping', 'Long passing', 'Long shots', 'Marking', 'Penalties', 
                 'Positioning', 'Reactions', 'Short passing', 'Shot power', 'Sliding tackle', 
                 'Sprint speed', 'Stamina', 'Standing tackle', 'Strength', 'Vision', 'Volleys']:
    fifa = fifa.withColumn(col_name, fifa[col_name].cast('integer'))
fifa.createOrReplaceTempView("fifa")
fifa.printSchema()
fifa.limit(2)

root
 |-- _c0: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Photo: string (nullable = true)
 |-- Nationality: string (nullable = true)
 |-- Flag: string (nullable = true)
 |-- Overall: integer (nullable = true)
 |-- Potential: integer (nullable = true)
 |-- Club: string (nullable = true)
 |-- Club Logo: string (nullable = true)
 |-- Value: string (nullable = true)
 |-- Wage: string (nullable = true)
 |-- Special: integer (nullable = true)
 |-- Acceleration: integer (nullable = true)
 |-- Aggression: integer (nullable = true)
 |-- Agility: integer (nullable = true)
 |-- Balance: integer (nullable = true)
 |-- Ball control: integer (nullable = true)
 |-- Composure: integer (nullable = true)
 |-- Crossing: integer (nullable = true)
 |-- Curve: integer (nullable = true)
 |-- Dribbling: integer (nullable = true)
 |-- Finishing: integer (nullable = true)
 |-- Free kick accuracy: integer (nullable = true)
 |-- GK diving: integer (nulla

_c0,Name,Age,Photo,Nationality,Flag,Overall,Potential,Club,Club Logo,Value,Wage,Special,Acceleration,Aggression,Agility,Balance,Ball control,Composure,Crossing,Curve,Dribbling,Finishing,Free kick accuracy,GK diving,GK handling,GK kicking,GK positioning,GK reflexes,Heading accuracy,Interceptions,Jumping,Long passing,Long shots,Marking,Penalties,Positioning,Reactions,Short passing,Shot power,Sliding tackle,Sprint speed,Stamina,Standing tackle,Strength,Vision,Volleys,CAM,CB,CDM,CF,CM,ID,LAM,LB,LCB,LCM,LDM,LF,LM,LS,LW,LWB,Preferred Positions,RAM,RB,RCB,RCM,RDM,RF,RM,RS,RW,RWB,ST
0,Cristiano Ronaldo,32,https://cdn.sofif...,Portugal,https://cdn.sofif...,94,94,Real Madrid CF,https://cdn.sofif...,€95.5M,€565K,2228,89,63,89,63,93,95,85,81,91,94,76,7,11,15,14,11,88,29,95,77,92,22,85,95,96,83,94,23,91,92,31,80,85,88,89.0,53.0,62.0,91.0,82.0,20801,89.0,61.0,53.0,82.0,62.0,91.0,89.0,92.0,91.0,66.0,ST LW,89.0,61.0,53.0,82.0,62.0,91.0,89.0,92.0,91.0,66.0,92.0
1,L. Messi,30,https://cdn.sofif...,Argentina,https://cdn.sofif...,93,93,FC Barcelona,https://cdn.sofif...,€105M,€565K,2154,92,48,90,95,95,96,77,89,97,95,90,6,11,15,14,8,71,22,68,87,88,13,74,93,95,88,85,26,87,73,28,59,90,85,92.0,45.0,59.0,92.0,84.0,158023,92.0,57.0,45.0,84.0,59.0,92.0,90.0,88.0,91.0,62.0,RW,92.0,57.0,45.0,84.0,59.0,92.0,90.0,88.0,91.0,62.0,88.0


In [5]:
movies = spark.read.csv('data-sources/movie-ratings.csv', header=False,
                        schema='userId int, movieId int, rating double, timestamp int')
movies.createOrReplaceTempView("movies")
movies.printSchema()
movies.limit(2)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



userId,movieId,rating,timestamp
1,31,2.5,1260759144
1,1029,3.0,1260759179


In [6]:
people = spark.read.csv('data-sources/people.csv', header=True, inferSchema=True)
people.createOrReplaceTempView("people")
people.printSchema()
people.limit(2)

root
 |-- _c0: integer (nullable = true)
 |-- person_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- date of birth: timestamp (nullable = true)



_c0,person_id,name,sex,date of birth
0,100,Penelope Lewis,female,1990-08-31 00:00:00
1,101,David Anthony,male,1971-10-14 00:00:00


In [7]:
wine = spark.read.csv('data-sources/wine-data.csv', header=True, inferSchema=True)
wine.createOrReplaceTempView("wine")
wine.printSchema()
wine.limit(2)

root
 |-- Wine: integer (nullable = true)
 |-- Alcohol: double (nullable = true)
 |-- Malic.acid: double (nullable = true)
 |-- Ash: double (nullable = true)
 |-- Acl: double (nullable = true)
 |-- Mg: integer (nullable = true)
 |-- Phenols: double (nullable = true)
 |-- Flavanoids: double (nullable = true)
 |-- Nonflavanoid.phenols: double (nullable = true)
 |-- Proanth: double (nullable = true)
 |-- Color.int: double (nullable = true)
 |-- Hue: double (nullable = true)
 |-- OD: double (nullable = true)
 |-- Proline: integer (nullable = true)



Wine,Alcohol,Malic.acid,Ash,Acl,Mg,Phenols,Flavanoids,Nonflavanoid.phenols,Proanth,Color.int,Hue,OD,Proline
1,14.23,1.71,2.43,15.6,127,2.8,3.06,0.28,2.29,5.64,1.04,3.92,1065
1,13.2,1.78,2.14,11.2,100,2.65,2.76,0.26,1.28,4.38,1.05,3.4,1050


In [8]:
spark.catalog.listTables()

[Table(name='fifa', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='movies', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='people', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='wine', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

## Ex. 1 - RDDs from Parallelized collections
A Resilient Distributed Dataset (RDD) is the basic abstraction in Spark: an immutable, distributed collection of objects. Because the RDD is Spark's fundamental data type, it is important to understand how to create one. In this exercise, you'll create your first RDD in PySpark from a collection of words.

**Instructions:**

1. Create an RDD named `RDD` from a Python list of words.
2. Confirm that the object created is an RDD.

In [9]:
# Create an RDD from a list of words
RDD = sc.parallelize(["Spark", "is", "a", "framework", "for", "Big Data processing"])

# Review the parallelized data
print(f"""
{RDD}
Total values: {RDD.count()}
Values:
{RDD.take(6)}
""")


ParallelCollectionRDD[69] at readRDDFromFile at PythonRDD.scala:289
Total values: 6
Values:
['Spark', 'is', 'a', 'framework', 'for', 'Big Data processing']



In [10]:
# Print out the type of the created object
print("The type of RDD is", type(RDD))

The type of RDD is <class 'pyspark.rdd.RDD'>


## Ex. 2 - RDDs from External Datasets

PySpark can create RDDs from files stored in external storage systems such as HDFS (Hadoop Distributed File System) or Amazon S3 buckets, as well as from files in your local file system. `sc.textFile()` takes a file path and reads the file as a collection of lines. In this exercise, you'll create an RDD from the file path (`file_path`) of the file `sample_text.md`, which is already available in your workspace.

**Instructions:**

1. Print the `file_path` in the PySpark shell.
2. Create an RDD named `fileRDD` from `file_path`.
3. Print the type of the `fileRDD` created.

In [11]:
file_path = 'data-sources/sample_text.md'

# Print the file_path
print("The file_path is", file_path)

The file_path is data-sources/sample_text.md


In [12]:
# Create a fileRDD from file_path
fileRDD = sc.textFile(file_path)

# Review the loaded data
print(f"""
{fileRDD}
Total lines in the file: {fileRDD.count()}
First 5 lines of the file:
""")
fileRDD.take(5)


data-sources/sample_text.md MapPartitionsRDD[75] at textFile at <unknown>:0
Total lines in the file: 36
First 5 lines of the file:



['[![buildstatus](https://travis-ci.org/holdenk/learning-spark-examples.svg?branch=master)](https://travis-ci.org/holdenk/learning-spark-examples)',
 'Examples for Learning Spark',
 'Examples for the Learning Spark book. These examples require a number of libraries and as such have long build files. We have also added a stand alone example with minimal dependencies and a small build file',
 'in the mini-complete-example directory.']

In [13]:
# Check the type of fileRDD
print("The type of fileRDD is", type(fileRDD))

The type of fileRDD is <class 'pyspark.rdd.RDD'>


## Ex. 3 - Partitions in your data

SparkContext's `textFile()` method takes an optional second argument, `minPartitions`, that specifies the minimum number of partitions. In this exercise, you'll create an RDD named `fileRDD_part` with 5 partitions and compare it with the `fileRDD` RDD you created in the previous exercise. Refer to the "Understanding Partition" slide in video 2.1 for the methods used to create an RDD with a given number of partitions and to get the number of partitions of an RDD.

**Instructions:**

1. Find the number of partitions in the `fileRDD` RDD.
2. Create an RDD named `fileRDD_part` from the file path but create 5 partitions.
3. Confirm the number of partitions in the new `fileRDD_part` RDD.

In [14]:
# Check the number of partitions in fileRDD
print("Number of partitions in fileRDD is", fileRDD.getNumPartitions())

# Create a fileRDD_part from file_path with 5 partitions
fileRDD_part = sc.textFile(file_path, minPartitions=5)

# Check the number of partitions in fileRDD_part
print("Number of partitions in fileRDD_part is", fileRDD_part.getNumPartitions())

Number of partitions in fileRDD is 2
Number of partitions in fileRDD_part is 5


## Basic RDD Transformations and Actions

### map() - RDD Transformations

The `map()` transformation applies a function to each element of the RDD and returns a new RDD with the results.

In [15]:
RDD = sc.parallelize([1,2,3,4])
RDD_map = RDD.map(lambda x: x * x)
RDD_map.collect()

[1, 4, 9, 16]

### filter() - RDD Transformations

The `filter()` transformation returns a new RDD with only the elements that satisfy the condition.

In [16]:
RDD = sc.parallelize([1,2,3,4])
RDD_filter = RDD.filter(lambda x: x > 2)
RDD_filter.collect()

[3, 4]

### flatMap() - RDD Transformations

The `flatMap()` transformation can return multiple values for each element in the original RDD and flattens them into a single RDD.

In [17]:
RDD = sc.parallelize(["hello world", "how are you"])
RDD_flatmap = RDD.flatMap(lambda x: x.split(" "))
RDD_flatmap.collect()

['hello', 'world', 'how', 'are', 'you']
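Transformations such as `map()`, `filter()` and `flatMap()` are lazy: they only record how to compute a new RDD, and nothing runs until an action such as `collect()` is called. The class below is a hypothetical, single-machine model of that behaviour (`MiniRDD` is an illustrative name, not part of PySpark, and this is not how Spark is implemented): each transformation wraps the previous step in a Python generator, and only `collect()` evaluates the chain.

```python
from typing import Callable, Iterable, Iterator, List


class MiniRDD:
    """Hypothetical single-machine model of lazy RDD transformations."""

    def __init__(self, source: Callable[[], Iterator]):
        # `source` is a zero-argument function that produces a fresh iterator
        self._source = source

    @classmethod
    def parallelize(cls, data: Iterable) -> "MiniRDD":
        items = list(data)
        return cls(lambda: iter(items))

    def map(self, f: Callable) -> "MiniRDD":
        # Nothing is computed here; we only wrap the parent's iterator
        return MiniRDD(lambda: (f(x) for x in self._source()))

    def filter(self, pred: Callable) -> "MiniRDD":
        return MiniRDD(lambda: (x for x in self._source() if pred(x)))

    def flatMap(self, f: Callable) -> "MiniRDD":
        return MiniRDD(lambda: (y for x in self._source() for y in f(x)))

    def collect(self) -> List:
        # The "action": only now does the whole chain of generators run
        return list(self._source())


calls = []


def square(x):
    calls.append(x)  # record when the function actually runs
    return x * x


squared = MiniRDD.parallelize([1, 2, 3, 4]).map(square).filter(lambda x: x > 2)
assert calls == []        # the transformations have not run yet
print(squared.collect())  # [4, 9, 16]
print(calls)              # [1, 2, 3, 4]
```

Calling `collect()` a second time re-runs the whole chain, which mirrors how Spark recomputes an RDD for every action unless it has been cached or persisted.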

### union() - RDD Transformations

`union()` returns a new RDD containing the elements of both RDDs; duplicates are not removed.

In [18]:
inputRDD = sc.textFile("data-sources/data.log")
errorRDD = inputRDD.filter(lambda x: "error" in x.split())
warningsRDD = inputRDD.filter(lambda x: "warning" in x.split())
combinedRDD = errorRDD.union(warningsRDD)

In [19]:
errorRDD.collect()

['03/22 08:51:06 error   :...read_physical_netif: index #3, interface LINK12 has address 9.67.101.1, ifidx 3',
 '03/22 08:51:06 error   :...read_physical_netif: index #5, interface CTCD2 has address 9.67.117.98, ifidx 5']

In [20]:
warningsRDD.collect()



In [21]:
combinedRDD.collect()

['03/22 08:51:06 error   :...read_physical_netif: index #3, interface LINK12 has address 9.67.101.1, ifidx 3',
 '03/22 08:51:06 error   :...read_physical_netif: index #5, interface CTCD2 has address 9.67.117.98, ifidx 5',

### collect() - RDD Actions

`collect()` returns all the elements of the RDD to the driver as a Python list.

In [22]:
RDD_map.collect()

[1, 4, 9, 16]

### take(N) - RDD Actions

`take(N)` returns a list with the first N elements of the RDD.

In [23]:
RDD_map.take(2)

[1, 4]

### first() - RDD Actions

`first()` returns the first element of the RDD.

In [24]:
RDD_map.first()

1

### count() - RDD Actions

`count()` returns the number of elements in the RDD.

In [25]:
RDD_map.count()

4

## Ex. 4 - Map and Collect

One of the main ways to manipulate data in PySpark is `map()`. The `map()` transformation takes a function and applies it to each element of the RDD. It can be used for many things, from fetching the website associated with each URL in a collection to squaring numbers. In this simple exercise, you'll use the `map()` transformation to cube each number in the `numbRDD` RDD. Next, you'll collect all the elements into a variable and print the output.

**Instructions:**

1. Create a `map()` transformation that cubes all of the numbers in `numbRDD`.
2. Collect the results in a `numbers_all` variable.
3. Print the output from `numbers_all` variable.

In [26]:
# Loading the data
numbRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# Create map() transformation to cube numbers
cubedRDD = numbRDD.map(lambda x: x**3)

# Collect the results
numbers_all = cubedRDD.collect()

# Print the numbers from numbers_all
print(numbers_all)

[1, 8, 27, 64, 125, 216, 343, 512, 729, 1000]


## Ex. 5 - Filter and Count
The RDD transformation `filter()` returns a new RDD containing only the elements for which a given function returns `True`. It is useful for filtering large datasets by keyword. In this exercise, you'll keep only the lines containing the keyword `Spark` from the `fileRDD` RDD, which holds the lines of text from the `sample_text.md` file. Next, you'll count the lines containing the keyword `Spark` and print the first 4 lines of the filtered RDD.

**Instructions:**

1. Create a `filter()` transformation to select the lines containing the keyword `Spark`.
2. How many lines in `fileRDD_filter` contain the keyword Spark?
3. Print the first four lines of the resulting RDD.

In [27]:
# Load the data
file_path = 'data-sources/sample_text.md'
fileRDD = sc.textFile(file_path)

# Filter the fileRDD to select lines with Spark keyword
fileRDD_filter = fileRDD.filter(lambda line: 'Spark' in line)

# How many lines are there in fileRDD_filter?
print("The total number of lines with the keyword Spark is", fileRDD_filter.count())

# Print the first four lines of fileRDD_filter
print("\nFirst 4 lines of filtered file:")
for i, line in enumerate(fileRDD_filter.take(4)):
    print(f'({i+1}) - {line}')

The total number of lines with the keyword Spark is 7

First 4 lines of filtered file:
(1) - Examples for Learning Spark
(2) - Examples for the Learning Spark book. These examples require a number of libraries and as such have long build files. We have also added a stand alone example with minimal dependencies and a small build file
(3) - These examples have been updated to run against Spark 1.3 so they may
(4) - be slightly different than the versions in your copy of "Learning Spark".


## Pair RDDs in PySpark

A pair RDD is a special RDD for working with datasets of key/value pairs: each element is a `(key, value)` tuple, where the key is the identifier and the value is the data.

### Creating pair RDDs - From a list of key-value tuples

In [28]:
my_tuple = [('Sam', 23), ('Mary', 34), ('Peter', 25)]
pairRDD_tuple = sc.parallelize(my_tuple)

# Reviewing the created pair RDDs
print(f'''
Data: {pairRDD_tuple.collect()}
Keys: {pairRDD_tuple.keys().collect()}
Values: {pairRDD_tuple.values().collect()}
''')


Data: [('Sam', 23), ('Mary', 34), ('Peter', 25)]
Keys: ['Sam', 'Mary', 'Peter']
Values: [23, 34, 25]



### Creating pair RDDs - From a regular RDD

In [29]:
my_list = ['Sam 23', 'Mary 34', 'Peter 25']
regularRDD = sc.parallelize(my_list)
pairRDD_RDD = regularRDD.map(lambda s: (s.split(' ')[0], s.split(' ')[1]))

# Reviewing the created pair RDDs
print(f'''
Data: {pairRDD_RDD.collect()}
Keys: {pairRDD_RDD.keys().collect()}
Values: {pairRDD_RDD.values().collect()}
''')


Data: [('Sam', '23'), ('Mary', '34'), ('Peter', '25')]
Keys: ['Sam', 'Mary', 'Peter']
Values: ['23', '34', '25']
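Because `split()` returns strings, the values in `pairRDD_RDD` are `'23'`, not `23`. A small parsing function (the name `to_pair` is hypothetical) that converts the age to `int` avoids this; since it is plain Python, it can be checked locally before passing it to `map()`.

```python
from typing import Tuple


def to_pair(line: str) -> Tuple[str, int]:
    """Parse 'Name Age' into a (name, age) tuple with an integer age."""
    # Split once from the right so names that contain spaces stay intact
    name, age = line.rsplit(" ", 1)
    return name, int(age)


print([to_pair(s) for s in ['Sam 23', 'Mary 34', 'Peter 25']])
# [('Sam', 23), ('Mary', 34), ('Peter', 25)]

# In the notebook (assumes the `regularRDD` defined above):
# pairRDD_typed = regularRDD.map(to_pair)
```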



### reduceByKey(func) - Transformations on pair RDDs

- The `reduceByKey()` transformation combines the values that share the same key.
- It runs parallel operations for each key in the dataset.
- It is a transformation, not an action.

In [30]:
regularRDD = sc.parallelize([("Messi", 23), ("Ronaldo", 34), ("Neymar", 22), 
                             ("Messi", 24), ("Messi", 100), ])

pairRDD_reducebykey = regularRDD.reduceByKey(lambda x, y : x + y)
pairRDD_reducebykey.collect()

[('Neymar', 22), ('Ronaldo', 34), ('Messi', 147)]
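`reduceByKey()` first combines the values for each key inside every partition and only then shuffles those partial results and merges them across partitions. The plain-Python sketch below models those two phases; the split of the data into two partitions is made up for illustration, and the helper names are hypothetical. It reproduces the `Messi` total of 147.

```python
from typing import Callable, Dict, Hashable, List, Tuple

Pair = Tuple[Hashable, int]


def local_combine(partition: List[Pair], func: Callable) -> Dict:
    """Phase 1: merge values per key inside a single partition."""
    acc: Dict = {}
    for key, value in partition:
        acc[key] = func(acc[key], value) if key in acc else value
    return acc


def reduce_by_key(partitions: List[List[Pair]], func: Callable) -> Dict:
    """Phase 2: merge the per-partition partial results by key."""
    partials = [local_combine(p, func) for p in partitions]
    merged: Dict = {}
    for partial in partials:
        for key, value in partial.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged


# Hypothetical split of the notebook's data into two partitions
partitions = [
    [("Messi", 23), ("Ronaldo", 34)],
    [("Neymar", 22), ("Messi", 24), ("Messi", 100)],
]
print(reduce_by_key(partitions, lambda x, y: x + y))
# {'Messi': 147, 'Ronaldo': 34, 'Neymar': 22}
```

Combining locally first means only one partial value per key and partition crosses the network, which is why `reduceByKey()` is usually cheaper than grouping all values before aggregating.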

### sortByKey() - Transformations on pair RDDs

- `sortByKey()` orders a pair RDD by key.
- It returns an RDD sorted by key in ascending (default) or descending order.

Below, keys and values are swapped first so that the RDD is sorted by the totals.

In [31]:
pairRDD_reducebykey_rev = pairRDD_reducebykey.map(lambda x: (x[1], x[0]))
pairRDD_reducebykey_rev.sortByKey(ascending=False).collect()

[(147, 'Messi'), (34, 'Ronaldo'), (22, 'Neymar')]

### groupByKey() - Transformations on pair RDDs

- `groupByKey()` groups all the values with the same key in the pair RDD into one iterable per key.

In [32]:
airports = [("US", "JFK"),("UK", "LHR"),("FR", "CDG"),("US", "SFO")]
regularRDD = sc.parallelize(airports)

pairRDD_group = regularRDD.groupByKey().collect()
for cont, air in pairRDD_group:
    print(cont, list(air))

UK ['LHR']
US ['JFK', 'SFO']
FR ['CDG']
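`groupByKey()` gathers every value for a key into one iterable, so all values are moved during the shuffle. When the goal is an aggregate, `reduceByKey()` gives the same answer while moving less data. The plain-Python sketch below models `groupByKey()` with a `defaultdict` (the function name is hypothetical) and checks that grouping and then summing matches the `reduceByKey()` totals shown earlier.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, List, Tuple


def group_by_key(pairs: Iterable[Tuple[Hashable, object]]) -> Dict[Hashable, List]:
    """Model of groupByKey(): map each key to the list of all its values."""
    groups: Dict[Hashable, List] = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


airports = [("US", "JFK"), ("UK", "LHR"), ("FR", "CDG"), ("US", "SFO")]
print(group_by_key(airports))
# {'US': ['JFK', 'SFO'], 'UK': ['LHR'], 'FR': ['CDG']}

# Grouping and then summing gives the same totals as reduceByKey(x + y)
scores = [("Messi", 23), ("Ronaldo", 34), ("Neymar", 22), ("Messi", 24), ("Messi", 100)]
totals = {key: sum(values) for key, values in group_by_key(scores).items()}
print(totals)  # {'Messi': 147, 'Ronaldo': 34, 'Neymar': 22}
```

In PySpark the grouped version would be `regularRDD.groupByKey().mapValues(sum)`; `reduceByKey(lambda x, y: x + y)` is generally preferred because it combines values before the shuffle.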


### join() - Transformations on pair RDDs

- The `join()` transformation joins two pair RDDs on their keys, returning `(key, (value1, value2))` for each key present in both RDDs.

In [33]:
RDD1 = sc.parallelize([("Messi", 34),("Ronaldo", 32),("Neymar", 24)])
RDD2 = sc.parallelize([("Ronaldo", 80),("Neymar", 120),("Messi", 100)])

RDD1.join(RDD2).collect()

[('Neymar', (24, 120)), ('Messi', (34, 100)), ('Ronaldo', (32, 80))]
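`join()` is an inner join: a key appears in the result only if it exists in both RDDs, and a key that occurs several times on either side produces every combination of its values. The plain-Python model below (hypothetical function name, not Spark's distributed implementation) shows those semantics; an extra `("Mbappe", 90)` entry, which is not in the notebook's data, is added to show an unmatched key being dropped.

```python
from collections import defaultdict
from typing import Dict, Hashable, List, Tuple


def inner_join(left: List[Tuple], right: List[Tuple]) -> List[Tuple]:
    """Model of RDD.join(): (key, (left_value, right_value)) for matching keys."""
    right_index: Dict[Hashable, List] = defaultdict(list)
    for key, value in right:
        right_index[key].append(value)
    # One output pair per (left value, right value) combination of a shared key
    return [(key, (lv, rv))
            for key, lv in left
            for rv in right_index.get(key, [])]


rdd1 = [("Messi", 34), ("Ronaldo", 32), ("Neymar", 24)]
rdd2 = [("Ronaldo", 80), ("Neymar", 120), ("Messi", 100), ("Mbappe", 90)]
print(inner_join(rdd1, rdd2))
# [('Messi', (34, 100)), ('Ronaldo', (32, 80)), ('Neymar', (24, 120))]
```

The order of this model's output follows the left list; Spark makes no such guarantee, which is why the notebook's output lists `Neymar` first.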

## Ex. 6 - ReduceByKey and Collect

One of the most popular pair RDD transformations is `reduceByKey()`, which operates on key/value `(k, v)` pairs and merges the values for each key. In this exercise, you'll first create a pair RDD from a list of tuples, then combine the values with the same key and finally print out the result.

**Instructions:**

1. Create a pair RDD named `Rdd` with the tuples `(1,2),(3,4),(3,6),(4,5)`.
2. Transform the `Rdd` with `reduceByKey()` into a pair RDD `Rdd_Reduced` by adding the values with the same key.
3. Collect the contents of pair RDD `Rdd_Reduced` and iterate to print the output.

In [34]:
# Create PairRDD Rdd with key value pairs
Rdd = sc.parallelize([(1,2),(3,4),(3,6),(4,5)])
Rdd.collect()

[(1, 2), (3, 4), (3, 6), (4, 5)]

In [35]:
# Apply reduceByKey() operation on Rdd
Rdd_Reduced = Rdd.reduceByKey(lambda x, y: x + y)
Rdd_Reduced.collect()

[(1, 2), (3, 10), (4, 5)]

In [36]:
# Iterate over the result and print the output
for num in Rdd_Reduced.collect(): 
  print("Key {} has {} Counts".format(num[0], num[1]))

Key 1 has 2 Counts
Key 3 has 10 Counts
Key 4 has 5 Counts


## Ex. 7 - SortByKey and Collect

It is often useful to sort a pair RDD by key (for example, in the word count you'll see later in this track). In this exercise, you'll sort the pair RDD `Rdd_Reduced` that you created in the previous exercise in descending order and print the final output.

**Instructions:**

1. Sort the `Rdd_Reduced` RDD using the key in descending order.
2. Collect the contents and iterate to print the output.

In [37]:
# Reviewing the data to sort
Rdd_Reduced.collect()

[(1, 2), (3, 10), (4, 5)]

In [38]:
# Sort the reduced RDD with the key by descending order
Rdd_Reduced_Sort = Rdd_Reduced.sortByKey(ascending=False)
Rdd_Reduced_Sort.collect()

[(4, 5), (3, 10), (1, 2)]

In [39]:
# Iterate over the result and retrieve all the elements of the RDD
for num in Rdd_Reduced_Sort.collect():
  print("Key {} has {} Counts".format(num[0], num[1]))

Key 4 has 5 Counts
Key 3 has 10 Counts
Key 1 has 2 Counts


## Advanced RDD Actions

### reduce() action

- The `reduce(func)` action aggregates the elements of a regular RDD into a single value.
- The function should be commutative (changing the order of the operands does not change the result) and associative (changing how the operands are grouped does not change the result).
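Spark reduces each partition separately and then combines the partial results, so the answer is only well defined when `func` is commutative and associative. The plain-Python sketch below simulates this with two hypothetical partition layouts of the notebook's list `[1, 3, 4, 6]`: addition gives 14 for both, while subtraction, which is neither commutative nor associative, gives a different result for each layout.

```python
from functools import reduce
from operator import add, sub
from typing import Callable, List


def partitioned_reduce(partitions: List[List[int]], func: Callable) -> int:
    """Reduce each partition, then reduce the partial results, Spark-style."""
    partials = [reduce(func, p) for p in partitions if p]
    return reduce(func, partials)


layout_a = [[1, 3], [4, 6]]    # hypothetical: two partitions of two elements
layout_b = [[1], [3, 4, 6]]    # hypothetical: an uneven split of the same data

print(partitioned_reduce(layout_a, add), partitioned_reduce(layout_b, add))  # 14 14
print(partitioned_reduce(layout_a, sub), partitioned_reduce(layout_b, sub))  # 0 8
```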

In [40]:
x = [1,3,4,6]
RDD = sc.parallelize(x)
RDD.reduce(lambda x, y : x + y)

14

### saveAsTextFile() action

- The `saveAsTextFile()` action saves an RDD as text files inside a directory, with each partition written to a separate file.
- The `coalesce()` method can be used to reduce the RDD to a single partition first, so that it is saved as a single text file.

In [41]:
# Remove previously saved output: saveAsTextFile() fails if the target directory already exists.
output_file_path = "spark-warehouse/tempSampleText"
output_single_file_path = "spark-warehouse/tempSampleText_single"

try:
    shutil.rmtree(output_file_path)
except OSError as e:
    print("Error: %s - %s." % (e.filename, e.strerror))

try:
    shutil.rmtree(output_single_file_path)
except OSError as e:
    print("Error: %s - %s." % (e.filename, e.strerror))

In [42]:
# Load the data
file_path = 'data-sources/sample_text.md'
fileRDD = sc.textFile(file_path)

fileRDD.saveAsTextFile(output_file_path)
glob.glob(f'{output_file_path}/*')

['spark-warehouse/tempSampleText\\part-00000',
 'spark-warehouse/tempSampleText\\part-00001',
 'spark-warehouse/tempSampleText\\_SUCCESS']

In [43]:
fileRDD.coalesce(1).saveAsTextFile(output_single_file_path)
glob.glob(f'{output_single_file_path}/*')

['spark-warehouse/tempSampleText_single\\part-00000',
 'spark-warehouse/tempSampleText_single\\_SUCCESS']
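Inside Spark, `sc.textFile(output_file_path)` reads the whole output directory back as one dataset. Outside Spark, the same lines can be reassembled by concatenating the `part-*` files in name order. The helper below (hypothetical name `read_spark_text_output`) is a plain-Python sketch of that; the demo writes fake part files into a temporary directory instead of relying on a real Spark run.

```python
import glob
import os
import tempfile
from typing import List


def read_spark_text_output(directory: str) -> List[str]:
    """Concatenate the lines of every part-NNNNN file, in partition order."""
    lines: List[str] = []
    for path in sorted(glob.glob(os.path.join(directory, "part-*"))):
        with open(path, encoding="utf-8") as handle:
            lines.extend(handle.read().splitlines())
    return lines


with tempfile.TemporaryDirectory() as out_dir:
    # Simulate a two-partition saveAsTextFile() result plus its _SUCCESS marker
    for name, text in [("part-00000", "line 1\nline 2\n"),
                       ("part-00001", "line 3\n"),
                       ("_SUCCESS", "")]:
        with open(os.path.join(out_dir, name), "w", encoding="utf-8") as handle:
            handle.write(text)
    print(read_spark_text_output(out_dir))  # ['line 1', 'line 2', 'line 3']
```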

### countByKey() - Action Operations on pair RDDs

- `countByKey()` is only available for RDDs of type `(K, V)`.
- The `countByKey()` action counts the number of elements for each key and returns the result as a dictionary.

In [44]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

for key, count in rdd.countByKey().items():
    print(key, count)

a 2
b 1
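Because `countByKey()` is an action that returns a dictionary to the driver, it should only be used when the number of distinct keys is small. Note that it counts how many elements each key has and ignores the values, which is different from summing the values with `reduceByKey()`. The plain-Python models below contrast the two on the same tuples used for `Rdd` in Ex. 6.

```python
from collections import Counter

rdd_data = [(1, 2), (3, 4), (3, 6), (4, 5)]  # same tuples as `Rdd`

# Model of countByKey(): how many elements each key has (values are ignored)
counts = dict(Counter(key for key, _ in rdd_data))

# Model of reduceByKey(lambda x, y: x + y): the values are summed instead
sums = {}
for key, value in rdd_data:
    sums[key] = sums.get(key, 0) + value

print(counts)  # {1: 1, 3: 2, 4: 1}
print(sums)    # {1: 2, 3: 10, 4: 5}
```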


### collectAsMap() - Action Operations on pair RDDs

- `collectAsMap()` returns the key-value pairs in the RDD as a dictionary.

In [45]:
sc.parallelize([(1, 2), (3, 4)]).collectAsMap()

{1: 2, 3: 4}
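Because the result is a dictionary, a key that occurs more than once keeps only one of its values. PySpark's `collectAsMap()` essentially builds `dict(rdd.collect())`, so the later pair wins. The plain-Python model below uses hypothetical data with a repeated key to show the effect and one way to keep every value.

```python
pairs = [(1, 2), (3, 4), (1, 99)]  # key 1 appears twice (hypothetical data)

# Model of collectAsMap(): later pairs overwrite earlier ones with the same key
as_map = dict(pairs)
print(as_map)  # {1: 99, 3: 4}

# To keep every value, group first (in Spark: groupByKey().mapValues(list).collectAsMap())
grouped = {}
for key, value in pairs:
    grouped.setdefault(key, []).append(value)
print(grouped)  # {1: [2, 99], 3: [4]}
```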

## Ex. 8 - CountByKey

For many datasets, it is important to count how many times each key occurs in a key/value dataset: for example, counting how many times a product was sold in each country, or showing the most popular baby names. In this simple exercise, you'll use the `Rdd` that you created earlier and count the number of elements for each unique key in that pair RDD.

**Instructions:**

1. Count the elements of `Rdd` by key with `countByKey()` and assign the result to a variable `total`.
2. What is the type of `total`?
3. Iterate over the `total` and print the keys and their counts.

In [46]:
# Create PairRDD Rdd with key value pairs
Rdd = sc.parallelize([(1,2),(3,4),(3,6),(4,5)])
Rdd.collect()

[(1, 2), (3, 4), (3, 6), (4, 5)]

In [47]:
# Count the number of elements for each key
total = Rdd.countByKey()

# What is the type of total?
print("The type of total is", type(total))

# Iterate over the total and print the output
for k, v in total.items(): 
  print("key", k, "has", v, "counts")

The type of total is <class 'collections.defaultdict'>
key 1 has 1 counts
key 3 has 2 counts
key 4 has 1 counts


## Ex. 9 - Create a base RDD and transform it

The volume of unstructured data (log lines, images, binary files) in existence is growing dramatically, and PySpark is an excellent framework for analyzing this type of data through RDDs. In this three-part exercise, you will write code that calculates the most common words in the [Complete Works of William Shakespeare](data-sources/Complete_Shakespeare.txt).

Here are the brief steps for writing the word counting program:

- Create a base RDD from `Complete_Shakespeare.txt` file.
- Use RDD transformation to create a long list of words from each element of the base RDD.
- Remove stop words from your data.
- Create a pair RDD where each element is a tuple `(w, 1)`, with `w` a word.
- Group the elements of the pair RDD by key (word) and add up their values.
- Swap the keys (words) and values (counts) so that the key is the count and the value is the word.
- Finally, sort the RDD in descending order and print the 10 most frequent words and their frequencies.
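Before running these steps on Spark, it can help to see the whole pipeline on a tiny in-memory text. The sketch below mirrors each step with plain Python; the sample lines and the short stop-word list are made up for illustration. On an RDD, the same steps use `flatMap`, `filter`, `map`, `reduceByKey`, and `map` plus `sortByKey`.

```python
from collections import defaultdict

lines = ["The quick brown fox", "the lazy dog and the fox"]  # hypothetical input
demo_stop_words = {"the", "and", "a"}  # hypothetical, much shorter than the real list

# flatMap: split each line into words
words = [w for line in lines for w in line.split()]
# filter: drop stop words (compared in lowercase)
words = [w for w in words if w.lower() not in demo_stop_words]
# map: pair each word with 1
pairs = [(w, 1) for w in words]
# reduceByKey: add up the 1s for each word
counts = defaultdict(int)
for word, one in pairs:
    counts[word] += one
# map + sortByKey(ascending=False): swap to (count, word) and sort by count
ranked = sorted(((c, w) for w, c in counts.items()), key=lambda cw: cw[0], reverse=True)
print(ranked[:3])  # [(2, 'fox'), (1, 'quick'), (1, 'brown')]
```

Python's `sorted()` keeps ties in their original order; Spark does not guarantee any particular order among words with equal counts.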

In this first exercise, you'll create a base RDD from `Complete_Shakespeare.txt` file and transform it to create a long list of words.

**Instructions:**

1. Create an RDD called `baseRDD` that reads lines from `file_path`.
2. Transform `baseRDD` into a long list of words and create a new `splitRDD`.
3. Count the total number of words in `splitRDD`.

In [48]:
# Create a baseRDD from the file path
file_path = 'data-sources/Complete_Shakespeare.txt'
baseRDD = sc.textFile(file_path)

# Split the lines of baseRDD into words
splitRDD = baseRDD.flatMap(lambda x: x.split())

# Count the total number of words
print("Total number of words in splitRDD:", splitRDD.count())

Total number of words in splitRDD: 128576


## Ex. 10 - Remove stop words and reduce the dataset

In this exercise you'll remove stop words from your data. Stop words are common words that are often uninteresting, for example, "I", "the", "a" etc. You can remove many obvious stop words with a list of your own. But for this exercise, you will just remove the stop words from a curated list `stop_words` provided to you in your environment.

After removing stop words, you'll create a pair RDD where each element is a tuple `(k, v)`, with `k` the key and `v` the value. Here, each element is `(w, 1)`, where `w` is a word from the RDD and `1` is its initial count. Finally, you'll combine the values with the same key to count the number of occurrences of each word.

**Instructions:**

1. Remove from `splitRDD` the words that appear in the curated `stop_words` list, comparing each word in lowercase. Think carefully about which function to use here.
2. Create a pair RDD tuple containing the word and the number 1 from each word element in `splitRDD`.
3. Get the count of the number of occurrences of each word (word frequency) in the pair RDD. Use a transformation which operates on key, value `(k,v)` pairs. Think carefully about which function to use here.

In [49]:
stop_words = ['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 
              'you', 'your', 'yours', 'yourself', 'yourselves', 'he', 
              'him', 'his', 'himself', 'she', 'her', 'hers', 'herself', 
              'it', 'its', 'itself', 'they', 'them', 'their', 'theirs', 
              'themselves', 'what', 'which', 'who', 'whom', 'this', 'that', 
              'these', 'those', 'am', 'is', 'are', 'was', 'were', 'be', 
              'been', 'being', 'have', 'has', 'had', 'having', 'do', 'does', 
              'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if', 'or', 
              'because', 'as', 'until', 'while', 'of', 'at', 'by', 'for', 
              'with', 'about', 'against', 'between', 'into', 'through', 
              'during', 'before', 'after', 'above', 'below', 'to', 'from', 
              'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under', 
              'again', 'further', 'then', 'once', 'here', 'there', 'when', 
              'where', 'why', 'how', 'all', 'any', 'both', 'each', 'few', 
              'more', 'most', 'other', 'some', 'such', 'no', 'nor', 'not', 
              'only', 'own', 'same', 'so', 'than', 'too', 'very', 'can', 
              'will', 'just', 'don', 'should', 'now']

In [50]:
# Remove stop words, comparing each word in lowercase (the kept words retain their original case)
splitRDD_no_stop = splitRDD.filter(lambda x: x.lower() not in stop_words)

print(f'''
Total words in text: {splitRDD.count()}
Total words after removing stop words: {splitRDD_no_stop.count()}
''')


Total words in text: 128576
Total words after removing stop words: 73305



In [51]:
# Create a tuple of the word and 1 
splitRDD_no_stop_words = splitRDD_no_stop.map(lambda w: (w, 1))

print(f'''
First 5 elements:
{splitRDD_no_stop_words.take(5)}
''')


First 5 elements:
[('Project', 1), ('Gutenberg', 1), ('EBook', 1), ('Complete', 1), ('Works', 1)]



In [52]:
# Count the number of occurrences of each word
resultRDD = splitRDD_no_stop_words.reduceByKey(lambda x, y: x + y)

print(f'''
First 5 elements:
{resultRDD.take(5)}
''')


First 5 elements:
[('Gutenberg', 7), ('EBook', 1), ('Complete', 3), ('Works', 3), ('Shakespeare', 12)]



## Ex. 11 - Print word frequencies

After combining the values (counts) with the same key (word), in this exercise you'll return the first 10 word frequencies. You could retrieve all the elements at once with `collect()`, but this is not recommended: `collect()` brings every element back to the driver, and a large RDD can exhaust the driver's memory and crash it.

What if we want to return the top 10 words? First, you'll need to swap the keys (words) and values (counts) so that the key is the count and the value is the word. Right now, `resultRDD` has the key as element 0 and the value as element 1. Because `sortByKey()` sorts by key, swapping first lets you sort the pair RDD by count. Finally, you'll return the top 10 words based on their frequencies from the sorted RDD.
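Sorting the whole RDD only to read 10 elements is more work than needed. PySpark's `RDD.takeOrdered(num, key=...)` and `RDD.top(num, key=...)` return just the first `num` elements, for example `resultRDD.takeOrdered(10, key=lambda wc: -wc[1])`, without swapping keys and values. As a local analogue, `heapq.nlargest` keeps only the best `n` candidates while scanning; the pairs below are a few values copied from this exercise's output.

```python
import heapq

# A few (word, count) pairs taken from the notebook output
word_counts = [("Gutenberg", 7), ("use", 38), ("thou", 650), ("thy", 574),
               ("almost", 25), ("shall", 393), ("cost", 8)]

# Keep the 3 pairs with the largest count, comparing on element 1
top3 = heapq.nlargest(3, word_counts, key=lambda wc: wc[1])
for word, count in top3:
    print(f"{word}: {count} times")
# thou: 650 times
# thy: 574 times
# shall: 393 times
```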

**Instructions:**

1. Print the first 10 words and their frequencies from the `resultRDD` RDD.
2. Swap the keys and values in the `resultRDD`.
3. Sort the keys according to descending order.
4. Print the top 10 most frequent words and their frequencies from the sorted RDD.

In [53]:
# Display the first 10 words and their frequencies from the input RDD
for word in resultRDD.take(10):
    print(word)

('Gutenberg', 7)
('EBook', 1)
('Complete', 3)
('Works', 3)
('Shakespeare', 12)
('eBook', 2)
('use', 38)
('anyone', 1)
('cost', 8)
('almost', 25)


In [54]:
# Swap the keys and values from the input RDD
resultRDD_swap = resultRDD.map(lambda x: (x[1], x[0]))
resultRDD_swap.take(10)

[(7, 'Gutenberg'),
 (1, 'EBook'),
 (3, 'Complete'),
 (3, 'Works'),
 (12, 'Shakespeare'),
 (2, 'eBook'),
 (38, 'use'),
 (1, 'anyone'),
 (8, 'cost'),
 (25, 'almost')]

In [55]:
# Sort the keys in descending order
resultRDD_swap_sort = resultRDD_swap.sortByKey(ascending=False)
resultRDD_swap_sort.take(10)

[(650, 'thou'),
 (574, 'thy'),
 (393, 'shall'),
 (311, 'would'),
 (295, 'good'),
 (286, 'thee'),
 (273, 'love'),
 (269, 'Enter'),
 (254, "th'"),
 (225, 'make')]

In [56]:
# Show the top 10 most frequent words and their frequencies from the sorted RDD
for word in resultRDD_swap_sort.take(10):
    print("{}: {} times".format(word[1], word[0]))

thou: 650 times
thy: 574 times
shall: 393 times
would: 311 times
good: 295 times
thee: 286 times
love: 273 times
Enter: 269 times
th': 254 times
make: 225 times


## Close

In [57]:
spark.stop()