# Installing Dependencies

In [84]:
!ls
%pip install smolagents
%pip install sqlalchemy
%pip install agentops
%pip install openai
%pip install litellm

pdf_images  sample_data  table_0.png  table_10.png  table_3.png  table_8.png


# Main tools

In [85]:
import base64
from typing import Dict, Any, Optional
import os
import openai
from sqlalchemy import create_engine, MetaData, Table, Column, String, Integer, Float, insert, text, inspect
from smolagents import tool, CodeAgent, HfApiModel, LiteLLMModel


# Set up OpenAI API key
'
# Get the directory of the current script

def encode_image_to_base64(image_path: str) -> str:
    """
    Encode a PNG image to Base64 for inclusion in the OpenAI prompt.
    """
    if not os.path.exists(image_path):
        raise FileNotFoundError(f"Image file not found: {image_path}")
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode("utf-8")

from openai import OpenAI
from typing import Optional

def call_openai_gpt4(prompt: str, img_b64: Optional[str] = None) -> str:
    """
    Call OpenAI GPT-4 API with a given prompt and optional Base64-encoded image.

    Args:
        prompt (str): The input prompt
        img_b64 (Optional[str]): Base64 encoded image string

    Returns:
        str: The generated response
    """
    client = OpenAI()

    messages = [
        {"role": "system", "content": "You are a helpful GERMAN assistant for analyzing technical drawings."},
    ]

    content = []

    if (img_b64):
        content.append({
        "type": "image_url",
        "image_url": {"url": f"data:image/png;base64,{img_b64}"
        } })

    content.append({"type" : "text", "text": prompt})
    messages.append({"role": "user", "content": content})

    #print(img_b64)

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        max_tokens=10000,
        temperature=0.7
    )

    return response.choices[0].message.content.strip()

# SQLAlchemy Database Setup
engine = create_engine("sqlite:///:memory:")
metadata_obj = MetaData()

In [86]:
# Define tables
table_12 = Table(
    "table_12",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("Bauteilnummer", String(50)),
    Column("Materialangabe", String(50)),
    Column("Oberflächenbehandlung", String(50)),
    Column("Schweißnahtlänge", String(50)),
)

table_10 = Table(
    "table_10",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("Planschlüssel", String(50)),
    Column("Stat_Pos", String(50)),
    Column("Aufr_Nr", String(50)),
    #Column("Index", String(50)),
    Column("Fertigteil_Position", String(50)),
    Column("Stück", Integer),
    Column("Volumen", Float),
    Column("Gewicht", Float),
)

table_8 = Table(
    "table_8",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("Länge", Float),
    Column("Höhe", Float),
)

metadata_obj.create_all(engine)

# Define tools
@tool
def sql_engine(query: str) -> str:
    """
    Allows you to perform SQL queries on the table. Returns a string representation of the result.
    Args:
        query: The query to perform. This should be correct SQL.
    """
    output = ""
    with engine.connect() as con:
        rows = con.execute(text(query))
        for row in rows:
            output += "\n" + str(row)
    return output

# Update SQL engine description based on table metadata
def update_sql_engine_description():
    updated_description = """Allows you to perform SQL queries on the table. Beware that this tool's output is a string representation of the execution output.
It can use the following tables, NO "measurements_table", ALTER the TABLE IF NEEDED:"""

    inspector = inspect(engine)
    for table in ["table_8", "table_10", "table_12"]:
        columns_info = [(col["name"], col["type"]) for col in inspector.get_columns(table)]
        table_description = f"\n\nTable '{table}':\n"
        table_description += "Columns:\n" + "\n".join([f"  - {name}: {col_type}" for name, col_type in columns_info])
        updated_description += table_description

    print(updated_description)
    sql_engine.description = updated_description

# OpenAI GPT-4 functions
def encode_image_to_base64(image_path: str) -> str:
    if not os.path.exists(image_path):
        raise FileNotFoundError(f"Image file not found: {image_path}")
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode("utf-8")

In [87]:
def process_image_with_prompt(image_path: str, prompt: str, table_name: str) -> str:
    # Check if image exists
    if not os.path.exists(image_path):
        return f"Error: File not found: {image_path}"

    # Encode image
    img_b64 = encode_image_to_base64(image_path)

    # Call GPT-4
    return call_openai_gpt4(prompt, img_b64)
'''
def process_image_with_prompt(image_name: str, prompt: str, table_name: str) -> Dict[str, Any]:
    image_path = os.path.join(os.path.dirname(__file__), image_name)

    if not os.path.exists(image_path):
        return {"error": f"File not found: {image_path}"}

    # Encode image
    try:
        img_b64 = encode_image_to_base64(image_path)
    except FileNotFoundError as e:
        return {"error": str(e)}

    # Call GPT-4
    gpt_response = call_openai_gpt4(prompt, img_b64)

    # Store response in the database
    table = globals()[table_name.lower()]
    with engine.begin() as connection:
        try:
            measurements_data = eval(gpt_response)  # Use json.loads for stricter parsing
            for key, value in measurements_data.items():
                connection.execute(
                    insert(table).values(
                        {table.c.keys()[1]: key, table.c.keys()[2]: value}
                    )
                )
        except Exception as e:
            return {"error": f"Failed to parse or store GPT response: {str(e)}"}

    return {"gpt_response": gpt_response, "table_name": table_name, "image_name": image_name}
    '''

'\ndef process_image_with_prompt(image_name: str, prompt: str, table_name: str) -> Dict[str, Any]:\n    image_path = os.path.join(os.path.dirname(__file__), image_name)\n\n    if not os.path.exists(image_path):\n        return {"error": f"File not found: {image_path}"}\n\n    # Encode image\n    try:\n        img_b64 = encode_image_to_base64(image_path)\n    except FileNotFoundError as e:\n        return {"error": str(e)}\n\n    # Call GPT-4\n    gpt_response = call_openai_gpt4(prompt, img_b64)\n\n    # Store response in the database\n    table = globals()[table_name.lower()]\n    with engine.begin() as connection:\n        try:\n            measurements_data = eval(gpt_response)  # Use json.loads for stricter parsing\n            for key, value in measurements_data.items():\n                connection.execute(\n                    insert(table).values(\n                        {table.c.keys()[1]: key, table.c.keys()[2]: value}\n                    )\n                )\n        except 

In [88]:
@tool
def process_table8() -> Dict[str, Any]:
    """
    Process Table 8: Extract specific measurements from an image.
    """
    prompt = """
    Please analyze this technical drawing and extract the following measurements:
    - Länge
    - Höhe
    - Fertigteil Position
    - Stück
    - Volumen (m3)
    - Gewicht (to)

    Provide the measurements in the format:
    {"Länge „Gesamt“": "value", "Höhe „Gesamt“": "value", ...}
    """
    return process_image_with_prompt("./table_8.png", prompt, "Table 8")

'''
special case above in two databases technically
'''


@tool
def process_table10() -> Dict[str, Any]:
    """
    Process Table 10: Extract specific measurements from an image.
    """
    prompt = """
    Please analyze this technical drawing and extract the following measurements:
    - Planschlüssel
    - Stat.Pos
    - Aufr. Nr.
    - Index
    Provide the measurements in the format:
    {"Planschlüssel": "value", "Stat.Pos": "value", ...}
    """
    return process_image_with_prompt("./table_10.png", prompt, "Table 10")


@tool
def process_table12() -> Dict[str, Any]:
    """
    Process Table 12: Extract specific measurements from an image.
    """
    prompt = """
    Please analyze this technical drawing and extract the following measurements:
    -Pos.
    -Stck:
    -Bezeichnung:

    Provide the measurements in the format:
    {"Pos.": "value", "Stück": "value", "Bezeichnung": "value"}
    """
    return process_image_with_prompt("./table_0.png", prompt, "Table 12")

In [89]:
!ls

pdf_images  sample_data  table_0.png  table_10.png  table_3.png  table_8.png


In [90]:
#####################################

process_table8()
#

'{"Länge „Gesamt“": "6.66", "Höhe „Gesamt“": "0.20", "Fertigteil Position": "09-001", "Stück": "3", "Volumen (m3)": "3.97", "Gewicht (to)": "9.92"}'

In [91]:
# Agent definition
def create_agent_for_table(table_name: str, prompt: str, image_name: str) -> Dict[str, Any]:
    update_sql_engine_description()

    agent = CodeAgent(
        tools=[sql_engine, process_table8, process_table10, process_table12],
        model= LiteLLMModel("openai/gpt-4o-mini")  # Replace with the appropriate model
    )

    gpt_response = process_image_with_prompt(image_name, prompt, table_name)
    agent.run(f"Process the following output for the data model: {table_name}: {gpt_response}")

    return {"gpt_response": gpt_response, "table_name": table_name}


# Example usage
if __name__ == "__main__":
    table_8_prompt = """
    Please analyze this technical drawing and extract the following:
    - Länge
    - Höhe
    - Fertigteil Position
    - Stück
    - Volumen (m3)
    - Gewicht (to)
    Provide the measurements in the format:
    {"Länge_Gesamt": "value", "Höhe_Gesamt": "value", ...}
    """
    table_10_prompt = """
    Please analyze this technical drawing and extract the following:
    - Planschlüssel
    - Stat.Pos
    - Aufr. Nr.
    Provide the measurements in the format:
    {"Planschlüssel": "value", "Stat.Pos": "value", ...}
    """
    table_12_prompt = """
    Please analyze this technical drawing and extract the following:
    -Pos.
    -Stck:
    -Bezeichnung:

    Provide the measurements in the format:
    {"Pos.": "value", "Stück": "value", "Bezeichnung": "value"}
    """

    # Agent
def run_agent():
    """
    Run an agent with tools for processing all tables.
    """
    #print(create_agent_for_table("table_8", table_8_prompt, "./table_8.png"))
    #print(create_agent_for_table("table_10", table_10_prompt, "./table_10.png"))
    #print(create_agent_for_table("table_12", table_12_prompt, "./table_0.png"))
agent = CodeAgent(
    tools=[sql_engine,process_table8, process_table10, process_table12],
    model=LiteLLMModel("openai/gpt-4o-mini"),  # Replace with a valid model
)

print("Agent running for Table 8")
print(agent.run("""Run process_table8 and save the result in the sql table.
# Define tables
table_12 = Table(
    "table_12",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("Bauteilnummer", String(50)),
    Column("Materialangabe", String(50)),
    Column("Oberflächenbehandlung", String(50)),
    Column("Schweißnahtlänge", String(50)),
)

table_10 = Table(
    "table_10",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("Planschlüssel", String(50)),
    Column("Stat_Pos", String(50)),
    Column("Aufr_Nr", String(50)),

    #Column("Index", String(50)),
    Column("Fertigteil_Position", String(50)),
    Column("Stück", Integer),
    Column("Volumen", Float),
    Column("Gewicht", Float),
)

table_8 = Table(
    "table_8",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("Länge", Float),
    Column("Höhe", Float),
)"""))
#agent.reset()
print("""Agent running for Table 10 and save the result in the sql table
""")
print(agent.run("""Run process_table10 and save the result in the sql table
:
table_10 = Table(
    "table_10",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("Planschlüssel", String(50)),
    Column("Stat_Pos", String(50)),
    Column("Aufr_Nr", String(50)),
    #Column("Index", String(50)),
    Column("Fertigteil_Position", String(50)),
    Column("Stück", Integer),
    Column("Volumen", Float),
    Column("Gewicht", Float),
)
"""))
#agent.reset()
print("Agent running for Table 12")
print(agent.run("""Run process_table12 and save the result in the sql table
table_12 = Table(
    "table_12",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("Bauteilnummer", String(50)),
    Column("Materialangabe", String(50)),
    Column("Oberflächenbehandlung", String(50)),
    Column("Schweißnahtlänge", String(50)),
)

"""))


if __name__ == "__main__":
    run_agent()


Agent running for Table 8


Message(content='To run the `process_table8` function and save the result in the SQL table `table_8`, follow these steps:\n\n1. **Run the `process_table8` function**: This function extracts measurements from a technical drawing.\n2. **Extract the results**: Manually parse the output data from the function to obtain the values for Length (Länge) and Height (Höhe).\n3. **Construct the SQL query**: Format anSQL `INSERT` statement to add the data into the `table_8`.\n4. **Execute the SQL insertion**: Use the correct method to insert the data into the SQL database.\n\n## Full Implementation\n\nHere’s how the code could look:\n\n```python\n# Step 1: Run the process_table8 function to get the measurements\ntable_8_data = process_table8()  # Assumed to return a JSON-like string\n\n# The extracted measurements in the assumed JSON format\n# Here is a direct representation for clarity\ntable_8_data_string = \'\'\'\n{\n    "Länge „Gesamt“": "6.66",\n    "Höhe „Gesamt“": "0.20",\n    "Fertigteil Po

Message(content='To run `process_table10` and save the result in the SQL table `table_10`, you can follow these steps:\n\n1. **Run the `process_table10` function** to extract measurements from the technical drawing.\n2. **Prepare the data for insertion** into the SQL table according to the defined schema.\n3. **Insert the data into the SQL table** using a correct query approach to ensure that it executes successfully.\n\nSince the previous attempts to insert data have failed, I will incorporate careful checks and ensure all required fields are accounted for.\n\nHere’s how the code would look in Python:\n\n```python\n# Step 1: Run the process_table10 to extract the measurements\nmeasurements = process_table10()\n\n# Assuming measurements is a dictionary structure based on previous observation\nplanschluessel = measurements.get("Planschlüssel", "")\nstat_pos = measurements.get("Stat.Pos", "")\naufr_nr = measurements.get("Aufr. Nr.", "")\nfertigteil_position = ""  # Set to default or empt

Message(content='To run `process_table12` and save the results into the SQL table `table_12`, we can follow these steps:\n\n1. **Execute the `process_table12` function** to retrieve the measurements.\n2. **Prepare the SQL insert commands** for the extracted data based on the table definition.\n3. **Execute the insert commands** to insert the data into the table.\n\nHere\'s how you can do it in code:\n\n### Step 1: Run `process_table12`\n```python\nresult = process_table12()\n```\n\n### Step 2: Prepare insertions\nNext, from the result obtained, we will format the SQL `INSERT` command for each extracted measurement. Note that the `Schweißnahtlänge` field is not present in the sample measurements, so we may need a placeholder or default value for that field.\n\n### Step 3: Execute the insert commands\n```python\n# Assuming result is a list of dictionaries from the process_table12 function\nmeasurements = [\n    {"Pos.": "1209", "Stück": "2", "Bezeichnung": "Kugelkopf-Stabanker 750-7,5to;

In [92]:
# List all tables
import pandas as pd


inspector = inspect(engine)

tables = inspector.get_table_names()
print("Tables in the database:", tables)

# Query and display each table
for table_name in tables:
    print(f"\nContents of table '{table_name}':")
    df = pd.read_sql_table(table_name, engine)
    display(df)

import pandas as pd

# Loop through each table and display its data as a DataFrame
for table in tables:
    table_name = table[0]
    print(f"\nContents of table '{table_name}':")

    # Read the table into a pandas DataFrame
    df = pd.read_sql_query(f"SELECT * FROM {table_name};", engine)
    display(df)  # For Colab, this displays the table nicely


Tables in the database: ['table_10', 'table_12', 'table_8']

Contents of table 'table_10':


Unnamed: 0,id,Planschlüssel,Stat_Pos,Aufr_Nr,Fertigteil_Position,Stück,Volumen,Gewicht



Contents of table 'table_12':


Unnamed: 0,id,Bauteilnummer,Materialangabe,Oberflächenbehandlung,Schweißnahtlänge



Contents of table 'table_8':


Unnamed: 0,id,Länge,Höhe



Contents of table 't':


OperationalError: (sqlite3.OperationalError) no such table: t
[SQL: SELECT * FROM t;]
(Background on this error at: https://sqlalche.me/e/20/e3q8)

## first pdf to png WORKING - check


In [None]:
# Install necessary libraries
!pip install PyMuPDF pillow

import fitz  # PyMuPDF
from PIL import Image
import os

# Define the PDF file path
pdf_path = "./FT_XX_09-251_A_F.pdf"  # Upload your PDF to Colab and provide its path here
output_folder = "./pdf_images"  # Directory to save PNG images
os.makedirs(output_folder, exist_ok=True)

# Convert PDF pages to PNG
def pdf_to_png(pdf_path, output_folder):
    pdf_document = fitz.open(pdf_path)
    for page_number in range(len(pdf_document)):
        page = pdf_document[page_number]
        pix = page.get_pixmap(dpi=300)  # Adjust DPI for image quality
        output_path = os.path.join(output_folder, f"page_{page_number + 1}.png")
        pix.save(output_path)
    pdf_document.close()
    print(f"Saved PNG images to: {output_folder}")

# Run the conversion
pdf_to_png(pdf_path, output_folder)

# Display one of the converted images (optional)
from IPython.display import Image as IPImage, display
for img_file in sorted(os.listdir(output_folder)):
    display(IPImage(filename=os.path.join(output_folder, img_file)))


#segment png to pngs and then extract to sql - in progress


In [None]:
!pip install opencv-python-headless pillow pytesseract pandas


In [None]:
# Install dependencies
!pip install PyMuPDF pytesseract pandas pillow

import fitz  # PyMuPDF
import pytesseract
import pandas as pd
from PIL import Image
import os

# Define paths
pdf_path = "./FT_XX_09-251_A_F.pdf"  # Replace with your PDF file
output_folder = "./extracted_tables"
os.makedirs(output_folder, exist_ok=True)

# Function to extract images from a PDF
def extract_images_from_pdf(pdf_path):
    pdf_document = fitz.open(pdf_path)
    images = []
    for page_number in range(len(pdf_document)):
        page = pdf_document[page_number]
        pix = page.get_pixmap(dpi=300)  # High resolution for better OCR
        img_path = f"/content/page_{page_number + 1}.png"
        pix.save(img_path)
        images.append(img_path)
    return images

# Function to extract specific tables from text
def extract_table_data(text, table_name):
    # Define search patterns for each table
    table_patterns = {
        "Tabelle 2": r"Planschlüssel.*?Gewicht.*?\d+\.\d+",  # Plankopf
        "Tabelle 3": r"Pos.*?Bezeichnung.*?nach Herstellerangabe",  # Liste Einbauteile
        "Tabelle 4": r"Pos.*?Bemerkung.*?ohne",  # Liste Stahl
        "Tabelle 5": r"Bauteil:.*?Höhe.*?20",  # Vorderansicht
        "Tabelle 6": r"Bauteil:.*?Breite.*?\d+",  # Draufsicht
    }
    if table_name in table_patterns:
        import re
        match = re.search(table_patterns[table_name], text, re.DOTALL)
        if match:
            return match.group(0)
    return None

# Extract and save tables
def process_pdf(pdf_path):
    images = extract_images_from_pdf(pdf_path)
    for img_path in images:
        # OCR the image
        text = pytesseract.image_to_string(Image.open(img_path), config="--psm 6")

        # Process each table
        for table_name in ["Tabelle 2", "Tabelle 3", "Tabelle 4", "Tabelle 5", "Tabelle 6"]:
            table_data = extract_table_data(text, table_name)
            if table_data:
                # Split table into rows and columns
                rows = table_data.strip().split("\n")
                data = [row.split() for row in rows if row]
                df = pd.DataFrame(data)
                # Save as CSV
                output_csv_path = os.path.join(output_folder, f"{table_name}.csv")
                df.to_csv(output_csv_path, index=False, header=False)
                print(f"{table_name} saved to {output_csv_path}")

# Process the PDF
process_pdf(pdf_path)


# other working people


In [None]:
################