<a href="https://colab.research.google.com/github/zhihong1224/CNN_Demo/blob/master/fastai_2_fc.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 导入数据

In [0]:
%load_ext autoreload
%autoreload 2

%matplotlib inline

from pathlib import Path
from IPython.core.debugger import set_trace
from fastai import datasets
import pickle,gzip,math,torch,matplotlib as mpl
import matplotlib.pyplot as plt
from torch import tensor

# Base URL of the pickled MNIST archive. get_data() passes ext='.gz', and the
# download log shows mnist.pkl.gz being fetched.
# NOTE(review): plain-http third-party host — confirm it is still reachable.
MNIST_URL='http://deeplearning.net/data/mnist/mnist.pkl'

import torch

def get_data():
  """Download MNIST and return its train/validation arrays as tensors.

  Returns a lazy `map` object yielding, in order, x_train, y_train, x_valid
  and y_valid, each passed through `tensor`. The third split stored in the
  pickle is discarded.
  """
  archive=datasets.download_data(MNIST_URL,ext='.gz')
  # NOTE(review): pickle.load can execute arbitrary code and the archive is
  # fetched from a remote host — only use this with a trusted source.
  with gzip.open(archive,'rb') as fh:
    train,valid,_=pickle.load(fh,encoding='latin-1')
    (x_train,y_train),(x_valid,y_valid)=train,valid
  return map(tensor,(x_train,y_train,x_valid,y_valid))

def normalize(x,m,s):
  """Standardize `x`: subtract `m`, then divide by `s`."""
  centered=x-m
  return centered/s

# Load the four MNIST tensors (get_data yields them in this order).
x_train,y_train,x_valid,y_valid=get_data()
# Statistics are taken from the training inputs only.
train_mean,train_std=x_train.mean(),x_train.std()

# Both splits are normalized with the *training* mean and std.
x_train=normalize(x_train,train_mean,train_std)
x_valid=normalize(x_valid,train_mean,train_std)

# n: number of training rows, m: features per row (50000 and 784 in the output below).
n,m=x_train.shape
# Largest label + 1; per the output below this is a 0-dim tensor, not a Python int.
c=y_train.max()+1
# Width of the hidden layer (second dimension of w1).
nh=50
# Bare expression so the notebook displays the three values.
n,m,c

Downloading http://deeplearning.net/data/mnist/mnist.pkl.gz


(50000, 784, tensor(10))

# 参数初始化

In [0]:
# NOTE(review): no torch.manual_seed call is visible in this notebook, so these
# random weights presumably differ on every run — confirm if reproducibility matters.
# First layer: m inputs -> nh hidden units, standard normal scaled by sqrt(2/m).
w1=torch.randn(m,nh)*math.sqrt(2/m)
b1=torch.zeros(nh)
# Second layer: nh hidden units -> 1 output, standard normal scaled by sqrt(2/nh).
w2=torch.randn(nh,1)*math.sqrt(2/nh)
b2=torch.zeros(1)

# 纯函数实现

In [0]:
# 不同的层的函数
def lin(x,w,b):
  """Affine layer: matrix-multiply `x` by `w`, then add the bias `b`."""
  projected=x@w
  return projected+b

def relu(x):
  """Clamp `x` below at zero, then shift the result down by 0.5."""
  clipped=x.clamp_min(0.)
  return clipped-0.5

# 模型函数
def model(x):
  """Run `x` through the two-layer network built from the global weights.

  Applies lin with (w1, b1), then relu, then lin with (w2, b2), and returns
  the output of the last layer.
  """
  hidden=relu(lin(x,w1,b1))
  return lin(hidden,w2,b2)

# 损失函数
def mse(out,targ):
  """Mean squared error between `out` (trailing size-1 axis squeezed) and `targ`."""
  diff=out.squeeze(-1)-targ
  return (diff**2).mean()

# 反向传播
def mse_grad(inp,targ):
  """Store the gradient of the MSE loss with respect to `inp` on `inp.g`.

  The value is 2*(inp-targ)/N with N = inp.shape[0], given a trailing axis of
  size 1 via unsqueeze(-1).
  """
  residual=inp.squeeze()-targ
  inp.g=2*residual.unsqueeze(-1)/inp.shape[0]

def relu_grad(inp,out):
  """Backward pass of relu: let `out.g` through only where `inp` was positive."""
  positive=(inp>0.).float()
  inp.g=positive*out.g

def lin_grad(inp,out,w,b):
  """Backward pass of the affine layer `out = inp@w + b`.

  Reads the upstream gradient from `out.g` and stores the resulting gradients
  on `inp.g`, `w.g` and `b.g`. Returns nothing.
  """
  inp.g=out.g@w.t()
  # Matrix product rather than broadcasting inp against out.g and summing over
  # the batch: it yields the same sums without allocating a
  # (batch, in_features, out_features) temporary.
  w.g=inp.t()@out.g
  b.g=out.g.sum(0)

def forward_and_backward(inp,targ):
  """Run one forward pass through the global-weight network, then backprop.

  Nothing is returned; gradients are written to the `.g` attribute of `inp`,
  of the intermediate activations, and of w1, b1, w2 and b2.
  """
  # Forward pass.
  hidden=inp@w1+b1
  activ=relu(hidden)
  pred=activ@w2+b2
  print(pred.shape)
  # The loss value is computed here but not consumed by the backward pass.
  loss=mse(pred,targ)

  # Backward pass: visit the layers in reverse order.
  mse_grad(pred,targ)
  lin_grad(activ,pred,w2,b2)
  relu_grad(hidden,activ)
  lin_grad(inp,hidden,w1,b1)

# Test
# Clear any previously stored gradients, then run the hand-written
# forward/backward pass on the full training set.
w1.g,b1.g,w2.g,b2.g=[None]*4
forward_and_backward(x_train,y_train)

torch.Size([50000, 1])


In [0]:
# 与pytorch的结果比较
def test(a,b,cmp,cname=None):
  if cname is None:cname=cmp.__name__
  assert cmp(a,b),f'{cname}:\n{a}\{b}'
def near(a,b):
  """Return True when tensors `a` and `b` agree within rtol=1e-3, atol=1e-5."""
  tolerances=dict(rtol=1e-3,atol=1e-5)
  return torch.allclose(a,b,**tolerances)
def test_near(a,b):
  """Assert that `a` and `b` are close according to `near`."""
  test(a,b,cmp=near)

# Snapshot the gradients produced by the hand-written backward pass
# (x_train.g was set by the last lin_grad call in forward_and_backward).
w1g=w1.g.clone()
w2g=w2.g.clone()
b1g=b1.g.clone()
b2g=b2.g.clone()
ig=x_train.g.clone()

# Separate copies of the input and parameters with requires_grad enabled,
# so autograd can compute reference gradients for them.
xt2=x_train.clone().requires_grad_(True)
w12=w1.clone().requires_grad_(True)
w22=w2.clone().requires_grad_(True)
b12=b1.clone().requires_grad_(True)
b22=b2.clone().requires_grad_(True)

def forward(inp,targ):
  """Forward pass over the autograd-tracked copies w12, b12, w22, b22.

  Returns the MSE loss between the network output and `targ`.
  """
  hidden=relu(inp@w12+b12)
  pred=hidden@w22+b22
  return mse(pred,targ)

# Forward pass on the tracked copies, then let autograd fill in their .grad fields.
loss=forward(xt2,y_train)

loss.backward()

In [0]:
# Each autograd gradient must match the hand-computed snapshot within near()'s tolerances.
test_near(w22.grad,w2g)
test_near(b22.grad,b2g)
test_near(w12.grad,w1g)
test_near(b12.grad,b1g)
test_near(xt2.grad,ig)

# 类形式实现

In [0]:
class Relu():
  """Shifted ReLU layer that keeps its input and output for the backward pass."""
  def __call__(self,inp):   # receives the data fed to the forward pass
    self.inp=inp
    clipped=inp.clamp_min(0.)
    self.out=clipped-0.5
    return self.out
  def backward(self):
    """Write the input gradient to `self.inp.g`, reading `self.out.g`."""
    positive=(self.inp>0).float()
    self.inp.g=positive*self.out.g

In [0]:
class Lin():
  """Affine layer `inp@w+b` with a hand-written backward pass."""
  def __init__(self,w,b):   # receives the layer's weight and bias
    self.w=w
    self.b=b
  def __call__(self,inp):   # receives the data fed to the forward pass
    self.inp=inp
    self.out=inp@self.w+self.b
    return self.out
  def backward(self):
    """Store gradients on `inp.g`, `w.g` and `b.g`, reading the upstream `out.g`."""
    upstream=self.out.g
    self.inp.g=upstream@self.w.t()
    self.w.g=self.inp.t()@upstream
    self.b.g=upstream.sum(0)

In [0]:
class Mse():
  """Mean-squared-error loss with a hand-written backward pass."""
  def __call__(self,inp,targ):
    self.inp,self.targ=inp,targ
    diff=inp.squeeze()-targ
    self.out=(diff**2).mean()
    return self.out
  def backward(self):
    """Store 2*(inp-targ)/N on `self.inp.g`, with N = `targ.shape[0]`."""
    diff=self.inp.squeeze()-self.targ
    self.inp.g=2*diff.unsqueeze(-1)/self.targ.shape[0]


In [0]:
class Model():
  """Two-layer network (Lin -> Relu -> Lin) followed by an Mse loss."""
  def __init__(self,w1,b1,w2,b2):
    hidden_layer=Lin(w1,b1)
    activation=Relu()
    output_layer=Lin(w2,b2)
    self.layers=[hidden_layer,activation,output_layer]
    self.loss=Mse()
  def __call__(self,x,targ):
    """Feed `x` through every layer in order and return the loss against `targ`."""
    out=x
    for layer in self.layers:
      out=layer(out)
    return self.loss(out,targ)
  def backward(self):
    """Backpropagate: the loss first, then the layers from last to first."""
    self.loss.backward()
    for layer in self.layers[::-1]:
      layer.backward()

In [0]:
# Reset stored gradients before exercising the class-based model.
w1.g,b1.g,w2.g,b2.g=[None]*4
# From here on the name `model` refers to this Model instance rather than the
# model() function defined earlier in the notebook.
model=Model(w1,b1,w2,b2)

In [0]:
# Time the forward pass; calling the model returns the loss.
%time loss=model(x_train,y_train)

CPU times: user 111 ms, sys: 18 µs, total: 111 ms
Wall time: 112 ms


In [0]:
# Backpropagate through the loss and then the layers in reverse order;
# gradients are written to the .g attributes.
model.backward()

# 在基类的基础上写子类

In [0]:
# 基类
class Module():
  """Base class for layers: caches call arguments and output for backward().

  Subclasses implement `forward(*args)` to compute the output and
  `bwd(out, *args)` to store gradients.
  """
  def __call__(self,*args):
    # Remember inputs and output so backward() can hand them to bwd().
    self.args=args
    self.out=self.forward(*args)
    return self.out
  def forward(self,*args):
    """Compute the layer output; subclasses must override this.

    Accepts *args so that calling a module that did not override it reports
    'not implemented' rather than a TypeError about the argument count.

    Raises:
      NotImplementedError: always (it is an Exception subclass).
    """
    raise NotImplementedError('not implemented')
  def backward(self):
    """Call the subclass's `bwd` with the cached output followed by the cached inputs."""
    self.bwd(self.out,*self.args)

class Relu(Module):
  """Shifted ReLU expressed as a Module subclass."""
  def forward(self,inp):
    """Clamp `inp` below at zero and subtract 0.5."""
    clipped=inp.clamp_min(0.)
    return clipped-0.5
  def bwd(self,out,inp):
    """Store on `inp.g` the part of `out.g` where `inp` was positive."""
    positive=(inp>0).float()
    inp.g=positive*out.g

class Lin(Module):
  """Affine layer as a Module subclass; the weight gradient is computed with einsum."""
  def __init__(self,w,b):
    self.w=w
    self.b=b
  def forward(self,inp):
    """Return `inp@w+b`."""
    projected=inp@self.w
    return projected+self.b
  def bwd(self,out,inp):
    """Store gradients on `inp.g`, `self.w.g` and `self.b.g`, reading `out.g`."""
    upstream=out.g
    inp.g=upstream@self.w.t()
    # 'bi,bj->ij' multiplies inp[b,i] by upstream[b,j] and sums over b.
    self.w.g=torch.einsum('bi,bj->ij',inp,upstream)
    self.b.g=upstream.sum(0)

class Mse(Module):
  """Mean-squared-error loss as a Module subclass."""
  def forward(self,inp,targ):
    """Return the mean of the squared differences between squeezed `inp` and `targ`."""
    diff=inp.squeeze()-targ
    return (diff**2).mean()
  def bwd(self,out,inp,targ):
    """Store 2*(inp-targ)/N on `inp.g`, with N = `targ.shape[0]`."""
    diff=inp.squeeze()-targ
    inp.g=2*diff.unsqueeze(-1)/targ.shape[0]

class Model():
  """Two-layer network (Lin -> Relu -> Lin) plus Mse loss, built from the global w1, b1, w2, b2."""
  def __init__(self):
    hidden_layer=Lin(w1,b1)
    activation=Relu()
    output_layer=Lin(w2,b2)
    self.layers=[hidden_layer,activation,output_layer]
    self.loss=Mse()
  def __call__(self,x,targ):
    """Feed `x` through every layer in order and return the loss against `targ`."""
    out=x
    for layer in self.layers:
      out=layer(out)
    return self.loss(out,targ)
  def backward(self):
    """Backpropagate: the loss first, then the layers from last to first."""
    self.loss.backward()
    for layer in self.layers[::-1]:
      layer.backward()

In [0]:
# Reset stored gradients before exercising the Module-based model.
w1.g,b1.g,w2.g,b2.g=[None]*4
# This Model takes no arguments; it reads the global w1, b1, w2 and b2.
model=Model()

In [0]:
# Time the forward pass of the Module-based model; the call returns the loss.
%time loss=model(x_train,y_train)

CPU times: user 115 ms, sys: 0 ns, total: 115 ms
Wall time: 115 ms


In [0]:
# Backpropagate through the loss and then the layers in reverse order;
# gradients are written to the .g attributes.
model.backward()