# Bài 2.3


### Khai báo thư viện


In [1]:
from pyspark.sql import SparkSession
import numpy as np

### Hàm tính tích có hướng hai vector


In [2]:
def cross(o, a, b):
    return (a[0] - o[0]) * (b[1] - o[1]) - (a[1] - o[1]) * (b[0] - o[0])

### Hàm tìm bao lồi dựa vào thuật toán Monotone Chain


In [3]:
def convex_hull(points):
    valid_points = [tuple(p) for p in points if p is not None and len(p) == 2 and None not in p]
    if len(valid_points) < 3:
        return []

    points = sorted(valid_points)
    lower = []
    for p in points:
        while len(lower) >= 2 and cross(lower[-2], lower[-1], p) <= 0:
            lower.pop()
        lower.append(p)

    upper = []
    for p in reversed(points):
        while len(upper) >= 2 and cross(upper[-2], upper[-1], p) <= 0:
            upper.pop()
        upper.append(p)

    return lower[:-1] + upper[:-1]

### Hàm kiểm tra một điểm có nằm trong đa giác không?


In [4]:
def is_point_in_polygon(point, polygon):
    x, y = point
    inside = False
    n = len(polygon)
    if n < 3:
        return False

    p1x, p1y = polygon[0]
    for i in range(1, n + 1):
        p2x, p2y = polygon[i % n]
        if y > min(p1y, p2y) and y <= max(p1y, p2y):
            if x < max(p1x, p2x):
                if p1y != p2y:
                    xinters = (y - p1y) * (p2x - p1x) / (p2y - p1y) + p1x
                if p1x == p2x  or x < xinters:
                    inside = not inside
        p1x, p1y = p2x, p2y
    return inside

### Hàm kiểm tra hai đoạn thẳng có cắt nhau không?


In [5]:
def edges_intersect(p1, p2, q1, q2):
    def ccw(a, b, c):
        return (c[1]-a[1]) * (b[0]-a[0]) > (b[1]-a[1]) * (c[0]-a[0])
    
    return (ccw(p1, q1, q2) != ccw(p2, q1, q2)) and (ccw(p1, p2, q1) != ccw(p1, p2, q2))

### Hàm kiểm tra điểm có nằm trên đoạn thẳng không?


In [6]:
def is_point_on_segment(p, a, b):
    cross = (b[0] - a[0]) * (p[1] - a[1]) - (b[1] - a[1]) * (p[0] - a[0])
    if abs(cross) > 1e-10:
        return False
    dot = (p[0] - a[0]) * (b[0] - a[0]) + (p[1] - a[1]) * (b[1] - a[1])
    if dot < 0:
        return False
    squared_len = (b[0] - a[0]) ** 2 + (b[1] - a[1]) ** 2
    return dot <= squared_len

### Hàm kiểm tra điểm có nằm trong đa giác không?


In [7]:
def is_point_in_polygon(point, polygon, strict=True):
    x, y = point
    inside = False
    n = len(polygon)
    if n < 3:
        return False

    for i in range(n):
        j = (i + 1) % n
        xi, yi = polygon[i]
        xj, yj = polygon[j]

        # Nếu nằm trên cạnh
        if is_point_on_segment(point, (xi, yi), (xj, yj)):
            return not strict

        if (yi > y) != (yj > y):
            xinters = (y - yi) * (xj - xi) / (yj - yi + 1e-10) + xi
            if x < xinters:
                inside = not inside

    return inside

### Hàm kiểm tra trùng lắp giữa hai tập bao lồi


In [8]:
def check_overlap(hull1, hull2):
    if len(hull1) < 3 or len(hull2) < 3:
        return False

    for p in hull1:
        if is_point_in_polygon(p, hull2):
            return True
    for p in hull2:
        if is_point_in_polygon(p, hull1):
            return True

    for i in range(len(hull1)):
        for j in range(len(hull2)):
            if edges_intersect(hull1[i], hull1[(i+1)%len(hull1)], hull2[j], hull2[(j+1)%len(hull2)]):
                return True

    return False

### Hàm kiểm tra trùng lắp giữa hai hình


In [9]:
def is_overlap(poly1, poly2):
    hull1 = convex_hull(poly1)
    hull2 = convex_hull(poly2)
    return check_overlap(hull1, hull2)

### Khởi tạo một phiên làm việc Spark


In [10]:
spark = SparkSession.builder \
    .appName("Polygon Overlap Detection") \
    .master("local[*]") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .getOrCreate()

25/04/09 11:50:34 WARN Utils: Your hostname, MacBook-Pro-cua-Nguyen-3.local resolves to a loopback address: 127.0.0.1; using 10.14.176.48 instead (on interface en0)
25/04/09 11:50:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/09 11:50:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Đọc dữ liệu từ tập đầu vào


In [11]:
# Đọc file shapes.parquet
df = spark.read.parquet("shapes.parquet")

# Chuyển về RDD [(shape_id, vertices)]
shapes_rdd = df.rdd.map(lambda row: (row['shape_id'], np.array(row['vertices'], dtype=np.float64)))

### Sinh các cặp hình không trùng nhau


In [12]:
pairs_rdd = shapes_rdd.cartesian(shapes_rdd).filter(lambda x: x[0][0] < x[1][0]) 

### Kiểm tra trùng lắp giữa các cặp hình


In [13]:
overlapping_pairs = pairs_rdd.filter(lambda x: is_overlap(x[0][1], x[1][1]))

### Sắp xếp bảng dữ liệu


In [14]:
def extract_number(shape_name):
    return int(shape_name.split("_")[1])

result = overlapping_pairs.map(lambda x: (x[0][0], x[1][0]))

sorted_result = result.sortBy(lambda x: (extract_number(x[0]), extract_number(x[1])))

### Lưu lại kết quả


In [None]:
# RDD -> chuỗi CSV: "shape_1,shape_2"
rdd_csv_lines = sorted_result.map(lambda x: f"{x[0]},{x[1]}")
rdd_csv_lines.coalesce(1).saveAsTextFile("output_folder")