In [None]:
pip install findspark

In [None]:
%pip install pyspark

In [None]:

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel

#  1. Khởi tạo SparkSession ---
spark = SparkSession.builder \
    .appName("PPR_Corrected") \
    .master("local[1]") \
    .getOrCreate()
sc = spark.sparkContext

#  2. Dữ liệu đồ thị và tham số  ---
links_data = {
    1: [2, 3],       # P1 trỏ đến P2, P3. Out-degree = 2
    2: [],           # P2 là sink. Out-degree = 0
    3: [1, 2, 5],    # P3 trỏ đến P1, P2, P5. Out-degree = 3
    4: [5, 6],       # P4 trỏ đến P5, P6. Out-degree = 2
    5: [4, 6],       # P5 trỏ đến P4, P6. Out-degree = 2
    6: [4]           # P6 trỏ đến P4. Out-degree = 1
}
b_links = sc.broadcast(links_data)

nodes = sorted(list(links_data.keys()))
nodes_rdd = sc.parallelize(nodes, 1)

# Tham số 
source_node = 1
damping_factor = 0.85  # d 
teleport_prob = 1 - damping_factor # 1-d 

max_iter = 15
tol = 1e-6

#  3. Khởi tạo hạng (ranks) ---
# Khởi tạo P1=1, các nút khác = 0   
ranks = nodes_rdd.map(lambda u: (u, 1.0 if u == source_node else 0.0)) \
                 .persist(StorageLevel.MEMORY_ONLY)

#  4. Hiển thị ---

def print_header(nodes, source, teleport):
    """In tiêu đề cho bảng kết quả."""
    print(f"\nBắt đầu tính toán Personalized PageRank (Nguồn=P{source}, 1-d={teleport:.2f})")
    header = f" {'Vòng':^5} |" + "".join([f"      P{node}      |" for node in nodes]) + "  Thay đổi (Delta) "
    separator = "-" * len(header)
    print(separator)
    print(header)
    print(separator)

print_header(nodes, source_node, teleport_prob)

# In hạng ban đầu (Vòng 0)
init_ranks_list = ranks.sortByKey().collect()
rank_str = " |".join([f"   {pr:^11.5f}" for _, pr in init_ranks_list])
print(f" {0:^5} |{rank_str} |")

# --- 5. Vòng lặp PPR ---
for i in range(1, max_iter + 1):
    # Tính toán đóng góp từ các liên kết
    contribs = ranks.flatMap(
        lambda x: [(nbr, x[1] / len(b_links.value[x[0]]))
                   for nbr in b_links.value[x[0]]]
        if b_links.value[x[0]] else []
    ).reduceByKey(lambda a, b: a + b, numPartitions=1)

    # Áp dụng công thức Personalized PageRank 
    # new_rank = (thành phần teleport) + damping_factor * (tổng đóng góp từ liên kết)
    new_ranks = nodes_rdd.map(lambda u: (u, teleport_prob if u == source_node else 0.0)) \
        .leftOuterJoin(contribs) \
        .mapValues(lambda x: x[0] + damping_factor * (x[1] or 0.0)) \
        .persist(StorageLevel.MEMORY_ONLY)
    


    # Tính toán delta để kiểm tra hội tụ
    delta = ranks.join(new_ranks) \
                 .map(lambda x: abs(x[1][0] - x[1][1])) \
                 .sum()

    # In kết quả của vòng lặp
    current_ranks_list = new_ranks.sortByKey().collect()
    rank_str = " |".join([f"   {pr:^11.5f}" for _, pr in current_ranks_list])
    print(f" {i:^5} |{rank_str} |    {delta:8.2e}    ")

    # Chuẩn bị cho vòng lặp tiếp theo
    ranks.unpersist()
    ranks = new_ranks

    # Kiểm tra điều kiện dừng
    if delta < tol:
        print("-" * (len(rank_str) + 30))
        print(f"Đã hội tụ sau {i} vòng lặp (delta={delta:.2e})")
        break
else:
    print("-" * (len(rank_str) + 30))
    print(f"Đã đạt tối đa {max_iter} vòng lặp.")

# --- 6. In kết quả xếp hạng cuối cùng ---
#print("\nBẢNG XẾP HẠNG CUỐI CÙNG (So sánh với tài liệu ):")
#final_ranks_sorted = ranks.sortBy(lambda x: x[1], ascending=False).collect()
#for idx, (node, rank) in enumerate(final_ranks_sorted):
    #print(f"  {idx + 1}. Nút P{node}: {rank:.5f}")

spark.stop()