# Text Preprocessing

### PDF to JSON approach (towards converting the PDFs into an Alpaca-like dataset)

In [None]:
import pdfplumber
import re

def clean_text(text):
    """Normalise extracted text while keeping its line breaks.

    Each run of characters that are neither word characters (letters,
    digits, underscore) nor newlines becomes a single space; afterwards any
    run of non-newline whitespace is squeezed to a single space.
    """
    without_symbols = re.sub(r'[^\w\n]+', ' ', text)
    squeezed = re.sub(r'[^\S\n]+', ' ', without_symbols)
    return squeezed

def pdf_to_txt(pdf_file_path):
    """Extract and clean the text of every page of a PDF.

    Args:
        pdf_file_path: Path of the PDF file to read.

    Returns:
        One string holding the cleaned text of all pages. Each page's text
        is preceded by a single space, so a non-empty result starts with a
        space; a PDF without pages yields an empty string.
    """
    page_chunks = []
    with pdfplumber.open(pdf_file_path) as pdf:
        for page in pdf.pages:
            # extract_text() can yield None for a page without extractable
            # text (e.g. a scanned image); treat that as an empty page
            # instead of letting clean_text fail on a non-string.
            page_text = page.extract_text() or ''
            page_chunks.append(' ' + clean_text(page_text))
    # Join once at the end rather than growing a string page by page.
    return ''.join(page_chunks)

# Sample document used to try out the extractor.
pdf_file_path = "PDFs/Tests/BRIEFING CUD 2023.pdf"
# pdf_to_txt returns the cleaned text of the whole document as one string.
extracted_text = pdf_to_txt(pdf_file_path)
print(extracted_text[:1000])  # Print the first 1000 characters of the extracted text

#### After cleaning the resulting text and adding labels 'Instruction:', 'Input:' and 'Output:'

In [None]:
import re
import json

def extract_info_from_text(file_path, encoding='utf-8'):
    """Parse a labelled text file into instruction/input/output records.

    The file is expected to contain repeated groups of the form
    ``Instruction: ... Input: ... Output: ...``; the text of each part may
    span several lines.

    Args:
        file_path: Path of the labelled text file.
        encoding: Text encoding used to read the file. Defaults to UTF-8 so
            accented characters are decoded the same way on every platform.

    Returns:
        A list of dicts with the keys ``"instruction"``, ``"input"`` and
        ``"output"``, one per group found, in file order. Surrounding
        whitespace is stripped from every value.
    """
    with open(file_path, 'r', encoding=encoding) as file:
        content = file.read()

    # Lazy groups stop at the next label; the output part runs until the
    # next 'Instruction:' or the end of the content. DOTALL lets '.' span
    # line breaks.
    pattern = r'Instruction:(.*?)Input:(.*?)Output:(.*?)(?=Instruction:|$)'

    return [
        {
            "instruction": instruction.strip(),
            "input": input_.strip(),
            "output": output.strip(),
        }
        for instruction, input_, output in re.findall(pattern, content, re.DOTALL)
    ]

# Labelled source text ('Instruction:' / 'Input:' / 'Output:')
text_file_path = "Texts/Tests/1.txt"

# Parse it into a list of instruction/input/output records
info_list = extract_info_from_text(text_file_path)

# Serialise the records to JSON text, then store that text on disk
with open("JSONs/Tests/output.json", 'w') as json_file:
    json_file.write(json.dumps(info_list, indent=4))

In [8]:
import json

# Read the labelled text file line by line
with open('Texts/Tests/4.txt', 'r', encoding='utf-8') as file:
    file_content = file.readlines()

# Labels that open a section, mapped to the record key they fill
section_labels = {
    'Instruction:': 'instruction',
    'Input:': 'input',
    'Output:': 'output',
}

# Completed records, in file order
data_list = []

# Record currently being filled
data_dict = {'instruction': '', 'input': '', 'output': ''}

# Key of the section the following lines belong to ('' until a label is seen)
current_section = ''

for line in file_content:
    line = line.strip()  # Remove any leading/trailing whitespace
    if not line:
        continue

    label = next((lbl for lbl in section_labels if line.startswith(lbl)), None)
    if label is not None:
        # A new 'Instruction:' closes the previous record, provided that
        # record actually has an instruction (avoids storing an empty one
        # at the start of the file).
        if label == 'Instruction:' and data_dict['instruction']:
            data_list.append(data_dict)
            data_dict = {'instruction': '', 'input': '', 'output': ''}

        current_section = section_labels[label]

        # Keep any text that follows the label on the same line.
        line = line[len(label):].strip()
        if not line:
            continue
    elif not current_section:
        # Text before the first label belongs to no section; skip it
        # rather than failing on the empty section key.
        continue

    # Separate successive lines of a section with a newline. The separator
    # goes *before* every line except the first, so no two lines are fused
    # and no trailing newline is left behind.
    if data_dict[current_section]:
        data_dict[current_section] += '\n' + line
    else:
        data_dict[current_section] = line

# Store the last record
if data_dict['instruction']:
    data_list.append(data_dict)

# Convert the list of dictionaries to a JSON string (accents kept as-is)
json_data = json.dumps(data_list, ensure_ascii=False, indent=4)

# Write the JSON data to a file
with open('JSONs/Tests/4.json', 'w', encoding='utf-8') as file:
    file.write(json_data)


In [9]:
# Read back the records stored in the intermediate JSON file
with open('JSONs/Tests/4.json', 'r', encoding='utf-8') as file:
    data = json.load(file)

# Put every field on a single line by turning line breaks into spaces
for dictionary in data:
    dictionary.update(
        {key: value.replace('\n', ' ') for key, value in dictionary.items()}
    )

# Serialise the flattened records and store them under the new name
with open('JSONs/Tests/DB_inicial_3.json', 'w', encoding='utf-8') as file:
    file.write(json.dumps(data, ensure_ascii=False, indent=4))

In [17]:
import os
import json

def merge_json_files(directory, output_file='merged.json'):
    """Concatenate the top-level lists of every ``.json`` file in a folder.

    Args:
        directory: Folder scanned (non-recursively) for files whose name
            ends in ``.json``. Each file must contain a JSON array.
        output_file: Path of the merged file to write. Defaults to
            ``merged.json`` in the current working directory.

    Returns:
        None. When no JSON file is found a message is printed and nothing
        is written.
    """
    output_path = os.path.abspath(output_file)

    # Sort for a reproducible merge order (os.listdir order is arbitrary)
    # and leave out a previous merge result living in the same folder.
    json_files = sorted(
        f for f in os.listdir(directory)
        if f.endswith('.json')
        and os.path.abspath(os.path.join(directory, f)) != output_path
    )

    if not json_files:
        print('No JSON files found in the directory.')
        return

    print(f'Found {len(json_files)} JSON files.')

    all_data = []
    for file_name in json_files:
        print(f'Processing file: {file_name}')
        # Read as UTF-8 to match how the merged file is written below.
        with open(os.path.join(directory, file_name), encoding='utf-8') as file:
            all_data.extend(json.load(file))

    # ensure_ascii=False keeps accented characters readable in the output.
    with open(output_file, 'w', encoding='utf-8') as file:
        json.dump(all_data, file, ensure_ascii=False, indent=4)

    print(f'Merging complete. The merged data is saved in {output_file}')

# Folder holding the per-document JSON files to combine
directory = 'JSONs/Tests'

# Merge them; the result is written by merge_json_files itself
merge_json_files(directory)


Found 4 JSON files.
Processing file: DB_inicial_2.json
Processing file: DB_inicial.json
Processing file: DB_inicial_3.json
Processing file: DB_inicial_1.json
Merging complete. The merged data is saved in merged.json


In [None]:
# Join txts together
def merge_txt_files(directory, output_file):
    """Concatenate every ``.txt`` file of a folder into one text file.

    Args:
        directory: Folder scanned (non-recursively) for files whose name
            ends in ``.txt``.
        output_file: Path of the combined file to write. The content of
            each input file is followed by a line break.

    Returns:
        None. When no text file is found a message is printed and nothing
        is written.
    """
    output_path = os.path.abspath(output_file)

    # Sort for a reproducible order (os.listdir order is arbitrary) and
    # never treat the output file itself as an input: opening it for
    # writing truncates it before it could be read.
    txt_files = sorted(
        f for f in os.listdir(directory)
        if f.endswith('.txt')
        and os.path.abspath(os.path.join(directory, f)) != output_path
    )

    if not txt_files:
        print('No txt files found in the directory.')
        return

    print(f'Found {len(txt_files)} txt files.')

    # Read and write as UTF-8 so accented text survives on every platform.
    with open(output_file, 'w', encoding='utf-8') as outfile:
        for file_name in txt_files:
            print(f'Processing file: {file_name}')
            with open(os.path.join(directory, file_name), encoding='utf-8') as infile:
                outfile.write(infile.read())
                outfile.write('\n')

    print('Merging complete. The merged data is saved in', output_file)

# Folder holding the per-document text files to combine
directory = 'Texts/Nontests'
# Path of the combined file (relative to the working directory)
output_file = 'merged.txt'

# Concatenate all txt files of the folder into output_file
merge_txt_files(directory, output_file)


### PDF to Text approach (raw text training dataset)

In [12]:
import os
import pdfplumber
import re

def clean_text(text):
    """Return `text` with symbol and whitespace runs reduced to single spaces.

    Line breaks are left untouched. Word characters (letters, digits and
    the underscore) are kept; everything else is treated as a separator.
    """
    # Inner call: runs of non-word, non-newline characters -> one space.
    # Outer call: runs of non-newline whitespace -> one space.
    return re.sub(r'[^\S\n]+', ' ', re.sub(r'[^\w\n]+', ' ', text))

def pdf_to_txt(pdf_file_path, txt_file_path, encoding='utf-8'):
    """Extract the text of a PDF, clean it and save it as a text file.

    Args:
        pdf_file_path: Path of the PDF file to read.
        txt_file_path: Path of the text file to write (overwritten if it
            exists).
        encoding: Encoding of the written file. Defaults to UTF-8 so
            accented characters are stored the same way on every platform.

    Returns:
        None. The written text holds every page's cleaned text, each
        preceded by a single space.
    """
    page_chunks = []
    with pdfplumber.open(pdf_file_path) as pdf:
        for page in pdf.pages:
            # extract_text() can yield None for a page without extractable
            # text (e.g. a scanned image); treat that as an empty page
            # instead of letting clean_text fail on a non-string.
            page_text = page.extract_text() or ''
            page_chunks.append(' ' + clean_text(page_text))

    # Join once and write after the PDF has been closed.
    with open(txt_file_path, 'w', encoding=encoding) as txt_file:
        txt_file.write(''.join(page_chunks))

# Directory containing the PDF files
pdf_directory = "PDFs/Nontests"

# Directory to store the text files
txt_directory = "Texts/Nontests"

# Ensure the text directory exists
os.makedirs(txt_directory, exist_ok=True)

# Convert every PDF of the input directory to a text file of the same name
for filename in os.listdir(pdf_directory):
    # Split off the extension so only the real suffix is inspected and
    # replaced; a ".pdf" occurring elsewhere in the name is left alone.
    stem, extension = os.path.splitext(filename)

    # Accept ".pdf" in any letter case (e.g. "REPORT.PDF")
    if extension.lower() != ".pdf":
        continue

    # Construct the full input and output paths
    pdf_file_path = os.path.join(pdf_directory, filename)
    txt_file_path = os.path.join(txt_directory, stem + ".txt")

    # Convert the PDF to text
    pdf_to_txt(pdf_file_path, txt_file_path)

### Discarded approaches

In [None]:
#from pdfreader import SimplePDFViewer

#This function does the work but the result is difficult to tokenize
#def convert_pdf_to_txt(file_path):
    # Open the PDF file in read-binary mode
#    with open(file_path, 'rb') as file:
        # Create a PDF file viewer object
#        viewer = SimplePDFViewer(file)

        # Initialize an empty string to hold the extracted text
#        text = ''

        # Loop through each page in the PDF and extract the text
#       for canvas in viewer:
#            viewer.render()
#            text += ''.join(viewer.canvas.strings)

    # Create the Texts directory if it doesn't exist
#    if not os.path.exists('Texts'):
#        os.makedirs('Texts')

    # Write the extracted text to a .txt file in the Texts directory
#    with open('Texts/output1.txt', 'w') as output_file:
#        output_file.write(text)

# Call the function with the path to your PDF file
#convert_pdf_to_txt("PDFs/WUDC_manual.pdf")

### Resulting text check

In [None]:
# Load the generated text file into `text`, then show it in full
with open("Texts/output2.txt", "r") as file:
    text = file.read()
print(text)

# Fine-tuning using Microsoft's LoRA

git clone https://github.com/microsoft/LoRA.git
cd LoRA
pip install -e .

In [None]:
from sklearn.model_selection import train_test_split

# Load the data: every line of the text file becomes one "sentence"
# NOTE(review): blank lines in the file end up as empty sentences — confirm this is intended
with open('Texts/output2.txt', 'r') as f:
    sentences = f.read().splitlines()

# Split the data into train (80%) and validation (20%) sets; the fixed
# random_state makes the split reproducible
train_sentences, val_sentences = train_test_split(sentences, test_size=0.2, random_state=42)

# Show the size of both splits as the cell output
len(train_sentences), len(val_sentences)

In [None]:
import torch
from transformers import RobertaTokenizerFast

# Initialize the tokenizer from the pretrained 'roberta-base' checkpoint;
# it is used as a module-level global by preprocess() below
tokenizer = RobertaTokenizerFast.from_pretrained('roberta-base')

def preprocess(sentences, max_length=64):
    """Tokenize sentences into fixed-length id and attention-mask tensors.

    Uses the module-level ``tokenizer``. All sentences are encoded in a
    single batched call, which:
      (1) tokenizes each sentence,
      (2) adds the model's special tokens (``<s>`` / ``</s>`` for RoBERTa),
      (3) maps tokens to their ids,
      (4) pads or truncates every sentence to ``max_length`` tokens,
      (5) builds the attention mask (1 for real tokens, 0 for padding).

    Args:
        sentences: Iterable of strings to encode.
        max_length: Number of tokens every encoded sentence is padded or
            truncated to. Defaults to 64.

    Returns:
        A tuple ``(input_ids, attention_masks)`` of PyTorch tensors, each
        with one row per sentence and ``max_length`` columns.
    """
    encoded = tokenizer(
        list(sentences),
        add_special_tokens=True,
        # Explicit padding/truncation replace the deprecated
        # `pad_to_max_length=True`; without truncation, over-long sentences
        # would produce rows of different lengths.
        padding='max_length',
        truncation=True,
        max_length=max_length,
        return_attention_mask=True,
        return_tensors='pt',
    )
    return encoded['input_ids'], encoded['attention_mask']

# Apply the preprocessing to the training and validation sentences;
# each call returns the token-id tensor and its attention-mask tensor
train_input_ids, train_attention_masks = preprocess(train_sentences)
val_input_ids, val_attention_masks = preprocess(val_sentences)

In [None]:
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler

# Wrap the id and mask tensors into datasets; each item is an
# (input_ids, attention_mask) pair — there are no labels in these datasets
train_dataset = TensorDataset(train_input_ids, train_attention_masks)
val_dataset = TensorDataset(val_input_ids, val_attention_masks)

# Create the DataLoaders (batches of 32 items)
train_dataloader = DataLoader(
    train_dataset,
    sampler=RandomSampler(train_dataset), # Random sampler for training
    batch_size=32
)

validation_dataloader = DataLoader(
    val_dataset,
    sampler=SequentialSampler(val_dataset), # Sequential sampler for validation
    batch_size=32
)

In [None]:
from transformers import RobertaConfig, RobertaModel
from loralib import Embedding, Linear

# Load the pretrained RoBERTa model. `from_pretrained` is required here:
# constructing `RobertaModel(config)` directly yields randomly initialised
# weights, not the pretrained ones.
config = RobertaConfig.from_pretrained('roberta-base')
model = RobertaModel.from_pretrained('roberta-base', config=config)

# Replace the embedding layer with a LoRA embedding layer.
# A bare RobertaModel exposes its sub-modules directly (`model.embeddings`,
# `model.pooler`); the `.roberta` prefix only exists on task-specific
# wrappers such as RobertaForSequenceClassification.
pretrained_embedding = model.embeddings.word_embeddings
lora_embedding = Embedding(
    config.vocab_size, config.hidden_size, r=32, lora_alpha=1,
    padding_idx=pretrained_embedding.padding_idx,
)
# Carry the existing weights over; strict=False because the original layer
# has no LoRA matrices (lora_A / lora_B) in its state dict.
lora_embedding.load_state_dict(pretrained_embedding.state_dict(), strict=False)
model.embeddings.word_embeddings = lora_embedding

# Replace the pooler layer with a LoRA linear layer, again keeping the
# weights and bias of the layer it replaces.
pretrained_dense = model.pooler.dense
lora_dense = Linear(
    config.hidden_size, config.hidden_size, r=32, lora_alpha=1
)
lora_dense.load_state_dict(pretrained_dense.state_dict(), strict=False)
model.pooler.dense = lora_dense


In [None]:
import os
from pdf2image import convert_from_path

def convert_pdf_to_images(pdf_file_path, output_folder, base_name):
    """Render every page of a PDF to a JPEG file.

    Args:
        pdf_file_path: Path of the PDF file to render.
        output_folder: Folder receiving the images; created if missing.
        base_name: Prefix of the image file names. Page ``i`` (counted
            from 1) is saved as ``<base_name>_<i>.jpg``.

    Returns:
        None.
    """
    # exist_ok=True creates the folder only when needed, without a separate
    # existence check that could race with another process.
    os.makedirs(output_folder, exist_ok=True)

    # Convert the PDF to a list of images, one per page
    images = convert_from_path(pdf_file_path)

    # Save each image to the output folder
    for page_number, image in enumerate(images, start=1):
        image_path = os.path.join(output_folder, f"{base_name}_{page_number}.jpg")
        image.save(image_path, 'JPEG')

# Render the manual into the "JPEGs" folder as Manual_1.jpg, Manual_2.jpg, ...
convert_pdf_to_images("PDFs/WUDC_manual.pdf", "JPEGs", "Manual")


In [None]:
from transformers import pipeline

# Initialize a text question-answering pipeline with the
# 'deepset/roberta-base-squad2' model
document_answering_model = pipeline(task='question-answering', model='deepset/roberta-base-squad2')

# Ask a question
# `text` is not defined in this cell; it must already exist in the notebook
# session (the "Resulting text check" cell assigns it)
question = "What is the meaning of life?"
answer = document_answering_model(question=question, context=text)

print(answer)

In [None]:
# Ask a second question against the same `text`, reusing the
# `document_answering_model` object created in an earlier cell
question = "What are the type of motions and how do they work?"
answer = document_answering_model(question=question, context=text)

print(answer)

In [None]:
import os
import pytesseract
from transformers import pipeline

# Folder that holds the page images
directory = "JPEGs"

# List all files in the directory (os.listdir gives no ordering guarantee,
# so the pages are not necessarily in page order)
files = os.listdir(directory)

# Filter for .jpg files and create full paths
image_paths = [os.path.join(directory, file) for file in files if file.endswith(".jpg")]

print(image_paths)

In [None]:

# Initialize the document question-answering pipeline with the
# 'impira/layoutlm-document-qa' model (rebinds the name used earlier for
# the text-only pipeline)
document_answering_model = pipeline(task='document-question-answering', model='impira/layoutlm-document-qa')

# Iterate over the images, run OCR, and use the QA model
for image_path in image_paths:
    # OCR the page image into plain text (also overwrites the global `text`)
    text = pytesseract.image_to_string(image_path)
    question = "What is the meaning of life?"
    # NOTE(review): this pipeline presumably works from `image` and may ignore
    # `context`, which would make the OCR step above redundant — confirm
    # against the transformers documentation
    answer = document_answering_model(question=question, context=text, image=image_path)

    # Print the best answer if its score is above 0.75
    # NOTE(review): assumes `answer` is a non-empty list of dicts with a 'score' key — TODO confirm
    if answer[0]['score'] > 0.75:
        print(answer[0])


In [None]:
# Iterate over the images, run OCR, and use the QA model.
# Relies on `image_paths` and `document_answering_model` from earlier cells.
for image_path in image_paths:
    # OCR the page image into plain text (also overwrites the global `text`)
    text = pytesseract.image_to_string(image_path)
    question = "How does the WUDC debate format work?"
    # NOTE(review): this pipeline presumably works from `image` and may ignore
    # `context` — confirm against the transformers documentation
    answer = document_answering_model(question=question, context=text, image=image_path)

    # Print the best answer if its score is above 0.75
    # NOTE(review): assumes `answer` is a non-empty list of dicts with a 'score' key — TODO confirm
    if answer[0]['score'] > 0.75:
        print(answer[0])