Step 1: Extract the audio tracks from the video clips into a separate folder (Audio)

In [None]:
from moviepy.editor import VideoFileClip
import os

def extract_audio_from_folder(input_folder, output_folder):
    """Convert every ``.mp4`` directly inside *input_folder* to a ``.wav``.

    The output folder is created when missing. Each audio file keeps the
    video's base name and is written by ``extract_audio``. Entries whose
    name does not end in ``.mp4`` are ignored.

    Args:
        input_folder: Directory that is listed for ``.mp4`` files.
        output_folder: Directory that receives the ``.wav`` files.
    """
    os.makedirs(output_folder, exist_ok=True)

    # Only names ending in '.mp4' (case-sensitive) are treated as videos.
    mp4_names = (name for name in os.listdir(input_folder) if name.endswith('.mp4'))

    for name in mp4_names:
        stem = os.path.splitext(name)[0]
        source_path = os.path.join(input_folder, name)
        target_path = os.path.join(output_folder, f'{stem}.wav')
        extract_audio(source_path, target_path)

def extract_audio(input_video_path, output_audio_path):
    """Write the audio track of one video file as 16-bit PCM audio.

    Args:
        input_video_path: Path of the video to open with ``VideoFileClip``.
        output_audio_path: Destination path for the extracted audio.

    Raises:
        ValueError: If the opened clip exposes no audio track
            (``video_clip.audio`` is ``None``).
    """
    video_clip = VideoFileClip(input_video_path)
    try:
        audio_clip = video_clip.audio
        if audio_clip is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise ValueError(f'No audio track found in video: {input_video_path}')

        # Keep the clip's own sampling rate rather than resampling.
        audio_clip.write_audiofile(output_audio_path, codec='pcm_s16le', fps=audio_clip.fps)
    finally:
        # Always release the clip, even when writing fails.
        video_clip.close()

# Example usage: runs as soon as this cell executes.
# Videos are read from 'Single_Actor_01'; the extracted .wav files are
# written to 'Single_Audio_01' (created if it does not exist).
input_video_folder = 'Single_Actor_01'
output_audio_folder = 'Single_Audio_01'

extract_audio_from_folder(input_video_folder, output_audio_folder)


Step 2: Generate normalized spectrograms from the audio files in the Audio folder

In [None]:
import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
import os

def generate_spectrogram(audio_file, fft_size=256, hop_size=10, window_size=32, num_parts=6):
    """Compute a dB-scaled mel spectrogram and cut it into equal-width parts.

    At most the first 4 seconds of *audio_file* are loaded, at the file's
    native sampling rate. The spectrogram uses a Hamming window and 256 mel
    bands and is converted to decibels relative to its own maximum.

    Args:
        audio_file: Path of the audio file to load.
        fft_size: Minimum FFT length in samples (see note below).
        hop_size: Hop between frames, in milliseconds.
        window_size: Analysis window length, in milliseconds.
        num_parts: Number of consecutive time slices to return.

    Returns:
        A list of ``num_parts`` 2-D arrays, each holding
        ``total_frames // num_parts`` frames. Trailing frames that do not
        fill a whole part are dropped.
    """
    signal, sample_rate = librosa.load(audio_file, sr=None, duration=4)

    # Millisecond settings expressed in samples at the native rate.
    hop_samples = int(sample_rate * hop_size / 1000)
    window_samples = int(sample_rate * window_size / 1000)

    # NOTE(review): the FFT length is raised to the number of loaded samples
    # whenever the clip is longer than fft_size, so n_fft normally equals
    # the whole clip length. Kept as-is; changing it alters the output.
    fft_length = max(fft_size, len(signal))

    mel_power = librosa.feature.melspectrogram(
        y=signal,
        sr=sample_rate,
        n_fft=fft_length,
        hop_length=hop_samples,
        win_length=window_samples,
        window="hamming",
        n_mels=256,
    )
    mel_db = librosa.power_to_db(mel_power, ref=np.max)

    # Slice along the time axis into num_parts blocks of equal width.
    frames_per_part = mel_db.shape[1] // num_parts
    parts = []
    for index in range(num_parts):
        start = index * frames_per_part
        stop = (index + 1) * frames_per_part
        parts.append(mel_db[:, start:stop])
    return parts

def normalize_sequences(sequences):
    """Standardise 2-D sequences row-wise using statistics pooled over all of them.

    All sequences are concatenated along axis 1; the per-row mean and
    standard deviation of that concatenation are then used to normalise
    every sequence as ``(seq - mean) / std``.

    Rows whose pooled standard deviation is exactly zero (constant rows)
    are divided by 1 instead, so they normalise to 0 rather than NaN/inf.

    Args:
        sequences: Sequence of 2-D arrays sharing the same number of rows.

    Returns:
        A list with one normalised array per input sequence; an empty list
        when *sequences* is empty.
    """
    if len(sequences) == 0:
        # np.concatenate cannot handle an empty list of arrays.
        return []

    # Pool every column of every sequence to compute shared statistics.
    flat_sequences = np.concatenate(sequences, axis=1)

    mean = np.mean(flat_sequences, axis=1, keepdims=True)
    std = np.std(flat_sequences, axis=1, keepdims=True)

    # Guard against division by zero for constant rows.
    safe_std = np.where(std == 0, np.ones_like(std), std)

    return [(seq - mean) / safe_std for seq in sequences]
    
def save_normalized_spectrogram_images(audio_file_path, audio_file, normalized_parts, output_folder, hop_size=10):
    """Render each spectrogram part as an axis-free PNG in *output_folder*.

    Images are named ``<audio base name>-0<k>.png`` with ``k`` starting at 1.

    Args:
        audio_file_path: Path of the source audio; only its sampling rate
            is read.
        audio_file: File name of the audio; its base name prefixes the images.
        normalized_parts: Iterable of 2-D arrays to plot, one image each.
        output_folder: Directory that receives the PNG files (created if
            missing).
        hop_size: Hop between spectrogram frames in milliseconds, passed to
            the plot as a hop length in samples. Defaults to 10.
    """
    os.makedirs(output_folder, exist_ok=True)

    # Only the sampling rate is needed here, so read it from the file
    # header instead of decoding the audio a second time.
    sr = librosa.get_samplerate(audio_file_path)
    hop_length = int(sr * hop_size / 1000)
    base_name = os.path.splitext(audio_file)[0]

    for i, part in enumerate(normalized_parts):
        plt.figure(figsize=(6, 4))
        try:
            # Plot the normalized spectrogram without labels or axes.
            librosa.display.specshow(part, sr=sr, hop_length=hop_length, x_axis=None, y_axis=None)
            plt.axis('off')

            image_path = os.path.join(output_folder, f'{base_name}-0{i+1}.png')
            plt.savefig(image_path, bbox_inches='tight', pad_inches=0)
        finally:
            # Release the figure even if plotting or saving fails.
            plt.close()

def save_normalized_spectrogram_images_from_folder(input_folder, output_folder):
    """Create normalized spectrogram images for every ``.wav`` in a folder.

    For each file the spectrogram parts are generated, normalized and then
    saved as images in *output_folder*, which is created when missing.
    Entries whose name does not end in ``.wav`` are skipped.

    Args:
        input_folder: Directory that is listed for ``.wav`` files.
        output_folder: Directory that receives the spectrogram images.
    """
    os.makedirs(output_folder, exist_ok=True)

    for entry in os.listdir(input_folder):
        if not entry.endswith('.wav'):
            continue

        wav_path = os.path.join(input_folder, entry)
        normalized = normalize_sequences(generate_spectrogram(wav_path))
        save_normalized_spectrogram_images(wav_path, entry, normalized, output_folder)
            

# Example usage: runs as soon as this cell executes.
# Reads the .wav files from 'Single_Audio_01' and writes the spectrogram
# images to 'Single_Spectogram_01'. The folder name is a runtime path that
# later cells reference with the same spelling, so it is left unchanged.
input_audio_folder = 'Single_Audio_01'
output_spectrogram_folder = 'Single_Spectogram_01'

save_normalized_spectrogram_images_from_folder(input_audio_folder, output_spectrogram_folder)

Step 3: Extract ResNet-18 features and apply spatial average pooling afterwards

In [None]:
import torch
import torchvision.transforms as transforms
from PIL import Image
import torchvision.models as models
import torch.nn as nn
import os
import warnings

# # Suppress all warnings
# warnings.filterwarnings("ignore")
# resnet18 = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=True)
# warnings.resetwarnings()
# # Remove global average pooling layer
# resnet18 = nn.Sequential(*list(resnet18.children())[:-1])


# Load the pretrained ResNet-18 model
resnet18 = models.resnet18(pretrained=True)
print(resnet18)

# Keep only the convolutional trunk: drop the last two children
# (global average pooling and the fully connected classifier).
features = list(resnet18.children())[:-2]

# Headless model that returns the final convolutional feature map.
resnet18_without_top = nn.Sequential(*features)
print(resnet18_without_top)

# Inference only: put the layers (notably BatchNorm) into evaluation mode.
resnet18_without_top.eval()

# Define a transformation to preprocess the input image
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Lambda(lambda x: x[:3, :, :]),  # Remove alpha channel if present
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Folder containing images
image_folder = 'Single_Spectogram_01'

# List to store individual outputs
All_audio_outputs = []

# os.listdir returns names in arbitrary order; sort them so the parts
# ("...-01.png", "...-02.png", ...) are stacked in a reproducible order.
for filename in sorted(os.listdir(image_folder)):
    if filename.endswith('.png'):
        image_path = os.path.join(image_folder, filename)

        # Load and preprocess the image; the context manager closes the file.
        with Image.open(image_path) as image:
            input_tensor = transform(image)
        input_batch = input_tensor.unsqueeze(0)  # Add batch dimension

        # Perform inference with the headless model built above (the full
        # classifier `resnet18` would return class logits instead), then
        # average over the two spatial axes of the feature map.
        with torch.no_grad():
            feature_map = resnet18_without_top(input_batch)
            audio_output = feature_map.mean(dim=(2, 3))

        # Append the pooled features to the list
        All_audio_outputs.append(audio_output)

# NOTE: torch.stack raises if no .png file was found in image_folder.
All_audio_outputs = torch.stack(All_audio_outputs)
print(All_audio_outputs.shape)

In [None]:
from functools import partial
from typing import Any, Callable, List, Optional, Type, Union

import torch
import torch.nn as nn
from torch import Tensor

# from ..transforms._presets import ImageClassification
# from ..utils import _log_api_usage_once
# from ._api import register_model, Weights, WeightsEnum
# from ._meta import _IMAGENET_CATEGORIES
# from ._utils import _ovewrite_named_param, handle_legacy_interface

def conv3x3(in_planes: int, out_planes: int, stride: int = 1, groups: int = 1, dilation: int = 1) -> nn.Conv2d:
    """Build a bias-free 3x3 convolution whose padding equals its dilation.

    Args:
        in_planes: Number of input channels.
        out_planes: Number of output channels.
        stride: Convolution stride.
        groups: Number of channel groups.
        dilation: Kernel dilation; also used as the padding amount.

    Returns:
        The configured ``nn.Conv2d`` layer.
    """
    conv_options = dict(
        kernel_size=3,
        stride=stride,
        padding=dilation,  # padding tracks dilation
        groups=groups,
        bias=False,
        dilation=dilation,
    )
    return nn.Conv2d(in_planes, out_planes, **conv_options)


def conv1x1(in_planes: int, out_planes: int, stride: int = 1) -> nn.Conv2d:
    """Build a bias-free 1x1 (pointwise) convolution.

    Args:
        in_planes: Number of input channels.
        out_planes: Number of output channels.
        stride: Convolution stride.

    Returns:
        The configured ``nn.Conv2d`` layer.
    """
    pointwise = nn.Conv2d(
        in_planes,
        out_planes,
        kernel_size=1,
        stride=stride,
        bias=False,
    )
    return pointwise


class BasicBlock(nn.Module):
    """Residual block made of two 3x3 convolutions with a shortcut connection.

    The main branch is conv -> norm -> ReLU -> conv -> norm. The shortcut is
    the input itself, or ``downsample(input)`` when a downsample module is
    given. Both are summed and passed through a final ReLU.
    """

    # Ratio between the block's output channels and ``planes``.
    expansion: int = 1

    def __init__(
        self,
        inplanes: int,
        planes: int,
        stride: int = 1,
        downsample: Optional[nn.Module] = None,
        groups: int = 1,
        base_width: int = 64,
        dilation: int = 1,
        norm_layer: Optional[Callable[..., nn.Module]] = None,
    ) -> None:
        """Create the block's layers.

        Args:
            inplanes: Number of input channels.
            planes: Number of channels produced by both convolutions.
            stride: Stride of the first convolution.
            downsample: Optional module applied to the input on the shortcut.
            groups: Must be 1.
            base_width: Must be 64.
            dilation: Must not exceed 1.
            norm_layer: Factory for normalisation layers; defaults to
                ``nn.BatchNorm2d``.

        Raises:
            ValueError: If ``groups`` is not 1 or ``base_width`` is not 64.
            NotImplementedError: If ``dilation`` is greater than 1.
        """
        super().__init__()
        make_norm = nn.BatchNorm2d if norm_layer is None else norm_layer
        if not (groups == 1 and base_width == 64):
            raise ValueError("BasicBlock only supports groups=1 and base_width=64")
        if dilation > 1:
            raise NotImplementedError("Dilation > 1 not supported in BasicBlock")

        # When stride != 1, conv1 and the downsample module both reduce the
        # spatial size of the input.
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = make_norm(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = make_norm(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x: Tensor) -> Tensor:
        """Apply the residual block to *x* and return the activated sum."""
        # Main branch: conv1 -> bn1 -> relu -> conv2 -> bn2.
        residual = self.bn2(self.conv2(self.relu(self.bn1(self.conv1(x)))))

        # Shortcut branch: identity unless a downsample module is set.
        shortcut = x if self.downsample is None else self.downsample(x)

        residual += shortcut
        return self.relu(residual)

class SpatialAveragePooling(nn.Module):
    """Average a tensor over its axes 2 and 3.

    The input therefore needs at least four axes; the two averaged axes are
    removed from the result (e.g. a 4-D input yields a 2-D output).
    """

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Return the mean of *x* taken jointly over axes 2 and 3."""
        return x.mean(dim=(2, 3))
    
class ResNet(nn.Module):
    """ResNet backbone whose forward pass stops after ``layer4``.

    The network is a stem (7x7 conv, norm, ReLU, max-pool) followed by four
    residual stages built from ``block``. An adaptive average pool
    (``avgpool``) and a linear classifier (``fc``) are still constructed,
    but ``_forward_impl`` does not apply them: the ``layer4`` feature map is
    returned as-is.

    NOTE(review): ``avgpool``/``fc`` are presumably kept so that a stock
    ResNet state dict loads without unexpected keys -- confirm.
    """

    def __init__(
        self,
        block: Type[BasicBlock],
        layers: List[int],
        num_classes: int = 1000,
        zero_init_residual: bool = False,
        groups: int = 1,
        width_per_group: int = 64,
        replace_stride_with_dilation: Optional[List[bool]] = None,
        norm_layer: Optional[Callable[..., nn.Module]] = None,
    ) -> None:
        """Build the stem, the four residual stages and the (unused) head.

        Args:
            block: Residual block class used for every stage.
            layers: Number of blocks in each of the four stages.
            num_classes: Output size of the ``fc`` layer.
            zero_init_residual: If true, zero-initialise ``bn2.weight`` of
                every ``BasicBlock``.
            groups: Stored and forwarded to each block.
            width_per_group: Stored as ``base_width`` and forwarded to each
                block.
            replace_stride_with_dilation: Three flags, one each for
                ``layer2``..``layer4``; a true flag makes that stage use
                dilation instead of its stride. ``None`` means all false.
            norm_layer: Factory for normalisation layers; defaults to
                ``nn.BatchNorm2d``.

        Raises:
            ValueError: If ``replace_stride_with_dilation`` does not have
                exactly three elements.
        """
        super().__init__()
        #_log_api_usage_once(self)
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        self._norm_layer = norm_layer

        # Running channel count / dilation, updated by _make_layer.
        self.inplanes = 64
        self.dilation = 1
        if replace_stride_with_dilation is None:
            # each element in the tuple indicates if we should replace
            # the 2x2 stride with a dilated convolution instead
            replace_stride_with_dilation = [False, False, False]
        if len(replace_stride_with_dilation) != 3:
            raise ValueError(
                "replace_stride_with_dilation should be None "
                f"or a 3-element tuple, got {replace_stride_with_dilation}"
            )
        self.groups = groups
        self.base_width = width_per_group
        # Stem: 7x7 stride-2 convolution on 3-channel input, then max-pool.
        self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = norm_layer(self.inplanes)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        # Four residual stages with 64, 128, 256 and 512 planes.
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2, dilate=replace_stride_with_dilation[0])
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2, dilate=replace_stride_with_dilation[1])
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2, dilate=replace_stride_with_dilation[2])
        # Classification head: constructed here but not applied in _forward_impl.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        # Weight init: Kaiming-normal for convolutions; norm layers start
        # with weight 1 and bias 0.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

        # Zero-initialize the last BN in each residual branch,
        # so that the residual branch starts with zeros, and each residual block behaves like an identity.
        # This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677
        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, BasicBlock) and m.bn2.weight is not None:
                    nn.init.constant_(m.bn2.weight, 0)  # type: ignore[arg-type]

    def _make_layer(
        self,
        block: Type[Union[BasicBlock]],
        planes: int,
        blocks: int,
        stride: int = 1,
        dilate: bool = False,
    ) -> nn.Sequential:
        """Build one residual stage of ``blocks`` blocks.

        The first block receives ``stride`` and, when the stride is not 1 or
        the channel count changes, a 1x1-conv + norm ``downsample`` module
        for its shortcut. The remaining blocks use the defaults. Updates
        ``self.inplanes`` (and ``self.dilation`` when ``dilate`` is true).

        Args:
            block: Residual block class to instantiate.
            planes: Base channel count of the stage.
            blocks: Number of blocks in the stage.
            stride: Stride of the first block.
            dilate: If true, multiply ``self.dilation`` by ``stride`` and
                use stride 1 instead.

        Returns:
            The stage as an ``nn.Sequential``.
        """
        norm_layer = self._norm_layer
        downsample = None
        # The first block uses the dilation in effect before this stage.
        previous_dilation = self.dilation
        if dilate:
            self.dilation *= stride
            stride = 1
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * block.expansion, stride),
                norm_layer(planes * block.expansion),
            )

        layers = []
        layers.append(
            block(
                self.inplanes, planes, stride, downsample, self.groups, self.base_width, previous_dilation, norm_layer
            )
        )
        # Subsequent blocks consume the first block's output channels.
        self.inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(
                block(
                    self.inplanes,
                    planes,
                    groups=self.groups,
                    base_width=self.base_width,
                    dilation=self.dilation,
                    norm_layer=norm_layer,
                )
            )

        return nn.Sequential(*layers)

    def _forward_impl(self, x: Tensor) -> Tensor:
        """Run the stem and the four residual stages; return layer4's output."""
        # See note [TorchScript super()]
        x = self.conv1(x)

        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)


        x = self.layer1(x)

        x = self.layer2(x)

        x = self.layer3(x)

        x = self.layer4(x)


        # The classification head below is deliberately disabled, so the
        # unpooled layer4 feature map is what callers receive.
        # x = self.avgpool(x)
        # x = torch.flatten(x, 1)
        # x = self.fc(x)

        return x

    def forward(self, x: Tensor) -> Tensor:
        """Return the layer4 feature map for input batch *x*."""
        return self._forward_impl(x)


def _resnet(
    block: Type[BasicBlock],
    layers: List[int],
    **kwargs: Any,
) -> ResNet:
    """Construct a ``ResNet`` from a block class and per-stage block counts.

    No weights are loaded here; the caller is responsible for loading a
    state dict into the returned model.

    Args:
        block: Residual block class passed to ``ResNet``.
        layers: Number of blocks for each stage.
        **kwargs: Extra keyword arguments forwarded to ``ResNet``.

    Returns:
        The freshly initialised model.
    """
    return ResNet(block, layers, **kwargs)






In [34]:
import torch
import torchvision.transforms as transforms
from PIL import Image
import torchvision.models as models
import torch.nn as nn
import os
import math
import warnings

model_path = 'resnet18-f37072fd.pth'
# NOTE(security): torch.load unpickles the file. Only load checkpoints that
# come from a trusted source.
checkpoint = torch.load(model_path, map_location=torch.device('cpu'))
Resnet18 = _resnet(BasicBlock, [2, 2, 2, 2])

Resnet18.load_state_dict(checkpoint)
# Inference only: modules start in training mode, in which BatchNorm
# normalises with per-batch statistics and updates its running averages.
# eval() makes it use the statistics loaded from the checkpoint instead.
Resnet18.eval()

pooling_layer = SpatialAveragePooling()
# Define a transformation to preprocess the input image
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Lambda(lambda x: x[:3, :, :]),  # Remove alpha channel if present
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Folder containing images
image_folder = 'Single_Spectogram_01'

# List to store individual outputs
All_audio_outputs = []

# os.listdir returns names in arbitrary order; sort them so the parts
# ("...-01.png", "...-02.png", ...) are stacked in a reproducible order.
for filename in sorted(os.listdir(image_folder)):
    if filename.endswith('.png'):
        image_path = os.path.join(image_folder, filename)

        # Load and preprocess the image; the context manager closes the file.
        with Image.open(image_path) as image:
            input_tensor = transform(image)
        input_batch = input_tensor.unsqueeze(0)  # Add batch dimension

        # Perform inference: layer4 feature map, then spatial average pooling.
        with torch.no_grad():
            audio_output = Resnet18(input_batch)
            audio_output = pooling_layer(audio_output)

        # Append the output to the list
        All_audio_outputs.append(audio_output)

# NOTE: torch.stack raises if no .png file was found in image_folder.
All_audio_outputs = torch.stack(All_audio_outputs)
# Drop the per-image batch axis of size 1 that stacking left at position 1.
All_audio_outputs = All_audio_outputs.squeeze(1)
# print(All_audio_outputs.shape)

After conv1
torch.Size([1, 64, 112, 112])
After Maxpool
torch.Size([1, 64, 56, 56])
After l1
torch.Size([1, 64, 56, 56])
After l2
torch.Size([1, 128, 28, 28])
After l3
torch.Size([1, 256, 14, 14])
After l4
torch.Size([1, 512, 7, 7])
After conv1
torch.Size([1, 64, 112, 112])
After Maxpool
torch.Size([1, 64, 56, 56])
After l1
torch.Size([1, 64, 56, 56])
After l2
torch.Size([1, 128, 28, 28])
After l3
torch.Size([1, 256, 14, 14])
After l4
torch.Size([1, 512, 7, 7])
After conv1
torch.Size([1, 64, 112, 112])
After Maxpool
torch.Size([1, 64, 56, 56])
After l1
torch.Size([1, 64, 56, 56])
After l2
torch.Size([1, 128, 28, 28])
After l3
torch.Size([1, 256, 14, 14])
After l4
torch.Size([1, 512, 7, 7])
After conv1
torch.Size([1, 64, 112, 112])
After Maxpool
torch.Size([1, 64, 56, 56])
After l1
torch.Size([1, 64, 56, 56])
After l2
torch.Size([1, 128, 28, 28])
After l3
torch.Size([1, 256, 14, 14])
After l4
torch.Size([1, 512, 7, 7])
After conv1
torch.Size([1, 64, 112, 112])
After Maxpool
torch.Size([

In [36]:
class Audio_TemporalMultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention without bias terms.

    Queries, keys and values are all linear projections of the same input.
    The input's last axis (size ``m``) is split into ``H`` heads of size
    ``m // H``; attention is computed per head over axis 1 of the input,
    and the heads are merged and projected by ``w_o``.

    After each forward pass the softmax weights are available as
    ``self.attention_scores``.
    """

    def __init__(self, m: int, H: int) -> None:
        """Create the four projection layers.

        Args:
            m: Feature (model) dimension of the input.
            H: Number of attention heads; must be positive and divide ``m``.

        Raises:
            AssertionError: If ``H`` is not positive or ``m`` is not
                divisible by ``H``.
        """
        super().__init__()
        self.H = H
        self.m = m

        # m is split evenly across the heads, so it must be a multiple of H.
        assert H > 0, "number of heads H must be positive"
        assert m % H == 0, "model dimension m must be divisible by the number of heads H"

        self.dim_head = m // H
        self.w_q = nn.Linear(m, m, bias=False)
        self.w_k = nn.Linear(m, m, bias=False)
        self.w_v = nn.Linear(m, m, bias=False)
        self.w_o = nn.Linear(m, m, bias=False)

    @staticmethod
    def attention(query, key, value):
        """Compute scaled dot-product attention.

        Args:
            query, key, value: Tensors whose last two axes are
                ``(seq_len, dim_head)``.

        Returns:
            A tuple ``(output, weights)``: the attended values and the
            softmax attention weights (useful for visualisation).
        """
        dim_head = query.shape[-1]
        # (..., seq_len, dim_head) @ (..., dim_head, seq_len) --> (..., seq_len, seq_len)
        attention_scores = (query @ key.transpose(-2, -1)) / math.sqrt(dim_head)
        # Normalise over the key axis so every query's weights sum to 1.
        attention_scores = attention_scores.softmax(dim=-1)

        # (..., seq_len, seq_len) @ (..., seq_len, dim_head) --> (..., seq_len, dim_head)
        return (attention_scores @ value), attention_scores

    def forward(self, q):
        """Apply self-attention to *q*.

        Args:
            q: Tensor of shape ``(batch, seq_len, m)``.

        Returns:
            Tensor of shape ``(batch, seq_len, m)``.
        """
        batch_size, seq_len = q.shape[0], q.shape[1]

        # (batch, seq_len, m) --> (batch, seq_len, m)
        query = self.w_q(q)
        key = self.w_k(q)
        value = self.w_v(q)

        # (batch, seq_len, m) --> (batch, seq_len, H, dim_head) --> (batch, H, seq_len, dim_head)
        query = query.view(batch_size, seq_len, self.H, self.dim_head).transpose(1, 2)
        key = key.view(batch_size, seq_len, self.H, self.dim_head).transpose(1, 2)
        value = value.view(batch_size, seq_len, self.H, self.dim_head).transpose(1, 2)

        # Calculate attention and keep the weights for later inspection.
        x, self.attention_scores = self.attention(query, key, value)

        # Merge the heads:
        # (batch, H, seq_len, dim_head) --> (batch, seq_len, H, dim_head) --> (batch, seq_len, m)
        x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.H * self.dim_head)

        # Output projection: (batch, seq_len, m) --> (batch, seq_len, m)
        return self.w_o(x)
        
        

# Self-attention over 512-dimensional features, split into 8 heads.
Audio_selfAttention = Audio_TemporalMultiHeadAttention(512, 8)
# NOTE(review): unsqueeze(1) inserts the new axis at position 1, which
# forward() treats as seq_len. Each stacked feature vector thus becomes its
# own batch item with a sequence of length 1, and a softmax over one element
# is always 1. If the stacked parts are meant to form the temporal sequence,
# the new axis would have to go at position 0 instead -- confirm intent.
# Re-running this cell also adds another axis, since the name is rebound.
All_audio_outputs = All_audio_outputs.unsqueeze(1)
output = Audio_selfAttention(All_audio_outputs)
print(output.shape)        

torch.Size([6, 1, 512])
