# Preprocess and Load Data

In [None]:
from ADUtils import *
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [None]:
# Whether you already have data stored in a npy file.
# False: the sequences are rebuilt from the per-frame .npy files under DATA_PATH.
# True: the rebuild is skipped and X / y are read back with np.load instead.
load_from_npy = False

In [None]:
# Map every action name to its position in `actions`; that index is the class id.
label_map = dict(zip(actions, range(len(actions))))

In [None]:
# Build the training set from the per-frame keypoint files on disk.
# Layout read here: DATA_PATH/<action>/<sequence number>/<frame number>.npy
# A sequence is kept only if all `sequence_length` frames can be loaded.
if not load_from_npy:

    sequences, labels = [], []
    for action in actions:
        action_path = os.path.join(DATA_PATH, action)
        if not os.path.exists(action_path):
            continue

        # Only purely numeric entries are sequence folders; stray files such as
        # .DS_Store would otherwise make the int conversion raise.
        sequence_ids = [int(name) for name in os.listdir(action_path) if name.isdigit()]

        for sequence in sequence_ids:
            sequence_path = os.path.join(action_path, str(sequence))
            # 1. make sure path exists (checked once per sequence, not per frame;
            #    a zero-padded folder name no longer matches after the int round trip)
            if not os.path.exists(sequence_path):
                print("1")
                continue

            # 2. make sure none are empty
            window = []
            skip_sequence = False
            for frame_num in range(sequence_length):
                frame_path = os.path.join(sequence_path, "{}.npy".format(frame_num))
                try:
                    res = np.load(frame_path)
                except (OSError, ValueError, EOFError):
                    # Missing, unreadable, malformed or empty frame file.
                    print(f"Cannot read {sequence_path} number {frame_num}")
                    print("2")
                    skip_sequence = True
                    break
                window.append(res)

            if skip_sequence:
                continue

            # 3. a window that reaches this point holds exactly sequence_length
            #    frames, so no separate length check is needed
            sequences.append(window)
            labels.append(label_map[action])

In [None]:
# `sequences` only exists when the data was rebuilt from the frame files,
# so skip the report when the arrays are loaded from .npy instead.
if not load_from_npy:
    # number of samples, number of frames per sample, number of data points per frame
    print(np.array(sequences).shape)

In [None]:
# Set to True to cache the freshly built sequences and their integer labels
# as .npy files. Nothing is written when the data itself came from those files.
save_to_numpy = False
if save_to_numpy and not load_from_npy:
    np.save('action_detect_X.npy', sequences)
    np.save('action_detect_y.npy', labels)

In [None]:
# Feature array: either the cached file written by the save cell, or the
# sequences that were just read from disk.
if load_from_npy:
    # Must match the file name used by np.save in the save cell.
    X = np.load('action_detect_X.npy')
else:
    X = np.array(sequences)

In [None]:
# Target array, one-hot encoded with one column per action.
if load_from_npy:
    # The save cell stores the raw integer labels, and np.save appends ".npy"
    # to the name, so load that file and encode it the same way as fresh labels.
    labels = np.load('action_detect_y.npy')
# Fix the width to the number of actions so that a class missing from the data
# cannot shrink the target below the model's output size.
y = to_categorical(labels, num_classes=len(actions)).astype(int)

In [None]:
# Hold out 15% of the samples for evaluation. No random_state is passed,
# so the partition differs from run to run.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.15,
)

# Build and Train Model

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Input
from tensorflow.keras.callbacks import TensorBoard

In [None]:
# TensorBoard callback that writes its event files to the 'Logs' directory;
# it is passed to model.fit below.
# might remove
log_dir = 'Logs'
tb_callback = TensorBoard(log_dir=log_dir)

In [None]:
# Three stacked LSTM layers followed by a dense classifier head.
# The input shape is taken from the training data (frames per sample, values per frame)
# rather than hard-coded, e.g. (30, 258).
model = Sequential([
    Input(shape=X_train.shape[-2:]),
    # The first two LSTMs return the full sequence so the next LSTM can consume it.
    LSTM(64, return_sequences=True, activation='relu'),
    LSTM(128, return_sequences=True, activation='relu'),
    # The last LSTM returns only its final output for the dense layers.
    LSTM(64, return_sequences=False, activation='relu'),
    Dense(64, activation='relu'),
    Dense(32, activation='relu'),
    # One softmax output per action.
    Dense(actions.shape[0], activation='softmax'),
])

In [None]:
# Multi-class setup: one-hot targets, so categorical cross-entropy and
# categorical accuracy are used.
model.compile(
    optimizer='Adam',
    loss='categorical_crossentropy',
    metrics=['categorical_accuracy'],
)

In [None]:
# Train on the training split for 100 epochs, logging through the TensorBoard callback.
model.fit(
    X_train,
    y_train,
    epochs=100,
    callbacks=[tb_callback],
)

In [None]:
# Set to True to write the trained model to 'action.keras'.
# The real-time test section reads this same flag before loading that file.
save_model = False
if save_model:
    model.save('action.keras')

# Statistics

In [None]:
# potentially remove statistics generation

In [None]:
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score, classification_report, f1_score

In [None]:
# Model output for the held-out test samples: one softmax score per action for each sample.
y_prob = model.predict(X_test)

In [None]:
# Collapse the one-hot targets and the predicted scores to plain class indices.
y_true = y_test.argmax(axis=1).tolist()
y_pred = y_prob.argmax(axis=1).tolist()

In [None]:
# Accuracy of the predicted class indices against the true ones on the test split.
# Bare expression: the value is not stored, it only shows up as the cell's output.
accuracy_score(y_true, y_pred)

In [None]:
# classification_report returns a multi-line string; print it so the table is
# rendered with real line breaks instead of an escaped repr.
print(classification_report(y_true, y_pred))

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

# Count true-vs-predicted class pairs on the test split and draw them as a heat map.
cm = confusion_matrix(y_true=y_true, y_pred=y_pred)
disp = ConfusionMatrixDisplay(cm)
disp.plot(cmap='GnBu')
plt.show()

# Test in Real Time

In [None]:
from tensorflow.keras.models import load_model
# Reload the model from 'action.keras' only when save_model is set, i.e. when the
# save cell wrote that file; otherwise the in-memory `model` is used as-is.
# NOTE(review): with save_model = False and a fresh kernel there is no `model`
# at this point — confirm whether loading should really depend on this flag.
if save_model:
    model = load_model('action.keras')

In [None]:
# TODO: change the colors
# Bar colors for the probability overlay (three colors repeated).
colors = [(245,117,16), (117,245,16), (16,117,245), (245,117,16), (117,245,16), (16,117,245), (245,117,16), (117,245,16), (16,117,245)]
def prob_viz(res, actions, input_frame, colors):
    """Draw one labelled probability bar per class onto a copy of the frame.

    Args:
        res: Iterable of per-class probabilities, in the same order as ``actions``.
        actions: Indexable collection of class names used as bar labels.
        input_frame: Image to annotate; it is copied, not modified.
        colors: Non-empty sequence of color tuples. The palette is cycled when
            there are more classes than colors.

    Returns:
        The annotated copy of ``input_frame``.
    """
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        # Bar length is the probability scaled to 0-100 pixels; rows are 40 px apart.
        bar_color = colors[num % len(colors)]
        cv2.rectangle(output_frame, (0, 60 + num * 40), (int(prob * 100), 90 + num * 40), bar_color, -1)
        cv2.putText(output_frame, actions[num], (0, 85 + num * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

    return output_frame

In [None]:
# Rolling window of the most recent keypoint vectors fed to the model.
sequence = []
# Minimum winning probability required before a word is shown in the banner.
threshold = 0.5

# if you want to use video as testing
# video = '../data/test/thumbs_up/thumbs_up9.mp4'
# cap = cv2.VideoCapture(video)

# if you want a bigger video at the cost of performance
# cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
# cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
# cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)

cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)

# try/finally guarantees the capture device and the window are released even
# when detection or prediction raises part-way through.
try:
    # Set mediapipe model
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():

            # Read feed
            ret, frame = cap.read()
            if not ret:
                # No frame delivered (camera unplugged / end of video): stop
                # instead of passing an empty frame to the detector.
                break

            # Make detections
            image, results = mediapipe_detection(frame, holistic)

            # Draw landmarks
            draw_styled_landmarks(image, results)

            # 2. Prediction logic: keep only the newest sequence_length frames,
            # the same window length the training data was built with.
            keypoints = extract_keypoints(results)
            sequence.append(keypoints)
            sequence = sequence[-sequence_length:]

            word = ''
            if len(sequence) == sequence_length:
                res = model.predict(np.expand_dims(sequence, axis=0), verbose=0)[0]
                best = np.argmax(res)
                print(actions[best])

                # Only show the word when the model is confident enough.
                if res[best] > threshold:
                    word = actions[best]

                # Viz probabilities
                image = prob_viz(res, actions, image, colors)

            # Banner across the full frame width, whatever the capture resolution.
            cv2.rectangle(image, (0, 0), (image.shape[1], 40), (245, 117, 16), -1)

            cv2.putText(image, word, (3, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

            # Show to screen
            cv2.imshow('OpenCV Feed', image)

            # Break gracefully
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
finally:
    cap.release()
    cv2.destroyAllWindows()