<a href="https://colab.research.google.com/github/takky0330/NLP/blob/master/iris_IIC.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch

class IIC(torch.nn.Module):
    def __init__(self):
      super(IIC, self).__init__()

    def IIC(self, z, zt, C):
      P = (z.unsqueeze(2) * zt.unsqueeze(1)).sum(dim=0)
      P = ((P + P.t()) / 2) / P.sum()
      EPS = torch.finfo(P.dtype).eps
      P[(P < EPS).data] = EPS
      Pi = P.sum(dim=1).view(C, 1).expand(C, C)
      Pj = P.sum(dim=0).view(1, C).expand(C, C)

      # 論文中の式は計算間違っているのでは？
      # (P * (log(Pi) + log(Pj) - log(P))).sum() (元計算)

      # 損失関数なので、最大化->最小化に切り替えるために負にする
      loss = (-1.0 * P * (torch.log(P) - torch.log(Pi) - torch.log(Pj))).sum()
      return loss

    def forward(self, Z, ZT, C=3):
      # headと呼んでいる処理のためにこのような計算になる
      # headで学習が早くなるのだが、なぜ早くなるのかは不勉強。。コメント待っています。
      return torch.sum(torch.stack([
                self.IIC(z, zt, C) for z, zt in zip(Z, ZT)
              ]))

In [None]:
from torch.utils.data import Dataset
from sklearn.datasets import load_iris
import pandas as pd

class Iris_forIIC(Dataset):
    def __init__(self):
      # irisデータセットをロード
      ## 　自身の iris データに変更
      #iris = load_iris()
      #self.df = pd.DataFrame(iris.data).assign(label=iris.target)
      _iris = pd.read_csv("https://stream.takky.org/rd/data/iris.csv", index_col=0)
      _iris.index.name =""
      _iris.columns = ["label", "0", "1", "2", "3"]
      self.df = _iris[["0", "1", "2", "3", "label"]]


    def __len__(self):
        return len(self.df.index)

    def __getitem__(self, idx):
      # idx行目を取り出して、データとラベルにわける
      row = self.df.iloc[idx].values
      label = row[4]

      # データはTensorのfloat型じゃないといけない
      data_target = torch.from_numpy(row[:4]).float()
      # IICのキモ！
      # 今回は雑に元データにノイズをのっける。
      data_other = torch.from_numpy(row[:4]).float() * torch.normal(torch.tensor(1).float(), torch.tensor(0.1).float())
      print(data_target)
      print(data_other)

      return data_target, data_other

In [None]:
import torch.nn as nn
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        # irisデータにあわせて4次元入力の、3次元出力。
        self.nn = nn.Sequential(
            nn.Linear(4, 100),
            # バッチ正規化を利用しないと学習が進まない
            nn.BatchNorm1d(100),
            nn.ReLU(),
            nn.Linear(100, 50),
            nn.BatchNorm1d(50),
            nn.ReLU(),
            nn.Linear(50, 3),
            nn.BatchNorm1d(3),
            nn.ReLU())

      # 普通の教師ありなら下記のようなモデルで十分
      # self.nn = nn.Sequential(
      #     nn.Linear(4, 100),
      #     nn.ReLU(),
      #     nn.Linear(100, 50),
      #     nn.ReLU(),
      #     nn.Linear(50, 3))

    def forward(self, x):
        # headと呼んでいる処理。不勉強で何しているかわからない。
        # 学習がとても早くなる。
        return [F.softmax(self.nn(x), dim=1) for _ in range(5)]
        # return F.softmax(self.nn(x), dim=1)

In [None]:
import torch.optim as optim
import torch.nn.functional as F
from sklearn.metrics import confusion_matrix

# cpu/gpuを指定
device = torch.device('cpu')

# モデルを生成
model = Net()
model = model.to(device)
# 損失関数
criterion = IIC()
# 最適化関数
optimizer = optim.Adam(model.parameters(), lr=0.0001)

# Dataloader
kwargs = {'num_workers': 1, 'pin_memory': True}
train_set = Iris_forIIC()
train_loader = torch.utils.data.DataLoader(train_set, batch_size=len(train_set), shuffle=True, **kwargs)

EPOCH = 250
for idx in range(EPOCH):
  model.train()
  for X, XT in train_loader:
    # 元データ
    X = X.to(device)
    # ノイズ付与データ
    XT = XT.to(device)
    # XとXTについてそれぞれ分類
    Z = model(X)
    ZT = model(XT)
    # 損失=相互情報量を計算
    loss = criterion(Z, ZT)

    # 勾配計算・学習
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()


[1;30;43mストリーミング出力は最後の 5000 行に切り捨てられました。[0m
tensor([6.8746, 3.4373, 5.6930, 2.4706])
tensor([6.3000, 2.3000, 4.4000, 1.3000])
tensor([6.8088, 2.4857, 4.7553, 1.4050])
tensor([6.1000, 3.0000, 4.9000, 1.8000])
tensor([6.0601, 2.9804, 4.8680, 1.7882])
tensor([6.3000, 2.5000, 5.0000, 1.9000])
tensor([6.6262, 2.6294, 5.2589, 1.9984])
tensor([5.7000, 3.0000, 4.2000, 1.2000])
tensor([6.0844, 3.2023, 4.4832, 1.2809])
tensor([6.4000, 2.7000, 5.3000, 1.9000])
tensor([5.4683, 2.3069, 4.5284, 1.6234])
tensor([4.8000, 3.0000, 1.4000, 0.1000])
tensor([4.4408, 2.7755, 1.2952, 0.0925])
tensor([5.7000, 4.4000, 1.5000, 0.4000])
tensor([5.9456, 4.5896, 1.5646, 0.4172])
tensor([6.4000, 2.9000, 4.3000, 1.3000])
tensor([7.4810, 3.3898, 5.0263, 1.5196])
tensor([4.7000, 3.2000, 1.6000, 0.2000])
tensor([4.5596, 3.1044, 1.5522, 0.1940])
tensor([5.0000, 2.3000, 3.3000, 1.0000])
tensor([5.1655, 2.3761, 3.4092, 1.0331])
tensor([5.8000, 2.7000, 5.1000, 1.9000])
tensor([6.0199, 2.8023, 5.2933, 1.9720])
tensor([5.6

In [None]:
# 学習済みモデルを評価
model.eval()

In [None]:
# irisデータを読み込み

## 　自身の iris データに変更
#iris = load_iris()
#df_iris = pd.DataFrame(iris.data).assign(label=iris.target)
_iris = pd.read_csv("https://stream.takky.org/rd/data/iris.csv", index_col=0)
_iris.index.name =""
_iris.columns = ["label", "0", "1", "2", "3"]
df_iris = _iris[["0", "1", "2", "3", "label"]]

X = torch.Tensor(df_iris.drop('label', axis=1).values).to(device)
Y_predict = model.nn(X).argmax(1).cpu()
Y_actual = df_iris['label'].values

In [None]:
Y_predict.detach().numpy()

In [None]:
Y_actual

In [None]:
import numpy as np
# 混同行列を表示
cmatrix = confusion_matrix(Y_actual, Y_predict.detach().numpy())

In [None]:
cmatrix

In [None]:
correct_num = sum([np.max(cm1) for cm1 in cmatrix])
all_num      = len(Y_actual)
correct_rate = correct_num / all_num
correct_rate