# Final Practical Work

**Coded by: Ander Lopez Galarraga and Marina Rodriguez Hernandez (100429893)**

# Steps to implement **Page Rank Algorithm**
* Check Data Raw structure
* Extract relevant data: [Document ID, List of links]
* Transform *List of links* to *List od Docuemts ID*: *Forward Links Table*
* Calculate *Number of output links*
* Construct *Reverse Links Table* from *Forward Links Table*
* Initialize *Page Rank Table*
* Recalculate *Page Rank Table* until:
  * All the *Page Rank* values are stable
  * Reach number of iterations (sugested value: 20 iterations)

# A) Library configuration

In [0]:
import pandas as pd
import re
import math as m


In [0]:
from pyspark.sql.types import *
from pyspark.sql.types import ArrayType, StringType,LongType
from pyspark.sql import functions as F

In [0]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# 1) Check Data Raw Structure
We will uses Databrics Wikipedia dataset, which contains 2012 Wikipedia Database in english

In [0]:
%fs ls dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia

path,name,size
dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet/,articles-only-parquet/,0
dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/perline-xml/,perline-xml/,0
dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/raw-compressed/,raw-compressed/,0
dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/raw-uncompressed/,raw-uncompressed/,0


In [0]:
%fs ls dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet

path,name,size
dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet/_SUCCESS,_SUCCESS,0
dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet/_common_metadata,_common_metadata,779
dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet/_metadata,_metadata,15986429
dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet/part-r-00000-ff5bbd4d-6518-4e17-8ff5-c8fd3d4a5b86.gz.parquet,part-r-00000-ff5bbd4d-6518-4e17-8ff5-c8fd3d4a5b86.gz.parquet,10085419
dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet/part-r-00001-ff5bbd4d-6518-4e17-8ff5-c8fd3d4a5b86.gz.parquet,part-r-00001-ff5bbd4d-6518-4e17-8ff5-c8fd3d4a5b86.gz.parquet,15346826
dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet/part-r-00002-ff5bbd4d-6518-4e17-8ff5-c8fd3d4a5b86.gz.parquet,part-r-00002-ff5bbd4d-6518-4e17-8ff5-c8fd3d4a5b86.gz.parquet,15762593
dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet/part-r-00003-ff5bbd4d-6518-4e17-8ff5-c8fd3d4a5b86.gz.parquet,part-r-00003-ff5bbd4d-6518-4e17-8ff5-c8fd3d4a5b86.gz.parquet,8977575
dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet/part-r-00004-ff5bbd4d-6518-4e17-8ff5-c8fd3d4a5b86.gz.parquet,part-r-00004-ff5bbd4d-6518-4e17-8ff5-c8fd3d4a5b86.gz.parquet,15739954
dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet/part-r-00005-ff5bbd4d-6518-4e17-8ff5-c8fd3d4a5b86.gz.parquet,part-r-00005-ff5bbd4d-6518-4e17-8ff5-c8fd3d4a5b86.gz.parquet,9298525
dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet/part-r-00006-ff5bbd4d-6518-4e17-8ff5-c8fd3d4a5b86.gz.parquet,part-r-00006-ff5bbd4d-6518-4e17-8ff5-c8fd3d4a5b86.gz.parquet,5532126


Here we defines the wikipediaDF Spark Dataframe, with the full database content:
We need to know the total number of documents in this database.

In [0]:
wikipediaDF=spark.read.parquet("dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")

In [0]:
wikipediaDF.count() # This would be the total pages that we could work with

To program the PageRank Algorithm, we need to extract a subset of the full database. We will select 0.00001 of the full database, and to avoid randomness behaviour, we se.t to a fixed seed value=0.

**Note:** For the final evaluation, should change the fraction to 0.001

In [0]:
PartialWikipediaDF=wikipediaDF.sample(fraction=0.001,seed=0).cache() # But in this practice, we are going to use the 0.001 of it to make faster the calculations

In [0]:
PartialWikipediaDF.count() # Our database would be of 5852 pages

Now, we can check the data structure:

In [0]:
display(PartialWikipediaDF)

## Conclusions with the data raw analysis:
* There are several columns, but the relevant information is stored in the follow columns:
  * **title**: The title of the document.
  * **id**: Id of the document
  * **text**: The content of the document. The most relevant information here (for the page rank algorithm) are the links to other documents. The link is enclosed in brackets, and contains the title of the document.

# 2) Extract Relevant Data
From the previous conclusions, we knows we need to select just three columns [*title*,*id*,*text*], and the relevant information from the *text* column are the links, identified by the titles enclosed in brackets.

Here we need to use regular expressions to select the relevant information from the *text* column.

Here we will implement a *parse_links* function, who receive a string and return a list of strings with the titles of the pointed documents.

This is a Python function, so is not direct callable from the Spark Dataframe, so, we need to define also the User Defined Function, to be usable in Spark Dataframes.

In [0]:
def parse_links(document_body):
  data = re.findall(r'\[\[(.+?)\]\]', document_body) # initial regular expression that finds the candidate links
  
  if (len(data)>0): # if there is at least one  link in document_body
    links = [] #  list with all the links, if there are any
    words = ["wikipedia:","video:","category:","wkt:","w:","template:"] #words that do not contain links
    
    for s in data: # iterate through all the candidate links found in data
      s = s.lower() # change to lower case
      
      for word in words: #iterate through the different words that do not contain links
        if word not in s: #if this word is not in the candidate links
          
          #Type 1  [[file: [[link]] ]] 
          if "file:" in s and "[[" in s:
            links.append(s.split("[[")[-1])

          #Type 2  [[image: [[link]] ]] 
          elif "image:" in s and "[[" in s:
            links.append(s.split("[[")[-1])

          #Type 3  [[ link1|link2 ]] 
          elif "|" in s and "file:" not in s and "image:" not in s:
            links.extend(s.split("|"))

          #Type 4  [[link]] 
          elif "file:" not in s and "image:" not in s: 
            links.append(s)
  
  else:
    links = [] 
    
  return links

In [0]:
parse_links_udf = udf(parse_links,ArrayType(StringType()))

It is necesarly convert the text to lowercase (both: *title* and *text* columns)

In [0]:
tolower_udf= udf(lambda x: x.lower())

Now, we create parseDF with the selected information, renaming the result tables to "title", and "links"

In [0]:
parsedDF = PartialWikipediaDF.select(tolower_udf("title").alias("title"),"id",parse_links_udf("text").alias("links"))

In [0]:
display(parsedDF)

# 3) Transform *List of links* to *List od Docuemts ID*: *Forward Links Table*

To get the *id* of the target documents, we need analyse the full Wikipedia Database and extract a table with this two information.

This information is static, and used in distributed way. So, we will collect the data and convert to a Pandas Dataframe (PDF suffix).

In [0]:
titleidDF=wikipediaDF.select("id",tolower_udf("title").alias("title"));

In [0]:
titleidPDF=titleidDF.toPandas()

In [0]:
def text_links_2idx(links , title_idxPDF):
  if ( len(links)>0):
    # This command looks in the title column if the elements in the list links exists, and if it exists
    # gets his id value. The result is converted to a list.
    result = title_idxPDF[title_idxPDF.title.isin(links)].id.to_list()
  else:
    result = [] 
  return result

In [0]:
broadcast_title_idPDF = sc.broadcast(titleidPDF)

In [0]:
def text_links_2idx_2(links):
  title_idxPDF = broadcast_title_idPDF.value
  if ( len(links)>0):
    # This command looks in the title column if the elements in the list links exists, and if it exists
    # gets his id value. The result is converted to a list.
    result = title_idxPDF[title_idxPDF.title.isin(links)].id.to_list()
  else:
    result = [] 
  return result

We define a test_list, to check the the text_links_2idx function.

In [0]:
test_list=["shenzhou (spacecraft)|shenzhou", "long march 2f|chang zheng 2f", "jiuquan satellite launch center|jiuquan", "jiuquan launch area 4|la-4/sls-1", "geocentric orbit|geocentric", "low earth orbit|low earth", "yang liwei", "shenzhou 4", "shenzhou 6", "shenzhou program|shenzhou", "human spaceflight", "chinese space program", "shenzhou spacecraft", "long march 2f", "soviet union", "russia", "united states", "file:yang liwei.jpg|thumb|left|[[yang liwei", "yang liwei", "chinese nationalism", "united nations", "people's daily", "spacedaily", "political status of taiwan|taiwan", "people's daily", "general secretary of the communist party of china|general secretary", "president of the people's republic of china|president", "hu jintao", "great hall of the people", "sina.com", "central committee of the communist party of china|cpc central committee", "state council of china|state council", "central military commission", "sina.com", "prime minister of japan|prime minister", "junichiro koizumi", "cctv.com", "george w. bush", "united states department of state|u.s. state department", "sean o'keefe", "dprk", "kwangmyŏngsŏng-1", "yang liwei", "chinese space program", "tiangong program", "shenzhou spacecraft", "long march rocket", "jiuquan satellite launch center", "shenzhou 4", "shenzhou 6", "category:human spaceflights|shenzhou 05", "category:shenzhou program|shenzhou 05", "category:spacecraft launched in 2003", "category:2003 in china"]


In [0]:
idx_list=text_links_2idx(test_list,titleidPDF)

In [0]:
# idx_list

In [0]:
udf_text_links_2idx =udf(text_links_2idx_2,ArrayType(LongType()))

In [0]:
ForwardDF = parsedDF.select("id",udf_text_links_2idx("links").alias("links")).cache()

In [0]:
display(ForwardDF)

Once verified the the function, we need to define the UDF function, to invoke it from the **parsedDF** to select just [id, list of ids].

To be efficient, we can broadcast the variable *titleidPDF*, to call it in the transformation.

We will call this Dataframe as: **ForwardDF**

# 4) Calculate *Number of output links*
Using the **ForwardF**, we need to calculate the number of output links per document. 
Because we will need this information to calculate the PageRank we will collect this information in a Pandas Dataframe, and define a Brodcast variable.

In [0]:
def count_links(list_of_links):
  return len(list_of_links)

In [0]:
udf_count_links=udf(count_links,LongType())

In [0]:
count_linksDF=ForwardDF.select("id",udf_count_links("links").alias("count"))

**Note:** Instead use user defined function udf_count_links, check the pyspark.sql.functions library and use *size* function, which is more efficient!

In [0]:
display(count_linksDF)

In [0]:
count_linksPDF=count_linksDF.toPandas()

# 5) Construct *Reverse Links Table* from *Forward Links Table*
Now, we will define the Reverse Links Table Dataframe (**ReverseDF**), transforming the **ForwardDF** to a Dataframe with [*id*,*list of ids*] or similar.

*Suggestion*: Maybe the *list of ids* could contains not only the id of the target document, also the number of output links. This will improve the Page Rank calcule.

In [0]:
def reverseId(id,links):
  if (len(links)>0):
    reverse = [ (tgt_id,id) for tgt_id in links ]
  else:
    reverse=[]
  return reverse

In [0]:
ForwardRDD = ForwardDF.rdd

In [0]:
ReverseRDD=( ForwardRDD
            .flatMap(lambda r: reverseId(r.id,r.links))
            .groupByKey()
            .map(lambda r: (r[0],list(r[1])))
            )

In [0]:
reverseDF=spark.createDataFrame(ReverseRDD,["id","links"])

In [0]:
display(reverseDF)

# 6) Initialize *Page Rank Table*
We define a Pandas DataFrame (**PageRankPDF**) with the ids of documents in the **ReverseDF**, with the initial value. 

This could be:

\\(\frac{0.85}{N}\\)

where *N* is the number of documents in the **ReverseDF**.

In [0]:
damping_factor = 0.85              # Damping factor that define the probability of a user that stops searching although is not an ending node
max_interations = 20               # The maximum iterations that we will calculate our algorithm in the case that is not stable
threshold = 0.0000001              # The value that defines if the difference between the value calculated in the previous iteration is enough bigger to be consider not stable
N = reverseDF.count()              # The number of documents that we have

In [0]:
data = list()
for index in reverseDF.collect():
  data.append(index["id"])                         
PageRankPDF = pd.DataFrame(data,columns=["id"])    # We create a pandas data frame with 2 columns the ids and the rank of each one
PageRankPDF["rank"] = damping_factor/N             # For the first case we will have the initial value which is 0.85/N

display(PageRankPDF)

id,rank
22936,1.0450091591979249e-05
6368400,1.0450091591979249e-05
31108048,1.0450091591979249e-05
175924,1.0450091591979249e-05
25575836,1.0450091591979249e-05
3350364,1.0450091591979249e-05
10929004,1.0450091591979249e-05
266204,1.0450091591979249e-05
925736,1.0450091591979249e-05
26840,1.0450091591979249e-05


# 7) Recalculate *Page Rank Table* until:
  * All the *Page Rank* values are stable
  * Reach number of iterations (sugested value: 20 iterations)

Should define a PageRank function, who receives the *id* of the document, *list of links*, and current PageRank table (**PageRankPDF**)*, and returns the new PageRank of the document *id* 
The loop must use the **ReverseDF** as master of information.

Now, we define a loop:
* While the conditions of exit are false:
  * Calculate the new page ranks, invoking the PageRank function from the **ReverseDF**, creating a new Dataframe NewPageRankDF with the [id,new_pagerank] info. (Could contains more information, if you use the suggestion in section 5) )
  * Collect the new page ranks and compares it with the previous **PageRankPDF**, checking if the new Page Ranks vary more than a threshold. If not, the exit condition is complied.
  * Update the **PageRankPDF** with the new values,and update the UDF functions.

In [0]:
broadcast_count_linksPDF = sc.broadcast(count_linksPDF)        # As we want to use in our function this count_links we need to create it as a broadcast

def new_pagerank(list_of_ids,current_id,current_page_rank):    # We receive the list_of_ids that are the links that refers to the current_id and the current_page_rank is our Page Rank data frame updated
  new_page_rank = 0                                            # We set the value to 0 for each iteration
  count_links = broadcast_count_linksPDF.value                 # We call to this broadcast table and assign it to count_links variable
  for i in list_of_ids:                                        # To achieve the final value, we need to add the value of each page that refers to this page and divide it by the count of each link
    PR = current_page_rank[current_page_rank["id"] == i]       # PR would be the current value of each of the ids that refer to this page
    if len(PR)>0:                                              # We check if there exist those ids in our current_page_rank
      PR = PR.iloc[0][1]                                       # In the case that we have some pages that links with our current_id, the PR value is the one that is defined in our current_page_rank
    else:                                                     
      PR = 0                                                   # On the other hand, if we don’t have this id collected in our current_page_rank, we set it to 0
    count = count_links[count_links["id"] == i]                # The for the count value we will follow the same idea, we recover from the count_links the count number
    if len(count)>0:                                           # We check if this page has links
      count = count.iloc[0][1]                                 # We store this value in count variable
      new_page_rank += PR/count                                # We add PR/count to our new_page_rank that it will the one that we return. Recall that we only do this calculation if count is distinct from 0 as if is 0 we have a number divided by 0
      
  new_page_rank = (1-damping_factor)/N + damping_factor*new_page_rank                                            # After adding all the values of each links to new_page_rank, we apply the formula with the damping_factor and N values defined previously
  absolute_diff = abs(new_page_rank-current_page_rank[current_page_rank["id"] == current_id].iloc[0]["rank"])    # We are going also to return the difference between the rank of the current_id and the new_page_rank that we have calculated in order to check if is stable                                                                                                                      or not further on the code
  
  return [float(new_page_rank), float(absolute_diff)]          # As we said, we are going to return an array with both vital values 

In [0]:
count = 0
stable = False
while count < max_interations and stable == False:                                                               # We check if we achieve the maximum number of iterations = 20 or our rank column of PageRankPDF is stable, in that case we stop executing the while loop
  stable = True                                                                                                  # We set the stable variable to True in order to repeat the loop unless we change it the following code
  udf_new_pagerank = udf(lambda l, i: new_pagerank(l,i,PageRankPDF), ArrayType(FloatType()))                     # We create a user function for executing our new_pagerank function defined previously
  List_values = reverseDF.select('id',udf_new_pagerank('links','id').alias('rank'))                              # We store the array value that our function new_pagerank returns in List_values
  NewPageRankDF = List_values.select('id',List_values.rank[0].alias('rank'),List_values.rank[1].alias('diff'))   # We create a new spark data frame with 3 columns: id, rank, and the difference value that we get from new_pagerank. Recall that as we were always using the                                                                                                                    number of each column depending on the index and we are adding an extra column, we will not get any index problem.
  PageRankPDF = NewPageRankDF.toPandas()                                                                         # We convert this data frame to pandas data frame and rewrite on our PageRankPDF that is the one that we will introduce again in the next iteration in our                                                                                                                      function new_pagerank
  
  for i in PageRankPDF["diff"]:        # Now we are going to check if is stable or not. For that we need to check the variation of the rank value of each id
    if i >= threshold:                 # We check if the difference value that we get from new_pagerank array is greater that the threshold that we set it previously
      stable = False                   # If there exist a value that is greater than this value, then we need to recalculate and pass to the next step, our PageRankPDF is not stable
                                       # However, if there is no other value that is greater to this value, our PageRankPDF is stable and as we set previously stable = True we will stop executing the while loop 
  count += 1
  print("Step:",count)                 # In order to control better the execution of this loop, we want to print the iteration where we are
   

Step: 1
Step: 2
Step: 3
Step: 4
Step: 5
Step: 6


In [0]:
FinalPageRankPDF = PageRankPDF.filter(["id","rank"]).copy()
display(FinalPageRankPDF)                                       # This is our final ranking of pages

id,rank
22936,3.1640843e-06
6368400,1.8441339e-06
31108048,1.8441339e-06
175924,1.8441339e-06
25575836,1.8441339e-06
3350364,1.8468868e-06
10929004,1.8468868e-06
266204,1.8554803e-06
925736,1.8554803e-06
26840,1.8627434e-06


**Conclusion**

PageRank is a method for determining the importance of each document in a web of linked documents. Given a set of documents and the set of directed links between them, we can calculate the PageRank of each document.

This project ranks a portion of Wikipedia pages (in real life, it would be used to later display the most relevant results to the user based on their query). The ranking is based on the number of in-links and out-links of a page. Google also uses this algorithm (as well as many others) to rank web pages. 

In this last practice, we had to code the pagerank algorithm and improve different functions, such as parse_links. 
We followed the folowing formula in order to code the algorithm:

<img src="https://www.ververica.com/hs-fs/hubfs/Imported_Blog_Media/formula-1.png?width=1195&name=formula-1.png" alt="formulapagerank" style="width:400px;" />

And for the parse_links function, we searched in the data the different types of links that we could find.

I found interesting how python functions work, since they can't be directly callable from the Spark Dataframe, we also need to define the User Defined Function, to be usable in Spark Dataframes. Also, the use of broadcast variables were very useful. And the use of pandas was a bit confusing, but at the end I think I understood everything very clearly.

Personally, I believe this was an engaging project. I had no idea that wikipedia (and google) used this algorithm to display the results, and I think I learned much from it.