In [1]:
import time
import pandas as pd
import os
import easyocr
import cv2
from fuzzywuzzy import fuzz
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from tqdm import tqdm
import pytesseract
import warnings
from concurrent.futures import ThreadPoolExecutor

# Suppress all Python warnings for the remainder of the session.
warnings.filterwarnings('ignore')

# Build the module-level EasyOCR reader once (English only, GPU requested);
# easyocr_transformation() uses this global for every image.
reader = easyocr.Reader(['en'], gpu=True)

def easyocr_transformation(img_path, rotate=0):
    """
    Run the module-level EasyOCR reader on an image and tabulate the detections.

    Parameters:
        img_path: Path of the image file to read.
        rotate: Rotation applied before OCR. 0 passes the file path straight to
            the reader; 90 rotates clockwise, 180 flips, 270 rotates
            counter-clockwise.

    Returns:
        A dataframe with one row per detection and the columns 'text',
        'image_id' (the file's base name), 'tl_x' and 'tl_y' (the first
        corner of the detection's bounding box, i.e. its top-left point).

    Raises:
        ValueError: If rotate is not one of 0, 90, 180 or 270.
        OSError: If a rotation was requested and the image could not be read.
    """
    if rotate == 0:
        results = reader.readtext(img_path)
    else:
        rotation_flags = {
            90: cv2.ROTATE_90_CLOCKWISE,
            180: cv2.ROTATE_180,
            270: cv2.ROTATE_90_COUNTERCLOCKWISE,
        }
        # Reject unsupported angles explicitly instead of failing later on an
        # unbound local variable.
        if rotate not in rotation_flags:
            raise ValueError(f"rotate must be one of 0, 90, 180 or 270, got {rotate!r}")
        image = cv2.imread(img_path)
        # cv2.imread signals an unreadable file by returning None, not by raising.
        if image is None:
            raise OSError(f"Could not read image: {img_path}")
        results = reader.readtext(cv2.rotate(image, rotation_flags[rotate]))

    image_id = os.path.basename(img_path)

    # Each result is (bbox, text, confidence); bbox[0] is the first corner of
    # the box, which is the only part of the geometry kept downstream.
    rows = [
        {'text': text, 'image_id': image_id, 'tl_x': bbox[0][0], 'tl_y': bbox[0][1]}
        for bbox, text, _conf in results
    ]
    return pd.DataFrame(rows, columns=['text', 'image_id', 'tl_x', 'tl_y'])

def hybrid_similarity(text, target, threshold=0.5):
    """
    Score how close `text` is to `target` by averaging two measures.

    The first measure is the cosine similarity of character 2- and 3-gram
    TF-IDF vectors fitted on just the two strings; the second is the fuzzy
    partial ratio scaled to the 0-1 range.

    Returns a pair: whether the averaged score reaches `threshold`, and the
    averaged score itself.
    """
    # Character n-gram TF-IDF, fitted on the two strings being compared.
    ngram_vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(2, 3))
    tfidf_matrix = ngram_vectorizer.fit_transform([text, target])
    tfidf_score = cosine_similarity(tfidf_matrix)[0, 1]

    # partial_ratio is on a 0-100 scale; bring it to 0-1 before averaging.
    fuzzy_score = fuzz.partial_ratio(text, target) / 100

    score = (tfidf_score + fuzzy_score) / 2
    meets_threshold = score >= threshold
    return meets_threshold, score

def compare_to_true_false(text, threshold=0.255555):
    """
    Classify the text as 'TRUE', 'FALSE' or 'UNKNOWN' using hybrid similarity.

    Parameters:
        text: The (already normalized) text to classify.
        threshold: Minimum hybrid similarity for a label to be accepted.

    Returns:
        A (label, similarity) pair. When both labels reach the threshold the
        one with the higher similarity wins (ties go to 'TRUE'). When neither
        does, the label is 'UNKNOWN' and the similarity is the larger of the
        two scores.
    """
    is_true, true_sim = hybrid_similarity(text, 'TRUE', threshold)
    is_false, false_sim = hybrid_similarity(text, 'FALSE', threshold)

    # Both labels are plausible: pick the better match rather than always
    # preferring 'TRUE'.
    if is_true and is_false:
        if true_sim >= false_sim:
            return 'TRUE', true_sim
        return 'FALSE', false_sim

    if is_true:
        return 'TRUE', true_sim
    if is_false:
        return 'FALSE', false_sim
    return 'UNKNOWN', max(true_sim, false_sim)

def is_similar(text, targets, threshold=80):
    """
    Check if a text is similar to any target string using fuzzy matching.

    Parameters:
        text: The text to test.
        targets: Iterable of candidate strings to compare against.
        threshold: Minimum fuzzy partial ratio (0-100) that counts as a match.

    Returns:
        True if any target reaches the threshold, otherwise False. Texts of
        two characters or fewer are never considered similar, because such
        short strings do not give a meaningful comparison.
    """
    # The length guard does not depend on the target, so check it once.
    if len(text) <= 2:
        return False
    return any(fuzz.partial_ratio(text, target) >= threshold for target in targets)

def clean_and_extract_answers(easyocr_df):
    """
    Clean and extract answers from the EasyOCR output dataframe.

    For every image in the dataframe:
    - keep only texts that are 1 to 12 characters long;
    - locate the "TRUE / FALSE" column header and remember its tl_x;
    - keep only texts whose tl_x lies within 450 of that header;
    - drop texts that are exactly one of the header spellings;
    - classify each remaining text with compare_to_true_false and number the
      results from 1 in row order.

    Images in which no header is found are reported and skipped.

    Parameters:
        easyocr_df: Dataframe with at least 'text', 'image_id' and 'tl_x' columns.

    Returns:
        A dataframe with the columns 'Question Number', 'Correct Answer',
        'image_id', 'tl_x', 'text' and 'Similarity Score'. It is empty (but
        still has those columns) when no answers could be extracted.
    """
    header_variants = ["TRUE /FALSE", "TRUE/FALSE", "TRUE / FALSE"]
    output_columns = ['Question Number', 'Correct Answer', 'image_id', 'tl_x', 'text', 'Similarity Score']
    answers = []

    for image_id in easyocr_df['image_id'].unique():
        image_df = easyocr_df[easyocr_df['image_id'] == image_id]

        # Filter rows based on the length of the text column
        image_df = image_df[image_df['text'].str.len().between(1, 12)]

        # Find the TRUE / FALSE header text and remember its tl_x value
        true_false_x = None
        for text, tl_x in zip(image_df['text'], image_df['tl_x']):
            if is_similar(text.strip().upper(), header_variants):
                true_false_x = tl_x
                break

        if true_false_x is None:
            print(f"TRUE / FALSE not found in the image {image_id}.")
            continue

        # Keep only the texts horizontally aligned with the header (tl_x +/- 450)
        image_df = image_df[(image_df['tl_x'] >= true_false_x - 450) & (image_df['tl_x'] <= true_false_x + 450)]

        # Remove rows that are the header itself
        image_df = image_df[~image_df['text'].str.strip().str.upper().isin(header_variants)]

        # Every remaining row becomes one numbered answer; the classifier
        # always yields 'TRUE', 'FALSE' or 'UNKNOWN', so nothing is filtered here.
        for question_number, (text, tl_x) in enumerate(zip(image_df['text'], image_df['tl_x']), start=1):
            answer, sim_score = compare_to_true_false(text.strip().upper())
            answers.append({
                'Question Number': question_number,
                'Correct Answer': answer,
                'image_id': image_id,
                'tl_x': tl_x,
                'text': text,
                'Similarity Score': sim_score
            })

    # Building the frame from the collected rows also covers the "nothing
    # extracted" case, where pd.concat on an empty list would raise.
    return pd.DataFrame(answers, columns=output_columns)

def handle_unknown_answers(df):
    """
    Resolve 'UNKNOWN' answers and normalize the result to exactly 10 questions.

    Steps, in order:
    - Convert UNKNOWN rows whose text starts with 'T' to 'TRUE'
    - Convert UNKNOWN rows whose text starts with 'F' to 'FALSE'
    - Remove records with specific noise text values
    - Remove records where the text does not start with 'T' or 'F' and the
      hybrid similarity to both 'TRUE' and 'FALSE' is below 0.15
    - Replace any remaining 'UNKNOWN' with 'TRUE' or 'FALSE', whichever has the
      higher hybrid similarity ('FALSE' on a tie)
    - Pad with 'TRUE' / 'ADDED EXTRA' rows up to 10 rows, or truncate to the
      first 10 rows
    - Renumber 'Question Number' from 1

    Parameters:
        df: Dataframe with at least 'text', 'Correct Answer' and
            'Question Number' columns. It is not modified.

    Returns:
        A new dataframe with exactly 10 rows.
    """
    # Work on a copy so the caller's dataframe is left untouched.
    df = df.copy()

    # Remember an image id up front in case every row gets filtered out below.
    fallback_image_id = None
    if 'image_id' in df.columns and not df.empty:
        fallback_image_id = df['image_id'].iloc[0]

    # Convert UNKNOWN starting with 'T' or 'F'
    is_unknown = df['Correct Answer'] == 'UNKNOWN'
    df.loc[df['text'].str.startswith('T', na=False) & is_unknown, 'Correct Answer'] = 'TRUE'
    df.loc[df['text'].str.startswith('F', na=False) & is_unknown, 'Correct Answer'] = 'FALSE'

    # Remove records with specific text values
    remove_records = ['0', 'OR', '0 OR', 'THAN', 'ERE', 'THEN', 'RUE', 'LER']
    df = df[~df['text'].str.strip().str.upper().isin(remove_records)].reset_index(drop=True)

    # Remove records where the text does not start with 'T' or 'F' and the
    # similarity to both labels is below 0.15
    to_remove = []
    for index, text in df['text'].items():
        if text.startswith(('T', 'F')):
            continue
        normalized = text.strip().upper()
        if (hybrid_similarity(normalized, 'TRUE', 0.15)[1] < 0.15
                and hybrid_similarity(normalized, 'FALSE', 0.15)[1] < 0.15):
            to_remove.append(index)

    df = df.drop(to_remove).reset_index(drop=True)

    # Replace 'UNKNOWN' with 'TRUE' or 'FALSE' based on the highest combined similarity
    for index, row in df.iterrows():
        if row['Correct Answer'] == 'UNKNOWN':
            normalized = row['text'].strip().upper()
            similarity_with_true = hybrid_similarity(normalized, 'TRUE', 0.15)[1]
            similarity_with_false = hybrid_similarity(normalized, 'FALSE', 0.15)[1]
            if similarity_with_true > similarity_with_false:
                df.at[index, 'Correct Answer'] = 'TRUE'
            else:
                df.at[index, 'Correct Answer'] = 'FALSE'

    # Ensure there are exactly 10 questions in the dataframe
    if 'image_id' in df.columns and not df.empty:
        image_id = df['image_id'].iloc[0]
    else:
        image_id = fallback_image_id

    if len(df) < 10:
        # DataFrame.append no longer exists in pandas 2.x; build the padding
        # rows in one go and concatenate them instead.
        padding = pd.DataFrame([
            {'Question Number': number, 'Correct Answer': 'TRUE', 'text': 'ADDED EXTRA', 'image_id': image_id}
            for number in range(len(df) + 1, 11)
        ])
        df = pd.concat([df, padding], ignore_index=True)

    # Copy after slicing so the renumbering below writes to an independent frame.
    df = df.iloc[:10].copy()

    # Ensure Question Number column is in incremental numeric format
    df['Question Number'] = range(1, len(df) + 1)

    return df

def calculate_accuracy(extracted_df, ground_truth_df):
    """
    Calculate the accuracy of the extracted answers compared to the ground truth.

    Rows are matched on 'Question Number' (compared as strings) and the
    'Correct Answer' values are compared case-insensitively.

    Parameters:
        extracted_df: Dataframe with 'Question Number' and 'Correct Answer' columns.
        ground_truth_df: Dataframe with the same two columns.

    Returns:
        The number of matching answers divided by the number of ground-truth
        rows, as a percentage. Returns 0.0 when the ground truth is empty.
        Neither input dataframe is modified.
    """
    if len(ground_truth_df) == 0:
        return 0.0

    # Convert the join key on copies: the ground truth is shared between
    # callers (and worker threads), so it must not be rewritten in place.
    extracted = extracted_df.assign(**{'Question Number': extracted_df['Question Number'].astype(str)})
    truth = ground_truth_df.assign(**{'Question Number': ground_truth_df['Question Number'].astype(str)})

    # Merge the dataframes on the question number to compare answers
    merged_df = pd.merge(extracted, truth, on='Question Number', how='left', suffixes=('_extracted', '_truth'))

    # Convert answers to strings and then to uppercase for case-insensitive comparison
    extracted_answers = merged_df['Correct Answer_extracted'].astype(str).str.upper()
    truth_answers = merged_df['Correct Answer_truth'].astype(str).str.upper()

    correct_count = int((extracted_answers == truth_answers).sum())
    return correct_count / len(ground_truth_df) * 100  # Accuracy in percentage

def calculate_mae(extracted_df, ground_truth_df):
    """
    Calculate the Mean Absolute Error (MAE) of the extracted answers compared to the ground truth.

    Rows are matched on 'Question Number' (compared as strings) and the
    'Correct Answer' values are compared case-insensitively. Each extracted
    row contributes an error of 1 when its answer differs from the ground
    truth (or has no ground-truth match) and 0 otherwise.

    Parameters:
        extracted_df: Dataframe with 'Question Number' and 'Correct Answer' columns.
        ground_truth_df: Dataframe with the same two columns.

    Returns:
        The mean of the per-row errors over the extracted rows (NaN when there
        are no extracted rows). Neither input dataframe is modified.
    """
    # Convert the join key on copies: the ground truth is shared between
    # callers (and worker threads), so it must not be rewritten in place.
    extracted = extracted_df.assign(**{'Question Number': extracted_df['Question Number'].astype(str)})
    truth = ground_truth_df.assign(**{'Question Number': ground_truth_df['Question Number'].astype(str)})

    # Merge the dataframes on the question number to compare answers
    merged_df = pd.merge(extracted, truth, on='Question Number', how='left', suffixes=('_extracted', '_truth'))

    # Convert answers to uppercase strings for case-insensitive comparison
    extracted_answers = merged_df['Correct Answer_extracted'].astype(str).str.upper()
    truth_answers = merged_df['Correct Answer_truth'].astype(str).str.upper()

    # A mismatch is an error of 1, a match an error of 0
    errors = (extracted_answers != truth_answers).astype(int)
    return errors.mean()

def process_and_save_cleaned_answers_for_all_images(image_paths, ground_truth):
    """
    Run process_image over every path and write the two leaderboard CSV files.

    The images are submitted to a pool of 8 worker threads and their results
    are collected in submission order. Images for which process_image returns
    None are left out. The per-image marks go to 'leaderboard_results.csv' and
    the marks together with MAE and accuracy go to
    'leaderboard_results_all_metrics.csv'. The elapsed wall-clock time is
    printed once all images are done.
    """
    # Directory the per-image step writes the orientation-corrected copies into.
    os.makedirs('./corrected_images', exist_ok=True)

    leaderboard_rows = []
    metric_rows = []

    started_at = time.time()

    with ThreadPoolExecutor(max_workers=8) as pool:
        pending = []
        for path in image_paths:
            pending.append(pool.submit(process_image, path, ground_truth))

        # Wait on the futures in submission order so the output rows follow
        # the order of image_paths.
        for task in tqdm(pending, desc="Processing all images"):
            outcome = task.result()
            if outcome is None:
                continue
            leaderboard_rows.append(outcome['result'])
            metric_rows.append(outcome['metrics'])

    finished_at = time.time()
    elapsed_time = finished_at - started_at
    print(f"Total processing time: {elapsed_time:.2f} seconds")

    pd.DataFrame(leaderboard_rows).to_csv('leaderboard_results.csv', index=False)
    print("Leaderboard results saved to 'leaderboard_results.csv'.")

    pd.DataFrame(metric_rows).to_csv('leaderboard_results_all_metrics.csv', index=False)
    print("Leaderboard results saved to 'leaderboard_results_all_metrics.csv'.")

def process_image(image_path, ground_truth):
    """
    Process a single image: fix its orientation, extract answers and score them.

    The orientation-corrected image is written to './corrected_images' under
    the same file name, OCR is run on that copy, the answers are cleaned, and
    accuracy and MAE are computed against the ground truth.

    Parameters:
        image_path: Path of the image file to process.
        ground_truth: Dataframe of model answers passed to the metric functions.

    Returns:
        A dict with two entries, 'result' ({'filename', 'predicted marks'}) and
        'metrics' (the same plus 'mae' and 'accuracy'), or None when the image
        cannot be read or no answers can be extracted from it.
    """
    filename = os.path.basename(image_path)

    # cv2.imread returns None for a missing or undecodable file; skip such
    # images the same way images without answers are skipped.
    image = cv2.imread(image_path)
    if image is None:
        print(f"Could not read image: {filename}")
        return None

    # Correct the orientation of the image
    orientation = get_image_orientation(image)
    corrected_image = correct_image_orientation(image, orientation)

    # Make sure the output directory exists even when this function is called
    # on its own rather than through the batch driver.
    corrected_images_folder = './corrected_images'
    os.makedirs(corrected_images_folder, exist_ok=True)
    corrected_image_path = os.path.join(corrected_images_folder, filename)
    cv2.imwrite(corrected_image_path, corrected_image)

    # Process the corrected image
    easyocr_df = easyocr_transformation(corrected_image_path)
    model_answers_df = clean_and_extract_answers(easyocr_df)

    if model_answers_df.empty:
        print(f"No valid answers extracted from image: {filename}")
        return None

    # Resolve 'UNKNOWN' answers and normalize to the expected number of questions
    best_output_df = handle_unknown_answers(model_answers_df)

    # Calculate accuracy and MAE using the best outputs
    accuracy = calculate_accuracy(best_output_df, ground_truth)
    mae = calculate_mae(best_output_df, ground_truth)

    # Assuming 10 questions per image, accuracy (in percent) divided by 10 is
    # the number of correct answers. Reuse the accuracy computed above and
    # round so floating-point noise cannot shave a mark off.
    predicted_marks = int(round(accuracy / 10))

    result = {'filename': filename, 'predicted marks': predicted_marks}
    metrics = {'filename': filename, 'predicted marks': predicted_marks, 'mae': mae, 'accuracy': accuracy}

    return {'result': result, 'metrics': metrics}


def get_image_orientation(image):
    """
    Return the integer found on the third line of pytesseract's OSD report.

    The report is split on newlines, the third line is split on ':' and the
    part after the first colon is converted to int. Presumably that line is
    the rotation field of the report -- TODO confirm against the installed
    Tesseract version.
    """
    osd_report = pytesseract.image_to_osd(image)
    third_line = osd_report.split("\n")[2]
    angle_field = third_line.split(":")[1]
    return int(angle_field)

# Rotate an image according to a detected orientation angle
def correct_image_orientation(image, orientation):
    """
    Return the image rotated according to `orientation`.

    90 rotates clockwise, 180 flips the image and 270 rotates
    counter-clockwise. Any other value, including 0, returns the image as is.
    """
    # Guard clause: nothing to do for 0 or for angles that are not handled.
    if orientation not in (90, 180, 270):
        return image

    rotate_codes = {
        90: cv2.ROTATE_90_CLOCKWISE,
        180: cv2.ROTATE_180,
        270: cv2.ROTATE_90_COUNTERCLOCKWISE,
    }
    return cv2.rotate(image, rotate_codes[orientation])

# Load the ground-truth model answers that every image is scored against
ground_truth = pd.read_csv('./ModelAnswer.csv')

# Folder containing the images to process
image_folder = './Sample_Data/Sample_Data'  # Replace with the path to your folder

# Full paths of the files in the folder whose name ends with '.jpg'
# (the suffix check is case-sensitive; os.listdir does not guarantee an order)
image_paths = [os.path.join(image_folder, f) for f in os.listdir(image_folder) if f.endswith('.jpg')]

# Run the whole pipeline: OCR each image, clean the answers, compute accuracy
# and MAE, and write the leaderboard CSV files
process_and_save_cleaned_answers_for_all_images(image_paths, ground_truth)

Processing all images: 100%|██████████| 48/48 [01:56<00:00,  2.42s/it]

Total processing time: 116.31 seconds
Leaderboard results saved to 'leaderboard_results.csv'.
Leaderboard results saved to 'leaderboard_results_all_metrics.csv'.



