<a href="https://colab.research.google.com/github/ohmono/clustering-spark/blob/main/7_2_introduction_to_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<p><img alt="udeA logo" height="120px" src="https://github.com/freddyduitama/images/blob/master/logo.png?raw=true" align="left" hspace="10px" vspace="0px" style="width:107px;height:152px;"></p>

# <center> <font color='0B5345'> Introduction to SPARK. </font> </center>
<font  face="Courier New" size="2">
<center>Prof. John Freddy Duitama Muñoz Ph.D.</center>
<center>Prof. Mario Giraldo. Msc.</center>

<center><font face="Verdana"><a  href="https://colab.research.google.com/drive/1YoFxA7cugsnKyJZ_LKWn3I3Ce_AIV_hl#scrollTo=Im-rLP7aPM5p">Extensions of MapReduce</a> <a>&nbsp;&nbsp; | </a><a  href="https://colab.research.google.com/drive/1SgcWf2Z6jSYh5qcxOZL-QU6QZhpY87Nj#scrollTo=wUijAhrCm0-h" >TOC</a><a>&nbsp;&nbsp;    |</a> <a>&nbsp;&nbsp;</a> <a   href="https://colab.research.google.com/drive/13zDbsSAVDTwVtO9LN-Zj47QTr6FWulk1">DataFrames and Structured Data</a></font></center>


## 1. The SPARK Framework.
<p align="justify"><font face="Verdana" size="2.5">
The figure below shows the tools that make up the SPARK framework.
<center><img src="https://github.com/freddyduitama/images/blob/master/spark-framework.png?raw=true"  height="200" width="400"></center>
<caption><center><font color='0B5345'> <u> <b>Figure 1:</b><br> </u>Spark Tools</font></center></caption>
This lecture covers the low-level API, based on Resilient Distributed Datasets (RDDs), and the most relevant RDD operations.
</p>



## <font color='0B5345'>1.1 Set up the SPARK environment.</font>
<p align="justify"><font face="Verdana" size="2.5">
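Visit <a href="http://apache.osuosl.org/spark/">this mirror</a> to check the latest SPARK and Hadoop versions. If they have changed, update the version numbers in the <code>wget</code> and <code>tar</code> commands of the next cell (lines 3 and 4) and in <code>SPARK_HOME</code> below.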
</p>

In [None]:
# Install the SPARK framework. This cell only needs to be executed once per Colab runtime.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://apache.osuosl.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz
!pip install -q findspark

In [None]:
#set the OS environment variables.
import os
from os.path import join, abspath
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

In [None]:
#import pyspark package
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

In [None]:
conf = SparkConf().setAppName("ejemplo").setMaster("local[*]")
sc = SparkContext(conf=conf)

### 1.2. Mount google drive as file systems

In [None]:
# Mount your Google Drive.
from google.colab import drive
drive.mount('/gdrive', force_remount=True)

Mounted at /gdrive


In [None]:
# Verify the data directory in your Google Drive and set 'path' to it.
!ls -l '/gdrive/My Drive/Colab Notebooks/algorithms-for-big data/lectures/data'
path='/gdrive/My Drive/Colab Notebooks/algorithms-for-big data/lectures/data'

total 17862
-rw------- 1 root root     7294 Mar  7  2019  2015-summary.csv
-rw------- 1 root root    21622 Mar  6  2019  2015-summary.json
-rw------- 1 root root 10327345 Apr 28  2021  auth.log
-rw------- 1 root root     8910 May  3  2021  barrios.csv
-rw------- 1 root root   143268 Oct  3  2022  california_housing_test1.csv
-rw------- 1 root root   159170 Oct  3  2022  california_housing_test.csv
-rw------- 1 root root       24 Mar  5  2019  clave-valor.txt
-rw------- 1 root root  3602146 May  3  2021  clientes.csv
-rw------- 1 root root   670741 Mar  5  2019  comments.txt
-rw------- 1 root root       50 Mar  5  2019  crear-clave.txt
-rw------- 1 root root      179 Apr 17  2021  data.gdoc
-rw------- 1 root root       97 Apr 17  2021  data.txt
-rw------- 1 root root      179 Nov  8  2021  datos.gdoc
-rw------- 1 root root      440 Apr 23  2021  datos.json
-rw------- 1 root root       91 Apr 16  2021  datos-old.txt
-rw------- 1 root root      180 Nov  8  2021  datos.txt
-rw------- 1 roo

In [None]:
# Optional: upload files from your PC to the Colab working environment.
#from google.colab import files
#datafile = files.upload()

## 2. RDD Operations: Transformations and actions.
<p align="justify"><font face="Verdana" size="2.5">
<b>2.1. Transformations: </b> <em>map</em> vs <em>flatMap</em></font><br>
<ul align="justify"><font face="Verdana" size="2.5">
<li> <font color='0B5345'><b>map(func):</b></font> 	Return a new distributed dataset formed by passing each element of the source through a function <em>func</em>. </li>
<li> <font color='0B5345'><b>flatMap(func):</b></font> Similar to <em>map</em>, but each input element can be mapped to 0 or more output elements. Hence <em>func</em> should return a sequence (in Python, any iterable such as a list) rather than a single element, and the returned sequences are flattened into a single dataset. </li>
</ul>
<a>Hint:</a> Think of each input element as a record of a file (a line of text) that contains a string.
</p>


<center><img src="https://github.com/freddyduitama/images/blob/master/map-flatmap.png?raw=true"  height="200" width="400"></center>
<caption><center><font color='0B5345'> <u> <b>Figure 2:</b><br> </u>Map and flatMap transformations.</font></center></caption>


<p align="justify"><font face="Verdana" size="2.5">
<b>2.2. Actions:</b> <em>collect</em> vs <em>saveAsTextFile</em><br>
<ul align="justify">
<li> <font color='0B5345'><b>collect():</b></font> Return all the elements of the dataset as a list at the driver program. This is usually useful after a <em>filter</em> or other operation that returns a sufficiently small subset of the data, because the whole result must fit in the driver's memory. </li>
<li> <font color='0B5345'><b>saveAsTextFile(path):</b></font> Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark calls the <em>toString</em> function (<em>str()</em> in Python) on each element to convert it to a line of text in the file. </li>
</ul>
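
The directory layout produced by `saveAsTextFile` can be sketched in plain Python. This is a simplified model, not Spark's writer: the partitions are assumed to be a list of lists, `save_as_text_file` is a hypothetical helper, and details such as compression, output committers and marker files are omitted. Each partition becomes one `part-NNNNN` file and each element is written with `str()`.

```python
# Simplified model of saveAsTextFile: one "part-NNNNN" file per partition,
# each element written as str(element) on its own line.
# Assumption: partitions are given as a list of lists (hypothetical data).
import os
import tempfile

def save_as_text_file(partitions, out_dir):
    os.makedirs(out_dir)  # like Spark, fail if the output directory already exists
    for idx, part in enumerate(partitions):
        with open(os.path.join(out_dir, f"part-{idx:05d}"), "w") as fh:
            for element in part:
                fh.write(str(element) + "\n")

partitions = [[("panda", 6)], [("party", 3), ("coffee", 2)]]
out_dir = os.path.join(tempfile.mkdtemp(), "wordcount")
save_as_text_file(partitions, out_dir)

print(sorted(os.listdir(out_dir)))      # ['part-00000', 'part-00001']
with open(os.path.join(out_dir, "part-00001")) as fh:
    print(fh.read().splitlines())       # ["('party', 3)", "('coffee', 2)"]
```

Because every partition is written independently, the output path is a directory, not a single file; `collect()` is the alternative when a small result should come back to the driver instead.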


<font face="Verdana" size="2.5">
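To download file1.txt <a href="https://drive.google.com/file/d/1QzWerWjtZqmqNgFjBu8zE3imVUsDO-GH/view?usp=sharing">click</a> (the code below reads it as <em>file11.txt</em> from <em>path</em>; rename the file or adjust the code accordingly).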

In [None]:
# sc.textFile reads each line of the file as a string element and creates an RDD with at least 4 partitions.
RDD1 = sc.textFile(path+'/file11.txt',4)

In [None]:
# take the first six rows of the RDD
RDD1.take(6)

['coffee panda',
 'happy panda',
 'happiest panda party',
 'coffee panda party',
 'happy panda panda',
 'happiest panda party']

In [None]:
# map: split each row on " "; every row becomes one list of words
RDD1.map(lambda line:  line.split(" ")).collect()

[['coffee', 'panda'],
 ['happy', 'panda'],
 ['happiest', 'panda', 'party'],
 ['coffee', 'panda', 'party'],
 ['happy', 'panda', 'panda'],
 ['happiest', 'panda', 'party']]

In [None]:
# flatMap: split each row on " " and flatten the lists into a single RDD of words
RDD1.flatMap(lambda line: line.split(" ")).collect()

['coffee',
 'panda',
 'happy',
 'panda',
 'happiest',
 'panda',
 'party',
 'coffee',
 'panda',
 'party',
 'happy',
 'panda',
 'panda',
 'happiest',
 'panda',
 'party']
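
The difference between both transformations can be modelled in plain Python, without a Spark cluster. The sketch below is an illustration under simple assumptions: a Python list stands in for the RDD, and the helpers `rdd_map` and `rdd_flat_map` are hypothetical names. It also shows why a function that returns a string, instead of a list, makes `flatMap` emit single characters.

```python
# Plain-Python model of map vs flatMap (no Spark needed).
# Assumption: a list of strings stands in for the RDD's elements.
from itertools import chain

lines = ["coffee panda", "happy panda", "happiest panda party"]

def rdd_map(func, data):
    # exactly one output element per input element
    return [func(x) for x in data]

def rdd_flat_map(func, data):
    # func returns an iterable per element; the iterables are concatenated
    return list(chain.from_iterable(func(x) for x in data))

mapped = rdd_map(lambda line: line.split(" "), lines)
flat = rdd_flat_map(lambda line: line.split(" "), lines)

print(mapped)  # [['coffee', 'panda'], ['happy', 'panda'], ['happiest', 'panda', 'party']]
print(flat)    # ['coffee', 'panda', 'happy', 'panda', 'happiest', 'panda', 'party']

# Pitfall: a string is itself iterable, so returning a string (instead of a
# list of words) makes flatMap emit single characters.
chars = rdd_flat_map(lambda line: line, ["panda"])
print(chars)   # ['p', 'a', 'n', 'd', 'a']
```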

<p align="justify"><font face="Verdana" size="2.5">
<b>2.3. Transformations:</b> filter, distinct, sample<br>

- <font color='0B5345'>**filter(func):**</font> Return a new dataset formed by selecting those elements of the source on which *func* returns true.
- <font color='0B5345'>**distinct():**</font> Return a new dataset that contains the distinct elements of the source dataset.
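- <font color='0B5345'>**sample(withReplacement, fraction, seed):**</font> Sample a fraction *fraction* of the data, with or without replacement, using a given random number generator seed. *fraction* is the expected size of the sample, so the number of sampled elements is only approximately *fraction* × count (e.g. 1245 of 12482 lines below).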



<p align="justify"><font face="Verdana" size="2.5">
<b>2.4. Action:</b> count()<br>

- <font color='0B5345'>**count():**</font> Return the number of elements in the dataset.
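
These four operations can also be modelled in plain Python. The sketch below is not Spark's implementation: the helper names (`rdd_filter`, `rdd_distinct`, `rdd_sample`, `rdd_count`) and the input lines are hypothetical, and `sample` without replacement is modelled as Bernoulli sampling (each element is kept independently with probability `fraction`), which illustrates why the sample size is only approximately `fraction` × count.

```python
# Plain-Python model of filter, distinct, sample and count.
# Assumption: sample(withReplacement=False, ...) is modelled as Bernoulli
# sampling, i.e. each element is kept with probability `fraction`.
import random

lines = ["# comment", "text", "", "# another", "text"] * 2000   # hypothetical data

def rdd_filter(pred, data):
    return [x for x in data if pred(x)]

def rdd_distinct(data):
    # Spark gives no ordering guarantee, so a set captures the semantics
    return set(data)

def rdd_sample(data, fraction, seed):
    rng = random.Random(seed)
    return [x for x in data if rng.random() < fraction]

def rdd_count(data):
    return len(data)

comments = rdd_filter(lambda x: "#" in x, lines)
print(rdd_count(comments))              # 4000
print(sorted(rdd_distinct(lines)))      # ['', '# another', '# comment', 'text']
sample = rdd_sample(lines, 0.1, 57)
print(rdd_count(sample))                # close to 1000, but not exactly 1000
```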

<p align="justify"><font face="Verdana" size="2.5">
To download comments.txt <a href="https://drive.google.com/file/d/1_QPMsYlpWtzcTOZMZB4I9Z-ct8WMxqV4/view?usp=sharing">click</a>

In [None]:
# Count the comment lines of comments.txt (lines containing '#') with the filter transformation.
sc.textFile(path+'/comments.txt',4).filter(lambda x : "#" in  x).count()

2325

In [None]:
# Total number of lines in comments.txt.
sc.textFile(path+'/comments.txt',4).count()

12482

In [None]:
# Sample about 10% of the lines without replacement (seed 57) and count them.
sc.textFile(path+'/comments.txt').sample(False,0.1,57).count()

1245

In [None]:
# Show the first four sampled lines.
sc.textFile(path+'/comments.txt').sample(False,0.1,57).take(4)

['',
 'take delight in vexing me. You have no compassion for my poor nerves.#',
 '',
 '#But I hope you will get over it, and live to see many young men of four']

## 3. Some Key-Value Transformations and Actions

<p align="justify"><font face="Verdana" size="2.5">
<b>3.1.  Transformation:</b> reduceByKey vs groupByKey<br>

- <font color='0B5345'>**reduceByKey(func, [numPartitions]):**</font> When called on a dataset of $(K, V)$ pairs, returns a dataset of $(K, V)$ pairs where the values for each key are aggregated using the given reduce function *func*, which must be of type $(V, V) \rightarrow V$. The number of reduce tasks is configurable through an optional second argument *numPartitions*.
<p align="justify"><font face="Verdana" size="2.5">
 - <b>Key point:</b> <em>reduceByKey</em> performs a <em>map-side combine</em>: the values of each key are partially aggregated in memory within each partition before the shuffle, so less data is sent across the network.
- <font color='0B5345'>**groupByKey([numPartitions]):**</font> When called on a dataset of $(K, V)$ pairs, returns a dataset of $(K, \mathit{Iterable}\langle V \rangle)$ pairs.
<p align="justify"><font face="Verdana" size="2.5">
  - If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using <em>reduceByKey</em> or <em>aggregateByKey</em> will yield much better performance.
  - By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional *numPartitions* argument to set a different number of tasks.
  
<p align="justify"><font face="Verdana" size="2.5">
<a>Remark: </a> The action <em>take(n)</em> returns the first <em>n</em> elements of the RDD.

<center><img src="https://github.com/freddyduitama/images/blob/master/groupby-reduceby.png?raw=true"  height="300" width="900"></center>
<caption><center><font color='0B5345'> <u> <b>Figure 3:</b><br> </u>key-value transformations.</font></center></caption><br>
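
The effect of the map-side combine can be sketched in plain Python by counting how many records would cross the shuffle in each case. This is a simplified model, not Spark's shuffle: each inner list is assumed to be one partition of `(word, 1)` pairs, and `reduce_by_key` and `group_by_key` are hypothetical helpers.

```python
# Plain-Python model of reduceByKey (with a map-side combine) versus groupByKey.
# Assumption: each inner list is one partition of (word, 1) pairs.
from collections import defaultdict

partitions = [
    [("panda", 1), ("happy", 1), ("panda", 1)],
    [("panda", 1), ("party", 1)],
]

def reduce_by_key(func, parts):
    # 1) map-side combine: aggregate inside each partition first
    combined = []
    for part in parts:
        local = {}
        for k, v in part:
            local[k] = func(local[k], v) if k in local else v
        combined.append(local)
    shuffled = sum(len(local) for local in combined)   # records sent over the network
    # 2) after the shuffle, merge the partial results of each key
    result = {}
    for local in combined:
        for k, v in local.items():
            result[k] = func(result[k], v) if k in result else v
    return result, shuffled

def group_by_key(parts):
    # every (key, value) pair is shuffled; values are only grouped afterwards
    groups = defaultdict(list)
    shuffled = 0
    for part in parts:
        for k, v in part:
            groups[k].append(v)
            shuffled += 1
    return dict(groups), shuffled

counts, sent_reduce = reduce_by_key(lambda a, b: a + b, partitions)
groups, sent_group = group_by_key(partitions)
print(counts)                   # {'panda': 3, 'happy': 1, 'party': 1}
print(sent_reduce, sent_group)  # 4 5
```

The saving is small in this toy input, but it grows with the number of repeated keys per partition, which is the typical WordCount situation.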


<p align="justify"><font face="Verdana" size="2.5">
To download the file Pride_and_Prejudice.txt <a href="https://drive.google.com/file/d/1w6eCJtPjNZLADMoOGKyRNtpcKSECUsDb/view?usp=sharing">click</a>

In [None]:
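# WordCount program.
# 1) flatMap splits each line into words.
# 2) map builds (word, 1) key-value pairs.
# 3) reduceByKey sums the values of each key.
# Note: split(" ") returns '' for empty lines and repeated blanks, hence the ('', 2465) entry.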
sc.textFile(path+'/Pride_and_Prejudice.txt',4) \
                                            .flatMap(lambda line: line.split(" ")) \
                                            .map(lambda word: (word, 1))    \
                                            .reduceByKey(lambda a, b: a + b).take(20)

[('PRIDE', 1),
 ('', 2465),
 ('It', 198),
 ('acknowledged,', 7),
 ('in', 1759),
 ('of', 3554),
 ('good', 153),
 ('must', 298),
 ('known', 47),
 ('may', 176),
 ('neighbourhood,', 11),
 ('he', 1036),
 ('considered', 20),
 ('other', 157),
 ('daughters.', 9),
 ('Bennet,"', 8),
 ('lady', 36),
 ('let', 45),
 ('not.', 17),
 ('she;', 2)]

<p align="justify"><font face="Verdana" size="2.5">
<b>3.2.  Transformation:</b> join<br>

- <font color='0B5345'>**join(otherDataset, [numPartitions]):**</font> When called on datasets of type $(K, V)$ and $(K, W)$, returns a dataset of $(K, (V, W))$ pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
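
The semantics of `join` and `leftOuterJoin` can be sketched in plain Python. In this illustration lists stand in for RDDs, `rdd_join` and `rdd_left_outer_join` are hypothetical helpers, the movie pairs reuse ids and names shown later in this notebook, and the ratings are hypothetical. As in PySpark's `leftOuterJoin`, a key missing on the right side is paired with `None`.

```python
# Plain-Python model of join and leftOuterJoin on (K, V) and (K, W) pairs.
# Assumption: lists stand in for RDDs; Spark does not guarantee output order.
from collections import defaultdict

def rdd_join(left, right):
    index = defaultdict(list)
    for k, w in right:
        index[k].append(w)
    # all pairs of values that share a key
    return [(k, (v, w)) for k, v in left for w in index.get(k, [])]

def rdd_left_outer_join(left, right):
    index = defaultdict(list)
    for k, w in right:
        index[k].append(w)
    # keys without a match on the right are paired with None
    return [(k, (v, w)) for k, v in left for w in (index.get(k) or [None])]

movies = [("1", "Avatar"), ("2", "TwoWeeksNotice"), ("3", "Gravity")]
ratings = [("1", 5), ("1", 3), ("2", 4)]          # hypothetical ratings

print(rdd_join(movies, ratings))
# [('1', ('Avatar', 5)), ('1', ('Avatar', 3)), ('2', ('TwoWeeksNotice', 4))]
print(rdd_left_outer_join(movies, ratings))
# [('1', ('Avatar', 5)), ('1', ('Avatar', 3)), ('2', ('TwoWeeksNotice', 4)), ('3', ('Gravity', None))]
```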

<p align="justify"><font face="Verdana" size="2.5">

To download matrix-M.txt <a href="https://drive.google.com/file/d/1OMWbxn5uUq20hTRSzZDzWl2rB2jA3Jmh/view?usp=share_link">click</a>. <em>Format:</em> row, column, value.<br>
To download matrix-N.txt <a href="https://drive.google.com/file/d/12lijk_X27SbMPzNYcftzzHVqtk4TDxmM/view?usp=share_link">click</a>. <em>Format:</em> row, column, value.

<center><img src="https://github.com/freddyduitama/images/blob/master/matrix-matrix.png?raw=true"   align="left" height="150" width="450"></center>

<font color='0B5345'>
<img src="https://github.com/freddyduitama/images/blob/master/matrix-matrix-1.png?raw=true"   align="right" height="250" width="250">

<caption><center><font color='0B5345'> <b>Figure 4: </b><br>Matrix multiplication.</font></center></caption>  
</p>

In [None]:
# Matrix multiplication.
# See Algorithms using MapReduce, lecture 4.2. section 5.
MatrixM = sc.textFile(path+'/matrix-M.txt')
MatrixN = sc.textFile(path+'/matrix-N.txt')
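# Prepare the join input: key each M entry m_ij by its column j -> (j, (i, m_ij))
# and each N entry n_jk by its row j -> (j, (k, n_jk)).
M=MatrixM.map(lambda record :record.split(",")).map(lambda item : (item[1],(item[0],item[2])))
N=MatrixN.map(lambda record : record.split(",")).map(lambda item : (item[0],(item[1],item[2])))
# Join on j, emit the partial products m_ij * n_jk keyed by (i, k), and sum them per key.
# int() also strips the trailing blanks found in some values (e.g. '1 ').
M.join(N) \
    .map(lambda x: ((x[1][0][0], x[1][1][0]), int(x[1][0][1]) * int(x[1][1][1]))) \
    .reduceByKey(lambda a, b: a + b) \
    .sortByKey().take(15)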

[(('1', '1'), 2),
 (('1', '2'), 4),
 (('2', '1'), 7),
 (('2', '2'), 5),
 (('3', '1'), 4),
 (('3', '2'), 4)]
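
The same join-based algorithm can be checked in plain Python against a product computed by hand. The matrices below are small hypothetical examples (not the contents of matrix-M.txt and matrix-N.txt), stored as (row, column, value) triples; the list comprehension plays the role of `join` plus `map`, and the dictionary accumulation plays the role of `reduceByKey`.

```python
# Plain-Python model of the join-based matrix product above.
# Assumption: small hypothetical matrices given as (row, column, value) triples.
from collections import defaultdict

M = [(1, 1, 1), (1, 2, 2), (2, 1, 3), (2, 2, 4)]   # [[1, 2], [3, 4]]
N = [(1, 1, 5), (1, 2, 6), (2, 1, 7), (2, 2, 8)]   # [[5, 6], [7, 8]]

# Key M by its column j and N by its row j, as in the map steps above.
m_by_j = [(j, (i, v)) for i, j, v in M]
n_by_j = defaultdict(list)
for j, k, w in N:
    n_by_j[j].append((k, w))

# Join on j and emit the partial products keyed by (i, k) ...
partial = [((i, k), v * w) for j, (i, v) in m_by_j for k, w in n_by_j[j]]

# ... then sum the partial products of each key (the reduceByKey step).
P = defaultdict(int)
for key, prod in partial:
    P[key] += prod

print(sorted(P.items()))
# [((1, 1), 19), ((1, 2), 22), ((2, 1), 43), ((2, 2), 50)]
```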

In [None]:
!ls '/gdrive/My Drive/Colab Notebooks/algorithms-for-big data/lectures/data/matrix-M.txt'

'/gdrive/My Drive/Colab Notebooks/algorithms-for-big data/lectures/data/matrix-M.txt'


In [None]:
MatrixM = sc.textFile(path+'/matrix-M.txt')
MatrixM.take(2)

['1,1,1 ', '1,3,2']

In [None]:
# Step by step: split each record into [row, column, value].
MatrixM.map(lambda record :record.split(",")).take(4)

[['1', '1', '1 '], ['1', '3', '2'], ['2', '1', '3 '], ['2', '2', '1 ']]

In [None]:
# Step by step: key each entry of M by its column j -> (j, (i, m_ij)).
MatrixM.map(lambda record :record.split(",")).map(lambda item:(item[1],(item[0],item[2]))).take(4)

[('1', ('1', '1 ')), ('3', ('1', '2')), ('1', ('2', '3 ')), ('2', ('2', '1 '))]

In [None]:
M.take(4)

[('1', ('1', '1 ')), ('3', ('1', '2')), ('1', ('2', '3 ')), ('2', ('2', '1 '))]

In [None]:
N.take(6)

[('1', ('1', '2 ')), ('2', ('1', '1')), ('2', ('2', '1')), ('3', ('2', '2'))]

In [None]:
M.join(N).sortByKey().take(8)

[('1', (('1', '1 '), ('1', '2 '))),
 ('1', (('2', '3 '), ('1', '2 '))),
 ('1', (('3', '1 '), ('1', '2 '))),
 ('2', (('2', '1 '), ('1', '1'))),
 ('2', (('2', '1 '), ('2', '1'))),
 ('2', (('3', '2 '), ('1', '1'))),
 ('2', (('3', '2 '), ('2', '1'))),
 ('3', (('1', '2'), ('2', '2')))]

In [None]:
M.join(N).map(lambda x : ((x[1][0][0],x[1][1][0]), int(x[1][0][1]) * int(x[1][1][1]) )).sortByKey().take(8)

[(('1', '1'), 2),
 (('1', '2'), 4),
 (('2', '1'), 6),
 (('2', '1'), 1),
 (('2', '2'), 4),
 (('2', '2'), 1),
 (('3', '1'), 2),
 (('3', '1'), 2)]

In [None]:
M.join(N).map(lambda x : ((x[1][0][0],x[1][1][0]), int(x[1][0][1]) * int(x[1][1][1]) )).reduceByKey(lambda a,b:a +b).sortByKey().take(8)

[(('1', '1'), 2),
 (('1', '2'), 4),
 (('2', '1'), 7),
 (('2', '2'), 5),
 (('3', '1'), 4),
 (('3', '2'), 4)]

## 4. Using user-defined functions as arguments.
<p align="justify"><font face="Verdana" size="2.5">
To download *Pride_and_Prejudice.txt* <a href="https://drive.google.com/file/d/1w6eCJtPjNZLADMoOGKyRNtpcKSECUsDb/view?usp=sharing">click</a>.

In [None]:
# Define the function to be used in the flatMap transformation.
def uni_to_clean_str(x):
    punc = '!"#$%&\'()*+,./:;<=>?@[\\]^_`{|}~'             # characters to replace ('-' is kept)
    mytable = str.maketrans(punc, ' ' * len(punc))         # map each of them to a blank
    lowercased_str = x.lower()                              # convert string x to lower case
    lowercased_str = lowercased_str.replace('--', ' ')      # replace the dash '--' with a blank
    clean_str = lowercased_str.translate(mytable)           # apply the mapping table
    return clean_str

In [None]:
# Improved WordCount version.
# It uses the function uni_to_clean_str to remove punctuation and lower-case the text.
one_RDD = sc.textFile(path+'/comments.txt') \
            .flatMap(lambda x: uni_to_clean_str(x).split()) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(lambda x, y: x + y)

print(one_RDD.take(55))

[('grown-up', 1), ('daughters', 45), ('ought', 40), ('give', 122), ('of', 3472), ('her', 2168), ('own', 176), ('beauty', 24), ('in', 1792), ('cases', 5), ('think', 203), ('but', 959), ('must', 302), ('indeed', 91), ('go', 102), ('mr', 738), ('when', 360), ('he', 1239), ('into', 139), ('neighbourhood', 27), ('is', 813), ('more', 308), ('than', 271), ('i', 1989), ('engage', 5), ('assure', 38), ('consider', 31), ('only', 205), ('an', 340), ('would', 454), ('them', 420), ('lady', 186), ('are', 324), ('determined', 32), ('account', 38), ('know', 224), ('visit', 42), ('no', 474), ('newcomers', 1), ('impossible', 41), ('us', 118), ('do', 329), ('surely', 4), ('dare', 37), ('say', 153), ('very', 464), ('send', 23), ('his', 1194), ('girls', 38), ('though', 214), ('throw', 8), ('good', 173), ('others', 49), ('am', 311), ('sure', 97)]


In [None]:
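# Pitfall: without .split() the function returns a string, and flatMap
# iterates over it character by character.
sc.textFile(path+'/comments.txt').flatMap(lambda x: uni_to_clean_str(x)).take(5)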

['g', 'r', 'o', 'w', 'n']
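
Before shipping a cleaning function to Spark, it can be convenient to test the whole pipeline locally. The sketch below is a plain-Python version of the improved WordCount; `clean` is a hypothetical re-implementation that mirrors the rules of `uni_to_clean_str` (lower-case, `--` and punctuation except `-` become blanks), `text_lines` is hypothetical input, and `collections.Counter` stands in for `map` plus `reduceByKey`.

```python
# Plain-Python version of the improved WordCount, for local testing.
# Assumption: `text_lines` is hypothetical input; `clean` mirrors uni_to_clean_str.
from collections import Counter

PUNC = '!"#$%&\'()*+,./:;<=>?@[\\]^_`{|}~'
TABLE = str.maketrans(PUNC, " " * len(PUNC))

def clean(line):
    return line.lower().replace("--", " ").translate(TABLE)

text_lines = ["It is a truth--universally acknowledged,#", "a grown-up truth."]

# flatMap(clean(x).split()) -> map((w, 1)) -> reduceByKey(+)
counts = Counter(word for line in text_lines for word in clean(line).split())
print(counts["truth"], counts["grown-up"], counts["a"])   # 2 1 2
```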

## 5. RDD persistence.


<center><img src="https://github.com/freddyduitama/images/blob/master/lineage-graph.png?raw=true"  height="150" width="400"></center>
  <caption><center><font color='0B5345'> <u> <b>Figure 5:</b><br> </u>Lineage graph.</font></center></caption>
<p align="justify"><font face="Verdana" size="2.5">

- By default, each time you run an action, every transformed RDD in its lineage graph is recomputed.
- <em>Persistence</em> is an optimization technique in which the intermediate result of evaluating a lineage graph (a transformed RDD) is saved in memory or on disk so that it can be reused later. In this way, a transformed RDD can be used multiple times while reducing the computation overhead.
- We can persist an RDD with the **cache()** and **persist()** methods. With **cache()** the storage level is *MEMORY_ONLY*; with **persist()** we can choose among several storage levels: *MEMORY_ONLY*, *MEMORY_ONLY_2*, *MEMORY_AND_DISK*, *MEMORY_AND_DISK_2*, *DISK_ONLY*, *DISK_ONLY_2*, and *DISK_ONLY_3* (the suffix _2 or _3 means each partition is replicated on that many nodes). Both methods are lazy: the RDD is stored the first time an action computes it.
- Spark also automatically persists some intermediate data in shuffle operations (e.g. <em>reduceByKey</em>), even without users calling <em>persist</em>.
- <a>Hint: </a> In Python, stored objects are always serialized with the Pickle library, so it does not matter whether you choose a serialized level.
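
The recomputation-versus-caching behaviour can be illustrated with a toy model. `ToyRDD` below is a hypothetical class, not Spark's implementation: a transformation only records how to compute its data, an action triggers the computation, and `cache()` makes the first action store the result so later actions reuse it. A counter records how many times the "expensive" function runs.

```python
# Toy model (hypothetical, NOT Spark's implementation) of lazy evaluation:
# each action re-runs the chain of transformations unless the intermediate
# result has been cached.

class ToyRDD:
    def __init__(self, compute):
        self._compute = compute   # function that produces the elements
        self._cached = False
        self._data = None

    def map(self, f):
        # a transformation only records how to compute the new dataset
        return ToyRDD(lambda: [f(x) for x in self._elements()])

    def cache(self):
        self._cached = True       # lazy: nothing is computed here
        return self

    def _elements(self):
        if not self._cached:
            return self._compute()          # recompute the whole lineage
        if self._data is None:
            self._data = self._compute()    # materialize on the first action only
        return self._data

    def count(self):              # an action
        return len(self._elements())


calls = [0]                       # how many times the "expensive" function ran

def expensive(x):
    calls[0] += 1
    return x * x

base = ToyRDD(lambda: list(range(5)))

squares = base.map(expensive)     # not cached
squares.count()
squares.count()
without_cache = calls[0]          # 10: the map ran once per action

calls[0] = 0
cached_squares = base.map(expensive).cache()
cached_squares.count()
cached_squares.count()
with_cache = calls[0]             # 5: the map ran only for the first action

print(without_cache, with_cache)  # 10 5
```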


<p align="justify"><font face="Verdana" size="2.5">
To download the file  <em>movie.txt</em> <a href="https://drive.google.com/file/d/1blzGJH1zV-Nh754vikV1CIIgCBPnBGf7/view?usp=sharing">  Click</a><br>
To download the file <em>movierating.txt</em> <a href="https://drive.google.com/file/d/1vVSv8kFIcmMw1JNM1XFfIBXjATyMPaue/view?usp=sharing">  Click</a>

In [None]:
### Function to extract the movie data (id and name) from the input file
def extractMovie(line):
    val = line.strip()                                  # remove leading/trailing whitespace
    (movieid, name, year) = val.split("|")              # split the record into its three fields
    return (movieid, name)

### Function to extract the rating data (movie id and rating) from the input file
def extractMovieRating(line):
    val = line.strip()                                  # remove leading/trailing whitespace
    (userid, movieid, rating) = val.split("|")          # split the record into its three fields
    return (movieid, int(rating))                       # int, so movies with a single rating are numeric too

In [None]:
# path to input files
file_movie = path+"/movie.txt"
file_rating = path+"/movierating.txt"

In [None]:
### Create RDDs from the input data
movie = sc.textFile(file_movie)                                                 # RDD movies
movieRatings = sc.textFile(file_rating)                                         # RDD movie ratings
print("A movie (movie-id, name, year):", movie.first())
print("A rating (user-id, movie-id, rating):", movieRatings.first())

A movie (movie-id, name, year): 1|Avatar|2009
A rating (user-id, movie-id, rating): 1|1|1


In [None]:
print(movie.getStorageLevel())      # not persisted: neither memory nor disk is used

Serialized 1x Replicated


### Without persistence

In [None]:
### Sum the ratings of each movie (group by movie id).
movieRatingsAggregated = movieRatings.map(extractMovieRating).reduceByKey(lambda a, b: int(a)+int(b))
movieRatingsAggregated.take(4)

[('1', 232), ('4', 214), ('2', 203), ('3', 157)]

In [None]:
movie.map(extractMovie).take(4)

[('1', 'Avatar'),
 ('2', 'TwoWeeksNotice'),
 ('3', 'Gravity'),
 ('4', 'FastAndFurious')]

In [None]:
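### Join the aggregated movie ratings with the movie data:
### (movieid, (name, total)) -> (name, total) -> (total, name), sorted by total in descending order.
movieSortedTopList = movie.map(extractMovie) \
    .join(movieRatingsAggregated) \
    .map(lambda a: a[1]) \
    .map(lambda a: (a[1], a[0])) \
    .sortByKey(ascending=False)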
movieSortedTopList.take(5)

[(232, 'Avatar'),
 (214, 'FastAndFurious'),
 (203, 'TwoWeeksNotice'),
 (199, 'TheIncredibles'),
 (196, 'TheLionKing')]

In [None]:
print(movieSortedTopList.getStorageLevel())

Serialized 1x Replicated


### With persistence

In [None]:
### Sum the ratings of each movie and cache the resulting RDD.
from pyspark import StorageLevel
movieRatingsAggregated = movieRatings.map(extractMovieRating).reduceByKey(lambda a, b: int(a)+int(b))
movieRatingsAggregated.cache()      # lazy: the RDD is stored the first time an action computes it
print(movieRatingsAggregated.getStorageLevel())

Memory Serialized 1x Replicated


In [None]:
### Join the aggregated movie ratings and the movie data
movieSortedTopList = movie.map(extractMovie).join(movieRatingsAggregated).map(lambda a: a[1]).map(lambda a: (a[1],a[0]))
movieSortedTopList.persist(StorageLevel.MEMORY_AND_DISK_2).take(3)
print(movieSortedTopList.getStorageLevel())

Disk Memory Serialized 2x Replicated
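
To help read the strings printed by `getStorageLevel()`, the sketch below rebuilds them from the storage-level flags. It assumes the flag order of PySpark's `StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)` constructor and simply mirrors the strings printed in this notebook; `describe_storage_level` is a hypothetical helper.

```python
# Sketch that decodes the storage-level strings printed above.
# Assumption: flags follow the order (useDisk, useMemory, useOffHeap,
# deserialized, replication) of PySpark's StorageLevel constructor.

def describe_storage_level(use_disk, use_memory, use_off_heap, deserialized, replication=1):
    text = ""
    text += "Disk " if use_disk else ""
    text += "Memory " if use_memory else ""
    text += "OffHeap " if use_off_heap else ""
    text += "Deserialized " if deserialized else "Serialized "
    return text + f"{replication}x Replicated"

print(describe_storage_level(False, False, False, False))    # not persisted
print(describe_storage_level(False, True, False, False))     # cache(): MEMORY_ONLY
print(describe_storage_level(True, True, False, False, 2))   # MEMORY_AND_DISK_2
```

So "Serialized 1x Replicated" with no "Memory" or "Disk" prefix means the RDD is not persisted at all.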


In [None]:
### Sort the movies by total rating, in descending order.
top_order=movieSortedTopList.sortByKey(ascending=False)
top_order.take(5)

[(232, 'Avatar'),
 (214, 'FastAndFurious'),
 (203, 'TwoWeeksNotice'),
 (199, 'TheIncredibles'),
 (196, 'TheLionKing')]

In [None]:
### Remove all persisted blocks of both RDDs from memory and disk.
movieSortedTopList.unpersist()
movieRatingsAggregated.unpersist()

## 6. Partition Management.
<p align="justify"><font face="Verdana" size="2.5">

- <font color='0B5345'>**sc.textFile(path, [minPartitions]):**</font> <em>minPartitions</em> sets the minimum number of partitions of the resulting RDD.
- <font color='0B5345'>**coalesce(numPartitions):**</font> Decrease the number of partitions in the RDD to <em>numPartitions</em>. Useful for running operations more efficiently after filtering down a large dataset.
- <font color='0B5345'>**repartitionAndSortWithinPartitions(numPartitions, partitionFunc, ascending)**</font> Repartition the RDD according to the given partition function and, within each resulting partition, sort records by their keys. This is more efficient than calling **repartition** and then sorting within each partition because it can push the sorting down into the shuffle machinery.
- <font color='0B5345'>**RDD.glom()**</font> Return an RDD created by coalescing all elements within each partition into a list.

<p align="justify"><font face="Verdana" size="2.5">
To download *test.txt* <a href="https://drive.google.com/file/d/1NM8-b1XhlOvw3Rnk5DXX61UiUoohxFvK/view?usp=sharing">click</a>.

In [None]:
# Build an RDD from a file with at least 4 partitions. Without this argument, the number of RDD partitions follows the input (file) splits.
one_RDD=sc.textFile(path+'/test.txt',4)
one_RDD.getNumPartitions()

4

In [None]:
one_RDD.take(2)

['AAAAAAAAAAAAGAGCACAC 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 46 11 9 19 29 19 0 20 7 26 14 25 12 31 26 20 10 29 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 7 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0',
 'AAAAAAAAAAAGAGCACACA 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 30 6 11 23 25 10 0 16 7 17 9 17 11 19 18 14 8 16 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 

In [None]:
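# Function that drops the zero entries of a record.
# x is a split line: x[0] is the key and x[1:] are the counts.
def build_record(x):
    key = x[0]
    rec = []
    for i in range(len(x) - 1):
        if int(x[i+1]) > 0:
            rec.append((i+1, x[i+1]))   # keep (position, value) of non-zero counts
    return (key, rec)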

In [None]:
# Drop the zero entries of every record in the RDD.
one_RDD.map( lambda x : x.split(" ")).map(lambda x : build_record(x)).take(2)

[('AAAAAAAAAAAAGAGCACAC',
  [(97, '46'),
   (98, '11'),
   (99, '9'),
   (100, '19'),
   (101, '29'),
   (102, '19'),
   (104, '20'),
   (105, '7'),
   (106, '26'),
   (107, '14'),
   (108, '25'),
   (109, '12'),
   (110, '31'),
   (111, '26'),
   (112, '20'),
   (113, '10'),
   (114, '29'),
   (171, '7')]),
 ('AAAAAAAAAAAGAGCACACA',
  [(97, '30'),
   (98, '6'),
   (99, '11'),
   (100, '23'),
   (101, '25'),
   (102, '10'),
   (104, '16'),
   (105, '7'),
   (106, '17'),
   (107, '9'),
   (108, '17'),
   (109, '11'),
   (110, '19'),
   (111, '18'),
   (112, '14'),
   (113, '8'),
   (114, '16')])]

In [None]:
one_RDD.getNumPartitions()

4

In [None]:
# Reduce the number of partitions (coalesce avoids a full shuffle by default).
one_RDD.coalesce(2).getNumPartitions()

2

In [None]:
# Build a key-value RDD from a Python list.
rdd = sc.parallelize([(0, 50), (3, 80), (2, 60), (0, 81), (3, 82), (1, 32), (1, 81), (0, 82), (3, 32)])
rdd.collect()

[(0, 50),
 (3, 80),
 (2, 60),
 (0, 81),
 (3, 82),
 (1, 32),
 (1, 81),
 (0, 82),
 (3, 32)]

In [None]:
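# Repartition into 2 partitions by key parity (key % 2), sorting each partition by key;
# glom() returns each partition as a list.
rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2, True).glom().collect()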

[[(0, 50), (0, 81), (0, 82), (2, 60)],
 [(1, 32), (1, 81), (3, 80), (3, 82), (3, 32)]]
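
The output above can be reproduced in plain Python, which makes the two steps explicit: route each pair to a partition, then sort each partition by key. The sketch assumes, as PySpark's partitioning does, that the target partition is `partitionFunc(key) % numPartitions`; `repartition_and_sort` is a hypothetical helper. Python's sort is stable, so equal keys keep their input order here, whereas Spark does not promise any order among records with the same key.

```python
# Plain-Python model of repartitionAndSortWithinPartitions(...) followed by glom().
# Assumption: partition index = partition_func(key) % num_partitions.

def repartition_and_sort(pairs, num_partitions, partition_func, ascending=True):
    parts = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        parts[partition_func(key) % num_partitions].append((key, value))
    # sort every partition by key (one list per partition, like glom())
    return [sorted(p, key=lambda kv: kv[0], reverse=not ascending) for p in parts]

data = [(0, 50), (3, 80), (2, 60), (0, 81), (3, 82), (1, 32), (1, 81), (0, 82), (3, 32)]
print(repartition_and_sort(data, 2, lambda x: x % 2))
# [[(0, 50), (0, 81), (0, 82), (2, 60)], [(1, 32), (1, 81), (3, 80), (3, 82), (3, 32)]]
```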

## 7. Broadcast Variables.
<p align="justify"><font face="Verdana" size="2.5">
<b>Definition:</b> Broadcast variables are read-only shared variables that are cached on every node of the cluster so that tasks can read them. Instead of sending this data along with every task, PySpark distributes broadcast variables to the workers using efficient broadcast algorithms to reduce communication costs.

<center><img src="https://github.com/freddyduitama/images/blob/master/spark-application.png?raw=true"  height="300" width="300"></center>
<caption><center><font color='0B5345'> <u> <b>Figure 6:</b><br> </u>Computational model.</font></center></caption><br>

><font face="Verdana" size="2.5"><b>Example:</b></font>
</p>

In [None]:
### stopwords stores a list of English stop words: a read-only list used to remove stop words from any English text.
stopwords=['i','me','my','myself','we','our','ours','ourselves','you',"you're","you've","you'll","you'd",'your','yours','yourself','to','she','of']
### Use a broadcast variable to cache one copy of the stop-word list on each node of the cluster, instead of shipping a copy with every task.
bc_stopwords = sc.broadcast(stopwords)

In [None]:
### Print values
print(type(bc_stopwords.value))
bc_stopwords.value

<class 'list'>


['i',
 'me',
 'my',
 'myself',
 'we',
 'our',
 'ours',
 'ourselves',
 'you',
 "you're",
 "you've",
 "you'll",
 "you'd",
 'your',
 'yours',
 'yourself',
 'to',
 'she',
 'of']

In [None]:
### Without the filter transformation.
one_RDD=sc.textFile(path+'/comments.txt').\
flatMap(lambda x: uni_to_clean_str(x).\
        split()).map(lambda x: (x,1)).\
        reduceByKey(lambda x,y: x + y)
print(one_RDD.take(10))

[('grown-up', 1), ('daughters', 45), ('ought', 40), ('give', 122), ('of', 3472), ('her', 2168), ('own', 176), ('beauty', 24), ('in', 1792), ('cases', 5)]


In [None]:
## Use the broadcast variable inside the filter transformation.
one_RDD=sc.textFile(path+'/comments.txt').\
flatMap(lambda x: uni_to_clean_str(x).\
        split()).map(lambda x: (x,1)).\
        filter(lambda txt: txt[0] not in bc_stopwords.value).\
        reduceByKey(lambda x,y: x + y)
print(one_RDD.take(10))

[('grown-up', 1), ('daughters', 45), ('ought', 40), ('give', 122), ('her', 2168), ('own', 176), ('beauty', 24), ('in', 1792), ('cases', 5), ('think', 203)]
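
The filter step above can be modelled in plain Python. Inside Spark the stop words are read through `bc_stopwords.value`; in this sketch a `frozenset` plays that role. Broadcasting a set, e.g. `sc.broadcast(frozenset(stopwords))`, is a suggested variant, not the original code: it makes each `not in` test O(1) on average instead of scanning the whole list for every word. The input words are hypothetical.

```python
# Plain-Python model of the stop-word filter above.
# Assumption: `words` is hypothetical input; the frozenset stands in for the
# broadcast value that tasks would read through bc_stopwords.value.
from collections import Counter

stopwords = ['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you',
             "you're", "you've", "you'll", "you'd", 'your', 'yours', 'yourself',
             'to', 'she', 'of']
stopword_set = frozenset(stopwords)      # what one might broadcast instead of the list

words = "it is a truth of her own that she wished to give".split()
pairs = [(w, 1) for w in words]                         # the map step
kept = [p for p in pairs if p[0] not in stopword_set]   # the filter step
counts = Counter(w for w, _ in kept)                    # the reduceByKey step
print(sorted(counts))
# ['a', 'give', 'her', 'is', 'it', 'own', 'that', 'truth', 'wished']
```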


<p align="left"><b><font face='Courier New' color="black" align="left" size=4>Copyright.</font></b>
<img alt="udeA logo" src="https://github.com/freddyduitama/images/blob/master/in2lab.png?raw=true" align="right" hspace="10px" vspace="0px" height="120" width="350">
                                                                                                                              
<font face='Verdana' size=2>
John Freddy Duitama Muñoz. <a href="https://scienti.minciencias.gov.co/cvlac/visualizador/generarCurriculoCv.do?cod_rh=0000347507">  CvLAC</a><br>
Universidad de Antioquia.<br>
P.O. Box 1226 | Address: Calle 67 No. 53 - 108.<br>
Medellín, Colombia. South America.
    
</p>
</font>

<center><b><font color='0B5345' face="Lucida Calligraphy,Comic Sans MS,Lucida Console" size="4">Universidad de Antioquia.</font></b> </center>