### https://pymupdf.readthedocs.io/en/latest/recipes-annotations.html

In [1]:
!python -V

Python 3.10.12


In [2]:
!mkdir data
!curl -o data/Singapore.pdf https://en.wikipedia.org/api/rest_v1/page/pdf/Singapore
!curl -o data/Afghanistan.pdf https://en.wikipedia.org/api/rest_v1/page/pdf/Afghanistan

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 2545k  100 2545k    0     0   336k      0  0:00:07  0:00:07 --:--:--  610k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 3161k  100 3161k    0     0   414k      0  0:00:07  0:00:07 --:--:--  938k


In [3]:
!pip install pypdf pymupdf

Collecting pypdf
  Downloading pypdf-4.3.1-py3-none-any.whl.metadata (7.4 kB)
Collecting pymupdf
  Downloading PyMuPDF-1.24.9-cp310-none-manylinux2014_x86_64.whl.metadata (3.4 kB)
Collecting PyMuPDFb==1.24.9 (from pymupdf)
  Downloading PyMuPDFb-1.24.9-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (1.4 kB)
Downloading pypdf-4.3.1-py3-none-any.whl (295 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m295.8/295.8 kB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading PyMuPDF-1.24.9-cp310-none-manylinux2014_x86_64.whl (3.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.5/3.5 MB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading PyMuPDFb-1.24.9-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (15.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m15.9/15.9 MB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pypdf, PyMuPDFb, pymupdf
Successfully installed PyMuPD

In [4]:
from pypdf import PdfReader

# Open the PDF once; `reader` and `number_of_pages` are reused by the next cell.
reader = PdfReader("data/Singapore.pdf")
number_of_pages = len(reader.pages)

# Concatenate the extracted text of every page, in page order.
text = "".join(
    reader.pages[page_index].extract_text()
    for page_index in range(number_of_pages)
)

In [None]:
# Write every embedded image of every page to the current working directory.
# Iterating over `reader.pages` directly also visits the last page, which the
# previous `range(0, number_of_pages-1)` skipped.
count = 0  # one running counter for the whole document keeps the file names unique
for page in reader.pages:
  for image_file_object in page.images:
    with open(str(count) + image_file_object.name, "wb") as fp:
      fp.write(image_file_object.data)
    count += 1

In [None]:
text

## Highlight text from pdf

In [None]:
import os
import fitz
import re
import pandas as pd

# Patterns to highlight: one regular expression per row of the 'Regex' column.
data = {
    'Regex': [
        r"(?i)\bworld\b",
        r"\$[0-9]+",
        r"\((.*)\)",
    ],
}
df = pd.DataFrame(data)

def extract_sensitive_data(page_text, reg):
    """Return the leading four items of every word entry whose text matches ``reg``.

    :param page_text: iterable of indexable word entries; item 4 of an entry is
        searched as the word text (callers here pass ``page.get_text("words")``).
    :param reg: regular expression pattern, applied with ``search``.
    :return: list of ``entry[:4]`` for the matching entries, in input order.
    """
    pattern = re.compile(reg)
    boxes = []
    for entry in page_text:
        if pattern.search(entry[4]):
            boxes.append(entry[:4])
    return boxes

def redaction():
  """Highlight every word matching a pattern of ``df['Regex']`` in the PDFs under ``data``.

  Each regular file directly inside ``data`` is opened with ``fitz``; every word
  returned by ``page.get_text("words")`` that matches one of the patterns gets
  a highlight annotation.  The annotated document is written to
  ``<name>_edited.pdf`` in the current working directory.

  Reads the module-level ``df`` for the patterns and returns ``None``.
  """
  path = "data"
  file_list = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
  for filename in file_list:
    # The context manager closes the document even when annotating fails.
    with fitz.open(os.path.join(path, filename)) as doc:
      for page in doc:
        # Extract the words once per page instead of once per pattern.
        words = page.get_text("words")
        for reg in df['Regex']:
          for area in extract_sensitive_data(words, reg):
            annotation = page.add_highlight_annot(area)
            annotation.set_colors(stroke=[0.5, 0.8, 0.8])  # colour components: RGB / 255
            annotation.update()

      # Save once per document, after all pages are annotated (the save used to
      # run inside the page loop, rewriting the file after every page).
      # A document without pages was never saved before, so it is skipped here too.
      if doc.page_count > 0:
        doc.save(f"{os.path.splitext(filename)[0]}_edited.pdf")

redaction()

In [None]:
import os
import pymupdf
import re
import pandas as pd

# Single pattern: an alphabetic word ending in "istan", matched case-insensitively.
data = dict(Regex=[r"(?i)\b[a-zA-Z]+istan\b"])
df = pd.DataFrame(data)

def extract_sensitive_data(page_text, reg):
    """Collect ``entry[:4]`` for every word entry whose text (item 4) matches ``reg``.

    :param page_text: iterable of indexable word entries (callers here pass
        ``page.get_text("words")``).
    :param reg: regular expression pattern, applied with ``search``.
    :return: list of the matching entries' first four items, in input order.
    """
    is_hit = re.compile(reg).search
    hits = filter(lambda entry: is_hit(entry[4]), page_text)
    return [entry[:4] for entry in hits]

def redaction(df):
  """Mark every word matching a pattern of ``df['Regex']`` in ``Afghanistan_edited.pdf``.

  The file is opened from the current working directory (``path`` is empty),
  its page count is printed, and each matching word from
  ``page.get_text("words")`` receives a highlight annotation.  The result is
  saved as ``<name>_edited.pdf``, i.e. ``Afghanistan_edited_edited.pdf``.

  The inner loop previously contained only commented-out calls, which is a
  syntax error; the highlight call is restored as the active marker.

  :param df: object whose ``df['Regex']`` yields regular expression patterns.
  """
  path = ""
  file_list = ['Afghanistan_edited.pdf']
  for filename in file_list:
    # The context manager closes the document even when annotating fails.
    with pymupdf.open(os.path.join(path, filename)) as doc:
      print("Number of pages: ", doc.page_count)
      for page in doc:
        # Extract the words once per page and reuse them for every pattern.
        words = page.get_text("words")
        for phrase in df['Regex']:
          for area in extract_sensitive_data(words, phrase):
            # Alternative markers tried by the author (swap in to change the look):
            #   page.add_underline_annot(area)   # green underline, quite faint
            #   page.add_strikeout_annot(area)   # red strikeout
            #   page.add_squiggly_annot(area)    # purple squiggly underline
            #   page.add_redact_annot(area, fill=(0, 0, 0), cross_out=False)  # red box
            #   page.apply_redactions()
            page.add_highlight_annot(area)  # yellow highlight
      doc.save(f"{os.path.splitext(filename)[0]}_edited.pdf")


redaction(df)

Number of pages:  80


In [None]:
!pip install pipdeptree



## Summarisation

In [None]:
import os
import fitz
import re
import spacy
from spacy.lang.en.stop_words import STOP_WORDS
from string import punctuation
from heapq import nlargest
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize, sent_tokenize
import pandas as pd

# Treat the newline as punctuation too, so it is never counted as a word.
punctuation = punctuation + '\n'

def summariser(text):
  """Pick the highest-scoring sentences of ``text`` as an extractive summary.

  Every token that is neither an English stop word nor contained in
  ``punctuation`` is counted (lower-cased).  A sentence's score is the sum of
  the counts of all counted words that occur in it as a substring of the
  lower-cased sentence.  Sentences scoring above 1.2 times the (integer)
  average score are returned in their original order.

  :param text: the text to summarise.
  :return: list of the selected sentences; empty when no sentence could be
      scored (for example for empty text), which used to raise
      ``ZeroDivisionError``.
  """
  nltk.download('punkt')
  nltk.download('stopwords')

  stopWords = set(stopwords.words("english"))

  # Frequency of every token that is neither a stop word nor punctuation.
  freqTable = dict()
  for word in word_tokenize(text):
      word = word.lower()
      if word in stopWords or word in punctuation:
          continue
      freqTable[word] = freqTable.get(word, 0) + 1

  sentences = sent_tokenize(text)
  sentenceValue = dict()
  for sentence in sentences:
      lowered = sentence.lower()  # hoisted: lower-case each sentence only once
      for word, freq in freqTable.items():
          if word in lowered:
              sentenceValue[sentence] = sentenceValue.get(sentence, 0) + freq

  # Nothing was scored: there is no average to compare against.
  if not sentenceValue:
      return []

  average = int(sum(sentenceValue.values()) / len(sentenceValue))
  threshold = 1.2 * average

  return [
      sentence for sentence in sentences
      if sentence in sentenceValue and sentenceValue[sentence] > threshold
  ]

def extract_sensitive_data(page_text, reg):
    """Return ``entry[:4]`` for each word entry whose text (item 4) matches ``reg``.

    :param page_text: iterable of indexable word entries (callers here pass
        ``page.get_text("words")``).
    :param reg: regular expression pattern, applied with ``search``.
    :return: list of the matching entries' first four items, in input order.
    """
    regex = re.compile(reg)
    return [
        word_entry[:4]
        for word_entry in page_text
        if regex.search(word_entry[4]) is not None
    ]

_special_chars_map = {i: '\\' + chr(i) for i in b'()[]{}?*+-|^$\\.&~#\t\n\r\v\f'}

def escape(pattern):
    """
    Escape special characters in a string.
    """
    if isinstance(pattern, str):
        return pattern.translate(_special_chars_map)
    else:
        pattern = str(pattern, 'latin1')
        return pattern.translate(_special_chars_map).encode('latin1')

def redaction():
  """Highlight the summary sentences of every PDF under ``data``.

  For each regular file directly inside ``data`` the text of all pages is
  concatenated, bracketed markers such as ``[12]`` are removed, and
  ``summariser`` selects the summary sentences.  Each sentence is then looked
  up on every page with ``page.search_for`` and the hits are highlighted.  The
  result is saved as ``<filename>_edited.pdf`` in the working directory.

  Changes from the previous version:
    * every file is processed (a ``return`` inside the loop stopped after the first);
    * every page is searched (only the last page of the document was);
    * whole sentences are located with ``search_for`` instead of being turned
      into regular expressions that were tested against single words and so
      could practically never match.

  A sentence that contained a removed ``[..]`` marker, or that spans two
  pages, is not found by the search and stays unmarked.

  :return: ``None``.
  """
  path = "data"
  file_list = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
  for filename in file_list:
    # The context manager closes the document even when annotating fails.
    with fitz.open(os.path.join(path, filename)) as doc:
      page_text = "".join(page.get_text("text") for page in doc)
      page_text = re.sub(r"\[[a-zA-Z0-9]+\]", "", page_text)

      # Collapse line breaks and repeated blanks so that a sentence wrapped over
      # several lines becomes a single-line search needle.
      needles = [" ".join(sentence.split()) for sentence in summariser(page_text)]
      needles = [needle for needle in needles if needle]
      for needle in needles:
        print(needle)

      for page in doc:
        for needle in needles:
          # quads (not rectangles) follow the text even when a hit spans lines
          quads = page.search_for(needle, quads=True)
          if quads:
            page.add_highlight_annot(quads)

      doc.save(f"{filename}_edited.pdf")
  return
text = redaction()




In [None]:
import os
import fitz
import re
import spacy
from spacy.lang.en.stop_words import STOP_WORDS
from string import punctuation
from heapq import nlargest

def redaction():
  """Return the plain text of one PDF from the ``data`` directory.

  Only a single document is read, as before: the old ``return`` sat inside the
  file loop and ended it after whichever file ``os.listdir`` listed first.
  The choice is now explicit and deterministic (first name in sorted order),
  and the document is closed after reading.

  :return: concatenated ``page.get_text("text")`` of all pages, or ``""`` when
      ``data`` contains no regular file (``None`` was returned before).
  """
  path = "data"
  file_list = sorted(f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f)))
  if not file_list:
    return ""
  with fitz.open(os.path.join(path, file_list[0])) as doc:
    return "".join(page.get_text("text") for page in doc)

text = redaction()

stopwords = list(STOP_WORDS)
nlp = spacy.load('en_core_web_sm')
doc = nlp(text)
tokens = [token.text for token in doc]
punctuation = punctuation + '\n'


# Count tokens case-insensitively.  The table used to be keyed by the original
# spelling while the scoring loop below looks words up lower-cased, so any word
# that appeared only capitalised never contributed to a sentence score.
word_frequencies = {}
for word in doc:
  key = word.text.lower()
  if key in STOP_WORDS or key in punctuation:
    continue
  word_frequencies[key] = word_frequencies.get(key, 0) + 1

# Normalise the counts by the highest count; `default` keeps an empty table
# (no countable words) from raising.
max_frequency = max(word_frequencies.values(), default=1)
for word in word_frequencies:
  word_frequencies[word] = word_frequencies[word]/max_frequency
sentence_tokens = list(doc.sents)


# Score of a sentence = sum of the normalised frequencies of its tokens.
sentence_scores = {}
for sent in sentence_tokens:
  for word in sent:
    key = word.text.lower()
    if key in word_frequencies:
      sentence_scores[sent] = sentence_scores.get(sent, 0) + word_frequencies[key]

# Keep the best-scoring 40% of the sentences.
select_length = int(len(sentence_tokens)*0.4)
summary = nlargest(select_length, sentence_scores, key = sentence_scores.get)

final_summary = [word.text for word in summary]
summary = ' '.join(final_summary)

print(summary)

## NER

In [None]:
!python -m spacy download en_core_web_md

In [59]:
import spacy
from spacy import displacy
from spacy.matcher import PhraseMatcher
import random
from spacy.tokens import Doc
from spacy.training import Example
import re

regex = r"(?i)\b(op|operation|obj|objective)\s+[A-Z]+"

text = """
On last Tuesday the operation EDGWARE was carried out. We followed 3 tangos to HENDON but they were lost at GOLDERS GREEN
but we got rid of the idiot so then we found the enemy in Objective BARNET and followed them all the way where we engauged them in
FINCHLEY and carried the bodies back to BANK.
"""
phrase_dict = {'EDGWARE': 'Op name', 'HIGH BARNET': 'Op name', 'HENDON': 'Op name', 'FINCHLEY': 'Op name', 'BRENT CROSS': 'Op name'}

# One training example per phrase: the whole phrase is a single entity span.
train_data = [(keys, [(0, len(keys), values)]) for keys, values in phrase_dict.items()]
print(train_data)

# Load the pipeline BEFORE its first use.  The phrase patterns used to be built
# with whatever `nlp` an earlier cell had left in the kernel, so this cell
# failed on a fresh kernel.
nlp = spacy.load("en_core_web_md")

# Tokenising is enough to build the patterns, so only nlp.make_doc is run.
phrase_patterns = [nlp.make_doc(phrase) for phrase in phrase_dict]

# Only the 'ner' component is updated; the listed components stay disabled afterwards.
nlp.disable_pipes('tok2vec', 'tagger', 'parser', 'attribute_ruler', 'lemmatizer')
print("   Training ...")
optimizer = nlp.create_optimizer()
for _ in range(25):
    random.shuffle(train_data)
    for raw_text, entity_offsets in train_data:
        doc = nlp.make_doc(raw_text)
        example = Example.from_dict(doc, {"entities": entity_offsets})
        nlp.update([example], sgd=optimizer)

# Result after training
print("Result AFTER training:")
doc = nlp(text)
for ent in doc.ents:
  print(ent.text, ent.label_, ent.start_char, ent.end_char)

displacy.render(doc, style="ent")


[('EDGWARE', [(0, 7, 'Op name')]), ('HIGH BARNET', [(0, 11, 'Op name')]), ('HENDON', [(0, 6, 'Op name')]), ('FINCHLEY', [(0, 8, 'Op name')]), ('BRENT CROSS', [(0, 11, 'Op name')])]
   Training ...
Result AFTER training:
last Tuesday DATE 4 16
EDGWARE Op name 31 38
3 CARDINAL 68 69
HENDON Op name 80 86
GOLDERS Op name 109 116
FINCHLEY Op name 256 264


In [None]:
train_data

[('robot', [(0, 5, 'TECHNOLOGY')]), ('economy', [(0, 7, 'MONEY')])]

In [None]:
nlp.pipe_names

['tok2vec', 'attribute_ruler', 'lemmatizer', 'ner']

In [None]:
text = """
On last Tuesday the operation EDGWARE was carried out. We followed 3 tangos to HENDON but they were lost at GOLDERS GREEN
but we got rid of the idiot so then we found the enemy in Objective BARNET and followed them all the way where we engauged them in
FINCHLEY and carried the bodies back to BANK.
"""

In [25]:
import pandas as pd
# Build the station table, expose the names under the column 'Op name', drop
# the two source columns and write the single remaining column to CSV.
df = (
    pd.DataFrame({
        'name': ['hendon', 'edgware', 'golders green', 'brent cross', 'high barnet'],
        'description': ['zone 3', 'zone 5', 'zone 3', 'zone 4', 'zone 5'],
    })
    .assign(**{'Op name': lambda frame: frame['name']})
    .drop(columns=['name', 'description'])
)
df.to_csv('my_file.csv', index=False)

In [58]:
import csv
# newline='' leaves line-ending handling to the csv module, as its documentation
# asks for files passed to a reader.
with open('my_file.csv', newline='') as f:
    phrase_list = list(csv.DictReader(f))

# One training example per CSV cell: the cell value is the text, the column
# name is the label, and the entity span covers the whole value.
train_data = [
    (values, [(0, len(values), keys)])
    for phrase_dict in phrase_list
    for keys, values in phrase_dict.items()
]

print(train_data)

[('hendon', [(0, 6, 'Op name')]), ('edgware', [(0, 7, 'Op name')]), ('golders green', [(0, 13, 'Op name')]), ('brent cross', [(0, 11, 'Op name')]), ('high barnet', [(0, 11, 'Op name')])]


In [69]:
from spacy.language import Language
@Language.component("remove")
def remove_ent(doc):
    """Pipeline component that drops all ``CARDINAL`` and ``DATE`` entities.

    The kept entities are collected into a new tuple.  The previous version
    removed items from the list it was iterating over, which skips the element
    following each removal, so the second of two adjacent unwanted entities
    survived.

    The decorator already registers the component under the name ``"remove"``,
    so no separate ``Language.component(...)`` call is needed.

    :param doc: the document being processed.
    :return: the same ``doc`` with ``doc.ents`` filtered.
    """
    doc.ents = tuple(ent for ent in doc.ents if ent.label_ not in ("CARDINAL", "DATE"))
    return doc



In [70]:
import spacy
import random
from spacy import util
from spacy.tokens import Doc
from spacy.training import Example
from spacy.language import Language

def print_doc_entities(_doc: Doc):
    """Print one indented line per entity of ``_doc`` (text, label, start and end
    character offset), or ``NONE`` when ``_doc.ents`` is empty."""
    if not _doc.ents:
        print("     NONE")
        return
    for entity in _doc.ents:
        print(f"     {entity.text} {entity.label_} {entity.start_char} {entity.end_char}")

def customizing_pipeline_component(nlp: Language):
    """Update the ``ner`` component of ``nlp`` with a few ``Op name`` phrases.

    Prints the entities found in the module-level ``text`` before and after 25
    passes over the training examples.  Each example is a bare phrase whose
    whole length is labelled ``Op name``.

    :param nlp: a loaded pipeline that contains a component named ``ner``.
    """
    phrase_dict = {'EDGWARE': 'Op name', 'HIGH BARNET': 'Op name', 'HENDON': 'Op name',
                   'FINCHLEY': 'Op name', 'BRENT CROSS': 'Op name'}

    train_data = [(keys, [(0, len(keys), values)]) for keys, values in phrase_dict.items()]

    # Result before training
    print("\nResult BEFORE training:")
    doc = nlp(text)  # `text` comes from an earlier notebook cell
    print_doc_entities(doc)

    # Train with every component except 'ner' disabled.  The context manager
    # re-enables the other components on exit, including when an update raises;
    # the manual disable/enable loops left the pipeline half-disabled in that case.
    with nlp.select_pipes(enable="ner"):
        print("   Training ...")
        optimizer = nlp.create_optimizer()
        for _ in range(25):
            random.shuffle(train_data)
            for raw_text, entity_offsets in train_data:
                doc = nlp.make_doc(raw_text)
                example = Example.from_dict(doc, {"entities": entity_offsets})
                nlp.update([example], sgd=optimizer)

    # Result after training
    print("Result AFTER training:")
    doc = nlp(text)
    print_doc_entities(doc)

def main():
    """Load ``en_core_web_md``, append the registered ``"remove"`` component and
    run the before/after NER training demo on that pipeline."""
    pipeline = spacy.load('en_core_web_md')
    pipeline.add_pipe("remove")
    customizing_pipeline_component(pipeline)


if __name__ == '__main__':
    main()


Result BEFORE training:
     EDGWARE PERSON 31 38
     HENDON LOC 80 86
     GOLDERS ORG 109 116
   Training ...
Result AFTER training:
     EDGWARE Op name 31 38
     HENDON Op name 80 86
     GOLDERS Op name 109 116
     FINCHLEY Op name 256 264


In [62]:
### PHRASE MATCHER

import spacy
from spacy.matcher import PhraseMatcher

nlp = spacy.load("en_core_web_sm")
matcher = PhraseMatcher(nlp.vocab)
terms = ["Barack Obama", "Angela Merkel", "Washington, D.C."]
# The terms are only tokenised (nlp.make_doc) rather than run through the
# whole pipeline, which is faster.
patterns = list(map(nlp.make_doc, terms))
matcher.add("TerminologyList", patterns)

doc = nlp(
    "German Chancellor Angela Merkel and US President Barack Obama "
    "converse in the Oval Office inside the White House in Washington, D.C."
)
matches = matcher(doc)
# Each match is (match id, start token index, end token index).
for _match_id, start, end in matches:
    print(doc[start:end].text)

Angela Merkel
Barack Obama
Washington, D.C.


I may have a solution that involves a few steps. I haven't fully tested it and it may still have some bugs. One problem is that it takes a long time if the PDF is large and the character range is at the end of the PDF.

First, I try to find the blocks that contain the character range and combine them into a block rectangle. Since it might be on more than one page, it is split into parts for each page.
Second, I use this range as a clip parameter to find the words and iterate through the words to find the rectangles that contain parts of the term.
Finally, I combine the rectangles found that are on the same line. So in the end I have single rectangles per page and line that contain the range of characters.


In [None]:
import fitz

def find_blocks(pdf_file: str, start_char: int, end_char: int):
    """
    Find the blocks containing the character range. Return the block rectangles grouped by page number.

    Offsets are counted over the concatenated text of the blocks returned by
    ``page.get_text("blocks")``, in page and block order.

        :param pdf_file: Path to the PDF file.
        :param start_char: Start character of the range.
        :param end_char: End character of the range.
        :return: dict mapping page number to the list of block boxes (``block[:4]``)
            collected from the first block reaching past ``start_char`` up to the
            block reaching past ``end_char``. Empty when the text is not longer
            than ``start_char``. When ``end_char`` lies beyond the end of the text,
            all blocks collected so far are returned (previously ``None``).
    """
    dict_blocks = {}
    chars_seen = 0  # only the running length is needed, not the text itself
    with fitz.open(pdf_file) as doc:
        for page in doc:
            for block in page.get_text("blocks"):
                chars_seen += len(block[4])
                if chars_seen > start_char:
                    dict_blocks.setdefault(page.number, []).append(block[:4])
                if chars_seen > end_char:
                    return dict_blocks
    return dict_blocks

def find_text_coordinates_on_page(pdf_file: str, page_number: int, block_box: fitz.Rect, term: str):
    """
    Find the coordinates of the words that make up ``term`` inside ``block_box``.

    ``term`` is split on spaces and newlines. The words of the page (restricted
    to ``block_box``) are scanned for a position where each part of the term is
    contained, as a substring, in the consecutive words starting there.

        :param pdf_file: Path to the PDF file.
        :param page_number: Page number of the PDF file.
        :param block_box: The block rectangle containing the character range.
        :param term: The text to find the coordinates for.
        :return: list with one rectangle per matched word. If the words run out
            while a match is in progress (the term continues outside the clip,
            e.g. on the next page) the rectangles matched so far are returned.
            Empty when nothing matches or the term has no words.
    """
    term_list = [part for part in term.replace("\n", " ").split(" ") if part]
    if not term_list:
        return []

    with fitz.open(pdf_file) as doc:
        words = doc[page_number].get_text("words", clip=block_box)

    # enumerate gives the position directly; looking it up with words.index()
    # rescanned the list for every candidate word
    for index, word in enumerate(words):
        if term_list[0] not in word[4]:  # first part of the term must be in this word
            continue
        # Rectangles of the current attempt only, so a failed or partial attempt
        # can never leak rectangles into a later one.
        candidate = [fitz.Rect(word[:4])]
        for offset in range(1, len(term_list)):
            if index + offset >= len(words):
                # ran out of words mid-term: return the part found on this page
                return candidate
            if term_list[offset] not in words[index + offset][4]:
                break  # mismatch: discard this attempt, try the next start word
            candidate.append(fitz.Rect(words[index + offset][:4]))
        else:
            return candidate  # every part of the term matched
    return []


def combine_rects(rects: list):
    """
    Return the smallest rectangle enclosing all given rectangles.
        :param rects: List of rectangles (indexable as x0, y0, x1, y1) to combine.
    """
    lefts, tops, rights, bottoms = zip(*((r[0], r[1], r[2], r[3]) for r in rects))
    return fitz.Rect(min(lefts), min(tops), max(rights), max(bottoms))


def is_same_line(rect1: fitz.Rect, rect2: fitz.Rect, tolerance=5):
    """Tell whether two rectangles start at (nearly) the same height.
        :param rect1: the first rectangle
        :param rect2: the second rectangle
        :param tolerance: the ``y0`` values must differ by less than this
        :return: True if the rectangles are on the same line, False otherwise"""
    vertical_gap = rect1.y0 - rect2.y0
    return -tolerance < vertical_gap < tolerance


def combine_rectangles_on_same_line(rectangles: list,):
    """Combine rectangles on the same line.

    The rectangles are visited in order of their ``y0``; consecutive rectangles
    that ``is_same_line`` accepts are merged into one enclosing rectangle.

        :param rectangles: the rectangles to combine (left unmodified)
        :return: the combined rectangles, one per line, ordered top to bottom"""
    result = []  # list of combined rectangles
    grouped_rectangles = []  # rectangles of the line currently being collected

    # sorted() works on a copy, so the caller's list keeps its order
    # (the previous in-place sort reordered the argument).
    for rect in sorted(rectangles, key=lambda rect: rect.y0):
        # Start the first group, or extend the current one while on the same line
        if not grouped_rectangles or is_same_line(grouped_rectangles[-1], rect):
            grouped_rectangles.append(rect)
        else:
            # A new line starts: close the current group and begin the next
            result.append(combine_rects(grouped_rectangles))
            grouped_rectangles = [rect]

    # Add the last group of rectangles
    if grouped_rectangles:
        result.append(combine_rects(grouped_rectangles))

    return result  # return the combined rectangles

def combine_functions(start_char: int, end_char: int, pdf_path: str):
    """
    Combine the functions to find the coordinates for a given character range.

    The term is ``text[start_char:end_char]`` of the concatenated
    ``page.get_text()`` output of all pages.

        :param start_char: Start character of the range.
        :param end_char: End character of the range.
        :param pdf_path: Path to the PDF file.
        :return: List with one entry per page on which (part of) the term was
            found; each entry is itself a list of rectangles, one per text line.
    """
    # Open the PDF file and extract the text
    pdf_text = ""
    with fitz.open(pdf_path) as doc:
        for page in doc:
            pdf_text += page.get_text()
    term = pdf_text[start_char:end_char]
    # Find the blocks containing the character range for each page (term might be on multiple pages).
    # `or {}` guards against find_blocks returning None, which used to crash on `.items()` below.
    dict_blocks = find_blocks(pdf_path, start_char, end_char) or {}
    combined_rects = []
    # Find the text coordinates for the term (for each part of the term on the page) and combine them per line
    for page_number, span_boxes in dict_blocks.items():
        combined_block_boxes = combine_rects(span_boxes)  # one search area per page
        # find the text coordinates for the part of the term that is on the page
        text_coordinates = find_text_coordinates_on_page(pdf_path, page_number, combined_block_boxes, term)
        if not text_coordinates:
            continue
        combined_rect_on_line = combine_rectangles_on_same_line(text_coordinates)
        combined_rects.append(combined_rect_on_line)

    return combined_rects

path = "path/to/file"  # placeholder: point this at a real PDF before running
start_char = 52
end_char = 110
# (a stray trailing backtick made this line a syntax error)
combined_rects = combine_functions(start_char, end_char, path)

# Spacy pipeline

In [None]:
!python -m spacy download en_core_web_md

In [None]:
import spacy

nlp = spacy.load("en_core_web_md")
doc = nlp(text)

phrase_dict = {'robot': 'TECHNOLOGY', 'economy': 'MONEY'}

# One training example per phrase: the entity span covers the whole phrase.
train_data = [
    (phrase, [(0, len(phrase), label)])
    for phrase, label in phrase_dict.items()
]