In [None]:
# Install the OCR dependencies: the PaddlePaddle runtime and the PaddleOCR toolkit.
# NOTE(review): both `paddlepaddle` and `paddlepaddle-gpu` are installed here;
# presumably only one of the two is needed for a given runtime -- confirm.
!pip install paddlepaddle paddleocr
!pip install paddlepaddle-gpu paddleocr

In [None]:
# Import packages
import numpy as np
import pandas as pd
import requests
from PIL import Image
from io import BytesIO
from tqdm import tqdm
from paddleocr import PaddleOCR
from concurrent.futures import ThreadPoolExecutor
import tempfile
import os
import re
import cv2

In [None]:
# Load the input CSV; later cells read its 'entity_name' and 'image_link' columns.
df = pd.read_csv('/kaggle/input/test5-csv/test.csv')

In [None]:
# Create the single, module-level PaddleOCR engine that the OCR helpers below call
# (English model, angle classifier enabled, GPU requested, library logging off).
ocr = PaddleOCR(use_angle_cls=True, lang='en', use_gpu=True, show_log=False)

In [None]:
# Split the rows by entity type. The membership test is computed once and then
# used both directly and negated.
_dimension_mask = df['entity_name'].isin(['height', 'width', 'depth'])

# Rows whose entity_name is height, width, or depth
df_hwd = df[_dimension_mask]

# Every other row
df_others = df[~_dimension_mask]

In [None]:
# Function to preprocess, resize, and extract text from a single image URL
def extract_text_from_url(image_url, max_size=(1000, 1000), timeout=30):
    """Download an image, shrink it, and return the text PaddleOCR finds in it.

    Args:
        image_url: URL of the image to fetch.
        max_size: (max_width, max_height) bound handed to ``Image.thumbnail``.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The detected text fragments joined by single spaces, an empty string
        when OCR reports no text, or ``None`` when downloading, decoding or
        OCR fails (the error is printed, never raised).
    """
    temp_path = None
    try:
        # A timeout keeps one dead host from stalling the whole run, and
        # raise_for_status reports HTTP errors instead of trying to decode an
        # error page as an image.
        response = requests.get(image_url, timeout=timeout)
        response.raise_for_status()
        img = Image.open(BytesIO(response.content))

        # Resize in place to reduce dimensions. Image.ANTIALIAS was removed in
        # Pillow 10; LANCZOS is the same filter under its current name.
        img.thumbnail(max_size, Image.LANCZOS)

        # Convert to grayscale
        img = img.convert('L')

        # PaddleOCR is given a file path, so write the image to a temp file
        with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file:
            temp_path = temp_file.name
            img.save(temp_path)

        result = ocr.ocr(temp_path, cls=True)

        # result[0] is None when no text was detected; report that as "no
        # text" instead of letting the iteration below raise.
        page = result[0] if result else None
        if not page:
            return ''

        # Each entry is [bbox, (text, confidence)]; keep only the text
        return ' '.join(line[1][0] for line in page)
    except Exception as e:
        print(f"Error processing {image_url}: {e}")
        return None
    finally:
        # Always remove the temp file, including when OCR raised
        if temp_path is not None and os.path.exists(temp_path):
            os.remove(temp_path)


# Run OCR over a column of image URLs, one URL at a time
def process_images(df, column_name):
    """Call extract_text_from_url for every URL in ``df[column_name]``, in order.

    A progress line is printed just before every 100th image is processed.

    Returns:
        A list with one extraction result per URL.
    """
    collected = []
    for position, url in enumerate(df[column_name], start=1):
        if position % 100 == 0:
            print("Processing image:", position)
        collected.append(extract_text_from_url(url))
    return collected

In [None]:
# Process the data in parts to keep a single run manageable.
# To process everything instead, replace the two slices below with
# df_hwd.copy() and df_others.copy().
# .copy() gives each subset its own data, so the columns added to it in later
# cells are not written to a slice of the parent frame (which triggers pandas'
# SettingWithCopyWarning).
df_hwd_small = df_hwd.iloc[:22000].copy()
df_others_small = df_others.iloc[:11000].copy()

In [None]:
%%time
# OCR every image of the non-dimension subset and store the text in a new column.
df_others_small['extracted_text'] = process_images(df_others_small, 'image_link')

In [None]:
# Force every entry to str before the regex step. Note that a None returned for a
# failed extraction is presumably turned into the literal text 'None' here.
df_others_small['extracted_text'] = df_others_small['extracted_text'].astype(str)

In [None]:
# Canonical unit name -> every spelling (abbreviation or full name) that maps to it.
# The groups are listed in the order their spellings should appear as keys of
# unit_abbreviation_map.
_UNIT_SPELLINGS = [
    # Weight
    ("gram", ["g", "gram", "gm"]),
    ("kilogram", ["kg", "kilogram"]),
    ("milligram", ["mg", "milligram"]),
    ("microgram", ["μg", "microgram"]),
    ("ounce", ["oz", "ounce"]),
    ("pound", ["lb", "lbs", "pound"]),
    ("ton", ["ton"]),

    # Dimensions
    ("centimetre", ["cm", "centimetre"]),
    ("metre", ["m", "metre"]),
    ("millimetre", ["mm", "millimetre"]),
    ("inch", ["in", "inch", "\""]),
    ("foot", ["ft", "foot"]),
    ("yard", ["yd", "yard"]),

    # Voltage
    ("volt", ["v", "volt"]),
    ("kilovolt", ["kv", "kilovolt"]),
    ("millivolt", ["mv", "millivolt"]),

    # Wattage
    ("watt", ["w", "watt"]),
    ("kilowatt", ["kw", "kilowatt"]),

    # Volume
    ("litre", ["l", "litre"]),
    ("millilitre", ["ml", "millilitre"]),
    ("cubic inch", ["cu in", "cubic inch"]),
    ("cubic foot", ["cu ft", "cubic foot"]),
    ("quart", ["qt", "quart"]),
    ("gallon", ["gal", "gallon"]),
    ("cup", ["cup"]),
    ("fluid ounce", ["fl oz"]),
    ("imperial gallon", ["imperial gallon", "imp gal"]),
]

# Flat lookup used by the extraction code: spelling -> canonical unit name
unit_abbreviation_map = {
    spelling: canonical
    for canonical, spellings in _UNIT_SPELLINGS
    for spelling in spellings
}


# Unit spellings accepted for each kind of measurement. Entities of the same
# kind share a spelling list, but each entity gets its own set object.
_WEIGHT_UNITS = ("g", "gram", "kg", "kilogram", "mg", "milligram", "μg", "microgram",
                 "oz", "ounce", "lb", "lbs", "pound", "ton")
_LENGTH_UNITS = ("cm", "centimetre", "m", "metre", "mm", "millimetre", "in", "inch",
                 "ft", "foot", "yd", "yard", "\"")
_VOLTAGE_UNITS = ("v", "volt", "kv", "kilovolt", "mv", "millivolt")
_WATTAGE_UNITS = ("w", "watt", "kw", "kilowatt")
_VOLUME_UNITS = ("l", "litre", "ml", "millilitre", "cu in", "cubic inch", "cu ft",
                 "cubic foot", "qt", "quart", "gal", "gallon", "imperial gallon",
                 "imp gal", "cup", "fl oz", "fluid ounce")

# Extended entity-unit mapping: entity name -> set of unit spellings to look for
entity_unit_map = {
    "item_weight": set(_WEIGHT_UNITS),
    "height": set(_LENGTH_UNITS),
    "width": set(_LENGTH_UNITS),
    "depth": set(_LENGTH_UNITS),
    "voltage": set(_VOLTAGE_UNITS),
    "wattage": set(_WATTAGE_UNITS),
    "item_volume": set(_VOLUME_UNITS),
    "maximum_weight_recommendation": set(_WEIGHT_UNITS),
}

In [None]:
# Function to generate a regex pattern for the given entity
def generate_regex(entity_name):
    """Build the "number followed by unit" regex for one entity.

    Args:
        entity_name: Key of ``entity_unit_map`` whose units should be matched.

    Returns:
        A pattern string with two groups: the numeric value and the unit
        spelling that followed it.

    Raises:
        ValueError: If ``entity_name`` has no units in ``entity_unit_map``.
    """
    units = entity_unit_map.get(entity_name, [])
    if not units:
        raise ValueError(f"No units found for entity '{entity_name}'")

    # Regex alternation is tried left to right and `units` is a set with no
    # stable order, so "m" could win over "mm" (or "g" over "gram") from one
    # run to the next. Longest-first makes the most specific spelling win; the
    # alphabetical tie-break keeps the pattern deterministic.
    ordered_units = sorted(units, key=lambda unit: (-len(unit), unit))

    # Escape each spelling so it is always matched literally
    unit_pattern = '|'.join(re.escape(unit) for unit in ordered_units)
    pattern = rf'(\d+\.?\d*)\s*({unit_pattern})'
    return pattern

# Function to convert abbreviation to full unit name
def convert_to_full_unit(abbreviation):
    """Return the full unit name for ``abbreviation``; unknown spellings pass through unchanged."""
    if abbreviation in unit_abbreviation_map:
        return unit_abbreviation_map[abbreviation]
    return abbreviation

def convert_to_lower(text):
    """Lower-case ``text``; falsy input (None, '', 0, ...) is returned as ``str(text)`` instead."""
    if not text:
        return str(text)
    return text.lower()

# Function to extract the entity value from text using regex
def extract_entity(text, entity_name):
    """Find the first "<number><unit>" occurrence for ``entity_name`` in ``text``.

    The text is lower-cased before matching.

    Returns:
        "<number> <full unit name>" for the first match, or "" when nothing matches.
    """
    unit_regex = generate_regex(entity_name)
    first_hit = re.search(unit_regex, convert_to_lower(text))
    if first_hit is None:
        return ""

    number, abbreviation = first_hit.groups()
    return f"{number} {convert_to_full_unit(abbreviation)}"

In [None]:
# Holds the extracted entity value for each row of df_others_small.
entity_val = []

In [None]:
# Extract, for every row, the value of that row's own entity from its OCR text.
# The list is rebuilt from scratch on each run, so re-executing this cell cannot
# append duplicates and break the length match with the frame.
# NOTE(review): the entity is read from the 'entity_name' column by label; the
# previous code read column position 3, assumed to be the same column -- confirm.
entity_val = [
    extract_entity(text, entity_name)
    for text, entity_name in zip(df_others_small['extracted_text'],
                                 df_others_small['entity_name'])
]

df_others_small['entity_value'] = entity_val

In [None]:
# Optional: save the results for the non-dimension entities to CSV
# (index=True also writes the frame's index as the first column).
df_others_small.to_csv('df_others_small_result.csv', index=True)

In [None]:
# Function to download and resize the image
def preprocess_image_from_url(image_url, max_size=(1200, 1200), timeout=30):
    """Download an image and return it shrunk and converted to grayscale.

    Args:
        image_url: URL of the image to fetch.
        max_size: (max_width, max_height) bound handed to ``Image.thumbnail``.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The image in mode 'L' (grayscale).

    Raises:
        Whatever ``requests`` or PIL raise for network failures, HTTP error
        statuses or undecodable image data; nothing is caught here.
    """
    # A timeout keeps one dead host from stalling the whole batch, and
    # raise_for_status reports HTTP errors instead of decoding an error page.
    response = requests.get(image_url, timeout=timeout)
    response.raise_for_status()
    img = Image.open(BytesIO(response.content))

    # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same filter
    # under its current name.
    img.thumbnail(max_size, Image.LANCZOS)
    return img.convert('L')

# OCR Extraction with Bounding Boxes using PaddleOCR
def extract_text_with_bboxes(image_path):
    """Run PaddleOCR on an image file and return its detections.

    Args:
        image_path: Path of the image file to read.

    Returns:
        A list of ``(bbox, text, confidence)`` tuples, one per detection;
        empty when OCR reports no text.
    """
    # cls=True enables the angle classifier
    results = ocr.ocr(image_path, cls=True)

    # results[0] is None when nothing was detected; treat that as "no boxes"
    # instead of raising on the iteration below.
    page = results[0] if results else None
    if not page:
        return []

    text_bboxes = []
    for detection in page:
        # Each detection is [bbox, (text, confidence)]
        bbox, (text, confidence) = detection[0], detection[1]
        text_bboxes.append((bbox, text, confidence))
    return text_bboxes

# Line Detection Function
def detect_lines(image_path):
    """Detect straight line segments in an image and split them by orientation.

    Args:
        image_path: Path of the image file to read with OpenCV.

    Returns:
        ``(vertical_lines, horizontal_lines, image)``. The first list holds
        segments within 5 degrees of vertical, the second holds every other
        segment (horizontal and slanted), and ``image`` is the array returned
        by ``cv2.imread``. Each segment keeps the shape returned by
        ``cv2.HoughLinesP``, so its coordinates are read as ``line[0]``.
    """
    # Read the image
    image = cv2.imread(image_path)
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Edge detection
    edges = cv2.Canny(gray, 50, 150, apertureSize=3)

    # Hough Line Transformation to detect lines
    lines = cv2.HoughLinesP(edges, 1, np.pi / 180, 100, minLineLength=50, maxLineGap=10)

    vertical_lines = []
    horizontal_lines = []

    # HoughLinesP returns None (not an empty array) when it finds no segment;
    # iterating over that would raise, so report "no lines" instead.
    if lines is None:
        return vertical_lines, horizontal_lines, image

    # Separate vertical and horizontal/slanted lines
    for line in lines:
        x1, y1, x2, y2 = line[0]
        angle = np.degrees(np.arctan2(y2 - y1, x2 - x1))
        if 85 < abs(angle) < 95:  # Near-vertical lines
            vertical_lines.append(line)
        else:
            horizontal_lines.append(line)

    return vertical_lines, horizontal_lines, image

# Function to check if bounding box is near a line
def is_near_line(bbox, lines, threshold=50):
    """Tell whether a text box sits next to one of the given near-vertical lines.

    Args:
        bbox: Corner points of the box; ``bbox[0]`` and ``bbox[2]`` are read as
            opposite corners (presumably top-left and bottom-right -- confirm
            against the OCR output).
        lines: Segments shaped like ``cv2.HoughLinesP`` output, i.e.
            ``line[0]`` is ``(x1, y1, x2, y2)``.
        threshold: Pixel tolerance, used both to decide that a segment is
            vertical enough and that the box is close enough to it.

    Returns:
        True if, for some segment with ``|x1 - x2| < threshold``, either x edge
        of the box is within ``threshold`` of ``x1`` and the y of ``bbox[0]``
        lies strictly inside the segment's y-range; False otherwise.
    """
    first_x, first_y = bbox[0][0], bbox[0][1]
    opposite_x = bbox[2][0]

    for line in lines:
        x1, y1, x2, y2 = line[0]

        # Only segments whose x barely changes count as vertical
        if abs(x1 - x2) >= threshold:
            continue

        close_in_x = min(abs(first_x - x1), abs(opposite_x - x1)) < threshold
        inside_y_range = min(y1, y2) < first_y < max(y1, y2)
        if close_in_x and inside_y_range:
            return True
    return False

# Generate Regex Pattern
# NOTE: this redefines the generate_regex from the earlier regex cell; keep the
# two definitions in sync or drop one of them.
def generate_regex(entity_name):
    """Build the "number followed by unit" regex for one entity.

    Args:
        entity_name: Key of ``entity_unit_map`` whose units should be matched.

    Returns:
        A pattern string with two groups: the numeric value and the unit
        spelling that followed it.

    Raises:
        ValueError: If ``entity_name`` has no units in ``entity_unit_map``.
    """
    units = entity_unit_map.get(entity_name, [])
    if not units:
        raise ValueError(f"No units found for entity '{entity_name}'")

    # Alternation is tried left to right and `units` is an unordered set, so
    # "m" could shadow "mm" depending on hash order. Put the longest spellings
    # first (alphabetical tie-break) so the most specific unit always wins and
    # the pattern is the same on every run.
    ordered_units = sorted(units, key=lambda unit: (-len(unit), unit))

    # Escape each spelling so it is always matched literally
    unit_pattern = '|'.join(re.escape(unit) for unit in ordered_units)
    pattern = rf'(\d+\.?\d*)\s*({unit_pattern})'
    return pattern


# Convert Abbreviation to Full Unit Name
# NOTE: same contract as the convert_to_full_unit defined in the earlier regex cell.
def convert_to_full_unit(abbreviation):
    """Look up the full unit name; a spelling missing from the map is returned as-is."""
    try:
        return unit_abbreviation_map[abbreviation]
    except KeyError:
        return abbreviation

# Extract Entity using Regex
# NOTE: same contract as the extract_entity defined in the earlier regex cell.
def extract_entity(text, entity_name):
    """Return "<number> <full unit name>" for the first unit match in ``text``, else ""."""
    compiled = re.compile(generate_regex(entity_name))
    hit = compiled.search(convert_to_lower(text))
    if hit is not None:
        value, unit_abbreviation = hit.group(1), hit.group(2)
        return f"{value} {convert_to_full_unit(unit_abbreviation)}"
    return ""

# Main function to process image
def process_image(image_url, entity_name):
    """Download one image and extract the value of a dimension entity from it.

    Text boxes next to a near-vertical line are treated as the height label;
    all other boxes are treated as width/depth labels.

    Args:
        image_url: URL of the image to process.
        entity_name: One of 'height', 'width' or 'depth'.

    Returns:
        ``(entity_value, text_bboxes)``: the string produced by
        ``extract_entity`` for the selected label, and the list of
        ``(bbox, text, confidence)`` detections.

    Raises:
        KeyError: If ``entity_name`` is not one of the three dimensions.
        Any exception raised while downloading, running OCR or detecting
        lines is propagated to the caller.
    """
    # Preprocess the image from the URL
    img = preprocess_image_from_url(image_url)

    # OCR and line detection both read from a file path, so reserve a temp file
    with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file:
        temp_path = temp_file.name

    try:
        img.save(temp_path)

        # Extract text and bounding boxes using PaddleOCR
        text_bboxes = extract_text_with_bboxes(temp_path)

        # Only the near-vertical lines are needed for the assignment below
        vertical_lines, _, _ = detect_lines(temp_path)
    finally:
        # Remove the temp file even when OCR or line detection raised, so
        # failures over a large batch do not fill the temp directory.
        os.remove(temp_path)

    entity_values = {'height': None, 'width': None, 'depth': None}

    # NOTE(review): later boxes overwrite earlier ones, so the last box of each
    # kind wins even if it holds no measurement -- confirm this is intended.
    for bbox, text, _ in text_bboxes:
        if is_near_line(bbox, vertical_lines):
            # Assign to height if near vertical lines
            entity_values['height'] = text
        else:
            entity_values['width'] = text
            entity_values['depth'] = text

    return extract_entity(entity_values[entity_name], entity_name), text_bboxes

In [None]:
def process_images_in_batch(df):
    """Run process_image on every row of ``df`` and collect the outcome in a new frame.

    Each input row must provide 'index', 'image_link', 'group_id' and
    'entity_name'. A row whose processing raises is reported with print() and
    kept in the output with both result fields set to None.

    Returns:
        A DataFrame with the columns index, image_link, group_id, entity_name,
        extracted_text_bboxes and entity_value, one row per input row.
    """
    records = []

    for _, row in df.iterrows():
        image_url, entity_name = row['image_link'], row['entity_name']

        try:
            # Process the image and extract text with bounding boxes
            entity_value, text_bboxes = process_image(image_url, entity_name)
        except Exception as e:
            print(f"Error processing image at index {row['index']}: {e}")
            # Keep the row, but mark both result fields as missing
            entity_value, text_bboxes = None, None

        records.append({
            'index': row['index'],
            'image_link': image_url,
            'group_id': row['group_id'],
            'entity_name': entity_name,
            'extracted_text_bboxes': text_bboxes,
            'entity_value': entity_value,
        })

    # Convert results to a dataframe
    return pd.DataFrame(records)

In [None]:
# Run the line-aware OCR pipeline on the height/width/depth subset; rows that
# fail are kept with entity_value set to None (see process_images_in_batch).
df_hwd_result = process_images_in_batch(df_hwd_small)

In [None]:
# Save the height/width/depth results to CSV (index=True also writes the frame's index).
df_hwd_result.to_csv('df_hwd_small_result.csv', index=True)