In [2]:
!pip install requests

Defaulting to user installation because normal site-packages is not writeable


In [5]:
import json
import requests
import os
from urllib.parse import urlparse
import time
import random

# --- 설정값 ---
FILE_PATH = 'games.json'  # ⚠️ JSON 파일 경로를 실제 파일 이름으로 변경하세요.
BASE_DOWNLOAD_DIR = r'D:\ML\DS'
MAX_FILES_PER_GENRE = 15000
TARGET_TAGS = [
    "Adventure", "Action", "RPG", "Strategy", "Simulation", "Sports", "Racing", 
    "Puzzle", "Sandbox", "Shooter", "Survival", "Fighting", "Music"
]
# ----------------

def clean_filename(url):
    """URL에서 안전한 파일 이름을 추출하고 확장자를 포함합니다."""
    path = urlparse(url).path
    filename = os.path.basename(path)
    # 쿼리 파라미터나 기타 불필요한 문자를 제거하고 안전하게 만듭니다.
    if '?' in filename:
        filename = filename.split('?')[0]
    return filename

def get_genre_file_count(genre_dir):
    """장르 폴더 내의 실제 파일 개수를 계산합니다."""
    if not os.path.exists(genre_dir):
        return 0
    # .DS_Store 같은 숨김 파일을 제외하고 파일 개수를 셉니다.
    return len([name for name in os.listdir(genre_dir) if os.path.isfile(os.path.join(genre_dir, name)) and not name.startswith('.')])

def manage_file_limit(genre_dir, max_limit):
    """폴더의 파일 수가 최대 한도를 초과할 경우 파일을 랜덤하게 삭제합니다."""
    current_count = get_genre_file_count(genre_dir)
    if current_count > max_limit:
        files_to_delete = current_count - max_limit
        all_files = [f for f in os.listdir(genre_dir) if os.path.isfile(os.path.join(genre_dir, f)) and not f.startswith('.')]
        
        # 삭제할 파일을 랜덤하게 선택
        files_to_remove = random.sample(all_files, files_to_delete)
        
        print(f"[{os.path.basename(genre_dir)}] ❗️ 파일 초과: {current_count}개. {files_to_delete}개 삭제 중...")
        
        for file in files_to_remove:
            try:
                os.remove(os.path.join(genre_dir, file))
            except OSError as e:
                print(f"파일 삭제 오류: {e}")

def download_screenshot(url, genre_folder, appid):
    """스크린샷을 다운로드하고 장르 폴더에 저장합니다."""
    
    # 파일명 생성: AppID와 원래 파일명을 조합하여 중복 방지 및 추적 용이
    original_filename = clean_filename(url)
    filename = f"{appid}_{original_filename}"
    save_path = os.path.join(genre_folder, filename)

    if os.path.exists(save_path):
        # 파일이 이미 존재하면 건너뜁니다.
        # print(f"파일이 이미 존재하여 건너뜁니다: {filename}")
        return True

    try:
        # 파일 다운로드 (스트리밍 사용)
        response = requests.get(url, stream=True, timeout=10)
        response.raise_for_status() # HTTP 오류가 발생하면 예외 발생

        with open(save_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        return True
    
    except requests.exceptions.RequestException as e:
        # print(f"다운로드 실패 ({url}): {e}")
        return False

def process_game_data(file_path):
    """JSON 파일을 읽고 지정된 태그별로 스크린샷을 다운로드합니다."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"❌ 오류: 파일을 찾을 수 없습니다. 경로를 확인해 주세요: {file_path}")
        return
    except json.JSONDecodeError:
        print("❌ 오류: JSON 파일 형식이 올바르지 않습니다.")
        return
    
    # 기본 다운로드 디렉토리 생성
    os.makedirs(BASE_DOWNLOAD_DIR, exist_ok=True)
    
    # 장르별 폴더 생성 및 현재 파일 개수 추적
    genre_dirs = {}
    
    for tag in TARGET_TAGS:
        genre_path = os.path.join(BASE_DOWNLOAD_DIR, tag)
        os.makedirs(genre_path, exist_ok=True)
        genre_dirs[tag] = genre_path
        # 초기 파일 개수를 계산하여 MAX_FILES_PER_GENRE를 초과하는지 확인하고 관리합니다.
        manage_file_limit(genre_path, MAX_FILES_PER_GENRE)

    # 전체 데이터 순회 (딕셔너리 구조: {AppID: {게임 데이터}})
    game_list = data.values() if isinstance(data, dict) else data
    
    total_downloads = 0
    
    print("-" * 50)
    print(f"총 {len(game_list)}개의 게임을 처리합니다.")
    print(f"대상 태그: {', '.join(TARGET_TAGS)}")
    print(f"장르별 최대 파일 수: {MAX_FILES_PER_GENRE}개")
    print("-" * 50)

    for appid, game in data.items():
        if not isinstance(game, dict):
            continue
            
        # --------------------------------------------------------------------
        # 🛠️ 오류 수정 부분: 'tags' 키의 값을 가져와 형태에 따라 처리
        tags_data = game.get('tags')
        
        if isinstance(tags_data, dict):
            # 딕셔너리 형태일 경우 (기존 방식)
            game_tags = set(tags_data.keys())
        elif isinstance(tags_data, list):
            # 리스트 형태일 경우 (현재 오류 발생 형태)
            game_tags = set(tags_data)
        else:
            # tags 키가 없거나 다른 형태일 경우 건너뜁니다.
            continue
        # --------------------------------------------------------------------
            
        # 대상 태그와 게임 태그의 교집합을 구합니다.
        matching_tags = game_tags.intersection(TARGET_TAGS)
        
        # 일치하는 태그가 없는 경우 건너뜁니다.
        if not matching_tags:
            continue
            
        # 스크린샷 URL 목록 가져오기
        screenshots = game.get('screenshots', [])
        
        if not screenshots:
            continue
            
        print(f"[{game.get('name', appid)}] - {len(screenshots)}개 스크린샷, 매칭 태그: {', '.join(matching_tags)}")

        # 매칭된 각 태그에 대해 스크린샷을 저장합니다. (중복 저장 허용)
        for tag in matching_tags:
            genre_dir = genre_dirs[tag]
            
            # 파일 개수 확인: 이미 초과한 상태라면 다운로드 건너뛰기
            current_count = get_genre_file_count(genre_dir)
            if current_count >= MAX_FILES_PER_GENRE:
                # print(f"[{tag}] 🛑 최대 파일 수 ({MAX_FILES_PER_GENRE})에 도달하여 건너뜁니다.")
                continue

            # 스크린샷 다운로드 시작
            for shot in screenshots:
                if current_count >= MAX_FILES_PER_GENRE:
                    # 다운로드 도중에 한도를 초과하면 나머지 스크린샷은 건너뜁니다.
                    # print(f"[{tag}] 🛑 다운로드 중 한도 초과. 다음 태그로 이동.")
                    break
                
                # --------------------------------------------------------------------
                # 🛠️ 오류 수정 부분: 'shot'이 문자열인지 딕셔너리인지 확인하고 URL을 가져옵니다.
                if isinstance(shot, str):
                    # shot이 이미 URL 문자열인 경우
                    screenshot_url = shot
                elif isinstance(shot, dict):
                    # shot이 딕셔너리인 경우 (기존 코드)
                    screenshot_url = shot.get('path_full')
                else:
                    # 예상치 못한 형식인 경우 건너뜁니다.
                    continue
                # --------------------------------------------------------------------

                if screenshot_url:
                    if download_screenshot(screenshot_url, genre_dir, appid):
                        total_downloads += 1
                        current_count += 1
                        # 100개 다운로드마다 진행 상황을 출력합니다.
                        if total_downloads % 100 == 0:
                            print(f"--- 총 다운로드 수: {total_downloads}개 ---")
                        
            # 다운로드가 완료된 후, 혹시 모를 초과 상황을 대비하여 다시 파일 제한을 관리합니다.
            manage_file_limit(genre_dir, MAX_FILES_PER_GENRE)

    print("-" * 50)
    print(f"✅ 모든 게임 처리가 완료되었습니다. 총 {total_downloads}개의 파일이 저장되었습니다.")
    print(f"폴더 위치: {os.path.abspath(BASE_DOWNLOAD_DIR)}")
    print("-" * 50)

# 실행
process_game_data(FILE_PATH)

--------------------------------------------------
총 111452개의 게임을 처리합니다.
대상 태그: Adventure, RPG, Strategy, Simulation, Sports, Racing, Puzzle, Sandbox, Shooter, Survival, Fighting, Music
장르별 최대 파일 수: 15000개
--------------------------------------------------
[Galactic Bowling] - 10개 스크린샷, 매칭 태그: Sports
[Henosis™] - 7개 스크린샷, 매칭 태그: Survival, Adventure, Puzzle
[Two Weeks in Painland] - 24개 스크린샷, 매칭 태그: Adventure
[Wartune Reborn] - 12개 스크린샷, 매칭 태그: RPG, Strategy
[TD Worlds] - 12개 스크린샷, 매칭 태그: Strategy
[MazM: Jekyll and Hyde] - 5개 스크린샷, 매칭 태그: Adventure, Simulation, RPG, Strategy
--- 총 다운로드 수: 100개 ---
[Deadlings: Rotten Edition] - 10개 스크린샷, 매칭 태그: Adventure
[WARSAW] - 8개 스크린샷, 매칭 태그: RPG, Strategy
[Cthulhu Realms] - 6개 스크린샷, 매칭 태그: Strategy
[Clockwork Dungeon] - 5개 스크린샷, 매칭 태그: Puzzle
[Diary of Lucie] - 12개 스크린샷, 매칭 태그: Adventure, RPG, Strategy
[Alien Breed 3: Descent] - 10개 스크린샷, 매칭 태그: Shooter
[Hunting Unlimited 3] - 6개 스크린샷, 매칭 태그: Sports, Simulation
--- 총 다운로드 수: 200개 ---
[Hero of the K

KeyboardInterrupt: 

In [1]:
import json
import requests
import os
from urllib.parse import urlparse
import time
import random
import concurrent.futures
import pandas as pd
from typing import Set, List, Dict, Tuple, Any

# --- 설정값 ---
FILE_PATH = 'games.json'  # ⚠️ JSON 파일 경로를 실제 파일 이름으로 변경하세요.
BASE_DOWNLOAD_DIR = r'F:\ML\MUFi'

TARGET_GENRE = "Action"
MAX_FILES_PER_GENRE = 12000

TARGET_TAGS = [
    "Adventure", "Action", "RPG", "Strategy", "Simulation", "Sports", "Racing", 
    "Puzzle", "Sandbox", "Shooter", "Survival", "Fighting", "Music"
]
MAX_WORKERS = 30  # 💡 동시 다운로드 스레드 수 (네트워크 상황에 맞게 조정)
# --------------------
# ----------------

def clean_filename(url):
    """URL에서 안전한 파일 이름을 추출하고 확장자를 포함합니다."""
    path = urlparse(url).path
    filename = os.path.basename(path)
    # 쿼리 파라미터나 기타 불필요한 문자를 제거하고 안전하게 만듭니다.
    if '?' in filename:
        filename = filename.split('?')[0]
    return filename

def get_genre_file_count(genre_dir):
    """장르 폴더 내의 실제 파일 개수를 계산합니다."""
    if not os.path.exists(genre_dir):
        return 0
    # .DS_Store 같은 숨김 파일을 제외하고 파일 개수를 셉니다.
    return len([name for name in os.listdir(genre_dir) if os.path.isfile(os.path.join(genre_dir, name)) and not name.startswith('.')])

def manage_file_limit(genre_dir, max_limit):
    """폴더의 파일 수가 최대 한도를 초과할 경우 파일을 랜덤하게 삭제합니다."""
    current_count = get_genre_file_count(genre_dir)
    if current_count > max_limit:
        files_to_delete = current_count - max_limit
        all_files = [f for f in os.listdir(genre_dir) if os.path.isfile(os.path.join(genre_dir, f)) and not f.startswith('.')]
        
        # 삭제할 파일을 랜덤하게 선택
        files_to_remove = random.sample(all_files, files_to_delete)
        
        print(f"[{os.path.basename(genre_dir)}] ❗️ 파일 초과: {current_count}개. {files_to_delete}개 삭제 중...")
        
        for file in files_to_remove:
            try:
                os.remove(os.path.join(genre_dir, file))
            except OSError as e:
                print(f"파일 삭제 오류: {e}")

def download_screenshot(url, genre_folder, appid):
    """스크린샷을 다운로드하고 장르 폴더에 저장합니다."""
    
    # 파일명 생성: AppID와 원래 파일명을 조합하여 중복 방지 및 추적 용이
    original_filename = clean_filename(url)
    filename = f"{appid}_{original_filename}"
    save_path = os.path.join(genre_folder, filename)

    if os.path.exists(save_path):
        # 파일이 이미 존재하면 건너뜁니다.
        # print(f"파일이 이미 존재하여 건너뜁니다: {filename}")
        return True

    try:
        # 파일 다운로드 (스트리밍 사용)
        response = requests.get(url, stream=True, timeout=10)
        response.raise_for_status() # HTTP 오류가 발생하면 예외 발생

        with open(save_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        return True
    
    except requests.exceptions.RequestException as e:
        # print(f"다운로드 실패 ({url}): {e}") # 디버그 출력 최소화
        return False, filename

def create_tag_vector(matching_tags: Set[str]) -> List[int]:
    """
    매칭되는 태그를 TARGET_TAGS 순서에 따른 원-핫 벡터로 변환합니다.
    """
    vector = [0] * len(TARGET_TAGS)
    for i, target_tag in enumerate(TARGET_TAGS):
        if target_tag in matching_tags:
            vector[i] = 1
    return vector

# (download_screenshot, clean_filename, get_genre_file_count, manage_file_limit 함수는 그대로 유지)


def process_game_data(file_path):
    """JSON 파일을 읽고 지정된 태그별로 스크린샷을 병렬로 다운로드합니다."""
    # ... (파일 로드 및 초기 설정 부분은 그대로 유지) ...
    """JSON 파일을 읽고 지정된 태그별로 스크린샷을 다운로드합니다."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"❌ 오류: 파일을 찾을 수 없습니다. 경로를 확인해 주세요: {file_path}")
        return
    except json.JSONDecodeError:
        print("❌ 오류: JSON 파일 형식이 올바르지 않습니다.")
        return

    if TARGET_GENRE not in TARGET_TAGS:
        print(f"❌ 오류: TARGET_GENRE '{TARGET_GENRE}'가 TARGET_TAGS에 없습니다. 설정을 확인하세요.")
        return
    
    # 기본 다운로드 디렉토리 생성 및 장르별 폴더 관리 (기존 코드 유지)
    # 단일 장르 폴더 설정
    genre_dir = os.path.join(BASE_DOWNLOAD_DIR, TARGET_GENRE)
    os.makedirs(genre_dir, exist_ok=True)
    manage_file_limit(genre_dir, TARGET_MAX_FILES)
    
    current_count = get_genre_file_count(genre_dir)

    game_list = data.values() if isinstance(data, dict) else data
    total_downloads = 0
    
    # for tag in TARGET_TAGS:
    #     genre_path = os.path.join(BASE_DOWNLOAD_DIR, tag)
    #     os.makedirs(genre_path, exist_ok=True)
    #     genre_dirs[tag] = genre_path
    #     manage_file_limit(genre_path, MAX_FILES_PER_GENRE)

    # game_list = data.values() if isinstance(data, dict) else data
    # total_downloads = 0

    # 💡 CSV 데이터 저장을 위한 딕셔너리 (파일명 중복 방지)
    # {filename: tag_vector} 형태로 저장
    csv_data_map: Dict[str, List[int]] = {}
    
    print("-" * 50)
    print(f"단일 장르 크롤링 시작: {TARGET_GENRE}")
    print(f"목표 파일 수: {TARGET_MAX_FILES}개 (현재 {current_count}개)")
    print("-" * 50)

    # 🚀 ThreadPoolExecutor를 사용하여 병렬 처리 시작
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        
        # 다운로드할 모든 작업(Future) 리스트
        future_to_info = []
        
        for appid, game in data.items():
            if not isinstance(game, dict):
                continue
                
            # 태그 처리 로직 (이전 수정 코드 그대로 유지)
            tags_data = game.get('tags')
            if isinstance(tags_data, dict):
                game_tags = set(tags_data.keys())
            elif isinstance(tags_data, list):
                game_tags = set(tags_data)
            else:
                continue
                
            matching_tags = game_tags.intersection(TARGET_TAGS)
            if not matching_tags:
                continue
                
            screenshots = game.get('screenshots', [])
            if not screenshots:
                continue
                
            # print(f"[{game.get('name', appid)}] - {len(screenshots)}개 스크린샷, 매칭 태그: {', '.join(matching_tags)}") # 주석 처리하여 출력 속도 개선

            # 💡 현재 게임의 태그 벡터 생성 (게임에 적용된 모든 TARGET_TAGS 반영)
            tag_vector = create_tag_vector(matching_tags)
            
            # 매칭된 각 태그에 대해 스크린샷을 저장합니다.
            for tag in matching_tags:
                genre_dir = genre_dirs[tag]
                
                if get_genre_file_count(genre_dir) >= MAX_FILES_PER_GENRE:
                    continue

                # 스크린샷 URL 추출 및 다운로드 작업 제출
            for shot in screenshots:
                screenshot_url = None
                if isinstance(shot, str):
                    screenshot_url = shot
                elif isinstance(shot, dict):
                    screenshot_url = shot.get('path_full')
                
                if screenshot_url:
                    original_filename = clean_filename(screenshot_url)
                    filename = f"{appid}_{original_filename}"
                    
                    # 💡 파일명-벡터 쌍을 CSV 데이터 맵에 기록 (파일명 중복 방지)
                    # 이 파일이 가진 최종 태그 벡터를 여기에 기록합니다.
                    csv_data_map[filename] = tag_vector 

                    # ❗️ 파일은 매칭된 모든 폴더에 저장됩니다.
                    for tag in matching_tags:
                        genre_dir = genre_dirs[tag]
                        
                        # 해당 폴더의 파일 수 확인 후 다운로드 제출
                        if get_genre_file_count(genre_dir) < MAX_FILES_PER_GENRE:
                            future = executor.submit(download_screenshot, screenshot_url, genre_dir, appid)
                            # 파일명 정보는 CSV 생성 시 필요하므로 함께 전달합니다.
                            future_to_info.append((future, filename))
                            
        # 🚀 모든 작업이 완료되기를 기다리고 결과를 처리
        downloaded_files: Set[str] = set() # 실제로 다운로드에 성공한 파일명만 추적
        
        for future, filename in concurrent.futures.as_completed(future_to_info):
            try:
                # download_screenshot의 결과는 (성공 여부, 파일명) 입니다.
                is_success, completed_filename = future.result() 
                
                if is_success:
                    # 다운로드 성공 시, 중복 다운로드와 상관없이 파일명만 성공 셋에 기록
                    if filename not in downloaded_files:
                        total_downloads += 1
                        
                    downloaded_files.add(filename)
                
                if total_downloads % 500 == 0 and total_downloads > 0:
                    print(f"--- 현재까지 총 다운로드 성공 파일 수: {total_downloads}개 ---")
                    
            except Exception as exc:
                print(f"다운로드 작업 중 예외 발생: {exc}")
                
        # 최종적으로 파일 제한을 다시 확인하고 관리 (옵션)
        for tag in TARGET_TAGS:
             manage_file_limit(genre_dirs[tag], MAX_FILES_PER_GENRE)



        # --------------------------------------------------------
        # 💡 CSV 파일 생성 및 저장 (csv_data_map과 downloaded_files 사용)
        # --------------------------------------------------------
        if csv_data_map:
            # 실제로 다운로드 성공한 파일만 필터링하여 CSV로 기록
            records = []
            for filename, vector in csv_data_map.items():
                if filename in downloaded_files:
                    records.append({'filename': filename, 'tag_vector': vector})
    
            if records:
                df = pd.DataFrame(records)
                
                # 벡터를 각 태그별 컬럼으로 분리
                vector_df = pd.DataFrame(df['tag_vector'].tolist(), columns=TARGET_TAGS)
                
                # 최종 데이터프레임 결합
                df_final = pd.concat([df[['filename']], vector_df], axis=1)
                
                # CSV 파일 저장 경로
                csv_output_path = os.path.join(BASE_DOWNLOAD_DIR, 'screenshot_tag_vectors_unique.csv')
                df_final.to_csv(csv_output_path, index=False, encoding='utf-8')
                
                print("-" * 50)
                print(f"✅ 태그 벡터 CSV 파일이 성공적으로 저장되었습니다.")
                print(f"파일명: {os.path.basename(csv_output_path)}")
                print(f"저장된 레코드 수: {len(df_final)}개 (실제 다운로드 성공 파일 기준)")
            else:
                 print("❌ 실제로 다운로드 성공한 파일이 없어 CSV로 저장할 레코드가 없습니다.")
                 
        else:
            print("❌ CSV로 저장할 태그 데이터가 없습니다.")

    print("-" * 50)
    print(f"✅ 모든 게임 처리가 완료되었습니다. 총 {total_downloads}개의 파일이 저장되었습니다.")
    print("-" * 50)

# ⚠️ 파일 경로를 실제 JSON 파일 이름으로 변경하세요!
FILE_PATH = 'games.json' 
process_game_data(FILE_PATH)

--------------------------------------------------
총 111452개의 게임을 처리합니다.
대상 태그: Fighting, Music
장르별 최대 파일 수: 12000개
**동시 다운로드 스레드 수: 30개**
--------------------------------------------------


AttributeError: 'tuple' object has no attribute '_condition'

In [2]:
import json
import requests
import os
from urllib.parse import urlparse
import time
import random
import concurrent.futures
import pandas as pd
from typing import Set, List, Dict, Tuple, Any

# --- 설정값 ---
FILE_PATH = 'games.json'  # ⚠️ JSON 파일 경로를 실제 파일 이름으로 변경하세요.
BASE_DOWNLOAD_DIR = r'F:\ML\dataset'

TARGET_GENRE = "Survival"
TARGET_MAX_FILES = 32000

TARGET_TAGS = [
    "Adventure", "Action", "RPG", "Strategy", "Simulation", "Sports", "Racing", 
    "Puzzle", "Sandbox", "Shooter", "Survival", "Fighting", "Music"
]
MAX_WORKERS = 30  # 💡 동시 다운로드 스레드 수 (네트워크 상황에 맞게 조정)
# --------------------
# ----------------

def clean_filename(url):
    """URL에서 안전한 파일 이름을 추출하고 확장자를 포함합니다."""
    path = urlparse(url).path
    filename = os.path.basename(path)
    # 쿼리 파라미터나 기타 불필요한 문자를 제거하고 안전하게 만듭니다.
    if '?' in filename:
        filename = filename.split('?')[0]
    return filename

def get_genre_file_count(genre_dir):
    """장르 폴더 내의 실제 파일 개수를 계산합니다."""
    if not os.path.exists(genre_dir):
        return 0
    # .DS_Store 같은 숨김 파일을 제외하고 파일 개수를 셉니다.
    return len([name for name in os.listdir(genre_dir) if os.path.isfile(os.path.join(genre_dir, name)) and not name.startswith('.')])

def manage_file_limit(genre_dir, max_limit):
    """폴더의 파일 수가 최대 한도를 초과할 경우 파일을 랜덤하게 삭제합니다."""
    current_count = get_genre_file_count(genre_dir)
    if current_count > max_limit:
        files_to_delete = current_count - max_limit
        all_files = [f for f in os.listdir(genre_dir) if os.path.isfile(os.path.join(genre_dir, f)) and not f.startswith('.')]

        
        # 삭제할 파일을 랜덤하게 선택
        files_to_remove = random.sample(all_files, files_to_delete)
        
        print(f"[{os.path.basename(genre_dir)}] ❗️ 파일 초과: {current_count}개. {files_to_delete}개 삭제 중...")
        
        for file in files_to_remove:
            try:
                os.remove(os.path.join(genre_dir, file))
            except OSError as e:
                print(f"파일 삭제 오류: {e}")

def download_screenshot(url: str, genre_folder: str, appid: str) -> Tuple[bool, str]:
    """스크린샷을 다운로드하고 장르 폴더에 저장합니다. 성공 시 파일명을 반환합니다."""
    original_filename = clean_filename(url)
    filename = f"{appid}_{original_filename}"
    save_path = os.path.join(genre_folder, filename)

    if os.path.exists(save_path):
        # 💡 이미 파일이 존재하면 성공 (True)와 파일명을 함께 반환
        return True, filename

    try:
        response = requests.get(url, stream=True, timeout=10)
        response.raise_for_status()

        with open(save_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        # 💡 다운로드 성공 시 (True)와 파일명을 함께 반환
        return True, filename
    
    except requests.exceptions.RequestException as e:
        # 💡 다운로드 실패 시 (False)와 파일명을 함께 반환
        return False, filename

def create_tag_vector(matching_tags: Set[str]) -> List[int]:
    """
    매칭되는 태그를 TARGET_TAGS 순서에 따른 원-핫 벡터로 변환합니다.
    """
    vector = [0] * len(TARGET_TAGS)
    for i, target_tag in enumerate(TARGET_TAGS):
        if target_tag in matching_tags:
            vector[i] = 1
    return vector

# (download_screenshot, clean_filename, get_genre_file_count, manage_file_limit 함수는 그대로 유지)


def process_game_data(file_path):
    """JSON 파일을 읽고 지정된 태그별로 스크린샷을 병렬로 다운로드합니다."""
    # ... (파일 로드 및 초기 설정 부분은 그대로 유지) ...
    """JSON 파일을 읽고 지정된 태그별로 스크린샷을 다운로드합니다."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"❌ 오류: 파일을 찾을 수 없습니다. 경로를 확인해 주세요: {file_path}")
        return
    except json.JSONDecodeError:
        print("❌ 오류: JSON 파일 형식이 올바르지 않습니다.")
        return

    if TARGET_GENRE not in TARGET_TAGS:
        print(f"❌ 오류: TARGET_GENRE '{TARGET_GENRE}'가 TARGET_TAGS에 없습니다. 설정을 확인하세요.")
        return
    
    # 기본 다운로드 디렉토리 생성 및 장르별 폴더 관리 (기존 코드 유지)
    # 단일 장르 폴더 설정
    genre_dir = os.path.join(BASE_DOWNLOAD_DIR, TARGET_GENRE)
    os.makedirs(genre_dir, exist_ok=True)
    manage_file_limit(genre_dir, TARGET_MAX_FILES)
    
    current_count = get_genre_file_count(genre_dir)

    game_list = data.values() if isinstance(data, dict) else data
    total_downloads = 0
    
    csv_data_map: Dict[str, List[int]] = {}
    
    print("-" * 50)
    print(f"단일 장르 크롤링 시작: {TARGET_GENRE}")
    print(f"목표 파일 수: {TARGET_MAX_FILES}개 (현재 {current_count}개)")
    print("-" * 50)

    # 🚀 ThreadPoolExecutor를 사용하여 병렬 처리 시작
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        
        future_to_filename: Dict[concurrent.futures.Future, str] = {}
        
        for appid, game in data.items():
            if current_count >= TARGET_MAX_FILES:
                break # 전체 게임 탐색 중 목표 수에 도달하면 중단

            if not isinstance(game, dict):
                continue
                
            # 태그 처리 로직
            tags_data = game.get('tags')
            if isinstance(tags_data, dict):
                game_tags = set(tags_data.keys())
            elif isinstance(tags_data, list):
                game_tags = set(tags_data)
            else:
                continue
                
            # ❗️ 단일 장르 매칭 확인
            if TARGET_GENRE not in game_tags:
                continue
                
            # 💡 전체 13개 태그와의 교집합으로 벡터 생성 (CSV 기록용)
            matching_tags = game_tags.intersection(TARGET_TAGS)
            tag_vector = create_tag_vector(matching_tags)

            screenshots = game.get('screenshots', [])
            if not screenshots:
                continue
                
            # 스크린샷 URL 추출 및 다운로드 작업 제출
            for shot in screenshots:
                if current_count >= TARGET_MAX_FILES:
                    break

                screenshot_url = None
                if isinstance(shot, str):
                    screenshot_url = shot
                elif isinstance(shot, dict):
                    screenshot_url = shot.get('path_full')
                
                if screenshot_url:
                    original_filename = clean_filename(screenshot_url)
                    filename = f"{appid}_{original_filename}"
                    
                    # 💡 파일명-벡터 쌍을 CSV 데이터 맵에 기록 (파일명 중복 방지)
                    csv_data_map[filename] = tag_vector 
    
                    # ❗️ 단일 장르 폴더에만 다운로드 작업 제출
                    future = executor.submit(download_screenshot, screenshot_url, genre_dir, appid)
                    
                    # 💡 수정: Future 객체와 파일명을 맵핑 딕셔너리에 추가
                    future_to_filename[future] = filename
                    
        # 🚀 모든 작업이 완료되기를 기다리고 결과를 처리
        downloaded_files: Set[str] = set() # 실제로 다운로드에 성공한 파일명만 추적
        
        # 💡 수정: future_to_filename.keys()를 사용하여 Future 객체만 전달
        for future in concurrent.futures.as_completed(future_to_filename.keys()):
            # 💡 수정: Future 객체로 파일명을 찾습니다.
            filename = future_to_filename[future] 
            
            try:
                # download_screenshot의 결과는 (성공 여부, 파일명) 입니다.
                is_success, completed_filename = future.result() 
                
                if is_success:
                    if filename not in downloaded_files:
                        total_downloads += 1
                        
                    downloaded_files.add(filename)
                
                if total_downloads % 100 == 0 and total_downloads > 0:
                    print(f"--- 현재까지 총 다운로드 성공 파일 수: {total_downloads}개 ---")
                    
            except Exception as exc:
                print(f"다운로드 작업 중 예외 발생: {exc}")
                
        # 최종적으로 파일 제한을 다시 확인하고 관리 (이전에 초과된 파일 정리)
        manage_file_limit(genre_dir, TARGET_MAX_FILES)

    # --------------------------------------------------------
    # 💡 CSV 파일 생성 및 저장
    # --------------------------------------------------------
    if csv_data_map:
        records = []
        # ❗️ 실제로 다운로드 성공한 파일만 필터링하여 CSV로 기록
        for filename, vector in csv_data_map.items():
            if filename in downloaded_files:
                records.append({'filename': filename, 'tag_vector': vector})

        if records:
            df = pd.DataFrame(records)
            vector_df = pd.DataFrame(df['tag_vector'].tolist(), columns=TARGET_TAGS)
            df_final = pd.concat([df[['filename']], vector_df], axis=1)
            
            # CSV 파일 저장 경로
            # 💡 수정된 부분: 파일명에 TARGET_GENRE를 포함시킵니다.
            csv_filename = f'screenshot_tag_vectors_{TARGET_GENRE}.csv'
            csv_output_path = os.path.join(BASE_DOWNLOAD_DIR, csv_filename)
            
            df_final.to_csv(csv_output_path, index=False, encoding='utf-8')
            
            print("-" * 50)
            print(f"✅ 태그 벡터 CSV 파일이 성공적으로 저장되었습니다.")
            print(f"저장된 레코드 수: {len(df_final)}개 (실제 다운로드 성공 파일 기준)")
        else:
             print("❌ 실제로 다운로드 성공한 파일이 없어 CSV로 저장할 레코드가 없습니다.")
             
    print("-" * 50)
    print(f"✅ 모든 처리가 완료되었습니다. 총 {total_downloads}개의 파일이 '{TARGET_GENRE}' 폴더에 저장되었습니다.")
    print("-" * 50)

process_game_data(FILE_PATH)

--------------------------------------------------
단일 장르 크롤링 시작: Survival
목표 파일 수: 32000개 (현재 0개)
--------------------------------------------------
--- 현재까지 총 다운로드 성공 파일 수: 100개 ---
--- 현재까지 총 다운로드 성공 파일 수: 200개 ---
--- 현재까지 총 다운로드 성공 파일 수: 300개 ---
--- 현재까지 총 다운로드 성공 파일 수: 400개 ---
--- 현재까지 총 다운로드 성공 파일 수: 500개 ---
--- 현재까지 총 다운로드 성공 파일 수: 600개 ---
--- 현재까지 총 다운로드 성공 파일 수: 700개 ---
--- 현재까지 총 다운로드 성공 파일 수: 800개 ---
--- 현재까지 총 다운로드 성공 파일 수: 900개 ---
--- 현재까지 총 다운로드 성공 파일 수: 1000개 ---
--- 현재까지 총 다운로드 성공 파일 수: 1100개 ---
--- 현재까지 총 다운로드 성공 파일 수: 1200개 ---
--- 현재까지 총 다운로드 성공 파일 수: 1300개 ---
--- 현재까지 총 다운로드 성공 파일 수: 1400개 ---
--- 현재까지 총 다운로드 성공 파일 수: 1500개 ---
--- 현재까지 총 다운로드 성공 파일 수: 1600개 ---
--- 현재까지 총 다운로드 성공 파일 수: 1700개 ---
--- 현재까지 총 다운로드 성공 파일 수: 1800개 ---
--- 현재까지 총 다운로드 성공 파일 수: 1900개 ---
--- 현재까지 총 다운로드 성공 파일 수: 2000개 ---
--- 현재까지 총 다운로드 성공 파일 수: 2100개 ---
--- 현재까지 총 다운로드 성공 파일 수: 2200개 ---
--- 현재까지 총 다운로드 성공 파일 수: 2300개 ---
--- 현재까지 총 다운로드 성공 파일 수: 2400개 ---
--- 현재까지 총 다운로드 성공 파

In [None]:
#전처리 224*224

from PIL import Image, ImageFile
import os
import glob
import concurrent.futures
from typing import Tuple, List

# 💡 이 설정값들을 사용자의 환경에 맞게 반드시 수정하세요!
# -------------------------------------------------------------
BASE_DOWNLOAD_DIR = r'F:\ML\dataset'      # 👈 원본 장르 폴더들이 있는 기본 경로
RESIZED_DIR_NAME = 'Processed_224x224' # 👈 전처리된 파일들을 저장할 새 폴더 이름
TARGET_SIZE = (224, 224)              # 👈 목표 해상도 (224x224)
TARGET_TAGS: List[str] = [
    "Adventure", "Action", "RPG", "Strategy", "Simulation", "Sports", "Racing", 
    "Puzzle", "Sandbox", "Shooter", "Survival", "Fighting", "Music"
]
# -------------------------------------------------------------
# 파일이 너무 크거나 손상된 경우를 대비하여 파일 로드 한도 확장
ImageFile.LOAD_TRUNCATED_IMAGES = True


# 💡 병렬 처리 설정
MAX_PROCESSES = 4 # 사용 가능한 CPU 코어 수 사용 (최소 4개)


# 💡 새로운 헬퍼 함수: 단일 이미지 처리 및 저장
def process_single_image(input_path: str, output_path: str, target_size: Tuple[int, int]) -> bool:
    """단일 이미지를 전처리하고 저장하며 성공 여부를 반환합니다."""
    try:
        # 파일이 이미 전처리되어 있으면 건너뜁니다.
        if os.path.exists(output_path):
            return True

        # 이미지 로드 및 RGB 변환
        img = Image.open(input_path).convert('RGB')
        
        # 패딩을 적용하여 리사이징 (pad_to_square 함수는 이전 정의 사용)
        resized_img = pad_to_square(img, target_size)
        
        # JPEG 포맷으로 저장
        resized_img.save(output_path, format='JPEG', quality=90)
        
        return True
    
    except Exception as e:
        # print(f"  [오류] 파일 처리 실패 ({os.path.basename(input_path)}): {e}")
        return False

def pad_to_square(img: Image.Image, target_size: Tuple[int, int]) -> Image.Image:
    """종횡비를 유지하면서 이미지를 정사각형으로 만들고 빈 공간을 검은색(0,0,0)으로 채웁니다."""
    
    width, height = img.size
    target_w, target_h = target_size
    
    # 목표 크기 중 작은 쪽을 기준으로 비율 계산 (축소 비율 계산)
    ratio = min(target_w / width, target_h / height)
    new_w = int(width * ratio)
    new_h = int(height * ratio)

    # 1. 축소 리사이징
    # Image.Resampling.LANCZOS는 고품질 축소에 사용됩니다.
    img = img.resize((new_w, new_h), Image.Resampling.LANCZOS) 

    # 2. 목표 크기의 검은색 캔버스 생성
    new_img = Image.new('RGB', target_size, (0, 0, 0)) 
    
    # 3. 리사이징된 이미지를 캔버스 중앙에 붙여넣기
    left = (target_w - new_w) // 2
    top = (target_h - new_h) // 2
    new_img.paste(img, (left, top))
    
    return new_img


def preprocess_images_parallel():
    """
    원본 경로의 모든 장르 폴더를 순회하며 224x224로 병렬 전처리 후 새 경로에 저장합니다.
    """
    output_base_dir = os.path.join(BASE_DOWNLOAD_DIR, RESIZED_DIR_NAME)
    os.makedirs(output_base_dir, exist_ok=True)
    
    total_processed = 0
    all_tasks = [] # 모든 이미지 처리 작업을 담을 리스트

    print(f"이미지 전처리 시작. 병렬 프로세스: {MAX_PROCESSES}개")
    print("-" * 50)

    # 1. 모든 작업(Task) 수집
    for tag in TARGET_TAGS:
        input_genre_dir = os.path.join(BASE_DOWNLOAD_DIR, tag)
        output_genre_dir = os.path.join(output_base_dir, tag)
        
        if not os.path.isdir(input_genre_dir):
            continue
            
        os.makedirs(output_genre_dir, exist_ok=True)
        
        image_files = glob.glob(os.path.join(input_genre_dir, '*.[pj][np]g'))
        
        for input_path in image_files:
            filename = os.path.basename(input_path)
            output_path = os.path.join(output_genre_dir, filename)
            
            # (input_path, output_path, TARGET_SIZE) 튜플을 작업 리스트에 추가
            all_tasks.append((input_path, output_path, TARGET_SIZE))

    print(f"총 {len(all_tasks)}개의 전처리 작업을 등록했습니다. 처리 시작...")
    
    # 2. ProcessPoolExecutor를 사용하여 병렬 처리 실행
    with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_PROCESSES) as executor:
        
        # executor.map을 사용하여 모든 작업을 병렬로 실행
        results = executor.map(process_single_image, 
                               [t[0] for t in all_tasks], # input_path 리스트
                               [t[1] for t in all_tasks], # output_path 리스트
                               [t[2] for t in all_tasks]) # target_size 리스트 (전부 동일)
        
        # 3. 결과 집계
        for success in results:
            if success:
                total_processed += 1
            
            # 진행 상황 출력 (10000개마다)
            if total_processed % 10000 == 0 and total_processed > 0:
                print(f"--- {total_processed}개 이미지 처리 완료 ---")


    print("-" * 50)
    print(f"✅ 모든 전처리가 완료되었습니다. 총 {total_processed}개의 파일이 저장되었습니다.")
    print(f"최종 데이터 경로: {output_base_dir}")
    print("-" * 50)

# 전처리 함수 실행
if __name__ == "__main__":
    preprocess_images_parallel()

이미지 전처리 시작. 병렬 프로세스: 4개
--------------------------------------------------
총 442341개의 전처리 작업을 등록했습니다. 처리 시작...
