In [17]:
import json
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import List, Optional, Tuple
import cv2
import numpy as np

In [18]:
@dataclass
class LineSegment:
    """A straight line segment found in an image, together with derived geometry."""
    pt1: Tuple[int, int]  # first endpoint as (x, y)
    pt2: Tuple[int, int]  # second endpoint as (x, y)
    length: float  # distance between pt1 and pt2; producers in this file round it to 2 decimals
    angle: float  # direction from pt1 to pt2 in degrees, computed as arctan2(dy, dx)
    label: str  # "horizontal" / "vertical" / "diagonal" from classify_line, or "roof" after label_roof_segments

@dataclass
class Rectangle:
    """Bounding box of a detected rectangular structure plus its classification."""
    x: int  # bounding-box x origin (taken from cv2.boundingRect in detect_rectangles)
    y: int  # bounding-box y origin
    w: int  # bounding-box width
    h: int  # bounding-box height
    area: int  # detect_rectangles stores the contour area here, not w * h
    aspect_ratio: float  # w / h (0.0 when h is 0), rounded to 2 decimals by detect_rectangles
    label: str  # "door" or "window", as returned by classify_rectangle
    confidence: float  # heuristic score returned by classify_rectangle

@dataclass
class Roof:
    """Roof geometry as produced by detect_triangle_roof or detect_flat_roof."""
    roof_type: str  # "triangle" or "flat"
    apex: Optional[Tuple[int, int]]  # peak point for triangle roofs; None for flat roofs
    points: List[Tuple[int, int]]  # outline points; detect_rectangles uses their largest y as the roof's lower bound
    lines: List[dict]  # one dict per roof edge with keys "pt1", "pt2" and "label"

@dataclass
class DetectionResult:
    """Per-image detection output in plain-dict form, as assembled by detect_structures."""
    image_file: str  # file name passed to detect_structures
    image_size: Tuple[int, int]  # (width, height) -- the reverse of the img.shape[:2] order
    line_segments: List[dict]  # asdict() form of each LineSegment
    rectangles: List[dict]  # asdict() form of each Rectangle
    roof: Optional[dict]  # asdict() form of the Roof, or None when no roof was detected

In [19]:
def to_gray(img: np.ndarray) -> np.ndarray:
    """Return a single-channel version of ``img``.

    Arrays with three dimensions are converted with ``cv2.COLOR_BGR2GRAY``;
    any other array is returned as a copy of itself.
    """
    if img.ndim != 3:
        return img.copy()
    return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

def point_distance(p1: Tuple[int, int], p2: Tuple[int, int]) -> float:
    """Return the Euclidean distance between two (x, y) points as a Python float."""
    delta_x = p1[0] - p2[0]
    delta_y = p1[1] - p2[1]
    return float(np.hypot(delta_x, delta_y))

def classify_line(angle: float) -> str:
    """Bucket a direction angle (degrees) into "horizontal", "vertical" or "diagonal".

    Only the magnitude of the angle is considered: below 10 or above 170 is
    horizontal, strictly between 80 and 100 is vertical, everything else is
    diagonal.
    """
    magnitude = abs(angle)
    if magnitude < 10 or magnitude > 170:
        return "horizontal"
    return "vertical" if 80 < magnitude < 100 else "diagonal"

Correct rotated images

In [20]:
def detect_rotation_angle(image: np.ndarray) -> float:
    """Estimate the skew of ``image`` in degrees from its dominant straight lines.

    The image is binarised (inverted, threshold 127), edge-detected with Canny
    and fed to the standard Hough transform. Each line's theta is folded into
    a deviation from the nearest axis, giving a value in [-45, 45], and the
    median deviation is returned. Returns 0.0 when no lines are found.
    """
    inverted = cv2.threshold(to_gray(image), 127, 255, cv2.THRESH_BINARY_INV)[1]
    edges = cv2.Canny(inverted, 50, 150)
    hough = cv2.HoughLines(edges, 1, np.pi / 180, 100)
    if hough is None:
        return 0.0

    def fold_to_axis(theta_deg):
        # Map a Hough theta in degrees onto its signed offset from the nearest axis.
        if theta_deg <= 45:
            return theta_deg
        if theta_deg >= 135:
            return theta_deg - 180
        return theta_deg - 90

    deviations = [fold_to_axis(np.degrees(entry[0][1])) for entry in hough]
    if not deviations:
        return 0.0
    return float(np.median(deviations))


def correct_rotation(image: np.ndarray, angle: float) -> np.ndarray:
    """Rotate ``image`` by ``angle`` degrees about its centre, padding with white.

    Angles smaller than 0.5 degrees in magnitude are treated as noise and the
    input array itself is returned unchanged. The output keeps the input's
    width and height.
    """
    if abs(angle) < 0.5:
        return image

    height, width = image.shape[:2]
    centre = (width // 2, height // 2)
    rotation = cv2.getRotationMatrix2D(centre, angle, 1.0)
    # Scalar fill for single-channel input, 3-tuple for colour input.
    fill = 255 if image.ndim != 3 else (255, 255, 255)
    return cv2.warpAffine(
        image,
        rotation,
        (width, height),
        borderMode=cv2.BORDER_CONSTANT,
        borderValue=fill,
    )


def run_rotation_correction(dataset_dir: str, output_dir: str, threshold: float = 2.0) -> dict:
    """Deskew every ``*.png`` in ``dataset_dir`` and write the results to ``output_dir``.

    Each readable image has its skew estimated with ``detect_rotation_angle``.
    Images whose absolute skew is at least ``threshold`` degrees are rotated
    with ``correct_rotation``; the others are written out unchanged. Files that
    ``cv2.imread`` cannot decode are skipped and counted separately.

    Args:
        dataset_dir: Directory scanned (non-recursively) for PNG files.
        output_dir: Directory that receives the images; created, including any
            missing parents, if it does not exist.
        threshold: Minimum absolute skew, in degrees, that triggers a correction.

    Returns:
        A report dict with ``"threshold_degrees"``, a per-image ``"images"``
        list and a ``"summary"`` of counts.
    """
    dataset_path = Path(dataset_dir)
    output_path = Path(output_dir)
    # parents=True so a nested, not-yet-existing output directory does not raise.
    output_path.mkdir(parents=True, exist_ok=True)

    image_files = sorted(dataset_path.glob("*.png"))
    report = {"threshold_degrees": threshold, "images": []}
    rotated_count = 0
    unreadable_count = 0

    for img_path in image_files:
        img = cv2.imread(str(img_path))
        if img is None:
            unreadable_count += 1
            continue

        angle = detect_rotation_angle(img)
        is_rotated = abs(angle) >= threshold
        if is_rotated:
            rotated_count += 1
            img = correct_rotation(img, angle)

        cv2.imwrite(str(output_path / img_path.name), img)
        report["images"].append({
            "file": img_path.name,
            "detected_angle": round(angle, 2),
            "is_rotated": is_rotated,
            "corrected": is_rotated,
        })

    # Only images that were actually processed can be "straight"; unreadable
    # files are reported under their own key instead of inflating that count.
    processed_count = len(report["images"])
    report["summary"] = {
        "total_images": len(image_files),
        "rotated_images": rotated_count,
        "straight_images": processed_count - rotated_count,
        "unreadable_images": unreadable_count,
    }
    return report

Preprocessing

In [21]:
def preprocess_for_detection(img: np.ndarray) -> np.ndarray:
    """Build the binary mask used by the line and rectangle detectors.

    Steps: inverted threshold at 200, one dilation with a 5x5 ellipse,
    morphological closing along four directions (horizontal, vertical and both
    diagonals) OR-ed together, then a 2x2 opening.
    """
    ink = cv2.threshold(to_gray(img), 200, 255, cv2.THRESH_BINARY_INV)[1]
    thickened = cv2.dilate(
        ink,
        cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)),
        iterations=1,
    )

    # One line-shaped structuring element per direction to bridge gaps along.
    directional_kernels = (
        cv2.getStructuringElement(cv2.MORPH_RECT, (15, 1)),
        cv2.getStructuringElement(cv2.MORPH_RECT, (1, 15)),
        np.eye(11, dtype=np.uint8),
        np.flipud(np.eye(11, dtype=np.uint8)),
    )

    bridged = np.zeros_like(thickened)
    for direction in directional_kernels:
        closed = cv2.morphologyEx(thickened, cv2.MORPH_CLOSE, direction)
        bridged = cv2.bitwise_or(bridged, closed)

    speckle_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))
    return cv2.morphologyEx(bridged, cv2.MORPH_OPEN, speckle_kernel)


def thin_binary_mask(binary: np.ndarray) -> np.ndarray:
    """Thin the foreground of ``binary``.

    Uses ``cv2.ximgproc.thinning`` when that function is present in the cv2
    build. Otherwise falls back to an iterative morphological skeleton: erode
    repeatedly with a 3x3 cross and accumulate whatever an opening would
    remove at each step, until nothing is left.
    """
    ximgproc = getattr(cv2, "ximgproc", None)
    if ximgproc is not None and hasattr(ximgproc, "thinning"):
        return ximgproc.thinning(binary)

    cross = cv2.getStructuringElement(cv2.MORPH_CROSS, (3, 3))
    skeleton = np.zeros_like(binary)
    remaining = binary.copy()
    finished = False
    while not finished:
        shrunk = cv2.erode(remaining, cross)
        # Pixels present now that do not survive an erode + dilate round trip.
        residue = cv2.subtract(remaining, cv2.dilate(shrunk, cross))
        skeleton = cv2.bitwise_or(skeleton, residue)
        remaining = shrunk
        finished = cv2.countNonZero(remaining) == 0
    return skeleton

Segment distance and angle calculation

In [22]:
def segment_distance(seg1: LineSegment, seg2: LineSegment) -> float:
    """Return the smallest endpoint-to-endpoint distance between two segments.

    Only the four endpoint pairings are compared; interior points of the
    segments are not considered.
    """
    return min(
        point_distance(end_a, end_b)
        for end_a in (seg1.pt1, seg1.pt2)
        for end_b in (seg2.pt1, seg2.pt2)
    )

def angle_difference(a1: float, a2: float) -> float:
    """Return the smallest angle, in degrees, between two undirected line directions.

    A line at ``a`` degrees is the same line at ``a + 180``, so the result
    always lies in [0, 90]. The difference is reduced modulo 180, which also
    keeps the result non-negative when the inputs are more than a full turn
    apart.

    Args:
        a1: First direction in degrees.
        a2: Second direction in degrees.

    Returns:
        The acute (or right) angle between the two directions.
    """
    diff = abs(a1 - a2) % 180
    return min(diff, 180 - diff)

def segments_are_similar(seg1: LineSegment, seg2: LineSegment, angle_thresh: float, dist_thresh: float) -> bool:
    """Tell whether two segments point the same way and have endpoints close together.

    Both thresholds are inclusive. The endpoint distance is only evaluated
    when the angle test passes.
    """
    angles_agree = angle_difference(seg1.angle, seg2.angle) <= angle_thresh
    return angles_agree and segment_distance(seg1, seg2) <= dist_thresh

Reduce similar segments

In [23]:
def merge_segment_group(group: List[LineSegment]) -> LineSegment:
    """Collapse a group of segments into the single segment spanning its two farthest endpoints.

    All endpoint pairs are compared; the first pair reaching the maximum
    distance wins. If every endpoint coincides, the first and last endpoints
    are used and the length is 0.
    """
    endpoints = []
    for seg in group:
        endpoints.extend((seg.pt1, seg.pt2))

    far_a, far_b = endpoints[0], endpoints[-1]
    span = 0.0
    count = len(endpoints)
    for first in range(count):
        for second in range(first + 1, count):
            gap = point_distance(endpoints[first], endpoints[second])
            if gap > span:
                span = gap
                far_a, far_b = endpoints[first], endpoints[second]

    heading = float(np.degrees(np.arctan2(far_b[1] - far_a[1], far_b[0] - far_a[0])))
    # The label is derived from the unrounded angle; only the stored value is rounded.
    return LineSegment(far_a, far_b, round(span, 2), round(heading, 2), classify_line(heading))


def merge_similar_segments(segments: List[LineSegment], angle_thresh: float = 10, dist_thresh: float = 20) -> List[LineSegment]:
    """Merge chains of similar segments into single segments.

    Segments are grouped transitively: starting from each unclaimed segment, a
    worklist (popped from the end) pulls in every unclaimed segment that
    ``segments_are_similar`` to one already in the group. Groups of one are
    passed through untouched; larger groups go through ``merge_segment_group``.

    With fewer than two segments the input list itself is returned.
    """
    if len(segments) < 2:
        return segments

    total = len(segments)
    claimed = [False] * total
    result = []
    for seed_idx in range(total):
        if claimed[seed_idx]:
            continue

        seed = segments[seed_idx]
        claimed[seed_idx] = True
        cluster = [seed]
        frontier = [seed]

        # Flood-fill over the "similar" relation so chains of close segments merge into one line.
        while frontier:
            current = frontier.pop()
            for other_idx, other in enumerate(segments):
                if not claimed[other_idx] and segments_are_similar(current, other, angle_thresh, dist_thresh):
                    claimed[other_idx] = True
                    cluster.append(other)
                    frontier.append(other)

        result.append(seed if len(cluster) == 1 else merge_segment_group(cluster))
    return result

In [24]:
def point_to_line_distance(point: Tuple[int, int], seg: LineSegment) -> float:
    """Return the perpendicular distance from ``point`` to the infinite line through ``seg``.

    The distance is measured to the line, not clamped to the segment's extent.
    For a zero-length segment the distance to its first endpoint is returned.
    """
    px, py = point
    (ax, ay), (bx, by) = seg.pt1, seg.pt2
    run, rise = bx - ax, by - ay
    span = np.hypot(run, rise)
    if span == 0:
        return point_distance(point, seg.pt1)
    return abs(rise * px - run * py + bx * ay - by * ax) / span


def segment_overlap_ratio(seg1: LineSegment, seg2: LineSegment) -> float:
    """Return how much two segments overlap along ``seg1``'s dominant axis.

    Both segments are projected onto x when ``seg1`` is at least as wide as it
    is tall, otherwise onto y. The overlap of the two projected intervals is
    divided by the shorter interval's length, floored at 1 to avoid dividing
    by zero.
    """
    width = abs(seg1.pt2[0] - seg1.pt1[0])
    height = abs(seg1.pt2[1] - seg1.pt1[1])
    axis = 0 if width >= height else 1

    lo1, hi1 = sorted((seg1.pt1[axis], seg1.pt2[axis]))
    lo2, hi2 = sorted((seg2.pt1[axis], seg2.pt2[axis]))

    shared = max(0, min(hi1, hi2) - max(lo1, lo2))
    shorter = max(1, min(hi1 - lo1, hi2 - lo2))
    return shared / shorter

In [25]:
def is_duplicate_segment(seg: LineSegment, ref: LineSegment, angle_thresh: float = 6, offset_thresh: float = 6) -> bool:
    """Tell whether ``seg`` is a near-copy of ``ref``.

    All of the following must hold: same label, directions within
    ``angle_thresh`` degrees, both endpoints of ``seg`` within
    ``offset_thresh`` of the line through ``ref``, and an overlap ratio above
    0.6.
    """
    if seg.label != ref.label or angle_difference(seg.angle, ref.angle) > angle_thresh:
        return False
    for endpoint in (seg.pt1, seg.pt2):
        if point_to_line_distance(endpoint, ref) > offset_thresh:
            return False
    return segment_overlap_ratio(seg, ref) > 0.6


def suppress_duplicate_segments(segments: List[LineSegment]) -> List[LineSegment]:
    """Drop segments that duplicate a longer segment already kept.

    Segments are visited longest first, so the longest of any set of
    duplicates is the one that survives. The result is in that
    longest-first order.
    """
    survivors: List[LineSegment] = []
    longest_first = sorted(segments, key=lambda s: s.length, reverse=True)
    for candidate in longest_first:
        if not any(is_duplicate_segment(candidate, winner) for winner in survivors):
            survivors.append(candidate)
    return survivors

def detect_line_segments(binary: np.ndarray) -> List[LineSegment]:
    """Extract labelled line segments from a binary mask.

    The mask is thinned, edge-detected and passed to the probabilistic Hough
    transform. The vote threshold, minimum length and maximum gap all scale
    with the image's shorter side, each with a fixed lower bound. Raw segments
    are then merged and de-duplicated.

    Returns an empty list when the Hough transform finds nothing.
    """
    skeleton = thin_binary_mask(binary)
    height, width = skeleton.shape[:2]
    short_side = min(height, width)

    hough = cv2.HoughLinesP(
        cv2.Canny(skeleton, 50, 150),
        1,
        np.pi / 180,
        max(35, int(short_side * 0.08)),
        minLineLength=max(25, int(short_side * 0.07)),
        maxLineGap=max(8, int(short_side * 0.02)),
    )
    if hough is None:
        return []

    candidates = []
    for entry in hough:
        x1, y1, x2, y2 = entry[0]
        length = point_distance((x1, y1), (x2, y2))
        if length < 15:
            continue
        angle = float(np.degrees(np.arctan2(y2 - y1, x2 - x1)))
        start = (int(x1), int(y1))
        end = (int(x2), int(y2))
        candidates.append(LineSegment(start, end, round(length, 2), round(angle, 2), classify_line(angle)))

    return suppress_duplicate_segments(merge_similar_segments(candidates, angle_thresh=8, dist_thresh=28))


def points_close(p1: Tuple[int, int], p2: Tuple[int, int], threshold: float = 15) -> bool:
    """Tell whether two points lie strictly less than ``threshold`` apart."""
    separation = point_distance(p1, p2)
    return separation < threshold

Classify roof (triangle/flat)

In [26]:
def detect_triangle_roof(diagonals: List[LineSegment]) -> Optional[Roof]:
    """Look for a pair of diagonal segments that meet at a peak.

    Segments spanning 20 pixels or less vertically are ignored. The rest are
    split by which way they lean: a left edge has its top end to the right of
    its bottom end, a right edge the opposite. The left/right pair whose top
    ends are closest is chosen, provided they are under 50 pixels apart and
    the left edge's bottom lies left of the right edge's bottom. The apex is
    the integer midpoint of the two top ends.

    Returns None when no such pair exists.
    """
    if len(diagonals) < 2:
        return None

    left_edges, right_edges = [], []
    for seg in diagonals:
        if seg.pt1[1] < seg.pt2[1]:
            upper, lower = seg.pt1, seg.pt2
        else:
            upper, lower = seg.pt2, seg.pt1
        if abs(upper[1] - lower[1]) <= 20:
            continue
        bucket = left_edges if upper[0] > lower[0] else right_edges
        bucket.append((seg, upper, lower))

    if not (left_edges and right_edges):
        return None

    chosen = None
    smallest_gap = float("inf")
    for l_seg, l_top, l_bottom in left_edges:
        for r_seg, r_top, r_bottom in right_edges:
            gap = point_distance(l_top, r_top)
            qualifies = gap < 50 and l_bottom[0] < r_bottom[0] and gap < smallest_gap
            if not qualifies:
                continue
            peak = ((l_top[0] + r_top[0]) // 2, (l_top[1] + r_top[1]) // 2)
            chosen = (l_seg, r_seg, peak, l_bottom, r_bottom)
            smallest_gap = gap

    if chosen is None:
        return None

    l_seg, r_seg, peak, l_bottom, r_bottom = chosen
    edges = [{"pt1": edge.pt1, "pt2": edge.pt2, "label": "roof"} for edge in (l_seg, r_seg)]
    return Roof("triangle", peak, [l_bottom, peak, r_bottom], edges)


def detect_flat_roof(horizontals: List[LineSegment], img_w: int) -> Optional[Roof]:
    """Look for a flat roof among horizontal segments.

    Segments are ordered top to bottom. The first vertically adjacent pair
    that is less than 30 pixels apart, with both members wider than 30% of
    the image width, forms a two-line roof. Failing that, the topmost segment
    alone counts as a roof if it is wider than 40% of the image width.

    Returns None when neither rule matches or ``horizontals`` is empty.
    """
    if not horizontals:
        return None

    def top_y(seg):
        return min(seg.pt1[1], seg.pt2[1])

    def x_extent(seg):
        return sorted((seg.pt1[0], seg.pt2[0]))

    def as_roof_line(seg):
        return {"pt1": seg.pt1, "pt2": seg.pt2, "label": "roof"}

    by_height = sorted(horizontals, key=top_y)
    for upper, lower in zip(by_height, by_height[1:]):
        y_upper, y_lower = top_y(upper), top_y(lower)
        if abs(y_lower - y_upper) >= 30:
            continue

        u_lo, u_hi = x_extent(upper)
        l_lo, l_hi = x_extent(lower)
        if (u_hi - u_lo) > img_w * 0.3 and (l_hi - l_lo) > img_w * 0.3:
            corners = [(u_lo, y_upper), (u_hi, y_upper), (l_lo, y_lower), (l_hi, y_lower)]
            return Roof("flat", None, corners, [as_roof_line(upper), as_roof_line(lower)])

    topmost = by_height[0]
    lo, hi = x_extent(topmost)
    y = top_y(topmost)
    if (hi - lo) > img_w * 0.4:
        return Roof("flat", None, [(lo, y), (hi, y)], [as_roof_line(topmost)])
    return None


def label_roof_segments(segments: List[LineSegment], roof: Roof) -> List[LineSegment]:
    """Return copies of ``segments`` with roof edges relabelled as "roof".

    A segment matches a roof line when both of its endpoints are close (per
    ``points_close``) to the roof line's endpoints, in either order. Segments
    that match nothing keep their label. The input segments are not modified.
    """
    def lies_on_roof(seg):
        for edge in roof.lines:
            first, second = edge["pt1"], edge["pt2"]
            if points_close(seg.pt1, first) and points_close(seg.pt2, second):
                return True
            if points_close(seg.pt1, second) and points_close(seg.pt2, first):
                return True
        return False

    return [
        LineSegment(seg.pt1, seg.pt2, seg.length, seg.angle, "roof" if lies_on_roof(seg) else seg.label)
        for seg in segments
    ]


def detect_roof(segments: List[LineSegment], img_shape: Tuple[int, int]) -> Tuple[Optional[Roof], List[LineSegment]]:
    """Find the roof among segments in the top 35% of the image.

    ``img_shape`` is ``(height, width)``. Diagonals qualify when their higher
    endpoint is above the cutoff; horizontals must lie entirely above it. A
    triangle roof is tried first, then a flat roof.

    Returns ``(roof, relabelled_segments)``, or ``(None, segments)`` with the
    original list when no roof is found.
    """
    img_h, img_w = img_shape
    cutoff = img_h * 0.35

    upper_diagonals, upper_horizontals = [], []
    for seg in segments:
        highest_y = min(seg.pt1[1], seg.pt2[1])
        lowest_y = max(seg.pt1[1], seg.pt2[1])
        if seg.label == "diagonal" and highest_y < cutoff:
            upper_diagonals.append(seg)
        elif seg.label == "horizontal" and highest_y < cutoff and lowest_y < cutoff:
            upper_horizontals.append(seg)

    roof = detect_triangle_roof(upper_diagonals)
    if not roof:
        roof = detect_flat_roof(upper_horizontals, img_w)
    if not roof:
        return None, segments
    return roof, label_roof_segments(segments, roof)

IoU computation

In [27]:
def compute_iou(rect1: Rectangle, rect2: Rectangle) -> float:
    """Return the intersection-over-union of two bounding boxes.

    Box areas are computed from ``w * h``, not from the ``area`` field.
    Returns 0.0 when the boxes do not overlap or the union is not positive.
    """
    left = max(rect1.x, rect2.x)
    top = max(rect1.y, rect2.y)
    right = min(rect1.x + rect1.w, rect2.x + rect2.w)
    bottom = min(rect1.y + rect1.h, rect2.y + rect2.h)
    if right <= left or bottom <= top:
        return 0.0

    overlap = (right - left) * (bottom - top)
    combined = rect1.w * rect1.h + rect2.w * rect2.h - overlap
    if combined > 0:
        return overlap / combined
    return 0.0


def is_contained(inner: Rectangle, outer: Rectangle) -> bool:
    """Tell whether ``inner``'s box lies within ``outer``'s and is clearly smaller.

    Edges may touch. "Clearly smaller" means ``inner.area`` is below 80% of
    ``outer.area``.
    """
    starts_inside = inner.x >= outer.x and inner.y >= outer.y
    ends_inside = (
        inner.x + inner.w <= outer.x + outer.w
        and inner.y + inner.h <= outer.y + outer.h
    )
    clearly_smaller = inner.area < outer.area * 0.8
    return starts_inside and ends_inside and clearly_smaller

Rectangular structure detection

In [28]:
def filter_rectangles(rectangles: List[Rectangle]) -> List[Rectangle]:
    """Remove overlapping and nested rectangles, keeping the smallest of each cluster.

    Candidates are visited in ascending ``area`` order. A candidate is dropped
    when, against any rectangle already kept, its IoU exceeds 0.5 or one of
    the two contains the other (per ``is_contained``). Because smaller
    rectangles are always kept first, a kept rectangle never needs to be
    evicted by a later candidate.

    Args:
        rectangles: Candidate rectangles in any order.

    Returns:
        The surviving rectangles in ascending area order. With fewer than two
        candidates the input list itself is returned.
    """
    if len(rectangles) < 2:
        return rectangles

    kept: List[Rectangle] = []
    for rect in sorted(rectangles, key=lambda r: r.area):
        redundant = any(
            compute_iou(rect, existing) > 0.5
            or is_contained(rect, existing)
            or is_contained(existing, rect)
            for existing in kept
        )
        if not redundant:
            kept.append(rect)
    return kept


def detect_rectangles(binary: np.ndarray, img_shape: Tuple[int, int], roof: Optional[Roof]) -> List[Rectangle]:
    """Find door and window candidates among the contours of the inverted mask.

    ``img_shape`` is ``(height, width)``. A contour is kept when:

    * its area is between 100 pixels and 80% of the image area,
    * it fills at least 60% of its bounding box,
    * its polygon approximation has 4 to 8 vertices,
    * it is not located in the roof band (when a roof is given), and
    * ``classify_rectangle`` assigns it a label.

    The survivors are passed through ``filter_rectangles``. Returns an empty
    list when no contour hierarchy is produced.
    """
    img_h, img_w = img_shape
    if roof:
        roof_bottom_y = max(p[1] for p in roof.points) + 10
    else:
        roof_bottom_y = 0

    contours, hierarchy = cv2.findContours(cv2.bitwise_not(binary), cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    if hierarchy is None:
        return []

    candidates = []
    for idx, contour in enumerate(contours):
        area = cv2.contourArea(contour)
        if area < 100 or area > img_h * img_w * 0.8:
            continue

        x, y, w, h = cv2.boundingRect(contour)
        box_area = w * h
        if box_area == 0 or area / box_area < 0.6:
            continue

        perimeter = cv2.arcLength(contour, True)
        polygon = cv2.approxPolyDP(contour, 0.02 * perimeter, True)
        if len(polygon) < 4 or len(polygon) > 8:
            continue

        # Skip shapes that start above the roof line and end within 20 px below it.
        if y < roof_bottom_y and y + h < roof_bottom_y + 20:
            continue

        verdict = classify_rectangle(x, y, w, h, img_w, img_h, hierarchy[0][idx], roof_bottom_y)
        if verdict is None:
            continue

        label, confidence = verdict
        aspect = w / h if h > 0 else 0.0
        candidates.append(
            Rectangle(int(x), int(y), int(w), int(h), int(area), round(aspect, 2), label, round(confidence, 2))
        )
    return filter_rectangles(candidates)


def detect_structures(img: np.ndarray, filename: str = "") -> DetectionResult:
    """Run the full detection chain on one image.

    Order: preprocess -> line segments -> roof (which also relabels the
    segments) -> rectangles. All results are converted to plain dicts, and the
    image size is recorded as ``(width, height)``.
    """
    height, width = img.shape[:2]
    mask = preprocess_for_detection(img)
    roof, segments = detect_roof(detect_line_segments(mask), (height, width))
    rectangles = detect_rectangles(mask, (height, width), roof)
    return DetectionResult(
        filename,
        (width, height),
        [asdict(segment) for segment in segments],
        [asdict(rectangle) for rectangle in rectangles],
        asdict(roof) if roof else None,
    )

def classify_rectangle(
    x: int,
    y: int,
    w: int,
    h: int,
    img_w: int,
    img_h: int,
    hierarchy_info: np.ndarray,
    roof_bottom_y: int,
) -> Optional[Tuple[str, float]]:
    """Label a bounding box as a door or a window from its size and position.

    Door: vertical centre below 60% of the image height, aspect ratio under
    1.5 and 1%-15% of the image area. Confidence is 0.85 when the centre is
    below 75% of the height, otherwise 0.7.

    Window: 0.2%-8% of the image area, top edge below ``roof_bottom_y`` and
    vertical centre above 85% of the image height. Confidence is 0.8 for
    aspect ratios between 0.5 and 2.0, otherwise 0.7.

    The door rule is checked first. ``x`` and ``hierarchy_info`` are accepted
    but not used by the current rules.

    Returns ``(label, confidence)`` or None when neither rule matches.
    """
    if h > 0:
        aspect = w / h
    else:
        aspect = 1.0
    relative_area = (w * h) / (img_w * img_h)
    relative_y = (y + h / 2) / img_h

    door_like = relative_y > 0.6 and aspect < 1.5 and 0.01 < relative_area < 0.15
    if door_like:
        confidence = 0.85 if relative_y > 0.75 else 0.7
        return "door", confidence

    window_like = 0.002 < relative_area < 0.08 and y > roof_bottom_y and relative_y < 0.85
    if window_like:
        confidence = 0.8 if 0.5 < aspect < 2.0 else 0.7
        return "window", confidence

    return None

Visualize the outputs

In [29]:
def visualize_detections(img: np.ndarray, result: DetectionResult, output_path: Optional[str] = None) -> np.ndarray:
    """Draw the detections from ``result`` on a copy of ``img``.

    Line segments are drawn first (roof edges thicker, endpoints as green
    dots), then the roof annotation, then each rectangle with a filled label
    tag above it. Two-dimensional input is converted to three channels first.
    The annotated image is written to ``output_path`` when one is given, and
    is always returned.
    """
    if img.ndim == 2:
        canvas = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
    else:
        canvas = img.copy()

    palette = {
        "horizontal": (255, 0, 0),
        "vertical": (255, 0, 0),
        "diagonal": (255, 128, 0),
        "roof": (0, 165, 255),
        "window": (0, 106, 78),
        "door": (65, 105, 225),
    }
    font = cv2.FONT_HERSHEY_SIMPLEX
    roof_text_color = (0, 0, 255)

    for seg in result.line_segments:
        start, end = tuple(seg["pt1"]), tuple(seg["pt2"])
        kind = seg["label"]
        stroke = 3 if kind == "roof" else 2
        cv2.line(canvas, start, end, palette.get(kind, (255, 0, 0)), stroke)
        for endpoint in (start, end):
            cv2.circle(canvas, endpoint, 4, (0, 255, 0), -1)

    roof = result.roof
    if roof:
        if roof["roof_type"] == "triangle" and roof["apex"]:
            peak = tuple(roof["apex"])
            cv2.circle(canvas, peak, 8, roof_text_color, -1)
            cv2.putText(canvas, "ROOF (triangle)", (peak[0] - 50, peak[1] - 15), font, 0.5, roof_text_color, 2)
        elif roof["roof_type"] == "flat" and roof["points"]:
            outline = roof["points"]
            top_y = min(p[1] for p in outline)
            center_x = sum(p[0] for p in outline) // len(outline)
            cv2.putText(canvas, "ROOF (flat)", (center_x - 40, top_y - 10), font, 0.5, roof_text_color, 2)

    for rect in result.rectangles:
        x, y, w, h = rect["x"], rect["y"], rect["w"], rect["h"]
        box_color = palette.get(rect["label"], (128, 128, 128))
        cv2.rectangle(canvas, (x, y), (x + w, y + h), box_color, 2)

        caption = f"{rect['label']} ({rect['confidence']:.0%})"
        (caption_w, caption_h), _ = cv2.getTextSize(caption, font, 0.4, 1)
        # Filled tag sitting directly above the box, with the caption in white on top.
        cv2.rectangle(canvas, (x, y - caption_h - 4), (x + caption_w + 4, y), box_color, -1)
        cv2.putText(canvas, caption, (x + 2, y - 2), font, 0.4, (255, 255, 255), 1)

    if output_path:
        cv2.imwrite(output_path, canvas)
    return canvas

Main Function

In [30]:
def run_structure_detection(dataset_dir: str, output_dir: str, detections_file: str) -> dict:
    """Detect structures in every ``*.png`` of ``dataset_dir`` and save the results.

    For each readable image, an annotated copy named ``vis_<original name>``
    is written to ``output_dir``. All detections are dumped as JSON to
    ``detections_file``. Files that ``cv2.imread`` cannot decode are skipped.

    Args:
        dataset_dir: Directory scanned (non-recursively) for PNG files.
        output_dir: Directory for the annotated images; created, including any
            missing parents, if it does not exist.
        detections_file: Path of the JSON file to write; its parent directory
            is created if needed.

    Returns:
        ``{"detections": [...]}`` with one dict per processed image.
    """
    dataset_path = Path(dataset_dir)
    output_path = Path(output_dir)
    # parents=True so a nested, not-yet-existing output directory does not raise.
    output_path.mkdir(parents=True, exist_ok=True)

    results = {"detections": []}
    for img_path in sorted(dataset_path.glob("*.png")):
        img = cv2.imread(str(img_path))
        if img is None:
            continue

        detected = detect_structures(img, img_path.name)
        results["detections"].append(asdict(detected))
        visualize_detections(img, detected, str(output_path / f"vis_{img_path.name}"))

    detections_path = Path(detections_file)
    detections_path.parent.mkdir(parents=True, exist_ok=True)
    # Explicit encoding keeps the file identical across platforms.
    with open(detections_path, "w", encoding="utf-8") as file:
        json.dump(results, file, indent=2)
    return results

In [33]:
def run_pipeline(
    dataset_dir: str = "dataset",
    preprocessed_dir: str = "preprocessed",
    output_dir: str = "detections_output",
    detections_file: str = "detections.json",
    rotation_threshold: float = 2.0,
) -> dict:
    """Run rotation correction, then structure detection on the corrected images.

    Detection reads from ``preprocessed_dir``, which is where the rotation
    stage writes its output. Prints the rotation totals and a completion
    message.

    Returns a dict with the rotation report under ``"rotation"`` and the
    detection results under ``"detections"``.
    """
    rotation_report = run_rotation_correction(dataset_dir, preprocessed_dir, rotation_threshold)
    summary = rotation_report["summary"]
    print(f"  Total: {summary['total_images']} images")
    print(f"  Rotated: {summary['rotated_images']}")

    detection_results = run_structure_detection(preprocessed_dir, output_dir, detections_file)
    print("Process complete")

    return {"rotation": rotation_report, "detections": detection_results}

def main(
    dataset: str = "dataset",
    preprocessed: str = "preprocessed",
    output: str = "detections_output",
    json_file: str = "detections.json",
    threshold: float = 2.0
) -> None:
    """Entry point: forward the arguments to ``run_pipeline`` and discard its result."""
    run_pipeline(
        dataset_dir=dataset,
        preprocessed_dir=preprocessed,
        output_dir=output,
        detections_file=json_file,
        rotation_threshold=threshold,
    )


# Run with the default paths when the cell or script is executed directly.
if __name__ == "__main__":
    main()

  Total: 32 images
  Rotated: 8
Process complete
