# Wikipedia Page Rank

PageRank is an algorithm that is used to determine the importance of a webpage or a set of webpages. It was developed by Larry Page and Sergey Brin, the founders of Google, as a way to rank webpages in the search engine's results. The basic idea behind PageRank is that a webpage is important if it is linked to by many other important webpages.

The algorithm works by treating each webpage as a node in a directed graph, with edges representing links between webpages. The importance of a webpage (its PageRank) is then computed based on the importance of the webpages that link to it. The algorithm uses a recursive formula to iteratively calculate the PageRank of each webpage, until a stable rank is reached.



The provided code is an implementation of the PageRank algorithm which is a method used to determine the importance or relevance of a page in a website based on the number and quality of links that point to it. The algorithm is typically used to rank web pages in search engine results.

## Part 0: Imports

In [None]:
import pandas as pd
import re

In [None]:
from pyspark.sql.types import *
from pyspark.sql.types import ArrayType, StringType,LongType
from pyspark.sql.functions import size, explode, collect_list 
from pyspark.sql.functions import col
from pyspark.sql.functions import *

In [None]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

## Part 1: Get the data 

- read a parquet file and create a DataFrame
- count the total number of rows in the DataFrame
- take a random sample of the DataFrame with a fraction of 0.001 and random seed 0, and cache it in memory

In [None]:
wikipediaDF=spark.read.parquet("dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")

In [None]:
# Count the total rows of the DF
N=wikipediaDF.count()

In [None]:
PartialWikipediaDF=wikipediaDF.sample(fraction=0.0001,seed=0).cache()

In [None]:

#PartialWikipediaDF.count()

In [None]:
#display(PartialWikipediaDF)

## Part 2: Parse the link in the data frame
Using a udf(user defined function) parse the text field from each record

### (2a) define the function and the udf

In [None]:
# function which parses the text field from each record, and extracts the outgoing links.
def parse_links(document_body):
  # Find all strings that match the pattern [[...]]
  data = re.findall(r'\[\[(.+?)\]\]',document_body)
  
  # If there are any matches
  if (len(data) > 0):
    # convert all the matches to lowercase
    links = [s.lower() for s in data]
  else:
    # if there are no matches, return an empty list
    links = []
  return links


In [None]:
# define the udf for parsing a link
parse_links_udf = udf(parse_links,ArrayType(StringType()))

In [None]:
tolower_udf= udf(lambda x: x.lower())

## (2b) create a title dataframe
title_idDF is created by selecting the lowercase title and id fields from wikipediaDF. And it's been converted to pandas dataframe

In [None]:
title_idDF=wikipediaDF.select(lower("title").alias("title"),"id")


In [None]:
#convert the spark df to pandas
title_idPDF=title_idDF.toPandas()

## (2c) create a dataframe with the parsed link
TempForwardDF is created by selecting the title, id, and links (using parse_links_udf function on text field) fields from PartialWikipediaDF.

In [None]:
TempForwardDF=PartialWikipediaDF.select("title","id",parse_links_udf("text").alias("links"))

In [None]:
#display(TempForwardDF)

## Part 3: create a Forward data frame 
Create a forward df with the forwarding link of each node

## (3a)  convert titles of the link into their ids
Function called titles2id is defined, which takes in a list of links and a pandas dataframe with titles and ids, and returns a list of ids corresponding to the links that are present in the pandas dataframe. This function is then wrapped in a udf called titles2id_UDF, which is used to create a new DataFrame called ForwardDF.

In [None]:
# function that search and replace the links with the ids of the corresponding documents, to handle just numbers
def titles2id(links,titleidPDF):
  # df with titles and ids
  data_titles=titleidPDF
  # if row has outgoing links
  if (len(links)>0):
    # check if a title is in the links and append his id to a list 
    ids=data_titles[data_titles.title.isin(links)].id.to_list()
  else:
    ids=[]
  return list(set(ids))

In [None]:
titles2id_UDF=udf(lambda x: titles2id(x,title_idPDF),ArrayType(LongType(),False))

## (3b) Create the forward df

In [None]:
ForwardDF=TempForwardDF.select("id",titles2id_UDF("links").alias("links"),size("links").alias("counter")).cache()

In [None]:
#display(ForwardDF)

## Part 4: Create the reverse dataframe, and the page rank dataframe 
Create the reverse data frame that will be used lately to compute the page rank algorithm.

This dataframe will have an id(the id of the node), the links that are pointing to the node, and the counters of the links that point to the node.

## (4a) create the df for the outgoing links
For each link select how many links are outgoing from him

In [None]:
OutgoingsLinksCountersDF=ForwardDF.select("id","counter")

In [None]:
#display(OutgoingsLinksCountersDF)

## (4b) Create a temporal reverse link dataframe

The select() function is used to select specific columns from the "ForwardDF" DataFrame, and in this case, it's selecting the "id" column and the "counter" column. 

Then it's using the explode() function on the "links" column and creating a new column called "t_link". The explode() function will create a new row for each element in the "links" column, so if the "links" column contains a list of n elements, the resulting DataFrame will have n rows, each with a copy of the "id" and "counter" columns and a single element from the "links" column in the "t_link" column.

In [None]:
TemporalReverseLinks=ForwardDF.select("id",explode("links").alias("t_link"),"counter")

In [None]:
#display(TemporalReverseLinks)

## (4c) Create the reverse df
reverseDF is grouping the TemporalReverseLinks DataFrame by the "t_link" column, then using the groupBy method to perform two aggregation operations:

- Using collect_list() function to create a new column "Links" which contains a list of all the values of "id" column in the group.

- Using collect_list() function to create a new column "counters" which contains a list of all the values of "counter" column in the group.
  then it renaming the "t_link" column to "id" using withColumnRenamed() function.
  And it's caching the ReverseDF Dataframe for faster access in future operations.
  It's important to note that without the context of the rest of the program and the structure of the TemporalReverseLinks DataFrame, it's hard to say exactly what this line of code does and what the final DataFrame will look like.

In [None]:
ReverseDF=TemporalReverseLinks.groupBy("t_link").agg(collect_list ("id").alias("Links"),collect_list("counter").alias("counters")).withColumnRenamed("t_link","id").cache()

In [None]:
#display(ReverseDF)

In [None]:
ReverseDF.printSchema()


## (4d) Create the page rank df
Create a DataFrame with the id and Page Rank initialized with 0.85/N and convert it to pandas dataframe

In [None]:
# add column PR
pageRankDF=ReverseDF.select("id").withColumn("PR",lit(0.85/N))
pageRankPDF=pageRankDF.toPandas()


In [None]:
pageRankDF.printSchema()

In [None]:
#display(pageRankPDF)

## Part 5: Compute the algorithm

Finally, the PageRank algorithm is implemented by iterating over the ReverseDF, updating the Page Rank values of each page based on the links and counters, and checking for convergence of the algorithm.

## (5a) Define the function for calculating the new page rank
Define a function that calculates the new Page Rank for a given document

In [None]:

def new_pagerank(links, current_pr, counters):
  n_pr = 0;
  # for each incoming link to the page calculate the pagerank
  # zip the links and counters to get them both in the loop
  for l, c in zip(links, counters):
    
    
    # get current_pr of the link l
    try:
      current_link_pr=current_pr[current_pr['id']==l].PR.item()
      
    # this except is used in case that some links doesn't have a page rank
    # this can occurr when using a sample data frame
    except:
      current_link_pr=0.85/N
        
    # update the new Page Rank by adding the contribution of this link
    n_pr += current_link_pr/c
  new_pr = 0.85/N+0.15*n_pr
  return new_pr


## (5b) Define the function for checking if  is converged
Define a function that checks if the Page Rank has converged,checking if the relative error between two values (current and previous) is less than or equal to a certain threshold (0.00001).

In [None]:
def converged(current,previous):
    relative_error = abs(current - previous) / abs(previous)
    return relative_error <= 0.00001
    


## (5c) Compute the algorithm 
 It uses a while loop to iterate until the Page Rank has converged or the maximum number of iterations (20) is reached. 
 
 The loop starts by calculating the new Page Rank of each document using the new_pagerank function. 
 
 This function takes in the links and counters of a document and the current Page Rank DataFrame, then calculates the new Page Rank by adding the contribution of each incoming link.
 
 After that, it checks for convergence by comparing the new and previous Page Rank DataFrames. If all the documents have converged, the loop is stopped. 
 
 Finally, the final Page Rank DataFrame is displayed.

In [None]:
# create a UDF that applies the new_pagerank function to the DataFrame
new_pagerank_udf = udf(lambda x,y: new_pagerank(x,pageRankPDF,y), DoubleType())  


In [None]:
# create a DataFrame to store the previous Page Rank(used for checking if is converged or not)
PreviousPageRankDF=pageRankDF
count=0;
flag=True
# iterate until the Page Rank has converged or the maximum number of iterations is reached
while ((flag==True) & (count<20)):
    # calculate the new Page Rank
    NewPageRankDF=ReverseDF.select(
        ReverseDF["id"],
        new_pagerank_udf(ReverseDF["links"],ReverseDF["counters"]).alias("PR"))
    pageRankPDF=NewPageRankDF.toPandas()
    
    # join the new and previous Page Rank DataFrames to check for convergence
    checkConvergenceDF=NewPageRankDF.withColumnRenamed("PR","New_PR").join(PreviousPageRankDF,NewPageRankDF["id"] == PreviousPageRankDF["id"])
    
    # check if the Page Rank has converged
    checkConvergenceDF=checkConvergenceDF.withColumn("is_converged",converged(checkConvergenceDF["New_PR"],checkConvergenceDF["PR"]))
    # exit condition if all the rows are converged
    if checkConvergenceDF.filter(col("is_converged")).count() == checkConvergenceDF.count():
        flag=False

    # update the udf with the new pagerank pdf
    new_pagerank_udf = udf(lambda x,y: new_pagerank(x,pageRankPDF,y), DoubleType())
    display(pageRankPDF)
    
    PreviousPageRankDF=NewPageRankDF
    count=count+1
    


In [None]:
# display the final Page Rank DataFrame
display(pageRankPDF)

## Analysis

### Sample dataframe problem
Using a Sample dataframe the PageRank algorithm is converging after just a few iterations because some of the nodes that are pointed to by nodes in the sample data frame are not present in the sample data frame.

These missing nodes would not have a PageRank value assigned to them, which could cause the algorithm to converge faster.

This is because the PageRank algorithm relies on the links between pages to propagate the rank scores, and if some of these links are missing, the algorithm can't continue to propagate the rank scores and thus converge faster.

A solution is use a try except construct to try to search the page rank of that link, and if not present assign a default value(0.85/N in this case).
Using this solution will give you good estimates of the page rank, but still not the most accurate because it will converge really fast(2/3 iteration for 0.0001 sample and 4/5 iteration for 0.001).

Another alternative is to remove the missing nodes from the dataset and run the algorithm again to see if the result is more accurate.

To get a more accurate estimate of the page rank values, you would need to run the algorithm on the full dataset, rather than just a sample. This will ensure that all the links between the pages are present and the algorithm can propagate the rank scores correctly. However, it may require significant computational resources depending on the size of the dataset.

### Damping factor
The damping factor is a probability value that is used in the PageRank algorithm to handle the problem of web pages that form a directed cycle, leading to a situation where the algorithm would never converge.

The damping factor is used to introduce a small probability that, at each step, the random surfer will "teleport" to a random webpage rather than following a link. This means that even if a webpage is in a cycle of pages that all link to each other, there is still a small chance that the random surfer will leave the cycle and explore other pages.

The damping factor is typically set to a value between 0 and 1, with a common value being 0.85(as in this case). This means that at each step, there is a 15% chance that the random surfer will "teleport" to a random webpage.

A damping factor of 0 means that the random surfer never teleports to other pages and will eventually get stuck in a loop. A damping factor of 1 means that the random surfer never follows links and only teleports to other pages. 

It is important to note that the damping factor value chosen will affect the result of the algorithm, and a different value might lead to different ranking of pages.

It's also worth noting that the PageRank algorithm is susceptible to manipulation and can be gamed by spammers. Additionally, like all algorithm, the results of PageRank algorithm can be affected by the data bias and damping factor, because spammers often use link farms (a group of web pages that all link to each other) to increase the importance of their web pages, these pages will tend to have artificially high PageRank scores.

### Convergence

Convergence in the context of the PageRank algorithm refers to the point at which the algorithm has iterated enough times that the page rank scores of the nodes in the network have stabilized and are not changing significantly with each iteration. 

This can be determined by comparing the page rank scores of the nodes between consecutive iterations and checking if they have reached a certain level of similarity. 

Once the algorithm has converged, the final page rank scores can be considered to be a good approximation of the true page rank of the nodes in the network.

To check if is converged we use a relative error, a measure of how much the PageRank values have changed between iterations. It is calculated as the absolute difference between the current PageRank values and the previous PageRank values, divided by the absolute value of the previous PageRank values.

### Conclusions
In conclusion, one of the main advantages of the PageRank algorithm is that it can be used to identify authoritative or important webpages in a set of web pages. 

This is because the algorithm takes into account not only the number of inbound links to a webpage but also the quality of those links, as the links from more authoritative web pages will pass on more importance to the linked web page.

It's important to note that PageRank algorithm is a computational intensive algorithm, it's not suitable for large graphs and requires significant computational resources to run efficiently.

In fact with limited resources, like we have using databricks community , the algorithm with the full wikipedia dataset is too slow to run.