<a href="https://colab.research.google.com/github/jaime-rodrigues/J74Manager/blob/master/Image_Embedder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Refresh the package index and register the official PostgreSQL apt repository
# (signing key + sources.list entry for the current distribution codename).
!apt-get update
!apt-get install -y wget gnupg2
!wget -qO - https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add -
!echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" | tee /etc/apt/sources.list.d/pgdg.list
!apt-get update

# Install PostgreSQL 17 together with the pgvector extension package, then start the server.
# NOTE(review): the `apt-get update` below repeats the one just above and looks redundant.
!apt-get update
!apt-get install -y postgresql-17 postgresql-17-pgvector postgresql-contrib postgresql-server-dev-17
!service postgresql start

# Create the `colab` superuser role and a `colab` database owned by it.
# These credentials must stay in sync with DATABASE_CONFIG further down.
!sudo -u postgres psql -c "CREATE USER colab WITH SUPERUSER PASSWORD 'colab';"
!sudo -u postgres psql -c "CREATE DATABASE colab OWNER colab;"

print("PostgreSQL instalado e configurado!")
# Enable the pgvector extension (`vector` type) inside the `colab` database.
!sudo -u postgres psql -d colab -c "CREATE EXTENSION IF NOT EXISTS vector;"
print("PostgreSQL com pgvector instalado e configurado!")

In [None]:
# Install the `retry` package; it provides the `retry` decorator imported in the next cell.
!pip install retry

In [None]:
import os
import subprocess
import sys
import numpy as np
from PIL import Image
import torch
import psycopg2
from psycopg2.extras import execute_values
from psycopg2 import OperationalError, InterfaceError
from transformers import CLIPProcessor, CLIPModel
import matplotlib.pyplot as plt
from typing import List, Tuple
from retry import retry
from pathlib import Path
from IPython.display import display, HTML, clear_output
import threading
import time
from tqdm.notebook import tqdm
import torchvision.transforms as T

In [None]:
# import zipfile

# # 1. Specify the path to the zip file and the destination folder
# zip_file_path = '/content/drive/MyDrive/DevMaster/Fotos/produtos/Licuri/Pronto.zip'
# destination_folder = '/content/drive/MyDrive/DevMaster/Fotos/produtos/Licuri/Pronto'  # Create a folder to extract to

# # 2. Create the destination folder if it doesn't exist
# os.makedirs(destination_folder, exist_ok=True)

# # 3. Open the zip file and extract its contents
# with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
#     zip_ref.extractall(destination_folder)

# print(f"Files extracted to: {destination_folder}")

In [None]:
# Global settings
# Connection parameters; they mirror the role/database created by the setup cell.
DATABASE_CONFIG = {
    "host": "localhost",
    "database": "colab",
    "user": "colab",
    "password": "colab",
    "port": "5432"
}

# File suffixes (compared in lower case) that are treated as images.
IMAGE_EXTS = ('.jpg', '.jpeg', '.png', '.webp')
BATCH_SIZE = 32  # Number of records per batched database insert
# Width of the pgvector column; it must equal the length of the vectors the embedder produces.
# NOTE(review): 768 matches the image-feature size of the CLIP ViT-L/14 checkpoint named below
# (the earlier "ResNet50 output size" remark was inaccurate) — re-check if CLIP_BASE changes.
EMBEDDING_DIM = 768
CLIP_BASE = "openai/clip-vit-large-patch14"

In [None]:
# 2. Gerenciamento de banco de dados aprimorado
class DatabaseManager:
    """Own the PostgreSQL/pgvector connection used to store and query image embeddings.

    On construction the manager connects to the database described by
    ``config``. If ``backup_file_path`` exists it is replayed with ``psql``;
    otherwise the ``image_embeddings`` table is dropped and recreated empty.

    Args:
        config: Keyword arguments forwarded to ``psycopg2.connect``. The keys
            ``host``, ``port``, ``database``, ``user`` and ``password`` are
            also read when running ``psql`` / ``pg_dump``.
        backup_file_path: SQL dump restored at start-up (when present) and
            overwritten by :meth:`backup_database`.
    """

    def __init__(self, config: dict, backup_file_path='/content/backup.sql'):
        self.config = config
        self.backup_file_path = backup_file_path
        self._connect_and_restore()

    # ------------------------------------------------------------------
    # Connection handling
    # ------------------------------------------------------------------
    @retry((OperationalError, InterfaceError), tries=3, delay=2)
    def _connect_with_retry(self):
        """Open a new psycopg2 connection, retrying on connection-level errors."""
        return psycopg2.connect(**self.config)

    def _ensure_connection(self) -> None:
        """Reconnect when the current connection is missing, closed or broken.

        Without this the ``@retry`` decorators on the query methods would keep
        re-running against the same dead connection and could never succeed.
        """
        conn = getattr(self, 'conn', None)
        if conn is None or conn.closed:
            self.conn = self._connect_with_retry()

    def _rollback_quietly(self) -> None:
        """Roll back the current transaction, ignoring a dead connection.

        A failed statement leaves the transaction aborted, so every later
        statement would fail until a rollback happens. If the connection
        itself is gone the rollback cannot work; the original error is the
        one worth reporting, so that secondary failure is suppressed.
        """
        try:
            self.conn.rollback()
        except (OperationalError, InterfaceError):
            pass

    def _connect_and_restore(self) -> None:
        """Connect, then restore from the backup file or start with a fresh table.

        Raises:
            Exception: Whatever the connection, restore or table creation
                raised; the error is printed first and then re-raised.
        """
        try:
            self.conn = self._connect_with_retry()

            if os.path.exists(self.backup_file_path):
                self._restore_database()
                print(f"Banco de dados restaurado de: {self.backup_file_path}")
            else:
                # No backup available: start from an empty table.
                self.recreate_table()

        except Exception as e:
            print(f"Erro ao conectar ou restaurar o banco de dados: {e}")
            raise

    # ------------------------------------------------------------------
    # Backup / restore through the PostgreSQL command line tools
    # ------------------------------------------------------------------
    def _run_pg_tool(self, tool: str) -> None:
        """Run ``psql`` or ``pg_dump`` against the configured database.

        Both tools take the same connection flags, and ``-f`` names the
        backup file (input for ``psql``, output for ``pg_dump``).

        The password is supplied through the ``PGPASSWORD`` environment
        variable and ``-w`` forbids interactive prompting, so the call can
        neither hang waiting for input nor silently run unauthenticated.

        Args:
            tool: Executable name, ``'psql'`` or ``'pg_dump'``.

        Raises:
            RuntimeError: If the tool exits with a non-zero status.
            OSError: If the executable cannot be started.
        """
        env = dict(os.environ)
        env['PGPASSWORD'] = str(self.config['password'])
        command = [
            tool,
            '-h', str(self.config['host']),
            '-p', str(self.config['port']),  # str() so an integer port also works
            '-U', str(self.config['user']),
            '-d', str(self.config['database']),
            '-w',
            '-f', str(self.backup_file_path),
        ]
        result = subprocess.run(
            command,
            env=env,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        if result.returncode != 0:
            detail = result.stderr.decode(errors='replace').strip()
            raise RuntimeError(
                f"{tool} terminou com código {result.returncode}: {detail}"
            )

    def _restore_database(self) -> None:
        """Replay the SQL backup file into the database with ``psql``.

        Individual SQL errors inside the dump do not abort the restore (psql
        keeps going and still exits with status 0); only a failure of psql
        itself, such as a bad connection or unreadable file, raises.

        Raises:
            Exception: Re-raised after being printed.
        """
        try:
            self._run_pg_tool('psql')
        except Exception as e:
            print(f"Erro ao restaurar o banco de dados: {e}")
            raise

    def backup_database(self) -> None:
        """Dump the database to ``self.backup_file_path`` with ``pg_dump``.

        Best effort: failures are printed instead of raised, and the success
        message is only printed when ``pg_dump`` actually succeeded.
        """
        try:
            self._run_pg_tool('pg_dump')
        except Exception as e:
            print(f"Erro ao criar o backup do banco de dados: {e}")
        else:
            print(f"Backup do banco de dados criado em: {self.backup_file_path}")

    # ------------------------------------------------------------------
    # Schema management
    # ------------------------------------------------------------------
    def _create_table(self, index_ddl: str = "") -> None:
        """Create ``image_embeddings`` if missing, plus an optional index.

        ``EMBEDDING_DIM`` is read at call time, so the column width follows
        the module-level setting in force when the table is created.

        Args:
            index_ddl: Extra SQL run after the ``CREATE TABLE`` statement,
                normally a ``CREATE INDEX`` statement.
        """
        ddl = f"""
            CREATE TABLE IF NOT EXISTS image_embeddings (
                id SERIAL PRIMARY KEY,
                filename VARCHAR(255) NOT NULL,
                filepath VARCHAR(4096) NOT NULL,
                embedding vector({EMBEDDING_DIM}) NOT NULL
            );
            {index_ddl}
        """
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(ddl)
            self.conn.commit()
        except Exception:
            self._rollback_quietly()
            raise

    def recreate_table(self) -> None:
        """Drop ``image_embeddings`` (discarding all rows) and create it again."""
        try:
            with self.conn.cursor() as cursor:
                cursor.execute("DROP TABLE IF EXISTS image_embeddings;")
            self.conn.commit()
        except Exception:
            self._rollback_quietly()
            raise
        self.create_table()

    def create_table(self) -> None:
        """Create the table without any vector index (exact, sequential search)."""
        self._create_table()

    def create_table_ivfflat(self) -> None:
        """Create the table plus an IVFFlat index on L2 distance (100 lists)."""
        self._create_table("""
            CREATE INDEX IF NOT EXISTS embedding_idx
            ON image_embeddings USING ivfflat (embedding vector_l2_ops)
            WITH (lists = 100);
        """)

    def create_table_hnsw(self) -> None:
        """Create the table plus an HNSW index on L2 distance (m=16, ef_construction=100)."""
        self._create_table("""
            CREATE INDEX IF NOT EXISTS embedding_idx
            ON image_embeddings USING hnsw (embedding vector_l2_ops)
            WITH (m = 16, ef_construction = 100);
        """)

    # ------------------------------------------------------------------
    # Data access
    # ------------------------------------------------------------------
    @retry((OperationalError, InterfaceError), tries=3, delay=1)
    def insert_embeddings_batch(self, records: List[Tuple[str, str, list]]):
        """Insert many ``(filename, filepath, embedding)`` rows in one transaction.

        Args:
            records: Rows to insert; each embedding is a list of numbers.

        Raises:
            Exception: Any database error, after the transaction was rolled
                back. Connection-level errors are retried first.
        """
        if not records:
            return
        query = """
            INSERT INTO image_embeddings (filename, filepath, embedding)
            VALUES %s;
        """
        self._ensure_connection()
        try:
            with self.conn.cursor() as cursor:
                execute_values(cursor, query, records, page_size=BATCH_SIZE)
            self.conn.commit()
        except Exception:
            self._rollback_quietly()
            raise

    @retry((OperationalError, InterfaceError), tries=3, delay=1)
    def search_similar(self, embedding: np.ndarray, top_k: int = 5) -> List[Tuple]:
        """Return the ``top_k`` stored images closest to ``embedding``.

        Rows are ordered by the pgvector ``<->`` operator (Euclidean
        distance). The third column is ``1 - distance``, so it is only a
        relative score: it is not bounded to [0, 1] and goes negative once
        the distance exceeds 1.

        Args:
            embedding: Query vector; flattened to one dimension before use.
            top_k: Maximum number of rows to return.

        Returns:
            A list of ``(filename, filepath, similarity)`` tuples, best first.
        """
        values = np.asarray(embedding).ravel().tolist()
        # pgvector text form "[v1,v2,...]", bound as a parameter and cast
        # server-side instead of being spliced into the SQL text.
        vector_literal = '[' + ','.join(map(str, values)) + ']'
        query = """
            SELECT filename, filepath, 1 - (embedding <-> %s::vector) as similarity
            FROM image_embeddings
            ORDER BY embedding <-> %s::vector
            LIMIT %s;
        """
        self._ensure_connection()
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(query, (vector_literal, vector_literal, int(top_k)))
                return cursor.fetchall()
        except Exception:
            self._rollback_quietly()
            raise

    def listar_primeiros_100_registros(self):
        """Return up to 100 rows of ``image_embeddings``.

        Returns:
            A list of row tuples, or an empty list if the query failed (the
            error is printed and the transaction rolled back).
        """
        try:
            with self.conn.cursor() as cursor:
                cursor.execute("SELECT * FROM image_embeddings LIMIT 100;")
                registros = cursor.fetchall()
            return registros
        except Exception as e:
            print(f"Erro ao buscar registros: {e}")
            self._rollback_quietly()
            return []

    def close(self):
        """Close the connection; safe to call even if connecting never succeeded."""
        conn = getattr(self, 'conn', None)
        if conn:
            conn.close()

In [None]:
# 1. Modelo CLIP para embeddings semânticos
class CLIPEmbedder:
    """Turn images into embedding vectors with the CLIP checkpoint named by ``CLIP_BASE``.

    The model and its processor are loaded once at construction; the model is
    moved to the GPU when CUDA is available and put into evaluation mode.
    """

    def __init__(self):
        cuda_ready = torch.cuda.is_available()
        self.device = "cuda" if cuda_ready else "cpu"
        self.model = CLIPModel.from_pretrained(CLIP_BASE)
        self.processor = CLIPProcessor.from_pretrained(CLIP_BASE)
        self.model.to(self.device).eval()

    @retry(tries=3, delay=1)
    def generate_embedding(self, image: Image) -> np.ndarray:
        """Compute the CLIP image features for ``image``.

        Args:
            image: Image handed to the CLIP processor (a PIL image elsewhere
                in this notebook).

        Returns:
            The image features as a float32 NumPy array with singleton
            dimensions squeezed away.

        Raises:
            Exception: Any failure is printed and re-raised; the decorator
                then re-runs the call, up to three attempts in total.
        """
        try:
            batch = self.processor(images=image, return_tensors="pt")
            batch = batch.to(self.device)
            # Inference only: skip gradient tracking.
            with torch.no_grad():
                image_features = self.model.get_image_features(**batch)
            vector = image_features.cpu().numpy().squeeze()
            return vector.astype(np.float32)
        except Exception as exc:
            print(f"Erro ao processar imagem: {str(exc)}")
            raise

In [None]:
# 3. Interface gráfica simplificada
class ImageSearchGUI:
    """Notebook-facing helper that indexes image folders and plots similar images.

    Args:
        db_manager: Storage backend; must provide ``insert_embeddings_batch``,
            ``search_similar``, ``listar_primeiros_100_registros``,
            ``backup_database`` and ``close``.
        embedder: Object whose ``generate_embedding(image)`` returns a NumPy
            vector.
    """

    def __init__(self, db_manager: "DatabaseManager", embedder: "CLIPEmbedder"):
        self.db = db_manager
        self.embedder = embedder

    def process_folder(self, folder):
        """Index every image under ``folder`` and then back up the database.

        Args:
            folder: Directory path; a falsy value makes this a no-op (apart
                from printing it).
        """
        print(folder)
        if folder:
            self.process_images(Path(folder))
            self.db.backup_database()

    def process_folders(self, diretorio="/content/drive/MyDrive/DevMaster/Fotos/produtos/",
                        folders=('Anel', 'Chocker', 'Brinco', 'Colar', 'Conjuntos')):
        """Run :meth:`process_folder` on several sub-folders of ``diretorio``.

        Args:
            diretorio: Base directory; a trailing separator is optional.
            folders: Names of the sub-folders to process, in order.
        """
        for folder in folders:
            # os.path.join copes with a base path that lacks the trailing "/".
            self.process_folder(os.path.join(diretorio, folder))

    def search_image(self, filepath):
        """Load the image at ``filepath`` and plot its nearest stored matches.

        Args:
            filepath: Path of the query image; a falsy value is ignored.
        """
        if filepath:
            print(filepath)
            # convert() returns an independent copy, so the file can be closed.
            with Image.open(filepath) as source:
                image = source.convert("RGB")
            self.show_results(image)

    def process_images(self, folder: Path):
        """Embed every image below ``folder`` (recursively) and store the vectors.

        Each file yields several records: one per augmented variant returned
        by :meth:`transformar_imagem` plus the original image, all stored
        under the original file's name and path. Records are written in
        batches of ``BATCH_SIZE``.

        A failure on one file is printed and the loop moves on. Records that
        could not be inserted stay queued and are retried with the next batch.

        The work runs in the calling thread, so interrupting the notebook
        cell really stops the processing.

        Args:
            folder: Root directory to scan.
        """
        image_files = [
            os.path.join(root, name)
            for root, _, names in os.walk(folder)
            for name in names
            if name.lower().endswith(IMAGE_EXTS)
        ]

        pending = []
        with tqdm(total=len(image_files), desc="Processando Imagens") as progress_bar:
            for image_file in image_files:
                try:
                    for variant in self.transformar_imagem(image_file):
                        embedding = self.embedder.generate_embedding(variant)
                        pending.append(
                            (os.path.basename(image_file), image_file, embedding.tolist())
                        )

                        if len(pending) >= BATCH_SIZE:
                            self.db.insert_embeddings_batch(pending)
                            pending = []
                except Exception as e:
                    print(f"Erro ao processar {image_file}: {e}")
                finally:
                    # Count the file as handled even when it failed, so the
                    # bar always reaches its total.
                    progress_bar.update(1)

            # Flush whatever did not fill a complete batch.
            if pending:
                try:
                    self.db.insert_embeddings_batch(pending)
                except Exception as e:
                    print(f"Erro ao inserir lote final: {e}")

    def show_results(self, image: "Image.Image"):
        """Plot ``image`` next to the stored images most similar to it.

        The figure is a 2x3 grid: the query image first, then up to five
        matches (the default ``top_k`` of the search). Any error is printed
        rather than raised.

        Args:
            image: Query image.
        """
        try:
            embedding = self.embedder.generate_embedding(image)
            results = self.db.search_similar(embedding)

            plt.figure(figsize=(15, 10))

            # Query image in the first cell.
            plt.subplot(2, 3, 1)
            plt.imshow(image)
            plt.title("Query Image")
            plt.axis('off')

            # Matches fill the remaining cells.
            for slot, (filename, filepath, similarity) in enumerate(results, 2):
                plt.subplot(2, 3, slot)
                # Draw while the file is open, then release the handle.
                with Image.open(filepath) as match:
                    plt.imshow(match)
                plt.title(f"{filename}\nSimilarity: {similarity:.2f}")
                plt.axis('off')

            plt.tight_layout()
            plt.show()

        except Exception as e:
            print(f"Erro ao processar imagem: {e}")

    def listar_primeiros(self):
        """Return up to the first 100 stored rows (delegates to the database manager)."""
        return self.db.listar_primeiros_100_registros()

    def on_close(self):
        """Close the database connection and, if one exists, destroy the root window.

        The class never creates ``self.root`` itself, so its absence must
        not raise.
        """
        self.db.close()
        root = getattr(self, "root", None)
        if root is not None:
            root.destroy()

    def transformar_imagem(self, image_path):
        """Return randomly augmented variants of an image followed by the original.

        Each transformation is applied on its own to the original RGB image
        (they are not chained), giving four variants and then the original.

        Args:
            image_path: Path of the image file to load.

        Returns:
            A list of PIL images: one per transformation, then the original.
        """
        with Image.open(image_path) as source:
            image = source.convert("RGB")

        transformacoes = [
            T.RandomRotation(degrees=30),
            T.RandomResizedCrop(size=(224, 224)),
            T.RandomHorizontalFlip(),
            T.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
            # Add further transformations here as needed.
        ]

        imagens_transformadas = [transformacao(image) for transformacao in transformacoes]
        imagens_transformadas.append(image)
        return imagens_transformadas

In [None]:
# 4. Funções principais otimizadas
def main():
    """Build the embedder, the database manager and the search helper.

    The embedder is created first, then the database manager (which connects
    and restores from the Drive backup when that file exists).

    Returns:
        An ``ImageSearchGUI`` wired to both components.
    """
    clip_embedder = CLIPEmbedder()
    database = DatabaseManager(
        DATABASE_CONFIG,
        backup_file_path='/content/drive/MyDrive/colab_backup.sql',
    )
    return ImageSearchGUI(database, clip_embedder)

In [None]:
# Build the embedder, database manager and search helper once for the session.
gui = main()

In [None]:
# Inspect up to the first 100 rows currently stored in image_embeddings.
gui.listar_primeiros()

In [None]:
# Sample queries: each call prints the path and plots the closest stored images.
gui.search_image("/content/drive/MyDrive/DevMaster/Fotos/modificada/103_1.jpg")

In [None]:
gui.search_image("/content/drive/MyDrive/DevMaster/Fotos/modificada/103_2.jpg")

In [None]:
gui.search_image("/content/drive/MyDrive/DevMaster/Fotos/modificada/IMG_7224_modificada.jpeg")

In [None]:
gui.search_image("/content/drive/MyDrive/DevMaster/Fotos/modificada/IMG_7233_modificada.jpeg")

In [None]:
gui.search_image("/content/drive/MyDrive/DevMaster/Fotos/produtos/Anel/201-400/IMG_6727.jpeg")

In [None]:
gui.search_image("/content/drive/MyDrive/DevMaster/Fotos/modificada/094-modificada.png")