### Interactive Use of PySpark
- Spark comes with an interactive Python shell, the PySpark shell, in which PySpark is already available. The shell is useful for basic testing and debugging, and the easiest way to see what it can do is to start using it.
- In the PySpark shell we do not create a SparkContext object ourselves, because the shell automatically creates one named sc by default. In a Jupyter notebook like this one, no sc exists yet, so the cells below locate the Spark installation with findspark and create the SparkContext explicitly.

In [1]:
# Optional: only needed when the driver must go through a proxy (placeholder host names)
# import os
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--master mymaster --total-executor-cores 2 --conf "spark.driver.extraJavaOptions=-Dhttp.proxyHost=proxy.mycorp.com -Dhttp.proxyPort=1234 -Dhttp.nonProxyHosts=localhost|.mycorp.com|127.0.0.1 -Dhttps.proxyHost=proxy.mycorp.com -Dhttps.proxyPort=1234 -Dhttps.nonProxyHosts=localhost|.mycorp.com|127.0.0.1" pyspark-shell'

In [1]:
import findspark
# Point findspark at the local Spark installation, then import PySpark
findspark.init('C:/spark/spark-3.0.0-bin-hadoop2.7/')
import pyspark
findspark.find()

'C:/spark/spark-3.0.0-bin-hadoop2.7/'

In [2]:
from pyspark import SparkContext
sc = SparkContext("local", "First App")

In [11]:
sc.version

'3.0.0'

In [12]:
sc.pythonVer

'3.7'

In [13]:
sc.master

'local'

In [10]:
# Create a Python range of the numbers from 1 to 100
numb = range(1, 101)

# Distribute the range as an RDD
spark_data = sc.parallelize(numb)
spark_data

PythonRDD[2] at RDD at PythonRDD.scala:53

### Loading data in PySpark shell
- In PySpark, we express our computation through operations on distributed collections that are automatically parallelized across the cluster. 

In [16]:
lines = sc.textFile('data/Complete.txt')

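- SparkContext's textFile() method reads a text file and returns an RDD in which each element is one line of the file, which makes it a convenient way to create distributed collections of unstructured data. The call is lazy: the file is not actually read until an action (such as count()) needs the data.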
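Spark's laziness can be mimicked in plain Python with a generator. The sketch below is only an analogy (no Spark involved), and no/such/file.txt is a hypothetical path assumed not to exist: creating the generator succeeds, and the error only appears when the first element is requested. A misspelled path passed to sc.textFile() typically behaves the same way, failing at the first action rather than at the textFile() call.

```python
def lazy_text_file(path):
    """Yield the lines of `path`; the file is opened only when iteration starts."""
    with open(path, encoding="utf-8") as handle:
        for line in handle:
            yield line.rstrip("\n")

# Calling the generator function runs none of its body, so no error yet
lines_gen = lazy_text_file("no/such/file.txt")

# Requesting the first element (our stand-in for an action) triggers the read
error_name = None
try:
    next(lines_gen)
except FileNotFoundError as err:
    error_name = type(err).__name__

print("Failed only when consumed:", error_name)  # Failed only when consumed: FileNotFoundError
```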

### Use of lambda() with map()
- In Python 3, the built-in map() function applies a given function to each item of an iterable (list, tuple, etc.) and returns an iterator over the results, which is why the example below wraps it in list(). The general syntax is map(fun, iter). We can also pass a lambda function to map(); the general syntax is then map(lambda <argument>: <expression>, iter).

In [18]:
my_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# Print my_list in the console
print("Input list is", my_list)

# Square all numbers in my_list
squared_list_lambda = list(map(lambda x: x**2, my_list))

# Print the result of the map function
print("The squared numbers are", squared_list_lambda)

Input list is [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
The squared numbers are [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
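A short plain-Python check of how map() behaves in Python 3: it returns a lazy map object that can be consumed only once, and a list comprehension gives the same result. The input here is a shorter, illustrative list.

```python
my_list = [1, 2, 3, 4, 5]

# In Python 3, map() returns a lazy map object, not a list
squares_iter = map(lambda x: x**2, my_list)
print(type(squares_iter).__name__)  # map

# list() consumes the iterator and materializes the results
squares = list(squares_iter)
print(squares)                      # [1, 4, 9, 16, 25]

# The iterator is now exhausted, so consuming it again yields nothing
print(list(squares_iter))           # []

# An equivalent list comprehension
squares_comp = [x**2 for x in my_list]
print(squares_comp == squares)      # True
```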


### Use of lambda() with filter()
- The filter() function takes a function and an iterable as arguments and returns an iterator over the items for which the function returns a true value. The general syntax is filter(function, iterable). Like map(), filter() is often used with a lambda function; the general syntax is then filter(lambda <argument>: <expression>, iterable).

In [20]:
my_list2 = [10, 21, 31, 40, 51, 60, 72, 80, 93, 101]

# Print my_list2 in the console
print("Input list is:", my_list2)

# Filter numbers divisible by 10
filtered_list = list(filter(lambda x: (x%10 == 0), my_list2))

# Print the numbers divisible by 10
print("Numbers divisible by 10 are:", filtered_list)

Input list is: [10, 21, 31, 40, 51, 60, 72, 80, 93, 101]
Numbers divisible by 10 are: [10, 40, 60, 80]
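filter() keeps an item whenever the function's result is truthy, not only when it is literally True. This plain-Python sketch, on a shorter illustrative list, shows that rule, the special case of passing None as the function, and the equivalent list comprehension.

```python
my_list2 = [10, 21, 31, 40, 51]

# Like map(), filter() returns a lazy iterator in Python 3; list() forces it
div_by_10 = list(filter(lambda x: x % 10 == 0, my_list2))
print(div_by_10)      # [10, 40]

# x % 10 is 0 (falsy) for multiples of 10, so this keeps the OTHER numbers
not_div_by_10 = list(filter(lambda x: x % 10, my_list2))
print(not_div_by_10)  # [21, 31, 51]

# Passing None instead of a function keeps only the truthy items
truthy = list(filter(None, [0, 1, "", "a", None, []]))
print(truthy)         # [1, 'a']

# The equivalent list comprehension for the divisible-by-10 case
print(div_by_10 == [x for x in my_list2 if x % 10 == 0])  # True
```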


### RDDs from Parallelized collections
- A Resilient Distributed Dataset (RDD) is the basic abstraction in Spark: an immutable, distributed collection of objects. Since the RDD is Spark's fundamental data type, it is important to understand how to create one. The simplest way is SparkContext's parallelize() method, which distributes an existing Python collection.

In [21]:
# Create an RDD from a list of words
RDD = sc.parallelize(["Spark", "is", "a", "framework", "for", "Big Data processing"])

# Print out the type of the created object
print("The type of RDD is", type(RDD))

The type of RDD is <class 'pyspark.rdd.RDD'>
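To make immutability and laziness concrete, here is a deliberately tiny, single-machine toy. ToyRDD is a hypothetical class, not part of PySpark, and it models only two properties: each transformation returns a new object that records the step (a simple lineage) instead of changing the original, and nothing runs until the collect() action. Real RDDs additionally split the data into partitions spread across executors and rebuild a lost partition by replaying its lineage, which is where "Resilient" comes from.

```python
class ToyRDD:
    """Single-machine toy mimicking RDD immutability and lazy transformations."""

    def __init__(self, source, ops=()):
        self._source = list(source)  # the original data, never modified
        self._ops = tuple(ops)       # recorded transformations (the "lineage")

    def map(self, func):
        # Returns a NEW ToyRDD; self is left unchanged
        return ToyRDD(self._source, self._ops + (("map", func),))

    def filter(self, pred):
        return ToyRDD(self._source, self._ops + (("filter", pred),))

    def collect(self):
        # Only an action actually runs the recorded transformations
        items = iter(self._source)
        for kind, func in self._ops:
            items = map(func, items) if kind == "map" else filter(func, items)
        return list(items)


words = ToyRDD(["Spark", "is", "a", "framework"])
upper = words.map(str.upper)                     # nothing computed yet
long_upper = upper.filter(lambda w: len(w) > 2)  # still nothing computed

print(long_upper.collect())  # ['SPARK', 'FRAMEWORK']
print(words.collect())       # ['Spark', 'is', 'a', 'framework'] (unchanged)
```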


### RDDs from External Datasets
- PySpark can create RDDs from files in external storage such as HDFS (Hadoop Distributed File System) or Amazon S3 buckets. When experimenting locally, the most convenient source is a file on your local file system. textFile() takes a file path and reads the file as a collection of lines.

In [22]:
# Create a fileRDD from file_path
fileRDD = sc.textFile('data/Complete.txt')

# Check the type of fileRDD
print("The file type of fileRDD is", type(fileRDD))

The file type of fileRDD is <class 'pyspark.rdd.RDD'>


### Partitions in your data
- SparkContext's textFile() method takes an optional second argument, minPartitions, that specifies the minimum number of partitions. We'll create an RDD named fileRDD_part with 5 partitions and compare it with the fileRDD created above.

In [23]:
# Check the number of partitions in fileRDD
print("Number of partitions in fileRDD is", fileRDD.getNumPartitions())

# Create a fileRDD_part from file_path with 5 partitions
fileRDD_part = sc.textFile('data/Complete.txt', minPartitions = 5)

# Check the number of partitions in fileRDD_part
print("Number of partitions in fileRDD_part is", fileRDD_part.getNumPartitions())

Number of partitions in fileRDD is 1
Number of partitions in fileRDD_part is 5


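- More partitions let Spark process more pieces of the data in parallel (one task per partition), which can speed up a job on a multi-core machine or a cluster. Too many small partitions, however, add scheduling overhead, so more partitions are not always faster.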
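A plain-Python illustration of why partitions help: split a collection into contiguous chunks, compute a partial result for each chunk independently (the work Spark would hand to separate tasks), then combine the small partial results. split_into_partitions is a hypothetical helper; Spark's real split points differ (textFile() splits files by byte ranges, and minPartitions is a suggested minimum rather than an exact count). In PySpark, rdd.glom().collect() shows the actual contents of each partition.

```python
def split_into_partitions(items, num_partitions):
    """Split a list into num_partitions contiguous, nearly equal slices."""
    n = len(items)
    return [items[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]

data = list(range(1, 11))
parts = split_into_partitions(data, 3)
print(parts)              # [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]

# Each partition can be summed independently (in Spark, by separate tasks)...
partial_sums = [sum(p) for p in parts]
print(partial_sums)       # [6, 15, 34]

# ...and only the small partial results are combined at the end
print(sum(partial_sums))  # 55
```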

### Map and Collect
- One of the main ways to manipulate data in PySpark is map(). The map() transformation takes a function and applies it to each element in the RDD. It can be used for anything from fetching the website associated with each URL in a collection to simply squaring numbers.

In [26]:
numbRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# Create map() transformation to cube numbers
cubedRDD = numbRDD.map(lambda x: x*x*x)

# Collect the results
numbers_all = cubedRDD.collect()

# Print the numbers from numbers_all
for numb in numbers_all:
    print(numb)

1
8
27
64
125
216
343
512
729
1000


- collect() returns every element of the RDD to the driver program, so it should only be used when the result is small enough to fit in the driver's memory. Avoid it on large datasets.

### Filter and Count
- The RDD transformation filter() returns a new RDD containing only the elements for which a given function returns true. It is useful for filtering large datasets based on a keyword.
- Below, we keep only the lines containing the keyword Spark from fileRDD, which this time holds the lines of the README.md file.
- Next, we'll count the total number of lines containing the keyword Spark and finally print the first 4 lines of the filtered RDD.

In [27]:
filepath = 'README.md'

fileRDD = sc.textFile(filepath)

# Filter the fileRDD to select lines with Spark keyword
fileRDD_filter = fileRDD.filter(lambda line: 'Spark' in line.split())

# How many lines are there in fileRDD_filter?
print("The total number of lines with the keyword Spark is", fileRDD_filter.count())

# Print the first four lines of fileRDD_filter
for line in fileRDD_filter.take(4):
    print(line)


The total number of lines with the keyword Spark is 12
Spark is a tool for doing parallel computation with large datasets and it integrates well with Python.
## Spark
- Spark is a platform for cluster computing. Spark lets us spread data and computations over clusters with multiple nodes(think of each node as a seperate computer).Splitting up the data makes it easier to work with very large datasets because each node only works with small amount of data.
- However, with greater computing power comes greater complexity. Deciding whether or not Spark is the best solution for your problem takes some experience, but we can consider questions like:


- Note that the filter() operation does not modify the existing fileRDD. Instead, it returns an entirely new RDD.
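The condition 'Spark' in line.split() matches Spark only as a whole, whitespace-separated token, which is why a line such as "## Spark" is counted while tokens like "PySpark" or "Spark," would not be. The plain-Python comparison below uses made-up sample lines; on the RDD, the substring version would be fileRDD.filter(lambda line: 'Spark' in line).

```python
# Made-up sample lines, for illustration only
lines = [
    "Spark is a tool for parallel computation.",
    "## Spark",
    "PySpark exposes a Python API.",
    "Apache Spark, an engine for big data.",
]

# Token match (what the example uses): 'Spark' must be a whole word
token_hits = [l for l in lines if "Spark" in l.split()]

# Substring match: any occurrence of the characters 'Spark'
substring_hits = [l for l in lines if "Spark" in l]

print(len(token_hits), len(substring_hits))  # 2 4
```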

### ReduceByKey and Collect
- One of the most popular pair RDD transformations is reduceByKey(), which operates on key-value (k, v) pairs and merges the values for each key.
- We'll create a pair RDD from a list of tuples, combine the values that share a key, and finally print out the result.

In [32]:
# Create PairRDD Rdd with key value pairs
Rdd = sc.parallelize([(1,2),(3,4),(3,6),(4,5)])

# Apply reduceByKey() operation on Rdd
Rdd_Reduced = Rdd.reduceByKey(lambda x, y: x+y)

# Iterate over the result and print the output
for num in Rdd_Reduced.collect(): 
    print("Key {} has {} Counts".format(num[0], num[1]))

Key 1 has 2 Counts
Key 3 has 10 Counts
Key 4 has 5 Counts


- The reduceByKey() transformation merges the values for each key using an associative and commutative reduce function.
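Why must the function be associative and commutative? Spark first combines values inside each partition and then merges those partial results across partitions, in no fixed order. The plain-Python model below (reduce_by_key and reduce_by_key_partitioned are hypothetical helpers, not Spark APIs) reproduces the Rdd example, then distributes the same pairs over two "partitions" in two different ways: addition gives the same answer either way, while subtraction, which is neither associative nor commutative, does not.

```python
from functools import reduce
from operator import add, sub

def reduce_by_key(pairs, func):
    """Merge all values that share a key with func (single-partition model)."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}

def reduce_by_key_partitioned(partitions, func):
    """Reduce inside each partition first, then merge the partial results."""
    partials = [reduce_by_key(part, func) for part in partitions]
    merged = [item for partial in partials for item in partial.items()]
    return reduce_by_key(merged, func)

pairs = [(1, 2), (3, 4), (3, 6), (4, 5)]
print(reduce_by_key(pairs, add))  # {1: 2, 3: 10, 4: 5}

# The same four pairs, distributed over two partitions in two different ways
split_a = [[(3, 4), (3, 6)], [(1, 2), (4, 5)]]
split_b = [[(3, 6)], [(1, 2), (3, 4), (4, 5)]]

# Addition: the partitioning does not change the result
print(reduce_by_key_partitioned(split_a, add) == reduce_by_key_partitioned(split_b, add))  # True

# Subtraction: key 3 gets different answers depending on the partitioning
print(reduce_by_key_partitioned(split_a, sub)[3], reduce_by_key_partitioned(split_b, sub)[3])  # -2 2
```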

### SortByKey and Collect
- It is often useful to sort a pair RDD by its key.
- We'll sort the pair RDD Rdd_Reduced in descending order of key and print the final output.

In [34]:
# Sort the reduced RDD with the key by descending order
Rdd_Reduced_Sort = Rdd_Reduced.sortByKey(ascending=False)

# Iterate over the result and print the output
for num in Rdd_Reduced_Sort.collect():
    print("Key {} has {} Counts".format(num[0], num[1]))

Key 4 has 5 Counts
Key 3 has 10 Counts
Key 1 has 2 Counts
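In plain Python, sortByKey(ascending=False) corresponds to sorting the pairs by their first element in reverse. Sorting by the value instead needs no key/value swap; on an RDD the counterpart is the sortBy() transformation, for example Rdd_Reduced.sortBy(lambda kv: kv[1], ascending=False). The list below simply holds the reduced pairs printed above.

```python
reduced = [(1, 2), (3, 10), (4, 5)]

# Counterpart of sortByKey(ascending=False): order by key, largest first
by_key_desc = sorted(reduced, key=lambda kv: kv[0], reverse=True)
print(by_key_desc)    # [(4, 5), (3, 10), (1, 2)]

# Ordering by value instead, without swapping keys and values
by_value_desc = sorted(reduced, key=lambda kv: kv[1], reverse=True)
print(by_value_desc)  # [(3, 10), (4, 5), (1, 2)]
```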


### CountByKey
- For many datasets, it is important to count how many times each key occurs in a key/value dataset, for example counting the number of countries where each product was sold, or showing the most popular baby names.
- We'll use the Rdd pair RDD created earlier and count how many elements each key has.

In [36]:
Rdd.collect()

[(1, 2), (3, 4), (3, 6), (4, 5)]

In [35]:
# Transform the rdd with countByKey()
total = Rdd.countByKey()

# What is the type of total?
print("The type of total is", type(total))

# Iterate over the total and print the output
for k, v in total.items(): 
    print("key", k, "has", v, "counts")

The type of total is <class 'collections.defaultdict'>
key 1 has 1 counts
key 3 has 2 counts
key 4 has 1 counts


- Remember that, unlike reduceByKey() and sortByKey(), countByKey() is an action rather than a transformation: it returns its result to the driver as a dictionary instead of creating a new RDD.
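A plain-Python model of the difference between countByKey() and reduceByKey() on the same pairs: countByKey() counts how many pairs carry each key and ignores the values, whereas reduceByKey() with addition sums the values. Because countByKey() brings its whole result back to the driver as a dictionary, it suits cases where the number of distinct keys is small.

```python
from collections import Counter

pairs = [(1, 2), (3, 4), (3, 6), (4, 5)]

# countByKey-style: how many pairs have each key (values are ignored)
key_counts = Counter(key for key, _ in pairs)
print(dict(key_counts))  # {1: 1, 3: 2, 4: 1}

# reduceByKey(add)-style: sum of the values for each key
value_sums = {}
for key, value in pairs:
    value_sums[key] = value_sums.get(key, 0) + value
print(value_sums)        # {1: 2, 3: 10, 4: 5}
```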

### Create a base RDD and transform it
- The volume of unstructured data (log lines, images, binary files) in existence is growing dramatically, and PySpark is an excellent framework for analyzing this type of data through RDDs.
- We'll write code that calculates the most common words in the Complete Works of William Shakespeare.
- Here are the steps of the word-counting program:

1. Create a base RDD from the Complete Works text file (data/Complete.txt).
2. Use an RDD transformation to split each element (line) of the base RDD into words, producing one long collection of words.
3. Remove stop words from the data.
4. Create a pair RDD where each element is a tuple (w, 1), with w being a word.
5. Group the elements of the pair RDD by key (word) and add up their values.
6. Swap the keys (words) and values (counts) so that the key is the count and the value is the word.
7. Finally, sort the RDD in descending order and print the 10 most frequent words and their frequencies.
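Before running these steps on the full text, it can help to see them on a handful of lines. The sketch below is plain Python (no Spark) with made-up sample lines and a tiny, illustrative stop-word set; each comment names the step and the RDD operation it stands in for.

```python
# Made-up sample text and stop words, for illustration only
sample_lines = ["spark spark spark rdd", "The rdd and the DataFrame", "spark is fast"]
sample_stop_words = {"the", "and", "is"}

# Steps 1-2: split every line into words and flatten (flatMap)
words = [w for line in sample_lines for w in line.split()]

# Step 3: drop stop words, comparing in lower case (filter)
kept = [w for w in words if w.lower() not in sample_stop_words]

# Step 4: one (word, 1) pair per word (map)
pairs = [(w, 1) for w in kept]

# Step 5: add up the 1s per word (reduceByKey)
counts = {}
for word, one in pairs:
    counts[word] = counts.get(word, 0) + one

# Steps 6-7: swap to (count, word), sort descending, keep the top 2
top = sorted(((c, w) for w, c in counts.items()), reverse=True)[:2]
for c, w in top:
    print(f"{w} has {c} counts")
# spark has 4 counts
# rdd has 2 counts
```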

In [28]:
file_path = 'data/Complete.txt'

# Create a baseRDD from the file path
baseRDD = sc.textFile(file_path)

# Split the lines of baseRDD into words
splitRDD = baseRDD.flatMap(lambda x: x.split())

# Count the total number of words
print("Total number of words in splitRDD:", splitRDD.count())

Total number of words in splitRDD: 128576


In [29]:
splitRDD.take(10)

['The',
 'Project',
 'Gutenberg',
 'EBook',
 'of',
 'The',
 'Complete',
 'Works',
 'of',
 'William']

- We have successfully created an RDD from unstructured data and transformed it.
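flatMap() is used here instead of map() because x.split() returns a list per line: map() would keep one list per line (so count() would count lines), while flatMap() flattens those lists into individual words. A plain-Python analogy, using the first eight words shown above split over two hypothetical lines:

```python
sample_lines = ["The Project Gutenberg EBook", "of The Complete Works"]

# map-style: one output element (a list of words) per input line
mapped = [line.split() for line in sample_lines]
print(mapped)  # [['The', 'Project', 'Gutenberg', 'EBook'], ['of', 'The', 'Complete', 'Works']]

# flatMap-style: the per-line lists are flattened into one list of words
flat = [word for line in sample_lines for word in line.split()]
print(flat)    # ['The', 'Project', 'Gutenberg', 'EBook', 'of', 'The', 'Complete', 'Works']

print(len(mapped), len(flat))  # 2 8
```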

#### Remove stop words and reduce the dataset
- After splitting the lines of the file into a long list of words with the flatMap() transformation, the next step is to remove stop words from the data. Stop words are common words that are usually uninteresting, for example "I", "the" and "a". We can remove many obvious stop words with a list of our own.
- After removing stop words, we'll create a pair RDD in which each element is a tuple (k, v), where k is the key and v is the value. Here each element is (w, 1), where w is a word and 1 is its initial count. Finally, we'll combine the values with the same key using the reduceByKey() operation.

In [30]:
stop_words = ['i',
 'me',
 'my',
 'myself',
 'we',
 'our',
 'ours',
 'ourselves',
 'you',
 'your',
 'yours',
 'yourself',
 'yourselves',
 'he',
 'him',
 'his',
 'himself',
 'she',
 'her',
 'hers',
 'herself',
 'it',
 'its',
 'itself',
 'they',
 'them',
 'their',
 'theirs',
 'themselves',
 'what',
 'which',
 'who',
 'whom',
 'this',
 'that',
 'these',
 'those',
 'am',
 'is',
 'are',
 'was',
 'were',
 'be',
 'been',
 'being',
 'have',
 'has',
 'had',
 'having',
 'do',
 'does',
 'did',
 'doing',
 'a',
 'an',
 'the',
 'and',
 'but',
 'if',
 'or',
 'because',
 'as',
 'until',
 'while',
 'of',
 'at',
 'by',
 'for',
 'with',
 'about',
 'against',
 'between',
 'into',
 'through',
 'during',
 'before',
 'after',
 'above',
 'below',
 'to',
 'from',
 'up',
 'down',
 'in',
 'out',
 'on',
 'off',
 'over',
 'under',
 'again',
 'further',
 'then',
 'once',
 'here',
 'there',
 'when',
 'where',
 'why',
 'how',
 'all',
 'any',
 'both',
 'each',
 'few',
 'more',
 'most',
 'other',
 'some',
 'such',
 'no',
 'nor',
 'not',
 'only',
 'own',
 'same',
 'so',
 'than',
 'too',
 'very',
 'can',
 'will',
 'just',
 'don',
 'should',
 'now']


- Compare each word in splitRDD, converted to lower case, against stop_words and drop the matches. (The words that are kept retain their original case.)
- Create a pair RDD of tuples containing the word and the number 1 for each word in splitRDD.
- Count the number of occurrences of each word (word frequency) in the pair RDD using reduceByKey().


In [31]:
# Drop stop words (compared in lower case; the kept words retain their original case)
splitRDD_no_stop = splitRDD.filter(lambda x: x.lower() not in stop_words)

# Create a tuple of the word and 1 
splitRDD_no_stop_words = splitRDD_no_stop.map(lambda w: (w, 1))

# Count the number of occurrences of each word
resultRDD = splitRDD_no_stop_words.reduceByKey(lambda x, y: x + y)

In [34]:
splitRDD_no_stop_words.take(2)

[('Project', 1), ('Gutenberg', 1)]

- We are nearly ready to print the words and their frequencies.
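The pipeline keeps each word's original case and punctuation, so "Shakespeare," and "Shakespeare" are counted as different words (both appear in the output below), and "thou" and "Thou" would be too. One possible refinement, sketched in plain Python with a hypothetical normalize() helper and a small illustrative stop-word set, is to lower-case each token and strip surrounding punctuation before filtering. The trade-off is that stripping punctuation also turns th' into th. A set is used for the stop words because membership tests on a set take constant time on average, whereas a list is scanned element by element. On the RDD, the same helper could be applied with splitRDD.map(normalize) before the filter.

```python
import string

stop_words_set = {"the", "of", "and", "to"}  # small illustrative subset

def normalize(word):
    """Lower-case a token and strip leading/trailing punctuation."""
    return word.lower().strip(string.punctuation)

tokens = ["Shakespeare,", "Shakespeare", "The", "thou", "Thou", "love;"]

# Normalize first, then drop empty tokens and stop words
cleaned = [normalize(t) for t in tokens]
kept = [t for t in cleaned if t and t not in stop_words_set]
print(kept)              # ['shakespeare', 'shakespeare', 'thou', 'thou', 'love']

# The trade-off mentioned above
print(normalize("th'"))  # th
```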

### Print word frequencies
- After combining the values (counts) with the same key (word), we'll print the word frequencies using the take(N) action. We could use the collect() action, but as a best practice it is avoided here because collect() returns all the elements of the RDD to the driver; take(N) returns only N elements.
- To return the top 10 words, we first swap the key (word) and value (count) so that the key is the count and the value is the word. We then sort the pair RDD by key (count) in descending order and print the top 10 words.

In [33]:
# Display 10 (word, count) pairs; they are not sorted by frequency yet
for word in resultRDD.take(10):
    print(word)

# Swap the keys and values 
resultRDD_swap = resultRDD.map(lambda x: (x[1], x[0]))

# Sort the keys in descending order
resultRDD_swap_sort = resultRDD_swap.sortByKey(ascending=False)

# Show the top 10 most frequent words and their frequencies
for word in resultRDD_swap_sort.take(10):
    print("{} has {} counts".format(word[1], word[0]))

('Project', 9)
('Gutenberg', 7)
('EBook', 1)
('Complete', 3)
('Works', 3)
('William', 11)
('Shakespeare,', 1)
('Shakespeare', 12)
('eBook', 2)
('use', 38)
thou has 650 counts
thy has 574 counts
shall has 393 counts
would has 311 counts
good has 295 counts
thee has 286 counts
love has 273 counts
Enter has 269 counts
th' has 254 counts
make has 225 counts
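Swapping and fully sorting the RDD works, but only the top 10 are needed. PySpark's RDD.top(num, key=...) keeps just the N largest elements of each partition and then merges them, so something like resultRDD.top(10, key=lambda x: x[1]) should return the same top words without the swap (ties could come out in a different order). The plain-Python analogue is heapq.nlargest; the pairs below are five of the counts printed above.

```python
import heapq

# (word, count) pairs copied from the output above
word_counts = [("thou", 650), ("love", 273), ("thy", 574), ("shall", 393), ("good", 295)]

# Keep only the 3 largest counts, without sorting everything
# and without swapping keys and values
top3 = heapq.nlargest(3, word_counts, key=lambda kv: kv[1])
for word, count in top3:
    print(f"{word} has {count} counts")
# thou has 650 counts
# thy has 574 counts
# shall has 393 counts
```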


### RDD to DataFrame
- Like RDDs, DataFrames are immutable, distributed data structures in Spark. Even though RDDs are Spark's fundamental data structure, working with data in a DataFrame is easier most of the time, so it is useful to know how to convert an RDD to a DataFrame.
- We'll make an RDD from sample_list, which contains the tuples ('Mona',20), ('Jennifer',34), ('John',20) and ('Jim',26); each tuple holds a person's name and age.
- Next, we'll create a DataFrame from the RDD and the schema (the list of column names 'Name' and 'Age') and finally confirm that the output is a PySpark DataFrame.

In [37]:
from pyspark.sql import SparkSession
# Build a SparkSession on top of the SparkContext that is already running
spark = SparkSession.builder.getOrCreate()

In [38]:
# Create a list of tuples
sample_list = [('Mona',20), ('Jennifer',34), ('John',20), ('Jim',26)]

# Create an RDD from the list
rdd = sc.parallelize(sample_list)

# Create a PySpark DataFrame
names_df = spark.createDataFrame(rdd, schema=['Name', 'Age'])

# Check the type of names_df
print("The type of names_df is", type(names_df))

The type of names_df is <class 'pyspark.sql.dataframe.DataFrame'>


### Loading CSV into DataFrame
- Loading data from a CSV file is one of the most common ways of creating DataFrames.

In [40]:
file_path = 'data/people.csv'

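# Create a DataFrame from file_path
# header=True takes column names from the first line; inferSchema=True detects column types (at the cost of an extra pass over the data)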
people_df = spark.read.csv(file_path, header=True, inferSchema=True)

# Check the type of people_df
print("The type of people_df is", type(people_df))

The type of people_df is <class 'pyspark.sql.dataframe.DataFrame'>


### Inspecting data in PySpark DataFrame
- Inspecting the data is crucial before performing analysis such as plotting, modeling or training.

In [41]:
# Print the first 10 observations 
people_df.show(10)

# Count the number of rows 
print("There are {} rows in the people_df DataFrame.".format(people_df.count()))

# Count the number of columns and their names
print("There are {} columns in the people_df DataFrame and their names are {}".format(len(people_df.columns), people_df.columns))

+---+---------+----------------+------+-------------+
|_c0|person_id|            name|   sex|date of birth|
+---+---------+----------------+------+-------------+
|  0|      100|  Penelope Lewis|female|   1990-08-31|
|  1|      101|   David Anthony|  male|   1971-10-14|
|  2|      102|       Ida Shipp|female|   1962-05-24|
|  3|      103|    Joanna Moore|female|   2017-03-10|
|  4|      104|  Lisandra Ortiz|female|   2020-08-05|
|  5|      105|   David Simmons|  male|   1999-12-30|
|  6|      106|   Edward Hudson|  male|   1983-05-09|
|  7|      107|    Albert Jones|  male|   1990-09-13|
|  8|      108|Leonard Cavender|  male|   1958-08-08|
|  9|      109|  Everett Vadala|  male|   2005-05-24|
+---+---------+----------------+------+-------------+
only showing top 10 rows

There are 100000 rows in the people_df DataFrame.
There are 5 columns in the people_df DataFrame and their names are ['_c0', 'person_id', 'name', 'sex', 'date of birth']
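The first column, _c0, looks like a row index that was written without a header name (pandas' to_csv(), for example, writes its index that way by default; where this file came from is an assumption). When header=True and a header field is empty, Spark names the column _c followed by its position. The plain-Python snippet below, with a made-up two-row CSV, shows the empty first header field; if the column is not needed, people_df.drop('_c0') removes it.

```python
import csv
import io

# Made-up CSV whose first column (a row index) has no header name
raw = ",person_id,name\n0,100,Penelope Lewis\n1,101,David Anthony\n"

reader = csv.reader(io.StringIO(raw))
header = next(reader)
rows = list(reader)

print(header)     # ['', 'person_id', 'name']
print(len(rows))  # 2
```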


### PySpark DataFrame subsetting and cleaning
- After inspecting the data, it is often necessary to clean it, which mainly involves subsetting, renaming columns and removing duplicate rows. The PySpark DataFrame API provides several operators for this.
- We'll select the 'name', 'sex' and 'date of birth' columns from the people_df DataFrame, remove any duplicate rows from that subset, and count the number of rows before and after removing duplicates.

In [42]:
# Select name, sex and date of birth columns
people_df_sub = people_df.select('name', 'sex', 'date of birth')

# Print the first 10 observations from people_df_sub
people_df_sub.show(10)

# Remove duplicate entries from people_df_sub
people_df_sub_nodup = people_df_sub.dropDuplicates()

# Count the number of rows
print("There were {} rows before removing duplicates, and {} rows after removing duplicates".format(people_df_sub.count(), people_df_sub_nodup.count()))

+----------------+------+-------------+
|            name|   sex|date of birth|
+----------------+------+-------------+
|  Penelope Lewis|female|   1990-08-31|
|   David Anthony|  male|   1971-10-14|
|       Ida Shipp|female|   1962-05-24|
|    Joanna Moore|female|   2017-03-10|
|  Lisandra Ortiz|female|   2020-08-05|
|   David Simmons|  male|   1999-12-30|
|   Edward Hudson|  male|   1983-05-09|
|    Albert Jones|  male|   1990-09-13|
|Leonard Cavender|  male|   1958-08-08|
|  Everett Vadala|  male|   2005-05-24|
+----------------+------+-------------+
only showing top 10 rows

There were 100000 rows before removing duplicates, and 99998 rows after removing duplicates
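dropDuplicates() with no arguments treats a row as a duplicate only when every column matches, which is why the columns are selected first: if _c0 and person_id hold a different value on every row (as the preview suggests), the full people_df would presumably contain no exact duplicates. DataFrame.dropDuplicates() also accepts a list of column names to deduplicate on a subset, in which case Spark does not guarantee which of the matching rows is kept. A plain-Python model with made-up rows:

```python
# Made-up (name, sex, date of birth) rows, for illustration only
rows = [
    ("Ann Lee", "female", "1990-01-01"),
    ("Bob Ray", "male", "1985-06-15"),
    ("Ann Lee", "female", "1990-01-01"),  # exact duplicate of the first row
    ("Ann Lee", "female", "1992-03-03"),  # same name, different birth date
]

# Like dropDuplicates(): a duplicate must match in every column
unique_rows = set(rows)
print(len(rows), len(unique_rows))  # 4 3

# Like dropDuplicates(['name']): one row per name (this model keeps the first)
first_per_name = {}
for row in rows:
    first_per_name.setdefault(row[0], row)
print(len(first_per_name))          # 2
```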


### Filtering DataFrame
- What if we want to subset the DataFrame based on a condition, for example selecting all rows where sex is female?
- We will filter the rows of the people_df DataFrame where 'sex' is female, and separately where it is male, creating two different datasets. Finally, we'll count the number of rows in each of them.

In [43]:
# Filter people_df to select females 
people_df_female = people_df.filter(people_df.sex == "female")

# Filter people_df to select males
people_df_male = people_df.filter(people_df.sex == "male")

# Count the number of rows 
print("There are {} rows in the people_df_female DataFrame and {} rows in the people_df_male DataFrame".format(people_df_female.count(), people_df_male.count()))

There are 49014 rows in the people_df_female DataFrame and 49066 rows in the people_df_male DataFrame
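The two counts add up to 49,014 + 49,066 = 98,080, which is 1,920 fewer than the 100,000 rows in people_df, so some rows must have a sex value other than "female" or "male", possibly null. An equality filter drops those rows: in Spark, comparing null with a string yields null, and filter() keeps only rows where the condition is true. people_df.groupBy('sex').count().show() would list every distinct value, null included. The plain-Python sketch below, with made-up values, shows how such rows fall outside both subsets.

```python
from collections import Counter

# Made-up values for the sex column, including a missing (None) entry
sex_values = ["female", "male", None, "female", "male", "unknown"]

females = [v for v in sex_values if v == "female"]
males = [v for v in sex_values if v == "male"]

# Rows that match neither filter
leftover = len(sex_values) - len(females) - len(males)
print(len(females), len(males), leftover)  # 2 2 2

# A per-value tally (like groupBy('sex').count()) reveals what they are
tally = Counter(sex_values)
print(tally[None], tally["unknown"])       # 1 1
```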
