In [None]:
import warnings
warnings.filterwarnings('ignore')
%reload_ext autoreload
%autoreload 2

import sys
sys.path.insert(0, "../../")

import textwrap

from utils.utils import normalize_cols
from utils_sheets import save_data_in_sheets
import base64
import requests
import json
import io




from IPython.display import display, Audio

import random
import datetime
import glob


import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import basedosdados as bd


# basedosdados client configuration.
# NOTE(review): presumably `billing_project_id` is the GCP project billed for
# queries and `from_file` makes the client read its settings from a local
# config file — confirm against the basedosdados documentation.
bd.config.billing_project_id = 'rj-escritorio-dev'
bd.config.from_file = True
# Bare expression: Jupyter only renders the last expression of a cell, so this
# line displays nothing where it stands.
bd.__version__


# Widen the pandas display limits so wide frames and long URL / text cells are
# not truncated when rendered.
pd.options.display.max_columns = 999
pd.options.display.max_rows = 1999
pd.options.display.max_colwidth = 200

from PIL import Image, ImageDraw, ImageFont

- storage images: https://console.cloud.google.com/storage/browser/datario-public/flooding_detection?project=datario
- imagens figma: https://www.figma.com/file/Qv89NLopXS60Lqf3XfTZiN/Untitled?type=design&node-id=3-44&mode=design&t=3a4g8D4QLiDQ8f3i-0
- langchain ref: https://python.langchain.com/docs/integrations/platforms/google


In [None]:
def get_image_link_from_storage():
    """Collect the public URLs of the files stored under the classified-images prefix.

    Lists every blob in the ``datario-public`` bucket whose name starts with
    ``flooding_detection/classified_images`` and keeps only those whose last
    path segment contains a dot (i.e. entries that look like files with an
    extension rather than folder placeholders).

    Returns:
        list[str]: public URLs, in the order the bucket listing yields them.
    """
    dataset_id, table_id = "flooding_detection", "classified_images"
    storage = bd.Storage(dataset_id=dataset_id, table_id=table_id)
    bucket = storage.client["storage_staging"].bucket("datario-public")

    public_urls = (
        str(blob.public_url)
        for blob in bucket.list_blobs(prefix=f"{dataset_id}/{table_id}")
    )
    # Only the final path segment is inspected for a dot.
    return [url for url in public_urls if "." in url.rsplit("/", 1)[-1]]


def get_urls_and_labels():
    """Build a DataFrame of labelled image URLs from the storage listing.

    Each URL returned by ``get_image_link_from_storage`` is matched against the
    known label folders; a row is emitted for every folder fragment contained
    in the URL.

    Returns:
        pandas.DataFrame: columns ``object``, ``label`` and ``image_url``.
    """
    # (path fragment, label, object) for each labelled folder.
    label_rules = (
        ("images_with_label/flood", True, "alagamento"),
        ("images_with_label/no_flood", False, "alagamento"),
    )
    rows = [
        {"object": target_object, "label": label, "image_url": url}
        for url in get_image_link_from_storage()
        for path_fragment, label, target_object in label_rules
        if path_fragment in str(url)
    ]
    return pd.DataFrame(rows)


def balance_and_sample(df, N, random_state=None):
    """Return up to ``N`` shuffled rows with as many ``True`` as ``False`` labels.

    The frame is first balanced by keeping the first ``k`` rows of each class,
    where ``k`` is the size of the smaller class, then shuffled, then truncated
    to ``N`` rows.

    Args:
        df: DataFrame with a boolean ``label`` column.
        N: maximum number of rows to return.
        random_state: optional seed forwarded to ``DataFrame.sample`` so the
            shuffle can be reproduced. ``None`` keeps the shuffle random.

    Returns:
        pandas.DataFrame: the balanced, shuffled sample with a fresh index. If
        fewer than ``N`` balanced rows exist, a message is printed and all of
        them are returned. If one class is absent (or ``df`` is empty) the
        result is empty, because no balanced subset exists.
    """
    if df.empty:
        # Also covers a column-less frame, where df["label"] would raise.
        min_count = 0
    else:
        labels = df["label"]
        positives = df[labels == True]  # noqa: E712 - element-wise comparison
        negatives = df[labels == False]  # noqa: E712
        # Count both classes explicitly: a class that is missing must count as
        # zero instead of being ignored.
        min_count = min(len(positives), len(negatives))

    if min_count == 0:
        df_balanced = df.iloc[0:0].reset_index(drop=True)
    else:
        df_balanced = pd.concat([positives.head(min_count), negatives.head(min_count)])
        df_balanced = df_balanced.sample(frac=1, random_state=random_state).reset_index(
            drop=True
        )

    # Sample N rows
    if N > len(df_balanced):
        print(
            f"Requested number of samples ({N}) is more than the available balanced dataset size ({len(df_balanced)})."
        )
        return df_balanced
    return df_balanced.head(N)


# OpenAI API Key


def get_image_from_url(image_url, timeout=30, max_size=(640, 480)):
    """Download an image and return it as a PIL image shrunk to fit ``max_size``.

    Args:
        image_url: URL of the image to download.
        timeout: seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on a stalled connection.
        max_size: ``(width, height)`` bounding box passed to ``Image.thumbnail``.

    Returns:
        PIL.Image.Image: the downloaded image, resized in place by ``thumbnail``.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
        requests.Timeout: if the server does not answer within ``timeout``.
    """
    response = requests.get(image_url, timeout=timeout)
    # Fail with the HTTP error instead of handing an error page to PIL.
    response.raise_for_status()
    img = Image.open(io.BytesIO(response.content))
    img.thumbnail(max_size)
    return img


def get_ai_label(response):
    """Extract the ``label`` value from a chat-completion style response dict.

    Args:
        response: dict that either carries a truthy ``"error"`` entry or holds
            the model answer at ``response["choices"][0]["message"]["content"]``
            as a JSON document, optionally wrapped in a Markdown json code fence.

    Returns:
        The string ``"Error"`` when the response reports an error, otherwise
        the value stored under ``"label"`` in the decoded JSON answer.

    Raises:
        json.JSONDecodeError: if the answer is not valid JSON after the code
            fence is stripped.
    """
    if response.get("error"):
        return "Error"

    # The answer text must be read from the response before it can be parsed.
    r = response["choices"][0]["message"]["content"]
    json_string = r.replace("```json\n", "").replace("\n```", "")
    json_object = json.loads(json_string)
    return json_object["label"]


def get_ai_label_gemini(response):
    """Extract the ``label`` value from a Gemini answer.

    Args:
        response: either the raw answer text, or a tuple whose first element is
            the answer text (the shape returned by
            ``gemini_pro_vision_classify_image``). The text is a JSON document,
            optionally wrapped in a Markdown json code fence.

    Returns:
        The value stored under ``"label"`` in the decoded JSON answer.

    Raises:
        json.JSONDecodeError: if the text is not valid JSON after the code
            fence is stripped.
    """
    # isinstance also accepts tuple subclasses (e.g. namedtuples), which an
    # exact type comparison would stringify as a whole and fail to parse.
    if isinstance(response, tuple):
        response = response[0]
    json_string = str(response).replace("```json\n", "").replace("\n```", "")
    json_object = json.loads(json_string)
    return json_object["label"]


def gemini_pro_vision_classify_image(image):
    """Ask the ``gemini-pro-vision`` model whether ``image`` shows flooding.

    Args:
        image: the image object forwarded to ``generate_content`` next to the
            prompt.

    Returns:
        tuple: ``(text, ok, error)``. On success ``text`` is the model answer,
        ``ok`` is ``True`` and ``error`` is ``None``. On any exception the
        call is best-effort: ``text`` is a JSON document with ``"label": false``,
        ``ok`` is ``False`` and ``error`` is the exception message.
    """
    generation_settings = {
        "max_output_tokens": 2048,
        "temperature": 0.4,
        "top_p": 1,
        "top_k": 32,
    }
    # Answer reported when the model call fails for any reason.
    fallback_text = ' {\n  "label": false\n}'
    prompt = """
            "You are an expert flooding detector. You are
            given a image. You must detect if there is flooding in the image. The output MUST
            be a JSON object with a boolean value for the key ""label"". If you don't
            know what to anwser, you can set the key ""label"" as false. Example:
            {
                ""label"": true
            }"
    """
    try:
        genai.configure(api_key=GOOGLE_API_KEY)
        vision_model = genai.GenerativeModel("gemini-pro-vision")
        streamed = vision_model.generate_content(
            contents=[prompt, image],
            generation_config=generation_settings,
            stream=True,
        )
        # Consume the whole stream before reading the text.
        streamed.resolve()
        outcome = (streamed.text, True, None)
    except Exception as err:
        outcome = (fallback_text, False, str(err))
    return outcome

In [None]:
from langchain_core.messages import HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI, chat_models
from pydantic import BaseModel, Field
from typing import List, Union
from langchain.output_parsers import PydanticOutputParser


def get_parser():
    """Build the structured-output parser and a JSON example of its format.

    Returns:
        tuple: ``(output_parser, output_example)`` where ``output_parser`` is a
        ``PydanticOutputParser`` bound to the ``OutputList`` model defined
        below, and ``output_example`` is a JSON string (indent 4) obtained by
        parsing a placeholder example with that parser and dumping it back.
    """
    # Define the structure of each item in the output.
    # (Documented with comments rather than class docstrings: only comments are
    # added here so the model definitions themselves stay untouched.)
    class OutputItem(BaseModel):
        object: str = Field(description="The object identified in the image")
        label_explanation: str = Field(
            description="Highly detailed visual description of the image given the object context"
        )
        # Either a boolean or a free-text label is accepted.
        label: Union[bool, str] = Field(
            description="Label indicating the condition or characteristic of the object"
        )

    # Define the structure for the list of items
    class OutputList(BaseModel):
        image_description: str = Field(
            description="Image description and visual elements from identification_guide column"
        )
        objects: List[OutputItem]

    # Create the output parser using the Pydantic model
    output_parser = PydanticOutputParser(pydantic_object=OutputList)

    # Valid JSON string with placeholder values, used as the example output.
    output_example_str = """
    {
        "image_description":"<Insert the Image description and visual elements here>",
        "objects":[
            {
                "object": "<Object from objects table>", 
                "label_explanation": "<Visual description of the image given the object context>",  
                "label": "<Respective label from object in objects table>"
            }
        ]
    }
    """

    output_example_str = textwrap.dedent(output_example_str)

    # Run the example through the parser itself so it is checked against the
    # OutputList model before being handed out.
    output_example = output_parser.parse(output_example_str)

    return output_parser, json.dumps(output_example.dict(), indent=4)


def get_content():
    """Assemble the text part of the prompt sent to the vision model.

    Fetches the objects table from a Google Sheets tab (via
    ``get_prompt_table``), renders the output JSON schema and an example output
    (via ``get_parser``), and interpolates all three into the prompt template.

    Returns:
        list[dict]: a single ``{"type": "text", "text": <prompt>}`` entry.
    """

    output_parser, output_example = get_parser()
    prompt_table = get_prompt_table(
        url="https://docs.google.com/spreadsheets/d/122uOaPr8YdW5PTzrxSPF-FD0tgco596HqgB7WK7cHFw/edit#gid=1672006844"
    )
    # Round-trip through json.loads/json.dumps only to re-indent the schema.
    output_schema = json.dumps(json.loads(output_parser.pydantic_object.schema_json()), indent=4)
    content = [
        {
            "type": "text",
            "text": (
                f"""
                You are a highly skilled CCTV camera operator. Your task is to conduct a detailed analysis of the provided image. Analyze, classify, and describe the visual features of objects given an objects table. Ensure that every object from the objects table has at least one entry in the output with the respective label given the criteria and identification_guide.
                
                **Objects Table**
                
                {prompt_table}
                  
                **Thought Process**

                    1. Begin by providing a highly detailed description of the image and fill the image_description output.
                    2. Use the criteria and identification_guide as context to describe the visual features of each object and fill the label_explanation output. Do not copy the criteria and identification_guide; just use them as context to give a highly detailed visual description of label_explanation.
                    3. Then, using the label_explanation, select the most accurate label for each object. 
                    4. Ensure that every object from the objects table has at least one entry in the output with the respective label.
                    5. Return the output.    


                **Input:**
                A CCTV image.

                **Output:**
                Format the output as a JSON instance following the provided schema. 

                **Output Schema:**
                
                ```json
                {output_schema}
                ```
                
                **Example Output:**
                
                ```json
                {output_example}
                ```
                
                Now classify the image bellow:
                """
            ),
        },
    ]

    # Strip the source-code indentation from the template. This is applied to
    # every value of every entry (the "type" value included, where it is a
    # no-op).
    # NOTE(review): the replace runs on the already-interpolated text, so any
    # run of 16 spaces inside the objects table, schema or example is removed
    # as well — confirm this is acceptable for the rendered prompt.
    for d in content:
        for key, value in d.items():
            d[key] = textwrap.dedent(value.replace("                ", ""))
    return content


def get_prompt_table(url: str, timeout: float = 30):
    """Download a Google Sheets tab and render it as a Markdown table.

    Args:
        url: the sheet's ``.../edit#gid=<id>`` URL; it is rewritten to the CSV
            export endpoint for the same tab.
        timeout: seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on a stalled connection.

    Returns:
        str: the sheet contents as a Markdown table without the index column.

    Raises:
        requests.HTTPError: if the export request answers with a 4xx/5xx status.
        requests.Timeout: if the server does not answer within ``timeout``.
    """
    request_url = url.replace("edit#gid=", "export?format=csv&gid=")
    response = requests.get(request_url, timeout=timeout)
    # Fail with the HTTP error rather than parsing an error page as CSV.
    response.raise_for_status()
    return pd.read_csv(io.StringIO(response.content.decode("utf-8"))).to_markdown(index=False)


def gemini_pro_vision_langchain(
    image_url, content, max_output_token=300, temperature=0.4, top_k=32, top_p=1
):
    """Send the prompt plus one image to ``gemini-pro-vision`` through LangChain.

    Args:
        image_url: URL of the image, appended to the message as an
            ``image_url`` part.
        content: list of message parts (as produced by ``get_content``) that
            precede the image.
        max_output_token: maximum number of tokens the model may generate.
        temperature: sampling temperature.
        top_k: top-k sampling parameter.
        top_p: top-p (nucleus) sampling parameter.

    Returns:
        The message returned by ``llm.invoke``; callers read its ``content``.
    """
    llm = ChatGoogleGenerativeAI(
        model="gemini-pro-vision",
        google_api_key=GOOGLE_API_KEY,
        # The model's field is `max_output_tokens` (plural); the singular
        # spelling is not a recognised field, so the limit was never applied.
        max_output_tokens=max_output_token,
        temperature=temperature,
        top_k=top_k,
        top_p=top_p,
    )

    # A new list is built so the caller's `content` is not mutated.
    message = HumanMessage(content=content + [{"type": "image_url", "image_url": image_url}])
    return llm.invoke([message])

In [None]:
from IPython.display import Markdown


def get_urls_from_path(url_path):
    """Return the stored image URLs that contain ``url_path``.

    Args:
        url_path: substring a URL must contain to be kept.

    Returns:
        pandas.DataFrame: one ``image_url`` column with the matching URLs, in
        the order returned by ``get_image_link_from_storage``.
    """
    matching_rows = [
        {"image_url": url}
        for url in get_image_link_from_storage()
        if url_path in str(url)
    ]
    return pd.DataFrame(matching_rows)


# Images to classify. "/" is contained in every URL, so nothing is filtered
# out; pass e.g. url_path="images_predicted_as_flood" to restrict the run to
# one folder.
df = get_urls_from_path(url_path="/")

# Experiment identity, recorded with every prediction.
experiment_name = "test-road-blockade"
experiment_datetime = f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S}"

# Prompt parts sent ahead of each image.
content = get_content()

# Generation settings forwarded to the model.
max_output_token, temperature, top_k, top_p = 2000, 0.1, 32, 1

In [None]:
# display(Markdown(content[0]['text']))
# print(content[0]['text'])

In [None]:
from pathlib import Path

# Checkpoint CSV listing the image URLs already processed in this experiment.
predictions_path = Path(f"./data/predictions/{experiment_name}__{experiment_datetime}.csv")
# to_csv does not create missing directories. Without this, the checkpoint
# write fails after the result was already sent to Sheets, and each retry
# would call the model and append a duplicate row.
predictions_path.parent.mkdir(parents=True, exist_ok=True)

if predictions_path.exists():
    predictions = pd.read_csv(predictions_path)
    predictions_list = predictions["image_url"].tolist()
else:
    predictions_list = []

output_parser, output_example = get_parser()

# Number of retries after the first attempt for each image.
retry = 5
for index, row in df.iterrows():
    image_url = row["image_url"]

    if image_url in predictions_list:
        print(f"{index} - {len(df)}: already predicted")
        continue

    # One initial attempt plus `retry` retries; stops at the first success.
    for retry_count in range(retry + 1):
        # Reset on every attempt so the error handler never reads an unbound
        # name or prints the answer of a previous image.
        response = None
        try:
            response = gemini_pro_vision_langchain(
                image_url=image_url,
                content=content,
                max_output_token=max_output_token,
                temperature=temperature,
                top_k=top_k,
                top_p=top_p,
            )

            response_parsed = output_parser.parse(response.content)
            response_parsed = response_parsed.dict()

            print(f"{index} - {len(df)}")

            save_data_in_sheets(
                save_data=True,
                data={
                    # Log the prompt that was actually sent, instead of
                    # re-downloading the prompt table for every image.
                    "content": content,
                    "max_output_token": max_output_token,
                    "temperature": temperature,
                    "top_k": top_k,
                    "top_p": top_p,
                    "experiment_name": experiment_name,
                    "experiment_datetime": experiment_datetime,
                    "true_object": "",
                    "response": response_parsed,
                    "image_url": image_url,
                    "image": f'=IMAGE("{image_url}")',
                },
                data_url="https://docs.google.com/spreadsheets/d/122uOaPr8YdW5PTzrxSPF-FD0tgco596HqgB7WK7cHFw/edit#gid=436224340",
                content_url="https://docs.google.com/spreadsheets/d/122uOaPr8YdW5PTzrxSPF-FD0tgco596HqgB7WK7cHFw/edit#gid=1779223884",
            )

            # Append to the checkpoint; write the header only for a new file.
            pd.DataFrame([{"image_url": image_url}]).to_csv(
                path_or_buf=predictions_path,
                index=False,
                header=not predictions_path.exists(),
                mode="a",
            )
            # Keep the in-memory list in sync so a URL repeated later in `df`
            # is skipped as already predicted.
            predictions_list.append(image_url)
            break
        except Exception as e:
            print(f"{index} - {len(df)}: Error\n {e}\n\n\nAI Response:")

            # There is no answer to show when the model call itself failed.
            if response is not None:
                print(response.content)

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    confusion_matrix,
)
import seaborn as sns


# Ground truth and model predictions, as plain Python lists.
true_values = df_final["flood"].tolist()
predicted_values = df_final["ai_label"].tolist()

# Calculate metrics
accuracy = accuracy_score(true_values, predicted_values)
precision = precision_score(true_values, predicted_values, pos_label=True)
recall = recall_score(true_values, predicted_values, pos_label=True)
f1 = f1_score(true_values, predicted_values, pos_label=True)
# Pin the class order so the matrix is always 2x2 and lines up with the
# False/True tick labels, even when one class is missing from the sample.
conf_matrix = confusion_matrix(true_values, predicted_values, labels=[False, True])

# Print metrics
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")


# Same matrix under the short name used for plotting (computed only once).
cm = conf_matrix

fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=["False", "True"],
    yticklabels=["False", "True"],
    ax=ax,
)
ax.set_ylabel("Actual")
ax.set_xlabel("Predicted")
ax.set_title("Confusion Matrix")
plt.show()


# resize_factor = 3
# imgs = 10
# time = 1:00
# Accuracy: 0.7
# Precision: 0.6666666666666666
# Recall: 0.5
# F1 Score: 0.5714285714285715

# resize_factor = 1
# imgs = 10
# time = 1:15
# Accuracy: 0.8
# Precision: 0.75
# Recall: 0.75
# F1 Score: 0.75


# resize_factor = 5
# imgs = 10
# time = 1:20
# Accuracy: 0.8
# Precision: 0.75
# Recall: 0.75
# F1 Score: 0.75

In [None]:
# Flag every row where the AI label disagrees with the ground-truth label.
labels_agree = df_final["flood"] == df_final["ai_label"]
df_final["miss"] = np.where(labels_agree, False, True)

# Keep only the misclassified rows and pull out what is needed to show them.
mask = df_final["miss"] == True
miss = df_final.loc[mask]
miss_imgs = miss["base64"].tolist()
miss_ai_labels = miss["ai_label"].tolist()


# Show each misclassified image together with the label the AI gave it.
for position in range(min(len(miss_imgs), len(miss_ai_labels))):
    print(f"AI classyfy as: {miss_ai_labels[position]}")
    display_img(miss_imgs[position])

You are a highly skilled prompt engineer focused on creating prompts for CCTV image recognition. Your task is to optimize and refine a provided example prompt.

Your output is a diff between the provided prompt and the new optimized prompt.

Shall we start?
