## Task 2: Baseline for Multi-view rider intention prediction

In [None]:
# Colab-only step: mount Google Drive at /content/drive so the feature
# folders referenced by the cells below can be read and written.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
#done for both test and train
import os
import numpy as np

# Root folder holding one sub-folder per camera view (see `main_folders`).
# NOTE(review): the cell header says this was run for both test and train;
# the paths below point at the test split, so presumably they were edited and
# the cell re-run for the train split - confirm before relying on it.
base_dir = '/content/drive/My Drive/vgg_test_features'

# Destination root for the fused (view-concatenated) features
output_base_dir = '/content/drive/My Drive/vgg_test_features_concatenated'

# Camera views to fuse; their order here is the order of concatenation
main_folders = ['Right', 'Left', 'Front']

# Class sub-folders, read from the first view (all views are assumed to share
# the same layout). Only real directories are kept so that stray files are not
# mistaken for classes, and the list is sorted for a reproducible order.
_first_view_dir = os.path.join(base_dir, main_folders[0])
subfolders = sorted(
    entry
    for entry in os.listdir(_first_view_dir)
    if os.path.isdir(os.path.join(_first_view_dir, entry))
)

# Function to load and concatenate .npy files from identical paths across different main folders
def concatenate_npy_files(base_dir, main_folders, subfolder, filename, axis=0):
    """Load the same .npy file from every view folder and join the arrays.

    For each entry of ``main_folders`` the file
    ``base_dir/<folder>/<subfolder>/<filename>`` is loaded with ``np.load``;
    the loaded arrays are then concatenated in that order.

    Args:
        base_dir: Root directory that contains the view folders.
        main_folders: Names of the view folders. Their order determines the
            order of the arrays in the result.
        subfolder: Sub-folder (shared by all views) that holds the file.
        filename: Name of the .npy file to load from each view.
        axis: Existing axis along which the arrays are joined. Defaults to 0,
            which stacks the views one after another along the first axis;
            pass ``-1`` to place the views side by side along the last axis.

    Returns:
        A tuple ``(concatenated_array, paths, shapes)`` where ``paths`` lists
        the files that were read and ``shapes`` the shape of each loaded
        array, both in ``main_folders`` order.

    Raises:
        ValueError: If ``main_folders`` is empty, or (from NumPy) if the
            arrays cannot be concatenated along ``axis``.
        FileNotFoundError: If one of the views lacks the requested file.
    """
    paths = [
        os.path.join(base_dir, folder, subfolder, filename)
        for folder in main_folders
    ]
    if not paths:
        raise ValueError("main_folders must contain at least one folder name")

    arrays = [np.load(file_path) for file_path in paths]
    concatenated_array = np.concatenate(arrays, axis=axis)
    return concatenated_array, paths, [array.shape for array in arrays]

# Make sure the output tree has one folder per class sub-folder before writing
os.makedirs(output_base_dir, exist_ok=True)
for class_dir in subfolders:
    os.makedirs(os.path.join(output_base_dir, class_dir), exist_ok=True)

# Fuse every file listed under the first view with its counterparts in the
# other views, then store the result under the same relative path.
reference_view = main_folders[0]
for class_dir in subfolders:
    for npy_name in os.listdir(os.path.join(base_dir, reference_view, class_dir)):
        fused, source_paths, source_shapes = concatenate_npy_files(
            base_dir, main_folders, class_dir, npy_name
        )

        # Report which inputs went in and what shape each one had
        print(f"Concatenating the following files for {class_dir}/{npy_name}:")
        for source_path, source_shape in zip(source_paths, source_shapes):
            print(f" - {source_path}, shape: {source_shape}")

        # Report the shape of the fused result
        print(f"Final concatenated shape: {fused.shape}")

        # Write the fused array to the mirrored location in the output tree
        target_path = os.path.join(output_base_dir, class_dir, npy_name)
        np.save(target_path, fused)
        print(f"Saved concatenated array at {target_path}\n")

print("All files concatenated and saved successfully.")


**Channel-wise Fusion (optional - uncomment to use)**

In [None]:
# #channel-wise fusion
# import os
# import numpy as np

# # Path to the base directory
# base_dir = '/content/drive/My Drive/vgg_train_features'

# # Path to the output directory
# output_base_dir = '/content/drive/My Drive/vgg_train_features_cw'

# # Main folders names
# main_folders = ['Right', 'Left', 'Front']

# # Get the names of subfolders (assuming the same structure in all main folders)
# subfolders = os.listdir(os.path.join(base_dir, main_folders[0]))

# # Function to load and stack .npy files from identical paths across different main folders
# def stack_npy_files(base_dir, main_folders, subfolder, filename):
#     arrays = []
#     paths = []
#     for folder in main_folders:
#         file_path = os.path.join(base_dir, folder, subfolder, filename)
#         arrays.append(np.load(file_path))
#         paths.append(file_path)
#     stacked_array = np.stack(arrays, axis=-1)  # Stack along a new axis (channel-wise)
#     return stacked_array, paths, [array.shape for array in arrays]

# # Create output directory and subfolders if they don't exist
# os.makedirs(output_base_dir, exist_ok=True)
# for subfolder in subfolders:
#     os.makedirs(os.path.join(output_base_dir, subfolder), exist_ok=True)

# # Iterate through subfolders and files to stack
# for subfolder in subfolders:
#     file_names = os.listdir(os.path.join(base_dir, main_folders[0], subfolder))
#     for filename in file_names:
#         stacked_array, paths, shapes = stack_npy_files(base_dir, main_folders, subfolder, filename)

#         # Print the paths and initial dimensions
#         print(f"Stacking the following files for {subfolder}/{filename}:")
#         for path, shape in zip(paths, shapes):
#             print(f" - {path}, shape: {shape}")

#         # Print the final dimension
#         print(f"Final stacked shape: {stacked_array.shape}")

#         # Save the stacked array in the appropriate subfolder within the output directory
#         save_path = os.path.join(output_base_dir, subfolder, filename)
#         np.save(save_path, stacked_array)
#         print(f"Saved stacked array at {save_path}\n")

# print("All files stacked and saved successfully.")


In [None]:
!pip install git+https://github.com/tensorflow/docs

In [None]:
from tensorflow_docs.vis import embed
from tensorflow import keras
from imutils import paths

import matplotlib.pyplot as plt
import tensorflow as tf
import pandas as pd
import numpy as np
import imageio
import cv2
import os

In [None]:
# Entries of the fused training-feature folder; each entry name is later used
# as the class label ("tag") of the files inside it.
train_features_dir = '/content/drive/MyDrive/vgg_train_features_concatenated'
dataset_path = os.listdir(train_features_dir)

# Same listing under a more descriptive name. It is copied from the result
# above instead of reading the Drive folder a second time.
label_types = list(dataset_path)
print(label_types)

In [None]:
# Collect one (label, file path) pair for every feature file of every class
train_root = '/content/drive/MyDrive/vgg_train_features_concatenated'
rooms = [
    (item, train_root + '/' + item + '/' + room)
    for item in dataset_path
    for room in os.listdir(train_root + '/' + item)
]

# Tabulate the pairs and preview both ends of the table
train_df = pd.DataFrame(data=rooms, columns=['tag', 'video_name'])
print(train_df.head())
print(train_df.tail())

In [None]:
# Persist the training manifest with the path column first and the label second.
# index=False keeps the positional index out of the file, so reading the CSV
# back does not produce a spurious "Unnamed: 0" column.
df = train_df.loc[:, ['video_name', 'tag']]
df.to_csv('/content/drive/MyDrive/train_v.csv', index=False)

In [None]:
# Entries of the fused test-feature folder, listed once and reused below
test_features_dir = '/content/drive/MyDrive/vgg_test_features_concatenated'
dataset_path = os.listdir(test_features_dir)
print(dataset_path)

# Kept for backward compatibility with the original cell; copied rather than
# listing the Drive folder a second time.
room_types = list(dataset_path)
print("Types of activities found: ", len(dataset_path))

# Collect one (label, file path) pair for every feature file of every class
rooms = []
for item in dataset_path:
    class_dir = test_features_dir + '/' + item
    for room in os.listdir(class_dir):
        rooms.append((item, class_dir + '/' + room))

# Build a dataframe
test_df = pd.DataFrame(data=rooms, columns=['tag', 'video_name'])
print(test_df.head())
print(test_df.tail())

# Persist the test manifest; index=False avoids writing the positional index,
# which would otherwise come back as an "Unnamed: 0" column on read.
df = test_df.loc[:, ['video_name', 'tag']]
df.to_csv('/content/drive/MyDrive/test_v.csv', index=False)

In [None]:
# Reload both manifests from the CSV files written earlier
train_df = pd.read_csv("/content/drive/MyDrive/train_v.csv")
test_df = pd.read_csv("/content/drive/MyDrive/test_v.csv")

# Report how many rows each split contains
for purpose, manifest in (("training", train_df), ("testing", test_df)):
    print(f"Total videos for {purpose}: {len(manifest)}")

# Last expression of the cell: displays ten randomly chosen training rows
train_df.sample(10)

In [None]:
# Vocabulary = the distinct tags of the training manifest; with
# num_oov_indices=0 no index is reserved for unknown labels.
class_names = np.unique(train_df["tag"])
label_processor = keras.layers.StringLookup(num_oov_indices=0, vocabulary=class_names)
print(label_processor.get_vocabulary())

# Encode the training tags as a column of integer indices
tag_column = train_df["tag"].values[..., None]
labels = label_processor(tag_column).numpy()
labels

In [None]:
# Hyperparameters
# IMG_SIZE and BATCH_SIZE are defined here but are not referenced by any code
# visible in this part of the notebook.
IMG_SIZE = 224
BATCH_SIZE = 64
# Number of training epochs. Note that a later cell reassigns EPOCHS to 30, so
# that value is the one used when the cells are run in order.
EPOCHS = 100
# Number of time steps per sample: sequences are cut to this length, and
# shorter ones are zero-padded and masked.
MAX_SEQ_LENGTH = 20
# Length of the feature vector stored for each time step.
NUM_FEATURES = 512

In [None]:
def get_sequence_model():
    """Build and compile the GRU classifier used for the feature sequences.

    The model takes two inputs: a ``(MAX_SEQ_LENGTH, NUM_FEATURES)`` feature
    sequence and a boolean ``(MAX_SEQ_LENGTH,)`` mask. It outputs a softmax
    distribution with one entry per label in ``label_processor``'s vocabulary.

    Returns:
        The compiled ``keras.Model`` (sparse categorical cross-entropy loss,
        Adam optimizer, accuracy metric).
    """
    num_classes = len(label_processor.get_vocabulary())

    frame_features_input = keras.Input((MAX_SEQ_LENGTH, NUM_FEATURES))
    mask_input = keras.Input((MAX_SEQ_LENGTH,), dtype="bool")

    # The mask is handed to the first GRU so padded time steps are skipped; see
    # https://keras.io/api/layers/recurrent_layers/gru/ for how `mask` is used.
    sequence_states = keras.layers.GRU(16, return_sequences=True)(
        frame_features_input, mask=mask_input
    )
    summary = keras.layers.GRU(8)(sequence_states)
    summary = keras.layers.Dropout(0.4)(summary)
    hidden = keras.layers.Dense(8, activation="relu")(summary)
    class_probabilities = keras.layers.Dense(num_classes, activation="softmax")(hidden)

    rnn_model = keras.Model(
        [frame_features_input, mask_input],
        class_probabilities,
    )
    rnn_model.compile(
        loss="sparse_categorical_crossentropy",
        optimizer="adam",
        metrics=["accuracy"],
    )
    return rnn_model

In [None]:
# Overrides the EPOCHS value set in the hyperparameter cell; when the cells are
# run in order, this is the value read by run_experiment.
EPOCHS = 30

In [None]:
# Define the function to prepare all videos
def prepare_all_videos(df, root_dir):
    """Load the stored feature sequences listed in ``df`` into fixed-size arrays.

    Args:
        df: DataFrame with a ``video_name`` column (path of a .npy feature
            file) and a ``tag`` column (class label).
        root_dir: Directory that relative ``video_name`` entries are resolved
            against. Absolute entries are used as they are, because
            ``os.path.join`` discards ``root_dir`` in that case.

    Returns:
        ``((frame_features, frame_masks), labels)`` where ``frame_features``
        is a float32 array of shape ``(len(df), MAX_SEQ_LENGTH, NUM_FEATURES)``,
        ``frame_masks`` is a boolean array of shape ``(len(df), MAX_SEQ_LENGTH)``
        that is True for time steps holding real data, and ``labels`` holds
        the ``label_processor`` indices of the tags as a column.

    Raises:
        ValueError: If a feature file does not reduce to a 2-D array whose
            second dimension equals ``NUM_FEATURES``.
    """
    num_samples = len(df)
    video_paths = df["video_name"].values.tolist()
    labels = df["tag"].values

    # Convert class labels to label encoding
    labels = label_processor(labels[..., None]).numpy()

    frame_masks = np.zeros(shape=(num_samples, MAX_SEQ_LENGTH), dtype="bool")
    frame_features = np.zeros(shape=(num_samples, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")

    # For each video
    for idx, path in enumerate(video_paths):
        features_path = os.path.join(root_dir, path)
        # NOTE(review): allow_pickle=True lets np.load execute pickled objects.
        # Plain numeric arrays do not need it - consider dropping the flag if
        # every feature file is a regular array.
        features = np.load(features_path, allow_pickle=True)

        # Drop singleton axes, but keep a frame axis: a file holding a single
        # frame squeezes down to 1-D and must be treated as one row.
        features = np.atleast_2d(np.squeeze(features))
        if features.ndim != 2 or features.shape[1] != NUM_FEATURES:
            raise ValueError(
                f"Expected features of shape (frames, {NUM_FEATURES}) in "
                f"{features_path}, got {features.shape}"
            )

        # NOTE(review): the fusion cell concatenates the three views along
        # axis 0. If that axis is the frame axis, keeping only the first
        # MAX_SEQ_LENGTH rows may drop the later views - confirm the stored shapes.
        num_frames = min(MAX_SEQ_LENGTH, features.shape[0])
        frame_features[idx, :num_frames, :] = features[:num_frames, :]
        frame_masks[idx, :num_frames] = True  # True = real data, False = padding

    return (frame_features, frame_masks), labels


# Define the function to run the experiment
def run_experiment(train_data, train_labels, test_data, test_labels):
    """Train a fresh sequence model, restore its best weights and evaluate it.

    Args:
        train_data: ``(frame_features, frame_masks)`` pair for training.
        train_labels: Integer labels aligned with ``train_data``.
        test_data: ``(frame_features, frame_masks)`` pair for evaluation.
        test_labels: Integer labels aligned with ``test_data``.

    Returns:
        ``(history, seq_model)``: the Keras ``History`` of the fit and the
        model with the best (lowest validation loss) weights loaded.
    """
    # Keras 3 only accepts weights-only checkpoints whose name ends in
    # ".weights.h5"; older tf.keras versions accept the same name as HDF5.
    filepath = "./tmp/video_classifier/ckpt.weights.h5"
    os.makedirs(os.path.dirname(filepath), exist_ok=True)
    checkpoint = keras.callbacks.ModelCheckpoint(
        filepath, save_weights_only=True, save_best_only=True, verbose=1
    )

    # `validation_split` takes the LAST fraction of the samples, before any
    # shuffling. The manifest in this notebook is built class folder by class
    # folder, so without this permutation the validation set would contain
    # only the last class(es). A fixed seed keeps the split reproducible.
    order = np.random.default_rng(seed=42).permutation(len(train_labels))
    shuffled_features = np.asarray(train_data[0])[order]
    shuffled_masks = np.asarray(train_data[1])[order]
    shuffled_labels = np.asarray(train_labels)[order]

    seq_model = get_sequence_model()
    history = seq_model.fit(
        [shuffled_features, shuffled_masks],
        shuffled_labels,
        validation_split=0.3,
        epochs=EPOCHS,
        callbacks=[checkpoint],
    )

    # Evaluate with the best checkpoint rather than the last-epoch weights
    seq_model.load_weights(filepath)
    _, accuracy = seq_model.evaluate([test_data[0], test_data[1]], test_labels)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

    return history, seq_model


# Turn both manifests into padded feature/mask arrays plus integer labels
train_features_root = "/content/drive/MyDrive/vgg_train_features_concatenated"
test_features_root = "/content/drive/MyDrive/vgg_test_features_concatenated"
train_data, train_labels = prepare_all_videos(train_df, train_features_root)
test_data, test_labels = prepare_all_videos(test_df, test_features_root)

# Train and evaluate; the fitted model is kept and the training history is discarded
_, sequence_model = run_experiment(train_data, train_labels, test_data, test_labels)
