# __Task__

Using the pre-processed data in this notebook, we will try to identify the article that has contributed the most to a particular field of study. The analysis relies on the citation relationships between the articles and compares two measures: degree centrality and PageRank.

# __Import packages__

Mounting the drive on Google Colab and importing the necessary packages into the runtime.

In [None]:
# Mounting the drive
from google.colab import drive
drive.mount('/content/drive')

# Setting the path to the directory of the data
root_path = '<path to the root path>'
%cd $root_path

# Importing necessary packages
import pandas as pd
import csv
import math

In [None]:
# Installing the pyspark for the runtime
!pip install pyspark

# Importing the pyspark to the notebook
import pyspark
from pyspark.sql.functions import lit

# Importing the package necessary to start a spark session
from pyspark.sql import SparkSession

# Importing a function in order to be able to collect a list
# after groupby
from pyspark.sql.functions import collect_list

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=39558bbb94718432a4968bf126cc9011ffcbde4686c01da7d83421bbe51129dc
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


# __Spark Session__

In [None]:
# Create a spark session
spark = SparkSession.builder \
    .appName("MySparkApp") \
    .master("local[*]") \
    .config("spark.executor.memory", "40g") \
    .config("spark.driver.memory", "50g") \
    .config("spark.executor.cores", "8") \
    .config("spark.default.parallelism", "20") \
    .config("spark.sql.shuffle.partitions", "10") \
    .getOrCreate()

# __Data loading__

In the following sections we will load the pre-processed data regarding the articles' information and the citation relationships into our runtime from the constructed .csv files.

## __Citations__

Here we will load the citation relationships between the articles from the Filtered_citations.csv file.

In [None]:
%%time

# Lazy load of the citation data
Citations_df = spark.read.csv(f'{root_path}Filtered_citations.csv', header=True, inferSchema=True)

# Cache the data once loaded
Citations_df.cache()

# Report the structure of the dataframe
Citations_df.printSchema()

root
 |-- Main: long (nullable = true)
 |-- Reference: long (nullable = true)

CPU times: user 79.8 ms, sys: 7.5 ms, total: 87.3 ms
Wall time: 12.6 s


Checking the number of the citations in the network.

In [None]:
%%time

# Report the number of the records in the citations dataframe
print('The number of the edges is: ', Citations_df.count())

The number of the edges is:  5635143
CPU times: user 34.5 ms, sys: 3.82 ms, total: 38.4 ms
Wall time: 5.31 s


__Note__: `cache()` is lazy, so the dataframe is actually stored in memory here for the first time: Spark materializes the cache when the first action (the `count()` above) runs.

## __Articles__

Reading data containing the id and the title of the articles.



In [None]:
%%time

# Lazy load of the articles' info
Articles_df = spark.read.option('header', 'true').csv(f'{root_path}Filtered_Articles.csv')

# Cache the data once loaded
Articles_df.cache()

# Report the structure of the dataframe
Articles_df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Title: string (nullable = true)

CPU times: user 5.89 ms, sys: 1.12 ms, total: 7 ms
Wall time: 616 ms


Checking the number of articles for which we have information.

In [None]:
%%time

# Report the number of the records in the article dataframe
Articles_num = Articles_df.count()
print('The number of the article is: ', Articles_num)

The number of the article is:  500035
CPU times: user 18.7 ms, sys: 71 µs, total: 18.8 ms
Wall time: 2.3 s


Checking the number of the articles that are present in our network based on the citation relationships.

In [None]:
%%time

# Pick the distinct article ids of the union of the main and the referenced articles
Vertices_distinct_df = Citations_df.select('Main').union(Citations_df.select('Reference')).distinct()

# Report the number of the distinct articles present in the network
print('The number of the articles based on the citation relationship: ', Vertices_distinct_df.count())

The number of the articles based on the citation relationship:  498019
CPU times: user 18.8 ms, sys: 3.46 ms, total: 22.2 ms
Wall time: 2.48 s


# __Graph Modelling__

In order to convert our data into a graph representation we would have the following structure :

- __Graph type__: we have a directed graph

- __Nodes__: In our graph the nodes would represent a single article

- __Edges__: In our graph each edge represents a citation relationship. An edge $(\text{Article}_1, \text{Article}_2)$ means that $\text{Article}_1$ has cited $\text{Article}_2$.

## __Most contributing article__

In this work we are trying to find the article that has contributed the most to the domain of study. Several algorithms aim to find the most important node in a graph.

Here we compare the results obtained from __degree centrality__ and __PageRank__.

# __Degree centrality__

In this centrality measure, we __define the importance__ of a node $v$ as the __number of neighbors__ of node $v$.

__Note__: since our model is a __directed graph__, we measure the importance of the nodes with the following two metrics:
  1. The number of __outgoing edges__ (given citations)
  2. The number of __incoming edges__ (received citations) - _more relevant for our work_
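
To make the two metrics concrete, here is a minimal pure-Python sketch on a tiny, made-up citation list (the article IDs and the `edges` list are hypothetical and not taken from the dataset). It only illustrates the counting. The notebook itself does this with Spark `groupBy` operations.

```python
from collections import Counter

# Hypothetical toy citation list: (citing article, cited article)
edges = [(1, 2), (1, 3), (2, 3), (4, 3), (4, 1)]

# Out-degree: number of citations each article gives
out_degree = Counter(src for src, _ in edges)

# In-degree: number of citations each article receives
in_degree = Counter(dst for _, dst in edges)

# Article with the highest in-degree and its number of received citations
most_cited, citations = in_degree.most_common(1)[0]
print(most_cited, citations)
```

In this toy graph, article 3 is cited by articles 1, 2 and 4, so it has the highest in-degree.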

## __OutDegree (#outgoing edges)__

Here we report the articles that have given the highest number of citations to other articles.



In [None]:
%%time

# Computing the number of references that each article cites (its outgoing edges)
# and sorting them by the count in descending order
Degree_centrality_out_df = Citations_df.groupBy("Main").count().orderBy(['count'], ascending = [0])

# Showing the results
Degree_centrality_out_df.show()

+----------+-----+
|      Main|count|
+----------+-----+
|2895896816|  307|
|2891004411|  291|
|1569512051|  266|
|2738162907|  241|
|2506633516|  240|
|2988916019|  228|
|2962883549|  222|
|2971058209|  217|
|  47957325|  216|
|2788653635|  214|
|2890715498|  212|
|2765367188|  203|
|2609532991|  203|
|2916332638|  200|
|2529696250|  191|
|2951003875|  188|
|2600147483|  186|
|2580175322|  178|
|2943528199|  172|
|2964248347|  167|
+----------+-----+
only showing top 20 rows

CPU times: user 15.6 ms, sys: 3.7 ms, total: 19.3 ms
Wall time: 1.52 s


In [None]:
%%time

# Taking the id of the article that has given the highest number of citations
Highest_degree_centrality_out = Degree_centrality_out_df.take(1)

# Filter the information of this article from the other dataframe to get the title of the article
Highest_degree_centrality_out_info = Articles_df.filter(Articles_df.ID == Highest_degree_centrality_out[0].Main).take(1)

print('Article with the highest #citations given: \n', )

# Reporting the ID of this article
print('Article ID: ', Highest_degree_centrality_out_info[0].ID)

# Reporting the title of this article
print('Article Title', Highest_degree_centrality_out_info[0].Title)

# Reporting the number of the citations that this article has given
print('#Citations given: ', Highest_degree_centrality_out[0]['count'], end = '\n\n')

Article with the highest #citations given: 

Article ID:  2895896816
Article Title Deep Reinforcement Learning.
#Citations given:  307

CPU times: user 19.2 ms, sys: 2.64 ms, total: 21.8 ms
Wall time: 1.46 s


## __InDegree (#incoming edges)__

Here we report the articles that have received the highest number of citations from other articles.

__Note:__ this measurement is more relevant for us, since a higher number of received citations suggests that an article is better or more important.



In [None]:
%%time

# Computing the number of times that each article has been listed in the reference list of other articles
# and sort them based on the count in descending order
Degree_centrality_in_df = Citations_df.groupBy("Reference").count().orderBy(['count'], ascending = [0])

# Showing the results
Degree_centrality_in_df.show()

+----------+-----+
| Reference|count|
+----------+-----+
|2151103935|10123|
|1686810756| 8050|
|2153635508| 7069|
|2194775991| 6317|
|2064675550| 5935|
|2618530766| 5219|
|2949650786| 4655|
|2133665775| 4355|
|2117539524| 4350|
|2102605133| 4154|
|2099471712| 3528|
|2168356304| 3456|
|2095705004| 3382|
|2162915993| 3156|
|1903029394| 3035|
|2031489346| 3028|
|2163352848| 2979|
|2100379340| 2976|
|2129812935| 2968|
|2250539671| 2963|
+----------+-----+
only showing top 20 rows

CPU times: user 13.4 ms, sys: 1.99 ms, total: 15.3 ms
Wall time: 1.28 s


Reporting the article that has received the highest number of citations

In [None]:
%%time

# Taking the id of the article that has received the highest number of citations
Highest_degree_centrality_in = Degree_centrality_in_df.take(1)

# Filter the information of this article from the other dataframe to get the title of the article
Highest_degree_centrality_in_info = Articles_df.filter(Articles_df.ID == Highest_degree_centrality_in[0].Reference).take(1)

print('Article with the highest #citations received: \n', )

# Reporting the ID of this article
print('Article ID: ', Highest_degree_centrality_in_info[0].ID)

# Reporting the title of this article
print('Article Title', Highest_degree_centrality_in_info[0].Title)

# Reporting the number of the citations that this article has received
print('#Citations received: ', Highest_degree_centrality_in[0]['count'], end = '\n\n')

Article with the highest #citations received: 

Article ID:  2151103935
Article Title Distinctive Image Features from Scale-Invariant Keypoints
#Citations received:  10123

CPU times: user 15.5 ms, sys: 4.26 ms, total: 19.8 ms
Wall time: 1.29 s


# __PageRank__

To measure the importance of a node, the PageRank algorithm takes into account the number of citations received by an article as well as the quality of those citations.

__Note:__ we can define the __quality__ of a citation given to article $a$ as follows:
  - If the article that cited article $a$ has __higher importance__, the quality of the citation is __higher__
  - If the article that cited article $a$ has __lower importance__, the quality of the citation is __lower__.

#### Steps

PageRank algorithm follows the following steps: <br/><br/>
  1. Give the same importance to all of the articles. Set the initial importance of the articles as:
  
   $$ \text{Initial_rank} = \frac{1}{N} $$

   where $N$ is the __#articles__ in the network <br/><br/>

  2. For a specific article $a$ we share the __old rank__ of $a$ equally among the articles cited by $a$. So we increase the rank of each of these articles by the following amount:
  
  $$d * \frac{\text{old rank $a$}}{\text{# citations by $a$}}$$

  where $d \in [0, 1]$ is the __damping factor__, i.e. the probability that a reader continues by following a citation to another article

 __Note:__ if an article $b$ has no references (__dangling node in graph__), we increase the rank of all of the articles by:

  $$d * \frac{\text{old rank $b$}}{N}$$<br/>

3. Finally we add the following term to the rank of all of the articles:

 $$\frac{1 - d}{N}$$

 which accounts for a reader starting afresh at a randomly chosen article.

So the new rank of an article $s$ is calculated as follows:

$$\text{new rank }s = \sum_{i = 0}^{k} d * \frac{\text{old rank }a_i}{\text{#citations by }a_i} + \sum_{j = 0}^{l} d * \frac{\text{old rank }b_j}{N} + \frac{1-d}{N}$$
where $a_0, ..., a_k$ are the articles that cite $s$ and $b_0, ..., b_l$ are the dangling nodes <br/><br/>


 __Stopping criteria:__ we repeat steps 2 and 3 until convergence, that is, until the rank of the articles no longer changes much (the difference between the old and new rank is small).

 __Note:__ as PageRank is a computationally heavy algorithm, we run the steps for a fixed __10 iterations__.
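
The steps above can be sketched in plain Python on a toy graph. This is an illustrative sketch only, not the Spark implementation used in this notebook: the function name `pagerank`, the parameters `max_iter` and `tol`, and the toy edges are all assumptions made for the example. Unlike the fixed 10 iterations used below, it stops when the total change in rank drops under a small tolerance.

```python
def pagerank(edges, d=0.85, max_iter=100, tol=1e-10):
    """Toy PageRank over (citing, cited) pairs, with dangling-node handling."""
    nodes = sorted({node for edge in edges for node in edge})
    n = len(nodes)

    # Outgoing citations of every node (empty list for dangling nodes)
    out_links = {v: [] for v in nodes}
    for src, dst in edges:
        out_links[src].append(dst)

    # Step 1: every article starts with the same rank 1/N
    rank = {v: 1.0 / n for v in nodes}

    for _ in range(max_iter):
        # Rank of the dangling nodes, shared evenly among all nodes
        dangling = sum(d * rank[v] / n for v in nodes if not out_links[v])

        # Step 3 (teleportation term) plus the dangling share
        new_rank = {v: (1.0 - d) / n + dangling for v in nodes}

        # Step 2: each article shares its old rank among the articles it cites
        for src, targets in out_links.items():
            for dst in targets:
                new_rank[dst] += d * rank[src] / len(targets)

        # Stopping criterion: total change between old and new ranks
        delta = sum(abs(new_rank[v] - rank[v]) for v in nodes)
        rank = new_rank
        if delta < tol:
            break

    return rank


# Hypothetical toy graph: A and B cite C, C cites D, D cites nothing (dangling)
toy_edges = [("A", "C"), ("B", "C"), ("C", "D")]
ranks = pagerank(toy_edges)
print(max(ranks, key=ranks.get))
```

Because the dangling share and the teleportation term are both added to every node, the ranks keep summing to 1 after each iteration, which is a quick sanity check for any implementation.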

### PageRank implementation

In [None]:
%%time

# Number of the iterations
Iterations_num = 10

# Get a rdd for the edges (citation relationships) in the network
Edges_rdd = Citations_df.rdd.map(tuple)

# Get the id of the nodes (articles) in the network based on the edges in the network
Vertices_distinct_df = Citations_df.select('Main').union(Citations_df.select('Reference')).distinct()

# Count the number of the distinct vertices in the graph
Vertices_num = Vertices_distinct_df.count()

# Grouping the citations to get the number of the outgoing edges for each specific node
Citations_grouped = Citations_df.groupBy('Main').agg(collect_list('Reference'))

# For each node in the graph, put the initial rank of the node and the number of the outgoing edges
Vertices_prob_len_rdd = Vertices_distinct_df.join(Citations_grouped, on = 'Main', how = 'outer').withColumn('rank', lit(1 / Vertices_num)).rdd.map(lambda x: (x[0], (x[2], 0 if x[1] is None else len(x[1]))))

print('The number of the distinct nodes in the graph: ', Vertices_num)

# Keep track of the number of the partitions in the ranking rdd
Rank_partitions_num = Vertices_prob_len_rdd.getNumPartitions()

# Probability of going to one of the references of a specific article
damping_factor = 0.85

# Simulating a markov chain process
for i in range(Iterations_num):

  # Rank of the dangling nodes, to be distributed evenly among all of the nodes
  For_dangling_nodes = Vertices_prob_len_rdd.map(lambda x: 0 if x[1][1] != 0 else damping_factor * (x[1][0]/Vertices_num)).sum()

  # The values that should be aggregated in order to compute the new page rank
  New_page_rank_info_rdd = Edges_rdd.join(Vertices_prob_len_rdd).map(lambda x: (x[1][0], (damping_factor * (x[1][1][0] / x[1][1][1]))))

  # Computing the new rank of the node that have incoming edges
  Ranks_to_update_partial = New_page_rank_info_rdd.reduceByKey(lambda a, b: a + b)

  # Rate for teleportation
  Teleport_rate = (1 - damping_factor) / Vertices_num + For_dangling_nodes

  # Updating the rank of all of the nodes in the graph
  Vertices_prob_len_rdd = Vertices_prob_len_rdd.leftOuterJoin(Ranks_to_update_partial).map(lambda x: (x[0], (x[1][1] + Teleport_rate if x[1][1] is not None else Teleport_rate, x[1][0][1] )))

  # Repartitioning the RDD for the next round
  Vertices_prob_len_rdd = Vertices_prob_len_rdd.repartition(Rank_partitions_num)

The number of the distinct nodes in the graph:  498019
CPU times: user 1.7 s, sys: 242 ms, total: 1.94 s
Wall time: 3min 59s


## Highest PageRank

In [None]:
%%time

# Cache the result of the PageRank analysis
Vertices_prob_len_rdd.cache()

# Identify the article with the highest PageRank
Highest_page_rank = Vertices_prob_len_rdd.max(key = lambda x: x[1][0])
Highest_page_rank

CPU times: user 147 ms, sys: 21.9 ms, total: 169 ms
Wall time: 23.4 s


(2154952480, (0.0026920859150006884, 6))

# __Degree centrality vs PageRank__

In [None]:
print('Paper with highest degree centrality: ', end = '\n\n')
print('ID: ', Highest_degree_centrality_in_info[0].ID)
print('#Citations: ', Highest_degree_centrality_in[0]['count'])
print('Rank based on PageRank algorithm: ', Vertices_prob_len_rdd.filter(lambda x: x[0] == int(Highest_degree_centrality_in_info[0].ID)).take(1)[0][1][0])
print('-' * 50)
print('Paper with the highest rank based on PageRank algorithm: ', end = '\n\n')
print('ID: ', Highest_page_rank[0])
print('#Citations: ', Edges_rdd.filter(lambda x: x[1] == Highest_page_rank[0]).count())
print('Rank based on the PageRank algorithm: ', Highest_page_rank[1][0])


Paper with highest degree centrality: 

ID:  2151103935
#Citations:  10123
Rank based on PageRank algorithm:  0.0013002304003835025
--------------------------------------------------
Paper with the highest rank based on PageRank algorithm: 

ID:  2154952480
#Citations:  296
Rank based on the PageRank algorithm:  0.0026920859150006884


In [None]:
print('The article with the highest degree centrality is: ', Articles_df.filter(Articles_df.ID == 2151103935).collect()[0]['Title'])
print('The article with the highest PageRank is: ', Articles_df.filter(Articles_df.ID == 2154952480).collect()[0]['Title'])

The article with the highest degree centrality is:  Distinctive Image Features from Scale-Invariant Keypoints
The article with the highest PageRank is:  Learnability and the Vapnik-Chervonenkis dimension


# __TF_IDF score__

TF-IDF stands for Term Frequency-Inverse Document Frequency. It evaluates the __importance of a word__ (or term) within a document relative to a collection of documents (corpus).

We can define the TF-IDF as follows:


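$$\text{Tf_Idf (t, d, D)} = \text{Tf (t, d) $*$ Idf (t, D)}$$

Where $\text{Tf (t, d)}$ is the number of times the term $t$ occurs in the document $d$ (the implementation below uses this raw count; a log-scaled variant, $\log(1 + \text{freq (t, d)})$, is a common alternative):


$$\text{Tf (t, d)} = \text{freq (t, d)}$$

and


$$Idf(t,D) = \log(\frac{N}{\text{count} (d \in D: t \in d)})$$

where $N$ is the number of documents in $D$.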
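
As a small worked example of these definitions, the sketch below computes IDF and TF-IDF for three made-up, already pre-processed titles. The `titles` dictionary and its IDs are hypothetical. The raw term count is used as the term frequency, which is what the notebook's code further below computes.

```python
import math
from collections import Counter

# Hypothetical, already pre-processed titles: article id -> list of terms
titles = {
    "a1": ["big", "data"],
    "a2": ["big", "data", "network"],
    "a3": ["deep", "learn"],
}
n_docs = len(titles)

# Document frequency: number of titles that contain each term
doc_freq = Counter(term for terms in titles.values() for term in set(terms))

# Idf(t, D) = log(N / number of documents containing t)
idf = {term: math.log(n_docs / df) for term, df in doc_freq.items()}

# Tf-Idf of each term in each title, with the raw count as Tf
tf_idf = {
    doc: {term: count * idf[term] for term, count in Counter(terms).items()}
    for doc, terms in titles.items()
}

print(idf["big"], idf["deep"])
```

A term that appears in few titles ("deep") gets a higher IDF than one that appears in many ("big"), so rare terms weigh more when matching a query.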

__Note:__ In our study, we will consider the __titles of the articles__ as our set of documents. Then, given a query, we are going to find the articles whose titles best match the query.

To prepare our data for computing the TF-IDF of the terms in each title, we go through the following steps:

1. __Tokenization__: we will separate the terms that exist in the title of each article

2. __Removing stopwords__: stopwords are words that are very common in text, such as 'a', 'an', 'the', ...

3. __Removing punctuations__: punctuations are the symbols that exist in a text such as '.', ',' ...

4. __Stemming__: stemming is the process of removing affixes (prefixes and suffixes) from words in order to obtain their root forms, also known as word stems.
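
The first three steps can be illustrated without any external library. The sketch below is a simplified stand-in, not the function used in this notebook: `STOPWORDS` is a tiny hypothetical list (the notebook relies on NLTK's English stopword list), tokenization is a plain whitespace split, and stemming is left out because it needs a real stemmer such as NLTK's `PorterStemmer`.

```python
import string

# Hypothetical mini stopword list, for illustration only
STOPWORDS = {"a", "an", "the", "of", "for", "and", "in", "on", "with"}


def simple_preprocess(title):
    """Lowercase, split on whitespace, strip punctuation, drop stopwords."""
    tokens = []
    for raw in title.lower().split():
        # Remove punctuation attached to the start or end of the token
        word = raw.strip(string.punctuation)
        # Keep purely alphabetic tokens that are not stopwords
        if word.isalpha() and word not in STOPWORDS:
            tokens.append(word)
    return tokens


print(simple_preprocess("A Survey of Data Structures, for Computer Graphics."))
```

With a stemmer applied afterwards, tokens such as "structures" and "graphics" would be reduced further to their stems.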

## Importing the necessary packages

In [None]:
import nltk
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
nltk.download('stopwords')
nltk.download('punkt')

# These are the set of stopwords that we want to remove from the words
StopWords = stopwords.words('english')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


Turning our dataframe into an RDD where the keys are the IDs of the papers and the values are the titles of the papers.

In [None]:
# Setting the ID of the papers as the keys and Titles as the values
Article_title_rdd = Articles_df.rdd.map(lambda row: (row['ID'], row['Title']))

# Cache the data once loaded
Article_title_rdd.cache()

PythonRDD[411] at RDD at PythonRDD.scala:53

A function to perform the tokenization, stopword removal, removing punctuations and stemming the word

In [None]:
# tokenization, stopword removal, removing punctuations and stemming the words in the title of an article
def Pre_processing(Title):
  return [PorterStemmer().stem(w.lower()) for w in nltk.word_tokenize(str(Title)) if (w.lower() not in StopWords) and (w.isalpha())]

## Pre processing + TF (t,d)

Here we tokenize the titles, remove the stopwords and punctuation, and stem the terms in the titles.

We will construct an RDD where:
  - Keys: (Article ID, term)
  - Value: number of occurrences of the term in that article's title

In [None]:
%%time
# Performing the pre-processing and counting: Key: (Article ID, term), Value: count of the term in the title
Article_TF_rdd = Article_title_rdd.flatMapValues(Pre_processing).map(lambda x: ((x[0],x[1]), 1)).reduceByKey(lambda a, b: a + b)
Article_TF_rdd.cache()

print('The terms that present in each article: ')
print(*Article_TF_rdd.collect()[:10], sep = '\n')

The terms that present in each article: 
(('2100297733', 'relight'), 1)
(('2100297733', 'collect'), 1)
(('2908749843', 'base'), 1)
(('2004877333', 'decis'), 1)
(('1501560890', 'optim'), 1)
(('2964324415', 'quadrangul'), 1)
(('2015363144', 'contain'), 1)
(('2120220749', 'condit'), 1)
(('2245001294', 'error'), 1)
(('2902458453', 'embed'), 1)
CPU times: user 2.11 s, sys: 949 ms, total: 3.05 s
Wall time: 1min 10s


## Inverse Document Frequency (IDF)

Here we are computing the IDF of each term among the articles' terms.

In [None]:
%%time
# Calculating the IDF of the terms in the articles: Key: Term, Value: IDF value
IDF_rdd = Article_TF_rdd.map(lambda x: (x[0][1], 1)).reduceByKey(lambda a,b: a + b).map(lambda x: (x[0], math.log(Articles_num/x[1])))
IDF_rdd.cache()

print('The IDF of each term: ')
print(*IDF_rdd.collect()[:10], sep = '\n')

The IDF of each term: 
('quadrangul', 10.031390921596127)
('larg', 4.827883859810765)
('cognit', 4.909322677357768)
('energi', 4.604840263516879)
('kernel', 5.0781279679638045)
('beyond', 5.886094032200099)
('concurr', 5.473693585998201)
('polici', 5.409989540679454)
('challeng', 4.9090516379198705)
('process', 3.8006410776841175)
CPU times: user 45.4 ms, sys: 11.3 ms, total: 56.7 ms
Wall time: 2.7 s


## TF-IDF

Here we are computing the TF-IDF of the terms in each article's title.

In [None]:
%%time
# Computing the TF_IDF for each term in each specific document
TF_IDF_rdd = Article_TF_rdd.map(lambda x: (x[0][1], (x[0][0], x[1]))).join(IDF_rdd).map(lambda x: (x[1][0][0], (x[0], x[1][0][1] * x[1][1])))
TF_IDF_rdd.cache()

print('The TF-IDF of the terms in each document: ')
print(*TF_IDF_rdd.collect()[:10], sep = '\n')

The TF-IDF of the terms in each document: 
('2964324415', ('quadrangul', 10.031390921596127))
('2757356930', ('quadrangul', 10.031390921596127))
('1998130497', ('quadrangul', 10.031390921596127))
('2076333493', ('quadrangul', 10.031390921596127))
('1964803452', ('quadrangul', 10.031390921596127))
('1967057670', ('quadrangul', 10.031390921596127))
('2108857611', ('quadrangul', 10.031390921596127))
('2810599149', ('quadrangul', 10.031390921596127))
('2912720455', ('quadrangul', 10.031390921596127))
('2107997516', ('quadrangul', 10.031390921596127))
CPU times: user 1.35 s, sys: 585 ms, total: 1.94 s
Wall time: 13.6 s


# __Cosine similarity__

Following the computation of the TF-IDF of the terms in each document, we can use this information to retrieve the most relevant documents based on a given query.

When we receive a query, we compute the TF-IDF of the terms in the query as well. To pre-process the query, we will follow the same steps as before (tokenization, ...).

Assuming that we have a vector of TF-IDF terms for each document (d) and query (q), we can define the cosine similarity between a query and one document as follows:

$$\text{cosine similarity (q,d)} = \frac{\text{dot_product}(q, d)}{||q|| * ||d||}$$

The documents with the highest cosine similarity are then considered the most relevant to our query.
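
The formula can be checked on small sparse vectors. The sketch below stores each TF-IDF vector as a `{term: weight}` dictionary. The function name and the example vectors are hypothetical, and returning 0 for an empty vector is a convention chosen for this example rather than something the notebook defines.

```python
import math


def cosine_similarity(q, d):
    """Cosine similarity between two sparse vectors stored as {term: weight}."""
    # Only terms present in both vectors contribute to the dot product
    dot = sum(weight * d.get(term, 0.0) for term, weight in q.items())
    norm_q = math.sqrt(sum(w * w for w in q.values()))
    norm_d = math.sqrt(sum(w * w for w in d.values()))
    if norm_q == 0.0 or norm_d == 0.0:
        return 0.0
    return dot / (norm_q * norm_d)


# Hypothetical query and document vectors
query = {"big": 1.0, "data": 1.0}
doc_same_direction = {"big": 2.0, "data": 2.0}
doc_unrelated = {"deep": 1.0, "learn": 1.0}

print(cosine_similarity(query, doc_same_direction))
print(cosine_similarity(query, doc_unrelated))
```

Since only shared terms contribute to the dot product, documents that share no term with the query score 0. This is why the notebook restricts the norm computation to titles containing at least one query term.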

## Receiving the query

Here we receive a query, apply the same pre-processing to its terms, and compute the TF-IDF of the terms in the given query.

In [None]:
%%time
# Receiving the query
Query = Pre_processing(input("Please enter your query: "))

# Computing the term frequency of the terms in the query
Query_frequency =  spark.sparkContext.parallelize(Query).map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)

# Computing the TF_IDF of the terms in the query
Query_TF_IDF_rdd = Query_frequency.join(IDF_rdd).map(lambda x: (x[0], x[1][0] * x[1][1]))

# Storing the RDD for the future use
Query_TF_IDF_rdd.cache()


print('The TF_IDF of the terms in the query: ')
print(*Query_TF_IDF_rdd.collect()[:5], sep = '\n')


Please enter your query: Big Data Computing 
The TF_IDF of the terms in the query: 
('data', 2.8541638295856857)
('comput', 3.495556080903046)
('big', 5.416720551060016)
CPU times: user 1.56 s, sys: 292 ms, total: 1.85 s
Wall time: 1min 17s


## Dot product between articles and query

Here we will compute the dot product between the articles and the query for computing the cosine similarity between the article's title and the given query.

In [None]:
%%time
# Keys are the terms, values are the articles and the TfIdf of the term in that document
terms_article_TfIdf_rdd = TF_IDF_rdd.map(lambda x: (x[1][0], (x[0], x[1][1])))

print('Terms to article and TfIdf: ')
print(*terms_article_TfIdf_rdd.collect()[:10], sep = '\n')

# Computing the dot product between the query terms and the documents
Dot_product_query = Query_TF_IDF_rdd.join(terms_article_TfIdf_rdd).map(lambda x: (x[1][1][0], x[1][0] * x[1][1][1])).reduceByKey(lambda a,b: a + b)

# Saving the RDD for the future use
Dot_product_query.cache()

print('\nThe result of the dot product between the query and the documents: ')
print(*Dot_product_query.collect()[:10], sep = '\n')

Terms to article and TfIdf: 
('quadrangul', ('2964324415', 10.031390921596127))
('quadrangul', ('2757356930', 10.031390921596127))
('quadrangul', ('1998130497', 10.031390921596127))
('quadrangul', ('2076333493', 10.031390921596127))
('quadrangul', ('1964803452', 10.031390921596127))
('quadrangul', ('1967057670', 10.031390921596127))
('quadrangul', ('2108857611', 10.031390921596127))
('quadrangul', ('2810599149', 10.031390921596127))
('quadrangul', ('2912720455', 10.031390921596127))
('quadrangul', ('2107997516', 10.031390921596127))

The result of the dot product between the query and the documents: 
('752888290', 8.146251166115228)
('1546670027', 8.146251166115228)
('2610980058', 37.48711269439115)
('2052089314', 8.146251166115228)
('2569038937', 8.146251166115228)
('2980081042', 8.146251166115228)
('2772466107', 8.146251166115228)
('2784003762', 8.146251166115228)
('2294319731', 8.146251166115228)
('2912981821', 8.146251166115228)
CPU times: user 1.53 s, sys: 605 ms, total: 2.14 s
Wa

## Norms of filtered articles

Now we will compute the norms of the articles to compute the cosine similarity later on.

__Note:__ to compute the similarity, we keep only the articles that have at least one of the terms of the given query in their title.

In [None]:
%%time
# Filter the articles that have some of the terms in the query
Filtered_articles = Dot_product_query.keys().collect()

# Computing the norm of the vectors
Articles_norm = TF_IDF_rdd.filter(lambda x: x[0] in Filtered_articles).map(lambda x: (x[0], x[1][1]**2)).reduceByKey(lambda a, b: a + b)\
                                  .map(lambda x: (x[0], x[1]**.5))

# Storing the norms for the future use
Articles_norm.cache()

print("The norms of the articles that had the terms in the query: ")
print(*Articles_norm.collect()[:5], sep = '\n')

The norms of the articles that had the terms in the query: 
('2912981821', 17.185310048167153)
('2526069705', 18.864957245856903)
('2489482098', 12.675352659730322)
('2742484346', 21.675815443824522)
('2992209313', 17.499780245109594)
CPU times: user 4.09 s, sys: 614 ms, total: 4.7 s
Wall time: 11min 56s


Now we can compute the cosine similarity between the articles and the given query:

In [None]:
%%time
# Computing the norm of the query
Query_norm  = sum(Query_TF_IDF_rdd.map(lambda x: (x[0], x[1]**2)).values().collect())**.5

# Computing the cosine similarity between the articles' title and the given query
Cosine_similarity = Dot_product_query.join(Articles_norm).map(lambda x: (x[0], x[1][0]/(x[1][1] * Query_norm))).sortBy(lambda x: x[1], ascending=False)


print("Here we have the cosine similarity between the articles' title and the given query: ")
print(*Cosine_similarity.collect()[:5], sep = '\n')

Here we have the cosine similarity between the articles' title and the given query: 
('71381689', 0.8684333216167935)
('2519706319', 0.8684333216167935)
('2550984048', 0.8181325769898594)
('2406823926', 0.8181325769898594)
('1991304154', 0.7975507516868309)
CPU times: user 110 ms, sys: 11.4 ms, total: 121 ms
Wall time: 6.55 s


## Top 5 articles

Here we report the top 5 articles that have the highest cosine similarity with respect to our query.

In [None]:
%%time

# Sorting the results based on the cosine similarity of the article's titles and the given query
Top_cosine_similarity = Cosine_similarity.join(Article_title_rdd).sortBy(lambda x: x[1][0], ascending=False).collect()[:5]

print('Top 5 articles with the highest cosine similarity: (Article_id, (cosine_similarity, title))\n')
print(*Top_cosine_similarity, sep = '\n')

Top 5 articles with the highest cosine similarity: (Article_id, (cosine_similarity, title))

('71381689', (0.8684333216167935, 'Big Data – A State-of-the-Art'))
('2519706319', (0.8684333216167935, 'Big data'))
('2550984048', (0.8181325769898594, 'Content-Centric and Software-Defined Networking with Big Data'))
('2406823926', (0.8181325769898594, 'Big Data over Networks'))
('1991304154', (0.7975507516868309, 'Efficient computation of the well-founded semantics over big data'))
CPU times: user 97.9 ms, sys: 20.6 ms, total: 118 ms
Wall time: 6.9 s


# __ContentLink score__

We would like to have an information retrieval system that, in response to a query, not only retrieves articles that are more relevant to the query, but also considers the importance of those articles.

Our new metric is called the __ContentLink score__ because it combines the Cosine Similarity (a content-based score) and PageRank (a link-based score).

__Note:__ before combining these two metrics, we will normalize their values to the range [0, 1] using min-max scaling, and then define our metric.
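
Min-max scaling maps the smallest value to 0 and the largest to 1. A minimal sketch follows, where the helper name `min_max_scale` is hypothetical. The guard for the case where all values are equal is an addition of this example, since the division would otherwise be by zero.

```python
def min_max_scale(values):
    """Scale a list of numbers linearly into the range [0, 1]."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # All values are identical: avoid dividing by zero
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


scaled = min_max_scale([2.0, 4.0, 6.0])
print(scaled)
```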

## PageRank filtering

To obtain a fair score for the PageRank of the articles, we keep only the PageRank of the articles that contain at least one of the query terms in their title.

In [None]:
%%time

# Filtering the pageRank of the articles that have at least one of the terms in their title
Filtered_pageRank = Vertices_prob_len_rdd.filter(lambda x: str(x[0]) in Filtered_articles)

#Caching the result
Filtered_pageRank.cache()

CPU times: user 11.8 ms, sys: 1.9 ms, total: 13.7 ms
Wall time: 36.7 ms


PythonRDD[491] at RDD at PythonRDD.scala:53

## PageRank and Cosine similarity normalization

Then, among the filtered articles we will find the maximum and minimum values for the pageRank and the cosine similarity

In [None]:
%%time

# Get the maximum pageRank
Max_page_rank = Filtered_pageRank.map(lambda x: x[1][0]).reduce(max)

# Get the minimum pageRank
Min_page_rank = Filtered_pageRank.map(lambda x: x[1][0]).reduce(min)

# Get the maximum cosine similarity
Max_cosine_similarity = Cosine_similarity.map(lambda x: x[1]).reduce(max)

# Get the minimum cosine similarity
Min_cosine_similarity = Cosine_similarity.map(lambda x: x[1]).reduce(min)

CPU times: user 716 ms, sys: 113 ms, total: 829 ms
Wall time: 2min 5s


We will normalize the values using the Min_max scaling

In [None]:
%%time

# Scaling the page ranks
Scaled_pageRank = Filtered_pageRank.map(lambda x: (str(x[0]), (x[1][0] - Min_page_rank)/(Max_page_rank - Min_page_rank)))

# Scaling the cosine_similarities
Scaled_cosine_similarity = Cosine_similarity.map(lambda x: (str(x[0]), (x[1] - Min_cosine_similarity)/(Max_cosine_similarity - Min_cosine_similarity)))

CPU times: user 42 µs, sys: 0 ns, total: 42 µs
Wall time: 46 µs


## Defining ContentLink score

Then we can report the results for the user's query with a joint metric of pageRank and cosine similarity.

Here we will set two parameters:

  - $\alpha$: the rate of contribution of the cosine similarity
  - $\beta$: the rate of contribution of the pageRank

We will define our __ContentLinkScore__ as:

 $$\text{ContentLinkScore} = \alpha * \text{cosine_similarity} + \beta * \text{pageRank}$$
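
To see how the two weights change the ordering, here is a small sketch with made-up, already scaled scores. The article IDs, their values, and the helper names `content_link_score` and `rank_articles` are all hypothetical.

```python
def content_link_score(cosine_scaled, pagerank_scaled, alpha=0.5, beta=0.5):
    """Weighted sum of a scaled cosine similarity and a scaled PageRank."""
    return alpha * cosine_scaled + beta * pagerank_scaled


# Hypothetical scaled scores: article id -> (cosine similarity, PageRank)
articles = {
    "x": (1.0, 0.0),  # very relevant, low importance
    "y": (0.2, 1.0),  # weakly relevant, high importance
    "z": (0.6, 0.5),  # in between
}


def rank_articles(alpha, beta):
    """Return the article ids sorted by decreasing ContentLinkScore."""
    return sorted(
        articles,
        key=lambda a: content_link_score(*articles[a], alpha=alpha, beta=beta),
        reverse=True,
    )


print(rank_articles(0.8, 0.2))  # content-heavy weighting
print(rank_articles(0.2, 0.8))  # link-heavy weighting
```

With a content-heavy weighting the most relevant article comes first, while a link-heavy weighting promotes the article with the highest PageRank.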

## Different simulations

In this section we will set the parameters $\alpha$ and $\beta$ to different values and check the final result.

### $\alpha = 0.5, \beta = 0.5$

Here we give the same rate of contribution to both the cosine similarity and the pageRank.

In [None]:
%%time

# Setting the rate of the contribution of the cosine similarity and the pageRank
Alpha, Beta = 0.5, 0.5

# Computing the ContentLinkScore
ContentLinkScore = Scaled_pageRank.join(Scaled_cosine_similarity).map(lambda x: (x[0], x[1][0] * Beta + x[1][1] * Alpha ))

# Then we will sort the articles based on their score and report the top 5
Top_articles = ContentLinkScore.join(Article_title_rdd).sortBy(lambda x: x[1][0], ascending=False).collect()[:5]

print('Top 5 articles with the highest ContentLinkScore: (Article_id, (ContentLinkScore, title))\n')
print(*Top_articles, sep = '\n')

Top 5 articles with the highest ContentLinkScore: (Article_id, (ContentLinkScore, title))

('2003938193', (0.6553278353064494, 'Computer Processing of Line-Drawing Images'))
('2519706319', (0.5016216767899693, 'Big data'))
('71381689', (0.5000859347398483, 'Big Data – A State-of-the-Art'))
('2406823926', (0.4701387545966372, 'Big Data over Networks'))
('2550984048', (0.47009345400788566, 'Content-Centric and Software-Defined Networking with Big Data'))
CPU times: user 138 ms, sys: 15.7 ms, total: 154 ms
Wall time: 9 s


### $\alpha = 0.4, \beta = 0.6$

In this setting we give a higher rate of contribution to the pageRank. In this case the importance of the article matters more to us.

In [None]:
%%time

# Setting the rate of the contribution of the cosine similarity and the pageRank
Alpha, Beta = 0.4, 0.6

# Computing the ContentLinkScore
ContentLinkScore = Scaled_pageRank.join(Scaled_cosine_similarity).map(lambda x: (x[0], x[1][0] * Beta + x[1][1] * Alpha ))

# Then we will sort the articles based on their score and report the top 5
Top_articles = ContentLinkScore.join(Article_title_rdd).sortBy(lambda x: x[1][0], ascending=False).collect()[:5]

print('Top 5 articles with the highest ContentLinkScore: (Article_id, (ContentLinkScore, title))\n')
print(*Top_articles, sep = '\n')

Top 5 articles with the highest ContentLinkScore: (Article_id, (ContentLinkScore, title))

('2003938193', (0.7242622682451595, 'Computer Processing of Line-Drawing Images'))
('1990247061', (0.49581570180783674, 'Specification and implementation of resilient, atomic data types'))
('2033736966', (0.4230630361722136, 'A Survey of Data Structures for Computer Graphics Systems'))
('2041674806', (0.42179363388943775, 'Clustering categorical data: an approach based on dynamical systems'))
('2519706319', (0.4019460121479631, 'Big data'))
CPU times: user 125 ms, sys: 19.5 ms, total: 144 ms
Wall time: 9.33 s


### $\alpha = 0.6, \beta = 0.4$

In this setting we give a higher rate of contribution to the cosine similarity. In this case the similarity between the document title and the query matters more to us.

In [None]:
%%time

# Setting the rate of the contribution of the cosine similarity and the pageRank
Alpha, Beta = 0.6, 0.4

# Computing the ContentLinkScore
ContentLinkScore = Scaled_pageRank.join(Scaled_cosine_similarity).map(lambda x: (x[0], x[1][0] * Beta + x[1][1] * Alpha ))

# Then we will sort the articles based on their score and report the top 5
Top_articles = ContentLinkScore.join(Article_title_rdd).sortBy(lambda x: x[1][0], ascending=False).collect()[:5]

print('Top 5 articles with the highest ContentLinkScore: (Article_id, (ContentLinkScore, title))\n')
print(*Top_articles, sep = '\n')

Top 5 articles with the highest ContentLinkScore: (Article_id, (ContentLinkScore, title))

('2519706319', (0.6012973414319753, 'Big data'))
('71381689', (0.6000687477918786, 'Big Data – A State-of-the-Art'))
('2003938193', (0.5863934023677393, 'Computer Processing of Line-Drawing Images'))
('2406823926', (0.5641483852804641, 'Big Data over Networks'))
('2550984048', (0.5641121448094628, 'Content-Centric and Software-Defined Networking with Big Data'))
CPU times: user 121 ms, sys: 19.2 ms, total: 141 ms
Wall time: 9.15 s
