In [5]:
from pyspark.sql import SparkSession
from graphframes import GraphFrame

# 1. Membuat SparkSession dengan GraphFrames package sudah terpasang
spark = SparkSession.builder \
    .appName("AmazonCoPurchasePageRank") \
    .config("spark.jars.packages", "graphframes:graphframes:0.8.1-spark3.0-s_2.12") \
    .getOrCreate()



In [11]:
# 1 Baca file mtx sebagai RDD baris per baris
rdd_raw = spark.sparkContext.textFile("com-Amazon.mtx")

# Buang baris komentar (biasanya diawali '%')
rdd_data = rdd_raw.filter(lambda line: not line.startswith('%'))

# Tampilkan beberapa baris pertama
rdd_data.take(10)


['334863 334863 925872',
 '53526 1',
 '71632 1',
 '98006 1',
 '148224 1',
 '209320 1',
 '268299 1',
 '270060 1',
 '302148 1',
 '16283 2']

In [12]:
# 2 Pisahkan baris jadi kolom
edges_rdd = rdd_data.map(lambda line: line.strip().split())

# Ambil src dan dst saja (abaikan kolom ke-3 kalau ada)
edges_rdd = edges_rdd.map(lambda cols: (int(cols[0]), int(cols[1])))

# Buat DataFrame edges
edges_df = edges_rdd.toDF(["src", "dst"])
edges_df.show(10)


+------+------+
|   src|   dst|
+------+------+
|334863|334863|
| 53526|     1|
| 71632|     1|
| 98006|     1|
|148224|     1|
|209320|     1|
|268299|     1|
|270060|     1|
|302148|     1|
| 16283|     2|
+------+------+
only showing top 10 rows



In [13]:
# 3 Ambil semua id unik dari src dan dst
vertices_rdd = edges_rdd.flatMap(lambda x: [x[0], x[1]]).distinct()

# Konversi jadi DataFrame vertices
vertices_df = vertices_rdd.map(lambda x: (x,)).toDF(["id"])
vertices_df.show(10)


+------+
|    id|
+------+
| 53526|
| 71632|
| 98006|
|148224|
|209320|
|270060|
|302148|
|     2|
|181370|
|200002|
+------+
only showing top 10 rows



In [10]:
# 4 dan 5 buat graph dan lakukan pagerank
from graphframes import GraphFrame

# Buat Graph
graph = GraphFrame(vertices_df, edges_df)

# Jalankan PageRank
result = graph.pageRank(resetProbability=0.15, maxIter=10)

# Tampilkan hasil PageRank
result.vertices.select("id", "pagerank").orderBy("pagerank", ascending=False).show(10)


+-----+------------------+
|   id|          pagerank|
+-----+------------------+
|27287| 60.52333936844408|
|12994| 48.50092854672143|
| 6410| 46.19725016853048|
| 3958|42.230366050738134|
| 1991|41.796636413874964|
|  832| 41.66108216095725|
|33532|41.206145538503925|
| 4493| 40.56672444690634|
|11097|40.157690738311125|
|23200|39.742303647024144|
+-----+------------------+
only showing top 10 rows



In [None]:
Penjelasan tiap langkah:
1. Membuat SparkSession
Membuat SparkSession baru dengan konfigurasi yang sudah memuat library GraphFrames.
graphframes:graphframes:0.8.1-spark3.0-s_2.12 adalah versi GraphFrames yang kompatibel.

2. Membaca Data
Kamu harus punya dataset edge (pasangan produk yang co-purchased).
Biasanya berupa file CSV yang tiap baris berisi dua kolom: src dan dst (produk sumber dan produk tujuan).
Kalau pakai .mtx, biasanya perlu konversi dulu ke format CSV.

3. Membuat Vertices
Vertices adalah produk unik yang muncul di sisi src dan dst.
Ambil kolom src dan dst, gabungkan, lalu buat unik (distinct).

4. Membuat GraphFrame
GraphFrame dibuat dari vertices dan edges DataFrame.

5. Menjalankan PageRank
Panggil method .pageRank() pada GraphFrame, atur damping factor (resetProbability) dan jumlah iterasi (maxIter).

6. Ambil dan urutkan hasil
Ambil dataframe vertices yang sudah diberi kolom pagerank oleh GraphFrame, lalu urutkan berdasarkan PageRank menurun.

In [14]:
# menggabungkan pagerank dan community
# A. Load komunitas
communities_all_df = spark.read.text("com-Amazon_Communities_all.mtx")

# Parse isi: pastikan formatnya sesuai (misalnya dua kolom node dan komunitas)
communities_all_rdd = communities_all_df.rdd \
    .filter(lambda row: not row.value.startswith('%')) \
    .map(lambda row: row.value.split()) \
    .map(lambda parts: (int(parts[0]), int(parts[1])))

communities_df = communities_all_rdd.toDF(["id", "community_id"])


In [16]:
# Merge dengan community_all
# Ambil hasil PageRank dalam bentuk DataFrame
pagerank_df = results.vertices  

# Gabungkan dengan komunitas
pagerank_with_community = pagerank_df.join(communities_df, on="id")

# Tampilkan hasil akhir
pagerank_with_community.orderBy("pagerank", ascending=False).show(10)



+-----+-----------------+------------+
|   id|         pagerank|community_id|
+-----+-----------------+------------+
|27287|60.52333936845344|       38085|
|27287|60.52333936845344|        3875|
|27287|60.52333936845344|       13168|
|27287|60.52333936845344|        3876|
|27287|60.52333936845344|       70859|
|27287|60.52333936845344|        3877|
|27287|60.52333936845344|       25155|
|27287|60.52333936845344|       38086|
|27287|60.52333936845344|       67734|
|27287|60.52333936845344|       67735|
+-----+-----------------+------------+
only showing top 10 rows



In [None]:
Produk 27287 adalah:

Produk paling sentral atau penting dalam jaringan (dilihat dari PageRank tertinggi),

Dan dia menjembatani atau hadir di banyak komunitas pembelian, artinya:

Produk ini sangat umum,

Bisa juga bersifat "penghubung" antar berbagai kelompok produk.



In [17]:
# menggabungkan pagerank dan community
# A. Load komunitas
communities_all_df = spark.read.text("com-Amazon_Communities_top5000.mtx")

# Parse isi: pastikan formatnya sesuai (misalnya dua kolom node dan komunitas)
communities_all_rdd = communities_all_df.rdd \
    .filter(lambda row: not row.value.startswith('%')) \
    .map(lambda row: row.value.split()) \
    .map(lambda parts: (int(parts[0]), int(parts[1])))

communities_df = communities_all_rdd.toDF(["id", "community_id"])


In [34]:
# Merge dengan community_all
# Ambil hasil PageRank dalam bentuk DataFrame
pagerank_df = results.vertices  

# Gabungkan dengan komunitas
pagerank_with_community = pagerank_df.join(communities_df, on="id")

# Tampilkan hasil akhir
pagerank_with_community.orderBy("pagerank", ascending=False).show(100)



+-----+------------------+------------+
|   id|          pagerank|community_id|
+-----+------------------+------------+
| 7290|13.922389811656725|        3981|
| 7290|13.922389811656725|        3145|
| 7290|13.922389811656725|        1629|
| 7290|13.922389811656725|         687|
| 7290|13.922389811656725|         681|
| 6412|11.677526191440467|        1613|
| 6412|11.677526191440467|         593|
| 6412|11.677526191440467|         578|
|20129|11.542265499052812|        3985|
|20129|11.542265499052812|        3334|
|20129|11.542265499052812|        3148|
|20129|11.542265499052812|        1630|
|20129|11.542265499052812|         694|
|20129|11.542265499052812|         693|
|18153|11.494478183537911|        3982|
|18153|11.494478183537911|        3744|
|18153|11.494478183537911|        3144|
|18153|11.494478183537911|        1157|
|18153|11.494478183537911|         650|
|18153|11.494478183537911|         649|
|15429|10.689605718626723|        3985|
|15429|10.689605718626723|        3334|


In [28]:
# Load data
nodeid_df = spark.read.text("com-Amazon_nodeid.mtx")

# Parse isi
nodeid_rdd = nodeid_df.rdd \
    .filter(lambda row: not row.value.startswith('%')) \
    .map(lambda row: row.value.split()) \
    .filter(lambda parts: len(parts) >= 2) \
    .map(lambda parts: (int(parts[0]), parts[1]))

nodeid_df_clean = nodeid_rdd.toDF(["id", "product_code"])


In [29]:
results = graph.pageRank(resetProbability=0.15, maxIter=10)
pagerank_df = results.vertices.select("id", "pagerank")


In [30]:
# Show final 
final_results = pagerank_df.join(nodeid_df_clean, on="id")
final_results.orderBy("pagerank", ascending=False).show(10)


+------+------------------+------------+
|    id|          pagerank|product_code|
+------+------------------+------------+
|334863|0.6112374546035292|           1|
+------+------------------+------------+



In [31]:
nodeid_df.show(10, truncate=False)


+--------------------------------------------------------------------------------+
|value                                                                           |
+--------------------------------------------------------------------------------+
|%%MatrixMarket matrix array real general                                        |
|%-------------------------------------------------------------------------------|
|% SuiteSparse Matrix Collection, Tim Davis                                      |
|% https://sparse.tamu.edu/SNAP/com-Amazon                                       |
|% name: SNAP/com-Amazon : nodeid matrix                                         |
|%-------------------------------------------------------------------------------|
|334863 1                                                                        |
|1                                                                               |
|2                                                                               |
|4  

In [33]:
print("Jumlah ID unik di PageRank:", pagerank_df.select("id").distinct().count())
print("Jumlah ID unik di NodeID:", nodeid_df_clean.select("id").distinct().count())

# Cek ID yang cocok:
matched_ids = pagerank_df.join(nodeid_df_clean, on="id")
print("Jumlah ID yang cocok:", matched_ids.count())


Jumlah ID unik di PageRank: 334863
Jumlah ID unik di NodeID: 1
Jumlah ID yang cocok: 1


In [36]:
combined_df = pagerank_df \
    .join(communities_df, on="id") \
    .join(nodeid_df_clean, on="id", how="left")  # pakai left join jika tidak semua ada di nodeid


In [37]:
combined_df.filter(combined_df.id == 334864).show()


+---+--------+------------+------------+
| id|pagerank|community_id|product_code|
+---+--------+------------+------------+
+---+--------+------------+------------+



In [38]:
pagerank_df.filter(pagerank_df.id == 334864).show()


+---+--------+
| id|pagerank|
+---+--------+
+---+--------+



In [42]:
communities_df.filter(communities_df.id == 334864).show()


+---+------------+
| id|community_id|
+---+------------+
+---+------------+



In [43]:
combined_df = nodeid_df_clean \
    .join(pagerank_df, on="id", how="left") \
    .join(communities_df, on="id", how="left")


In [44]:
combined_df.filter(combined_df.id == 334864).show()


+---+------------+--------+------------+
| id|product_code|pagerank|community_id|
+---+------------+--------+------------+
+---+------------+--------+------------+

