In [1]:
# Mount Google Drive at /content/drive so notebook cells can read and write
# files stored on Drive. This import only resolves inside a Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
# pair_extractor.py
import os, sys, time, threading, re, shutil
import pandas as pd
import spacy

# ===== Environment detection =====
# A successful import of google.colab is treated as "running on Colab".
try:
    import google.colab  # type: ignore
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

# ===== Path configuration =====
if IN_COLAB:
    # Colab environment: corpus and outputs live on the mounted Google Drive.
    from google.colab import drive
    drive.mount("/content/drive", force_remount=False)

    WORKING_DIR_PATH = "/content/drive/MyDrive/dep_collocation_tutorial"
    CORPUS_DIR = "NICER/nicer_plain"
    INPUT_ROOT = os.path.join(WORKING_DIR_PATH, CORPUS_DIR) # /content/drive/MyDrive/dep_collocation_tutorial/NICER/nicer_plain
    OUTPUT_DIR = os.path.join(WORKING_DIR_PATH, "collocation") # /content/drive/MyDrive/dep_collocation_tutorial/collocation
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    sys.path.append(WORKING_DIR_PATH) # directory from which the local library mylib is imported
else:
    # Local environment: look for the local library next to this script.
    try:
        sys.path.append(os.path.abspath(os.path.dirname(__file__)))
    except NameError:
        # __file__ is not defined in interactive sessions (for example a local
        # Jupyter kernel); fall back to the current working directory so the
        # cell does not crash before the library import is even attempted.
        sys.path.append(os.getcwd())

import mylib  # is_adj_noun / is_verb_obj / is_verb_adv / is_adv_verb / is_adv_adj

# ===== 共通ユーティリティ =====
def get_all_txt_files(root_folder: str):
    """Recursively collect the ``.txt`` files found under ``root_folder``.

    The extension check is case-insensitive. Paths are returned sorted by
    file name; files that share a name in different sub-folders are further
    ordered by their full path, so the result does not depend on the order in
    which ``os.walk`` happens to visit directories.

    Args:
        root_folder: Directory to walk. A missing directory yields ``[]``.

    Returns:
        A list of paths, each formed by joining the walked directory path
        with the file name.
    """
    txt_files = [
        os.path.join(dirpath, name)
        for dirpath, _, filenames in os.walk(root_folder)
        for name in filenames
        if name.lower().endswith(".txt")
    ]
    # Primary key: base name (original behaviour). Secondary key: full path,
    # which makes ties between identically named files deterministic.
    return sorted(txt_files, key=lambda path: (os.path.basename(path), path))

def add_result(results, file_path, pair_type, sentence_text, i1, i2, token1, token2, span):
    """Append one extracted token pair to ``results`` as a flat record.

    Args:
        results: List that receives the new record (mutated in place).
        file_path: Path of the source file; only its base name is stored.
        pair_type: Label of the pair category, stored as given.
        sentence_text: Text of the sentence the pair was found in.
        i1, i2: Index values stored under ``"i1"`` / ``"i2"`` as given.
        token1, token2: Objects exposing ``lemma_``, ``text``, ``pos_``,
            ``tag_`` and ``dep_`` attributes.
        span: Text stored under ``"span"`` as given.

    Returns:
        None.
    """
    # Deletion table for ZWSP / ZWNJ / ZWJ / BOM: these characters are removed
    # from the lower-cased lemmas before they are stored.
    invisible = dict.fromkeys(map(ord, "\u200b\u200c\u200d\ufeff"))
    first_lemma = token1.lemma_.lower().translate(invisible)
    second_lemma = token2.lemma_.lower().translate(invisible)

    record = {}
    record["file"] = os.path.basename(file_path)
    record["pair_type"] = pair_type
    record["sentence"] = sentence_text
    record["i1"] = i1
    record["i2"] = i2
    record["lemma1"] = first_lemma
    record["lemma2"] = second_lemma
    # Per-token attributes are stored in word/pos/tag/dep order, first token
    # before second, to keep the record's key order unchanged.
    for key, attr in (("word", "text"), ("pos", "pos_"), ("tag", "tag_"), ("dep", "dep_")):
        record[key + "1"] = getattr(token1, attr)
        record[key + "2"] = getattr(token2, attr)
    record["pair"] = f"{token1.text} - {token2.text}"
    record["lemma_pair"] = f"{first_lemma} - {second_lemma}"
    record["span"] = span
    results.append(record)

def build_nlp():
    """Load the ``en_core_web_sm`` spaCy pipeline used for pair extraction.

    The pipeline is loaded with ``"ner"`` and ``"textcat"`` passed in
    ``disable``. If no component named ``"sentencizer"`` is present, one is
    added at the front of the pipeline.

    Returns:
        The configured spaCy pipeline object.
    """
    pipeline = spacy.load("en_core_web_sm", disable=["ner", "textcat"])
    has_sentencizer = "sentencizer" in pipeline.pipe_names
    if not has_sentencizer:
        pipeline.add_pipe("sentencizer", first=True)
    return pipeline

def _record_pair(results, rel, pair_type, sent, token1, token2, sort_indices=False):
    """Record one (token1, token2) pair found in ``sent`` via ``add_result``.

    Indices are relative to the start of the sentence. The span text always
    runs from the earlier token to the later one, inclusive, whichever of the
    two comes first in the sentence.

    Args:
        results: List that receives the record.
        rel: File label passed through to ``add_result``.
        pair_type: Pair category label.
        sent: Sentence span containing both tokens.
        token1, token2: Tokens stored as the first / second member of the pair.
        sort_indices: When true, the reported ``i1``/``i2`` are ordered
            low-to-high (the behaviour ADJ_NOUN has always had); otherwise
            ``i1`` is ``token1``'s index and ``i2`` is ``token2``'s.
    """
    i1 = token1.i - sent.start
    i2 = token2.i - sent.start
    lo, hi = (i1, i2) if i1 <= i2 else (i2, i1)
    if sort_indices:
        i1, i2 = lo, hi
    # Slice with ordered bounds: slicing with start > end would give an empty
    # span, i.e. an empty "span" column, when token2 precedes token1.
    add_result(results, rel, pair_type, sent.text, i1, i2, token1, token2, sent[lo:hi + 1].text)


def process_files(INPUT_ROOT: str, OUTPUT_DIR: str, log_func=print):
    """Extract dependency pairs from every ``.txt`` file under ``INPUT_ROOT``.

    Each non-blank line of each file is parsed, and every token of every
    sentence is tested with the ``mylib`` predicates (ADJ_NOUN, VERB_OBJ,
    VERB_ADV, ADV_VERB, ADV_ADJ, in that order). Matching pairs are collected
    and written to ``OUTPUT_DIR``:

    * ``pairs_all.csv``: one row per extracted pair;
    * ``frequencies_<PAIR_TYPE>.csv``: lemma-pair counts, one file per pair
      type that has at least one row.

    CSV files are written with the ``utf-8-sig`` encoding. If no ``.txt`` file
    is found, an error line is logged and nothing is written.

    Args:
        INPUT_ROOT: Directory searched recursively for ``.txt`` files.
        OUTPUT_DIR: Directory for the CSV outputs (created if missing).
        log_func: Callable taking one string, used for progress and status
            messages. With the default ``print``, per-file progress is
            rewritten on a single console line.

    Returns:
        None.
    """
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    txt_files = get_all_txt_files(INPUT_ROOT)
    if not txt_files:
        log_func(f"ERROR: No .txt files in {INPUT_ROOT}")
        return

    nlp_parser = build_nlp()
    all_results = []
    t0 = time.time()

    for idx, file_path in enumerate(txt_files, 1):
        # Prefer a path relative to the input root for messages; relpath can
        # raise ValueError (e.g. paths on different drives on Windows).
        try:
            rel = os.path.relpath(file_path, INPUT_ROOT)
        except ValueError:
            rel = os.path.basename(file_path)

        # Unreadable files are reported and skipped rather than aborting the run.
        try:
            with open(file_path, "r", encoding="utf-8", errors="replace") as f:
                lines = f.read().splitlines()
        except Exception as e:
            log_func(f"[WARN] Skip (read error): {rel} -> {e}")
            continue

        msg = f"files {idx}/{len(txt_files)} : {rel}"
        if log_func is print:
            # Console: overwrite the same line instead of scrolling.
            print("\r" + msg, end="", flush=True)
        else:
            log_func(msg)

        file_results = []
        for line_text in lines:
            if not line_text.strip():
                continue
            doc_line = nlp_parser(line_text)
            for sent in doc_line.sents:
                for token in sent:
                    # A token may satisfy several predicates; each match is
                    # recorded, in this fixed order.
                    if mylib.is_adj_noun(token):
                        _record_pair(file_results, rel, "ADJ_NOUN", sent, token, token.head, sort_indices=True)
                    if mylib.is_verb_obj(token):
                        _record_pair(file_results, rel, "VERB_OBJ", sent, token.head, token)
                    if mylib.is_verb_adv(token):
                        _record_pair(file_results, rel, "VERB_ADV", sent, token.head, token)
                    if mylib.is_adv_verb(token, sent):
                        _record_pair(file_results, rel, "ADV_VERB", sent, token, token.head)
                    if mylib.is_adv_adj(token):
                        _record_pair(file_results, rel, "ADV_ADJ", sent, token, token.head)

        all_results.extend(file_results)

    df_all = pd.DataFrame(all_results)
    if not df_all.empty:
        out_all = os.path.join(OUTPUT_DIR, "pairs_all.csv")
        df_all.to_csv(out_all, index=False, encoding="utf-8-sig")
        for pt in ["ADJ_NOUN", "VERB_OBJ", "VERB_ADV", "ADV_ADJ", "ADV_VERB"]:
            sub = df_all[df_all["pair_type"] == pt]
            if not sub.empty:
                freq = sub["lemma_pair"].value_counts().reset_index()
                # Column names are set explicitly so the header does not
                # depend on what reset_index() names them.
                freq.columns = ["lemma_pair", "frequency"]
                freq.to_csv(os.path.join(OUTPUT_DIR, f"frequencies_{pt}.csv"), index=False, encoding="utf-8-sig")
        log_func(f"\n✓ Results saved to: {OUTPUT_DIR}")
    else:
        log_func("\nNo dependency pairs were extracted.")
    log_func(f"✓ Total time: {time.time()-t0:.2f} sec")

# ===== Entry point =====
if IN_COLAB:
    # ===== Cache setup (Colab) =====
    # The corpus is copied from Drive to /content and processed from the copy.
    print("[Colab] INPUT_DIR:", INPUT_ROOT)
    if not os.path.isdir(INPUT_ROOT):
        raise FileNotFoundError(f"Not found: {INPUT_ROOT}")
    INPUT_CACHE = "/content/text_cache"
    # Start from an empty cache: copying over an existing directory would keep
    # files left by an earlier run (deleted since, or from another corpus),
    # and those would then be processed as if they belonged to INPUT_ROOT.
    shutil.rmtree(INPUT_CACHE, ignore_errors=True)
    shutil.copytree(INPUT_ROOT, INPUT_CACHE, dirs_exist_ok=True)
    print("[Colab] Copied to :", INPUT_CACHE)
    print("[Colab] Using cache ...")
    print("[Colab] OUTPUT_DIR:", OUTPUT_DIR)
    process_files(INPUT_CACHE, OUTPUT_DIR, log_func=print)

else:
    # --- Local GUI: choose the input/output folders in a window, then start ---
    import queue
    import tkinter as tk
    from tkinter import filedialog, messagebox, ttk

    class App(tk.Tk):
        """Small Tk window that runs ``process_files`` on user-selected folders.

        Extraction runs in a background thread so the window stays responsive.
        The worker never touches widgets directly: it puts UI actions on a
        queue that the main thread drains periodically, because tkinter
        widgets must only be used from the thread running the main loop.
        """

        # Interval, in milliseconds, between polls of the UI action queue.
        _POLL_MS = 50

        def __init__(self):
            super().__init__()
            self.title("PairExtractor")
            self.geometry("720x520")
            self.input_dir = tk.StringVar()
            self.output_dir = tk.StringVar()
            # Optional window icon from a PNG file (disabled):
            #icon = tk.PhotoImage(file="icon.png")
            #self.iconphoto(True, icon)

            pad = {"padx": 10, "pady": 8}

            frm_in = ttk.LabelFrame(self, text="Input")
            frm_in.pack(fill="x", **pad)
            ttk.Button(frm_in, text="Input folder", command=self.select_input).pack(anchor="w", padx=10, pady=(10,4))
            ttk.Entry(frm_in, textvariable=self.input_dir, state="readonly").pack(fill="x", padx=10, pady=(0,10))

            frm_out = ttk.LabelFrame(self, text="Output")
            frm_out.pack(fill="x", **pad)
            ttk.Button(frm_out, text="Output folder", command=self.select_output).pack(anchor="w", padx=10, pady=(10,4))
            ttk.Entry(frm_out, textvariable=self.output_dir, state="readonly").pack(fill="x", padx=10, pady=(0,10))

            frm_start = ttk.Frame(self); frm_start.pack(fill="x", **pad)
            self.btn_start = ttk.Button(frm_start, text="Extract start", command=self.on_start, state="disabled")
            self.btn_start.pack(side="left")
            self.lbl_status = ttk.Label(frm_start, text="Select input/output folders.")
            self.lbl_status.pack(side="left", padx=10)

            frm_log = ttk.LabelFrame(self, text="Log")
            frm_log.pack(fill="both", expand=True, padx=10, pady=10)
            cnt = ttk.Frame(frm_log)
            cnt.pack(fill="both", expand=True)
            self.txt_log = tk.Text(cnt, height=14, wrap="none")
            self.txt_log.pack(side="left", fill="both", expand=True)
            self.scroll_y = ttk.Scrollbar(cnt, orient="vertical", command=self.txt_log.yview)
            self.scroll_y.pack(side="right", fill="y")
            self.txt_log.configure(yscrollcommand=self.scroll_y.set)

            # Queue of (callable, args) UI actions posted by the worker thread.
            self._ui_queue = queue.Queue()
            self.after(self._POLL_MS, self._drain_ui_queue)

        def select_input(self):
            """Ask for the input folder and store it if one was chosen."""
            p = filedialog.askdirectory(title="Select input folder (e.g., JPN_text)")
            if p:
                self.input_dir.set(p); self.update_state()

        def select_output(self):
            """Ask for the output folder and store it if one was chosen."""
            p = filedialog.askdirectory(title="Select output folder (empty or existing)")
            if p:
                self.output_dir.set(p); self.update_state()

        def update_state(self):
            """Enable the start button only when both folders are set."""
            self.btn_start.config(state=(tk.NORMAL if self.input_dir.get() and self.output_dir.get() else tk.DISABLED))

        def _post(self, func, *args):
            """Schedule ``func(*args)`` to run on the main thread."""
            self._ui_queue.put((func, args))

        def _drain_ui_queue(self):
            """Run all pending UI actions, then re-arm the poll timer."""
            try:
                while True:
                    func, args = self._ui_queue.get_nowait()
                    func(*args)
            except queue.Empty:
                pass
            finally:
                # Re-arm even if an action raised, so later messages still show.
                self.after(self._POLL_MS, self._drain_ui_queue)

        def _append_log(self, msg):
            """Append one line to the log widget (main thread only)."""
            self.txt_log.insert("end", msg + "\n"); self.txt_log.see("end"); self.update_idletasks()

        def log(self, msg):
            """Queue ``msg`` for display in the log widget; callable from any thread."""
            self._post(self._append_log, msg)

        def on_start(self):
            """Validate the selection and launch extraction in a daemon thread."""
            in_p, out_p = self.input_dir.get().strip(), self.output_dir.get().strip()
            if not in_p or not out_p:
                messagebox.showwarning("Warning", "Both input and output folders must be selected."); return
            self.btn_start.config(state="disabled"); self.lbl_status.config(text="Running...")
            threading.Thread(target=self._run, args=(in_p, out_p), daemon=True).start()

        def _run(self, in_p, out_p):
            """Worker-thread body: run extraction and report the outcome via the queue."""
            try:
                process_files(in_p, out_p, log_func=self.log)
            except Exception as e:
                self._post(self._finish, "Error.", str(e))
            else:
                self._post(self._finish, "Done.", None)

        def _finish(self, status, error):
            """Show the final status on the main thread and re-enable the start button."""
            if error is not None:
                self._append_log(f"[ERROR] {error}")
                messagebox.showerror("Error", error)
            self.lbl_status.config(text=status)
            self.btn_start.config(state="normal")

    if __name__ == "__main__":
        App().mainloop()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
[Colab] INPUT_DIR: /content/drive/MyDrive/dep_collocation_tutorial/NICER/nicer_plain
[Colab] Copied to : /content/text_cache
[Colab] Using cache ...
[Colab] OUTPUT_DIR: /content/drive/MyDrive/dep_collocation_tutorial/collocation
files 381/381 : JPN881_plain.txt
✓ Results saved to: /content/drive/MyDrive/dep_collocation_tutorial/collocation
✓ Total time: 49.09 sec
