In [8]:
import os
import pandas as pd
import torch
import torchaudio
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
import math

In [10]:
class AudioDataset(Dataset):
    def __init__(self, csv_path, audio_dir, transform=None):
        self.data = pd.read_csv(csv_path)
        self.audio_dir = audio_dir
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        audio_path = os.path.join(self.audio_dir, row['name'])
        waveform, sample_rate = torchaudio.load(audio_path)

        if self.transform:
            waveform = self.transform(waveform)

        # Labels
        category = row['cate']
        distance = row['dist']
        direction = row['dire']

        # Convert labels to indices
        cate_idx = {'AR': 0, 'Sniper': 1, 'nogun': 2}[category]
        dist_idx = {'none': 0, '0m': 1, '50m': 2, '100m': 3, '200m': 4, '400m': 5, '600m': 6}[distance]
        dire_idx = {'none': 0, 'center': 1, 'back': 2, 'front': 3, 'left': 4, 'right': 5}[direction]

        return waveform, (cate_idx, dist_idx, dire_idx)


In [11]:
class FeatureExtractor:
    def __init__(self, sample_rate):
        self.transforms = torchaudio.transforms.Spectrogram(n_fft=512, hop_length=256)

    def __call__(self, waveform):
        return self.transforms(waveform)


In [29]:
class CNNTransformer(nn.Module):
    def __init__(self, input_channels=1, hidden_dim=128, n_heads=4, num_layers=2, num_classes=(3, 7, 6)):
        super(CNNTransformer, self).__init__()
        
        # CNN Feature Extractor
        self.cnn = nn.Sequential(
            nn.Conv2d(input_channels, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        
        self.fc_proj = nn.Linear(2752, hidden_dim)

        # Transformer Encoder
        self.positional_encoding = PositionalEncoding(hidden_dim)
        encoder_layer = nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=n_heads)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        # Fully Connected Layers for each task
        self.fc_cate = nn.Linear(hidden_dim, num_classes[0])
        self.fc_dist = nn.Linear(hidden_dim, num_classes[1])
        self.fc_dire = nn.Linear(hidden_dim, num_classes[2])

    def forward(self, x):
        # CNN
        x = self.cnn(x)
        
        # Reshape CNN output for Transformer
        x = x.permute(0, 2, 3, 1).contiguous()  # B, H, W, C
        x = x.view(x.size(0), x.size(1), -1)  # B, H, (W*C)
        x = self.fc_proj(x)  # Project to hidden_dim
        x = x.permute(1, 0, 2)  # seq_len, batch_size, hidden_dim
        
        # Transformer
        x = self.positional_encoding(x)
        x = self.transformer(x)
        x = x.mean(dim=0)  # Aggregate over sequence length
        
        # Task-specific outputs
        cate_out = self.fc_cate(x)
        dist_out = self.fc_dist(x)
        dire_out = self.fc_dire(x)
        
        return cate_out, dist_out, dire_out



In [30]:
class PositionalEncoding(nn.Module):
    def __init__(self, hidden_dim, max_len=500):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, hidden_dim)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, hidden_dim, 2).float() * (-math.log(10000.0) / hidden_dim))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return x + self.pe[:x.size(0), :]

In [31]:
def train_model(model, dataloader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    for inputs, labels in dataloader:
        inputs = inputs.to(device)
        cate_labels, dist_labels, dire_labels = [label.to(device) for label in labels]

        optimizer.zero_grad()

        # Forward pass
        cate_out, dist_out, dire_out = model(inputs)

        # Compute loss
        loss_cate = criterion(cate_out, cate_labels)
        loss_dist = criterion(dist_out, dist_labels)
        loss_dire = criterion(dire_out, dire_labels)
        loss = loss_cate + loss_dist + loss_dire

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    return running_loss / len(dataloader)

def evaluate_model(model, dataloader, device):
    model.eval()
    correct_cate, correct_dist, correct_dire = 0, 0, 0
    total = 0
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            cate_labels, dist_labels, dire_labels = [label.to(device) for label in labels]

            cate_out, dist_out, dire_out = model(inputs)

            _, cate_pred = torch.max(cate_out, 1)
            _, dist_pred = torch.max(dist_out, 1)
            _, dire_pred = torch.max(dire_out, 1)

            correct_cate += (cate_pred == cate_labels).sum().item()
            correct_dist += (dist_pred == dist_labels).sum().item()
            correct_dire += (dire_pred == dire_labels).sum().item()

            total += cate_labels.size(0)

    cate_acc = correct_cate / total
    dist_acc = correct_dist / total
    dire_acc = correct_dire / total

    return cate_acc, dist_acc, dire_acc

In [None]:
def main():
    # Paths
    csv_path = 'dataset1.csv'
    audio_dir = 'gun_sound_v8'

    # Hyperparameters
    batch_size = 16
    learning_rate = 0.001
    num_epochs = 20
    sample_rate = 44100

    # Device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Dataset and DataLoader
    feature_extractor = FeatureExtractor(sample_rate)
    dataset = AudioDataset(csv_path, audio_dir, transform=feature_extractor)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # Model, Loss, Optimizer
    model = CNNTransformer().to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=learning_rate)

    # Training Loop
    for epoch in range(num_epochs):
        train_loss = train_model(model, dataloader, criterion, optimizer, device)
        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {train_loss:.4f}")

    # Evaluate Model
    cate_acc, dist_acc, dire_acc = evaluate_model(model, dataloader, device)
    print(f"Test Accuracy - Category: {cate_acc:.4f}, Distance: {dist_acc:.4f}, Direction: {dire_acc:.4f}")

    # Save Model
    torch.save(model.state_dict(), 'cnn_transformer_model.pth')

if __name__ == '__main__':
    main()


In [2]:
import math
import os
import pandas as pd
import torch
import torchaudio
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
from torch.optim.lr_scheduler import StepLR

# Dataset Class with Mel-Spectrogram
class AudioDataset(Dataset):
    def __init__(self, csv_path, audio_dir, transform=None):
        self.data = pd.read_csv(csv_path)
        self.audio_dir = audio_dir
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        audio_path = os.path.join(self.audio_dir, row['name'])
        waveform, sample_rate = torchaudio.load(audio_path)

        if self.transform:
            waveform = self.transform(waveform)

        # Labels
        category = row['cate']
        direction = row['dire']

        # Convert labels to indices
        cate_idx = {'AR': 0, 'Sniper': 1, 'nogun': 2}[category]
        dire_idx = {'none': 0, 'center': 1, 'back': 2, 'front': 3, 'left': 4, 'right': 5}[direction]

        # Ensure direction is none if category is Noise
        if cate_idx == 2:  # Noise
            dire_idx = 0

        return waveform, dire_idx

# Feature Extraction with Mel-Spectrogram
class FeatureExtractor:
    def __init__(self, sample_rate, n_mels=128):
        self.transforms = nn.Sequential(
            torchaudio.transforms.MelSpectrogram(sample_rate=sample_rate, n_fft=1024, hop_length=512, n_mels=n_mels),
            torchaudio.transforms.AmplitudeToDB()
        )

    def __call__(self, waveform):
        return self.transforms(waveform)

# Updated CNN-Transformer Model
class CNNTransformer(nn.Module):
    def __init__(self, input_channels=1, hidden_dim=128, n_heads=4, num_layers=2, num_classes=6, n_mels=128):
        super(CNNTransformer, self).__init__()
        
        # CNN Feature Extractor
        self.cnn = nn.Sequential(
            nn.Conv2d(input_channels, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        
        # Projection Layer to match Transformer input
        cnn_output_dim = self._get_cnn_output_dim(n_mels)
        self.fc_proj = nn.Linear(cnn_output_dim, hidden_dim)

            
        # Transformer Encoder
        self.positional_encoding = PositionalEncoding(hidden_dim)
        encoder_layer = nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=n_heads)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        # Fully Connected Layer for direction prediction
        self.fc_dire = nn.Linear(hidden_dim, num_classes)
    
    def _get_cnn_output_dim(self, n_mels, n_frames=512):
        with torch.no_grad():
            dummy_input = torch.zeros(1, 1, n_mels, n_frames)  # Simulate input (batch=1, channels=1, n_mels, time_steps)
            output = self.cnn(dummy_input)
            _, channels, height, width = output.size()
            return channels * height * width  # Compute flattened size

        

    def forward(self, x):
        # CNN
        x = self.cnn(x)
        print(f"Shape after CNN: {x.shape}")
        
        # Reshape CNN output for Transformer
        batch_size = x.size(0)
        x = x.permute(0, 2, 3, 1).contiguous()  # B, H, W, C
        x = x.view(batch_size, x.size(1), -1)  # B, H, (W*C)
        x = self.fc_proj(x)  # Project to hidden_dim
        x = x.permute(1, 0, 2)  # seq_len, batch_size, hidden_dim
        
        # Transformer
        x = self.positional_encoding(x)
        x = self.transformer(x)
        x = x.mean(dim=0)  # Aggregate over sequence length
        
        # Direction output
        dire_out = self.fc_dire(x)
        
        return dire_out

class PositionalEncoding(nn.Module):
    def __init__(self, hidden_dim, max_len=500):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, hidden_dim)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, hidden_dim, 2).float() * (-math.log(10000.0) / hidden_dim))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return x + self.pe[:x.size(0), :]

# Main Script
def main():
    # Paths
    csv_path = 'dataset1.csv'
    audio_dir = 'gun_sound_v8'

    # Hyperparameters
    batch_size = 16
    learning_rate = 0.001
    num_epochs = 20
    sample_rate = 44100
    n_mels = 128

    # Device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Dataset and DataLoader
    feature_extractor = FeatureExtractor(sample_rate, n_mels=n_mels)
    dataset = AudioDataset(csv_path, audio_dir, transform=feature_extractor)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, pin_memory=True)

    # Model, Loss, Optimizer
    model = CNNTransformer(input_channels=1, hidden_dim=128, n_heads=4, num_layers=2).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=learning_rate)
    scheduler = StepLR(optimizer, step_size=5, gamma=0.5)

    # Training Loop
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        scheduler.step()
        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {train_loss / len(dataloader):.4f}")

    # Evaluate Model
    model.eval()
    with torch.no_grad():
        correct = 0
        total = 0
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)

            correct += (predicted == labels).sum().item()
            total += labels.size(0)

        print(f"Test Accuracy - Direction: {correct / total:.4f}")

if __name__ == '__main__':
    main()




Shape after CNN: torch.Size([16, 64, 32, 21])


RuntimeError: mat1 and mat2 shapes cannot be multiplied (512x1344 and 262144x128)