# Spark Basics

This notebook goes over some simple PySpark tasks.

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
import pyspark

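First, we need to start a Spark session.

We name the Spark application `MyPySpark`.

We also give 15 GB of memory to the Spark driver process.
By default, Spark gives the driver only 1 GB (`spark.driver.memory` defaults to `1g`). This setting only takes effect when the driver starts, so it is ignored if a Spark session is already running in this notebook.

The SparkContext is created along with the `spark` object and is available as `spark.sparkContext`, which we save as `sc`.
We set the log level of `sc` to ERROR so that only error messages are shown.
Otherwise, Spark also prints WARN-level messages (the default level, as the output below shows), which can be noisy.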

In [2]:
spark = SparkSession.builder.appName("MyPySpark").config("spark.driver.memory", "15g").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/24 11:57:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

Displaying `spark` shows details of the session, such as the Spark version, the master, and the app name.
The master `local[*]` means Spark runs locally and uses all the CPU cores on the current compute node.

Let's check the number of CPU cores Spark will use (this machine has 36 CPU cores).

In [4]:
sc.defaultParallelism

36

---

Let's create a list of 200 random integers between 1 and 99 and save it to the variable `num_data`. The upper bound of `np.random.randint` is exclusive, so 100 is never drawn.

In [5]:
import numpy as np

np.random.seed(42)

num_data = []
for i in range(0,200):
    num_tmp = np.random.randint(1,100)
    num_data.append(num_tmp)
    
#Print first 5 values from list
num_data[0:5]

[52, 93, 15, 72, 61]
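The loop above can also be written as a single vectorized call. This is a sketch, assuming NumPy 1.17 or later for the `np.random.default_rng` Generator API. The Generator produces a different sequence than the legacy `np.random.seed` / `np.random.randint` calls, so the values will not match the list above; the variable name `num_data_vec` is only for illustration.

```python
import numpy as np

# Seeded Generator (NumPy >= 1.17); gives a different sequence than np.random.seed(42)
rng = np.random.default_rng(42)

# 200 random integers in [1, 100), i.e. 1 to 99; the upper bound is exclusive
num_data_vec = rng.integers(1, 100, size=200).tolist()

print(len(num_data_vec), num_data_vec[:5])
```

`.tolist()` converts the NumPy integers to plain Python `int` values, which is what `sc.parallelize()` received in the loop version.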

---

Now, let's create an RDD object from the `num_data` list with `sc.parallelize()`.

This RDD object is saved as `num_rdd`.

In [6]:
num_rdd = sc.parallelize(num_data)
type(num_rdd)

pyspark.rdd.RDD

---

Use `.count()` to count the number of elements in the RDD.

In [7]:
num_rdd.count()

200

---

Use `.map()` on the RDD.

`.map()` is a transformation: it creates a new RDD, but because of Spark's "lazy evaluation" it does NOT compute any results yet.

The new RDD only records the task of computing x^2, which runs later when an action asks for the values.

This makes the `.map()` call very quick, since it does not compute x^2 over the list.

This new RDD object is saved as `num_map_rdd`.

In [8]:
num_map_rdd = num_rdd.map(lambda x: x * x)
num_map_rdd

PythonRDD[2] at RDD at PythonRDD.scala:53
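Python's built-in `map()` is lazy in a similar way, which makes a handy analogy for Spark transformations. The sketch below is plain Python, not Spark: a counter (the hypothetical name `calls`) records how often the squaring function runs. Creating the `map` object runs nothing; the work only happens when something consumes the results, much like a Spark action.

```python
calls = 0

def square(x):
    """Square x and count how many times this function runs."""
    global calls
    calls += 1
    return x * x

data = [52, 93, 15, 72, 61]

# Like num_rdd.map(...): builds a lazy iterator, nothing is computed yet
lazy_squares = map(square, data)
calls_before = calls  # still 0

# Consuming the iterator (like a Spark action) runs square() on every element
squares = list(lazy_squares)
calls_after = calls

print(calls_before, calls_after, squares)
# 0 5 [2704, 8649, 225, 5184, 3721]
```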

---

Now we ask for the first 5 values of x^2.

`.take(N)` is an action that returns a Python list with the first N elements.

Because we asked for values, Spark now computes x^2. `.take()` only processes as many partitions as it needs to return N elements; in general, the work on an RDD is spread over its partitions, which here can run on all 36 cores.

In [9]:
num_map_rdd.take(5)

[2704, 8649, 225, 5184, 3721]
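Continuing the plain-Python analogy, `itertools.islice` behaves somewhat like `.take(N)`: it pulls only as many elements as it needs from a lazy pipeline, just as Spark's `.take()` avoids computing the whole RDD. The 200-element `data` list here is a stand-in built with `range`, not the notebook's `num_data`.

```python
from itertools import islice

computed = []

def square(x):
    """Square x and remember which inputs were actually processed."""
    computed.append(x)
    return x * x

data = list(range(1, 201))  # stand-in for a 200-element dataset

# Lazy pipeline; islice stops after the first 5 results, like .take(5)
first_five = list(islice(map(square, data), 5))

print(first_five, len(computed))
# [1, 4, 9, 16, 25] 5
```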

---

Let's use `.filter()` on the RDD to return a new RDD with only the numbers that pass the condition (here, values less than 10).

In [10]:
num_filter_rdd = num_rdd.filter(lambda x: x < 10)
print("Number of values in new filter RDD: ", str(num_filter_rdd.count()))
print("First 5 values in the new RDD: ", str(num_filter_rdd.take(5)))

Number of values in new filter RDD:  28
First 5 values in the new RDD:  [3, 2, 2, 3, 7]


---

Calling `.collect()` on an RDD returns all of its elements to the driver as a normal Python list. Only use it when the result is small enough to fit in the driver's memory.

In [11]:
num_array = num_rdd.collect()
print(type(num_array))
print(num_array[0:5])

<class 'list'>
[52, 93, 15, 72, 61]


---

# Working with Spark and text

In this example, we use *The Hound of the Baskervilles* by Arthur Conan Doyle, from Project Gutenberg.

The downloaded TXT file is saved as `3070.txt`.

We load this TXT file into an RDD object with `sc.textFile()`.

This loads the file line by line: each element of the RDD is one line of text.

In [12]:
path = "3070.txt"
book_rdd = sc.textFile(path)

Print the first 15 lines of the text.

In [13]:
book_rdd.take(15)

["Project Gutenberg's The Hound of the Baskervilles, by Arthur Conan Doyle",
 '',
 'This eBook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever.  You may copy it, give it away or',
 're-use it under the terms of the Project Gutenberg License included',
 'with this eBook or online at www.gutenberg.org',
 '',
 '',
 'Title: The Hound of the Baskervilles',
 '',
 'Author: Arthur Conan Doyle',
 '',
 'Posting Date: October 10, 2010',
 'Release Date: February, 2002 [Etext #3070]',
 '']
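Since `sc.textFile()` creates one element per line with the line terminator removed, blank lines become empty strings (`''`), as in the output above. A rough plain-Python model of that behavior on a small hypothetical sample is `str.splitlines()`. This is only an illustration: Spark additionally splits large files into several partitions, and `splitlines()` recognizes a few more line-break characters than Spark does.

```python
sample_text = (
    "Project Gutenberg's The Hound of the Baskervilles, by Arthur Conan Doyle\n"
    "\n"
    "This eBook is for the use of anyone anywhere at no cost and with\n"
)

# One element per line, terminators removed; the blank line becomes ''
lines = sample_text.splitlines()

print(len(lines), lines)
```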

Count the total number of lines in this text.

In [14]:
book_rdd.count()

7729

---

Let's split the lines into words.

We split the book text into individual words with `.flatMap()`. Unlike `.map()`, the `.flatMap()` function can return multiple values for each element in the RDD, so each line becomes several word elements.

We then count the total number of words and the number of distinct words.

In [15]:
words_rdd = book_rdd.flatMap(lambda x: x.split())
num_words = words_rdd.count()
print("Number of words: " + str(num_words))

num_distinct_words = words_rdd.distinct().count()
print("Number of distinct words: " + str(num_distinct_words))

Number of words: 62248
Number of distinct words: 9885
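The difference between `.map()` and `.flatMap()` shows up in a plain-Python model on a few hypothetical lines: `map` keeps one list per line, while `flatMap` flattens them into a single sequence of words. `str.split()` with no arguments splits on any run of whitespace and drops empty strings, so blank lines contribute no words. Words are compared exactly, so `The`/`the` and `Hound`/`hound` count as different distinct words, which also applies to the Spark counts above.

```python
lines = ["The Hound of the Baskervilles", "", "the hound"]

# Like .map(lambda x: x.split()): one list of words per line
mapped = [line.split() for line in lines]

# Like .flatMap(lambda x: x.split()): all words in a single flat list
words = [word for line in lines for word in line.split()]

num_words = len(words)                # like words_rdd.count()
num_distinct_words = len(set(words))  # like words_rdd.distinct().count()

print(mapped)
print(num_words, num_distinct_words)
# 7 6
```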


---

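Now, we create a new "pair" RDD `key_value_rdd` with key/value pairs, where each element is `("word", 1)`.

We then use `.reduceByKey()` to combine all the pairs with the same word by adding up their counts.

In [17]:
key_value_rdd = words_rdd.map(lambda x: (x, 1))
word_kv_rdd = key_value_rdd.reduceByKey(lambda x, y: x + y)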
word_kv_rdd.take(5)

[('Project', 78), ('The', 217), ('Hound', 11), ('of', 1694), ('Arthur', 4)]
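`.reduceByKey()` merges the values for each key with the given function, two values at a time. Spark combines values within each partition before merging the partial results, so the function should be associative and commutative, as addition is. The sketch below is a simplified plain-Python model of the pair RDD plus `.reduceByKey(lambda x, y: x + y)`, using `functools.reduce` on a hypothetical word list; it groups first and then reduces, which gives the same result as Spark's incremental merging for addition.

```python
from functools import reduce

words = ["the", "Hound", "of", "the", "Baskervilles", "of", "the"]

# Like words_rdd.map(lambda x: (x, 1))
pairs = [(word, 1) for word in words]

# Simplified model: group the values by key, then reduce each group with x + y
grouped = {}
for key, value in pairs:
    grouped.setdefault(key, []).append(value)

word_counts = {key: reduce(lambda x, y: x + y, values) for key, values in grouped.items()}

print(word_counts)
# {'the': 3, 'Hound': 1, 'of': 2, 'Baskervilles': 1}
```

For plain word counting on small local data, `collections.Counter(words)` gives the same counts.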

---

Next, we sort the key/value pairs by decreasing number of occurrences.

First, we flip each key/value pair to (N, word), where N is the number of occurrences of the word, so that the count becomes the key.

In [18]:
flip_word_kv_rdd = word_kv_rdd.map(lambda x: (x[1],x[0]))
flip_word_kv_rdd.take(5)

[(78, 'Project'), (217, 'The'), (11, 'Hound'), (1694, 'of'), (4, 'Arthur')]

---

We now use `.sortByKey(False)`, which sorts by key (the count) in descending order.

In [19]:
word_results_rdd = flip_word_kv_rdd.sortByKey(False)
word_results_rdd.take(5)

[(3230, 'the'), (1694, 'of'), (1562, 'and'), (1449, 'to'), (1279, 'a')]
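The same ranking can be reached without the flip: in PySpark, `word_kv_rdd.sortBy(lambda kv: kv[1], ascending=False)` sorts the pairs by their count, and `word_kv_rdd.takeOrdered(5, key=lambda kv: -kv[1])` returns just the top 5 as a list on the driver. Words with equal counts may come out in any order. Below is a plain-Python model of the flip-and-sort, using a few of the counts printed earlier in this notebook.

```python
word_counts = [("Project", 78), ("The", 217), ("Hound", 11), ("of", 1694), ("the", 3230)]

# Like .map(lambda x: (x[1], x[0])): put the count first so it becomes the key
flipped = [(count, word) for word, count in word_counts]

# Like .sortByKey(False): sort on the count only, largest first
ranked = sorted(flipped, key=lambda pair: pair[0], reverse=True)

print(ranked[:3])
# [(3230, 'the'), (1694, 'of'), (217, 'The')]
```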

---

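For reference, this cell shows the intermediate pair RDD `key_value_rdd` that `.reduceByKey()` combines: one `(word, 1)` pair per word.

In [16]:
key_value_rdd = words_rdd.map(lambda x: (x, 1))
key_value_rdd.take(5)

[('Project', 1), ("Gutenberg's", 1), ('The', 1), ('Hound', 1), ('of', 1)]

---

Spark can also help with more linguistic tasks, such as removing "stop" words and stemming. For example, `pyspark.ml.feature.StopWordsRemover` removes stop words from a DataFrame column of word arrays, while stemming usually relies on a separate library applied inside `.map()`.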
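A simple form of stop-word removal can also be done on the RDD itself by lowercasing the words and filtering them before counting. The tiny `STOP_WORDS` set here is hypothetical, a stand-in for a real stop-word list, and the code is a plain-Python model of `words_rdd.map(lambda w: w.lower()).filter(lambda w: w not in STOP_WORDS)` followed by a word count.

```python
from collections import Counter

# Hypothetical, deliberately tiny stop-word list for illustration
STOP_WORDS = {"the", "of", "and", "to", "a"}

words = ["The", "Hound", "of", "the", "Baskervilles", "and", "the", "hound"]

# Like .map(lambda w: w.lower()).filter(lambda w: w not in STOP_WORDS)
kept = [w.lower() for w in words if w.lower() not in STOP_WORDS]

# Like the pair-RDD + reduceByKey word count
counts = Counter(kept)

print(counts.most_common())
# [('hound', 2), ('baskervilles', 1)]
```

Lowercasing first also merges `Hound` and `hound`, which the case-sensitive counts above keep apart.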