In [199]:
import xml.etree.ElementTree as ET
from typing import NamedTuple
import itertools
import json
from decimal import Decimal
from itertools import groupby, starmap
from functools import cmp_to_key
from typing import List

In [157]:
tree = ET.parse("test_may_2023.xml")

In [158]:
root = tree.getroot()
alltext = root.findall(".//text")

In [211]:
BBox = NamedTuple("BBox", [("left", float), ("top", float), ("right", float), ("bottom", float)])
TextChar = NamedTuple("TextChar", [("char", str), ("font", str), ("bbox", BBox), ("size", float)])
Text = NamedTuple("Text", [("text", str), ("font", str), ("bbox", BBox), ("size", float)])
TextLine = NamedTuple("TextLine", [("fields", List[Text])])

def process_bbox(tag: ET.Element) -> BBox:
    # Expect a comma-separated list of 4 coordinates in order: left, top, right, bottom
    # Parse them into a list of strings
    str_coords = tag.attrib.get("bbox", "0,0,0,0").split(",")
    # Use map to convert to a list of floats
    coords = map(float, str_coords)
    # Unpack coords
    return BBox(*coords)

def process_text_char(tag: ET.Element) -> TextChar:
    bbox = process_bbox(tag)
    text_dict = {
        "char": tag.text,
        "font": tag.attrib.get("font", "Nofont"),
        "size": float(tag.attrib.get("size", "0")),
        "bbox": bbox,
    }
    return TextChar(**text_dict)

test_text = ET.Element("text", {'font': 'EOUWVZ+Utopia-Bold', 'bbox': '136.260,728.144,140.612,736.016', 'size': '7.872'}, text="a")
print(process_text_char(test_text))

TextChar(char=None, font='EOUWVZ+Utopia-Bold', bbox=BBox(left=136.26, top=728.144, right=140.612, bottom=736.016), size=7.872)


In [171]:
# Naive calculation of a bounding box for all text
def grouped_bbox(el: List[TextChar|LinePart]) -> BBox:
    first = el[0]
    last = el[-1]
    return BBox(first.bbox.left, first.bbox.top, last.bbox.right, last.bbox.bottom)

# Grouping for a consecutive string of characters
def proccess_text(chars: List[TextChar]) -> Text:
    text = "".join([tc.char for tc in chars])
    first = chars[0]
    bbox = grouped_bbox(chars)
    font = first.font
    size = first.size
    return Text(text=text, font=font, bbox=bbox, size=size)

In [66]:
LinePart = NamedTuple("LinePart", [("width", int), ("bbox", BBox)])
Line = NamedTuple("Line", [("width", int), ("bbox", BBox)])

def process_line_part(tag: ET.Element) -> LinePart:
    bbox = process_bbox(tag)
    width = int(tag.attrib.get("linewidth", "0"))
    return LinePart(width, bbox)

test_line = ET.Element("line", {"linewidth": 5, "bbox": "583.000,308.750,584.000,308.750"})
print(process_line_part(test_line))

LinePart(width=5, bbox=BBox(left=583.0, top=308.75, right=584.0, bottom=308.75))


In [178]:
Grouping = NamedTuple("Grouping", [("type", type), ("top", float)])

def group_text_horizontal(textchars: List[TextChar]) -> List[Text]:
    groups = []
    cur_group = []

    # Check each pair of TextChars
    for a,b in itertools.pairwise(textchars):
        cur_group.append(a)
        t = abs(a.bbox.right - b.bbox.left)

        # Gaps of > 5 are separate text chunks
        if t > 5:
            groups.append(cur_group)
            cur_group = []

        # Gaps of <= 5 and > 0.7 are spaces
        elif t > 0.7:
            space_bbox = BBox(a.bbox.right, a.bbox.top, b.bbox.left, b.bbox.bottom)
            space = TextChar(char=" ", font=a.font, bbox=space_bbox, size=a.size)
            cur_group.append(space)

    else:
        cur_group.append(b)
        groups.append(cur_group)

    return groups


In [212]:
filter_start = "Details of your account"
transaction_fields = ["date", "description", "withdrawals", "deposits", "balance"]
transaction_field_types = [str, str, Decimal, Decimal, Decimal]
Transaction = NamedTuple("Transaction", list(zip(transaction_fields, transaction_field_types)))

In [206]:
def validate_transaction_headers(header_text: List[Text]) -> bool:
    # Expect at least first 4 headers
    if len(header_text) != len(transaction_fields):
        return False

    # Ensure headers are Text types
    types_match = [type(text) == Text for text in header_text]
    if not all(types_match):
        return False
    
    # Ensure all header text fields match expected transaction fields
    fields_match = [
        header_text.text.lower().startswith(fieldname)
        for header_text, fieldname in zip(header_text, transaction_fields)
    ]
    return all(fields_match)

extracted_headers = [
    Text(text='Date', font='GQYEOP+MetaBoldLF-Roman', bbox=BBox(left=16.8, top=634.928, right=32.618, bottom=643.008), size=8.08),
    Text(text='Description', font='GQYEOP+MetaBoldLF-Roman', bbox=BBox(left=61.92, top=634.928, right=101.322, bottom=643.008), size=8.08),
    Text(text='Withdrawals ($)', font='GQYEOP+MetaBoldLF-Roman', bbox=BBox(left=290.88, top=634.928, right=344.925, bottom=643.008), size=8.08),
    Text(text='Deposits ($)', font='GQYEOP+MetaBoldLF-Roman', bbox=BBox(left=395.52, top=634.928, right=437.085, bottom=643.008), size=8.08),
    Text(text='Balance ($)', font='GQYEOP+MetaBoldLF-Roman', bbox=BBox(left=526.56, top=634.928, right=565.005, bottom=643.008), size=8.08),
]
validate_transaction_headers(extracted_headers)

True

In [214]:
def process_xml(root: ET.Element) -> List[Text|Line]:
    # XML output is organized by page
    pages = root.findall(".//page")
    all_transformed = []
    for page in pages:
        # Page ID should be page number
        page_id = page.attrib.get("id", None)
        if not page_id:
            print("Page ID is ????")
        else:
            print(f"Processing page {page_id}")
    
        page_lines = []

        # Find everything with a bbox (lines, text, etc) attribute
        elements = page.findall(".//*[@bbox]")
        transformed_elements = []
    
        # Transform elements we care about into simpler data
        for e in elements:
            te = None
            if e.tag == "text":
                te = process_text_char(e)
            elif e.tag == "line":
                te = process_line_part(e)
    
            if te:
                transformed_elements.append(te)

        # As we parse through grouped data, decide whether we should include it
        # For RBC Chequing statements, we look for "Details of your account activity"
        # and keep everything after
        relevant_data = False

        # Group consecutive elements that align horizontally
        # Processing lines allows us to split up text
        for k, g in groupby(transformed_elements, lambda tc: Grouping(type(tc), tc.bbox.top)):
            gl = list(g)
    
            first = gl[0]
            group_bbox = grouped_bbox(gl)

            last_text_line = TextLine([])
    
            # Create a Line out of LineParts. This logic is pretty naive but it's not that important
            # We just use Lines as separators to simplify data capture
            if k.type == LinePart:
                if relevant_data:
                    line = Line(first.width, group_bbox)
                    page_lines.append(line)
                    page_lines.append(last_text_line)
                    last_text_line = TextLine([])

            # Create a group of Text objects from a group of TextChars
            elif k.type == TextChar:
                text = "".join([tc.char for tc in gl])
    
                # Skip irrelevant single character groups
                if len(text) == 1:
                    continue
    
                # Group text by closeness of character bboxes
                text_groups = group_text_horizontal(gl)
                for ltext in text_groups:
                    text = proccess_text(ltext)
                    last_text_line.fields.append(text)

            # Process all data after "Details of your account"
            # This may not be consistent
            if text.text.startswith(filter_start):
                relevant_data = True
    

        else:
            # Cleanup any missed text that isn't followed by a Line
            if k.type == TextChar:
                page_lines.append(last_text_line)

        ##### Process lines into transactions
        page_transactions = []
        
        # Verify headers
        header_text = page_lines.pop(0)
        # header_text = page_lines[0]
        if not validate_transaction_headers(header_text):
            # Skip the rest of this page if it doesn't match what we expect
            print(f"ERROR: Incorrect transaction headers for page: {header_text}")
            continue

        cur_transaction = Transaction("", "", "", "", "")
        for item in page_lines:
            print(item)

        all_transformed.append(page_transactions)
    
    return all_transformed

processed_xml = process_xml(root)

Processing page 1


IndexError: pop from empty list

In [None]:
print(json.dumps(processed_xml, indent=2))