In [45]:
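# 01_read_pdf.ipynb
# For each PDF of the selected language, it extracts the text of the bid-opening section (found between two text markers) and saves it to CSV/XLSX; a later step requests the LLM to identify the date in each extracted text.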

In [46]:
# Force reloading of external modules on every cell execution
%reload_ext autoreload
%autoreload 2

In [47]:
### IMPORT ###
from pathlib import Path
import csv
from datetime import datetime
import pandas as pd
import PyPDF2
import fitz  # PyMuPDF

In [48]:
### LOCAL IMPORT ###
from config import config_reader
from utilities import list_files_by_type, list_subdirectories

In [62]:
### GLOBALS ###
# All settings below are read from "config.yml" through the project config reader.
yaml_config = config_reader.config_read_yaml("config.yml", "config")
# Directories: GUUE PDFs are looked up under <download_dir>/<guue_dir>; outputs and log go in <data_dir>
download_dir = str(yaml_config["DOWNLOAD_DIR"])
guue_dir = str(yaml_config["GUUE_DIR"])
data_dir = str(yaml_config["DATA_DIR"])
# Field separator used when writing the output CSV
csv_sep = str(yaml_config["CSV_SEP"])
# TED URL strings: lines containing `ted_url` are skipped by the v1 extractor,
# `ted_url_http` is removed from the extracted bid-opening text
ted_url = str(yaml_config["TED_URL"])
ted_url_http = str(yaml_config["TED_URL_HTTP"])
# Years to process, both bounds included
year_start = int(yaml_config["YEAR_START"])
year_end = int(yaml_config["YEAR_END"])
years_list = list(range(year_start, year_end + 1))
# Maximum number of PDFs to parse (0 = parse all the files found)
sample_size = int(yaml_config["TEST_SAMPLE"])
# INPUT
dic_lan = {"DE":0, "ES":0, "FR":0, "IT":0, "PT":1} # <-- INPUT: set 1 for the desired language; set only one language at a time
dic_markers = {"DE":["Bedingungen für die Öffnung der Angebote", "Abschnitt VI: Weitere Angaben"], 
               "ES":["Condiciones para la apertura de las plicas", "Apartado VI: Información complementaria"], 
               "FR":["Modalités d’ouverture des offres", "Section VI: Renseignements complémentaires"], 
               "IT":["Modalità di apertura delle offerte", "Sezione VI: Altre informazioni"], 
               "PT":["Condições de abertura das propostas", "Secção VI: Informação complementar"]} # <-- INPUT: for each language, [start marker, end marker] delimiting the text to extract
dic_lan_clean = {"DE":"Tag", "ES":"Fecha", "FR":"Date", "IT":"Data", "PT":"Data"} # <-- Per-language word removed from the extracted date text (verbose label)

# OUTPUT
bid_file_text = str(yaml_config["FILE_BID_TEXT"]) # output file name; its "LANG" placeholder is replaced with the language code
log_pdf = str(yaml_config["LOG_PDF_EXTRACTION"]) # timing log file name (created inside data_dir)
log_pdf_header = str(yaml_config["LOG_PDF_EXTRACTION_HEADER"]) # header line written when the timing log is first created

## FUNCTIONS

In [50]:
### FUNCTIONS ###
def extract_text_by_line_v1(pdf_path: str) -> list:
    """
    Extracts the text of a PDF file line by line with PyPDF2, skipping page headers and footers.

    On every page the first three lines are treated as header: they are recorded (together with
    the space-separated parts of line 1 and the composite "<line 2> <second part of line 1>")
    and are not returned. Any later line that equals one of the recorded header strings (the
    footer repeats them, e.g. header "GU/S S148" -> footer "03/08/2022 S148") or that contains
    the module-level ``ted_url`` is skipped as well.

    Args:
        pdf_path (str): The path to the PDF file to be processed.

    Returns:
        list: The retained text lines, in reading order. If the file cannot be opened or an
        error occurs while parsing, the error is printed and the lines collected so far
        (possibly an empty list) are returned.
    """
    full_text = []  # Lines kept from all the pages of the PDF
    header_text = set()  # Header strings seen so far (shared across pages, used to detect footers)
    try:
        # Open the PDF file in binary read mode
        with open(pdf_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)

            for page_num, page in enumerate(pdf_reader.pages):
                page_text = page.extract_text()
                if not page_text:
                    print(f"Text could not be extracted from page {page_num + 1}")
                    continue

                parts_header = []  # Space-separated parts of the first header line of this page
                for line_count, line in enumerate(page_text.split('\n'), start=1):
                    if line_count <= 3:
                        # Header line: remember it and never emit it
                        header_text.add(line)
                        if line_count == 1:
                            # Special text in line 1 (header: GU/S S148 -> footer: 03/08/2022 S148)
                            parts_header = line.split(" ")
                            header_text.update(parts_header)
                        elif line_count == 2 and len(parts_header) > 1:
                            # Footer variant built from line 2 plus the second part of line 1;
                            # skipped when line 1 has no second part (avoids an IndexError)
                            header_text.add(f"{line} {parts_header[1]}")
                        continue

                    # Body line: drop TED URL lines and footer lines that repeat a header string
                    if ted_url in line or line in header_text:
                        continue
                    full_text.append(line)
    except FileNotFoundError:
        print(f"ERROR! File '{pdf_path}'. Error: not found.")
    except Exception as e:
        print(f"ERROR! File '{pdf_path}'. Error: {e}")

    # Returned outside of a 'finally' clause so that KeyboardInterrupt/SystemExit are not swallowed
    return full_text

In [51]:
def extract_text_by_line_v2(pdf_path: str, ted_url: str = "ted.europa.eu") -> list:
    """
    Extracts text from a PDF file, line by line, while avoiding headers and footers using PyMuPDF.

    On every page the first three lines are treated as header: they are recorded (together with
    the space-separated parts of line 1 and the composite "<line 2> <second part of line 1>")
    and are not returned. Any later line that equals a recorded header string (footers repeat
    them) or that contains ``ted_url`` is skipped.

    Args:
        pdf_path (str): The path to the PDF file to be processed.
        ted_url (str): Substring identifying TED URL lines to be skipped. Defaults to
            "ted.europa.eu".

    Returns:
        list: A list of strings, each representing a retained line of the PDF (empty if no text
        could be extracted). Returns None if the file could not be opened or parsed; the error
        is printed.
    """
    full_text = []
    header_text = set()  # Use a set for faster 'in' operations
    doc = None  # Set only once the PDF is open, so 'finally' knows whether to close it

    try:
        # Open the PDF file
        doc = fitz.open(pdf_path)

        for page_num, page in enumerate(doc):
            # Extract the text from the page
            page_text = page.get_text()
            if not page_text:
                print(f"Text could not be extracted from page {page_num + 1}")
                continue

            parts_header = []  # Space-separated parts of the first header line of this page
            for i, line in enumerate(page_text.split('\n')):
                if i < 3:
                    # Extract headers from the first few lines
                    header_text.add(line)
                    if i == 0:
                        parts_header = line.split(" ")
                        header_text.update(parts_header)
                    elif i == 1 and len(parts_header) > 1:
                        # Footer variant: line 2 plus the second part of line 1; skipped when
                        # line 1 has no second part (previously an IndexError discarded the PDF)
                        header_text.add(f"{line} {parts_header[1]}")
                elif ted_url not in line and line not in header_text:
                    # Skip TED URLs or lines that match headers
                    full_text.append(line)

    except FileNotFoundError:
        print(f"ERROR! File '{pdf_path}' not found.")
        return None
    except Exception as e:
        print(f"ERROR! File '{pdf_path}'. Error: {e}")
        return None
    finally:
        if doc is not None:
            doc.close()

    return full_text

In [52]:
def extract_elements_between_markers(string_list:list, start_marker:str, end_marker:str) -> list: 
    """
    Extracts the elements located between two marker lines of a list of strings.

    The section starts right after the first element containing start_marker and ends just
    before the first element, located after the start, that contains end_marker. Occurrences of
    end_marker that precede the start marker (e.g. the same title quoted earlier in the
    document) are ignored.

    Args:
        string_list (list of str): The list of strings to be processed.
        start_marker (str): The substring that marks the start of the extraction.
        end_marker (str): The substring that marks the end of the extraction.

    Returns:
        list of str: A sublist of the original list, containing the elements between the start
        and end markers (markers excluded). Empty if the start marker is not found or if no end
        marker follows it.
    """
    # Find the index of the first element containing the start marker
    start_index = next((index for index, item in enumerate(string_list) if start_marker in item), None)
    if start_index is None:
        return []

    # Look for the end marker only after the start marker
    first_content_index = start_index + 1
    end_index = next(
        (index for index in range(first_content_index, len(string_list)) if end_marker in string_list[index]),
        None,
    )
    if end_index is None:
        return []

    return string_list[first_content_index:end_index]

In [53]:
def filename_to_caseid(filename:str) -> str:
    """ 
    Build the case ID of a file by joining the year and the numeric file ID found in its name.

    The name is split on '-': the first field (year) is concatenated with the third field
    (file ID) stripped of its leading zeros,
    e.g. '2016-OJS001-00000268-pt-ts.pdf' -> '2016268'.

    Args:
        filename (str): The string containing the file name.
    
    Returns:
        str: YEARID
    """
    fields = filename.split('-')
    year_field = fields[0]
    id_without_zeros = fields[2].lstrip('0')
    return f"{year_field}{id_without_zeros}"

## MAIN

In [54]:
### MAIN ###
# Banner surrounded by blank lines
print("", "*** PROGRAM START ***", "", sep="\n")

# Start timestamp (second precision); used at the end to compute the elapsed time
start_time = datetime.now().replace(microsecond=0)
print("Start process:", start_time, end="\n\n")


*** PROGRAM START ***

Start process: 2024-09-03 16:30:10



In [55]:
# print(yaml_config) # debug

In [56]:
print(">> Settings")
path_guee = Path(download_dir) / guue_dir
list_dirs = list_subdirectories(path_guee)
list_dirs_len = len(list_dirs)
print(f"Directories with GUUE found ({list_dirs_len}): {list_dirs}")
print("Years list")
print(years_list)

# Languages flagged with 1 in dic_lan: exactly one is expected
key_with_value_1 = [key for key, value in dic_lan.items() if value == 1]
if not key_with_value_1:
    # Fail here with a clear message instead of a later "KeyError: None" on dic_markers
    raise ValueError("No language selected: set exactly one language to 1 in 'dic_lan'.")
if len(key_with_value_1) > 1:
    print(f"WARNING! More than one language selected {key_with_value_1}: only the first one is used.")
lang_code = key_with_value_1[0]
print("Desired language code for markers:", lang_code)

# Text markers delimiting the bid opening section for the selected language
bid_opening_marker_start = dic_markers[lang_code][0]
bid_opening_marker_end = dic_markers[lang_code][1]
print("Markers (start):", bid_opening_marker_start)
print("Markers (end):", bid_opening_marker_end)

# Output file name: the "LANG" placeholder is replaced with the language code
bid_file_text_lang = bid_file_text.replace("LANG", lang_code)
print("Output file with BID OPENING sections:", bid_file_text_lang)

>> Settings
Directories with GUUE found (5): ['DE', 'ES', 'FR', 'IT', 'PT']
Years list
[2016, 2017, 2018, 2019, 2020, 2021, 2022]
Desired language code for markers: PT
Markers (start): Condições de abertura das propostas
Markers (end): Secção VI: Informação complementar
Output file with BID OPENING sections: bid_opening_text_PT.csv


In [57]:
print(">> Preparing the output timing log")
path_log = Path(data_dir) / log_pdf
if path_log.exists():
    print(f"The file already exists: {path_log}")
else:
    # First run: create the log containing only the header line taken from the configuration
    path_log.write_text(f"{log_pdf_header}\n")
    print(f"File created with header: {path_log}")

>> Preparing the output timing log
The file already exists: data/ted_log_pdf_extraction.csv


In [58]:
# Collect the PDF files of the selected language, one year directory at a time
print(">> Listing PDF files")
path_guee_lang = Path(download_dir, guue_dir, lang_code)
print("Directory (lang):", path_guee_lang)

list_pdf_files = []
for year_value in years_list:
    # PDFs are stored in <language dir>/<year>
    path_guee_lang_year = path_guee_lang.joinpath(str(year_value))
    list_pdf_files_y = list_files_by_type(path_guee_lang_year, "pdf")
    print(f"Files found for year {year_value}: {len(list_pdf_files_y)}")
    list_pdf_files += list_pdf_files_y

list_pdf_files_len = len(list_pdf_files)
print("Total files found:", list_pdf_files_len)
print("Sample size (0 = ALL):", sample_size)

>> Listing PDF files
Directory (lang): /Volumes/SAMSUNG-PHD/PhD/Corsi da seguire/Knowledge management and information extraction from structured and unstructured data for process mining (techniques, algorithms, and tools)/Python - GUUE in CSV per LOG/guue/PT
Files found for year 2016: 420
Files found for year 2017: 459
Files found for year 2018: 521
Files found for year 2019: 625
Files found for year 2020: 614
Files found for year 2021: 622
Files found for year 2022: 815
Total files found: 4076
Sample size (0 = ALL): 0


In [59]:
# Parse the files: for each PDF, keep the text found between the bid opening markers
print(">> Parsing PDF files")

list_bid_opening_dic = [] # Container of texts of interest extracted from PDFs (one dictionary per PDF)

for i, pdf_file in enumerate(list_pdf_files, start=1):
    # Stop after 'sample_size' files (0 = parse all the files)
    if sample_size != 0 and i > sample_size:
        break
    print(f"[{i} / {list_pdf_files_len}]")
    print("Reading (path):", str(pdf_file))

    # Extracts all lines of text from the PDF with PyMuPDF (extract_text_by_line_v1 is the PyPDF2 alternative).
    # extract_text_by_line_v2 returns None when the PDF cannot be read: fall back to an empty list
    # so that a single unreadable file does not stop the whole batch.
    text_list = extract_text_by_line_v2(pdf_file) or []
    text_list_len = len(text_list)

    # prepare the dictionary output for the actual PDF
    file_name = pdf_file.name # get only the file name in str
    case_id = filename_to_caseid(file_name) # get the case_id from the file_name
    dic_bid_opening = {"file_name":file_name, "case_id":case_id, "text": None}

    print(f"Case ID derived from file name '{file_name}': {case_id}")
    print("Lines found in the PDF:", text_list_len)

    # get only lines about bid opening
    if text_list_len > 0:
        marker_list = extract_elements_between_markers(text_list, bid_opening_marker_start, bid_opening_marker_end)
        marker_list_len = len(marker_list)
        print("Bid opening strings found between markers:", marker_list_len)

        if marker_list_len > 0:
            if marker_list_len == 1:
                marker_text = marker_list[0] + " | " # If the marker contains only one element adds |
            else:
                marker_text = " | ".join(marker_list)
            # clean: remove the TED URL from the extracted text
            marker_text_clean = marker_text.replace(ted_url_http, "")
            dic_bid_opening["text"] = marker_text_clean

    # save the file and the text found in bid opening section into a dictionary
    list_bid_opening_dic.append(dic_bid_opening)
    print()

>> Parsing PDF files
[1 / 4076]
Reading (path): /Volumes/SAMSUNG-PHD/PhD/Corsi da seguire/Knowledge management and information extraction from structured and unstructured data for process mining (techniques, algorithms, and tools)/Python - GUUE in CSV per LOG/guue/PT/2016/2016-OJS001-00000268-pt-ts.pdf
Case ID derived from file name '2016-OJS001-00000268-pt-ts.pdf': 2016268
Lines found in the PDF: 176
Bid opening strings found between markers: 0

[2 / 4076]
Reading (path): /Volumes/SAMSUNG-PHD/PhD/Corsi da seguire/Knowledge management and information extraction from structured and unstructured data for process mining (techniques, algorithms, and tools)/Python - GUUE in CSV per LOG/guue/PT/2016/2016-OJS004-00004017-pt-ts.pdf
Case ID derived from file name '2016-OJS004-00004017-pt-ts.pdf': 20164017
Lines found in the PDF: 170
Bid opening strings found between markers: 0

[3 / 4076]
Reading (path): /Volumes/SAMSUNG-PHD/PhD/Corsi da seguire/Knowledge management and information extraction f

In [60]:
# Create a file with the texts extracted from the PDFs
print(">> Saving bid opening texts")
dtype_dict = {"file_name":"object", "case_id":"object", "text":"object"}
# Create a dataframe from a list of dictionaries; the columns are declared explicitly so that
# an empty list still yields the expected (empty) columns instead of failing in astype()
df_bid = pd.DataFrame.from_records(list_bid_opening_dic, columns=list(dtype_dict))
df_bid = df_bid.astype(dtype_dict)
df_bid = df_bid.sort_values(by='file_name')

# Get the first part of the data: text before the first '|' ('0' when no text was extracted)
df_bid['text_date'] = df_bid['text'].apply(lambda x: '0' if pd.isna(x) or x.strip() == '' else x.split('|')[0].strip())
df_bid['text_date'] = df_bid['text_date'].str.replace(dic_lan_clean[lang_code], '', regex=False)  # Removes redundant words
df_bid['text_date'] = df_bid['text_date'].str.strip()

# Save in CSV
path_out = Path(data_dir) / bid_file_text_lang
print("Path CSV:", path_out)
df_bid.to_csv(path_out, sep=csv_sep, index=False, quoting=csv.QUOTE_ALL)
# Save in XLSX (same base name as the CSV, also used as sheet name)
path_out = Path(data_dir) / f"{Path(bid_file_text_lang).stem}.xlsx"
print("Path XLSX:", path_out)
df_bid.to_excel(path_out, sheet_name=f"{Path(bid_file_text_lang).stem}", index=False)


>> Saving bid opening texts
Path CSV: data/bid_opening_text_PT.csv
Path XLSX: data/bid_opening_text_PT.xlsx


In [61]:
# program end
end_time = datetime.now().replace(microsecond=0)
delta_time = end_time - start_time

# Save the timing
print(">> Saving timing to log")
# Number of files actually parsed: all of them, or the sample size capped to the files available
data_len = list_pdf_files_len
if sample_size != 0:
    data_len = max(0, min(sample_size, list_pdf_files_len))
# NOTE(review): the fields are joined with a hard-coded ';' - confirm it matches the separator
# used in the header line written from LOG_PDF_EXTRACTION_HEADER
csv_str = f"{lang_code};{str(start_time)};{str(end_time)};{str(delta_time)};{str(data_len)}\n"
with open(path_log, "a") as fp:
    fp.write(csv_str)
    
print()
print("End process:", end_time)
print("Time to finish:", delta_time)
print()

print()
print("*** PROGRAM END ***")
print()

>> Saving timing to log

End process: 2024-09-03 16:32:37
Time to finish: 0:02:27


*** PROGRAM END ***

