<a href="https://colab.research.google.com/github/nedokormysh/GB_NLP_Healthcare/blob/lesson_2_epicpars_class/GB_nlp_healthcare_2_Berezutskii.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import re
from re import sub, search, findall
import pandas  as pd
from datetime import datetime
import numpy as np
import glob
import os

import logging
import sys

from typing import TextIO, Union, List

import warnings
warnings.filterwarnings("ignore")

In [None]:
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)

# if not logger.hasHandlers():
#         logger.addHandler(logger)
# logger.propagate = False

# Создание класса

In [None]:
class EpicPars:
    """
    Класс EpicPars используется для обработки исследований и формирования
    таблиц

    :param folder_name: названия папки, где находятся необработанные текстовые
        файлы эпикризов
    :type folder_name: str, значение по умолчанию: 'rwd_raw'

    :param measurements_map: словарь, в котором ключ - название анализа,
        а значение - это словарь в котором ключ тип исследования (сущность над
        анализом), а значение это идентификатор.
        Пример:{'фибриноген': {'гемостазиограмма': 6}}
    :type measurements_map: dict, значение по умолчанию: {}

    :param treatment_map: словарь - "маппинг" препаратов к ингредиенту. Ключ -
        название ингридиента, значение - словарь, где ключом является
        коммерческое название препарата, а значение - идендификационный номер
        Пример:
        {'лизиноприл':{'л.з.....ил|лизитар|лизинеоприл|лизоретик|диротон': 20}}
    :type treatment_map: dict, значение по умолчанию: {}

    :param condition_procedures_map: словарь заболеваний. "Маппинг" заболеваний.
        Кодируем различные варианты написания одного и того же заболевания.
        Словарь, в котором ключ - идентификационный номер, значения - словарь,
        в котором ключ - полное название заболевания, значения - варианты
        написания болезни.
        Пример словаря:
        {1:	{'Внегоспитальная пневмония':
         'пне...ния|внегоспитальная|внебольничная'}}
    :type condition_procedures_map: dict, значение по умолчанию {}

    .. note:: Необходимо поместить папку с текстовыми эпикризами и файл
        блокнота в единую папку.

    .. warning:: Необходимо подать словари исследований measurements_map,
        ингридиент-коммерческое название лекарств treatment_map, заболеваний
        condition_procedures_map


    Methods
    -------
    prepare_person_table(self)
        Метод формирует датафрейм pandas с данными о пациентах
    measurements_template(self)
        Метод формирует датафрейм pandas с данными о анализах
    treatment_detection(self)
        Метод формирует датафрейм pandas с данными о лекарствах
    condition_detection(self)
        Метод формирует датафрейм pandas с данными о заболеванияx
    procedures_detection(self)
        Метод формирует датафрейм pandas с данными о процедурах
    read_txt(ff)
        Статический метод. Открывает файлы для чтения. Используется для открытия
        необработанных файлов эпикризов.
    epi_dates_preparation(strng)
        Статический метод. Парсит дату из эпикриза.
    get_gender(file_)
        Статический метод. Обрабатывает файл эпикриза и определяет пол пациента.
    remover(file_)
        Статический метод. Удаляет неинформативные символы из текста файла.
    lists_former(self)
        Метод формирует листы с названиями файлов. Затем из данных файлов
        получает информацию по индексами пациентов, эпикризами,
        гендерами пациентов
    """

    def __init__(self,
                 folder_name: str = "rwd_raw",
                 measurements_map: dict = {},
                 treatment_map: dict = {},
                 condition_procedures_map = {}
                 ) -> None:
        """
        Метод конструктора класса.
        """
        self.folder_name = folder_name
        self.measurements_map = measurements_map
        self.treatment_map = treatment_map
        self.condition_procedures_map = condition_procedures_map


    def __repr__(self):
        """
        Стандартный метод python
        выдает текстовое или строковое представление сущности
        """
        if self.folder_name == "rwd_raw":
            return (
                f"Класс EpicPars. "
                f"Название папки с эпикризами: {self.folder_name}. "
                f"Выбрана папка по умолчанию."
                # f"{self.measurements_map}"
                # f"{self.treatment_map}"
                # f"{self.condition_procedures_map}"

            )
        else:
            return (f"Экземпляр класса EpicPars. "
                    f"Название папки с эпикризами {self.folder_name}")

    @staticmethod
    def read_txt(ff: str) -> str:
        """
        Метод открывает файлы для чтения. Используеся для открытия
        необработанных файлов эпикризов.

        :param ff: строка с адресом текстового файла эпикриза
        :type ff: str

        :return: считанную строку с полным текстом эпикриза
        :rtype: str
        """
        with open(ff, encoding="utf-8", errors="ignore") as f:
            f = f.read()

        return f

    @staticmethod
    def epi_dates_preparation(strng: str) -> str:
        """
        Метод парсит дату из строки

        :param strng: текст эпикриза
        :type strng: str

        :return: Строку с датой
        :rtype: str
        """
        data_1 = sub(
            r"(0?[1-9]|[12]\d|30|31)[.](0?[1-9]|1[0-2])[.](\d{4})",
            "\\1-\\2-\\3", strng
        )

        return sub(
            r"(0?[1-9]|[12]\d|30|31)[.](0?[1-9]|1[0-2])[.](\d{2})",
            "\\1-\\2-20\\3",
            data_1,
        )

    @staticmethod
    def get_gender(file_: str) -> int:
        """
        Метод обрабатывает файл эпикриза и определяет пол пациента

        :param file_: текст эпикризом
        :type file_: str

        :return: код с полом пациента. 25 - женский, 26 - мужской
        :rtype: int

         .. note:: После поля 'диагноз' удаляем. Если в отчестве есть
         'вна' - женский пол, иначе - мужской
         """
        file_ = sub("диагноз.*", "", file_)
        if "вна" in file_:
            return 25
        else:
            return 26

    @staticmethod
    def remover(file_: str) -> str:
        """
        Метод удаляет неинформативные символы из текста файла.

        :param file_: текст эпикриза
        :type file_: str

        :return: строку с удалёнными лишними данными
        :rtype: str

        ..notes:: Не нужны запятые, табуляция, стоп-слова (которые не несут
        информацию в нашем случае)
        """
        file_ = "".join(file_.split()).lower()
        stop_element = [",", ":", "/t", "менее"]
        for elem in stop_element:
            if elem == ",":
                file_ = sub(elem, ".", file_)
        else:
            file_ = sub(elem, "", file_)
        return file_

    def lists_former(self):

        """
        Метод формирует листы с названиями файлов. Затем из данных файлов
        получает информацию по индексами пациентов, эпикризами,
        гендерами пациентов.

        :return: листы с названиями файлов эпикризов, индексами пациентов,
        эпикризами, гендерами пациентов.
        :rtype: Tuple(Union[List, List, List, List])
        """
        lst_of_txts = list(glob.glob(os.path.join(self.folder_name, "*.txt")))
        patient_ids = list(
            map(lambda file_: int(search(r"\d+", file_)[0]), lst_of_txts)
        )
        list_of_epi = list(
            map(
                lambda file_: EpicPars.epi_dates_preparation(
                    EpicPars.remover(EpicPars.read_txt(file_))
                ),
                lst_of_txts,
            )
        )
        gender_list = list(
            map(
                lambda file_: EpicPars.get_gender(
                    EpicPars.epi_dates_preparation(
                        EpicPars.remover(EpicPars.read_txt(file_))
                    )
                ),
                lst_of_txts,
            )
        )

        return lst_of_txts, patient_ids, list_of_epi, gender_list

    def prepare_person_table(self) -> pd.DataFrame:
        """
        Метод формирует датафрейм pandas с данными о пациентах

        :return: Датафрейм с данными о пациентах.
        Колонки: id пациента, дата рождения, количество дней госпитализации,
        id пола пациента
        :rtype: pd.DataFrame
        """

        hospital_days=[]
        date_of_birth=[]

        lst_of_txts, patient_ids, list_of_epi, gender_list = self.lists_former()

        for file_ in list_of_epi:
            # находим поступление
            admission = findall("\d{2}-\d{2}-\d{4}", file_)[1]
            # находим выписку
            discharge = findall("\d{2}-\d{2}-\d{4}", file_)[-1]
            # определяем количество дней госпитализации
            hospital_days.append(
                datetime.strptime(discharge, "%d-%m-%Y").date()
                - datetime.strptime(admission, "%d-%m-%Y").date()
            )
            # находим дату рождения
            date_of_birth.append(findall("\d{2}-\d{2}-\d{4}", file_)[0])
        data_dct = {
                "person_id ": patient_ids,
                "date_of_birth": date_of_birth,
                "hospital_days": hospital_days,
                "sex_concept_id ": gender_list,
            }
        # формируем датафрейм, обрабатываем пропуски, меняем тип данных
        df = pd.DataFrame(data_dct)
        df = df.replace(r"", np.nan, regex=True)
        df["hospital_days"] = df["hospital_days"].dt.days.astype("int16")
        df["date_of_birth"] = pd.to_datetime(df.date_of_birth)
        return df

    def measurements_template(self) -> pd.DataFrame:

        """
        Метод формирует датафрейм pandas с данными о анализах

        :return: Датафрейм с данными о анализах.
        Колонки: id пациента, id анализа, дата анализа, значение, идентификатор
        исследования
        :rtype: pd.DataFrame
        """

        list_of_epicrisis = self.lists_former()[2]
        patient_ids = self.lists_former()[1]
        measurement_id = []
        measurement_date  = []
        measurement_concept_id = []
        patient_id = []
        value = []

        try:
            # проверка на существования словаря анализов
            assert bool(self.measurements_map) == True

            for measurement_name , sub_dict in self.measurements_map.items():
                for measurement_type, concept_id in sub_dict.items():
                    for file_, patient_id_ in zip(list_of_epicrisis,
                                                  patient_ids):
                        pattern0 = ''.join(
                            ['r(\d{2}-\d{2}-\d{4})(', measurement_type, ')'])
                        file_ = sub(pattern0, r'\2\1', file_)
                        # ищем тип исследования (т.е. "родителя" анализа )
                        pattern1 = ''.join(
                          ['(?<=', measurement_type, ')','(\d{2}-\d{2}-\d{4}|)']
                            )
                        for date in findall(pattern1, file_):
                            # добавляем дату, id пациента, id исследования
                            measurement_date.append(date)
                            patient_id.append(patient_id_)
                            measurement_concept_id.append(concept_id)
                        pattern2 = ''.join(
                            ['(', measurement_type, ')', '(\d{2}-\d{2}-\d{4}|)']
                            )
                        file_1 = sub(pattern2, r'\2\1', file_)
                        # в типе исследования уже находим конкретный "потомок"
                        # исследования
                        pattern3 = ''.join(
                        ['(', measurement_type, '.*?)', '(?=\d{2}-\d{2}-\d{4})']
                            )
                        for value_ in findall(pattern3, file_1):
                            value_ = sub(''.join(
                                ['(.*)(', measurement_name, ')', '(\d.\d*)']),
                                         r'\3\2\1',
                                         value_)
                            value_ = sub(''.join(
                            ['(', measurement_name, '|', measurement_type, ').*']) ,
                                         '',
                                         value_)
                            value_ = re.sub('[^0-9.]|х10|x10', '', value_)

                            if value_:
                                value.append(value_)
                            else:
                                value.append('NA')
            # формирование датафрейма
            data_dct = {'person_id': patient_id,
                        'measurement_concept_id': measurement_concept_id,
                        'measurement_date': measurement_date ,
                        'value': value}
            df = pd.DataFrame.from_dict(data_dct, orient='index')

            df = df.transpose()
            df['measurement_id'] = df.index + 1
            df = df[df.measurement_date != '']
            df = df[df.value != 'NA']
            df['measurement_date'] = pd.to_datetime(df.measurement_date)

            return df
        except:
            print(f'Произошла ошибка формирования таблицы.'
                  f'Возможно необходимо задать словарь анализов.'
                  f'См. документацию класса')

    def treatment_detection(self) -> pd.DataFrame:

        """
        Метод формирует датафрейм pandas с данными о лекарствах.

        :return: Датафрейм с данными о лекарствах.
        Колонки: id пациента, id лекарства, дата приёма, идентификатор
        лекарства
        :rtype: pd.DataFrame
        """

        list_of_epicrisis = self.lists_former()[2]
        patient_ids = self.lists_former()[1]
        drug_date  = []
        drug_concept_id = []
        patient_id = []

        try:
            # проверка на существования словаря лекарств
            assert bool(self.treatment_map) == True

            for drug_name , sub_dct in self.treatment_map.items():
                for variations, drug_id in    sub_dct.items():
                    for file_, patient_id_ in zip(list_of_epicrisis, patient_ids):
                        file_ = sub(variations, drug_name, file_)
                        if drug_name in  file_:
                            drug_date.append(
                            findall('\d{2}-\d{2}-\d{4}', file_)[-1])
                            drug_concept_id.append(drug_id)
                            patient_id.append(patient_id_)
            data_dct = {'person_id': patient_id,
                        'drug_concept_id': drug_concept_id,
                        'drug_date': drug_date }
            df = pd.DataFrame(data_dct)
            df['drug_id'] = df.index + 1
            df['drug_date'] = pd.to_datetime(df.drug_date)
            return(df)
        except:
            print(f'Произошла ошибка формирования таблицы.'
                  f'Возможно необходимо задать словарь лекарств.'
                  f'См. документацию класса')

    def condition_detection(self) -> pd.DataFrame:

        """
        Метод формирует датафрейм pandas с данными о заболеваниях.

        :return: Датафрейм с данными о заболеваниях.
        Колонки: id пациента, id заболевания, дата заболевания, уникальный
        идентификатор заболевания
        :rtype: pd.DataFrame
        """

        list_of_epicrisis = self.lists_former()[2]
        patient_ids = self.lists_former()[1]
        condition_date  = []
        condition_concept_id = []
        patient_id = []

        try:
            # проверка на существования словаря заболеваний
            assert bool(self.condition_procedures_map) == True

            for concept_id, sub_dct in self.condition_procedures_map.items():
                if concept_id < 6:
                    for condition, variations in  sub_dct.items():
                        for file_, patient_id_ in zip(list_of_epicrisis,
                                                      patient_ids):
                            file_ = sub(variations, condition, file_)
                            if condition in  file_:
                                condition_date.append(
                                    findall('\d{2}-\d{2}-\d{4}', file_)[-1])
                                condition_concept_id.append(concept_id)
                                patient_id.append(patient_id_)
            data_dct = {'person_id': patient_id,
                        'condition_concept_id': condition_concept_id,
                        'condition_date': condition_date }
            df = pd.DataFrame(data_dct)
            df['condition_id'] = df.index + 1
            df['condition_date'] = pd.to_datetime(df.condition_date)
            return(df)
        except:
            print(f'Произошла ошибка формирования таблицы.'
                  f'Возможно необходимо задать словарь заболеваний.'
                  f'См. документацию класса')

    def procedures_detection(self) -> pd.DataFrame:

        """
        Метод формирует датафрейм pandas с данными о процедурах.

        :return: Датафрейм с данными о процедурах.
        Колонки: id пациента, id процедуры, дата процедуры, уникальный
        идентификатор процедуры
        :rtype: pd.DataFrame
        """
        list_of_epicrisis = self.lists_former()[2]
        patient_ids = self.lists_former()[1]
        procedure_date  = []
        procedure_concept_id = []
        patient_id = []

        try:
            # проверка на существования словаря заболеваний
            assert bool(self.condition_procedures_map) == True

            for concept_id , sub_dct in self.condition_procedures_map.items():
                if concept_id > 6:
                    for procedure, variations in  sub_dct.items():
                        for file_, patient_id_ in zip(list_of_epicrisis,
                                                      patient_ids):
                            file_ = sub(variations, procedure, file_)
                            if procedure in  file_:
                                procedure_date.append(
                                    findall('\d{2}-\d{2}-\d{4}', file_)[-1]
                                    )
                                procedure_concept_id.append(concept_id)
                                patient_id.append(patient_id_)
            data_dct = {'person_id': patient_id,
                        'procedure_concept_id': procedure_concept_id,
                        'procedure_date': procedure_date}
            df = pd.DataFrame(data_dct)
            df['procedure_id'] = df.index + 1
            df['procedure_date'] = pd.to_datetime(df.procedure_date)
            return(df)
        except:
            print(f'Произошла ошибка формирования таблицы.'
                  f'Возможно необходимо задать словарь заболеваний.'
                  f'См. документацию класса')

# Пример работы класса

## Объявление и инициализация

In [None]:
measurements_map = {
    'фибриноген': {'гемостазиограмма': 6},
    'креатинин': {'биохимическоеисследованиекрови': 7},
    'лейкоциты': {'общийанализкрови': 8},
    'гемоглобин': {'общийанализкрови': 9},
    'тромбоциты': {'общийанализкрови': 10},
    '.-реактивныйбелок': {'биохимическоеисследованиекрови': 11},
    'соэ': {'общийанализкрови': 12},
    'лимфоциты': {'общийанализкрови': 13}
    ,'прокальцитонин': {'ифаанализ': 27}
}

treatment_map = {
    'лизиноприл':{'л.з.....ил|лизитар|лизинеоприл|лизоретик|диротон': 20},
    'цефтриаксон':{'три.....ф|цефт.....он|цефтриакосн': 16},
    "бисопролол":{"б.с.пр..ол|бисопралдол|бикард|конкор": 21},
     "аспирин": {"аспкиард|кардиомагнил|ас....рд": 22},
     "дексаметазон": {"декс.......н|дексаетазон": 15},
     "азитромицин": {"азит.......": 18},
    'гепарин': {'гепарин': 19 }
}

condition_procedures_map = {
    1:	{'Внегоспитальная пневмония': 'пне...ния|внегоспитальная|внебольничная'},
    2:	{'Ишемическая болезнь сердца': 'ссн|ибс|атеросклеротический|кардиосклероз|фп|фибрил....' },
    3:	{'Сахарный диабет': 'д.абет|сдтип|сд2|сд1|сах.....'},
    4:	{'Коронавирусная инфекция': 'covid19|коронавирусная|b34|sarscov2'},
    5:	{'Артериальная гипертензия': 'аг\d|агi|артериальнаягипер '},
    23:	{'Рентгелологическое исследование грудной клетки': 'rgогк|ргогк'},
    24:	{'Электрокардиография': 'экг|электрокардио'}
}

In [None]:
ep = EpicPars(measurements_map=measurements_map,
              treatment_map=treatment_map,
              condition_procedures_map=condition_procedures_map)
ep

Класс EpicPars. Название папки с эпикризами: rwd_raw. Выбрана папка по умолчанию.

## Документация

In [None]:
print(ep.__doc__)


    Класс EpicPars используется для обработки исследований и формирования
    таблиц

    :param folder_name: названия папки, где находятся необработанные текстовые
        файлы эпикризов
    :type folder_name: str, значение по умолчанию: 'rwd_raw'

    :param measurements_map: словарь, в котором ключ - название анализа,
        а значение - это словарь в котором ключ тип исследования (сущность над
        анализом), а значение это идентификатор.
        Пример:{'фибриноген': {'гемостазиограмма': 6}}
    :type measurements_map: dict, значение по умолчанию: {}

    :param treatment_map: словарь - "маппинг" препаратов к ингредиенту. Ключ - 
        название ингридиента, значение - словарь, где ключом является
        коммерческое название препарата, а значение - идендификационный номер
        Пример:
        {'лизиноприл':{'л.з.....ил|лизитар|лизинеоприл|лизоретик|диротон': 20}}
    :type treatment_map: dict, значение по умолчанию: {}

    :param condition_procedures_map: словарь забо

## Формирование таблицы person

In [None]:
patient_table = ep.prepare_person_table()
patient_table.head()

Unnamed: 0,person_id,date_of_birth,hospital_days,sex_concept_id
0,1079,1962-03-04,28,25
1,1090,1973-01-06,32,26
2,1098,1986-01-22,31,26
3,1067,1978-11-11,11,25
4,1091,1961-08-09,12,25


In [None]:
patient_table.to_csv('person.csv', index=False)

## Формирование таблицы measurements

In [None]:
measurement_data = ep.measurements_template()
measurement_data.head()

Unnamed: 0,person_id,measurement_concept_id,measurement_date,value,measurement_id
0,1079,6,2021-01-01,5.1,1
1,1079,6,2021-06-01,6.98,2
2,1090,6,2020-12-17,8.4,3
3,1090,6,2020-12-18,6.57,4
4,1090,6,2020-12-19,6.8,5


In [None]:
measurement_data.to_csv('measurements.csv', index=False)

## Формирование таблицы drugs

In [None]:
drug_data = ep.treatment_detection()
drug_data.head()

Unnamed: 0,person_id,drug_concept_id,drug_date,drug_id
0,1079,20,2021-12-01,1
1,1097,20,2021-01-14,2
2,1073,20,2020-12-24,3
3,1090,16,2021-01-18,4
4,1078,16,2021-05-01,5


In [None]:
drug_data.to_csv('drugs.csv', index=False)

## Формирование таблицы condition

In [None]:
condition_data = ep.condition_detection()
condition_data.head()

Unnamed: 0,person_id,condition_concept_id,condition_date,condition_id
0,1090,1,2021-01-18,1
1,1098,1,2021-01-19,2
2,1067,1,2020-12-24,3
3,1091,1,2020-12-30,4
4,1078,1,2021-05-01,5


In [None]:
condition_data.to_csv('conditions.csv', index=False)

## Формирование таблицы procedures

In [None]:
procedures_data = ep.procedures_detection()
procedures_data

Unnamed: 0,person_id,procedure_concept_id,procedure_date,procedure_id
0,1074,23,2021-01-27,1
1,1083,24,2020-12-21,2
2,1097,24,2021-01-14,3
3,107,24,2021-05-02,4
4,1093,24,2021-08-02,5
5,1071,24,2020-12-31,6
6,108,24,2021-10-02,7


In [None]:
procedures_data.to_csv('procedures.csv', index=False)

# Заметки о домашнем задании

- Не совсем уяснил, что нужно было сделать в домашнем задании. Готов всё переделать
- Функции обработки - не изменял. (хотел сосредоточиться на документации)
- Вроде бы файлы, которые использовались на занятии или точнее, которые использовались для ноутбука "Практическое занятие" и которые попали к нам на домашнее занятие - отличаются. Во всяком случае видел различия в полученных таблицах. Специально проверял raw файлы и там были эти различия.
- Старался в документации использовать sphinx вариант документации. Хотя, насколько я понял, она вам не очень нравится. Впрочем, вот как правильно писать документацию и в данной нотации и вообще - не совсем знаю.
- Исходил из вот устного варианта задачи, что предположим нужно сделать класс, который предъявляет минимальные требования к программированию.
- Предполагал, что возможно человеку требуется сформировать только одну таблицу, а не все. Из-за чего, например, в коде есть дублирование, которое сходу не придумал как обойти: например, в любом основном методе формирования таблицы есть сбор списка индексов пациентов. Это, конечно, минус.
- Также предположил, что у нас могут быть разные болезни, или разные лекарства. Поэтому предложил инициализировать объект класса подобными словари отдельно. Хотя можно было бы их внести и жёстко прописать внутри класса
- Unit тесты - пока совсем не понял как делать

In [None]:
import pytest

@pytest.fixture()
def dataset_path():
    curdir = os.path.dirname(__file__)
    print(curdir)
    return os.path.join(curdir, '/content/rwd_raw/file (1067).txt')

In [None]:
def test_loader(dataset_path: str):
    print(dataset_path)
    with open(dataset_path, encoding="utf-8", errors="ignore") as f:
        f = f.read()

    try:
        assert len(f) != 0
        print('Тест пройден!')
    except:
        print('Тест загрузки данных не пройден. Проверьте указанный путь')

In [None]:
test_loader('rwd_raw/file (1067).txt')

rwd_raw/file (1067).txt
Тест пройден!
