In [None]:
%pip install --upgrade --quiet langchain langchain-openai mediapipe
%pip install -r requirements.txt

In [2]:
!cp -r /content/drive/MyDrive/TIDLLM/lib/ .

In [3]:
%cd "/content/drive/MyDrive/TIDLLM"

/content/drive/MyDrive/TIDLLM


In [4]:
import os
import re
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

import cv2
import numpy as np
import pandas as pd
import tqdm
import glob
import matplotlib.pyplot as plt

import mediapipe as mp

import warnings
warnings.filterwarnings('ignore')

from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import tarfile

In [5]:
from lib.utils.pose import *

In [6]:
def get_pose_array(SAMPLE_POSE):
    """Converts the pose data into a numpy array
    """

    POSE_RAW = pd.DataFrame(SAMPLE_POSE['pose'])
    RIGHT_HAND_RAW = pd.DataFrame(SAMPLE_POSE['right'])
    LEFT_HAND_RAW = pd.DataFrame(SAMPLE_POSE['left'])

    POSE_DF = {}

    for col in POSE_RAW.columns:
        POSE_DF[ 'POSE_' + col + '_X'] = POSE_RAW[col].apply(lambda x: x[0])
        POSE_DF[ 'POSE_' + col + '_Y'] = POSE_RAW[col].apply(lambda x: x[1])

    for col in RIGHT_HAND_RAW.columns:
        POSE_DF[ 'RIGHT_' + col + '_X' ] = RIGHT_HAND_RAW[col].apply(lambda x: x[0])
        POSE_DF[ 'RIGHT_' + col + '_Y' ] = RIGHT_HAND_RAW[col].apply(lambda x: x[1])

    for col in LEFT_HAND_RAW.columns:
        POSE_DF[ 'LEFT_' + col + '_X' ] = LEFT_HAND_RAW[col].apply(lambda x: x[0])
        POSE_DF[ 'LEFT_' + col + '_Y' ] = LEFT_HAND_RAW[col].apply(lambda x: x[1])

    POSE_DF = pd.DataFrame(POSE_DF)

    return POSE_DF


def get_matrices(POSE_DF):
    """Converts the pose data into a numpy array of distance matrices
    """
    x_cols = [col for col in POSE_DF.columns if col.endswith('_X')]
    y_cols = [col for col in POSE_DF.columns if col.endswith('_Y')]

    frames = []
    for i in range(1, POSE_DF.shape[0]):
        x_row = POSE_DF[x_cols].iloc[i].to_numpy()
        y_row = POSE_DF[y_cols].iloc[i].to_numpy()

        def get_difference_matrix(row):
            m, n = np.meshgrid(row, row)
            out = m-n
            return out

        x_diff = get_difference_matrix(x_row)
        y_diff = get_difference_matrix(y_row)

        frame = np.stack([x_diff, y_diff], axis=2)
        frames.append(frame)

    frames = np.stack(frames, axis=0)
    return frames

In [7]:
def gloss2text(GLOSSES):
    """Translate set of glosses to text
    """
    return chat_bot.invoke(
        [
            HumanMessage(
                content=f"""
                    Kelimelerin girdisinden anlamlı türkçe cümle oluştur

                    Girdi:{GLOSSES}
                    Cümle:
                """
            )
        ]
    ).content

In [8]:
class GlossClassifier(nn.Module):
    def __init__(self, input_size, hidden_dim, num_layers, out_size):
        super(GlossClassifier, self).__init__()
        self.input_size = input_size
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.out_size = out_size

        self.lstm = nn.LSTM(input_size, hidden_dim, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_dim, out_size)
        self.dropout = nn.Dropout(0.2)


    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(x.device)
        x = self.dropout(x)
        out, (ht, ct) = self.lstm(x, (h0, c0))
        return self.linear(ht[-1])


In [9]:
class Quantizer(nn.Module):
    """
    Quantizer for Continuous Sign Language Recognition

    Args:
        quantizer (nn.Module): base vq-vae based quantizer model
        num_frames (int): number of frames to consider
        stride (int): stride of the sliding window

    ### A. Usage:

    Define the quantizer model

    ```python
    quantizer = Quantizer(quantizer, num_frames=25, stride=1)
    quantized, indices = quantizer.quantize(x)
    ```

    If you want to transform the indices into one-hot, set `transform=True`

    ```python
    quantized, indices = quantizer.quantize(x, transform=True)
    ```

    Also process the video to get the pose estimation

    ```python
    x = Quantizer.process_video(video_path)
    ```
    """

    def __init__(self,
                 base_model,
                 num_frames=25,
                 stride=1,
                 num_codebooks=1,
                 codebook_size=512):
        super(Quantizer, self).__init__()

        self.quantizer = torch.load(
            base_model,
            map_location=torch.device('cpu' if not torch.cuda.is_available() else 'cuda:0')
        )

        self.quantizer.eval()

        self.num_frames = num_frames
        self.stride = stride

        self.codebook_size = codebook_size
        self.num_codebooks = num_codebooks

    @torch.no_grad()
    def encode(self, x):
        x_hat = self.quantizer.encoder(x)
        quantized, indices, _ = self.quantizer.vq_vae(x_hat)
        return quantized, indices

    def quantize(self, x, transform=False):
        x = x.permute(0, 4, 1, 2, 3).contiguous()
        # NOTE: x.shape = (B, C, D, H, W)
        # Slice input channels
        x = x[:, :self.quantizer.encoder.input_size[1], :, :, :]
        quantized, indices = self.encode(x)
        if transform:
            indices = indices.view(-1, self.num_codebooks)
            # turn indices into one-hot
            indices = F.one_hot(indices, num_classes= self.num_codebooks * self.codebook_size).float()

        return quantized, indices

    def process_video(self, video_path: str):

        print('** Get Pose Estimation **')
        pose, _ = get_pose_estimation(video_path)
        print('** Translate Pose Array **')
        pose_array = get_pose_array(pose)
        # Replace missing values with zeros
        pose_array = pose_array.replace(np.nan, 0)
        matrices = get_matrices(pose_array)

        # Generate overlapping windows
        print('** Get windows **')
        windows = []
        for i in range(0, len(matrices) - self.num_frames, self.stride):
            window = matrices[i:i+self.num_frames]
            windows.append(window)

        # Convert to tensor
        data = torch.tensor(windows).float()

        return data

    def process_pose(self, pose_array):

        pose_array = pose_array.replace(np.nan, 0)
        matrices = get_matrices(pose_array)

        # Generate overlapping windows
        windows = []
        for i in range(0, len(matrices) - self.num_frames, self.stride):
            window = matrices[i:i+self.num_frames]
            windows.append(window)

        # Convert to tensor
        data = torch.tensor(windows).float()

        return data

In [10]:
chat_bot = ChatOpenAI(
    model="gpt-3.5-turbo-1106",
    organization='###',
    temperature=0.0,
    api_key='###'
)

In [11]:
QUANTIZER_PATH = 'quantizer/encoder.pt'
GLOSS_MODEL_PATH = '/content/drive/MyDrive/TIDLLM/dataset/bsign22k/pose_arrayentire_model.pt'

In [None]:
!pip install vector_quantize_pytorch

In [32]:
quantizer = Quantizer(
    base_model=QUANTIZER_PATH,
    num_frames=25,
    stride=5,
    num_codebooks=5,
    codebook_size=128,
)

In [33]:
gloss_model = torch.load(GLOSS_MODEL_PATH)

In [34]:
quantizer.to('cuda:0')
gloss_model.to('cuda:0')

GlossClassifier(
  (lstm): LSTM(768, 256, num_layers=2, batch_first=True)
  (linear): Linear(in_features=256, out_features=745, bias=True)
  (dropout): Dropout(p=0.2, inplace=False)
)

In [72]:
import json

class SignTranslation():

    def __init__(self,
                 quantizer,
                 gloss_model):

        self.quantizer = quantizer
        self.gloss_model = gloss_model

        self.ID2GLOSS = json.loads(open('/content/drive/MyDrive/TIDLLM/dataset/bsign22k/pose_arraygloss_id_pairs.json', 'r').read())


    def translate(self, video, frames = 10, stride = 1):

        print('** Processing video **')
        poses = self.quantizer.process_video(video)

        print('** Quantize video **')
        quantization, _ = quantizer.quantize(poses.to('cuda:0'))

        batch = []
        matrix_shape = quantization.shape[0]
        for i in range(0, matrix_shape - frames, stride):
            window = quantization[None,i:i+frames,:]
            batch.append(window)

        batched = torch.cat(batch, axis=0)

        print('** Predicting Glossess **')
        outputs = self.gloss_model(batched.to('cuda:0'))

        gloss_list = []
        for cls in outputs.argmax(0):
            gloss_list.append(self.ID2GLOSS[str(cls.item()+1)])

        gloss_sent = ''
        current_gloss = None
        for gloss in gloss_list:
            if current_gloss != gloss:
                gloss_sent += f'{gloss} '

        return gloss2text(gloss_sent)

In [73]:
translator = SignTranslation(
    quantizer=quantizer,
    gloss_model=gloss_model,
)

In [74]:
#gloss2text('BEN BUGÜN YEMEK PAZAR BEN GİTMEK ELMA GRUP PORTAKAL GRUP YEMEK ÇEŞİT GÖRE GRUP GRUP BEN BİR SEÇMEK SEÇMEK')

In [78]:
import time

t1 = time.perf_counter()

translation = translator.translate('/content/drive/MyDrive/TIDLLM/dataset/sample.mp4', frames=10, stride=3)

t2 = time.perf_counter()

print(f'Translation: {translation}')
print(f'Took seconds: {t2-t1}')

** Processing video **
** Get Pose Estimation **
** Translate Pose Array **
** Get windows **
** Quantize video **
torch.Size([9, 10, 768])
** Predicting Glossess **
Translation: Acil servis, acılarınızı hafifletmek için burada.
Took seconds: 28.154761472000246
