# Demo: Nightshade-Video

O Nightshade original é dividido em três partes (ou códigos do repositório oficial):

1. ***data_extraction***: responsável pela extração dos dados válidos do conceito A (origem) para envenamento. Neste passo, a rede CLIP (uma rede pré-treinada que associa imagens a textos e possibilita fazer essa 'tradução' de um para o outro) valida as imagens que queremos envenenar com o algoritmo, verificando-as com o conceito (classe) passado por hiperparâmetro ao modelo (ex: "essa é uma imagem de um _gato_"). Ao final do código, os embeddings (representação vetorial via similaridade de cosseno) das imagens obtidos com o CLIP são salvos em um formato _pickle_ (contendo os embeddings e os conceitos associados com cada um) para posterior utilização pelo Nightshade e redes de teste.

2. ***opt***: contém a classe de geração de veneno (_PoisonGenerator_) propriamente dita. Nesta classe, o modelo do StableDiffusion é carregado, gera uma imagem do conceito a se envenenar, aplica o algoritmo de envenenamento que nada mais é do que aproximar os vetores descritores do Conceito A e Conceito B num espaço latente (espaço comum, por isso opta-se pelo uso da similaridade de cosseno na etapa 1) e, para que isso seja possível, realiza uma aplicação de "lixo" na imagem do Conceito A de tal forma a aproximá-la com o Conceito B, analisando por meio da perda do modelo (dado pela norma da diferença do vetor do Conceito A com o vetor do Conceito B).

3. ***gen_poison***: responsável por juntar os passos 1 e 2 em um script, ou seja, recupera os dados obtidos pela extração do passo 1 e aplica o algoritmo de envenenamento do passo 2, gerando as imagens resultantes.

Desta forma, o algoritmo para vídeo segue as mesmas diretrizes, tendo em vista que há modelos neurais adaptados para vídeo daqueles que foram utilizados para imagens na publicação do Nightshade.

Portanto, inicialmente para teste:

- ***[VideoCLIP-XL-v2](https://arxiv.org/abs/2410.00741)*** para substituir o _CLIP_; poderia ser possível, ainda, aplicar o _CLIP_ original nos quadros de cada vídeo e fazer uma média de score por vídeo, obtendo aqueles vídeos com os melhores scores, mas menos acurado já que o _CLIP_ originalmente não trata as informações temporais.

- ***[StableVideoDiffusion-Img2Vid-XT](https://huggingface.co/stabilityai/stable-video-diffusion-img2vid-xt)*** (SVD) para substituir o _Stable Diffusion_ (SD); nota-se que este modelo necessita de um frame para a geração de um vídeo novo. O método proposto para o Nightshade-Vídeo será abordado nas etapas posteriores por conta desta diferença.

Com esses dois modelos, já é possível aplicar o algoritmo de envenenamento, já que poderemos avaliar se o SVD consegue recriar e compreender os vídeos envenenandos. Outros modelos alternativos aos citados no artigo: 
- ***[VideoCrafter2](https://ailab-cvc.github.io/videocrafter2/)*** para substituir o _DeepFloyd IF_, em termos avaliativos;
- ***[LVDM](https://yingqinghe.github.io/LVDM/)*** para substituir o _Latent Diffusion Models_, em termos avaliativos;


## Pré-Requisitos do Teste

Para testar a metodologia do vídeo, optou-se pela seleção de uma pequena quantidade amostral obtida da base ***[UCF‑101](https://www.kaggle.com/datasets/matthewjansen/ucf101-action-recognition)*** relativa ao Conceito '**Playing Piano**' e pretende-se envenenar para que fique parecido com o Conceito '**Swing**'. 

Os arquivos de vídeo foram copiados manualmente da base UCF-101 baixada e contemplam todos os dados do Conceito '**Playing Piano**' (_train_, _val_ e _test_ juntados).

Além disso, os seguintes pacotes serão necessários para utilização do algoritmo e das redes.

In [1]:
%pip install numpy scipy scikit-learn matplotlib pandas pillow tqdm
%pip install opencv-python
%pip install torch==2.6.0 torchvision==0.21.0 torchaudio==2.6.0 --index-url https://download.pytorch.org/whl/cu126

# Modelos
%pip install --upgrade huggingface_hub 
%pip install "huggingface_hub[cli]"
%pip install "transformers[torch]"

## VideoCLIP-XL-v2
%pip install opencv-python ftfy regex timm decord einops

## Stable Video Diffusion (SVD)

## 

[0mNote: you may need to restart the kernel to use updated packages.
[0mNote: you may need to restart the kernel to use updated packages.
Looking in indexes: https://download.pytorch.org/whl/cu126
[0mNote: you may need to restart the kernel to use updated packages.
Collecting huggingface_hub
  Using cached huggingface_hub-1.1.2-py3-none-any.whl.metadata (13 kB)
Using cached huggingface_hub-1.1.2-py3-none-any.whl (514 kB)
Installing collected packages: huggingface_hub
  Attempting uninstall: huggingface_hub
    Found existing installation: huggingface-hub 0.36.0
    Uninstalling huggingface-hub-0.36.0:
      Successfully uninstalled huggingface-hub-0.36.0
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
transformers 4.56.0 requires huggingface-hub<1.0,>=0.34.0, but you have huggingface-hub 1.1.2 which is incompatible.
tokenizers 0.22.0 requires huggingfac

In [2]:
# Check PyTorch installation
import torch, torchvision
import warnings

warnings.filterwarnings("ignore")  # ignore PyTorch warnings
print(torch.__version__, torch.cuda.is_available())

2.6.0+cu126 True


## Passo 1: Extração e Seleção de Dados

In [3]:
# Baixando o VideoCLIP-XL-v2 com Huggingface
import os
from huggingface_hub import snapshot_download

VIDEOCLIPXL_PATH = snapshot_download(
    repo_id="alibaba-pai/VideoCLIP-XL",
)
VIDEOCLIPXLV2_PATH = snapshot_download(
    repo_id="alibaba-pai/VideoCLIP-XL-v2",
)  # new weights for VideoCLIP-XL

print(f"{VIDEOCLIPXL_PATH}")
print(f"{VIDEOCLIPXLV2_PATH}")

# Moving the VideoCLIP-XL-v2 weight file to VideoCLIP-XL-v1 directory
# for file in os.listdir(VIDEOCLIPXLV2_PATH):
#     if file.endswith(".bin"):
#         os.rename(
#             os.path.join(VIDEOCLIPXLV2_PATH, file), os.path.join(VIDEOCLIPXL_PATH, file)
#         )

Fetching 15 files: 100%|██████████| 15/15 [01:34<00:00,  6.31s/it]
Fetching 3 files: 100%|██████████| 3/3 [01:40<00:00, 33.62s/it]

/root/.cache/huggingface/hub/models--alibaba-pai--VideoCLIP-XL/snapshots/17e20a3f2a82b2a4ad729f19e12747e756c459a7
/root/.cache/huggingface/hub/models--alibaba-pai--VideoCLIP-XL-v2/snapshots/552b698005f54886fb94dd534e4968fd20e4fbf2





In [4]:
!dir "{VIDEOCLIPXL_PATH}"
!dir "{VIDEOCLIPXLV2_PATH}"

README.md  VideoCLIP-XL.bin  demo.py  modeling.py  requirements.txt  utils
README.md  VideoCLIP-XL-v2.bin


In [5]:
# Adding model to PYTHON PATH
!export PYTHONPATH=f"{VIDEOCLIPXL_PATH}:$PYTHONPATH"

import sys
sys.path.insert(0, VIDEOCLIPXL_PATH)

In [6]:
import os
import glob
import shutil
import random

import numpy as np
import pickle
import pandas as pd
import cv2

from PIL import Image
import torch
import torch.nn.functional as F
from torchvision import transforms

from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoTokenizer, AutoModel  # HuggingFace importing models

from modeling import VideoCLIP_XL
from utils.text_encoder import text_encoder

In [7]:
# CONFIGURAÇÕES DE DIRETÓRIO
SOURCE_CONCEPT = "PlayingPiano"
CONCEPT_TEXTS = [
    # "Um vídeo de uma pessoa tocando piano",
    # "Um vídeo de um piano sendo tocado",
    # "Um piano sendo tocado",
    # "Piano",
    "Um vídeo contendo um piano",
]  # Podem ser um ou mais textos
DATA_DIR = f"./data/{SOURCE_CONCEPT}"
OUTPUT_DIR = f"./output_embeddings/{SOURCE_CONCEPT}"
MAX_VIDEOS = None  # se None, todos os vídeos serão analisados

# ao selecionar vídeos candidatos, este é o número máximo a ser considerado
MAX_CANDIDATES = 5

# CONFIGURAÇÕES DO MODELO
MODEL_NAME = "alibaba-pai/VideoCLIP-XL"
WEIGHTS_PATH = VIDEOCLIPXLV2_PATH  # or VIDEOCLIPXL_PATH
# Considerar que NUM_FRAMES_AMOSTRADOS = MAX(1, TOTAL / FNUM) e que STEPS = MAX(1, TOTAL / FNUM)
FNUM = 8
TYPE_SAMPLING = "uniform"
SIM_THRESHOLD = 0.20  # limiar mínimo de similaridade

In [8]:
# Carregando o modelo
print(f"Carregando modelo {MODEL_NAME}...")
model = VideoCLIP_XL()
state_dict = torch.load(
    # os.path.join(WEIGHTS_PATH, "VideoCLIP-XL.bin"),
    os.path.join(WEIGHTS_PATH, "VideoCLIP-XL-v2.bin"),
    map_location="cpu",
)
model.load_state_dict(state_dict)
device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device).eval()

# tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)
# model = AutoModel.from_pretrained(MODEL_PATH).to(device)

Carregando modelo alibaba-pai/VideoCLIP-XL...


In [9]:
# Normalização de quadros para o VideoCLIP-XL
v_mean = np.array([0.485, 0.456, 0.406]).reshape(1, 1, 3)
v_std = np.array([0.229, 0.224, 0.225]).reshape(1, 1, 3)


def normalize(data):
    return (data / 255.0 - v_mean) / v_std


def sample_frames_with_normalization(video_path, fnum=1000, type_sample="uniform"):
    """
    Extrai frames de um vídeo, uniformemente distribuídos ao longo do tempo.
    Também, os frames são normalizados para o VideoCLIP-XL, como no redimensionamento
    para 224x224 e transposição para tensor da rede.

    Args:
    - video_path (str): O caminho para o vídeo de entrada.
    - fnum (int): O intervalo de passos dos frames a serem extraídos. Se fnum=1, todos os frames serão amostrados.
    - type_sample (str): O tipo de amostragem a ser utilizado. Pode ser 'uniform' ou 'gaussian'.

    Returns:
    - Torch.tensor: Um tensor contendo os frames extraídos e normalizados, com formato (B, T, C, H, W)
    """
    frames = sample_frames(video_path, fnum=fnum, type_sample=type_sample)

    if frames is None:
        return None

    # Normalizando os frames amostrados
    normalized_frames = []
    for fr in frames:
        fr = cv2.cvtColor(fr, cv2.COLOR_BGR2RGB)  # ou fr[:,:,::-1]  # BGR -> RGB
        fr = cv2.resize(fr, (224, 224))
        fr = np.expand_dims(normalize(fr), (0, 1))  # normalizando
        normalized_frames.append(fr)

    # Convertendo para tensor
    normalized_frames = np.concatenate(normalized_frames, axis=1)
    normalized_frames = np.transpose(
        normalized_frames,
        (0, 1, 4, 2, 3),  # [B, T, C, H, W]
    )
    normalized_frames = torch.from_numpy(normalized_frames)

    return normalized_frames


def sample_frames(video_path, fnum=1000, type_sample="uniform"):
    """
    Extrai frames de um vídeo, uniformemente distribuídos ao longo do tempo.
    Os frames NÃO são normalizados ou sofrem qualquer transformação
    ou codificação de imagens.

    Args:
    - video_path (str): O caminho para o vídeo de entrada.
    - fnum (int): O intervalo de passos dos frames a serem extraídos. Se fnum=1, todos os frames serão amostrados.
    - type_sample (str): O tipo de amostragem a ser utilizado. Pode ser 'uniform' ou 'gaussian'.

    Returns:
    - list(frames): Uma lista de frames extraídos do vídeo.
    """
    # Carregando o vídeo
    video = cv2.VideoCapture(video_path)
    # fps = video.get(cv2.CAP_PROP_FPS)
    # total_frames = video.get(cv2.CAP_PROP_FRAME_COUNT)
    frames = []
    while True:
        success, frame = video.read()
        if not success:
            break
        frames.append(frame)
    video.release()

    # Caso tenha dado erro, returna nulo
    if len(frames) == 0:
        return None

    # Amostrando os frames com base em passos e no tipo de amostragem
    step = max(1, len(frames) // fnum)
    if type_sample == "uniform":
        frames = frames[::step][:fnum]
    elif type_sample == "gaussian":
        total_frames = int(len(frames))
        mean = total_frames / 2
        std_dev = mean * 0.4  # 20% para cada lado
        if total_frames <= fnum:
            frame_indices = list(range(total_frames))
        # Seleciona apenas o intervalo central (exclui 20% das margens)
        start = int(total_frames * 0.2)
        end = int(total_frames * 0.8)
        if (end - start) < fnum:
            frame_indices = list(range(start, end))
        else:
            # Amostragem uniforme dentro do intervalo central
            frame_indices = np.linspace(start, end - 1, fnum, dtype=int)
        frame_indices = list(frame_indices)
        frames = [frames[i] for i in frame_indices]

    return frames

In [9]:
def get_video_embedding(video_path, fnum=1000, type_sample="uniform"):
    """
    Extrai embedding do vídeo usando VideoCLIP-XL.

    Args:
    - video_path (str): O caminho para o vídeo de entrada.
    - fnum (int): O número de frames a serem extraídos do vídeo.
    - type_sample (str): O tipo de amostragem a ser utilizado. Pode ser 'uniform' ou 'gaussian'.

    Output:
    - embedding (np.ndarray): O embedding extraído do vídeo.
    """
    normalized_frames = sample_frames_with_normalization(
        video_path,
        fnum=fnum,
        type_sample=type_sample,
    )
    if normalized_frames is None:
        return None

    # Entrando no modo de avaliação da rede
    with torch.no_grad():
        video_inputs = normalized_frames.float().to(device)
        video_features = model.vision_model.get_vid_features(video_inputs).float()
        video_features = video_features / video_features.norm(dim=-1, keepdim=True)

    return video_features.cpu().numpy()


def get_text_embedding(texts):
    """
    Extrai embedding de uma lista de textos usando VideoCLIP-XL.

    Args:
    - text (list[str]): A lista de textos de entrada.

    Output:
    - embedding (np.ndarray): Os embeddings extraídos dos textos.
    """
    with torch.no_grad():
        text_inputs = text_encoder.tokenize(texts, truncate=True).to(device)
        text_features = model.text_model.encode_text(text_inputs).float()
        text_features = text_features / text_features.norm(dim=-1, keepdim=True)

    return text_features.cpu().numpy()

In [10]:
# Obtendo o caminho absoluto dos vídeos
video_files = glob.glob(
    os.path.join(DATA_DIR, "*.avi")
)  # Na base escolhida, os vídeos tem formato .avi
if MAX_VIDEOS:
    video_files = random.sample(video_files, min(len(video_files), MAX_VIDEOS))

# Obtendo o embedding dos prompts do Conceito A para cálculo de similaridade
embeddings_dict = {}
text_emb = get_text_embedding(CONCEPT_TEXTS)

# Obtendo os candidatos a partir da similaridade de cosseno dos vídeos com o prompt do Conceito A
candidates_paths = []
best_sims = []
best_texts = []
for video_path in video_files:
    print(f"Processando: {video_path}")
    try:
        video_emb = get_video_embedding(
            video_path=video_path,
            fnum=FNUM,
            type_sample=TYPE_SAMPLING,
        )

        # sim = cosine_similarity([video_emb], text_emb)[0][0]  # type: ignore
        sim = cosine_similarity(text_emb, video_emb)  # type: ignore
        # sim = float(np.dot(video_emb, text_emb.T))  # type: ignore

        # Obtendo a melhor similaridade e prompt referente a ela
        best_sim = np.max(sim)
        best_text = CONCEPT_TEXTS[np.argmax(sim)]
        print(f" → Melhor Similaridade: {best_sim:.3f}")

        if best_sim >= SIM_THRESHOLD:
            # Se a similaridade foi maior que o limiar, salva o caminho do vídeo
            # para posterior análise
            candidates_paths.append(video_path)
            best_sims.append(best_sim)
            best_texts.append(best_text)
            print(f" → Adicionado como candidato.")

    except Exception as e:
        print(f"Erro no vídeo {video_path}: {e}")

# Com a lista de vídeos candidatos obtida, selecionamos aleatoriamente X amostras
if candidates_paths:
    print("\n" + "-" * 15 + " INÍCIO DOS RESULTADOS " + "-" * 15 + "\n")
    print(f"Total de candidatos encontrados: {len(candidates_paths)}")
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    num_select = min(len(candidates_paths), MAX_CANDIDATES)
    top_indices = np.argsort(best_sims)[-num_select:][::-1]  # índices dos maiores
    selected_videos = [candidates_paths[i] for i in top_indices]
    selected_sims = [best_sims[i] for i in top_indices]
    selected_texts = [best_texts[i] for i in top_indices]
    print(f"Vídeos selecionados: {selected_videos}")
    print(f"Quantidade de vídeos selecionados: {len(selected_videos)}")

    print(f"Iniciando processamento dos vídeos selecionados...")
    # Para cada vídeo selecionado, extraímos os frames e salvamos os embeddings
    for idx, video_path in enumerate(selected_videos):
        frames = sample_frames(video_path, type_sample=TYPE_SAMPLING)
        if frames is not None:
            print(f" → Salvando frames do vídeo {video_path}...")
            base_name = os.path.splitext(os.path.basename(video_path))[0]
            frame_dir = os.path.join(OUTPUT_DIR, base_name)
            os.makedirs(frame_dir, exist_ok=True)

            for i, frame in enumerate(frames):
                frame = Image.fromarray(
                    cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                )  # Coloração RGB
                # frame.save(os.path.join(frame_dir, f"frame_{i:03d}.jpg"))
                frame.save(os.path.join(frame_dir, f"{i:03d}.jpg"))

            # Salvar embedding
            embeddings_dict[base_name] = {
                "embedding": get_video_embedding(video_path, fnum=FNUM, type_sample=TYPE_SAMPLING),  # type: ignore
                "text": selected_texts[idx],
                "similarity": selected_sims[idx],
            }
            print(f" → Amostra candidata salva.")
        else:
            raise Exception(
                f"Erro ao processar a amostra candidata {video_path}. Tente novamente."
            )

    # Salvar todos embeddings em pickle
    with open(os.path.join(OUTPUT_DIR, "video_embeddings.pkl"), "wb") as f:
        pickle.dump(embeddings_dict, f)
else:
    raise Exception(
        "Não foram encontradas amostras candidatas para o processo. Tente novamente."
    )
print(f"Processamento concluído! {len(embeddings_dict)} vídeos salvos em {OUTPUT_DIR}")

Processando: ./data/PlayingPiano/v_PlayingPiano_g01_c01.avi
 → Melhor Similaridade: 0.208
 → Adicionado como candidato.
Processando: ./data/PlayingPiano/v_PlayingPiano_g01_c02.avi
 → Melhor Similaridade: 0.208
 → Adicionado como candidato.
Processando: ./data/PlayingPiano/v_PlayingPiano_g01_c03.avi
 → Melhor Similaridade: 0.208
 → Adicionado como candidato.
Processando: ./data/PlayingPiano/v_PlayingPiano_g01_c04.avi
 → Melhor Similaridade: 0.200
 → Adicionado como candidato.
Processando: ./data/PlayingPiano/v_PlayingPiano_g02_c01.avi
 → Melhor Similaridade: 0.195
Processando: ./data/PlayingPiano/v_PlayingPiano_g02_c02.avi
 → Melhor Similaridade: 0.200
Processando: ./data/PlayingPiano/v_PlayingPiano_g02_c03.avi
 → Melhor Similaridade: 0.188
Processando: ./data/PlayingPiano/v_PlayingPiano_g02_c04.avi
 → Melhor Similaridade: 0.196
Processando: ./data/PlayingPiano/v_PlayingPiano_g03_c01.avi
 → Melhor Similaridade: 0.192
Processando: ./data/PlayingPiano/v_PlayingPiano_g03_c02.avi
 → Melhor 

## Passo 2: Criação do Algoritmo do Nightshade para Vídeo

Com os candidatos salvos devidamente em suas pastas, juntamente com um arquivo pickle contendo os embeddings de vídeo, o texto correspondente e a similaridade de cosseno correspondente do vídeo com o texto, é possível aplicar o algoritmo do Nightshade na próxima etapa.

In [11]:
import os

## Passo 3: Aplicação do Algoritmo do Nightshade para Vídeo

In [12]:
import os

---

## 