In [1]:
# PDF file that this notebook parses; it is handed to pdfminer's extract_pages().
# The path is relative, so it is resolved against the kernel's working directory.
PDF_PATH = "IT_Grundschutz_Kompendium_Edition2022.pdf"

In [2]:
import json
import os
import re
from dataclasses import dataclass
from enum import Enum
from itertools import pairwise
from typing import List, Optional, Union

import pdfminer
import pdfminer.high_level

from spacy.lang.char_classes import ALPHA, HYPHENS, PUNCT, CONCAT_QUOTES
from spacy.lang.de import German

In [3]:
# Zero-based index of the first extracted page to keep; all pages before it are
# dropped by the slice below (presumably front matter preceding the Bausteine —
# TODO confirm against the PDF edition in use).
START_ON_PAGE_IDX = 92
# list() materialises every page of the document before slicing, so the layout
# analysis runs for the whole PDF. boxes_flow=None is passed through to
# pdfminer's LAParams (per the pdfminer.six docs this turns off the advanced
# reading-order analysis and orders text boxes by position instead).
pdf_pages = list(pdfminer.high_level.extract_pages(PDF_PATH, laparams=pdfminer.layout.LAParams(boxes_flow=None)))[START_ON_PAGE_IDX:]

In [4]:
def box2str(box) -> str:
    """Return the text of a pdfminer layout element, stripped of surrounding whitespace.

    `box` only needs to provide a `get_text()` method returning a string.
    """
    return box.get_text().strip()


def is_text_box(element) -> bool:
    """Tell whether `element` is a pdfminer `LTTextBoxHorizontal`."""
    return isinstance(element, pdfminer.layout.LTTextBoxHorizontal)


@dataclass
class Page:
    """Text content of a single PDF page, reduced to what the parser needs."""
    page_num: int  # pdfminer's page id of the source page
    baustein_abbrev: Optional[str]  # text of the second text box on the page, None if the page has fewer than two
    body_fragments: List[str]  # stripped text of the body text boxes, in pdfminer's order

    @property
    def is_empty(self) -> bool:
        """True when the page lacks either a Baustein abbreviation or any body text."""
        has_abbrev = bool(self.baustein_abbrev)
        has_body = bool(self.body_fragments)
        return not (has_abbrev and has_body)

    @classmethod
    def from_pdfminer_page(cls, page: pdfminer.layout.LTPage) -> 'Page':
        """Build a `Page` from a pdfminer page.

        Only horizontal text boxes are considered. The first one is ignored, the
        second one supplies the Baustein abbreviation, and the last two are
        dropped as footer content; whatever lies in between becomes the body.
        Pages with fewer than two text boxes yield an empty `Page`.
        """
        text_boxes = [element for element in page if is_text_box(element)]

        if len(text_boxes) >= 2:
            abbrev_box = text_boxes[1]
            body_boxes = text_boxes[2:-2]  # exclude footer content
            return cls(
                page_num=page.pageid,
                baustein_abbrev=box2str(abbrev_box),
                body_fragments=[box2str(box) for box in body_boxes],
            )

        return cls(page_num=page.pageid, baustein_abbrev=None, body_fragments=[])

# Convert every extracted pdfminer page into the notebook's own Page representation.
pages = list(map(Page.from_pdfminer_page, pdf_pages))

In [5]:
@dataclass
class BausteinPageRange:
    """A run of consecutive non-empty pages that share one Baustein abbreviation."""
    abbrev: str  # abbreviation taken from the pages' header
    pages: List[Page]  # the pages of the run, in document order


def baustein_page_ranges_from_pages(pages: List[Page]) -> List[BausteinPageRange]:
    """Group pages into runs that carry the same Baustein abbreviation.

    Empty pages are skipped entirely (they neither start nor end a run). A new
    `BausteinPageRange` is opened whenever a page's abbreviation differs from
    the one of the run currently being filled.
    """
    collected = []

    # Placeholder for "no run yet"; it is never added to the result.
    active_range = BausteinPageRange(abbrev='DUMMY', pages=[])
    for page in pages:
        if page.is_empty:
            continue

        if active_range.abbrev == page.baustein_abbrev:
            active_range.pages.append(page)
            continue

        active_range = BausteinPageRange(abbrev=page.baustein_abbrev, pages=[page])
        collected.append(active_range)

    return collected

# One BausteinPageRange per run of consecutive pages with the same header abbreviation.
baustein_page_ranges = baustein_page_ranges_from_pages(pages)

In [7]:
# All patterns below are written as raw strings so that regex escapes such as
# \d, \s, \. and \[ reach the regex engine unchanged and do not trigger Python's
# "invalid escape sequence" warning.

# Building blocks assembled from spaCy's character-class strings: HYPHENS and
# PUNCT are used as alternations, ALPHA and CONCAT_QUOTES inside a character class.
WORD_CHAR = rf"({HYPHENS}|{PUNCT}|[{ALPHA}{CONCAT_QUOTES}./\d])"
TEXT_CHAR = rf"({WORD_CHAR}|\s)"
NEWLINE = r"\n"

# Baustein abbreviation: 3-4 capital letters followed by dotted numbers, e.g. "SYS.1.1".
RE_BAUSTEIN_ABBREV = re.compile(r"[A-Z]{3,4}\.\d+(\.\d+)*")
# Abbreviation followed by an optional colon and a space.
RE_BAUSTEIN_HEADING = re.compile(RE_BAUSTEIN_ABBREV.pattern + ":? ")
# Abbreviation followed by ".A<number>", e.g. "SYS.1.1.A3".
RE_REQUIREMENT_HEADING = re.compile(RE_BAUSTEIN_ABBREV.pattern + r"\.A\d+")
# Text enclosed in square brackets.
RE_RESPONSIBLE_FUNCTION_LIST = re.compile(rf"\[{TEXT_CHAR}+\]")
# "(B)", "(S)" or "(H)".
RE_SECURITY_LEVEL = re.compile(r"\((B|S|H)\)")
# Requirement id, arbitrary text, then the security level marker.
RE_REQUIREMENT_HEADING_FULL = re.compile(RE_REQUIREMENT_HEADING.pattern + f"{TEXT_CHAR}+{RE_SECURITY_LEVEL.pattern}")
# A digit, optional dotted numbers and an optional trailing dot, e.g. "3.1" or "5.".
RE_SECTION_HEADING = re.compile(r"\d(\.\d+)*\.?")
RE_LIST_ITEM = re.compile("•")
RE_CROSS_REFERENCE_SECTION_HEADING = re.compile(r"5\.? Anlage")

# A hyphen at a line break between two word characters; the named groups keep
# the characters on either side so the hyphen and newline can be removed.
RE_SUB_HYPHENS = re.compile(f"(?P<ante>{WORD_CHAR})({HYPHENS}){NEWLINE}(?P<post>{WORD_CHAR})")

# Beginnings of a fragment that must never be glued onto the previous page's text.
RE_DO_NOT_MERGE_ACROSS_PAGES = re.compile("(" + "|".join([
    RE_BAUSTEIN_HEADING.pattern,
    RE_REQUIREMENT_HEADING.pattern,
    RE_SECTION_HEADING.pattern,
    r"G \d+\.",  # threat heading
    RE_LIST_ITEM.pattern,
    "X",  # entries in cross reference tables
    # concrete headings:
    "CIA-Werte",
    "Zuständigkeiten",
]) + ")")

In [8]:
@dataclass
class Baustein:
    """A parsed Baustein: its abbreviation, its name and its requirements."""
    abbrev: str
    name: str
    requirements: List['Requirement']

    @classmethod
    def from_baustein_page_range(cls, baustein_page_range: BausteinPageRange) -> 'Baustein':
        """Parse the given page range with `BausteinParser` and return the result."""
        parser = BausteinParser(baustein_page_range)
        return parser.parse()

    @property
    def title(self) -> str:
        """Abbreviation and name joined as "<abbrev>: <name>"."""
        return "{}: {}".format(self.abbrev, self.name)

    def to_md(self) -> str:
        """Render a Markdown document: a level-1 title followed by all requirements."""
        heading = f"# {self.title}\n\n"
        rendered_requirements = "".join(req.to_md() for req in self.requirements)
        return heading + rendered_requirements + "\n"

    def to_simple_dict(self) -> dict:
        """Return a plain-dict representation (suitable for `json.dump`)."""
        return {
            "abbrev": self.abbrev,
            "name": self.name,
            "requirements": [requirement.to_simple_dict() for requirement in self.requirements],
        }


@dataclass
class Requirement:
    """A requirement of a Baustein: its heading text and the statements below it."""
    name: str
    statements: List[Union['StatementString', 'StatementListItem']]

    def to_md(self) -> str:
        """Render a level-2 heading followed by every statement's Markdown."""
        parts = ["## {}\n\n".format(self.name)]
        parts.extend(stmt.to_md() for stmt in self.statements)
        parts.append("\n")
        return "".join(parts)

    def to_simple_dict(self) -> dict:
        """Return a plain-dict representation (suitable for `json.dump`)."""
        return {
            "name": self.name,
            "statements": [statement.to_simple_dict() for statement in self.statements],
        }


@dataclass
class StatementBase:
    """Common base of all statement kinds; holds just the statement text."""
    body: str


@dataclass
class StatementString(StatementBase):
    """A statement that is rendered as a top-level bullet."""

    def to_md(self) -> str:
        """Render the statement as a top-level Markdown bullet line."""
        return "- {}\n".format(self.body)

    def to_simple_dict(self) -> dict:
        """Return the statement as a dict with its body and the type tag "string"."""
        return {"body": self.body, "type": "string"}


@dataclass
class StatementListItem(StatementBase):
    """A statement that is rendered as a nested (indented) bullet."""

    def to_md(self) -> str:
        """Render the statement as a Markdown bullet indented by four spaces."""
        return "    - {}\n".format(self.body)

    def to_simple_dict(self) -> dict:
        """Return the statement as a dict with its body and the type tag "list_item"."""
        return {"body": self.body, "type": "list_item"}



In [9]:
class Tag(Enum):
    """Kind of paragraph, as assigned by `BausteinParser._tag_raw_paras`."""
    # headings
    baustein_heading = "baustein_heading"
    section_heading = "section_heading"
    requirement_heading = "requirement_heading"
    # running text
    body = "body"
    list_item = "list_item"


@dataclass
class Para:
    """A paragraph of text together with the tag describing its role."""
    s: str  # the paragraph text; rewritten in place by the sanitising steps
    tag: Tag

    def __repr__(self) -> str:
        """Show the tag left-aligned in a 25-character column, followed by the text."""
        tag_column = format(self.tag, "<25")
        return tag_column + self.s


class CrossReferenceTableReached(Exception):
    """Raised during paragraph extraction when the cross-reference section heading is found."""


# Blank German spaCy pipeline with only the "sentencizer" component added; it is
# shared by BausteinParser for sentence-boundary checks and sentence splitting.
nlp = German()
nlp.add_pipe("sentencizer")


class BausteinParser:
    """Turns the pages of one Baustein into a structured `Baustein`.

    The work is done in three stages (see `parse`):

    1. `extract_raw_paragraphs` collects the text fragments of all pages,
       joins fragments that were split by a page break and stops at the
       cross-reference section.
    2. `sanitize` tags every paragraph and repairs requirement headings, list
       items, hyphenation and line breaks.
    3. `parse_baustein` builds the `Baustein` with its requirements.
    """

    def __init__(self, baustein_page_range: 'BausteinPageRange'):
        self._baustein_page_range = baustein_page_range
        self._raw_paras = []  # plain strings, filled by extract_raw_paragraphs()
        self._paras = []  # tagged Para objects, filled by sanitize()

    @staticmethod
    def should_merge_across_page_boundary(last_fragment_of_prev_page: str, first_fragment_of_next_page: str) -> bool:
        """Decide whether the first fragment of a page continues the previous page's last fragment.

        Fragments that start like a heading, list item or table entry are never
        merged; otherwise the fragments are merged unless spaCy sees a sentence
        boundary between them.
        """
        last, first = last_fragment_of_prev_page, first_fragment_of_next_page

        if RE_DO_NOT_MERGE_ACROSS_PAGES.match(first):
            return False

        return not BausteinParser.is_sentence_boundary_between_fragments(last, first)

    @staticmethod
    def is_sentence_boundary_between_fragments(fragment_1: str, fragment_2: str) -> bool:
        """Tell whether a sentence boundary falls where the two fragments meet."""
        # join the fragments and ask spacy if there's a sentence boundary where we joined.
        combined_fragments = f"{fragment_1}\n{fragment_2}"
        start_of_first = len(fragment_1)
        doc = nlp(combined_fragments)
        span = doc.char_span(start_of_first, start_of_first + 2, alignment_mode="expand")
        if span is None or len(span) == 0:
            # char_span() yields no span when the offsets cannot be mapped to
            # tokens (e.g. an empty second fragment). There is nothing to join
            # in that case, so report a boundary instead of failing on span[0].
            return True
        return span[0].is_sent_start

    @staticmethod
    def split_fragment_into_sentences(fragment):
        """Return the sentences of `fragment` as a list of strings."""
        return [sent.text for sent in nlp(fragment).sents]

    def parse(self) -> Baustein:
        """Run all three stages and return the resulting `Baustein`."""
        self.extract_raw_paragraphs()
        self.sanitize()
        return self.parse_baustein()

    def extract_raw_paragraphs(self) -> None:
        """Fill `self._raw_paras` with the paragraphs of all pages of the range.

        Extraction ends early once the cross-reference section heading is found.
        """
        pages = self._baustein_page_range.pages
        if not pages:
            return

        did_merge = False
        try:
            for page, next_page in pairwise(pages):
                next_page_first_fragment = next_page.body_fragments[0]

                if self.should_merge_across_page_boundary(page.body_fragments[-1], next_page_first_fragment):
                    self._extract_paragraphs_from_page(page, skip_first=did_merge, append=next_page_first_fragment)
                    did_merge = True
                else:
                    self._extract_paragraphs_from_page(page, skip_first=did_merge)
                    did_merge = False

            # pairwise() yields every page except the last one as the first
            # element of a pair, so the final page (or the only page of a
            # single-page range) has to be extracted here.
            self._extract_paragraphs_from_page(pages[-1], skip_first=did_merge)
        except CrossReferenceTableReached:
            return

    def sanitize(self) -> None:
        """Tag the raw paragraphs and clean them up in place."""
        self._tag_raw_paras()
        self._fixup_requirement_headings()
        self._remove_bullets_and_fixup_list_items()
        self._dehyphenate()
        self._replace_newlines()

    def parse_baustein(self) -> Baustein:
        """Build the `Baustein` from the tagged paragraphs.

        Body paragraphs below a requirement heading are split into sentences,
        each becoming a `StatementString`; list items become
        `StatementListItem`s. Text outside a requirement is ignored.
        """
        baustein = Baustein(abbrev=None, name=None, requirements=[])

        requirement = None
        for para in self._paras:
            if para.tag == Tag.baustein_heading:
                abbrev_match = RE_BAUSTEIN_ABBREV.match(para.s)
                baustein.abbrev = abbrev_match.group()
                # skip the character after the abbreviation (colon or space)
                baustein.name = para.s[abbrev_match.end() + 1:].strip()
            if requirement and para.tag in (Tag.section_heading, Tag.requirement_heading):
                # any new heading closes the current requirement
                requirement = None
            if para.tag == Tag.requirement_heading:
                requirement = Requirement(name=para.s, statements=[])
                baustein.requirements.append(requirement)
            if requirement and para.tag == Tag.body:
                for sentence in self.split_fragment_into_sentences(para.s):
                    requirement.statements.append(StatementString(sentence))
            if requirement and para.tag == Tag.list_item:
                requirement.statements.append(StatementListItem(para.s))

        return baustein

    def _dehyphenate(self) -> None:
        """Join paragraphs split at a trailing hyphen and remove line-break hyphens."""
        trailing_hyphen = re.compile(f"({HYPHENS})")
        for idx, para in enumerate(self._paras):
            # A paragraph ending in a hyphen continues in the next paragraph.
            # Stop when there is no next paragraph instead of popping past the end.
            while para.s and trailing_hyphen.match(para.s[-1]) and idx + 1 < len(self._paras):
                following = self._paras.pop(idx + 1)
                para.s += "\n" + following.s
            para.s = RE_SUB_HYPHENS.sub(r"\g<ante>\g<post>", para.s)

    def _extract_paragraphs_from_page(self, page: Page, skip_first: bool, append: str = "") -> None:
        """Append the fragments of `page` to `self._raw_paras`.

        :param skip_first: leave out the first fragment because it was already
            appended to the previous page's last paragraph.
        :param append: text from the following page that continues this page's
            last paragraph; joined with a newline, like lines inside a fragment.
        :raises CrossReferenceTableReached: when the cross-reference section
            heading is found; the paragraphs before it are still stored.
        """
        start_idx = 1 if skip_first else 0
        paras = list(page.body_fragments[start_idx:])

        if append:
            if paras:
                paras[-1] = f"{paras[-1]}\n{append}"
            elif self._raw_paras:
                # The page's only fragment was already merged into the previous
                # paragraph, so the continuation belongs to that paragraph too.
                self._raw_paras[-1] = f"{self._raw_paras[-1]}\n{append}"
            else:
                paras = [append]

        for idx, para in enumerate(paras):
            # stop when reaching the cross-reference tables (which are hard to parse)
            if RE_CROSS_REFERENCE_SECTION_HEADING.match(para):
                self._raw_paras.extend(paras[:idx])
                raise CrossReferenceTableReached()
        self._raw_paras.extend(paras)

    def _fixup_requirement_headings(self) -> None:
        """Re-assemble requirement headings that were split over several paragraphs.

        Following paragraphs are pulled into the heading until it matches the
        full heading pattern and any opened square bracket is closed again.
        """
        for idx, para in enumerate(self._paras):
            if para.tag != Tag.requirement_heading:
                continue
            while (
                not RE_REQUIREMENT_HEADING_FULL.match(para.s)
                or ("[" in para.s and "]" not in para.s)
            ):
                if idx + 1 >= len(self._paras):
                    # nothing left to merge; keep the incomplete heading as it is
                    break
                following = self._paras.pop(idx + 1)
                para.s += "\n" + following.s

    def _remove_bullets_and_fixup_list_items(self) -> None:
        """Strip the bullet from list items and pull in a continuation paragraph.

        A body paragraph directly after a list item is merged into the item
        when spaCy sees no sentence boundary between the two.
        """
        for idx, para in enumerate(self._paras):
            if not para.tag == Tag.list_item:
                continue

            para.s = re.sub(r"^•\s+", "", para.s)
            if idx < len(self._paras) - 1:
                following = self._paras[idx + 1]
                if following.tag == Tag.body and not self.is_sentence_boundary_between_fragments(para.s, following.s):
                    self._paras.pop(idx + 1)
                    para.s += "\n" + following.s

    def _replace_newlines(self) -> None:
        """Turn every remaining line break inside a paragraph into a space."""
        for para in self._paras:
            para.s = para.s.replace("\n", " ")

    def _tag_raw_paras(self) -> None:
        """Classify every raw paragraph by how it starts and store it in `self._paras`.

        The checks run in a fixed order and the first match wins.
        NOTE(review): the section-heading pattern matches any paragraph that
        starts with a digit, so such running text is tagged as a heading too.
        """
        for raw_para in self._raw_paras:
            if RE_BAUSTEIN_HEADING.match(raw_para):
                tag = Tag.baustein_heading
            elif RE_SECTION_HEADING.match(raw_para):
                tag = Tag.section_heading
            elif RE_REQUIREMENT_HEADING.match(raw_para):
                tag = Tag.requirement_heading
            elif RE_LIST_ITEM.match(raw_para):
                tag = Tag.list_item
            else:
                tag = Tag.body
            self._paras.append(Para(s=raw_para, tag=tag))

# Parse every page range into a Baustein (runs BausteinParser under the hood).
bausteine = list(map(Baustein.from_baustein_page_range, baustein_page_ranges))

In [14]:
baustein_dicts = []
baustein_md_links = []

# NOTE: `dir` shadows the builtin of the same name; the name is kept so that
# other cells referring to it keep working.
dir = "bausteine_2022"
# Create the output directory on a fresh checkout instead of failing on the first open().
os.makedirs(dir, exist_ok=True)

# One Markdown file per Baustein, plus its entry for the index and the JSON export.
for baustein in bausteine:
    fname = f"{baustein.abbrev}.md"
    baustein_md_links.append(f"- [{baustein.title}]({fname})")

    # The text is German, so write UTF-8 explicitly rather than the platform default.
    with open(f"{dir}/{fname}", "w", encoding="utf-8") as file:
        file.write(baustein.to_md())
    baustein_dicts.append(baustein.to_simple_dict())

# Index file linking to every generated Markdown file.
with open(f"{dir}/_index.md", "w", encoding="utf-8") as md_index:
    md_index.write("\n".join(baustein_md_links) + "\n")

# All Bausteine as one JSON document.
with open(f"{dir}/bausteine.json", "w", encoding="utf-8") as json_file:
    json.dump(baustein_dicts, json_file, indent=4)