In [None]:
import datetime
import os
import re
import shutil
from itertools import product

import chromadb
import clip
import cv2
import numpy as np
import torch
from PIL import Image
from sklearn.metrics.pairwise import cosine_similarity
from transformers import CLIPProcessor, CLIPModel

In [None]:
# Create the ChromaDB client and its local database (stored under ./chroma_db)
client = chromadb.PersistentClient(path="./chroma_db")

In [None]:
# Helper that fetches a named ChromaDB collection, creating it on first use
def create_collection(collection_name, embedding_function):
    """
    Return the ChromaDB collection called ``collection_name``, creating it if needed.

    The lookup goes through the module-level ``client``.

    Args:
        collection_name (str): Name of the collection to fetch or create.
        embedding_function: Embedding function to associate with the collection.

    Returns:
        Collection: The existing or newly created collection.
    """
    collection_options = {
        "name": collection_name,
        "embedding_function": embedding_function,
    }
    return client.get_or_create_collection(**collection_options)

In [None]:
# Store embedding vectors together with their metadata
def add_detection(collection, ids, embeddings, metadatas):
    """Insert ``embeddings`` and ``metadatas`` into ``collection`` under ``ids``.

    Thin wrapper that forwards its arguments, as keywords, to ``collection.add``.
    """
    record_batch = {
        "ids": ids,
        "embeddings": embeddings,
        "metadatas": metadatas,
    }
    collection.add(**record_batch)

def del_detection(collection, ids):
    """Remove the entries identified by ``ids`` from ``collection``."""
    ids_to_remove = ids
    collection.delete(ids=ids_to_remove)

In [None]:
# Load the CLIP model and its preprocessing transform, on the GPU when one is available
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
clip_model, preprocess = clip.load("ViT-B/32", device=device)

In [None]:
# Input video: containing folder, base name (without extension) and full path
path = "C:/Users/cleme/Desktop/Ecole/M1/Projet/Projet_S2/V2/Video_test"
video_name = "This Video Is 3 Seconds"
video_path = os.path.join(path, f"{video_name}.mp4")

In [None]:
# Build a ChromaDB-safe collection name from the video name.
# Every character outside [A-Za-z0-9_] (spaces, hyphens, dots, accents, brackets, ...)
# becomes "_"; spaces, hyphens and dots therefore map exactly as they did before.
sanitized_video_name = re.sub(r"[^A-Za-z0-9_]", "_", video_name)
# Collection names must start and end with an alphanumeric character.
sanitized_video_name = sanitized_video_name.strip("_")
# Collection names need at least 3 characters: prefix very short (or empty) results.
if len(sanitized_video_name) < 3:
    sanitized_video_name = f"video_{sanitized_video_name}".rstrip("_")
# NOTE(review): no maximum length is enforced here; the upper bound depends on the
# installed chromadb version, so very long video names may still be rejected.

# Embedding function handed to ChromaDB when the collection is created
class ClipEmbeddingFunction:
    """CLIP-backed embedding function for the ChromaDB collection.

    Relies on the module-level ``clip_model``, ``clip`` and ``device`` set up when
    the CLIP model is loaded.

    Two kinds of input are accepted:
      * a ``torch.Tensor`` batch of already preprocessed images (the only input the
        previous implementation accepted), encoded with ``clip_model.encode_image``;
      * a string or a list/tuple of strings (what ChromaDB passes for documents and
        text queries), tokenised with ``clip.tokenize`` and encoded with
        ``clip_model.encode_text``.

    The result is a plain list of L2-normalised float vectors, one per input item,
    so it can be stored next to the normalised frame embeddings added to the
    collection.
    """

    def __call__(self, input):
        """Embed ``input`` and return a list of normalised embedding vectors.

        Args:
            input: ``torch.Tensor`` of preprocessed images, a string, or a
                non-empty list/tuple of strings.

        Returns:
            list[list[float]]: One unit-length embedding per input item.

        Raises:
            ValueError: If ``input`` is neither a tensor nor text.
        """
        is_text_batch = (
            isinstance(input, (list, tuple))
            and len(input) > 0
            and all(isinstance(item, str) for item in input)
        )

        # Inference only: no autograd graph should be built or kept alive.
        with torch.no_grad():
            if isinstance(input, torch.Tensor):
                features = clip_model.encode_image(input)
            elif isinstance(input, str) or is_text_batch:
                texts = [input] if isinstance(input, str) else list(input)
                # truncate=True avoids an error on texts longer than CLIP's context length
                tokens = clip.tokenize(texts, truncate=True).to(device)
                features = clip_model.encode_text(tokens)
            else:
                raise ValueError(
                    "Input to embedding function must be a torch.Tensor "
                    "or a list of strings"
                )

        # The model may run in half precision; normalise in float32 and return plain lists.
        features = features.float()
        features = features / features.norm(dim=-1, keepdim=True)
        return features.cpu().tolist()

# Instantiate the embedding function and make sure the video's collection exists
embedding_function = ClipEmbeddingFunction()
create_collection(
    collection_name=sanitized_video_name,
    embedding_function=embedding_function,
)

Pas besoin de redimensionner les frames : CLIP le fait lui-même grâce à sa fonction `preprocess`. Effectuer deux redimensionnements successifs augmente simplement le temps de calcul et peut même faire perdre des informations de l'image d'origine.

In [None]:
FPS_TARGET = 24  # target FPS

# Wiping the Hugging Face hub cache deletes every cached model of the current user
# and forces a full re-download on each run, so it is opt-in instead of unconditional.
# Set to True only when a corrupted download has to be fetched again.
CLEAR_HF_CACHE = False

cache_dir = os.path.expanduser("~/.cache/huggingface/hub")
if CLEAR_HF_CACHE and os.path.exists(cache_dir):
    shutil.rmtree(cache_dir)

def create_unique_folder(base_dir, prefix="frames"):
    """Create a new, previously non-existing folder inside ``base_dir`` and return its path.

    The folder is named ``<prefix>_<DDMM_HHMMSS>``. The timestamp only has a
    one-second resolution, so when that name is already taken a numeric suffix
    (``_1``, ``_2``, ...) is appended until a free name is found; an existing
    folder is never reused.

    Args:
        base_dir (str): Parent directory (created if missing).
        prefix (str): Leading part of the folder name.

    Returns:
        str: Path of the freshly created folder.
    """
    timestamp = datetime.datetime.now().strftime("%d%m_%H%M%S")
    base_path = os.path.join(base_dir, f"{prefix}_{timestamp}")

    folder_path = base_path
    attempt = 0
    while True:
        try:
            # Without exist_ok, makedirs raises if the leaf folder already exists,
            # which is what guarantees the returned folder is new.
            os.makedirs(folder_path)
            return folder_path
        except FileExistsError:
            attempt += 1
            folder_path = f"{base_path}_{attempt}"

def extract_frames(video_path, output_dir, target_fps, start_frame=0, custom_output_dir=None):
    """Write the frames of a video to PNG files and return the folder holding them.

    ``start_frame`` is expressed on the ``target_fps`` timeline and is converted
    to the matching frame index of the source video. From that position every
    decoded frame is saved as ``frame_<index>.png``, the index starting at
    ``start_frame``. Frames are saved at their original size.

    NOTE(review): ``target_fps`` only influences the start position and the
    reported totals; frames are written at the source frame rate (no
    resampling). Confirm this is the intended behaviour.

    Args:
        video_path (str): Path of the video file to read.
        output_dir (str): Base directory in which a new ``frames_resized_*``
            folder is created when ``custom_output_dir`` is empty.
        target_fps (float): Frame rate of the timeline ``start_frame`` refers to.
        start_frame (int): First frame to extract, on the target timeline.
        custom_output_dir (str | None): Folder to write into instead of a new one.

    Returns:
        str | None: Folder containing the frames, or ``None`` when the video is
        missing, cannot be opened, or reports unusable frame-rate/frame-count data.
    """
    if not os.path.exists(video_path):
        print(f"Error: Video file '{video_path}' does not exist.")
        return None

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"Error: Unable to open video '{video_path}'")
            return None

        original_fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames_original = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # These properties come back as 0 when the backend cannot determine them;
        # the conversions below would then divide by zero.
        if original_fps <= 0 or total_frames_original <= 0 or target_fps <= 0:
            print(
                f"Error: Unusable video metadata (source FPS: {original_fps}, "
                f"frames: {total_frames_original}, target FPS: {target_fps})."
            )
            return None

        total_frames_target = int((total_frames_original / original_fps) * target_fps)

        print(f"Original total frames: {total_frames_original}")
        print(f"Target total frames: {total_frames_target}")

        # Check and adjust start_frame
        if start_frame < 0:
            print(f"Error: Start frame ({start_frame}) is negative.")
            start_frame = 0
            print(f"start_frame adjusted to {start_frame}")
        if start_frame > total_frames_target:
            print(f"Error: Start frame ({start_frame}) is greater than total target frames ({total_frames_target}).")
            start_frame = total_frames_target
            print(f"start_frame adjusted to {start_frame}")

        # Cross multiplication to map the start frame onto the source timeline.
        # A very short video can yield 0 target frames; start_frame is then 0 as well.
        if total_frames_target > 0:
            start_time = (start_frame * total_frames_original) / total_frames_target
            adjusted_start_frame = round(start_time)
        else:
            adjusted_start_frame = 0
        print(f"Adjusted start frame: {adjusted_start_frame}")

        # Use custom directory or create a new one
        frames_dir = custom_output_dir if custom_output_dir else create_unique_folder(output_dir, "frames_resized")
        os.makedirs(frames_dir, exist_ok=True)

        # File numbering continues from start_frame (target timeline index)
        frame_index = start_frame
        saved_count = 0
        failed_count = 0
        cap.set(cv2.CAP_PROP_POS_FRAMES, adjusted_start_frame)

        while True:
            success, frame = cap.read()
            if not success:
                break

            frame_filename = os.path.join(frames_dir, f"frame_{frame_index:06d}.png")
            # imwrite signals failure through its return value instead of raising
            if cv2.imwrite(frame_filename, frame):
                saved_count += 1
            else:
                failed_count += 1
            frame_index += 1
    finally:
        # Always free the decoder, including on early returns and exceptions
        cap.release()

    if failed_count:
        print(f"Warning: {failed_count} frames could not be written to '{frames_dir}'")
    print(f"Extraction completed. {saved_count} images saved to '{frames_dir}'")
    return frames_dir

def create_video_from_frames(frames_dir, output_path, fps):
    """Assemble the PNG frames of ``frames_dir`` into an MP4 video.

    Frames are taken in sorted file-name order. The size of the first frame
    defines the video size; frames that cannot be read or have another size are
    skipped and reported.

    Args:
        frames_dir (str): Folder containing the ``.png`` frames.
        output_path (str): Path of the video file to write.
        fps (float): Frame rate of the generated video.

    Returns:
        None. Problems (missing folder, no frames, unreadable first frame,
        writer that cannot be opened) are reported with a printed message.
    """
    if not os.path.isdir(frames_dir):
        print(f"Error: No frames found in '{frames_dir}'")
        return

    frames = sorted(f for f in os.listdir(frames_dir) if f.endswith(".png"))
    if not frames:
        print(f"Error: No frames found in '{frames_dir}'")
        return

    # imread returns None (it does not raise) when a file cannot be decoded
    first_frame = cv2.imread(os.path.join(frames_dir, frames[0]))
    if first_frame is None:
        print(f"Error: Unable to read frame '{frames[0]}' in '{frames_dir}'")
        return
    height, width = first_frame.shape[:2]

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    if not out.isOpened():
        out.release()
        print(f"Error: Unable to open video writer for '{output_path}'")
        return

    skipped_count = 0
    try:
        for frame_filename in frames:
            frame = cv2.imread(os.path.join(frames_dir, frame_filename))
            # The writer was opened for one fixed frame size
            if frame is None or frame.shape[:2] != (height, width):
                skipped_count += 1
                continue
            out.write(frame)
    finally:
        # Finalise the file even if reading or writing a frame fails
        out.release()

    if skipped_count:
        print(f"Warning: {skipped_count} frames were skipped (unreadable or wrong size)")
    print(f"Video saved to '{output_path}'")

# ============================
#   RUN THE FRAME EXTRACTION
# ============================

# Input video: containing folder, base name and full path
path = "C:/Users/cleme/Desktop/Ecole/M1/Projet/Projet_S2/V2/Video_test"
video_name = "This Video Is 3 Seconds"
video_path = os.path.join(path, f"{video_name}.mp4")

# Every extraction run gets its own sub-folder below this directory
frames_base_dir = f"{path}/extraction"
timestamp = datetime.datetime.now().strftime("%d%m_%H%M%S")

# An empty custom folder makes extract_frames create a fresh output folder
custom_folder = ""
frames_dir = extract_frames(
    video_path,
    frames_base_dir,
    FPS_TARGET,
    start_frame=0,
    custom_output_dir=custom_folder,
)


In [None]:

class EmbeddingComparator:
    """Encode images and text with a Hugging Face CLIP model and compare the results.

    Embeddings are L2-normalised numpy arrays of shape ``(1, embedding_dim)``.
    """

    def __init__(self, model_name="openai/clip-vit-base-patch32"):
        """Load the CLIP model and its processor.

        Args:
            model_name (str): Checkpoint to load, e.g.
                "openai/clip-vit-base-patch32" for speed or
                "openai/clip-vit-large-patch14" for accuracy.
        """
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model = CLIPModel.from_pretrained(model_name).to(self.device)
        self.processor = CLIPProcessor.from_pretrained(model_name)

    def encode_image(self, image_path):
        """Encode the image stored at ``image_path``.

        Returns:
            numpy.ndarray: Normalised embedding of shape ``(1, embedding_dim)``.
        """
        # The context manager closes the file handle (it used to stay open for every
        # frame); convert() loads the pixel data before the file is closed.
        with Image.open(image_path) as image_file:
            image = image_file.convert("RGB")

        # Padding is a text-only option, so it is not passed for images
        inputs = self.processor(images=image, return_tensors="pt").to(self.device)

        with torch.no_grad():
            image_features = self.model.get_image_features(**inputs)

        # Normalize embeddings
        image_embedding = image_features / image_features.norm(dim=-1, keepdim=True)
        return image_embedding.cpu().numpy()

    def encode_text(self, text):
        """Encode ``text`` (a string or a list of strings).

        Returns:
            numpy.ndarray: Normalised embeddings, one row per input text.
        """
        # truncation=True cuts texts that exceed the model's maximum length
        # instead of letting the forward pass fail on them.
        inputs = self.processor(
            text=text, return_tensors="pt", padding=True, truncation=True
        ).to(self.device)

        with torch.no_grad():
            text_features = self.model.get_text_features(**inputs)

        # Normalize embeddings
        text_embedding = text_features / text_features.norm(dim=-1, keepdim=True)
        return text_embedding.cpu().numpy()

    def compare_embeddings(self, embedding1, embedding2):
        """Return the cosine similarity (between -1 and 1) of two embeddings.

        Both 1-D vectors and ``(1, embedding_dim)`` arrays are accepted; for
        multi-row inputs the similarity of the first rows is returned.
        """
        first = np.atleast_2d(embedding1)
        second = np.atleast_2d(embedding2)
        return cosine_similarity(first, second)[0][0]


In [None]:
# Example of use : 

def testing(image,text) :
    comparator = EmbeddingComparator()
    image_embedding = comparator.encode_image(image) 
    text_embedding = comparator.encode_text(text)
    
    similarity = comparator.compare_embeddings(image_embedding, text_embedding)
    print(f"Similarity between image and '{text}': {similarity:.4f}")

In [None]:
# Embed every extracted frame and store it in the video's ChromaDB collection.
# Only the files directly inside frames_dir are processed: walking frames_base_dir
# recursively also picks up earlier extraction runs and the "above_threshold" /
# "top_x_similar" copies, whose ids then collide with, or do not exist among,
# the frames of the current run.
if not frames_dir or not os.path.isdir(frames_dir):
    raise RuntimeError(f"No extracted frames to embed: '{frames_dir}' is not a directory")

comparator = EmbeddingComparator()  # load the CLIP model once for the whole run
frames_collection = client.get_collection(sanitized_video_name)  # one lookup, not one per frame

for file in sorted(os.listdir(frames_dir)):
    image_path = os.path.join(frames_dir, file)
    # Keep image files only (sub-folders and other files are ignored)
    if not os.path.isfile(image_path) or not file.lower().endswith(('.png', '.jpg', '.jpeg')):
        continue

    # encode_image returns a numpy array; it is kept as a tensor named `embedding`
    # because the statistics cell further down reads it with torch operations.
    embedding = torch.as_tensor(comparator.encode_image(image_path))

    print(f"Embedding extrait pour {file}: {embedding.detach().cpu().numpy()}")

    # Add the embedding to the ChromaDB collection, keyed by the frame's file name
    add_detection(
        collection=frames_collection,
        ids=[file],
        embeddings=embedding.detach().cpu().numpy().tolist(),
        metadatas=[{"frame": file}]
    )

In [None]:
# Print the name of every collection stored in the database
collections = client.list_collections()
print("Collections existantes :")
for collection in collections:
    # Depending on the installed chromadb version the items are collection names or
    # Collection objects; print the name in both cases.
    print(f"- {getattr(collection, 'name', collection)}")

In [None]:
# Fetch and print everything (ids, embeddings, metadata) stored in each collection

collections = client.list_collections()  # all collections of the database

for collection in collections:
    # Depending on the installed chromadb version the items are collection names or
    # Collection objects, while get_collection() expects a name.
    collection_name = getattr(collection, "name", collection)
    print(f"Collection: {collection_name}")
    detection = client.get_collection(collection_name)  # open the collection
    all_data = detection.get(include=["embeddings", "metadatas"])  # load all of its entries

    # Print every entry
    for i in range(len(all_data["ids"])):
        print(f"🔹 ID: {all_data['ids'][i]}")
        if all_data['embeddings'] is not None:
            print(f"🧠 Embedding: {all_data['embeddings'][i]}")
        print(f"📌 Métadonnées: {all_data['metadatas'][i]}")
        print("-" * 40)


In [None]:
# Sanity-check the last frame embedding computed by the embedding loop.
# `embedding` is the variable left over from that loop, so the loop must have run first.
embedding_norm = torch.norm(embedding).item()
embedding_mean = embedding.mean().item()
embedding_std = embedding.std().item()
embedding_min, embedding_max = embedding.min().item(), embedding.max().item()

print("Norm:", embedding_norm)   # expected ≈ 1.0
print("Mean:", embedding_mean)   # expected ≈ 0.0
print("Std:", embedding_std)     # expected ≈ 0.05
print("Min/Max:", embedding_min, embedding_max)


In [None]:
# Turn a text query into its CLIP embedding
def get_text_embedding(query, comparator=None):
    """Return the embedding of ``query`` as produced by ``comparator.encode_text``.

    Args:
        query (str): Text query to encode.
        comparator (EmbeddingComparator | None): Comparator to reuse. When
            omitted a new one is created, which loads the CLIP model again.

    Returns:
        The value returned by ``comparator.encode_text(query)``.
    """
    if comparator is None:
        comparator = EmbeddingComparator()
    return comparator.encode_text(query)

In [None]:
def _copy_frame_if_present(src_path, dst_path):
    """Copy ``src_path`` to ``dst_path``; return False (without raising) if the source is missing."""
    if not os.path.isfile(src_path):
        return False
    shutil.copy(src_path, dst_path)
    return True


def find_similar_images(frames_dir, query_embedding, similarity_threshold, top_x):
    """Copy the frames most similar to a query embedding into result folders.

    Similarities are cosine similarities between ``query_embedding`` and the
    embeddings stored in the collection named ``sanitized_video_name`` (read
    through the module-level ``client``). Two folders are filled inside
    ``frames_dir``:

      * ``above_threshold``: every frame whose similarity is >= ``similarity_threshold``;
      * ``top_x_similar``: the ``top_x`` best frames, file names prefixed with their rank.

    Image files left in these folders by an earlier query are removed first, so
    the folders only reflect the current query. Ids whose image file is not
    present in ``frames_dir`` are skipped and reported.

    Args:
        frames_dir (str): Folder containing the frame images (file name == id).
        query_embedding: Query vector, 1-D or of shape ``(1, embedding_dim)``.
        similarity_threshold (float): Minimum similarity for ``above_threshold``.
        top_x (int): Number of best frames copied into ``top_x_similar``.

    Returns:
        None
    """
    image_extensions = ('.png', '.jpg', '.jpeg')

    # Output folders
    above_threshold_dir = os.path.join(frames_dir, "above_threshold")
    top_x_dir = os.path.join(frames_dir, "top_x_similar")
    for result_dir in (above_threshold_dir, top_x_dir):
        os.makedirs(result_dir, exist_ok=True)
        # Drop the copies of a previous query; they would otherwise be mixed with
        # the new results (and end up in the video built from this folder).
        for stale_name in os.listdir(result_dir):
            stale_path = os.path.join(result_dir, stale_name)
            if os.path.isfile(stale_path) and stale_name.lower().endswith(image_extensions):
                os.remove(stale_path)

    # Load the stored embeddings
    collection = client.get_collection(sanitized_video_name)
    all_data = collection.get(include=["embeddings"])
    image_ids = list(all_data["ids"])
    if not image_ids:
        print(f"Aucun embedding trouvé dans la collection '{sanitized_video_name}'")
        return

    # One vectorised call instead of one cosine_similarity call per frame;
    # row 0 holds the similarities of the query with every stored embedding.
    query_matrix = np.atleast_2d(np.asarray(query_embedding))
    image_matrix = np.asarray(all_data["embeddings"])
    scores = cosine_similarity(query_matrix, image_matrix)[0]
    similarities = list(zip(image_ids, scores))

    missing_ids = set()

    # Copy the frames above the threshold
    for image_id, similarity in similarities:
        if similarity >= similarity_threshold:
            src_path = os.path.join(frames_dir, image_id)
            dst_path = os.path.join(above_threshold_dir, image_id)
            if not _copy_frame_if_present(src_path, dst_path):
                missing_ids.add(image_id)

    # Sort by decreasing similarity
    similarities.sort(key=lambda x: x[1], reverse=True)

    # Copy the top_x most similar frames, prefixed with their rank
    for i, (image_id, similarity) in enumerate(similarities[:top_x]):
        src_path = os.path.join(frames_dir, image_id)
        dst_path = os.path.join(top_x_dir, f"{i+1:02d}_{image_id}")
        if not _copy_frame_if_present(src_path, dst_path):
            missing_ids.add(image_id)

    if missing_ids:
        print(f"{len(missing_ids)} image(s) introuvable(s) dans '{frames_dir}' ont été ignorées")
    print(f"Images au-dessus du seuil ({similarity_threshold}) copiées dans '{above_threshold_dir}'")
    print(f"Top {top_x} images les plus similaires copiées dans '{top_x_dir}'")

# Example usage: embed a text query, then copy the best matching frames
query = "Person walking in a park"
query_embedding = get_text_embedding(query)
find_similar_images(
    frames_dir,
    query_embedding,
    similarity_threshold=0.18,
    top_x=50,
)

In [None]:
# Folder holding the frames that scored above the similarity threshold
above_threshold_dir = os.path.join(frames_dir, "above_threshold")

# Destination of the video rebuilt from those frames
output_video_path = os.path.join(frames_dir, "above_threshold_video.mp4")

# Rebuild the video from the selected frames
create_video_from_frames(
    frames_dir=above_threshold_dir,
    output_path=output_video_path,
    fps=FPS_TARGET,
)