In [None]:
# Pipeline configuration: detector weights, working folders and sequence length.
# NOTE(review): `YOLO` is imported in a later cell (`from ultralytics import YOLO`),
# so on a fresh kernel the install and import cells must run before this one.
# NOTE(review): the name `model` is rebound to a Keras model in the "Vanila LSTM"
# cell; after that cell runs, detect_dog() no longer sees the YOLO detector.
model = YOLO('/content/is_dog.pt') # YOLO detector used by detect_dog()
output_folder = '/content/dog_npy/' # Only referenced by the disabled single-video test cell
frames_folder = '/content/frames/' # Temporary folder for the extracted JPEG frames

# These paths are hard-coded; the original note recommends pointing them at Google Drive.
root_folder = "/content/dogs" # Dataset root: one sub-folder of videos per class
new_root_folder = "/content/train" # Destination root for the generated .npy files

batch_size = 12 # Frames per LSTM sequence; SEQUENCE_LENGTH below is set to the same value

In [None]:
!pip install ultralytics

Collecting ultralytics
  Downloading ultralytics-8.2.17-py3-none-any.whl (757 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m757.0/757.0 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
Collecting thop>=0.1.1 (from ultralytics)
  Downloading thop-0.1.1.post2209072238-py3-none-any.whl (15 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch>=1.8.0->ultralytics)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch>=1.8.0->ultralytics)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (823 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch>=1.8.0->ultralytics)
  Using cached nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (14.1 MB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch>=1.8.0->ultralytics)
  Using cached nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl (731.7 MB)
Collecting nvidia-cublas-cu12==12.1

In [None]:
from ultralytics import YOLO
from PIL import Image, ImageOps
import numpy as np
import matplotlib.pyplot as plt
import os
import cv2

In [None]:
def extract_frames(video_path, frames_folder):
    """Decode every frame of a video and write each one to ``frames_folder`` as a JPEG.

    Frames are named ``frame_0000.jpg``, ``frame_0001.jpg``, ... in decoding order.
    Files left in the folder by an earlier call are overwritten when the index
    collides but are never deleted, so callers should rely on the returned count
    rather than on a directory listing.

    Parameters:
    - video_path: path of the video file to read
    - frames_folder: directory that receives the JPEG files (created if missing)

    Returns:
    - frame_count: int, number of frames read (0 when no frame could be read)
    """
    os.makedirs(frames_folder, exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    frame_count = 0

    try:
        while True:
            success, frame = cap.read()
            if not success:
                break
            frame_filename = os.path.join(frames_folder, f'frame_{frame_count:04d}.jpg')
            cv2.imwrite(frame_filename, frame)
            frame_count += 1
    finally:
        # Release the capture handle even if writing a frame raises.
        cap.release()

    return frame_count

def detect_dog(img_path, output_folder, dog_frames, show=False):
    """Run the detector on one image and collect a 224x224 crop for each detection.

    Every detection with class id 0 is cropped, padded with black borders to a
    square, resized to 224x224 and appended to ``dog_frames`` as an RGB array.

    Parameters:
    - img_path: path of the image to analyse; ``None`` skips detection
    - output_folder: directory created if missing (nothing is written to it here)
    - dog_frames: list, extended in place with the crops
    - show: bool, display each crop with matplotlib when True

    Returns:
    - dog_count: int, total number of crops in ``dog_frames`` after this call
      (the running total, not only the crops found in this image)
    """
    if img_path is None:
        # Nothing to analyse; still return the running total so the return type is always int.
        return len(dog_frames)

    image = Image.open(img_path).convert('RGB')
    # Pass the PIL image itself: Ultralytics treats PIL input as RGB but numpy
    # arrays as BGR, so converting to an array first would swap red and blue.
    results = model(image, conf=0.25)
    os.makedirs(output_folder, exist_ok=True)

    for box in results[0].boxes:
        if box.cls == 0:  # Adjust this if 'dog' has a different class id
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            cropped_image = image.crop((x1, y1, x2, y2))
            width, height = cropped_image.size

            # Centre the crop on a black square so the resize keeps the aspect ratio.
            max_dim = max(width, height)
            pad_left = (max_dim - width) // 2
            pad_top = (max_dim - height) // 2
            pad_right = max_dim - width - pad_left
            pad_bottom = max_dim - height - pad_top

            padded_image = ImageOps.expand(
                cropped_image, (pad_left, pad_top, pad_right, pad_bottom), fill=(0, 0, 0)
            )
            resized_image = padded_image.resize((224, 224))

            dog_frames.append(np.array(resized_image))

            if show:
                plt.imshow(resized_image)
                plt.axis('off')
                plt.show()

    return len(dog_frames)

def frame_to_npy(video_path, frames_folder, output_folder, num_frames, show=False):
    """Detect dogs in the extracted frames of one video and save all crops to a .npy file.

    Reads ``frame_0000.jpg`` .. ``frame_{num_frames-1:04d}.jpg`` from
    ``frames_folder``, runs ``detect_dog`` on each and stores the collected crops
    as ``<video base name>.npy`` inside ``output_folder``.

    Parameters:
    - video_path: path of the source video; only its base name is used
    - frames_folder: directory holding the extracted JPEG frames
    - output_folder: directory that receives the .npy file (created if missing)
    - num_frames: int, number of frames to read
    - show: bool, forwarded to ``detect_dog``

    Returns:
    - None
    """
    dog_frames = []
    image_basename = os.path.splitext(os.path.basename(video_path))[0]

    for frame_number in range(num_frames):
        frame_path = os.path.join(frames_folder, f'frame_{frame_number:04d}.jpg')
        detect_dog(frame_path, output_folder, dog_frames, show)

    # detect_dog only creates the folder when it is called, so a video with no
    # frames would otherwise make np.save fail on a missing directory.
    os.makedirs(output_folder, exist_ok=True)
    np.save(os.path.join(output_folder, f'{image_basename}.npy'), np.array(dog_frames))

def batch_data(data, batch_size):
    """
    Group consecutive samples into fixed-size batches, dropping the incomplete tail.

    Parameters:
    - data: array-like whose first axis indexes the samples
    - batch_size: int, number of samples per batch; must be positive

    Returns:
    - batched_data: numpy array of shape (num_batches, batch_size, *data.shape[1:]).
      With fewer than ``batch_size`` samples the first axis is 0 but the
      remaining axes and the dtype are kept. The result may share memory with
      ``data``.

    Raises:
    - ValueError: if ``batch_size`` is not positive
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    data = np.asarray(data)
    num_batches = len(data) // batch_size
    # Keep only the samples that fill whole batches, then fold them into batches.
    usable = data[:num_batches * batch_size]
    return usable.reshape((num_batches, batch_size) + data.shape[1:])


In [None]:
# Disabled single-video smoke test, kept as a string literal so it never runs.
# NOTE(review): it references `video_path`, which is not defined in any visible
# cell; define it before re-enabling this code, or delete the cell.
'''#Get frame of a video
num_frames = extract_frames(video_path, frames_folder)
print(f"Extracted {num_frames} frames from the video.")
#Save to npy
frame_to_npy(video_path, frames_folder, output_folder, num_frames)
data = np.load('/content/dog_npy/Pitbull_barking_#shorts(360p).npy')
#Batch it for lstm
batch_size = 10
batched_data = batch_data(data, batch_size)
print("Number of batches:", len(batched_data))
print("Shape of first batch:", batched_data[0].shape)'''

In [None]:
def process_class_folder(class_folder, output_folder, batch_size, frames_folder='/content/frames/'):
    """Convert every ``.mp4`` video of one class folder into .npy crop sequences.

    For each video this extracts the frames, saves the detected dog crops as
    ``<stem>.npy`` and saves the fixed-length sequences built from them as
    ``<stem>_batched.npy``, both inside ``output_folder``.

    Parameters:
    - class_folder: directory containing the videos of a single class
    - output_folder: directory that receives the .npy files
    - batch_size: int, number of crops per sequence
    - frames_folder: temporary directory for the extracted frames; it is reused
      (and its files overwritten) for every video

    Returns:
    - None
    """
    for video_file in os.listdir(class_folder):
        if not video_file.endswith(".mp4"):
            continue

        video_path = os.path.join(class_folder, video_file)
        video_stem = os.path.splitext(video_file)[0]

        # extract_frames creates frames_folder itself when it is missing.
        num_frames = extract_frames(video_path, frames_folder)
        frame_to_npy(video_path, frames_folder, output_folder, num_frames)

        # frame_to_npy returns nothing, so read its result back from disk.
        data = np.load(os.path.join(output_folder, f'{video_stem}.npy'))
        batched_data = batch_data(data, batch_size)
        np.save(os.path.join(output_folder, f'{video_stem}_batched.npy'), batched_data)

def process_all_class_folders(root_folder, new_root_folder, batch_size):
    """Run ``process_class_folder`` on every class sub-folder of the dataset.

    Each directory directly under ``root_folder`` is treated as one class; its
    results go to a directory of the same name under ``new_root_folder``.

    Parameters:
    - root_folder: dataset root with one sub-folder per class
    - new_root_folder: root of the output tree (class folders are created on demand)
    - batch_size: int, forwarded to ``process_class_folder``
    """
    for entry in os.listdir(root_folder):
        source_dir = os.path.join(root_folder, entry)
        if not os.path.isdir(source_dir):
            # Plain files at the dataset root are not classes.
            continue

        target_dir = os.path.join(new_root_folder, entry)
        os.makedirs(target_dir, exist_ok=True)
        process_class_folder(source_dir, target_dir, batch_size)

In [None]:
process_all_class_folders(root_folder, new_root_folder, batch_size)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
0: 384x640 1 0, 228.3ms
Speed: 2.7ms preprocess, 228.3ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 0, 228.2ms
Speed: 2.7ms preprocess, 228.2ms inference, 1.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 0, 243.5ms
Speed: 3.5ms preprocess, 243.5ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 0, 247.0ms
Speed: 4.9ms preprocess, 247.0ms inference, 1.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 0, 216.9ms
Speed: 3.0ms preprocess, 216.9ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 0, 131.1ms
Speed: 3.9ms preprocess, 131.1ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 0, 133.7ms
Speed: 2.6ms preprocess, 133.7ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 0, 132.9ms
Speed: 2.5ms preprocess, 132.9ms inference, 0.9ms 

In [None]:
# Disabled helper that creates the class folder skeleton (barking / sitting /
# walking / standing) under a relative "dogs" directory. It is kept as a string
# literal, so running this cell creates nothing.
# NOTE(review): the output shown below this cell presumably comes from an earlier
# run made while the code was still active.
'''import os

parent_folder = "dogs"
os.makedirs(parent_folder, exist_ok=True)
subfolders = ["barking", "sitting", "walking", "standing"]
for subfolder in subfolders:
    subfolder_path = os.path.join(parent_folder, subfolder)
    os.makedirs(subfolder_path, exist_ok=True)
    print(f"The folder '{subfolder}' has been created successfully inside '{parent_folder}'.")
'''

The folder 'barking' has been created successfully inside 'dogs'.
The folder 'sitting' has been created successfully inside 'dogs'.
The folder 'walking' has been created successfully inside 'dogs'.
The folder 'standing' has been created successfully inside 'dogs'.


In [None]:
import os
import numpy as np

def load_npy_files(root_folder):
    """Load every ``.npy`` file found one level below ``root_folder``.

    Parameters:
    - root_folder: directory with one sub-folder per class

    Returns:
    - dict mapping each class folder name to the list of arrays loaded from it
      (an empty list when the folder holds no .npy file). The name and shape of
      every loaded file are printed.
    """
    arrays_by_class = {}

    for class_name in os.listdir(root_folder):
        class_folder = os.path.join(root_folder, class_name)
        if not os.path.isdir(class_folder):
            continue

        loaded = []
        arrays_by_class[class_name] = loaded
        npy_names = [name for name in os.listdir(class_folder) if name.endswith(".npy")]
        for npy_file in npy_names:
            data = np.load(os.path.join(class_folder, npy_file))
            loaded.append(data)
            print(f"Loaded '{npy_file}' with shape: {data.shape}")

    return arrays_by_class

# The preprocessing step writes its .npy files under new_root_folder;
# root_folder only holds the source videos.
npy_data = load_npy_files(new_root_folder)

Loaded 'VID_20240501131125.npy' with shape: (53, 224, 224, 3)
Loaded 'VID_20240501131125_batched.npy' with shape: (4, 12, 224, 224, 3)
Loaded 'Dog_barking_videos#4kviral#status#ytshorts(720p)_batched.npy' with shape: (11, 12, 224, 224, 3)
Loaded 'Pitbull_barking_#shorts(360p)_batched.npy' with shape: (11, 12, 224, 224, 3)
Loaded 'Dog_barking_videos#4kviral#status#ytshorts(720p).npy' with shape: (142, 224, 224, 3)
Loaded 'Pitbull_barking_#shorts(360p).npy' with shape: (136, 224, 224, 3)
Loaded 'Cute_tired_street_dog_sitting_at_the_beach_of_Alanya(720p).npy' with shape: (1558, 224, 224, 3)
Loaded 'Dog_sound__dog_barking__#shorts_#dogbarking_#viralshorts_#youtubeshorts_#dog_sound(720p).npy' with shape: (123, 224, 224, 3)
Loaded 'Cute_tired_street_dog_sitting_at_the_beach_of_Alanya(720p)_batched.npy' with shape: (129, 12, 224, 224, 3)
Loaded 'Dog_sound__dog_barking__#shorts_#dogbarking_#viralshorts_#youtubeshorts_#dog_sound(720p)_batched.npy' with shape: (10, 12, 224, 224, 3)
Loaded 'VID_2

# Loading for Training

In [None]:
import os
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

In [None]:
def load_data(root_folder, sequence_shape=(12, 224, 224, 3)):
    """Load the batched sequences of every class and build integer labels.

    Class folders are taken in sorted order, so the class index is reproducible
    across machines and always agrees with the returned dictionaries. Hidden
    directories (names starting with ".") are ignored. Files whose per-sequence
    shape differs from ``sequence_shape`` (e.g. the un-batched crop files) are
    skipped with a message.

    NOTE: sorting may assign different indices than an earlier run that relied
    on ``os.listdir`` order; models trained with the old order need retraining.

    Parameters:
    - root_folder: directory with one sub-folder of .npy files per class
    - sequence_shape: tuple, expected shape of one sequence
      (frames, height, width, channels)

    Returns:
    - X: numpy array stacking all accepted sequences
    - y_encoded: int64 numpy array with the class index of each sequence
    - class_dict: dict mapping class index to class folder name
    - class_labels: dict with the same index-to-name mapping
    """
    sequence_shape = tuple(sequence_shape)
    class_names = sorted(
        name for name in os.listdir(root_folder)
        if not name.startswith(".") and os.path.isdir(os.path.join(root_folder, name))
    )
    class_dict = dict(enumerate(class_names))

    X = []
    y = []
    for idx, class_name in class_dict.items():
        class_folder = os.path.join(root_folder, class_name)
        for npy_file in sorted(os.listdir(class_folder)):
            if not npy_file.endswith(".npy"):
                continue
            data = np.load(os.path.join(class_folder, npy_file))
            if data.shape[1:] == sequence_shape:
                X.extend(data)
                y.extend([idx] * len(data))
            else:
                print(f"Skipping '{npy_file}' due to incorrect shape: {data.shape}")

    # The indices are already contiguous and aligned with class_dict, so no
    # further re-encoding is needed.
    y_encoded = np.asarray(y, dtype=np.int64)
    class_labels = dict(class_dict)

    return np.array(X), y_encoded, class_dict, class_labels

# Load the batched sequences written by the preprocessing step. They are stored
# under new_root_folder; root_folder only holds the source videos.
X, y_encoded, class_dict, class_labels = load_data(new_root_folder)

# Print the class dictionary
print("Class dictionary:")
print(class_dict)

# Print the class labels dictionary
print("Class labels dictionary:")
print(class_labels)


Skipping 'VID_20240501131125.npy' due to incorrect shape: (53, 224, 224, 3)
Skipping 'Dog_barking_videos#4kviral#status#ytshorts(720p).npy' due to incorrect shape: (142, 224, 224, 3)
Skipping 'Pitbull_barking_#shorts(360p).npy' due to incorrect shape: (136, 224, 224, 3)
Skipping 'Cute_tired_street_dog_sitting_at_the_beach_of_Alanya(720p).npy' due to incorrect shape: (1558, 224, 224, 3)
Skipping 'Dog_sound__dog_barking__#shorts_#dogbarking_#viralshorts_#youtubeshorts_#dog_sound(720p).npy' due to incorrect shape: (123, 224, 224, 3)
Skipping 'VID_20240501130111.npy' due to incorrect shape: (130, 224, 224, 3)
Skipping 'VID_20240501130837.npy' due to incorrect shape: (98, 224, 224, 3)
Skipping 'VID_20240501130158.npy' due to incorrect shape: (100, 224, 224, 3)
Class dictionary:
{0: 'standing', 1: 'barking', 2: 'sitting', 3: 'walking'}
Class labels dictionary:
{0: 0, 1: 1, 2: 2, 3: 3}


In [None]:
from keras.utils import to_categorical
# One-hot encode the integer labels for the categorical cross-entropy loss.
# NOTE(review): num_classes is not passed, so the width is inferred from the
# labels present; confirm it equals NUM_CLASSES when a class has no sequences.
y_one_hot = to_categorical(y_encoded)

In [None]:
# Random 70/30 split of the sequences with a fixed seed.
# NOTE(review): load_data pools all sequences of a video before this split, so
# sequences cut from the same video can land in both sets; consider splitting
# by video to get an honest test score. The split is also not stratified.
X_train, X_test, y_train, y_test = train_test_split(X, y_one_hot, test_size=0.3, random_state=42)

print("Training set shape:", X_train.shape, y_train.shape)
print("Testing set shape:", X_test.shape, y_test.shape)

Training set shape: (133, 12, 224, 224, 3) (133, 4)
Testing set shape: (58, 12, 224, 224, 3) (58, 4)


# Training LSTM

## Stacked LSTM

In [None]:
from tensorflow.keras.applications import InceptionV3
from tensorflow.keras.layers import Input, TimeDistributed, Flatten, LSTM, BatchNormalization, Dropout, Dense, Conv2D
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

In [None]:
# Shape constants shared by the three model definitions below.
IMAGE_SIZE = (224, 224)  # Height and width of each crop; matches the resize in detect_dog
SEQUENCE_LENGTH = 12  # Number of frames per sequence; same value as `batch_size` in the config cell
NUM_CLASSES = 4  # Size of the softmax output layer

In [None]:
from tensorflow.keras.layers import Rescaling

# Frozen ImageNet InceptionV3 used as a per-frame feature extractor.
base_model = InceptionV3(weights='imagenet', include_top=False, input_shape=(IMAGE_SIZE[0], IMAGE_SIZE[1], 3))
base_model.trainable = False

# Stacked LSTM classifier over the per-frame features.
inputs = Input(shape=(SEQUENCE_LENGTH,) + IMAGE_SIZE + (3,))
# The crops are stored as raw 0-255 pixels, while InceptionV3 expects inputs
# scaled to [-1, 1]. Scaling inside the model keeps training and inference
# consistent. Checkpoints trained without this layer are not comparable.
x = Rescaling(scale=1.0 / 127.5, offset=-1.0)(inputs)
x = TimeDistributed(base_model)(x)
x = TimeDistributed(Flatten())(x)
x = LSTM(256, return_sequences=True)(x)  # First LSTM passes the full sequence on
x = LSTM(256, return_sequences=False)(x)  # Second LSTM keeps only the final state
x = BatchNormalization()(x)
x = Dropout(0.5)(x)
outputs = Dense(NUM_CLASSES, activation='softmax')(x)
stack_model = Model(inputs, outputs)
opt = Adam(learning_rate=0.0001)
stack_model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy'])

In [None]:
from keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau

# Keep the weights with the best validation accuracy on disk.
checkpoint = ModelCheckpoint(
    "best_stackmodel.keras",
    monitor='val_accuracy',
    mode='max',
    save_best_only=True,
    verbose=1,
)
# Stop after 5 epochs without a better validation loss and restore the best weights.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
)
# Halve the learning rate after one epoch without a better validation loss.
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=1,
    min_lr=1e-8,
)

# Train the stacked LSTM; the test split doubles as the validation set.
history_stack = stack_model.fit(
    X_train,
    y_train,
    epochs=50,
    batch_size=4,
    validation_data=(X_test, y_test),
    callbacks=[checkpoint, early_stopping, reduce_lr],
)

Epoch 1/50
Epoch 1: val_accuracy improved from -inf to 0.10345, saving model to best_stackmodel.keras
Epoch 2/50
Epoch 2: val_accuracy did not improve from 0.10345
Epoch 3/50
Epoch 3: val_accuracy did not improve from 0.10345
Epoch 4/50
Epoch 4: val_accuracy did not improve from 0.10345
Epoch 5/50
Epoch 5: val_accuracy did not improve from 0.10345
Epoch 6/50
Epoch 6: val_accuracy did not improve from 0.10345
Epoch 7/50
Epoch 7: val_accuracy did not improve from 0.10345
Epoch 8/50
Epoch 8: val_accuracy improved from 0.10345 to 0.27586, saving model to best_stackmodel.keras
Epoch 9/50
Epoch 9: val_accuracy improved from 0.27586 to 0.62069, saving model to best_stackmodel.keras
Epoch 10/50
Epoch 10: val_accuracy improved from 0.62069 to 0.81034, saving model to best_stackmodel.keras
Epoch 11/50
Epoch 11: val_accuracy did not improve from 0.81034
Epoch 12/50
Epoch 12: val_accuracy improved from 0.81034 to 0.87931, saving model to best_stackmodel.keras
Epoch 13/50
Epoch 13: val_accuracy imp

## Bidirectional LSTM

In [None]:
from tensorflow.keras.layers import Bidirectional, Rescaling

# Frozen ImageNet InceptionV3 used as a per-frame feature extractor.
base_model = InceptionV3(weights='imagenet', include_top=False, input_shape=(IMAGE_SIZE[0], IMAGE_SIZE[1], 3))
base_model.trainable = False

# Bidirectional LSTM classifier over the per-frame features.
inputs = Input(shape=(SEQUENCE_LENGTH,) + IMAGE_SIZE + (3,))
# The crops are stored as raw 0-255 pixels, while InceptionV3 expects inputs
# scaled to [-1, 1]. Scaling inside the model keeps training and inference
# consistent. Checkpoints trained without this layer are not comparable.
x = Rescaling(scale=1.0 / 127.5, offset=-1.0)(inputs)
x = TimeDistributed(base_model)(x)
x = TimeDistributed(Flatten())(x)
x = Bidirectional(LSTM(256, return_sequences=False))(x)  # Reads the sequence in both directions
x = BatchNormalization()(x)
x = Dropout(0.5)(x)
outputs = Dense(NUM_CLASSES, activation='softmax')(x)
bi_model = Model(inputs, outputs)
opt = Adam(learning_rate=0.0001)
bi_model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy'])

In [None]:
from keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau

# Keep the weights with the best validation accuracy on disk.
checkpoint = ModelCheckpoint("best_bimodel2.keras", monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')
# Stop after 5 epochs without a better validation loss and restore the best weights.
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
# Halve the learning rate after one epoch without a better validation loss.
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=1, min_lr=0.0000001)

# Train the bidirectional model. The model object is named `bi_model`; the
# previous `bimodel.fit(...)` raised NameError.
history_bidirectional = bi_model.fit(X_train, y_train, epochs=50, batch_size=4, validation_data=(X_test, y_test), callbacks=[checkpoint, early_stopping, reduce_lr])

## Vanilla LSTM

In [None]:
from tensorflow.keras.applications import InceptionV3
from tensorflow.keras.layers import Input, TimeDistributed, Flatten, LSTM, BatchNormalization, Dropout, Dense, Conv2D
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam


from tensorflow.keras.layers import Rescaling

# Frozen ImageNet InceptionV3 used as a per-frame feature extractor.
base_model = InceptionV3(weights='imagenet', include_top=False, input_shape=(IMAGE_SIZE[0], IMAGE_SIZE[1], 3))
base_model.trainable = False

# Single-layer LSTM classifier over the per-frame features.
inputs = Input(shape=(SEQUENCE_LENGTH,) + IMAGE_SIZE + (3,))
# The crops are stored as raw 0-255 pixels, while InceptionV3 expects inputs
# scaled to [-1, 1]. Scaling inside the model keeps training and inference
# consistent. Checkpoints trained without this layer are not comparable.
x = Rescaling(scale=1.0 / 127.5, offset=-1.0)(inputs)
x = TimeDistributed(base_model)(x)
x = TimeDistributed(Flatten())(x)  # Flatten the InceptionV3 feature map of each frame
x = LSTM(256, return_sequences=False)(x)  # Keep only the final state
x = BatchNormalization()(x)
x = Dropout(0.5)(x)
outputs = Dense(NUM_CLASSES, activation='softmax')(x)
# NOTE(review): this rebinds `model`, which the config cell uses for the YOLO
# detector read by detect_dog(); re-run the config cell before preprocessing again.
model = Model(inputs, outputs)
opt = Adam(learning_rate=0.0001)
model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy'])

In [None]:
from keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau

# Keep the weights with the best validation accuracy on disk.
checkpoint = ModelCheckpoint(
    "best_model2.keras",
    monitor='val_accuracy',
    mode='max',
    save_best_only=True,
    verbose=1,
)
# Stop after 5 epochs without a better validation loss and restore the best weights.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
)
# Halve the learning rate after one epoch without a better validation loss.
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=1,
    min_lr=1e-6,
)

# Train the single-layer LSTM; the test split doubles as the validation set.
history = model.fit(
    X_train,
    y_train,
    epochs=50,
    batch_size=4,
    validation_data=(X_test, y_test),
    callbacks=[checkpoint, early_stopping, reduce_lr],
)

# Evaluation

In [None]:
from sklearn.metrics import roc_auc_score, roc_curve
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

def plot_training_history(history, main_title):
    """Draw the loss and accuracy curves of a training run side by side.

    Parameters:
    - history: object returned by ``fit``; its ``history`` dict must hold the
      keys 'loss', 'val_loss', 'accuracy' and 'val_accuracy'
    - main_title: str, prefix used in both panel titles
    """
    panels = (('loss', 'Loss'), ('accuracy', 'Accuracy'))

    plt.figure(figsize=(12, 6))
    for position, (metric, label) in enumerate(panels, start=1):
        # One panel per metric: training curve first, then validation.
        plt.subplot(1, 2, position)
        plt.plot(history.history[metric])
        plt.plot(history.history['val_' + metric])
        plt.title(main_title + ' - Model ' + label)
        plt.ylabel(label)
        plt.xlabel('Epoch')
        plt.legend(['Train', 'Validation'], loc='upper left')

    plt.show()

def plot_confusion_matrix(X, y, model, label_encoder):
    """Plot the confusion matrix of ``model`` on the given samples as a heatmap.

    Parameters:
    - X: input samples passed to ``model.predict``
    - y: one-hot encoded true labels, one row per sample
    - model: trained classifier exposing ``predict``
    - label_encoder: encoder whose ``classes_`` gives the class name for each
      column index
    """
    y_prob = model.predict(X)
    y_true = label_encoder.inverse_transform(np.argmax(y, axis=1))
    y_pred = label_encoder.inverse_transform(np.argmax(y_prob, axis=1))
    # Pass the full class list so the matrix always has one row and column per
    # class, even when a class is missing from both y_true and y_pred. Without
    # it the matrix can be smaller than the tick labels below.
    conf_matrix = confusion_matrix(y_true, y_pred, labels=label_encoder.classes_)

    plt.figure(figsize=(10, 8))
    sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", xticklabels=label_encoder.classes_, yticklabels=label_encoder.classes_)
    plt.xlabel('Predicted Labels')
    plt.ylabel('True Labels')
    plt.title('Confusion Matrix')
    plt.show()

def plot_roc_auc_curve(model, X, y_true, class_names=None):
    """Plot a one-vs-rest ROC curve with its AUC for every class.

    Parameters:
    - model: trained classifier exposing ``predict``
    - X: input samples passed to ``model.predict``
    - y_true: one-hot encoded true labels, one column per class
    - class_names: optional sequence with one display name per column; when
      omitted the names come from the notebook-level ``label_encoder.classes_``
    """
    if class_names is None:
        # Backward-compatible default: fall back to the global encoder.
        class_names = label_encoder.classes_

    # Predict probabilities for each class
    y_prob = model.predict(X)

    plt.figure(figsize=(8, 6))
    for i, class_name in enumerate(class_names):
        auc_score = roc_auc_score(y_true[:, i], y_prob[:, i])
        fpr, tpr, _ = roc_curve(y_true[:, i], y_prob[:, i])
        # Format with an f-string so non-string class labels also work.
        plt.plot(fpr, tpr, label=f'{class_name} (AUC = {auc_score:.2f})')

    plt.plot([0, 1], [0, 1], 'k--', label='Random Guessing')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC AUC Curve')
    plt.legend(loc='lower right')
    plt.grid(True)
    plt.show()

In [None]:
# An unfitted LabelEncoder has no `classes_`, so the plotting helpers would fail.
# Attach the class names in encoded order: label k maps to the k-th smallest key
# of class_dict.
# NOTE(review): assumes every class in class_dict contributed at least one sequence.
label_encoder = LabelEncoder()
label_encoder.classes_ = np.array([class_dict[idx] for idx in sorted(class_dict)])

## Stacked LSTM

### Loss accuracy plot

In [None]:
plot_training_history(history_stack, 'Stacked LSTM')

### Confusion Matrix

In [None]:
# Evaluate on the held-out split only; scoring on all of X would include the training samples.
plot_confusion_matrix(X_test, y_test, stack_model, label_encoder)

### ROC Curve

In [None]:
# Evaluate on the held-out split only; scoring on all of X would include the training samples.
plot_roc_auc_curve(stack_model, X_test, y_test)

## Bidirectional LSTM

### Loss accuracy plot

In [None]:
plot_training_history(history_bidirectional, 'Bidirectional LSTM')

### Confusion Matrix

In [None]:
# Evaluate on the held-out split only; scoring on all of X would include the training samples.
plot_confusion_matrix(X_test, y_test, bi_model, label_encoder)

### ROC Curve

In [None]:
# Evaluate on the held-out split only; scoring on all of X would include the training samples.
plot_roc_auc_curve(bi_model, X_test, y_test)

## Vanilla LSTM

### Loss accuracy plot

In [None]:
plot_training_history(history, 'Vanila LSTM')

### Confusion Matrix

In [None]:
# Evaluate on the held-out split only; scoring on all of X would include the training samples.
plot_confusion_matrix(X_test, y_test, model, label_encoder)

### ROC Curve

In [None]:
# Evaluate on the held-out split only; scoring on all of X would include the training samples.
plot_roc_auc_curve(model, X_test, y_test)