In [1]:
# file handling
import glob
import os
from dotenv import load_dotenv
import re
import unidecode

# image handling
import base64

# AI API
import anthropic

# parsing
import csv
from io import StringIO

# tabular data
import numpy as np
import pandas as pd

# date handling
import locale
import datetime as dt

In [2]:
# Load environment variables from the .env file; the API key read with
# os.getenv further down is expected to come from there.
load_dotenv()

True

In [3]:
# Build the Anthropic API client from the key stored in the environment.
api_key = os.environ.get("CLAUDE_API_KEY")
if api_key is None or api_key == "":
    # Stop early when the key is missing or empty.
    raise ValueError("CLAUDE_API_KEY not found")
client = anthropic.Anthropic(api_key=api_key)

In [4]:
# use AI API to extract text from image
def extract_img2text(image_path, prompt, model="claude-3-5-sonnet-20240620",
                     max_tokens=3200):
    """Send an image together with a text prompt to the Anthropic Messages API.

    Parameters
    ----------
    image_path : str or path-like
        Path of the image file to upload. The file is read in binary mode and
        base64-encoded before being sent.
    prompt : str
        Text instruction sent alongside the image.
    model : str, optional
        Model identifier passed to the API. Defaults to the value that was
        previously hard-coded in this function.
    max_tokens : int, optional
        Upper bound on generated tokens passed to the API. Defaults to the
        value that was previously hard-coded in this function.

    Returns
    -------
    The object returned by ``client.messages.create``; callers in this
    notebook read the generated text from ``.content[0].text``.
    """
    # Media type declared to the API, keyed by lower-case file extension.
    # Unknown extensions fall back to JPEG, which is what this function
    # always declared before.
    media_types = {
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".gif": "image/gif",
        ".webp": "image/webp",
    }
    extension = os.path.splitext(image_path)[1].lower()
    media_type = media_types.get(extension, "image/jpeg")

    with open(image_path, "rb") as image_file:
        image_data = base64.b64encode(image_file.read()).decode('utf-8')

    image_block = {
        "type": "image",
        "source": {
            "type": "base64",
            "media_type": media_type,
            "data": image_data,
        },
    }
    text_block = {"type": "text", "text": prompt}

    message = client.messages.create(
        model=model,
        max_tokens=max_tokens,
        messages=[{
            "role": "user",
            "content": [image_block, text_block],
        }],
    )
    return message

# parse csv string to list of lists
def parse_csv_string(csv_string):
    """Parse CSV-formatted text into a list of rows.

    Leading and trailing whitespace of ``csv_string`` is removed before
    parsing. Every parsed row is returned as a list of strings, in input
    order.
    """
    trimmed_text = csv_string.strip()
    return [record for record in csv.reader(StringIO(trimmed_text))]

def insert_column_name(df, column_name):
    """Insert a constant column holding ``column_name`` just before that column.

    The new column is called ``"<column_name>_name"`` and every row contains
    the string ``column_name``. ``df`` is modified in place and also returned.
    """
    # Position of the existing column; the new one is placed at this index,
    # which shifts the existing column one place to the right.
    position = df.columns.get_loc(column_name)
    df.insert(loc=position, column=f'{column_name}_name', value=column_name)
    return df


def convert_to_date(date_string, year):
    """Convert a Spanish "<month> <weekday> <day>" label into a date string.

    The label is parsed without touching the process locale, so the function
    works even when the ``es_ES.UTF-8`` locale is not installed and never
    leaves the global locale modified.

    Parameters
    ----------
    date_string : str
        Three whitespace-separated words: full Spanish month name, full
        Spanish weekday name and day of month, e.g. ``"Enero Lunes 6"``.
        Matching is case-insensitive.
    year : int or str
        Year to combine with the month and day.

    Returns
    -------
    str
        The date formatted as day without leading zero, two-digit month and
        year, e.g. ``"2/03/2024"``.

    Raises
    ------
    ValueError
        If the label does not have three words, the month or weekday name is
        not recognised, or the day/year do not form a valid date.
    """
    month_numbers = {
        "enero": 1, "febrero": 2, "marzo": 3, "abril": 4,
        "mayo": 5, "junio": 6, "julio": 7, "agosto": 8,
        "septiembre": 9, "octubre": 10, "noviembre": 11, "diciembre": 12,
    }
    # Unaccented spellings are accepted as well as the accented ones.
    weekday_names = {
        "lunes", "martes", "miércoles", "miercoles", "jueves",
        "viernes", "sábado", "sabado", "domingo",
    }

    parts = str(date_string).split()
    if len(parts) != 3:
        raise ValueError(
            f"Expected '<month> <weekday> <day>', got: {date_string!r}")
    month_name, weekday_name, day_text = parts

    month = month_numbers.get(month_name.lower())
    if month is None:
        raise ValueError(f"Unknown month name: {month_name!r}")
    # The weekday is validated but not used to compute the date.
    if weekday_name.lower() not in weekday_names:
        raise ValueError(f"Unknown weekday name: {weekday_name!r}")

    # int() and dt.date() raise ValueError on non-numeric or out-of-range input.
    date_obj = dt.date(int(year), month, int(day_text))

    # format: 2/03/2024 (built by hand because '%-d' is platform-specific)
    return f"{date_obj.day}/{date_obj.month:02d}/{date_obj.year}"


def normalize_day(input_string):
    """Expand a Spanish weekday abbreviation found in the second word.

    The input is title-cased and split on whitespace. When it has at least
    two words and the first three characters of the second word are a known
    abbreviation, that word is replaced by the full weekday name. The words
    are then re-joined with single spaces.
    """
    full_names = {
        "Lun": "Lunes",
        "Mar": "Martes",
        "Mié": "Miércoles",
        "Mie": "Miércoles",
        "Jue": "Jueves",
        "Vie": "Viernes",
        "Sab": "Sábado",
        "Sáb": "Sábado",
        "Dom": "Domingo"
    }

    words = input_string.title().split()

    # Nothing to expand when there is no second word.
    if len(words) < 2:
        return " ".join(words)

    # Look up the three-letter prefix; keep the word as is when unknown.
    words[1] = full_names.get(words[1][:3], words[1])
    return " ".join(words)


def normalize_month(input_string):
    """Expand a month abbreviation found in the first word of a label.

    The input is title-cased and split on whitespace. When the first three
    characters of the first word are a known abbreviation, that word is
    replaced by the full Spanish month name. The words are then re-joined
    with single spaces.

    Parameters
    ----------
    input_string : str
        Label whose first word is a month name or abbreviation,
        e.g. ``"Ene Lunes 6"``.

    Returns
    -------
    str
        The title-cased label with the month expanded, e.g.
        ``"Enero Lunes 6"``. Labels with an unknown first word are returned
        title-cased but otherwise unchanged; an empty label yields ``""``.
    """
    month_mapping = {
        "Ene": "Enero",
        "Feb": "Febrero",
        "Mar": "Marzo",
        "Abr": "Abril",
        "Apr": "Abril",  # English abbreviation, kept for backward compatibility
        "May": "Mayo",
        "Jun": "Junio",
        "Jul": "Julio",
        "Ago": "Agosto",
        "Aug": "Agosto",  # English abbreviation, kept for backward compatibility
        "Sep": "Septiembre",
        "Oct": "Octubre",
        "Nov": "Noviembre",
        "Dic": "Diciembre"
    }

    # Convert to title case and split
    parts = input_string.title().split()

    # The month is the first word, so a single-word label is expanded too.
    if parts:
        month_abbr = parts[0][:3]  # Take first 3 characters
        parts[0] = month_mapping.get(month_abbr, parts[0])

    return " ".join(parts)

In [5]:
# folder structure

# Base directory: the parent of the current working directory
current_dir = os.path.dirname(os.path.abspath(os.getcwd()))

# data folder
data = "_data"
# target folder
batch = "01_2025_03"

# Sub-folders of <current_dir>/<data>/<batch> used by the cells below
folder_img_path, folder_sg_path, folder_output = (
    os.path.join(current_dir, data, batch, stage_dir)
    for stage_dir in ("1_img", "2_sg_excel", "3_output")
)

In [6]:
# Find the spreadsheet matching the expected file-name pattern.
# Matches are sorted so the choice is deterministic if several files match.
sg_matches = sorted(
    glob.glob(os.path.join(folder_sg_path, "[Ff]echa*[Pp]arto*.xlsx")))
if not sg_matches:
    # Fail with an explicit message instead of a bare IndexError.
    raise FileNotFoundError(
        f"No file matching '[Ff]echa*[Pp]arto*.xlsx' in {folder_sg_path}")
file_path = sg_matches[0]

# Read the matched file; header=1 takes the column names from the second row.
data_sg = pd.read_excel(file_path, header=1)
data_sg = data_sg.rename(columns={
    "Número": "Número animal",
    "F.Últ.Par": "Fecha Parto"
})
# Keep only the two columns needed for the merge and drop incomplete rows.
data_sg = data_sg[["Número animal", "Fecha Parto"]].dropna().copy()
# format: 2/03/2024
# Built from the day number plus strftime because '%-d' (day without leading
# zero) is a platform-specific directive.
data_sg["Fecha Parto"] = (
    data_sg["Fecha Parto"].dt.day.astype(str)
    + data_sg["Fecha Parto"].dt.strftime('/%m/%Y')
)
data_sg.info()
data_sg.head(2)

<class 'pandas.core.frame.DataFrame'>
Index: 73 entries, 0 to 72
Data columns (total 2 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   Número animal  73 non-null     object
 1   Fecha Parto    73 non-null     object
dtypes: object(2)
memory usage: 1.7+ KB


Unnamed: 0,Número animal,Fecha Parto
0,321/5,1/03/2024
1,0836/3,28/03/2024


In [7]:
# Confidence threshold (in percent) interpolated into Instruction 2 below.
conf_level = 99.75
# Prompt sent with every image. Instruction 8 asks for the CSV to be enclosed
# in brackets; the processing loop relies on this when it splits the response
# text on "[". Instructions 4 and 5 repeat the same constraint.
prompt_input = f"""Instruction 1: Convert the text in the image to csv.
Instruction 2: Employ a strict approach: add 1 asterisk next to the estimated values for those cells whose text-to-digit conversion are below a {conf_level} percent confidence threshold; it does not matter if data is over-flagged.
Instruction 3: Include in comments the confidence threshold used.
Instruction 4: Do not use outlier-detection as criteria to flag the data.
Instruction 5: Make sure to not use outlier-detection as criteria to flag the data.
Instruction 6: If headers are present, include them. If no headers are found, do not include any.
Instruction 7: Include any comments before returning output. Limit verbosity.
Instruction 8: Return output enclosed in brackets to facilitate parsing.
Instruction 9: Do not include any additional comments after final output.
"""

In [8]:
# Show the image folder contents in the same sorted order the processing
# loop iterates over them.
print(sorted(os.listdir(folder_img_path)))

['.DS_Store', 'Escaner_20250112_1.jpg', 'Escaner_20250112_2.jpg', 'Escaner_20250112_3.jpg']


In [9]:
# One DataFrame per successfully processed image, in file-name order.
data_list = []
# Header row currently in use; carried over to pages that have no header.
cols_list = []
# Year combined with the day/month taken from the column headers.
year = 2025


class NoColsError(Exception):
    """Raised when a page has no header row and none has been seen before."""


for filename in sorted(os.listdir(folder_img_path)):
    # Only JPEG scans are processed; the extension check ignores case.
    if not filename.lower().endswith(('.jpeg', '.jpg')):
        continue

    print(f"Processing file: {filename}")
    print(f"cols_list at start of iteration: {cols_list}")

    image_path = os.path.join(folder_img_path, filename)

    try:
        result = extract_img2text(image_path, prompt_input)
        response_text = result.content[0].text
    except Exception as e:
        print(f"An error occurred: {e}")
        continue  # Skip to next file if there's an error

    # The prompt asks for comments first, then the CSV enclosed in brackets.
    preamble, opening_bracket, remainder = response_text.partition("[")
    print(preamble)
    if not opening_bracket:
        # Previously surfaced as a bare IndexError from split("[")[1].
        raise ValueError(
            f"No bracketed CSV block in the response for {filename}")

    # Text between the first "[" and the next "[" (if any), without "]".
    data_string = remainder.split("[")[0].replace("]", "")
    parsed_data = parse_csv_string(data_string)
    if not parsed_data:
        raise ValueError(f"Empty CSV block in the response for {filename}")

    # A row counts as a header when any of its cells contains "vaca".
    first_row = parsed_data[0]
    has_header = any("vaca" in s.lower() for s in first_row)

    if has_header:
        if not cols_list:
            cols_list = first_row
            print(f"cols_list initialized: {cols_list}")
        elif cols_list != first_row:
            cols_list = first_row
            print(f"cols_list updated: {cols_list}")
        else:
            print(f"No update to cols_list. Current: {cols_list}")
        data_rows = parsed_data[1:]
    elif cols_list:
        print(f"No update to cols_list. Current: {cols_list}")
        # Header-less page: the first row is data and must not be dropped.
        data_rows = parsed_data
    else:
        print(f"No 'vaca' found in: {first_row}")
        raise NoColsError("No columns found: Check image folder")

    # Create the DataFrame
    try:
        data_df = pd.DataFrame(data_rows, columns=cols_list)
        print("Dataframe successfully created")
    except Exception as e:
        print(f"{e}")
        print(cols_list)
        # Show the first row that has more cells than there are columns.
        for c in data_rows:
            if len(c) > len(cols_list):
                print(c)
                break
        print("breaking loop...")
        break

    print(f"cols_list at end of iteration: {cols_list}")

    # Number of asterisks (low-confidence flags) per row.
    data_df['flag_count'] = data_df.apply(
        lambda row: row.astype(str).str.count(r'\*').sum(), axis=1)

    # Columns at positions 3..9 are treated as the daily measurement columns.
    measure_cols = data_df.iloc[:, 3:10].columns.tolist()
    for col_label_num, col in enumerate(measure_cols, start=1):

        # Remove "-*" and "-" placeholders as literal text; regex=False makes
        # this independent of the pandas-version default.
        data_df[col] = (data_df[col].astype(str)
                        .str.replace("-*", "", regex=False)
                        .str.replace("-", "", regex=False))

        # Get the index of the specified column
        col_index = data_df.columns.get_loc(col)

        # Header text without dots, with weekday and month expanded.
        col_label_str = normalize_month(normalize_day(col.replace(".", "")))

        # Insert a constant date column just before the measurement column
        data_df.insert(col_index, f'Fecha {col_label_num}',
                       convert_to_date(col_label_str, year=year))
        data_df = data_df.rename(columns={col: "Kg/Leche"})

    data_df = data_df.drop(columns=["Nombre", "Becerro", "Fecha PP", "#"],
                           errors="ignore")

    data_df = data_df.rename(columns={data_df.columns[0]: "Número animal"})

    # Use "/" as separator so the ids can match those in data_sg.
    data_df["Número animal"] = data_df["Número animal"].str.replace(
        "-", "/", regex=False)
    print("data_df: ", data_df.columns.tolist())
    data_final = data_df.merge(data_sg, on="Número animal", how="left")
    print("data_final: ", data_final.columns.tolist())
    # Animals without a match in data_sg get a flagged placeholder.
    data_final["Fecha Parto"] = data_final["Fecha Parto"].fillna("X*")

    # Move the key columns to the front, keeping their relative order
    cols_to_move = ["Número animal", "Fecha Parto"]

    for col in reversed(cols_to_move):
        data_final.insert(0, col, data_final.pop(col))

    print(data_final.columns.tolist())

    data_list.append(data_final)
    print("---")

Processing file: Escaner_20250112_1.jpg
cols_list at start of iteration: []
Comments: Using a 99.75% confidence threshold for text-to-digit conversion. Asterisks (*) indicate values below this threshold.


cols_list initialized: ['Número vaca', 'Nombre', 'Becerro', 'Ene. Lunes 6', 'Ene. Martes 7', 'Ene. Miérc. 8', 'Ene. Jueves 9', 'Ene. Vierne 10', 'Ene. Sáb. 11', 'Ene. Dom. 12', '#']
Dataframe successfully created
cols_list at end of iteration: ['Número vaca', 'Nombre', 'Becerro', 'Ene. Lunes 6', 'Ene. Martes 7', 'Ene. Miérc. 8', 'Ene. Jueves 9', 'Ene. Vierne 10', 'Ene. Sáb. 11', 'Ene. Dom. 12', '#']
data_df:  ['Número animal', 'Fecha 1', 'Kg/Leche', 'Fecha 2', 'Kg/Leche', 'Fecha 3', 'Kg/Leche', 'Fecha 4', 'Kg/Leche', 'Fecha 5', 'Kg/Leche', 'Fecha 6', 'Kg/Leche', 'Fecha 7', 'Kg/Leche', 'flag_count']
data_final:  ['Número animal', 'Fecha 1', 'Kg/Leche', 'Fecha 2', 'Kg/Leche', 'Fecha 3', 'Kg/Leche', 'Fecha 4', 'Kg/Leche', 'Fecha 5', 'Kg/Leche', 'Fecha 6', 'Kg/Leche', 'Fecha 7', 'Kg/Lech

In [10]:
# Write the processed frames, one sheet per image, into two workbooks with
# identical content: "leche_<batch>.xlsx" and "leche_<batch>_final.xlsx".
if not data_list:
    # The processing loop can end without results (errors or an early break).
    raise ValueError("data_list is empty: there is nothing to export")

# Make sure the output folder exists before opening the writers.
os.makedirs(folder_output, exist_ok=True)

for output_name in (f"leche_{batch}.xlsx", f"leche_{batch}_final.xlsx"):
    with pd.ExcelWriter(os.path.join(folder_output, output_name)) as writer:
        for i, df in enumerate(data_list, start=1):
            df.to_excel(writer, sheet_name=f'Sheet_{i}', index=False)