# New baseline for the bovine with Transformers

Inspired by this guide:
https://keras.io/examples/vision/video_transformers/

In [39]:
import matplotlib.pyplot as plt

In [40]:
from problem import get_train_data, get_test_data, WeightedClassificationError

In [41]:
import numpy as np
#CPU
import os
os.environ["CUDA_VISIBLE_DEVICES"]="-1"  
import tensorflow as tf
from time import time
import pandas as pd
import math

In [42]:
from tensorflow.keras import layers
from tensorflow import keras

## Define hyperparams

In [43]:
# Unused leftover setting, kept for reference:
#MAX_SEQ_LENGTH = 20
NUM_FEATURES = 1024 # length of the per-frame feature vector produced by DenseNet121 with average pooling
IMG_SIZE = 224 # height and width of the frames given to the feature extractor
NB_FRAMES=30 # frames kept per video; author's memory note: about 2.65 GB if frames are float64 vs 0.33 GB as uint8

EPOCHS = 5 # number of training epochs passed to model.fit


## Data preparation

In [44]:
videos_train, labels_train = get_train_data()

In [45]:
X_for_classifier= np.array(videos_train)
y_for_classifier= labels_train

In [46]:
videos_test, labels_test  = get_test_data()

In [47]:
Xtest_for_classifier = np.array(videos_test)
ytest_for_classifier = labels_test

In [48]:
from PIL import Image

def resize_frames(video, size=(224, 224)):
    """Resize every frame of a video with PIL and stack the results.

    Parameters
    ----------
    video : iterable of array-like
        The frames of one video. Each frame must be accepted by
        `PIL.Image.fromarray`.
    size : tuple of int, optional
        Target size forwarded to `Image.resize`. Defaults to (224, 224),
        the value that was previously hard-coded here.

    Returns
    -------
    numpy.ndarray
        The resized frames stacked along a new leading axis.
    """
    return np.array(
        [np.array(Image.fromarray(frame).resize(size=size)) for frame in video]
    )

In [49]:
# Builds the whole dataset in memory.
# Author's memory estimate: 30 frames per video for 177 videos is about 2.65 GB
# if every frame is stored as float64, but only about 0.33 GB as uint8.

def gen_videos(videolist, frame_slice=slice(0, 299, 10)):
    """Sample, convert and resize the frames of every video in `videolist`.

    For each video, the frames at `video.frame_times[frame_slice]` are read
    with `video.read_samples`, cast to uint8, replicated onto a new trailing
    axis of length 3, and resized with `resize_frames`.

    Parameters
    ----------
    videolist : iterable
        Objects exposing `frame_times` and `read_samples(times)`.
    frame_slice : slice, optional
        Selects which entries of `frame_times` are read. The default,
        `slice(0, 299, 10)`, is the selection that was previously
        hard-coded (every 10th entry among the first 299).

    Returns
    -------
    list of numpy.ndarray
        One array of processed frames per input video, in input order.
    """
    newvideos = []
    for video in videolist:
        frames = video.read_samples(video.frame_times[frame_slice])
        # NOTE(review): the uint8 cast assumes pixel values already fit in
        # 0..255 -- confirm against what read_samples returns.
        frames = frames.astype('uint8')
        # Replicate the frames onto a trailing axis of length 3, matching the
        # 3-channel input the feature extractor is built with.
        frames = np.repeat(frames[..., np.newaxis], 3, -1)
        # Frames are resized (not cropped) by resize_frames.
        newvideos.append(resize_frames(frames))
    return newvideos

In [50]:
X_for_classifier= np.array(gen_videos(X_for_classifier))
X_for_classifier.shape

(177, 30, 224, 224, 3)

In [52]:
Xtest_for_classifier= np.array(gen_videos(Xtest_for_classifier))
Xtest_for_classifier.shape

(100, 30, 224, 224, 3)

In [53]:
def class_to_int(argument):
    """Translate a class letter into its integer index.

    'A' maps to 0, 'B' to 1, and so on up to 'H' which maps to 7.
    Any other argument does not raise: the string "nothing" is returned
    instead, so callers receive a non-integer sentinel for unknown labels.
    """
    # Letters are numbered in alphabetical order starting from zero.
    label_index = {letter: position for position, letter in enumerate("ABCDEFGH")}

    if argument in label_index:
        return label_index[argument]
    return "nothing"
 

In [54]:
func=np.vectorize(class_to_int)
#Train
y_for_classifier=func(y_for_classifier)
#Test
ytest_for_classifier=func(ytest_for_classifier)

In [55]:
y_for_classifier

array([7, 7, 0, 5, 7, 0, 0, 5, 5, 0, 2, 1, 2, 5, 7, 5, 5, 7, 1, 1, 7, 2,
       7, 1, 1, 5, 7, 7, 7, 2, 2, 4, 6, 3, 4, 2, 0, 0, 7, 7, 6, 2, 5, 2,
       6, 1, 7, 7, 7, 2, 7, 7, 7, 6, 0, 6, 0, 5, 4, 0, 5, 6, 5, 4, 5, 4,
       5, 0, 1, 0, 6, 4, 6, 5, 6, 1, 1, 4, 5, 5, 1, 6, 5, 3, 1, 0, 3, 6,
       3, 5, 5, 2, 1, 6, 2, 2, 2, 3, 6, 1, 0, 0, 5, 2, 2, 1, 2, 1, 3, 5,
       6, 4, 1, 2, 1, 3, 3, 1, 0, 7, 7, 5, 7, 7, 7, 5, 7, 6, 4, 5, 0, 7,
       6, 7, 5, 7, 7, 6, 1, 1, 7, 7, 6, 5, 7, 7, 4, 5, 7, 5, 5, 7, 5, 5,
       5, 7, 7, 6, 0, 7, 1, 7, 6, 7, 6, 0, 7, 7, 7, 7, 4, 1, 2, 6, 7, 3,
       3])

In [56]:
ytest_for_classifier

array([0, 0, 6, 0, 5, 2, 7, 2, 2, 7, 7, 5, 7, 7, 4, 4, 4, 2, 2, 6, 7, 7,
       7, 4, 6, 4, 3, 3, 6, 6, 5, 6, 4, 2, 4, 0, 4, 4, 2, 6, 6, 4, 0, 3,
       3, 3, 0, 0, 5, 2, 1, 5, 2, 1, 1, 1, 2, 4, 4, 1, 4, 3, 6, 4, 1, 6,
       4, 3, 3, 3, 0, 4, 4, 2, 6, 7, 7, 7, 5, 7, 6, 1, 5, 7, 7, 6, 6, 4,
       2, 7, 0, 0, 7, 1, 6, 6, 7, 4, 3, 3])

## CNN


### Build the feature extractor

In [57]:
def build_feature_extractor():
    """Build the per-frame feature model on top of a pretrained DenseNet121.

    The returned Keras model takes an `(IMG_SIZE, IMG_SIZE, 3)` image,
    applies the DenseNet `preprocess_input` function, and runs the result
    through DenseNet121 loaded with ImageNet weights, without its
    classification head and with average pooling. The model is named
    "feature_extractor".
    """
    frame_shape = (IMG_SIZE, IMG_SIZE, 3)

    # The backbone is created before the input tensor, as in the original.
    backbone = keras.applications.DenseNet121(
        weights="imagenet",
        include_top=False,
        pooling="avg",
        input_shape=frame_shape,
    )

    frame_input = keras.Input(frame_shape)
    scaled = keras.applications.densenet.preprocess_input(frame_input)
    return keras.Model(frame_input, backbone(scaled), name="feature_extractor")


In [58]:
feature_extractor = build_feature_extractor()

In [59]:
feature_extractor.predict(X_for_classifier[1][1][None,:,:])

array([[2.8418709e-04, 3.0397894e-03, 3.3622744e-04, ..., 1.7108216e+00,
        1.2420495e+00, 6.7036861e-01]], dtype=float32)

### Extract video features

In [60]:

def prepare_all_videos(videos, labels):
    """Compute CNN features for every frame of every video.

    Parameters
    ----------
    videos : numpy.ndarray
        Array whose first axis indexes videos and whose second axis indexes
        frames. The frames of one video are passed together to the
        module-level `feature_extractor`.
    labels : object
        Returned unchanged, so the call site can unpack features and labels
        together.

    Returns
    -------
    tuple
        `(frame_features, labels)`, where `frame_features` is a float32 array
        of shape `(num_videos, NB_FRAMES, NUM_FEATURES)`. Rows belonging to
        frames a video does not have stay at zero.

    Raises
    ------
    ValueError
        If a video yields more than `NB_FRAMES` feature rows, because they
        cannot be assigned into the pre-allocated array.
    """
    num_samples = videos.shape[0]

    # `frame_features` are what we will feed to our sequence model.
    frame_features = np.zeros(
        shape=(num_samples, NB_FRAMES, NUM_FEATURES), dtype="float32"
    )

    for idv, video in enumerate(videos):
        if len(video) == 0:
            # Nothing to extract; the row keeps its zero padding.
            continue
        # One batched predict call per video instead of one call per frame:
        # the frames of a video already form a batch along the first axis.
        video_features = feature_extractor.predict(video)
        frame_features[idv, : len(video_features)] = video_features

    return frame_features, labels

### Train Feature extraction

In [61]:
# Compute the CNN feature map for the training set, or reuse the copy cached on disk.
# `train_labels` is defined here too because `run_experiment` reads it.
# Delete the .npy file to force a recomputation after the input data changes.
if os.path.exists("CNN_featuremap_train.npy"):
    train_data = np.load("CNN_featuremap_train.npy")
    # prepare_all_videos returns the labels unchanged, so this is equivalent.
    train_labels = y_for_classifier
else:
    train_data, train_labels = prepare_all_videos(X_for_classifier, y_for_classifier)

In [78]:
# Save the extracted training feature map (an array, not a model) to disk
# NOTE(review): `savetxt` is imported but not used in this cell
from numpy import savetxt
filename = 'CNN_featuremap_train.npy'
np.save(filename, train_data)

In [79]:
train_data = np.load("CNN_featuremap_train.npy")

In [80]:
train_data.shape

(177, 30, 1024)

### Test Feature extraction

In [65]:
#Compute the CNN feature map
test_data,test_labels=prepare_all_videos(Xtest_for_classifier,ytest_for_classifier)

In [74]:
# Save the extracted test feature map (an array, not a model) to disk
# NOTE(review): `savetxt` is imported but not used in this cell
from numpy import savetxt
filename = 'CNN_featuremap_test.npy'
np.save(filename, test_data)

In [75]:
test_data = np.load("CNN_featuremap_test.npy")

In [77]:
test_data.shape

(100, 30, 1024)

## Transformer-based model

In [81]:
class PositionalEmbedding(layers.Layer):
    """Add a learned position embedding to every frame of a sequence.

    Parameters
    ----------
    sequence_length : int
        Number of distinct positions in the embedding table (its
        `input_dim`).
    output_dim : int
        Size of each position embedding. It is added to the inputs, so it
        has to be compatible with the size of their last axis.
    **kwargs
        Forwarded to `layers.Layer`.
    """

    def __init__(self, sequence_length, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim
        )
        self.sequence_length = sequence_length
        self.output_dim = output_dim

    def call(self, inputs):
        """Return `inputs` plus the embedding of each position along axis 1."""
        # The inputs are of shape: `(batch_size, frames, num_features)`
        length = tf.shape(inputs)[1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_positions = self.position_embeddings(positions)
        return inputs + embedded_positions

    def compute_mask(self, inputs, mask=None):
        """Mark a frame as valid when at least one of its features is non-zero."""
        mask = tf.reduce_any(tf.cast(inputs, "bool"), axis=-1)
        return mask

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized."""
        config = super().get_config()
        config.update(
            {
                "sequence_length": self.sequence_length,
                "output_dim": self.output_dim,
            }
        )
        return config


In [82]:
class TransformerEncoder(layers.Layer):
    """Single encoder block: self-attention, then a two-layer projection.

    Both sub-blocks are wrapped in a residual connection followed by layer
    normalization.

    Parameters
    ----------
    embed_dim : int
        Used as the `key_dim` of the attention layer and as the output size
        of the projection, so the residual sums line up.
    dense_dim : int
        Width of the hidden (GELU) layer of the projection.
    num_heads : int
        Number of attention heads.
    **kwargs
        Forwarded to `layers.Layer`.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        # The attention dropout rate is fixed at 0.3.
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.3
        )
        self.dense_proj = keras.Sequential(
            [
                layers.Dense(dense_dim, activation=tf.nn.gelu),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        """Apply the encoder block to `inputs`.

        When `mask` is given, a new axis is inserted at position 1 before it
        is passed to the attention layer as `attention_mask`.
        """
        if mask is not None:
            mask = mask[:, tf.newaxis, :]

        attention_output = self.attention(inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "dense_dim": self.dense_dim,
                "num_heads": self.num_heads,
            }
        )
        return config


### Utility functions for training

In [83]:
def get_compiled_model():
    """Build and compile the Transformer-based video classifier.

    The network is: input of shape `(None, None)` per sample ->
    `PositionalEmbedding` -> `TransformerEncoder` -> global max pooling over
    the frame axis -> dropout (0.5) -> dense softmax over 8 classes.
    It is compiled with the Adam optimizer, sparse categorical cross-entropy
    loss and the accuracy metric.

    Returns
    -------
    keras.Model
        The compiled, untrained model.
    """
    # Tied to NB_FRAMES so the position table always covers every frame
    # position produced by the feature-extraction step.
    sequence_length = NB_FRAMES
    embed_dim = NUM_FEATURES
    dense_dim = 4
    num_heads = 1
    classes = 8

    inputs = keras.Input(shape=(None, None))
    x = PositionalEmbedding(
        sequence_length, embed_dim, name="frame_position_embedding"
    )(inputs)
    x = TransformerEncoder(embed_dim, dense_dim, num_heads, name="transformer_layer")(x)
    x = layers.GlobalMaxPooling1D()(x)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(classes, activation="softmax")(x)
    model = keras.Model(inputs, outputs)

    model.compile(
        optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
    )
    return model


def run_experiment():
    """Train a fresh model, restore its best weights and report test accuracy.

    Reads the module-level `train_data`, `train_labels`, `test_data`,
    `test_labels` and `EPOCHS`. The last 15% of the training set is held out
    for validation, and the weights with the best monitored value are written
    to "/tmp/video_classifier", then reloaded before evaluation.

    Returns
    -------
    keras.Model
        The trained model carrying the restored checkpoint weights.
    """
    weights_path = "/tmp/video_classifier"
    model = get_compiled_model()

    # Keep only the weights of the best epoch seen so far.
    best_weights_saver = keras.callbacks.ModelCheckpoint(
        weights_path, save_weights_only=True, save_best_only=True, verbose=1
    )
    fit_options = dict(
        validation_split=0.15,
        epochs=EPOCHS,
        callbacks=[best_weights_saver],
    )
    model.fit(train_data, train_labels, **fit_options)

    # Evaluate the best checkpoint rather than the last epoch.
    model.load_weights(weights_path)
    test_loss, accuracy = model.evaluate(test_data, test_labels)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

    return model


### Train the model:

In [84]:
trained_model = run_experiment()

Epoch 1/5
Epoch 1: val_loss improved from inf to 6.11262, saving model to /tmp/video_classifier
Epoch 2/5
Epoch 2: val_loss improved from 6.11262 to 2.93212, saving model to /tmp/video_classifier
Epoch 3/5
Epoch 3: val_loss did not improve from 2.93212
Epoch 4/5
Epoch 4: val_loss improved from 2.93212 to 2.11247, saving model to /tmp/video_classifier
Epoch 5/5
Epoch 5: val_loss did not improve from 2.11247
Test accuracy: 21.0%


## Loading test data

In [None]:
videos_test, labels_test  = get_test_data()

In [None]:
#videos_test, labels_test=filter(filters,videos_test, labels_test)



In [None]:
builtx, builty= create_dataset(290,300, videos_test, labels_test)

In [None]:
Xtest_for_classifier = np.array(builtx)
ytest_for_classifier = np.array(builty)

In [None]:
ytest_for_classifier

In [None]:
grayscale_batch=Xtest_for_classifier
rgb_batch = np.repeat(grayscale_batch[..., np.newaxis], 3, -1)

In [None]:
ytest_for_classifier=func(ytest_for_classifier)

In [None]:
ytest_for_classifier

In [None]:
Xtest_for_classifier= rgb_batch

In [None]:
Xtest_for_classifier.shape

In [None]:
# `model` is never defined in this notebook; the trained network is `trained_model`,
# and it consumes the extracted CNN features (`test_data`), not raw frames.
loss, accuracy = trained_model.evaluate(test_data, test_labels)
print('Test accuracy :', accuracy)

In [None]:
# Predict from the extracted CNN features with the trained network
# (`model` is never defined in this notebook), then keep the most likely class.
preds = trained_model.predict(test_data)
preds = np.argmax(preds, axis=1)
preds

In [None]:
ytest_for_classifier

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
con_mat = confusion_matrix(ytest_for_classifier, preds)
disp = ConfusionMatrixDisplay(confusion_matrix=con_mat)
disp.plot()
plt.show()

In [None]:
from sklearn.metrics import classification_report
target_names = ['class 0', 'class 1']


In [None]:
# Report all eight classes (0..7); labels=[0,1] only covered the first two.
print(classification_report(ytest_for_classifier, preds, labels=list(range(8))))