In [8]:
## OCR handler for Arabic text
from tessetact_pdf_processor import PDFTextExtractor

In [None]:
# Build the PDF text extractor with dpi=400 and language='ara'
# (presumably the OCR language code for Arabic — confirm against PDFTextExtractor).
pdf_arabic_processor = PDFTextExtractor(dpi = 400, language='ara')

# Run the extractor on the PDF. The second argument is presumably the output
# text path (the same file name is later used as the corrector's input) — verify.
pdf_arabic_processor.process_pdf('short_stories_ar.pdf', 'short_stories_ar.txt')

In [10]:
import os
import textwrap
import time
from llm import GeminiLLM
from typing import List
from dotenv import load_dotenv
import re

class ArabicTextCorrector:
    """
    Corrects Arabic text chunk by chunk using an LLM model.

    The class reads a text file, splits it into chunks, sends each chunk to the
    model with retry logic, stores every chunk in its own file
    (``corrected_chunk_<index>.txt`` with a zero-based index) and can merge the
    chunk files back into a single document.
    """

    # File names of the chunk files written by this class; group 1 is the
    # zero-based chunk index used to order the files when merging.
    _CHUNK_FILE_PATTERN = re.compile(r'corrected_chunk_(\d+)\.txt')

    # The annotation is quoted so that defining the class does not require the
    # GeminiLLM name to be resolvable at definition time.
    def __init__(self, model: "GeminiLLM"):
        """
        Initializes the ArabicTextCorrector instance with a given model.

        Args:
            model (GeminiLLM): The model used for text corrections. Only its
                ``generate_content(prompt)`` method is called by this class.
        """
        self.model = model

    def configure(self):
        """Configures the model for use. (Stub for potential future use)"""
        pass

    @staticmethod
    def _chunk_path(output_folder: str, index: int) -> str:
        """Returns the path of the chunk file for a zero-based chunk index."""
        return os.path.join(output_folder, f'corrected_chunk_{index}.txt')

    @staticmethod
    def _join_chunks(chunks: List[str]) -> str:
        """Joins chunks the way merge_chunks does: each chunk followed by a blank line."""
        return "".join(f"{chunk}\n\n" for chunk in chunks)

    def read_text_from_file(self, file_path: str) -> str:
        """
        Reads a UTF-8 text file and returns its content.

        Args:
            file_path (str): The path to the file to read from.

        Returns:
            str: The content of the file.
        """
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()

    def save_text_to_file(self, text: str, file_path: str):
        """
        Saves a string of text to a UTF-8 file, creating the parent directory if needed.

        Args:
            text (str): The text to save.
            file_path (str): The path to the file to save to. A bare file name
                (no directory part) is written to the current directory.
        """
        directory = os.path.dirname(file_path)
        # os.makedirs('') raises FileNotFoundError, so only create a directory
        # when the path actually contains one.
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(file_path, 'w', encoding='utf-8') as file:
            file.write(text)

    def split_text(self, text: str, chunk_size: int) -> List[str]:
        """
        Splits the text into chunks of at most ``chunk_size`` characters.

        The split is done with ``textwrap.wrap`` and its default options, so
        chunks break at whitespace where possible and newlines and other
        whitespace characters are replaced with spaces (paragraph breaks are
        not preserved).

        Args:
            text (str): The text to be split.
            chunk_size (int): The maximum size of each chunk in characters.

        Returns:
            List[str]: A list of text chunks (empty for empty or blank text).
        """
        return textwrap.wrap(text, chunk_size)

    def generate_correction_prompt(self, text: str, custom_prompt: str = None) -> str:
        """
        Generates the prompt that asks the model to correct the text.

        Args:
            text (str): The text to be corrected.
            custom_prompt (str): An optional instruction to use instead of the
                default Arabic one; the text is appended on a new line.

        Returns:
            str: A prompt that will be passed to the model for text correction.
        """
        if custom_prompt:
            return f"{custom_prompt}\n{text}"
        return f"يرجى إعادة كتابة النص التالي بشكل صحيح دون أي ملاحظات أو توضيحات:\n{text}"

    def correct_text(self, text: str, custom_prompt: str = None, max_retries: int = 3) -> str:
        """
        Corrects the provided text using the model, retrying on failure.

        Args:
            text (str): The text to be corrected.
            custom_prompt (str): An optional custom prompt for the correction.
            max_retries (int): The maximum number of attempts. With a value of
                zero or less the model is not called at all.

        Returns:
            str: The value returned by ``model.generate_content`` (expected to
            be the corrected text), or an Arabic warning notice followed by the
            original text when every attempt failed.

        Raises:
            ValueError: If no model is set on the instance.
        """
        if not self.model:
            raise ValueError("Model is not configured. Please call 'configure' first.")
        prompt = self.generate_correction_prompt(text, custom_prompt)

        for attempt in range(1, max_retries + 1):
            try:
                return self.model.generate_content(prompt)
            except Exception as e:
                # Deliberately broad: a failing chunk must not abort the whole run.
                print(f"Error occurred while generating content (attempt {attempt}/{max_retries}): {e}")

        # Reached when all attempts failed (or none were allowed); always return
        # a string so callers can concatenate and save the result.
        return f"\n\n[تنبيه: لم يتم معالجة هذا الجزء بسبب خطأ بعد {max_retries} محاولات]\n\n{text}"

    def process_file(self, input_file: str, output_file: str, output_folder: str, chunk_size: int = 1000, custom_prompt: str = None):
        """
        Splits the input file into chunks, corrects each chunk and saves the results.

        Every corrected chunk is written to ``output_folder`` as
        ``corrected_chunk_<index>.txt`` (zero-based index), the combined result
        is written to ``output_file``, and a timing line is appended to
        ``runtime.log``.

        Args:
            input_file (str): The path to the input text file.
            output_file (str): The path to the final output file.
            output_folder (str): The folder to save the corrected chunks.
            chunk_size (int): The size of each chunk in characters.
            custom_prompt (str): An optional custom prompt for text correction.
        """
        start_time = time.time()
        os.makedirs(output_folder, exist_ok=True)
        text = self.read_text_from_file(input_file)
        chunks = self.split_text(text, chunk_size)

        corrected_chunks = []
        for i, chunk in enumerate(chunks):
            corrected_chunk = self.correct_text(chunk, custom_prompt)
            corrected_chunks.append(corrected_chunk)
            # Save each chunk immediately so finished work survives a later failure.
            self.save_text_to_file(corrected_chunk, self._chunk_path(output_folder, i))
            print(f"Processed chunk {i + 1} of {len(chunks)}")

        # Write the combined document in the same layout merge_chunks produces.
        self.save_text_to_file(self._join_chunks(corrected_chunks), output_file)

        runtime = time.time() - start_time
        # __file__ is not defined in interactive sessions such as notebooks, so
        # fall back to a placeholder instead of raising NameError after the run.
        script_name = os.path.basename(globals().get("__file__", "<interactive>"))
        log_message = f"{script_name} --> time taken: {runtime:.2f} seconds --> start of the run: {time.ctime(start_time)}\n"
        self.log_runtime(log_message)

    def save_chunks_without_processing(self, input_file: str, chunk_numbers: List[int], output_folder: str, chunk_size: int = 1000):
        """
        Saves specified chunks of the input file without sending them to the model.

        Chunk numbers are 1-based, matching the "Processed chunk N of M"
        messages printed by process_file. Each chunk is written to the same file
        name process_file uses for it (zero-based index, i.e. number - 1), so
        the saved raw chunk lines up with the corrected chunks when merging.

        Args:
            input_file (str): The path to the input text file.
            chunk_numbers (List[int]): A list of 1-based chunk numbers to save.
            output_folder (str): The folder to save the chunks.
            chunk_size (int): The size of each chunk in characters; must match
                the value used for process_file for the numbers to line up.
        """
        os.makedirs(output_folder, exist_ok=True)
        text = self.read_text_from_file(input_file)
        chunks = self.split_text(text, chunk_size)

        for index in chunk_numbers:
            if index < 1 or index > len(chunks):
                print(f"Chunk number {index} is out of range.")
                continue
            # Convert the 1-based chunk number to the zero-based index used
            # both for the list and for the chunk file name.
            position = index - 1
            self.save_text_to_file(chunks[position], self._chunk_path(output_folder, position))
            print(f"Saved chunk {index} of {len(chunks)}")

    def log_runtime(self, log_message: str):
        """
        Appends the runtime information to ``runtime.log`` in the current directory.

        Args:
            log_message (str): The message to be logged.
        """
        with open("runtime.log", 'a', encoding='utf-8') as log_file:
            log_file.write(log_message)

    def merge_chunks(self, input_folder: str, output_file: str):
        """
        Merges all chunk files in the input folder into a single file, sorted by chunk index.

        Only files named exactly ``corrected_chunk_<index>.txt`` are merged;
        any other entry in the folder is skipped. Each chunk is followed by a
        blank line in the output.

        Args:
            input_folder (str): The folder containing chunk files.
            output_file (str): The file to save the merged output.
        """
        chunk_files = []
        for name in os.listdir(input_folder):
            match = self._CHUNK_FILE_PATTERN.fullmatch(name)
            if match is None:
                # Stray files (e.g. OS metadata, editor backups) must not break the merge.
                print(f"Skipping {name}: not a chunk file")
                continue
            chunk_files.append((int(match.group(1)), name))
        # Numeric sort, so chunk 10 comes after chunk 2.
        chunk_files.sort()

        with open(output_file, 'w', encoding='utf-8') as output:
            for _, file in chunk_files:
                with open(os.path.join(input_folder, file), 'r', encoding='utf-8') as input_file:
                    print(f"Merging {file}")
                    output.write(input_file.read())
                    output.write("\n\n")


In [11]:
# Read the Gemini API key from the environment (load_dotenv is called first so
# the variable can be defined there). os.getenv returns None when
# GEMINI_API_KEY is unset.
# NOTE(review): a missing key is not checked here — confirm how
# GeminiLLM.configure behaves when api_key is None.
load_dotenv()  # Load environment variables
gemini_api_key = os.getenv("GEMINI_API_KEY")
# Instantiate and configure GeminiLLM
model = GeminiLLM(model_name='gemini-1.0-pro-latest')
model.configure(api_key=gemini_api_key)
# Wrap the configured model in the corrector; corrector.configure() is
# currently a no-op stub.
corrector = ArabicTextCorrector(model)
corrector.configure()

In [None]:
# Correct the OCR output chunk by chunk (1000 characters per chunk) and store
# the corrected chunks in the "correct_ar_stories" folder. No custom_prompt is
# passed, so the corrector's default Arabic correction prompt is used.
# process_file then records the run time via log_runtime.
corrector.process_file(
    input_file="short_stories_ar.txt", 
    output_file="corrected_short_ar_stories.txt", 
    output_folder="correct_ar_stories",
    chunk_size=1000
    )

In [None]:
# Save specific chunks of the raw text without sending them to the model.
# Chunk numbers are 1-based: save_chunks_without_processing rejects values
# below 1 and reads chunks[number - 1].

chunk_numbers = [10,94,95]

# chunk_size must match the value used when the file was processed so that the
# chunk numbers refer to the same pieces of text.
corrector.save_chunks_without_processing(input_file="taw_hist.txt", chunk_numbers=chunk_numbers, output_folder="correct_hist", chunk_size=1000)

In [None]:
# TODO: merge the files after manually editing the unusable chunks, for both bio and hist.

# The manual fixes are done, so the chunk files can now be merged.
# NOTE(review): only the "correct_bio" folder is merged here; a matching merge
# for the hist chunks is not visible in this notebook — confirm.

corrector.merge_chunks(input_folder="correct_bio", output_file="corrected_bio.txt")

## Chroma interface class and its utility classes

In [None]:
from chroma_text_processing import RecursiveCharacterTextSplitterAdapter, ChromaInterface

In [2]:
# Set up the text splitter (chunk_size=200, chunk_overlap=20) that the Chroma
# interface will use.
text_splitter = RecursiveCharacterTextSplitterAdapter(
    chunk_size=200,
    chunk_overlap=20,
)

# Create the Chroma interface. The two positional arguments are presumably the
# collection name and the database directory — verify against ChromaInterface.
chroma_interface = ChromaInterface(
    "taw_bio",
    "DB/chroma_db",
    text_splitter=text_splitter,
)

In [None]:
# Show the collections known to the interface's underlying client (the value is
# displayed as the notebook cell's output).
chroma_interface.client.list_collections()

In [None]:
# Generate the embeddings for the bio chunks by adding the merged, corrected
# bio file to the collection (presumably split with the configured text
# splitter — confirm in ChromaInterface). One metadata dict is supplied per file path.

chroma_interface.add_documents_from_files(
    file_paths=["corrected_bio.txt"],
    metadatas=[{"source": "corrected_bio.txt"}]
)

In [None]:
# Prompt for the query text interactively rather than hard-coding it.
query_text = input("Please enter your query: ")

# Run the query through the Chroma interface, requesting n_results=5.
results = chroma_interface.query(query_text, n_results=5)

# Display every returned result under a 1-based label.
for i, result in enumerate(results):
    print("Result {}:\n{}\n".format(i + 1, result))
