In [None]:
%pip install ultralytics pyyaml -q

In [None]:
# 필요 라이브러리 import
import os
import shutil
from pathlib import Path
import yaml


## 함수 작성

### 방법 1 각 데이터셋을 따로 필터링해서 저장 (파인튜닝용)

In [None]:
from pathlib import Path
import os
import shutil
import yaml


# 1. 원하는 클래스 이미지/라벨 데이터 뽑아내는 함수
def extract_classes_finetuning(original_path, wanted_class_ids, class_names = ['helmet', 'gloves', 'boots'], filtered_folder=None):
    dataset_name = Path(original_path).name
    
    # filtered_folder가 지정되지 않으면 자동으로 데이터셋별 폴더 생성
    if filtered_folder is None:
        filtered_folder = f'dataset/filtered_{dataset_name}'
    
    for split in ['train', 'val', 'test']:
        # 경로 패턴 1: images/train, labels/train
        img_dir1 = os.path.join(original_path, 'images', split)
        lbl_dir1 = os.path.join(original_path, 'labels', split)
        
        # 경로 패턴 2: train/images, train/labels
        img_dir2 = os.path.join(original_path, split, 'images')
        lbl_dir2 = os.path.join(original_path, split, 'labels')
        
        if os.path.exists(img_dir1) and os.path.exists(lbl_dir1):
            img_dir, lbl_dir = img_dir1, lbl_dir1
        elif os.path.exists(img_dir2):
            img_dir = img_dir2
            lbl_dir = lbl_dir2 if os.path.exists(lbl_dir2) else None
        else:
            continue
        
        filtered_img_dir = os.path.join(filtered_folder, split, 'images')
        filtered_lbl_dir = os.path.join(filtered_folder, split, 'labels')
        os.makedirs(filtered_img_dir, exist_ok=True)
        os.makedirs(filtered_lbl_dir, exist_ok=True)
        
        class_mapping = {old_id: new_id for new_id, old_id in enumerate(wanted_class_ids)}
        
        counter = 1
        
        if lbl_dir and os.path.exists(lbl_dir):
            for lbl_file in os.listdir(lbl_dir):
                if not lbl_file.endswith('.txt'):
                    continue
                
                lbl_path = os.path.join(lbl_dir, lbl_file)
                
                with open(lbl_path, 'r') as f:
                    lines = f.readlines()
                
                filtered_lines = []
                for line in lines:
                    parts = line.strip().split()
                    if not parts:
                        continue
                    class_id = int(parts[0])
                    if class_id in class_mapping:
                        parts[0] = str(class_mapping[class_id])
                        filtered_lines.append(' '.join(parts) + '\n')
                
                if not filtered_lines:
                    continue
                
                img_name = os.path.splitext(lbl_file)[0]
                img_found = False
                
                for ext in ['.jpg', '.jpeg', '.png', '.JPG', '.JPEG', '.PNG']:
                    img_path = os.path.join(img_dir, img_name + ext)
                    if os.path.exists(img_path):
                        new_img_name = f"{dataset_name}_{split}_{counter:04d}{ext}"
                        new_lbl_name = f"{dataset_name}_{split}_{counter:04d}.txt"
                        
                        shutil.copy2(img_path, os.path.join(filtered_img_dir, new_img_name))
                        
                        with open(os.path.join(filtered_lbl_dir, new_lbl_name), 'w') as f:
                            f.writelines(filtered_lines)
                        
                        counter += 1
                        img_found = True
                        break
        
        print(f'[{dataset_name}] {split}: {counter-1}개 추출 완료')
    
    # 데이터 추출 후 자동으로 yaml 파일 생성
    create_yaml(filtered_folder, class_names)
    
    return filtered_folder


# 2. data.yaml 파일 생성 함수
def create_yaml(filtered_folder, class_names):
    """
    필터링된 데이터셋 폴더에 data.yaml 파일을 생성합니다.
    
    Parameters:
    - filtered_folder: 필터링된 데이터셋이 저장된 폴더 경로
    - class_names: 클래스 이름 리스트 (예: ['Hardhat', 'Mask'])
    """
    yaml_path = os.path.join(filtered_folder, 'data.yaml')
    
    # yaml 파일 내용 구성
    yaml_content = {
        'path': os.path.abspath(filtered_folder),  # 절대 경로
        'train': 'train/images',
        'val': 'val/images',
        'test': 'test/images',
        'nc': len(class_names),  # 클래스 개수
        'names': class_names  # 클래스 이름 리스트
    }
    
    # yaml 파일 저장
    with open(yaml_path, 'w', encoding='utf-8') as f:
        yaml.dump(yaml_content, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
    
    print(f'✓ data.yaml 파일 생성 완료: {yaml_path}')




In [None]:
def combine_datasets_yaml(yaml_paths, output_folder, combined_name='combined_dataset'):
    """
    여러 데이터셋의 yaml 파일을 읽어서 하나의 combined yaml 파일을 생성합니다.
    
    Parameters:
    - yaml_paths: 결합할 데이터셋들의 yaml 파일 경로 리스트 
                  (예: ['dataset1/data.yaml', 'dataset2/data.yaml'])
    - output_folder: 결합된 yaml 파일을 저장할 폴더 경로
    - combined_name: 결합된 yaml 파일 이름 (기본값: 'combined_dataset')
    
    Returns:
    - combined_yaml_path: 생성된 combined yaml 파일의 경로
    """
    
    # 출력 폴더 생성
    os.makedirs(output_folder, exist_ok=True)
    
    # 각 데이터셋의 경로 정보를 저장할 리스트
    train_paths = []
    val_paths = []
    test_paths = []
    all_class_names = []
    
    print('=' * 60)
    print('데이터셋 결합 시작...')
    print('=' * 60)
    
    # 각 yaml 파일 읽기
    for i, yaml_path in enumerate(yaml_paths, 1):
        print(f'\n[{i}/{len(yaml_paths)}] 읽는 중: {yaml_path}')
        
        with open(yaml_path, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
        
        # 데이터셋의 절대 경로
        dataset_path = data['path']
        
        # train/val/test 경로 추가 (절대 경로로 변환)
        train_paths.append(os.path.join(dataset_path, data['train']))
        val_paths.append(os.path.join(dataset_path, data['val']))
        
        if 'test' in data and data['test']:
            test_paths.append(os.path.join(dataset_path, data['test']))
        
        # 클래스 이름 수집 (중복 제거)
        for class_name in data['names']:
            if class_name not in all_class_names:
                all_class_names.append(class_name)
        
        print(f'  ✓ 클래스 수: {data["nc"]}개')
        print(f'  ✓ 클래스 이름: {data["names"]}')
    
    # Combined yaml 내용 구성
    combined_yaml_content = {
        'path': '',  # 여러 경로를 사용하므로 비워둠
        'train': train_paths,  # 리스트 형태로 여러 경로 지정
        'val': val_paths,      # 리스트 형태로 여러 경로 지정
        'nc': len(all_class_names),  # 통합된 클래스 개수
        'names': all_class_names      # 통합된 클래스 이름 리스트
    }
    
    # test 경로가 있으면 추가
    if test_paths:
        combined_yaml_content['test'] = test_paths
    
    # Combined yaml 파일 저장
    combined_yaml_path = os.path.join(output_folder, f'{combined_name}.yaml')
    with open(combined_yaml_path, 'w', encoding='utf-8') as f:
        yaml.dump(combined_yaml_content, f, default_flow_style=False, 
                  allow_unicode=True, sort_keys=False)
    
    # 결과 출력
    print('\n' + '=' * 60)
    print('✓ 데이터셋 결합 완료!')
    print('=' * 60)
    print(f'생성된 파일: {combined_yaml_path}')
    print(f'\n[결합된 데이터셋 정보]')
    print(f'- 총 데이터셋 수: {len(yaml_paths)}개')
    print(f'- Train 경로 수: {len(train_paths)}개')
    print(f'- Val 경로 수: {len(val_paths)}개')
    if test_paths:
        print(f'- Test 경로 수: {len(test_paths)}개')
    print(f'- 총 클래스 수: {len(all_class_names)}개')
    print(f'- 클래스 이름: {all_class_names}')
    print('=' * 60)
    
    return combined_yaml_path


### 방법 2 모든 데이터셋 병합

In [None]:
from pathlib import Path
import os
import shutil

# 원하는 클래스 이미지/라벨 데이터 뽑아내는 함수
def extract_classes(original_path, wanted_class_ids, temp_folder='temp_dataset'):
    dataset_name = Path(original_path).name
    
    for split in ['train', 'val', 'test']:
        # 경로 패턴 1: images/train, labels/train
        img_dir1 = os.path.join(original_path, 'images', split)
        lbl_dir1 = os.path.join(original_path, 'labels', split)
        
        # 경로 패턴 2: train/images, train/labels
        img_dir2 = os.path.join(original_path, split, 'images')
        lbl_dir2 = os.path.join(original_path, split, 'labels')
        
        if os.path.exists(img_dir1) and os.path.exists(lbl_dir1):
            img_dir, lbl_dir = img_dir1, lbl_dir1
        elif os.path.exists(img_dir2):
            img_dir = img_dir2
            lbl_dir = lbl_dir2 if os.path.exists(lbl_dir2) else None
        else:
            continue
        
        temp_img_dir = os.path.join(temp_folder, split, 'images')
        temp_lbl_dir = os.path.join(temp_folder, split, 'labels')
        os.makedirs(temp_img_dir, exist_ok=True)
        os.makedirs(temp_lbl_dir, exist_ok=True)
        
        class_mapping = {old_id: new_id for new_id, old_id in enumerate(wanted_class_ids)}
        
        counter = 1
        
        if lbl_dir and os.path.exists(lbl_dir):
            for lbl_file in os.listdir(lbl_dir):
                if not lbl_file.endswith('.txt'):
                    continue
                
                lbl_path = os.path.join(lbl_dir, lbl_file)
                
                with open(lbl_path, 'r') as f:
                    lines = f.readlines()
                
                filtered_lines = []
                for line in lines:
                    parts = line.strip().split()
                    if not parts:
                        continue
                    class_id = int(parts[0])
                    if class_id in class_mapping:
                        parts[0] = str(class_mapping[class_id])
                        filtered_lines.append(' '.join(parts) + '\n')
                
                if not filtered_lines:
                    continue
                
                img_name = os.path.splitext(lbl_file)[0]
                img_found = False
                
                for ext in ['.jpg', '.jpeg', '.png', '.JPG', '.JPEG', '.PNG']:
                    img_path = os.path.join(img_dir, img_name + ext)
                    if os.path.exists(img_path):
                        new_img_name = f"{dataset_name}_{split}_{counter:04d}{ext}"
                        new_lbl_name = f"{dataset_name}_{split}_{counter:04d}.txt"
                        
                        shutil.copy2(img_path, os.path.join(temp_img_dir, new_img_name))
                        
                        with open(os.path.join(temp_lbl_dir, new_lbl_name), 'w') as f:
                            f.writelines(filtered_lines)
                        
                        counter += 1
                        img_found = True
                        break
        
        print(f'[{dataset_name}] {split}: {counter-1}개 추출 완료')


# 사용 예시
# extract_classes('/path/to/dataset1', [0, 1, 4])


In [None]:
# 임시 폴더 데이터를 최종 폴더로 이동
def move_to_final(temp_folder='temp_dataset', final_folder='final_dataset'):
    for split in ['train', 'val', 'test']:
        temp_img = os.path.join(temp_folder, split, 'images')
        temp_lbl = os.path.join(temp_folder, split, 'labels')
        
        final_img = os.path.join(final_folder, split, 'images')
        final_lbl = os.path.join(final_folder, split, 'labels')
        
        os.makedirs(final_img, exist_ok=True)
        os.makedirs(final_lbl, exist_ok=True)
        
        if os.path.exists(temp_img):
            for file in os.listdir(temp_img):
                shutil.move(os.path.join(temp_img, file), os.path.join(final_img, file))
        
        if os.path.exists(temp_lbl):
            for file in os.listdir(temp_lbl):
                shutil.move(os.path.join(temp_lbl, file), os.path.join(final_lbl, file))
    
    shutil.rmtree(temp_folder)
    print(f'최종 폴더 {final_folder}로 이동 완료')

# 사용 예시
# move_to_final()


In [None]:
# 최종 yaml파일 생성
def create_final_yaml(final_folder='final_dataset'):
    class_names = ['helmet', 'gloves', 'boots']
    
    yaml_data = {
        'path': os.path.abspath(final_folder),
        'train': 'train/images',
        'val': 'val/images',
        'test': 'test/images',
        'nc': len(class_names),
        'names': class_names
    }
    
    yaml_path = os.path.join(final_folder, 'data.yaml')
    
    with open(yaml_path, 'w', encoding='utf-8') as f:
        yaml.dump(yaml_data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
    
    print(f'YAML 파일 생성 완료: {yaml_path}')
    print(f'클래스: {class_names}')

# 사용 예시
# create_final_yaml()

## 실행

### 파인튜닝용 데이터셋 생성

In [None]:
# 원본 경로의 각 폴더 이름이 모두 train, val, test, images, labels로 이루어져야함. 만약 아니라면 폴더명과 data.yaml 변경 필수
# 원본 클래스 순서는 무조건 helmet, gloves, boots 순서로 가져오기

extract_classes_finetuning(
    original_path='./dataset/PPE-1',
    class_names = ['helmet', 'vest', 'gloves', 'boots'],
    wanted_class_ids=[0, 5, 3, 2],  # 원본 PPE-1의 데이터셋이 0:헬멧, 3:장갑, 2:신발, 5:vest라 이렇게 뽑음
)

extract_classes_finetuning(
    original_path='./dataset/PPE-2',
    class_names = ['helmet', 'vest', 'gloves', 'boots'],
    wanted_class_ids=[0, 2, 1, 3],
    
)


### 병합된 데이터셋 생성

yaml파일 병합으로 사용하기

In [None]:
# 데이터셋1과 데이터셋2의 yaml 파일 경로
yaml_files = [
    './dataset/filtered_PPE-1/data.yaml',
    './dataset/filtered_PPE-2/data.yaml'
]

# Combined yaml 파일 생성
combined_yaml = combine_datasets_yaml(
    yaml_paths=yaml_files,
    output_folder='./dataset/',
    combined_name='ppe_combined'
)


데이터 자체 병합으로 사용하기

In [None]:
# 원본 경로 및 클래스 설정 
# 원본 경로의 각 폴더 이름이 모두 train, val, test, images, labels로 이루어져야함. 만약 아니라면 폴더명과 data.yaml 변경 필수
# 클래스 순서는 무조건 헬멧, 장갑, 안전화 순서로 하기
datasets = [
    {'path': 'PPE-1', 'classes': [0, 3, 2]},
    {'path': 'PPE-2', 'classes': [0, 1, 3]}
]

# 각 데이터셋에서 원하는 클래스 추출
for dataset in datasets:
    extract_classes(dataset['path'], dataset['classes'])

# 임시 폴더의 모든 데이터를 최종 폴더로 이동
move_to_final()

# YAML 파일 생성
create_final_yaml(final_folder='dataset/final_dataset')