# First get all of the data. 

In [1]:
# first get all X and y data from the all_points_folds
import numpy as np 
import pickle, random 
import cv2, os

X_all, y = [], []  # X needs to be changed for each fold, y doesn't need to be changed for each fold.

# Each file in all_points_folds is unpickled and indexed as (features, labels):
# element 0 is appended to X_all, element 1 to y.
# os.listdir returns names in arbitrary order, so the names are sorted to make the
# fold order identical on every run and on every machine.
# NOTE(review): pickle.load can execute arbitrary code contained in the file -
# only keep fold files you generated yourself in this directory.
for file in sorted(os.listdir("all_points_folds")):
    with open(f"all_points_folds/{file}", 'rb') as f:
        X_y = pickle.load(f)
    X_all.append(X_y[0])
    y.append(X_y[1])

        
def specify_data(dataset, landmarks):
    """Keep only the feature columns that belong to the requested landmarks.

    The arrays in ``dataset`` are concatenated along their first axis and the
    result is indexed as ``ds[:, :, cols]``, so after concatenation the data
    must be 3-D. The original note describes the input as a 4D matrix of
    ``(5, _, 90, 126)``.

    For every landmark index ``k`` the six columns
    ``k + 0, k + 21, k + 42, k + 63, k + 84, k + 105`` are selected, in that
    order, landmark after landmark.

    Args:
        dataset: sequence of arrays (or one array whose first axis enumerates
            them) to concatenate.
        landmarks: iterable of landmark indices, each in ``0..20``.

    Returns:
        Array whose last axis has ``6 * len(landmarks)`` columns.

    Raises:
        ValueError: if a landmark index is outside ``0..20``. A negative index
            would otherwise silently wrap around to unrelated columns.
    """
    ds = np.concatenate(list(dataset))

    offsets = np.array([0, 21, 42, 63, 84, 105])
    cols = []
    for landmark in landmarks:
        if not 0 <= landmark <= 20:
            raise ValueError(f"landmark index must be in 0..20, got {landmark}")
        cols += (offsets + landmark).tolist()

    return ds[:, :, cols]

# Six-landmark view: landmarks 0, 4, 8, 12, 16 and 20 contribute 6 columns each
# (36 features), regrouped into the fixed (5, 20, 90, 36) layout.
X_six = np.reshape(specify_data(X_all, [0, 4, 8, 12, 16, 20]), (5, 20, 90, 36))
# Single-landmark view: only landmark 0 is kept, i.e. 6 features.
X_one = np.reshape(specify_data(X_all, [0]), (5, 20, 90, 6))


import random 
def shuffle(X, y, seed = None):
    """Shuffle samples across folds while keeping features and labels aligned.

    All folds of ``X`` and of ``y`` are concatenated, reordered with one shared
    random permutation, and split back into ``len(X)`` equally sized folds.

    Args:
        X: sequence of per-fold feature arrays (sample axis first).
        y: sequence of per-fold label arrays, aligned with ``X``.
        seed: seed for NumPy's global random generator. When ``None`` a seed in
            ``[0, 100)`` is drawn with ``random.randrange`` and printed so the
            run can be reproduced.

    Returns:
        ``(new_X, new_y)`` where ``new_X`` has shape
        ``(len(X), samples_per_fold, *sample_shape)`` and ``new_y`` has shape
        ``(len(X), samples_per_fold)``.

    Raises:
        ValueError: if ``X`` and ``y`` contain a different number of samples,
            or the samples cannot be split evenly into ``len(X)`` folds
            (raised by ``reshape``).
    """
    if seed is None:
        seed = random.randrange(0, 100)
        print(f"using seed {seed}")
    # Seeding the global generator means equal seeds give equal permutations,
    # which is what keeps separately shuffled feature sets row-aligned.
    np.random.seed(seed)

    new_X = np.concatenate([X_i for X_i in X])
    new_y = np.concatenate([y_i for y_i in y])
    if new_X.shape[0] != new_y.shape[0]:
        raise ValueError(
            f"X has {new_X.shape[0]} samples but y has {new_y.shape[0]}"
        )

    order = np.random.permutation(new_X.shape[0])
    new_X = new_X[order]
    new_y = new_y[order]

    # Derive the fold layout from the input instead of assuming 5 folds of
    # 20 samples with 90 frames each.
    n_folds = len(X)
    new_X = new_X.reshape(n_folds, -1, *new_X.shape[1:])
    new_y = new_y.reshape(n_folds, -1)
    return new_X, new_y

In [2]:
# Fixed seed so the shuffle below is repeatable.
SEED = 65
# shuffle() reseeds NumPy with the given seed on every call, so the three calls
# below draw the same permutation as long as each feature set holds the same
# number of samples; that keeps X_all, X_six and X_one row-aligned with y.
X_all, y = shuffle(X_all, y, seed = SEED)
# y was already rebound to the shuffled labels above; the labels returned by
# the next two calls are discarded, only the reordered features are kept.
X_six, _ = shuffle(X_six, y, seed = SEED)
X_one, _ = shuffle(X_one, y, seed = SEED)
# NOTE(review): running these statements a second time shuffles the
# already-shuffled arrays again.

In [3]:
import sklearn.metrics
def ensemble_val_acc(models, X_tests, y_test):
    """Score the averaged prediction of several models on held-out data.

    Each model predicts on its own test inputs; the flattened predictions are
    averaged and rounded to the nearest integer to obtain the ensemble labels.

    Args:
        models: objects exposing ``predict(X)``.
        X_tests: one test input per model, in the same order as ``models``.
        y_test: true labels, one per test sample.

    Returns:
        ``(accuracy, precision, recall)`` of the rounded ensemble prediction,
        the latter two computed with ``sklearn.metrics``.
    """
    # Accumulate in float regardless of the label dtype: an accumulator created
    # with zeros_like(y_test) cannot take float predictions added in place when
    # the labels are integers.
    preds = np.zeros(np.shape(y_test), dtype=float)
    for model, X_test in zip(models, X_tests):
        preds += np.asarray(model.predict(X_test)).flatten()
    preds = preds / len(models)

    # np.round replaces np.round_, which was removed in NumPy 2.0; round once
    # and reuse the labels for all three metrics.
    labels = np.round(preds)
    accuracy = (labels == np.asarray(y_test)).mean()
    precision = sklearn.metrics.precision_score(y_test, labels)
    recall = sklearn.metrics.recall_score(y_test, labels)
    return accuracy, precision, recall

In [4]:
import time, glob
def _fit_and_evaluate_member(model, X_folds, y_train, y_test, test_fold, label,
                             epochs, callbacks, verbose, checkpoint_path="best_aso.h5"):
    """Train one ensemble member with fold ``test_fold`` held out and return that fold's inputs."""
    X_test = X_folds[test_fold]
    X_train = np.concatenate([X_j for j, X_j in enumerate(X_folds) if j != test_fold])

    # Drop any checkpoint left behind by a previously trained model so it can
    # never be loaded into this one.
    try:
        os.remove(checkpoint_path)
    except FileNotFoundError:
        pass

    model.fit(X_train, y_train, validation_data=(X_test, y_test),
              epochs=epochs, callbacks=callbacks, verbose=verbose)

    # The checkpoint only exists if one of the callbacks wrote it during this
    # fit; when it is absent the model keeps its final-epoch weights.
    if os.path.exists(checkpoint_path):
        model.load_weights(checkpoint_path)

    print(f"\nevaluation on {label}:")
    model.evaluate(X_test, y_test)
    time.sleep(1)
    return X_test


def cross_validate(make_model_all, make_model_six, make_model_one, X_all, X_six, X_one, y, epochs = 75, callbacks = None, verbose = 1):
    """Cross-validate a three-model ensemble, holding out one fold at a time.

    For each fold a fresh model is built for every feature set (all / six /
    one), trained on the remaining folds, restored from ``best_aso.h5`` when
    that file was written during training, and evaluated on the held-out fold.
    The three models are then scored together with ``ensemble_val_acc`` and run
    on every file in the working directory matching ``*mov``.

    Args:
        make_model_all, make_model_six, make_model_one: zero-argument callables
            returning a compiled model for the matching feature set.
        X_all, X_six, X_one: per-fold feature arrays, indexed by fold first.
        y: per-fold labels, indexed by fold first; its length sets the number
            of folds.
        epochs: passed to ``fit``.
        callbacks: passed to every ``fit`` call; ``None`` means no callbacks.
        verbose: passed to ``fit``.

    Returns:
        None. Per-fold and averaged (accuracy, precision, recall) are printed.
    """
    # A mutable default list would be shared between calls.
    callbacks = [] if callbacks is None else callbacks
    # NOTE(review): the same callback objects are reused for every fit. If a
    # callback keeps state across fits (e.g. a best-so-far score), later models
    # may never write a checkpoint - confirm for the callbacks you pass.

    val_accuracies, precisions, recalls = [], [], []
    for i in range(len(y)):
        model_all = make_model_all()
        model_six = make_model_six()
        model_one = make_model_one()

        # Labels are shared by the three feature sets.
        y_train = np.concatenate([y_j for j, y_j in enumerate(y) if i != j])
        y_test = y[i]

        X_test_all = _fit_and_evaluate_member(
            model_all, X_all, y_train, y_test, i, "all", epochs, callbacks, verbose)
        X_test_six = _fit_and_evaluate_member(
            model_six, X_six, y_train, y_test, i, "six", epochs, callbacks, verbose)
        # Previously the checkpoint written while training model_one was loaded
        # into model_six and the resulting error was swallowed; each model now
        # restores its own checkpoint.
        X_test_one = _fit_and_evaluate_member(
            model_one, X_one, y_train, y_test, i, "one", epochs, callbacks, verbose)

        # Aggregate validation metrics of the three models for this fold.
        models = [model_all, model_six, model_one]
        val_acc, pres, recall = ensemble_val_acc(models, [X_test_all, X_test_six, X_test_one], y_test)
        val_accuracies.append(val_acc)
        precisions.append(pres)
        recalls.append(recall)
        print(f"ensemble validation accuracy : {(val_acc, pres, recall)}")
        time.sleep(2)

        for video in glob.glob("*mov"):
            print(f"video is {video}, {predict_on_video(models, video)}")
    print(f"average : {sum(val_accuracies) / len(val_accuracies), sum(precisions) / len(precisions), sum(recalls) / len(recalls)}")

In [5]:
# create the functions to create models 
import tensorflow as tf 
def make_model_all():
    """Return a compiled binary classifier: LSTM(64) -> Dropout(0.3) -> Dense(1, sigmoid).

    Compiled with binary cross-entropy, Adam at learning rate 0.0005, and
    accuracy / precision / recall metrics. No input shape is declared on the
    first layer.
    """
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.LSTM(64, return_sequences=False))
    model.add(tf.keras.layers.Dropout(0.3))
    model.add(tf.keras.layers.Dense(1, activation="sigmoid"))

    optimizer = tf.keras.optimizers.Adam(learning_rate=0.0005)
    tracked_metrics = ['accuracy', tf.keras.metrics.Precision(), tf.keras.metrics.Recall()]
    model.compile(loss="binary_crossentropy", optimizer=optimizer, metrics=tracked_metrics)

    return model

def make_model_six():
    """Return a compiled binary classifier: LSTM(64) -> Dropout(0.3) -> Dense(1, sigmoid).

    The first layer declares ``input_shape=(None, 36)``, i.e. 36 features per
    step. Compiled with binary cross-entropy, Adam at learning rate 0.01, and
    accuracy / precision / recall metrics.
    """
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.LSTM(64, return_sequences=False, input_shape=(None, 36)))
    model.add(tf.keras.layers.Dropout(0.3))
    model.add(tf.keras.layers.Dense(1, activation='sigmoid'))

    optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)
    tracked_metrics = ['accuracy', tf.keras.metrics.Precision(), tf.keras.metrics.Recall()]
    model.compile(loss="binary_crossentropy", optimizer=optimizer, metrics=tracked_metrics)

    return model

def make_model_one():
    """Return a compiled binary classifier: LSTM(32) -> Dropout(0.2) -> Dense(1, sigmoid).

    Compiled with binary cross-entropy, Adam at learning rate 0.01, and
    accuracy / precision / recall metrics. No input shape is declared on the
    first layer.
    """
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.LSTM(32, return_sequences=False))
    model.add(tf.keras.layers.Dropout(0.2))
    model.add(tf.keras.layers.Dense(1, activation="sigmoid"))

    optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)
    tracked_metrics = ['accuracy', tf.keras.metrics.Precision(), tf.keras.metrics.Recall()]
    model.compile(loss="binary_crossentropy", optimizer=optimizer, metrics=tracked_metrics)

    return model

In [6]:
import mediapipe as mp 
from tqdm import tqdm 
def hand_locations_general(frame, min_detection_confidence = 0.5, min_tracking_confidence = 0.5):
    """Return the hand-landmark coordinates found in one frame as a flat vector.

    The result has 126 values laid out as 42 x-coordinates, then 42
    y-coordinates, then 42 z-coordinates. Inside each group, positions 0-20
    hold the 21 landmarks of the first detected hand and positions 21-41 those
    of the second; positions without a detection stay 0.

    Args:
        frame: image array; it is cast to ``uint8`` before processing.
        min_detection_confidence: forwarded to ``mp.solutions.hands.Hands``.
        min_tracking_confidence: forwarded to ``mp.solutions.hands.Hands``.

    Returns:
        1-D float array of length 126 (float even when no hand is detected).
    """
    coords = np.zeros((3, 42))  # rows: x, y, z; columns: 2 hands x 21 landmarks

    # The context manager closes the detector even if processing raises.
    # NOTE(review): a new detector is built for every frame, which is slow.
    with mp.solutions.hands.Hands(min_detection_confidence=min_detection_confidence,
                                  min_tracking_confidence=min_tracking_confidence) as hands:
        results = hands.process(frame.astype('uint8'))
        detected = results.multi_hand_landmarks or []
        for hand_index, hand_landmark in enumerate(detected):
            if hand_index >= 2:
                # Only two hands fit in the 42 slots per coordinate.
                break
            for i in range(21):
                landmark = hand_landmark.landmark[i]
                coords[:, hand_index * 21 + i] = (landmark.x, landmark.y, landmark.z)

    # Row-major flattening yields [all x, all y, all z].
    return coords.reshape(-1)


# create a function to pad your videos 
def pad(locations, maxlen = 90, padding = "post", truncating = "post"):
    """Pad or truncate every video to exactly ``maxlen`` frames.

    Args:
        locations: iterable of videos, each a sequence of per-frame rows.
        maxlen: number of frames every returned video has.
        padding: ``"post"`` appends all-zero rows after the frames, ``"pre"``
            inserts them before.
        truncating: ``"post"`` keeps the first ``maxlen`` frames, ``"pre"``
            keeps the last ``maxlen`` frames.

    Returns:
        Array stacking the adjusted videos along the first axis.

    Raises:
        ValueError: if ``padding`` or ``truncating`` is not ``"pre"`` or
            ``"post"``. Previously such values were ignored and the video was
            left at its original length.
    """
    if padding not in ("pre", "post"):
        raise ValueError(f"padding must be 'pre' or 'post', got {padding!r}")
    if truncating not in ("pre", "post"):
        raise ValueError(f"truncating must be 'pre' or 'post', got {truncating!r}")

    default_width = 126  # row width assumed for a video without any frame
    new_locations = []
    for _index, video in tqdm(enumerate(locations)):
        frames = np.asarray(video)
        n_frames = len(frames)

        if n_frames > maxlen:
            if truncating == "post":
                frames = frames[:maxlen]
            else:
                frames = frames[n_frames - maxlen:]
        elif n_frames < maxlen:
            width = frames.shape[1] if frames.ndim == 2 else default_width
            # Build all missing rows at once instead of concatenating one
            # zero row per iteration.
            filler = np.zeros((maxlen - n_frames, width))
            frames = frames.reshape(n_frames, width)
            parts = [frames, filler] if padding == "post" else [filler, frames]
            frames = np.concatenate(parts)

        new_locations.append(frames)
    return np.array(new_locations)

In [7]:
import mediapipe as mp 
from tqdm import tqdm 
def hand_locations_general(frame, min_detection_confidence = 0.5, min_tracking_confidence = 0.5):
    """Return the hand-landmark coordinates found in one frame as a flat vector.

    The result has 126 values laid out as 42 x-coordinates, then 42
    y-coordinates, then 42 z-coordinates. Inside each group, positions 0-20
    hold the 21 landmarks of the first detected hand and positions 21-41 those
    of the second; positions without a detection stay 0.

    Args:
        frame: image array; it is cast to ``uint8`` before processing.
        min_detection_confidence: forwarded to ``mp.solutions.hands.Hands``.
        min_tracking_confidence: forwarded to ``mp.solutions.hands.Hands``.

    Returns:
        1-D float array of length 126 (float even when no hand is detected).
    """
    coords = np.zeros((3, 42))  # rows: x, y, z; columns: 2 hands x 21 landmarks

    # The context manager closes the detector even if processing raises.
    # NOTE(review): a new detector is built for every frame, which is slow.
    with mp.solutions.hands.Hands(min_detection_confidence=min_detection_confidence,
                                  min_tracking_confidence=min_tracking_confidence) as hands:
        results = hands.process(frame.astype('uint8'))
        detected = results.multi_hand_landmarks or []
        for hand_index, hand_landmark in enumerate(detected):
            if hand_index >= 2:
                # Only two hands fit in the 42 slots per coordinate.
                break
            for i in range(21):
                landmark = hand_landmark.landmark[i]
                coords[:, hand_index * 21 + i] = (landmark.x, landmark.y, landmark.z)

    # Row-major flattening yields [all x, all y, all z].
    return coords.reshape(-1)

In [8]:
# Landmark indices used by each ensemble member, keyed by member number.
# Insertion order matters: the values are consumed in this order.
model_to_arrangements = {
    1: [*range(21)],              # every landmark 0..20
    2: list(range(0, 21, 4)),     # 0, 4, 8, 12, 16, 20
    3: [0],                       # landmark 0 only
}

In [9]:
# function to ensemble predict on frames. 
def ensemble_predict(models, X_test):
    """Average the predictions of the ensemble members on ``X_test``.

    Models are paired, in order, with the landmark lists stored in
    ``model_to_arrangements``; each model predicts on the columns selected for
    it by ``specify_data``. The summed predictions are divided by
    ``len(models)`` and returned as a flat array.
    """
    running_total = np.zeros((X_test.shape[0], 1))
    members = zip(models, model_to_arrangements.values())
    for member, landmark_ids in tqdm(members):
        member_input = specify_data(np.array([X_test]), landmark_ids)
        running_total += member.predict(member_input)
    averaged = running_total / len(models)
    return averaged.flatten()

In [10]:
# create a function to then predict on videos 
import cv2, numpy as np
def predict_on_video(models, path):
    """Run the ensemble on one video file.

    Every frame is converted from BGR to RGB and turned into a landmark vector
    with ``hand_locations_general``; the resulting sequence is padded with
    ``pad`` and passed to ``ensemble_predict``.

    Args:
        models: ensemble members, forwarded to ``ensemble_predict``.
        path: path of the video file to read.

    Returns:
        Whatever ``ensemble_predict`` returns for the padded sequence.

    Raises:
        ValueError: if no frame could be read from ``path``.
    """
    LOCATIONS = []
    cap = cv2.VideoCapture(path)
    try:
        while cap.isOpened():
            ok, frame = cap.read()
            if not ok:
                break

            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            LOCATIONS.append(hand_locations_general(frame))
    finally:
        # Release the capture handle even when reading or detection fails.
        cap.release()

    if not LOCATIONS:
        # Fail with a clear message instead of an opaque shape error later on.
        raise ValueError(f"no frames could be read from {path!r}")

    print('read all locations')
    LOCATIONS = np.array([LOCATIONS])
    LOCATIONS = pad(LOCATIONS)
    print("padded")
    return ensemble_predict(models, LOCATIONS)

In [None]:
# Write best_aso.h5 only when the monitored validation accuracy improves.
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    "best_aso.h5",
    monitor="val_accuracy",
    save_best_only=True,
)
# Stop a fit after 10 epochs without improvement in validation accuracy.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor="val_accuracy",
    patience=10,
)
# Run the cross-validation of the three-model ensemble with silent fitting.
cross_validate(
    make_model_all, make_model_six, make_model_one,
    X_all, X_six, X_one, y,
    epochs=75,
    callbacks=[checkpoint, early_stopping],
    verbose=0,
)

2021-07-30 11:05:22.312662: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-07-30 11:05:22.791946: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2)



evaluation on all:

evaluation on six:

evaluation on one:
ensemble validation accuracy : (0.7, 0.6666666666666666, 0.5)


In [None]:
# Does this work either?
# Now that the kernel has been restarted, will this actually read frames?
# (My Jupyter notebook is somewhat broken.)