In [55]:
import chromadb
from logging import INFO, ERROR
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torchvision.models import vgg16
from PIL import Image
import os

# Directory handed to chromadb.PersistentClient(path=...) by the helpers below.
VECTOR_DB_PATH = "./vectordatabase_beer_1"

def init_vectordb():
    """Open a persistent Chroma client at VECTOR_DB_PATH and report the outcome.

    The client object is discarded; the function only prints a status line.
    Any exception raised while creating the client is reported on stdout
    instead of being propagated. Returns None.
    """
    try:
        chromadb.PersistentClient(path=VECTOR_DB_PATH)
        status = "Setup vectordb OK"
    except Exception as err:
        # Best-effort setup: describe the failure rather than raising.
        status = "Setup vectordb failed! - " + str(err)
    print(status)
    
def get_chroma_collection():
    """Return the "beer_similarity_check_1" collection, creating it if needed.

    A persistent client is opened at VECTOR_DB_PATH and the collection is
    requested with cosine distance ("hnsw:space": "cosine").

    Returns:
        The collection object, or None when any step raised; in that case
        the error is printed to stdout.
    """
    collection = None
    try:
        collection = chromadb.PersistentClient(path=VECTOR_DB_PATH).get_or_create_collection(
            name="beer_similarity_check_1",
            metadata={"hnsw:space": "cosine"},
        )
    except Exception as err:
        print(f"Get vectordb client error {str(err)}")
    return collection

In [56]:
import cv2
from ultralytics import YOLO
def beer_detection(image_path, detector, conf=0.25):
    """Run a detector on an image and return one tensor per detected box.

    Args:
        image_path: Path of the image to load with ``cv2.imread``.
        detector: Callable invoked as ``detector(image, conf=conf)``. Each
            returned result must expose ``boxes.xyxy`` supporting
            ``.cpu().numpy()`` (main() passes an ultralytics YOLO model).
        conf: Confidence threshold forwarded to the detector.

    Returns:
        List of tensors produced by ``transforms.ToTensor()`` from the RGB
        crop of every non-empty box, in detection order.

    Raises:
        FileNotFoundError: If OpenCV cannot read ``image_path``.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals a missing/unreadable file by returning None
        # instead of raising; fail here rather than deep inside the detector.
        raise FileNotFoundError(f"Could not read image: {image_path}")

    results = detector(image, conf=conf)
    to_tensor = transforms.ToTensor()  # loop-invariant, build once

    crop_list = []
    for result in results:
        boxes = result.boxes.xyxy.cpu().numpy()
        for box in boxes:
            x1, y1, x2, y2 = map(int, box)
            crop_img = image[y1:y2, x1:x2]
            if crop_img.size == 0:
                # Degenerate (zero-area) box: nothing to embed.
                continue
            # OpenCV loads images in BGR order; convert so the crop matches
            # the RGB channel order used by PIL-based preprocessing and by
            # the RGB-ordered normalisation statistics applied downstream.
            crop_rgb = cv2.cvtColor(crop_img, cv2.COLOR_BGR2RGB)
            crop_list.append(to_tensor(crop_rgb))
    return crop_list

In [57]:
# Size passed to transforms.Resize in process_image.
TARGET_SIZE = (224, 224)

def process_image(image_path, normalize=True):
    """Load an image from disk and turn it into a batched tensor.

    Args:
        image_path: Path of the image file to open with PIL.
        normalize: When True, apply per-channel normalisation with
            mean [0.485, 0.456, 0.406] and std [0.229, 0.224, 0.225].

    Returns:
        The transformed image with a leading batch dimension of size 1
        (resized to TARGET_SIZE, converted with ToTensor, optionally
        normalised).
    """
    # Use a context manager so the underlying file handle is released
    # deterministically; convert() returns an independent in-memory image.
    with Image.open(image_path) as source:
        img = source.convert('RGB')

    transform_list = [
        transforms.Resize(TARGET_SIZE),
        transforms.ToTensor()
    ]
    if normalize:
        transform_list.append(transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]))

    transform = transforms.Compose(transform_list)
    img = transform(img)
    img = img.unsqueeze(0)  # Add batch dimension
    return img

class EmbeddingModel:
    """Image embedder built from a pretrained VGG16 with its last child removed."""

    # Per-channel statistics used by _normalize (same values as process_image).
    _MEAN = [0.485, 0.456, 0.406]
    _STD = [0.229, 0.224, 0.225]

    def __init__(self):
        """Load pretrained VGG16, drop its final child module and switch to eval mode."""
        model = vgg16(pretrained=True)
        self.embedding_model = nn.Sequential(*list(model.children())[:-1])  # Remove the classifier
        self.embedding_model.eval()

    def embed(self, img_path, normalize=True):
        """Embed an image and return the result as a flat NumPy array.

        Args:
            img_path: Either a filesystem path (str), which is loaded through
                process_image, or an already-built image tensor. Tensors are
                used as given (no resizing); a 3-D tensor gets a leading
                batch dimension added so both input kinds reach the network
                in batched form.
                NOTE(review): presumably tensors are float (C, H, W) or
                (1, C, H, W) — confirm against callers.
            normalize: Whether to apply mean/std normalisation.

        Returns:
            1-D NumPy array holding the flattened network output.

        Raises:
            AssertionError: If a string path does not exist.
        """
        if isinstance(img_path, str):
            assert os.path.exists(img_path), f"Image path {img_path} not found!"
            img_tensor = process_image(img_path, normalize)
        else:
            img_tensor = img_path
            if normalize:
                img_tensor = self._normalize(img_tensor)
            if img_tensor.dim() == 3:
                # The path branch always yields a batched tensor; mirror that
                # here instead of relying on unbatched-input support in every
                # layer. The output is flattened, so values are unaffected.
                img_tensor = img_tensor.unsqueeze(0)

        with torch.no_grad():
            representation = self.embedding_model(img_tensor)

        # .cpu() is a no-op for CPU tensors and keeps .numpy() valid otherwise.
        return representation.flatten().cpu().numpy()

    def _normalize(self, img_tensor):
        """Normalize the image tensor with the class mean/std statistics."""
        normalize = transforms.Normalize(mean=self._MEAN, std=self._STD)
        return normalize(img_tensor)

In [58]:
def main(image_path, top_k=5, weights_path='runs/detect/beer7/weights/best.pt'):
    """Detect beers in an image and query the vector DB for each detection.

    Args:
        image_path: Path of the image to analyse.
        top_k: Number of neighbours requested per detection (``n_results``).
        weights_path: Path of the YOLO weights file to load.

    Returns:
        List with one ``collection.query`` result per detected crop, in
        detection order.

    Raises:
        RuntimeError: If the vector DB collection could not be obtained.
    """
    model = YOLO(weights_path)
    beers = beer_detection(image_path, model)

    init_vectordb()
    vectordb_coll = get_chroma_collection()
    if vectordb_coll is None:
        # get_chroma_collection prints the error and returns None on failure;
        # stop here instead of failing later with an AttributeError on .query.
        raise RuntimeError("Vector database collection is unavailable")

    embedding_model = EmbeddingModel()
    results = []
    for beer in beers:
        embed_feature = embedding_model.embed(beer).tolist()
        result = vectordb_coll.query(query_embeddings=[embed_feature], n_results=top_k)
        results.append(result)
    return results

In [59]:
# Run the pipeline on one image from dataset/train, requesting 20 neighbours
# (top_k is forwarded as n_results) for every detected crop.
image_path="dataset/train/874a993ebf4f444a3e6c21c9b172143a20211221120302.5637770saigonbeer.jpg"
results=main(image_path,top_k=20)


0: 480x640 9 Beers, 8.5ms
Speed: 1.5ms preprocess, 8.5ms inference, 1.1ms postprocess per image at shape (1, 3, 480, 640)
Setup vectordb OK


In [60]:
from collections import Counter

# One (ids, brands) pair per query result, i.e. per detected crop.
all_brands = []

for d in results:
    # IDs of the neighbours returned for this query.
    ids = d['ids'][0]

    # Brand label of every neighbour; entries without metadata or without a
    # 'brand' key are skipped instead of raising.
    id_brands = [
        metadata['brand']
        for metadata_list in (d['metadatas'] or [])
        for metadata in metadata_list
        if metadata and 'brand' in metadata
    ]

    all_brands.append((ids, id_brands))

# Number of detections assigned to each brand; insertion order is the
# order in which brands are printed.
brand_dict = {
  "Heineken": 0,
  "Tiger": 0,
  "Bia_Viet": 0,
  "Larue": 0,
  "Bivina": 0,
  "Edelweiss": 0,
  "Strongbow": 0,
  "Sai_gon": 0,
  "333": 0,
  "Huda": 0,
}
for ids, brands in all_brands:
    if not brands:
        # No neighbour carried a brand, so there is nothing to vote on.
        continue
    # Majority vote: the brand appearing most often among the neighbours.
    most_common_brand = str(Counter(brands).most_common(1)[0][0])
    # .get() lets a brand missing from the table be counted, not raise KeyError.
    brand_dict[most_common_brand] = brand_dict.get(most_common_brand, 0) + 1

for key, value in brand_dict.items():
    if value != 0:
        print(f"{value} bia {key}")



9 bia Sai_gon
