In [6]:
# @title Colab File Converter
# This script scans a directory for file types, identifies the most common type,
# and offers to convert all files of that type to a different format (e.g., .docx to .pdf).
# It is designed to run in a Google Colab environment, minimizing external dependencies
# and providing "heartbeat" status updates to the user.
# -*- coding: utf-8 -*-

import os
import sys
import subprocess
import glob
from collections import Counter
import shutil
import tempfile

# --- Configuration ---

# Defines available conversion options based on the source extension.
# Format: '.source_ext': [('Display Name', 'output_ext'), ...]
# Each target is a (display name, output extension without the dot) pair.
# Naming them once keeps the labels identical across every source type.
_PDF = ('PDF', 'pdf')
_DOCX = ('Word Document', 'docx')
_MARKDOWN = ('Markdown', 'md')
_PLAIN_TEXT = ('Plain Text', 'txt')
_HTML = ('HTML', 'html')
_RTF = ('Rich Text Format', 'rtf')

# Maps a source extension (lower-case, with the leading dot) to the ordered
# list of targets offered to the user. The list order is the menu order.
# Every source gets its own list object because entries may be removed from
# this map at runtime (e.g. '.pdf' when pdftotext is unavailable).
CONVERSION_MAP = {
    '.docx': [_PDF, _MARKDOWN, _PLAIN_TEXT, _HTML, _RTF],
    '.md': [_PDF, _DOCX, _HTML, _RTF],
    '.html': [_PDF, _DOCX, _MARKDOWN],
    '.txt': [_PDF, _DOCX, _MARKDOWN],
    '.rtf': [_PDF, _DOCX, _MARKDOWN],
    # PDF is also supported as a source type.
    '.pdf': [_DOCX, _MARKDOWN, _PLAIN_TEXT, _HTML],
    # Add more conversion types here as needed
}

# --- Dependency Management ---

def _first_line(text, default="version unknown"):
    """Return the first line of *text*, or *default* when *text* has no lines."""
    lines = text.splitlines() if text else []
    return lines[0] if lines else default


def _pdftotext_version():
    """
    Run 'pdftotext -v' and return the first line of its version banner.

    Propagates the exception from subprocess.run (check=True) when the binary
    is missing, not executable, or exits with a non-zero status.
    """
    result = subprocess.run(['pdftotext', '-v'], capture_output=True, text=True, check=True, encoding='utf-8')
    # pdftotext prints version info to stderr
    return _first_line(result.stderr)


def install_and_check_deps():
    """
    Checks for 'pypandoc' and 'pandoc'. Installs 'pypandoc' if missing.
    Installs 'poppler-utils' (for pdftotext) for PDF source conversion.

    Side effects:
      - Binds the module-level name 'pypandoc' to the imported module.
      - May run 'pip install pypandoc' and 'apt-get install poppler-utils'.
      - Removes '.pdf' from CONVERSION_MAP when pdftotext cannot be made
        available, so PDF is not offered as a source type.

    Returns True if the required dependencies (pypandoc + pandoc) are met,
    False otherwise. A missing pdftotext is not fatal.
    """
    global pypandoc
    print("Heartbeat: Initializing dependency check...")

    # 1. Check for 'pypandoc' Python library
    print("Heartbeat: Checking for 'pypandoc' library...")
    try:
        import pypandoc
        print("Heartbeat: 'pypandoc' is already installed.")
    except ImportError:
        print("Heartbeat: 'pypandoc' not found. Attempting installation (this may take a moment)...")
        try:
            # Use subprocess to run pip quietly
            subprocess.check_call([sys.executable, "-m", "pip", "install", "pypandoc", "--quiet"])
            print("Heartbeat: 'pypandoc' installed successfully.")
            import pypandoc
        except Exception as e:
            print("Heartbeat: [FATAL ERROR] Failed to install 'pypandoc'.")
            print(f"Heartbeat: [DETAIL] Error: {e}")
            print("Heartbeat: Cannot proceed without 'pypandoc'. Aborting.")
            return False

    # 2. Check for 'pandoc' system binary (which pypandoc wraps)
    print("Heartbeat: Checking for 'pandoc' executable...")
    try:
        # pypandoc can find pandoc, but we double-check
        result = subprocess.run(['pandoc', '--version'], capture_output=True, text=True, check=True, encoding='utf-8')
        # Print just the first line of the version info; an empty banner must
        # not crash the check, so fall back to a placeholder.
        print(f"Heartbeat: 'pandoc' executable found. Version: {_first_line(result.stdout)}")
    except (subprocess.CalledProcessError, OSError, UnicodeDecodeError) as e:
        # OSError covers a missing binary as well as one that is present but
        # cannot be executed.
        print("Heartbeat: [FATAL ERROR] 'pandoc' executable not found or failed to run.")
        print(f"Heartbeat: [DETAIL] Error: {e}")
        print("Heartbeat: 'pypandoc' requires the 'pandoc' binary to be installed on the system.")
        print("Heartbeat: On Colab, this should be pre-installed. If this error persists, try 'Factory reset runtime'.")
        print("Heartbeat: Aborting.")
        return False

    # 3. Install and check for 'pdftotext' (for PDF-to-text conversion)
    print("Heartbeat: Checking for 'pdftotext' (for PDF source conversion)...")
    try:
        version_line = _pdftotext_version()
        print(f"Heartbeat: 'pdftotext' is already installed. Version: {version_line}")
    except (subprocess.CalledProcessError, OSError, UnicodeDecodeError):
        print("Heartbeat: [INFO] 'pdftotext' (from poppler-utils) not found.")
        print("Heartbeat: [INFO] Attempting to install 'poppler-utils' via apt-get. This may take a moment...")

        try:
            # Update apt-get quietly
            print("Heartbeat: Running apt-get update (quietly)...")
            subprocess.run(['apt-get', 'update', '-qq'], check=True, capture_output=True)
            # Install poppler-utils quietly
            print("Heartbeat: Installing poppler-utils (quietly)...")
            subprocess.run(['apt-get', 'install', '-y', 'poppler-utils', '-qq'], check=True, capture_output=True)
            print("Heartbeat: 'poppler-utils' installed successfully.")

            # Re-check for pdftotext
            version_line = _pdftotext_version()
            print(f"Heartbeat: 'pdftotext' is now available. Version: {version_line}")

        except Exception as install_e:
            print("Heartbeat: [FATAL ERROR] Failed to install 'poppler-utils'.")
            print(f"Heartbeat: [DETAIL] Error: {install_e}")
            print("Heartbeat: [WARNING] Conversion *from* PDF files will fail.")
            # Dynamically remove PDF as a convertible source if pdftotext is missing.
            # We don't abort, as other conversions might still work.
            if '.pdf' in CONVERSION_MAP:
                print("Heartbeat: Disabling PDF as a source file type for this session.")
                CONVERSION_MAP.pop('.pdf', None)

    print("Heartbeat: Dependency check complete.")
    return True

# --- Core Logic ---

def get_directory_path():
    """
    Prompts the user to enter a directory path, use PWD, or browse.

    Accepted input at the main prompt:
      - empty line: use the present working directory
      - '#': show instructions for copying a path from the Colab file panel,
        then read the pasted path
      - 'q' (any case): quit
      - anything else: treated as a path; surrounding quotes are removed and
        a leading '~' is expanded to the user's home directory

    The prompt repeats until an existing directory is given. A closed input
    stream (EOF) is treated like 'q'.

    Returns the validated absolute path as a string, or None if user quits.
    """
    print("\n--- Directory Selection ---")
    try:
        while True:
            print("Heartbeat: Please specify the directory containing your files.")
            prompt = "Enter path (or press Enter for PWD, '#' to browse, 'q' to quit): "
            choice = input(prompt).strip()

            if choice.lower() == 'q':
                print("Heartbeat: Quit command received. Exiting.")
                return None

            if choice == '':
                # User pressed Enter for PWD
                path = '.'
                print("Heartbeat: Using Present Working Directory (PWD).")
            else:
                if choice == '#':
                    # User wants to "browse"
                    print("\nHeartbeat: Browse for directory requested.")
                    print("Please use the file explorer panel on the left side of your Colab window:")
                    print("  1. Navigate to your desired directory.")
                    print("  2. Right-click on the directory name.")
                    print("  3. Select 'Copy path' from the menu.")
                    print("  4. Paste the copied path below and press Enter.")
                    raw_path = input("Paste path here: ")
                else:
                    # User entered a path directly
                    raw_path = choice

                # Remove potential quotes from a pasted path
                path = raw_path.strip().strip("'\"")
                if not path:
                    # abspath('') is the current directory; without this guard
                    # an empty paste would silently select PWD.
                    print("Heartbeat: [ERROR] No path was entered.")
                    print("Heartbeat: Please try again.")
                    continue

            # Validate the chosen path
            abs_path = os.path.abspath(os.path.expanduser(path))
            print(f"Heartbeat: Checking path: {abs_path}")
            if os.path.isdir(abs_path):
                print("Heartbeat: Path confirmed.")
                return abs_path

            print(f"Heartbeat: [ERROR] Path not found or is not a directory: {abs_path}")
            print("Heartbeat: Please try again.")
    except EOFError:
        # No more input can arrive, so re-prompting would never succeed.
        print("Heartbeat: Input stream closed. Exiting.")
        return None

def scan_directory(src_dir):
    """
    Scans the directory for all files and identifies all convertible extensions.

    Only direct children of src_dir whose name matches '*.*' are considered;
    sub-directories are not descended into. Extensions are lower-cased before
    counting, and an extension is convertible when it is a key of
    CONVERSION_MAP.

    Returns (extension_counts, all_file_paths, list_of_convertible_extensions)
    or (None, None, None) if the directory has no matching files, no
    convertible files are found, or the scan raises.
    """
    print("\n--- Scanning Directory ---")
    print(f"Heartbeat: Scanning for files in: {src_dir}")

    try:
        # Escape the directory part so names containing glob metacharacters
        # ('[', ']', '*', '?') are matched literally instead of as a pattern.
        pattern = os.path.join(glob.escape(src_dir), '*.*')
        all_files = [f for f in glob.glob(pattern) if os.path.isfile(f)]

        if not all_files:
            print("Heartbeat: [INFO] No files found in this directory.")
            return None, None, None

        # Count files per (lower-cased) extension
        ext_counts = Counter(os.path.splitext(f)[1].lower() for f in all_files)

        print("Heartbeat: File scan complete. Found types:")
        for ext, count in ext_counts.most_common():
            print(f"  - {ext}: {count} file(s)")

        # Keep only the extensions that we know how to convert
        convertible_exts = [ext for ext in ext_counts if ext in CONVERSION_MAP]

        if not convertible_exts:
            print("Heartbeat: [INFO] No files found with a known convertible type.")
            print(f"Heartbeat: Known types are: {', '.join(CONVERSION_MAP)}")
            return None, None, None

        print(f"Heartbeat: Found convertible types: {', '.join(convertible_exts)}")
        return ext_counts, all_files, convertible_exts

    except Exception as e:
        # Best-effort boundary: report the problem and let the caller exit cleanly.
        print("Heartbeat: [ERROR] Failed to scan directory.")
        print(f"Heartbeat: [DETAIL] Error: {e}")
        return None, None, None

def get_source_extension(convertible_exts, ext_counts):
    """
    Shows the user the available source file types and asks them to pick one.

    convertible_exts: iterable of extensions (e.g. '.docx') to offer; they are
        listed in sorted order.
    ext_counts: mapping of extension -> number of files, used only for display.

    The prompt repeats until a valid menu number is entered. Leading/trailing
    whitespace is ignored, 'q' (any case) quits, and a closed input stream
    (EOF) is treated like 'q'.

    Returns the chosen source extension (e.g., '.docx') or None.
    """
    print("\n--- Source File Selection ---")
    print("Heartbeat: Found the following convertible file types:")

    options = sorted(convertible_exts)  # Sort for consistent order
    for i, ext in enumerate(options, 1):
        count = ext_counts.get(ext, 0)
        print(f"  {i}. {ext} ({count} file(s))")

    prompt = f"Which file type do you want to convert FROM? (1-{len(options)}) or 'q' to quit: "
    while True:
        try:
            choice = input(prompt).strip()
        except EOFError:
            # No more input can arrive, so re-prompting would never succeed.
            print("Heartbeat: Input stream closed. Aborting.")
            return None

        if choice.lower() == 'q':
            print("Heartbeat: Quit command received. Aborting.")
            return None

        try:
            choice_idx = int(choice) - 1
        except ValueError:
            print("Heartbeat: [ERROR] Invalid input. Please enter a number.")
            continue

        if 0 <= choice_idx < len(options):
            chosen_ext = options[choice_idx]
            print(f"Heartbeat: User selected {chosen_ext} as the source.")
            return chosen_ext

        print(f"Heartbeat: [ERROR] Invalid choice. Please enter a number from 1 to {len(options)}.")

def get_conversion_choice(source_ext):
    """
    Shows the user conversion options for their file type and gets their choice.

    source_ext: source extension with the leading dot (e.g. '.docx'); the
        offered targets are CONVERSION_MAP[source_ext], in that order.

    The prompt repeats until a valid menu number is entered. Leading/trailing
    whitespace is ignored, 'q' (any case) quits, and a closed input stream
    (EOF) is treated like 'q'.

    Returns the chosen output extension without the dot (e.g., 'pdf'), or None
    when the user quits or source_ext has no entry in CONVERSION_MAP.
    """
    print("\n--- Conversion Options ---")
    print(f"Heartbeat: Converting from {source_ext}. Choose a target format:")

    options = CONVERSION_MAP.get(source_ext)
    if not options:
        print(f"Heartbeat: [ERROR] No conversion map defined for {source_ext}. Aborting.")
        return None

    for i, (name, ext) in enumerate(options, 1):
        print(f"  {i}. {name} (.{ext})")

    prompt = f"Enter number (1-{len(options)}) or 'q' to quit: "
    while True:
        try:
            choice = input(prompt).strip()
        except EOFError:
            # No more input can arrive, so re-prompting would never succeed.
            print("Heartbeat: Input stream closed. Aborting.")
            return None

        if choice.lower() == 'q':
            print("Heartbeat: Quit command received. Aborting.")
            return None

        try:
            choice_idx = int(choice) - 1
        except ValueError:
            print("Heartbeat: [ERROR] Invalid input. Please enter a number.")
            continue

        if 0 <= choice_idx < len(options):
            chosen_ext = options[choice_idx][1]
            print(f"Heartbeat: User selected conversion to .{chosen_ext}")
            return chosen_ext

        print(f"Heartbeat: [ERROR] Invalid choice. Please enter a number from 1 to {len(options)}.")

def convert_files(file_list, source_ext, new_ext, src_dir):
    """
    Converts all files in file_list to the new_ext format.

    file_list: paths of the files to convert.
    source_ext: source extension with the leading dot (e.g. '.docx').
    new_ext: target extension without the dot (e.g. 'pdf').
    src_dir: directory the files originally came from; the output directory
        'converted_from_<source>_to_<target>' is created in its parent, i.e.
        next to src_dir rather than inside it.

    PDF sources are first turned into text with the 'pdftotext' command; every
    other conversion goes through the module-level 'pypandoc' object, which
    must already be imported. A failure on one file is reported and counted,
    and the remaining files are still processed.

    Returns None. Progress and a final summary are printed.
    """
    total = len(file_list)
    print("\n--- File Conversion ---")
    print(f"Heartbeat: Preparing to convert {total} file(s) from {source_ext} to .{new_ext}")

    # Create the output directory
    try:
        # Place the new folder adjacent to the source directory
        parent_dir = os.path.dirname(src_dir)
        dir_name = f"converted_from_{source_ext.lstrip('.')}_to_{new_ext}"
        output_dir = os.path.join(parent_dir, dir_name)

        os.makedirs(output_dir, exist_ok=True)
        print(f"Heartbeat: Output directory created: {output_dir}")
    except Exception as e:
        print("Heartbeat: [FATAL ERROR] Could not create output directory.")
        print(f"Heartbeat: [DETAIL] Error: {e}")
        print("Heartbeat: Aborting conversion.")
        return

    # pandoc format names are not always file extensions: there is no 'txt'
    # format, the plain-text writer is called 'plain'.
    target_format = 'plain' if new_ext == 'txt' else new_ext
    # When no input format is given, the reader name is taken from the file
    # extension, and 'txt' is not a pandoc reader. Plain text is therefore read
    # as markdown, which is what the pandoc CLI does for unknown extensions.
    text_reader = 'markdown'
    # NOTE(review): pandoc's PDF output needs a PDF engine (LaTeX by default);
    # its presence is not checked here - confirm it is installed on the runtime.

    success_count = 0
    fail_count = 0

    for i, file_path in enumerate(file_list, 1):
        filename = os.path.basename(file_path)
        base_name = os.path.splitext(filename)[0]
        output_file = os.path.join(output_dir, f"{base_name}.{new_ext}")
        temp_txt_file = None  # set only while a PDF intermediate file exists

        print(f"Heartbeat: ({i}/{total}) Converting '{filename}'...")

        try:
            if source_ext == '.pdf' and new_ext == 'txt':
                # PDF -> TXT needs pdftotext only. '-' as the output name makes
                # pdftotext write to stdout, which is captured and saved.
                result = subprocess.run(['pdftotext', file_path, '-'],
                                        check=True, capture_output=True, text=True, encoding='utf-8')
                with open(output_file, 'w', encoding='utf-8') as f:
                    f.write(result.stdout)
            elif source_ext == '.pdf':
                # Two steps: pdftotext (PDF -> TXT), then pandoc (TXT -> new_ext)
                temp_txt_file = os.path.join(output_dir, f"__temp_{base_name}.txt")
                subprocess.run(['pdftotext', file_path, temp_txt_file],
                               check=True, capture_output=True, text=True, encoding='utf-8')
                pypandoc.convert_file(temp_txt_file, target_format,
                                      format=text_reader, outputfile=output_file)
            else:
                # Standard pandoc conversion. The input format is inferred from
                # the extension except for '.txt' (see text_reader above).
                reader = text_reader if source_ext == '.txt' else None
                pypandoc.convert_file(file_path, target_format,
                                      format=reader, outputfile=output_file)

            print(f"Heartbeat: ({i}/{total}) SUCCESS -> {output_file}")
            success_count += 1
        except Exception as e:
            # Deliberately broad: one bad file must not stop the batch.
            print(f"Heartbeat: ({i}/{total}) FAILED for '{filename}'")
            print(f"Heartbeat: [DETAIL] Error: {e}")
            fail_count += 1
            # Clean up partial files on failure
            if os.path.exists(output_file):
                try:
                    os.remove(output_file)
                except OSError as cleanup_e:
                    print(f"Heartbeat: [WARN] Failed to clean up partial file: {cleanup_e}")
        finally:
            # The intermediate text file is removed on success and on failure.
            if temp_txt_file and os.path.exists(temp_txt_file):
                try:
                    os.remove(temp_txt_file)
                except OSError as cleanup_e:
                    print(f"Heartbeat: [WARN] Failed to clean up temp file: {cleanup_e}")

    print("\n--- Conversion Complete ---")
    print("Heartbeat: Summary:")
    print(f"  - Successfully converted: {success_count}")
    print(f"  - Failed to convert:   {fail_count}")
    print(f"Heartbeat: All converted files are located in: {output_dir}")

# --- Main Execution ---

def main():
    """
    Run the interactive workflow from dependency check to conversion.

    Steps: verify dependencies, ask for a directory, scan it, let the user
    pick a source type and a target format, then convert. When the directory
    lives under '/content/drive', the selected files are first copied to a
    temporary local directory, which is removed again at the end. Each step
    that reports failure or a quit request ends the run early.
    """
    banner = "=" * 50
    print(banner)
    print("      Welcome to the Colab File Converter")
    print(banner)

    # 1. Dependencies
    if not install_and_check_deps():
        return

    # 2. Directory
    src_dir = get_directory_path()
    if not src_dir:
        return

    # 3. Scan
    ext_counts, all_files, convertible_exts = scan_directory(src_dir)
    if not convertible_exts:
        print("Heartbeat: No convertible files found. Exiting.")
        return

    # 4. Source type
    source_ext = get_source_extension(convertible_exts, ext_counts)
    if not source_ext:
        return

    # 5. Files of the chosen source type
    file_list = []
    for candidate in all_files:
        if os.path.splitext(candidate)[1].lower() == source_ext:
            file_list.append(candidate)

    # 6. Target format
    new_ext = get_conversion_choice(source_ext)
    if not new_ext:
        return

    # Files on Google Drive are cached locally first to prevent Drive I/O errors.
    temp_src_dir = None
    cached_paths = []
    try:
        on_drive = src_dir.startswith('/content/drive')
        if not on_drive:
            # Already local: convert the originals directly.
            print("\nHeartbeat: Using local path. No caching needed.")
            files_to_convert = file_list
        else:
            print(f"\nHeartbeat: Detected Google Drive path. Caching {len(file_list)} files locally for stability...")
            temp_src_dir = tempfile.mkdtemp(prefix="converter_cache_")
            print(f"Heartbeat: Local cache created at: {temp_src_dir}")

            for file_path in file_list:
                filename = os.path.basename(file_path)
                local_path = os.path.join(temp_src_dir, filename)
                try:
                    shutil.copy2(file_path, local_path)
                except Exception as copy_e:
                    # A file that cannot be copied is skipped, not fatal.
                    print(f"Heartbeat: [ERROR] Failed to cache file: {filename}")
                    print(f"Heartbeat: [DETAIL] Error: {copy_e}")
                    print("Heartbeat: Skipping this file.")
                else:
                    cached_paths.append(local_path)

            print(f"Heartbeat: Successfully cached {len(cached_paths)} files.")
            files_to_convert = cached_paths

        # 7. Convert (src_dir, not the cache, decides where output is placed)
        if not files_to_convert:
            print("Heartbeat: No files were successfully cached. Nothing to convert.")
        else:
            convert_files(files_to_convert, source_ext, new_ext, src_dir)

    except Exception as e:
        print("Heartbeat: [FATAL ERROR] An unexpected error occurred during caching or conversion.")
        print(f"Heartbeat: [DETAIL] Error: {e}")

    finally:
        # 8. Remove the local cache, if one was created
        if temp_src_dir and os.path.exists(temp_src_dir):
            try:
                shutil.rmtree(temp_src_dir)
                print(f"Heartbeat: Cleaned up local cache: {temp_src_dir}")
            except Exception as e:
                print(f"Heartbeat: [WARN] Failed to clean up temporary directory: {temp_src_dir}")
                print(f"Heartbeat: [DETAIL] Error: {e}")

    print("\n" + banner)
    print("      File conversion process finished.")
    print(banner)

# 'pypandoc' is imported lazily by install_and_check_deps(), which rebinds this
# module-level name. The placeholder is defined unconditionally so the name
# exists for linters and also when this file is imported rather than run as a
# script.
pypandoc = None

if __name__ == "__main__":
    main()
