# SparkContext

Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. 

# SparkSession
The entry point to programming Spark with the DataFrame API.

# RDD
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. 
* immutable, partitioned collection of elements that can be operated on in parallel. 
* Optimized for Scala
* Track lineage information to efficiently recompute lost data

You construct RDDs
* by parallelizing existing collections
* by transforming an existing RDD
* by reading files from storage
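To make the idea of lineage concrete, here is a toy model in plain Python, with no Spark involved. Each derived dataset stores only its parent and the function that produced it, so a lost result can be rebuilt by replaying that chain. `ToyDataset` is a hypothetical class written for illustration; it is not how Spark implements RDDs.

```python
from typing import Callable, List, Optional


class ToyDataset:
    """Hypothetical toy class: an immutable dataset that remembers its lineage."""

    def __init__(self, source=None, parent=None, fn=None):
        self.source: Optional[List[int]] = source  # only set on the root dataset
        self.parent: Optional["ToyDataset"] = parent  # lineage: where it came from
        self.fn: Optional[Callable[[List[int]], List[int]]] = fn  # lineage: how it was derived
        self.cache: Optional[List[int]] = None  # materialized result, may be "lost"

    def map(self, f: Callable[[int], int]) -> "ToyDataset":
        # Record the transformation and return a new dataset; nothing is computed here
        return ToyDataset(parent=self, fn=lambda xs: [f(x) for x in xs])

    def filter(self, pred: Callable[[int], bool]) -> "ToyDataset":
        return ToyDataset(parent=self, fn=lambda xs: [x for x in xs if pred(x)])

    def compute(self) -> List[int]:
        # If no materialized copy exists, rebuild it by replaying the lineage
        if self.cache is None:
            if self.parent is None:
                self.cache = list(self.source)
            else:
                self.cache = self.fn(self.parent.compute())
        return self.cache


base = ToyDataset(source=[1, 2, 3, 4, 5])
evens_squared = base.filter(lambda x: x % 2 == 0).map(lambda x: x * x)
print(evens_squared.compute())  # [4, 16]

# Simulate losing every materialized result, then recompute from the lineage
evens_squared.cache = None
evens_squared.parent.cache = None
base.cache = None
print(evens_squared.compute())  # [4, 16] again
```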

# DataFrame
The preferred abstraction in Spark
* Distributed collection of data organized into named columns
  * Built on Resilient Distributed Datasets
* _Immutable once constructed_
* Track lineage information to efficiently recompute lost data
* Enable operations on collection of elements in parallel
* Inspired by data frames in R and Python (Pandas)

You construct DataFrames
* by parallelizing existing collections (e.g., Pandas DataFrames) 
* by transforming an existing DataFrame
* by reading files from storage

Citation: Databricks main page (https://databricks.com/).

## Basic Spark Cluster Architecture
![](http://spark.apache.org/docs/1.1.0/img/cluster-overview.png)
* A master node and one or more slave nodes.
* The master node runs the driver program (your actual application) and distributes the work that needs to be done to the so-called worker nodes (the slaves).
* Within a worker node one or more executors can run. 
* An executor is a process that runs the actual computations (the tasks distributed by the master). Note that the executor has, in general, one or more cores available for performing its tasks.
* SparkContext can connect to several types of cluster managers (either Spark’s own standalone cluster manager or Mesos/YARN)
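The division of labour above can be mimicked on a single machine, purely as an analogy. In this sketch a "driver" function splits the data into partitions, hands one task per partition to a pool of "executors" (threads here, whereas real executors are separate processes on the worker nodes), and combines the partial results. All names (`driver`, `task`, `split_into_partitions`) are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def split_into_partitions(data: List[int], num_partitions: int) -> List[List[int]]:
    # Contiguous slices of roughly equal size
    n = len(data)
    return [data[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]


def task(partition: List[int]) -> int:
    # The unit of work shipped to an "executor": here, a sum of squares
    return sum(x * x for x in partition)


def driver(data: List[int], num_partitions: int = 4, num_executors: int = 2) -> int:
    partitions = split_into_partitions(data, num_partitions)
    # A pool of worker threads stands in for the executors
    with ThreadPoolExecutor(max_workers=num_executors) as executors:
        partial_results = list(executors.map(task, partitions))
    # The final combination happens back on the "driver"
    return sum(partial_results)


print(driver(list(range(10))))  # 285
```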

In [1]:
import sys
print(sys.version)
print(spark.version)

3.5.2 |Anaconda 4.2.0 (64-bit)| (default, Jul  2 2016, 17:53:06) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)]
2.1.0


### Create RDDs from collections

In [10]:
arrayRDD = spark.sparkContext.parallelize(range(20))

In [11]:
arrayRDD.getNumPartitions()

8

### Transformations
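Transformations create a new dataset from an existing one, while actions return a value to the driver program after running a computation on the dataset. Transformations are lazy: Spark only records them, and nothing is computed until an action needs the result.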
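Python generators give a rough, Spark-free analogy for this laziness: building the pipeline runs nothing, and only consuming it (the counterpart of an action) calls the function. The `square` function and the `calls` log are illustrative only.

```python
calls = []


def square(x):
    calls.append(x)  # record each time the function actually runs
    return x * x


data = range(5)
squared = (square(x) for x in data)         # like a transformation: nothing runs yet
evens = (y for y in squared if y % 2 == 0)  # another lazy step stacked on top
print(calls)  # []

result = list(evens)  # like an action: forces the whole pipeline to run
print(result)  # [0, 4, 16]
print(calls)   # [0, 1, 2, 3, 4]
```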
 
`glom()` returns an RDD created by coalescing all elements within each partition into a list.

In [12]:
arrayRDDByPartition = arrayRDD.glom()

In [13]:
arrayRDDByPartition

PythonRDD[6] at RDD at PythonRDD.scala:48

### Actions

Actions are RDD operations that produce non-RDD values. They materialize a value in a Spark program. In other words, an RDD operation that returns a value of any type but `RDD[T]` is an action.

`collect()` is an action that returns all the elements of the dataset to the driver program as a list, so all the data must fit in the driver's memory.

In [10]:
arrayRDDByPartition.collect()

[[0, 1],
 [2, 3, 4],
 [5, 6],
 [7, 8, 9],
 [10, 11],
 [12, 13, 14],
 [15, 16],
 [17, 18, 19]]
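The partition layout printed above can be reproduced in plain Python. This sketch assumes that a `range` of length `n` is cut into `k` contiguous slices whose boundaries are `floor(i * n / k)` for `i = 0, ..., k`; that formula is our assumption, but it yields exactly the eight partitions `glom()` returned for `range(20)`. `slice_range` is a hypothetical helper, not a Spark API.

```python
from typing import List


def slice_range(n: int, num_slices: int) -> List[List[int]]:
    # Boundaries floor(i * n / k); each partition is the range between two of them
    bounds = [i * n // num_slices for i in range(num_slices + 1)]
    return [list(range(bounds[i], bounds[i + 1])) for i in range(num_slices)]


for partition in slice_range(20, 8):
    print(partition)
```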

# Pi Estimation with Resilient Distributed Datasets (RDDs)

A simple Monte Carlo simulation to approximate the value of $\pi$ could involve randomly selecting points $\{(x_i,y_i)\}_{i=1}^n$ in the square $[-1,1]\times[-1,1]$ and determining the ratio $f=\frac{m}{n}$, where $m$ is the number of points that satisfy $x_i^2+y_i^2\leq1$. The unit circle covers $\pi/4$ of the square's area, so $f\approx\pi/4$ and $\pi\approx 4f$.

We therefore need to define the sample size $n$, which in this case is 10,000,000.
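Before distributing the work, it can help to check the estimator serially on a small sample. This Spark-free sketch uses a dedicated `random.Random(seed)` so the run is reproducible; `estimate_pi` and the sample size of 100,000 are arbitrary illustrative choices.

```python
import random


def estimate_pi(n: int, seed: int = 0) -> float:
    rng = random.Random(seed)  # private generator, reproducible for a fixed seed
    m = 0
    for _ in range(n):
        x, y = rng.uniform(-1, 1), rng.uniform(-1, 1)  # point in [-1, 1] x [-1, 1]
        if x * x + y * y <= 1:  # inside the unit circle
            m += 1
    return 4.0 * m / n  # f = m/n approximates pi/4, hence the factor 4


print(estimate_pi(100_000))
```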

In [1]:
#define the sample size and distribute range(numSamples) with parallelize (despite its name, samplesDF is an RDD)
numSamples = 10000000
samplesDF = spark.sparkContext.parallelize(range(numSamples))

In [2]:
#show the first 10 values
samplesDF.take(10)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

## Mapping in Spark
`map(f)` is a transformation that passes each element of the dataset through the function `f` and returns a new RDD representing the results.
![](https://www.sharcnet.ca/~jnandez/map_pi.png)

In [3]:
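#generate the Monte Carlo simulation
import random
random.seed(0) # seeds the driver's generator only; circle() runs in the executors' Python worker processes

def circle(value):
    # value (an element of samplesDF) is ignored: each element simply triggers one random draw
    x, y = random.random() * 2 - 1, random.random() * 2 - 1 # random x and y in [-1, 1)
    return 1 if x*x + y*y <= 1 else 0 # 1 if the point lies inside the unit circle, else 0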

In [5]:
mapSamplesDF = samplesDF.map(circle)

In [6]:
mapSamplesDF.take(10)

[1, 1, 0, 0, 1, 1, 1, 1, 1, 0]

## Reducing the RDD
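`reduce(f)` is an action that aggregates all the elements of the RDD using the function `f` and returns the final result to the driver program. `f` takes two arguments and returns one value, and it should be commutative and associative so that it can be computed correctly in parallel.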

![](https://www.sharcnet.ca/~jnandez/reduce_pi.png)
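Why associativity matters becomes clear from how the reduction is evaluated: each partition is reduced locally, and the partial results are then reduced again on the driver. This plain-Python model follows that two-step scheme; the four-way partitioning is a hypothetical example. Addition gives the same answer either way, while subtraction does not.

```python
from functools import reduce
from operator import add, sub
from typing import Callable, List


def partitioned_reduce(partitions: List[List[int]], f: Callable[[int, int], int]) -> int:
    partials = [reduce(f, part) for part in partitions if part]  # per-partition step
    return reduce(f, partials)                                   # driver-side step


data = list(range(1, 9))
partitions = [data[0:2], data[2:4], data[4:6], data[6:8]]

print(reduce(add, data), partitioned_reduce(partitions, add))  # 36 36
print(reduce(sub, data), partitioned_reduce(partitions, sub))  # -34 2
```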

In [7]:
count = mapSamplesDF.reduce(lambda x,y: x + y)

In [8]:
count

7851392

In [14]:
print("The value of PI is ",4.0 * count / numSamples)

The value of PI is  3.1405568
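As a rough accuracy check (not part of the original run), the fraction $\hat p = m/n$ is a binomial proportion, so the standard error of $4\hat p$ is about $4\sqrt{\hat p(1-\hat p)/n}$. Plugging in the numbers obtained above:

```python
import math

count, num_samples = 7851392, 10_000_000  # values from the run above
pi_hat = 4.0 * count / num_samples
p_hat = count / num_samples
std_error = 4.0 * math.sqrt(p_hat * (1 - p_hat) / num_samples)

print(pi_hat)                # 3.1405568
print(round(std_error, 5))   # 0.00052
print(abs(pi_hat - math.pi) / std_error)  # roughly 2 standard errors
```

An error of about two standard errors is unremarkable for a single Monte Carlo run.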


# Word Count in Spark

This example uses `DataFrame` since it is the most popular API in Spark. 

Goal: read (unstructured) text files and determine which words are most common in them.

In [15]:
raw_data = spark.read.text("data/pg*.txt") # returns a DataFrame with one row per line, in a string column named "value"

`show()` is an action that displays the first 20 rows by default and truncates strings longer than 20 characters. You can pass the number of rows to display, and `truncate=False` to print full values.
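The default truncation can be sketched in a few lines. As we understand Spark's implementation, a cell longer than the width keeps its first `width - 3` characters followed by `...`; `truncate_cell` is a hypothetical helper modelling that rule, not a Spark function.

```python
def truncate_cell(value: str, width: int = 20) -> str:
    # Short values are shown unchanged
    if len(value) <= width:
        return value
    # Long values keep width - 3 characters plus "..." (no ellipsis for tiny widths)
    return value[:width - 3] + "..." if width >= 4 else value[:width]


print(truncate_cell("The Project Gutenberg EBook"))  # The Project Guten...
print(truncate_cell("value"))                         # value
```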

In [19]:
raw_data.show(5,truncate=False)

+----------------------------------------------------------------------------+
|value                                                                       |
+----------------------------------------------------------------------------+
|The Project Gutenberg EBook of The Complete Works of William Shakespeare, by|
|William Shakespeare                                                         |
|                                                                            |
|This eBook is for the use of anyone anywhere at no cost and with            |
|almost no restrictions whatsoever.  You may copy it, give it away or        |
+----------------------------------------------------------------------------+
only showing top 5 rows



Spark provides many functions for manipulating columns. See this page for the full list:

* https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/functions.html

In [20]:
from pyspark.sql.functions import lower,col
lowerDF = raw_data.select( lower(col('value')) )

`col()` returns a `Column` based on the given column name. Without it, you would refer to the column through the DataFrame itself:


`lowerDF = raw_data.select( lower( raw_data['value'] ) )`

In [18]:
lowerDF.show(5,truncate=False)

+----------------------------------------------------------------------------+
|lower(value)                                                                |
+----------------------------------------------------------------------------+
|the project gutenberg ebook of the complete works of william shakespeare, by|
|william shakespeare                                                         |
|                                                                            |
|this ebook is for the use of anyone anywhere at no cost and with            |
|almost no restrictions whatsoever.  you may copy it, give it away or        |
+----------------------------------------------------------------------------+
only showing top 5 rows



In [23]:
lowerDF = raw_data.select( lower(col('value')).alias('value') ) #alias() function changes the name of the Column
lowerDF.show(5,truncate=False)

+----------------------------------------------------------------------------+
|value                                                                       |
+----------------------------------------------------------------------------+
|the project gutenberg ebook of the complete works of william shakespeare, by|
|william shakespeare                                                         |
|                                                                            |
|this ebook is for the use of anyone anywhere at no cost and with            |
|almost no restrictions whatsoever.  you may copy it, give it away or        |
+----------------------------------------------------------------------------+
only showing top 5 rows



#### Regular Expressions in Spark
`regexp_replace(Column e, Column pattern, Column replacement)` replaces all substrings of the string column `e` that match the regular expression `pattern` with `replacement`. In Python, `pattern` and `replacement` can be passed as plain strings, as in the cell below.

A [regular expression](https://en.wikipedia.org/wiki/Regular_expression) is a sequence of characters that defines a search pattern.


For a course on regular expressions, see:

* http://v4.software-carpentry.org/regexp/operators.html

In [24]:
from pyspark.sql.functions import regexp_replace

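# \p{P} matches any Unicode punctuation character; the raw string avoids Python's invalid-escape warning
noPunctuationDF = lowerDF.select( regexp_replace(col('value'), r'\p{P}', '').alias('value') )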
noPunctuationDF.show(5,truncate=False)

+---------------------------------------------------------------------------+
|value                                                                      |
+---------------------------------------------------------------------------+
|the project gutenberg ebook of the complete works of william shakespeare by|
|william shakespeare                                                        |
|                                                                           |
|this ebook is for the use of anyone anywhere at no cost and with           |
|almost no restrictions whatsoever  you may copy it give it away or         |
+---------------------------------------------------------------------------+
only showing top 5 rows
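Python's built-in `re` module does not support `\p{P}`, so a local check of this step needs another route. This sketch removes every character whose Unicode general category starts with `P` (punctuation), which is the class the Java regex `\p{P}` matches. `remove_punctuation` is a hypothetical helper.

```python
import unicodedata


def remove_punctuation(text: str) -> str:
    # Keep every character that is not in a Unicode punctuation category (Pc, Pd, Ps, Pe, Pi, Pf, Po)
    return "".join(ch for ch in text if not unicodedata.category(ch).startswith("P"))


line = "almost no restrictions whatsoever.  You may copy it, give it away or"
print(remove_punctuation(line.lower()))
# almost no restrictions whatsoever  you may copy it give it away or
```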



In [25]:
from pyspark.sql.functions import trim #Trim the spaces from both ends for the specified string column. 

noEndSpaceDF = noPunctuationDF.select( trim(col('value')).alias('value') )
noEndSpaceDF.show(5,truncate=False)

+---------------------------------------------------------------------------+
|value                                                                      |
+---------------------------------------------------------------------------+
|the project gutenberg ebook of the complete works of william shakespeare by|
|william shakespeare                                                        |
|                                                                           |
|this ebook is for the use of anyone anywhere at no cost and with           |
|almost no restrictions whatsoever  you may copy it give it away or         |
+---------------------------------------------------------------------------+
only showing top 5 rows



Now that our sentences are clean, we can extract the words in each sentence by splitting it on runs of non-word characters (here mostly spaces). We use the function

`split(Column str, String pattern)`, which splits `str` around matches of `pattern` (a regular expression).

In [27]:
from pyspark.sql.functions import split

splitValueDF = noEndSpaceDF.select( split(col("value"), r'\W+').alias("words") )
#\W+ matches one or more non-word characters (anything other than letters, digits and _)
splitValueDF.show(5,truncate=False)

+----------------------------------------------------------------------------------------+
|words                                                                                   |
+----------------------------------------------------------------------------------------+
|[the, project, gutenberg, ebook, of, the, complete, works, of, william, shakespeare, by]|
|[william, shakespeare]                                                                  |
|[]                                                                                      |
|[this, ebook, is, for, the, use, of, anyone, anywhere, at, no, cost, and, with]         |
|[almost, no, restrictions, whatsoever, you, may, copy, it, give, it, away, or]          |
+----------------------------------------------------------------------------------------+
only showing top 5 rows
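For these inputs, Python's `re.split` behaves much like Spark's `split()`. Java's `\W` is ASCII-only by default, so the sketch passes `re.ASCII` to match it (Python 3's `\W` is otherwise Unicode-aware). Note how a blank line becomes a single empty string, which Spark displays as `[]` in the third row above.

```python
import re

for line in ["the project gutenberg ebook", "william shakespeare", ""]:
    print(re.split(r"\W+", line, flags=re.ASCII))
# ['the', 'project', 'gutenberg', 'ebook']
# ['william', 'shakespeare']
# ['']
```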



In [28]:
from pyspark.sql.functions import explode
#explode() creates a new row for each element in the given array or map column. 
wordsDF = splitValueDF.select( explode(col("words")).alias('word') )
wordsDF.show(5,truncate=False)

+---------+
|word     |
+---------+
|the      |
|project  |
|gutenberg|
|ebook    |
|of       |
+---------+
only showing top 5 rows
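On ordinary Python lists, the reshaping done by `explode()` is a flatten: one row holding an array becomes one row per element. A minimal sketch with `itertools.chain`, using a few hypothetical rows modelled on the output above:

```python
from itertools import chain

rows = [["the", "project", "gutenberg"], ["william", "shakespeare"], [""]]
words = list(chain.from_iterable(rows))  # one entry per array element
print(words)
# ['the', 'project', 'gutenberg', 'william', 'shakespeare', '']
print(len(words))  # 6: the empty string from the blank line is kept
```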



We can count how many words we have in the `DataFrame` using the action `count()`

In [29]:
wordsDF.count()

1184180

We now have a long list of words, many of which are repeated. To find out how many times each word occurs in the `DataFrame`, we group by word and then count. We use the `groupBy()` function to group by `word`:

In [30]:
groupedWordsDF = wordsDF.groupBy("word")

In [31]:
groupedWordsDF.show()

AttributeError: 'GroupedData' object has no attribute 'show'

#### What does this mean?
The [GroupedData](https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/GroupedData.html#GroupedData(org.apache.spark.sql.DataFrame,%20scala.collection.Seq,%20org.apache.spark.sql.GroupedData.GroupType)) type is not a `DataFrame`; it is a set of methods for aggregations on a `DataFrame`, created by `DataFrame.groupBy()`. Applying an aggregation such as `count()` turns it back into a `DataFrame`.
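A plain-Python analogue may help: grouping alone produces an intermediate mapping from each key to its members (playing the role of `GroupedData`), and only an aggregation turns each group into one `(word, count)` row. The word list is a made-up example.

```python
from collections import defaultdict

words = ["the", "art", "the", "some", "art", "the"]

groups = defaultdict(list)  # like groupBy("word"): key -> members of the group
for w in words:
    groups[w].append(w)

counts = {word: len(members) for word, members in groups.items()}  # like count()
print(sorted(counts.items(), key=lambda kv: kv[1], reverse=True))
# [('the', 3), ('art', 2), ('some', 1)]
```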

In [33]:
groupedWordsDF

<pyspark.sql.group.GroupedData at 0x7f74e24d0748>

In [34]:
# the count() aggregation returns a new DataFrame with columns "word" and "count"
groupedWordsDF.count().show(5,truncate=False)

+----------------------+-----+
|word                  |count|
+----------------------+-----+
|online                |24   |
|those                 |722  |
|some                  |1799 |
|726002026compuservecom|1    |
|art                   |925  |
+----------------------+-----+
only showing top 5 rows



In [35]:
from pyspark.sql.functions import desc # sort the result in descending order
topWordsDF = groupedWordsDF.count().orderBy(desc("count"))

In [36]:
topWordsDF.show(10,truncate=False)

+----+-----+
|word|count|
+----+-----+
|the |39697|
|and |35054|
|to  |27325|
|i   |27058|
|of  |26126|
|a   |19454|
|you |16922|
|    |15859|
|my  |15254|
|in  |15071|
+----+-----+
only showing top 10 rows



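## How can we condense what we did?

It is not difficult to condense what we just did into a few lines of code. We start by defining a `Python` function, `removePunctuation()`, which strips punctuation and also trims and lowercases each line. Then we build `wordsDF` and compute the final result. Note that the previous result counted the empty string as a word (15859 times); for example, a blank line splits into a single empty string. This time we filter out empty words.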

In [37]:
from pyspark.sql.functions import split,explode,lower,trim,regexp_replace,col
def removePunctuation(column):
    # strip punctuation, trim surrounding spaces, then lowercase
    return lower( trim( regexp_replace(column, r"\p{P}", '') ) ).alias("sentence")

In [38]:
wordsDF = (raw_data.select( explode(split(removePunctuation('value'), r'\W+')).alias("word"))
                  .filter(col("word")!= '')
           )

In [39]:
(wordsDF.groupBy("word")
        .count()
        .orderBy(desc("count"))
 ).show(10)

+----+-----+
|word|count|
+----+-----+
| the|39697|
| and|35054|
|  to|27325|
|   i|27058|
|  of|26126|
|   a|19454|
| you|16922|
|  my|15254|
|  in|15071|
|that|14495|
+----+-----+
only showing top 10 rows
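The whole pipeline can be sanity-checked in plain Python on the five lines displayed earlier, without a cluster. The cleaning order follows `removePunctuation()` (strip punctuation, trim, lowercase), `re.split` with `re.ASCII` stands in for `split`, and `collections.Counter` plays the role of `groupBy("word").count()`. `clean` is a hypothetical helper.

```python
import re
import unicodedata
from collections import Counter

lines = [
    "The Project Gutenberg EBook of The Complete Works of William Shakespeare, by",
    "William Shakespeare",
    "",
    "This eBook is for the use of anyone anywhere at no cost and with",
    "almost no restrictions whatsoever.  You may copy it, give it away or",
]


def clean(line: str) -> str:
    # Same order as removePunctuation(): strip punctuation, trim, lowercase
    no_punct = "".join(c for c in line if not unicodedata.category(c).startswith("P"))
    return no_punct.strip().lower()


words = [w for line in lines
         for w in re.split(r"\W+", clean(line), flags=re.ASCII)
         if w != ""]  # same role as filter(col("word") != '')
counts = Counter(words)
print(counts["the"], counts["of"])  # 3 3
print(len(words))                   # 40
```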



### Exercise: What are the least common words?

In [40]:
(wordsDF.groupBy("word")
        .count()
        .orderBy("count")
 ).show(10)

+---------------+-----+
|           word|count|
+---------------+-----+
|       mockable|    1|
|         slaver|    1|
|          bushs|    1|
|      cleopatpa|    1|
|       jugglers|    1|
|      soundness|    1|
|distemperatures|    1|
|     unswayable|    1|
|  fortuneteller|    1|
|       forspoke|    1|
+---------------+-----+
only showing top 10 rows



# Removing Stop Words

In [41]:
with open("data/stopwords.txt","r") as f:
    stopwords = set([line.strip() for line in f])

In [42]:
def removeStopWords(tokens):
    return [token for token in tokens if token not in stopwords]

In [43]:
testSentence = "I am learning Spark in the COSS"
removeStopWords(testSentence.split(" "))

['I', 'learning', 'Spark', 'COSS']
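The test above keeps `I` because the membership check is case-sensitive. The lowercase `i` is evidently in the stop-word list, since it disappears from the top counts after removal, and the Spark pipeline lowercases the text before the UDF runs, so this does not matter there. The sketch below shows the effect; the tiny `stopwords_sample` set is hypothetical, not the contents of `data/stopwords.txt`.

```python
stopwords_sample = {"i", "am", "in", "the"}  # hypothetical stand-in for the real list


def remove_stop_words(tokens, stopwords):
    # Case-sensitive membership test, as in removeStopWords()
    return [t for t in tokens if t not in stopwords]


sentence = "I am learning Spark in the COSS"
print(remove_stop_words(sentence.split(" "), stopwords_sample))
# ['I', 'learning', 'Spark', 'COSS']
print(remove_stop_words(sentence.lower().split(" "), stopwords_sample))
# ['learning', 'spark', 'coss']
```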

In [44]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType,StringType

In [45]:
splitDF = raw_data.select( split(removePunctuation('value'), r'\W+').alias("word") )

In [46]:
splitDF.show(5,truncate=False)

+----------------------------------------------------------------------------------------+
|word                                                                                    |
+----------------------------------------------------------------------------------------+
|[the, project, gutenberg, ebook, of, the, complete, works, of, william, shakespeare, by]|
|[william, shakespeare]                                                                  |
|[]                                                                                      |
|[this, ebook, is, for, the, use, of, anyone, anywhere, at, no, cost, and, with]         |
|[almost, no, restrictions, whatsoever, you, may, copy, it, give, it, away, or]          |
+----------------------------------------------------------------------------------------+
only showing top 5 rows



In [47]:
removeStopWordsUDF = udf(removeStopWords,ArrayType(StringType()))

In [48]:
explodedNoStopWordsDF = (splitDF.select( explode(removeStopWordsUDF("word")).alias("word") )
                                .filter(col("word")!="")
                         )

In [49]:
explodedNoStopWordsDF.show(5)

+---------+
|     word|
+---------+
|  project|
|gutenberg|
|    ebook|
| complete|
|    works|
+---------+
only showing top 5 rows



In [50]:
explodedNoStopWordsDF.groupBy("word").count().orderBy(desc("count")).show(10)

+-----+-----+
| word|count|
+-----+-----+
| thou| 5499|
|  thy| 4050|
|shall| 3913|
|  now| 3257|
| thee| 3188|
| good| 3110|
| lord| 3079|
| king| 2933|
|  sir| 2876|
| well| 2856|
+-----+-----+
only showing top 10 rows



### Exercise: What are the least common words?

In [51]:
(explodedNoStopWordsDF.groupBy("word")
        .count()
        .orderBy("count")
 ).show(10)

+------------+-----+
|        word|count|
+------------+-----+
|       shank|    1|
|      papist|    1|
|pibblepabble|    1|
|antipholuses|    1|
|   registers|    1|
|     designd|    1|
|        mede|    1|
|     porches|    1|
|  pastsaving|    1|
|       exion|    1|
+------------+-----+
only showing top 10 rows

