
## The PageRank algorithm, motivation and update equations

The idea behind PageRank, as its very name states, is to have some criteria to order pages on a web based on their relationships on the network.

Computing PageRank is a fairly straightforward process, in a graph the update equation which is run until convergence (note that this concepts are loosely defined and will be precisely formulated later) for node $i$ is the following,

\begin{equation}
PR(i) = (1 - d) + d \cdot \left( \frac{PR(j)}{L(j)} + \frac{PR(k)}{L(k)} + \ldots \right)
\end{equation}

where $d$ is the damping factor and the sums are taken for all links from node $i$. The PageRank results are the long term staying times in all nodes in the network in an infinitely long random walk.

#### Random Walk on a Discrete time Markov Chain

A Discrete Time Markov Chain (DTMC) is a mathematical model for a sequence of random variables $X_0, X_1, X_2, \ldots$, where each $X_n$ represents the state of a system at discrete time step $n$. The system exhibits the \textbf{Markov property}, meaning that the future state $X_{n+1}$ depends only on the current state $X_n$ and not on the sequence of states that preceded it. This is formally expressed as:

\begin{equation}
    P(X_{n+1} = j \mid X_n = i, X_{n-1} = i_{n-1}, \ldots, X_0 = i_0) = P(X_{n+1} = j \mid X_n = i)
\end{equation}

The key defining characteristic of a DTMC is its transition probabilities. Let $(p_{ij} = P(X_{n+1} = j \mid X_n = i))$ denote the probability of transitioning from state $i$ to state $j$ in one time step. These probabilities are often arranged in a transition probability matrix \(P\) where $P_{ij} = p_{ij}$. The rows of $P$ sum to 1, reflecting the fact that the system must move to some state in the next time step.

The Markov chain is said to be \textbf{ergodic} if, under certain conditions, it possesses a unique stationary distribution. The stationary distribution is a probability distribution that remains unchanged by the transition probabilities. Formally, a distribution $\pi$ is stationary if:

\begin{equation}
    \pi_j = \sum_{i} \pi_i \cdot p_{ij} \quad \text{for all } j
\end{equation}

Ergodicity implies that, as $n$ approaches infinity, the probability of being in a particular state becomes independent of the initial state. In other words, the system forgets its initial condition and converges to a steady-state distribution $\pi$.

Ergodicity is a key property for the convergence of pageRank as it ensures that the results would be the same regardless of the starting point of the random walk.

In the context of PageRank, a Markov chain is constructed based on the hyperlink structure of a graph, and the PageRank values represent the steady-state probabilities of being on each page after an infinitely long random walk.

This random walk idea is captured in the random surfer model.


#### Computation and practical considerations

To compute the PageRank we need the stationary distribution of the DTMC, and that in turn will be computed by iterative multiplication  of the transition matrix and the chain disribution vector.

Consider a Markov chain with transition probability matrix $P$ and an initial probability distribution $\pi_0$. The goal is to estimate the stationary distribution $\pi$.

Initialize the iteration with the initial distribution:
$$
\pi_1 =  P \pi_0
$$
with $\pi_0$ being a one dimensional row vector. Iterate the following until convergence is reached:

$$
\pi_{k+1} =  P \pi_k
$$

Normalize the resulting vector to ensure it remains a valid probability distribution:

$$
\pi_{k+1} = \frac{\pi_{k+1}}{\|\pi_{k+1}\|_1}
$$


Now this approach suffices for the purposes of this final project however, due to its very definition, ergodicity, which is a necessary condition for convergence, implies that 1 belongs to the spectra of the transition matrix, and because we know that the stationary distribution $\pi$ is such that

$$
 \pi P = \pi
$$

we can determine $\pi$ as the eigenvector corresponding to the eigenvalue 1, via power iteration for example.

Now in order to compute the PageRank, we need to build the transition matrix, which will be stored under a Sparse Matrix data type for efficiency purposes, the initial distribution is assumed to be uniform, however this is not the key aspect as it really does not matter on which state the random walk starts in.


As we are working in a probability space $ S = (\Omega, \mathcal{F}, P) $, with $\mathbb{P}(\Omega) = 1$, which means that the norm of any distribution $\pi_k$ has to be 1 $\forall k $.

#### The Damping factor

The random walk may get stuck in a node with no outgoing links, which obviously would stop it, thus the damping factor is introduced to help avoid this, there is a random chance that it goes to any node at any particular time. This probability will be known as the **damping factor** $\beta$. This in turn makes the chain ergodic as it assures that the period is 1 and that there is only one communication class.


#### Convergence

Convergence is ensured due to ergodicity, which we have not proved, it would amount to simply computing the number of communicating classes and checking if it's 1, we could do this by simply counting the number of strongly connected components with a NetworkX function, and checking the periods of all states, they must be 1, with a built-in function as well.

The only consideration here is time, which will be proportionate to the size of the data.

### Implementation

First the necessary imports, numpy and pandas are very important, typing to not mix up the different data types and have a clear view of the pipeline is very important as well.

In [None]:
# The PySpark-specific ones
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Used for creating indeces
from pyspark.sql.window import Window

# PySpark types: Both for building the data structures and typing functionallity
from pyspark.mllib.linalg.distributed import (CoordinateMatrix, MatrixEntry,
                                              DenseMatrix, IndexedRowMatrix, IndexedRow)

# The general Python ones
import pandas as pd
import re
import numpy as np
from typing import Union, List


Since the dataset is too large, it is advised to sample, in an unbiased manner. For easier data handling we create a different field with sample indices, so as to avoid outofbounds errors.

In [None]:

sample_frac = 0.5#%
sample_seed = 1 # for reproducibility

# dataset source
data_src = "dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet"

# Add a new index over the window of the dataframe
w = Window.orderBy("id")

# Create our sampled Dataset
wikipediaDF = (spark.read.parquet(data_src) # load the data
              .sample(fraction=sample_frac/100, seed=sample_seed).withColumn("db_id", row_number().over(w) - 1).cache()) # cache this dataset, as all data transformations will be done throught
                        # this dataset
N = wikipediaDF.count()


Our data has a number of attributes, however we are only interested in 'title', 'id' (index), 'db_id' (sample index) and 'text', which we will eventually parse to look for links

#### Creating the transition matrix $M$

For extracting the links a suitable regular expression capable of extracting the links accounting for duplicates and headings is used.


In [None]:
def parse_links(document_body,regexp: str = r"\[\[(.*?)(?:\#.*?)?(?:\|.*?)?\]\]"):
    data = re.findall(regexp, document_body)
    return list(map(lambda s: s.lower(), data))


Now that we have defined our link parser function, we can transform it into a `udf` in order to use it inside `pyspark.sql`:

In [None]:
# This function is registered as a UDF - user-defined function in order to be used inside of
# a `select()` statement
parse_links_udf = udf(parse_links, ArrayType(StringType(), False))

#### Creating links
To build the document-link pairs we perform distributed joins, in a scalable manner. With a join scheme we can build a dataframe where each row contains the start and end of a link (based on sample ids)

In [None]:
# to identify articles by sample id
id_link_join_lookupDF = (wikipediaDF.select(
    "db_id", # first column will be the sample id of each page
    explode(parse_links_udf("text").alias("links"))) # second column will be the link in 
    .cache())                                        # text form the page contains

# map link titles to id's without using an udf
id_title_lookup_DF = (wikipediaDF.select("db_id", lower("title").alias("title"))
                      .withColumnRenamed("db_id", "outgoing_id"))


# cleaned cached df to store doc-link pairs 
docLinkPairs = (id_title_lookup_DF.join( # Perform a join of the two latter dataframes.
                        id_link_join_lookupDF,
                        id_title_lookup_DF.title == id_link_join_lookupDF.col, #join on title match
                        "left").na.drop().select("db_id", "outgoing_id").distinct().where("db_id != outgoing_id") # remove documents pointing to themselves.
                .cache())

display(docLinkPairs)

db_id,outgoing_id
28940,0
28908,0
28780,0
28754,0
28420,0
28382,0
26917,0
26911,0
24858,0
18152,0


#### Creation of the transition matrix $M$

Due to the nature of the data, a sparse matrix is the most efficient way to store the transition matrix. Two different data types are used for this very purpose, `CoordinateMatrix` to build it, and `IndexedRowMatrix` to actually be able to multiply it.

In [None]:
# Useful for prob injection
link_countDF = (docLinkPairs.groupBy("db_id") # group all the id ($i$) occurances
                .count() # count how many links belong to each id ($N_i$)
                .withColumnRenamed("count", "link_count"))
# Display the dataframe
display(link_countDF)

db_id,link_count
28940,1
28908,1
28780,1
28754,1
28420,1
28382,1
26917,1
26911,1
24858,1
18152,1


In [None]:
def getTransitionMatrix(docLinkPairs=docLinkPairs, 
                        link_countDF=link_countDF, N=N, 
                        transpose=True) -> CoordinateMatrix:  
    # Join the table with the information of how many outgoing links ($N_i$)
    # each document has. This will produce an Rdd containing coordinate tuples
    # with the form (i, j, probability of transition from i to j)
    docLinkPairsCounts = (docLinkPairs.join(link_countDF, "db_id", "leftouter"))
    coordinates = (docLinkPairsCounts.select(
                            "db_id", "outgoing_id", 
                           (1/docLinkPairsCounts.link_count).alias("probability"))
                   .rdd)
    
    # Convert the coordinate tuples into a Coordinate matrix
    matrix = CoordinateMatrix(coordinates.map(lambda coords: MatrixEntry(*coords)), N, N)
    
    # Transpose the matrix if necessary, (which in our case it is), and transform it into
    # an IndexedRowMatrix to be able to multiply it.
    if transpose:
        return matrix.transpose().toIndexedRowMatrix()
    else:
        return matrix.toIndexedRowMatrix()
    

def L2norm(R: Union[DenseMatrix, np.ndarray]):
    if not isinstance(R, np.ndarray):
        R = R.toArray()
    return np.round(np.linalg.norm(R), 2)



#### Estimating the dominant eigenvector

Since the page rank probabilities are nothing more than the stationary distribution of the ergodic chain, which is assured by the damping factor we can simply use an algorithm to approximate the dominant eigenvector, which corresponds to that of eigenvalue 1. There are a plethora of methods we can use for this purpose using householder matrices or QR factorizations, we stick with power iteration as it is the simplest.

In [None]:
def irmtodense(matrix: IndexedRowMatrix) -> DenseMatrix:
    """
    Necessary for power iteration, to preserve data types across computations.
    """
    # Collect the rows to the driver
    rows = matrix.rows.collect()

    # Extract the number of rows and columns
    num_rows = matrix.numRows()
    num_cols = matrix.numCols()

    # Initialize a NumPy array to store the matrix data
    matrix_data = np.zeros((num_rows, num_cols))

    # Fill in the matrix data
    for row in rows:
        matrix_data[row.index, :] = row.vector.toArray()

    # Convert the NumPy array to a Spark DenseMatrix
    dense_matrix = DenseMatrix(num_rows, num_cols, matrix_data.flatten(), isTransposed=False)

    return dense_matrix

# to normalize dense matrix
def normd(matrix, norm_value):
    normalized_data = matrix.toArray() / norm_value
    normalized_matrix = DenseMatrix(matrix.numRows, matrix.numCols, normalized_data, isTransposed=False)
    return normalized_matrix

In [None]:
def powit(matrix, niter = 20):
    # Initialize a random DenseMatrix vector
    vector_size = matrix.numRows()

    # Initialize a random DenseMatrix vector
    v = DenseMatrix(vector_size, 1, np.random.rand(vector_size))

    for _ in range(niter):
        # Perform matrix-vector multiplication using IndexedRowMatrix
        Av = matrix.multiply(v)
        # Normalize the resulting vector, keeping in mind the datatypes
        if niter != 20:
            # L2 normalization of the vector only if it isn't the last iteration
            v = normd(v, L2norm(irmtodense(Av)))
    return v

In [None]:
M = getTransitionMatrix()
vals = powit(M)

In [None]:
def applyPageRankScaling(vals, rankCoeff=1_000_000):
    # scaling inspired by original paper
    return np.round(vals.toArray() * rankCoeff, 2)

Presenting results in a dataframe as requested by the project statement

In [None]:
# Display the results using a Pandas Dataframe (as specified in the project statement)
results_pdf = wikipediaDF.select("id", "title").toPandas()
results_pdf["PageRank"] = applyPageRankScaling(vals)
display(results_pdf)

### Conclusions

In this final homework we have implemented a pipeline to compute the page rank probabilities of a random and unbiased sample of pages from the wikipedia database. In our implementation we have relied heavily on mathematical theory of Discrete-Time Markov Chains, and on numerical methods. Because of ergodicity being assured by the damping factor introduced, we utilized the well known result of 1 being the dominant eigenvalue of the transition matrix. 
By definition we wanted the stationary distribution of the DTMC which is the dominant eigenvector of the ergodic chain, for which we had to resort to numerical approximation. Many methods exists for approximating it, some use different factorization techniques to obtain all eigenvectors and eigenvalues like the QR, but Occam's razzor suggests to use the simplest possible method in the scenario which is the power iteration.

In these concluding remakrs we want to introduce a small analysis on the convergence rate of power iteration and how further study should be done to get a better grasp of the performance on the subsample of wikipedia pages. Convergence is assured since the algebraic multiplicity of the eigenvalue 1, is in fact, 1, but we have not information regarding the convergence rate until this point. 

The power method is an iterative numerical technique used for approximating the dominant eigenvector of a matrix. The convergence speed of the power method can be analysed under certain assumptions.

Suppose matrix A is diagonalisable, with eigenvalues sorted in order of decreasing magnitude: |λ₁| > |λ₂| > · · ·. Additionally, let's assume we expand our initial vector x in the basis of the eigenvectors:

$$ x = c₁x₁ + c₂x₂ + \ldots $$

After n steps, the power method produces a scalar multiple of $(A^n x$, which can be expressed as a multiple of the eigenvectors:

$$\text{multiple of } A^n x = \text{multiple of } (\lambda₁^n c₁x₁ + \lambda₂^n c₂x₂ + \lambda₃^n c₃x₃ + \ldots) $$

The overall exponentially growing (or decaying) term $\lambda₁^n$ gets removed by the normalisation. Notably, the error terms like $x₂, x₃$, and others not proportional to $x₁$ decay as the ratios of their eigenvalues/λ₁ to the power of n.

For large n, the error is dominated by the $x₂$ term, representing the next-biggest $|λ|$, as it decays most slowly. The magnitude of this term decays proportionally to $ | \frac{\lambda₂}{\lambda₁} |^n $, which is the ratio of the magnitudes.

Thus studying the magnitude of the second largest eigenvalue would be interesting to determine the convergence rate for this particular matrix. Another option would be to get the lowest eigenvalue in the spectra of the transition matrix, to determine the condition number and see if the matrix is ill-conditioned for this types of numerical approximations. We can get it in many ways, but one of them would be to apply our existing power iteration method to the inverse of the transition matrix and then using the Rayleigh quotient to estimate the actual eigenvalue, once we obtain this we can get the condition number of the transition matrix and determine how ill-conditioned or well-conditioned it is.
