In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive (this import is only available on Colab).
drive.mount('/content/drive')

Mounted at /content/drive


### Installation

In [None]:
# Install MediaPipe; its pose solution is imported by the video and landmark cells below.
!pip install mediapipe



In [None]:
!pip install moviepy
!pip3 install imageio==2.4.1
!pip install imageio-ffmpeg
!pip install --upgrade moviepy

Collecting imageio==2.4.1
  Using cached imageio-2.4.1-py3-none-any.whl
Installing collected packages: imageio
  Attempting uninstall: imageio
    Found existing installation: imageio 2.34.2
    Uninstalling imageio-2.34.2:
      Successfully uninstalled imageio-2.34.2
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
moviepy 1.0.3 requires imageio<3.0,>=2.5; python_version >= "3.4", but you have imageio 2.4.1 which is incompatible.[0m[31m
[0mSuccessfully installed imageio-2.4.1
Collecting imageio<3.0,>=2.5 (from moviepy)
  Using cached imageio-2.34.2-py3-none-any.whl (313 kB)
Installing collected packages: imageio
  Attempting uninstall: imageio
    Found existing installation: imageio 2.4.1
    Uninstalling imageio-2.4.1:
      Successfully uninstalled imageio-2.4.1
Successfully installed imageio-2.34.2


In [None]:
pip install keras --upgrade



### Video Trim

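**This script trims raw surveillance or gait videos to the span between the first and last frames in which MediaPipe detects a person, so that each output clip contains only the person's walk through the scene.**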

In [None]:
import cv2
import os
import mediapipe as mp
input_dir = 'path_to_input'
output_dir = 'path_to_output'

# cv2.VideoWriter does not create missing directories, so make sure the target exists.
os.makedirs(output_dir, exist_ok=True)

mp_pose = mp.solutions.pose.Pose(min_detection_confidence=0.7, min_tracking_confidence=0.7)

try:
    for video_filename in os.listdir(input_dir):
        input_path = os.path.join(input_dir, video_filename)
        cap = cv2.VideoCapture(input_path)
        try:
            num_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            # Pass 1: record the index of every frame in which a pose is detected.
            frames_with_people = []
            for i in range(num_frames):
                ret, frame = cap.read()
                if not ret:
                    # The reported frame count can exceed what is actually decodable.
                    break

                # OpenCV decodes to BGR, MediaPipe expects RGB input.
                results = mp_pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                if results.pose_landmarks is not None:
                    frames_with_people.append(i)

            if not frames_with_people:
                # Nobody detected: nothing to write (the capture is released in `finally`).
                continue

            start_frame = frames_with_people[0]
            end_frame = frames_with_people[-1]

            # splitext handles extensions of any length (".mp4", ".mpeg", ...).
            stem = os.path.splitext(video_filename)[0]
            output_path = os.path.join(output_dir, f'{stem}_trimmed.mp4')
            frame_size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
            out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), cap.get(cv2.CAP_PROP_FPS), frame_size)
            try:
                # Pass 2: seek once to the first kept frame, then read sequentially.
                cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
                for i in range(start_frame, end_frame + 1):
                    ret, frame = cap.read()
                    if not ret:
                        break
                    out.write(frame)
            finally:
                out.release()
        finally:
            cap.release()
finally:
    mp_pose.close()




### Video to Frame

In [None]:
import cv2
import os
import mediapipe as mp

video_dir = 'path_to_output'
frame_dir = 'path_to_frame'
num_frames = 6               # frames saved per video
start_skip_seconds = 0.28    # ignored at the beginning of every clip
end_skip_seconds = 0.18      # ignored at the end of every clip

# First model: default thresholds, used as a cheap "is a person clearly visible" gate.
mp_pose = mp.solutions.pose.Pose()
# Second model: stricter thresholds, used to read the left-ankle position.
mp_pose_estimation = mp.solutions.pose.Pose(min_detection_confidence=0.7, min_tracking_confidence=0.7)

try:
    for video_file in os.listdir(video_dir):
        video_path = os.path.join(video_dir, video_file)
        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)

        start_frame = int(start_skip_seconds * fps)
        end_frame = total_frames - int(end_skip_seconds * fps)

        # Parallel lists: ankle x-coordinate and the video frame index it was measured on.
        # Frames are skipped below, so a position's list index is NOT its frame number.
        left_foot_positions = []
        left_foot_frames = []

        try:
            for frame_num in range(total_frames):
                ret, frame = cap.read()
                if not ret:
                    break

                if frame_num < start_frame or frame_num >= end_frame:
                    continue

                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                results_detection = mp_pose.process(rgb)
                if not results_detection.pose_landmarks:
                    continue

                # Landmark 0 must be clearly visible for the frame to be used.
                visibility = results_detection.pose_world_landmarks.landmark[0].visibility
                if visibility < 0.9:
                    continue

                # The estimation model is fed a grayscale version of the frame; a
                # single-channel image has to be expanded with GRAY2RGB (BGR2RGB
                # rejects one-channel input).
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                rgb = cv2.cvtColor(gray, cv2.COLOR_GRAY2RGB)
                results_estimation = mp_pose_estimation.process(rgb)
                if results_estimation.pose_landmarks:
                    left_foot_x = results_estimation.pose_landmarks.landmark[mp.solutions.pose.PoseLandmark.LEFT_ANKLE].x
                    left_foot_positions.append(left_foot_x)
                    left_foot_frames.append(frame_num)
        finally:
            cap.release()

        if not left_foot_positions:
            continue

        # Video frame indices at which the left ankle is furthest left / right.
        min_pos = left_foot_frames[left_foot_positions.index(min(left_foot_positions))]
        max_pos = left_foot_frames[left_foot_positions.index(max(left_foot_positions))]

        cycle_length = max_pos - min_pos
        if cycle_length == 0:
            continue
        half_cycle_length = int(cycle_length / 2)
        # NOTE(review): spacing is 0 when the half cycle is shorter than num_frames - 1
        # frames; the same frame is then written repeatedly under one file name.
        spacing = int(half_cycle_length / (num_frames - 1))

        subject_name = video_file.split('-')[0]
        frame_subdir = os.path.join(frame_dir, subject_name)
        os.makedirs(frame_subdir, exist_ok=True)

        cap = cv2.VideoCapture(video_path)
        try:
            for i in range(num_frames):
                frame_num = max_pos - half_cycle_length + i * spacing

                cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
                ret, frame = cap.read()
                if not ret:
                    break

                frame_num_str = f'{frame_num:04d}'
                filename = f'{subject_name}-{frame_num_str}.jpg'
                frame_path = os.path.join(frame_subdir, filename)
                cv2.imwrite(frame_path, frame)
        finally:
            cap.release()
finally:
    # Both Pose instances hold native resources; close them even if a video fails.
    mp_pose.close()
    mp_pose_estimation.close()


### Point Collection

In [None]:
# Input root: frames are read from <images_in_folder>/<class_name>/<image>.
images_in_folder= 'path_to_frame'
# Output root for copies of the frames with the detected pose drawn on them.
images_out_folder='path_to_detected_images_using_mediapipe'
# CSV that receives one row of landmark coordinates per frame with a detected pose.
csv_out_path = 'landmarks.csv'

In [None]:
import csv
import cv2
import numpy as np
import os
import sys
import tqdm
from mediapipe.python.solutions import drawing_utils as mp_drawing
from mediapipe.python.solutions import pose as mp_pose

NUM_LANDMARKS = 33  # landmarks per detected pose (checked against the model output below)

# Columns: image name, subject Id, then x/y/z for each landmark ('0x', '0y', '0z', ..., '32z').
header = ['image', 'Id'] + [f'{idx}{axis}' for idx in range(NUM_LANDMARKS) for axis in 'xyz']

# newline='' is what the csv module requires of files handed to csv.writer.
with open(csv_out_path, 'w', newline='') as csv_out_file:
    csv_out_writer = csv.writer(csv_out_file, delimiter=',', quoting=csv.QUOTE_MINIMAL)
    csv_out_writer.writerow(header)

    # One sub-folder per subject; hidden entries (".ipynb_checkpoints", ...) are ignored.
    class_names = sorted(n for n in os.listdir(images_in_folder) if not n.startswith('.'))

    for class_name in class_names:
        print('Processing ', class_name, file=sys.stderr)
        class_in_dir = os.path.join(images_in_folder, class_name)
        class_out_dir = os.path.join(images_out_folder, class_name)
        os.makedirs(class_out_dir, exist_ok=True)

        image_names = sorted(n for n in os.listdir(class_in_dir) if not n.startswith('.'))

        print(image_names)

        # A fresh Pose instance per subject, closed automatically by the context manager.
        with mp_pose.Pose(min_detection_confidence=0.5) as pose:
            for image_name in tqdm.tqdm(image_names):
                input_frame = cv2.imread(os.path.join(class_in_dir, image_name))
                if input_frame is None:
                    # cv2.imread returns None for unreadable / non-image files.
                    print('Skipping unreadable image ', image_name, file=sys.stderr)
                    continue

                # OpenCV loads BGR, MediaPipe expects RGB.
                input_frame = cv2.cvtColor(input_frame, cv2.COLOR_BGR2RGB)

                result = pose.process(image=input_frame)
                pose_landmarks = result.pose_landmarks

                # Save an annotated copy in the subject's own output folder.
                output_frame = input_frame.copy()
                if pose_landmarks is not None:
                    mp_drawing.draw_landmarks(
                        image=output_frame,
                        landmark_list=pose_landmarks,
                        connections=mp_pose.POSE_CONNECTIONS)
                output_frame = cv2.cvtColor(output_frame, cv2.COLOR_RGB2BGR)
                cv2.imwrite(os.path.join(class_out_dir, image_name), output_frame)

                if pose_landmarks is not None:
                    assert len(pose_landmarks.landmark) == NUM_LANDMARKS, 'Unexpected number of predicted pose landmarks: {}'.format(len(pose_landmarks.landmark))
                    landmarks = np.array([[lmk.x, lmk.y, lmk.z] for lmk in pose_landmarks.landmark])

                    # Scale the landmark coordinates by the image size
                    # (x and z by the width, y by the height).
                    frame_height, frame_width = output_frame.shape[:2]
                    landmarks = landmarks * np.array([frame_width, frame_height, frame_width])

                    row_values = np.around(landmarks, 5).flatten().astype(str).tolist()
                    csv_out_writer.writerow([image_name, class_name] + row_values)

In [None]:
import pandas as pd
# Preview the first six rows of the landmark CSV.
df = pd.read_csv('landmarks.csv')
df.head(6)

### Data Preparation

**Only IDs that appear exactly six times are retained, since MediaPipe failed to detect poses in some frames for certain IDs.**

In [None]:
import pandas as pd

df = pd.read_csv('landmarks.csv')

# Rows per subject Id, i.e. how many of its frames produced a landmark row.
id_counts = df['Id'].value_counts()
has_six_rows = id_counts.eq(6)

# Ids with exactly six rows are kept; every other Id is reported as removed.
valid_ids = id_counts.index[has_six_rows].tolist()
removed_ids = set(id_counts.index) - set(valid_ids)

filtered_df = df[df['Id'].isin(valid_ids)]
filtered_df.to_csv('updated_landmarks.csv', index=False)

print(f"The following IDs were removed: {removed_ids}")
filtered_df

In [None]:
filtered_df = pd.read_csv('updated_landmarks.csv')
rows_cols = filtered_df.shape

# Per-column diagnostics: dtype, number of distinct values, number of missing values.
dtypes = filtered_df.dtypes
unique = filtered_df.nunique()
missing = filtered_df.isnull().sum()

unq_mis = {'Data_types': dtypes, 'Unique_frequency': unique, 'Missing_count': missing}

# One row per original column, with the column name moved out of the index.
unq_mis_data = pd.concat(unq_mis, axis=1).reset_index(level=0)
unq_mis_data.columns = ['Column_name', 'Data_types', 'Unique_frequency', 'Missing_count']
unq_mis_data

Unnamed: 0,Column_name,Data_types,Unique_frequency,Missing_count
0,image,object,642,0
1,Id,int64,107,0
2,0x,float64,642,0
3,0y,float64,642,0
4,0z,float64,642,0
...,...,...,...,...
96,31y,float64,642,0
97,31z,float64,642,0
98,32x,float64,642,0
99,32y,float64,642,0


**Normalize Using Procrustes Analysis**

In [None]:
import numpy as np
from scipy.spatial import procrustes
import pandas as pd

# Landmark rows to normalise, one row per frame with 'Id' and 'image' columns.
df = pd.read_csv('updated_landmarks.csv')

def normalize_group(group):
    """Procrustes-normalise all frames that belong to one subject.

    Parameters
    ----------
    group : pandas.DataFrame
        Rows of a single subject. Must contain an 'image' column; an 'Id'
        column is dropped if present. The first 99 remaining columns are read
        as 33 landmarks x (x, y, z) per row.

    Returns
    -------
    pandas.DataFrame
        One row per input row, with the 99 normalised coordinate columns
        followed by an 'image' column holding each row's own image name.
        The index is a fresh 0..n-1 range.
    """
    # errors='ignore' keeps this usable when the grouping column is not passed in.
    data = group.drop(columns=['Id', 'image'], errors='ignore')

    data_array = data.to_numpy()[:, :99].reshape((-1, 33, 3))

    # procrustes() returns (standardised first matrix, second matrix fitted to it,
    # disparity). The standardised first matrix does not depend on the second
    # argument, so the first frame is paired with itself; this also lets a
    # single-frame group through.
    aligned_frames = [procrustes(data_array[0], data_array[0])[0]]

    # Every later frame is fitted to the frame that precedes it.
    for i in range(1, len(data_array)):
        _, fitted, _ = procrustes(data_array[i - 1], data_array[i])
        aligned_frames.append(fitted)

    # Second pass: fit every aligned frame to the subject's mean shape.
    mean_shape = np.mean(aligned_frames, axis=0)
    normalized_frames = [procrustes(mean_shape, frame)[1] for frame in aligned_frames]

    normalized_array = np.array(normalized_frames).reshape((-1, 99))
    normalized_data = pd.DataFrame(normalized_array, columns=data.columns)

    # Keep each row's own image name (to_numpy() avoids index alignment against
    # the group's original row labels).
    normalized_data['image'] = group['image'].to_numpy()
    return normalized_data

# groupby('Id') yields groups in ascending Id order, which is not necessarily the
# row order of the CSV. Sort the rows the same way first (mergesort is stable, so
# the frame order inside each Id is kept); otherwise the Id column attached below
# could be paired with another subject's landmarks.
df_sorted = df.sort_values('Id', kind='mergesort').reset_index(drop=True)

normalized_data = pd.concat(
    [normalize_group(group) for _, group in df_sorted.groupby('Id')],
    ignore_index=True,
)
assert len(normalized_data) == len(df_sorted), 'normalisation changed the number of rows'

# Put Id back as the first column (later cells rely on it being first).
id_col = df_sorted[['Id']]
normalized_data = pd.concat([id_col, normalized_data], axis=1)

normalized_data.to_csv('normalized_landmarks.csv', index=False)
normalized_data

In [None]:
import numpy as np
from keras.layers import Input, LSTM, Dense, Dropout
from keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Reload the normalised landmarks and drop the image-name column, leaving the
# Id plus the coordinate columns.
data = pd.read_csv('normalized_landmarks.csv')

normalized_data = data.drop(columns=['image'])
normalized_data.to_csv('updated_normalized_landmarks.csv', index=False)
normalized_data


In [None]:
import pandas as pd
FRAMES_PER_ID = 6  # every Id is expected to contribute exactly this many rows

df = pd.read_csv('updated_normalized_landmarks.csv')
feature_cols = [col for col in df.columns if col != 'Id']

# Build one wide record per Id: [Id, frame-0 features, frame-1 features, ...].
records = []
for name, group in df.groupby('Id'):
    if len(group) != FRAMES_PER_ID:
        # Fail loudly instead of silently truncating or raising an opaque IndexError.
        raise ValueError(f'Id {name} has {len(group)} rows, expected {FRAMES_PER_ID}')
    records.append([name, *group[feature_cols].to_numpy().ravel()])

# Unique column labels: the first frame keeps the plain names and frame k > 0 gets
# the '<name>.<k>' form, which is how pandas.read_csv renames repeated header
# labels when it loads a file.
wide_columns = ['Id'] + [
    col if k == 0 else f'{col}.{k}'
    for k in range(FRAMES_PER_ID)
    for col in feature_cols
]

# Build the frame in one go; adding one column per Id fragments the DataFrame.
new_df = pd.DataFrame(records, columns=wide_columns)

new_df.to_csv('grouped_landmarks.csv', index=False)
new_df


### Siamese Bidirectional GRU



In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense, concatenate, Bidirectional
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers import Adam
from sklearn.preprocessing import StandardScaler
from tensorflow.keras import backend as K
from keras.layers import Lambda
from tensorflow.keras.regularizers import l2

SEED = 42  # fixed so the subject split is the same on every run

df = pd.read_csv('grouped_landmarks.csv')

# Split data by ID
unique_labels = np.unique(df['Id'].values)
train_ids, test_ids = train_test_split(unique_labels, test_size=0.4, random_state=SEED)

print(train_ids.shape)
print(test_ids.shape)

train_data = df[df['Id'].isin(train_ids)].reset_index(drop=True)
test_data = df[df['Id'].isin(test_ids)].reset_index(drop=True)

# Reshape the data
num_points = 198   # 33 landmarks * 6 frames
seq_len = 1
# NOTE(review): with seq_len = 1 all frames of a subject are flattened into a single
# time step, so the recurrent layers see a sequence of length one - confirm intended.

# Guard the hard-coded width: a mismatch would otherwise regroup values silently
# or fail inside reshape with a less helpful message.
expected_width = seq_len * num_points * 3
actual_width = df.shape[1] - 1
if actual_width != expected_width:
    raise ValueError(f'expected {expected_width} feature columns, found {actual_width}')

X_train = train_data.drop('Id', axis=1).values.reshape(-1, seq_len, num_points*3)
y_train = train_data['Id'].values

X_test = test_data.drop('Id', axis=1).values.reshape(-1, seq_len, num_points*3)
y_test = test_data['Id'].values


from sklearn.preprocessing import StandardScaler, MinMaxScaler

# Fit the scaler on the training subjects only; the test cell reuses `scaler`.
scaler = StandardScaler()

X_train_2d = np.reshape(X_train, (X_train.shape[0]*X_train.shape[1], -1))
X_train_normalized_2d = scaler.fit_transform(X_train_2d)
X_train= np.reshape(X_train_normalized_2d, (X_train.shape[0], X_train.shape[1], -1))

num_pairs = 400
pairs = []
labels = []

# Dedicated, seeded generator so the sampled pairs are reproducible.
pair_rng = np.random.default_rng(42)

if len(X_train) < 2:
    raise ValueError('at least two training sequences are needed to build negative pairs')

positive_pairs = min(num_pairs, len(X_train))
negative_pairs = num_pairs - positive_pairs

# Positive pairs (label 1): a sequence paired with itself.
# NOTE(review): the two sides are the same array, so the positives carry no
# intra-subject variation - confirm this is intended.
for _ in range(positive_pairs):
    idx = pair_rng.integers(len(X_train))
    seq = X_train[idx]
    pairs.append([seq, seq])
    labels.append(1)

# Negative pairs (label 0): two different rows; replace=False already guarantees
# that the two indices differ.
for _ in range(negative_pairs):
    idx1, idx2 = pair_rng.choice(len(X_train), 2, replace=False)
    pairs.append([X_train[idx1], X_train[idx2]])
    labels.append(0)

pairs = np.array(pairs)
labels = np.array(labels)

# Shuffle once: Keras' validation_split takes the LAST samples, and without a
# shuffle the tail of the list consists of negative pairs only.
order = pair_rng.permutation(len(labels))
pairs = pairs[order]
labels = labels[order]

label_counts = np.bincount(labels, minlength=2)
positive_pairs = label_counts[1]
negative_pairs = label_counts[0]
print("Number of positive pairs:", positive_pairs)
print("Number of negative pairs:", negative_pairs)

print(pairs.shape)


def identity_loss(y_true, y_pred):
    """Return the mean squared difference between predictions and targets.

    `y_true` is cast to float32 before the difference is taken.
    """
    target = K.cast(y_true, dtype='float32')
    squared_error = K.square(y_pred - target)
    return K.mean(squared_error)


def contrastive_loss(y_true, y_pred, margin=1):
    """Margin-based loss over pair labels.

    For targets equal to 1 the penalty is max(margin - y_pred, 0) ** 2, for
    targets equal to 0 it is y_pred ** 2; the mean over all elements is returned.
    `y_true` is cast to float32 first.
    """
    target = K.cast(y_true, dtype='float32')
    # Label-0 term: penalises large predictions.
    zero_label_term = (1 - target) * K.square(y_pred)
    # Label-1 term: penalises predictions that fall short of the margin.
    one_label_term = target * K.square(K.maximum(margin - y_pred, 0))
    return K.mean(zero_label_term + one_label_term)

# Siamese RNN network: both inputs go through the same two bidirectional GRU
# layers (shared weights); the two encodings are concatenated and scored by a
# single sigmoid unit.
input1 = Input(shape=(seq_len, num_points*3))
input2 = Input(shape=(seq_len, num_points*3))
from tensorflow.keras.layers import Bidirectional, GRU

# No input_shape on the GRU: in a functional model the shape comes from the
# Input tensors.
gru_layer1 = Bidirectional(GRU(128, activation='relu', return_sequences=True))
gru_layer4 = Bidirectional(GRU(128, activation='relu'))

gru1 = gru_layer4(gru_layer1(input1))
gru2 = gru_layer4(gru_layer1(input2))

concat_layer = concatenate([gru1, gru2])
output_layer = Dense(1, activation='sigmoid')(concat_layer)

siamese_gru = Model(inputs=[input1, input2], outputs=output_layer)

siamese_gru.compile(loss=contrastive_loss, optimizer=Adam(learning_rate=0.0001), metrics=['accuracy'])

# Train the model; the returned History is kept so the curves can be inspected later.
history = siamese_gru.fit([pairs[:,0], pairs[:,1]], labels, epochs=10, batch_size=32,  validation_split=0.3)


In [None]:
# Scale the test sequences with the scaler fitted on the training data. The result
# goes into a new name so that re-running this cell does not scale X_test twice.
X_test_2d = np.reshape(X_test, (X_test.shape[0]*X_test.shape[1], -1))
X_test_normalized_2d = scaler.transform(X_test_2d)
X_test_scaled = np.reshape(X_test_normalized_2d, (X_test.shape[0], X_test.shape[1], -1))

# Create pairs for evaluation
num_test = len(X_test_scaled)

# Positive pairs: every sequence paired with itself.
positive_test_pairs = np.stack([X_test_scaled, X_test_scaled], axis=1)

# Negative pairs: every unordered pair (i, j) with i < j, in row-major order.
first_idx, second_idx = np.triu_indices(num_test, k=1)
negative_test_pairs = np.stack([X_test_scaled[first_idx], X_test_scaled[second_idx]], axis=1)

# Positives first, then negatives.
test_pairs = np.concatenate([positive_test_pairs, negative_test_pairs])
test_labels = np.concatenate([
    np.ones(num_test, dtype=int),
    np.zeros(len(first_idx), dtype=int),
])

# NOTE(review): there are num_test positives against num_test * (num_test - 1) / 2
# negatives, so plain accuracy is dominated by the negative class.

# Evaluate accuracy and loss of predictions
loss, accuracy = siamese_gru.evaluate([test_pairs[:, 0], test_pairs[:, 1]], test_labels)
print("Test loss:", loss)
print("Test accuracy:", accuracy)



In [None]:
# Calculate Rank-1 accuracy
#
# Each test sequence is used as a probe and scored against every test sequence
# (the gallery); the probe counts as correct when its highest-scoring gallery
# entry is the entry of the same subject. Scoring one pair at a time and taking
# argmax over that single score always yields index 0, which measures nothing.

# The positive pairs in test_pairs are (x, x), so their first element is the
# list of distinct test sequences, in their original order.
gallery = test_pairs[test_labels == 1][:, 0]
num_probes = len(gallery)
if num_probes == 0:
    raise ValueError('no positive pairs in test_pairs, cannot build a gallery')

# All probe/gallery combinations, scored in a single batched predict call.
probe_idx = np.repeat(np.arange(num_probes), num_probes)
gallery_idx = np.tile(np.arange(num_probes), num_probes)
similarity_scores = siamese_gru.predict(
    [gallery[probe_idx], gallery[gallery_idx]], verbose=0
).reshape(num_probes, num_probes)

# Row i holds the scores of probe i against every gallery entry. argmax returns
# the first maximum, so ties are resolved towards the lowest gallery index.
best_match = similarity_scores.argmax(axis=1)
num_correct = int(np.sum(best_match == np.arange(num_probes)))

# NOTE(review): every subject has a single sequence here, so the probe itself is
# its only correct gallery entry; a stricter protocol needs a second, independent
# sequence per subject.
rank1_accuracy = num_correct / num_probes
print("Rank-1 accuracy:", rank1_accuracy)
