In [None]:
import os
import datetime

def printbar():
    """Print a blank line, an 80-character '=' rule and the current local time."""
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    rule = "==========" * 8
    print("\n" + rule + timestamp)

# On macOS, running PyTorch and matplotlib together in Jupyter requires
# changing this environment variable.
os.environ.update(KMP_DUPLICATE_LIB_OK="TRUE")

In [None]:
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt 
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, TensorDataset

# Seed the global RNG so the synthetic data set is the same on every
# Restart & Run All (the value 0 is arbitrary).
torch.manual_seed(0)

# Number of samples
n = 400

# Generate the synthetic data set: Y = X @ w0 + b0 + Gaussian noise
X = 10 * torch.rand((n, 2)) - 5.0   # two features, scaled from [0, 1) to roughly [-5, 5)
w0 = torch.tensor([[2.0], [-3.0]])  # ground-truth weights
b0 = torch.tensor([[10.0]])         # ground-truth bias
Y = X @ w0 + b0 + torch.normal(0.0, 2.0, size=(n, 1))  # noise: mean 0.0, std 2.0

In [None]:
%matplotlib inline
%config InlineBackend.figure_format = 'svg'

# One scatter panel per input feature, each plotted against the target y.
plt.figure(figsize=(12, 5))

ax1 = plt.subplot(1, 2, 1)
ax1.scatter(X[:, 0].numpy(), Y[:, 0].numpy(), c="b", label="samples")
ax1.set_xlabel("x1")
ax1.set_ylabel("y", rotation=0)
ax1.legend()

ax2 = plt.subplot(1, 2, 2)
ax2.scatter(X[:, 1].numpy(), Y[:, 0].numpy(), c="g", label="samples")
ax2.set_xlabel("x2")
ax2.set_ylabel("y", rotation=0)
ax2.legend()

plt.show()

In [None]:
# Pair features with targets and serve them in shuffled mini-batches of 10.
ds = TensorDataset(X, Y)
dl = DataLoader(dataset=ds, shuffle=True, batch_size=10)

In [None]:
# Linear regression model: 2 input features -> 1 output.
model = nn.Linear(in_features=2, out_features=1)

# Attach the optimizer and the loss to the model object so the training
# helpers can reach both through `model`.
model.optimizer = torch.optim.SGD(params=model.parameters(), lr=0.01)
model.loss_func = nn.MSELoss()

In [None]:
def train_step(model, features, labels):
    """Run one optimisation step on a single batch and return its loss.

    Args:
        model: Callable module that also exposes ``loss_func`` and ``optimizer``.
        features: Input batch passed to ``model``.
        labels: Targets passed to ``model.loss_func`` along with the predictions.

    Returns:
        The batch loss as a Python number (``loss.item()``).
    """
    batch_loss = model.loss_func(model(features), labels)

    # Back-propagate, apply the update, then clear the gradients for the next call.
    batch_loss.backward()
    model.optimizer.step()
    model.optimizer.zero_grad()

    return batch_loss.item()

# Try train_step on a single mini-batch; the returned loss is the cell output.
features, labels = next(iter(dl))
train_step(model=model, features=features, labels=labels)

In [None]:
def train_model(model, epochs):
    """Train ``model`` for ``epochs`` passes over the module-level DataLoader ``dl``.

    Every 50th epoch this prints a timestamped separator, the loss of the last
    batch of that epoch, and the current ``weight`` and ``bias`` entries of
    ``model.state_dict()``.

    Args:
        model: Model passed through to ``train_step``.
        epochs: Number of passes over ``dl``; epochs are numbered from 1.
    """
    for epoch in range(1, epochs + 1):
        for features, labels in dl:
            loss = train_step(model, features, labels)

        # Only every 50th epoch is reported.
        if epoch % 50 != 0:
            continue

        printbar()
        state = model.state_dict()
        w, b = state["weight"], state["bias"]
        print("epoch =", epoch, " loss =", loss)
        print("w =", w)
        print("b =", b)


train_model(model, epochs=200)

In [None]:
%matplotlib inline
%config InlineBackend.figure_format = 'svg'

# Learned parameters of the linear model.
w = model.state_dict()["weight"]
b = model.state_dict()["bias"]

fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(12, 5))

# Axis labels are set through each Axes object. After plt.subplots() the
# pyplot "current axes" is the last panel, so plt.xlabel()/plt.ylabel() would
# put every label on ax2 and leave ax1 unlabelled.
# Tensors are converted with .numpy() before being handed to matplotlib.
ax1.scatter(X[:, 0].numpy(), Y[:, 0].numpy(), c="b", label="samples")
ax1.plot(X[:, 0].numpy(), (w[0, 0] * X[:, 0] + b[0]).detach().numpy(),
         "-r", linewidth=5.0, label="model")
ax1.legend()
ax1.set_xlabel("x1")
ax1.set_ylabel("y", rotation=0)

ax2.scatter(X[:, 1].numpy(), Y[:, 0].numpy(), c="g", label="samples")
ax2.plot(X[:, 1].numpy(), (w[0, 1] * X[:, 1] + b[0]).detach().numpy(),
         "-r", linewidth=5.0, label="model")
ax2.legend()
ax2.set_xlabel("x2")
ax2.set_ylabel("y", rotation=0)

plt.show()


In [None]:
# DNN
import numpy as np 
import pandas as pd 
from matplotlib import pyplot as plt
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, TensorDataset

# Seed the global RNG so the same two rings are generated on every
# Restart & Run All (the value 0 is arbitrary).
torch.manual_seed(0)

# Number of positive and negative samples
n_positive, n_negative = 1000, 1000

# Positive samples: small ring, radius ~ N(5, 1), angle uniform over a full turn
r_p = torch.normal(5.0, 1.0, size=[n_positive, 1])  # random radius
theta_p = 2 * np.pi * torch.rand([n_positive, 1])   # random angle
Xp = torch.cat([r_p * torch.cos(theta_p), r_p * torch.sin(theta_p)], dim=1)
Yp = torch.ones_like(r_p)                           # label 1

# Negative samples: large ring, radius ~ N(8, 1), angle uniform over a full turn
r_n = torch.normal(8.0, 1.0, size=[n_negative, 1])
theta_n = 2 * np.pi * torch.rand([n_negative, 1])
Xn = torch.cat([r_n * torch.cos(theta_n), r_n * torch.sin(theta_n)], dim=1)
Yn = torch.zeros_like(r_n)                          # label 0

# Combine both classes: positives first, then negatives
X = torch.cat([Xp, Xn], dim=0)
Y = torch.cat([Yp, Yn], dim=0)

In [None]:
%matplotlib inline
%config InlineBackend.figure_format = 'svg'

# Show both classes in the feature plane on a single square panel.
axe = plt.figure(figsize=(6, 6)).add_subplot(111)
axe.scatter(x=Xp[:, 0].numpy(), y=Xp[:, 1].numpy(), c='r', label='positive')
axe.scatter(x=Xn[:, 0].numpy(), y=Xn[:, 1].numpy(), c='g', label='negative')
axe.legend()
plt.show()

In [None]:
# Pair the ring coordinates with their labels and serve them in shuffled
# mini-batches of 10.
ds = TensorDataset(X, Y)
dl = DataLoader(dataset=ds, shuffle=True, batch_size=10)

In [None]:
class DNNModel(nn.Module):
    """Fully connected binary classifier (2 -> 4 -> 8 -> 1) with a sigmoid output.

    Besides ``forward`` the module carries its own loss, accuracy metric and
    optimizer, so a training loop only needs the model object.
    """

    def __init__(self):
        super(DNNModel, self).__init__()
        self.fc1 = nn.Linear(2, 4)
        self.fc2 = nn.Linear(4, 8)
        self.fc3 = nn.Linear(8, 1)
        # Created on first access of ``optimizer`` and reused afterwards.
        self._optimizer = None

    # Forward pass
    def forward(self, x):
        """Return the sigmoid-activated output of the three linear layers for ``x``."""
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return F.sigmoid(self.fc3(x))

    # Loss function (binary cross-entropy), BCE loss
    def loss_func(self, y_pred, y_true):
        """Return ``nn.BCELoss`` between predictions ``y_pred`` and targets ``y_true``."""
        return nn.BCELoss()(y_pred, y_true)

    # Evaluation metric (accuracy)
    def metric_func(self, y_pred, y_true):
        """Return the mean agreement between ``y_true`` and ``y_pred`` thresholded at 0.5."""
        # Predictions strictly above 0.5 become 1.0, everything else 0.0.
        y_pred = torch.where(
            y_pred > 0.5,
            torch.ones_like(y_pred, dtype=torch.float32),
            torch.zeros_like(y_pred, dtype=torch.float32),
        )
        acc = torch.mean(1 - torch.abs(y_true - y_pred))
        return acc

    @property
    def optimizer(self):
        """The model's Adam optimizer (lr=0.001); the same instance on every access.

        Adam keeps per-parameter running statistics inside the optimizer
        object. Constructing a new optimizer on each access would throw that
        state away before every update and would make ``optimizer.step()``
        and ``optimizer.zero_grad()`` act on two different objects.
        """
        if self._optimizer is None:
            self._optimizer = torch.optim.Adam(self.parameters(), lr=0.001)
        return self._optimizer

model = DNNModel()

In [None]:
# Sanity check: loss and accuracy of the untrained model on one mini-batch.
(features, labels) = next(iter(dl))

# Nothing is back-propagated here, so evaluate under no_grad() instead of
# building a graph and stripping it afterwards with the legacy `.data` attribute.
with torch.no_grad():
    predictions = model(features)
    loss = model.loss_func(predictions, labels)
    metric = model.metric_func(predictions, labels)

print("init loss:", loss.item())
print("init metric:", metric.item())

In [None]:
def train_step(model, features, labels):
    """Run one optimisation step on a single batch.

    Args:
        model: Callable module that also exposes ``loss_func``, ``metric_func``
            and ``optimizer``.
        features: Input batch passed to ``model``.
        labels: Targets passed to ``loss_func`` and ``metric_func`` along with
            the predictions.

    Returns:
        Tuple ``(loss, metric)`` for this batch, both converted with ``.item()``.
    """
    # Look the optimizer up once so step() and zero_grad() are guaranteed to
    # act on the same instance, even if ``model.optimizer`` is a property that
    # constructs its value on access.
    optimizer = model.optimizer

    predictions = model(features)
    loss = model.loss_func(predictions, labels)
    metric = model.metric_func(predictions, labels)

    # Back-propagate, apply the update, then clear the gradients for the next call.
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()

    return loss.item(), metric.item()

def train_model(model, epochs):
    """Train ``model`` for ``epochs`` passes over the module-level DataLoader ``dl``.

    Each epoch averages the per-batch loss and metric returned by
    ``train_step``; every 100th epoch prints a timestamped separator followed
    by those averages.

    Args:
        model: Model passed through to ``train_step``.
        epochs: Number of passes over ``dl``; epochs are numbered from 1.
    """
    for epoch in range(1, epochs + 1):
        # One (loss, metric) pair per mini-batch, in iteration order.
        batch_results = [train_step(model, features, labels) for features, labels in dl]

        loss = np.mean([batch_loss for batch_loss, _ in batch_results])
        metric = np.mean([batch_metric for _, batch_metric in batch_results])

        if epoch % 100 == 0:
            printbar()
            print("epoch =", epoch, " loss =", loss, " metric =", metric)


train_model(model, epochs=1000)

In [None]:
# Visualise the results: true labels on the left, model predictions on the right
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(12, 5))

ax1.scatter(Xp[:, 0].numpy(), Xp[:, 1].numpy(), c="r", label="positive")
ax1.scatter(Xn[:, 0].numpy(), Xn[:, 1].numpy(), c="g", label="negative")
ax1.legend()
ax1.set_title("y_true")

# Run the model once, without gradient tracking, and reuse the output for both
# classes. Calling the module instead of .forward() goes through
# nn.Module.__call__.
with torch.no_grad():
    y_prob = model(X)

# Split the inputs by predicted class at the 0.5 threshold.
Xp_pred = X[torch.squeeze(y_prob >= 0.5)]
Xn_pred = X[torch.squeeze(y_prob < 0.5)]

ax2.scatter(Xp_pred[:, 0].numpy(), Xp_pred[:, 1].numpy(), c="r", label="positive")
ax2.scatter(Xn_pred[:, 0].numpy(), Xn_pred[:, 1].numpy(), c="g", label="negative")
ax2.legend()
ax2.set_title("y_pred")

plt.show()