In [19]:
import numpy as np
import pandas as pd

# Locations of the metadata table and of one example feature array.
data_dir = '../dataset'
feature_mapping_file = f'{data_dir}/meta/idx_to_feature_name.csv'
example_feature_file = f'{data_dir}/scenes/npy/9_speech_true_Radio_aus.npy'

# Feature mapping table (presumably feature index -> feature name, judging
# by the file name); preview its first rows.
feature_mapping = pd.read_csv(feature_mapping_file)
print(feature_mapping.head())

# One example feature array: report its shape, then its contents.
features = np.load(example_feature_file)
print(f'Feature shape: {features.shape}')
print(features)


In [47]:
import os
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tqdm import tqdm
import logging
import matplotlib.pyplot as plt
from sklearn.utils import class_weight

# Configure root logging once: timestamp, level and message on every record.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Dataset location and the file the best model is checkpointed to.
data_dir = '../dataset'
annotations_file = f'{data_dir}/development_scene_annotations.csv'
model_save_path = 'best_command_model.keras'

# Read the annotation table (one row per annotated file).
logging.info('Loading annotations...')
annotations = pd.read_csv(annotations_file)
logging.info('Annotations loaded.')

# Bar chart of how often each command occurs, to reveal class imbalance.
class_counts = annotations['command'].value_counts()
fig, ax = plt.subplots(figsize=(12, 6))
class_counts.plot(kind='bar', ax=ax)
ax.set_title('Class Distribution')
ax.set_xlabel('Commands')
ax.set_ylabel('Count')
plt.show()

def prepare_feature_data(annotations, data_dir, feature_dir):
    """Load every annotated feature file and assemble padded arrays.

    Args:
        annotations: DataFrame with a 'filename' column (file stem, without
            the '.npy' suffix) and a 'command' column (class text).
        data_dir: Unused; kept so existing callers keep working.
        feature_dir: Directory holding the '<filename>.npy' feature files.

    Returns:
        Tuple ``(features, labels, command_mapping)``:
        * features: array stacking all files, each zero-padded at the end of
          axis 1 to the longest axis-1 length seen.
        * labels: integer class id per row of ``annotations``.
        * command_mapping: dict from command text to class id, numbered in
          order of first appearance.
    """
    command_mapping = {}
    command_features = []
    command_labels = []

    logging.info('Preparing command data...')
    rows = zip(annotations['filename'], annotations['command'])
    for filename, command_text in tqdm(rows, total=annotations.shape[0]):
        command_features.append(np.load(os.path.join(feature_dir, filename + '.npy')))
        # A new command receives the next free id; a known one keeps its id.
        command_labels.append(command_mapping.setdefault(command_text, len(command_mapping)))

    # Longest axis-1 extent across all files (0 when nothing was loaded).
    max_len = max((feature.shape[1] for feature in command_features), default=0)

    # Right-pad the shorter arrays with zeros so everything stacks into one array.
    padded_features = [
        np.pad(feature, ((0, 0), (0, max_len - feature.shape[1])), mode='constant')
        if feature.shape[1] < max_len
        else feature
        for feature in command_features
    ]

    logging.info('Command data prepared.')
    return np.array(padded_features), np.array(command_labels), command_mapping

# Load and pad every annotated feature file.
feature_dir = f'{data_dir}/scenes/npy'
command_features, command_labels, command_mapping = prepare_feature_data(annotations, data_dir, feature_dir)

# Standardize per index of axis 1: statistics are pooled over samples
# (axis 0) and over axis 2, which includes the zero padding.
feature_mean = np.mean(command_features, axis=(0, 2), keepdims=True)
feature_std = np.std(command_features, axis=(0, 2), keepdims=True)
command_features = (command_features - feature_mean) / feature_std

# Turn integer class ids into one-hot rows for categorical cross-entropy.
num_classes = len(command_mapping)
command_labels = to_categorical(command_labels, num_classes=num_classes)

logging.info(f'Command mapping: {command_mapping}')

# Data Augmentation Function
def augment_data(features, noise_factor=0.005, clip_range=None, rng=None):
    """Return a copy of ``features`` with additive Gaussian noise.

    The previous implementation always clipped the result to [-1, 1]. The
    features passed in by this notebook are z-score normalized, so values
    beyond one standard deviation are common; clipping them made the
    augmented copies systematically different from the originals (and from
    what the model sees at inference time). Clipping is now opt-in.

    Args:
        features: Array of any shape.
        noise_factor: Standard deviation of the added zero-mean noise.
        clip_range: Optional ``(low, high)`` pair. When given, the noisy
            result is clipped to that range; ``None`` (default) disables
            clipping.
        rng: Optional ``numpy.random.Generator`` for reproducible noise.
            When ``None`` the global NumPy random state is used, as before.

    Returns:
        Array with the same shape as ``features``.
    """
    features = np.asarray(features)
    if rng is None:
        noise = np.random.randn(*features.shape)
    else:
        noise = rng.standard_normal(features.shape)

    augmented_features = features + noise * noise_factor

    if clip_range is not None:
        lower, upper = clip_range
        augmented_features = np.clip(augmented_features, lower, upper)
    return augmented_features

# Build a noisy copy of the whole dataset and append it to the originals;
# the labels are simply repeated in the same order.
augmented_features = augment_data(command_features)
combined_features = np.concatenate([command_features, augmented_features], axis=0)
combined_labels = np.concatenate([command_labels, command_labels], axis=0)

# 'balanced' class weights derived from the (non-augmented) label frequencies.
label_indices = np.argmax(command_labels, axis=1)
class_weights = class_weight.compute_class_weight(
    class_weight='balanced',
    classes=np.unique(label_indices),
    y=label_indices,
)
# Keras expects a {class_index: weight} dictionary.
class_weights = {index: weight for index, weight in enumerate(class_weights)}

# GRU with Attention Model
def attention_block(inputs):
    """Re-weight a sequence tensor with softmax attention weights.

    A single-unit tanh Dense layer scores each position along axis 1, the
    scores are flattened and passed through a softmax, and the resulting
    weights are repeated across the last axis so they can be multiplied
    element-wise with ``inputs``.

    Args:
        inputs: Keras tensor; in this notebook it is the output of a GRU
            with ``return_sequences=True``.

    Returns:
        Keras tensor produced by multiplying ``inputs`` with the weights.
    """
    channel_count = inputs.shape[-1]

    scores = layers.Dense(1, activation='tanh')(inputs)
    scores = layers.Flatten()(scores)
    weights = layers.Activation('softmax')(scores)

    # Repeat the weights once per channel, then swap the last two axes so the
    # weights line up with ``inputs`` for the element-wise product.
    weights = layers.RepeatVector(channel_count)(weights)
    weights = layers.Permute([2, 1])(weights)

    return layers.Multiply()([inputs, weights])

def build_gru_attention_model(input_shape, num_classes):
    """Build and compile the GRU + attention classifier.

    Architecture: two stacked GRU(128) layers that return full sequences,
    each followed by Dropout(0.5); the attention block; Flatten; a
    256-unit ReLU Dense layer with L2(0.01) kernel regularization and
    Dropout(0.5); and a softmax output layer.

    Args:
        input_shape: Shape of one sample (without the batch axis).
        num_classes: Number of output classes.

    Returns:
        A Keras model compiled with Adam (learning rate 0.001), categorical
        cross-entropy loss and the accuracy metric.
    """
    inputs = layers.Input(shape=input_shape)

    hidden = inputs
    for _ in range(2):
        hidden = layers.GRU(128, return_sequences=True)(hidden)
        hidden = layers.Dropout(0.5)(hidden)

    hidden = attention_block(hidden)
    hidden = layers.Flatten()(hidden)
    hidden = layers.Dense(256, activation='relu', kernel_regularizer=regularizers.l2(0.01))(hidden)
    hidden = layers.Dropout(0.5)(hidden)
    outputs = layers.Dense(num_classes, activation='softmax')(hidden)

    model = models.Model(inputs, outputs)
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
    return model

# NOTE(review): each file is padded along its axis 1, which suggests the
# arrays are laid out as (feature, time). The GRU iterates over axis 1 of the
# batch, i.e. over the first of those two axes - confirm this is intended.
input_shape = (command_features.shape[1], command_features.shape[2])
command_model = build_gru_attention_model(input_shape, num_classes)

# Hold out the validation samples BEFORE augmenting.
# Keras' `validation_split` takes the *last* fraction of the arrays without
# shuffling. With originals followed by their noisy copies, that tail
# consisted solely of near-duplicates of training samples, so val_loss (and
# therefore early stopping and checkpointing) was measured on leaked data.
validation_fraction = 0.2
split_rng = np.random.default_rng(42)  # fixed seed -> reproducible split
shuffled_indices = split_rng.permutation(len(command_features))
num_validation = max(1, int(validation_fraction * len(shuffled_indices)))
val_indices = shuffled_indices[:num_validation]
train_indices = shuffled_indices[num_validation:]

val_features = command_features[val_indices]
val_labels = command_labels[val_indices]

# Only the training portion is augmented; validation stays untouched.
train_features = command_features[train_indices]
train_labels = command_labels[train_indices]
train_features = np.concatenate((train_features, augment_data(train_features)), axis=0)
train_labels = np.concatenate((train_labels, train_labels), axis=0)

logging.info('Training command recognition model...')
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
model_checkpoint = ModelCheckpoint(model_save_path, save_best_only=True, monitor='val_loss')

history = command_model.fit(train_features, train_labels, epochs=50, batch_size=32,
                            validation_data=(val_features, val_labels),
                            callbacks=[early_stopping, model_checkpoint], class_weight=class_weights)
logging.info('Command recognition model trained.')

# Plot training & validation accuracy values
fig, ax = plt.subplots()
ax.plot(history.history['accuracy'])
ax.plot(history.history['val_accuracy'])
ax.set_title('Model accuracy')
ax.set_xlabel('Epoch')
ax.set_ylabel('Accuracy')
ax.legend(['Train', 'Validation'], loc='upper left')
plt.show()

# Plot training & validation loss values
fig, ax = plt.subplots()
ax.plot(history.history['loss'])
ax.plot(history.history['val_loss'])
ax.set_title('Model loss')
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.legend(['Train', 'Validation'], loc='upper left')
plt.show()


In [50]:
import os
import numpy as np
import tensorflow as tf
import logging
import pandas as pd
import random
from tqdm import tqdm

# Configure root logging: timestamp, level and message on every record.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Same dataset location and checkpoint file as the training cell.
data_dir = '../dataset'
model_save_path = 'best_command_model.keras'
annotations_file = f'{data_dir}/development_scene_annotations.csv'

# The annotations are needed to rebuild the command <-> label mapping.
logging.info('Loading annotations...')
annotations = pd.read_csv(annotations_file)
logging.info('Annotations loaded.')

# Number the commands in order of first appearance in the annotation table.
command_mapping = {
    command: label
    for label, command in enumerate(annotations['command'].unique())
}
current_label = len(command_mapping)

# Inverse lookup: numeric label -> command text.
reverse_command_mapping = dict(zip(command_mapping.values(), command_mapping.keys()))

# Restore the checkpoint written during training.
logging.info('Loading the best model...')
command_model = tf.keras.models.load_model(model_save_path)
logging.info('Model loaded.')


# Function to prepare feature data (from the training script)
def prepare_feature_data(annotations, data_dir, feature_dir):
    """Load all annotated feature files, pad them, and report the pad length.

    Args:
        annotations: DataFrame with 'filename' (stem without '.npy') and
            'command' columns.
        data_dir: Unused; retained for signature compatibility.
        feature_dir: Directory containing the '<filename>.npy' files.

    Returns:
        Tuple ``(features, labels, command_mapping, max_len)`` where
        ``features`` stacks every file zero-padded along axis 1 to
        ``max_len``, ``labels`` holds one integer class id per row,
        ``command_mapping`` maps command text to class id (first-appearance
        order) and ``max_len`` is the largest axis-1 length encountered.
    """
    def _pad_to(array, target_len):
        # Zero-pad the end of axis 1; arrays already long enough pass through.
        missing = target_len - array.shape[1]
        if missing <= 0:
            return array
        return np.pad(array, ((0, 0), (0, missing)), mode='constant')

    loaded_arrays = []
    labels = []
    command_mapping = {}
    max_len = 0

    logging.info('Preparing command data...')
    for _, row in tqdm(annotations.iterrows(), total=len(annotations)):
        array = np.load(os.path.join(feature_dir, row['filename'] + '.npy'))
        if array.shape[1] > max_len:
            max_len = array.shape[1]

        command_text = row['command']
        label = command_mapping.get(command_text)
        if label is None:
            # First time this command is seen: give it the next free id.
            label = len(command_mapping)
            command_mapping[command_text] = label

        loaded_arrays.append(array)
        labels.append(label)

    padded_arrays = [_pad_to(array, max_len) for array in loaded_arrays]

    logging.info('Command data prepared.')
    return np.array(padded_arrays), np.array(labels), command_mapping, max_len


# Recompute the normalization statistics and the padding length from the
# annotated feature files, so new inputs are scaled with the same formula
# the training cell used.
feature_dir = f'{data_dir}/scenes/npy'
annotations = pd.read_csv(annotations_file)
prepared = prepare_feature_data(annotations, data_dir, feature_dir)
command_features, max_len = prepared[0], prepared[3]

# Statistics pooled over samples (axis 0) and axis 2, one value per axis-1 index.
mean = command_features.mean(axis=(0, 2), keepdims=True)
std = command_features.std(axis=(0, 2), keepdims=True)


# Function to prepare new data for prediction
def prepare_new_data(new_feature_file, mean, std, max_len):
    """Load one feature file and shape it like a single-sample model batch.

    The array is brought to exactly ``max_len`` columns along axis 1
    (zero-padded when shorter, truncated when longer), normalized with the
    supplied statistics, and returned with a leading batch axis.

    Previously a file longer than ``max_len`` made ``np.pad`` raise because
    of the negative pad width, and the batch axis only existed as a side
    effect of broadcasting against a ``(1, F, 1)``-shaped ``mean``.

    Args:
        new_feature_file: Path to a 2-D ``.npy`` feature array.
        mean: Normalization mean, broadcastable against the loaded array
            (the notebook passes shape ``(1, F, 1)``).
        std: Normalization standard deviation, same shape rules as ``mean``.
        max_len: Axis-1 length the model was trained with.

    Returns:
        Normalized array of shape ``(1, F, max_len)``.
    """
    new_features = np.load(new_feature_file)

    current_len = new_features.shape[1]
    if current_len > max_len:
        # Longer than anything seen in training: keep the leading part.
        new_features = new_features[:, :max_len]
    elif current_len < max_len:
        new_features = np.pad(new_features, ((0, 0), (0, max_len - current_len)), mode='constant')

    new_features = (new_features - mean) / std

    # Guarantee the batch axis even when mean/std did not broadcast one in.
    if new_features.ndim == 2:
        new_features = new_features[np.newaxis, ...]
    return new_features


# Function to predict and assess accuracy
def predict_command(file_path):
    """Predict the command for one feature file.

    Relies on the module-level ``mean``, ``std``, ``max_len``,
    ``command_model`` and ``reverse_command_mapping``.

    Args:
        file_path: Path to the ``.npy`` feature file to classify.

    Returns:
        Tuple ``(predicted_command_text, confidence)`` where ``confidence``
        is the largest value in the model's output for this file.
    """
    model_input = prepare_new_data(file_path, mean, std, max_len)
    scores = command_model.predict(model_input)
    best_label = np.argmax(scores)
    return reverse_command_mapping[best_label], np.max(scores)


# Evaluate the model on a random sample of annotated files.
# NOTE(review): the sample is drawn from the same annotation file the model
# was trained on, so the reported accuracy is not a held-out estimate.
num_files_to_evaluate = 100
evaluation_seed = 42  # fixed seed -> the same files are sampled on every run

unique_files = list(annotations['filename'].unique())
# random.sample raises ValueError when asked for more items than exist.
sample_size = min(num_files_to_evaluate, len(unique_files))
random_files = random.Random(evaluation_seed).sample(unique_files, sample_size)

# One lookup table instead of filtering the whole frame for every file; the
# first row per filename wins, matching the previous `.values[0]` behavior.
actual_command_by_file = (
    annotations.drop_duplicates(subset='filename')
    .set_index('filename')['command']
    .to_dict()
)

correct_predictions = 0
total_predictions = 0

for file_name in random_files:
    file_path = os.path.join(feature_dir, f"{file_name}.npy")
    actual_command = actual_command_by_file[file_name]
    predicted_command_text, confidence = predict_command(file_path)

    logging.info(f"File: {file_name}")
    logging.info(f"Actual Command: {actual_command}")
    logging.info(f"Predicted Command: {predicted_command_text}")
    logging.info(f"Confidence: {confidence:.2f}")

    if actual_command == predicted_command_text:
        correct_predictions += 1
    total_predictions += 1

# Guard against an empty annotation table (nothing sampled).
accuracy = correct_predictions / total_predictions if total_predictions else 0.0
logging.info(f"Overall Accuracy: {accuracy:.2%}")
