# 資料處理

## 設定資料夾

In [4]:
import sys
from pathlib import Path

# Pick the resource/output roots: Google Drive when the notebook runs inside
# Colab, otherwise the "res"/"out" folders next to the current working directory.
if "google.colab" in sys.modules:
    from google.colab import drive

    drive.mount('/content/drive')
    RES_DIR_PATH = Path("/content/drive/MyDrive/aicup/res")
    OUT_DIR_PATH = Path("/content/drive/MyDrive/aicup/out")
else:
    RES_DIR_PATH = Path(Path.cwd().parent, "res")
    OUT_DIR_PATH = Path(Path.cwd().parent, "out")

# exist_ok=True makes re-running the cell safe and removes the
# check-then-create race of a separate exists() test.
for _dir_path in (RES_DIR_PATH, OUT_DIR_PATH):
    _dir_path.mkdir(parents=True, exist_ok=True)

## 資料前處理

In [5]:
from typing import List
from itertools import chain

class DatasetPreprocessor:
    """Turn annotated medical reports into a line-level TSV dataset.

    The answer file lists PHI annotations (file name, PHI type, start offset,
    end offset, entity text and an optional normalized time), one per line and
    tab-separated. Every report line is paired with the annotations that start
    on it and written as one TSV row.
    """

    def __init__(
        self,
        ans_file_path: Path,
        med_dir_path: Path,
        output_tsv_path: Path,
    ) -> None:
        """Store the answer file, the report directory and the output TSV path."""
        self.ans_file_path = ans_file_path
        self.med_dir_path = med_dir_path
        self.output_tsv_path = output_tsv_path

    @staticmethod
    def read_file(path: Path, encoding: str = "utf-8-sig") -> List[str]:
        """Return all lines of ``path`` with their line endings kept."""
        with open(path, encoding=encoding) as fr:
            return fr.readlines()

    @staticmethod
    def write_file(path: Path, data_list: List[str], encoding: str = "utf-8") -> None:
        """Write every string in ``data_list`` to ``path`` without adding separators."""
        with open(path, "w", encoding=encoding) as fw:
            fw.writelines(data_list)

    def create_tsv(self) -> None:
        """Process every report named in the answer file and write the rows to the output TSV."""
        ans_file_dict = self.get_answer_dict()
        # Materialize first so the output file is only opened once every
        # report has been processed successfully.
        med_ans_pair_list = list(chain.from_iterable(
            self.process_medical_report(med_file_name, ans_file_dict)
            for med_file_name in ans_file_dict
        ))
        self.write_file(self.output_tsv_path, med_ans_pair_list)

    def get_answer_dict(self):
        """Parse the answer (annotation) file.

        Returns:
            A ``defaultdict(list)`` mapping each report file name to its
            annotations in file order. Each annotation is a dict with the keys
            ``phi``, ``st_idx``, ``ed_idx``, ``entity`` and, when the line has
            a sixth field, ``normalize_time``.
        """
        from collections import defaultdict

        ans_dict = defaultdict(list)
        for line in self.read_file(self.ans_file_path):
            # A blank line (e.g. at the end of the file) carries no annotation.
            if not line.strip():
                continue
            items = line.strip("\n").split("\t")
            items_data = {
                "phi": items[1],
                "st_idx": int(items[2]),
                "ed_idx": int(items[3]),
                "entity": items[4],
            }
            if len(items) == 6:
                items_data["normalize_time"] = items[5]
            ans_dict[items[0]].append(items_data)
        return ans_dict

    @staticmethod
    def _format_phi(med_item) -> str:
        """Render one annotation as ``PHI:entity`` or ``PHI:entity=>normalized time``."""
        phi = med_item["phi"]
        entity = med_item["entity"]
        normalize_time = med_item.get("normalize_time", "")
        if normalize_time:
            return f"{phi}:{entity}=>{normalize_time}"
        return f"{phi}:{entity}"

    @classmethod
    def _take_phi(cls, annotations, item_idx, limit):
        """Consume the annotations starting before ``limit``.

        Returns the formatted entries and the index of the first annotation
        that was not consumed.
        """
        phi_entries = []
        while item_idx < len(annotations) and annotations[item_idx]["st_idx"] < limit:
            phi_entries.append(cls._format_phi(annotations[item_idx]))
            item_idx += 1
        return phi_entries, item_idx

    @staticmethod
    def _format_row(med_file_name, line_end, segment, phi_entries) -> str:
        """Build one TSV row from a text segment and its PHI entries."""
        # Entries are joined with a literal backslash + "n" so the row stays on
        # one physical line. Joining (instead of appending the separator and
        # calling str.strip with it) keeps entities ending in "n" intact.
        phi_info = "\\n".join(phi_entries) if phi_entries else "PHI:Null"
        # Tabs inside the text would break the TSV columns.
        med_info = segment.strip().replace("\t", " ")
        return f"{med_file_name}\t {line_end}\t {med_info}\t {phi_info}\n"

    def process_medical_report(self, med_file_name, ans_file_dict,):
        """Split one medical report into rows paired with their PHI annotations.

        Args:
            med_file_name: Report name; the text is read from
                ``<med_dir_path>/<med_file_name>`` with a ``.txt`` suffix.
            ans_file_dict: Mapping of report name to its annotation dicts, as
                returned by ``get_answer_dict``.

        Returns:
            A list of rows. Each row holds the file name, the offset just past
            the line, the stripped line text and the PHI info, separated by a
            tab followed by a space and terminated by a newline. The PHI info
            is ``PHI:Null`` when no annotation starts on the line.
        """
        med_file_path = Path(self.med_dir_path, med_file_name).with_suffix('.txt')
        med_report = "".join(self.read_file(med_file_path))

        # .get() tolerates a report without annotations and does not insert an
        # empty entry into a defaultdict. Sorting by start offset lets every
        # annotation be matched even if the answer file is out of order or two
        # annotations share a start offset (sorted() is stable).
        annotations = sorted(
            ans_file_dict.get(med_file_name, []),
            key=lambda med_item: med_item["st_idx"],
        )

        phi_pairs = []
        boundary = 0  # offset where the not-yet-emitted text begins
        item_idx = 0  # next annotation to attach

        newline_idx = med_report.find("\n")
        while newline_idx != -1:
            new_line_idx = newline_idx + 1
            med_info_seg = med_report[boundary:new_line_idx]
            # A lone newline is a blank line and produces no row.
            # NOTE(review): the boundary is deliberately not advanced here, so
            # a run of several blank lines can still yield a row with empty
            # text; kept to preserve the existing row layout.
            if med_info_seg != "\n":
                phi_entries, item_idx = self._take_phi(annotations, item_idx, newline_idx)
                phi_pairs.append(
                    self._format_row(med_file_name, new_line_idx, med_info_seg, phi_entries)
                )
                boundary = new_line_idx
            newline_idx = med_report.find("\n", new_line_idx)

        # Flush a last line that has no trailing newline so its text and PHI
        # are not silently dropped.
        tail_seg = med_report[boundary:]
        if tail_seg.strip():
            report_end = len(med_report)
            phi_entries, item_idx = self._take_phi(annotations, item_idx, report_end)
            phi_pairs.append(
                self._format_row(med_file_name, report_end, tail_seg, phi_entries)
            )

        return phi_pairs

In [None]:
# Build the single-line training TSV for each dataset release. Every entry is
# (answer file, report directory) relative to RES_DIR_PATH plus the TSV file
# name created under OUT_DIR_PATH.
for answer_rel_path, reports_rel_path, tsv_name in (
    (
        "First_Phase_Release(Correction)/answer.txt",
        "First_Phase_Release(Correction)/First_Phase_Text_Dataset",
        "first_phase_train_single_line.tsv",
    ),
    (
        "Second_Phase_Dataset/answer.txt",
        "Second_Phase_Dataset/Second_Phase_Text_Dataset",
        "second_phase_train_single_line.tsv",
    ),
):
    preprocessor = DatasetPreprocessor(
        ans_file_path=RES_DIR_PATH / answer_rel_path,
        med_dir_path=RES_DIR_PATH / reports_rel_path,
        output_tsv_path=OUT_DIR_PATH / tsv_name,
    )
    preprocessor.create_tsv()