In [1]:
import fitz #PyMuPDF
import pandas as pd

from typing import List
import re

In [2]:
# Input folder: holds the minutes_format.xlsx control sheet and the raw .pdf minutes
# (under copom_minutes_raw/) that the driver cell reads.
FOLDER_RAW = "./data/raw"
# Output folder: the driver cell writes one .txt per minute under copom_minutes_processed/.
FOLDER_PROCESSED = "./data/processed"

In [3]:
# Keyed by minute format version: the text whose first occurrence marks the end
# of the page header (everything up to and including it is discarded).
NOISE_HEADER_MARKERS = {
    4: "Copom Meeting",
    5: "bcb.gov.br",
}

# Keyed by minute format version: texts that each mark where a page footer starts
# (everything from the marker onwards is discarded).
NOISE_FOOTER_MARKERS = {
    2: [
        "exchange policy actions. Questions and comments to gci.bacen@bcb.gov.br",
        "Information for unrestricted disclosure. It is not intended to ",
    ],
}

# Keyed by minute format version: upper-case phrases looked up in the upper-cased
# first page to locate where the minute text begins. Candidates are tried in order.
INITIAL_TEXT_MARKERS = {
    1: [
        "THE BOARD ANALYZED THE RECENT PERFORMANCE ",
        "THE BOARD ANALYZED THE RECENT EVOLUTION ",
    ],
    2: [
        "THE BOARD ANALYZED THE RECENT PERFORMANCE ",
    ],
    3: [
        "THE MEMBERS OF THE COPOM ANALYZED ",
        "THE MEMBERS OF THE MONETARY POLICY ",
    ],
}

#### CODE INCOMPLETE
This code is incomplete and requires further work. I shall return to it in the future.
- Some .PDF files could not be converted to text properly because of encoding problems; e.g., the extracted text comes out as "7 K H  % R D U G  D Q D O".
- Some minutes that I shall adjust manually: all Version 1 minutes (43->60)... 61, 69... 80, 81, 82... 201, 207, 208.

In [4]:
def pdf_2_text(file_path: str) -> List[str]:
    """
    Extract text from a PDF file using PyMuPDF (fitz).

    Parameters:
     - file_path (str): Path to the PDF file.

    Returns:
     - List[str]: A list of strings, each representing the text extracted from a single page of the .PDF file.
    """
    # The context manager closes the document even when a page fails to extract,
    # so a broken PDF does not leave the file handle open.
    with fitz.open(file_path) as doc:
        return [page.get_text() for page in doc]

In [5]:
def _remove_page_header_by_noise_marker(page: str, marker: str) -> str:
    """
    Remove the page header, using a noise text marker, and returning all the content after the header.

    Args:
     - page (str): The page text.
     - marker (str): The text marker that shows the end of the header.
     
    Return:
     - str: Page text without the header.
    """
    start_index = page.find(marker)
    if start_index == -1:
        raise ValueError(f"ERROR finding the marker: '{marker}' on the page.")
    return page[start_index + len(marker):]

In [6]:
def _remove_page_footer_by_noise_marker(page: str, marker: str) -> str:
    """
    Remove the page footer, using a noise text marker, and returning all the content before the footer.

    Args:
     - page (str): The page text.
     - marker (str): The text marker that shows the start of the footer.
     
    Return:
     - str: Page text without the header.
    """
    end_index = page.find(marker)
    if end_index == -1:
        return page
        #raise ValueError(f"ERROR finding the marker: '{marker}' on the page.")
    return page[:end_index]

In [7]:
def _find_start_of_content(page: str) -> str:
    """
    Finds the start of the main content on a page, assuming there is no noise header above header.
    The logic assumes that the header ends with the first line that is not followed by a whitespace, indicating the start of a paragraph.
    This method is not perfect, but does most of the job.
    
    Args:
     - page (str): The page text.
     
    Retorna:
     - str: Page text without the header.
    """
    current_pos = 0

    while current_pos < len(page):
        newline_pos = page.find('\n', current_pos) # Finds the next line break.
        if newline_pos == -1:
            return page[current_pos:]

        if page[newline_pos:newline_pos + 2] != "\n ":
            return page[newline_pos + 1:]
        
        current_pos = newline_pos + 1
        
    return "" # Error

In [8]:
def _remove_footnotes_from_minute(page_text: str, minute_version: int) -> str:
    """
    Processes text to remove the footnotes from the main content, for Copom Minute Version 4.
    The footnote detection assumes the note's number will have a maximum of 2 digits.

    Args:
     - page_text (str): The text content extracted from a single page of the PDF.
     - minute_version (int): The format version of the Copom minute.
     
    Returns:
     - wo_footnote_main_text (str): The main text, without footnotes.
    """   
    if minute_version == 4:
        footnote_pattern = re.compile(r'\d{1,2}\s{0,2}[A-Z][^.\n]*\.')
        page_text = re.sub(footnote_pattern, "", repr(page_text))
        return eval(page_text)

    elif minute_version == 5:
        footnote_pattern = re.compile(r'^\s*\d{1,2}\s+(?![.])', re.MULTILINE)

        half_index = int(len(page_text)/2)
        page_text_1 = page_text[:half_index]
        page_text_2 = page_text[half_index:]

        match = footnote_pattern.search(page_text_2)
        if match:
            return page_text_1 + page_text_2[:match.start()]
        else:
            return page_text
        
    else:
        return page_text

In [9]:
def _extract_for_perfect_segregation(texts_pages: List[str], minute_version: int) -> List[str]:
    """Extract text from pages, when there is perfect segregation. It means, when the start of the content is in a segregated page."""

    if minute_version == 1:
        # No need for processing. This only applies for minute version 1 with perfect segregation.
        return texts_pages

    header_noise_marker = NOISE_HEADER_MARKERS.get(minute_version)
    if not header_noise_marker:
        raise ValueError(f"Noise header marker not found for this file version: {minute_version}")
    
    processed_pages = []

    for page in texts_pages:
        page = page.replace('\xa0', ' ')
        if minute_version == 4:
            page_wo_footnote = _remove_footnotes_from_minute(page, minute_version)
            header_removed = _remove_page_header_by_noise_marker(page_wo_footnote, header_noise_marker)
            content = _find_start_of_content(header_removed)
        else:
            page_wo_footnote = _remove_footnotes_from_minute(page, minute_version)
            content = _remove_page_header_by_noise_marker(page_wo_footnote, header_noise_marker)

        processed_pages.append(content)
        
    return processed_pages

In [10]:
def _extract_for_imperfect_segregation(texts_pages: List[str], minute_version: int) -> List[str]:
    """Extract text from pages, when there is IMperfect segregation."""
    
    processed_pages = []
    
    first_page = texts_pages[0].replace('\xa0', ' ')
    first_page_upper = first_page.upper()
    start_index = -1

    initial_text_markers = INITIAL_TEXT_MARKERS.get(minute_version)
    if not initial_text_markers:
        raise ValueError(f"Initial text marker not found FOR this file version: {minute_version}")

    for marker in initial_text_markers:
        start_index = first_page_upper.find(marker)
        if start_index != -1:
            break
    if start_index == -1:
        raise ValueError(f"Initial text marker not found IN this file version: {minute_version}")
    
    footer_noise_marker = NOISE_FOOTER_MARKERS.get(minute_version)
    if not footer_noise_marker:
        pass
        #raise ValueError(f"Noise footer marker not found for this file version: {minute_version}")
    
    first_page_content = first_page[start_index:]
    if footer_noise_marker:
        for footer_marker in footer_noise_marker:
            first_page_content = _remove_page_footer_by_noise_marker(first_page_content, footer_marker)

    processed_pages.append(first_page_content)
    
    for page in texts_pages[1:]:
        cleaned_page = page.replace('\xa0', ' ')
        content = _find_start_of_content(cleaned_page)

        if footer_noise_marker:
            for footer_marker in footer_noise_marker:
                content = _remove_page_footer_by_noise_marker(content, footer_marker)
    
        processed_pages.append(content)
        
    return processed_pages

In [11]:
def _merge_continuation_paragraphs(text: str, separator_pattern, continues_previous) -> str:
    """
    Split text into paragraphs on a separator, then glue continuation paragraphs back.

    Args:
     - text (str): The text to reformat.
     - separator_pattern: Compiled regex that separates paragraphs.
     - continues_previous: Predicate receiving the first character of a paragraph;
       when it returns True the paragraph is appended to the previous one.

    Returns:
     - str: Paragraphs joined by a blank line, or "" when the text has no content.
    """
    chunks = separator_pattern.split(text.strip())
    # Line breaks left inside a chunk are improper breaks: flatten them to spaces.
    paragraphs = [" ".join(chunk.splitlines()) for chunk in chunks if chunk.strip()]
    if not paragraphs:
        # Empty or whitespace-only input has no first paragraph to seed the merge.
        return ""

    merged = [paragraphs[0]]
    for paragraph in paragraphs[1:]:
        if paragraph and continues_previous(paragraph[0]):
            merged[-1] += " " + paragraph
        else:
            merged.append(paragraph)

    return "\n\n".join(merged)


def _group_lines_into_paragraphs(text: str, start_pattern, keep_leading_text: bool) -> str:
    """
    Join lines into paragraphs, opening a new paragraph at each line matching a marker.

    Args:
     - text (str): The text to reformat.
     - start_pattern: Compiled regex matched against the start of each stripped line.
     - keep_leading_text (bool): Whether lines that come before the first marker
       form a paragraph of their own (True) or are discarded (False).

    Returns:
     - str: Paragraphs joined by a blank line.
    """
    paragraphs = []
    current = []

    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line:
            continue

        if start_pattern.match(line):
            if current:
                paragraphs.append(" ".join(current))
            current = [line]
        elif current:
            current.append(line)
        elif keep_leading_text:
            current = [line]

    if current:
        paragraphs.append(" ".join(current))

    return "\n\n".join(paragraphs)


def adjust_paragraphs_breaks(extracted_text: str, minute_version: int) -> str:
    """
    Adjusts the text extracted from a document, such as the Copom minutes, by removing improper line breaks within paragraphs.

    The strategy depends on the minute format version:
     - 1: paragraphs are split at line breaks; a paragraph starting with a lower-case
       letter or a digit is merged into the previous one.
     - 2: paragraphs are split at blank lines; a paragraph starting with a lower-case
       letter is merged into the previous one.
     - 3: a new paragraph starts at a numbered item (e.g., "1. ") or at a short
       title-like line; text before the first such line is kept.
     - 4 and 5: a new paragraph starts at a marker (e.g., "1. ", "A) ", "(i) ");
       text before the first marker is discarded.

    Args:
     - extracted_text (str): The full content of the text extracted from the PDF.
     - minute_version (int): The format version of the Copom minute.

    Returns:
     - str: The formatted text with corrected paragraphs, separated by a blank line.
       Empty input yields "". For an unknown version the text is returned unchanged.
    """
    if minute_version == 1:
        return _merge_continuation_paragraphs(
            extracted_text,
            re.compile(r'(?:\n\s*)+'),
            lambda first_char: first_char.islower() or first_char.isdigit(),
        )

    if minute_version == 2:
        return _merge_continuation_paragraphs(
            extracted_text,
            re.compile(r'(?:\n\s*){2,}'),
            lambda first_char: first_char.islower(),
        )

    if minute_version == 3:
        paragraph_start_pattern = re.compile(
            r'^\s*(\d{1,2}\.(?!\d)\s*|[A-Z][a-z]+(?: \S+){1,5}\s*$)', re.MULTILINE)
        return _group_lines_into_paragraphs(
            extracted_text, paragraph_start_pattern, keep_leading_text=True)

    if minute_version in (4, 5):
        paragraph_start_pattern = re.compile(r'^\s*(\d+\.\s|[A-Z]\)\s|\([ivx]+\)\s)')
        return _group_lines_into_paragraphs(
            extracted_text, paragraph_start_pattern, keep_leading_text=False)

    return extracted_text  # Unknown version: nothing to adjust.

In [13]:
def extract_minute_from_text(texts_pages: List[str], minute_version: int, perfect_segregation: bool, initial_page: int) -> str:
    """
    Build the Copom minute text out of the per-page texts of a .pdf file.

    Parameters:
     - texts_pages (List[str]): List of strings, each representing the text of a page.
     - minute_version (int): The format version of the Copom minute.
     - perfect_segregation (bool): If True, the minute is considered to start on a page of its own, with no unrelated topic above it.
     - initial_page (int): The page number where the minute starts (the first page is 1).

    Returns:
     - str: The extracted Copom minute text, or "" when there is no page at or after `initial_page`.
    """
    pages_from_start = texts_pages[initial_page - 1:]
    if not pages_from_start:
        # Nothing to extract: the starting page lies beyond the end of the document.
        return ""

    extract_pages = (
        _extract_for_perfect_segregation
        if perfect_segregation
        else _extract_for_imperfect_segregation
    )
    joined_pages = "\n".join(extract_pages(pages_from_start, minute_version))
    adjusted_text = adjust_paragraphs_breaks(joined_pages, minute_version)

    # Removing 'Acronyms': only the text before the first occurrence of the word is kept.
    minute_text, _, _ = adjusted_text.partition("Acronyms")
    return minute_text

In [None]:
# Control sheet with one row per minute. The columns read below are:
# Ignore, Version, PerfSeg, InitialPage and Titulo (the file name without extension).
df_minutes_format = pd.read_excel(f"{FOLDER_RAW}/minutes_format.xlsx")
for index, row in df_minutes_format.iterrows():
    # Rows flagged with Ignore == 1 are skipped.
    if row.Ignore == 1:
        continue
    minute_version = row.Version
    perfect_segregation = row.PerfSeg
    initial_page = row.InitialPage

    # The page texts are no longer printed here: dumping every page of every PDF
    # floods the notebook output. Inspect a single file with pdf_2_text() when debugging.
    texts_pages = pdf_2_text(f"{FOLDER_RAW}/copom_minutes_raw/{row.Titulo}.pdf")

    minute_text = extract_minute_from_text(texts_pages, minute_version, perfect_segregation, initial_page)

    try:
        with open(f"{FOLDER_PROCESSED}/copom_minutes_processed/{row.Titulo}.txt", 'w', encoding='utf-8') as f:
            f.write(minute_text)

    except IOError as e:
        # A failed write is reported and the loop moves on to the next minute.
        print(f"[INFO] Error saving {row.Titulo}: {e}")
