In [5]:
import torch
import torch.nn as nn
from torch.nn import init
import logging
from torchvision import models
import numpy as np
import torch.nn.functional as F

def double_conv1(in_c, out_c):
    """Build the first-stage block: two (Conv2d -> BatchNorm2d -> ReLU) stages.

    Args:
        in_c: number of input channels of the first convolution.
        out_c: number of output channels of both convolutions.

    Returns:
        An ``nn.Sequential`` of six layers. The first convolution uses a
        (10, 4) kernel with padding 1, the second a (7, 2) kernel without
        padding; both use stride 1 and a bias term.
    """
    # (kernel_size, padding) of each convolution, in execution order.
    conv_specs = (((10, 4), 1), ((7, 2), 0))

    layers = []
    channels_in = in_c
    for kernel, pad in conv_specs:
        layers.append(nn.Conv2d(channels_in, out_c, kernel_size=kernel,
                                stride=1, padding=pad, bias=True))
        layers.append(nn.BatchNorm2d(out_c))
        layers.append(nn.ReLU(inplace=True))
        # Every convolution after the first consumes the block's own output.
        channels_in = out_c
    return nn.Sequential(*layers)

def double_conv2(in_c, out_c):
    """Build a deeper-stage block: two (Conv2d -> BatchNorm2d -> ReLU) stages.

    Args:
        in_c: number of input channels of the first convolution.
        out_c: number of output channels of both convolutions.

    Returns:
        An ``nn.Sequential`` of six layers. The first convolution uses a
        (9, 3) kernel and the second a (7, 3) kernel; both use padding 1,
        stride 1 and a bias term.
    """
    layers = []
    channels_in = in_c
    # Kernel sizes of the two convolutions, in execution order.
    for kernel in ((9, 3), (7, 3)):
        layers.append(nn.Conv2d(channels_in, out_c, kernel_size=kernel,
                                stride=1, padding=1, bias=True))
        layers.append(nn.BatchNorm2d(out_c))
        layers.append(nn.ReLU(inplace=True))
        # The second convolution consumes the first one's output channels.
        channels_in = out_c
    return nn.Sequential(*layers)


class cnn(nn.Module):
    """Convolutional encoder followed by a small fully connected head.

    Three double-convolution blocks (64, 128, 256 channels) are separated by
    max pooling. The ``nn.Linear`` layers then act on the LAST (width) axis of
    the feature map, and a final 1x1 convolution reduces the 256 channels to
    ``output_ch``.

    Shape flow for the (N, 3, 80, 500) input used by the smoke test in this
    cell: the convolution stack yields (N, 256, 1, 125), the linear head maps
    the width 125 -> 150 -> 170 -> 181, and the 1x1 convolution returns
    (N, output_ch, 1, 181). ``fc1`` hard-codes the width 125, so other input
    sizes only work if they also reduce to a width of 125.

    Args:
        img_ch: number of channels of the input image.
        output_ch: number of channels produced by the final 1x1 convolution.
    """
    def __init__(self,img_ch=3,output_ch=1):
        super().__init__()
        # NOTE: despite the attribute name the window is (2, 1) with stride 2
        # on both axes, i.e. rows are max-pooled pairwise and every second
        # column is kept. The name is left unchanged for compatibility.
        self.max_pool_2x2 = nn.MaxPool2d(kernel_size=(2,1), stride=2)
        self.down_conv_1 = double_conv1(img_ch, 64)
        self.down_conv_2 = double_conv2(64, 128)
        self.down_conv_3 = double_conv2(128, 256)
        self.out = nn.Conv2d(
            in_channels=256,
            out_channels=output_ch,
            kernel_size=1,stride=1,padding=0)
        self.fc1 = nn.Linear(125, 150)
        self.fc2 = nn.Linear(150, 170)
        self.fc3 = nn.Linear(170, 181)

    def forward(self, image):
        """Map a batch of images to per-position scores.

        Args:
            image: tensor of shape (N, img_ch, H, W).

        Returns:
            Tensor of shape (N, output_ch, H', 181), where H' is whatever
            height the convolution stack leaves (1 for an 80-row input).
        """
        # Encoder: conv block -> pool -> conv block -> pool -> conv block.
        x = self.down_conv_1(image)
        x = self.max_pool_2x2(x)
        x = self.down_conv_2(x)
        x = self.max_pool_2x2(x)
        x = self.down_conv_3(x)

        # Head: without a non-linearity between them, three stacked Linear
        # layers are equivalent to a single affine map, so the extra layers
        # would add parameters but no capacity. fc3 stays linear because its
        # output feeds the 1x1 convolution that produces the final scores.
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return self.out(x)





if __name__ == "__main__":
    # Smoke test: push one random 3-channel 80x500 image through a freshly
    # initialised network to confirm the layer shapes line up. The output is
    # discarded; a shape mismatch would raise here.
    image = torch.rand((1, 3, 80, 500))
    en = cnn()
    en(image)




In [None]:
import scipy.io as sio
import numpy as np
import torch
from torch import nn, optim
import torchvision
from torch.utils.data import Dataset, DataLoader
import numpy as np 
import math
import pandas as pd
import cmath

from collections import OrderedDict
from torch.utils.data.sampler import SubsetRandomSampler
from torch.autograd import Variable

from functools import partial
from threading import Thread
from tornado import gen

# Ask PyTorch to release cached CUDA memory; presumably here so that
# re-running the notebook in a live kernel starts from a clean GPU state.
torch.cuda.empty_cache()

In [None]:
%%capture
!pip install wandb -qqq
import wandb
!wandb login

In [None]:
# Start a Weights & Biases run; train() reports to it through wandb.log and
# closes it with wandb.finish.
wandb.init(project="SNR_NS_10_50000_colab_CNN")

In [None]:
# Colab only: mount Google Drive at /content/gdrive. The data-loading cell
# reads the .mat file from a gdrive/MyDrive/... path.
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
torch.cuda.empty_cache()
# Load the MATLAB file into a dict of arrays; DOA_dataset reads its 'NS_data'
# and 'DOA' entries. The path is absolute and matches the mount point used by
# drive.mount('/content/gdrive'), so it no longer depends on the kernel's
# current working directory being /content.
df1 = sio.loadmat("/content/gdrive/MyDrive/DOA/DOA1/SNR_NS_10_50000.mat")

In [None]:
# Min/max bounds used by DOA_dataset to rescale each input channel as
# (value - min) / (max - min): `_r` for the real part, `_i` for the imaginary
# part and `_p` for the phase.
# NOTE(review): min_p is larger than max_p, so (max_p - min_p) is negative and
# the phase channel is scaled with a flipped sign. Confirm these two values
# against the data they were measured on.
max_r, min_r = 0.3408866150170333, -0.33121108049659753
max_i, min_i = 0.29720502659995574, -0.298407779910961
max_p, min_p = 0.06803637694674271, 0.08578641590154056

class DOA_dataset(Dataset):
    """Dataset of complex measurements split into three real-valued channels.

    ``df['NS_data']`` is indexed as (rows, cols, sample). Every sample becomes
    a (3, rows, cols) array holding the min-max scaled real part, imaginary
    part and phase, using the module-level bounds ``min_r/max_r``,
    ``min_i/max_i`` and ``min_p/max_p``. ``df['DOA']`` supplies the labels.

    The feature buffer is sized from the data itself. The previous version
    allocated a fixed (30000, 3, 80, 500) buffer and filled it with a triple
    Python loop, which failed for more than 30000 samples, left all-zero
    padding for fewer, and was extremely slow.

    Args:
        df: mapping with the keys 'NS_data' and 'DOA' (e.g. the dict returned
            by ``scipy.io.loadmat``).
        dtype: NumPy dtype of the feature buffer. The default float64 matches
            the previous behaviour; ``np.float32`` halves the memory use.

    Attributes:
        x: feature tensor of shape (samples, 3, rows, cols).
        y: label tensor built from ``df['DOA']``.
        n_sample: ``len(y)``, reported as the dataset length.
    """
    def __init__(self, df, dtype=np.float64):
        # (rows, cols, sample) -> (sample, rows, cols); this is a view, no copy.
        samples = np.transpose(np.asarray(df['NS_data']), (2, 0, 1))
        features = np.empty((samples.shape[0], 3) + samples.shape[1:], dtype=dtype)

        self._scale_into(features[:, 0], samples.real, min_r, max_r)
        self._scale_into(features[:, 1], samples.imag, min_i, max_i)
        # Phase angle, atan2(imag, real), written straight into channel 2.
        phase = features[:, 2]
        np.arctan2(samples.imag, samples.real, out=phase)
        self._scale_into(phase, phase, min_p, max_p)

        self.x = torch.from_numpy(features)
        self.y = torch.from_numpy(np.asarray(df['DOA']))
        self.n_sample = len(self.y)

    @staticmethod
    def _scale_into(out, values, low, high):
        """Store (values - low) / (high - low) in ``out`` without temporaries."""
        np.subtract(values, low, out=out)
        np.divide(out, high - low, out=out)

    def __getitem__(self, index):
        """Return the (features, label) pair stored at ``index``."""
        return self.x[index], self.y[index]

    def __len__(self):
        """Return the number of labels."""
        return self.n_sample


# Split configuration: fraction held out for validation, whether to shuffle
# the indices before splitting, and the NumPy seed used for that shuffle.
validation_split = 0.1
shuffle_dataset = True
random_seed = 42

dataset = DOA_dataset(df1)
dataset_size = len(dataset)
print(dataset_size)

# Optionally shuffle all sample indices (seeded), then hand the first `split`
# of them to validation and the remainder to training.
indices = list(range(dataset_size))
if shuffle_dataset:
    np.random.seed(random_seed)
    np.random.shuffle(indices)
split = int(np.floor(validation_split * dataset_size))
val_indices = indices[:split]
train_indices = indices[split:]

# Both loaders wrap the same dataset; each sampler restricts its loader to
# its own index subset and draws from it in random order.
train_sampler = SubsetRandomSampler(train_indices)
valid_sampler = SubsetRandomSampler(val_indices)

train_loader = DataLoader(dataset, batch_size=128, sampler=train_sampler)
validation_loader = DataLoader(dataset, batch_size=256, sampler=valid_sampler)

In [None]:
# Use the GPU when one is present and fall back to the CPU otherwise.
# Previously `optimizer` was only created inside the CUDA branch, so on a
# CPU-only runtime train() failed with a NameError.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type == "cuda":
    print(torch.cuda.get_device_name(0))

# train() reads `autoencoder`, `criterion` and `optimizer` as globals.
# The optimizer is created after the model has been moved to its device.
autoencoder = cnn().to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = optim.AdamW(autoencoder.parameters(), lr=0.00001, weight_decay=1e-5)

#==========================================================================
def _ratio(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


def _run_epoch(loader, device, training):
    """Run one pass over ``loader`` with the global model.

    Args:
        loader: iterable of (features, labels) batches.
        device: device the batches are moved to before the forward pass.
        training: when True the model is put in train mode and the optimizer
            is stepped; when False the model is in eval mode and gradients
            are disabled.

    Returns:
        Tuple (sum of per-batch losses, correct predictions, label count).
    """
    loss_sum = 0
    correct = 0
    total = 0
    autoencoder.train(training)
    with torch.set_grad_enabled(training):
        for features, labels in loader:
            features = features.to(device).float()
            labels = labels.to(device).long()
            if training:
                optimizer.zero_grad()

            # Rearrange the network output into (batch, 181, 1): class scores
            # on dim 1, as CrossEntropyLoss expects.
            outputs = torch.transpose(autoencoder(features), 2, 3)
            outputs = torch.reshape(outputs, (outputs.shape[0], 181, 1))
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()
            loss_sum += loss.item()

            # Predicted class = index of the largest score along dim 1.
            _, pred = torch.max(outputs, 1)
            total += labels.reshape(-1).size(0)
            correct += (pred.reshape(-1) == labels.reshape(-1)).sum().item()
    return loss_sum, correct, total


def train(num_epochs=200):
    """Train the global ``autoencoder`` and log metrics to wandb every epoch.

    Uses the module-level ``autoencoder``, ``criterion``, ``optimizer``,
    ``train_loader`` and ``validation_loader``. Batches are moved to whatever
    device the model's parameters live on, so this runs on CPU as well as GPU.

    Args:
        num_epochs: number of passes over the training data (default 200).
    """
    device = next(autoencoder.parameters()).device
    for i in range(num_epochs):
        training_loss, tcorrect, ttotal = _run_epoch(train_loader, device, training=True)
        validation_loss, correct, total = _run_epoch(validation_loader, device, training=False)

        # Mean loss per batch and accuracy in percent; an empty loader yields
        # 0.0 instead of raising ZeroDivisionError.
        train_loss_avg = _ratio(training_loss, len(train_loader))
        val_loss_avg = _ratio(validation_loss, len(validation_loader))
        train_acc = 100 * _ratio(tcorrect, ttotal)
        val_acc = 100 * _ratio(correct, total)

        # Metric names are kept verbatim so existing wandb charts stay continuous.
        wandb.log({"Trainig Acc": train_acc, "Traning loss": train_loss_avg, "Validation Acc": val_acc, "Validation loss": val_loss_avg})
        print("Epoch {} - Traningloss: {}".format(i+1, train_loss_avg))
        print("Validationloss: {}".format(val_loss_avg))
        print("Trianing Acc: {}".format(train_acc))
        print("Validaion Acc: {}".format(val_acc))
    wandb.finish()
# Run the training loop defined above, then report completion.
train()
print("Training Complete")









