**Data loader**

In [None]:
import torch
import torch.utils.data as data
import scipy.io as sio
import numpy as np
import os
import glob


class dataloader_light(data.Dataset):

    def __init__(self, filename):

        self.mat_data = sio.loadmat(filename)
        self.data = self.mat_data['data']
        self.labels = self.mat_data['label']

    def __getitem__(self, index):

        intensity = self.data[index].astype('double')
        label = self.labels[index]

        pair = {'intensity': intensity, 'label': label}
        return pair

    def __len__(self):
        return self.mat_data['data'].shape[0]


class dataloader_tmp(data.Dataset):

    def __init__(self, filename):

        self.mat_data = sio.loadmat(filename)
        self.data = self.mat_data['tmp_set']
        self.labels = self.mat_data['labels']

    def __getitem__(self, index):

        temp = self.data[index].astype('double')
        label = self.labels[index][0]

        pair = {'temp': temp, 'label': label}
        return pair

    def __len__(self):
        return self.mat_data['tmp_set'].shape[0]

**Model**

In [None]:
import numpy as np
from numpy import asarray as ar
from numpy import sqrt
from numpy.random import rand, randn
import torch
import torch.nn as nn
from torch.autograd import Variable


class Flatten(nn.Module):
    def __init__(self):
        super(Flatten, self).__init__()

    def forward(self, x):
        N = x.shape[0]
        return x.view(N, -1)


class Sandwich(nn.Module):
    def __init__(self, c_in, c_out, filter_size):
        super(Sandwich, self).__init__()

        self.net = nn.Sequential(
            nn.Conv1d(c_in, c_out, filter_size, stride=1, padding=(filter_size - 1) // 2),
            nn.BatchNorm1d(c_out),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )

    def forward(self, x):
        return self.net(x)


class CNN_Light(nn.Module):
    def __init__(self, length, channel, num_layers, num_neu, pdrop):
        super(CNN_Light, self).__init__()

        self.len = length

        blocks = [Sandwich(1, channel, 7)]
        self.len = self.len // 2
        for _ in range(num_layers - 1):
            blocks.append(Sandwich(channel, channel, 3))
            self.len = self.len // 2

        blocks.append(Flatten())
        blocks.append(nn.Linear(self.len * channel, num_neu))
        blocks.append(nn.ReLU())
        blocks.append(nn.Dropout(p=pdrop))
        blocks.append(nn.Linear(num_neu, num_neu))
        blocks.append(nn.ReLU())
        blocks.append(nn.Dropout(p=pdrop))
        blocks.append(nn.Linear(num_neu, 1))

        self.net = nn.Sequential(*blocks)

    def forward(self, x):

        return self.net(x)


class CNN_Light_lite(nn.Module):
    def __init__(self, length, channel, num_layers, num_neu, pdrop):
        super(CNN_Light_lite, self).__init__()

        self.len = length

        blocks = [Sandwich(1, channel, 3)]
        self.len = self.len // 2
        for _ in range(num_layers - 1):
            blocks.append(Sandwich(channel, channel, 3))
            self.len = self.len // 2

        blocks.append(nn.Conv1d(channel, 1, 1, stride=1))

        self.net = nn.Sequential(*blocks)

    def forward(self, x):

        return torch.mean(self.net(x).squeeze(), dim=1, keepdim=True)


class CNN_Temp(nn.Module):
    def __init__(self, size, pdrop, num_neu):
        super(CNN_Temp, self).__init__()

        self.relu = nn.ReLU()
        self.drop = nn.Dropout(p=pdrop)

        self.fc1 = nn.Linear(size, num_neu)
        self.fc2 = nn.Linear(num_neu, num_neu)
        self.fc3 = nn.Linear(num_neu, num_neu)
        self.fc4 = nn.Linear(num_neu, 1)

    def forward(self, x):

        x = self.fc1(x)            # Din = 16*256, Dout = 1024
        x = self.relu(x)
        x = self.drop(x)
        x = self.fc2(x)            # Din = 16*256, Dout = 1024
        x = self.relu(x)
        x = self.drop(x)
        x = self.fc3(x)            # Din = 1024, Dout = 1024
        x = self.relu(x)
        x = self.drop(x)
        x = self.fc4(x)
        return x


class CNN_hybrid(nn.Module):
    def __init__(self, size, isdrop=0):
        super(CNN_hybrid, self).__init__()
        # Cin = 1, Cout = 256, Kernel_size = 11
        self.relu = nn.ReLU()
        self.isdrop = isdrop
        #self.maxpool1 = nn.MaxPool1d(kernel_size=2, stride=2)
        #self.maxpool2 = nn.MaxPool1d(kernel_size=2, stride=2)
        if self.isdrop == 1:
            self.drop = nn.Dropout(p=0.25)
        self.fc1 = nn.Linear(size, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 1)

    def forward(self, x):

        x = self.fc1(x)            # Din = 16*256, Dout = 1024
        x = self.relu(x)
        if self.isdrop == 1:
            x = self.drop(x)
        x = self.fc2(x)            # Din = 1024, Dout = 1024
        x = self.relu(x)
        if self.isdrop == 1:
            x = self.drop(x)
        x = self.fc3(x)

        return x


class CNN_large(nn.Module):
    def __init__(self, length, isdrop):
        super(CNN_large, self).__init__()
        # Cin = 1, Cout = 256, Kernel_size = 11
        self.isdrop = isdrop

        self.conv1 = nn.Conv1d(1, 64, 3, stride=1, padding=1)
        # Cin = 256, Cout = 256, Kernel_size = 5
        self.conv2 = nn.Conv1d(64, 128, 3, stride=1, padding=1)
        self.conv3 = nn.Conv1d(128, 128, 3, stride=1, padding=1)

        # Batch Nromalization
        self.batnorm1 = nn.BatchNorm1d(64)
        self.batnorm2 = nn.BatchNorm1d(128)
        self.batnorm3 = nn.BatchNorm1d(128)

        self.relu = nn.ReLU()

        self.maxpool1 = nn.MaxPool1d(kernel_size=2, stride=2)
        self.maxpool2 = nn.MaxPool1d(kernel_size=2, stride=2)
        self.maxpool3 = nn.MaxPool1d(kernel_size=2, stride=2)

        if self.isdrop == 1:
            self.drop = nn.Dropout(p=0.25)

        self.len = length

        self.fc1 = nn.Linear(int(self.len / 8) * 128, 128)

        #self.fc1 = nn.Linear(self.len, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 1)

    def forward(self, x):

        x = self.conv1(x)          # Cin = 1, Cout = 64, Kernel_size = 11
        x = self.batnorm1(x)
        x = self.relu(x)
        if self.isdrop == 1:
            x = self.drop(x)
        x = self.maxpool1(x)

        x = self.conv2(x)          # Cin = 64, Cout = 128, Kernel_size = 5
        x = self.batnorm2(x)
        x = self.relu(x)
        if self.isdrop == 1:
            x = self.drop(x)
        x = self.maxpool2(x)

        x = self.conv3(x)          # Cin = 64, Cout = 128, Kernel_size = 5
        x = self.batnorm3(x)
        x = self.relu(x)
        if self.isdrop == 1:
            x = self.drop(x)
        x = self.maxpool3(x)

        x = x.view(-1, int(self.len / 8) * 128)
        #x = x.squeeze(1)
        x = self.fc1(x)            # Din = 16*256, Dout = 1024
        x = self.relu(x)
        x = self.fc2(x)            # Din = 1024, Dout = 1024
        x = self.relu(x)
        x = self.fc3(x)            # Din = 1024, Dout = 1

        return x


**Training**

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from numpy.random import rand, randn
from torch.optim.lr_scheduler import StepLR
from easydict import EasyDict


def test_network(net, dataloader, device):
    net.eval()
    target = []
    prediction = []
    for i, (frame) in enumerate(dataloader):
        x = frame['intensity'].float().to(device)
        y = frame['label'].float().to(device)
        pred = net(x.unsqueeze(1))
        pred_data = torch.sigmoid(pred).squeeze()
        decision = (pred_data > 0.5).float()
        target.append(y.squeeze().detach().cpu().numpy())
        prediction.append(decision.detach().cpu().numpy())
    target = np.hstack(target)
    prediction = np.hstack(prediction)
    TP = np.sum(np.logical_and(target == 1, prediction == 1))
    T = np.sum(target == 1)
    P = np.sum(prediction == 1)
    correct = np.sum(abs(target - prediction) == 0)
    Number = target.shape[0]
    net.train()
    return TP, T, P, correct, Number


def train_network(train_data, train_labels, batch_size, valid_loader, net, optimizer, scheduler, criterion, num_iter, log_path, net_path, device, print_interval=1):
    
    index_true = np.where(train_labels==1)[0]
    index_false = np.where(train_labels==0)[0]

    for it in range(num_iter):
        
        batch_true_index = np.random.choice(len(index_true), batch_size//2)
        batch_false_index = np.random.choice(len(index_false), batch_size//2)

        batch_true_data = torch.from_numpy(train_data[index_true[batch_true_index]]).float().to(device)
        batch_false_data = torch.from_numpy(train_data[index_false[batch_false_index]]).float().to(device)
        batch_data = torch.cat((batch_true_data, batch_false_data), 0)
        batch_label = torch.cat((torch.ones(batch_size//2), torch.zeros(batch_size//2)), 0).to(device)

        pred = net(batch_data.unsqueeze(1))
        loss = criterion(pred, batch_label.unsqueeze(1))

        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        scheduler.step()

        if it % print_interval == 0:

            TP, T, P, correct, Number = test_network(net, valid_loader, device)

            recall = TP / T
            precision = TP / P
            accuracy = correct / Number

            F1 = 2 * recall * precision / (recall + precision)
            F2 = 5 * recall * precision / (recall + 4 * precision)
            F0 = (1 + 0.25**2) * recall * precision / (recall + 0.25**2 * precision)

            message = f'Iteration: {it}, loss: {loss.item():.3f}, test acc: {accuracy:.3f}, Recall: {recall:.3f}, Precision: {precision:.3f}, F0.5: {F0:.3f}, F1: {F1:.3f}, F2: {F2:.3f}'

            print(message)

            with open(log_path, "a") as log_file:
                log_file.write('%s/n' % message)

    print('Training finished!')
    torch.save(net.state_dict(), net_path)




**Main light**

In [None]:

#path laoder
from google.colab import drive
drive.mount('/content/gdrive')
!cp '/content/gdrive/My Drive/Light_Valid_Sample.mat' '/content'
import scipy
from scipy import io


In [None]:
def main(opt):

    # Prepare the training data
    train_mat_data = sio.loadmat(opt.train_data)
    valid_mat_data = sio.loadmat(opt.test_data)
    train_data = train_mat_data['data']
    train_labels = train_mat_data['label']

    dataset_valid = dataloader_light(opt.test_data)
    valid_loader = torch.utils.data.DataLoader(dataset_valid, batch_size=opt.batchsize, shuffle=False, num_workers=1, pin_memory=True)

    net = CNN_Light(480, 128, opt.num_layer, 128, opt.dropout).to(opt.device)
    optimizer = optim.Adam(net.parameters(), lr=opt.lr)
    scheduler = StepLR(optimizer, step_size=opt.stepsize, gamma=opt.gamma)

    # class 0 : class 1  =  3 : 1
    criterion = nn.BCEWithLogitsLoss()

    if not os.path.exists(opt.save_log_dir):
        os.makedirs(opt.save_log_dir)

    log_path = os.path.join(opt.save_log_dir, opt.save_log_name)

    if not os.path.exists(opt.save_net_dir):
        os.makedirs(opt.save_net_dir)

    net_path = os.path.join(opt.save_net_dir, opt.save_net_name)

    message = f'Start training Neural Network with Dropout: {opt.dropout}, CNN Layer: {opt.num_layer}'
    print(message)

    train_network(train_data, train_labels, opt.batchsize, valid_loader, net, optimizer, scheduler, criterion, opt.iteration, log_path, net_path, opt.device, opt.print_interval)


if __name__ == '__main__':
    opt = EasyDict()

    opt.train_data = 'Light_Train_Sample2019.mat'
    opt.test_data = 'Light_Train_Sample2019.mat'

    opt.batchsize = 400
    opt.lr = 1e-3
    opt.iteration = 10000
    opt.num_layer = 3
    opt.dropout = 0.001

    opt.stepsize = 4000
    opt.gamma = 0.1

    opt.save_log_dir = './logs'
    opt.save_log_name = 'train_light_log.txt'

    opt.save_net_dir = './model'
    opt.save_net_name = 'light_net.w'

    opt.print_interval = 100

    opt.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    main(opt)


**Test Light**

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')
!cp '/content/gdrive/My Drive/Test_light.zip' '/content'
!unzip 'Test_light.zip' -d '/content'

In [None]:
import numpy as np
import torch

import scipy.io as sio
import os
from easydict import EasyDict
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
from matplotlib.ticker import LinearLocator, FormatStrFormatter


def test(opt):

    light_net = CNN_Light(480, 128, opt.num_layer, 128, opt.dropout).to(opt.device)

    net_path = os.path.join(opt.net_dir, opt.net_name)
    light_net.load_state_dict(torch.load(net_path, map_location=opt.device))

    if not os.path.exists(opt.output_folder):
        os.makedirs(opt.output_folder)

    if not os.path.exists(opt.output_fig_folder):
        os.makedirs(opt.output_fig_folder)
    
    light_net.eval()

    num = opt.num

    for n in range(num):

        data = sio.loadmat(os.path.join(opt.input_folder, str(n + 1) + '.mat'))
        #data = sio.loadmat(os.path.join(opt.input_folder,'outdoor_HOBO_Month_11_Day_'+ str(n + 6) + '.mat'))
        light_test = data['test_light']

        results_light = np.zeros((light_test.shape[0], light_test.shape[1]))

        for i in range(light_test.shape[0]):
            light = torch.from_numpy(light_test[i, :, :]).float().unsqueeze(1).to(opt.device)

            light_result = light_net(light)

            light_result = torch.sigmoid(light_result)

            results_light[i, :] = light_result.cpu().squeeze().detach().numpy()
        
        #fig_light = os.path.join(opt.output_fig_folder, 'light_'+str(n+1)+'.png')
       # plt.imshow(results_light.transpose(), cmap='viridis', interpolation='nearest')
        #plt.colorbar()
        #plt.savefig(fig_light)
       # plt.clf()
        
        path_light = os.path.join(opt.output_folder, 'light_' + str(n + 1) + '.mat')
        #path_light = os.path.join(opt.output_folder, 'light_outdoor_HOBO_Month_11_Day_' + str(n + 6) + '.mat')

        sio.savemat(path_light, {'results': results_light})
        print(n)


if __name__ == '__main__':
    opt = EasyDict()

    opt.dropout = 0.25
    opt.num_layer = 3

    opt.net_dir = '/content/'
    opt.net_name = 'light_net.w'

    opt.num = 437

    opt.output_folder = './results/Heatmaps_light'
    opt.output_fig_folder = './results/figs_light'
    opt.input_folder = '/content/Test_light'
    opt.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    test(opt)


In [None]:
!zip -r /content/result.zip /content/results