# FINAL PRACTICAL WORK 2022/23
## IMPLEMENT BRIN AND PAGE’S PAGERANK ALGORITHM 

### Gracia Estrán Buyo 
#### 100452014

### Marta Almagro Fuello 
#### 100451979

##### First, we import all the libraries that are going to be useful during this assignment:

In [0]:
import pandas as pd
import numpy as np
import re

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

##### Create a Spark DataFrame from the Wikipedia parquet files with the command:

In [0]:
#%fs ls dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia
#%fs ls dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet

wikipediaDF = spark.read.parquet("dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")

##### It is going to be useful for later to know the total of pages we are going to work with

In [0]:
N = wikipediaDF.count() #5823210

##### Instead of using the full database, it is recommended to use a smaller sample to analyse the structure. With fraction = 0.001 this is about 0.1% of the records (approx. 5,823 records):

In [0]:
PartialWikipediaDF = wikipediaDF.sample(fraction = 0.001, seed = 123).cache()
M = PartialWikipediaDF.count() #71
#display(PartialWikipediaDF)

##### The parse_links function was already provided; it locates the links in the text.

In [0]:
def parse_links(document_body):
    data = re.findall(r'\[\[(.+?)\]\]', document_body)
    if (len(data)>0):
        links = [s.lower() for s in data]
    else:
        links = []
    return links
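
##### As a quick illustration of what the pattern extracts, here is a minimal sketch that runs the same regular expression on a made-up wikitext fragment (the sample text is hypothetical, not taken from the dataset). It also shows that a piped link such as [[target|label]] is kept whole, so it would not match any title later on.

```python
import re

def parse_links(document_body):
    # Same pattern as in the notebook: non-greedy match of the text between [[ and ]]
    data = re.findall(r'\[\[(.+?)\]\]', document_body)
    return [s.lower() for s in data]

# Hypothetical wikitext fragment, made up for this example
sample_text = "The [[Apple]] is a fruit, unlike [[Banana (disambiguation)|Banana]]."
links = parse_links(sample_text)
print(links)  # ['apple', 'banana (disambiguation)|banana']
```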

##### As plain Python functions cannot be called on Spark DataFrame columns, we wrap each one in a User Defined Function (UDF). We do this for every function we create from here on.

In [0]:
parse_links_udf = udf(parse_links, ArrayType(StringType()))

In [0]:
tolower_udf= udf(lambda x: x.lower()) 

##### Create the dataset with the important information (“title”, “id” and “text”), which we are going to use to create the forward and reverse matrices:

In [0]:
data = PartialWikipediaDF.select(tolower_udf(PartialWikipediaDF["title"]).alias("title"),
                                 PartialWikipediaDF["id"],
                                 parse_links_udf(PartialWikipediaDF["text"]).alias("links")).cache()
#display(data)

##### This table maps each title to its id. It helps us build the forward matrix and, in the final table, show the title next to each id and its PageRank:

In [0]:
title_idDF = wikipediaDF.select(tolower_udf(wikipediaDF["title"]).alias("title"), wikipediaDF["id"])
title_idPDF = title_idDF.toPandas()
#display(title_idDF)

##### Many variables are also broadcast, so that we do not have to pass them as arguments and can access them whenever we need them.

In [0]:
broadcast_title_idPDF = sc.broadcast(title_idPDF)

In [0]:
def titles2id(links):
  data_titles = broadcast_title_idPDF.value
  if (len(links)>0):
    ids = data_titles[data_titles.title.isin(links)].id.to_list()
  else:
    ids = []
  return list(set(ids))
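
##### The lookup above can be pictured as a title-to-id mapping. The sketch below is only an illustration: it swaps the pandas isin filter for a plain dictionary, and the titles, ids and the name titles2id_dict are hypothetical.

```python
# Hypothetical title/id pairs standing in for title_idPDF
title_to_id = {"apple": 1, "banana": 2, "cherry": 3}

def titles2id_dict(links, lookup):
    # Keep only links that match a known title; the set removes duplicates
    return sorted({lookup[t] for t in links if t in lookup})

ids = titles2id_dict(["apple", "cherry", "apple", "missing page"], title_to_id)
print(ids)  # [1, 3]
```

##### Links that do not match any title are simply dropped, which is also what the isin filter does.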

In [0]:
titles2id_UDF = udf(titles2id, ArrayType(LongType()))

##### A table with the id of each page and the ids of all the pages it links to (ForwardDF):

In [0]:
ForwardDF = data.select(data["id"], titles2id_UDF(data["links"]).alias("links")).cache()
#display(ForwardDF)

##### We are also going to get the number of links that go out of each document, which will be stored in the OutgoingsLinksCounters table:

In [0]:
def count_links(links):
  return len(links)

count_links_udf = udf(count_links, LongType())

In [0]:
OutgoingsLinksCountersDF = ForwardDF.select(ForwardDF["id"], count_links_udf(ForwardDF["links"]).alias("count"))
OutgoingsLinksCountersPDF = OutgoingsLinksCountersDF.toPandas()
#display(OutgoingsLinksCountersDF)

##### We broadcast it again, as before:

In [0]:
OutgoingsLinksCounters_BV = sc.broadcast(OutgoingsLinksCountersPDF)

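##### To get ReverseDF, we first return a new row for each element in the array of links of each id (TemporalReverseLinks). Then we group those rows by the linked page and collect the ids of the pages that link to it:

In [0]:
# One row per (source page, linked page) pair
TemporalReverseLinks = ForwardDF.select(col("id").alias("source"), explode("links").alias("id"))
# Group by the linked page and collect the pages that point to it
ReverseDF = TemporalReverseLinks.groupBy("id").agg(collect_list("source").alias("links")).cache()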
#display(ReverseDF)
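
##### A reverse table maps each page id to the ids of the pages that link to it. The following plain-Python sketch shows that inversion on a tiny graph; the graph and the function name reverse_links are hypothetical and only mirror the explode and group steps.

```python
from collections import defaultdict

# Hypothetical forward table: page id -> ids of the pages it links to
forward = {1: [2, 3], 2: [3], 3: [1]}

def reverse_links(forward):
    reverse = defaultdict(list)
    for source, targets in forward.items():
        for target in targets:              # one (source, target) pair per link, like explode
            reverse[target].append(source)  # group by target and collect the sources
    return dict(reverse)

reverse = reverse_links(forward)
print(reverse)
```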

##### The PageRank algorithm models a person who clicks on links at random and will eventually stop clicking. The probability that this person continues clicking at any step is the damping factor, which is usually set to 0.85.

##### To build our current PageRank data frame, we first select the first column of ReverseDF, which holds the ids of the pages we will rank. Then we create N, the number of those pages. With it we can add the other column, the PageRank, initialised to the damping factor divided by N, so it is the same for every page.
##### Finally, we create a broadcast variable with N that will be used later.

In [0]:
pageRankDF = ReverseDF.select(ReverseDF["id"])
# Named PageRankPDF so that the iteration loop can read it on its first pass
PageRankPDF = pageRankDF.toPandas()
N = pageRankDF.count()

# Every page starts with the same value, 0.85/N
PageRankPDF["PR"] = 0.85/N

N = sc.broadcast(N)
display(PageRankPDF)

##### Now that we have the current PageRank of each id, which is the same for all of them because we have not executed the function yet, we can start coding the PageRank function. It loops over every id in the list of pages that link to the current page and adds up the contribution of each one.

##### For each id in the list, we check whether it appears in the current PageRank table. If it does, we take its PageRank; if it does not, we use 0. We do the same lookup in the counter table to get its number of outgoing links.

##### Here "current_pr" holds the PageRank of the linking page and "counter" its number of outgoing links, so "new_pr" accumulates current_pr/counter. After adding the contribution of every link to new_pr, we apply the formula with the damping factor 0.85 and N.

##### The function also returns the absolute difference between the current rank of current_id and the new_pr we have calculated, so that we can check whether it is above or below the chosen tolerance.

##### Now we can perform the operation that defines the page rank algorithm.
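
##### Written as a formula, the update computed by the function below is the following. The notation is chosen here for illustration: d is the damping factor (0.85), N is the number of pages, In(p) is the set of pages that link to p, and L(q) is the number of outgoing links of q.

```latex
PR_{\text{new}}(p) = \frac{1 - d}{N} + d \sum_{q \in \mathrm{In}(p)} \frac{PR(q)}{L(q)}
```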

In [0]:
def new_pagerank(list_of_ids, current_id, current_page_rank):
  new_pr = 0
  counts = OutgoingsLinksCounters_BV.value

  # Go through every id that links to the current page
  for i in list_of_ids:

    # PageRank of the linking page (0 if it is not in the table):
    current_pr = current_page_rank[current_page_rank["id"] == i]

    if len(current_pr) > 0:
      current_pr = current_pr.iloc[0]["PR"]
    else:
      current_pr = 0

    # Number of outgoing links of the linking page:
    counter = counts[counts["id"] == i]

    if len(counter) > 0:
      counter = counter.iloc[0]["count"]

      # Add the contribution of this page:
      new_pr += current_pr/counter

  # Apply the damping factor:
  new_pr = (1-0.85)/N.value + 0.85 * new_pr

  # Absolute difference with the previous value. np.abs is used because the
  # wildcard import from pyspark.sql.functions shadows the built-in abs:
  difference = np.abs(new_pr - current_page_rank[current_page_rank["id"] == current_id].iloc[0]["PR"])

  return [float(new_pr), float(difference)]

##### The next step is a while loop that iterates to find the PageRank of each page. It stops when the absolute difference between the old and the new PageRank is below the tolerance that we chose (0.0000001) for every page, or when it reaches the maximum of 20 iterations.

##### To do so, we turn the function into a UDF, create the required data frame with the id, the PageRank value and the difference, and then transform it into a pandas data frame.

##### Finally, we update the number of iterations that have been done, and the new PageRank becomes the current one.

In [0]:
max_it = 20
tolerance = 0.0000001
count = 0
not_converged = True

while count < max_it and not_converged:

  # Create the user-defined function for the new page rank,
  # using the latest PageRankPDF:
  new_pagerank_udf = udf(lambda l, i: new_pagerank(l, i, PageRankPDF), ArrayType(FloatType()))

  # Store the pagerank and difference values in the variable Solutions:
  Solutions = ReverseDF.select(ReverseDF["id"], new_pagerank_udf(ReverseDF["links"], ReverseDF["id"]).alias("PR"))

  # Create the new page rank dataframe with the id, pagerank and difference:
  NewPageRankDF = Solutions.select("id", Solutions.PR[0].alias("PR"), Solutions.PR[1].alias("difference"))
  display(NewPageRankDF)
  # Transform to pandas:
  PageRankPDF = NewPageRankDF.toPandas()

  # Check tolerance: keep iterating while any difference is still too large
  not_converged = bool((PageRankPDF["difference"] >= tolerance).any())

  # Update:
  count += 1
  print("Step:", count)
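
##### To see the whole iteration in isolation, here is a minimal plain-Python sketch of the same update and stopping rule on a tiny graph. It is an illustration under stated assumptions, not the Spark implementation: the 3-page graph, the dictionaries and the function name pagerank are hypothetical, while the starting value 0.85/N, the damping factor, the tolerance and the iteration limit follow the notebook.

```python
def pagerank(reverse, out_count, damping=0.85, tolerance=1e-7, max_it=20):
    n = len(reverse)
    # Same starting value as the notebook: damping factor divided by N
    ranks = {page: damping / n for page in reverse}
    for step in range(1, max_it + 1):
        new_ranks = {
            page: (1 - damping) / n
                  + damping * sum(ranks.get(q, 0) / out_count[q] for q in sources)
            for page, sources in reverse.items()
        }
        # Largest absolute change over all pages in this step
        largest_change = max(abs(new_ranks[p] - ranks[p]) for p in reverse)
        ranks = new_ranks
        if largest_change < tolerance:
            break
    return ranks, step

# Hypothetical 3-page graph with links 1 -> 2, 1 -> 3, 2 -> 3, 3 -> 1
reverse = {1: [3], 2: [1], 3: [1, 2]}   # page -> pages that link to it
out_count = {1: 2, 2: 1, 3: 1}          # page -> number of outgoing links
ranks, steps = pagerank(reverse, out_count)
print(steps, ranks)
```

##### In this toy graph page 3 ends up with the highest rank, since both other pages link to it, and the loop uses all 20 iterations because the changes are still above the tolerance.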

##### Since we also want the title of each page, a final piece of code looks it up by id. With that, we have our final pandas data frame.

In [0]:
# Keep only id and PR, then look up each title by id in title_idPDF
FinalPageRankPDF = PageRankPDF.filter(["id", "PR"]).merge(title_idPDF, on="id", how="left")
display(FinalPageRankPDF)

## Conclusion


##### We observed that with a smaller tolerance and more iterations this project would take much more time than it already does, so we started with fewer iterations, where every execution was faster but not as effective as with 20 iterations.

##### This has been a challenging project, but after many hours and the explanations given in class we have been able to modify the code given, so we worked more comfortably with our own functions. We investigated the PageRank algorithm further and were able to understand how other websites use it.

##### Also, at the beginning we weren't sure about User Defined Functions and why the Python functions worked like that, but after this project we understand them well and have gained confidence for the next time we have to use Spark, which in my case I believe is going to be soon, as I am going to start an internship where knowledge of Apache Spark was valued.