In [2]:
# تثبيت Java وSpark وFindSpark
!apt-get install openjdk-11-jdk -y
!wget -q https://downloads.apache.org/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar xf spark-3.4.1-bin-hadoop3.tgz
!pip install -q findspark

# إعداد المتغيرات البيئية
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

# بدء Spark
import findspark
findspark.init()


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  fonts-dejavu-core fonts-dejavu-extra libatk-wrapper-java
  libatk-wrapper-java-jni libxt-dev libxtst6 libxxf86dga1 openjdk-11-jre
  x11-utils
Suggested packages:
  libxt-doc openjdk-11-demo openjdk-11-source visualvm mesa-utils
The following NEW packages will be installed:
  fonts-dejavu-core fonts-dejavu-extra libatk-wrapper-java
  libatk-wrapper-java-jni libxt-dev libxtst6 libxxf86dga1 openjdk-11-jdk
  openjdk-11-jre x11-utils
0 upgraded, 10 newly installed, 0 to remove and 35 not upgraded.
Need to get 6,920 kB of archives.
After this operation, 16.9 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/main amd64 fonts-dejavu-core all 2.37-2build1 [1,041 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy/main amd64 fonts-dejavu-extra all 2.37-2build1 [2,041 kB]
Get:3 http://archive.ubuntu.com/ubuntu jam

In [4]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session
spark = (
    SparkSession.builder
    .appName("SocialGraph")
    .getOrCreate()
)
sc = spark.sparkContext

# Read the data file as an RDD of text lines
lines = sc.textFile("friends_common.txt")

# Parse each line into (user_id, name, [friend_ids])
def parse_line(line):
    """Parse one whitespace-separated record of the friends file.

    The first field is the integer user id, the second is the name, and the
    optional third field is a comma-separated list of integer friend ids.

    Args:
        line: One raw text line from the input file.

    Returns:
        A ``(user_id, name, friends)`` tuple, where ``friends`` is a list of
        ints (empty when the third field is absent), or ``None`` for lines
        that carry no record: comment lines starting with '#' and blank or
        whitespace-only lines.

    Raises:
        IndexError: If a non-blank, non-comment line has fewer than two fields.
        ValueError: If the user id or a friend id is not an integer.
    """
    record = line.strip()
    # Blank lines would otherwise fail on parts[0]; treat them like comment
    # lines so the caller's None filter drops them.
    if not record or record.startswith('#'):
        return None
    parts = record.split()
    user_id = int(parts[0])
    name = parts[1]
    friends = list(map(int, parts[2].split(','))) if len(parts) > 2 else []
    return (user_id, name, friends)

# Parse every line, then drop the None placeholders left by skipped lines
users_rdd = (
    lines
    .map(parse_line)
    .filter(lambda record: record is not None)
)


# Show a sample of the data
users_rdd.take(5)

[(1, 'Sidi', [2, 3, 4]),
 (2, 'Mohamed', [1, 3, 5]),
 (3, 'Aicha', [1, 2, 4, 6]),
 (4, 'Ahmed', [1, 3]),
 (5, 'Leila', [2])]

In [5]:
# Step: generate the candidate friend pairs.
# Each input element is (user_id, name, friends_list);
# it is turned into ((user_id, friend_id), friends_of_user_id) entries.

def generate_friend_pairs(user_tuple):
    """Emit one ``(pair_key, friend_set)`` entry per friend of a user.

    Args:
        user_tuple: A ``(user_id, name, friends)`` record; the name is unused.

    Returns:
        A list with one entry per id in ``friends``. Each key holds the user
        id and the friend id in ascending order, and each value is a new set
        built from the user's friend list (a set so the lists can be
        intersected later).
    """
    user_id, _name, friends = user_tuple
    return [
        (tuple(sorted((user_id, friend_id))), set(friends))
        for friend_id in friends
    ]

# Expand every user record into its (pair_key, friend_set) entries
friend_pairs_rdd = users_rdd.flatMap(generate_friend_pairs)

# Show a sample
friend_pairs_rdd.take(5)


[((1, 2), {2, 3, 4}),
 ((1, 3), {2, 3, 4}),
 ((1, 4), {2, 3, 4}),
 ((1, 2), {1, 3, 5}),
 ((2, 3), {1, 3, 5})]

In [6]:
# Combine the friend sets emitted for each pair key, e.g. (1, 2), (1, 3), ...,
# by intersecting them.
# NOTE(review): a pair that only one of its two users lists receives a single
# set, so it is presumably passed through un-intersected. This assumes
# friendships are listed symmetrically in the input — confirm against the data.
grouped_pairs_rdd = friend_pairs_rdd.reduceByKey(lambda a, b: a & b)

# Flatten to (user_a, user_b, common_friends).
# sorted() instead of list(): set iteration order is not guaranteed, so this
# keeps the displayed result deterministic across runs.
common_friends_rdd = grouped_pairs_rdd.map(
    lambda pair_and_common: (
        pair_and_common[0][0],
        pair_and_common[0][1],
        sorted(pair_and_common[1]),
    )
)

# Show a sample
common_friends_rdd.take(5)


[(1, 3, [2, 4]), (1, 2, [3]), (1, 4, [3]), (2, 3, [1]), (2, 5, [])]

In [7]:
# Extract the common friends of users 1 and 2 only
target_pair = (1, 2)

# Pair keys are stored as (smaller_id, larger_id), so normalise the target
# before comparing; otherwise a target such as (2, 1) would never match.
pair_key = tuple(sorted(target_pair))

# Keep only the row for this pair
result = common_friends_rdd.filter(lambda x: (x[0], x[1]) == pair_key).collect()

# Look the two names up from the user records instead of hard-coding them,
# so the labels stay correct when target_pair is changed.
pair_names = dict(
    users_rdd
    .filter(lambda user: user[0] in pair_key)
    .map(lambda user: (user[0], user[1]))
    .collect()
)

# Show the result
for u1, u2, common in result:
    print(f"{u1}<{pair_names.get(u1, 'Unknown')}> {u2}<{pair_names.get(u2, 'Unknown')}> {common}")


1<Sidi> 2<Mohamed> [3]


In [8]:
# Keep only the (user_id, name) part of each user record
id_name_rdd = users_rdd.map(lambda user: user[:2])

# Collect it into a local dict for later lookups on the driver
id_to_name = dict(id_name_rdd.collect())


In [9]:
# Use the dict to attach names to the ids; ids missing from it show "Unknown"
for u1, u2, common in result:
    name1, name2 = (id_to_name.get(uid, "Unknown") for uid in (u1, u2))
    print(f"{u1}<{name1}> {u2}<{name2}> {common}")


1<Sidi> 2<Mohamed> [3]
