In [1]:
from datetime import datetime, timedelta
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
import os
import glob
import shutil
from pathlib import Path
from tqdm import tqdm
import pandas as pd
from datetime import timedelta, datetime
import xml.etree.ElementTree as ET
import re, math, unicodedata
import numpy as np
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer, WordNetLemmatizer
nltk.download('punkt')
nltk.download('punkt_tab')
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('omw-1.4')

[nltk_data] Downloading package punkt to /home/bmegpu03/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to
[nltk_data]     /home/bmegpu03/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /home/bmegpu03/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /home/bmegpu03/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to /home/bmegpu03/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!


True

### 텍스트 전처리를 하기 위한 준비를 합니다.

In [2]:
# 도메인 보강 불용어 (필요 시 조정)
BIO_STOPWORDS = {
    "et", "al", "using", "based", "however", "therefore", "result", "results",
    "method", "methods", "conclusion", "conclusions", "background", "objective", "objectives",
    "introduction", "aim", "aims", "purpose", "materials", "patients", "study", "studies"
}

# 그리스 문자/단위 기호 치환 테이블
_GREEK_MAP = {
    "α":"alpha","β":"beta","γ":"gamma","δ":"delta","ε":"epsilon","κ":"kappa","μ":"micro","π":"pi",
    "ω":"omega","λ":"lambda","θ":"theta","σ":"sigma","τ":"tau","Δ":"delta","Σ":"sigma","Ω":"omega","µ":"micro"
}
_TRANS_TABLE = str.maketrans(_GREEK_MAP)

# 토큰 보존 규칙: 알파벳/숫자 시작, 하이픈 허용(IL-6, covid-19), 너무 짧은 토큰 제거(길이<2)
_BIO_TOKEN_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9\-]*$")

# 전역 자원
_STOP = set(stopwords.words("english")) | BIO_STOPWORDS
_STEM = PorterStemmer()
_LEM  = WordNetLemmatizer()

### 본격적으로 PubMed Corpus를 관리하기 쉬운 형태의 DataFrame으로 변환합니다.

In [3]:
def load_corpus_to_dataframe(start_date, end_date):
    # ---------------------------
    # 날짜 리스트 만들기
    # ---------------------------
    list_dates = []
    current_date = start_date
    # datetime이면 date로 변환
    if isinstance(current_date, datetime):
        current_date = current_date.date()
    if isinstance(end_date, datetime):
        end_date = end_date.date()

    while current_date <= end_date:
        list_dates.append(current_date)
        current_date += timedelta(days=1)

    # URL용(YYYYMMDD) / 출력용(YYYY-MM-DD) 포맷 모두 준비
    list_dates_str_ftp = [d.strftime("%Y-%m-%d") for d in list_dates]
    list_dates_str_print = [d.strftime("%Y-%m-%d") for d in list_dates]

    # ---------------------------
    # 날짜별 증분패키지 처리 함수 (원래 흐름 유지: wget/tar/rm)
    # ---------------------------
    def process_dataset(dataset_date_str):
        ftp_path = f"https://ftp.ncbi.nlm.nih.gov/pub/pmc/oa_bulk/oa_comm/xml/oa_comm_xml.incr.{dataset_date_str}.tar.gz"

        # Download
        command = ["wget", "-O", f"pubmed_{dataset_date_str}.tar.gz", ftp_path]
        subprocess.run(command, check=True)

        # Extract
        command = ["tar", "-xzf", f"pubmed_{dataset_date_str}.tar.gz"]
        subprocess.run(command, check=True)

        # Clean up
        command = ["rm", f"pubmed_{dataset_date_str}.tar.gz"]
        subprocess.run(command, check=True)

        return dataset_date_str

    # ---------------------------
    # 병렬 다운로드/압축해제
    # ---------------------------
    max_workers = 32  # 시스템/대역폭에 맞게 조정
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_dataset, ds) for ds in list_dates_str_ftp]
        for future in as_completed(futures):
            try:
                _ = future.result()
            except Exception as e:
                print(f"Error processing dataset: {e}")

    # ---------------------------
    # PMC 폴더 파일 수 요약 (원래 로직 유지)
    # ---------------------------
    pmc_folders = glob.glob("PMC0*")
    total_files = 0
    for folder in pmc_folders:
        if os.path.isdir(folder):
            file_count = len([f for f in os.listdir(folder) if os.path.isfile(os.path.join(folder, f))])
            print(f"{folder}: {file_count} files")
            total_files += file_count

    # ---------------------------
    # 목적지 폴더 생성 및 XML 이동 (원래 로직 유지하되 이름충돌만 방지)
    # ---------------------------
    destination = "recent_pubmed"
    os.makedirs(destination, exist_ok=True)

    # Find all PMC* folders (조금 포괄적으로)
    pmc_folders = [d for d in os.listdir('.') if d.startswith('PMC') and os.path.isdir(d)]

    def _unique_dest_path(dst_dir: str, fname: str) -> str:
        base = os.path.splitext(fname)[0]
        ext = os.path.splitext(fname)[1]
        candidate = os.path.join(dst_dir, fname)
        idx = 1
        while os.path.exists(candidate):
            candidate = os.path.join(dst_dir, f"{base}__{idx}{ext}")
            idx += 1
        return candidate

    moved_count = 0
    for folder in pmc_folders:
        folder_path = Path(folder)
        xml_files = folder_path.glob("*.xml")
        for xml_file in tqdm(xml_files, desc=f"Move from {folder}"):
            try:
                target = _unique_dest_path(destination, xml_file.name)
                shutil.move(str(xml_file), target)
                moved_count += 1
            except Exception as e:
                print(f"Error moving {xml_file}: {e}")

    # ---------------------------
    # XML 파싱 함수(기존 함수 있으면 사용, 없으면 기본 파서)
    # ---------------------------
    def _clean_ws(text: str) -> str:
        """여러 공백/개행을 하나로 정리."""
        return " ".join((text or "").split())

    def _default_parse_pmc_title_abstract(xml_path: str):
        try:
            tree = ET.parse(xml_path)
            root = tree.getroot()
            def _txt(e):
                return "".join(e.itertext()).strip() if e is not None else ""

            # pmcid
            pmcid = None
            pmid = None
            for aid in root.findall(".//article-meta/article-id"):
                _type = (aid.get("pub-id-type") or "").lower()
                if _type == "pmc":
                    pmcid = _clean_ws(aid.text)
                elif _type == "pmid":
                    pmid = _clean_ws(aid.text)

            # title
            title_el = root.find(".//article-title")
            title = _txt(title_el) if title_el is not None else None

            # abstracts (여러 개면 합침)
            abs_list = []
            for ab in root.findall(".//abstract"):
                t = _txt(ab)
                if t:
                    abs_list.append(t)
            abstract = "\n\n".join(abs_list) if abs_list else None

            return {"pmcid": pmcid, "title": title, "abstract": abstract, "path": xml_path, "pmid": pmid}
        except Exception as e:
            print(f"[WARN] parse failed: {xml_path} -> {e}")
            return None

    # 바깥에 동일 이름 함수가 있으면 사용
    parse_func = globals().get("parse_pmc_title_abstract", _default_parse_pmc_title_abstract)

    def process_xml_file(xml_file):
        xml_path = os.path.join('./recent_pubmed', xml_file)
        if os.path.isfile(xml_path):
            return parse_func(xml_path)
        return None

    # ---------------------------
    # recent_pubmed 내 XML 병렬 파싱 (원래 로직 유지)
    # ---------------------------
    xml_files = [f for f in os.listdir('./recent_pubmed')
                 if os.path.isfile(os.path.join('./recent_pubmed', f)) and f.lower().endswith(".xml")]

    list_records = []
    max_workers = 32  # 시스템에 맞게 조정

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_file = {executor.submit(process_xml_file, xml_file): xml_file for xml_file in xml_files}
        for future in as_completed(future_to_file):
            xml_file = future_to_file[future]
            try:
                result = future.result()
                if result is not None:
                    list_records.append(result)
            except Exception as e:
                print(f"Error processing {xml_file}: {e}")

    df = pd.DataFrame(list_records)
    # 편의를 위해 누락 컬럼 보정
    for col in ["pmcid", "title", "abstract", "path"]:
        if col not in df.columns:
            df[col] = None
    return df


### 수집할 PubMed 말뭉치 데이터의 기간을 지정하여 데이터 수집을 수행합니다.

In [4]:
start_date = datetime(2025, 9, 1)
end_date = datetime(2025, 9, 15)
df = load_corpus_to_dataframe(start_date, end_date)
df

--2025-11-03 09:05:49--  https://ftp.ncbi.nlm.nih.gov/pub/pmc/oa_bulk/oa_comm/xml/oa_comm_xml.incr.2025-09-01.tar.gz
--2025-11-03 09:05:49--  https://ftp.ncbi.nlm.nih.gov/pub/pmc/oa_bulk/oa_comm/xml/oa_comm_xml.incr.2025-09-02.tar.gz
--2025-11-03 09:05:49--  https://ftp.ncbi.nlm.nih.gov/pub/pmc/oa_bulk/oa_comm/xml/oa_comm_xml.incr.2025-09-03.tar.gz
--2025-11-03 09:05:49--  https://ftp.ncbi.nlm.nih.gov/pub/pmc/oa_bulk/oa_comm/xml/oa_comm_xml.incr.2025-09-04.tar.gz
--2025-11-03 09:05:49--  https://ftp.ncbi.nlm.nih.gov/pub/pmc/oa_bulk/oa_comm/xml/oa_comm_xml.incr.2025-09-05.tar.gz
--2025-11-03 09:05:49--  https://ftp.ncbi.nlm.nih.gov/pub/pmc/oa_bulk/oa_comm/xml/oa_comm_xml.incr.2025-09-07.tar.gz
--2025-11-03 09:05:49--  https://ftp.ncbi.nlm.nih.gov/pub/pmc/oa_bulk/oa_comm/xml/oa_comm_xml.incr.2025-09-06.tar.gz
Resolving ftp.ncbi.nlm.nih.gov (ftp.ncbi.nlm.nih.gov)... Resolving ftp.ncbi.nlm.nih.gov (ftp.ncbi.nlm.nih.gov)... --2025-11-03 09:05:49--  https://ftp.ncbi.nlm.nih.gov/pub/pmc/oa_bu

PMC008xxxxxx: 72 files
PMC012xxxxxx: 21501 files
PMC005xxxxxx: 3 files
PMC010xxxxxx: 80 files
PMC007xxxxxx: 137 files
PMC011xxxxxx: 427 files
PMC006xxxxxx: 1 files
PMC009xxxxxx: 62 files


Move from PMC008xxxxxx: 72it [00:00, 41232.92it/s]
Move from PMC012xxxxxx: 21501it [00:00, 54620.60it/s]
Move from PMC005xxxxxx: 3it [00:00, 2388.10it/s]
Move from PMC010xxxxxx: 80it [00:00, 28263.50it/s]
Move from PMC007xxxxxx: 137it [00:00, 29839.52it/s]
Move from PMC011xxxxxx: 427it [00:00, 35497.04it/s]
Move from PMC006xxxxxx: 1it [00:00, 1833.17it/s]
Move from PMC009xxxxxx: 62it [00:00, 37677.03it/s]


Unnamed: 0,pmcid,title,abstract,path,pmid
0,PMC12423219,Gigantomastia: Advancing a Preference Score Sy...,BackgroundThis study introduces a new Gigantom...,./recent_pubmed/PMC12423219.xml,40180643
1,PMC12410828,Infective Endocarditis Presenting as Unexplain...,Infective endocarditis is a potentially fatal ...,./recent_pubmed/PMC12410828.xml,
2,PMC12423464,“Eating isn’t just about paying attention—It's...,Background: Recent research has focused on sev...,./recent_pubmed/PMC12423464.xml,39397561
3,PMC12417792,Yield of next-generation sequencing in diagnos...,Background and study aimsThis study addressed ...,./recent_pubmed/PMC12417792.xml,
4,PMC12430085,Oncologists’ Perspectives on Ketogenic Diets i...,Background/objectives: Treating pediatric brai...,./recent_pubmed/PMC12430085.xml,
...,...,...,...,...,...
22278,PMC12422480,An updated vocal repertoire of wild adult bono...,Research over the last 20 years has shed impor...,./recent_pubmed/PMC12422480.xml,40929072
22279,PMC12426568,Supporting Police Well-Being Through an Adapti...,BackgroundPolice personnel work under challeng...,./recent_pubmed/PMC12426568.xml,40875980
22280,PMC12423179,Association Between Self-Reported Opioid Use a...,BackgroundArizona observed a sharp increase in...,./recent_pubmed/PMC12423179.xml,
22281,PMC11238350,Prepare Romania: study protocol for a randomiz...,"BackgroundGay, bisexual, and other men who hav...",./recent_pubmed/PMC11238350.xml,38987812


### 말뭉치 데이터를 전처리합니다 (병렬처리 활용)

In [5]:
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp

# 워커 전역(프로세스별) 초기화용
_WORKER_STOP = None
_WORKER_STEM = None
_WORKER_LEM  = None

def _init_worker(stop_words_list):
    # 각 워커에서 전역 재설정 (피클링 안전)
    global _WORKER_STOP, _WORKER_STEM, _WORKER_LEM
    _WORKER_STOP = set(stop_words_list)
    _WORKER_STEM = PorterStemmer()
    _WORKER_LEM  = WordNetLemmatizer()

def _worker_preprocess(text: str) -> dict:
    # 단일 텍스트 처리 (워커 전역 사용)
    s = (text or "").strip()
    s = unicodedata.normalize("NFKC", s).translate(_TRANS_TABLE)
    s = re.sub(r"http\S+|www\.\S+", " ", s).lower()
    toks = word_tokenize(s)
    toks = [t for t in toks if _BIO_TOKEN_RE.match(t)]
    toks_no_stop = [t for t in toks if t not in _WORKER_STOP and len(t) >= 2]
    stems  = [_WORKER_STEM.stem(t) for t in toks_no_stop]
    lemmas = [_WORKER_LEM.lemmatize(t) for t in toks_no_stop]
    return {
        "tokens": toks,
        "tokens_no_stop": toks_no_stop,
        "stems": stems,
        "lemmas": lemmas,
    }

def preprocess_df_parallel(df: pd.DataFrame, n_jobs: int = None, chunksize: int = 1000) -> pd.DataFrame:
    """
    큰 DF 병렬 전처리:
      - df를 row 청크로 나눠 multiprocess 처리
      - 원래 인덱스 순서 보존
    """
    if n_jobs is None:
        n_jobs = max(1, mp.cpu_count() - 1)

    # 청크 분할
    idx = df.index.to_list()
    chunks = [idx[i:i+chunksize] for i in range(0, len(idx), chunksize)]

    # 결과 담을 컨테이너
    results = {}

    with ProcessPoolExecutor(
        max_workers=n_jobs,
        initializer=_init_worker,
        initargs=(list(_STOP),)  # 메인에서 만든 불용어 집합 전달
    ) as ex:
        for chunk_idx in chunks:
            abstracts = df.loc[chunk_idx, "abstract"].astype(str).tolist()
            # 병렬 map
            outs = list(ex.map(_worker_preprocess, abstracts))
            for k, row_id in enumerate(chunk_idx):
                results[row_id] = outs[k]

    # 출력 DF 구성
    out = df.copy()
    out["tokens"]          = pd.Series({k: v["tokens"] for k, v in results.items()})
    out["tokens_no_stop"]  = pd.Series({k: v["tokens_no_stop"] for k, v in results.items()})
    out["stems"]           = pd.Series({k: v["stems"] for k, v in results.items()})
    out["lemmas"]          = pd.Series({k: v["lemmas"] for k, v in results.items()})
    out["n_tokens"]        = out["tokens"].apply(len)
    out["n_tokens_nostop"] = out["tokens_no_stop"].apply(len)
    return out

# 사용 예시
df_preprocessed = preprocess_df_parallel(df, n_jobs=25, chunksize=5000)
df_preprocessed = df_preprocessed.set_index('pmcid')
df_preprocessed

Unnamed: 0_level_0,title,abstract,path,pmid,tokens,tokens_no_stop,stems,lemmas,n_tokens,n_tokens_nostop
pmcid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
PMC12423219,Gigantomastia: Advancing a Preference Score Sy...,BackgroundThis study introduces a new Gigantom...,./recent_pubmed/PMC12423219.xml,40180643,"[backgroundthis, study, introduces, a, new, gi...","[backgroundthis, introduces, new, gigantomasti...","[backgroundthi, introduc, new, gigantomastia, ...","[backgroundthis, introduces, new, gigantomasti...",219,140
PMC12410828,Infective Endocarditis Presenting as Unexplain...,Infective endocarditis is a potentially fatal ...,./recent_pubmed/PMC12410828.xml,,"[infective, endocarditis, is, a, potentially, ...","[infective, endocarditis, potentially, fatal, ...","[infect, endocard, potenti, fatal, condit, pre...","[infective, endocarditis, potentially, fatal, ...",203,143
PMC12423464,“Eating isn’t just about paying attention—It's...,Background: Recent research has focused on sev...,./recent_pubmed/PMC12423464.xml,39397561,"[background, recent, research, has, focused, o...","[recent, research, focused, several, critical,...","[recent, research, focus, sever, critic, categ...","[recent, research, focused, several, critical,...",257,158
PMC12417792,Yield of next-generation sequencing in diagnos...,Background and study aimsThis study addressed ...,./recent_pubmed/PMC12417792.xml,,"[background, and, study, aimsthis, study, addr...","[aimsthis, addressed, need, improved, diagnost...","[aimsthi, address, need, improv, diagnost, too...","[aimsthis, addressed, need, improved, diagnost...",267,160
PMC12430085,Oncologists’ Perspectives on Ketogenic Diets i...,Background/objectives: Treating pediatric brai...,./recent_pubmed/PMC12430085.xml,,"[treating, pediatric, brain, tumors, remains, ...","[treating, pediatric, brain, tumors, remains, ...","[treat, pediatr, brain, tumor, remain, challen...","[treating, pediatric, brain, tumor, remains, c...",248,168
...,...,...,...,...,...,...,...,...,...,...
PMC12422480,An updated vocal repertoire of wild adult bono...,Research over the last 20 years has shed impor...,./recent_pubmed/PMC12422480.xml,40929072,"[research, over, the, last, 20, years, has, sh...","[research, last, 20, years, shed, important, l...","[research, last, 20, year, shed, import, light...","[research, last, 20, year, shed, important, li...",140,90
PMC12426568,Supporting Police Well-Being Through an Adapti...,BackgroundPolice personnel work under challeng...,./recent_pubmed/PMC12426568.xml,40875980,"[backgroundpolice, personnel, work, under, cha...","[backgroundpolice, personnel, work, challengin...","[backgroundpolic, personnel, work, challeng, c...","[backgroundpolice, personnel, work, challengin...",399,270
PMC12423179,Association Between Self-Reported Opioid Use a...,BackgroundArizona observed a sharp increase in...,./recent_pubmed/PMC12423179.xml,,"[backgroundarizona, observed, a, sharp, increa...","[backgroundarizona, observed, sharp, increase,...","[backgroundarizona, observ, sharp, increas, op...","[backgroundarizona, observed, sharp, increase,...",195,127
PMC11238350,Prepare Romania: study protocol for a randomiz...,"BackgroundGay, bisexual, and other men who hav...",./recent_pubmed/PMC11238350.xml,38987812,"[backgroundgay, bisexual, and, other, men, who...","[backgroundgay, bisexual, men, sex, men, gbmsm...","[backgroundgay, bisexu, men, sex, men, gbmsm, ...","[backgroundgay, bisexual, men, sex, men, gbmsm...",261,164


In [6]:
df_preprocessed.to_parquet('pubmed_dataset_250901-250915.gzip', compression='gzip')