<center><img src='./figs/cs-logo.png' width=200></center>

# 1. Introduction


<p align="justify">
<font size="3">
In this lab assignment you'll deepen your Spark programming skills by implementing algorithms that apply computations on matrices.
One interesting application of matrix computation is **PageRank**, the algorithm that Google uses to assign an importance score to each Web page; based on this score, the Google search engine ranks by importance the Web pages matching a particular search.
</font>
</p>

<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Execute the following cell in order to initialize the _SparkContext_.**</font>
<hr style=" border:none; height:2px;">
</p>

In [37]:
from os import getcwd
from os.path import join, isfile, dirname, abspath, pardir, exists
from pathlib import Path
import test_spark

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [14]:
parent_dir = Path(test_spark.__file__).resolve().parents[1]
data_dir = join(parent_dir, "data")

In [1]:
import findspark
findspark.init()

import pyspark
import random
sc = pyspark.SparkContext(appName="lab2")
print("Initialization successful")

Initialization successful


# 2. Matrix Representation

<p align="justify">
<font size="3">
**Please read carefully this section that presents how matrices will be represented in this assignment.**
</font>
</p>

<p align="justify">
<font size="3">
Without loss of generality, we assume that our input matrices are stored 
in text files (see folder _./data_).
As an example, the file _./data/matrix-a.txt_ looks as follows:
<p>
0 1 2 4<br>
1 2 3 10<br>
2 12 15 150<br>
</p>
</font>
</p>
<p>
<font size="3">
Each line is a row of a matrix $A$. The first number on the line is the 
row identifier (starting from 0); the subsequent values (separated by a single space)
are the elements in each column of the row. The matrix represented in this file is the 
following:
<p>
<center>
  $A= \begin{bmatrix}
    1 & 2 & 4   \\
    2 & 3 & 10  \\
    12 & 15 & 150
\end{bmatrix}$
</center>
</font>
</p>


<p>
<font size="3">
We provide implementations of basic functions to load a matrix from a file, display it,
and get its shape.
</font>
</p>

## 2.1 Function $loadMatrix$

<p>
<font size="3">
The function $loadMatrix()$ loads a matrix from a file.
It takes in the name of the file and returns an RDD containing the matrix.

Each element of an RDD matrix is a key-value pair, where the key is the coordinate (row identifier, column identifier) of an element, and the value is the element itself.
For instance, the RDD corresponding to the matrix $A$ is the following:
<p>
$( (0, 0), 1 ), ( (0, 1), 2 ), ( (0, 2), 4 ), ( (1, 0), 2 ), ( (1, 1), 3 ), ( (1, 2), 10 ), ( (2, 0), 12 ), ( (2, 1), 15 ), ( (2, 2), 150 ) $
</p>
</font>
</p>
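As a rough illustration of this key-value layout, the sketch below builds the same pairs in plain Python, without Spark. The helper name `parse_matrix_lines` is hypothetical and not part of the lab code; it only mirrors what $loadMatrix()$ does to each line of the file.

```python
def parse_matrix_lines(lines):
    """Turn lines such as '0 1 2 4' into ((row, col), value) pairs."""
    pairs = []
    for line in lines:
        fields = line.split()
        row = int(fields[0])  # the first field is the row identifier
        for col, text in enumerate(fields[1:]):
            pairs.append(((row, col), float(text)))
    return pairs

# The content of ./data/matrix-a.txt, as shown in Section 2.
matrix_a_lines = ["0 1 2 4", "1 2 3 10", "2 12 15 150"]
pairs_a = parse_matrix_lines(matrix_a_lines)
print(pairs_a[:3])
```

In the Spark version the same pairs live in an RDD, so they can be processed in parallel instead of being held in one Python list.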

## 2.2 Function $shape$

<p>
<font size="3">
The function $shape()$ takes in an RDD matrix and returns the size of the matrix as a pair $(nbRows, nbCols)$, where $nbRows$ (resp., $nbCols$) denotes the number of rows (resp., columns) of the matrix.
</font>
</p>

## 2.3 Function $collect$

<p>
<font size="3">
The function $collect()$ takes in an RDD matrix and returns a representation of the matrix as a Python list $L$. Each element of $L$ is itself a list that corresponds to a row in the matrix.
For instance, the output of the function $collect$ for the matrix $A$ is as follows:   

<p>

$[ [1, 2, 4], [2, 3, 10], [12, 15, 150] ]$

</p>

</font>
</p>
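The conversion from coordinate pairs back to rows can be sketched in plain Python as follows. The names `to_rows`, `rows_a` and `shape_a` are illustrative assumptions; the lab's own $collect()$ and $shape()$ do the equivalent work with RDD transformations.

```python
def to_rows(pairs):
    """Group ((row, col), value) pairs into rows, sorted by row and then by column."""
    rows = {}
    for (row, col), value in pairs:
        rows.setdefault(row, []).append((col, value))
    return [[value for _, value in sorted(rows[row])] for row in sorted(rows)]

pairs_a = [((0, 0), 1), ((0, 1), 2), ((0, 2), 4),
           ((1, 0), 2), ((1, 1), 3), ((1, 2), 10),
           ((2, 0), 12), ((2, 1), 15), ((2, 2), 150)]

rows_a = to_rows(pairs_a)
# The shape follows directly from the list of rows.
shape_a = (len(rows_a), len(rows_a[0]) if rows_a else 0)
print(rows_a, shape_a)
```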


## 2.4 Function $nice$

<p>
<font size="3">
The function $nice()$ prints the matrix in a nice and readable way.
</font>
</p>

<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Execute the following cell to define these functions.**</font>
<hr style=" border:none; height:2px;">
</p>

In [17]:
'''
Loads a matrix from a file.
Takes in: the name of the input file
Returns: an RDD containing the matrix
'''
def loadMatrix(filename):
    # Load the file into an RDD matrix
    matrix = sc.textFile(filename)
    # Splits each line. Each element is a list [nbRow, e1, e2, ..., ej]
    matrix = matrix.map(lambda line : line.split(' '))
    # Convert each element to a number (the first is an integer, the others are float)
    matrix = matrix.map(lambda row: [int(row[0])] + [float(row[i]) for i in range(1, len(row))])
    # Get an RDD where each element is a key-value pair ((row, col), element)
    matrix = matrix.flatMap(lambda row: [((row[0], j-1), row[j]) for j in range(1, len(row))])
    return matrix

'''
Returns the number of rows and columns of the matrix
Takes in: An RDD representing a matrix
Returns: the size of the matrix as (nbRows, nbCols)
'''
def shape(matrix):
    M = collect(matrix)
    if len(M) == 0:
        return (0, 0)
    else:
        return (len(M), len(M[0]))

'''
Returns a matrix represented as a list of lists.
Takes in: an RDD representing a matrix
Returns: the matrix represented as a list of lists.
'''
def collect(matrix):
    # Obtain an RDD, where the key is the row identifier and the value is (colId, element)
    matrix = matrix.map(lambda x: (x[0][0], (x[0][1], x[1])))
    # Groups all the values in a row.
    matrix = matrix.groupByKey()
    # Sorts the element by row identifier.
    matrix = matrix.sortByKey()
    # Sort the elements by column identifier.
    matrix = matrix.map(lambda x: sorted(list(x[1])))
    # Now obtain an RDD, where each element is a list containing the elements of a row.
    matrix = matrix.map(lambda row: [x[1] for x in row])
    # Finally, return the RDD as a Python list.
    return matrix.collect()
    
'''
Prints the matrix in a nice way.
Takes in: the name of the matrix (var) and the matrix in the form of an RDD.
'''
def nice(var, matrix):
    # Obtain a representation of the matrix as a Python list.
    M = collect(matrix)
    # Print the name of the matrix
    print(f"Matrix: '{var}'")
    # Print the matrix and format the output nicely
    print('\n'.join([''.join(['{:12.2f}'.format(item) for item in row]) 
      for row in M]))


# 3. Matrix Addition

<p align="justify">
<font size="3">
The code below loads two matrices $A$ and $B$ from file and calls the function $sum()$ to compute $A+B$.
</font>
</p>

<p>
<font size="3">
The function $sum()$ takes in:
<ul>
<li> $A$: an RDD containing the first matrix.
<li> $B$: an RDD containing the second matrix.
</ul>
The function $sum()$ returns an RDD containing the matrix obtained by summing $A$ and $B$.
</font>
</p>
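One way to picture the addition is "put all pairs together, then sum the values that share a coordinate". The plain Python sketch below does this on the first rows of $A$ and $B$; the helper name `add_pairs` is a hypothetical stand-in for the RDD-based $sum()$.

```python
from collections import defaultdict

def add_pairs(pairs_x, pairs_y):
    """Sum the values of two lists of ((row, col), value) pairs, coordinate by coordinate."""
    totals = defaultdict(float)
    for key, value in pairs_x + pairs_y:  # put the two inputs together
        totals[key] += value              # sum the values that share a key
    return dict(totals)

row_a = [((0, 0), 1.0), ((0, 1), 2.0), ((0, 2), 4.0)]
row_b = [((0, 0), 4.0), ((0, 1), 2.0), ((0, 2), 2.0), ((0, 3), 324.0), ((0, 4), 23.0)]
row_sum = add_pairs(row_a, row_b)
print(row_sum)
```

Note that a coordinate present in only one input passes through unchanged, which is why adding a $3 \times 3$ matrix to a $3 \times 5$ matrix with this scheme still produces a $3 \times 5$ result.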

<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Complete the definition of the function $sum()$ and execute the code**</font>
<hr style=" border:none; height:2px;">
</p>



In [18]:
'''
Computes the sum of two matrices.
Takes in: two RDDs containing the input matrices
Returns: the RDD containing the sum of the two input matrices
'''
def sum(A, B):
    
    ################## COMPLETE HERE FOLLOWING THE INSTRUCTIONS ##################
    
    # Each element of the RDD A and B is ((r,c), e), where e is an element of the matrix and (r, c) is the 
    # coordinate of the element in terms of row and column.
    
    # 1. Put the two RDDs A and B together. Use the transformation union. Remember that a transformation 
    # always returns a new RDD with the result of the transformation.
    C = A.union(B)
    nice("C - after union", C)

    #2. Transforms the RDD C into one where the values having the same key (i.e., same row and column) 
    # are summed together. Which transformation are you going to use on C?
    C = C.reduceByKey(lambda x, y: x + y)
    nice("C - reduceByKey", C)
    # We return the RDD containing the sum of the two input matrices
    return C

    ################## END OF MODIFICATIONS ##################

# Load matrix A from file and print it.
A = loadMatrix(join(data_dir, "matrix-a.txt"))
nice("A", A)

# Load matrix B from file and print it.
B = loadMatrix(join(data_dir, "matrix-b.txt"))
nice("B", B)

# Compute A+B and print it
C = sum(A, B)
nice("C", C)

############################################################## 
#YOU SHOULD OBTAIN THE FOLLOWING MATRIX C AS RESULT
# 5.00        4.00        6.00      324.00       23.00
# 3.00        6.00       13.00      333.00      423.00
# 35.00       49.00      162.00       12.00        0.00
##############################################################


Matrix: 'A'
        1.00        2.00        4.00
        2.00        3.00       10.00
       12.00       15.00      150.00
Matrix: 'B'
        4.00        2.00        2.00      324.00       23.00
        1.00        3.00        3.00      333.00      423.00
       23.00       34.00       12.00       12.00        0.00
Matrix: 'C - after union'
        1.00        4.00        2.00        2.00        2.00        4.00      324.00       23.00
        1.00        2.00        3.00        3.00        3.00       10.00      333.00      423.00
       12.00       23.00       15.00       34.00       12.00      150.00       12.00        0.00
Matrix: 'C - reduceByKey'
        5.00        4.00        6.00      324.00       23.00
        3.00        6.00       13.00      333.00      423.00
       35.00       49.00      162.00       12.00        0.00
Matrix: 'C'
        5.00        4.00        6.00      324.00       23.00
        3.00        6.00       13.00      333.00      423.00
       35.00       49.00      162.00       12.00        0.00

# 4. Scalar Multiplication

<p>
<font size="3">
The code below calls the function $scalarMultiply()$ to obtain the matrix $c\times A$, where $c$ is a scalar value.    
</font>
</p>

<p>
<font size="3">
The function $scalarMultiply()$ takes in:
<ul>
<li> $c$: a scalar value.
<li> $M$: an RDD containing a matrix.
</ul>
The function $scalarMultiply()$ returns an RDD containing the matrix obtained by multiplying $c$ with the input matrix.
</font>
</p>
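Scalar multiplication touches each element independently and leaves the coordinates untouched. A minimal plain Python sketch, with the hypothetical helper name `scale_pairs`:

```python
def scale_pairs(c, pairs):
    """Multiply the value of every ((row, col), value) pair by the scalar c."""
    return [(key, c * value) for key, value in pairs]

first_row_of_a = [((0, 0), 1.0), ((0, 1), 2.0), ((0, 2), 4.0)]
doubled = scale_pairs(2, first_row_of_a)
print(doubled)
```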


<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Complete the definition of the function $scalarMultiply()$ and execute the code**</font>
<hr style=" border:none; height:2px;">
</p>



In [23]:
'''
Computes the scalar multiplication.
Takes in a scalar value c and an RDD matrix M
Returns the RDD containing the matrix resulting from the scalar multiplication c * M.
'''
def scalarMultiply(c, M):
    
    ################## COMPLETE HERE FOLLOWING THE INSTRUCTIONS ##################
    
    # Apply a transformation on M, so each element of the matrix M is multiplied by c
    # Which transformation are you going to use?
    R = M.map(lambda x: (x[0], x[1] * c))
    
    return R
    ################## END MODIFICATION ##################


# Prints 
nice("A", A)
nice("2 * A", scalarMultiply(2, A))

############################################################## 
# THE RESULT SHOULD BE 
#2.00        4.00        8.00
#4.00        6.00       20.00
#24.00       30.00      300.00
##############################################################

Matrix: 'A'
        1.00        2.00        4.00
        2.00        3.00       10.00
       12.00       15.00      150.00
Matrix: '2 * A'
        2.00        4.00        8.00
        4.00        6.00       20.00
       24.00       30.00      300.00


# 5. Matrix Multiplication


We want to implement a function $multiply()$ to obtain the matrix $A \times B$.

The function $multiply()$ takes in:

- $A$: an RDD containing the first matrix.
- $B$: an RDD containing the second matrix.

The function $multiply()$ returns an RDD containing the matrix obtained by multiplying the first matrix by the second.
The multiplication can only be computed if the number of columns of $A$ equals the number of rows of $B$.

Let $A$ be an $n \times m$ matrix and $B$ an $m \times p$ matrix.
The matrix $C = A \times B$ is an $n \times p$ matrix, where each element $c_{i, k}$ is computed as 
follows:

$c_{i, k} = \sum\limits_{j=0}^{m-1} a_{i, j} \cdot b_{j, k} \quad\quad (1)$ 
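As a worked instance of Equation (1), the sketch below computes single elements of $C$ directly from the matrices $A$ and $B$ used in this lab, stored here as plain Python lists of rows. The function name `entry` is an illustrative assumption.

```python
A = [[1, 2, 4], [2, 3, 10], [12, 15, 150]]
B = [[4, 2, 2, 324, 23], [1, 3, 3, 333, 423], [23, 34, 12, 12, 0]]

def entry(A, B, i, k):
    """Compute c_{i,k} as the sum over j of a_{i,j} * b_{j,k}."""
    return sum(A[i][j] * B[j][k] for j in range(len(B)))

c_00 = entry(A, B, 0, 0)  # 1*4 + 2*1 + 4*23
c_24 = entry(A, B, 2, 4)  # 12*23 + 15*423 + 150*0
print(c_00, c_24)
```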

One possible implementation of this function is based on a MapReduce schema.
Remember that in a MapReduce schema, the idea is to group elements by a key and apply a function to the elements 
that share the same key.
As you can see, for a given $(i, k)$, the element of $A$ that is in the j-th column is multiplied by the value of $B$ that is in the j-th row, for any column $j$.
Therefore, we can change the representation of $A$ and $B$ so that their elements are indexed by using $j$ as the key.

More specifically, we can represent the matrix $A$ as follows:

$(j, (0, i, a_{i, j})) \quad 0 \leq i \leq n-1 \quad 0 \leq j \leq m-1  \quad\quad (2)$

where the value $0$ in the triple $(0, i, a_{i, j})$ means that the element $a_{i, j}$ comes from the matrix $A$.

Similarly, we can represent the matrix $B$ as follows:

$(j, (1, k, b_{j, k})) \quad 0 \leq j \leq m-1 \quad 0 \leq k \leq p-1 \quad\quad (3) $

where the value $1$ in the triple $(1, k, b_{j, k})$ means that the element $b_{j, k}$ comes from the matrix $B$.
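The two re-keyed representations can be sketched in plain Python as below. The helper names `rekey_a` and `rekey_b` are hypothetical; they only illustrate Equations (2) and (3) on ordinary lists rather than RDDs.

```python
def rekey_a(pairs_a):
    """((i, j), a_ij) -> (j, (0, i, a_ij)): key by the column index of A."""
    return [(j, (0, i, value)) for (i, j), value in pairs_a]

def rekey_b(pairs_b):
    """((j, k), b_jk) -> (j, (1, k, b_jk)): key by the row index of B."""
    return [(j, (1, k, value)) for (j, k), value in pairs_b]

# First row of A and first two elements of the first row of B.
a_keyed = rekey_a([((0, 0), 1.0), ((0, 1), 2.0), ((0, 2), 4.0)])
b_keyed = rekey_b([((0, 0), 4.0), ((0, 1), 2.0)])
print(a_keyed)
print(b_keyed)
```

After this step, an element of $A$ and an element of $B$ that must be multiplied together share the same key $j$.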

As a first step, we want to code two functions $transformA()$ and $transformB()$ to obtain the two representations of $A$ and $B$ respectively, as described in Equations (2) and (3).
The representations returned by the two functions **must be RDDs**.

**Complete the definition of the functions $transformA()$ and $transformB()$ and execute the code**

In [41]:
################## COMPLETE HERE FOLLOWING THE INSTRUCTIONS ##################

'''
Transforms the RDD matrix A into an RDD as described in Equation (2)
'''
def transformA(A):

    # Transform A as indicated in the text above
    A = A.map(lambda x: (x[0][1], (0, x[0][0], x[1])))
    return A

'''
Transforms the RDD matrix B into an RDD as described in Equation (3)
'''
def transformB(B):
    # Transform B as indicated in the text above
    B = B.map(lambda x: (x[0][0], (1, x[0][1], x[1])))
    return B

################## END MODIFICATIONS ##################

# Displays the two matrices
nice("A", A)
nice("B", B)

# Transforms them
Atransformed = transformA(A)
Btransformed = transformB(B)

# Display the result.
print("\n********** Representation for A ************\n")
print(Atransformed.collect())
# print("\n********** Representation for B ************\n")
# print(Btransformed.collect())


Matrix: 'A'
        1.00        2.00        4.00
        2.00        3.00       10.00
       12.00       15.00      150.00
Matrix: 'B'
        4.00        2.00        2.00      324.00       23.00
        1.00        3.00        3.00      333.00      423.00
       23.00       34.00       12.00       12.00        0.00

********** Representation for A ************

[(0, (0, 0, 1.0)), (1, (0, 0, 2.0)), (2, (0, 0, 4.0)), (0, (0, 1, 2.0)), (1, (0, 1, 3.0)), (2, (0, 1, 10.0)), (0, (0, 2, 12.0)), (1, (0, 2, 15.0)), (2, (0, 2, 150.0))]


In order to group all the elements of both matrices by the key $j$, we need to merge the two RDDs $Atransformed$ and $Btransformed$.
The function $merge()$ declared below takes in $Atransformed$ and $Btransformed$ and returns an RDD that results from the union of the two input RDDs.

**Complete the definition of the function $merge()$ and execute the code**

In [42]:
Atransformed.collect()
Btransformed.collect()

[(0, (0, 0, 1.0)),
 (1, (0, 0, 2.0)),
 (2, (0, 0, 4.0)),
 (0, (0, 1, 2.0)),
 (1, (0, 1, 3.0)),
 (2, (0, 1, 10.0)),
 (0, (0, 2, 12.0)),
 (1, (0, 2, 15.0)),
 (2, (0, 2, 150.0))]

[(0, (1, 0, 4.0)),
 (0, (1, 1, 2.0)),
 (0, (1, 2, 2.0)),
 (0, (1, 3, 324.0)),
 (0, (1, 4, 23.0)),
 (1, (1, 0, 1.0)),
 (1, (1, 1, 3.0)),
 (1, (1, 2, 3.0)),
 (1, (1, 3, 333.0)),
 (1, (1, 4, 423.0)),
 (2, (1, 0, 23.0)),
 (2, (1, 1, 34.0)),
 (2, (1, 2, 12.0)),
 (2, (1, 3, 12.0)),
 (2, (1, 4, 0.0))]

In [45]:
'''
Returns the union of Atransformed and Btransformed.
'''
def merge(Atransformed, Btransformed):
    
    ################## COMPLETE HERE FOLLOWING THE INSTRUCTIONS ##################
    
    # Put together the two input RDDs. Grouping by key and multiplying the
    # elements are done in later steps, by group() and groupProducts().
    R = Atransformed.union(Btransformed)
    
    return R

    ################## END MODIFICATIONS ##################

nice("A", A)
nice("B", B)
    
merged = merge(Atransformed, Btransformed)    

# merged is keyed by j rather than by (row, col), so it is not a matrix RDD
# and cannot be displayed with nice(). Uncomment the lines below to inspect it.
# print("\n********** Representation for merged ************\n")
# print(merged.collect())

Matrix: 'A'
        1.00        2.00        4.00
        2.00        3.00       10.00
       12.00       15.00      150.00
Matrix: 'B'
        4.00        2.00        2.00      324.00       23.00
        1.00        3.00        3.00      333.00      423.00
       23.00       34.00       12.00       12.00        0.00


In [None]:
Atransformed.union(Btransformed).groupByKey().mapValues(lambda x: list(x))

Now we can group the values of the RDD $merged$ obtained above by their key $j$. 
We define a function $group()$ that returns an RDD obtained by grouping the values of the input RDD by their key.
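The merge and group steps can be pictured with ordinary Python lists and a dictionary. This is only a sketch under the assumption that the data fits in memory; `group_by_key` is a hypothetical helper, not a Spark function.

```python
from collections import defaultdict

def group_by_key(pairs):
    """Collect all the values that share a key into one list per key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

# A few triples keyed by j, in the format of Equations (2) and (3).
a_keyed = [(0, (0, 0, 1.0)), (1, (0, 0, 2.0))]
b_keyed = [(0, (1, 0, 4.0)), (1, (1, 0, 1.0))]

merged_demo = a_keyed + b_keyed            # the union of the two inputs
grouped_demo = group_by_key(merged_demo)   # one list of triples per key j
print(grouped_demo)
```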


**Complete the definition of the function $group()$ and execute the code**

In [None]:
'''
Returns an RDD where the values of the input RDD are grouped by their key.
'''
def group(merged):
    
    ################## COMPLETE HERE FOLLOWING THE INSTRUCTIONS ##################
    
    # Group the elements of the input RDD by key.
    R = merged.groupByKey()
    return R

    ################## END MODIFICATIONS ##################

nice("A", A)
nice("B", B)
    
grouped = group(merged)    

print("\n********** Representation for grouped ************\n")
L = grouped.collect()
print('[')
for l in L:
    print("(",l[0], ",", end='', sep='')
    print("[", end='')
    for el in l[1]:
        print(el, end="")
    print("],")
print(']')

######################################################################
# Note that in the output, each element is (j, L), where
# L is a list that contains all the elements in the j-th column of A
# and all the elements in j-th row of B
######################################################################

<p>
<font size="3">
Each element of the RDD $grouped$ obtained above is a key-value pair, where the key is the index $j$ and the value is a list $L$ containing all the triples corresponding to the elements of matrix $A$ in the $j$-th column and the elements of matrix $B$ in the $j$-th row, as follows: 
<p>
<center>
$(0, i, a_{i, j})\ 0 \leq i \leq n-1 \quad (1, k, b_{j, k})\ 0 \leq k \leq p-1$
</center>
</p>
<p>
Remember that all triples corresponding to matrix $A$ have 0 as their first value, while those corresponding to matrix $B$ have 1.
</p>
<p>
From Equation (1), you can see that the product $a_{i, j} \cdot b_{j, k}$ contributes to the value $c_{i, k}$, for $0 \leq i \leq n-1$ and $0 \leq k \leq p-1$. 
Therefore, given the list $L$, we associate each value $a_{i, j} \cdot b_{j, k}$  to the pair $(i, k)$.
</p>
<p>
In other words, we now transform the RDD $grouped$ into an RDD where 
each element is a key-value pair, where the key is $(i, k)$ and the value is $a_{i, j} \cdot b_{j, k}$.

</p>    
<p>
The code below provides the function $multiplyElements()$, which takes in an element $(j, L)$ of the RDD $grouped$ 
and returns a list, where each element is a pair $((i, k), a_{i, j} \cdot b_{j, k})$, $0 \leq i \leq n-1$ and $0 \leq k \leq p-1$.  
</p>
</font>
</p>


<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Complete the definition of the function $groupProducts()$ that:
    <ul>
      <li> Takes in the RDD $grouped$.
      <li> Applies the function $multiplyElements()$ to each element of $grouped$.
      <li> Returns an RDD where each element is $((i, k), a_{i, j} \cdot b_{j, k})$, $0 \leq i \leq n-1$ and $0 \leq k \leq p-1$. 
    </ul>
    Execute the code.**</font>
<hr style=" border:none; height:2px;">
</p>




In [None]:
import sys

'''
Multiplies each element from matrix A with each element from 
matrix B in the list L (see description above).
Takes in: a value (j, L) of the RDD grouped.
Returns: a list where each element is ((i, k), a_ij * b_jk)
'''
def multiplyElements(value):
    j = value[0]
    L = value[1]
    
    # The output key-value pairs.
    kv = []
    # Sort the triples so that all those whose first element is 0
    # (from matrix A) come before any triple from matrix B.
    # The split into LA and LB below relies on this order.
    L = sorted(list(L))
    
    # For convenience, we store the triples from matrix A
    # and those from matrix B in two separate lists LA and LB.
    # The bounds check avoids an IndexError when L contains
    # no triple from matrix B.
    sep = 0
    while sep < len(L) and L[sep][0] == 0:
        sep += 1
    LA = [L[i] for i in range(0, sep)]
    LB = [L[i] for i in range(sep, len(L))]
    # For each element (0, i, a_ij) in LA
    # and each element (1, k, b_jk) in LB,
    # we add the pair ((i, k), a_ij * b_jk) to kv.
    for a in LA:
        for b in LB:
            i = a[1]
            k = b[1]
            kv.append(((i, k), a[2]*b[2]))
    return kv

'''
Returns an RDD where each value is a pair ((i, k), a_ij * b_jk)
'''
def groupProducts(grouped):
    ################## COMPLETE HERE FOLLOWING THE INSTRUCTIONS ##################
    
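    # Apply a transformation to the input RDD to get an RDD as described in the text.
    # multiplyElements() returns a list for each element, so flatMap (rather than map)
    # is needed to obtain one RDD element per pair.
    R = grouped.flatMap(multiplyElements)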
    return R
    
    ################## END MODIFICATIONS ##################

nice("A", A)
nice("B", B)

print("\n********** Representation for multipliedElements ************\n")
multipliedElements = groupProducts(grouped)
print(multipliedElements.collect())

<p>
<font size="3">
Each element of the RDD $multipliedElements$ obtained above is $((i, k), a_{i, j} \cdot b_{j, k})$, 
$0 \leq i \leq n-1$, $0 \leq j \leq m-1$, $0 \leq k \leq p-1$. 
From Equation (1), we can see that each $c_{i, k}$ is obtained by summing up the products $a_{i, j} \cdot b_{j, k}$, $0 \leq j \leq m-1$.
<p>
Therefore, in order to obtain the matrix $C = A \times B$, the only thing that we need to do is to sum all 
values $a_{i, j} \cdot b_{j, k}$ associated with the same key $(i, k)$.
</p>
</font>
</p>

<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Complete the definition of the function $getResult()$ that:
    <ul>
      <li> Takes in the RDD $multipliedElements$.
      <li> Transforms the input RDD into one where each element is $((i, k), \sum\limits_{j=0}^{m-1} a_{i, j} \cdot b_{j, k})$
      <li> Returns the resulting RDD. 
    </ul>
    Execute the code. We finally obtain the matrix $C$.**</font>
<hr style=" border:none; height:2px;">
</p>



In [None]:
def getResult(multipliedElements):
    ################## COMPLETE HERE FOLLOWING THE INSTRUCTIONS ##################
    
    # Apply a transformation to the input RDD so that all values with the same key are summed.
    R = multipliedElements.reduceByKey(lambda x, y: x + y)
    return R
    
    ################## END MODIFICATIONS ##################

nice("A", A)
nice("B", B)
C = getResult(multipliedElements)
nice("C", C)

############################################################## 
#YOU SHOULD OBTAIN THE FOLLOWING MATRIX C AS RESULT
# 98.00      144.00       56.00     1038.00      869.00
# 241.00      353.00      133.00     1767.00     1315.00
# 3513.00     5169.00     1869.00    10683.00     6621.00
##############################################################


<p>
<font size="3">
We now wrap every function that we implemented above in only one function $multiply()$ that we can use any time we need to multiply two matrices.
</font>
</p>
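To see the whole pipeline in one place, here is a plain Python sketch that follows the same steps (re-key by $j$, group, multiply, sum by $(i, k)$) on in-memory lists. The names `to_pairs` and `multiply_pairs` are illustrative assumptions and the code is not the Spark implementation; it is only meant as a way to check the expected result.

```python
from collections import defaultdict

def to_pairs(rows):
    """Convert a list of rows into ((row, col), value) pairs."""
    return [((i, j), float(x)) for i, row in enumerate(rows) for j, x in enumerate(row)]

def multiply_pairs(pairs_a, pairs_b):
    """Multiply two matrices given as ((row, col), value) pairs."""
    # Re-key both inputs by the shared index j and group them.
    by_j = defaultdict(lambda: ([], []))
    for (i, j), a in pairs_a:
        by_j[j][0].append((i, a))
    for (j, k), b in pairs_b:
        by_j[j][1].append((k, b))
    # Emit every product a_ij * b_jk and sum the products that share (i, k).
    result = defaultdict(float)
    for from_a, from_b in by_j.values():
        for i, a in from_a:
            for k, b in from_b:
                result[(i, k)] += a * b
    return dict(result)

A = [[1, 2, 4], [2, 3, 10], [12, 15, 150]]
B = [[4, 2, 2, 324, 23], [1, 3, 3, 333, 423], [23, 34, 12, 12, 0]]
C = multiply_pairs(to_pairs(A), to_pairs(B))
print([C[(0, k)] for k in range(5)])
```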

<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Execute the code below to define the function $multiply()$**</font>
<hr style=" border:none; height:2px;">
</p>

In [None]:
def multiply(A, B):
    if shape(A)[1] != shape(B)[0]:
        raise ValueError("The number of columns of A must equal the number of rows of B")
    A = transformA(A)
    B = transformB(B)
    C = merge(A, B)
    C = group(C)
    C = groupProducts(C)
    C = getResult(C)
    return C


# 6. PageRank

<p>
<font size="3">
**PageRank** is the algorithm used by the Google search engine to assign each Web page a numeric score (in short, _PR-score_) representing its importance. Google uses the PR-score to rank the Web pages returned in response to a search; the most important pages come first.

<p>    
The PR-score of a Web page $p$ depends on the number 
of Web pages linking to $p$ and on their PR-score. 
Stated otherwise, a Web page that many important Web pages link to is itself considered important.
</p>
</font>
</p>

<p>
<font size="3">
PageRank is an iterative algorithm. At the beginning, all the $n$ Web pages have the same PR-score ($1/n$). 
At each step, the PR-score of each Web page $p$ is modified based on the PR-score of the pages linking to $p$. 
The algorithm stops when the PR-scores of all pages converge (i.e., they don't change anymore from one iteration to the next).

<p>
We're now going to implement the simplified version of PageRank and we'll see why this doesn't always work.
</p>
</font>
</p>

<center>
<figure>
<img src='./figs/pagerank-example.pdf' width=210>
<figcaption>Figure 1 - An example Web graph with four pages.</figcaption>
</figure>
</center>    
   
<p>
<font size="3">
PageRank simulates the way a random surfer would browse the Web by 
randomly choosing the links to follow at any given page.
Referring to Figure 1, 
a random web surfer at page $A$ would choose to go 
either to page $B$, $C$ or $D$ with probability $1/3$, because 
there are three links leaving $A$. 
</font>
</p>

<p>
<font size="3">
There is no way a random surfer will visit $D$ from $C$ because the latter 
does not link to the former.
Similarly, a random surfer at $B$ would go to 
$A$ or $D$ with probability $1/2$, 
because $B$ has two outgoing links.
</font>
</p>

<p>
<font size="3">
By iterating these _random walks_ many times
across the graph, PageRank simulates the behavior of multiple random 
surfers; the pages that receive a larger number of visits are considered
more important than the ones that are visited only rarely.
</font>
</p>

<p>
<font size="3">
The transition probabilities at each node can be described 
by an $n\times n$ _transition matrix_ $M$, 
where $n$ is the number of nodes
in the graph and the element $m_{i, j}$ is the 
probability that a random surfer moves from $j$ to $i$.
</font>
</p>

<p>
<font size="3">
Assuming that the nodes are in alphabetical 
order (the first row/column corresponds to $A$, the second row/column corresponds to $B$, and so on), 
the transition matrix corresponding to the graph represented in 
Figure 1 is as follows:

<center>    
$M = 
\begin{bmatrix}
    0 & 1/2 & 1 & 0  \\
    1/3 & 0 & 0 & 1/2  \\
    1/3 & 0 & 0 & 1/2  \\
    1/3 & 1/2 & 0 & 0  
\end{bmatrix}$
</center>
</font>
</p>
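The construction of $M$ can be sketched in plain Python. The link lists below are an assumption read off the columns of $M$ above (the text states the links of $A$ and $B$ explicitly; those of $C$ and $D$ are inferred from the matrix), and `transition_matrix` is a hypothetical helper. Exact fractions are used so that the entries can be compared with the matrix in the text.

```python
from fractions import Fraction

# Outgoing links of each page (assumed, see the note above).
links = {"A": ["B", "C", "D"], "B": ["A", "D"], "C": ["A"], "D": ["B", "C"]}

def transition_matrix(links):
    """Build the matrix whose element (i, j) is the probability of moving from j to i."""
    nodes = sorted(links)
    index = {name: pos for pos, name in enumerate(nodes)}
    n = len(nodes)
    M = [[Fraction(0)] * n for _ in range(n)]
    for source, targets in links.items():
        for target in targets:
            # Each outgoing link of `source` is followed with equal probability.
            M[index[target]][index[source]] = Fraction(1, len(targets))
    return M

M_demo = transition_matrix(links)
for row in M_demo:
    print([str(x) for x in row])
```

Each column of the result sums to 1, because a surfer at page $j$ must move to some page.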


<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Complete the code below to load the matrix $M$ from the file _./data/matrix-m.txt_ and display the matrix calling the function $nice()$.**</font>
<hr style=" border:none; height:2px;">
</p>



In [None]:
################## COMPLETE HERE FOLLOWING THE INSTRUCTIONS ##################

# Get the matrix from file.
M = loadMatrix(join(data_dir, "matrix-m.txt"))
nice("M", M)
################## END MODIFICATIONS ##################



<p>
<font size="3">    
When a random surfer starts her walk, she can be anywhere in the graph. 
If we have no reason to believe that she is more likely to choose one 
page over another as her starting point, we can say that the initial 
probability that the surfer is at any given page is $1/n$. 
</font>
</p>

<p>
<font size="3">    
The probability distribution of the position of the surfer can be described 
by a column vector $\pmb{v^{(0)}}$ with $n$ elements, which in our example looks as 
follows:
<center>
$
\pmb{v^{(0)}} = 
\begin{bmatrix}
    1/4  \\
    1/4 \\
    1/4 \\
    1/4 
\end{bmatrix}
$
</center>
</font>
</p>

<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Implement the function $initialize()$ that:
    <ul>
     <li> Takes in a matrix $M$.
     <li> Returns the vector $v_0$. $v_0$ must be an RDD, in the same way as the matrix $M$.
    </ul>
    Execute the code.**</font>
<hr style=" border:none; height:2px;">
</p>

In [None]:
'''
Initialization of the vector v0
'''
def initialize(M):
    ################## COMPLETE HERE FOLLOWING THE INSTRUCTIONS ##################
    
    # Get the number n of rows of the input matrix
    n = shape(M)[0]
    
    # Create the column vector having n elements equal to 1/n
    # The vector must be represented in the same way as the matrix M.
    # First, create a list L in Python where each element is ((i, 0), 1/n) for i=0, ..., n-1
    L = [((i, 0), 1 / n) for i in range(n)]
    
    # Then transform this list into an RDD (remember the method sc.parallelize...)
    L = sc.parallelize(L)
    
    return L
    
    ################## END MODIFICATIONS ##################

v0 = initialize(M)
nice("v0", v0)

<p>
<font size="3">
We now want to compute the vector $\pmb{v^{(1)}}$
that gives 
the probability distribution of the position of the surfer after 
one iteration.
The probability $\pmb{v^{(1)}}_i$
that the surfer will be at node $i$ after the
first iteration is expressed as follows:
<p>
<center>
$\pmb{v^{(1)}}_i = \sum \limits_{j=0}^{n-1} m_{i, j} \pmb{v^{(0)}}_j$
</center>
</p>
where $m_{i,j}$ is the probability that the surfer 
moves from node $j$ to node $i$ and 
$\pmb{v^{(0)}}_j$ is the 
probability that the surfer is at node $j$ at iteration 0.
</font>
</p>

<p>
<font size="3">
Therefore, $\pmb{v^{(1)}}$ can be obtained by multiplying $M$ with $\pmb{v^{(0)}}$ :
<p>
<center>
$\pmb{v^{(1)}} = M \cdot \pmb{v^{(0)}}$
</center>
</p>
</font>
</p>
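The first step can be checked by hand with exact fractions. The sketch below uses plain Python lists for $M$ and $\pmb{v^{(0)}}$ instead of RDDs; the variable names are illustrative.

```python
from fractions import Fraction as F

# Transition matrix of the example graph (rows and columns in the order A, B, C, D).
M = [[F(0),    F(1, 2), F(1), F(0)],
     [F(1, 3), F(0),    F(0), F(1, 2)],
     [F(1, 3), F(0),    F(0), F(1, 2)],
     [F(1, 3), F(1, 2), F(0), F(0)]]

n = len(M)
v0 = [F(1, n)] * n  # every page starts with probability 1/n

# One step: v1_i is the sum over j of m_ij * v0_j.
v1 = [sum(M[i][j] * v0[j] for j in range(n)) for i in range(n)]
print([str(x) for x in v1])
print([round(float(x), 2) for x in v1])
```

The exact values are $3/8$ for page $A$ and $5/24$ for the other three pages, and they still sum to 1.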


<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Implement the function $performStep()$ that:
    <ul>
     <li> Takes in a matrix $M$ and a vector $v$
     <li> Returns the vector $M \cdot v$. 
    </ul>
    Execute the code.**</font>
<hr style=" border:none; height:2px;">
</p>


In [None]:
'''
Performs a step of the PageRank algorithm
This function returns a vector obtained by multiplying 
matrix M with vector v.
'''
def performStep(M, v):
    ################## COMPLETE HERE FOLLOWING THE INSTRUCTIONS ##################
    
    # Multiply M and v. We already defined the function multiply() above to multiply two matrices.
    # Use it :)
    R = multiply(M, v)
    
    return R
    ################## END MODIFICATIONS ##################


v1 = performStep(M, v0)
nice("v1", v1)

##############################################################
#
# YOU SHOULD OBTAIN THE FOLLOWING VALUES:
#  0.38
#  0.21
#  0.21
#  0.21
#
##############################################################

<p>
<font size="3">
If we want to obtain the 
probability distribution $\pmb{v^{(i)}}$ after
$i$ iterations, we compute:
<p>
<center>
$\pmb{v^{(i)}} = M \cdot \pmb{v^{(i-1)}}$
</center>
</p>
</font>
</p>

<p>
<font size="3">
The algorithm stops at the first iteration $k$ where $\lvert \pmb{v^{(k-1)}} - \pmb{v^{(k)}} \rvert < \epsilon$, that is, where no component of the vector changes by more than $\epsilon$, an arbitrarily small constant.
</font>
</p>
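
<p>
<font size="3">
The iteration and its stopping rule can be sketched in plain Python as follows. This is a minimal illustration under stated assumptions, not the Spark implementation requested in this lab: the matrix is an assumed 4-node example, and <code>mat_vec</code>, <code>has_converged</code> and <code>power_iteration</code> are hypothetical helper names. The convergence test is applied component by component, and a loop with an iteration cap is used instead of recursion.
</font>
</p>

```python
EPS = 0.001  # same threshold as the variable eps used in this notebook

# Assumed 4-node transition matrix, stored sparsely as {(i, j): m_ij}.
M = {
    (0, 1): 1 / 2, (0, 2): 1.0,
    (1, 0): 1 / 3, (1, 3): 1 / 2,
    (2, 0): 1 / 3, (2, 3): 1 / 2,
    (3, 0): 1 / 3, (3, 1): 1 / 2,
}


def mat_vec(M, v, n):
    """Return the column vector M . v as a dictionary {(i, 0): value}."""
    result = {(i, 0): 0.0 for i in range(n)}
    for (i, j), m_ij in M.items():
        result[(i, 0)] += m_ij * v[(j, 0)]
    return result


def has_converged(v_prev, v_next, eps=EPS):
    """True when no component changed by more than eps."""
    return all(abs(v_prev[k] - v_next[k]) <= eps for k in v_prev)


def power_iteration(M, n, eps=EPS, max_iter=1000):
    """Repeat v <- M . v until convergence; return (vector, iterations)."""
    v = {(i, 0): 1.0 / n for i in range(n)}
    for k in range(1, max_iter + 1):
        v_next = mat_vec(M, v, n)
        if has_converged(v, v_next, eps):
            return v_next, k
        v = v_next
    return v, max_iter  # cap reached without meeting the threshold


pagerank, iterations = power_iteration(M, 4)
print(iterations, [round(pagerank[(i, 0)], 2) for i in range(4)])
```

<p>
<font size="3">
For this assumed matrix the vector settles close to $(1/3, 2/9, 2/9, 2/9)$ after a handful of iterations. The cap <code>max_iter</code> is a design choice of this sketch: it guarantees termination even on a graph for which the threshold is never reached.
</font>
</p>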

<p>
<font size="3">
We now wrap up all the functions that we defined to obtain an implementation of PageRank.
The code below defines a function $SimplifiedPageRank()$ that takes in a matrix, initializes the vector $\pmb{v^{(0)}}$
and calls a function $SimplifiedPageRankIteration()$ that performs one step and then calls itself recursively until convergence.
</font>
</p>

<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Implement the function $converge()$ that:
    <ul>
     <li> Takes in two vectors $v_{prev}$ and $v_{next}$.
     <li> Returns whether $\lvert v_{prev} - v_{next} \rvert < \epsilon$
    </ul>
    Follow the comments in the code for the implementation. Execute the code.**</font>
<hr style=" border:none; height:2px;">
</p>



In [None]:
# Variable eps
eps = 0.001

'''
Returns true if |v_prev - v_next| < eps
'''
def converge(v_prev, v_next):
    ################## COMPLETE HERE FOLLOWING THE INSTRUCTIONS ##################
    
    # Put together v_prev and v_next
    v = 
    
    # Each element of the RDD v is a key-value pair ((i, 0), e), where e is an element of 
    # either the vector v_prev or v_next, for i = 0, .., n-1
    # If you don't believe it, print it :)
    # We now want to obtain a new RDD from v, such that for any pair ((i, 0), e1), ((i, 0), e2) 
    # of elements of v, we obtain an element ((i, 0), |e1 - e2|).
    # In practice, the new RDD represents the vector |v_prev-v_next|.
    # Which transformation are you going to apply on v?
    v = 
    
    # Now we want to obtain a new RDD by keeping only the elements of v whose value is greater than eps.
    # Which transformation are you going to apply on v?
    v = 
    
    # Finally, count the number of elements left in v. Which action are you going to apply on v?
    c = 

    # If no element is greater than eps, then the algorithm converged.
    return c == 0
    
    ################## END MODIFICATIONS ##################
    
    
'''
One iteration of PageRank.
Calls itself recursively until convergence.
'''
def SimplifiedPageRankIteration(M, v):
    v_next = performStep(M, v)
    if converge(v, v_next):
        return v_next
    else:
        return SimplifiedPageRankIteration(M, v_next)

'''
Initializes the vector v0 and starts the iterations.
'''
def SimplifiedPageRank(M):
    (n, m) = shape(M)  # M is a square transition matrix, so n == m
    v0 = sc.parallelize([((i, 0), 1./n) for i in range(n)])
    return SimplifiedPageRankIteration(M, v0)

M = loadMatrix("./data/matrix-m.txt")
nice("M", M)
pr = SimplifiedPageRank(M)
nice("PageRank for M", pr)
############################################################## 
# YOU SHOULD OBTAIN THE FOLLOWING VECTOR FOR M AS RESULT
# 0.33
# 0.22
# 0.22
# 0.22
##############################################################


M1 = loadMatrix("./data/matrix-m1.txt")
nice("M1", M1)
pr = SimplifiedPageRank(M1)
nice("Page Rank for M1", pr)


############################################################## 
# YOU SHOULD OBTAIN THE FOLLOWING VECTOR FOR M1 AS RESULT
# 0.22
# 0.44
# 0.33
##############################################################
    

## 6.1 Advanced PageRank

<p>
<font size="3">
The simplified version of PageRank works well as long as the graph meets the following conditions:
<ol>
    <li> The graph is strongly connected, which means that it is possible to get from any
node to any other node.
     <li> The graph has no _dead ends_, that is, nodes with no outgoing links.
     <li> The graph has no _spider traps_, that is, groups of nodes that have outgoing links, none of which leads out of the group.
</ol>
<center>
<figure>
<img src='./figs/pagerank-dead.pdf' width=210>
<figcaption>Figure 2 - The node $C$ is a dead end.</figcaption>
</figure>
</center>
</font>
</p>

<p>
<font size="3">
Figure 2 shows the same graph as in Figure 1, where the link from $C$ to $A$ has been removed, so that $C$ is now a dead end.    
</font>
</p>

<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Execute the following code, that loads the transition matrix corresponding to the graph in Figure 2 and computes the PageRank of its nodes.**</font>
<hr style=" border:none; height:2px;">
</p>

In [None]:
#dead ends
dead = loadMatrix("./data/matrix-dead.txt")
nice("dead", dead)
pr = SimplifiedPageRank(dead)
nice("Page Rank for dead", pr)


<p>
<font size="3">
As you can see, the PR-score of every node drops to (almost) 0. Since $C$ has no outgoing link, a random surfer who reaches $C$ cannot leave and "disappears" from the graph, so the total probability shrinks at every iteration.
</font>
</p>
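
<p>
<font size="3">
The leak caused by a dead end can be observed numerically. The sketch below assumes a dense transition matrix for a graph like the one in Figure 2 (the actual content of the data file is not reproduced here), where column $j$ holds the outgoing probabilities of node $j$; <code>column_sums</code> and <code>step</code> are hypothetical helper names. A column that sums to 0 identifies a dead end, and the sum of the vector decreases after every step.
</font>
</p>

```python
# Assumed dense transition matrix (rows and columns ordered A, B, C, D).
# Column 2 (node C) is all zeros: C has no outgoing link.
dead = [
    [0.0, 1 / 2, 0.0, 0.0],
    [1 / 3, 0.0, 0.0, 1 / 2],
    [1 / 3, 0.0, 0.0, 1 / 2],
    [1 / 3, 1 / 2, 0.0, 0.0],
]


def column_sums(M):
    """Sum of each column; a dead end shows up as a column summing to 0."""
    return [sum(row[j] for row in M) for j in range(len(M[0]))]


def step(M, v):
    """One simplified PageRank step on a dense matrix: returns M . v."""
    return [sum(m * x for m, x in zip(row, v)) for row in M]


v = [0.25] * 4
totals = []  # total probability left in the graph after each step
for _ in range(50):
    v = step(dead, v)
    totals.append(sum(v))

print(column_sums(dead))
print(totals[0], totals[-1])
```

<p>
<font size="3">
After the first step only $0.75$ of the probability is left, because the share that was at $C$ is lost; after 50 steps almost nothing remains.
</font>
</p>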

<p>
<font size="3">
Figure 3 shows an example of a graph in which $C$ is a spider trap. $C$ is not a dead end, because it has an outgoing link, but that link points to $C$ itself. In other words, the random surfer cannot escape from $C$ once she gets there.

<p>
<center>
<figure>
<img src='./figs/pagerank-spider.pdf' width=210>
<figcaption>Figure 3 - The node $C$ is a spider trap.</figcaption>
</figure>
</center>
</p>
</font>
</p>

<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Execute the following code, that loads the transition matrix corresponding to the graph in Figure 3 and computes the PageRank of its nodes.**</font>
<hr style=" border:none; height:2px;">
</p>

In [None]:
# spider trap
spider = loadMatrix("./data/spider-trap.txt")
nice("spider", spider)
pr = SimplifiedPageRank(spider)
nice("Page Rank for spider", pr)


<p>
<font size="3">
As you can see, the spider trap "drains" the PR-scores of all the other nodes and becomes the most important node in the graph.
Spider traps are usually created intentionally by spammers to fool PageRank, and they might contain millions of nodes.
</font>
</p>
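
<p>
<font size="3">
The draining effect can be reproduced in a few lines of plain Python. The sketch below assumes a dense transition matrix for a graph like the one in Figure 3, in which the only outgoing link of $C$ points to $C$ itself; <code>step</code> is a hypothetical helper name, and the number of steps is fixed arbitrarily at 50.
</font>
</p>

```python
# Assumed dense transition matrix (rows and columns ordered A, B, C, D).
# Column 2 (node C) has a single 1 on the diagonal: C only links to itself.
spider = [
    [0.0, 1 / 2, 0.0, 0.0],
    [1 / 3, 0.0, 0.0, 1 / 2],
    [1 / 3, 0.0, 1.0, 1 / 2],
    [1 / 3, 1 / 2, 0.0, 0.0],
]


def step(M, v):
    """One simplified PageRank step on a dense matrix: returns M . v."""
    return [sum(m * x for m, x in zip(row, v)) for row in M]


v = [0.25] * 4
c_scores = []  # score of node C after each step
for _ in range(50):
    v = step(spider, v)
    c_scores.append(v[2])

print([round(x, 3) for x in v])
```

<p>
<font size="3">
Unlike the dead-end case, no probability is lost here (every column sums to 1), but it all accumulates in $C$: its score grows at every step and approaches 1.
</font>
</p>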

<p>
<font size="3">
To mitigate these problems, an advanced version of PageRank has been proposed, in which the random surfer can _teleport_ to a random page with a given probability.
More specifically, one iteration of PageRank is described by the following equation:
<p>
<center>
$\pmb{v^{(k+1)}} = \beta M \pmb{v^{(k)}} + (1-\beta)\frac{\pmb{e}}{n}\quad\quad (4)$ 
</center>
</p>
where:
<ul> 
<li> $0 \leq \beta \leq 1$ is the probability that the random surfer follows a link to go from one page to another.
<li> $1-\beta$ is the probability that the random surfer jumps to a random page without following a link.
<li> $\pmb{e}$ is the vector of $n$ ones, so $\frac{\pmb{e}}{n}$ is a vector with $n$ elements, all equal to $1/n$.
</ul>
The value of $\beta$ is usually set between $0.8$ and $0.9$ (we use $0.8$ here), meaning that the random surfer is much more likely to visit pages by following links. Note that the simplified version of PageRank is obtained with $\beta=1.0$.
</font>
</p>
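
<p>
<font size="3">
Equation (4) can be tried out in plain Python on a small dense matrix. This is only a sketch under stated assumptions, not the Spark code requested below: the matrix is an assumed spider-trap example in which $C$ links only to itself, and <code>teleport_step</code> and <code>pagerank</code> are hypothetical helper names.
</font>
</p>

```python
BETA = 0.8   # probability of following a link
EPS = 0.001  # convergence threshold

# Assumed dense transition matrix (rows and columns ordered A, B, C, D),
# where node C only links to itself (spider trap).
spider = [
    [0.0, 1 / 2, 0.0, 0.0],
    [1 / 3, 0.0, 0.0, 1 / 2],
    [1 / 3, 0.0, 1.0, 1 / 2],
    [1 / 3, 1 / 2, 0.0, 0.0],
]


def teleport_step(M, v, beta=BETA):
    """One step of equation (4): beta * M . v + (1 - beta) * e / n."""
    n = len(v)
    return [
        beta * sum(m * x for m, x in zip(row, v)) + (1 - beta) / n
        for row in M
    ]


def pagerank(M, beta=BETA, eps=EPS, max_iter=1000):
    """Iterate equation (4) until no component changes by more than eps."""
    n = len(M)
    v = [1.0 / n] * n
    for _ in range(max_iter):
        v_next = teleport_step(M, v, beta)
        if max(abs(a - b) for a, b in zip(v, v_next)) <= eps:
            return v_next
        v = v_next
    return v  # iteration cap reached


ranks = pagerank(spider)
print([round(x, 2) for x in ranks])
```

<p>
<font size="3">
For this assumed matrix the scores settle near $(15/148, 19/148, 95/148, 19/148)$: teleporting keeps the other nodes from being drained completely. Calling <code>teleport_step</code> with <code>beta=1.0</code> gives back the simplified step $M \cdot \pmb{v}$.
</font>
</p>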

<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Using the implementation of the simplified PageRank as a reference, complete the following code to implement the advanced PageRank.
     Execute the code.**</font>
<hr style=" border:none; height:2px;">
</p>

In [None]:
################## COMPLETE HERE FOLLOWING THE INSTRUCTIONS ##################

beta = 0.8

def PageRankIteration(M, v, e):
    v_next = # COMPLETE HERE THE CODE (implementation of equation (4))
    
    if converge(v, v_next):
        return v_next
    else:
        return PageRankIteration(M, v_next, e)


def PageRank(M):
    
    # INITIALIZE VECTORS v AND e
    v = 
    e = 
    
    return PageRankIteration(M, v, e)

################## END MODIFICATIONS ##################

nice("spider", spider)
pr = PageRank(spider)
nice("Page Rank for Spider", pr)



<p>
<font size="3">
As you can see, $C$ still manages to get more than half of the PR-score in the graph. In general, a Web search engine implements variants of PageRank (e.g., _TrustRank_) in order to eliminate from the index Web pages that are likely to be generators of link spam.
</font>
</p>