In [1]:
%load_ext autoreload
%autoreload 2

import sys
import time
import glob

import torch
import torch.nn as nn
import torch.nn.functional as F

from torch.autograd import Variable
from torch.utils import data
from torch.optim import Adam

from torchvision import transforms
from torchvision import datasets

import numpy as np

from matplotlib import pyplot as plt
from tqdm   import tqdm_notebook as tqdm
from pandas import read_fwf, DataFrame

from PIL import Image

In [2]:
from radioreader import *
from methods import *
from kittler import kittler_float

In [3]:
#Simple Variational Auto Encoder
class VAE(nn.Module):
    def __init__(self, lt_dim=10):
        super(VAE, self).__init__()
        self.k = [1, 16, 32, 64, 128, 256]
        encoder_layers = []
        decoder_layers = []
        
        for i in range(len(self.k) - 1):
            layer = nn.Conv2d(self.k[i], self.k[i+1], 3, 2, 1, 1)
            encoder_layers.append(layer)
            encoder_layers.append(nn.ReLU())
        
        for i in range(len(self.k) - 1, 0, -1):
            layer = nn.ConvTranspose2d(self.k[i], self.k[i-1], 3, 2, 1, 1)
            decoder_layers.append(layer)
            decoder_layers.append(nn.ReLU())
        self.encoder = nn.Sequential(*encoder_layers)
        self.decoder = nn.Sequential(*decoder_layers[:-1])
        
        self.fc_mu = nn.Linear(self.k[-1]*2*2, lt_dim)
        self.fc_ep = nn.Linear(self.k[-1]*2*2, lt_dim)
        
        self.fc_dc = nn.Linear(lt_dim, self.k[-1]*2*2)
    def encode(self, x):
        encoded = self.encoder(x).view(-1, self.k[-1]*2*2)
        return self.fc_mu(encoded), self.fc_ep(encoded)
    
    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5*logvar)
        eps = torch.randn_like(std)
        return mu + eps*std

    def decode(self, x):
        x = F.relu(self.fc_dc(x)).view(-1, self.k[-1], 2, 2)
        return torch.sigmoid(self.decoder(x))
    
    def forward(self, x):
        mu, var = self.encode(x)
        z = self.reparameterize(mu, var)
        d = self.decode(z)
        return d, mu, var

In [4]:
vae = torch.load('b_vae_model', map_location='cpu')
vae.eval()

VAE(
  (encoder): Sequential(
    (0): Conv2d(1, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (1): ReLU()
    (2): Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (3): ReLU()
    (4): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (5): ReLU()
    (6): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (7): ReLU()
    (8): Conv2d(128, 256, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (9): ReLU()
  )
  (decoder): Sequential(
    (0): ConvTranspose2d(256, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
    (1): ReLU()
    (2): ConvTranspose2d(128, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
    (3): ReLU()
    (4): ConvTranspose2d(64, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
    (5): ReLU()
    (6): ConvTranspose2d(32, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
    (

In [5]:
class LRG(data.Dataset):
    def __init__(self, images, targets, transform=None):
        self.targets = targets
        self.data = images
        self.data_len = len(self.data)
        if(transform == None):
            self.transform = transforms.Compose([
                transforms.RandomHorizontalFlip(),
                transforms.RandomVerticalFlip(),
                transforms.RandomRotation(180),
                transforms.CenterCrop(80),
                transforms.Resize(64),
                transforms.ToTensor()])
        else : self.transform = transform

            
    def __getitem__(self, index):
        index = index % self.data_len
        np_arr = self.data[index, :]
        ## reshape np_arr to 28x28
        np_arr = np_arr.reshape(128, 128)

        ## convert to PIL-image
        img = Image.fromarray((np_arr*255).astype('uint8'))

        #apply the transformations and return tensors
        return self.transform(img), self.targets[index]
    def __len__(self):
        return self.data_len * 10
    def __repr__(self) -> str:
        return 'unLRG dataset'

In [6]:
lrg = read_fwf('catalog/mrt-table3.txt', skiprows=41, header=None)
labeled = DataFrame({'Name':lrg[0], 'Label':lrg[7]})

#load the images
names = labeled['Name'].tolist()
labels = labeled['Label'].tolist()
images = []
directory = 'lrg'
ext = 'fits'

for i in tqdm(range(len(names))):
    f_name = '{0}/{1}.{2}'.format(directory, 
                                  names[i].replace('.','_'), 
                                  ext)
    im = readImg(f_name, normalize=True, sz=128)
    im = kittler_float(im, copy=False)
    images.append(im.T)

images = np.array(images)
labels = np.array([ 1 if (l == '1' or l == '1F') else int(l) for l in labels])

HBox(children=(IntProgress(value=0, max=1442), HTML(value='')))




In [7]:
transform = transforms.Compose([
                transforms.CenterCrop(80),
                transforms.Resize(64),
                transforms.ToTensor()])
trans_img = []
for img in images:
    img = img.reshape(128, 128)
    ## convert to PIL-image
    img = Image.fromarray((img*255).astype('uint8'))
    imgs_tr = transform(img)
    trans_img.append(imgs_tr.numpy())
trans_img = np.array(trans_img)

In [8]:
with torch.no_grad():
    out, mu, sig = vae(torch.tensor(trans_img))
mu = torch.sigmoid(mu)

In [9]:
from sklearn.svm import LinearSVC

In [46]:
np.sum(labels == 0 ), np.sum(labels > 1)

(0, 1037)

In [74]:
clf = LinearSVC(random_state=0, tol=1e-5, penalty='l1', dual=False, class_weight='balanced')

In [75]:
clf.fit(mu.numpy(), labels > 1)
clf.coef_

array([[ 2.16289166,  0.37581329, -0.27648376,  0.19372355,  0.05984313,
         0.        ,  0.31915881,  0.        ,  0.        ,  0.06914613]])

In [76]:
l2 = labels[labels > 1]
mu2 = mu.numpy()[labels > 1]

In [77]:
preds = clf.predict(mu.numpy()[labels == 1])
print("Acc classifying Compact  {:.4f}".format(1 - np.sum(preds) / len(preds)))

Acc classifying Compact  0.9358


In [69]:
l3 = l2[l2 < 4]
mu3 = mu2[l2 < 4]

In [71]:
clf2 = LinearSVC(random_state=0, tol=1e-5, penalty='l1', dual=False, class_weight='balanced')
clf2.fit(mu3, l3 == 3)
clf2.coef_

array([[-1.72537577,  2.24699937,  0.76448716,  0.40652161,  1.88855397,
        -0.09498197, -0.04941776, -1.82635156,  0.10030751, -0.22051949]])

In [72]:
preds = clf2.predict(mu3[l3 == 2])
print("Acc classifying FRI  {:.4f}".format(1-np.sum(preds) / len(preds)))

Acc classifying FRI  0.8930


In [73]:
preds = clf2.predict(mu3[l3 == 3])
print("Acc classifying FRII  {:.4f}".format(np.sum(preds) / len(preds)))

Acc classifying FRII  0.8488


In [57]:
l4 = l2[l2 > 3]
mu4 = mu2[l2 > 3]

In [58]:
clf4 = LinearSVC(random_state=0, tol=1e-5, penalty='l1', dual=False, class_weight='balanced')
clf4.fit(mu4, l4 > 4)
clf4.coef_

array([[ 4.90579154,  1.24737321,  0.29483753, -0.22996249, -0.43764896,
         0.55259859, -0.23845465, -1.86844398,  0.        , -0.2323927 ]])

In [59]:
preds = clf4.predict(mu4[l4 == 4])
print("Acc classifying Bent  {:.4f}".format(1- np.sum(preds) / len(preds)))

Acc classifying Bent  0.7190


In [60]:
l5 = l4[l4 > 4]
mu5 = mu4[l4 > 4]

In [62]:
clf5 = LinearSVC(random_state=0, tol=1e-5, penalty='l1', dual=False, class_weight='balanced')
clf5.fit(mu5, l5 == 6)
clf5.coef_

array([[ 0.4530156 ,  0.4617024 , -1.28002934, -0.5542719 ,  0.        ,
         0.43710752, -1.05770063,  0.87460983, -0.31649865,  0.        ]])

In [63]:
preds = clf5.predict(mu5[l5 == 5])
print("Acc classifying XRG  {:.4f}".format(1- np.sum(preds) / len(preds)))

Acc classifying XRG  0.6829


In [64]:
preds = clf5.predict(mu5[l5 == 6])
print("Acc classifying RRG  {:.4f}".format(np.sum(preds) / len(preds)))

Acc classifying RRG  0.7500
