In [2]:
# ===== FIXED SETUP FOR JUPYTER =====
import os
import sys

# Clear any existing Spark environment variables
spark_env_vars = ['SPARK_HOME', 'SPARK_LOCAL_DIRS', 'SPARK_CONF_DIR']
for var in spark_env_vars:
    if var in os.environ:
        del os.environ[var]

# Set clean environment
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Import and create SparkContext (only once!)
from pyspark import SparkContext, SparkConf

# Check if SparkContext already exists
try:
    # Try to access existing SparkContext
    sc.version
    print("✅ Using existing SparkContext")
except:
    # Create new SparkContext if none exists
    conf = SparkConf().setAppName("Lab2_Complete").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    print("✅ Created new SparkContext")

print(f"✅ SparkContext ready - Version: {sc.version}")
print(f"✅ Application: {sc.appName}")


25/09/25 09:59:10 WARN Utils: Your hostname, thuan-precision-5560 resolves to a loopback address: 127.0.1.1; using 192.168.1.5 instead (on interface wlp0s20f3)
25/09/25 09:59:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/25 09:59:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


✅ Created new SparkContext
✅ SparkContext ready - Version: 3.5.6
✅ Application: Lab2_Complete


In [3]:
# ===== BÀI 2: LỌC CÁC RDD =====
print("=== BÀI 2: LỌC CÁC RDD ===\n")

# Tạo RDD từ 0 đến 9999 với 5 phân vùng
print("Step 1: Tạo RDD từ 0 đến 9999 với 5 phân vùng")
rdd_numbers = sc.parallelize(range(10000), 5)

print(f"✅ RDD được tạo với {rdd_numbers.count()} số")
print(f"✅ Số phân vùng: {rdd_numbers.getNumPartitions()}")

# Xem cách phân chia dữ liệu
partitions_info = [(i, len(partition)) for i, partition in enumerate(rdd_numbers.glom().collect())]
print("Phân bổ dữ liệu theo phân vùng:")
for partition_id, count in partitions_info:
    print(f"  Phân vùng {partition_id}: {count} số")

=== BÀI 2: LỌC CÁC RDD ===

Step 1: Tạo RDD từ 0 đến 9999 với 5 phân vùng


                                                                                

✅ RDD được tạo với 10000 số
✅ Số phân vùng: 5
Phân bổ dữ liệu theo phân vùng:
  Phân vùng 0: 2000 số
  Phân vùng 1: 2000 số
  Phân vùng 2: 2000 số
  Phân vùng 3: 2000 số
  Phân vùng 4: 2000 số


In [4]:
# ===== HIỂU LOGIC FILTER =====
print("Step 2: Hiểu logic lọc - số chia hết cho 3 nhưng KHÔNG chia hết cho 9")

# Test một vài số để hiểu logic
test_numbers = [3, 6, 9, 12, 15, 18, 21, 24, 27, 30]
print("\nTest logic với một vài số:")
print("Số | Chia hết cho 3? | Chia hết cho 9? | Kết quả filter")
print("-" * 55)

for num in test_numbers:
    div_by_3 = (num % 3 == 0)
    div_by_9 = (num % 9 == 0)
    result = div_by_3 and not div_by_9
    print(f"{num:2} | {div_by_3:13} | {div_by_9:13} | {result}")

print(f"\n📝 Kết luận: Các số thỏa mãn là: {[n for n in test_numbers if n%3==0 and n%9!=0]}")


Step 2: Hiểu logic lọc - số chia hết cho 3 nhưng KHÔNG chia hết cho 9

Test logic với một vài số:
Số | Chia hết cho 3? | Chia hết cho 9? | Kết quả filter
-------------------------------------------------------
 3 |             1 |             0 | True
 6 |             1 |             0 | True
 9 |             1 |             1 | False
12 |             1 |             0 | True
15 |             1 |             0 | True
18 |             1 |             1 | False
21 |             1 |             0 | True
24 |             1 |             0 | True
27 |             1 |             1 | False
30 |             1 |             0 | True

📝 Kết luận: Các số thỏa mãn là: [3, 6, 12, 15, 21, 24, 30]


In [6]:
# ===== ÁP DỤNG FILTER =====
print("Step 3: Áp dụng filter với lambda function")

# Áp dụng filter
filtered_rdd = rdd_numbers.filter(lambda n: n % 3 == 0 and n % 9 != 0)

print("✅ Filter đã được áp dụng (đây là transformation - lazy evaluation)")
print("💡 Chưa có tính toán thực sự xảy ra cho đến khi có action")

# Đếm số phần tử sau khi filter (đây là action - sẽ trigger computation)
print("\nCounting filtered elements (this triggers computation)...")
filtered_count = filtered_rdd.count()
print(f"✅ Số phần tử sau khi lọc: {filtered_count}")

# Tính toán để so sánh
total_div_by_3 = len([n for n in range(10000) if n % 3 == 0])  # Divisible by 3
total_div_by_9 = len([n for n in range(10000) if n % 9 == 0])  # Divisible by 9
expected_result = total_div_by_3 - total_div_by_9

print(f"📊 Verification:")
print(f"  - Số chia hết cho 3: {total_div_by_3}")
print(f"  - Số chia hết cho 9: {total_div_by_9}")
print(f"  - Kết quả mong đợi: {expected_result}")
print(f"  - Kết quả thực tế: {filtered_count}")
print(f"  - Đúng? {'Đúng ✅' if filtered_count == expected_result else 'Sai ❌'}")

Step 3: Áp dụng filter với lambda function
✅ Filter đã được áp dụng (đây là transformation - lazy evaluation)
💡 Chưa có tính toán thực sự xảy ra cho đến khi có action

Counting filtered elements (this triggers computation)...
✅ Số phần tử sau khi lọc: 2222
📊 Verification:
  - Số chia hết cho 3: 3334
  - Số chia hết cho 9: 1112
  - Kết quả mong đợi: 2222
  - Kết quả thực tế: 2222
  - Đúng? Đúng ✅


In [7]:
# ===== XEM KẾT QUẢ THEO PHÂN VÙNG =====
print("Step 4: Xem kết quả theo từng phân vùng")

# Lấy kết quả theo phân vùng
partitioned_results = filtered_rdd.glom().collect()

print("Kết quả lọc theo từng phân vùng:")
for partition_id, partition_data in enumerate(partitioned_results):
    print(f"Phân vùng {partition_id}: {len(partition_data)} số")
    if len(partition_data) > 0:
        print(f"  First 10: {partition_data[:10]}")
        print(f"  Last 10:  {partition_data[-10:]}")
    else:
        print(f"  (empty partition)")
    print()


Step 4: Xem kết quả theo từng phân vùng
Kết quả lọc theo từng phân vùng:
Phân vùng 0: 444 số
  First 10: [3, 6, 12, 15, 21, 24, 30, 33, 39, 42]
  Last 10:  [1956, 1959, 1965, 1968, 1974, 1977, 1983, 1986, 1992, 1995]

Phân vùng 1: 445 số
  First 10: [2001, 2004, 2010, 2013, 2019, 2022, 2028, 2031, 2037, 2040]
  Last 10:  [3957, 3963, 3966, 3972, 3975, 3981, 3984, 3990, 3993, 3999]

Phân vùng 2: 444 số
  First 10: [4002, 4008, 4011, 4017, 4020, 4026, 4029, 4035, 4038, 4044]
  Last 10:  [5955, 5961, 5964, 5970, 5973, 5979, 5982, 5988, 5991, 5997]

Phân vùng 3: 445 số
  First 10: [6000, 6006, 6009, 6015, 6018, 6024, 6027, 6033, 6036, 6042]
  Last 10:  [7959, 7962, 7968, 7971, 7977, 7980, 7986, 7989, 7995, 7998]

Phân vùng 4: 444 số
  First 10: [8004, 8007, 8013, 8016, 8022, 8025, 8031, 8034, 8040, 8043]
  Last 10:  [9957, 9960, 9966, 9969, 9975, 9978, 9984, 9987, 9993, 9996]



In [8]:
# ===== LƯU KẾT QUẢ VÀO FILE =====
print("Step 5: Lưu kết quả vào thư mục")

import os

# Tạo output directory
output_path = "../../results/Bai2"
print(f"Saving results to: {output_path}")

# Xóa thư mục output cũ nếu tồn tại (Spark yêu cầu output directory không tồn tại)
import shutil
if os.path.exists(output_path):
    shutil.rmtree(output_path)
    print("✅ Đã xóa thư mục output cũ")

# Lưu RDD vào file
filtered_rdd.saveAsTextFile(output_path)
print("✅ Đã lưu kết quả vào file")

# Kiểm tra các file được tạo
if os.path.exists(output_path):
    files = os.listdir(output_path)
    print(f"\nCác file được tạo trong {output_path}:")
    for file in sorted(files):
        file_path = os.path.join(output_path, file)
        if os.path.isfile(file_path):
            size = os.path.getsize(file_path)
            print(f"  📄 {file} ({size} bytes)")


Step 5: Lưu kết quả vào thư mục
Saving results to: ../../results/Bai2
✅ Đã lưu kết quả vào file

Các file được tạo trong ../../results/Bai2:
  📄 ._SUCCESS.crc (8 bytes)
  📄 .part-00000.crc (24 bytes)
  📄 .part-00001.crc (28 bytes)
  📄 .part-00002.crc (28 bytes)
  📄 .part-00003.crc (28 bytes)
  📄 .part-00004.crc (28 bytes)
  📄 _SUCCESS (0 bytes)
  📄 part-00000 (1974 bytes)
  📄 part-00001 (2225 bytes)
  📄 part-00002 (2220 bytes)
  📄 part-00003 (2225 bytes)
  📄 part-00004 (2220 bytes)
