In [2]:
import json
import os

# Known table-header layouts. Each entry is a set of header words; find_table_header()
# compares the words found on a page against each set, in list order, and accepts the
# first set for which at least 80% of its words are present on the page.
# NOTE(review): multi-word column titles appear to be stored as separate single words
# (e.g. "Posting", "Date") because matching is done per word — confirm against the OCR output.
expected_headers = [
    {"Posting", "Date", "Effective", "Branch", "Journal", "Transaction", "Description", "Amount", "DB/CR", "Balance"},
    {"Tgl", "Transaksi", "No.", "Dokumen", "Uraian", "Tipe", "Mutasi", "Saldo"},
    {"TIME", "REMARK", "DEBET", "CREDIT", "TELLER", "ID"},
    {"Tanggal", "Keterangan", "Debit", "Kredit", "Saldo", "SEQ"},
    {"Posting", "Date", "Remark", "Reference", "No", "Debit", "Credit", "Balance"},
    {"Date", "Val.Date", "Description", "Reference", "No.", "Debet", "Credit", "Balance"},
    {"Date", "Time", "Value", "Description", "Reference", "No.", "Debit", "Credit", "Saldo"},
]

In [3]:
def find_table_header(json_data, page_idx, headers=None, min_match_ratio=0.8):
    """Find which known header layout is present on a page.

    Every word of the page is indexed by its text, then the candidate header
    sets are tried in order; the first one whose share of matched words
    reaches ``min_match_ratio`` wins.

    Args:
        json_data: Parsed document; read as
            ``json_data["pages"][page_idx]["blocks"][..]["lines"][..]["words"][..]``
            where each word has a ``"value"`` and a ``"geometry"``.
        page_idx: Index of the page inside ``json_data["pages"]``.
        headers: Iterable of header-word sets to try. Defaults to the
            module-level ``expected_headers``.
        min_match_ratio: Fraction of a header's words that must be present on
            the page for the header to be accepted (default 0.8).

    Returns:
        A dict mapping each matched header word to the list of geometries of
        all its occurrences on the page, or ``None`` when no header matches.
    """
    if headers is None:
        headers = expected_headers

    # Index the page: word text -> geometries of every occurrence.
    found_words = {}
    for block in json_data["pages"][page_idx]["blocks"]:
        for line in block.get("lines", []):
            for word in line.get("words", []):
                found_words.setdefault(word["value"], []).append(word["geometry"])

    for header in headers:
        if not header:
            # An empty header would trivially satisfy the ratio test and yield
            # an empty match, which is useless to callers.
            continue
        matched_geometries = {word: found_words[word] for word in header if word in found_words}
        if len(matched_geometries) >= len(header) * min_match_ratio:
            return matched_geometries

    return None



In [4]:
def find_header_coordinate(matched_geometries, y_tolerance=0.01):
    """Locate the header row among the matched header words.

    Occurrences are bucketed into rows by the y value of their first
    geometry point (``geometry[0][1]``): an occurrence joins the first
    existing row whose key differs by less than ``y_tolerance``, otherwise it
    opens a new row keyed by its own y. The row holding the most occurrences
    is taken as the header row.

    Args:
        matched_geometries: Mapping of word -> list of geometries, as returned
            by ``find_table_header``.
        y_tolerance: Maximum y difference for two occurrences to share a row.

    Returns:
        A dict with the largest (``highest_geometry``) and smallest
        (``lowest_geometry``) y of the header row, plus the row's words and
        y values ordered by x (kept for debugging).
    """
    row_buckets = {}

    for label, boxes in matched_geometries.items():
        for box in boxes:
            top_y = box[0][1]
            row_key = next(
                (known_y for known_y in row_buckets if abs(known_y - top_y) < y_tolerance),
                None,
            )
            if row_key is None:
                # No row is close enough: start one keyed by this y value.
                row_buckets[top_y] = []
                row_key = top_y
            row_buckets[row_key].append({'word': label, 'geometry': box})

    # max() keeps the first row on ties, i.e. the earliest-created row.
    busiest_row = max(row_buckets.values(), key=len)
    left_to_right = sorted(busiest_row, key=lambda entry: entry['geometry'][0][0])

    ordered_words = []
    ordered_ys = []
    for entry in left_to_right:
        ordered_words.append(entry['word'])
        ordered_ys.append(entry['geometry'][0][1])

    return {
        'highest_geometry': max(ordered_ys),
        'lowest_geometry': min(ordered_ys),
        'words': ordered_words,  # show words for debugging
        'geometries': ordered_ys,  # show geometries for debugging
    }

In [6]:
# Demo: load one sample document and look up the table header on its first page (index 0).
file_path = 'sample/scanned/bri-statement_transaction.json'

with open(file_path, 'r') as f:
    json_data = json.load(f)

# find_table_header returns the matched header words mapped to the geometries of all
# their occurrences, or None when no known layout matches; the bare name displays it.
table_header = find_table_header(json_data, 0) 
table_header

{'Keterangan': [[[0.21151358981092439, 0.296875],
   [0.275008206407563, 0.3095703125]]],
 'Kredit': [[[0.4944787289915966, 0.296875],
   [0.5303669905462185, 0.3076171875]]],
 'Saldo': [[[0.5938616071428572, 0.296875],
   [0.6269892331932774, 0.3076171875]]],
 'Debit': [[[0.37853203781512607, 0.296875],
   [0.4102793461134454, 0.3076171875]]],
 'SEQ': [[[0.802289587710084, 0.296875], [0.8298959427521009, 0.30859375]]],
 'Tanggal': [[[0.34816504726890757, 0.1376953125],
   [0.39647616859243695, 0.1513671875]],
  [[0.0886653098739496, 0.296875], [0.13559611344537814, 0.310546875]]]}

In [11]:
def split_json_data(json_data, lowest_geometry, highest_geometry, page_idx):
    """Partition a page's words into those above and those below the header row.

    A word is placed by the y value of its first geometry point. Words whose
    y lies within ``[lowest_geometry, highest_geometry]`` (the header row
    itself) are dropped.

    Args:
        json_data: Parsed document with ``pages -> blocks -> lines -> words``.
        lowest_geometry: Smallest y of the header row.
        highest_geometry: Largest y of the header row.
        page_idx: Index of the page inside ``json_data["pages"]``.

    Returns:
        ``(above_header, below_header)``. Entries above carry ``value``,
        ``y_coord`` and ``x_coord``; entries below carry ``value``,
        ``confidence`` and ``geometry``. Both lists keep document order.
    """
    page_words = (
        word
        for block in json_data["pages"][page_idx]["blocks"]
        for line in block.get("lines", [])
        for word in line.get("words", [])
    )

    above_header, below_header = [], []

    for word in page_words:
        first_point = word['geometry'][0]
        top_y = first_point[1]
        left_x = first_point[0]

        if top_y < lowest_geometry:
            above_header.append({
                'value': word['value'],
                'y_coord': top_y,
                'x_coord': left_x,
            })
            continue

        if top_y > highest_geometry:
            below_header.append({
                'value': word['value'],
                'confidence': word['confidence'],
                'geometry': word['geometry'],
            })

    return above_header, below_header

In [12]:
def get_statement_id(above_header, settings_path="database/statement_settings.json"):
    """Identify the statement type from the words above the table header.

    The settings file is a JSON list of entries, each read for the keys
    ``"id"``, ``"report_type"`` and ``"lower_limit"``. An entry matches when
    every item of its ``"report_type"`` is among the values of
    ``above_header``. When several entries match, the last one in the file
    wins.

    Args:
        above_header: List of dicts that each have a ``"value"`` key.
        settings_path: Location of the settings JSON file. Defaults to the
            path this notebook has always used.

    Returns:
        ``(statement_id, lower_limit)`` of the matching entry, or
        ``(None, None)`` when nothing matches or the matching id is falsy.

    Raises:
        OSError: If the settings file cannot be opened.
        json.JSONDecodeError: If the settings file is not valid JSON.
    """
    values = {item['value'] for item in above_header}

    with open(settings_path) as settings_file:
        statement_settings = json.load(settings_file)

    statement_id = None
    lower_limit = None

    for setting in statement_settings:
        # No early exit on purpose: a later matching entry overrides an earlier one.
        if all(value in values for value in setting["report_type"]):
            statement_id = setting["id"]
            lower_limit = setting["lower_limit"]

    if statement_id:
        return statement_id, lower_limit
    return None, None

In [13]:
def get_table_content(below_header, lower_limit, tolerance=0.05, limit_margin=0.01):
    """Cut the words below the header off at the table's lower-limit row.

    Words whose value is in ``lower_limit`` are grouped into rows by the y of
    their first geometry point. The row with the most such words marks the
    end of the table, and only words lying above it (by more than
    ``limit_margin``) are kept.

    Args:
        below_header: List of word dicts with ``"value"`` and ``"geometry"``.
        lower_limit: Container of word values that mark the end of the table
            (tested with ``in``). ``None`` or empty means "no limit".
        tolerance: Maximum y distance from a row's first word for another
            word to join that row.
        limit_margin: Gap kept between the limit row and the last retained
            word.

    Returns:
        The retained words in their original order. ``below_header`` itself
        is returned when there is no limit or no limit word is on the page.
    """
    if not lower_limit:
        # Previously `in None` raised TypeError; treat it like "nothing matched".
        return below_header

    limit_words = [item for item in below_header if item['value'] in lower_limit]
    if not limit_words:
        return below_header

    rows = []
    for limit_word in limit_words:
        word_y = limit_word['geometry'][0][1]
        for row in rows:
            # Rows are anchored on their first word's y.
            if abs(row[0]['geometry'][0][1] - word_y) <= tolerance:
                row.append(limit_word)
                break
        else:
            rows.append([limit_word])

    # The row with the most limit words is the real footer; stray single
    # occurrences elsewhere on the page lose out.
    limit_row = max(rows, key=len)
    limit_coordinate = min(item['geometry'][0][1] for item in limit_row)
    cutoff = limit_coordinate - limit_margin

    return [item for item in below_header if item['geometry'][0][1] < cutoff]

Main process: normalize the data and save it to a JSON file.

In [16]:
# Normalize one document: for every page keep only the table words and write
# the per-page result to sample/normalized/<same file name>.
file_path = 'sample/scanned/bri-statement_transaction.json'

with open(file_path, 'r') as f:
    json_data = json.load(f)

# Per-page intermediate results, keyed by page index (kept for inspection).
table_headers, header_coordinates, above_headers, below_headers, table_contents = {}, {}, {}, {}, {}
all_pages_data = []
# Bound up front so both branches below can rely on them; filled from the first page.
statement_id, lower_limit = None, None

for page in json_data["pages"]:
    page_idx = page["page_idx"]

    table_header = find_table_header(json_data, page_idx)
    table_headers[page_idx] = table_header  # find table header by page index

    # The first page must carry a recognizable header: the statement type is
    # derived from the words above it. `.get` avoids a KeyError when page 0
    # has not been seen.
    if table_headers.get(0) is None:
        print("Input is invalid!")
        break

    if table_header is not None:
        header_coordinate = find_header_coordinate(table_header)
        header_coordinates[page_idx] = header_coordinate  # find header coordinate by page index

        above_header, below_header = split_json_data(
            json_data,
            header_coordinate['lowest_geometry'],
            header_coordinate['highest_geometry'],
            page_idx,
        )
        above_headers[page_idx], below_headers[page_idx] = above_header, below_header

        # Fixed: the original tested `above_header[0] == []`, which indexes the
        # (empty) word list and raised IndexError instead of reporting the problem.
        if not above_headers[0]:
            print("Account Statement is unknown")
            break

        # The statement type depends only on page 0, so look it up once
        # instead of re-reading the settings file for every page.
        if statement_id is None:
            statement_id, lower_limit = get_statement_id(above_headers[0])

        if not statement_id:
            print("Account Statement is unknown")
            break

        table_content = get_table_content(below_headers[page_idx], lower_limit)
        table_contents[page_idx] = table_content
    else:
        # Continuation page without a header row: every word is a candidate
        # table word, still cut at the statement's lower limit.
        words_without_header = [
            {
                'value': word['value'],
                'confidence': word['confidence'],
                'geometry': word['geometry'],
            }
            for block in page.get("blocks", [])
            for line in block.get("lines", [])
            for word in line.get("words", [])
        ]

        below_headers[page_idx] = words_without_header
        table_content = get_table_content(below_headers[page_idx], lower_limit)
        table_contents[page_idx] = table_content

    # save normalize process into json file
    page_data = {
        "page_id": page_idx,
        "statement_id": statement_id,
        "words": list(table_contents[page_idx]),
    }
    all_pages_data.append(page_data)

if all_pages_data != []:
    folder_path = "sample/normalized"
    filename = os.path.basename(file_path)
    output_path = os.path.join(folder_path, filename)

    # Create the output folder on a fresh checkout instead of failing in open().
    os.makedirs(folder_path, exist_ok=True)

    with open(output_path, 'w') as json_file:
        json.dump(all_pages_data, json_file, indent=4)

    print(f'Data has been written to {filename}')

Data has been written to bri-statement_transaction.json
