<a href="https://colab.research.google.com/github/yonbrand/Gait-Recognition/blob/main/gait_identification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [28]:
import os 
import numpy as np
import pandas as pd
from numpy import genfromtxt

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader

from sklearn.model_selection import train_test_split

cuda = True if torch.cuda.is_available() else False
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [9]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [10]:
PD_path='/content/drive/MyDrive/DL- Danny & Yonatan/Project DL/Data PD/Data'
HC_path='/content/drive/MyDrive/DL- Danny & Yonatan/Project DL/Data Healthy'



In [None]:
data_df = pd.read_csv(data_path)
labels_df = pd.read_csv(label_path)

In [None]:
print("data size: ", len(data_df.index))
print("labels size: ", len(labels_df.index))
print(len(labels_df.index)/sum(np.array(labels_df)))

data size:  11005406
labels size:  11005406
[13.85868815]


In [38]:
def get_data(df):
  file_names = os.listdir(PD_path)
  data_final=[]
  for ind in file_names:
    curr_file=str(PD_path+'/'+ind)
    df=pd.read_csv(curr_file)
    data_np = np.array(df)
    data = []
    n_counts = 128
    for i in range(int(np.floor(len(df.index)/ n_counts))):
      d = data_np[i*n_counts:(i+1)*n_counts,:]
      data.append(d)
      data = np.array(data)
    data_final=data_final.append(data)
  return data


In [None]:
def get_labels(df):

  data_np = np.array(df)

  labels = []

  n_counts = 128
  for i in range(int(np.floor(len(df.index)/ n_counts))):
    d = data_np[i*n_counts:(i+1)*n_counts]
    label = 0
    if np.sum(d)/d.shape[0] > 0.5:
      label = 1 
    
    #label = np.zeros(2)
    #label[l] = 1

    labels.append(label)

  labels = np.array(labels)

  return labels

In [None]:
data = get_data(data_df)
data.shape

(85979, 128, 3)

In [None]:
labels = get_labels(labels_df)
labels.shape

(85979,)

In [None]:
X_train, X_test, y_train, y_test = train_test_split(data, labels, test_size=0.33, random_state=42)

In [None]:
class LSTM(nn.Module):
  def __init__(self, n_inputs, n_steps, lstm_dim, num_layers):
      super(LSTM, self).__init__()

      self.linear = nn.Linear(n_inputs, lstm_dim)
      self.lstm = nn.LSTM(lstm_dim, lstm_dim, num_layers=num_layers, batch_first=True)
      
      self.n_layers = num_layers
      self.lstm_dim = lstm_dim
      self.n_steps = n_steps
      self.n_inputs = n_inputs
      
  def forward(self, x, h):

      n_batch, n_counts, n_input = list(x.size())
      #print(batch, n_counts, n_input)
      x = x.transpose(1, 0)
      #print(x.size())
      x = x.reshape(n_counts * n_batch, n_input)
      #print(x.size())
      x = self.linear(x)
      #print(x.size())
      x = x.reshape(n_counts , n_batch, self.lstm_dim)
      #print(x.size())

      out, (h, c) = self.lstm(x, h)

      return out[-1], (h, c)

  def init_states(self):
    weight = next(self.parameters()).data
    hidden = (weight.new(self.n_layers, self.n_steps, self.lstm_dim).zero_().to(device),
              weight.new(self.n_layers, self.n_steps, self.lstm_dim).zero_().to(device))
    return hidden

In [None]:
class CNN(nn.Module):
  def __init__(self):
      super().__init__()

      self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, padding=1)
      self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)

      self.linear = nn.Linear(in_features=64*32*3, out_features=64)


  # define forward function
  def forward(self, x):

      n_batch, n_counts, n_input = list(x.size())

      x = x.reshape(n_batch, 1, n_counts, n_input)

      x = self.conv1(x)
      x = F.relu(x)
      x = F.max_pool2d(x, kernel_size=(2,1), stride=(2,1))
        
      x = self.conv2(x)
      x = F.elu(x)
      x = F.max_pool2d(x, kernel_size=(2,1), stride=(2,1))

      x = x.reshape(-1, 64*32*3)

      x = self.linear(x)
      x = F.elu(x)

      return x


In [None]:
class Main(nn.Module):
    def __init__(self, CNNmodel, LSTMmodel):
        super().__init__()

        self.CNNmodel = CNNmodel
        self.LSTMmodel = LSTMmodel

        self.linear = nn.Linear(in_features=128, out_features=2)


    # define forward function
    def forward(self, x, states):

        x_cnn =  self.CNNmodel(x)

        # states = self.LSTMmodel.init_states()
        # states = tuple([e.data for e in states])
        x_lstm, states =  self.LSTMmodel(x, states)

        x = torch.cat([x_cnn, x_lstm], axis=1)

        out = self.linear(x)
        #out = torch.sigmoid(x)

        #out = out.reshape(-1)
        return out

    def init_states(self):
        return self.LSTMmodel.init_states()


In [None]:
learning_rate = 0.00001
batch_size = 64
display_step = 1


In [None]:

tensor_x_train = torch.Tensor(X_train).float() # transform to torch tensor
tensor_y_train = torch.Tensor(y_train).float()

train_dataset = TensorDataset(tensor_x_train,tensor_y_train) # create your datset
train_dataloader = DataLoader(train_dataset, batch_size = batch_size, shuffle=True) # create your dataloader


tensor_x_test = torch.Tensor(X_test).float() # transform to torch tensor
tensor_y_test = torch.Tensor(y_test).float()

test_dataset = TensorDataset(tensor_x_test,tensor_y_test) # create your datset
test_dataloader = DataLoader(test_dataset, batch_size = batch_size) # create your dataloader

In [None]:
n_inputs = 3 
n_steps = 128
lstm_dim = 64
layers = 2
LSTMmodel = LSTM(n_inputs, n_steps, lstm_dim, layers).to(device)

In [None]:
CNNmodel = CNN().to(device)

In [None]:
model = Main(CNNmodel, LSTMmodel).to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()

In [None]:
all_train_loss = []
all_test_loss = []


for epoch in range(3):
      
  states = model.init_states()
  model.train()
  train_loss = []
  for batch_idx, (data, labels) in enumerate(train_dataloader):
    
    states = tuple([e.data for e in states])

    preds = model(data.to(device), states)

    #loss = F.cross_entropy(preds, labels.to(device))
    loss = criterion(preds, labels.long().to(device))
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    bach_loss = loss.item()

    train_loss.append(bach_loss)
    if batch_idx % 500 == 0:    
         print('[%d, %5d] loss: %.3f' %
               (epoch, batch_idx, bach_loss))
         
  epochloss = np.average(train_loss)
  all_train_loss.append(epochloss)
  #model.eval()

  print("epoch %d,  train loss : %.3f," % (epoch, epochloss))  


[0,     0] loss: 0.673
[0,   500] loss: 0.231
epoch 0,  train loss : 0.228,
[1,     0] loss: 0.101
[1,   500] loss: 0.096
epoch 1,  train loss : 0.207,
[2,     0] loss: 0.179
[2,   500] loss: 0.102
epoch 2,  train loss : 0.196,
