<a href="https://colab.research.google.com/github/sbkapelner/multipageOCR/blob/master/easyocr.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install EasyOCR (Python package) and ImageMagick, whose `convert` tool is used
# in the next cell to turn .tif files into .png files.
!pip install easyocr
!apt-get update
!apt-get install -y imagemagick

In [None]:
# Convert every .tif in the current directory to a .png with the same base name.
# NOTE(review): for multi-page TIFFs ImageMagick presumably writes one numbered
# PNG per page instead of a single file — confirm against the actual output.
!for file in *.tif; do convert "$file" "${file%.tif}.png"; done


In [None]:
import os
import easyocr
from PIL import Image

def preprocess_bw_to_rgb(image_path, output_path):
    """
    Save an RGB copy of an image as a PNG file.

    The image at ``image_path`` is opened, converted to the "RGB" mode and
    written to ``output_path`` in PNG format.

    :param image_path: Path of the image to read.
    :param output_path: Path the RGB PNG is written to.
    :return: ``output_path`` on success, ``None`` if anything went wrong
             (the failure is printed, not raised).
    """
    try:
        with Image.open(image_path) as source_image:
            rgb_image = source_image.convert("RGB")
            rgb_image.save(output_path, format="PNG")
            print(f"Converted to RGB: {image_path} -> {output_path}")
    except Exception as error:
        # Best effort: report the problem and let the caller skip this image.
        print(f"Failed to preprocess image: {image_path}, Error: {error}")
        return None
    else:
        return output_path

def _bbox_to_builtin_numbers(bbox):
    """Return ``bbox`` as nested lists whose coordinates are built-in Python numbers."""
    # NumPy scalars expose .item(); plain ints/floats pass through untouched.
    return [
        [coord.item() if hasattr(coord, "item") else coord for coord in point]
        for point in bbox
    ]


def process_bw_images_with_easyocr(input_dir, output_dir, log_file, use_gpu=True):
    """
    OCR every .png image in ``input_dir`` with EasyOCR and write one .txt result per image.

    For each image an RGB copy named ``preprocessed_<filename>`` is written to
    ``output_dir`` (via ``preprocess_bw_to_rgb``), OCR is run on that copy, and the
    detections are saved to ``<output_dir>/<image stem>.txt`` as blocks of
    ``Text`` / ``BoundingBox`` / ``Confidence`` lines separated by a blank line.
    A failure on one image is printed, appended to ``log_file`` and does not stop
    the remaining images.

    :param input_dir: Directory containing the .png images to process.
    :param output_dir: Directory for the preprocessed copies and .txt results
                       (created if missing).
    :param log_file: Path of the error log; truncated at start if it already exists.
    :param use_gpu: Passed to ``easyocr.Reader`` as ``gpu``.
    """
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # Clear the log file if it already exists
    if os.path.exists(log_file):
        with open(log_file, 'w'):
            pass

    # Initialize EasyOCR Reader
    reader = easyocr.Reader(['en'], gpu=use_gpu)  # Use GPU if available

    # When input and output are the same folder, a previous run has left
    # "preprocessed_*.png" copies next to the sources; skip them so a rerun does
    # not OCR its own output (and create "preprocessed_preprocessed_*" files).
    preprocessed_prefix = "preprocessed_"
    shared_dir = os.path.abspath(input_dir) == os.path.abspath(output_dir)
    # Sorted so progress output and processing order are reproducible.
    png_files = sorted(
        name for name in os.listdir(input_dir)
        if name.endswith(".png")
        and not (shared_dir and name.startswith(preprocessed_prefix))
    )
    total_files = len(png_files)

    for index, filename in enumerate(png_files, start=1):
        try:
            input_path = os.path.join(input_dir, filename)
            preprocessed_path = os.path.join(output_dir, f"{preprocessed_prefix}{filename}")

            print(f"[{index}/{total_files}] Processing: {filename}...")

            # Convert black-and-white to RGB
            preprocessed_path = preprocess_bw_to_rgb(input_path, preprocessed_path)
            if preprocessed_path is None:
                print(f"[{index}/{total_files}] Skipping due to preprocessing error: {filename}")
                continue

            # Perform OCR on the preprocessed image
            results = reader.readtext(preprocessed_path)

            # Save results to a text file. UTF-8 is explicit so recognised text
            # never depends on the platform's default encoding.
            output_path = os.path.join(output_dir, f"{os.path.splitext(filename)[0]}.txt")
            with open(output_path, 'w', encoding="utf-8") as output_file:
                for bbox, text, confidence in results:
                    # The box is written through the list repr. NumPy >= 2 renders
                    # scalars inside a list as e.g. "np.int32(5)", which a reader
                    # expecting a plain literal cannot parse, so normalise first.
                    plain_bbox = _bbox_to_builtin_numbers(bbox)
                    output_file.write(
                        f"Text: {text}\nBoundingBox: {plain_bbox}\nConfidence: {confidence}\n\n"
                    )

            print(f"[{index}/{total_files}] Processed: {filename}, results saved to {output_path}")

        except Exception as e:
            # Per-image boundary: log the error and continue with the next file.
            error_message = f"Error processing {filename}: {str(e)}"
            print(error_message)
            with open(log_file, 'a', encoding="utf-8") as log:
                log.write(error_message + "\n")

if __name__ == "__main__":
    # Define input and output directories and log file
    # NOTE: input and output point at the same folder, so the "preprocessed_*.png"
    # copies and the .txt results are written next to the source PNGs.
    input_dir = "/content/"  # Update with your PNG file directory
    output_dir = "/content/"  # Update with desired output directory
    log_file = os.path.join(output_dir, "processing_errors.log")

    # Run the processing function
    process_bw_images_with_easyocr(input_dir, output_dir, log_file, use_gpu=True)


In [None]:
# Delete duplicate images (only .png files are hashed and removed; .txt files are left in place)
import os
import hashlib

def find_duplicates(directory):
    """
    Find and delete byte-identical duplicate PNG files under ``directory``.

    The tree is walked recursively and every ``.png`` file is hashed with
    ``compute_hash``. Directories and files are visited in sorted order, so the
    copy that is kept (the first one seen for a given hash) is the same on
    every run instead of depending on the file system's listing order. Files
    that cannot be read are reported and skipped rather than aborting the scan.

    :param directory: Root directory to scan.
    """
    hash_map = {}  # hash -> first path seen with that content (the one that is kept)
    duplicates = []  # paths whose content was already seen

    for root, dirs, files in os.walk(directory):
        # Sorting in place makes os.walk descend into sub-directories in a stable order.
        dirs.sort()
        for file in sorted(files):
            if not file.endswith(".png"):
                continue

            file_path = os.path.join(root, file)
            try:
                file_hash = compute_hash(file_path)
            except OSError as e:
                print(f"Error hashing {file_path}: {e}")
                continue

            if file_hash in hash_map:
                print(f"Duplicate found: {file_path} (Original: {hash_map[file_hash]})")
                duplicates.append(file_path)
            else:
                hash_map[file_hash] = file_path

    # Delete duplicates only after the whole tree has been scanned.
    for duplicate in duplicates:
        try:
            os.remove(duplicate)
            print(f"Deleted duplicate: {duplicate}")
        except OSError as e:
            print(f"Error deleting {duplicate}: {e}")

def compute_hash(file_path, hash_func=hashlib.md5):
    """
    Return the hexadecimal digest of a file's contents.

    :param file_path: Path of the file to hash.
    :param hash_func: Zero-argument factory returning a hashlib-style object
                      (defaults to ``hashlib.md5``).
    :return: The digest as a hex string.
    """
    digest = hash_func()
    with open(file_path, "rb") as stream:
        # Read fixed-size blocks so large files never sit in memory at once;
        # the b"" sentinel is what read() returns at end of file.
        for block in iter(lambda: stream.read(8192), b""):
            digest.update(block)
    return digest.hexdigest()

# Directory to scan (recursively) for duplicate PNGs.
# Running this cell deletes the duplicates it finds from disk.
directory = "/content/"
find_duplicates(directory)

In [None]:
import os
import xml.etree.ElementTree as ET
from xml.dom.minidom import parseString
from PIL import Image  # To fetch image dimensions dynamically


def convert_text_to_voc(output_dir, img_filename, img_width, img_height, annotations):
    """
    Write a Pascal VOC XML annotation file for one image.

    Every annotation becomes an ``<object>`` named "Text" whose ``<bndbox>`` is
    the axis-aligned rectangle enclosing all points of its ``"BoundingBox"``.
    The file is saved as ``<output_dir>/<image stem>.xml``.

    :param output_dir: Directory to save Pascal VOC XML files (created if missing).
    :param img_filename: Name of the image file.
    :param img_width: Width of the image.
    :param img_height: Height of the image.
    :param annotations: List of dictionaries; each must have a "BoundingBox" key
                        holding a sequence of (x, y) points.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Create Pascal VOC XML structure
    annotation = ET.Element("annotation")
    # normpath drops a trailing separator, so "/content/" yields "content"
    # rather than an empty folder name.
    ET.SubElement(annotation, "folder").text = os.path.basename(os.path.normpath(output_dir))
    ET.SubElement(annotation, "filename").text = img_filename
    ET.SubElement(annotation, "path").text = os.path.join(output_dir, img_filename)

    source = ET.SubElement(annotation, "source")
    ET.SubElement(source, "database").text = "Unknown"

    size = ET.SubElement(annotation, "size")
    ET.SubElement(size, "width").text = str(img_width)
    ET.SubElement(size, "height").text = str(img_height)
    ET.SubElement(size, "depth").text = "3"  # Assuming RGB images

    ET.SubElement(annotation, "segmented").text = "0"

    for ann in annotations:
        obj = ET.SubElement(annotation, "object")
        ET.SubElement(obj, "name").text = "Text"  # All annotations are labeled as "Text"
        ET.SubElement(obj, "pose").text = "Unspecified"
        ET.SubElement(obj, "truncated").text = "0"
        ET.SubElement(obj, "difficult").text = "0"

        # Use the min/max over every point instead of trusting points 0 and 2 to
        # be the top-left and bottom-right corners: for a rotated quadrilateral
        # those two points do not span the whole box.
        points = ann["BoundingBox"]
        xs = [point[0] for point in points]
        ys = [point[1] for point in points]
        extents = (
            ("xmin", min(xs)),
            ("ymin", min(ys)),
            ("xmax", max(xs)),
            ("ymax", max(ys)),
        )

        bndbox = ET.SubElement(obj, "bndbox")
        for tag, value in extents:
            ET.SubElement(bndbox, tag).text = str(int(value))

    # Save XML to file
    xml_string = ET.tostring(annotation)
    xml_pretty = parseString(xml_string).toprettyxml(indent="  ")
    xml_path = os.path.join(output_dir, os.path.splitext(img_filename)[0] + ".xml")

    # The XML declaration carries no encoding, so readers assume UTF-8; write
    # exactly that instead of the platform default.
    with open(xml_path, "w", encoding="utf-8") as xml_file:
        xml_file.write(xml_pretty)

    print(f"Saved: {xml_path}")


def process_txt_files(txt_dir, output_dir, img_dir):
    """
    Convert every .txt annotation file in ``txt_dir`` to Pascal VOC XML.

    For each ``<name>.txt`` the matching ``<name>.png`` is looked up in
    ``img_dir`` to obtain the image size, every ``BoundingBox: ...`` line is
    parsed, and ``convert_text_to_voc`` writes the XML to ``output_dir``.
    The .txt file is deleted afterwards, unless one of its bounding-box lines
    could not be parsed, in which case it is kept for inspection.
    Files whose image is missing or unreadable are reported and skipped.

    :param txt_dir: Directory containing .txt annotation files.
    :param output_dir: Directory to save Pascal VOC XML files.
    :param img_dir: Directory containing the corresponding images.
    """
    import ast
    import re

    # NumPy >= 2 renders scalars inside a list repr as e.g. "np.int32(12)";
    # keep only the literal inside the call so the value parses as plain data.
    numpy_scalar = re.compile(r"\b(?:np|numpy)\.\w+\(([^()]*)\)")

    for txt_file in sorted(os.listdir(txt_dir)):
        if not txt_file.endswith(".txt"):
            continue

        txt_path = os.path.join(txt_dir, txt_file)
        base_name = os.path.splitext(txt_file)[0]
        img_filename = f"{base_name}.png"  # Adjust if image extensions differ
        img_path = os.path.join(img_dir, img_filename)

        # Ensure the image exists
        if not os.path.exists(img_path):
            print(f"Image not found for {txt_file}: {img_path}")
            continue

        # Get image dimensions dynamically
        try:
            with Image.open(img_path) as img:
                img_width, img_height = img.size
        except Exception as e:
            print(f"Error reading image {img_path}: {e}")
            continue

        # Parse the .txt file into annotations. Only the ASCII "BoundingBox"
        # lines are used, so undecodable bytes in the OCR text are replaced
        # rather than allowed to abort the file.
        annotations = []
        unparsed_boxes = 0
        with open(txt_path, "r", encoding="utf-8", errors="replace") as file:
            for line in file:
                key, separator, value = line.strip().partition(": ")
                if not separator or key.strip() != "BoundingBox":
                    continue
                try:
                    # literal_eval only accepts Python literals, so file content
                    # can never execute code (unlike eval).
                    bounding_box = ast.literal_eval(numpy_scalar.sub(r"\1", value))
                except (ValueError, SyntaxError, TypeError) as e:
                    print(f"Skipping unparsable BoundingBox in {txt_path}: {e}")
                    unparsed_boxes += 1
                    continue
                annotations.append({
                    "BoundingBox": bounding_box,
                    "Text": None,  # Text is optional
                    "Confidence": None  # Confidence is optional
                })

        # Convert to Pascal VOC and save
        convert_text_to_voc(output_dir, img_filename, img_width, img_height, annotations)

        # Delete the .txt file only when nothing was lost in the conversion.
        if unparsed_boxes:
            print(f"Kept {txt_path}: {unparsed_boxes} bounding box line(s) could not be parsed")
            continue
        os.remove(txt_path)
        print(f"Deleted: {txt_path}")


# Example Usage
# All three locations point at the same folder here. Note that process_txt_files
# removes each .txt file once its XML has been written.
txt_directory = "/content"  # Replace with the path to your .txt files
output_directory = "/content"  # Replace with the path to save Pascal VOC XML files
image_directory = "/content"  # Replace with the path to your image files

process_txt_files(txt_directory, output_directory, image_directory)



In [None]:
# Recursively archive /content/ into /home/Folder_1_New.zip, including only .png and .xml files.
!zip -r /home/Folder_1_New.zip /content/ -i "*.png" "*.xml"

In [None]:
# Hand the archive built in the previous cell to Colab's file-download helper.
from google.colab import files
files.download('/home/Folder_1_New.zip')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [45]:
# Cloud Storage workaround, used because downloading from Safari was giving problems
from google.cloud import storage

# Set up authentication
import os
# Point the Google client libraries at the credentials JSON file stored under /content.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/content/language-app-323017-566e94dcd421.json"

# Initialize the GCS client
client = storage.Client()

# Specify your bucket name
bucket_name = "misc_37510"  # Replace with your bucket name
# Look the bucket up now so `bucket` is available to later cells.
bucket = client.get_bucket(bucket_name)

In [None]:
# Zip the PNGs and their XML annotations into archives of at most 100 files each
import os
import zipfile

def compress_files_with_associations(directory, output_directory, batch_size=100, zip_prefix="Part"):
    """
    Pack .png files and their matching .xml files into numbered zip archives.

    Each ``<name>.png`` is grouped with ``<name>.xml`` when that file exists.
    Groups are added to an archive until the next group would push it past
    ``batch_size`` files; a group is never split across two archives. Archives
    are written as ``<zip_prefix>1.zip``, ``<zip_prefix>2.zip``, ... and files
    are stored without their directory prefix.

    :param directory: Directory containing the files to be compressed.
    :param output_directory: Directory to save the zip files (created if missing).
    :param batch_size: Maximum number of files per zip file. A single PNG/XML
                       group is kept together even if it alone exceeds this.
    :param zip_prefix: Prefix for the zip file names.
    :raises ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    # Ensure the output directory exists
    os.makedirs(output_directory, exist_ok=True)

    # A set gives O(1) "does the XML exist?" checks; sorting makes the archive
    # numbering reproducible.
    all_files = set(os.listdir(directory))
    file_pairs = []
    for file in sorted(all_files):
        if not file.endswith(".png"):
            continue
        xml_file = f"{os.path.splitext(file)[0]}.xml"
        file_pairs.append([file, xml_file] if xml_file in all_files else [file])

    # Fill each batch group by group. Slicing a flattened list instead would
    # separate a PNG from its XML whenever an earlier PNG had no XML.
    batches = []
    current_batch = []
    for pair in file_pairs:
        if current_batch and len(current_batch) + len(pair) > batch_size:
            batches.append(current_batch)
            current_batch = []
        current_batch.extend(pair)
    if current_batch:
        batches.append(current_batch)

    # Create zip files for each batch
    for idx, batch in enumerate(batches, start=1):
        zip_name = os.path.join(output_directory, f"{zip_prefix}{idx}.zip")
        with zipfile.ZipFile(zip_name, 'w') as zipf:
            for file in batch:
                file_path = os.path.join(directory, file)
                zipf.write(file_path, arcname=file)  # Add files to zip
        print(f"Created: {zip_name}")

# Directory containing the files to compress
input_directory = "/content/"  # Replace with your directory
# NOTE: this rebinds the `output_directory` name used by the VOC-conversion cell.
output_directory = "/home/"  # Output directory for the zip files

# Write Part1.zip, Part2.zip, ... into the output directory, using batch_size=100
compress_files_with_associations(input_directory, output_directory, batch_size=100, zip_prefix="Part")


In [None]:
import os
from google.cloud import storage

def upload_zip_files_to_gcs(directory, bucket_name, bucket_path=""):
    """
    Upload all .zip files in the specified directory to a GCS bucket.

    Each file is uploaded under the object name ``<bucket_path>/<file name>``
    (or just the file name when ``bucket_path`` is empty). Sub-directories of
    ``directory`` are not searched.

    :param directory: Directory containing the .zip files.
    :param bucket_name: Name of the GCS bucket.
    :param bucket_path: Path in the bucket where files will be uploaded (optional).
    """
    # Object names are "/"-separated whatever the local OS is, so they are
    # joined with posixpath; os.path.join would use "\\" on Windows.
    import posixpath

    # Initialize the GCS client
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)

    # Loop through all .zip files in the directory (sorted for a stable upload order)
    for file_name in sorted(os.listdir(directory)):
        if not file_name.endswith(".zip"):
            continue

        local_path = os.path.join(directory, file_name)
        blob_name = posixpath.join(bucket_path, file_name)

        # Upload the file to GCS
        blob = bucket.blob(blob_name)
        blob.upload_from_filename(local_path)

        print(f"Uploaded: {local_path} to gs://{bucket_name}/{blob_name}")

# Directory containing the .zip files (the folder the batching cell wrote its archives to)
zip_directory = "/home/"

# GCS bucket name
bucket_name = "misc_37510"  # Replace with your GCS bucket name

# Path in the bucket where files will be uploaded (optional)
bucket_path = ""  # Leave empty if you want to upload to the root of the bucket

# Upload all .zip files
upload_zip_files_to_gcs(zip_directory, bucket_name, bucket_path)
