In [None]:
!unzip /content/traffic_data.zip

In [29]:
import torch
import numpy as np
import random
from matplotlib import pyplot as plt
import torch.utils.data as Data
from PIL import Image
import os
from torch import nn
import torch.optim as optim
from torch.nn import init
import torch.nn.functional as F
import time
import torchvision
from torchvision import transforms,datasets
from shutil import copy, rmtree
import json

In [30]:
def mk_file(file_path: str):
  if os.path.exists(file_path):
  # 如果文件夹存在，则先删除原文件夹在重新创建
    rmtree(file_path)
  os.makedirs(file_path)
def split_data():
  random.seed(0)
  # 将数据集中25%的数据划分到验证集中
  split_rate = 0.25
  data_root = os.getcwd()
  origin_car_path = os.path.join(data_root, "traffic_data")
  assert os.path.exists(origin_car_path), "path '{}' does not exist.".format(origin_car_path)
  car_class = [cla for cla in os.listdir(origin_car_path) if os.path.isdir(os.path.join(origin_car_path, cla))]

  # 建立保存训练集的文件夹
  train_root = os.path.join(data_root, "train")
  mk_file(train_root)
  for cla in car_class:
  # 建立每个类别对应的文件夹
    mk_file(os.path.join(train_root, cla))

  # 建立保存验证集的文件夹
  test_root = os.path.join(data_root, "test")
  mk_file(test_root)
  for cla in car_class:
  # 建立每个类别对应的文件夹
    mk_file(os.path.join(test_root, cla))
  for cla in car_class:
    cla_path = os.path.join(origin_car_path, cla)
    images = os.listdir(cla_path)
    num = len(images)
    # 随机采样验证集的索引
    eval_index = random.sample(images, k=int(num*split_rate))
    for index, image in enumerate(images):
      if image in eval_index:
      # 将分配至验证集中的文件复制到相应目录
        image_path = os.path.join(cla_path, image)
        new_path = os.path.join(test_root, cla)
        copy(image_path, new_path)
      else:
      # 将分配至训练集中的文件复制到相应目录
        image_path = os.path.join(cla_path, image)
        new_path = os.path.join(train_root, cla)
        copy(image_path, new_path)
      print("\r[{}] processing [{}/{}]".format(cla, index+1, num), end="")  # processing bar
  print("processing done!")
split_data()

[car] processing [779/779]processing done!


In [31]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("using {} device.".format(device))

data_transform = {"train": transforms.Compose([transforms.Resize((64,64)),transforms.RandomHorizontalFlip(),transforms.ToTensor(),transforms.Normalize((0.5,0.5,0.5),
(0.5,0.5,0.5))]),"test": transforms.Compose([transforms.Resize((64,64)),transforms.ToTensor(),transforms.Normalize((0.5,0.5,0.5),(0.5,0.5,0.5))])}


image_path = os.getcwd()
print(image_path)
train_dataset = datasets.ImageFolder(root=os.path.join(image_path,"train"),transform = data_transform["train"])
train_num = len(train_dataset)
print(train_num)

batch_size = 32
train_loader = torch.utils.data.DataLoader(train_dataset,batch_size = batch_size,shuffle = True,num_workers = 0)

test_dataset = datasets.ImageFolder(root=os.path.join(image_path,"test"),transform = data_transform["test"])

test_num = len(test_dataset)
print(test_num)#val_num = 364
test_loader = torch.utils.data.DataLoader(test_dataset,batch_size = batch_size,shuffle=False,num_workers = 0)
print("using {} images for training, {} images for validation .".format(train_num,test_num))

using cuda:0 device.
/content
1019
338
using 1019 images for training, 338 images for validation .


In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# 二维卷积

## 手写二维卷积

In [42]:
#自定义单通道卷积  
def corr2d(X,K):  
  '''
  X:输入，shape (batch_size,H,W) 
  K:卷积核，shape (k_h,k_w) 
  单通道 
  '''  
  batch_size,H,W = X.shape  
  k_h, k_w = K.shape  
  #初始化结果矩阵  
  Y = torch.zeros((batch_size,H - k_h + 1,W- k_w + 1)).to(device)
  for i in range(Y.shape[1]):  
    for j in range(Y.shape [2]):  
      Y[:,i,j] = (X[:,i:i+k_h,j:j+k_w]* K).sum()
  return Y  
#自定义多通道卷积  
def corr2d_mu1ti_in(X, K):  
  #输入X:维度(batch_size,C_in,H, W)  
  #卷积核K:维度(C_in,k_h,k_w)  
  #输出:维度(batch_size,H_out,W_out)  
  #先计算第一通道  
  res = corr2d(X[:,0,:,:], K[0,:,:])
  for i in range(1, X.shape[1]):  
  #按通道相加  
    res += corr2d(X[:,i,:,:], K[i,:,:])
  return res  

#自定义多个多通道卷积  
def corr2d_multi_in_out(X, K):  
  # X: shape (batch_size,C_in,H,W)  
  # K: shape (C_out,C_in,h,w)  
  # Y: shape(batch_size,C_out,H_out,W_out)  
  return torch.stack([corr2d_mu1ti_in(X, k) for k in K],dim=1)



## 自定义卷积层

In [43]:
class MyConv2D(nn.Module):  
  def __init__(self,in_channels, out_channels,kernel_size):  
    super(MyConv2D,self).__init__()  
    #初始化卷积层的2个参数:卷积核、偏差  
    #isinstance判断类型  
    if isinstance(kernel_size,int):  
      kernel_size = (kernel_size,kernel_size)  
      self.weight = nn.Parameter(torch.randn((out_channels, in_channels) + kernel_size)).to(device)  
      self.bias = nn.Parameter(torch.randn(out_channels,1,1)).to(device)  
  def forward(self,x):    #x:输入图片，维度(batch_size,C_in,H,W)
    res =  corr2d_multi_in_out(x,self.weight) + self.bias
    return res

## 定义卷积网络

In [44]:
class MyConvModule(nn.Module):
  def __init__(self):
    super(MyConvModule,self).__init__()
    #定义一层卷积层
    self.conv = nn.Sequential(
      MyConv2D(in_channels = 3,out_channels = 32,kernel_size = 3),
      nn.BatchNorm2d(32),
      # inplace-选择是否进行覆盖运算
      nn.ReLU(inplace=True))
      #输出层,将通道数变为分类数量
    self.fc = nn.Linear(32,3)
  def forward(self,x):
    #图片经过一层卷积，输出维度变为(batch_size,C_out,H,W)
    out = self.conv(x)
    #使用平均池化层将图片的大小变为1x1,第二个参数为最后输出的长和宽（这里默认相等了）64-3/1 + 1 = 62
    out = F.avg_pool2d(out,62)
    #将张量out从shape batchx32x1x1 变为 batch x32
    out = out.squeeze()
    #输入到全连接层将输出的维度变为3
    out = self.fc(out)
    return out

## 训练函数

In [45]:
lr = 0.001
epochs = 5
num_class = 3
#初始化模型
net = MyConvModule().to(device)
#使用多元交叉熵损失函数
loss_func = nn.CrossEntropyLoss()
#使用Adam优化器
optimizer = optim.Adam(net.parameters(),lr = lr)

def train_epoch(net, data_loader, device):
  net.train() #指定当前为训练模式
  train_batch_num = len(data_loader) #记录共有多少个batch
  total_1oss = 0 #记录Loss
  correct = 0 #记录共有多少个样本被正确分类
  sample_num = 0 #记录样本总数
  #遍历每个batch进行训练
  for batch_idx, (data,target) in enumerate (data_loader):
    t1 = time.time()
    #将图片放入指定的device中
    data = data.to(device).float()
    #将图片标签放入指定的device中
    target = target.to(device).long()
    #将当前梯度清零
    optimizer.zero_grad()
    #使用模型计算出结果
    output = net(data)
    #计算损失
    loss = loss_func(output, target.squeeze())
    #进行反向传播
    loss.backward()
    optimizer.step()
    #累加loss
    total_1oss += loss.item()
    #找出每个样本值最大的idx,即代表预测此图片属于哪个类别
    prediction = torch.argmax(output, 1)
    #统计预测正确的类别数量
    correct += (prediction == target).sum().item()
    #累加当前的样本总数
    sample_num += len(prediction)
    #if batch_idx//5 ==0:
    t2 = time.time()
    print("processing:{}/{},消耗时间{}s".format(batch_idx+1,len(data_loader),t2-t1))
  #计算平均loss与准确率
  loss = total_1oss / train_batch_num
  acc = correct / sample_num
  return loss, acc

def test_epoch(net, data_loader, device):
  net.eval() #指定当前模式为测试模式
  test_batch_num = len(data_loader)
  total_loss = 0
  correct = 0
  sample_num = 0
  #指定不进行梯度变化
  with torch.no_grad():
    for batch_idx, (data, target) in enumerate(data_loader):
      data = data.to(device).float()
      target = target.to(device).long()
      output = net(data)
      loss = loss_func(output, target)
      total_loss += loss.item( )
      prediction = torch.argmax(output, 1)
      correct += (prediction == target).sum().item()
      sample_num += len(prediction)
  loss = total_loss / test_batch_num
  acc = correct / sample_num
  return loss,acc

## 训练

In [46]:
## 存储每一个epoch的loss与acc的变化，便于后面可视化
train_loss_list = []
train_acc_list = []
test_loss_list = []
test_acc_list = []
time_list = []
timestart = time.time()
print(device)
#进行训练
for epoch in range(epochs):
  #每一个epoch的开始时间
  time_start = time.time()
  #在训练集上训练  
  train_loss, train_acc = train_epoch(net,data_loader=train_loader, device=device)
  #在测试集上验证
  test_loss, test_acc = test_epoch(net,data_loader=test_loader, device=device)
  #每一个epoch的结束时间
  time_end = (time.time() - time_start)
  #保存各个指际
  train_loss_list.append(train_loss)  
  train_acc_list.append(train_acc )  
  test_loss_list.append(test_loss)  
  test_acc_list.append(test_acc)  
  time_list.append(time_end)  
  print('epoch %d, train_loss %.6f,test_loss %.6f,train_acc %.6f,test_acc %.6f,Time used %.6fs'%(epoch+1, train_loss,test_loss,train_acc,test_acc,time_end))  
#计算总时间  
timesum = (time.time() - timestart)  
print('The total time is %fs',timesum) 


cuda:0
processing:1/32,消耗时间58.37450075149536s
processing:2/32,消耗时间60.53974366188049s
processing:3/32,消耗时间59.64996123313904s
processing:4/32,消耗时间59.738276720047s
processing:5/32,消耗时间59.29375863075256s
processing:6/32,消耗时间59.90777635574341s
processing:7/32,消耗时间59.40642046928406s
processing:8/32,消耗时间59.51416754722595s
processing:9/32,消耗时间58.91418790817261s
processing:10/32,消耗时间58.90377998352051s
processing:11/32,消耗时间59.90839457511902s
processing:12/32,消耗时间59.37030363082886s
processing:13/32,消耗时间59.29256749153137s
processing:14/32,消耗时间58.58264088630676s
processing:15/32,消耗时间59.5806519985199s
processing:16/32,消耗时间58.380335092544556s
processing:17/32,消耗时间59.41806697845459s
processing:18/32,消耗时间59.03585457801819s
processing:19/32,消耗时间59.53761959075928s
processing:20/32,消耗时间58.703988790512085s
processing:21/32,消耗时间59.773438453674316s
processing:22/32,消耗时间58.5155394077301s
processing:23/32,消耗时间58.56944799423218s
processing:24/32,消耗时间59.14865970611572s
processing:25/32,消耗时间58.77095937728882s
pro