In [6]:
from glob import glob
from tqdm import tqdm
import orjson as json

In [3]:
json_list = glob("E:/download/dataset/040.교량 3D 외관점검 영상 데이터/3.개방데이터/1.데이터/Training/02.라벨링데이터/*.json")

In [7]:
dataset = []
for json_path in tqdm(json_list):
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.loads(f.read())
        image_file = data['imagePath']
        annotations = []
        for item in data['shapes']:
            label = item['label']
            points = item['points']
            shape_type = item['shape_type']
            annotations.append({
                'label': label,
                'points': points,
                'shape_type': shape_type
            })

        dataset.append({
            'image_file': image_file,
            'annotations': annotations
        })

with open('preprocessed_dataset.json', 'wb') as f:
    f.write(json.dumps(dataset))

100%|██████████| 336068/336068 [47:13<00:00, 118.59it/s] 


In [3]:
def extract_unique_labels(data):
    labels = set()
    normal = []
    anomaly = []
    for item in tqdm(data):
        if item['annotations'] is None or len(item['annotations']) == 0:
            normal.append(item)
        else:
            anomaly.append(item)
        for ann in item.get("annotations", []):
            labels.add(ann.get("label"))
            
    return normal, anomaly ,list(labels)

# normal_data, anomaly_data, unique_labels = extract_unique_labels(dataset)
# print(unique_labels)

In [None]:
import json
import random
from collections import defaultdict

random.seed(42)  # 재현 가능하게 fix

JSON_PATH = "preprocessed_dataset.json"
MAX_PER_LABEL = 1000          

# 1) json 로드
with open(JSON_PATH, "r", encoding="utf-8") as f:
    data = json.load(f)


NameError: name 'dataset' is not defined

In [9]:

# 2) 라벨별로 이미지 모으기 (중복 제거를 위해 set 사용)

normal_data, anomaly_data, unique_labels = extract_unique_labels(data)
print(unique_labels)
label_to_images = {label: set() for label in unique_labels}

for item in tqdm(data):
    img_name = item["image_file"]
    anns = item.get("annotations", [])
    for ann in anns:
        lab = ann.get("label")
        if lab in label_to_images:
            label_to_images[lab].add(img_name)

# 3) 라벨별로 최대 5000장 랜덤 샘플링
label_to_sampled = {}

for label, img_set in label_to_images.items():
    img_list = list(img_set)
    n = len(img_list)
    if n == 0:
        print(f"[{label}] 이미지 없음, 샘플링 생략")
        label_to_sampled[label] = []
        continue

    if n <= MAX_PER_LABEL:
        sampled = img_list  # 전체 사용
        print(f"[{label}] 이미지 {n}장밖에 없어서 전부 사용")
    else:
        sampled = random.sample(img_list, MAX_PER_LABEL)
        print(f"[{label}] 총 {n}장 중 {MAX_PER_LABEL}장 랜덤 샘플링")

    label_to_sampled[label] = sampled


100%|██████████| 336068/336068 [00:00<00:00, 872910.95it/s]


['Efflorescene', 'SteelDefect', 'Pothole', 'Leak', 'Spalling', 'AspaltCrack', 'PaintDamage', 'Exposure', 'ConcreteCrack']


100%|██████████| 336068/336068 [00:00<00:00, 703437.89it/s]

[Efflorescene] 총 33846장 중 1000장 랜덤 샘플링
[SteelDefect] 이미지 244장밖에 없어서 전부 사용
[Pothole] 이미지 465장밖에 없어서 전부 사용
[Leak] 총 10186장 중 1000장 랜덤 샘플링
[Spalling] 이미지 490장밖에 없어서 전부 사용
[AspaltCrack] 총 116706장 중 1000장 랜덤 샘플링
[PaintDamage] 이미지 104장밖에 없어서 전부 사용
[Exposure] 총 2209장 중 1000장 랜덤 샘플링
[ConcreteCrack] 총 93103장 중 1000장 랜덤 샘플링





In [12]:
from pathlib import Path
import shutil

IMAGES_ROOT = Path("e:/projects/대학원/이상탐지/bridge/data/anomaly_data")         # 원본 이미지가 있는 루트
OUT_ROOT = Path("e:/projects/대학원/이상탐지/sampling")       # 새로 만들 라벨별 서브셋
OUT_ROOT.mkdir(parents=True, exist_ok=True)

for label, img_list in label_to_sampled.items():
    dst_dir = OUT_ROOT / label
    dst_dir.mkdir(parents=True, exist_ok=True)

    for img_name in img_list:
        src = IMAGES_ROOT / img_name
        if not src.exists():
            print(f"[WARN] {src} 파일이 없음, 건너뜀")
            continue
        shutil.copy2(src, dst_dir / img_name)

    print(f"[{label}] {len(img_list)}장 복사 완료 → {dst_dir}")


[Efflorescene] 1000장 복사 완료 → e:\projects\대학원\이상탐지\sampling\Efflorescene
[SteelDefect] 244장 복사 완료 → e:\projects\대학원\이상탐지\sampling\SteelDefect
[Pothole] 465장 복사 완료 → e:\projects\대학원\이상탐지\sampling\Pothole
[Leak] 1000장 복사 완료 → e:\projects\대학원\이상탐지\sampling\Leak
[Spalling] 490장 복사 완료 → e:\projects\대학원\이상탐지\sampling\Spalling
[AspaltCrack] 1000장 복사 완료 → e:\projects\대학원\이상탐지\sampling\AspaltCrack
[PaintDamage] 104장 복사 완료 → e:\projects\대학원\이상탐지\sampling\PaintDamage
[Exposure] 1000장 복사 완료 → e:\projects\대학원\이상탐지\sampling\Exposure
[ConcreteCrack] 1000장 복사 완료 → e:\projects\대학원\이상탐지\sampling\ConcreteCrack


In [1]:
import random
import shutil
from pathlib import Path

# -----------------------
# 설정
# -----------------------
SRC_NORMAL = Path("e:/projects/대학원/이상탐지/bridge/data/normal_data")          # 원본 정상 이미지 폴더
DST_NORMAL = Path("e:/projects/대학원/이상탐지/normal")    # 복사할 폴더
TARGET = 5000                            # 샘플링할 개수

random.seed(42)

# -----------------------
# 폴더 준비
# -----------------------
DST_NORMAL.mkdir(parents=True, exist_ok=True)

# -----------------------
# 이미지 파일 목록 수집
# -----------------------
EXTS = [".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff"]

all_images = [p for p in SRC_NORMAL.rglob("*") if p.suffix.lower() in EXTS]

print(f"정상 이미지 총 {len(all_images)}장 발견")

# -----------------------
# 샘플링
# -----------------------
if len(all_images) <= TARGET:
    print(f"⚠ {len(all_images)}장밖에 없어 전체 복사합니다.")
    sampled = all_images
else:
    sampled = random.sample(all_images, TARGET)
    print(f"{TARGET}장 랜덤 샘플링 완료!")

# -----------------------
# 복사 진행
# -----------------------
for img_path in sampled:
    dst_path = DST_NORMAL / img_path.name
    shutil.copy2(img_path, dst_path)

print(f"\n총 {len(sampled)}장 복사 완료 → {DST_NORMAL}")


정상 이미지 총 78715장 발견
5000장 랜덤 샘플링 완료!

총 5000장 복사 완료 → e:\projects\대학원\이상탐지\normal


In [None]:
label_to_sampled