<a href="https://colab.research.google.com/github/lynguyenp/Link_Analysis_AMD/blob/main/Link_Analysis_AMD_Ly.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **1. SETTING UP THE WORKING ENVIRONMENT**

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz
!tar xf spark-3.4.0-bin-hadoop3.tgz

!pip install pyspark
!pip install findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.0-bin-hadoop3"
os.environ["HADOOP_HOME"] = os.environ["SPARK_HOME"]

import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Start Spark session
spark = SparkSession.builder.appName("LINKANALYSIS").getOrCreate()

from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=43d1adac7245de0e6462db81a8463c9b30435f4dd731a896bfe28b845e407fe5
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
# IMPORTING LIBRARIES
import zipfile
import glob
import itertools
import networkx as nx
import matplotlib.pyplot as plt
from functools import reduce
from pyspark.sql.functions import lit, collect_set, col, broadcast, expr, collect_list, countDistinct, count
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import ArrayType, StructType, StructField, StringType

# **2. LOADING & PREPROCESSING DATA**


In [None]:
os.environ['KAGGLE_USERNAME'] = "phuonglynguyen"
os.environ['KAGGLE_KEY'] = "48e3a8753b051b512a1ec2a8a094531a"
!kaggle datasets download -d cynthiarempel/amazon-us-customer-reviews-dataset

Downloading amazon-us-customer-reviews-dataset.zip to /content
100% 21.0G/21.0G [04:25<00:00, 57.4MB/s]
100% 21.0G/21.0G [04:25<00:00, 84.6MB/s]


In [None]:
# UNZIP AND EXTRACT ALL THE FILES

# Location of the downloaded archive and the folder it is unpacked into
zip_path = '/content/amazon-us-customer-reviews-dataset.zip'
extract_path = '/content/customer-review'

# Unpack every member of the archive; the context manager closes the
# archive handle once extraction has finished
with zipfile.ZipFile(zip_path, mode='r') as zip_ref:
    zip_ref.extractall(path=extract_path)


In [None]:
# CREATE THE FINAL DATAFRAME

# Every extracted TSV file
directory = glob.glob('/content/customer-review/*.tsv')

# Keep only the files whose path contains 'Beauty'
dfiles = [f for f in directory if 'Beauty' in f]
if not dfiles:
    # Fail here with a clear message instead of a NameError further down
    raise FileNotFoundError("No 'Beauty' .tsv file found in /content/customer-review")

# Creating a dataframe: read EVERY matching file, then stack them all.
# (Building the list inside a loop body would reset it on each pass and keep
# only the last file.)
datasets = [
    spark.read.csv(file, sep='\t', header=True, inferSchema=True)
         .select(['customer_id', 'product_id', 'product_title'])
    for file in dfiles
]
product_dataframe = reduce(lambda df1, df2: df1.unionAll(df2), datasets)
row_count = product_dataframe.count()
print("Total number of rows:", row_count)

# Dropping duplicates
product_dataframe = product_dataframe.dropDuplicates()
row_count = product_dataframe.count()
print("Total number of rows after dropping duplicates:", row_count)

# Dropping customers who have fewer than two reviews
product_count = product_dataframe.groupby('customer_id').count().withColumnRenamed('count', 'product_count')
# Only the ids of qualifying customers are broadcast; filtering before the
# inner join yields the same rows as filtering after it, with a smaller
# broadcast table.
repeat_customers = product_count.filter(col('product_count') > 1).select('customer_id')
product_dataframe = product_dataframe.join(broadcast(repeat_customers), 'customer_id')

row_count = product_dataframe.count()
print("The final total number of rows:", row_count) # 3.143.789

product_dataframe.show(20)

Total number of rows: 5115666
Total number of rows after dropping duplicates: 5115138
The final total number of rows: 3143789
+-----------+----------+--------------------+
|customer_id|product_id|       product_title|
+-----------+----------+--------------------+
|   44311418|B000C1UAX4|Paradise Eau de P...|
|   26227724|B00016XJ4M|Thayers Alcohol-f...|
|   41745952|B00HJD8NLY|Even Glow Serum V...|
|   44605318|B004UAM2D4|Millennium Moms P...|
|   22851022|B005OYTSG4|Ema Jane - Assort...|
|   13666852|B00ADQAIQC|High Beams Intens...|
|   16398387|B00W1WWQ98|Nero Women's Hand...|
|   21085451|B00IH0B2M0|Tree Hut Firming ...|
|   30067162|B0048I3B9C|Victoria's Secret...|
|   30102189|B00VKEP3Z2|Primal Pit Paste ...|
|   15280269|B00URN2ZNK|Philips Sonicare ...|
|   51832538|B004FSXYOC|INFINITE SKIN CLA...|
|    8196048|B003P7VX24|NaturOli Soap Nut...|
|    1063926|B006R8AB0I|Bath and Body Wor...|
|   17616366|B000LNHBLW|Wernets Poligrip ...|
|   10595946|B006KA23MM|Bundle Monster 40...|


# **3. CONSTRUCTING PRODUCT LINKS**

In [None]:
# One row per customer, holding the set of distinct product ids that the
# customer appears with in product_dataframe
grouped_product = (
    product_dataframe
    .groupBy("customer_id")
    .agg(collect_set("product_id").alias("grouped_product"))
)

# Peek at the first ten customers
grouped_product.take(10)

[Row(customer_id=10128, grouped_product=['B008Q8XJKQ', 'B008Q8XIVQ', 'B00DEX61F8']),
 Row(customer_id=10206, grouped_product=['B00F90Y21O', 'B00ACN2Q84']),
 Row(customer_id=10236, grouped_product=['B003842346', 'B00761YIQ8', 'B00BMHWSPA']),
 Row(customer_id=10266, grouped_product=['B005FOPQT6', 'B0077PLO06', 'B00140RXHS']),
 Row(customer_id=10293, grouped_product=['B00LLAYCXE', 'B00OQAZ9FG', 'B00NM7NU6E', 'B00OQAZGL8', 'B00OQAZ9TM']),
 Row(customer_id=10348, grouped_product=['B00NTR9B6A', 'B003SZ4C1W', 'B003IU9HI0', 'B00027EG9C']),
 Row(customer_id=10368, grouped_product=['B00BISHCC2', 'B003AGK628', 'B003HL9TMY', 'B004TSFEBY']),
 Row(customer_id=10485, grouped_product=['B006NBBOIC', 'B00DCBHI2W', 'B007EEXE5S']),
 Row(customer_id=10517, grouped_product=['B004YDWNHC', 'B000C1VX6M']),
 Row(customer_id=10562, grouped_product=['B000NLJFNY', 'B0061DPB04'])]

In [None]:
# Creating product links

# Return type of the UDF: an array of (product_1, product_2) string pairs
schema = ArrayType(
    StructType([
        StructField("product_1", StringType()),
        StructField("product_2", StringType()),
    ])
)


def _ordered_pairs(products):
    """Return every ordered 2-permutation of `products` as a list of tuples.

    Both (a, b) and (b, a) are produced, so each pair becomes a link in
    both directions.
    """
    return list(itertools.permutations(products, 2))


product_links_udf = F.udf(_ordered_pairs, schema)

# Explode the per-customer list of pairs into one row per pair, then flatten
# the struct into the two columns product_1 / product_2
pair_column = F.explode(product_links_udf(F.col("grouped_product")))
product_links = (
    grouped_product
    .withColumn("grouped_product", pair_column)
    .selectExpr("grouped_product.*")
)

product_links.show(10)

+----------+----------+
| product_1| product_2|
+----------+----------+
|B008Q8XJKQ|B008Q8XIVQ|
|B008Q8XJKQ|B00DEX61F8|
|B008Q8XIVQ|B008Q8XJKQ|
|B008Q8XIVQ|B00DEX61F8|
|B00DEX61F8|B008Q8XJKQ|
|B00DEX61F8|B008Q8XIVQ|
|B00F90Y21O|B00ACN2Q84|
|B00ACN2Q84|B00F90Y21O|
|B003842346|B00761YIQ8|
|B003842346|B00BMHWSPA|
+----------+----------+
only showing top 10 rows



# **4. CONSTRUCTING THE TRANSITION MATRIX**

In [None]:
# Number of link rows leaving each source product (one row per distinct 'product_1')
count_product_1 = (
    product_links
    .groupBy('product_1')
    .agg(count("*").alias("count"))
)

# Attach each source product's link count to every one of its link rows
mid_transition_matrix = product_links.join(count_product_1, 'product_1')

# Every link row out of a product gets probability 1 / (link rows leaving that product)
transition_matrix = (
    mid_transition_matrix
    .withColumn("probability", 1 / col('count'))
    .drop('count')
)

transition_matrix.show(20)

+----------+----------+--------------------+
| product_1| product_2|         probability|
+----------+----------+--------------------+
|1304139220|B0050PQGNK|0.012195121951219513|
|1304139220|1304495396|0.012195121951219513|
|1304139220|B00D0NMMSM|0.012195121951219513|
|1304139220|B005F7IWFI|0.012195121951219513|
|1304139220|B00CL3GUAI|0.012195121951219513|
|1304139220|1304174867|0.012195121951219513|
|1304139220|B004FF93Q8|0.012195121951219513|
|1304139220|1304622193|0.012195121951219513|
|1304139220|B0047EC83W|0.012195121951219513|
|1304139220|B00AEX9JBY|0.012195121951219513|
|1304139220|B007QE1JKI|0.012195121951219513|
|1304139220|B004XQXST2|0.012195121951219513|
|1304139220|B003G9PMHI|0.012195121951219513|
|1304139220|B00CK2L21W|0.012195121951219513|
|1304139220|B003TA7MZ4|0.012195121951219513|
|1304139220|130414674X|0.012195121951219513|
|1304139220|B004AS7JV6|0.012195121951219513|
|1304139220|B004AHF1O4|0.012195121951219513|
|1304139220|B003TMO3EU|0.012195121951219513|
|130413922

# **5. PAGE RANK CALCULATION**

In [None]:
# INITIAL VALUE

# n = number of distinct products that appear as a link target
n = transition_matrix.select("product_2").distinct().count()

# Uniform starting rank: every product begins with the same share, 1/n
initial_value = 1.0 / n

# One row per distinct target product, carrying the starting rank
page_rank = (
    transition_matrix
    .select('product_2')
    .distinct()
    .withColumn('page_rank', lit(initial_value))
    .withColumnRenamed('product_2', 'product')
)

page_rank.show(10)

+----------+--------------------+
|   product|           page_rank|
+----------+--------------------+
|B003TMO3EU|2.178592280811831E-6|
|B0091JI3YG|2.178592280811831E-6|
|B0067F28ZW|2.178592280811831E-6|
|B00TSQX6BW|2.178592280811831E-6|
|B00SDPCKFC|2.178592280811831E-6|
|B00VR0WOFQ|2.178592280811831E-6|
|B002UEH3M2|2.178592280811831E-6|
|B00TRLGT5S|2.178592280811831E-6|
|B000B626ZK|2.178592280811831E-6|
|B005IDV0XU|2.178592280811831E-6|
+----------+--------------------+
only showing top 10 rows



In [None]:
# Power iteration: push rank along the links until the ranks stop moving.
tolerance = 2.5e-9     # stop once the summed squared change falls below this
max_iterations = 30    # hard cap on the number of iterations

# NOTE(review): no damping / teleport term is applied below; each update is a
# plain multiplication by the transition matrix. Confirm that this is intended.

previous_cache = None  # cached ranks of the previous iteration, released once superseded

# range(1, max_iterations + 1) allows exactly max_iterations passes
# (a `while iteration < max_iterations` starting at 1 stops one pass short).
for iteration in range(1, max_iterations + 1):
  # New rank of a product = sum over incoming links of (source rank * link probability).
  # Cached because it is consumed twice: by the distance check and by the next iteration.
  mid_page_rank = (transition_matrix
            .join(page_rank, transition_matrix.product_1 == page_rank.product)
            .withColumn('mid_page_rank', (col('page_rank') * col('probability')).cast('double'))
            .groupBy(transition_matrix.product_2)
            .agg(F.sum('mid_page_rank').alias('mid_page_rank'))
            .cache())

  # Sum of squared differences between the new and the previous ranks
  # (a square is never negative, so no abs() is needed).
  la_distance = (mid_page_rank
            .join(page_rank, mid_page_rank.product_2 == page_rank.product)
            .withColumn('distance', (col('mid_page_rank') - col('page_rank'))**2)
            .agg(F.sum('distance').alias('total_distance'))
            .collect()[0]['total_distance'])
  print(f"Iteration: {iteration}, Total_distance: {la_distance}")

  # The new ranks become the current ranks whether or not we stop here.
  page_rank = mid_page_rank.withColumnRenamed('mid_page_rank', 'page_rank').withColumnRenamed('product_2', 'product')

  # The collect() above materialised the new cache, so the older one can go.
  if previous_cache is not None:
    previous_cache.unpersist()
  previous_cache = mid_page_rank

  if la_distance < tolerance:
    break


Iteration: 1, Total_distance: 2.1678718924855613e-05
Iteration: 2, Total_distance: 1.6249310051065845e-06
Iteration: 3, Total_distance: 2.741249437985978e-07
Iteration: 4, Total_distance: 1.297461313995385e-07
Iteration: 5, Total_distance: 6.70618846475035e-08
Iteration: 6, Total_distance: 3.7546934281090224e-08
Iteration: 7, Total_distance: 2.2335058989531047e-08
Iteration: 8, Total_distance: 1.4003116480200776e-08
Iteration: 9, Total_distance: 9.249650690592226e-09
Iteration: 10, Total_distance: 6.4855648469641665e-09
Iteration: 11, Total_distance: 4.775576457943916e-09
Iteration: 12, Total_distance: 3.719517428047669e-09
Iteration: 13, Total_distance: 3.010149317495592e-09
Iteration: 14, Total_distance: 2.5430383630146263e-09
Iteration: 15, Total_distance: 2.203405343446187e-09


In [None]:
# Display the first ten rows of the page_rank dataframe
page_rank.show(10)

+----------+--------------------+
|   product|           page_rank|
+----------+--------------------+
|B003TMO3EU|2.478929763914152E-5|
|B0091JI3YG|7.071317469191621E-6|
|B0067F28ZW|2.040237077181946E-4|
|B00RV6MF4A|5.911311401567683E-7|
|B00MW7U4FU|4.758264399925538E-6|
|B004HSO124|8.401630469703312E-5|
|B0018SA31O|4.127750619886106...|
|B00H7L5ZRS|1.025543896908267...|
|B00DO77SUG|5.342078913194232E-7|
|B003Z4QHP0| 4.33998360583395E-5|
+----------+--------------------+
only showing top 10 rows



In [None]:
# Sort the DataFrame by "page_rank" column in ascending order
sorted_page_rank = page_rank.orderBy(col("page_rank"))

# Select the top five products by page_rank.
# In an ascending sort the highest ranks are the LAST rows, so take the tail
# and reverse it to list the best product first.
top_five = sorted_page_rank.tail(5)[::-1]
print("Top five products:")
for row in top_five:
    print(row)

# Select the bottom five products by page_rank.
# The lowest ranks are the FIRST rows of the ascending sort.
bottom_five = sorted_page_rank.head(5)
print("Bottom five products:")
for row in bottom_five:
    print(row)

Top five products:
Row(product='B0049LUI9O', page_rank=0.0005670479421548141)
Row(product='B0043OYFKU', page_rank=0.0006307481388758013)
Row(product='B00DPE9EQO', page_rank=0.0006551653014446416)
Row(product='B001MA0QY2', page_rank=0.000676126380261458)
Row(product='B0014P8L9W', page_rank=0.0007073837925858547)
Bottom five products:
Row(product='B00GBEUZGI', page_rank=1.361561776350261e-08)
Row(product='B00CUYZ5Y0', page_rank=1.3716519558940939e-08)
Row(product='B00V5FMWIM', page_rank=1.4124060951145538e-08)
Row(product='B0006IVMG2', page_rank=1.4148257192682262e-08)
Row(product='B000E9BZGU', page_rank=1.4197607818895075e-08)
