In [1]:
#import tools
import pandas as pd
import numpy as np
import datetime
import matplotlib.pyplot as plt
from matplotlib import style
import re
import difflib as dl
import nltk
from sklearn.model_selection import train_test_split
from datetime import datetime

In [2]:
# Load the de-identified specimen table and blank out missing cells so that
# every column can be handled as plain text by the labelling functions.
filename = "deid_specimens_new_2"
split_reps = pd.read_csv(f"{filename}.csv").fillna('')

In [3]:
# Sanity-check the import with a short preview; the full specimen table is
# never dumped into the notebook output.
split_reps.head(10)

Unnamed: 0,Patient,Biopsy Source,Laterality,Rad Label,Path Report,Biopsy Description
0,0,0,False,,Benign breast tissue,"Right Breast, Fine Needle Aspiration"
1,1,0,False,,Adenocarcinoma,"Right breast, fine needle aspiration"
2,2,0,False,,Benign breast tissue,"Left Breast, Fine Needle Aspiration"
3,3,0,False,,Benign fatty tissue,"Breast, fine needle aspiration"
4,4,0,False,A,Scant benign fibroadipose tissue,"Fine Needle Aspirate, Axilla"
5,5,1,False,B,Benign serous fluid and fat,"Breast, Fine Needle Aspiration"
6,6,0,False,,Fibroadenoma,"Right Breast, Fine Needle Aspiration"
7,7,0,False,,Benign fibroconnective tissue and fat,"Right Breast, Fine Needle Aspiration"
8,8,0,False,A,Benign - Fibrocystic change,"Left breast, fine needle aspiration"
9,9,1,False,B,Atypical sample,"Right breast, fine needle aspiration"


Required Methods

In [4]:
# Organ categories in priority order: the first category with a matching
# keyword wins, so a description mentioning both "breast" and "skin" is "breast".
_SOURCE_CATEGORIES = (
    ("breast", ("breast", "nipple", "mastectomy")),
    ("lymph node", ("lymph", "node", "sentinel")),
    ("skin", ("skin", "mole")),
    # NOTE(review): "axilla" is grouped with the gynecological keywords here,
    # as in the original code; this looks anatomically wrong -- confirm the
    # intended category before relying on it.
    ("gynecological", ("axilla", "uterus", "fallopian", "ovary", "adnexa",
                       "ovarian", "hysterectomy")),
    ("bone", ("bone", "sternum", "rib")),
    ("muscle", ("pectoralis", "muscle")),
    ("hardware", ("device", "cath", "hardware")),
)


def source(biop):
    """
    Map a free-text biopsy description to an organ category.

    The description is lower-cased and tokenized with nltk, then every keyword
    is fuzzy-matched against the tokens with difflib.get_close_matches (default
    cutoff), so small misspellings such as "brest" are still recognised.

    A token that is itself one of the known keywords is only allowed to match
    that keyword.  Without this, the token "muscle" is "close" to the keyword
    "mole" (similarity exactly at the cutoff) and the description would be
    labelled "skin" before the muscle category is ever examined.

    Parameters
    ----------
    biop : str
        Biopsy description text.

    Returns
    -------
    str
        One of "breast", "lymph node", "skin", "gynecological", "bone",
        "muscle", "hardware", or "other" when no keyword matches.
    """
    tokens = nltk.word_tokenize(biop.lower())
    vocabulary = {keyword
                  for _, keywords in _SOURCE_CATEGORIES
                  for keyword in keywords}

    for label, keywords in _SOURCE_CATEGORIES:
        for keyword in keywords:
            # Exact keywords of *other* entries are not misspelling candidates.
            candidates = [tok for tok in tokens
                          if tok == keyword or tok not in vocabulary]
            if dl.get_close_matches(keyword, candidates):
                return label
    return "other"

In [5]:
# natural language processing tools

# Negation processing: adds a "not_" prefix to the tokens that follow a negation word
def negate_sequence(tokens):
    """
    Mark negated tokens with a "not_" prefix.

    Walks the tokens left to right.  A negation trigger (compared
    case-insensitively) opens a negation scope: the trigger itself and the
    token immediately before it are prefixed, and so is every following token
    until a punctuation token from "?.!:;-" closes the scope.

    Returns a new list; the input sequence is not modified.
    """
    scope_breakers = tuple("?.!:;-")
    triggers = ("not", "n't", "no", "without", "negative", "treated")

    marked = []
    in_scope = False
    for tok in tokens:
        # Punctuation ends the current scope before the token is emitted.
        if tok in scope_breakers:
            in_scope = False

        marked.append("not_" + tok if in_scope else tok)

        if tok.lower() in triggers:
            in_scope = True
            # Prefix the trigger itself and, when present, its predecessor.
            marked[-1] = "not_" + marked[-1]
            if len(marked) > 1:
                marked[-2] = "not_" + marked[-2]

    return marked

In [6]:
def classify_regex(pathRep):
    """
    Classify every observation found in a pathology report.

    Steps:
      1. replace "=" and "-" with spaces, lower-case and tokenize the text,
         apply negate_sequence, and join the tokens back with single spaces;
      2. split the text at numbered-list markers (a digit, an optional space,
         then ". ") -- the fragment before the first marker is kept as well;
      3. pad each fragment with a space on both sides, so that markers at the
         very start or end can be matched, and replace ".", ";", "-" and ","
         with spaces;
      4. label each fragment with classify_breast_ob.

    Returns the list of classify_breast_ob results, one per fragment.
    """
    separators = re.compile(r'(\=|\-)')
    item_number = re.compile(r"[0-9][ ]?\. ")
    punctuation = re.compile(r'(\.|\;|\-|\,)')

    cleaned = separators.sub(' ', pathRep).lower()
    marked = ' '.join(negate_sequence(nltk.word_tokenize(cleaned)))

    labels = []
    for fragment in item_number.split(marked):
        padded = punctuation.sub(' ', ' ' + fragment + ' ')
        labels.append(classify_breast_ob(padded))
    return labels

# Phrase lists used by classify_breast_ob and get_single_label.  Within a list
# the order matters: classify_breast_ob reports the first phrase that matches.

# Atypical findings
atyp_markers = [
    "flat epithelial atypia",
    "atypical ductal hyperplasia",
    "atypical lobular hyperplasia",
]

# Fibroepithelial lesions
fibro_markers = [
    "fibroadenoma",
    "phyllodes",
]

# Benign findings; the generic word "benign" comes last so that a more
# specific phrase is reported first when both are present.
ben_markers = [
    "papilloma",
    "usual ductal hyperplasia",
    "apocrine metaplasia",
    "radial scar",
    "sclerosing adenosis",
    "pseudoangiomatous stromal hyperplasia",
    "cyst",
    "mastitis",
    "benign",
]



In [7]:
def classify_breast_ob(ob):
    """
    Label a single pathology observation with a 3-part label.

    Parameters
    ----------
    ob : str
        Observation text.  Most patterns expect a leading space before the
        keyword, and the marker phrases expect a space on both sides, so the
        caller should pad the text with spaces.

    Returns
    -------
    list of str
        [category, subtype, behaviour]; each part is "na" when unknown.
        category is one of "lymphoma", "breast cancer", "atypical",
        "fibroepithelial", "benign" or "na".  For breast cancer the subtype is
        "ductal", "lobular" or "neoplasm" and the behaviour "invasive" or
        "in situ"; for the marker-based categories the subtype is the matched
        marker phrase.  behaviour is "metastasis" when the text mentions
        metastatic disease and no cancer pattern matched.

    Checks run in this order and the first hit wins: lymphoma, named breast
    cancer entities, generic cancer terms, then the atypical, fibroepithelial
    and benign marker lists.
    """
    label = ["na", "na", "na"]

    # Lymphoma
    if re.search(" lymphoma", ob):
        label[0] = "lymphoma"
        return label

    # Named breast cancer entities, most specific pattern first.
    named_entities = (
        (" lcis| lobular carcinoma in situ", ("breast cancer", "lobular", "in situ")),
        (" dcis| ductal carcinoma in situ", ("breast cancer", "ductal", "in situ")),
        (" idc| invasive ductal carcinoma", ("breast cancer", "ductal", "invasive")),
        # Invasive *lobular* carcinoma: previously mislabelled as "ductal".
        (" ilc| invasive lobular carcinoma", ("breast cancer", "lobular", "invasive")),
        (" lobular carcinoma", ("breast cancer", "lobular", "na")),
        (" ductal carcinoma", ("breast cancer", "ductal", "na")),
        (" low grade neoplasm", ("breast cancer", "neoplasm", "na")),
    )
    for pattern, entity_label in named_entities:
        if re.search(pattern, ob):
            return list(entity_label)

    # Generic cancer terms: derive behaviour and subtype from nearby words.
    if re.search(" carcinoma| cancer| malign| adenocarcinoma| Adenocarcinoma", ob):
        label[0] = "breast cancer"
        if re.search("[ -]invasive|[ -]infiltrating", ob):
            label[2] = "invasive"
        elif re.search("in[ -]situ", ob):
            label[2] = "in situ"
        if re.search(" duct(al)?", ob):
            label[1] = "ductal"
        elif re.search("lobular", ob):
            label[1] = "lobular"
        return label

    # Metastatic disease is recorded but does not stop the marker checks below.
    if re.search(" metastati(c|s)", ob):
        label[2] = "metastasis"

    # Atypical, fibroepithelial, then benign marker phrases.
    marker_groups = (
        ("atypical", atyp_markers),
        ("fibroepithelial", fibro_markers),
        ("benign", ben_markers),
    )
    for category, markers in marker_groups:
        for marker in markers:
            if re.search(" " + marker + " ", ob):
                label[0] = category
                label[1] = marker
                return label

    return label
            
    #lymph_marker = re.compile(" lymphoma")
    #mets_marker = re.compile(" metastasis")
    #inv_breast_marker = [re.compile(marker) for marker in []]

In [8]:
def get_single_label(obs_labels):
    """
    Merge several 3-part labels into one.

    Parameters
    ----------
    obs_labels : sequence of 3-item sequences
        Each item is [category, subtype, behaviour], as produced by
        classify_breast_ob (or a previous call to this function).

    Returns
    -------
    tuple of str
        (category, subtype, behaviour).  The category is the most severe one
        present, in the order lymphoma > breast cancer > atypical >
        fibroepithelial > benign; "na" when none is present.  For breast
        cancer the behaviour prefers "metastasis", then "invasive", then
        "in situ", and the subtype ("ductal" before "lobular") is taken only
        from the observations that carry the chosen behaviour.  A "metastasis"
        behaviour on any observation is always carried over.

    If an entry is malformed (for example shorter than three parts) the input
    is printed and the partially filled label is returned.
    """
    label = ["na", "na", "na"]
    categories = [labels[0] for labels in obs_labels]
    subtypes = [labels[1] for labels in obs_labels]

    try:
        behaviours = [labels[2] for labels in obs_labels]
        if "metastasis" in behaviours:
            label[2] = "metastasis"

        if "lymphoma" in categories:
            label[0] = "lymphoma"
        elif "breast cancer" in categories:
            label[0] = "breast cancer"
            if "neoplasm" in subtypes:
                label[1] = "neoplasm"

            # With metastasis present the behaviour is already set and no
            # ductal/lobular subtype is derived.
            if "metastasis" not in behaviours:
                candidate_subtypes = subtypes
                for behaviour in ("invasive", "in situ"):
                    if behaviour in behaviours:
                        label[2] = behaviour
                        candidate_subtypes = [
                            subtype
                            for subtype, seen in zip(subtypes, behaviours)
                            if seen == behaviour
                        ]
                        break

                if "ductal" in candidate_subtypes:
                    label[1] = "ductal"
                elif "lobular" in candidate_subtypes:
                    label[1] = "lobular"
        elif "atypical" in categories:
            label[0] = "atypical"
            for marker in atyp_markers:
                if marker in subtypes:
                    label[1] = marker
        elif "fibroepithelial" in categories:
            # Previously mislabelled as "atypical".
            label[0] = "fibroepithelial"
            for marker in fibro_markers:
                if marker in subtypes:
                    label[1] = marker
        elif "benign" in categories:
            label[0] = "benign"
            for marker in ben_markers:
                # The generic word "benign" is not reported as a subtype.
                if marker in subtypes and marker != 'benign':
                    label[1] = marker
    except (IndexError, KeyError, TypeError):
        # Best effort: report the malformed input and keep what was derived.
        print(obs_labels)

    return tuple(label)

In [9]:
# Patient-level labelling: collapse the 3-part specimen labels of one patient into a single positive (by side) / negative label
def get_binary_label_helper(patient_data):
    """
    Summarise one patient's specimens as a positive/negative label.

    A specimen counts as positive when its "Single Label [Derived]" has
    category "breast cancer" or "lymphoma", or behaviour "metastasis".  The
    "Laterality [Derived]" values of the positive specimens decide the result:

      * both "right" and "left" seen -> "Bilateral Positive"
      * only "right"                 -> "Right Positive"
      * only "left"                  -> "Left Positive"
      * positive, no side recorded   -> "Positive NOS"
      * no positive specimen         -> "Negative"
    """
    label_col = "Single Label [Derived]"
    side_col = "Laterality [Derived]"

    def is_positive(three_part):
        return (three_part[0] == "breast cancer"
                or three_part[0] == "lymphoma"
                or three_part[2] == "metastasis")

    positive_sides = [
        row[side_col]
        for _, row in patient_data.iterrows()
        if is_positive(row[label_col])
    ]

    has_right = "right" in positive_sides
    has_left = "left" in positive_sides
    if has_right and has_left:
        return "Bilateral Positive"
    if has_right:
        return "Right Positive"
    if has_left:
        return "Left Positive"
    return "Positive NOS" if positive_sides else "Negative"

def get_binary_label(input_df):
    """
    Compute the patient-level positive/negative label for every row.

    Rows are grouped by "Patient ID"; each patient's rows are passed to
    get_binary_label_helper and the resulting label is repeated for every row
    of that patient.

    The label is computed once per patient instead of once per row, and the
    merged 3-part label that used to be computed and then discarded is no
    longer built.

    Parameters
    ----------
    input_df : pandas.DataFrame
        Must contain "Patient ID" plus the columns read by
        get_binary_label_helper.

    Returns
    -------
    list of str
        One label per row, in row order.
    """
    patient_ids = input_df["Patient ID"]
    label_by_patient = {}
    binary_labels = []
    for pat_id in patient_ids.tolist():
        if pat_id not in label_by_patient:
            reps = input_df[patient_ids == pat_id]
            label_by_patient[pat_id] = get_binary_label_helper(reps)
        binary_labels.append(label_by_patient[pat_id])
    return binary_labels
        

Classification and Extraction of Labels

In [10]:
# Derive the label columns, in order:
#   1. per-observation labels from the report text (regex classifier),
#   2. one merged 3-part label per specimen,
#   3. the organ category from the biopsy description.
for derived_col, input_col, transform in (
    ('All Labels', 'Path Report', classify_regex),
    ("Single Label", "All Labels", get_single_label),
    ("Biopsy Source", "Biopsy Description", source),
):
    split_reps[derived_col] = split_reps[input_col].apply(transform)


In [11]:
# Rename the columns to the names the patient-level helpers expect, and
# convert the index labels to strings.
column_names = {
    "Patient": "Patient ID",
    "Biopsy Description": "Path Report I",
    "Path Report": "Path Report II",
    "Laterality": "Laterality [Derived]",
    "Biopsy Source": "Organ [Derived]",
    "All Labels": "All Labels [Derived]",
    "Single Label": "Single Label [Derived]",
}
split_reps = split_reps.rename(index=str, columns=column_names)

# Patient-level positive/negative label, repeated on each of the patient's rows.
split_reps["Binary Label [Derived]"] = get_binary_label(split_reps)


In [12]:
# Export the labelled table under a timestamped name so that repeated runs do
# not overwrite each other.  The stamp contains no ":" (not allowed in Windows
# file names), separates date and time with "_", and omits %z, which renders
# as an empty string for the naive datetime returned by now().
date_string = f'{datetime.now():%Y-%m-%d_%H%M%S}'
split_reps.to_csv(filename + "_regex_labelled_" + date_string + '.csv')