In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import json
from collections import defaultdict
from typing import List, Dict, Set, Optional, Tuple

def load_table_index(table_index_path):
    """
    table_index.jsonl 파일을 로드하여 DB별 테이블 정보 생성

    Args:
        table_index_path: table_index.jsonl 파일 경로

    Returns:
        db_tables: {db_id: set(table_names_lower)} 형태의 딕셔너리
        table_info: 전체 테이블 정보 리스트
    """
    db_tables = defaultdict(set)
    table_info = []

    with open(table_index_path, 'r', encoding='utf-8') as f:
        for line in f:
            data = json.loads(line.strip())
            db_id = data['db_id']

            # table_name_db를 소문자로 저장 (실제 DB 테이블 이름)
            table_name = data['table_name_db'].lower()
            db_tables[db_id].add(table_name)

            table_info.append(data)

    return db_tables, table_info


def infer_db_id_from_tables(table_names: List[str], db_tables: Dict[str, Set[str]]) -> Optional[str]:
    """
    MMQA 샘플의 table_names를 보고 Spider db_id를 추정하는 함수.

    매칭 전략:
    1단계: table_names가 모두 포함되는 db_id 후보를 찾는다.
           - 후보가 1개면 그것을 사용
           - 후보가 여러 개면, 테이블 개수가 작은 DB를 선택 (더 작은 스키마 우선)

    2단계(백업): 1단계에서 후보가 없으면,
           table_names와 가장 많이 겹치는(overlap) DB를 선택
           overlap가 0이면 None 반환

    Args:
        table_names: 찾을 테이블 이름 리스트
        db_tables: {db_id: set(table_names)} 형태의 딕셔너리

    Returns:
        해당하는 db_id 또는 None
    """
    if not table_names:
        return None

    # 입력 테이블 이름들을 소문자로 변환
    target_tables = {t.lower() for t in table_names}

    # 1단계: 완전 포함하는 DB 찾기 (target ⊆ db_tables)
    full_match_candidates = []
    for db_id, tables in db_tables.items():
        if target_tables.issubset(tables):
            # (db_id, 해당 DB의 테이블 수)
            full_match_candidates.append((db_id, len(tables)))

    if full_match_candidates:
        # 테이블 수가 적은 DB를 우선 선택 (더 작은 스키마)
        full_match_candidates.sort(key=lambda x: x[1])
        return full_match_candidates[0][0]

    # 2단계: overlap가 가장 큰 DB 선택
    best_db = None
    best_overlap = 0

    for db_id, tables in db_tables.items():
        overlap = len(target_tables & tables)
        if overlap > best_overlap:
            best_overlap = overlap
            best_db = db_id

    # overlap가 0이면 None 반환
    if best_db is None or best_overlap == 0:
        return None

    return best_db


def load_mmqa_data(path: str) -> List[Dict]:
    """
    MMQA JSON 파일 로더
    - 전체가 하나의 JSON 리스트일 수도 있고
    - jsonl 형식(줄마다 JSON object)일 수도 있으니 둘 다 처리
    """
    with open(path, 'r', encoding='utf-8') as f:
        text = f.read().strip()

    # 먼저 전체를 JSON으로 파싱 시도
    try:
        data = json.loads(text)
        if isinstance(data, dict):
            # dict 한 개로 감싸져 있으면 values() 사용
            data = list(data.values())
        if not isinstance(data, list):
            raise ValueError("JSON is not a list")
        return data
    except Exception:
        # 안 되면 jsonl 형식이라고 가정
        data = []
        for line in text.splitlines():
            line = line.strip()
            if not line:
                continue
            data.append(json.loads(line))
        return data


def preprocess_mmqa_data(input_json_path: str, table_index_path: str, output_json_path: str):
    """
    MMQA JSON 파일을 전처리하여 새로운 형식으로 저장합니다.

    Args:
        input_json_path: 입력 MMQA JSON 파일 경로
        table_index_path: table_index.jsonl 파일 경로
        output_json_path: 출력 JSON 파일 경로
    """
    print("=" * 70)
    print("MMQA 데이터 전처리 시작")
    print("=" * 70)

    # table_index.jsonl 로딩
    print("\n[1/4] table_index.jsonl 로딩 중...")
    db_tables, table_info = load_table_index(table_index_path)
    print(f"  ✓ 총 {len(db_tables)}개의 데이터베이스")
    print(f"  ✓ 총 {len(table_info)}개의 테이블")

    # MMQA 데이터 로딩
    print("\n[2/4] MMQA 데이터 로딩 중...")
    data = load_mmqa_data(input_json_path)
    print(f"  ✓ 총 {len(data)}개 항목 로드 완료")

    # 데이터 전처리
    print("\n[3/4] DB ID 매칭 및 전처리 중...")
    processed_data = []
    db_id_stats = defaultdict(int)
    unmatched_items = []
    match_type_stats = defaultdict(int)  # 매칭 타입 통계

    for idx, item in enumerate(data):
        table_names = item.get('table_names', [])

        # DB ID 추론
        db_id = infer_db_id_from_tables(table_names, db_tables)

        if db_id:
            db_id_stats[db_id] += 1

            # 매칭 타입 확인 (완전 매칭 vs 부분 매칭)
            target_tables = {t.lower() for t in table_names}
            if target_tables.issubset(db_tables[db_id]):
                match_type_stats['full_match'] += 1
            else:
                match_type_stats['partial_match'] += 1
        else:
            unmatched_items.append({
                'id': item.get('id_', idx),
                'table_names': table_names,
                'question': item.get('Question', '')[:100]  # 질문 일부만 저장
            })

        # 전처리된 데이터 구조 생성
        processed_item = {
            'id': item.get('id_', idx),
            'db_id': db_id,
            'question': item.get('Question', ''),
            'sql': item.get('SQL', ''),
            'table_names': table_names
        }

        processed_data.append(processed_item)

        # 진행상황 출력
        if (idx + 1) % 100 == 0:
            print(f"  진행: {idx + 1}/{len(data)} ({(idx+1)/len(data)*100:.1f}%)")

    # 결과 파일 저장
    print(f"\n[4/4] 결과 파일 저장 중...")
    with open(output_json_path, 'w', encoding='utf-8') as f:
        json.dump(processed_data, f, ensure_ascii=False, indent=2)
    print(f"  ✓ 저장 완료: {output_json_path}")

    # 통계 정보 출력
    print("\n" + "=" * 70)
    print("전처리 완료 - 통계 정보")
    print("=" * 70)

    db_ids_found = sum(1 for item in processed_data if item['db_id'] is not None)
    print(f"\n✓ 총 처리 항목: {len(processed_data)}")
    print(f"✓ DB ID 매칭 성공: {db_ids_found}/{len(processed_data)} ({db_ids_found/len(processed_data)*100:.1f}%)")
    print(f"✓ DB ID 매칭 실패: {len(unmatched_items)} ({len(unmatched_items)/len(processed_data)*100:.1f}%)")

    # 매칭 타입 통계
    if match_type_stats:
        print(f"\n[매칭 타입 분석]")
        print(f"  - 완전 매칭 (모든 테이블 포함): {match_type_stats.get('full_match', 0)}")
        print(f"  - 부분 매칭 (일부 테이블만): {match_type_stats.get('partial_match', 0)}")

    # DB별 분포
    if db_id_stats:
        print(f"\n[DB별 분포 - 상위 15개]")
        sorted_dbs = sorted(db_id_stats.items(), key=lambda x: x[1], reverse=True)[:15]
        for rank, (db_id, count) in enumerate(sorted_dbs, 1):
            percentage = count / len(processed_data) * 100
            print(f"  {rank:2d}. {db_id:25s}: {count:4d}개 ({percentage:5.1f}%)")

    # 매칭 실패 항목 상세
    if unmatched_items:
        print(f"\n[매칭 실패 항목 분석 - 최대 10개]")
        for i, item in enumerate(unmatched_items[:10], 1):
            print(f"\n  [{i}] ID: {item['id']}")
            print(f"      테이블: {item['table_names']}")
            print(f"      질문: {item['question']}...")

            # 가능한 후보 DB 찾기 (부분 매칭)
            target = {t.lower() for t in item['table_names']}
            candidates = []
            for db_id, tables in db_tables.items():
                overlap = len(target & tables)
                if overlap > 0:
                    candidates.append((db_id, overlap, len(target)))

            if candidates:
                candidates.sort(key=lambda x: x[1], reverse=True)
                best = candidates[0]
                print(f"      가장 유사한 DB: {best[0]} (매칭: {best[1]}/{best[2]} 테이블)")

    print("\n" + "=" * 70)

    return processed_data


# Google Colab 실행 예제
if __name__ == "__main__":
    # 파일 경로 설정
    input_json_path = '/content/drive/MyDrive/ai_intensive2/Synthesized_two_table.json'  # 입력 MMQA 파일
    table_index_path = '/content/drive/MyDrive/ai_intensive2/spider_data/preprocessed/table_index.jsonl'  # table_index.jsonl 파일
    output_json_path = '/content/drive/MyDrive/ai_intensive2/mmqa2.json'  # 출력 파일

    # 전처리 실행
    processed_data = preprocess_mmqa_data(
        input_json_path,
        table_index_path,
        output_json_path
    )

    # 샘플 데이터 출력
    print("\n처리된 데이터 샘플 (처음 3개):")
    print("=" * 70)
    for i in range(min(3, len(processed_data))):
        print(f"\n[샘플 {i+1}]")
        sample = processed_data[i]
        print(f"ID: {sample['id']}")
        print(f"DB: {sample['db_id']}")
        print(f"Question: {sample['question'][:80]}...")
        print(f"Tables: {sample['table_names']}")
        print(f"SQL: {sample['sql'][:80]}...")

MMQA 데이터 전처리 시작

[1/4] table_index.jsonl 로딩 중...
  ✓ 총 166개의 데이터베이스
  ✓ 총 876개의 테이블

[2/4] MMQA 데이터 로딩 중...
  ✓ 총 2592개 항목 로드 완료

[3/4] DB ID 매칭 및 전처리 중...
  진행: 100/2592 (3.9%)
  진행: 200/2592 (7.7%)
  진행: 300/2592 (11.6%)
  진행: 400/2592 (15.4%)
  진행: 500/2592 (19.3%)
  진행: 600/2592 (23.1%)
  진행: 700/2592 (27.0%)
  진행: 800/2592 (30.9%)
  진행: 900/2592 (34.7%)
  진행: 1000/2592 (38.6%)
  진행: 1100/2592 (42.4%)
  진행: 1200/2592 (46.3%)
  진행: 1300/2592 (50.2%)
  진행: 1400/2592 (54.0%)
  진행: 1500/2592 (57.9%)
  진행: 1600/2592 (61.7%)
  진행: 1700/2592 (65.6%)
  진행: 1800/2592 (69.4%)
  진행: 1900/2592 (73.3%)
  진행: 2000/2592 (77.2%)
  진행: 2100/2592 (81.0%)
  진행: 2200/2592 (84.9%)
  진행: 2300/2592 (88.7%)
  진행: 2400/2592 (92.6%)
  진행: 2500/2592 (96.5%)

[4/4] 결과 파일 저장 중...
  ✓ 저장 완료: /content/drive/MyDrive/ai_intensive2/mmqa2.json

전처리 완료 - 통계 정보

✓ 총 처리 항목: 2592
✓ DB ID 매칭 성공: 2592/2592 (100.0%)
✓ DB ID 매칭 실패: 0 (0.0%)

[매칭 타입 분석]
  - 완전 매칭 (모든 테이블 포함): 2592
  - 부분 매칭 (일부 테이블만): 0

[DB별 분포 - 상위 15개]
  

---
### train, test 분리

In [2]:

import json
import os
import random

BASE_DIR = "/content/drive/MyDrive/ai_intensive2"

def split_mmqa_json(
    filename: str,
    train_ratio: float = 0.2,
    seed: int = 42,
):
    """
    MMQA json 파일을 읽어서
      - db_id null/빈 값 개수 출력
      - train_ratio 비율만큼 train, 나머지 test로 분리
      - *_train.json, *_test.json 으로 저장
    """
    path = os.path.join(BASE_DIR, filename)
    print("=" * 70)
    print(f"[INFO] Processing file: {path}")

    # 1) JSON 로드
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)

    total = len(data)
    print(f"[INFO] Total samples: {total}")

    # 2) db_id가 None / 빈 문자열 / 키 없음인 샘플 체크
    missing_dbid = [
        ex for ex in data
        if ("db_id" not in ex) or (ex["db_id"] is None) or (ex["db_id"] == "")
    ]
    print(f"[CHECK] #samples with missing/empty db_id: {len(missing_dbid)}")

    # 일부 예시 출력 (id가 있으면 id만, 없으면 앞부분)
    if missing_dbid:
        print("[CHECK] Examples with missing db_id (up to 5):")
        for ex in missing_dbid[:5]:
            print("  - id:", ex.get("id"), "| db_id:", ex.get("db_id"))

    # 3) 랜덤 셔플 + 20% / 80% split
    rng = random.Random(seed)
    rng.shuffle(data)

    n_train = int(len(data) * train_ratio)
    train_data = data[:n_train]
    test_data  = data[n_train:]

    print(f"[SPLIT] train: {len(train_data)}  ({train_ratio*100:.1f}%)")
    print(f"[SPLIT] test : {len(test_data)}  ({(1-train_ratio)*100:.1f}%)")

    # 4) 저장 경로 생성
    name, ext = os.path.splitext(filename)  # ex) mmqa2, .json
    train_path = os.path.join(BASE_DIR, f"{name}_train.json")
    test_path  = os.path.join(BASE_DIR, f"{name}_test.json")

    with open(train_path, "w", encoding="utf-8") as f:
        json.dump(train_data, f, ensure_ascii=False, indent=2)

    with open(test_path, "w", encoding="utf-8") as f:
        json.dump(test_data, f, ensure_ascii=False, indent=2)

    print(f"[SAVE] Train saved to: {train_path}")
    print(f"[SAVE] Test  saved to: {test_path}")
    print()


# 실제 실행: mmqa2, mmqa3 각각 20% train / 80% test
split_mmqa_json("mmqa2.json", train_ratio=0.2, seed=42)
split_mmqa_json("mmqa3.json", train_ratio=0.2, seed=42)


[INFO] Processing file: /content/drive/MyDrive/ai_intensive2/mmqa2.json
[INFO] Total samples: 2592
[CHECK] #samples with missing/empty db_id: 0
[SPLIT] train: 518  (20.0%)
[SPLIT] test : 2074  (80.0%)
[SAVE] Train saved to: /content/drive/MyDrive/ai_intensive2/mmqa2_train.json
[SAVE] Test  saved to: /content/drive/MyDrive/ai_intensive2/mmqa2_test.json

[INFO] Processing file: /content/drive/MyDrive/ai_intensive2/mmqa3.json
[INFO] Total samples: 721
[CHECK] #samples with missing/empty db_id: 0
[SPLIT] train: 144  (20.0%)
[SPLIT] test : 577  (80.0%)
[SAVE] Train saved to: /content/drive/MyDrive/ai_intensive2/mmqa3_train.json
[SAVE] Test  saved to: /content/drive/MyDrive/ai_intensive2/mmqa3_test.json

