In [1]:
import torch 
import torch.nn as nn
import torch.nn.functional as f
from torch.utils.data import Dataset,DataLoader
from torch.optim.lr_scheduler import StepLR,ReduceLROnPlateau
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm
import json
from scipy import signal
import os
from collections import Counter
import random
# os.environ["MP_DUPLICATE_LIB_OK"] = "True"

## Call model

In [None]:
class lstm_linear(nn.Module):
    def __init__(self,chunk =10,sigmoid_state=True,len_input = 16,outputa = 50):
        super().__init__()
        if chunk == 10:
            sep = 384//4 # due to 2 maxPool1d Kernel_size = 2
        if chunk == 50:
            sep = 1280
        if chunk == 100:
            sep = 3200//4
        if chunk == 200:
            sep = 6400//4

        self.dropout = nn.Dropout(p=0.3)
        
        self.lstm1 = nn.LSTM(chunk,128)
        
        self.biLSTM = nn.LSTM(chunk,128,bidirectional=True)
        self.linear1 = nn.Linear(7168,128)
        self.Lazyl1 = nn.LazyLinear(128)
        self.linear2 = nn.Linear(128,outputa)
        self.sigmoid  = nn.Sigmoid()
        self.softmax  = nn.Softmax(dim=1)
        self.sigmoid_state = sigmoid_state
    def forward(self,x):
        
        out1,(hn,cn) = self.biLSTM(x)
        out1 = out1.flatten()
        output = self.Lazyl1(out1)
        y_final = self.linear2(output)
        
        if self.sigmoid_state:
            y_final = self.sigmoid(y_final)
        else:
            y_final = self.softmax(y_final)

        return y_final
    
class CustomDataset(Dataset):
    def __init__(self,
                 csv_path,
                 batch=16,
                 chunk = 100,
                 vocab_path = "../Sign_Language_Detection/label.json",
                table:bool = False,
                dataframe=None):
        
        
        with open(vocab_path,"r") as f:
            compare = json.load(f)
        self.vocab = len(compare)
        
        if not table:
            self._data_csv = pd.read_csv(csv_path)
        else:
            self._data_csv = dataframe
        
        
        self._data_csv = self._data_csv[~(self._data_csv.Label.isin([ "cooldown","error_redo","break_time",]))]
        self._data_csv["Label"] = self._data_csv["Label"].apply(lambda x:compare[x])
        self.train_data = self.convert_data_csv_train(self._data_csv,compare,segment=chunk,range_data=25)
        # print(len(self.train_data))
        
        print(self.train_data.size())
        fity = []
        normal =[]
        for i in self.train_data:
            # print(i[:,-1][0])
            if int(i[:,-1][-1]) == 0:
                fity.append(i)
            else:
                normal.append(i)
        # print(len(fity))
        # print(len(normal))
        fity = random.sample(fity,int(len(fity) * 0.5))
        normal.extend(fity)
        self.train_data = normal
        self.nums  = len(self.train_data)
        self.answer_transform = []
    
        self.train_data = torch.tensor([i.tolist() for i in self.train_data])
        
        
        for i in range(0,len(self.train_data)):
            # print(self.train_data)
            
            
            dummy = torch.zeros(self.vocab)
            ct = Counter(self.train_data[i][:,-1].tolist()).most_common()
            if len(ct) == 2 and ct[0][0] ==0:
                idx,count = Counter(self.train_data[i][:,-1].tolist()).most_common()[1]
            else:
                idx,count = Counter(self.train_data[i][:,-1].tolist()).most_common()[0]
            
            dummy[int(idx)] = 1
            self.answer_transform.append(dummy)
        
        self.train_data = self.train_data[:,:,:-1]
        self.nums,self.segment,self.input = self.train_data.size()
        
        
    def __getitem__(self, index):
        
        inputs = self.train_data[index]
        answer = self.answer_transform[index]
        
        if torch.cuda.is_available():
            return inputs.to(torch.float32).movedim(1,0).to("cuda"),answer.to("cuda")
        else:
            return inputs.to(torch.float32).movedim(1,0),answer
        
        
    def __len__(self):
        return self.nums
    
    def len_answer(self):
        return self.vocab
    
    def data_info(self):
        return self.nums,self.segment,self.input,self.train_data
    
    
    
    def convert_data_csv_train(self,data,compare,segment=50,range_data = 0):

        datta = []
        previous = None
        samples = []
        abc= []
        
        
        data['group_id'] = (data['Label'] != data['Label'].shift()).cumsum()
        grouped_dfs = [g.drop(columns='group_id').values for _, g in data.groupby('group_id')]
        
        
        
        print("len(data): ",len(grouped_dfs))
        print("filter Value")
        all_data = []
        for i in tqdm(grouped_dfs,total = len(grouped_dfs)):
            if len(i) > range_data:
                all_data.append(i)


        print("pad&mean Value")

        real = []
        for i in tqdm(all_data,total = len(all_data)):
            segment = segment
            if len(i) < segment:
                tensor_df = (torch.tensor(i))
                n,b = tensor_df.size()
                padded_tensor = torch.nn.functional.pad(tensor_df, pad=(0, 0, segment-n, 0), mode='constant', value=0)
                # print(padded_tensor.size())
                real.append(padded_tensor.tolist())
            else:
                step = int(np.ceil(len(i)//segment))
                temp = []
                for k in range(segment):
                    temp.append(torch.mean(torch.tensor(i[k*step:(k+1)*step]),dim=0).tolist())
                real.append(temp)

        train_data = torch.tensor(real)
        return train_data
    
    

## Train

In [4]:
import glob
import os

base_df = pd.DataFrame()
for i in glob.glob(r"./collect_data/new_data/*"):
    df = pd.read_csv(rf"{i}")
    base_df = pd.concat([base_df,df])
print(len(base_df))


481770


In [None]:
# train_dataset = CustomDataset("collect_data/20250624_101902_ชาตชาย24062025_sensor.csv",vocab_path="../Sign_Language_Detection/label.json",chunk=50,table=True,dataframe=base_df)
# data_answer = []
# for inputs,answer in tqdm(train_dataset):
      
      # try:
          # print(torch.tensor(inputs[i:i+chunk]).size())
    #   data_answer.append(answer)


# train.size()

# with open("rollback.json",'r') as f:
#     ct = json.load(f)
    
    
# print(len(data_answer))
# label_list = [int(torch.argmax(i)) for i in data_answer] 
# nv = Counter(label_list)
# print(nv.most_common())



# not_eng = []
# for i,v in nv.most_common():
#     print(f"{ct[str(i)]} : {v} ")

In [8]:


epoch = 1
lr = 2e-5
chunk = 50
e = 0
best_loss = 0
path_save = "../Sign_Language_Detection/model/Version1"
num_still = 0
sigoid_state = True
batch_size = 1

train_dataset = CustomDataset("collect_data/20250624_101902_ชาตชาย24062025_sensor.csv",vocab_path="../Sign_Language_Detection/label.json",chunk=50,table=True,dataframe=base_df)


len_output = train_dataset.len_answer()
len_input = train_dataset.data_info()[-2]
train_dataset = DataLoader(train_dataset,batch_size=batch_size)

if torch.cuda.is_available():
  lstm = lstm_linear(chunk,sigmoid_state=sigoid_state,len_input=len_input,outputa=len_output).to("cuda")
else:
  lstm = lstm_linear(chunk,sigmoid_state=sigoid_state,len_input=len_input,outputa=len_output)






optimizer = torch.optim.AdamW(lstm.parameters(),lr=lr,weight_decay=0.0001)
criterion = nn.MSELoss()
scheduler = ReduceLROnPlateau(optimizer, 'min',patience =3  ,min_lr = 5e-6,factor=0.5)
print(f" data train = {int(len(train_dataset)*batch_size)} with {len_input} feature")


n = 0
for param in lstm.parameters():
  param.requires_grad=True
lstm.train()

for k in range(1,epoch+1):
    loss_total = 0
    data_answer = []
    for inputs,answer in tqdm(train_dataset):
      answer = answer[0]
      data_answer.append(answer)
      output = lstm(inputs)
      optimizer.zero_grad()
      if torch.argmax(answer).item() == 0:
        loss = criterion(output,answer)
      else:
        loss = criterion(output,answer)
      
      
      loss.backward()

      optimizer.step()

      loss_total += loss
      n+=1
        
    if best_loss == 0 or best_loss > loss_total/len(train_dataset):
      best_loss = loss_total/len(train_dataset)
      state_dict = lstm.state_dict()
      e = k
      num_still = 0
    else:
      num_still +=1
      
    scheduler.step(loss_total/len(train_dataset))
    num_still = 0
    if num_still >= 3:
      print("step up to learning = ",scheduler.get_last_lr())
      break
      
    
    print(f"epoch number {k} loss = {loss_total/len(train_dataset)} with lr = {scheduler.get_last_lr()}")
print(f"best loss at epoch = {e} with {best_loss}")
torch.save(state_dict,f"{path_save}/model_epoch_{e}")
torch.save(lstm, r"./model/Version1/finalmodel.pt")


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self._data_csv["Label"] = self._data_csv["Label"].apply(lambda x:compare[x])
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data['group_id'] = (data['Label'] != data['Label'].shift()).cumsum()


extrac Value
len(data):  6811
filter Value


  0%|          | 0/6811 [00:00<?, ?it/s]

pad&mean Value


  0%|          | 0/3579 [00:00<?, ?it/s]

torch.Size([3579, 50, 30])
 data train = 3490 with 29 feature


  0%|          | 0/3490 [00:00<?, ?it/s]

epoch number 1 loss = 0.01996440440416336 with lr = [2e-05]
best loss at epoch = 1 with 0.01996440440416336


In [11]:
import glob
import os



test_df = pd.read_csv(rf"F:\Hybridmodel-project\Sign_Language_Detection\collect_data\20250715_111750_DATA_INDICATOR_sensor.csv")
# test_df  = test_df[test_df.columns[1:]]
test_dataset = CustomDataset("collect_data/20250624_131408_พชชาภา24062025_sensor.csv",vocab_path="../Sign_Language_Detection/label.json",chunk=chunk,table=True,dataframe=test_df)
test_dataset = DataLoader(test_dataset,batch_size=1)

y_pred = []
y_true = []

with torch.no_grad():
    for inputs,answer in tqdm(test_dataset):
        output = lstm(inputs)
        y_pred += [(torch.argmax(output,dim=0)).tolist()]
        y_true += (torch.argmax(answer,dim=1)).tolist()
            
            

extrac Value
len(data):  201
filter Value


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self._data_csv["Label"] = self._data_csv["Label"].apply(lambda x:compare[x])
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data['group_id'] = (data['Label'] != data['Label'].shift()).cumsum()


  0%|          | 0/201 [00:00<?, ?it/s]

pad&mean Value


  0%|          | 0/120 [00:00<?, ?it/s]

torch.Size([120, 50, 30])


  0%|          | 0/109 [00:00<?, ?it/s]

In [12]:
from sklearn.metrics import f1_score,recall_score,accuracy_score
f1_scores = f1_score(y_true, y_pred, average="micro")
print("f1 score    ",f1_scores)
recall_scores = recall_score(y_true, y_pred, average="micro")
print("recal score ",recall_scores)
acc = accuracy_score(y_true, y_pred)
print("acc score   ",acc)

f1 score     0.10091743119266056
recal score  0.10091743119266056
acc score    0.10091743119266056


# How to use Model

In [50]:
def convert_data_csv_train(data,segment=50):

    real = []
    for i in tqdm(data,total = len(data)):
        segment = segment
        if len(i) < segment:
            # tensor_df = (torch.tensor(i.clone()))
            tensor_df = i.clone()
            n,b = tensor_df.size()
            padded_tensor = torch.nn.functional.pad(tensor_df, pad=(0, 0, segment-n, 0), mode='constant', value=0)
            # print(padded_tensor.size())
            real.append(padded_tensor.tolist())
        else:
            step = int(np.ceil(len(i)//segment))
            temp = []
            for k in range(segment):
                temp.append(torch.mean(torch.tensor(i[k*step:(k+1)*step]),dim=0).tolist())
            real.append(temp)
    return torch.tensor(real)

In [75]:
model = torch.load(r"./model/Version1/finalmodel.pt",weights_only=False)
with open(r"rollback.json",'r') as f:
    vocap = json.load(f)

In [None]:
data = torch.rand(1,29)
datas = torch.rand(1,29)
i=0
while i<20:
    
    ### you need to put a streaming inout here
    
    
    datas = torch.concat([datas,data])
    i+=1
    
tas = convert_data_csv_train(datas.unsqueeze(0))
tas = tas.movedim(1,2)
answer = torch.argmax(model(tas))
finalans = vocap[str(answer.item())]

torch.Size([21, 29])


  0%|          | 0/1 [00:00<?, ?it/s]

torch.Size([1, 29, 50])
