<a href="https://colab.research.google.com/github/rahiakela/nlp-research-and-practice/blob/main/text-similarity-works/13_icd_10_code_highlight_with_keyword_match_final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Setup

In [None]:
!pip -q install spacy
!python -m spacy download en_core_web_sm

!pip install pillow

!sudo apt install build-essential libpoppler-cpp-dev pkg-config python3-dev
!pip install -U pdftotext
!pip install PyPDF2
!pip install fitz
!pip install PyMuPDF

In [None]:
import pandas as pd
import re
import os
import sys
import glob
import difflib
import pickle
from pathlib import Path
from difflib import SequenceMatcher

import fitz
import pdftotext
from PyPDF2 import PdfFileReader, PdfReader, PdfFileWriter, PdfWriter

import spacy
from spacy.matcher import PhraseMatcher
from spacy.lang.en import English

from keyword_extraction import call
from concurrent import futures

In [None]:
!mkdir -p pdf-files
!mkdir -p txt-files
!mkdir -p output_pdf_files_path

In [2]:
!mkdir -p input_files

In [None]:
# define directory path after creating it
pdf_files_path = "pdf-files"
txt_files_path = "txt-files"
ocr_pdf_files_path = "ocr-pdf-files"

MAX_WORKERS = 20

## Core Functions

In [None]:
def split_pdf(pdf_path):
  """Split a PDF into single-page PDFs written under ``pdf_files_path``.

  Args:
    pdf_path: Path of the PDF file to split.

  Returns:
    List of the generated file names (``page-<index>.pdf``, zero-based), in
    page order. The names are relative to ``pdf_files_path``.
  """
  pdf_list = []
  # The context manager closes the source handle even when a page fails to write.
  with open(pdf_path, "rb") as pdf_in_file:
    # Parse the document once; PdfReader/PdfWriter are the non-deprecated
    # names of PdfFileReader/PdfFileWriter and match Highlighter.split_pdf.
    reader = PdfReader(pdf_in_file)
    for page_index, page in enumerate(reader.pages):
      writer = PdfWriter()
      writer.add_page(page)
      page_file_name = f"page-{page_index}.pdf"
      with open(f"{pdf_files_path}/{page_file_name}", "wb") as output_stream:
        writer.write(output_stream)
      pdf_list.append(page_file_name)
  return pdf_list


def extract_text_from_pdf(pdf_list):
    """Extract the text of each single-page PDF into its own text file.

    Args:
        pdf_list: File names (relative to ``pdf_files_path``) as returned by
            ``split_pdf``.

    Returns:
        List of the written text file paths (``<txt_files_path>/page-<i>.txt``),
        where ``i`` is the position of the PDF in ``pdf_list``.
    """
    txt_file_list = []
    for index, pdf_file in enumerate(pdf_list):
        with open(os.path.join(pdf_files_path, pdf_file), "rb") as pdf_handle:
            pdf = pdftotext.PDF(pdf_handle)

        # Join all pages of this (normally single-page) PDF into one string.
        pdf_text = "\n\n".join(pdf)

        txt_path = f"{txt_files_path}/page-{index}.txt"
        # "w" instead of "a": a stale page-<i>.txt left by an earlier run must be
        # replaced, otherwise the page text is duplicated and every later search
        # matches twice.
        with open(txt_path, "w") as txt_handle:
            txt_handle.write(pdf_text)
        txt_file_list.append(txt_path)
    return txt_file_list


def get_opt_pattern(icd_10_code):
  """Build alternate spellings of a code with spaces around the dot.

  Text extraction sometimes inserts a space before and/or after the dot, so
  ``"Z68.25"`` yields ``["Z68. 25", "Z68 .25", "Z68 . 25"]``. Only the first
  two dot-separated parts are used.

  Args:
    icd_10_code: Code string as found in the extracted text.

  Returns:
    A list of alternate patterns. A code without a dot has no alternates and
    is returned as a one-element list, so callers that iterate over the result
    get whole codes rather than single characters.
  """
  code_arr = icd_10_code.split(".")
  if len(code_arr) < 2:
    return [icd_10_code]
  head, tail = code_arr[0], code_arr[1]
  return [f"{head}. {tail}", f"{head} .{tail}", f"{head} . {tail}"]


def isExactMatch(page, term, clip, fullMatch=False, caseSensitive=False):
  """Check whether ``term`` really occurs in the text around a search hit.

  Args:
    page: Object exposing ``get_text("blocks", clip=..., flags=...)``
      (presumably a PyMuPDF page - confirm against callers).
    term: Literal text that was searched for.
    clip: One hit returned by the page search; must expose ``height`` and
      ``width`` and support ``clip + (x0, y0, x1, y1)``.
    fullMatch: When true, the term must be delimited by word boundaries.
    caseSensitive: When false (default), matching ignores case.

  Returns:
    True when the term is found in the first text block of the enlarged clip,
    False otherwise (including an empty term or no text block at all).
  """
  if not term:
    # Avoids the division by zero below; an empty term cannot match anything useful.
    return False

  # Rough per-character size of the hit, used to grow the clip a little so
  # the neighbouring characters are part of the validated text.
  approx_char_size = max(clip.height, clip.width) / len(term)
  margin = approx_char_size * 2

  blocks = page.get_text("blocks", clip=clip + (-margin, -margin, margin, margin), flags=0)
  if not blocks:
    return False
  surrounding_text = blocks[0][4]

  flags = 0 if caseSensitive else re.IGNORECASE
  # The term is literal text (codes contain ".", phrases contain "[" / "("),
  # so it must be escaped before being used as a regular expression.
  pattern = re.escape(term)
  if fullMatch:
    pattern = rf"\b{pattern}\b"
  # The same flags apply to both modes; previously fullMatch silently ignored
  # caseSensitive and was always case-sensitive.
  return re.search(pattern, surrounding_text, flags=flags) is not None

def highlight_icd_code_and_keyword(icd10_code_dict,
                                   icd_keywords_dict=None,
                                   pdf_file_name=None,
                                   cords_file_name=None):
  """Highlight ICD codes and key-phrase sentences in a PDF and log what was found.

  Args:
    icd10_code_dict: Maps zero-based page index -> list of code strings.
    icd_keywords_dict: Maps zero-based page index -> {key phrase: [sentences]}.
      ``None`` (the default) means there are no key phrases to highlight.
    pdf_file_name: Path of the PDF to annotate.
    cords_file_name: Path of the text log; one line is appended per code or
      sentence that was found (pages are reported one-based).

  Returns:
    Tuple ``(pdf_output_file_name, cords_file_name)`` where the output PDF is
    the input path with its extension replaced by ``_output.pdf``.
  """
  # The declared default is None; without this guard `page_num in None` raises.
  if icd_keywords_dict is None:
    icd_keywords_dict = {}

  def annotate(page, rects, stroke=None):
    # Add one highlight annotation per hit, optionally recolored.
    for rect in rects:
      annot = page.add_highlight_annot(rect)
      if stroke is not None:
        annot.set_colors(stroke=stroke)
      annot.update()

  def highlight_pdf(page, page_num, rects, icd10_code, code_type, cords_log):
    # Highlight the hits of one code and log it once if anything was found.
    if not rects:
      return
    # ICD-9 codes get a light red stroke; other codes keep the default color.
    annotate(page, rects, stroke=[1, 0.5, 0.8] if code_type == "ICD-9" else None)
    code_cors_output = f"Page-{page_num + 1} | {icd10_code}"
    cords_log.write("%s\n" % code_cors_output)

  pdf_file = fitz.open(pdf_file_name)
  try:
    # Append mode is kept so an existing log is extended, not truncated.
    with open(cords_file_name, "a") as cords_log:
      for page_num, page in enumerate(pdf_file):
        # highlight ICD-10 code
        if page_num in icd10_code_dict:
          for code in icd10_code_dict[page_num]:
            hits = page.search_for(code)
            if hits:
              highlight_pdf(page, page_num, hits, code, "ICD-10", cords_log)
              continue
            # The exact spelling was not found: try the spaced variants. Each
            # variant is highlighted on its own; the last variant's hits are no
            # longer re-used for a second "main pattern" pass, which annotated
            # the same rectangles twice.
            alternate_codes = get_opt_pattern(code)
            if isinstance(alternate_codes, str):
              # Guard against a bare string, which would be iterated per character.
              alternate_codes = [alternate_codes]
            for alt_code in alternate_codes:
              highlight_pdf(page, page_num, page.search_for(alt_code), alt_code, "ICD-10", cords_log)

        # highlight ICD key phrase
        if page_num in icd_keywords_dict:
          for key_phrase, key_phrase_sents in icd_keywords_dict[page_num].items():
            for key_phrase_sent in key_phrase_sents:
              coordinates = page.search_for(key_phrase_sent)
              if not coordinates:
                continue
              annotate(page, coordinates, stroke=[1, 0.8, 0.8])
              keyword_cors_output = f"Page-{page_num + 1} | {key_phrase} | {key_phrase_sent}"
              cords_log.write("%s\n" % keyword_cors_output)

    # splitext only strips the final extension, so directories or file names
    # containing dots (e.g. "./scan.v2.pdf") keep their full stem.
    pdf_output_file_name = f"{os.path.splitext(pdf_file_name)[0]}_output.pdf"
    pdf_file.save(pdf_output_file_name, garbage=4, deflate=True, clean=True)
  finally:
    pdf_file.close()

  return pdf_output_file_name, cords_file_name


def filter_unwanted_code(code_list, page_text):
    """Keep only the codes that appear inside an "IC?-<digit>...-..." mention.

    Args:
        code_list: Candidate code strings found on the page.
        page_text: Full text of the page.

    Returns:
        List of codes from ``code_list`` that are substrings of at least one
        matched mention. A code is repeated once per mention that contains it.
    """
    # Raw string: "\-" is not a valid Python string escape, so the non-raw form
    # triggers an invalid-escape warning. The compiled pattern is unchanged.
    # NOTE(review): "[(A-z)]" and "[a-zA-z]" also accept the punctuation between
    # "Z" and "a" - presumably "A-Za-z" was intended; left as-is to keep behavior.
    match_list = re.findall(r"(IC[(A-z)]-[0-9][a-zA-z]*\-.+)[ ]", page_text)
    return [
        code
        for found_code in match_list
        for code in code_list
        if code in found_code
    ]


def search_icd_code(txt_list, nlp, code_type):
    """Collect the codes recognized on each page text file.

    Args:
        txt_list: Paths of page text files named ``page-<n>.txt``.
        nlp: Callable returning an object with an ``ents`` iterable whose items
            expose ``text`` (e.g. a spaCy pipeline with an entity ruler).
        code_type: ``"ICD-9"`` additionally filters the codes through
            ``filter_unwanted_code``; any other value keeps them as found.

    Returns:
        Dict mapping page number ``n`` -> list of code strings. Pages that look
        like a line-number index ("P 12, L34") and pages without any recognized
        code are omitted.
    """
    index_page_pattern = re.compile(r"(P[ ][0-9]+)(,\s)(L[0-9]+)")
    pdf_page_vocab = {}
    for txt_file in txt_list:
        with open(txt_file, "r") as f:
            page_txt = f.read()

        # filter the page that have line number instead of code
        if index_page_pattern.search(page_txt):
            continue

        code_list = [ent.text for ent in nlp(page_txt).ents]
        if not code_list:
            # Nothing to record. Previously the ICD-9 branch still ran here and
            # wrote an empty list under page 0, clobbering that page's codes.
            continue

        # Derive <n> from ".../page-<n>.txt" via the base name so the lookup
        # does not depend on how many directories precede the file.
        stem = os.path.splitext(os.path.basename(txt_file))[0]
        page_number = int(stem.rsplit("-", 1)[-1])

        if code_type == "ICD-9":
            # keep only codes that appear next to an "ICD" style mention
            code_list = filter_unwanted_code(code_list, page_txt)
        pdf_page_vocab[page_number] = code_list

    return pdf_page_vocab


def get_json_array_list(text_path):
  """Run the keyword extraction ``call`` on one text file, best effort.

  Args:
    text_path: Path of the page text file handed to ``call``.

  Returns:
    Whatever ``call(text_path)`` returns, or ``None`` when it raised; the
    error is printed instead of being propagated.
  """
  try:
    return call(text_path)
  except Exception as err:
    print(f"Error for file[{text_path}] is:\n{err}")
    return None


def get_wrong_keyword_dict2(text_path_list):
  """Map each page index to the set of keywords extracted for that page.

  Args:
    text_path_list: Page text file paths; the position in the list becomes
      the dictionary key.

  Returns:
    Dict ``{index: set_of_keywords}`` where each keyword is the first value of
    an element returned by ``get_json_array_list``. Pages whose extraction
    failed (``None``) map to an empty set.
  """
  json_arr_list = map(get_json_array_list, text_path_list)
  return {
    # `json_arr or []`: the old trailing `if json_arr is not None` ran only
    # after iteration over json_arr had started, so a None result raised
    # TypeError instead of being skipped.
    idx: {list(element.values())[0] for element in (json_arr or [])}
    for idx, json_arr in enumerate(json_arr_list)
  }


def get_wrong_keyword_dict(text_files_list, with_thread=False, with_process=False):
    """Map each page index to the set of keywords extracted for that page.

    Args:
        text_files_list: Page text file paths; the position in the list becomes
            the dictionary key.
        with_thread: Run the extraction in a thread pool (takes precedence over
            ``with_process``).
        with_process: Run the extraction in a pool of 4 processes.

    Returns:
        Dict ``{index: set_of_keywords}`` ordered by index. Each keyword is the
        first value of an element returned by ``get_json_array_list``; pages
        whose extraction failed (``None``) map to an empty set.
    """

    def build_keyword_dict(json_arr_list):
        # `json_arr or []` skips failed pages; filtering inside the inner
        # comprehension came too late because iterating None already raises.
        return {
            idx: {list(element.values())[0] for element in (json_arr or [])}
            for idx, json_arr in enumerate(json_arr_list)
        }

    if with_thread:
        # No more threads than files, but at least one: max_workers=0 is
        # rejected by the executor when the input list is empty.
        workers = max(1, min(MAX_WORKERS, len(text_files_list)))
        with futures.ThreadPoolExecutor(max_workers=workers) as executor:
            json_arr_list = list(executor.map(get_json_array_list, text_files_list))
    elif with_process:
        with futures.ProcessPoolExecutor(max_workers=4) as executor:
            json_arr_list = list(executor.map(get_json_array_list, text_files_list))
    else:
        json_arr_list = [get_json_array_list(text_file) for text_file in text_files_list]

    # executor.map and the list comprehension both preserve input order, so
    # the resulting dict is already sorted by page index.
    return build_keyword_dict(json_arr_list)


def extract_sentence(wrong_kerword_list, sample_text_list):
  """Find, per page, the text lines that contain each key phrase.

  Args:
    wrong_kerword_list: Dict mapping page index -> iterable of key phrases.
    sample_text_list: Page text file paths, indexed by the same page index.

  Returns:
    Dict ``{page index: {key phrase: [matching lines]}}``. Key phrases without
    a matching line are left out of the inner dict.
  """
  match_keyword_dict = {}
  for key, kerword_set in wrong_kerword_list.items():
    match_dicts = {}
    if kerword_set:
      # Read the page once instead of once per key phrase.
      with open(sample_text_list[key], "r") as f:
        page_lines = f.read().split("\n")

      for key_phrase in kerword_set:
        if not key_phrase:
          # An empty phrase would "match" every line of the page.
          continue
        # Literal substring test. The phrases are plain text such as
        # 'Body mass index [BMI]' or 'Contact with and (suspected) exposure',
        # which never matched themselves when used as a regular expression.
        # It also covers a last line that has no trailing newline.
        match_list = [line for line in page_lines if key_phrase in line]
        if match_list:
          match_dicts[key_phrase] = match_list
    match_keyword_dict[key] = match_dicts
  return match_keyword_dict


def purge(file_path):
  """Delete every file whose path matches the glob pattern ``file_path``."""
  matched_paths = glob.glob(file_path)
  for stale_path in matched_paths:
    os.remove(stale_path)

## Core Classes

In [3]:
class Highlighter:
  """Splits PDFs into pages, finds ICD-10 codes per page and highlights them.

  Working files go to ``pdf-files`` and ``txt-files``; annotated PDFs and the
  coordinate logs go to ``output``. All three directories are created on
  construction.
  """

  def __init__(self):
    # loading and updating patterns for ICD-10 code
    self.nlp_code10 = English()
    self.nlp_code10.add_pipe("entity_ruler").from_disk("icd10_code_patterns-v4.jsonl")

    # define required directory path
    self.PDF_FILES_PATH = "pdf-files"
    self.TXT_FILES_PATH = "txt-files"
    self.OUTPUT_FILES_PATH = "output"
    create_directory(self.PDF_FILES_PATH)
    create_directory(self.TXT_FILES_PATH)
    create_directory(self.OUTPUT_FILES_PATH)

  def split_pdf(self, pdf_path):
    """Split a PDF into single-page PDFs under ``PDF_FILES_PATH``.

    Returns the generated file names (``page-<index>.pdf``, zero-based),
    relative to ``PDF_FILES_PATH``.
    """
    pdf_list = []
    # Context manager: the source handle used to stay open after the call.
    with open(pdf_path, "rb") as pdf_in_file:
      # Parse the document once instead of once per page.
      reader = PdfReader(pdf_in_file)
      for page_index, page in enumerate(reader.pages):
        writer = PdfWriter()
        writer.add_page(page)
        page_file_name = f"page-{page_index}.pdf"
        with open(f"{self.PDF_FILES_PATH}/{page_file_name}", "wb") as output_stream:
          writer.write(output_stream)
        pdf_list.append(page_file_name)
    return pdf_list

  def extract_text_from_pdf(self, pdf_list):
    """Write the text of each single-page PDF to ``TXT_FILES_PATH/page-<i>.txt``.

    ``i`` is the position of the PDF in ``pdf_list``. Returns the list of
    written text file paths.
    """
    txt_file_list = []
    for index, pdf_file in enumerate(pdf_list):
      with open(os.path.join(self.PDF_FILES_PATH, pdf_file), "rb") as pdf_handle:
        pdf = pdftotext.PDF(pdf_handle)

      # Read all the text into one string
      pdf_text = "\n\n".join(pdf)

      txt_path = f"{self.TXT_FILES_PATH}/page-{index}.txt"
      # "w" instead of "a": a stale file from an earlier document must be
      # replaced, otherwise its text (and codes) leak into this page.
      with open(txt_path, "w") as txt_handle:
        txt_handle.write(pdf_text)
      txt_file_list.append(txt_path)
    return txt_file_list

  def _highlight_hits(self, page, page_num, hits, icd10_code, cords_log):
    """Annotate the hits of one code on a page and log them once."""
    if not hits:
      return
    for rect in hits:
      annot = page.add_highlight_annot(rect)
      annot.update()
    # Log format kept from the original output: the complete hit list is
    # repeated once per hit. NOTE(review): probably meant to be one entry per
    # hit - confirm with whoever consumes the log before simplifying.
    cords_list = [hits for _ in hits]
    code_cors_output = f"Page-{page_num + 1} | {reverse_code_pattern(icd10_code)} | {cords_list} \n"
    cords_log.write("%s\n" % code_cors_output)

  def highlight_icd_code(self, icd10_code_dict, pdf_file_name=None, cords_file_name=None):
    """Highlight the given codes in a PDF and append their coordinates to a log.

    Args:
      icd10_code_dict: Maps zero-based page index -> list of code strings.
      pdf_file_name: Path of the PDF to annotate.
      cords_file_name: Log file name, created inside ``OUTPUT_FILES_PATH``.

    Returns:
      Tuple ``(pdf_output_file_name, cords_file_name)``; the annotated PDF is
      saved as ``OUTPUT_FILES_PATH/<input stem>_output.pdf``.
    """
    pdf_file = fitz.open(pdf_file_name)
    try:
      # create file to write coordinate
      with open(f"{self.OUTPUT_FILES_PATH}/{cords_file_name}", "a") as cords_log:
        for page_num, page in enumerate(pdf_file):
          if page_num not in icd10_code_dict:
            continue
          for code in icd10_code_dict[page_num]:
            hits = page.search_for(code)
            if hits:
              # highlight pdf for main pattern
              self._highlight_hits(page, page_num, hits, code, cords_log)
              continue
            # Exact spelling not found: try the spaced variants. The hits of
            # the last variant are no longer fed into a second "main pattern"
            # pass, which annotated the same rectangles twice.
            for alt_code in self.get_opt_pattern(code):
              self._highlight_hits(page, page_num, page.search_for(alt_code), alt_code, cords_log)

      # basename/splitext instead of split('/')[1].split('.')[0]: works for
      # bare file names, nested directories and stems that contain dots.
      stem = os.path.splitext(os.path.basename(pdf_file_name))[0]
      pdf_output_file_name = f"{self.OUTPUT_FILES_PATH}/{stem}_output.pdf"
      pdf_file.save(pdf_output_file_name, garbage=4, deflate=True, clean=True)
    finally:
      pdf_file.close()

    return pdf_output_file_name, cords_file_name

  def get_opt_pattern(self, icd_10_code):
    """Return spellings of the code with spaces around the dot.

    ``"Z68.25"`` -> ``["Z68. 25", "Z68 .25", "Z68 . 25"]``. A code without a
    dot is returned as a one-element list (not a bare string) so that callers
    iterating over the result never loop over single characters.
    """
    code_arr = icd_10_code.split(".")
    if len(code_arr) < 2:
      return [icd_10_code]
    head, tail = code_arr[0], code_arr[1]
    return [f"{head}. {tail}", f"{head} .{tail}", f"{head} . {tail}"]

  def search_icd_code(self, txt_list):
    """Collect the ICD-10 codes recognized on each page text file.

    Args:
      txt_list: Paths of page text files named ``page-<n>.txt``.

    Returns:
      Dict mapping page number ``n`` -> list of code strings; pages without
      codes are omitted. On pages that contain line references such as
      "P 12, L34", entities matching ``L<digits>`` are dropped.
    """
    pdf_page_vocab = {}
    for txt_file in txt_list:
      with open(txt_file, "r") as f:
        page_txt = f.read()

      # check the page that have line number instead of code
      index_page = re.search(r"(P[ ][0-9]+)(,\s)(L[0-9]+)", page_txt) is not None

      doc = self.nlp_code10(page_txt)
      code_list = [
        ent.text
        for ent in doc.ents
        # on index pages, skip entities that look like a line number "L<n>"
        if not (index_page and re.search(r"(L[0-9]+)", ent.text))
      ]

      if code_list:
        # Derive <n> from ".../page-<n>.txt" via the base name so the lookup
        # does not depend on the number of leading directories.
        stem = os.path.splitext(os.path.basename(txt_file))[0]
        page_number = int(stem.rsplit("-", 1)[-1])
        pdf_page_vocab[page_number] = code_list
    return pdf_page_vocab


def reverse_code_pattern(p_code):
  """Normalize a code as found in the PDF back to ``<head>.<tail>`` form.

  Handles a space, dot or comma (optionally surrounded by spaces) between the
  two parts, and a leading digit that stands in for a look-alike letter
  (e.g. ``"268.25"`` -> ``"Z68.25"``).

  Args:
    p_code: Code text exactly as it was matched on the page.

  Returns:
    The normalized code. A code without any separator is returned stripped
    but otherwise unchanged (it used to come back as an empty string).
  """
  # Fallback for codes such as "R05" that contain no separator at all.
  orig_code = p_code.strip()

  # Space first, then dot: for "Z68 . 25" the dot split gives the clean result
  # and overrides the intermediate value produced by the space split.
  for separator in (" ", "."):
    parts = p_code.split(separator)
    if len(parts) > 1:
      orig_code = f"{parts[0].strip()}.{parts[1].strip()}"

  # check for code contains comma(",")
  parts = p_code.split(",")
  if len(parts) == 2:
    orig_code = f"{parts[0].strip()}.{parts[1].strip()}"
  elif len(parts) == 3:
    # Was a second `== 2` test and therefore unreachable; the parts[2] access
    # shows the three-part case was intended.
    orig_code = f"{parts[0].strip()}.{parts[2].strip()}"

  # handle if the first char of code is missing
  alphabats = {"Z": "2", "B": "8", "O": "0", "S": "5", "l": "1", "G": "6"}
  for letter, digit in alphabats.items():
    if orig_code.startswith(digit):
      # Swap only the leading character; str.replace rewrote every occurrence
      # of the digit ("268.25" became "Z68.Z5").
      orig_code = letter + orig_code[len(digit):]
      break

  return orig_code

def create_directory(dir_name):
  """Create ``dir_name`` (including parents) unless the path already exists."""
  if os.path.exists(dir_name):
    return
  os.makedirs(dir_name)

In [None]:
class SentenceExtractor:
  """Extracts keywords per page and the text lines in which they occur."""

  def __init__(self):
    # upper bound for the thread pool used by get_wrong_keyword_dict
    self.MAX_WORKERS = 20

  def get_json_array_list(self, text_path):
    """Run the keyword extraction ``call`` on one file, best effort.

    Returns whatever ``call(text_path)`` returns, or ``None`` when it raised;
    the error is printed instead of being propagated.
    """
    json_arr = None
    try:
      json_arr = call(text_path)
    except Exception as err:
      print(f"Error for file[{text_path}] is:\n{err}")
    return json_arr

  def get_wrong_keyword_dict(self, text_files_list, with_thread=False, with_process=False):
    """Map each page index to the set of keywords extracted for that page.

    Args:
      text_files_list: Page text file paths; the list position is the key.
      with_thread: Use a thread pool (takes precedence over ``with_process``).
      with_process: Use a pool of 4 processes.

    Returns:
      Dict ``{index: set_of_keywords}`` ordered by index. Each keyword is the
      first value of an element returned by ``get_json_array_list``; pages
      whose extraction failed (``None``) map to an empty set.
    """
    def build_keyword_dict(p_json_arr_list):
      # `json_arr or []` skips failed pages; a filter inside the inner
      # comprehension is evaluated too late because iterating None raises.
      return {
        idx: {list(element.values())[0] for element in (json_arr or [])}
        for idx, json_arr in enumerate(p_json_arr_list)
      }

    if with_thread:
      # No more threads than files, but at least one: max_workers=0 is
      # rejected by the executor when the input list is empty.
      workers = max(1, min(self.MAX_WORKERS, len(text_files_list)))
      with futures.ThreadPoolExecutor(max_workers=workers) as executor:
        json_arr_list = list(executor.map(self.get_json_array_list, text_files_list))
    elif with_process:
      with futures.ProcessPoolExecutor(max_workers=4) as executor:
        json_arr_list = list(executor.map(self.get_json_array_list, text_files_list))
    else:
      json_arr_list = [self.get_json_array_list(text_file) for text_file in text_files_list]

    # Input order is preserved in all three modes, so keys are already sorted.
    return build_keyword_dict(json_arr_list)

  def extract_sentence(self, wrong_keyword_list, sample_text_list):
    """Find, per page, the text lines that contain each key phrase.

    Args:
      wrong_keyword_list: Dict mapping page index -> iterable of key phrases.
      sample_text_list: Page text file paths, indexed by the same page index.

    Returns:
      Dict ``{page index: {key phrase: [matching lines]}}``; key phrases
      without a matching line are left out of the inner dict.
    """
    match_keyword_dict = {}
    for key, keyword_set in wrong_keyword_list.items():
      match_dicts = {}
      if keyword_set:
        # Read the page once instead of once per key phrase.
        with open(sample_text_list[key], "r") as f:
          page_lines = f.read().split("\n")

        for key_phrase in keyword_set:
          if not key_phrase:
            # An empty phrase would "match" every line of the page.
            continue
          # Literal substring test: phrases such as 'Body mass index [BMI]'
          # contain regex metacharacters and never matched themselves when
          # interpolated into a pattern. Also covers a final line without a
          # trailing newline.
          match_list = [line for line in page_lines if key_phrase in line]
          if match_list:
            match_dicts[key_phrase] = match_list
      match_keyword_dict[key] = match_dicts
    return match_keyword_dict

## Single Searching & Highlighting

In [None]:
# Step-0: Load prerequisite instance
# create nlp instance
nlp_keyword = spacy.load('en_core_web_sm')

# loading and updating patterns for ICD-10 code
nlp_code10 = English()
nlp_code10.add_pipe("entity_ruler").from_disk("./icd10_code_patterns-v3.jsonl")

# loading and updating patterns for ICD-9 code
#nlp_code9 = English()
#nlp_code9.add_pipe("entity_ruler").from_disk("./icd9_code_patterns-v1.jsonl")

<spacy.pipeline.entityruler.EntityRuler at 0x7f0ddf861c40>

In [None]:
purge("pdf-files/*.pdf")
purge("txt-files/*.txt")

In [None]:
# Step-1: splitting pdf file
pdf_file_name = "APS386.pdf"
pdf_list = split_pdf(pdf_file_name)

# Step-2: Extracting text from pdf
txt_list = extract_text_from_pdf(pdf_list)

In [None]:
# Step-3: Searching ICD-10 code
page_code10_dict = search_icd_code(txt_list, nlp_code10, code_type="ICD-10")

In [None]:
%%time

# Step-4: Get closest match of ICD-10 keyword
wrong_keyword_dict = get_wrong_keyword_dict(txt_list)
# wrong_keyword_dict = get_wrong_keyword_dict(txt_list, with_thread=True)
# wrong_keyword_dict = get_wrong_keyword_dict(txt_list, with_process=False)
# wrong_keyword_dict = get_wrong_keyword_dict2(txt_list)

CPU times: user 5min 35s, sys: 13.5 s, total: 5min 49s
Wall time: 5min 42s


In [None]:
wrong_keyword_dict[21]

{'Acute maxillary sinusitis',
 'Back Pain',
 'Body mass index [BMI]',
 'Contact with and (suspected) exposure to COVID-19',
 'Cough',
 'Dietary counseling and surveillance',
 'Elevated blood-pressure reading, without diagnosis of hypertension',
 'Hernia',
 'Hypertension',
 'Hypertriglyceridemia',
 'Left lower quadrant pain',
 'Low back pain',
 'Overweight',
 'Prostate Cancer',
 'Sciatica',
 'Sprain of calcaneofibular ligament',
 'Sprain of calcaneofibular ligament of right ankle',
 'Sprain of calcaneofibular ligament of right ankle, initial encounter',
 'Thrombocytopenia',
 'sprain'}

In [None]:
%%time

# Step-5: Extract sentence of ICD-10 keyword
icd_keywords_dict = extract_sentence(wrong_keyword_dict, txt_list)

CPU times: user 550 ms, sys: 20.9 ms, total: 571 ms
Wall time: 577 ms


In [None]:
icd_keywords_dict[21]

In [None]:
# Step-6: Highlighting ICD-10 code and keyword into pdf
pdf_output_file, txt_output_file = highlight_icd_code_and_keyword(page_code10_dict, 
                                                                  icd_keywords_dict=icd_keywords_dict,
                                                                  pdf_file_name="APS386.pdf", 
                                                                  cords_file_name="APS386_cords.txt")
print(f"File[{pdf_output_file}] is saved after highlighting ICD-10 code and keyword")
print(f"Highlighted coordinates are saved into [{txt_output_file}] file.")

File[APS386_output.pdf] is saved after highlighting ICD-10 code and keyword
Highlighted coordinates are saved into [APS386_cords.txt] file.


## Multiple Searching & Highlighting

In [None]:
# Step-0: Load prerequisite instance
# create nlp instance
nlp_keyword = spacy.load('en_core_web_sm')

# loading and updating patterns for ICD-10 code
nlp_code10 = English()
nlp_code10.add_pipe("entity_ruler").from_disk("./icd10_code_patterns-v3.jsonl")

<spacy.pipeline.entityruler.EntityRuler at 0x7fd29c418640>

In [None]:
%%time

for pdf_file in os.listdir(ocr_pdf_files_path):
  # The annotated PDF and the coordinate log are written into this same
  # directory, so a second run would otherwise pick up "*_output.pdf" and
  # "*_cords.txt" as inputs. Only original PDFs are processed.
  if not pdf_file.lower().endswith(".pdf") or pdf_file.endswith("_output.pdf"):
    continue

  pdf_file_name = f"{ocr_pdf_files_path}/{pdf_file}"
  # splitext strips only the final extension, so dots elsewhere in the path survive.
  cords_file_name = f"{os.path.splitext(pdf_file_name)[0]}_cords.txt"

  # Step-1: splitting pdf file
  pdf_list = split_pdf(pdf_file_name)

  # Step-2: Extracting text from pdf
  txt_list = extract_text_from_pdf(pdf_list)

  # Step-3: Searching ICD-10 code
  icd10_code_dict = search_icd_code(txt_list, nlp_code10, code_type="ICD-10")

  # Step-4: Get closest match of ICD-10 keyword
  wrong_keyword_dict = get_wrong_keyword_dict(txt_list)

  # Step-5: Extract sentence of ICD-10 keyword
  icd_keywords_dict = extract_sentence(wrong_keyword_dict, txt_list)

  # Step-6: Highlighting ICD-10 code and keyword into pdf
  pdf_output_file, txt_output_file = highlight_icd_code_and_keyword(icd10_code_dict,
                                                                    icd_keywords_dict=icd_keywords_dict,
                                                                    pdf_file_name=pdf_file_name,
                                                                    cords_file_name=cords_file_name)
  print(f"File[{pdf_output_file}] is saved after highlighting ICD-10 code and keyword")
  print(f"Highlighted coordinates are saved into [{txt_output_file}] file.")

  # remove all per-page pdf and text files before the next document
  purge("pdf-files/*.pdf")
  purge("txt-files/*.txt")
  pdf_list = []
  txt_list = []

File[ocr-pdf-files/APS_38600000R_final_output.pdf] is saved after highlighting ICD-10 code and keyword
Highlighted coordinates are saved into [ocr-pdf-files/APS_38600000R_final_cords.txt] file.
CPU times: user 5min 46s, sys: 3.74 s, total: 5min 50s
Wall time: 5min 51s


In [None]:
!mv ocr-pdf-files ocr-pdf-files2

In [None]:
!rm -rf ocr-pdf-files
!mkdir -p ocr-pdf-files

In [None]:
!cp -r ocr-pdf-files2/*.pdf ocr-pdf-files/

In [None]:
!mkdir -p ocr-pdf-files2

In [None]:
purge("ocr-pdf-files/*.txt")
purge("ocr-pdf-files/*_output.pdf")
purge("pdf-files/*.pdf")
purge("txt-files/*.txt")

In [None]:
!zip output.zip ocr-pdf-files/*_cords.txt ocr-pdf-files/*_output.pdf

## Class-based Searching & Highlighting

In [None]:
"input_pdf_files_path/Redacted_Sample.pdf".split("/")[1].split(".")[0]

'Redacted_Sample'

In [None]:
def purge(file_path):
  """Remove all files matched by the glob pattern ``file_path``."""
  doomed_files = glob.glob(file_path)
  for doomed_file in doomed_files:
    os.remove(doomed_file)

In [None]:
# Step-0: Define prerequisite instance
INPUT_PDF_FILES_PATH = "input_pdf_files_path"

highlighter = Highlighter()

In [None]:
%%time

# Process every file found in INPUT_PDF_FILES_PATH with the Highlighter instance.
# The loop variables are notebook-level names, so they remain visible after the cell.
for pdf_file in os.listdir(INPUT_PDF_FILES_PATH):
  pdf_file_name = f"{INPUT_PDF_FILES_PATH}/{pdf_file}"
  # Log name is "<input stem>_cords.txt"; the stem is the text between the first
  # "/" and the first "." that follows it.
  cords_file_name = f"{pdf_file_name.split('/')[1].split('.')[0]}_cords.txt"

  # Step-1: Splitting the pdf file into one file per page
  pdf_list = highlighter.split_pdf(pdf_file_name)

  # Step-2: Extracting text from each page pdf
  txt_list = highlighter.extract_text_from_pdf(pdf_list)

  # Step-3: Searching ICD-10 codes in the page texts
  icd10_code_dict = highlighter.search_icd_code(txt_list)

  # Step-4: Highlighting ICD-10 codes in the pdf
  pdf_output_file, txt_output_file = highlighter.highlight_icd_code(icd10_code_dict,
                                                                    pdf_file_name=pdf_file_name,
                                                                    cords_file_name=cords_file_name)
  print(f"File[{pdf_output_file}] is saved after highlighting ICD-10 code")
  print(f"Highlighted coordinates are saved into [{txt_output_file}] file.")

  # Remove the per-page pdf and text files so the next document starts clean
  purge("pdf-files/*.pdf")
  purge("txt-files/*.txt")
  pdf_list = []
  txt_list = []

In [None]:
!rm -rf input_pdf_files_path
!mkdir -p input_pdf_files_path

In [None]:
!rm -rf output

In [None]:
!zip output.zip output/*.*

## Keyword Matching & Highlighting

- Step 1 - Z87.5
- Step 2 - Personal history of complications of pregnancy, childbirth and the puerperium
- Step 3 - Page keyword
- Step 4 - Calculate cosine similarity
- Step 5 - "Green" > 60% otherwise "Yellow"

In [5]:
highlighter = Highlighter()

In [None]:
sent_extractor = SentenceExtractor()

In [9]:
# Step-1: splitting pdf file
pdf_file_name = "APS_38600000R_final.pdf"
pdf_list = highlighter.split_pdf(pdf_file_name)

# Step-2: Extracting text from pdf
txt_list = highlighter.extract_text_from_pdf(pdf_list)

In [10]:
# Step-3: Searching ICD-10 code
page_code10_dict = highlighter.search_icd_code(txt_list)

In [None]:
%%time

# Step-4: Get closest match of ICD-10 keyword
wrong_keyword_dict = sent_extractor.get_wrong_keyword_dict(txt_list)

CPU times: user 5min 38s, sys: 1.83 s, total: 5min 40s
Wall time: 5min 40s


In [None]:
wrong_keyword_dict[21]

In [None]:
page_code10_dict[21]

['Z68.25',
 '278.9',
 'E78.3',
 'R03',
 'R05',
 'R05',
 'R53.83',
 '268.25',
 'Z78.9',
 'E78.3',
 'R05',
 'J01.00',
 'E78.3',
 'Z00.00',
 'E78.3',
 'R50.9',
 'R50.9',
 'L73.9',
 'Z78.9',
 'Z68.25',
 'Z00.00',
 '212.5',
 '287.5',
 'D69.6',
 'Z78.9',
 '268.25',
 'E78.3',
 '268.25',
 'J01.00',
 '268.25',
 'R10.32',
 'Z78.9',
 'K46.9',
 'Z68.25',
 'J01.00',
 'Z78.9',
 'S93.411A',
 'M54.5',
 'Z78.9',
 'Z68.25']

In [None]:
wrong_keyword_dict[21]

{'Acute maxillary sinusitis',
 'Back Pain',
 'Body mass index [BMI]',
 'Contact with and (suspected) exposure to COVID-19',
 'Cough',
 'Dietary counseling and surveillance',
 'Elevated blood-pressure reading, without diagnosis of hypertension',
 'Hernia',
 'Hypertension',
 'Hypertriglyceridemia',
 'Left lower quadrant pain',
 'Low back pain',
 'Overweight',
 'Prostate Cancer',
 'Sciatica',
 'Sprain of calcaneofibular ligament',
 'Sprain of calcaneofibular ligament of right ankle',
 'Sprain of calcaneofibular ligament of right ankle, initial encounter',
 'Thrombocytopenia',
 'sprain'}

In [None]:
txt_list[21]

'txt-files/page-21.txt'

In [None]:
keyword1 = "Body mass index [BMI] 25.0-25.9, adult"
keyword2 = "Body mass index [BMI]"

SequenceMatcher(None, keyword1, keyword2).ratio()

0.711864406779661

In [None]:
for keyword in wrong_keyword_dict[21]:
  seq = SequenceMatcher(None, keyword, keyword1)
  print(f"{round(seq.ratio(), 3)} : {keyword}")
  if round(seq.ratio(), 3) > .70:
    print(f"Max ratio found: {round(seq.ratio(), 3)} : {keyword}")

0.093 : brace
0.184 : Contact with and (suspected) exposure to COVID-19


In [None]:
curr_keyword = "Unspecified abdominal hernia without obstruction or gangrene"
max([SequenceMatcher(None, keyword, curr_keyword).ratio() for keyword in wrong_keyword_dict[21]])

0.38095238095238093

In [None]:
with open(f"{txt_list[21]}", "r") as f:
  my_text = f.read()

In [None]:
my_text.split("\n")

In [22]:
key_phrase_list = [[key_phrase for key_phrase in textlist.split(",") if len(key_phrase) > 0] for textlist in my_text.split("\n")]
len(key_phrase_list)

52

In [None]:
key_phrase_list

In [24]:
key_phrase_list = []
for textlist in my_text.split("\n"):
  for key_phrase in textlist.split(","):
    if len(key_phrase) > 0:
      key_phrase_list.append(key_phrase)

In [25]:
len(key_phrase_list)

102

In [None]:
key_phrase_list

In [31]:
for key_phrase in key_phrase_list:
  seq = SequenceMatcher(None, key_phrase, "Non-smoker")
  print(f"{round(seq.ratio(), 3)} : {key_phrase}")
  if round(seq.ratio(), 3) > .70:
    print(f"Max ratio found: {round(seq.ratio(), 3)} : {key_phrase}")

0.381 :  Encounters
0.211 : Encounter 21 Date 12/28/202'
0.145 : Diagnosis Contact with and (suspected) exposure to COVID-19
0.211 : Encounter 20 Date 12/27/2021
0.147 : Diagnosis Contact with and (suspected) exposureto COVID-19
0.083 :           Body mass index (BMI) of 25.0-25.9 in adult (268.25)
0.645 :    Non-smoker (278.9)
0.211 : Encounter 19 Date 12/09/2021
0.129 : ROO)        Sinusitis
0.125 :  acute
0.222 :  frontal
0.129 :  Hypertriglyceridemia
0.222 :  sporadic (E78.3)
0.104 :   Elevated blood pressure reading without diagnosis of hypertension
0.0 : (R03.
0.211 : Encounter 18 Date 03/25/2021
0.129 : Diagnosis Contact with and (suspected) exposureto other viral communicable diseases
0.091 :  Cough (R05)
0.211 : Encounter 17 Date 03/25/2021
0.159 : Diagnosis Contact with/suspected exposure to COVID-19
0.067 :          Cough (R05)
0.074 :  Fatigue (R53.83)
0.103 :  Overweight (BMI 25.0 - 29.9)
0.188 :  Body mass index (BMI)
0.1 : of 25.0-25.9 in adult (268.25)
0.69 :  Non-smoke

In [30]:
max([SequenceMatcher(None, key_phrase, "Non-smoker").ratio() for key_phrase in key_phrase_list])

0.6896551724137931

In [37]:
def get_similarity_score(keyword, text_file):
  """Return the best ``SequenceMatcher`` ratio between a keyword and a page phrase.

  The page text is split into lines and each line into comma-separated
  phrases; empty phrases are ignored. Phrases are compared as-is (leading and
  trailing whitespace is not stripped).

  Args:
    keyword: Text to compare against every phrase.
    text_file: Path of the page text file.

  Returns:
    The highest ratio in ``[0.0, 1.0]``, or ``0.0`` when the file contains no
    phrase at all.
  """
  # load text file
  with open(text_file, "r") as f:
    page_text = f.read()

  # prepare key phrase
  key_phrase_list = [
    key_phrase
    for text_line in page_text.split("\n")
    for key_phrase in text_line.split(",")
    if len(key_phrase) > 0
  ]

  # default=0.0: max() raises ValueError on an empty sequence, which happened
  # for pages whose extracted text is empty.
  return max(
    (SequenceMatcher(None, k_phrase, keyword).ratio() for k_phrase in key_phrase_list),
    default=0.0,
  )

In [39]:
get_similarity_score("Non-smoker", txt_list[21])

0.6896551724137931

In [None]:
my_text = "Diagnosis Left lower quadrant pain (R10.32), Non-smoker (Z78.9), Hernia (K46.9), Overweight (BMI 25.0 - 29.9), Body mass index (BMI) of"

max([SequenceMatcher(None, my_text, "Hernia").ratio() for my_text in my_text.split(",")])

0.5714285714285714

In [None]:
for keyword in my_text.split(","):
  seq = SequenceMatcher(None, keyword, "Hernia")
  print(f"{round(seq.ratio(), 3)} : {keyword}")
  if round(seq.ratio(), 3) > .70:
    print(f"Max ratio found: {round(seq.ratio(), 3)} : {keyword}")

0.082 : Diagnosis Left lower quadrant pain (R10.32)
0.16 :  Non-smoker (Z78.9)
0.571 :  Hernia (K46.9)
0.171 :  Overweight (BMI 25.0 - 29.9)
0.065 :  Body mass index (BMI) of


In [None]:
my_string = "the cat and this dog are in the garden"    
splitted = my_string.split("dog")

first = my_string.split("dog")[:2]
second = my_string.split("dog")[2:]
print(first, second)

['the cat and this ', ' are in the garden'] []


In [None]:
my_string.split("dog")[0][1]

'h'