# Install the required packages

In [None]:
# Install the Google Cloud Vision client library used for OCR in this notebook.
# NOTE(review): the `google-cloud` meta-package on PyPI is marked deprecated;
# `google-cloud-vision` alone is presumably sufficient — confirm before removing.
# NOTE(review): later cells also import PIL, pandas, langchain,
# langchain_community, tqdm and torchmetrics, none of which are installed
# here — presumably they are expected to be preinstalled.
!pip install google-cloud
!pip install google-cloud-vision

# Set up Google Cloud Project

### Step 1: Create a Google Cloud Project
Go to the [Google Cloud Console](https://console.cloud.google.com/projectcreate) and create a new project.
### Step 2: Enable Cloud Vision API
Go to the [Google Vision Wizard Page](https://console.cloud.google.com/apis/credentials/wizard?api=vision.googleapis.com) and create a new service account that will be used to access the Cloud Vision API.
### Step 3: Create credential JSON key
After creating the service account, click on the service account and go to the `Keys` tab. Click on `Add Key` and `Create new key`.
### Step 4: Download JSON key
The key should be downloaded automatically. The file is named like `<project_id>-<hash>.json`, for example `my-project-1234567890abcdef.json`.

# Define interface for Google Cloud Vision API

Create a `GoogleVision` class that sends an image to the Google Cloud Vision API.

In [None]:
import json
import pandas as pd
from google.cloud.vision import ImageAnnotatorClient, Image, EntityAnnotation
import PIL.Image as PILImage
from PIL import ImageDraw, ImageFont
from io import BytesIO
from langchain_community.llms import Ollama
from langchain import PromptTemplate # Added

# Font used by GoogleVision.draw_bbox for the text labels, loaded at size 20.
# The path is relative — presumably to the notebook's working directory; confirm.
FONT = ImageFont.truetype("../assets/THSarabun.ttf", 20)


class GoogleVision:
    """
    Small wrapper around the Google Cloud Vision text-detection endpoint.

    It holds an authenticated ``ImageAnnotatorClient`` and provides helpers to
    run OCR on a PIL image and to visualise the detected text on that image.
    """

    def __init__(self, credential_path: str) -> None:
        """Build the Vision client from a service-account JSON key file."""
        self.client: ImageAnnotatorClient = ImageAnnotatorClient.from_service_account_json(credential_path)

    def recognize(self, image: PILImage.Image) -> list[EntityAnnotation]:
        """
        Run text detection on a PIL image.

        The image is serialised to PNG in memory and sent to the API.

        Returns:
            Every text annotation in the response except the first one
            (the first entry carries the full parsed text).

        Raises:
            Exception: if the response carries a non-empty error message.
        """
        # Serialise the PIL image to PNG bytes in memory.
        png_buffer = BytesIO()
        image.save(png_buffer, format="PNG")
        vision_image = Image(content=png_buffer.getvalue())

        response = self.client.text_detection(vision_image)

        error_message = response.error.message
        if error_message:
            raise Exception(
                f"{error_message}\nFor more info on error messages, check: https://cloud.google.com/apis/design/errors"
            )

        # Drop the leading annotation and keep the remaining ones.
        return list(response.text_annotations)[1:]

    @staticmethod
    def draw_bbox(
        image: PILImage.Image,
        annotations: list[EntityAnnotation],
        label_offset: int = 20,
    ):
        """
        Outline each annotation's bounding polygon and write its text above it.

        The drawing is done directly on ``image`` (it is modified in place) and
        the same image object is returned. ``label_offset`` is the number of
        pixels the label is shifted up from the polygon's first vertex.
        """
        canvas = ImageDraw.Draw(image)
        for word in annotations:
            corners = [(point.x, point.y) for point in word.bounding_poly.vertices]
            canvas.polygon(corners, outline="blue")
            # Anchor the label at the first vertex, lifted by label_offset.
            anchor_x, anchor_y = corners[0]
            canvas.text(
                (anchor_x, anchor_y - label_offset),
                word.description,
                fill="red",
                font=FONT,
            )
        return image

# Llama 3.1 model accessed through the Ollama wrapper; "<|eot_id|>" is passed
# as a stop sequence (it is the end-of-turn marker used in the prompt template below).
llm = Ollama(model="llama3.1", stop=["<|eot_id|>"]) # Added stop token
# Default system message used by get_model_response when none is supplied.
SYSTEM_PROMPT = "You are a helpful assistant expert in returning JSON output from a given prompt."

def get_model_response(user_prompt, system_prompt=SYSTEM_PROMPT):
    """
    Send a system + user prompt to the module-level ``llm`` and return its reply.

    The two prompts are inserted into a Llama-3-style chat template (header and
    end-of-turn markers written out explicitly) and the rendered text is passed
    to the model.

    Args:
        user_prompt: Text placed in the user turn.
        system_prompt: Text placed in the system turn; defaults to SYSTEM_PROMPT.

    Returns:
        Whatever ``llm.invoke`` returns for the rendered prompt.
    """
    # NOTE: this must stay a plain (non-f) string, and the placeholders must
    # not contain whitespace inside the curly braces.
    template = """
        <|begin_of_text|>
        <|start_header_id|>system<|end_header_id|>
        {system_prompt}
        <|eot_id|>
        <|start_header_id|>user<|end_header_id|>
        {user_prompt}
        <|eot_id|>
        <|start_header_id|>assistant<|end_header_id|>
        """

    prompt = PromptTemplate(
        input_variables=["system_prompt", "user_prompt"],
        template=template,
    )
    rendered_prompt = prompt.format(system_prompt=system_prompt, user_prompt=user_prompt)

    # Calling the LLM object directly (``llm(...)``) is deprecated in LangChain;
    # ``invoke`` is the supported entry point.
    return llm.invoke(rendered_prompt)

In [None]:
# Create an instance of GoogleVision.
# Placeholder path — replace it with the service-account JSON key downloaded
# in the setup steps above.
PATH_TO_CREDENTIAL_JSON = "../path/to/credentials.json"
api = GoogleVision(PATH_TO_CREDENTIAL_JSON)

# Load the data
We load the document image (in this example, a Thai vehicle registration document) to run text detection and recognition on it.

In [None]:
# Placeholder path — replace it with the location of the image to process.
path = "path/to/.jpg"
# Open the file and convert it to 3-channel RGB.
image = PILImage.open(path).convert("RGB")
display(image)

Send the document image to the Google Cloud Vision API to extract the text from it.

In [None]:
# Run OCR on the loaded image; the result is a list of text annotations
# (the leading full-text annotation is already dropped by recognize).
annotations = api.recognize(image)

In [None]:
# Overlay the detected boxes and text. draw_bbox draws on `image` itself and
# returns it, so `drawn_image` and `image` refer to the same object.
drawn_image = GoogleVision.draw_bbox(image, annotations)
display(drawn_image)

## Parse extracted OCR text and prompt for JSON output

In [None]:
# Concatenate the text of every annotation, separated by single spaces, into one string.
plain_text = " ".join([anno.description for anno in annotations])

In [None]:
def post_process(text):
    """
    Apply a fixed list of substring corrections to OCR text.

    Each (found, replacement) pair is applied in order over the whole string;
    the pairs remove a space inside (and in one case fix a letter of) specific
    Thai terms. The corrected string is returned.
    """
    corrections = (
        ("เลข ดัง", "เลขถัง"),
        ("นํ้า หนัก", "น้ำหนัก"),
        ("เลข ทะเบียน", "เลขทะเบียน"),
    )
    for found, replacement in corrections:
        text = text.replace(found, replacement)
    return text

# Apply the OCR corrections to the joined text before building the prompt.
plain_text = post_process(plain_text)

In [None]:
def create_prompt(text):
    """
    Build the extraction prompt for a Thai vehicle registration document.

    ``text`` (the OCR output) is embedded after "OCR Text:"; the rest of the
    prompt lists 25 fields (Thai label plus an English name in parentheses)
    and asks for a structured JSON answer.

    NOTE(review): items 12 and 15 use the same Thai label, so a JSON object
    keyed by the Thai labels cannot hold both values.
    NOTE(review): the prompt gives two different conventions for missing
    values (a Thai "not found" phrase at the top, an empty string at the end).
    """
    return f"""You are an expert in analyzing Thai vehicle registration documents. Your task is to extract specific information from the following OCR text of a Thai vehicle registration document. Please identify and extract the following information, providing the values in Thai where applicable. If a piece of information is not found or unclear, respond with "ไม่พบข้อมูล" (Information not found).
    
    OCR Text: {text}
    
    Please extract and provide the following information:
    
    1. วันจดทะเบียน (date_of_registration):
    2. เลขทะเบียน (registration_no):
    3. จังหวัด (car_province):
    4. ประเภท (vehicle_use):
    5. รย. (type):
    6. ลักษณะ (body_style):
    7. ยี่ห้อรถ (manufacturer):
    8. แบบ (model):
    9. รุ่นปี คศ (year):
    10. สี (color):
    11. เลขตัวรถ (chassis_number):
    12. อยู่ที่ (chassis_location):
    13. ยี่ห้อเครื่องยนต์ (engine_manufacturer):
    14. เลขเครื่องยนต์ (engine_number):
    15. อยู่ที่ (engine_location):
    16. เชื้อเพลิง (fuel_type):
    17. เลขถังแก๊ส or เลขดังแก๊ส (fuel_tank_number):
    18. จำนวน (cylinders):
    19. ซีซี (cubic_capacity):
    20. แรงม้า (horse_power):
    21. จำนวนเพลาและล้อ (axles_wheels_no):
    22. น้ำหนักรถ (unladen_weight):
    23. น้ำหนักบรรทุก/น้ำหนักเพลา (load_capacity):
    24. น้ำหนักรวม (gross_weight):
    25. ที่นั่ง (seats):
    
    Please provide the extracted information in a structured JSON format. Listing each item in a given key with its corresponding value. If information is not found, leave as empty string. Don't need to comment."""

In [None]:
# Build the extraction prompt from the corrected OCR text, send it to the
# model, and print the raw reply for inspection.
prompt = create_prompt(plain_text)
output = get_model_response(prompt)
print(output)

In [None]:
# The model may wrap the JSON in a Markdown fence (often tagged ```json) or
# surround it with prose, so parse only the outermost {...} span. When no such
# span exists the raw reply is parsed as-is (json.loads raises on bad input).
json_start, json_end = output.find("{"), output.rfind("}")
json_text = output[json_start:json_end + 1] if 0 <= json_start < json_end else output
parsed_json_output = json.loads(json_text)

In [None]:
def map_thai_to_english_keys(thai_dict):
    """
    Return a copy of ``thai_dict`` with known Thai field labels renamed.

    Keys found in the lookup table are replaced by their English field name;
    any other key is kept unchanged. Values are carried over untouched. When
    two input keys translate to the same output key, the later value wins.
    """
    # Thai label -> English field name.
    key_mapping = {
        'วันจดทะเบียน': 'date_of_registration',
        'เลขทะเบียน': 'registration_no',
        'จังหวัด': 'car_province',
        'ประเภท': 'vehicle_use',
        'รย.': 'type',
        'ลักษณะ': 'body_style',
        'ยี่ห้อรถ': 'manufacturer',
        'แบบ': 'model',
        'รุ่นปี คศ': 'year',
        'สี': 'color',
        'เลขตัวรถ': 'chassis_number',
        'อยู่ที่': 'chassis_location',
        'ยี่ห้อเครื่องยนต์': 'engine_manufacturer',
        'เลขเครื่องยนต์': 'engine_number',
        'เชื้อเพลิง': 'fuel_type',
        'เลขถังแก๊ส': 'fuel_tank_number',
        'เลขดังแก๊ส': 'fuel_tank_number',  # Alternative spelling of the same label
        'จำนวน': 'cylinders',
        'ซีซี': 'cubic_capacity',
        'แรงม้า': 'horse_power',
        'จำนวนเพลาและล้อ': 'axles_wheels_no',
        'น้ำหนักรถ': 'unladen_weight',
        'น้ำหนักบรรทุก/น้ำหนักเพลา': 'load_capacity',
        'น้ำหนักรวม': 'gross_weight',
        'ที่นั่ง': 'seats'
    }

    # Unknown keys fall back to themselves.
    return {key_mapping.get(key, key): value for key, value in thai_dict.items()}

In [None]:
# Rename the Thai keys of the parsed model output to their English field names.
parsed_json_output_eng = map_thai_to_english_keys(parsed_json_output)

In [None]:
# Display the renamed dictionary as the cell output.
parsed_json_output_eng

## Putting it all together: run on sample images

In [None]:
import pandas as pd
from tqdm.auto import tqdm
from glob import glob
from pathlib import Path

# Run the full pipeline (OCR -> text clean-up -> LLM -> JSON -> key renaming)
# on every matching image. Images that fail are reported and skipped, so
# `extracted_values` may end up shorter than `paths`.
extracted_values = []
# Sorted so that the processing order (and the row order) is reproducible.
paths = sorted(glob("path/to/*.jpg"))
for path in tqdm(paths):
    try:
        image = PILImage.open(path).convert("RGB")
        # Google API
        annotations = api.recognize(image)
        plain_text = " ".join(anno.description for anno in annotations)
        # Apply the same OCR corrections as the single-image flow.
        plain_text = post_process(plain_text)
        # Prompt
        prompt = create_prompt(plain_text)
        output = get_model_response(prompt)
        # The reply may be fenced (often as ```json) or wrapped in prose;
        # keep only the outermost {...} span when there is one.
        json_start, json_end = output.find("{"), output.rfind("}")
        if 0 <= json_start < json_end:
            output = output[json_start:json_end + 1]
        parsed_json_output = json.loads(output)
        parsed_json_output_eng = map_thai_to_english_keys(parsed_json_output)
        # Record which image this row came from (file name without extension).
        parsed_json_output_eng["path"] = Path(path).stem
        extracted_values.append(parsed_json_output_eng)

    except json.JSONDecodeError:
        print(f"JSON decoding error for path: {path}")

    except Exception as e:
        # Best-effort batch run: report the failure and continue with the next image.
        print(f"Error processing path: {path}, Error: {e}")

extracted_values_df = pd.DataFrame(extracted_values)

In [None]:
# Build the key column from the rows that were actually extracted. Using the
# full `paths` list here would break (length mismatch) or mislabel rows
# whenever an image failed and was skipped in the loop above.
extracted_values_df["image_path"] = [row["path"] for row in extracted_values]
extracted_values_df

In [None]:
# Save the predictions to an Excel file (without the index column); the
# evaluation section reads this file back.
# NOTE(review): writing .xlsx presumably requires an Excel engine such as
# openpyxl to be installed — confirm.
output_file = 'predicted_results_google_ocr.xlsx'
extracted_values_df.to_excel(output_file, index=False)
print(f"DataFrame saved as {output_file}")

## Evaluation

อ่านผลของแต่ละ key และวัดประสิทธิภาพด้วย CER และ Accuracy

In [None]:
import numpy as np
from torchmetrics.text import CharErrorRate

def calculate_cer(preds: list, targets: list):
    """
    Compute the character error rate of ``preds`` against ``targets``.

    A fresh ``CharErrorRate`` metric is created, evaluated once on the two
    sequences, and its result is converted with ``.tolist()`` before being
    returned.
    """
    metric = CharErrorRate()
    return metric(preds, targets).tolist()

In [None]:
# Load ground-truth annotations and model predictions. Every column is read as
# a string and missing cells become "" so the two tables can be compared as text.
annotated_df = pd.read_excel('annotated_results.xlsx', dtype=str).fillna("")
predicted_df = pd.read_excel('predicted_results_google_ocr.xlsx', dtype=str).fillna("")

In [None]:
# Remove the Thai "information not found" phrase from the "year" column of
# both tables, so a missing year is represented by an empty string.
for frame in (predicted_df, annotated_df):
    frame["year"] = frame["year"].map(lambda x: x.replace("ไม่พบข้อมูล", ""))

In [None]:
# Fields to evaluate (the English field names used in the extraction prompt).
columns_of_interest = [
    'date_of_registration', 'registration_no', 'car_province', 'vehicle_use', 'type', 'body_style',
    'manufacturer', 'model', 'year', 'color', 'chassis_number', 'chassis_location', 'engine_manufacturer',
    'engine_number', 'engine_location', 'fuel_type', 'fuel_tank_number', 'cylinders', 'cubic_capacity',
    'horse_power', 'axles_wheels_no', 'unladen_weight', 'load_capacity', 'gross_weight', 'seats'
]
# Join annotations and predictions on the image key. Columns present in both
# tables get the suffixes "_annotation" / "_prediction"; with pandas' default
# inner join, images missing from either table are dropped.
merged_df = pd.merge(annotated_df, predicted_df, on='image_path', suffixes=('_annotation', '_prediction'))

In [None]:
# Show annotation and prediction side by side for one field; change `col` to
# inspect another one.
col = "car_province" # col = "date_of_registration"
merged_df[[f"{col}_annotation", f"{col}_prediction"]]

In [None]:
# Compute, for every field of interest, the character error rate and the
# exact-match accuracy (in percent) of predictions against annotations.
eval_list = []
for col in columns_of_interest:
    annotation_col = f"{col}_annotation"
    prediction_col = f"{col}_prediction"
    # Skip fields that are missing from either side of the merge.
    if annotation_col not in merged_df.columns or prediction_col not in merged_df.columns:
        continue
    predictions = merged_df[prediction_col]
    references = merged_df[annotation_col]
    avg_cer = np.mean(calculate_cer(predictions, references))
    avg_accuracy = (predictions == references).mean() * 100
    eval_list.append({
        "column_name": col,
        "cer": avg_cer,
        "accuracy": avg_accuracy
    })
eval_df = pd.DataFrame(eval_list)

## Quiz

- ถ่ายภาพใบเสร็จที่ให้
- จากนั้นรัน `annotations = api.recognize(image)`
- เขียน prompt เพื่อดึงรายการที่สั่งและราคารวมออกมาในรูปแบบ JSON
- จากนั้นรัน `output = get_model_response(prompt)` เพื่อดึงข้อมูลออกมาในรูปแบบ JSON