In [1]:
import json
import pandas as pd
import numpy as np

from sklearn.datasets import make_classification
from gensim.parsing import preprocessing
from gensim.parsing.preprocessing import strip_tags, strip_punctuation,strip_numeric,remove_stopwords, stem_text
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
import sklearn.preprocessing
from sklearn.preprocessing import LabelEncoder

import itertools
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from xgboost import XGBClassifier

from sklearn.metrics import accuracy_score
from sklearn.metrics import classification_report
from string import ascii_uppercase

import gensim
import logging

In [2]:
def format_str(txt):
    """Lower-case ``txt`` and turn carriage returns and newlines into spaces.

    Each '\\r' and each '\\n' becomes one space, so a '\\r\\n' pair yields
    two spaces.
    """
    flattened = txt.lower()
    for line_break in ('\r', '\n'):
        flattened = flattened.replace(line_break, ' ')
    return flattened

In [3]:
def read_df_fr_path(file_path):
    """Load a report workbook and keep only the rows that have radiology text.

    Parameters
    ----------
    file_path
        Anything ``pandas.read_excel`` accepts as its first argument.

    Returns
    -------
    pandas.DataFrame
        The sheet with every missing cell replaced by the string 'N/A',
        rows whose 'Radiology text' equals 'N/A' removed, and
        'Radiology text' passed through ``format_str``.
    """
    df_filter = pd.read_excel(file_path)
    # Missing cells in *every* column become the sentinel string 'N/A'.
    df_filter = df_filter.fillna('N/A')
    # Drop rows without report text. The explicit copy makes the result an
    # independent frame, so the column assignment below does not write into a
    # slice of the unfiltered frame (SettingWithCopyWarning).
    df_filter = df_filter[df_filter['Radiology text'] != 'N/A'].copy()
    df_filter['Radiology text'] = df_filter['Radiology text'].apply(format_str)

    return df_filter

In [4]:
def icd_impression_ext(txt):
    """Return the text that follows the first 'impression:' marker in ``txt``.

    The marker match is case-sensitive. If the marker occurs more than once,
    only the segment between the first and second occurrence is returned.
    Surrounding whitespace is stripped.

    Returns '' when the marker is absent or ``txt`` is not a string-like
    value. Any other exception is allowed to propagate instead of being
    silently swallowed.
    """
    try:
        sections = txt.split('impression:')
        return sections[1].strip()
    except (AttributeError, TypeError, IndexError):
        # AttributeError/TypeError: txt is not a str (e.g. NaN, number, bytes).
        # IndexError: no 'impression:' marker, so there is no second section.
        return ''

In [5]:
def icd_ci_ext(txt):
    """Return the lower-cased text of the 'clinical indication:' section.

    The text is lower-cased, the part after the first 'clinical indication:'
    marker is taken, and it is cut at the next ':'. The last space-separated
    word before that colon is then dropped.
    NOTE(review): that last word is presumably the next section's header;
    when no further section follows, the final word of the indication itself
    is dropped -- confirm this is acceptable.

    Returns '' when the marker is absent or ``txt`` is not a string-like
    value. Any other exception is allowed to propagate instead of being
    silently swallowed.
    """
    try:
        after_marker = txt.lower().split('clinical indication:')[1]
        words = after_marker.split(':')[0].split(" ")[:-1]
        return " ".join(words).strip()
    except (AttributeError, TypeError, IndexError):
        # AttributeError/TypeError: txt is not a str (e.g. NaN, number, bytes).
        # IndexError: the marker is missing, so there is no section to read.
        return ''

In [6]:
def merge_to_icd(ser_1, ser_2):
    """Concatenate ``ser_1`` and ``ser_2`` with a single space in between.

    Uses ``+``, so the result has whatever type that operator yields for the
    operands (element-wise for pandas Series of strings).
    """
    return ser_1 + " " + ser_2

In [7]:
def clean_txt(txt):
    """Normalise ``txt`` into a space-joined string of processed tokens.

    The lower-cased text is run through gensim's ``preprocess_string`` with
    these filters, in order: lower-case, strip tags, strip punctuation,
    remove stopwords, stem. Returns the sentinel 'N/A' when no token is left.
    """
    filter_chain = [str.lower, strip_tags, strip_punctuation, remove_stopwords, stem_text]
    tokens = preprocessing.preprocess_string(txt.lower(), filter_chain)
    return ' '.join(tokens) if tokens else 'N/A'

In [8]:
# Data cleaning for the ICD text column
def df_clean_ICD(df_filter):
    """Build the cleaned 'ICD_text' column from 'Radiology text'.

    Works on a copy of ``df_filter`` and adds three columns:
    'icd_impre_txt' (``icd_impression_ext``), 'icd_ci_txt' (``icd_ci_ext``)
    and 'ICD_text' (the two merged with a space, then passed through
    ``clean_txt``). Rows are dropped when both extractions are empty or when
    cleaning leaves no tokens.

    Returns
    -------
    pandas.DataFrame
        An independent frame (not a view of the input) holding the
        surviving rows.
    """
    df_return = df_filter.copy()
    df_return['icd_impre_txt'] = df_return['Radiology text'].apply(icd_impression_ext)
    df_return['icd_ci_txt'] = df_return['Radiology text'].apply(icd_ci_ext)
    df_return['ICD_text'] = merge_to_icd(df_return['icd_impre_txt'], df_return['icd_ci_txt'])
    # When both extracted parts are '' the merged value is exactly one space.
    # Copy after filtering so the assignment below targets an independent
    # frame rather than a slice (SettingWithCopyWarning).
    df_return = df_return[df_return['ICD_text'] != ' '].copy()
    # Tokenise/stem the merged text; clean_txt returns 'N/A' when nothing is left.
    df_return['ICD_text'] = df_return['ICD_text'].apply(clean_txt)
    # Copy again so callers can add columns to the result without warnings.
    return df_return[df_return['ICD_text'] != 'N/A'].copy()

In [9]:
def _build_icd_10_all():
    """Return the lookup from three-character code to its chapter range.

    Every letter/two-digit combination ('A00' .. 'Z99') is mapped, whether or
    not that exact category exists, plus a few alphanumeric categories.
    NOTE(review): no entry is created for the letter 'U', so such codes look
    up to None via ``.get`` -- confirm this is intended.
    """
    # Letters whose hundred numeric codes all fall in a single chapter.
    single_chapter = {
        'A': 'A00-B99', 'B': 'A00-B99',
        'C': 'C00-D49',
        'E': 'E00-E89',
        'F': 'F01-F99',
        'G': 'G00-G99',
        'I': 'I00-I99',
        'J': 'J00-J99',
        'K': 'K00-K95',
        'L': 'L00-L99',
        'M': 'M00-M99',
        'N': 'N00-N99',
        'O': 'O00-O9A',
        'P': 'P00-P96',
        'Q': 'Q00-Q99',
        'R': 'R00-R99',
        'S': 'S00-T88', 'T': 'S00-T88',
        'V': 'V00-Y99', 'W': 'V00-Y99', 'X': 'V00-Y99', 'Y': 'V00-Y99',
        'Z': 'Z00-Z99',
    }
    # Letters split across two chapters:
    # (highest number in the first chapter, first chapter, second chapter).
    split_chapter = {
        'D': (49, 'C00-D49', 'D50-D89'),
        'H': (59, 'H00-H59', 'H60-H95'),
    }
    # Categories whose third character is a letter.
    alphanumeric = {
        'C4A': 'C00-D49',
        'C7A': 'C00-D49',
        'C7B': 'C00-D49',
        'D3A': 'C00-D49',
        'M1A': 'M00-M99',
        'O9A': 'O00-O9A',
    }

    mapping = {}
    for letter in ascii_uppercase:
        for number in range(100):
            code = '{}{:02d}'.format(letter, number)
            if letter in split_chapter:
                last_low, low_chapter, high_chapter = split_chapter[letter]
                mapping[code] = low_chapter if number <= last_low else high_chapter
            elif letter in single_chapter:
                mapping[code] = single_chapter[letter]
    mapping.update(alphanumeric)
    return mapping


icd_10_all = _build_icd_10_all()

In [10]:
def load_data(filepath):
    """Read the workbook at ``filepath`` and return the ICD-cleaned frame.

    Chains ``read_df_fr_path`` (load and drop rows without radiology text)
    into ``df_clean_ICD`` (derive and clean the 'ICD_text' column).
    """
    return df_clean_ICD(read_df_fr_path(filepath))

In [11]:
def load_label(df):
    """Map each value of ``df['icd_label']`` to its chapter range.

    The part of the code before the first '.' is looked up in ``icd_10_all``;
    codes missing from that table map to None. Returns a list in row order.
    """
    return [icd_10_all.get(full_code.split('.')[0]) for full_code in df['icd_label']]

In [12]:
# Load and clean both workbooks (training first, then test).
train_df, test_df = load_data("../../data/train.xlsx"), load_data("../../data/test.xlsx")

In [13]:
# Attach the chapter-level label to each frame as 'general_icd_label'.
for labelled_df in (train_df, test_df):
    labelled_df['general_icd_label'] = load_label(labelled_df)

In [14]:
# Features are the cleaned report text; targets are the chapter-level labels.
X_train, y_train = train_df['ICD_text'], train_df['general_icd_label']
X_test, y_test = test_df['ICD_text'], test_df['general_icd_label']

In [15]:
# Encoder used below to turn the chapter-range label strings into integer class ids.
label_encoder = LabelEncoder()

In [16]:
# Learn the label -> integer mapping from the training labels only.
y_train = label_encoder.fit_transform(y_train)
# Use transform, not fit_transform: re-fitting on the test labels would build a
# second, independent mapping (and overwrite label_encoder.classes_), so the
# same integer could mean different classes in y_train and y_test.
# transform raises ValueError if the test set contains a label never seen in
# training, which surfaces the mismatch instead of hiding it.
y_test = label_encoder.transform(y_test)

In [17]:
# Bag-of-words (uni- and bi-grams, 400 most frequent terms) followed by TF-IDF weighting.
vect = CountVectorizer(stop_words='english', ngram_range=(1, 2), max_features=400)
tfidf = TfidfTransformer()
def X_vectorization(df, fit=None):
    """Turn an iterable of documents into a TF-IDF matrix using ``vect``/``tfidf``.

    Parameters
    ----------
    df
        Iterable of text documents (e.g. a pandas Series of strings).
    fit : bool or None, default None
        True  -- (re)fit the vocabulary and IDF weights on ``df``, then transform.
        False -- transform only, reusing the already-fitted vocabulary/weights.
        None  -- fit only if ``vect`` has not been fitted yet. The first call
        (the training text) therefore fits, and later calls (the test text)
        reuse that fit, so every matrix shares the same columns and IDF weights.

    Returns
    -------
    The sparse TF-IDF matrix produced by ``tfidf``.
    """
    if fit is None:
        # CountVectorizer only gains `vocabulary_` once it has been fitted.
        fit = not hasattr(vect, 'vocabulary_')
    if fit:
        counts = vect.fit_transform(df)
        return tfidf.fit_transform(counts)
    # Re-fitting here would give this matrix a different vocabulary (columns
    # that do not line up with the training matrix) and leak test statistics.
    return tfidf.transform(vect.transform(df))

In [18]:
# Replace the text series with their vectorised matrices (training set first).
X_train, X_test = X_vectorization(X_train), X_vectorization(X_test)

In [19]:
def _fit_and_report(title, model):
    """Fit ``model`` on the training split and print its test-split report.

    Reads the module-level ``X_train``, ``y_train``, ``X_test``, ``y_test``
    and ``label_encoder``. Prints a blank-ish line, ``title``, and the
    classification report.
    """
    print(" ")
    print(title)
    model = model.fit(X_train, y_train)
    pred = model.predict(X_test)
    # Pass the full set of encoded class ids explicitly. Without `labels`,
    # classification_report infers the classes from y_test/pred and raises
    # when that set is smaller than target_names (a class that never occurs).
    class_ids = np.arange(len(label_encoder.classes_))
    # str() keeps the report printable even if a class label is not a string
    # (e.g. None for codes that had no chapter mapping).
    class_names = [str(name) for name in label_encoder.classes_]
    print(classification_report(y_test, pred, labels=class_ids, target_names=class_names))


def log_reg():
    """Train and report a class-balanced logistic regression (newton-cg solver)."""
    _fit_and_report(
        "logistic regression",
        LogisticRegression(random_state=0, solver='newton-cg', class_weight='balanced'),
    )


def ran_for():
    """Train and report a class-balanced random forest using all CPU cores."""
    _fit_and_report(
        "random forest",
        RandomForestClassifier(random_state=0, class_weight='balanced', n_jobs=-1),
    )


def xgboost_test():
    """Train and report an XGBoost classifier (100 trees, depth 3, all CPU cores)."""
    _fit_and_report(
        "xgboost",
        XGBClassifier(n_estimators=100, max_depth=3, random_state=0, n_jobs=-1),
    )

In [None]:
# Run the three model evaluations in order; each prints its own report.
for run_model in (log_reg, ran_for, xgboost_test):
    run_model()