<a href="https://colab.research.google.com/github/laragazzadelsole/AMD-Exam/blob/main/Amazon_US_costumer_Review_Link_Analysis_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Project description

The aim of this project is to create a ranking system, starting from the Amazon US Customer Reviews
dataset, that could be used to establish the order in which reviews should be shown. The
idea is to calculate the PageRank score of each customer based on the items they reviewed
in common with other customers. The rationale is that spammers (who want to inflate
the rating of their own products) are unlikely to have reviews in common with other customers across
different products. However, this approach alone would give more importance to customers who left many
meaningless reviews than to those who left fewer but more effective ones, because the PageRank score
carries no information about the usefulness of a review. On Amazon, users can rate a review
as "useful", and this is captured by the variable helpful_votes. So, in
addition to the PageRank score, a "helpfulness score" is evaluated.

 <font size="5">**1. Initial Setup**</font>

**1.1 Data Import**

In [1]:
import os
os.environ['KAGGLE_USERNAME'] = "saragironi"
os.environ['KAGGLE_KEY'] = "<your-kaggle-api-key>"  # placeholder: never publish a real API key
!pip install kaggle --upgrade
!kaggle datasets download -d cynthiarempel/amazon-us-customer-reviews-dataset --unzip

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Downloading amazon-us-customer-reviews-dataset.zip to /content
100% 20.9G/21.0G [03:13<00:00, 94.2MB/s]
100% 21.0G/21.0G [03:13<00:00, 116MB/s] 


**1.2 Initializing Spark**

In [2]:
#installing Java 8

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!java -version

openjdk version "11.0.18" 2023-01-17
OpenJDK Runtime Environment (build 11.0.18+10-post-Ubuntu-0ubuntu120.04.1)
OpenJDK 64-Bit Server VM (build 11.0.18+10-post-Ubuntu-0ubuntu120.04.1, mixed mode, sharing)


In [3]:
# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

In [4]:
# install findspark and pyspark using pip

!pip install -q findspark
!pip install pyspark


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
     281.4/281.4 MB 4.9 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     199.7/199.7 KB 22.6 MB/s eta 0:00:00
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824028 sha256=e27f22c8b9be87d3a3c2e5c17d0b3aec3881be11defc49f38c7a45cfd5a9a3b5
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [5]:
#import findspark 

import findspark
findspark.init()

In [6]:
#create a SparkSession

from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

spark

<font size="5">**2. The Data**</font> \

I have decided to focus the link analysis on the customers who left reviews on electronic products. The reason for this choice is that this kind of review tends to be more technical and less subjective, due to the nature of the product, than reviews in other categories (such as Books, Music, Grocery, etc.). 
Therefore, there could be a higher number of "helpful_votes", which is an important element for my purpose. The chosen categories are Electronics and PC. These two categories are related, as people purchasing a PC on Amazon would probably also buy a mouse or other electronic items. 

In [7]:
# The dataset is divided by category. Here I load only the categories I am interested in. 

df_el = spark.read.csv('amazon_reviews_us_Electronics_v1_00.tsv', sep='\t', header=True)
df_pc = spark.read.csv('amazon_reviews_us_PC_v1_00.tsv', sep='\t', header=True)
df_el.show(10)
df_pc.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   41409413|R2MTG1GCZLR2DK|B00428R89M|     112201306|yoomall 5M Antenn...|     Electronics|          5|            0|          0|   N|                Y|          Five Stars|       As described.| 2015-08-31|
|         US|   49668221|R2HBOEM8LE9928|B000068O48|     734576678|Hosa GPM-103 3.5m...|     Electronics|          5|    

In [8]:
df = df_pc.union(df_el)
type(df)

pyspark.sql.dataframe.DataFrame

In [9]:
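#Drop the columns that are not really useful for the purpose of this project
# Note: "verfied_purchase" is misspelled, so drop() silently ignores it and
# verified_purchase stays in the schema printed below

df = df.drop("marketplace","vine", "product_parent", "verfied_purchase", "review_headline", "review_body")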
df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_date: string (nullable = true)



In [10]:
#The dataframe contains more than 10 million rows. Here I compare the time taken by the action .count() with that of the equivalent spark.sql query.
# .count() will be used from here on, as it was faster in this run

%%time 
tot_rows = df.count()
print(f"Total number of rows: {tot_rows}")

Total number of rows: 10002423
CPU times: user 205 ms, sys: 31.8 ms, total: 237 ms
Wall time: 42.4 s


In [11]:
%%time
df.createOrReplaceTempView("df_view")
total_rows = spark.sql("""SELECT COUNT(review_id) AS total_rows 
                        FROM df_view""")
total_rows.show()

+----------+
|total_rows|
+----------+
|  10002423|
+----------+

CPU times: user 257 ms, sys: 30.9 ms, total: 288 ms
Wall time: 52.6 s


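In this run the action count() took 42.4 s, against 52.6 s for the equivalent spark.sql() query. Both trigger a full scan of the data, so a single timing is only indicative. I'll use the DataFrame methods instead of spark.sql() in the rest of the code.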

**2.1 Data Pre-processing**

In [12]:
# Check if there are null values 

import pyspark.sql.functions as F

df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()

+-----------+---------+----------+-------------+----------------+-----------+-------------+-----------+-----------------+-----------+
|customer_id|review_id|product_id|product_title|product_category|star_rating|helpful_votes|total_votes|verified_purchase|review_date|
+-----------+---------+----------+-------------+----------------+-----------+-------------+-----------+-----------------+-----------+
|          0|        0|         0|            0|              11|         11|           11|         11|               11|        333|
+-----------+---------+----------+-------------+----------------+-----------+-------------+-----------+-----------------+-----------+



In [13]:
#Rows with missing data are deleted

df_complete = df.na.drop()
df_complete.count()

10002090

In [14]:
# drop duplicates of reviews if any

df_final = df_complete.dropDuplicates(["review_id"])
type(df_final)

pyspark.sql.dataframe.DataFrame

In [15]:
#check for anomalies in the values like star_rating > 5
#first cast the data type of "star_rating" from string to int

from pyspark.sql.types import IntegerType,BooleanType,DateType
from pyspark.sql.functions import col,when,count

df_final = df_final.withColumn("star_rating",df_final.star_rating.cast('int'))

df_final.select('review_id').where(df_final.star_rating>5).count()

0

In [16]:
#check for consistencies: if "helpful_votes" is always lower or equal to "total_votes"

df_final = df_final.withColumn("helpful_votes",df_final.helpful_votes.cast('int'))
df_final = df_final.withColumn("total_votes",df_final.total_votes.cast('int'))

df_final.select('review_id').where(df_final.helpful_votes>df_final.total_votes).count()

0

The dataframe is now complete and contains no inconsistencies. However, running the algorithm on the whole dataset would be too computationally intensive. For this reason a subsample of 50000 rows is used.

In [17]:
sub_df = df_final.limit(50000)

<font size="5">**3. Exploratory Data Analysis**</font> \

In [18]:
#calculate the average number of reviews per person

from pyspark.sql.functions import count, desc, avg

# Group the data by person and count the number of reviews per person
reviews_per_person = sub_df.groupby("customer_id").count().select("count")

# Calculate the average number of reviews per person
avg_reviews_per_person = reviews_per_person.agg({"count": "avg"}).collect()[0][0]

print("The average number of reviews per person is:", avg_reviews_per_person)

The average number of reviews per person is: 1.0109792344865236


In [19]:
#The average is very low so I check the first 10 customers by number of reviews.
# 6 is the maximum number of reviews left by a user.  

reviews_per_person.sort(desc("count")).show(10)

+-----+
|count|
+-----+
|    6|
|    5|
|    4|
|    4|
|    4|
|    4|
|    3|
|    3|
|    3|
|    3|
+-----+
only showing top 10 rows



In [20]:
#The first 10 most reviewed items

sub_df.groupby('product_id', 'product_title').count().sort(desc("count")).show(10)

+----------+--------------------+-----+
|product_id|       product_title|count|
+----------+--------------------+-----+
|B0051VVOB2|Kindle Fire (Prev...|  111|
|B0083PWAPW|Kindle Fire HD 7"...|   85|
|B00JG8GOWU|Kindle Paperwhite...|   80|
|B006GWO5WK|Amazon Kindle 9W ...|   77|
|B00BWYQ9YE|Kindle Fire HDX 7...|   74|
|B003L1ZYYM|AmazonBasics High...|   68|
|B002Y27P3M|Kindle Keyboard, ...|   63|
|B0015T963C|Kindle Wireless R...|   61|
|B004XC6GJ0|ARRIS SURFboard D...|   57|
|B00I15SB16|Kindle, 6" Glare-...|   56|
+----------+--------------------+-----+
only showing top 10 rows



In [21]:
# First 10 most useful reviews.

sub_df.select("product_title", "star_rating", "helpful_votes", "customer_id").sort(desc("helpful_votes")).show(10)

+--------------------+-----------+-------------+-----------+
|       product_title|star_rating|helpful_votes|customer_id|
+--------------------+-----------+-------------+-----------+
|Zune HD Video MP3...|          5|         2431|   52855449|
|SanDisk Extreme P...|          4|         1920|   51960937|
|Patagonia Kindle ...|          4|         1277|   12372811|
|Samsung Chromeboo...|          5|         1109|   41866357|
|Motorola SURFboar...|          4|          800|   28204599|
|Samsung Galaxy Ta...|          4|          637|   45035381|
|Datamancer The So...|          4|          618|   27055887|
|Samsung Galaxy Ta...|          5|          515|   15208771|
|Photive Hydra Wir...|          5|          437|   45843410|
|ASUS MeMOPad HD 7...|          1|          424|   53090839|
+--------------------+-----------+-------------+-----------+
only showing top 10 rows



In [22]:
#First 10 days by total number of reviews

# first cast the data type of "review_date" from string to date format

from pyspark.sql.functions import to_date, lit
sub_df = sub_df.withColumn("review_date",col("review_date").cast(DateType()))

In [23]:
#Most of the busiest days fall in early January 2015 (plus 31 December 2014).
#A possible explanation is that Christmas is when most products are purchased,
#and after a few days or weeks of trying the product the customer leaves a review on Amazon. 

sub_df.groupby('review_date').count().sort(desc("count")).show(10)

+-----------+-----+
|review_date|count|
+-----------+-----+
| 2015-01-04|  125|
| 2015-01-05|  105|
| 2015-01-07|   99|
| 2015-01-03|   97|
| 2015-06-03|   91|
| 2015-01-13|   91|
| 2015-01-06|   88|
| 2014-12-31|   84|
| 2015-08-18|   82|
| 2015-01-09|   82|
+-----------+-----+
only showing top 10 rows



<font size="5">**4. PageRank Algorithm**</font> \


In order to create the graph I want to compare two methods. The first one relies on udf(), which wraps a user-defined function so that it can be applied directly to a PySpark dataframe. The second one relies only on for loops. 
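As a minimal sketch of what both methods are meant to compute, the plain-Python snippet below builds the edges from a handful of made-up (customer, product) pairs. The toy data and the helper name `co_review_edges` are assumptions introduced only for illustration; unlike the notebook, the sketch stores edges in a set, so a pair of customers who share several products appears only once.

```python
from collections import defaultdict
from itertools import combinations

def co_review_edges(reviews):
    """Return the set of undirected edges between customers who reviewed the same product.

    `reviews` is an iterable of (customer_id, product_id) pairs (hypothetical toy input).
    """
    customers_by_product = defaultdict(set)
    for customer_id, product_id in reviews:
        customers_by_product[product_id].add(customer_id)

    edges = set()
    for customers in customers_by_product.values():
        # sorting makes (a, b) and (b, a) the same edge
        for pair in combinations(sorted(customers), 2):
            edges.add(pair)
    return edges

# toy data, invented for illustration
toy_reviews = [("c1", "p1"), ("c2", "p1"), ("c3", "p1"),
               ("c1", "p2"), ("c4", "p2"), ("c5", "p3")]
toy_edges = co_review_edges(toy_reviews)
print(sorted(toy_edges))
```

A product reviewed by a single customer (here `p3`) produces no edge, which is why such customers never become nodes of the graph.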

**4.1 Creation of the Graph - Method with udf()**

In [24]:
#The first step is to create the edges of the graph. 
#An edge exists when two customers have reviewed the same product

from pyspark.sql.functions import collect_set, sort_array, udf, explode
from pyspark.sql.types import ArrayType, StringType, StructType, StructField

# group by product_id and use collect_set() to aggregate the customer_id into an ArrayType to create a list of the customers by product
custom_by_prod = sub_df.groupBy('product_id').agg(collect_set('customer_id').alias('customer_id'))
custom_by_prod.show(10)

+----------+--------------------+
|product_id|         customer_id|
+----------+--------------------+
|016642966X|          [19955871]|
|0511189877|          [33355906]|
|0972683275|[53074039, 435844...|
|1394860919|          [16522736]|
|1400501466|[51932300, 48648451]|
|1400501776|          [49886899]|
|1400532620|          [51280446]|
|1400532655|          [17086455]|
|140053271X|[25652547, 195277...|
|1400599997|          [19325576]|
+----------+--------------------+
only showing top 10 rows



In [25]:
#METHOD WITH UDF 
%%time

# The function create_edges() creates an edge between two customers who reviewed the same product
# by creating a tuple containing the customer ID of the two people. 
def create_edges(customer_id):
    edges = []
    for i in range(len(customer_id)):
        for j in range(i+1, len(customer_id)):
            edge = tuple(sorted([customer_id[i], customer_id[j]]))
            edges.append(edge)
    return edges

CPU times: user 6 µs, sys: 0 ns, total: 6 µs
Wall time: 9.54 µs


In [26]:
# edge_udf() applies the create_edges() function to a PySpark DataFrame. 
# The UDF returns an array of structs, each of which has two string fields (src, dst).
# However udf() is very expensive

%%time

edge_udf = udf(create_edges, ArrayType(StructType([
    StructField('src', StringType()),
    StructField('dst', StringType())])))

# df_edges is a new DataFrame with one row for each edge between customers that rated the same product ID

df_edges = custom_by_prod.select('customer_id').withColumn('edges', edge_udf(sort_array('customer_id'))).select(explode('edges').alias('edge'))
#df_edges.show(10)


CPU times: user 13.7 ms, sys: 2.24 ms, total: 15.9 ms
Wall time: 237 ms


In [27]:
#JUST TO TEST 

#In order to verify that the edges have been created correctly I create a dictionary from custom_by_prod 
#and display the first 5 elements. I choose customer_id '53074039' because it is the first one 
# with edges and I retrieve the same customer_id from edges dataframe. If the edges in common coincide the 
#dataframe edges has been created correctly 

# convert the DataFrame to a list of Row objects and create a dictionary
custom_dict = {row['product_id']: row['customer_id'] for row in custom_by_prod.collect()}

# print the first 5 entries in the dictionary
print(dict(list(custom_dict.items())[:5]))

{'016642966X': ['19955871'], '0511189877': ['33355906'], '0972683275': ['53074039', '43584487', '29997920', '41490022', '26324308', '26780671', '13630979', '11988204'], '1394860919': ['16522736'], '1400501466': ['51932300', '48648451']}


In [28]:
#The edges linked to customer 53074039 coincide both in the dataframe and in the dictionary. 
# Therefore we can go on with the analysis. 

df_edges.filter(df_edges.edge.dst == '53074039' ).show()

+--------------------+
|                edge|
+--------------------+
|[11988204, 53074039]|
|[13630979, 53074039]|
|[26324308, 53074039]|
|[26780671, 53074039]|
|[29997920, 53074039]|
|[41490022, 53074039]|
|[43584487, 53074039]|
+--------------------+



In [29]:
#create the network 
%%time

#create lists from customer_id and edges to iterate on them 

custom_list = list(sub_df.select('customer_id').toPandas()['customer_id'])

edges_list = list(df_edges.select('edge').toPandas()['edge'])
#print(edges_list[:10])

CPU times: user 3.09 s, sys: 282 ms, total: 3.37 s
Wall time: 3min 15s


In [30]:
#now let's substitute the nodes with some indexes to save memory and use them later in the PageRank implementation 
# Get list of unique nodes
n_int_list = list(set([e[0] for e in edges_list] + [e[1] for e in edges_list]))
n_int_list[:5]

['36506801', '14234944', '42926371', '44108825', '52980765']

In [31]:
nodes = range(len(n_int_list))
links = range(len(edges_list))
nodes, links

(range(0, 26968), range(0, 118183))

In [32]:
# Create a dictionary mapping nodes to a corresponding index

%%time
n_dict = {node: index for index, node in enumerate(n_int_list)}

# Replace node names with their indices in the edge list
links_idx_list = [(n_dict[a], n_dict[b]) for (a, b) in edges_list]
print(links_idx_list[:5])

[(142, 12808), (142, 2245), (142, 15319), (142, 26881), (142, 16740)]
CPU times: user 59.3 ms, sys: 2.5 ms, total: 61.8 ms
Wall time: 61.9 ms


In [33]:
#create a dictionary with a numeric label for each node and each edge

node_id = {c: i for i, c in enumerate(n_int_list)}
links_id = {c: i for i, c in enumerate(edges_list)}
#print(node_id)

In [34]:
import networkx as nx

G = nx.Graph()
 
for n in nodes:
    node = G.add_node(n)
 
for (a,b) in links_id:
    G.add_edge(node_id[a], node_id[b])

print('Number of nodes:', G.number_of_nodes())
print('Number of edges:', G.number_of_edges())
print('Is the graph connected?', nx.is_connected(G))

Number of nodes: 26968
Number of edges: 118183
Is the graph connected? False
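
Since the graph is reported as not connected, it can help to know how many separate components it has. networkx offers `nx.number_connected_components(G)` for this; the sketch below shows the same idea in plain Python with a union-find structure. The function name `count_components` and the toy graph are assumptions made for illustration, not part of the notebook.

```python
def count_components(num_nodes, edges):
    """Count connected components of an undirected graph with a union-find structure."""
    parent = list(range(num_nodes))

    def find(x):
        # follow parent links up to the root, shortening the path on the way
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    for a, b in edges:
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            parent[root_a] = root_b  # merge the two components

    return len({find(x) for x in range(num_nodes)})

# toy graph, invented for illustration: a triangle (0, 1, 2) and a single edge (3, 4)
toy_graph_edges = [(0, 1), (1, 2), (0, 2), (3, 4)]
toy_components = count_components(5, toy_graph_edges)
print(toy_components)  # 2
```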


**4.2 Creation of the Graph - Method with for loops**

In [35]:
#METHOD WITH FOR LOOPS 
%%time
from pyspark.sql.functions import collect_set
from itertools import combinations

# create an empty list to store the edges
edges = []

# iterate through each row of custom_by_prod dataframe
for row in custom_by_prod.collect():
    customer_ids = row.customer_id
    # generate all possible combinations of the customer_ids for each product
    customer_pairs = combinations(customer_ids, 2)
    # append each customer pair as an edge to the edges list
    for pair in customer_pairs:
        edges.append(pair)

# convert the edges list to a PySpark dataframe
edges_df = spark.createDataFrame(edges, ['src', 'dst'])
#edges_df.show(10)

CPU times: user 1.97 s, sys: 37.3 ms, total: 2.01 s
Wall time: 2.62 s


In [36]:
# edges_df.collect() returns a list of Row objects, where each Row represents a row in the pyspark dataframe.
# Then the for loop iterates on each Row and returns a list of tuples

%%time
ed_list = [tuple(row) for row in edges_df.collect()]
print(ed_list[:5])

[('53074039', '43584487'), ('53074039', '29997920'), ('53074039', '41490022'), ('53074039', '26324308'), ('53074039', '26780671')]
CPU times: user 931 ms, sys: 33.5 ms, total: 965 ms
Wall time: 2.29 s


In [37]:
#now let's substitute the nodes with some indexes to save memory and use them later in the PageRank implementation 
# Get list of unique nodes
nodes_list = list(set([e[0] for e in ed_list] + [e[1] for e in ed_list]))
nodes_list[:5]

['36506801', '14234944', '42926371', '44108825', '52980765']

In [38]:
# Create a dictionary mapping nodes to a corresponding index

%%time
node_dict = {node: index for index, node in enumerate(nodes_list)}

# Replace node names with their indices in the edge list
ed_idx_list = [(node_dict[a], node_dict[b]) for (a, b) in ed_list]
#print(ed_idx_list)

CPU times: user 61 ms, sys: 0 ns, total: 61 ms
Wall time: 61.1 ms


In [39]:
%%time

import networkx as nx
graph = nx.Graph()

num_of_nodes = range(len(nodes_list))
 
for n in num_of_nodes:
    node = graph.add_node(n)
 
for (a,b) in ed_idx_list:
    graph.add_edge(num_of_nodes[a], num_of_nodes[b])

print('Number of nodes:', graph.number_of_nodes())
print('Number of edges:', graph.number_of_edges())
print('Is the graph connected?', nx.is_connected(graph))


Number of nodes: 26968
Number of edges: 118183
Is the graph connected? False
CPU times: user 405 ms, sys: 121 ms, total: 526 ms
Wall time: 552 ms


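There are about 4.38 edges per node (118183 / 26968); since each edge touches two nodes, the average degree is about 8.76.

The cell that builds df_edges with udf() returns in 237 ms, but only because Spark is lazy: the UDF actually runs when the edges are collected with toPandas() in In [29], which took 3 min 15 s (a figure that also includes collecting customer_id). The for-loop method took about 5 s overall on this sample (2.62 s plus 2.29 s), so the two timings are not directly comparable. If I were to compare the two methods on the entire dataset, the for loops would have to collect every row of custom_by_prod on the driver, so their cost would likely be higher.  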

In [40]:
#The top 3 nodes with the most links  

node_degrees = list(graph.degree())
node_degrees.sort(key=lambda x: x[1], reverse=True)
top_nodes = [node_degrees[i][0] for i in range(3)]
print(dict(graph.degree(nbunch = top_nodes)).items())

dict_items([(6028, 160), (8868, 119), (15959, 113)])


**4.2 Creation of the Transition Matrix**

In [41]:
#make sure the node indices are plain Python ints

ed_int_idx_list = [(int(x), int(y)) for (x, y) in ed_idx_list]
print(ed_int_idx_list[:5])

[(9023, 14880), (9023, 26881), (9023, 16740), (9023, 2245), (9023, 15319)]


In [42]:
# Create an empty dictionary with nodes as keys and an empty list as values
adjacency = {}

# The set() function is used to create a set of all unique nodes in the graph. 
# The adjacency dictionary is created by adding all the source and destination nodes of each edge to the set.

for n in set([e[0] for e in ed_int_idx_list] + [e[1] for e in ed_int_idx_list]):
    adjacency[n] = []
#print(adjacency)


In [43]:
# Iterate over the edges and add the target node to the adjacency list of the source node

for (n1, n2) in ed_int_idx_list:
    adjacency[n1].append(n2)
    adjacency[n2].append(n1)  # as the graph is undirected add the reverse edge as well

# Print the adjacency dictionary
print(dict(list(adjacency.items())[0:5]))

{0: [18240, 7648], 1: [22247, 6701, 1480, 19196, 24579, 13216, 13317, 11040, 11150, 14857, 23526, 25420, 14851, 10205, 14377, 20358, 7158, 25777, 3794, 19026, 4925, 22371, 7949, 6513, 5313], 2: [23852], 3: [7169], 4: [24536, 18697, 13684, 21972]}


In [44]:
#an empty list is created to store the transition probabilities.

transition_matrix = []
for n1 in adjacency:
    for n2 in adjacency[n1]:
        # For each pair (n1,n2) of adjacent nodes, a tuple with 3 values is appended.
        #The first value n2 is the destination node, the second the source node, 
        #and the third value is the probability of moving from node n1 to node n2
        transition_matrix.append((n2, n1, 1./len(adjacency[n1])))
transition_matrix[:10]

[(18240, 0, 0.5),
 (7648, 0, 0.5),
 (22247, 1, 0.04),
 (6701, 1, 0.04),
 (1480, 1, 0.04),
 (19196, 1, 0.04),
 (24579, 1, 0.04),
 (13216, 1, 0.04),
 (13317, 1, 0.04),
 (11040, 1, 0.04)]
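
To make the structure of these triples concrete, here is a small sketch on a four-node toy graph. It rebuilds the (destination, source, probability) triples from an adjacency dictionary and checks that the probabilities leaving each source node add up to 1. The helper name `transition_triples` and the toy adjacency are assumptions for illustration only.

```python
from collections import defaultdict

def transition_triples(adjacency):
    """Return (destination, source, probability) triples for a uniform random walk."""
    triples = []
    for source, neighbours in adjacency.items():
        for destination in neighbours:
            # a surfer on `source` picks each neighbour with equal probability
            triples.append((destination, source, 1.0 / len(neighbours)))
    return triples

# toy undirected graph, invented for illustration: edges 0-1, 0-2, 1-2, 2-3
toy_adjacency = {0: [1, 2], 1: [0, 2], 2: [0, 1, 3], 3: [2]}
toy_triples = transition_triples(toy_adjacency)

# the probabilities leaving each source node must add up to 1
outgoing = defaultdict(float)
for destination, source, probability in toy_triples:
    outgoing[source] += probability
print(dict(outgoing))
```

Storing only the non-zero entries as triples keeps the matrix sparse, which matters when the graph has tens of thousands of nodes.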

**4.3 Parallelization**

In [45]:
# Caching the result of the transformation is one of the optimization tricks to
# improve the performance of the long-running PySpark applications/jobs.

import findspark
findspark.init()
from pyspark.sql import SparkSession

sparkContext=spark.sparkContext
edges_rdd = sparkContext.parallelize(transition_matrix).cache()
edges_rdd.sortByKey().take(10)

#the result is a list of triples of the form (i, j, m_ij)

[(0, 7648, 0.5),
 (0, 18240, 0.5),
 (1, 1480, 0.04),
 (1, 3794, 0.041666666666666664),
 (1, 4925, 0.041666666666666664),
 (1, 5313, 1.0),
 (1, 6513, 0.041666666666666664),
 (1, 6701, 0.041666666666666664),
 (1, 7158, 0.041666666666666664),
 (1, 7949, 0.041666666666666664)]
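
Each triple (i, j, m_ij) contributes m_ij * v_j to the new score of node i, so one PageRank update is a "map" that emits (i, m_ij * v_j) followed by a per-key sum. The sketch below emulates that single step in plain Python on a toy path graph, without Spark; the function name `pagerank_step` and the data are assumptions for illustration. Note that the result is written back by index, so the vector stays aligned with the node ids regardless of the order in which the sums are produced.

```python
from collections import defaultdict

def pagerank_step(triples, rank):
    """One update v'_i = sum_j m_ij * v_j over (i, j, m_ij) triples."""
    contributions = defaultdict(float)
    for i, j, m_ij in triples:
        # "map" emits (i, m_ij * v_j); the += plays the role of reduceByKey
        contributions[i] += m_ij * rank[j]
    # write each sum at its own index so the vector stays aligned with node ids
    return [contributions[i] for i in range(len(rank))]

# toy triples for the path graph 0 - 1 - 2, invented for illustration
toy_path_triples = [(1, 0, 1.0), (0, 1, 0.5), (2, 1, 0.5), (1, 2, 1.0)]
toy_start = [1 / 3, 1 / 3, 1 / 3]
next_rank = pagerank_step(toy_path_triples, toy_start)
print(next_rank)
```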

In [46]:
# The initial vector represents the probability distribution of a random surfer starting in 
#any of the customer nodes. 

import numpy as np
n = len(num_of_nodes)
page_rank = np.ones(n)/n

# Set convergence threshold
threshold = 0.0001
max_rep = 500
# Initialize iteration count and residual error
iteration = 0
residual_error = float('inf')

# Run iterations until convergence
while residual_error > threshold and iteration < max_rep:
    # Calculate new page rank values 
    new_page_rank_values = edges_rdd.map(lambda i_j_mij: (i_j_mij[0], i_j_mij[2]*page_rank[i_j_mij[1]]))
                     
    new_page_rank_values = new_page_rank_values.reduceByKey(lambda a, b: a+b).collect()

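    # Calculate residual error
    residual_error = sum(abs(c - page_rank[i]) for (i, c) in new_page_rank_values)

    # collect() returns the (i, value) pairs in no guaranteed order, so each value
    # is written at its own index to keep page_rank aligned with the node ids
    updated_page_rank = np.zeros(n)
    for (i, c) in new_page_rank_values:
        updated_page_rank[i] = c
    page_rank = updated_page_rank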

    #increment iteration count
    iteration += 1

    # Print final results
print("Converged after", iteration, "iterations")
print(page_rank[:30])

Converged after 500 iterations
[3.70666197e-05 3.69146024e-05 3.48185448e-05 3.48631896e-05
 7.14783159e-05 3.48422312e-05 3.48083591e-05 3.48426897e-05
 3.47974385e-05 3.47758226e-05 3.48051743e-05 3.51569230e-05
 3.61238147e-05 3.48116515e-05 3.47916589e-05 3.48073982e-05
 3.48198802e-05 3.47982801e-05 3.48128496e-05 3.48773769e-05
 3.48190426e-05 3.50010041e-05 3.47689842e-05 3.48139688e-05
 3.48149974e-05 3.48079324e-05 1.47658409e-06 3.48185448e-05
 3.48631896e-05 4.57807281e-05]


**4.4 Teleport Variation**

Implemented in this way, PageRank is effective only if the graph is strongly connected and has neither dead ends nor spider traps. 
In the graph obtained, dead ends are not possible because each edge is bidirectional; however, the graph is not connected, which means that there could be spider traps. A spider trap is a set of nodes with no dead ends but no arcs
out. To avoid this problem, the calculation of PageRank is modified so that at each step the random surfer follows a link with probability beta and, with the small probability 1 - beta, is teleported to a random node. 
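
The teleport update can be written as v' = beta * M v + (1 - beta) / n. As a rough sketch under simplifying assumptions (nodes labelled 0..n-1, no dead ends, plain Python instead of Spark), the snippet below runs this iteration on a toy graph made of two disconnected components. The function name `pagerank_teleport`, its default tolerance and the toy graph are all illustrative choices, not the notebook's implementation.

```python
def pagerank_teleport(adjacency, beta=0.85, tol=1e-10, max_iter=1000):
    """Power iteration v <- beta * M v + (1 - beta) / n on an undirected graph."""
    n = len(adjacency)
    rank = [1.0 / n] * n
    for _ in range(max_iter):
        # every node starts from its teleport share
        new_rank = [(1.0 - beta) / n] * n
        for source, neighbours in adjacency.items():
            share = beta * rank[source] / len(neighbours)
            for destination in neighbours:
                new_rank[destination] += share
        # L1 distance between two successive vectors
        residual = sum(abs(a - b) for a, b in zip(new_rank, rank))
        rank = new_rank
        if residual < tol:
            break
    return rank

# toy graph, invented for illustration: components {0, 1, 2} and {3, 4}
toy_two_components = {0: [1, 2], 1: [0], 2: [0], 3: [4], 4: [3]}
toy_rank = pagerank_teleport(toy_two_components)
print([round(r, 4) for r in toy_rank])
```

The residual here compares two vectors that both already include the teleport term, so it goes to zero once the iteration has settled.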

In [47]:
edges_rdd = sparkContext.parallelize(transition_matrix).cache()

n = len(num_of_nodes)
page_rank_t = np.ones(n)/n

# Set convergence threshold
threshold = 0.0001
max_rep = 500
# Initialize iteration count and residual error
iteration = 0
residual_error = float('inf')

# The conventional value of beta is 0.85
beta = 0.85

# Run iterations until convergence
while residual_error > threshold and iteration < max_rep:
    # Calculate new page rank values 
    new_page_rank_values_t = edges_rdd.map(lambda i_j_mij: (i_j_mij[0], i_j_mij[2]*page_rank_t[i_j_mij[1]]))
                     
    new_page_rank_values_t = new_page_rank_values_t.reduceByKey(lambda a, b: a+b).collect()

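    # Apply the teleport term, writing each value at its own index
    # (collect() returns the (i, value) pairs in no guaranteed order)
    updated_page_rank_t = np.full(n, (1 - beta) / n)
    for (i, c) in new_page_rank_values_t:
        updated_page_rank_t[i] += beta * c

    # Calculate residual error between two successive PageRank vectors
    residual_error = np.abs(updated_page_rank_t - page_rank_t).sum()

    page_rank_t = updated_page_rank_t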

    #increment iteration count
    iteration += 1

    # Print final results
print("Converged after", iteration, "iterations")
print(page_rank_t[:30])

Converged after 500 iterations
[3.70006287e-05 3.69808822e-05 3.56331992e-05 3.56556137e-05
 6.67885463e-05 3.56445968e-05 3.56311188e-05 3.56451396e-05
 3.56261960e-05 3.56210164e-05 3.56304201e-05 3.57857788e-05
 3.65848866e-05 3.56313323e-05 3.56266330e-05 3.56349756e-05
 3.56340369e-05 3.56286290e-05 3.56353522e-05 3.56595108e-05
 3.56409313e-05 3.57301101e-05 3.56180653e-05 3.56350141e-05
 3.56363551e-05 3.56324800e-05 6.81949773e-06 3.58503670e-05
 3.58694194e-05 4.34013736e-05]


In [48]:
# Note the difference in the scores between PageRank scores and its variation 

diff_pagerank = page_rank - page_rank_t
diff_pagerank[:50]

array([ 6.59910179e-08, -6.62798551e-08, -8.14654405e-07, -7.92424162e-07,
        4.68976959e-06, -8.02365691e-07, -8.22759681e-07, -8.02449838e-07,
       -8.28757453e-07, -8.45193778e-07, -8.25245805e-07, -6.28855767e-07,
       -4.61071979e-07, -8.19680809e-07, -8.34974121e-07, -8.27577383e-07,
       -8.14156686e-07, -8.30348876e-07, -8.22502542e-07, -7.82133952e-07,
       -8.21888699e-07, -7.29105997e-07, -8.49081151e-07, -8.21045224e-07,
       -8.21357684e-07, -8.24547592e-07, -5.34291363e-06, -1.03182226e-06,
       -1.00622983e-06,  2.37935450e-06,  2.37943382e-06,  2.43936799e-06,
        2.32181926e-06, -1.67725546e-07, -9.98994484e-08, -7.97077152e-08,
       -8.38395567e-09, -6.26234132e-08, -1.04023959e-06,  1.19267450e-06,
        7.03899969e-07,  1.11774289e-06,  1.22010902e-06,  1.21560184e-06,
        1.05448713e-06,  1.20162793e-06,  1.20412156e-06,  1.19912964e-06,
        1.15974806e-06,  1.19774051e-06])

In [49]:
# Create a dictionary to store the PageRank scores (without teleport) for each node
pagerank_scores = {}
for i, node in enumerate(node_dict.keys()):
    pagerank_scores[node] = page_rank[i]

# Print the top 20 nodes with the highest PageRank scores
top_nodes = sorted(pagerank_scores.items(), key=lambda x: x[1], reverse=True)[:20]
for node, score in top_nodes:
    print("Node {}: PageRank score = {:.5f}".format(node, score))

Node 39851134: PageRank score = 0.00014
Node 14257719: PageRank score = 0.00011
Node 37315844: PageRank score = 0.00011
Node 48784470: PageRank score = 0.00011
Node 16276436: PageRank score = 0.00011
Node 9913444: PageRank score = 0.00011
Node 10372192: PageRank score = 0.00011
Node 22249337: PageRank score = 0.00009
Node 13331217: PageRank score = 0.00009
Node 12406773: PageRank score = 0.00008
Node 52974830: PageRank score = 0.00008
Node 21433522: PageRank score = 0.00008
Node 52363526: PageRank score = 0.00008
Node 16741753: PageRank score = 0.00008
Node 5448082: PageRank score = 0.00008
Node 44848235: PageRank score = 0.00008
Node 42277193: PageRank score = 0.00008
Node 1404310: PageRank score = 0.00008
Node 16455264: PageRank score = 0.00008
Node 25077527: PageRank score = 0.00008


In [50]:
# Create a dictionary to store the PageRank with teleport scores for each node
pagerank_t_scores = {}
for i, node in enumerate(node_dict.keys()):
    pagerank_t_scores[node] = page_rank_t[i]

# Print the top 20 nodes with the highest PageRank scores (with teleport)
top_nodes_t = sorted(pagerank_t_scores.items(), key=lambda x: x[1], reverse=True)[:20]
for node, score in top_nodes_t:
    print("Node {}: PageRank score = {:.5f}".format(node, score))

Node 39851134: PageRank score = 0.00013
Node 48784470: PageRank score = 0.00010
Node 14257719: PageRank score = 0.00010
Node 10372192: PageRank score = 0.00010
Node 16276436: PageRank score = 0.00009
Node 37315844: PageRank score = 0.00009
Node 9913444: PageRank score = 0.00009
Node 22249337: PageRank score = 0.00008
Node 13331217: PageRank score = 0.00008
Node 12406773: PageRank score = 0.00008
Node 21433522: PageRank score = 0.00007
Node 52974830: PageRank score = 0.00007
Node 5448082: PageRank score = 0.00007
Node 52363526: PageRank score = 0.00007
Node 1404310: PageRank score = 0.00007
Node 50339489: PageRank score = 0.00007
Node 16741753: PageRank score = 0.00007
Node 42277193: PageRank score = 0.00007
Node 2535508: PageRank score = 0.00007
Node 22539698: PageRank score = 0.00007


 <font size="5">**5. Introduction of the "helpfulness vector"**</font>

PageRank values are all pretty low, which is to be expected, as the exploratory analysis already showed that most customers have few reviews in common. This is also because the chosen sample is very small compared to the size of the original dataset. 
Furthermore, many nodes have exactly the same score. For this reason I introduce the variable "helpful_votes" to distinguish between customers who left useful reviews and those who didn't, in order to create a better ordering. 
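
As a small illustration of how the two signals can be combined, the sketch below adds each customer's share of the total helpful votes to their PageRank score and sorts by the result. The function name `combined_scores` and all the numbers are made up for the example.

```python
def combined_scores(pagerank, helpful_votes):
    """Add each customer's share of helpful votes to their PageRank score."""
    total_votes = sum(helpful_votes.values())
    scores = {}
    for customer in set(pagerank) | set(helpful_votes):
        # share of all helpful votes received by this customer (0 if there are no votes)
        helpfulness = helpful_votes.get(customer, 0) / total_votes if total_votes else 0.0
        scores[customer] = pagerank.get(customer, 0.0) + helpfulness
    return scores

# toy values, invented for illustration
toy_pagerank = {"c1": 0.5, "c2": 0.3, "c3": 0.2}
toy_votes = {"c1": 0, "c2": 6, "c3": 2}
toy_scores = combined_scores(toy_pagerank, toy_votes)
ranking = sorted(toy_scores, key=toy_scores.get, reverse=True)
print(ranking)
```

Because the helpfulness shares are not on the same scale as the PageRank scores, a plain sum lets whichever term is larger dominate the ordering; a weighted sum would be one way to balance them.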

In [51]:
# Group by customer_id and sum the helpful votes received by each customer
hv_df = sub_df.select("customer_id", "helpful_votes").groupBy("customer_id").sum("helpful_votes").alias("helpful_votes")
hv_df.show(10)

+-----------+------------------+
|customer_id|sum(helpful_votes)|
+-----------+------------------+
|   42706553|                 1|
|   19159455|                 0|
|   52808364|                 0|
|   12774562|                 6|
|   22295949|                 0|
|   47440766|                11|
|   12468782|                 0|
|   18658791|                 0|
|   22500595|                 0|
|   14340978|                 0|
+-----------+------------------+
only showing top 10 rows



In [52]:
#find the total number of times the reviews have been considered useful 

filtered_hv_df = hv_df.filter(col("customer_id").isin(nodes_list))
total_hv = filtered_hv_df.groupBy().sum("sum(helpful_votes)").collect()[0][0]
total_hv

42957

In [53]:
#calculate the helpfulness score for each customer  

filtered_hv_df = filtered_hv_df.withColumn('helpful_votes', col('sum(helpful_votes)') / total_hv)
filtered_hv_df.show(10)

+-----------+------------------+--------------------+
|customer_id|sum(helpful_votes)|       helpful_votes|
+-----------+------------------+--------------------+
|   52808364|                 0|                 0.0|
|   22295949|                 0|                 0.0|
|   47440766|                11|2.560700235118839...|
|   12468782|                 0|                 0.0|
|   14340978|                 0|                 0.0|
|   39528786|                 0|                 0.0|
|   23382296|                 1|2.327909304653490...|
|    7801327|                 0|                 0.0|
|   24225794|                 0|                 0.0|
|    2862763|                 0|                 0.0|
+-----------+------------------+--------------------+
only showing top 10 rows



In [54]:
#dictionary with the customer_id as key and the "helpfulness" as value
hv_dict = dict(filtered_hv_df.rdd.map(lambda x: (x[0], x[2])).collect())


In [55]:
#pagerank scores

pagerank_t_scores = {}
for i, node in enumerate(node_dict.keys()):
    pagerank_t_scores[node] = page_rank_t[i]

In [56]:
#add the helpfulness score to the PageRank score for each customer 

final_score = {k: pagerank_t_scores.get(k, 0) + hv_dict.get(k, 0) for k in set(pagerank_t_scores) | set(hv_dict)}
#final_score

In [57]:
# Print the top 20 nodes with the highest final score 

top_nodes_t = sorted(final_score.items(), key=lambda x: x[1], reverse=True)[:20]
for node, score in top_nodes_t:
    print("Node {}: Final score = {:.5f}".format(node, score))

Node 52855449: Final score = 0.05663
Node 12372811: Final score = 0.02976
Node 41866357: Final score = 0.02586
Node 45035381: Final score = 0.01486
Node 15208771: Final score = 0.01203
Node 53090839: Final score = 0.01128
Node 45843410: Final score = 0.01021
Node 51840028: Final score = 0.00953
Node 50695896: Final score = 0.00951
Node 18833993: Final score = 0.00921
Node 30669680: Final score = 0.00898
Node 35832624: Final score = 0.00865
Node 41766042: Final score = 0.00798
Node 31864628: Final score = 0.00779
Node 52860694: Final score = 0.00737
Node 32684861: Final score = 0.00709
Node 52810431: Final score = 0.00681
Node 51297632: Final score = 0.00600
Node 35360153: Final score = 0.00595
Node 51721371: Final score = 0.00567
