
<center><img src='https://netacad.centralesupelec.fr/img/cs.jpg' width=200></center>

<h6><center><b>Big data algorithms, techniques and platforms</b></center></h6>

<h1>
<hr style=" border:none; height:3px;">
<center>Introduction to Spark RDD programming</center>
<hr style=" border:none; height:3px;">
</h1>

In [None]:
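# Install Java 8 (headless JDK) and PySpark.
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Old Spark releases are kept on archive.apache.org, not on downloads.apache.org
!wget -q https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
!tar xf spark-3.1.1-bin-hadoop2.7.tgz
!pip install -q findspark
# Spark 4.x requires Java 17, so stay on a 3.x release to match the Java 8 JDK above
!pip install "pyspark<4"
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
!java -version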


In [None]:
import pyspark

# Create the SparkContext used by all the cells below
sc = pyspark.SparkContext(appName="td1")
print("Initialization successful")

# Download and extract the tutorial data into ./data
!wget -q https://gquercini.github.io/courses/plp/tutorials/data.tgz
!tar xf data.tgz

Initialization successful


## Word count example

Write a Spark program that reads the file *./data/moby-dick.txt* and counts the number of occurrences of each word.
The program should:

- Lowercase each word.
- Remove the stopwords (a list of stopwords can be found in the file ./data/stopwords.txt).
- Sort the words by number of occurrences in decreasing order (the most frequent first).


In [None]:
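# Load the stopwords into a set for constant-time membership tests
with open("./data/stopwords.txt", "r") as f:
    stopwords = {line.lower().strip() for line in f}

wc_rdd = (sc.textFile("./data/moby-dick.txt")
            .flatMap(lambda x: x.split())              # one element per word
            .map(lambda x: x.lower())                  # lowercase each word
            .filter(lambda x: x not in stopwords)      # drop the stopwords
            .map(lambda x: (x, 1))                     # pair each word with a count of 1
            .reduceByKey(lambda x, y: x + y)           # sum the counts of each word
            .sortBy(lambda x: x[1], ascending=False))  # most frequent first

# Show the 10 most frequent words
wc_rdd.take(10)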
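To trace the pipeline without a cluster, the same steps can be replayed in plain Python on a short string. This is only an illustrative sketch: `word_count` and the sample sentence are hypothetical, and `collections.Counter` stands in for the `map((x, 1))` / `reduceByKey` pair.

```python
from collections import Counter

def word_count(text, stopwords):
    # flatMap(split) + map(lower): one lowercase token per whitespace-separated word
    words = (w.lower() for w in text.split())
    # filter: keep only the words that are not stopwords
    kept = (w for w in words if w not in stopwords)
    # map(lambda x: (x, 1)) + reduceByKey(+): Counter adds one per occurrence
    counts = Counter(kept)
    # sortBy(count, ascending=False); sorted() is stable, so ties keep first-seen order
    return sorted(counts.items(), key=lambda kv: kv[1], reverse=True)

print(word_count("The whale, the sea, the whale, the WHALE!", {"the"}))
# [('whale,', 2), ('sea,', 1), ('whale!', 1)]
```

As in the Spark version, punctuation stays attached to the words, so `whale,` and `whale!` are counted as different words; stripping punctuation would require an extra `map` step.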

## Computing averages

Write a Spark program that reads the file *./data/temperature.csv* and computes the average temperature for each year.

In [None]:
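temp_rdd = (sc.textFile("./data/temperature.csv")
            .map(lambda x: x.split(","))
            # key = year (column 0), value = (temperature (column 2), 1)
            .map(lambda x: (x[0], (float(x[2]), 1)))
            # add up the temperatures and the counts of each year
            .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
            # average = total temperature / number of measurements
            .mapValues(lambda x: x[0] / x[1]))

temp_rdd.collect()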
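The program carries `(sum, count)` pairs rather than partial averages because `reduceByKey` merges partial results per partition in an arbitrary order, so the merge function must be associative and commutative; averaging two averages is not. The plain-Python sketch below replays the same logic; `average_by_key` and the sample rows are hypothetical, and the column layout (year in column 0, temperature in column 2) is taken from the code above.

```python
from collections import defaultdict

def average_by_key(rows):
    # rows: iterable of (key, value) pairs; each becomes (key, (value, 1))
    acc = defaultdict(lambda: (0.0, 0))
    for key, value in rows:
        s, n = acc[key]
        # same merge as reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
        acc[key] = (s + value, n + 1)
    # same as mapValues(lambda x: x[0] / x[1])
    return {key: s / n for key, (s, n) in acc.items()}

lines = ["2000,1,10.0", "2000,2,14.0", "2001,1,3.0"]  # hypothetical CSV rows
rows = [(f[0], float(f[2])) for f in (line.split(",") for line in lines)]
print(average_by_key(rows))  # {'2000': 12.0, '2001': 3.0}
```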

# Matrix operations


Without loss of generality, we assume that the input matrices are stored
in text files (see folder _./data_).
For example, the file _./data/matrix-a.txt_ looks as follows:
<p>
0 1 2 4<br>
1 2 3 10<br>
2 12 15 150<br>
</p>

Each line is a row of matrix $A$. The first number on the line is the
row identifier (starting from 0); the subsequent values, separated by whitespace,
are the elements of that row, one per column. The matrix represented in this file is the
following:
<p>
<center>
  $A= \begin{bmatrix}
    1 & 2 & 4   \\
    2 & 3 & 10  \\
    12 & 15 & 150
\end{bmatrix}$
</center>


We provide implementations of basic functions to load a matrix from a file, print it,
and get its shape.

* Function *loadMatrix*

The function *loadMatrix()* loads a matrix from a file.
It takes in the name of the file and returns an RDD containing the matrix.

Each element of an RDD matrix is a key-value pair, where the key is the coordinate (row identifier, column identifier) of an element, and the value is the element itself.
For instance, the RDD corresponding to the matrix $A$ is the following:
<p>
$( (0, 0), 1 ), ( (0, 1), 2 ), ( (0, 2), 4 ), ( (1, 0), 2 ), ( (1, 1), 3 ), ( (1, 2), 10 ), ( (2, 0), 12 ), ( (2, 1), 15 ), ( (2, 2), 150 ) $
</p>
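The conversion from text lines to key-value pairs can be checked without Spark. The function below is a hypothetical plain-Python helper (`parse_matrix_lines` is not part of the tutorial) that follows the same steps as *loadMatrix()*: split each line, read the row identifier, then emit one `((row, col), value)` pair per column, with values stored as floats.

```python
def parse_matrix_lines(lines):
    # Each line has the form "rowId v0 v1 ... vk"
    entries = []
    for line in lines:
        fields = line.split()  # split on any run of whitespace
        row = int(fields[0])
        # flatMap step: one ((row, col), value) pair per column
        for col, token in enumerate(fields[1:]):
            entries.append(((row, col), float(token)))
    return entries

matrix_a = ["0 1 2 4", "1 2 3 10", "2 12 15 150"]
print(parse_matrix_lines(matrix_a)[:3])  # [((0, 0), 1.0), ((0, 1), 2.0), ((0, 2), 4.0)]
```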

* Function *shape*

The function *shape()* takes in an RDD matrix and returns the size of the matrix as a pair $(nbRows, nbCols)$, where $nbRows$ (resp., $nbCols$) denotes the number of rows (resp., columns) of the matrix.
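The provided *shape()* collects the entire matrix to the driver just to count its rows and columns. A lighter alternative derives the shape from the largest keys; the plain-Python sketch below assumes that row and column identifiers start at 0 and have no gaps, and `shape_from_keys` is a hypothetical name.

```python
def shape_from_keys(entries):
    # entries: list of ((row, col), value) pairs, as in an RDD matrix
    if not entries:
        return (0, 0)
    # With 0-based, gap-free indices, the size is the largest index plus one
    n_rows = max(key[0] for key, _ in entries) + 1
    n_cols = max(key[1] for key, _ in entries) + 1
    return (n_rows, n_cols)

entries = [((r, c), 1.0) for r in range(2) for c in range(5)]
print(shape_from_keys(entries))  # (2, 5)
```

In PySpark the same idea could be written as `matrix.keys().map(lambda k: (k[0] + 1, k[1] + 1)).reduce(lambda a, b: (max(a[0], b[0]), max(a[1], b[1])))`, which sends a single pair back to the driver. `reduce` raises an error on an empty RDD, so it would need a guard such as `matrix.isEmpty()`.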

* Function *collect*

The function *collect()* takes in an RDD matrix and returns a representation of the matrix as a Python list $L$. Each element of $L$ is itself a list that corresponds to a row in the matrix.
For instance, the output of the function *collect()* for the matrix $A$ is as follows (*loadMatrix()* stores the elements as floats):


$[ [1.0, 2.0, 4.0], [2.0, 3.0, 10.0], [12.0, 15.0, 150.0] ]$
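The steps of *collect()* (key by row, group, sort by row, then sort each row by column) can be traced on a plain list of pairs. `collect_rows` is a hypothetical, Spark-free stand-in. Like the real function, it materializes the whole matrix in memory, so this representation is only practical for small matrices.

```python
from collections import defaultdict

def collect_rows(entries):
    # map + groupByKey: gather the (col, value) pairs of each row
    rows = defaultdict(list)
    for (r, c), v in entries:
        rows[r].append((c, v))
    # sortByKey on the row id, then sort each row by column id and keep the values
    return [[v for _, v in sorted(rows[r])] for r in sorted(rows)]

shuffled = [((1, 1), 3.0), ((0, 2), 4.0), ((2, 0), 12.0), ((0, 0), 1.0), ((1, 0), 2.0),
            ((2, 2), 150.0), ((0, 1), 2.0), ((2, 1), 15.0), ((1, 2), 10.0)]
print(collect_rows(shuffled))  # [[1.0, 2.0, 4.0], [2.0, 3.0, 10.0], [12.0, 15.0, 150.0]]
```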


* Function *nice*

The function *nice()* takes in a name and an RDD matrix, and prints the name followed by the matrix with aligned columns.


In [None]:
'''
Loads a matrix from a file.
Takes in: the name of the input file
Returns: an RDD containing the matrix
'''
def loadMatrix(filename):
    # Load the file into an RDD matrix
    matrix = sc.textFile(filename)
    # Split each line on whitespace. Each element is a list [rowId, e1, e2, ..., ej]
    matrix = matrix.map(lambda line: line.split())
    # Convert each element to a number (the first is an integer, the others are float)
    matrix = matrix.map(lambda row: [int(row[0])] + [float(row[i]) for i in range(1, len(row))])
    # Get an RDD where each element is a key-value pair ((row, col), element)
    matrix = matrix.flatMap(lambda row: [((row[0], j-1), row[j]) for j in range(1, len(row))])
    return matrix

'''
Returns the number of rows and columns of the matrix
Takes in: An RDD representing a matrix
Returns: the size of the matrix as (nbRows, nbCols)
'''
def shape(matrix):
    M = collect(matrix)
    if len(M) == 0:
        return (0, 0)
    else:
        return (len(M), len(M[0]))

'''
Returns a matrix represented as a list of lists.
Takes in: an RDD representing a matrix
Returns: the matrix represented as a list of lists.
'''
def collect(matrix):
    # Obtain an RDD, where the key is the row identifier and the value is (colId, element)
    matrix = matrix.map(lambda x: (x[0][0], (x[0][1], x[1])))
    # Groups all the values in a row.
    matrix = matrix.groupByKey()
    # Sort the rows by row identifier.
    matrix = matrix.sortByKey()
    # Sort the elements by column identifier.
    matrix = matrix.map(lambda x: sorted(list(x[1])))
    # Now obtain an RDD, where each element is a list containing the elements of a row.
    matrix = matrix.map(lambda row: [x[1] for x in row])
    # Finally, return the RDD as a Python list.
    return matrix.collect()

'''
Prints the matrix in a nice way.
Takes in: the name of the matrix (var) and the matrix in the form of an RDD.
'''
def nice(var, matrix):
    # Obtain a representation of the matrix as a Python list.
    M = collect(matrix)
    # Print the name of the matrix
    print("Matrix ", var)
    # Print the matrix and format the output nicely
    print('\n'.join([''.join(['{:12.2f}'.format(item) for item in row])
      for row in M]))


## Data creation

Execute the following cells in order to create two files, one containing matrix $A$, the other containing matrix $B$.

In [None]:
%%file ./data/matrix-a.txt
0 1 2 4
1 2 3 10
2 12 15 150

Writing ./data/matrix-a.txt


In [None]:
%%file ./data/matrix-b.txt
0 4 2 2
1 1 3 3
2 23 34 12

Writing ./data/matrix-b.txt


## Sum of matrices

The code below loads two matrices $A$ and $B$ from files and calls the function *sum()* to compute $A+B$.

The function *sum()* takes in:

* $A$: an RDD containing the first matrix.
* $B$: an RDD containing the second matrix.

The function *sum()* returns an RDD containing the matrix obtained by summing $A$ and $B$.

**Complete the definition of the function *sum()* and execute the code**



In [None]:
'''
Computes the sum of two matrices.
Takes in: two RDDs containing the input matrices
Returns: the RDD containing the sum of the two input matrices
'''
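def sum(A, B):
  # COMPLETE THIS FUNCTION
  # Join on the (row, col) key, which yields ((row, col), (a, b)), then add a and b.
  # Note: this definition shadows Python's built-in sum() for the rest of the notebook.
  return A.join(B).mapValues(lambda x: x[0] + x[1])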

# Load matrix A from file and print it.
A = loadMatrix("./data/matrix-a.txt")
nice("A", A)

# Load matrix B from file and print it.
B = loadMatrix("./data/matrix-b.txt")
nice("B", B)

# Compute A+B and print it
C = sum(A, B)
nice("C", C)

##############################################################
#YOU SHOULD OBTAIN THE FOLLOWING MATRIX C AS RESULT
# 5.00        4.00        6.00
# 3.00        6.00       13.00
# 35.00       49.00      162.00
##############################################################
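*sum()* relies on `join`, which is an inner join: a coordinate present in only one of the two RDDs is silently dropped. That is harmless here because $A$ and $B$ are dense and have the same shape, but it matters for sparse matrices that omit zero entries. The plain-Python sketch below contrasts the two behaviours, with dictionaries keyed by `(row, col)` standing in for RDD matrices; `join_sum` and `union_sum` are hypothetical names, and the union variant corresponds to `A.union(B).reduceByKey(lambda x, y: x + y)` in Spark.

```python
from collections import defaultdict

def join_sum(a, b):
    # Inner join on (row, col): only coordinates present in both inputs survive
    return {k: a[k] + b[k] for k in a if k in b}

def union_sum(a, b):
    # union + reduceByKey: a missing coordinate behaves like a zero entry
    out = defaultdict(float)
    for m in (a, b):
        for k, v in m.items():
            out[k] += v
    return dict(out)

a = {(0, 0): 1.0, (0, 1): 2.0}
b = {(0, 0): 4.0, (1, 1): 3.0}  # sparse: each input lacks a coordinate the other has
print(join_sum(a, b))   # {(0, 0): 5.0}
print(union_sum(a, b))  # {(0, 0): 5.0, (0, 1): 2.0, (1, 1): 3.0}
```

For dense matrices of the same shape both functions return the same result.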


## Scalar multiplication

The code below calls the function *scalarMultiply()* to obtain the matrix $c\times A$, where $c$ is a scalar value.    

The function *scalarMultiply()* takes in:

* $c$: a scalar value.
* $M$: an RDD containing a matrix.

The function *scalarMultiply()* returns an RDD containing the matrix obtained by multiplying $c$ with the input matrix.


**Complete the definition of the function scalarMultiply() and execute the code**


In [None]:
'''
Computes the scalar multiplication.
Takes in a scalar value c and an RDD matrix M
Returns the RDD containing the matrix resulting from the scalar multiplication c * M.
'''
def scalarMultiply(c, M):
    # COMPLETE THIS FUNCTION
    R = M.mapValues(lambda e: e*c)
    return R

# Print the input and the output
nice("A", A)
nice("2*A", scalarMultiply(2, A))

##############################################################
# THE RESULT SHOULD BE
#2.00        4.00        8.00
#4.00        6.00       20.00
#24.00       30.00      300.00
##############################################################
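Scalar multiplication scales each entry independently, $(cA)_{ij} = c\,a_{ij}$, so it only changes the values and keeps every `(row, col)` key; this is why `mapValues` suffices and no shuffle is needed. For $c = 2$ and the matrix $A$ used above:

$$
2A = 2 \begin{bmatrix} 1 & 2 & 4 \\ 2 & 3 & 10 \\ 12 & 15 & 150 \end{bmatrix}
= \begin{bmatrix} 2 & 4 & 8 \\ 4 & 6 & 20 \\ 24 & 30 & 300 \end{bmatrix}
$$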

## Matrix multiplication

We now overwrite the file _./data/matrix-b.txt_ with a new matrix $B$ that has 3 rows and 5 columns.

In [None]:
%%file ./data/matrix-b.txt
0 4 2 2 324 23
1 1 3 3 333 423
2 23 34 12 12 0

**Complete the function *multiply()*, which computes the matrix product $A \times B$**
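Entry $(i, k)$ of the product sums the products of row $i$ of $A$ with column $k$ of $B$, which requires the number of columns of $A$ to equal the number of rows of $B$. Here $A$ is $3 \times 3$ and the new $B$ is $3 \times 5$, so $C = A \times B$ is $3 \times 5$. As a worked check, the entry in row 0 and column 3 is:

$$
C_{ik} = \sum_{j} a_{ij}\, b_{jk},
\qquad
C_{03} = 1 \cdot 324 + 2 \cdot 333 + 4 \cdot 12 = 324 + 666 + 48 = 1038.
$$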

In [None]:
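'''
Computes the matrix product.
Takes in: two RDD matrices A (n x m) and B (m x p)
Returns: the RDD containing the matrix A * B (n x p)
'''
def multiply(A, B):
  # ((i, j), a_ij) -> (j, (i, a_ij)): key the entries of A by their column index j
  left = A.map(lambda e: (e[0][1], (e[0][0], e[1])))
  # ((j, k), b_jk) -> (j, (k, b_jk)): key the entries of B by their row index j
  right = B.map(lambda e: (e[0][0], (e[0][1], e[1])))
  # Join on j: each element is (j, ((i, a_ij), (k, b_jk)))
  productEntries = left.join(right)
  # Emit the term a_ij * b_jk under the output coordinate (i, k), then sum over j
  productEntries = productEntries.map(lambda e: ((e[1][0][0], e[1][1][0]), e[1][0][1] * e[1][1][1]))\
                  .reduceByKey(lambda x, y: x + y)
  return productEntries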
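The join-based product can be replayed in plain Python to check the expected values. In this sketch, lists of `((row, col), value)` pairs stand in for RDDs and `multiply_entries` is a hypothetical name; the dictionaries keyed by `j` play the role of `left` and `right`, and the nested loops play the role of `join`.

```python
from collections import defaultdict

def multiply_entries(a_entries, b_entries):
    # left: key the entries of A by their column j -> (j, (i, a_ij))
    left = defaultdict(list)
    for (i, j), v in a_entries:
        left[j].append((i, v))
    # right: key the entries of B by their row j -> (j, (k, b_jk))
    right = defaultdict(list)
    for (j, k), w in b_entries:
        right[j].append((k, w))
    # join on j, emit ((i, k), a_ij * b_jk), then sum per (i, k) like reduceByKey
    out = defaultdict(float)
    for j in left.keys() & right.keys():
        for i, v in left[j]:
            for k, w in right[j]:
                out[(i, k)] += v * w
    return dict(out)

a_rows = [[1, 2, 4], [2, 3, 10], [12, 15, 150]]
b_rows = [[4, 2, 2, 324, 23], [1, 3, 3, 333, 423], [23, 34, 12, 12, 0]]
a = [((i, j), float(v)) for i, row in enumerate(a_rows) for j, v in enumerate(row)]
b = [((j, k), float(v)) for j, row in enumerate(b_rows) for k, v in enumerate(row)]
c = multiply_entries(a, b)
print([c[(0, k)] for k in range(5)])  # [98.0, 144.0, 56.0, 1038.0, 869.0]
```

Every entry $a_{ij}$ is paired with every entry in row $j$ of $B$, so for dense inputs of sizes $n \times m$ and $m \times p$ the join produces $n \cdot m \cdot p$ intermediate records, one per term of the sum; this shuffle volume is the main cost of the approach.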

In [None]:
# Load matrix A from file and print it.
A = loadMatrix("./data/matrix-a.txt")
nice("A", A)

# Load matrix B from file and print it.
B = loadMatrix("./data/matrix-b.txt")
nice("B", B)

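# Compute A*B and print it
C = multiply(A, B)
nice("C", C)

##############################################################
#YOU SHOULD OBTAIN THE FOLLOWING MATRIX C AS RESULT
# (A is 3x3 and the new B is 3x5, so C is 3x5; the first three
# columns are the product of A with the 3x3 matrix B used for the sum)
#   98.00      144.00       56.00     1038.00      869.00
#  241.00      353.00      133.00     1767.00     1315.00
# 3513.00     5169.00     1869.00    10683.00     6621.00
##############################################################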


Matrix  A
        1.00        2.00        4.00
        2.00        3.00       10.00
       12.00       15.00      150.00
Matrix  B
        4.00        2.00        2.00
        1.00        3.00        3.00
       23.00       34.00       12.00
Matrix  C
       98.00      144.00       56.00
      241.00      353.00      133.00
     3513.00     5169.00     1869.00
