In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

### 全結合層の定義

In [None]:
# シード値固定
torch.manual_seed(1)

# 全結合層の定義
fc = nn.Linear(3, 2) # 3次元から2次元へ

In [None]:
# 重み(W)：ランダムに格納
fc.weight

In [None]:
# バイアス(b)：ランダムに格納
fc.bias

### 線形変換

In [None]:
# PyTorchで使用するTensor型に変換(各値はfloat型に)
x = torch.tensor([[1, 2, 3]], dtype=torch.float32)

In [None]:
# 線形変換を実行
u = fc(x)
u

### 非線形変換(ReLU関数による)

In [None]:
# ReLU関数による変換
z = F.relu(u)
z

### 例題

In [None]:
torch.manual_seed(1)
fc = nn.Linear(3, 2)
fc_2 = nn.Linear(2, 1)

x = torch.tensor([[1, 2, 3]], dtype=torch.float32)

z = F.relu(fc(x))
y = fc_2(z)

y

### 目的関数(平均二乗誤差による)

In [None]:
# 目標値の設定
t = torch.tensor([[1]], dtype=torch.float32)
t

In [None]:
# MSEの算出
loss = F.mse_loss(t, y)
loss

## 実装（基礎編）

### データセットの準備

In [None]:
from sklearn.datasets import load_iris
import torch 
import torch.nn as nn
import torch.nn.functional as F

In [None]:
# 特徴量と目標値の定義
x, t = load_iris(return_X_y=True)

In [None]:
# torch型に変換
x = torch.tensor(x, dtype=torch.float32)
t = torch.tensor(t, dtype=torch.int64)

In [None]:
# まとめて一つのオブジェクトdatasetに変換
dataset = torch.utils.data.TensorDataset(x, t)
dataset

In [None]:
# データセットを学習用,検証用,テスト用に分割(train_test_splitと同じ？)
n_train = int(len(dataset) * 0.6)
n_val = int(len(dataset) * 0.2)
n_test = len(dataset) - n_train - n_val

torch.manual_seed(0)
train, val, test = torch.utils.data.random_split(dataset, [n_train, n_val, n_test])

### ミニバッチ学習

In [None]:
batch_size = 10
# shuffleは学習データのみTrueに(目的関数を変化させることで,局所解に囚われないように)
train_loader = torch.utils.data.DataLoader(train, batch_size, shuffle=True)
val_loader = torch.utils.data.DataLoader(val, batch_size)
test_loader = torch.utils.data.DataLoader(test, batch_size)

### モデルの定義

In [None]:
# インスタンスnetの作成
class Net(nn.Module):
    # 使用するオブジェクトの定義
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(4, 4)
        self.fc2 = nn.Linear(4, 3)
    
    # 順伝播
    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)

        return x

In [None]:
torch.manual_seed(0)
# インスタンス化
net = Net()

In [None]:
# モデルの確認
net

### 目的関数の選択(クロスエントロピーによる)

In [None]:
# 目的関数の定義
criterion = F.cross_entropy
criterion

### 最適化手法の選択

In [None]:
# パラメータの取得
for parameter in iter(net.parameters()):
    print(parameter)

In [None]:
# 最適化手法の選択(今回はSGD：確率的勾配降下法)
optimizer = torch.optim.SGD(net.parameters(), lr=0.1) # lr:学習係数
optimizer

### 学習

In [None]:
# バッチサイズ分のサンプルを抽出
batch = next(iter(train_loader)) # iter: 1イテレーション分取り出せる, next: 順に
batch

In [None]:
# 入力値と目標値に分割
x, t = batch

In [None]:
# 全結合層fc1のパラメータ(重み)の値
net.fc1.weight

In [None]:
# 全結合層fc1のバイアスの値
net.fc1.bias

In [None]:
# 予測値の算出
y = net(x) # net.forward(x)と書いても同じ
y

In [None]:
# 目的関数の算出
loss = criterion(y, t)
loss

In [None]:
# 勾配の算出
loss.backward()

In [None]:
# fc1の重みに関する勾配が求まった
net.fc1.weight.grad

In [None]:
# fc1のバイアスに関する勾配が求まった
net.fc1.bias.grad

In [None]:
# 勾配情報を用いたパラメータの更新
optimizer.step()

### データのデバイスへの転送

In [None]:
# GPUが使用可能かの確認
torch.cuda.is_available()

In [None]:
# GPUが使用可能ならcuda、そうでなければcpuを選択
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
device

In [None]:
net.to(device)

In [None]:
# デバイスへの入力変数の転送
x = x.to(device)
x

In [None]:
# デバイスへの目標値の転送
t = t.to(device)
t

### 勾配の初期化

In [None]:
# パラメータの勾配を求める前に勾配情報の初期化が必要
optimizer.zero_grad()

## モデルの学習(まとめ)

In [None]:
# エポック数は1
max_epoch = 1

In [None]:
torch.manual_seed(0)
# モデルのインスタンス化とデバイスへの転送
net = Net().to(device)

In [None]:
# 最適化手法の選択
optimizer = torch.optim.SGD(net.parameters(), lr=0.1)

In [None]:
# 学習
for epoch in range(max_epoch):
    
    for batch in train_loader:
        
        # バッチサイズ分のサンプルを抽出
        x, t = batch
        
        # 学習時に使用するデバイスへデータの転送
        x = x.to(device)
        t = t.to(device)
        
        # パラメータの勾配を初期化
        optimizer.zero_grad()
        
        # 予測値の算出
        y = net(x)
        
        # 目標値と予測値から目的関数の値を算出
        loss = criterion(y, t)
        
        # 目的関数の値を表示して確認
        # item(): tensot.Tensor => float
        print('loss: ', loss.item())
        
        # 各パラメータの勾配を算出
        loss.backward()
        
        # 勾配の情報を用いたパラメータの更新
        optimizer.step()

In [None]:
# 「行ごと/列ごと」の最大値に対する要素番号を取得（dim=0 は列ごと）
y_label = torch.argmax(y, dim=1)# dim=1 で「行ごと」に
y_label

In [None]:
# 目的変数
t

In [None]:
# 正解率の算出
acc = torch.sum(y_label == t) * 1.0 / len(t) # 切り捨て防止のためにfloatへ変換
acc

In [None]:
# 学習(改)
for epoch in range(max_epoch):

    for batch in train_loader:

        x, t = batch
        x = x.to(device)
        t = t.to(device)
        optimizer.zero_grad()
        y = net(x)
        loss = criterion(y, t)

        # New：正解率の算出
        y_label = torch.argmax(y, dim=1)
        acc  = torch.sum(y_label == t) * 1.0 / len(t)
        print('accuracy:', acc)

        loss.backward()
        optimizer.step()

### 評価用

In [None]:
# 正解率の計算
def calc_acc(data_loader):
    
    with torch.no_grad():
        
        accs = [] # 各バッチごとの結果格納用
        
        for batch in data_loader:
            x, t = batch
            x = x.to(device)
            t = t.to(device)
            y = net(x)
            
            y_label = torch.argmax(y, dim=1)
            acc = torch.sum(y_label == t) * 1.0 / len(t)
            accs.append(acc)
            
    # 全体の平均を算出
    avg_acc = torch.tensor(accs).mean()
    print('Accuracy: {:.1f}%'.format(avg_acc * 100))
    
    return avg_acc

In [None]:
calc_acc(val_loader)

In [None]:
calc_acc(test_loader)

## 回帰

In [None]:
import numpy as np
import pandas as pd
from sklearn.datasets import load_diabetes

In [None]:
data = load_diabetes()
t = data.data
x = data.target

### 1. Tensor型へ変換

In [None]:
import torch
t = np.array(t)
x = np.array(x)

t = torch.from_numpy(t).float()
x = torch.from_numpy(x).float()

### 2. Datasetにまとめる

In [None]:
import torch.utils.data

dataset = torch.utils.data.TensorDataset(x, t)
dataset

### 3. データの分割

In [None]:
n_train = int(len(dataset) * 0.6)
n_val = int(len(dataset) * 0.2)
n_test = len(dataset) - n_train - n_val

torch.manual_seed(0)

train, val, test = torch.utils.data.random_split(dataset, [n_train, n_val, n_test])

### 4. モデルと学習手順の定義

In [None]:
import torch.nn as nn
import torch.nn.functional as F
import pytorch_lightning as pl

# 学習用データに対する処理
class TrainNet(pl.LightningModule):
    
    #@pl.data_loader 
    def train_dataloader(self):
        return torch.utils.data.DataLoader(train, self.batch_size, shuffle=True)
    
    def training_step(self, batch, batch_nb):
        x, t = batch
        y = self.forward(x)
        loss = self.lossfun(y, t)
        results = {'loss': loss}
        return results

In [None]:
# 検証データに対する処理
class ValidationNet(pl.LightningModule):

    #@pl.data_loader
    def val_dataloader(self):
        return torch.utils.data.DataLoader(val, self.batch_size)

    def validation_step(self, batch, batch_nb):
        x, t = batch
        y = self.forward(x)
        loss = self.lossfun(y, t)
        results = {'val_loss': loss}
        return results

    def validation_end(self, outputs):
        avg_loss = torch.stack([x['val_loss'] for x in outputs]).mean()
        results = {'val_loss': avg_loss}
        return results

In [None]:
# テストデータに対する処理
class TestNet(pl.LightningModule):

    #@pl.data_loader
    def test_dataloader(self):
        return torch.utils.data.DataLoader(test, self.batch_size)

    def test_step(self, batch, batch_nb):
        x, t = batch
        y = self.forward(x)
        loss = self.lossfun(y, t)
        results = {'test_loss': loss}
        return results

    def test_end(self, outputs):
        avg_loss = torch.stack([x['test_loss'] for x in outputs]).mean()
        results = {'test_loss': avg_loss}
        return results

In [None]:
# 学習データ、検証データ、テストデータへの処理を継承したクラス
class Net(TrainNet, ValidationNet, TestNet):
    
    def __init__(self, input_size=13, hidden_size=5, output_size=1, batch_size=10):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)
        self.batch_size = batch_size

    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        return x
    
    # New: 平均ニ乗誤差
    def lossfun(self, y, t):
        return F.mse_loss(y, t)

    def configure_optimizers(self):
        return torch.optim.SGD(self.parameters(), lr=0.1)

### 5. モデルの学習

In [None]:
from pytorch_lightning import Trainer

In [None]:
# 再現性の確保
torch.manual_seed(0)

# インスタンス化
net = Net()
trainer = Trainer()

# 学習の実行
trainer.fit(net)

### 6. 検証データとテストデータに対する精度評価

In [None]:
trainer.test()

In [None]:
trainer.callback_metrics

In [None]:
torch.sqrt(torch.tensor(trainer.callback_metrics['test_loss']))