# 군용 이미지 합성

QWEN으로 드론뷰 여부 판별 후 기록된 내용을 토대로 합성 이미지 생성

In [None]:
# ---------------------------------------------------------------------------------
# 패키지 임포트
import os
import random
import math
import json
import cv2
import numpy as np
from diffusers import (
    AutoencoderKL,
    ControlNetModel,
    DDPMScheduler,
    UNet2DConditionModel,
    UniPCMultistepScheduler,
)
from pipeline_controlnet_inpaint import StableDiffusionControlNetInpaintPipeline
from transformers import AutoTokenizer, PretrainedConfig
import torch
from PIL import Image, ImageOps
from transparent_background import Remover

# Seed the Python, NumPy and PyTorch (CPU and all CUDA devices) RNGs for reproducibility
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)
torch.cuda.manual_seed_all(42)

# ---------------------------------------------------------------------------------
# Load models and build the pipeline
# The ControlNet weights are read from a local checkpoint directory.
controlnet = ControlNetModel.from_pretrained('./checkpoints/controlnet/controlnet')

def import_model_class_from_model_name_or_path(pretrained_model_name_or_path: str, revision: str):
    """Return the text-encoder class declared by a checkpoint's ``text_encoder`` config.

    The config is read from the ``text_encoder`` subfolder of
    ``pretrained_model_name_or_path`` at ``revision``; its first listed
    architecture decides which class is imported and returned.

    Raises:
        ValueError: If the architecture is neither ``CLIPTextModel`` nor
            ``RobertaSeriesModelWithTransformation``.
    """
    config = PretrainedConfig.from_pretrained(
        pretrained_model_name_or_path,
        subfolder="text_encoder",
        revision=revision,
    )
    architecture = config.architectures[0]

    # Imports are deferred so only the encoder actually needed gets loaded.
    if architecture == "CLIPTextModel":
        from transformers import CLIPTextModel

        return CLIPTextModel

    if architecture == "RobertaSeriesModelWithTransformation":
        from diffusers.pipelines.alt_diffusion.modeling_roberta_series import (
            RobertaSeriesModelWithTransformation,
        )

        return RobertaSeriesModelWithTransformation

    raise ValueError(f"{architecture} is not supported.")

# Tokenizer comes from the same Stable Diffusion 2 inpainting checkpoint that is
# named in sd_inpainting_model_name below.
tokenizer = AutoTokenizer.from_pretrained(
    "stabilityai/stable-diffusion-2-inpainting",
    subfolder="tokenizer",
    use_fast=False,
)
sd_inpainting_model_name = "stabilityai/stable-diffusion-2-inpainting"
# Resolve the text-encoder class from the checkpoint's text_encoder config.
text_encoder_cls = import_model_class_from_model_name_or_path(sd_inpainting_model_name, None)
# NOTE(review): noise_scheduler is not referenced again in the visible code; the
# pipeline scheduler is replaced with UniPCMultistepScheduler further down.
noise_scheduler = DDPMScheduler.from_pretrained(sd_inpainting_model_name, subfolder="scheduler")
text_encoder = text_encoder_cls.from_pretrained(sd_inpainting_model_name, subfolder="text_encoder", revision=None)
vae = AutoencoderKL.from_pretrained(sd_inpainting_model_name, subfolder="vae", revision=None)
unet = UNet2DConditionModel.from_pretrained(sd_inpainting_model_name, subfolder="unet", revision=None)
weight_dtype = torch.float32
# Assemble the ControlNet inpainting pipeline from the individually loaded parts;
# the safety checker is disabled.
pipeline = StableDiffusionControlNetInpaintPipeline.from_pretrained(
    sd_inpainting_model_name,
    vae=vae,
    text_encoder=text_encoder,
    tokenizer=tokenizer,
    unet=unet,
    controlnet=controlnet,
    safety_checker=None,
    revision=None,
    torch_dtype=weight_dtype,
)
# Swap in the UniPC multistep scheduler, reusing the existing scheduler's config.
pipeline.scheduler = UniPCMultistepScheduler.from_config(pipeline.scheduler.config)
pipeline = pipeline.to('cuda')
pipeline.set_progress_bar_config(disable=True)

# ---------------------------------------------------------------------------------
# 유틸리티 함수
def resize_with_padding(img, expected_size):
    """Fit ``img`` inside ``expected_size`` and pad it to exactly that size.

    The aspect ratio is preserved. ``Image.thumbnail`` only shrinks, so an
    image already smaller than ``expected_size`` is padded without being
    enlarged. Padding is split evenly between opposite sides (any odd pixel
    goes to the right/bottom) and uses ``ImageOps.expand``'s default fill of 0.

    Args:
        img: Source PIL image. It is left unmodified.
        expected_size: ``(width, height)`` of the returned image.

    Returns:
        A new PIL image of size ``expected_size``.
    """
    # thumbnail() resizes in place, so work on a copy to avoid silently
    # shrinking the caller's image.
    fitted = img.copy()
    fitted.thumbnail((expected_size[0], expected_size[1]))
    delta_width = expected_size[0] - fitted.size[0]
    delta_height = expected_size[1] - fitted.size[1]
    pad_width = delta_width // 2
    pad_height = delta_height // 2
    # Border order is (left, top, right, bottom).
    padding = (pad_width, pad_height, delta_width - pad_width, delta_height - pad_height)
    return ImageOps.expand(fitted, padding)

def get_min_area_bbox(mask):
    """Return an axis-aligned ``(x_min, y_min, x_max, y_max)`` box for the mask's largest object.

    The mask is binarised (pixel values above 1 count as foreground), the
    external contour with the largest area is selected, and the extents of
    the corners of its minimum-area rotated rectangle are returned.

    Args:
        mask: PIL image holding the foreground map; 3-channel input is
            converted to grayscale first.

    Returns:
        Integer box coordinates, or ``(0, 0, mask.width, mask.height)`` when
        no contour is found.
    """
    mask_np = np.array(mask)
    if len(mask_np.shape) == 3:
        mask_np = cv2.cvtColor(mask_np, cv2.COLOR_RGB2GRAY)
    _, binary = cv2.threshold(mask_np, 1, 255, cv2.THRESH_BINARY)
    contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not contours:
        return (0, 0, mask.width, mask.height)
    largest = max(contours, key=cv2.contourArea)
    corners = cv2.boxPoints(cv2.minAreaRect(largest))
    # np.int0 was an alias of np.intp and was removed in NumPy 2.0; astype
    # performs the same truncating cast on every NumPy version.
    corners = corners.astype(np.intp)
    # NOTE(review): corners of a rotated rectangle may fall outside the image
    # bounds; callers receive them unclamped, as before.
    x_min = int(np.min(corners[:, 0]))
    y_min = int(np.min(corners[:, 1]))
    x_max = int(np.max(corners[:, 0]))
    y_max = int(np.max(corners[:, 1]))
    return (x_min, y_min, x_max, y_max)

def classify_image(filename):
    """Map a file name to a vehicle class via case-insensitive keyword search.

    Keywords are tried in the listed order and the first one found anywhere
    in the lowercased name wins. Returns ``None`` when nothing matches.
    """
    name = filename.lower()
    keyword_to_class = (
        ("bmp3_", "bmp3"),
        ("k200_", "k200"),
        ("k2_", "k2"),
        ("t80_", "t80"),
        ("military truck", "military_truck"),
    )
    for keyword, label in keyword_to_class:
        if keyword in name:
            return label
    return None
    
def get_prompts(subject, is_drone_view=False):
    """Build the list of candidate text prompts for ``subject``.

    Each prompt has the form ``"A <subject> <scene>"``. When
    ``is_drone_view`` is true, ``" seen from the air."`` is appended to every
    prompt (after the scene's own trailing period, where it has one).
    """
    scenes = (
        "on the road.",
        "on the grass.",
        "on the mountains.",
        "on a dry dirt field",
        "on the hill.",
        "on a snowy road.",
        "on the dry grassland near a forested hillside.",
        "floating on a river with trees in the background.",
    )
    # Aerial-view hint used for images labelled as drone view.
    suffix = " seen from the air." if is_drone_view else ""
    return [f"A {subject} {scene}{suffix}" for scene in scenes]

# Object noun (subject) inserted into the prompt for each class
subject_dict = {
    "k2": "tank",
    "t80": "tank",
    "k200": "armored car",
    "bmp3": "armored car",
    "military_truck": "military truck"
}

# Number of images to generate per source image, by class
num_seeds_dict = {
    "k2": 1,
    "k200": 1,
    "bmp3": 1,
    "military_truck": 1,
    "t80": 1,
}

# Lower cond_scale: object quality < background quality
# Higher cond_scale: object quality > background quality
# (passed to the pipeline as controlnet_conditioning_scale)
cond_scale = 0.5
num_inference_steps = 20

# ---------------------------------------------------------------------------------
# Path configuration
source_folder = r'D:\py\AIM\Projects\Drone_detection\OVD\dataset_datamaker\train_dataset\OD_2\images'
base_save_root = r'D:\py\AIM\Projects\Drone_detection\OVD\dataset_datamaker\train_dataset\fusion\train\images'

# NOTE(review): the value below reads as a placeholder description (path of the
# JSON holding the Qwen drone-view classification), not a real path — replace it
# before running, otherwise the fallback branch is taken.
drone_classification_path = 'qwen으로 분류한 드론뷰 이미지 여부 작성된 .json 경로'
if os.path.exists(drone_classification_path):
    with open(drone_classification_path, "r", encoding="utf-8") as f:
        drone_view_results = json.load(f)
else:
    # Without the JSON every image is later treated as 'Normal View'.
    print("Drone view classification JSON file not found. Defaulting to 'Normal View'.")
    drone_view_results = {}

# List of image files in the source folder (matched by extension, case-insensitive)
image_files = [f for f in os.listdir(source_folder) if f.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp'))]
if not image_files:
    print("No image files found in the source folder.")
else:
    print(f"Found {len(image_files)} images in the source folder.")

# ---------------------------------------------------------------------------------
# Run synthesis for every source image, adapting the prompt to its drone-view label.

# Build the background remover once and reuse it for every image instead of
# re-instantiating it per file.
remover = Remover(mode='base')

for img_file in image_files:
    cls = classify_image(img_file)
    if cls is None:
        print(f"File {img_file} does not meet classification criteria. Skipping.")
        continue

    # Drone-view label from the JSON ('Drone View' or 'Normal View'); files
    # missing from the JSON default to 'Normal View'.
    drone_result = drone_view_results.get(img_file, "Normal View")
    is_drone_view = drone_result == "Drone View"

    subject = subject_dict.get(cls, "object")
    prompts = get_prompts(subject, is_drone_view=is_drone_view)
    num_seeds = num_seeds_dict.get(cls, 1)

    save_folder = os.path.join(base_save_root, cls)
    os.makedirs(save_folder, exist_ok=True)
    mask_save_folder = os.path.join(base_save_root, f"{cls}_mask")
    os.makedirs(mask_save_folder, exist_ok=True)

    image_path = os.path.join(source_folder, img_file)
    print(f"\nProcessing file: {image_path} -> Classified as: {cls}, Drone View: {drone_result}")
    try:
        img = Image.open(image_path)
        if img.mode != "RGB":
            img = img.convert("RGB")
    except Exception as e:
        # Best effort: report the unreadable file and move on.
        print(f"Failed to open image: {image_path}, error: {e}")
        continue

    img = resize_with_padding(img, (640, 640))  # resize to the YOLO input size (640x640)
    base_filename = os.path.splitext(os.path.basename(img_file))[0]

    # Extract the object (foreground) mask.
    fg_mask = remover.process(img, type='map')
    mask = fg_mask

    # The object's bounding box depends only on the mask, so compute it once
    # per image rather than once per generated sample.
    bbox = get_min_area_bbox(fg_mask)
    x1, y1, x2, y2 = bbox
    cx, cy = (x1 + x2) / 2, (y1 + y2) / 2
    hw, hh = (x2 - x1) / 2, (y2 - y1) / 2

    margin = 1
    # Maximum number of random scale/translate attempts per sample.
    max_attempts = 500

    for seed_idx in range(num_seeds):
        # Give every sample its own seed and file name; with a fixed seed and
        # a fixed "_42" suffix, samples beyond the first overwrote each other.
        # The first sample still uses seed 42.
        seed = 42 + seed_idx
        selected_prompt = random.choice(prompts)
        generator = torch.Generator(device='cuda').manual_seed(seed)

        attempt = 0
        valid_transform = False

        while attempt < max_attempts and not valid_transform:
            # Pick a scale, then a translation that keeps the scaled box inside the margins.
            scale_factor = random.uniform(0.8, 0.9)
            new_hw, new_hh = scale_factor * hw, scale_factor * hh
            dx_min = margin - (cx - new_hw)
            dx_max = mask.width - margin - (cx + new_hw)
            dy_min = margin - (cy - new_hh)
            dy_max = mask.height - margin - (cy + new_hh)
            if dx_max < dx_min or dy_max < dy_min:
                attempt += 1
                continue
            dx = random.uniform(dx_min, dx_max)
            dy = random.uniform(dy_min, dy_max)
            t_x = (1 - scale_factor) * cx + dx
            t_y = (1 - scale_factor) * cy + dy
            # forward_matrix is not used by the visible code; Image.transform
            # takes the inverse (output -> input) mapping.
            forward_matrix = (scale_factor, 0, t_x, 0, scale_factor, t_y)
            inv_matrix = (1/scale_factor, 0, -t_x/scale_factor, 0, 1/scale_factor, -t_y/scale_factor)
            shifted_mask = mask.transform(mask.size, Image.AFFINE, inv_matrix, fillcolor=0)
            transformed_bbox = shifted_mask.getbbox()
            if transformed_bbox is None:
                attempt += 1
                continue
            tx1, ty1, tx2, ty2 = transformed_bbox
            # Accept only if every non-zero mask pixel stays inside the margins.
            if tx1 >= margin and ty1 >= margin and tx2 <= mask.width - margin and ty2 <= mask.height - margin:
                valid_transform = True
            else:
                attempt += 1

        if not valid_transform:
            # No valid scaled transform was found: fall back to a pure translation.
            print("Could not find a valid affine transform. Using pure translation instead.")
            allowed_dx_min = margin - x1
            allowed_dx_max = mask.width - margin - x2
            allowed_dy_min = margin - y1
            allowed_dy_max = mask.height - margin - y2
            dx = random.uniform(allowed_dx_min, allowed_dx_max)
            dy = random.uniform(allowed_dy_min, allowed_dy_max)
            forward_matrix = (1.0, 0, dx, 0, 1.0, dy)
            inv_matrix = (1.0, 0, -dx, 0, 1.0, -dy)
            shifted_mask = mask.transform(mask.size, Image.AFFINE, inv_matrix, fillcolor=0)

        shifted_img = img.transform(img.size, Image.AFFINE, inv_matrix, fillcolor=0)
        # The inverted mask (everything except the object) is passed as both
        # the inpainting mask and the ControlNet conditioning image.
        invert_mask = ImageOps.invert(shifted_mask)

        with torch.autocast("cuda"):
            generated_image = pipeline(
                prompt=selected_prompt,
                image=shifted_img,
                mask_image=invert_mask,
                control_image=invert_mask,
                num_images_per_prompt=1,
                generator=generator,
                num_inference_steps=num_inference_steps,
                guess_mode=False,
                controlnet_conditioning_scale=cond_scale
            ).images[0]

        # Save the synthesized image and its mask.
        ext = os.path.splitext(img_file)[1] or '.jpg'
        save_path = os.path.join(save_folder, f"{base_filename}_{seed}{ext}")
        generated_image.save(save_path)
        print(f"Seed {seed} | Prompt: {selected_prompt} -> Saved image to {save_path}")

        mask_save_path = os.path.join(mask_save_folder, f"{base_filename}_{seed}{ext}")
        shifted_mask.save(mask_save_path)
        print(f"Saved shifted mask to {mask_save_path}")

print("Image synthesis completed for all images.")


소스 이미지 일부 샘플링하여 합성하는 코드

In [1]:
# ---------------------------------------------------------------------------------
# 패키지 임포트
import os
import random
import math
import json
import cv2
import numpy as np
from diffusers import (
    AutoencoderKL,
    ControlNetModel,
    DDPMScheduler,
    UNet2DConditionModel,
    UniPCMultistepScheduler,
)
from pipeline_controlnet_inpaint import StableDiffusionControlNetInpaintPipeline
from transformers import AutoTokenizer, PretrainedConfig
import torch
from PIL import Image, ImageOps
from transparent_background import Remover

# ---------------------- Seed setup for reproducibility ----------------------
# Seeds the Python, NumPy and PyTorch (CPU and all CUDA devices) RNGs.
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)
torch.cuda.manual_seed_all(42)

# ---------------------------------------------------------------------------------
# Load models and build the pipeline
# The ControlNet weights are read from a local checkpoint directory.
controlnet = ControlNetModel.from_pretrained('./checkpoints/controlnet/controlnet')

def import_model_class_from_model_name_or_path(pretrained_model_name_or_path: str, revision: str):
    """Return the text-encoder class declared by a checkpoint's ``text_encoder`` config.

    The config is read from the ``text_encoder`` subfolder of
    ``pretrained_model_name_or_path`` at ``revision``; its first listed
    architecture decides which class is imported and returned.

    Raises:
        ValueError: If the architecture is neither ``CLIPTextModel`` nor
            ``RobertaSeriesModelWithTransformation``.
    """
    config = PretrainedConfig.from_pretrained(
        pretrained_model_name_or_path,
        subfolder="text_encoder",
        revision=revision,
    )
    architecture = config.architectures[0]

    # Imports are deferred so only the encoder actually needed gets loaded.
    if architecture == "CLIPTextModel":
        from transformers import CLIPTextModel

        return CLIPTextModel

    if architecture == "RobertaSeriesModelWithTransformation":
        from diffusers.pipelines.alt_diffusion.modeling_roberta_series import (
            RobertaSeriesModelWithTransformation,
        )

        return RobertaSeriesModelWithTransformation

    raise ValueError(f"{architecture} is not supported.")

# Tokenizer comes from the same Stable Diffusion 2 inpainting checkpoint that is
# named in sd_inpainting_model_name below.
tokenizer = AutoTokenizer.from_pretrained(
    "stabilityai/stable-diffusion-2-inpainting",
    subfolder="tokenizer",
    use_fast=False,
)
sd_inpainting_model_name = "stabilityai/stable-diffusion-2-inpainting"
# Resolve the text-encoder class from the checkpoint's text_encoder config.
text_encoder_cls = import_model_class_from_model_name_or_path(sd_inpainting_model_name, None)
# NOTE(review): noise_scheduler is not referenced again in the visible code; the
# pipeline scheduler is replaced with UniPCMultistepScheduler further down.
noise_scheduler = DDPMScheduler.from_pretrained(sd_inpainting_model_name, subfolder="scheduler")
text_encoder = text_encoder_cls.from_pretrained(sd_inpainting_model_name, subfolder="text_encoder", revision=None)
vae = AutoencoderKL.from_pretrained(sd_inpainting_model_name, subfolder="vae", revision=None)
unet = UNet2DConditionModel.from_pretrained(sd_inpainting_model_name, subfolder="unet", revision=None)
weight_dtype = torch.float32
# Assemble the ControlNet inpainting pipeline from the individually loaded parts;
# the safety checker is disabled.
pipeline = StableDiffusionControlNetInpaintPipeline.from_pretrained(
    sd_inpainting_model_name,
    vae=vae,
    text_encoder=text_encoder,
    tokenizer=tokenizer,
    unet=unet,
    controlnet=controlnet,
    safety_checker=None,
    revision=None,
    torch_dtype=weight_dtype,
)
# Swap in the UniPC multistep scheduler, reusing the existing scheduler's config.
pipeline.scheduler = UniPCMultistepScheduler.from_config(pipeline.scheduler.config)
pipeline = pipeline.to('cuda')
pipeline.set_progress_bar_config(disable=True)

# ---------------------------------------------------------------------------------
# 유틸리티 함수
def resize_with_padding(img, expected_size):
    """Fit ``img`` inside ``expected_size`` and pad it to exactly that size.

    The aspect ratio is preserved. ``Image.thumbnail`` only shrinks, so an
    image already smaller than ``expected_size`` is padded without being
    enlarged. The image is centred: padding is split evenly between opposite
    sides (any odd pixel goes to the right/bottom) and uses
    ``ImageOps.expand``'s default fill of 0, i.e. black rather than gray.

    Args:
        img: Source PIL image. It is left unmodified.
        expected_size: ``(width, height)`` of the returned image.

    Returns:
        A new PIL image of size ``expected_size``.
    """
    # thumbnail() resizes in place, so work on a copy to avoid silently
    # shrinking the caller's image.
    fitted = img.copy()
    fitted.thumbnail((expected_size[0], expected_size[1]))
    delta_width = expected_size[0] - fitted.size[0]
    delta_height = expected_size[1] - fitted.size[1]
    pad_width = delta_width // 2
    pad_height = delta_height // 2
    # Border order is (left, top, right, bottom).
    padding = (pad_width, pad_height, delta_width - pad_width, delta_height - pad_height)
    return ImageOps.expand(fitted, padding)

def get_min_area_bbox(mask):
    """Return an axis-aligned ``(x_min, y_min, x_max, y_max)`` box for the mask's largest object.

    The mask is binarised (pixel values above 1 count as foreground), the
    external contour with the largest area is selected, and the extents of
    the corners of its minimum-area rotated rectangle are returned.

    Args:
        mask: PIL image holding the foreground map; 3-channel input is
            converted to grayscale first.

    Returns:
        Integer box coordinates, or ``(0, 0, mask.width, mask.height)`` when
        no contour is found.
    """
    mask_np = np.array(mask)
    if len(mask_np.shape) == 3:
        mask_np = cv2.cvtColor(mask_np, cv2.COLOR_RGB2GRAY)
    _, binary = cv2.threshold(mask_np, 1, 255, cv2.THRESH_BINARY)
    contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not contours:
        return (0, 0, mask.width, mask.height)
    largest = max(contours, key=cv2.contourArea)
    corners = cv2.boxPoints(cv2.minAreaRect(largest))
    # np.int0 was an alias of np.intp and was removed in NumPy 2.0; astype
    # performs the same truncating cast on every NumPy version.
    corners = corners.astype(np.intp)
    # NOTE(review): corners of a rotated rectangle may fall outside the image
    # bounds; callers receive them unclamped, as before.
    x_min = int(np.min(corners[:, 0]))
    y_min = int(np.min(corners[:, 1]))
    x_max = int(np.max(corners[:, 0]))
    y_max = int(np.max(corners[:, 1]))
    return (x_min, y_min, x_max, y_max)

def classify_image(filename):
    """Map a file name to a vehicle class via case-insensitive keyword search.

    Keywords are tried in this order and the first one found anywhere in the
    lowercased name wins:

    - ``bmp3_`` -> ``"bmp3"``
    - ``t80_``  -> ``"t80"``
    - ``truck`` -> ``"military_truck"``

    Returns ``None`` when nothing matches.
    """
    name = filename.lower()
    keyword_to_class = (
        ("bmp3_", "bmp3"),
        ("t80_", "t80"),
        ("truck", "military_truck"),
    )
    for keyword, label in keyword_to_class:
        if keyword in name:
            return label
    return None

def get_prompts(subject, is_drone_view=False):
    """Build the list of candidate text prompts for ``subject``.

    ``subject`` is the object noun (e.g. tank, armored car, military truck).
    Each prompt has the form ``"A <subject> <scene>"``. When
    ``is_drone_view`` is true, ``" seen from the air."`` is appended to every
    prompt (after the scene's own trailing period, where it has one).
    """
    scenes = (
        "on the road.",
        "on the grass.",
        "on the mountains.",
        "on a dry dirt field",
        "on the hill.",
        "on a snowy road.",
        "on the dry grassland near a forested hillside.",
        "floating on a river with trees in the background.",
    )
    # Aerial-view hint used for images labelled as drone view.
    suffix = " seen from the air." if is_drone_view else ""
    return [f"A {subject} {scene}{suffix}" for scene in scenes]

# Per-class subject (object noun used in the prompt)
subject_dict = {
    "t80": "tank",
    "bmp3": "armored car",
    "military_truck": "military truck"
}

# Per-class number of images generated per source image (1 here)
num_seeds_dict = {
    "bmp3": 1,
    "military_truck": 1,
    "t80": 1,
}

# ControlNet inference parameters
# (cond_scale is passed to the pipeline as controlnet_conditioning_scale)
cond_scale = 0.5
num_inference_steps = 20

# ---------------------------------------------------------------------------------
# Path configuration (adjust to the actual environment)
source_folder = r'D:\py\AIM\Projects\Drone_detection\OVD\dataset_datamaker\train_dataset\OD_2\images'
base_save_root = r'D:\py\AIM\Projects\Drone_detection\OVD\dataset_datamaker\train_dataset\fusion\train\images'

# Path of the JSON file produced by the Qwen classification script
# (must be replaced with the real JSON path)
drone_classification_path = r'D:\py\AIM\Projects\Drone_detection\OVD\dataset_datamaker\train_dataset\fusion\train\drone_view_classification.json'

if os.path.exists(drone_classification_path):
    with open(drone_classification_path, "r", encoding="utf-8") as f:
        drone_view_results = json.load(f)
else:
    # Without the JSON every image is later treated as 'Normal View'.
    print(f"JSON 파일을 찾을 수 없습니다: {drone_classification_path}")
    drone_view_results = {}

# ---------------------------------------------------------------------------------
# List of image files in the source folder (.jpg, .jpeg, .png, .bmp; case-insensitive)
image_files = [f for f in os.listdir(source_folder) if f.lower().endswith((".jpg", ".jpeg", ".png", ".bmp"))]
if not image_files:
    print("소스 폴더에 이미지 파일이 없습니다.")
else:
    print(f"소스 폴더에서 {len(image_files)}개의 이미지를 찾았습니다.")

# ---------------------------------------------------------------------------------
# 1. Group the source files by class
files_by_class = {class_name: [] for class_name in ("bmp3", "t80", "military_truck")}

for fname in image_files:
    cls = classify_image(fname)
    # Files whose class is None (or otherwise not tracked) are ignored.
    bucket = files_by_class.get(cls)
    if bucket is not None:
        bucket.append(fname)

# ---------------------------------------------------------------------------------
# 2. Collect 100 valid samples per class, where "valid" means the foreground
#    covers more than 20% of the 640x640 image (409600 px total, 81920 px threshold).
threshold_ratio = 0.20
total_pixels = 640 * 640
threshold_pixels = int(total_pixels * threshold_ratio)  # 81920

sampled_files = []

# Build the background remover once and reuse it for every candidate image
# instead of re-instantiating it per file.
remover = Remover(mode='base')

for cls_name, file_list in files_by_class.items():
    if len(file_list) < 100:
        raise ValueError(f"클래스 '{cls_name}'의 이미지가 100장 미만입니다. 현재 개수: {len(file_list)}")

    # Re-seed, then shuffle in place so the candidate order is reproducible.
    random.seed(42)
    random.shuffle(file_list)

    valid_samples = []
    for fname in file_list:
        image_path = os.path.join(source_folder, fname)
        try:
            img = Image.open(image_path).convert("RGB")
        except Exception as e:
            # Best effort: report the unreadable file and try the next one.
            print(f"ERROR: 이미지 열기 실패({fname}): {e}")
            continue

        # Resize with padding to 640x640.
        img_resized = resize_with_padding(img, (640, 640))

        # Extract the foreground map and count pixels with value above 1.
        fg_mask = remover.process(img_resized, type='map')
        mask_np = np.array(fg_mask)
        if len(mask_np.shape) == 3:
            mask_np = cv2.cvtColor(mask_np, cv2.COLOR_RGB2GRAY)
        _, binary_mask = cv2.threshold(mask_np, 1, 255, cv2.THRESH_BINARY)
        fg_count = np.count_nonzero(binary_mask)

        # Valid when the foreground pixel count exceeds the threshold.
        if fg_count > threshold_pixels:
            valid_samples.append(fname)

        # Stop scanning this class once 100 valid samples are collected.
        if len(valid_samples) == 100:
            break

    if len(valid_samples) < 100:
        raise ValueError(f"클래스 '{cls_name}'에서 유효한 샘플 100장을 찾지 못했습니다. (찾은 개수: {len(valid_samples)})")

    sampled_files.extend(valid_samples)
    print(f"클래스 '{cls_name}' - 유효 샘플 100장 수집 완료.")

# ---------------------------------------------------------------------------------
# 3. Run synthesis on the sampled files and record metadata
generated_metadata = {}  # key: path relative to base_save_root (class/filename), value: prompt used

# Build the background remover once and reuse it for every image instead of
# re-instantiating it per file.
remover = Remover(mode='base')

for img_file in sampled_files:
    cls = classify_image(img_file)  # expected to be bmp3, t80 or military_truck

    # Drone-view label from the JSON (key: file name, value: "Drone View" or
    # "Normal View"); files missing from the JSON default to "Normal View".
    drone_result = drone_view_results.get(img_file, "Normal View")
    is_drone_view = drone_result == "Drone View"

    subject = subject_dict.get(cls, "object")
    prompts = get_prompts(subject, is_drone_view=is_drone_view)
    num_seeds = num_seeds_dict.get(cls, 1)

    # Create the output folders for synthesized images and masks.
    save_folder = os.path.join(base_save_root, cls)
    os.makedirs(save_folder, exist_ok=True)
    mask_save_folder = os.path.join(base_save_root, f"{cls}_mask")
    os.makedirs(mask_save_folder, exist_ok=True)

    image_path = os.path.join(source_folder, img_file)
    print(f"\nProcessing file: {image_path} -> Classified as: {cls}, Drone View: {drone_result}")
    try:
        img = Image.open(image_path)
        if img.mode != "RGB":
            img = img.convert("RGB")
    except Exception as e:
        # Best effort: report the unreadable file and move on.
        print(f"이미지 열기 실패: {image_path}, 오류: {e}")
        continue

    # Resize with padding to the YOLO input size (640x640).
    img = resize_with_padding(img, (640, 640))
    base_filename = os.path.splitext(os.path.basename(img_file))[0]

    # Extract the foreground mask.
    fg_mask = remover.process(img, type='map')
    mask = fg_mask

    # The object's bounding box depends only on the mask, so compute it once
    # per image rather than once per generated sample.
    bbox = get_min_area_bbox(fg_mask)
    x1, y1, x2, y2 = bbox
    cx, cy = (x1 + x2) / 2, (y1 + y2) / 2
    hw, hh = (x2 - x1) / 2, (y2 - y1) / 2

    margin = 1
    # Maximum number of random scale/translate attempts per sample.
    max_attempts = 500

    # Generate num_seeds samples per image (1 with the current settings).
    for seed_idx in range(num_seeds):
        # Give every sample its own seed, file name and metadata key; with a
        # fixed seed and a fixed "_42" suffix, samples beyond the first
        # overwrote each other. The first sample still uses seed 42.
        seed = 42 + seed_idx
        selected_prompt = random.choice(prompts)
        generator = torch.Generator(device='cuda').manual_seed(seed)

        attempt = 0
        valid_transform = False

        while attempt < max_attempts and not valid_transform:
            # Pick a scale, then a translation that keeps the scaled box inside the margins.
            scale_factor = random.uniform(0.8, 0.9)
            new_hw, new_hh = scale_factor * hw, scale_factor * hh
            dx_min = margin - (cx - new_hw)
            dx_max = mask.width - margin - (cx + new_hw)
            dy_min = margin - (cy - new_hh)
            dy_max = mask.height - margin - (cy + new_hh)
            if dx_max < dx_min or dy_max < dy_min:
                attempt += 1
                continue
            dx = random.uniform(dx_min, dx_max)
            dy = random.uniform(dy_min, dy_max)
            t_x = (1 - scale_factor) * cx + dx
            t_y = (1 - scale_factor) * cy + dy
            # forward_matrix is not used by the visible code; Image.transform
            # takes the inverse (output -> input) mapping.
            forward_matrix = (scale_factor, 0, t_x, 0, scale_factor, t_y)
            inv_matrix = (1/scale_factor, 0, -t_x/scale_factor, 0, 1/scale_factor, -t_y/scale_factor)
            shifted_mask = mask.transform(mask.size, Image.AFFINE, inv_matrix, fillcolor=0)
            transformed_bbox = shifted_mask.getbbox()
            if transformed_bbox is None:
                attempt += 1
                continue
            tx1, ty1, tx2, ty2 = transformed_bbox
            # Accept only if every non-zero mask pixel stays inside the margins.
            if tx1 >= margin and ty1 >= margin and tx2 <= mask.width - margin and ty2 <= mask.height - margin:
                valid_transform = True
            else:
                attempt += 1

        if not valid_transform:
            # No valid scaled transform was found: fall back to a pure translation.
            print("유효한 어파인 변환을 찾지 못했습니다. 순수 이동 변환으로 대체합니다.")
            allowed_dx_min = margin - x1
            allowed_dx_max = mask.width - margin - x2
            allowed_dy_min = margin - y1
            allowed_dy_max = mask.height - margin - y2
            dx = random.uniform(allowed_dx_min, allowed_dx_max)
            dy = random.uniform(allowed_dy_min, allowed_dy_max)
            forward_matrix = (1.0, 0, dx, 0, 1.0, dy)
            inv_matrix = (1.0, 0, -dx, 0, 1.0, -dy)
            shifted_mask = mask.transform(mask.size, Image.AFFINE, inv_matrix, fillcolor=0)

        # Apply the same shift/scale to the image.
        shifted_img = img.transform(img.size, Image.AFFINE, inv_matrix, fillcolor=0)
        # The inverted mask (everything except the object) is passed as both
        # the inpainting mask and the ControlNet conditioning image.
        invert_mask = ImageOps.invert(shifted_mask)

        # Stable Diffusion + ControlNet synthesis.
        with torch.autocast("cuda"):
            generated_image = pipeline(
                prompt=selected_prompt,
                image=shifted_img,
                mask_image=invert_mask,
                control_image=invert_mask,
                num_images_per_prompt=1,
                generator=generator,
                num_inference_steps=num_inference_steps,
                guess_mode=False,
                controlnet_conditioning_scale=cond_scale
            ).images[0]

        # Save the synthesized image and its mask.
        ext = os.path.splitext(img_file)[1] or '.jpg'
        save_path = os.path.join(save_folder, f"{base_filename}_{seed}{ext}")
        generated_image.save(save_path)
        print(f"Seed {seed} | Prompt: {selected_prompt} -> Saved image to {save_path}")

        mask_save_path = os.path.join(mask_save_folder, f"{base_filename}_{seed}{ext}")
        shifted_mask.save(mask_save_path)
        print(f"Saved shifted mask to {mask_save_path}")

        # Record the prompt under the image's path relative to base_save_root,
        # normalised to forward slashes.
        rel_path = os.path.relpath(save_path, base_save_root).replace("\\", "/")
        generated_metadata[rel_path] = selected_prompt

print("샘플링된 모든 이미지에 대한 합성이 완료되었습니다.")

# ---------------------------------------------------------------------------------
# 4. Save the metadata as a JSON file
metadata_save_path = os.path.join(base_save_root, "generated_images_prompts.json")
with open(metadata_save_path, "w", encoding="utf-8") as meta_f:
    json.dump(generated_metadata, meta_f, ensure_ascii=False, indent=4)
print(f"생성된 이미지 메타데이터가 저장되었습니다: {metadata_save_path}")


ModuleNotFoundError: No module named 'pipeline_controlnet_inpaint'