In [4]:
from PIL import Image, ImageEnhance, ImageFilter
import pandas as pd
from tqdm import tqdm
import os
import json
import google.generativeai as genai
from dotenv import load_dotenv
import csv
import re

# https://ai.google.dev/gemini-api/docs/pricing#gemini-2.0-flash-lite

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
class Utils:
    @staticmethod
    def process_image(image):
        image = image.convert('L')  # Grayscale
        width, height = image.size
        image = image.resize((width * 2, height * 2),
                        Image.Resampling.LANCZOS)  # Upscale
        enhancer = ImageEnhance.Contrast(image)
        image = enhancer.enhance(3)  # Add contrast
        image = image.point(lambda p: 0 if p < 200 else 255)  # Text visibility
        return image
    
    @staticmethod
    def standardize_id(extracted_text: str) -> list:
        """
        Standardizes the extracted ID text based on predefined rules.
        Handles cleaning, splitting, and applying specific transformations.
        
        Args:
            extracted_text (str): The raw text containing ID information
            
        Returns:
            list: List of standardized ID parts
        """
        if not extracted_text:
            return []

        # 1. Remove trailing newline and potential extra whitespace
        cleaned_text = extracted_text.strip()

        # First, clean the input by removing all content within parentheses
        # This handles cases where there might be complex content including slashes inside parentheses
        cleaned_no_parentheses = re.sub(r'\([^)]*\)', '', cleaned_text)

        # Then standardize the cleaned result
        cleaned_no_parentheses = cleaned_no_parentheses.strip()

        # If the input was just something in parentheses, handle that case
        if not cleaned_no_parentheses:
            return []

        # Split by comma if present
        comma_parts = [part.strip()
                       for part in cleaned_no_parentheses.split(',')]
        standardized_parts = []

        # Process each comma-separated part
        for part in comma_parts:
            if not part:
                continue

            # Check if we have a prefix pattern (letters followed by numbers)
            prefix_match = re.match(r'^([A-Z]+)(\d+)', part)
            prefix = ""
            if prefix_match:
                prefix = prefix_match.group(1)

            # Split by slash if present
            slash_parts = [sub_part.strip() for sub_part in part.split('/')]

            # Process each slash part
            for i, sub_part in enumerate(slash_parts):
                if not sub_part:
                    continue

                # If this is not the first part and it's only digits, prepend the prefix
                if i > 0 and re.match(r'^\d+$', sub_part) and prefix:
                    sub_part = prefix + sub_part

                if sub_part:
                    standardized_parts.append(sub_part)

        # De-duplicate while preserving order
        final_standardized_list = []
        seen = set()
        for part in standardized_parts:
            if part not in seen:
                final_standardized_list.append(part)
                seen.add(part)

        return final_standardized_list
    
    @staticmethod
    def clean_json_response(response):
        ## Process response
        start_marker = '```json\n'
        end_marker = '\n```'

        cleaned_json_string = response
        if cleaned_json_string.startswith(start_marker):
            cleaned_json_string = cleaned_json_string[len(start_marker):]
        if cleaned_json_string.endswith(end_marker):
            cleaned_json_string = cleaned_json_string[:-len(end_marker)]

        try:
            data_dict = json.loads(cleaned_json_string)
            print("Successfully parsed JSON string into Python dictionary.")

        except json.JSONDecodeError as e:
            print(f"Error decoding JSON: {e}")
            print("The cleaned string might still be invalid JSON.")
            data_dict = None 

        return data_dict
    
    @staticmethod
    def compute_and_append(data_dict):
        ## Append data into csv
        MM_TO_MIL = 39.4
        csv_file = "table_data.csv"

        try:
            id = data_dict['model_name']
            min_coplanarity = 0
            max_coplanarity = round(data_dict['table']['aaa']['MAX'] * MM_TO_MIL, 5)
            nom_ball_width = round(data_dict['table']['øb']['NOM'] * MM_TO_MIL, 5)
            nom_pitch = round(data_dict['table']['e'] * MM_TO_MIL, 5)
            ball_width = round((data_dict['table']['øb']['MAX'] -
                            data_dict['table']['øb']['NOM']) * MM_TO_MIL, 5)

            row = [id, min_coplanarity, max_coplanarity,
                nom_ball_width, nom_pitch, ball_width]
            headers = ['id', 'min_coplanarity',
                    'max_coplanarity', 'nom_ball_width', 'nom_pitch', 'ball_width']

            # Write to CSV
            with open(csv_file, mode='a', newline='') as file:
                writer = csv.writer(file)
                if not os.path.exists(
                        csv_file) or os.path.getsize(csv_file) == 0:
                    writer.writerow(headers)
                writer.writerow(row)
                
        except Exception as e:
            print("Exception in", e)
            print(data_dict)

In [None]:
class geminiFlashLite:
    def __init__(self, mappings_file=None):
        load_dotenv()
        
        api_key = os.getenv("GOOGLE_API_KEY")
        if not api_key:
            raise ValueError(
                "GOOGLE_API_KEY environment variable not set. Please set it.")
            
        genai.configure(api_key=api_key)
        self.MODEL_NAME = 'gemini-2.0-flash-lite-001'
        self.model = genai.GenerativeModel(model_name=self.MODEL_NAME)
        if not mappings_file:
            self.mappings = {}
        else:
            with open(mappings_file, 'r') as f:
                loaded_mappings = json.load(f)
            self.mappings = loaded_mappings
    
    def getMapping(self, targetId):
        print("Processing mappings...")
        # Search through mappings
        print("Looking through cache...")
        targetIds = Utils.standardize_id(targetId)
        for key, val in self.mappings.items():
            if key in targetIds:
                return self.mappings[key][0]

        initial_history = [
        {'role': 'user', 'parts': [
            "I will be providing you an image. Help me to extract the model name found at the top of the image. Only include the model name in your reponse without any other text."]},
        ]
        chat = self.model.start_chat(history=initial_history)

        print("Looking through files...")
        cachedIds = set(item for sublist in self.mappings.values()
            for item in sublist)
        
        idFound = False
        for filename in os.listdir("./jpgs"):
            image_path = os.path.join("./jpgs", filename)
            # Skip if mapping is already stored
            if image_path in cachedIds:
                continue
            
            img = Image.open(image_path)
            img = Utils.process_image(img)
            id_list = self.extractId(chat, img)
            
            # update mappings
            for id in id_list:
                if id in self.mappings: # Appends the full pathname
                    self.mappings[id].append(image_path)
                else:
                    self.mappings[id] = [image_path]
            
            idFound = False
            for processedTargetId in targetIds:
                if processedTargetId in id_list:
                    idFound = True
                    break
            if idFound:
                break
        
        # save mappings
        save_dir = os.path.dirname("idFilenameMap.json")
        if save_dir and not os.path.exists(save_dir):
            os.makedirs(save_dir)

        with open("idFilenameMap.json", 'w') as f:
            json.dump(self.mappings, f, indent=4)
        
        if idFound:
            return image_path
        else:
            return None
            
    def extractId(self, chat, img):
        print("Querying Id from bot")
        current_message_content = [
            img
        ]
        response = chat.send_message(current_message_content)
        id = response.text
        return Utils.standardize_id(id)
        
    def extractImageTable(self, targetId):
        path = self.getMapping(targetId)
        if not path:
            print("Image with correct targetID not found.")
            return { "status": 404 }
        
        initial_history = [
            {'role': 'user', 'parts': [
                "I will be providing you an image. Help me to extract the table (with the key for json as 'table') from the bottom left of the image in JSON for futher computation. Also include the model name found at the top of the image. The primary keys to use are the Symbols on the left column (A, A1, A2, D/E, D1/E1, e, øb, aaa, ccc, ddd, eee, M) and the secondary keys to use are MIN, NOM, MAX if there are multiple values for these fields. For example, primary key D/E, D_1/E_1, e & M only has 1 column and therefore, do not include secondary keys for them (Do not have nested keys of the same key). Ignore the last column named NOTE."]},
        ]

        current_text_input = "Only provide me with the table data in numerical form or null for columns with no values, without including any text in your response so that I can simply take your response and do json.loads(string)."

        print("Extracting image table...")
        try:
            chat = self.model.start_chat(history=initial_history)
            
            img = Image.open(path)
            img = Utils.process_image(img)

            current_message_content = [
                current_text_input,
                img
            ]

            response = chat.send_message(current_message_content)

            if not response.text:
                print("FAIL", path)
                if response.prompt_feedback and response.prompt_feedback.block_reason:
                    print(
                        f"Reason for blocking: {response.prompt_feedback.block_reason}")

        except Exception as e:
            print(f"An error occurred: {e}")
            
        # Log response
        with open("history.log", "a") as log_file:
            log_file.write(str(response))
            log_file.write("\n-----\n")
            
        return response
        

In [60]:
bot = geminiFlashLite("idFilenameMap.json")

In [61]:
bot.extractImageTable("CS225")

Processing mappings...
Looking through cache...
Extracting image table...


response:
GenerateContentResponse(
    done=True,
    iterator=None,
    result=protos.GenerateContentResponse({
      "candidates": [
        {
          "content": {
            "parts": [
              {
                "text": "```json\n{\n  \"model\": \"CS(G)225\",\n  \"table\": {\n    \"A\": {\n      \"MIN\": 1.00,\n      \"MAX\": 1.40\n    },\n    \"A1\": {\n      \"MIN\": 0.25,\n      \"NOM\": 0.30,\n      \"MAX\": 0.40\n    },\n    \"A2\": {\n      \"MIN\": 0.75,\n      \"NOM\": 0.90,\n      \"MAX\": 1.00\n    },\n    \"D/E\": 13.00,\n    \"D1/E1\": 11.20,\n    \"e\": 0.80,\n    \"\u00f8b\": {\n      \"MIN\": 0.35,\n      \"NOM\": 0.40,\n      \"MAX\": 0.45\n    },\n    \"bbb\": {\n      \"MAX\": 0.20\n    },\n    \"ddd\": {\n      \"MAX\": 0.10\n    },\n    \"eee\": {\n      \"MAX\": 0.15\n    },\n    \"fff\": {\n      \"MAX\": 0.08\n    },\n    \"M\": 15\n  }\n}\n```"
              }
            ],
            "role": "model"
          },
          "finish_reason": "STOP",
 

In [None]:
for filename in os.listdir("./jpgs"):
    ## Process image
    image_path = os.path.join("./jpgs", filename)
    img = Image.open(image_path)

    img = img.convert('L') # Grayscale
    width, height = img.size
    img = img.resize((width * 2, height * 2), Image.Resampling.LANCZOS) # Upscale
    enhancer = ImageEnhance.Contrast(img)
    img = enhancer.enhance(3) # Add contrast
    img = img.point(lambda p: 0 if p < 200 else 255)  # Text visibility
    
    ## Extract data
    load_dotenv()

    api_key = os.getenv("GOOGLE_API_KEY")
    if not api_key:
        raise ValueError(
            "GOOGLE_API_KEY environment variable not set. Please set it.")

    genai.configure(api_key=api_key)

    MODEL_NAME = 'gemini-2.0-flash-lite-001'

    initial_history = [
        {'role': 'user', 'parts': [
            "I will be providing you an image. Help me to extract the table (with the key for json as 'table') from the bottom left of the image in JSON for futher computation. Also include the model name found at the top of the image. The primary keys to use are the Symbols on the left column (A, A1, A2, D/E, D1/E1, e, øb, aaa, ccc, ddd, eee, M) and the secondary keys to use are MIN, NOM, MAX if there are multiple values for these fields. For example, primary key D/E, D_1/E_1, e & M only has 1 column and therefore, do not include secondary keys for them (Do not have nested keys of the same key). Ignore the last column named NOTE."]},
    ]

    current_text_input = "Only provide me with the table data in numerical form or null without any text in your response so that I can simply take your response and do json.loads(string)."

    try:
        model = genai.GenerativeModel(model_name=MODEL_NAME)

        chat = model.start_chat(history=initial_history)

        current_message_content = [
            current_text_input,
            img  
        ]

        response = chat.send_message(current_message_content)

        if not response.text:
            print("FAIL", filename)
            if response.prompt_feedback and response.prompt_feedback.block_reason:
                print(
                    f"Reason for blocking: {response.prompt_feedback.block_reason}")

    except Exception as e:
        print(f"An error occurred: {e}")
        
    ## Process response
    start_marker = '```json\n'
    end_marker = '\n```'

    cleaned_json_string = response.text
    if cleaned_json_string.startswith(start_marker):
        cleaned_json_string = cleaned_json_string[len(start_marker):]
    if cleaned_json_string.endswith(end_marker):
        cleaned_json_string = cleaned_json_string[:-len(end_marker)]

    try:
        data_dict = json.loads(cleaned_json_string)
        print("Successfully parsed JSON string into Python dictionary.")

    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
        print("The cleaned string might still be invalid JSON.")
        data_dict = None 

    ## Log response
    with open("history.log", "a") as log_file:
        log_file.write(str(response))
        log_file.write("\n-----\n")
    
    ## Append data into csv
    MM_TO_MIL = 39.4
    csv_file = "table_data.csv"

    try:
        id = data_dict['model_name']
        min_coplanarity = 0
        max_coplanarity = round(data_dict['table']['aaa']['MAX'] * MM_TO_MIL, 5)
        nom_ball_width = round(data_dict['table']['øb']['NOM'] * MM_TO_MIL, 5)
        nom_pitch = round(data_dict['table']['e'] * MM_TO_MIL, 5)
        ball_width = round((data_dict['table']['øb']['MAX'] -
                        data_dict['table']['øb']['NOM']) * MM_TO_MIL, 5)

        row = [id, min_coplanarity, max_coplanarity,
            nom_ball_width, nom_pitch, ball_width]
        headers = ['id', 'min_coplanarity',
                'max_coplanarity', 'nom_ball_width', 'nom_pitch', 'ball_width']

        # Write to CSV
        with open(csv_file, mode='a', newline='') as file:
            writer = csv.writer(file)
            if not os.path.exists(
                    csv_file) or os.path.getsize(csv_file) == 0:
                writer.writerow(headers)
            writer.writerow(row)
            
    except Exception as e:
        print("Exception for ", filename, e)
        print(data_dict)
        

print('JOB COMPLETE!')
    

Successfully parsed JSON string into Python dictionary.
Successfully parsed JSON string into Python dictionary.
Exception for  obg0103_page_3.jpg unsupported operand type(s) for *: 'NoneType' and 'float'
{'model_name': 'FB(G)484/FBV484 (XC7K70T)', 'table': {'A': {'MIN': 0.735, 'NOM': None, 'MAX': None}, 'A1': {'MIN': None, 'NOM': None, 'MAX': 0.55}, 'A2': {'MIN': None, 'NOM': None, 'MAX': None}, 'D/E': 0.0, 'D1/E1': None, 'e': 0.8, 'øb': {'MIN': None, 'NOM': None, 'MAX': None}, 'aaa': {'MIN': None, 'NOM': None, 'MAX': None}, 'ccc': {'MIN': None, 'NOM': None, 'MAX': None}, 'ddd': {'MIN': None, 'NOM': None, 'MAX': None}, 'eee': {'MIN': None, 'NOM': None, 'MAX': None}, 'M': None}}
Successfully parsed JSON string into Python dictionary.
Exception for  obg0078_page_2.jpg 'aaa'
{'model_name': 'CVG100', 'table': {'A': {'MIN': 0.76, 'NOM': 0.86, 'MAX': 0.96}, 'A1': {'MIN': 0.15, 'NOM': 0.2, 'MAX': 0.25}, 'A2': {'MIN': 0.61, 'NOM': 0.66, 'MAX': 0.71}, 'D/E': 5.0, 'D1/E1': 3.6, 'e': 0.4, 'øb': {

In [11]:
with open('data2fill.csv', 'r', newline='') as infile:
    reader = csv.reader(infile)
    
    header = next(reader)
    print(header)
    print(next(reader))
    print(next(reader))
    header = next(reader)
    print(header)
    print(header.index('BG225'))
    


['BGA Pkg Type', 'Coplanarity', '', 'Pitch Error', 'Nominal Pitch', 'Ball Width ', 'Nom. Ball Width', 'Ball Quality']
['', '(CO)**', '', '', '', '', '', '']
['', 'min', 'max', '+/-', 'nom', 'Error +/-', 'nom', '%']
['BG225', '0', '7.7', '12', '59.1', '5.9', '29.5', '75']
0
