In [1]:
import importlib
import requirements
importlib.reload(requirements)
from requirements import *

In [12]:
def process_video_with_selected_face_landmarks(input_video_path, output_video_path):
    """Render MediaPipe Holistic landmarks of a video onto white frames.

    Every frame read from ``input_video_path`` is run through MediaPipe
    Holistic. Pose and hand landmarks are drawn with their connections on a
    blank white canvas, and only the face landmarks listed in
    ``face_landmark_indices`` are drawn, as black dots. The original image
    content is not copied to the output.

    Args:
        input_video_path: Path of the video to read.
        output_video_path: Path of the ``mp4v``-encoded video to write.

    Returns:
        None. If the input cannot be opened or the output writer cannot be
        created, a message is printed and the function returns early.
    """
    mp_holistic = mp.solutions.holistic
    mp_drawing = mp.solutions.drawing_utils

    # Indices of the face landmarks that are kept in the output
    face_landmark_indices = [10, 67, 297, 54, 284, 162, 389, 234, 454, 132,
                             361, 58, 288, 136, 365, 149, 378, 148, 377]

    # Drawing styles are loop-invariant, so build them once. The same spec is
    # used for both the landmark dots and the connection lines of each part.
    pose_spec = mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2)
    left_hand_spec = mp_drawing.DrawingSpec(color=(255, 0, 0), thickness=2, circle_radius=2)
    right_hand_spec = mp_drawing.DrawingSpec(color=(0, 0, 255), thickness=2, circle_radius=2)

    # Open the input video
    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        print(f"Erro ao abrir o vídeo: {input_video_path}")
        return

    out = None
    try:
        # Read the video properties
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Keep the frame rate as a float: truncating e.g. 29.97 to 29 changes
        # the duration of the output. `not fps > 0` also catches NaN and the 0
        # reported when the container has no rate metadata; 30 is an arbitrary
        # fallback so the writer still gets a valid rate.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = 30.0

        # Configure the output video (MP4 container, mp4v codec)
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))
        if not out.isOpened():
            # Without this check a bad path/codec silently produces no file.
            print(f"Erro ao criar o vídeo de saída: {output_video_path}")
            return

        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            frame_idx = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                frame_idx += 1
                print(f"Processando frame {frame_idx}/{frame_count}...")

                # White canvas on which the landmarks are drawn
                blank_frame = np.full((frame_height, frame_width, 3), 255, dtype=np.uint8)

                # MediaPipe expects RGB input; OpenCV decodes frames as BGR
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = holistic.process(rgb_frame)

                # Pose landmarks
                if results.pose_landmarks:
                    mp_drawing.draw_landmarks(
                        blank_frame,
                        results.pose_landmarks,
                        mp_holistic.POSE_CONNECTIONS,
                        pose_spec,
                        pose_spec
                    )

                # Hand landmarks
                if results.left_hand_landmarks:
                    mp_drawing.draw_landmarks(
                        blank_frame,
                        results.left_hand_landmarks,
                        mp_holistic.HAND_CONNECTIONS,
                        left_hand_spec,
                        left_hand_spec
                    )
                if results.right_hand_landmarks:
                    mp_drawing.draw_landmarks(
                        blank_frame,
                        results.right_hand_landmarks,
                        mp_holistic.HAND_CONNECTIONS,
                        right_hand_spec,
                        right_hand_spec
                    )

                # Only the selected face landmarks; coordinates are normalised,
                # so scale them to pixel positions before drawing.
                if results.face_landmarks:
                    for idx in face_landmark_indices:
                        landmark = results.face_landmarks.landmark[idx]
                        x = int(landmark.x * frame_width)
                        y = int(landmark.y * frame_height)
                        cv2.circle(blank_frame, (x, y), 2, (0, 0, 0), -1)

                # Append the annotated canvas to the output video
                out.write(blank_frame)
    finally:
        # Release the capture and the writer even if processing raised, so the
        # output file is finalised and the input handle is not leaked.
        cap.release()
        if out is not None:
            out.release()

    print(f"Vídeo processado e salvo em: {output_video_path}")

# Example run: render the landmarks of one sample video into OUTPUT/teste.mp4.
input_video_path = "INPUT/VIDEOS/MINDS/A/Acontecer1.mp4"
output_video_path = "OUTPUT/teste.mp4"
process_video_with_selected_face_landmarks(
    input_video_path=input_video_path,
    output_video_path=output_video_path,
)


Processando frame 1/104...
Processando frame 2/104...
Processando frame 3/104...
Processando frame 4/104...
Processando frame 5/104...
Processando frame 6/104...
Processando frame 7/104...
Processando frame 8/104...
Processando frame 9/104...
Processando frame 10/104...
Processando frame 11/104...
Processando frame 12/104...
Processando frame 13/104...
Processando frame 14/104...
Processando frame 15/104...
Processando frame 16/104...
Processando frame 17/104...
Processando frame 18/104...
Processando frame 19/104...
Processando frame 20/104...
Processando frame 21/104...
Processando frame 22/104...
Processando frame 23/104...
Processando frame 24/104...
Processando frame 25/104...
Processando frame 26/104...
Processando frame 27/104...
Processando frame 28/104...
Processando frame 29/104...
Processando frame 30/104...
Processando frame 31/104...
Processando frame 32/104...
Processando frame 33/104...
Processando frame 34/104...
Processando frame 35/104...
Processando frame 36/104...
P

In [None]:
class TemporalDataset(Dataset):
    """Dataset of ``(features, label)`` pairs produced by a loader callable.

    Args:
        input: Raw source handed unchanged to ``method``.
        method: Callable invoked as ``method(input)``; it must return a pair
            ``(X, y)`` where ``X`` exposes ``.shape`` and both support ``len``
            and integer indexing.

    Attributes set on construction:
        X, y: The pair returned by ``method(input)``.
        shape: ``X.shape`` as a plain tuple.
        labels: Distinct values of ``y`` in sorted order.
    """

    def __init__(self, input, method):
        self.X, self.y = method(input)
        self.shape = tuple(self.X.shape)
        # np.unique gives the distinct labels in sorted order. A plain set()
        # has hash-dependent ordering, which varies between runs for string
        # labels and made the label list non-reproducible.
        self.labels = list(np.unique(np.array(self.y)))

    def __len__(self):
        """Return the number of samples."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(features, label)`` pair stored at position ``idx``."""
        return self.X[idx], self.y[idx]

In [79]:
class SpatialRecurrentTransformerModel(nn.Module):
    """Transformer-encoder classifier over batch-first sequences.

    The model reads its sizes from ``dataset``: ``dataset.shape[1]`` is the
    sequence length, ``dataset.shape[2]`` the per-step feature size (used as
    the transformer ``d_model``), and ``len(dataset.labels)`` the number of
    output classes.

    Args:
        dataset: Object exposing ``shape`` and ``labels`` as described above.
        heads: Number of attention heads, used by both the encoder layers and
            the extra attention block. ``dataset.shape[2]`` must be divisible
            by it.
        layers: Number of stacked transformer encoder layers.
        dim_feedforward: Hidden size of the feed-forward sub-networks.
        dropout: Dropout probability used throughout the model.
    """

    def __init__(self, dataset:Dataset, heads, layers, dim_feedforward, dropout):
        super().__init__()
        seq_len, feature_dim = dataset.shape[1], dataset.shape[2]
        self.embedding = nn.Linear(feature_dim, feature_dim)
        # Learned positional embedding, one vector per sequence position.
        self.positional_embedding = nn.Parameter(torch.rand(seq_len, feature_dim))
        transformer_layer = nn.TransformerEncoderLayer(d_model=feature_dim, nhead=heads, dim_feedforward=dim_feedforward, dropout=dropout, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(transformer_layer, num_layers=layers)
        self.feedforward = nn.Sequential(nn.Linear(feature_dim, dim_feedforward), nn.ReLU(), nn.Dropout(dropout), nn.Linear(dim_feedforward, feature_dim))
        # batch_first=True is required: the tensors in forward() are laid out
        # (batch, seq, feature). With the default (seq, batch, feature) layout
        # this block would attend across the samples of a batch instead of
        # across sequence positions, so each prediction would depend on the
        # other samples in its batch. The flag adds no parameters, so existing
        # state dicts remain loadable.
        self.landmark_attention = nn.MultiheadAttention(embed_dim=feature_dim, num_heads=heads, dropout=dropout, batch_first=True)
        # NOTE: a single LayerNorm instance is applied twice in forward(), so
        # both normalisation steps share one set of parameters.
        self.layer_norm = nn.LayerNorm(feature_dim)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(feature_dim, len(dataset.labels))

    def forward(self, x):
        """Compute class logits.

        Args:
            x: Tensor of shape ``(batch, seq_len, feature_dim)``.

        Returns:
            Tensor of shape ``(batch, num_classes)`` with unnormalised logits.
        """
        x = self.embedding(x)
        # (seq_len, feature_dim) broadcasts over the leading batch dimension.
        x = x + self.positional_embedding
        x = self.layer_norm(x)
        x = self.transformer_encoder(x)
        x = self.dropout(x)
        # Self-attention over the sequence, then a residual feed-forward update.
        spatial_output, _ = self.landmark_attention(x, x, x)
        x = x + self.feedforward(spatial_output)
        # Pool over the sequence dimension by summation.
        x = x.sum(dim=1)
        x = self.dropout(x)
        x = self.layer_norm(x)
        return self.fc(x)

In [80]:
def validation(train_accuracy, val_accuracy, max=0.985, threshold=1e-5, window=5):
    """Decide whether training should stop early.

    Training stops when either
      * the latest train AND validation accuracies both reach ``max``, or
      * the train accuracy has plateaued: the variance of its last ``window``
        values is at most ``threshold``.

    The plateau test only runs once at least ``window`` values exist. The
    variance of a shorter history is not meaningful (a single value always has
    variance 0), so checking it would stop training straight away.

    Args:
        train_accuracy: Sequence of per-epoch train accuracies, oldest first.
        val_accuracy: Sequence of per-epoch validation accuracies, oldest first.
        max: Accuracy that both latest values must reach to stop on success.
        threshold: Maximum variance of the recent train accuracies that still
            counts as a plateau.
        window: Number of most recent train accuracies used for the plateau
            test; expected to be at least 1.

    Returns:
        bool: True if training should stop, False otherwise (including when
        either history is empty).
    """
    if len(train_accuracy) == 0 or len(val_accuracy) == 0:
        return False
    if train_accuracy[-1] >= max and val_accuracy[-1] >= max:
        return True
    if len(train_accuracy) < window:
        return False
    return bool(np.var(train_accuracy[-window:]) <= threshold)

def train_model(dataset, train, epochs, batch_size, heads, layers, dim_feedforward, dropout, learning_rate, weight_decay):
    """Train a SpatialRecurrentTransformerModel on a random split of ``dataset``.

    Args:
        dataset: Dataset to split; also passed to the model constructor, which
            reads its sizes from it.
        train: Fraction of ``dataset`` used for training; the rest is used for
            validation.
        epochs: Maximum number of epochs (``validation`` may stop earlier).
        batch_size: Batch size of both data loaders.
        heads, layers, dim_feedforward, dropout: Forwarded to the model.
        learning_rate, weight_decay: Forwarded to the AdamW optimizer.

    Returns:
        Tuple ``(model, train_loss, val_loss, train_accuracy, val_accuracy)``.
        Each history list starts with two placeholder zeros, followed by one
        entry per completed epoch (mean per-batch loss, or accuracy).
    """
    n_train = int(train * len(dataset))
    train_dataset, val_dataset = random_split(dataset, [n_train, len(dataset) - n_train])
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)
    model = SpatialRecurrentTransformerModel(dataset, heads, layers, dim_feedforward, dropout)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    progress_bar = tqdm(total=len(train_loader), dynamic_ncols=True, desc="Training", position=0, leave=False)
    # The two leading zeros are placeholders: they let the progress bar read
    # [-1] and [-2] before the first epoch has finished. They are part of the
    # returned histories.
    train_loss, val_loss, train_accuracy, val_accuracy = [0, 0], [0, 0], [0, 0], [0, 0]
    try:
        for epoch in range(epochs):
            progress_bar.set_description(f"Epoch {epoch + 1}/{epochs}")
            progress_bar.reset()

            # ---- training pass ----
            model.train()
            train_loss_sum = 0
            all_train_preds, all_train_labels, all_val_preds, all_val_labels = [], [], [], []
            for batch_data, batch_labels in train_loader:
                optimizer.zero_grad()
                outputs = model(batch_data)
                loss = criterion(outputs, batch_labels)
                loss.backward()
                optimizer.step()
                _, preds = torch.max(outputs, 1)
                all_train_preds.extend(preds.cpu().numpy())
                all_train_labels.extend(batch_labels.cpu().numpy())
                train_loss_sum += loss.item()
                # The postfix shows the metrics of the previous epoch.
                progress_bar.set_postfix({"LOSS(train)": train_loss[-1], "LOSS(val)": val_loss[-1], "ACC(train)": train_accuracy[-1], "ACC(val)": val_accuracy[-1], "DIFF": train_accuracy[-1]-train_accuracy[-2]})
                progress_bar.update(1)

            # ---- validation pass ----
            # Accumulated separately from the training loss: sharing one
            # accumulator made the reported train loss equal to the validation
            # loss sum divided by the number of training batches.
            val_loss_sum = 0
            model.eval()
            with torch.no_grad():
                for batch_data, batch_labels in val_loader:
                    outputs = model(batch_data)
                    loss = criterion(outputs, batch_labels)
                    val_loss_sum += loss.item()
                    _, predicted = torch.max(outputs, 1)
                    all_val_preds.extend(predicted.cpu().numpy())
                    all_val_labels.extend(batch_labels.cpu().numpy())

            train_loss.append(train_loss_sum/len(train_loader))
            val_loss.append(val_loss_sum/len(val_loader))
            train_accuracy.append(accuracy_score(all_train_labels, all_train_preds))
            val_accuracy.append(accuracy_score(all_val_labels, all_val_preds))
            if validation(train_accuracy, val_accuracy):
                break
    finally:
        # Always close the bar so it does not linger in the notebook output,
        # including after an exception or an early stop.
        progress_bar.close()
    return model, train_loss, val_loss, train_accuracy, val_accuracy

In [None]:
# --- Data split and training schedule ---
TRAIN = 0.8    # fraction of the dataset used for training
EPOCHS = 20    # maximum number of epochs
BATCH = 16     # batch size of the data loaders

# --- Model architecture ---
HEADS = 86     # attention heads
LAYERS = 6     # transformer encoder layers
DFF = 1024     # feed-forward hidden size
DROPOUT = 0.1

# --- Optimizer (AdamW) ---
LR = 1e-4      # learning rate
WD = 1e-4      # weight decay

# NOTE(review): weights_only=False unpickles arbitrary Python objects, so this
# must only be pointed at trusted files.
dataset = torch.load("OUTPUT/TENSORS/inerpolate2h.pt", weights_only=False)

model, train_loss, val_loss, train_accuracy, val_accuracy = train_model(
    dataset,
    train=TRAIN,
    epochs=EPOCHS,
    batch_size=BATCH,
    heads=HEADS,
    layers=LAYERS,
    dim_feedforward=DFF,
    dropout=DROPOUT,
    learning_rate=LR,
    weight_decay=WD,
)

# Final metrics, then the accuracy and loss curves.
print(f'TRAIN: {train_accuracy[-1]}, VAL: {val_accuracy[-1]}')
graph(train_accuracy, val_accuracy, metric='Acurácia')
graph(train_loss, val_loss, metric='Perda')