In [None]:
import os
import winsound

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm

In [None]:
# spectrogram_array = np.loadtxt('spectrograms_Atrain.csv', delimiter=',')
# spectrogram_array = spectrogram_array.reshape(32400, 256, 74) # reshaping the array to have time frames as 3 dimensions

In [None]:
# Every CSV row is one flattened spectrogram; fold each row back into a
# 128 x 22 grid so the result has shape (n_samples, 128, 22).
train_array_2d = np.loadtxt('sg_train.csv', delimiter=',')
train_array = train_array_2d.reshape((train_array_2d.shape[0], 128, 22))

In [None]:
# spectrograms_tensor_Atrain = torch.tensor(spectrogram_array, dtype=torch.float32)
# spectrograms_tensor_Atrain = spectrograms_tensor_Atrain.unsqueeze(1)

In [None]:
# spectrograms_tensor_Atrain.shape

In [None]:
# float32 copy of the spectrograms with a singleton axis inserted at dim 1
# (the channel axis expected by the Conv2d(1, ...) stem of the network).
training_tensor = torch.tensor(train_array, dtype=torch.float32).unsqueeze(1)

In [None]:
# Read the label table with integer column names (header=None). Any header
# line in the file is therefore loaded as data row 0 -- presumably why the
# label-slicing cell drops the first row with `.values[1:]` (verify).
output_data = pd.read_csv('train.csv', header = None)

In [None]:
# Visual check of the raw label table (rendered as the cell output).
output_data

In [None]:
def _label_columns(columns):
    """Return the selected `output_data` columns as an int32 array.

    Row 0 is skipped -- presumably the CSV header line that was read as data
    because the file was loaded with header=None (verify against train.csv).
    """
    return output_data[columns].values[1:].astype('int32')


# Column groups of the label table, one group per prediction task.
maneuvering_direction = _label_columns(list(range(4, 10)))
fault = _label_columns(list(range(10, 19)))
model_type = _label_columns(list(range(1, 4)))

In [None]:
# Label arrays -> GPU tensors. The values come from int32 arrays (presumably
# one-hot rows -- verify), so float16 represents them exactly.
y1 = torch.tensor(model_type, dtype=torch.float16).cuda()
y2 = torch.tensor(maneuvering_direction, dtype=torch.float16).cuda()
y3 = torch.tensor(fault, dtype=torch.float16).cuda()

# `training_tensor` is already a tensor: wrapping it in torch.tensor() again
# emits a copy-construct UserWarning and makes a needless extra copy.
# Convert/move it directly instead.
Xtr = training_tensor.to(dtype=torch.float32).cuda()

In [None]:
# Xtrain = torch.tensor(spectrograms_tensor_Atrain, dtype=torch.float32).cuda()

In [None]:
# Bundle inputs with the three label tensors so every batch yields
# (spectrograms, model_type, maneuvering_direction, fault).
dataset = TensorDataset(Xtr, y1, y2, y3)
# Mini-batches of 16 samples, reshuffled at the start of every epoch.
dataloader = DataLoader(dataset=dataset, shuffle=True, batch_size=16)

In [None]:
class BasicBlock(nn.Module):
    """Residual block: two 3x3 conv + BatchNorm stages with Softplus activations.

    Main branch: conv1 -> bn1 -> softplus1 -> conv2 -> bn2. The shortcut
    branch is added to that result and softplus2 is applied to the sum.

    Args:
        in_planes: number of input channels.
        planes: number of channels produced by both convolutions.
        stride: stride of the first convolution (and of the projection
            shortcut, when one is needed).
    """

    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        out_planes = self.expansion * planes

        # Main branch.
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(planes)
        self.softplus1 = nn.Softplus(beta=1, threshold=20)
        self.softplus2 = nn.Softplus(beta=1, threshold=20)
        # Registered but not referenced by forward().
        self.relu = nn.ReLU(inplace=True)

        # Shortcut branch: identity when the input already has the output's
        # shape, otherwise a 1x1 conv + BatchNorm projection.
        if stride == 1 and in_planes == out_planes:
            self.shortcut = nn.Sequential()
        else:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_planes),
            )

    def forward(self, x):
        """Apply the block to `x` and return the activated residual sum."""
        hidden = self.conv1(x)
        hidden = self.bn1(hidden)
        hidden = self.softplus1(hidden)

        merged = self.bn2(self.conv2(hidden))
        merged += self.shortcut(x)
        return self.softplus2(merged)


class ResNet(nn.Module):
    """ResNet-style feature extractor for single-channel inputs.

    A 3x3 conv stem (1 -> 64 channels) is followed by four stages of residual
    blocks (64, 128, 256 and 512 planes; stages 2-4 start with a stride-2
    block), global average pooling and flattening. There is no classification
    layer: forward() returns one feature vector per sample.

    Args:
        block: block class; called as block(in_planes, planes, stride) and
            expected to expose an `expansion` class attribute.
        num_blocks: sequence of four ints, the requested block count per stage.
    """

    def __init__(self, block, num_blocks):
        super().__init__()
        # Channel count fed into the next block; advanced by _make_layer().
        self.in_planes = 64

        self.conv1 = nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)

        # (planes, stride of the first block) for layer1 .. layer4.
        stage_specs = ((64, 1), (128, 2), (256, 2), (512, 2))
        for stage_idx, (planes, first_stride) in enumerate(stage_specs):
            stage = self._make_layer(block, planes, num_blocks[stage_idx], stride=first_stride)
            setattr(self, f'layer{stage_idx + 1}', stage)

        self.rl = nn.ReLU(inplace=True)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))

    def _make_layer(self, block, planes, num_blocks, stride):
        """Build one stage: the first block uses `stride`, the rest stride 1."""
        stage_blocks = []
        for block_stride in [stride] + [1] * (num_blocks - 1):
            stage_blocks.append(block(self.in_planes, planes, block_stride))
            # Following blocks consume what this block produced.
            self.in_planes = planes * block.expansion
        return nn.Sequential(*stage_blocks)

    def forward(self, x):
        """Return the pooled, flattened features for a batch `x`."""
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.rl(out)
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            out = stage(out)
        pooled = self.avgpool(out)
        return pooled.flatten(1)

class mtl(nn.Module):
    """Multi-task classifier: one shared ResNet trunk feeding three linear heads.

    forward() returns the raw, unnormalised scores (logits) of each head.
    The heads used to apply Softmax before returning, but the outputs are
    trained with nn.CrossEntropyLoss, which applies log-softmax itself;
    normalising twice flattens the loss surface and slows or stalls learning.
    argmax over logits equals argmax over probabilities, so class predictions
    are unaffected. Use predict_proba() when probabilities are needed.

    Args:
        num_classes1: number of classes for head 1.
        num_classes2: number of classes for head 2.
        num_classes3: number of classes for head 3.
    """

    def __init__(self, num_classes1=3, num_classes2=6, num_classes3=9):
        super(mtl, self).__init__()
        # NOTE: the attribute is named `resnet18`, but [3, 4, 6, 3] is the
        # ResNet-34 block layout. The name is kept so attribute access and
        # state_dict keys stay the same.
        self.resnet18 = ResNet(BasicBlock, [3, 4, 6, 3])
        self.fc1 = nn.Linear(512, num_classes1)
        self.fc2 = nn.Linear(512, num_classes2)
        self.fc3 = nn.Linear(512, num_classes3)
        # Softmax layers are kept for predict_proba(); forward() does not use them.
        self.sm1 = nn.Softmax(dim=1)
        self.sm2 = nn.Softmax(dim=1)
        self.sm3 = nn.Softmax(dim=1)

    def forward(self, x):
        """Return a tuple of three logit tensors, one (batch, classes) per head."""
        features = self.resnet18(x)
        return self.fc1(features), self.fc2(features), self.fc3(features)

    def predict_proba(self, x):
        """Return per-head class probabilities (softmax over dim 1 of the logits)."""
        logits1, logits2, logits3 = self(x)
        return self.sm1(logits1), self.sm2(logits2), self.sm3(logits3)

# Build the three-head network (3 / 6 / 9 classes) and move its parameters
# to the GPU. `.cuda()` requires a CUDA-capable device.
model = mtl(num_classes1=3, num_classes2=6, num_classes3=9).cuda()

In [None]:
# CrossEntropyLoss applies log-softmax internally, so the tensors passed as
# its input should be raw, unnormalised scores (logits), not probabilities.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=5e-4)
# NOTE(review): this scheduler is created but the training loop has
# `scheduler.step()` commented out, so the learning rate stays at 5e-4.
# With gamma=0.1 each step() call would divide the learning rate by 10.
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.1)

In [None]:
# Joint training: the three heads share one backward pass on the sum of
# their cross-entropy losses.
num_epochs = 30

# Put the model in training mode explicitly. A later cell calls model.eval(),
# so re-running this cell without this line would silently train with
# BatchNorm frozen on its running statistics.
model.train()

for epoch in tqdm(range(num_epochs)):
    for batch_idx, (data, target1, target2, target3) in enumerate(dataloader):
        # The dataset tensors were created on the GPU already, so these calls
        # are expected to be no-ops; kept as a safeguard.
        data, target1, target2, target3 = data.cuda(), target1.cuda(), target2.cuda(), target3.cuda()
        outputs1, outputs2, outputs3 = model(data)

        loss1 = criterion(outputs1, target1)
        loss2 = criterion(outputs2, target2)
        loss3 = criterion(outputs3, target3)
        total_loss = loss1 + loss2 + loss3

        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()
        # scheduler.step() stays disabled: the scheduler is an ExponentialLR
        # with gamma=0.1, and stepping it once per batch would shrink the
        # learning rate tenfold on every iteration.
        # scheduler.step()

        if (batch_idx+1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{batch_idx+1}/{len(dataloader)}], Total Loss: {total_loss.item():.4f}')

# Audible "training finished" signal (440 Hz for 500 ms); winsound is Windows-only.
winsound.Beep(440, 500)

In [None]:
# Peek at the model outputs for the first 100 training samples.
# Run the preview in eval mode and without autograd: a plain forward pass in
# training mode would update the BatchNorm running statistics and keep a
# gradient graph for all 100 samples alive on the GPU.
was_training = model.training
model.eval()
with torch.no_grad():
    train_preview = model(Xtr[:100])
# Restore whatever mode the model was in before the preview.
model.train(was_training)
train_preview

In [None]:
# Switch to inference behaviour (BatchNorm uses its running statistics).
model.eval()
# Release cached, currently unused GPU memory held by PyTorch's allocator.
torch.cuda.empty_cache()

In [None]:
# Test spectrograms: one flattened 128 x 22 grid per CSV row.
sg_array_test_2d = np.loadtxt('sg_test.csv', delimiter=',')
sg_array_test = sg_array_test_2d.reshape((sg_array_test_2d.shape[0], 128, 22))

# float32 tensor with a singleton axis at dim 1, moved to the GPU.
X_test = torch.tensor(sg_array_test, dtype=torch.float32).unsqueeze(1).cuda()

In [None]:
# The test tensor is used directly as the dataset, so each batch is a plain
# tensor of up to 64 samples. shuffle=False keeps batches in row order, which
# the prediction cell relies on when it concatenates results.
dataloader1 = DataLoader(X_test, batch_size=64, shuffle=False)

In [None]:
# Class index -> label lookups, one per head.
# NOTE(review): the index order presumably matches the column order of the
# label groups in train.csv -- confirm before trusting the decoded labels.
model_type_mapping = {0: 'A', 1: 'B', 2: 'C'}
maneuvering_direction_mapping = {0: 'B', 1: 'C', 2: 'CC', 3: 'F', 4: 'L', 5: 'R'}
fault_mapping = {0: 'MF1', 1: 'MF2', 2: 'MF3', 3: 'MF4', 4: 'N', 5: 'PC1', 6: 'PC2', 7: 'PC3', 8: 'PC4'}

model_type_pred = []
maneuvering_direction_pred = []
fault_pred = []

# Set eval mode here rather than relying on another cell having been run:
# predicting in training mode would normalise with per-batch statistics.
model.eval()

with torch.no_grad(), tqdm(total=len(dataloader1)) as progress_bar:
    for batch in dataloader1:
        output1, output2, output3 = model(batch)

        # Highest-scoring class per sample, as plain Python ints.
        m1_indices = output1.argmax(dim=1).tolist()
        m2_indices = output2.argmax(dim=1).tolist()
        m3_indices = output3.argmax(dim=1).tolist()

        # Direct indexing instead of .get(): an index without a label should
        # fail loudly here rather than write a missing value to the CSV.
        model_type_pred.extend(model_type_mapping[idx] for idx in m1_indices)
        maneuvering_direction_pred.extend(maneuvering_direction_mapping[idx] for idx in m2_indices)
        fault_pred.extend(fault_mapping[idx] for idx in m3_indices)

        progress_bar.update(1)

model_type_pred = np.array(model_type_pred)
maneuvering_direction_pred = np.array(maneuvering_direction_pred)
fault_pred = np.array(fault_pred)

In [None]:
# Stack the three label arrays as rows (3, n_samples), then transpose so each
# sample becomes one row of the output table.
data = np.stack((model_type_pred, maneuvering_direction_pred, fault_pred))
prediction_columns = ['model_type', 'maneuvering_direction', 'fault']
df = pd.DataFrame(data.transpose(), columns=prediction_columns)
df.to_csv('predwithoutid.csv', index=False)

In [None]:
# Reload the predictions that were just written and count missing values per
# column, as a sanity check on the saved file.
predwithoutid = pd.read_csv('predwithoutid.csv')
predwithoutid.isnull().sum()

In [None]:
ID = []
# Build one ID per test recording as "<file name>_<mic>": first every mic1
# file of drones A, B and C, then every mic2 file in the same drone order.
for mic in ('mic1', 'mic2'):
    for drone in ('A', 'B', 'C'):
        test_dir = f'./Dataset-ArIES/drone_{drone}/{drone}/test/{mic}'
        # NOTE(review): os.listdir() returns entries in arbitrary order. The
        # order is deliberately left untouched because the rows of the test
        # spectrogram file presumably follow the same listing -- verify that
        # the IDs line up with the prediction rows.
        ID.extend(f'{file}_{mic}' for file in os.listdir(test_dir))

In [None]:
# DataFrame.insert raises ValueError if the column already exists, which made
# this cell fail when run a second time. Drop a stale ID column first so the
# cell can be re-run safely.
if 'ID' in predwithoutid.columns:
    predwithoutid = predwithoutid.drop(columns='ID')
# Put the recording IDs in the first column and write the final file.
predwithoutid.insert(0, 'ID', ID)
predwithoutid.to_csv('pred1.csv', index = False)