In [3]:
import os
import glob
import numpy as np
import pandas as pd
import sys
import re
import ast

In [None]:
# --- 사용자 설정 (하이퍼파라미터) ---
DATA_PATH = '/dataset/usr012/jmpark/Moudlar_data/data'
TRAIN_OUTPUT_PATH = '/dataset/usr012/jmpark/Modular_train'
EXPERT_OUTPUT_PATH = '/dataset/usr012/jmpark/Modular_train/expert'

# 500만 스텝 중 저장할 스텝 수
MAX_STEPS_TO_SAVE = 3_000_000
MAX_WORKERS = max(1, os.cpu_count() - 10)
# ---------------------------------

In [None]:
def process_csv_to_npz(file_path, output_dir, max_steps, file_type):
    """
    단일 .csv.gz 파일을 읽어 지정된 포맷의 .npz 파일로 변환합니다.
    file_type에 따라 다른 이름 정규화 규칙을 적용합니다.
    """
    try:
        filename = os.path.basename(file_path)
        
        # --- [수정 2] : file_type에 따라 다른 패턴 적용 ---
        if file_type == 'train':
            # Train 파일: _날짜_시간_train 을 통째로 제거
            pattern = r'_\d{8}_\d{6}_train'
            cleaned_filename = re.sub(pattern, '', filename)
        elif file_type == 'expert':
            # Expert 파일: _날짜_시간 만 제거 (expert는 남김)
            pattern = r'_\d{8}_\d{6}'
            cleaned_filename = re.sub(pattern, '', filename)
        else:
            # 예외 처리
            cleaned_filename = filename
            
        npz_filename = cleaned_filename.replace('.csv.gz', '.npz')
        # --- [여기까지 수정] ---
        
        output_filepath = os.path.join(output_dir, npz_filename)
        
        print(f"[{filename}] 처리 시작...")
        
        # 1. CSV 로드
        converters = {
            'state': ast.literal_eval,
            'action': ast.literal_eval,
            'next_state': ast.literal_eval
        }
        use_cols = ['state', 'action', 'next_state', 'reward', 'done_code']
        
        try:
            df = pd.read_csv(
                file_path, 
                compression='gzip',
                nrows=max_steps,
                usecols=use_cols,
                converters=converters
            )
        except Exception as e:
            print(f"  [오류] {filename} 파일 읽기 실패: {e}", file=sys.stderr)
            return f"{filename} (실패)"

        # 2. 차원 정보 추출
        try:
            state_dim = len(df['state'].iloc[0])
            action_dim = len(df['action'].iloc[0])
            next_state_dim = len(df['next_state'].iloc[0])
            
            if state_dim != next_state_dim:
                print(f"  [오류] {filename}의 state({state_dim})와 "
                      f"next_state({next_state_dim}) 차원이 다릅니다.", file=sys.stderr)
                return f"{filename} (실패)"
                
        except (IndexError, TypeError) as e:
            print(f"  [오류] {filename}의 state/action 차원 파악 실패: {e}", file=sys.stderr)
            return f"{filename} (실패)"
            
        # 3. transitions 배열 생성
        s_arr = np.array(df['state'].to_list(), dtype=np.float32)
        a_arr = np.array(df['action'].to_list(), dtype=np.float32)
        ns_arr = np.array(df['next_state'].to_list(), dtype=np.float32)
        r_arr = df['reward'].to_numpy(dtype=np.float32).reshape(-1, 1)
        d_arr = df['done_code'].to_numpy(dtype=np.float32).reshape(-1, 1)
        transitions = np.hstack((s_arr, a_arr, ns_arr, r_arr, d_arr))

        # 4. NPZ 파일로 압축 저장
        np.savez_compressed(
            output_filepath,
            transitions=transitions,
            state_dim=state_dim,
            action_dim=action_dim
        )
        
        print(f"  -> {output_filepath} 저장 완료. (shape={transitions.shape}, s_dim={state_dim}, a_dim={action_dim})")
        return f"{filename} (성공)"

    except Exception as e:
        print(f"  [알 수 없는 오류] {filename} 처리 중 실패: {e}", file=sys.stderr)
        return f"{filename} (실패)"

In [None]:
def main():
    # 1. 출력 디렉토리 생성 (존재하면 무시)
    os.makedirs(TRAIN_OUTPUT_PATH, exist_ok=True)
    os.makedirs(EXPERT_OUTPUT_PATH, exist_ok=True)

    # 2. train 파일 변환
    train_files = glob.glob(os.path.join(DATA_PATH, '*train.csv.gz'))
    if not train_files:
        print(f"경고: '{DATA_PATH}'에서 *train.csv.gz 파일을 찾을 수 없습니다.")
    else:
        print(f"\n--- 총 {len(train_files)}개의 Train 파일 변환 시작 ---")
        for file in sorted(train_files):
            process_csv_to_npz(file, TRAIN_OUTPUT_PATH, MAX_STEPS_TO_SAVE)

    # 3. expert 파일 변환
    expert_files = glob.glob(os.path.join(DATA_PATH, '*expert.csv.gz'))
    if not expert_files:
        print(f"경고: '{DATA_PATH}'에서 *expert.csv.gz 파일을 찾을 수 없습니다.")
    else:
        print(f"\n--- 총 {len(expert_files)}개의 Expert 파일 변환 시작 ---")
        for file in sorted(expert_files):
            process_csv_to_npz(file, EXPERT_OUTPUT_PATH, MAX_STEPS_TO_SAVE)

    print("\n--- 모든 작업 완료 ---")

if __name__ == "__main__":
    main()

In [None]:
def main():
    os.makedirs(TRAIN_OUTPUT_PATH, exist_ok=True)
    os.makedirs(EXPERT_OUTPUT_PATH, exist_ok=True)

    # 1. 처리할 파일 목록을 미리 준비합니다.
    train_files = glob.glob(os.path.join(DATA_PATH, '*train.csv.gz'))
    expert_files = glob.glob(os.path.join(DATA_PATH, '*expert.csv.gz'))
    
    total_files = len(train_files) + len(expert_files)
    if total_files == 0:
        print(f"경고: '{DATA_PATH}'에서 변환할 *.csv.gz 파일을 찾을 수 없습니다.")
        return
        
    print(f"총 {total_files}개의 파일 변환을 시작합니다. (최대 {MAX_WORKERS}개 코어 사용)")

    # 2. ProcessPoolExecutor를 생성합니다.
    with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
        
        futures = [] # 작업 목록을 저장할 리스트
        
        # 3. train 파일 작업을 큐에 추가 (submit)
        print(f"\n--- {len(train_files)}개의 Train 파일 작업 제출 ---")
        for file in sorted(train_files):
            # ★★★ train 파일은 TRAIN_OUTPUT_PATH 로 보냅니다.
            futures.append(
                executor.submit(process_csv_to_npz, file, TRAIN_OUTPUT_PATH, MAX_STEPS_TO_SAVE)
            )

        # 4. expert 파일 작업을 큐에 추가 (submit)
        print(f"\n--- {len(expert_files)}개의 Expert 파일 작업 제출 ---")
        for file in sorted(expert_files):
            # ★★★ expert 파일은 EXPERT_OUTPUT_PATH 로 보냅니다.
            futures.append(
                executor.submit(process_csv_to_npz, file, EXPERT_OUTPUT_PATH, MAX_STEPS_TO_SAVE)
            )

        # 5. 작업이 완료될 때마다 결과 확인
        print("\n--- 작업 처리 시작 (로그가 섞여서 나올 수 있습니다) ---")
        success_count = 0
        fail_count = 0
        
        for future in concurrent.futures.as_completed(futures):
            result = future.result() 
            if "(성공)" in result:
                success_count += 1
            else:
                fail_count += 1
        
        print(f"\n--- 모든 작업 완료 (총 {total_files}개 중 {success_count}개 성공, {fail_count}개 실패) ---")

if __name__ == "__main__":
    main()