In [4]:
import pandas as pd
import os
from tqdm import tqdm
from dotenv import load_dotenv
import nest_asyncio

from docling.document_converter import DocumentConverter
from docling.backend.pypdfium2_backend import PyPdfiumDocumentBackend
from docling.document_converter import (
    DocumentConverter,
    PdfFormatOption,
    WordFormatOption,
    HTMLFormatOption
)
from docling.backend.msword_backend import MsWordDocumentBackend
from docling.backend.html_backend import HTMLDocumentBackend
from docling.datamodel.base_models import InputFormat

from haystack.components.converters import PyPDFToDocument
from haystack.components.converters.pdfminer import PDFMinerToDocument
from haystack.components.converters.docx import DOCXToDocument # require python-docx
from haystack.components.converters import HTMLToDocument

from llama_parse import LlamaParse
from llama_index.core import SimpleDirectoryReader
from llama_index.readers.file.html import HTMLTagReader

from langchain_community.document_loaders import PyPDFLoader
from langchain_community.document_loaders import PDFPlumberLoader
from langchain_community.document_loaders import PyPDFium2Loader
from langchain_community.document_loaders import PDFMinerLoader
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_community.document_loaders import Docx2txtLoader
from langchain_community.document_loaders import UnstructuredHTMLLoader
from langchain_community.document_loaders import BSHTMLLoader

import editdistance
from nltk.translate.bleu_score import sentence_bleu
from jiwer import wer

from docling.datamodel.pipeline_options import (
    EasyOcrOptions,
    # OcrMacOptions,
    PdfPipelineOptions,
    RapidOcrOptions,
    TesseractOcrOptions,
)
from langchain_community.document_loaders.parsers import TesseractBlobParser
from langchain_community.document_loaders.parsers import RapidOCRBlobParser

from llmsherpa.readers import LayoutPDFReader


In [5]:
# Load environment variables (presumably the LlamaParse API key, judging by the file
# name) from the env file one directory above the notebook's working directory.
# load_dotenv() returns False when it did not set any variable (e.g. the file is
# missing), so report that here instead of failing later with an unrelated error.
if not load_dotenv(dotenv_path="../llama-api-key.env"):
    print("WARNING: no environment variables were loaded from ../llama-api-key.env")
# Patch asyncio so event loops can be nested inside the notebook's already-running loop.
nest_asyncio.apply()

In [6]:
# DOCUMENT LOADERS INITIALISATION
# Converters are built once here so the loader functions can reuse them
# instead of re-initialising one on every call.

## DOCLING
default_converter_docling = DocumentConverter()

PYPDFium_converter_docling = DocumentConverter(
    format_options={
        InputFormat.PDF: PdfFormatOption(backend=PyPdfiumDocumentBackend),
    }
)

DOCX_converter_docling = DocumentConverter(
    format_options={
        # default backend for DOCX files
        InputFormat.DOCX: WordFormatOption(backend=MsWordDocumentBackend),
    }
)

HTML_converter_docling = DocumentConverter(
    format_options={
        # default backend for HTML files
        InputFormat.HTML: HTMLFormatOption(backend=HTMLDocumentBackend),
    }
)


## HAYSTACK
PYPDF_converter_haystack = PyPDFToDocument()
PDFMiner_converter_haystack = PDFMinerToDocument()
DOCX_converter_haystack = DOCXToDocument()
HTML_converter_haystack = HTMLToDocument()

## LLAMAINDEX
llamaparse_llamaindex = LlamaParse(result_type="text")
HTML_converter_llamaindex = HTMLTagReader(tag='html')


## LANGCHAIN
# Loaders are instantiated directly inside each function, because they take
# the file path as a constructor argument.


## LLMSHERPA
# Requires a locally running nlm-ingestor service.
# To test, deploy the following Docker container:
# $ sudo docker pull ghcr.io/nlmatics/nlm-ingestor:latest
# $ sudo docker run -p 5010:5001 ghcr.io/nlmatics/nlm-ingestor:latest
LLMSHERPA_API_URL = "http://localhost:5010/api/parseDocument?renderFormat=all"
converter_llmsherpa = LayoutPDFReader(LLMSHERPA_API_URL)

In [9]:
# DOCUMENT LOADER FUNCTIONS

def _docling_export_text(converter, file):
    """Convert `file` with a docling converter and return the document's text export."""
    return converter.convert(file).document.export_to_text()

def docling_default(file, converter=default_converter_docling):
    """Extract text from `file` with the default docling converter."""
    return _docling_export_text(converter, file)

def docling_pyPDFium(file, converter=PYPDFium_converter_docling):
    """Extract text from `file` with the docling converter that uses the PyPdfium PDF backend."""
    return _docling_export_text(converter, file)

def docling_DOCX(file, converter=DOCX_converter_docling):
    """Extract text from `file` with the docling converter configured for DOCX input."""
    return _docling_export_text(converter, file)

def docling_HTML(file, converter=HTML_converter_docling):
    """Extract text from `file` with the docling converter configured for HTML input."""
    return _docling_export_text(converter, file)

def _haystack_joined_content(converter, file):
    """Run a haystack converter on `file` and join each document's content with newlines."""
    output = converter.run(sources=[file])
    return '\n'.join(doc.content for doc in output['documents'])

def haystack_pyPDF(file, converter=PYPDF_converter_haystack):
    """Extract text from `file` with haystack's PyPDF converter."""
    return _haystack_joined_content(converter, file)

def haystack_PDFMiner(file, converter=PDFMiner_converter_haystack):
    """Extract text from `file` with haystack's PDFMiner converter."""
    return _haystack_joined_content(converter, file)

def haystack_DOCX(file, converter=DOCX_converter_haystack):
    """Extract text from `file` with haystack's DOCX converter."""
    return _haystack_joined_content(converter, file)

def haystack_HTML(file, converter=HTML_converter_haystack):
    """Extract text from `file` with haystack's HTML converter."""
    return _haystack_joined_content(converter, file)

def _llamaindex_joined_text(documents):
    """Join the `.text` of every loaded llamaindex document with newlines."""
    return '\n'.join(doc.text for doc in documents)

def llamaindex_llamaparse(file, converter=llamaparse_llamaindex):
    """Extract text from `file` with LlamaParse."""
    return _llamaindex_joined_text(converter.load_data(file))

def llamaindex_simpleDirectoryReader(file):
    """Extract text from `file` with a SimpleDirectoryReader built for that single file."""
    reader = SimpleDirectoryReader(input_files=[file])
    return _llamaindex_joined_text(reader.load_data())

def llamaindex_HTML(file, converter=HTML_converter_llamaindex):
    """Extract text from `file` with llamaindex's HTMLTagReader."""
    return _llamaindex_joined_text(converter.load_data(file))

def _langchain_joined_pages(loader):
    """Load documents from a langchain loader and join their `page_content` with newlines."""
    return '\n'.join(page.page_content for page in loader.load())

def langchain_pyPDF(file):
    """Extract text from `file` with langchain's PyPDFLoader."""
    return _langchain_joined_pages(PyPDFLoader(file))

def langchain_PDFPlumber(file):
    """Extract text from `file` with langchain's PDFPlumberLoader."""
    return _langchain_joined_pages(PDFPlumberLoader(file))

def langchain_PyPDFium2(file):
    """Extract text from `file` with langchain's PyPDFium2Loader."""
    return _langchain_joined_pages(PyPDFium2Loader(file))

def langchain_PyMUPDF(file):
    """Extract text from `file` with langchain's PyMuPDFLoader."""
    return _langchain_joined_pages(PyMuPDFLoader(file))

def langchain_PDFMiner(file):
    """Extract text from `file` with langchain's PDFMinerLoader."""
    return _langchain_joined_pages(PDFMinerLoader(file))

def langchain_Docx2txt(file):
    """Extract text from `file` with langchain's Docx2txtLoader."""
    return _langchain_joined_pages(Docx2txtLoader(file))

def langchain_UnstructuredHTML(file):
    """Extract text from `file` with langchain's UnstructuredHTMLLoader."""
    return _langchain_joined_pages(UnstructuredHTMLLoader(file))

def langchain_BSHtml(file):
    """Extract text from `file` with langchain's BSHTMLLoader."""
    return _langchain_joined_pages(BSHTMLLoader(file))

def llmsherpa_default(path, converter=converter_llmsherpa):
    """Extract text from `path` with llmsherpa's LayoutPDFReader.

    Args:
        path: File to parse; it is made absolute before being handed to the reader.
        converter: Reader exposing `read_pdf(path)`. Defaults to the shared
            module-level reader, so no new reader is built on each call.

    Returns:
        The text of every chunk of the parsed document, each followed by a newline.
    """
    # The reader accepts a URL or a local file path; always pass an absolute path.
    pdf_path = os.path.abspath(path)
    doc = converter.read_pdf(pdf_path)
    return ''.join(chunk.to_text() + '\n' for chunk in doc.chunks())

# Registry of text extractors: loader name -> function(file) returning the extracted text.
# The same loader names are used as keys of `document_loaders_to_test` and
# `dl_func_mapping_metadata`, so the three dictionaries must be kept in sync.
dl_func_mapping={
    'docling_default':docling_default,
    'docling_pyPDFium':docling_pyPDFium,
    'docling_DOCX':docling_DOCX,
    'docling_HTML':docling_HTML,
    'haystack_pyPDF':haystack_pyPDF,
    'haystack_PDFMiner':haystack_PDFMiner,
    'haystack_docx': haystack_DOCX,
    'haystack_html': haystack_HTML,
    'llamaindex_llamaparse':llamaindex_llamaparse,
    'llamaindex_simpleDirectoryReader':llamaindex_simpleDirectoryReader,
    'llamaindex_HTMLTagReader':llamaindex_HTML,
    'langchain_pyPDF':langchain_pyPDF,
    'langchain_PDFPlumber':langchain_PDFPlumber,
    'langchain_PyPDFium2':langchain_PyPDFium2,
    'langchain_PyMUPDF':langchain_PyMUPDF,
    'langchain_PDFMiner':langchain_PDFMiner,
    'langchain_Docx2txt':langchain_Docx2txt,
    'langchain_UnstructuredHTML':langchain_UnstructuredHTML,
    'langchain_BSHtml':langchain_BSHtml,
    'llmsherpa_default':llmsherpa_default,
}

In [10]:
# DOCUMENT LOADER METADATA FUNCTIONS

def _docling_fields_dump(converter, file):
    """Convert `file` and render each top-level field of the dumped document as a "key : value" line."""
    fields = converter.convert(file).document.model_dump()
    return "".join(key + " : " + str(value) + "\n" for key, value in fields.items())

def docling_default_metadata(file, converter=default_converter_docling):
    """Dump the docling document produced by the default converter as "key : value" lines."""
    return _docling_fields_dump(converter, file)

def docling_pyPDFium_metadata(file, converter=PYPDFium_converter_docling):
    """Dump the docling document produced by the PyPdfium-backed converter as "key : value" lines."""
    return _docling_fields_dump(converter, file)

def docling_DOCX_metadata(file, converter=DOCX_converter_docling):
    """Dump the docling document produced by the DOCX converter as "key : value" lines."""
    return _docling_fields_dump(converter, file)

def docling_HTML_metadata(file, converter=HTML_converter_docling):
    """Dump the docling document produced by the HTML converter as "key : value" lines."""
    return _docling_fields_dump(converter, file)

def _haystack_meta_dump(converter, file):
    """Run a haystack converter on `file` and render every document's `meta` entries as "key : value" lines."""
    output = converter.run(sources=[file])
    return ''.join(
        key + " : " + str(value) + "\n"
        for doc in output['documents']
        for key, value in doc.meta.items()
    )

def haystack_pyPDF_metadata(file, converter=PYPDF_converter_haystack):
    """Metadata of `file` as reported by haystack's PyPDF converter."""
    return _haystack_meta_dump(converter, file)

def haystack_PDFMiner_metadata(file, converter=PDFMiner_converter_haystack):
    """Metadata of `file` as reported by haystack's PDFMiner converter."""
    return _haystack_meta_dump(converter, file)

def haystack_DOCX_metadata(file, converter=DOCX_converter_haystack):
    """Metadata of `file` as reported by haystack's DOCX converter."""
    return _haystack_meta_dump(converter, file)

def haystack_HTML_metadata(file, converter=HTML_converter_haystack):
    """Metadata of `file` as reported by haystack's HTML converter."""
    return _haystack_meta_dump(converter, file)

def _llamaindex_metadata_dump(documents):
    """Render the `metadata` entries of every llamaindex document as "key : value" lines."""
    return "".join(
        key + " : " + str(value) + "\n"
        for doc in documents
        for key, value in doc.metadata.items()
    )

def llamaindex_llamaparse_metadata(file, converter=llamaparse_llamaindex):
    """Metadata of `file` as reported by LlamaParse."""
    return _llamaindex_metadata_dump(converter.load_data(file))

def llamaindex_simpleDirectoryReader_metadata(file):
    """Metadata of `file` as reported by a SimpleDirectoryReader built for that single file."""
    reader = SimpleDirectoryReader(input_files=[file])
    return _llamaindex_metadata_dump(reader.load_data())

def llamaindex_HTML_metadata(file, converter=HTML_converter_llamaindex):
    """Metadata of `file` as reported by llamaindex's HTMLTagReader."""
    return _llamaindex_metadata_dump(converter.load_data(file))

def _langchain_metadata_dump(loader):
    """Load documents from a langchain loader and render their `metadata` entries as "key : value" lines."""
    return "".join(
        key + " : " + str(value) + "\n"
        for page in loader.load()
        for key, value in page.metadata.items()
    )

def langchain_pyPDF_metadata(file):
    """Metadata of `file` as reported by langchain's PyPDFLoader."""
    return _langchain_metadata_dump(PyPDFLoader(file))

def langchain_PDFPlumber_metadata(file):
    """Metadata of `file` as reported by langchain's PDFPlumberLoader."""
    return _langchain_metadata_dump(PDFPlumberLoader(file))

def langchain_PyPDFium2_metadata(file):
    """Metadata of `file` as reported by langchain's PyPDFium2Loader."""
    return _langchain_metadata_dump(PyPDFium2Loader(file))

def langchain_PyMUPDF_metadata(file):
    """Metadata of `file` as reported by langchain's PyMuPDFLoader."""
    return _langchain_metadata_dump(PyMuPDFLoader(file))

def langchain_PDFMiner_metadata(file):
    """Metadata of `file` as reported by langchain's PDFMinerLoader."""
    return _langchain_metadata_dump(PDFMinerLoader(file))

def langchain_Docx2txt_metadata(file):
    """Metadata of `file` as reported by langchain's Docx2txtLoader."""
    return _langchain_metadata_dump(Docx2txtLoader(file))

def langchain_UnstructuredHTML_metadata(file):
    """Metadata of `file` as reported by langchain's UnstructuredHTMLLoader."""
    return _langchain_metadata_dump(UnstructuredHTMLLoader(file))

def langchain_BSHtml_metadata(file):
    """Metadata of `file` as reported by langchain's BSHTMLLoader."""
    return _langchain_metadata_dump(BSHTMLLoader(file))

def llmsherpa_default_metadata(path,converter = converter_llmsherpa):
    """Placeholder metadata extractor for llmsherpa; always returns an empty string.

    Both arguments are ignored; they only keep the signature aligned with the
    other ``*_metadata`` functions.
    """
    # llmsherpa does not parse metadata
    return ""

# Registry of metadata extractors: loader name -> function(file) returning the
# file's metadata rendered as "key : value" lines.
# `test_text_injection` looks loaders up here (by the `dl` name) for directories
# whose name contains 'Metadata', so the keys mirror those of `dl_func_mapping`.
dl_func_mapping_metadata={
    'docling_default':docling_default_metadata,
    'docling_pyPDFium':docling_pyPDFium_metadata,
    'docling_DOCX':docling_DOCX_metadata,
    'docling_HTML':docling_HTML_metadata,
    'haystack_pyPDF':haystack_pyPDF_metadata,
    'haystack_PDFMiner':haystack_PDFMiner_metadata,
    'haystack_docx': haystack_DOCX_metadata,
    'haystack_html': haystack_HTML_metadata,
    'llamaindex_llamaparse':llamaindex_llamaparse_metadata,
    'llamaindex_simpleDirectoryReader':llamaindex_simpleDirectoryReader_metadata,
    'llamaindex_HTMLTagReader':llamaindex_HTML_metadata,
    'langchain_pyPDF':langchain_pyPDF_metadata,
    'langchain_PDFPlumber':langchain_PDFPlumber_metadata,
    'langchain_PyPDFium2':langchain_PyPDFium2_metadata,
    'langchain_PyMUPDF':langchain_PyMUPDF_metadata,
    'langchain_PDFMiner':langchain_PDFMiner_metadata,
    'langchain_Docx2txt':langchain_Docx2txt_metadata,
    'langchain_UnstructuredHTML':langchain_UnstructuredHTML_metadata,
    'langchain_BSHtml':langchain_BSHtml_metadata,
    'llmsherpa_default':llmsherpa_default_metadata,
}

In [12]:
# OCR PARSERS

def _langchain_tesseract_text(loader_cls, file):
    """Load `file` page by page with `loader_cls`, parsing embedded images with Tesseract, and join the pages."""
    loader = loader_cls(
        file,
        mode="page",
        images_inner_format="html-img",
        images_parser=TesseractBlobParser(),
    )
    return '\n'.join(page.page_content for page in loader.load())

def langchain_pyPDF_tesseract(file):
    """Extract text from `file` with PyPDFLoader plus the Tesseract image parser."""
    return _langchain_tesseract_text(PyPDFLoader, file)

def langchain_PyMuPDFLoader_tesseract(file):
    """Extract text from `file` with PyMuPDFLoader plus the Tesseract image parser."""
    return _langchain_tesseract_text(PyMuPDFLoader, file)


# docling: PDF converter whose pipeline runs Tesseract OCR on the full page.
# force_full_page_ocr=True asks for OCR of the whole page rather than relying on
# the PDF's embedded text layer (presumably the point of the OCR-poisoning test — TODO confirm).
ocr_options_tesseract = TesseractOcrOptions(force_full_page_ocr=True)
tesseract_pipeline_options = PdfPipelineOptions(
    do_ocr=True
)

# Attach the Tesseract options to the pipeline before building the converter.
tesseract_pipeline_options.ocr_options = ocr_options_tesseract
docling_tesseract_converter=DocumentConverter(
    format_options={
        InputFormat.PDF: PdfFormatOption(
            pipeline_options=tesseract_pipeline_options
        )
    }
)

def docling_tesseract(file, converter=docling_tesseract_converter):
    """Extract text from `file` with the docling converter configured for Tesseract OCR."""
    return converter.convert(file).document.export_to_text()

# NOTE: OCR cannot be forced for HTML inputs in docling.


In [13]:
# Registry of OCR-enabled text extractors: loader name -> function(file) returning
# the extracted text. Presumably meant to be passed to `test_ocr` — TODO confirm.
ocr_dl_func_mapping={
    'langchain_pyPDF_tesseract':langchain_pyPDF_tesseract,
    'langchain_PyMuPDFLoader_tesseract':langchain_PyMuPDFLoader_tesseract,
    'docling_tesseract': docling_tesseract,
}

In [14]:
# Edge cases: OCR POISONING, FONT POISONING
# OCR -> take the original text as the reference and compare it with the extracted text

# Build paths from the working directory so the notebook works on different machines.
# NOTE: `pwd` is the PARENT of the current working directory, not the directory itself.
cwd = os.getcwd()
pwd = os.path.dirname(cwd)
# Mapping CSV; the code below reads its 'full_file_path' and 'joint_text' columns.
ocr_mapping_df = pd.read_csv(os.path.join(pwd,'ocr_mapping.csv'))

def cer(reference, hypothesis):
    """Character Error Rate (CER): edit distance divided by the reference length.

    The denominator is clamped to at least 1 so an empty reference does not
    divide by zero.
    """
    edits = editdistance.eval(reference, hypothesis)
    reference_length = max(len(reference), 1)
    return edits / reference_length

def normalized_levenshtein(reference, hypothesis):
    """Edit distance divided by the length of the longer input (at least 1)."""
    edits = editdistance.eval(reference, hypothesis)
    return edits / max(len(reference), len(hypothesis), 1)

def test_ocr(df, dl, dl_func, ocr_folder_path, input_format, metrics):
    """Score a loader on every file of an OCR-poisoning folder and append the results to `df`.

    Args:
        df: Results DataFrame to extend.
        dl: Document-loader name; stored in the 'DocumentLoader' column and shown
            in the progress bar.
        dl_func: Callable taking a file path and returning the extracted text.
        ocr_folder_path: Folder named '<attack>_<technique>' holding the files to parse.
        input_format: Label stored in the 'FileFormat' column.
        metrics: One of 'CER', 'levenshtein', 'WER', 'BLEU'.

    Returns:
        A new DataFrame made of `df` followed by one row per file. 'AttackResult'
        holds the metric value, or a "NULL, ..." marker when the file path has no
        row in `ocr_mapping_df`.

    Raises:
        ValueError: If `metrics` is not supported. This is checked up front, before
            any (potentially slow) parsing happens.
    """
    scorers = {
        'CER': cer,
        'levenshtein': normalized_levenshtein,
        'WER': wer,
        'BLEU': lambda reference, hypothesis: sentence_bleu([reference.split()], hypothesis.split()),
    }
    if metrics not in scorers:
        raise ValueError('Invalid metrics specified! Available metrics: CER, levenshtein, WER, BLEU')
    score = scorers[metrics]

    # normpath drops a trailing separator, which would make basename() return ''.
    dir_name = os.path.basename(os.path.normpath(ocr_folder_path))
    # Split on the first underscore only, so a technique name containing '_' still unpacks.
    attack, technique = dir_name.split('_', 1)
    # File paths are built from the path exactly as given, because they are matched
    # verbatim against the 'full_file_path' column of the mapping.
    files = [os.path.join(ocr_folder_path, f) for f in os.listdir(ocr_folder_path)]

    # Index the reference texts once instead of scanning the DataFrame for every file.
    # setdefault keeps the first row of a duplicated path, matching `.values[0]`.
    reference_by_path = {}
    for path, text in zip(ocr_mapping_df['full_file_path'], ocr_mapping_df['joint_text']):
        reference_by_path.setdefault(path, text)

    records_to_add = []
    for file in tqdm(files, desc=f'parsing files with {dl}'):
        extracted_text = dl_func(file)
        if file in reference_by_path:
            test_result = score(reference_by_path[file], extracted_text)
        else:
            test_result = "NULL, FILE MAPPING WITH ORIGINAL TEXT FAILED"

        records_to_add.append({
            'FileFormat': input_format,
            'Filename': os.path.basename(file),
            'AttackFamily': 'Data Obfuscation',
            'AttackName': attack,
            'Technique': technique,
            'DocumentLoader': dl,
            'AttackResult': test_result,
            'Text_extracted': extracted_text
        })

    # ignore_index gives the combined frame a clean 0..n-1 index instead of repeated labels.
    return pd.concat([df, pd.DataFrame.from_records(records_to_add)], ignore_index=True)

In [15]:

# Mapping CSV for font-poisoned files: `test_data_obfuscation` matches each file
# against 'full_file_path' and uses 'joint_text' as the CER reference text.
font_poisoning_mapping_df = pd.read_csv(os.path.join(pwd,'font_poisoning_mapping.csv'))

def test_data_obfuscation(df, dl, dl_func, data_obfuscation_folder, input_format):
    data_obfuscation_folder=os.path.join(data_obfuscation_folder,'Data obfuscation')
    print(f'Data obfuscation for {dl}, {input_format} format')
    for root,dirs,_ in os.walk(data_obfuscation_folder):
            # print(dirs)
            for dir_name in dirs:
                if dir_name=='OCR-poisoning_default':
                    continue
                if dir_name!='Font-poisoning_default':
                    continue
                print(f'    dir_name={dir_name}')
                # Get attack name, technique, and files
                attack, technique= dir_name.split('_')
                dir_path=os.path.abspath(os.path.join(root,dir_name))
                files=[os.path.join(dir_path,f) for f in os.listdir(dir_path)]
                # print(f'    attack: {attack}, technique: {technique}')
                records_to_add=[]
                for file in files:
                    # Get word that has been obfuscated
                    obfuscated_word=os.path.basename(os.path.basename(file)).split('.')[0].split('_')[-1]
                    # Extract text
                    extracted_text=dl_func(file)
                    # Compute text result
                    if dir_name=='Font-poisoning_default':
                        if file not in font_poisoning_mapping_df['full_file_path'].values:
                            test_result="NULL, FILE MAPPING WITH ORIGINAL TEXT FAILED"
                        else:
                            original_text=font_poisoning_mapping_df[font_poisoning_mapping_df['full_file_path']==file]['joint_text'].values[0]
                            test_result=cer(original_text,extracted_text)
                    else:    
                        test_result='Passed' if obfuscated_word not in extracted_text else 'Failed'
                    # Add new record to dataframe
                    records_to_add.append({
                        'FileFormat': input_format,
                        'Filename': os.path.basename(file),
                        'AttackFamily': 'Data Obfuscation',
                        'AttackName': attack,
                        'Technique': technique,
                        'DocumentLoader': dl,
                        'AttackResult': test_result,
                        'Text_extracted': extracted_text
                    })
                # Add records to dataframe
                df=pd.concat([df,pd.DataFrame.from_records(records_to_add)])  
              
    return df

def test_text_injection(df, dl, dl_func, text_injection_folder, input_format):
    text_injection_folder=os.path.join(text_injection_folder,'Poisoned text injection')
    print(f'Text injection for {dl}, {input_format} format')
    for root,dirs,_ in os.walk(text_injection_folder):
        # print(dirs)
        for dir_name in dirs:
            print(f'    dir_name={dir_name}')                
            ## FOR NOW, SKIP METADATA!!!
            if 'Metadata' in dir_name:
                attack, technique= dir_name.split('_')
                dir_path=os.path.abspath(os.path.join(root,dir_name))
                files=[os.path.join(dir_path,f) for f in os.listdir(dir_path)]
                # print(f'    attack: {attack}, technique: {technique}')
                records_to_add=[]
                for file in files:
                    # Get word that has been obfuscated
                    injected_word=os.path.basename(os.path.basename(file)).split('.')[0].split('_')[-1]
                    # Extract text

                    extracted_text=dl_func_mapping_metadata[dl](file)
                    normalized_text=extracted_text.lower()
                    # Generate possible author strings
                    possible_keys = [
                        f'author : {injected_word.lower()}',
                        f'/author : {injected_word.lower()}'
                    ]

                    # Check if any of the keys are in the normalized text
                    if any(key in normalized_text for key in possible_keys):
                        test_result = 'Passed'
                    else:
                        test_result = 'Failed'

                    # Add new record to dataframe
                    records_to_add.append({
                        'FileFormat': input_format,
                        'Filename': os.path.basename(file),
                        'AttackFamily': 'Text injection',
                        'AttackName': attack,
                        'Technique': technique,
                        'DocumentLoader': dl,
                        'AttackResult': test_result,
                        'Text_extracted': extracted_text
                    })

                # Add records to dataframe
                df=pd.concat([df,pd.DataFrame.from_records(records_to_add)])
                continue
            ## ELSE
            # Get attack name, technique, and files
            attack, technique= dir_name.split('_')
            dir_path=os.path.abspath(os.path.join(root,dir_name))
            files=[os.path.join(dir_path,f) for f in os.listdir(dir_path)]
            # print(f'    attack: {attack}, technique: {technique}')
            records_to_add=[]
            for file in files:
                # Get word that has been obfuscated
                injected_word=os.path.basename(os.path.basename(file)).split('.')[0].split('_')[-1]
                # Extract text
                extracted_text=dl_func(file)
                # Compute text result
                test_result='Passed' if injected_word in extracted_text else 'Failed'
                # Add new record to dataframe
                records_to_add.append({
                    'FileFormat': input_format,
                    'Filename': os.path.basename(file),
                    'AttackFamily': 'Text injection',
                    'AttackName': attack,
                    'Technique': technique,
                    'DocumentLoader': dl,
                    'AttackResult': test_result,
                    'Text_extracted': extracted_text
                })
            
            # Add records to dataframe
            df=pd.concat([df,pd.DataFrame.from_records(records_to_add)])   
    return df

def general_testing(df, dl_to_test, dl_func_mapping, dataset_path, df_output_path):
    """Run the data-obfuscation tests for every requested loader and checkpoint the results.

    Args:
        df: Results DataFrame to extend.
        dl_to_test: Mapping of loader name -> collection of formats ('PDF', 'DOCX', 'HTML') to test.
        dl_func_mapping: Mapping of loader name -> text-extraction function.
        dataset_path: Folder containing the 'PDF', 'DOCX' and 'HTML' dataset folders.
        df_output_path: CSV path the results are written to after each loader.

    Returns:
        The extended results DataFrame.
    """
    for dl in tqdm(dl_to_test, desc=f"Testing document loaders"):
        extract = dl_func_mapping[dl]
        requested_formats = dl_to_test[dl]

        # Insertion order (PDF, DOCX, HTML) is the order in which the formats are tested.
        folders = {
            'PDF': os.path.join(dataset_path, 'PDF'),
            'DOCX': os.path.join(dataset_path, 'DOCX'),
            'HTML': os.path.join(dataset_path, 'HTML'),
        }
        print(f"pdf_folder={folders['PDF']}\ndocx_folder={folders['DOCX']}\nhtml_folder={folders['HTML']}")

        # Only the data-obfuscation tests run here; test_text_injection is not invoked.
        for input_format, folder in folders.items():
            if input_format in requested_formats:
                df = test_data_obfuscation(df, dl, extract, folder, input_format)

        # Checkpoint after every loader so a later failure does not lose earlier results.
        df.to_csv(df_output_path, index=False)
    return df

In [16]:

# Input dataset folder and output CSV, both located under `pwd`.
DATASET_PATH=os.path.join(pwd,'data/dataset')
DF_OUTPUT_PATH=os.path.join(pwd,'local_goal_testing.csv')
# General dataset creation / loading
# With is_dataset_created = False the run always starts from an empty results frame;
# set it to True to resume from the CSV written by a previous run.
is_dataset_created = False
# NOTE: despite its name, `dataset_path` is the results CSV path, not DATASET_PATH.
dataset_path = DF_OUTPUT_PATH

if is_dataset_created:
    
    df = pd.read_csv(dataset_path)
else:
    df=pd.DataFrame(columns=['FileFormat','Filename', 'AttackFamily','AttackName','Technique','DocumentLoader','AttackResult','Text_extracted'])

# Document loaders to be tested
# key: document loader name, value: list of input formats to test it on
# NB: every new document loader also needs its function and an entry in the function mapping.
# NOTE: 'langchain_PDFMiner' exists in `dl_func_mapping` but is not listed here, so it is not run.
document_loaders_to_test={
    'docling_default': ['PDF','DOCX','HTML'],
    'docling_pyPDFium': ['PDF'],
    'docling_DOCX': ['DOCX'],
    'docling_HTML': ['HTML'],
    'haystack_pyPDF': ['PDF'],
    'haystack_PDFMiner': ['PDF'],
    'haystack_docx': ['DOCX'],
    'haystack_html': ['HTML'],
    'llamaindex_llamaparse': ['PDF','DOCX','HTML'],
    'llamaindex_simpleDirectoryReader' : ['PDF','DOCX'],
    'llamaindex_HTMLTagReader': ['HTML'],

    # Additional document loaders
    'langchain_pyPDF':['PDF'],
    'langchain_PDFPlumber':['PDF'],
    'langchain_PyPDFium2':['PDF'],
    'langchain_PyMUPDF':['PDF'],
    'langchain_Docx2txt':['DOCX'],
    'langchain_UnstructuredHTML':['HTML'], # done
    'langchain_BSHtml':['HTML'], # done
    'llmsherpa_default':['PDF','DOCX','HTML'],
}

# Run evaluation (writes the results CSV to DF_OUTPUT_PATH after each loader)
df = general_testing(df, document_loaders_to_test, dl_func_mapping, DATASET_PATH, DF_OUTPUT_PATH)
