# In [None]:
import os
import uuid
import tempfile
import asyncio
import subprocess
from io import BytesIO
from pathlib import Path
from loguru import logger
from typing import Dict, Any
from file_extractor.tools.pdf_extractor import PDFExtractor
from file_extractor.tools.word_extractor import WordDocumentExtractor
from concurrent.futures import ProcessPoolExecutor

# Worker-count limit used for process pools, the PDF extractor and ocrmypdf's --jobs.
MAX_PROCESS_WORKERS: int = 4

# Module-level process pool sized by MAX_PROCESS_WORKERS.
process_executor: ProcessPoolExecutor = ProcessPoolExecutor(
    max_workers=MAX_PROCESS_WORKERS,
)

# Placeholder path; main() does not read this module-level name (it takes filePath as an argument).
file_path: str = "pathf"

async def _ocr_pdf(binary_file: bytes, filename: str) -> bytes:
    """Run the ``ocrmypdf`` CLI over *binary_file* and return the OCR'd PDF bytes.

    Raises:
        RuntimeError: If ``ocrmypdf`` exits with a non-zero status.
    """
    with tempfile.TemporaryDirectory(prefix="file_") as temp_dir:
        # temp_dir is already unique per call, so caller-supplied ids are not
        # used as path components (an absolute or ".." id could escape it).
        input_path = os.path.join(temp_dir, filename)
        ocr_output_path = os.path.join(temp_dir, f"ocr_{filename}")

        with open(input_path, "wb") as file:
            file.write(binary_file)

        ocr_command = [
            "ocrmypdf", "--output-type", "pdf", "--jobs", str(MAX_PROCESS_WORKERS),
            "--language", "eng+ind", "-q", "-f", input_path, ocr_output_path,
        ]
        process = await asyncio.create_subprocess_exec(
            *ocr_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        _, stderr = await process.communicate()

        if process.returncode != 0:
            # errors="replace" so undecodable tool output cannot mask the real failure.
            message = stderr.decode(errors="replace")
            logger.error(f"OCR extraction failed: {message}")
            raise RuntimeError(f"OCR extraction failed: {message}")

        logger.info(f"OCR extraction completed successfully. Output saved to {ocr_output_path}")

        # Read before the temporary directory (and the output file) is removed.
        with open(ocr_output_path, "rb") as file:
            return file.read()


async def _extract_pdf(binary_file: bytes, filename: str, executor: ProcessPoolExecutor) -> Dict[str, Any]:
    """Extract a PDF; if no content is found, OCR it once and retry extraction."""
    extractor = PDFExtractor(max_workers=MAX_PROCESS_WORKERS)

    result = await extractor.extract_async(
        file=binary_file, filename=filename, extract_tables=False, executor=executor
    )
    if result["status"]:
        logger.info("Extraction successful.")
        return result

    logger.info("Fall back to OCR")
    ocr_binary = await _ocr_pdf(binary_file, filename)

    logger.info("Retry extraction again.")
    result = await extractor.extract_async(
        file=ocr_binary, filename=filename, extract_tables=False, executor=executor
    )
    if result["status"]:
        logger.info("Extraction successful.")
        return result

    logger.error("No content found after being ocrd.")
    return {"error": "No content found"}


async def _extract_word(binary_file: bytes, filename: str, executor: ProcessPoolExecutor) -> Dict[str, Any]:
    """Extract a non-PDF document with ``WordDocumentExtractor``."""
    extractor = WordDocumentExtractor(infer_table_structure=True)
    result = await extractor.extract_async(
        file=BytesIO(binary_file), filename=filename, executor=executor
    )
    if result["status"]:
        logger.info("Extraction successful.")
        return result

    logger.error("No content found")
    return {"error": "NO content found"}


async def main(filePath: str, userId: str, chatId: str) -> Dict[str, Any]:
    """Extract the content of the document at *filePath*.

    Files whose name ends in ``.pdf`` (case-insensitive) go through
    ``PDFExtractor``. When that reports no content, the file is OCR'd with the
    ``ocrmypdf`` CLI and extraction is retried once. Every other file goes
    through ``WordDocumentExtractor``.

    Args:
        filePath: Path of the file to read.
        userId: Id of the requesting user; used only for logging.
        chatId: Id of the requesting chat; used only for logging.

    Returns:
        The extractor's result dict when its ``"status"`` is truthy, otherwise
        a dict of the form ``{"error": <message>}``.

    Raises:
        RuntimeError: If ``ocrmypdf`` exits with a non-zero status.
        Exceptions from reading the file, from the extractors, or from launching
        ``ocrmypdf`` propagate unchanged.
    """
    logger.info(f"Processing request from ({userId},{chatId}).")

    with open(filePath, "rb") as f:
        binary_file = f.read()

    filename = Path(filePath).name

    # Use a pool owned by this call. Shutting down the shared module-level
    # executor here (as before) made every later call fail with "cannot
    # schedule new futures after shutdown". The with-block shuts the pool
    # down exactly once on every exit path, including exceptions.
    with ProcessPoolExecutor(max_workers=MAX_PROCESS_WORKERS) as executor:
        # Match the extension, not just a trailing "pdf" (e.g. "reportpdf").
        if filename.lower().endswith(".pdf"):
            return await _extract_pdf(binary_file, filename, executor)
        return await _extract_word(binary_file, filename, executor)