In [1]:
'''
Author: Mingxin Zhang m.zhang@hapis.u-tokyo.ac.jp
Date: 2023-03-06 03:29:29
LastEditors: Mingxin Zhang
LastEditTime: 2023-04-12 01:54:25
Copyright (c) 2023 by Mingxin Zhang, All Rights Reserved.
'''

'\nAuthor: Mingxin Zhang m.zhang@hapis.u-tokyo.ac.jp\nDate: 2023-03-06 03:29:29\nLastEditors: Mingxin Zhang\nLastEditTime: 2023-04-12 01:54:25\nCopyright (c) 2023 by Mingxin Zhang, All Rights Reserved.\n'

In [2]:
# Colab-only step: mount Google Drive at /content/drive. Later cells read the
# training pickle from, and write model weights to, paths under this mount.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import matplotlib.pyplot as plt
import numpy as np
import torch
import torchvision
import pickle
import math
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn import preprocessing
from torchvision import transforms
from torch.utils.data import DataLoader
from torch import nn
from torchvision import models
from scipy import stats
import torch.nn.functional as F
import torch.optim as optim


In [4]:
class ResNetEncoder(nn.Module):
    """Encoder built on an ImageNet-pretrained ResNet-50.

    Each sample is flattened to ``12 * 160`` features, linearly projected to
    ``3 * 128 * 128`` values and reshaped to a ``(3, 128, 128)`` tensor so it
    can be fed to ResNet-50. The backbone's final fully connected layer is
    replaced by a fresh ``nn.Linear`` that outputs ``encoded_space_dim`` values.

    Args:
        encoded_space_dim: size of the latent vector produced per sample.
    """

    def __init__(self, encoded_space_dim):
        super().__init__()

        # Learned "resize": flatten -> linear projection -> 3-channel 128x128 map.
        self.flatten = nn.Flatten(start_dim=1)
        self.resize_input = nn.Linear(12 * 160, 3 * 128 * 128)
        self.unflatten = nn.Unflatten(dim=1, unflattened_size=(3, 128, 128))

        # Pretrained backbone; only the classification head is swapped out.
        backbone = models.resnet50(weights="IMAGENET1K_V2")
        backbone.fc = nn.Linear(backbone.fc.in_features, encoded_space_dim)
        self.res50 = backbone

    def forward(self, x):
        """Map a batch ``x`` (flattening to 12 * 160 per sample) to latent vectors."""
        pseudo_image = self.unflatten(self.resize_input(self.flatten(x)))
        return self.res50(pseudo_image)


class DisLatent(nn.Module):
    """Latent-space discriminator: a three-layer MLP ending in a sigmoid.

    Layer widths are ``encoded_space_dim -> 64 -> 16 -> 1``; the two hidden
    layers use LeakyReLU with negative slope 0.2.

    Args:
        encoded_space_dim: dimensionality of the latent vectors being judged.
    """

    def __init__(self, encoded_space_dim):
        super().__init__()
        self.fc1 = nn.Linear(encoded_space_dim, 64)
        self.fc2 = nn.Linear(64, 16)
        self.fc3 = nn.Linear(16, 1)

    def forward(self, x):
        """Return one value in [0, 1] per row of ``x``, shape ``(batch, 1)``."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = F.leaky_relu(layer(hidden), 0.2)
        return F.sigmoid(self.fc3(hidden))


In [5]:
class _Residual_Block(nn.Module):
    """64-channel residual block: conv -> InstanceNorm -> LeakyReLU -> conv -> InstanceNorm, plus skip.

    Both convolutions are 3x3, stride 1, padding 1 and bias-free, so the
    spatial size and channel count of the input are preserved.
    """

    def __init__(self):
        super().__init__()

        conv_kwargs = dict(in_channels=64, out_channels=64, kernel_size=3,
                           stride=1, padding=1, bias=False)

        self.conv1 = nn.Conv2d(**conv_kwargs)
        self.in1 = nn.InstanceNorm2d(64, affine=True)
        self.relu = nn.LeakyReLU(0.2, inplace=True)
        self.conv2 = nn.Conv2d(**conv_kwargs)
        self.in2 = nn.InstanceNorm2d(64, affine=True)

    def forward(self, x):
        """Return ``x`` plus the output of the two-convolution branch."""
        branch = self.conv1(x)
        branch = self.in1(branch)
        branch = self.relu(branch)
        branch = self.conv2(branch)
        branch = self.in2(branch)
        return branch + x


class Generator(nn.Module):
    """Decode a latent vector into a single-channel 12 x 160 map.

    Pipeline: linear projection to ``3 * 40`` values -> reshape to
    ``(1, 3, 40)`` -> 9x9 conv to 64 channels -> 16 residual blocks with a
    long skip connection -> two PixelShuffle(2) stages (4x upscaling in both
    spatial dimensions) -> 9x9 conv back to one channel.

    Args:
        encoded_space_dim: size of the latent vector fed to ``forward``.
    """

    def __init__(self, encoded_space_dim):
        super().__init__()

        # Latent vector -> low-resolution single-channel seed map.
        self.resize = nn.Linear(encoded_space_dim, 3 * 40)
        self.unflatten = nn.Unflatten(dim=1, unflattened_size=(1, 3, 40))

        self.conv_input = nn.Conv2d(in_channels=1, out_channels=64, kernel_size=9,
                                    stride=1, padding=4, bias=False)
        self.relu = nn.LeakyReLU(0.2, inplace=True)

        self.residual = self.make_layer(_Residual_Block, 16)

        self.conv_mid = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3,
                                  stride=1, padding=1, bias=False)
        # Named "bn" but is an InstanceNorm layer; the name is kept because it
        # is part of the saved state_dict keys.
        self.bn_mid = nn.InstanceNorm2d(64, affine=True)

        # Two identical x2 stages: conv expands to 256 channels, PixelShuffle(2)
        # folds them back to 64 channels at twice the height and width.
        upscale_stages = []
        for _ in range(2):
            upscale_stages.extend([
                nn.Conv2d(in_channels=64, out_channels=256, kernel_size=3,
                          stride=1, padding=1, bias=False),
                nn.PixelShuffle(2),
                nn.LeakyReLU(0.2, inplace=True),
            ])
        self.upscale4x = nn.Sequential(*upscale_stages)

        self.conv_output = nn.Conv2d(in_channels=64, out_channels=1, kernel_size=9,
                                     stride=1, padding=4, bias=False)

        # Re-initialise every convolution with N(0, sqrt(2 / (kh * kw * out_channels))).
        conv_layers = (m for m in self.modules() if isinstance(m, nn.Conv2d))
        for conv in conv_layers:
            kernel_h, kernel_w = conv.kernel_size
            fan = kernel_h * kernel_w * conv.out_channels
            conv.weight.data.normal_(0, math.sqrt(2. / fan))
            if conv.bias is not None:
                conv.bias.data.zero_()

    def make_layer(self, block, num_of_layer):
        """Return an ``nn.Sequential`` of ``num_of_layer`` fresh ``block()`` instances."""
        return nn.Sequential(*(block() for _ in range(num_of_layer)))

    def forward(self, x):
        """Generate a ``(batch, 1, 12, 160)`` tensor from latent vectors ``x``."""
        seed_map = self.unflatten(self.resize(x))
        shallow = self.relu(self.conv_input(seed_map))
        deep = self.bn_mid(self.conv_mid(self.residual(shallow)))
        # Long skip connection around the whole residual trunk.
        upsampled = self.upscale4x(deep + shallow)
        return self.conv_output(upsampled)


class DisSpec(nn.Module):
    """Convolutional discriminator for single-channel 2-D inputs.

    Eight convolutions alternate between 3x3/stride-1 (changing the channel
    count) and 4x4/stride-2 (halving the resolution), growing from 64 to 512
    channels. The flattened features go through ``Linear(512 * 2 * 11, 1024)``,
    LeakyReLU, ``Linear(1024, 1)`` and a sigmoid. The first linear layer's
    input size fixes the feature map at 512 x 2 x 11, which is what a
    ``(1, 12, 160)`` input produces.
    """

    def __init__(self):
        super().__init__()

        # First stage has no normalisation layer.
        stack = [
            nn.Conv2d(in_channels=1, out_channels=64, kernel_size=3, stride=1, padding=1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),
        ]

        # (in_channels, out_channels, kernel_size, stride, padding) for the
        # remaining conv -> BatchNorm -> LeakyReLU stages, in order.
        stage_specs = (
            (64, 64, 4, 2, 2),
            (64, 128, 3, 1, 1),
            (128, 128, 4, 2, 2),
            (128, 256, 3, 1, 1),
            (256, 256, 4, 2, 2),
            (256, 512, 3, 1, 1),
            (512, 512, 4, 2, 2),
        )
        for in_ch, out_ch, kernel, stride, pad in stage_specs:
            stack.append(nn.Conv2d(in_channels=in_ch, out_channels=out_ch, kernel_size=kernel,
                                   stride=stride, padding=pad, bias=False))
            stack.append(nn.BatchNorm2d(out_ch))
            stack.append(nn.LeakyReLU(0.2, inplace=True))

        self.features = nn.Sequential(*stack)

        self.LeakyReLU = nn.LeakyReLU(0.2, inplace=True)
        self.fc1 = nn.Linear(512 * 2 * 11, 1024)
        self.fc2 = nn.Linear(1024, 1)
        self.sigmoid = nn.Sigmoid()

        # DCGAN-style initialisation: conv weights ~ N(0, 0.02),
        # BatchNorm scale ~ N(1, 0.02) with zero shift.
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                module.weight.data.normal_(0.0, 0.02)
            elif isinstance(module, nn.BatchNorm2d):
                module.weight.data.normal_(1.0, 0.02)
                module.bias.data.fill_(0)

    def forward(self, input):
        """Return a ``(batch, 1)`` tensor of sigmoid scores for ``input``."""
        feats = self.features(input)
        flat = feats.view(feats.size(0), -1)
        hidden = self.LeakyReLU(self.fc1(flat))
        score = self.sigmoid(self.fc2(hidden))
        return score.view(-1, 1)

In [6]:
# ---------------------------------------------------------------------------
# Device
# ---------------------------------------------------------------------------
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f'Selected device: {device}')

# ---------------------------------------------------------------------------
# Data
# ---------------------------------------------------------------------------
# NOTE(security): pickle.load can execute arbitrary code embedded in the file.
# Only load pickles that you created yourself or otherwise fully trust.
with open('/content/drive/MyDrive/Colab Notebooks/vibrotactile-encoder/trainset_LMT_large.pickle', 'rb') as file:
    trainset = pickle.load(file)

# NOTE(review): presumably shaped (num_samples, 12, 160) -- the encoder's first
# linear layer expects 12 * 160 features per sample; confirm against the pickle.
spectrogram = torch.from_numpy(trainset['spectrogram'].astype(np.float32))
texture = trainset['texture']

# Texture names -> integer class ids.
le = preprocessing.LabelEncoder()
labels = torch.as_tensor(le.fit_transform(texture))

# Standardise with the global (dataset-wide, scalar) mean and std.
transform_spec = transforms.Normalize(
    mean = spectrogram.mean(),
    std = spectrogram.std()
)

spectrogram = transform_spec(spectrogram)

train_dataset = torch.utils.data.TensorDataset(spectrogram, labels)
train_dataloader = torch.utils.data.DataLoader(
    dataset = train_dataset,
    batch_size = 64,
    shuffle = True,
    num_workers = 2,
    )

# ---------------------------------------------------------------------------
# Models, loss and optimizers
# ---------------------------------------------------------------------------
adversarial_loss = nn.BCELoss()

FEAT_DIM = 128

# Move every model to the target device *before* building its optimizer, as the
# torch.optim documentation recommends, so the optimizers are guaranteed to
# hold the parameters that are actually trained.
encoder = ResNetEncoder(encoded_space_dim = FEAT_DIM).to(device)
generator = Generator(encoded_space_dim = FEAT_DIM).to(device)
dis_latent = DisLatent(encoded_space_dim = FEAT_DIM).to(device)
dis_spec = DisSpec().to(device)

gen_lr = 2e-4
encoder_lr = 2e-4
d_spec_lr = 2e-4
d_latent_lr = 2e-4

optimizer_G = optim.Adam(generator.parameters(), lr=gen_lr)
optimizer_E = optim.Adam(encoder.parameters(), lr=encoder_lr)
optimizer_D_spec = optim.Adam(dis_spec.parameters(), lr=d_spec_lr)
optimizer_D_latent = optim.Adam(dis_latent.parameters(), lr=d_latent_lr)

epoch_num = 10

Selected device: cpu


Downloading: "https://download.pytorch.org/models/resnet50-11ad3fa6.pth" to /root/.cache/torch/hub/checkpoints/resnet50-11ad3fa6.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 118MB/s]


In [7]:
# Added to discriminator outputs before log() so the loss stays finite when a
# discriminator saturates at exactly 0 or 1.
EPS = 1e-15

# Width of the random label-smoothing band: "real" targets are drawn from
# [1 - soft_scale, 1], "fake" targets from [0, soft_scale].
soft_scale = 0.1

# Std of the Gaussian instance noise added to the spectrogram discriminator's inputs.
sigma = 1


def _latent_ks_pvalue(latent_vec):
    """Return the KS-test p-value of one latent vector against a fitted normal.

    The vector is compared with a normal distribution whose mean and std are
    the vector's own sample mean and std. The tensor is detached and copied to
    the CPU first, so this also works when training on a GPU.
    """
    values = latent_vec.detach().cpu().numpy()
    return stats.kstest(values, 'norm', (values.mean(), values.std())).pvalue


for epoch in range(1, epoch_num + 1):
    encoder.train()
    dis_latent.train()
    generator.train()
    dis_spec.train()

    for img, _ in train_dataloader:
        img = torch.unsqueeze(img, 1).to(device)  # Add channel axis (1 channel)
        batch_size = img.size(0)

        # Soft, noisy targets created directly on the training device.
        valid = 1.0 - torch.rand(batch_size, 1, device=device) * soft_scale
        fake = torch.rand(batch_size, 1, device=device) * soft_scale

        # 1) reconstruction
        # 1.1) generator (+ encoder, which receives gradients through z)
        optimizer_G.zero_grad()
        # The encoder's gradients must be cleared too; otherwise they keep
        # accumulating across phases and iterations.
        optimizer_E.zero_grad()
        # input latent vector
        z = encoder(img)
        # train generator
        gen_img = generator(z)
        g_loss = adversarial_loss(dis_spec(gen_img), valid)
        g_loss.backward()
        optimizer_G.step()
        optimizer_E.step()

        # 1.2) spectrogram discriminator
        # zero_grad also discards the gradients g_loss.backward() left on dis_spec.
        optimizer_D_spec.zero_grad()

        noise_r = torch.randn_like(img) * sigma
        noise_g = torch.randn_like(img) * sigma

        # Measure discriminator's ability to classify real from generated samples
        real_loss = adversarial_loss(dis_spec(img + noise_r), valid)
        fake_loss = adversarial_loss(dis_spec(gen_img.detach() + noise_g), fake)
        d_spec_loss = (real_loss + fake_loss) / 2

        d_spec_loss.backward()
        optimizer_D_spec.step()

        # 2) latent discriminator
        encoder.eval()
        # Clears gradients left on dis_latent by the previous encoder phase.
        optimizer_D_latent.zero_grad()

        real_z = torch.randn(batch_size, FEAT_DIM, device=device)
        # Only dis_latent is updated here, so no graph is built through the
        # encoder and no discriminator-phase gradients leak into it.
        with torch.no_grad():
            fake_z = encoder(img)

        print('Training the discriminator')
        print('pvalue:' + str(_latent_ks_pvalue(fake_z[0])))

        D_real_gauss = dis_latent(real_z)
        D_fake_gauss = dis_latent(fake_z)

        d_latent_loss = -torch.mean(torch.log(D_real_gauss + EPS) + torch.log(1 - D_fake_gauss + EPS))
        d_latent_loss.backward()
        optimizer_D_latent.step()

        # 3) encoder
        encoder.train()
        optimizer_E.zero_grad()
        fake_z = encoder(img)
        print('Training the encoder')
        print('pvalue:' + str(_latent_ks_pvalue(fake_z[0])))

        D_fake_gauss = dis_latent(fake_z)
        E_loss = -torch.mean(torch.log(D_fake_gauss + EPS))
        E_loss.backward()
        optimizer_E.step()

    # Losses reported per epoch are those of the last batch only.
    print('Epoch: ', epoch)
    print('Generator Loss: ', round(g_loss.item(), 4), '\tSpec Discriminator Loss: ', round(d_spec_loss.item(), 4))
    print('Encoder Loss: ', round(E_loss.item(), 4), '\tLatent Discriminator Loss: ', round(d_latent_loss.item(), 4))

# Persist the final weights of all four networks, tagged with the latent size.
weights_dir = '/content/drive/MyDrive/Colab Notebooks/vibrotactile-encoder/hapGAN_weights/'
for weights_name, model in (
    ('generator', generator),
    ('dis_spec', dis_spec),
    ('encoder', encoder),
    ('dis_latent', dis_latent),
):
    torch.save(model.state_dict(), weights_dir + weights_name + '_' + str(FEAT_DIM) + 'd.pt')

Training the discriminator
pvalue:0.9187475196697695
Training the encoder
pvalue:0.9740270542254411
Training the discriminator
pvalue:0.8705444379705459
Training the encoder
pvalue:0.43535744515345465
Training the discriminator
pvalue:0.7567745885723148
Training the encoder
pvalue:0.4909022818978227
Training the discriminator
pvalue:0.7794638757471957


KeyboardInterrupt: ignored