# 00. Wikidata Extract

이 노트북은 Wikidata의 `latest-all.json.gz` 파일로부터 필요한 데이터를 추출하는 단계입니다.

## 주요 단계
1. JSON.gz line-by-line 스트리밍 파싱으로 추출
2. Subjects / Properties / Triple 추출 (id 및 언어별 label + (sbj-prop-obj) 관계 추출)
3. Chunk 단위로 Parquet 저장 (qid_labels_0001.parquet 등)
4. 추출된 데이터는 `../00.data/00.wikidata/00.wikidata_templates/`에 저장

## Wikidata 구조 (Subject - Property - Object)

| 용어        | 의미                                                   |
| --------- | ---------------------------------------------------- |
| entity    | Subject(QID) 또는 Property(PID)                         |
| claim     | property별 statement 묶음                                |
| statement | 지식의 한 줄 (property + value + qualifiers + references) |
| mainsnak  | statement의 핵심 값 (property + datavalue)                |
| qualifier | statement의 조건                                         |
| reference | 출처                                                    |
| datavalue | object에 해당                                            |



Entity  
├── id  한국
├── type  
├── labels  
├── descriptions  
├── aliases  
├── claims  
│   ├── P01 (각 속성 ID)  (~의 속함)
│   │   ├── mainsnak  
│   │   │   ├── snaktype  
│   │   │   ├── property  
│   │   │   ├── datavalue  
│   │   │   │   ├── value  -> qid (아시아)
│   │   │   │   └── type  
│   │   │   └── datatype   -> wikibase-entityid
│   │   ├── type  
│   │   ├── rank  
│   │   ├── qualifiers  
│   │   ├── qualifiers-order  
│   │   ├── references  
│   │   └── id  
│   └── P02  
├── sitelinks  
│   ├── enwiki  
│   │   ├── site  
│   │   ├── title  
│   │   └── badges  
│   ├── frwiki  
│   └── ...  
└── lastrevid  


In [None]:
import os
import re
import time
import gzip
import glob
import uuid
import psutil
import shutil
import logging
import multiprocessing as mp
import pandas as pd
import orjson 
from tqdm.auto import tqdm


In [None]:
# 경로 선언
DATA_DIR = '../00.data/00.wikidata/latest-all.json.gz'                  # Wikidata 원본 덤프
SAVE_DIR = '../00.data/00.wikidata/00.wikidata_extract/'              # 최종본 Parquet 저장 위치
TEMP_DIR = '../00.data/00.wikidata/00.wikidata_extract/_tmp'          # 임시 분할본 저장 위치
os.makedirs(SAVE_DIR, exist_ok = True)
os.makedirs(TEMP_DIR, exist_ok = True)

# 사용 언어 8개
LANGUAGE_LIST = ['en', 'fr', 'de', 'es', 'it', 'pt', 'ko', 'ja']

# 병렬 세팅
BATCH = 200000                          # 각 Worker 당 Entity line 수
ROW_GROUP_ROWS = 200000                 # Parquet 저장 시 group row 수
N_WORKERS = max(2, mp.cpu_count() - 2) # 사용할 Worker 수 (CPU 코어 개수에 따라)
CHECK_EVERY = 10                       # Batch -> N개 마다 RAM 사용량 체크
MAX_RAM_GB = 900                       # 최대 사용 RAM (현재 서버 max 1TB)

# extract_entity 함수 
## json 파싱 -> line 1 줄 (== wikidata entity 문서 1개) 씩 처리

1. 입력된 entity == line 의 id 체크  
   --> QID or PID 구분해서 처리
2. QID 처리 (QID == Subject, QSUB = Object)  
   QID 의 언어별 LABEL과 Q-P-Q 의 triples 추출 및 저장  
   (datavalue.type == wikidata-entityid 일 때 (즉, Object Entity (QSUB) 가 존재하는 경우만 triples 에 추가)
4. PID 처리 (PID == Property)
   PID 의 언어별 LABEL 추출 및 저장

In [None]:
# Entity 문서의 LABEL 중 사용할 Language 부분만 넘겨주는 함수 (아래의 Extract 함수 초반 부분에 바로 사용됨)
def get_labels(labels_dict):
    return {language: labels_dict.get(language, {}).get('value', "I Don't Know!!") for language in LANGUAGE_LIST}

In [None]:
def extract_entity(entity):
    entity_id = entity.get('id') # 입력된 line 의 Entity ID 부터 파악
    
    if not entity_id or not isinstance(entity_id, str): # 파악 불가 시 None 반환 하고 종료 (매 Line 마다 체크용)
        return None, None, None
    
    labels_dict = entity.get('labels', {}) # 현재 입력된 Entity 문서(QID 또는 PID)의 LABEL 딕셔너리

    # QID: 즉, Subject 문서에서 LABELS 와 TRIPLES 추출하기
    if entity_id.startswith('Q'): 
        # subject 라벨 구성
        # 형태: {'subject': 'Qxxx', 'en': '...', 'ko': '...'}
        subject_id = entity_id 
        subject_labels = {'subject':subject_id, **get_labels(labels_dict)}

        # Q-P-Q 관계 추출하기 (json 구조는 맨 위 마크다운 클릭해서 참고 바람)
        triples = []

        # claims 에 포함된 Property ID, Object ID (QSUB) 파악하기 
        # entity.get('claims') -> claims -> (property, statements) 로 구성됨
        claims = entity.get('claims', {})
        for property_id, statements in claims.items():
            if not property_id.startswith('P'):  continue # PID 존재 확인
            if not isinstance(statements, list): continue

            # Property 정보인 Statements 를 타고 내려가면서 연결된 Object ID 를 찾아냄 
            for statement in statements:
                mainsnak = statement.get('mainsnak', {})
                if not mainsnak.get('snaktype') == 'value': continue
                
                datavalue = mainsnak.get('datavalue', {})
                if not datavalue.get('type') == 'wikibase-entityid':continue
                
                value = datavalue.get('value')
                if not value: continue
                
                object_id = value.get('id') 

                # 간혹 value.get('id') 에서 못 찾으면 numeric-id 로 찾아서 직접 작성해서 넣기
                if not object_id and value.get('entity-type') == 'item':
                    numeric_id = value.get('numeric-id')
                    if isinstance(numeric_id, int): object_id = f"Q{numeric_id}"

                if object_id and isinstance(object_id, str) and object_id.startswith('Q'):
                    triples.append((subject_id, property_id, object_id))

        return subject_labels, None, triples or None
    
    # PID: 즉, Property 문서에서 LABELS 추출하기
    elif entity_id.startswith("P"):
        property_id = entity_id
        property_labels = {'property':property_id, **get_labels(labels_dict)}            
        return None, property_labels, None

        
    return None, None, None
                

In [None]:
# 각 Worker의 데이터 처리
# 입력: lines -> Batch 크기 만큼의 json lines (Entity 문서들)
# 위의 extract_entity() 함수 호출해서 각 문서마다 LABELS, TRIPLES 처리함
# 그 다음 Subject_labels, Property_labels, Triples 리스트에 누적해서 Parquet으로 분할 저장함
def _worker(lines):
    os.makedirs(TEMP_DIR, exist_ok = True)

    subjects_rows = []
    properties_rows = []
    triples_rows = []

    for line in lines:
        raw = line.rstrip()             # 마지막 개행 제거해서 처리 편하게
        
        if raw.endswith(b','):          # Wikidata 구조상 뒤에 ',' 가 붙는 경우 제거
            raw = raw[:-1]
        try:
            entity = orjson.loads(raw)  # orjson이 빠르다고 함
        except Exception:   
            continue                    # json 오류 스킵

        # 함수 호출해서 정보 추출하고
        subjects, properties, triples = extract_entity(entity)
        # 각 리스트에 추가한 뒤
        if subjects: subjects_rows.append(subjects)
        if properties: properties_rows.append(properties)
        if triples: triples_rows.extend(triples)

    # .parquet으로 저장할거임
    written = []         # (kind, saved_path) 를 가진 리스트 | kind : 파일 종류 (Subject, Properies, Triples 중 하나)
    uid = uuid.uuid4().hex # 고유 파일명 생성하는 부분

    # QID_LABELS, PID_LABELS, TRIPLES 저장
    if subjects_rows:
        path = f"{TEMP_DIR}/subjects_{uid}.parquet"
        pd.DataFrame(subjects_rows).to_parquet(path, 
                                               compression = 'snappy',
                                               row_group_size = ROW_GROUP_ROWS,)
        written.append(('subject', path))

    if properties_rows:
        path = f"{TEMP_DIR}/properties_{uid}.parquet"
        pd.DataFrame(properties_rows).to_parquet(path,
                                                 compression = 'snappy',
                                                 row_group_size = ROW_GROUP_ROWS,)
        written.append(('property', path))

    if triples_rows:
        path = f"{TEMP_DIR}/triples_{uid}.parquet"
        pd.DataFrame(triples_rows, 
                     columns = ['subject', 'property', 'object']).to_parquet(path,
                                                                             compression = 'snappy',
                                                                             row_group_size = ROW_GROUP_ROWS,)
        written.append(('triples', path))
    
    # worker 실행 결과: (kind, tmp_path) 목록 return
    return written        

In [None]:
# SAVE_DIR 내의 기존 parquet 파일 중,
# kind_00001.parquet 형태의 숫자를 읽어서 다음 번호(next index)를 반환하는 함수

def _current_idx():
    # "_00001.parquet" 패턴 추출용 정규식
    # 괄호 (…) 안에 있는 (\d{5}) 가 group(1), 즉 숫자 5자리 추출
    pat = re.compile(r'_(\d{5})\.parquet$') 
    nums = []

    # SAVE_DIR 안에 있는 모든 파일 탐색
    for file_name in os.listdir(SAVE_DIR):
        m = pat.search(file_name)
        if m:
            nums.append(int(m.group(1)))

    # nums 가 비어있지 않으면 가장 큰 번호 + 1 반환
    # (즉 다음 저장될 파일의 index)
    return max(nums) +  1 if nums else 0 

In [None]:
def run_extract():
    """
    Wikidata latest-all.json.gz 파일을 스트리밍으로 읽으면서
    QID 라벨 / PID 라벨 / Triple 들을 청크 단위로 추출하여
    Parquet 파일로 저장하는 최상위 함수.
    """
    # 매번 실행 전에 임시 저장 폴더 초기화하는 부분
    shutil.rmtree(TEMP_DIR, ignore_errors = True)
    os.makedirs(TEMP_DIR, exist_ok = True)

    # fork 방식으로 worker 프로세스 시작 (리눅스에서 빠름)
    context = mp.get_context('fork')
    pool = context.Pool(N_WORKERS, maxtasksperchild = 2)

    # SAVE_DIR 안에 기존 parquet 파일들이 몇 번까지 저장됐는지 파악 → 다음 번호 구함
    idx = mp.Value('i', _current_idx())
    
    inflight = [] # 아직 끝나지 않은 worker 작업(AsyncResult 객체) 리스트
    batches = 0   # 지금까지 처리한 청크(batch) 개수 -> CHECK_EVERY 개수 마다 확인할거라 필요

    # flush: 완료된 worker 결과(future)를 받아 최종 폴더로 rename 
    # (Future = Future 객체 (AsyncResult) 아직 실행 중이거나, 나중에 결과를 가져올 수 있는 통신 핸들러)
    def _flush(queue, bar):
        """
        queue(=inflight) 맨 앞에서부터 완료된 worker를 꺼내
        worker가 생성한 tmp parquet 파일들을 SAVE_DIR로 옮기는 단계.
        """

        # queue[0] 이 존재하고, ready() == True → 해당 worker 작업 끝남
        while queue and queue[0].ready():
            # 작업 끝난 worker 하나 pop
            # worker 결과는:  [('subject', tmp_path), ('property', tmp_path2)...]
            future = queue.pop(0)
            for kind, tmp_path in future.get():
                with idx.get_lock():                          # index는 여러 프로세스가 공유하는 값이므로 lock 필요
                    seq = idx.value
                    idx.value += 1 
                    
                final = f"{SAVE_DIR}/{kind}_{seq:05}.parquet" # 최종 저장될 parquet 경로 
                os.rename(tmp_path, final)                    # tmp 파일을 최종 위치로 이동
                bar.write(f"Saved {os.path.basename(final)}") # tqdm 출력

    # gz 파일 라인 단위 스트리밍 읽기 시작
    with gzip.open(DATA_DIR, 'rb') as gz, tqdm(unit = 'ent', mininterval = 4) as bar:
        buffer = []  # BATCH = 50,000 라인 모을 버퍼 (여기서만 라인 누적함)

        for line in gz:
            if line and line[0] == 123:                               # Wikidata JSON 구조: entity 시작은 '{' (ASCII 123)
                buffer.append(line)                                   # → line[0] == 123 일 때만 유효 entity line

            
            if len(buffer) >= BATCH:                                  # 버퍼가 청크(batch) 기준량에 도달하면 → worker 에게 넘김
                inflight.append(pool.apply_async(_worker, (buffer,))) # 비동기로 worker 호출 → inflight에 future 저장
                buffer = []
                batches += 1

                _flush(inflight, bar)                                 # 끝난 worker 있으면 바로 flush

                if batches % CHECK_EVERY == 0:                        # 일정 batch마다 메모리 점검
                    while psutil.virtual_memory().percent > 90:       # RAM 90% 이상 되면 작업 속도 조절
                        _flush(inflight, bar)
                        time.sleep(1)
            bar.update(1)

        # gz 읽기 종료 후 마지막 buffer 처리 (BATCH 사이즈 다 못채운 마지막 버퍼 존재할 경우 처리)
        if buffer:
            inflight.append(pool.apply_async(_worker, (buffer,)))
            _flush(inflight, bar)

        # 남아있는 inflight worker들 전부 flush
        for future in inflight:        
            for kind, tmp_path in future.get():
                with idx.get_lock():
                    seq = idx.value
                    idx.value += 1
                final = f"{SAVE_DIR}/{kind}_{seq:05}.parquet"
                os.rename(tmp_path, final)
                bar.write(f"Saved {os.path.basename(final)}")
    pool.close()
    pool.join()

    logging.info(f"Extraction finished - Total files: {idx.value}")

# 분할된 Parquet 병합 코드 
## Subject / Property / Triples

In [None]:
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
from tqdm import tqdm

In [None]:
def merge_and_sort(input_dir, prefix, output_path, sort_by):
    """
    pyarrow.dataset 을 사용한 완전 스트리밍 병합.
    
    메모리를 거의 안 쓰고, row group 단위로 병합함.

    + ID 기준 정렬해서 저장
    """

    pattern = os.path.join(input_dir, f"{prefix}*.parquet")
    files = sorted(glob.glob(pattern))

    if not files:
        print(f"[WARN] No files found for '{prefix}'")
        return

    dataset = ds.dataset(files, format="parquet")

    # 메모리 최적: table로 바로 변환
    table = dataset.to_table()

    # pandas 변환
    df = table.to_pandas()

    df = df.sort_values(sort_by).reset_index(drop=True)

    df.to_parquet(output_path, compression='snappy')
    print(f"{prefix} merged & sorted → {output_path}")


In [None]:
EXTRACT_DIR = '../00.data/00.wikidata/00.wikidata_extract/'              # 최종본 Parquet 저장 위치
MERGED_DIR = '../00.data/00.wikidata/01.wikidata_merged/'
os.makedirs(MERGED_DIR, exist_ok = True)

merge_and_sort(EXTRACT_DIR, "subject", f"{MERGED_DIR}subject.parquet", "subject")
merge_and_sort(EXTRACT_DIR, "property", f"{MERGED_DIR}property.parquet", "property")
merge_and_sort(EXTRACT_DIR, "triples", f"{MERGED_DIR}triples.parquet", ["subject", "property", "object"])


In [None]:
subject_df = pq.read_table(MERGED_DIR+'/subject.parquet').to_pandas()
property_df = pq.read_table(MERGED_DIR+'/property.parquet').to_pandas()
triples_df = pq.read_table(MERGED_DIR+'/triples.parquet').to_pandas()


In [None]:
mask_valid = ~(subject_df[LANGUAGE_LIST] == "I Don't Know!!").any(axis = 1)
subject_df = subject_df[mask_valid].copy()


In [None]:
mask_valid = ~(property_df[LANGUAGE_LIST] == "I Don't Know!!").any(axis=1)
property_df = property_df[mask_valid].copy()

In [None]:
subject_df_filtered = subject_df.set_index("subject", drop=True)
property_df_filtered = property_df.set_index("property", drop=True)2

In [None]:
subject_df_filtered.to_parquet(f"{MERGED_DIR}subject_filtered.parquet", compression = "snappy", index = True)
subject_df_filtered.to_parquet(f"{MERGED_DIR}property_filtered.parquet", compression = "snappy", index = True)

In [None]:
subject_df_filtered

In [None]:
property_df_filtered

In [None]:
triples_df = triples_df.sort_values(["subject", "property", "object"]).reset_index(drop=True)
triples_df

In [None]:
triples_df.to_parquet(f"{MERGED_DIR}triples_filtered.parquet", compression = "snappy", index = True)

In [None]:
triples_df = triples_df.sort_values(["subject", "property", "object"]).reset_index(drop=True)

In [None]:
subject_df = pq.read_table(MERGED_DIR+'/subject.parquet').to_pandas()

# LANGUAGE_LIST 기준으로 모든 언어 값이 동일한 행 찾기
mask_same = subject_df[LANGUAGE_LIST].nunique(axis=1) == 1

# 해당 행 출력
subject_same = subject_df[mask_same]