In [1]:
# -*- coding: utf-8 -*-
from io import open
import os.path
from os import path
import random
import numpy as np
import pickle
import pandas as pd
import scipy.signal
import torch
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as pl
import matplotlib.ticker as ticker
from torch import nn
from data_prepare import *

torch.cuda.set_device(0)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
train_loader, test_loader, valid_loader, WholeSet= get_dataloader(10,31,30)
iters = iter(train_loader)
X_train, y = next(iters)

****************************** ——-————-————-————-————-————-————-————-————-————-————-————-————-————-————-————-————-————-————-————-—— ******************************
轨迹条数： 34163
---轨迹输入数据结构： torch.Size([1, 31, 44]) ---轨迹输出数据结构： torch.Size([1, 31, 4]) --行为输入数据结构
---轨迹长度： 31 ---预测轨迹长度： 30


In [2]:
class LSTM(nn.Module):
    def __init__(self, input_size, output_size, hidden_dim, n_layers):
        super(LSTM, self).__init__()
        
        self.hidden_dim=hidden_dim
        self.n_layers = n_layers
        # define an RNN with specified parameters
        # batch_first means that the first dim of the input and output will be the batch_size
        self.lstm = nn.LSTM(input_size, hidden_dim, n_layers,dropout=0., batch_first=True,bidirectional=False)
        
        # last, fully-connected layer
        self.fc1 = nn.Linear(hidden_dim,hidden_dim*2)
        self.fc = nn.Linear(hidden_dim*2, output_size)
        # self.fc2 = nn.Linear(X_train.shape[2],output_size)
        # self.softmax = nn.Softmax(dim=1)
        
    def forward(self, x, hidden=None):
        batch_size = x.size(0)
        
        # get RNN outputs
        r_out, hidden = self.lstm(x, hidden)
        r_out = r_out.contiguous().view(-1, self.hidden_dim)  
        
        # get final output 
        output = self.fc1(r_out)
        output = self.fc(output)
        output = output.view(batch_size,-1,6) #------num
        output = output[:,-1]
        
        return output
    
if torch.cuda.is_available():
    train_on_gpu = True
else:
    train_on_gpu = False

In [3]:
def trajectory_to_behavior_data(x):
    x = x.to(device)
    std = WholeSet.std.repeat(x.shape[0],x.shape[1],1)
    std = std.to(device)
    mn = WholeSet.mn.repeat(x.shape[0],x.shape[1],1)
    mn = mn.to(device)
    rg = WholeSet.range.repeat(x.shape[0],x.shape[1],1)
    rg = rg.to(device)
    input_return = (x*(rg*std)+mn).detach().cpu()
    inputs = torch.from_numpy(input_return[:,:,:-1].detach().cpu().numpy().astype(np.float32))
    inputs = inputs.view(-1,inputs.shape[2])
    y = torch.from_numpy(input_return[:,:,-1].detach().cpu().numpy().astype(np.int))
    y = y.view(-1,1).squeeze()
    return inputs,y

In [6]:
def train(net,epochs,train_loader,valid_loader,clip,lr = 0.0002):
    # train for some number of epochs
    # loss and optimization functions
    loss_min = np.inf
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)
    counter = 0
    losses_train = []
    losses_valid = []
    accuracies_e = []
    for e in range(epochs):
        # initialize hidden state
        net.train()
        # batch loop
        train_loss = []
        for inputs, labels in train_loader:
            inputs,labels = trajectory_to_behavior_data(inputs)
#             print(inputs.shape,labels.shape,labels)
            inputs = inputs.unsqueeze(2)
            if (train_on_gpu):
                inputs, labels = torch.from_numpy(inputs.numpy().astype(np.float32)).to(device), labels.long().to(device)
            # Creating new variables for the hidden state, otherwise
            # we'd backprop through the entire training history
            # zero accumulated gradients
            net.zero_grad()
            # get the output from the model
            output = net(inputs)
            # calculate the loss and perform backprop
            # print(output.shape,labels.shape)
            # print(output[:1])
            loss = criterion(output, labels)
            loss.backward()
            # `clip_grad_norm` helps prevent the exploding gradient problem in RNNs / LSTMs.
            nn.utils.clip_grad_norm_(net.parameters(), clip)
            optimizer.step()
            train_loss.append(loss.item())
            # loss stats
        # Get validation loss
        val_losses = []
        net.eval()
        accuracies = []
        for inputs, labels in valid_loader:
            inputs,labels = trajectory_to_behavior_data(inputs)
            inputs = inputs.unsqueeze(2)
            # Creating new variables for the hidden state, otherwise
            # we'd backprop through the entire training history
            # val_h = tuple([each.data for each in val_h])
            if (train_on_gpu):
                inputs, labels = torch.from_numpy(inputs.numpy().astype(np.float32)).to(device), labels.long().to(device)
            #labels = labels.long()
            output= net(inputs)
            val_loss = criterion(output, labels)
            _, class_ = torch.max(output, dim=1)
            equal = class_ == labels.view(class_.shape)
            accuracy = torch.mean(equal.type(torch.FloatTensor)).item()
            val_losses.append(val_loss.item())
            accuracies.append(accuracy)               
        net.train()
        losses_train.append(np.mean(train_loss))
        losses_valid.append(np.mean(val_losses))
        accuracies_e.append(np.mean(np.mean(accuracies)))
        print("Epoch: {}/{}...".format(e + 1, epochs),
              "Loss: {}...".format(np.mean(train_loss)),
              "Val Loss: {}...".format(np.mean(val_losses)),
              "val accuracy:{}.".format(np.mean(accuracies))
                     )
        if np.mean(val_losses) < loss_min:
            print('Val loss decreased...')
            torch.save(net.state_dict(),'model/behavior_prediction_0707_43_30.pth')
            loss_min = np.mean(val_losses)
    print('min loss',loss_min)
    plt.plot(losses_train,color='r',label='train_loss')
    plt.plot(losses_valid,color='g',label='valid_loss')
    plt.title('Loss_Trend')
    plt.xlabel('Epoches')
    plt.ylabel('Loss')
    plt.legend()
    plt.savefig('image/loss_lstm_behavior_0707_43_30.svg',dpi=600)
    plt.savefig('image/loss_lstm_behavior_0707_43_30.png',dpi=600)
    return accuracies_e

def test(net,test_loader):
    # Get test data loss and accuracy
    lr = 0.001

    criterion = nn.CrossEntropyLoss()
    test_losses = []  # track loss
    accuracies = []
    net.eval()
    # iterate over test data
    class_correct = np.zeros(6) #------num
    class_total = np.zeros(6)
    classes = ["Straight", "LeftTurn", "RightTurn", "UTurn", "LeftChange", "RightChange"]
    for inputs, y in test_loader:
        inputs,y = trajectory_to_behavior_data(inputs)
        inputs = inputs.unsqueeze(2)
        # Creating new variables for the hidden state, otherwise
        # we'd backprop through the entire training history
        if (train_on_gpu):
            inputs, y = torch.from_numpy(inputs.numpy().astype(np.float32)).to(device), y.long().to(device)
        # get predicted outputs
       # inputs,y = torch.from_numpy(inputs.numpy().astype(np.float32)),y.long()
        output= net(inputs)
        
        # calculate loss
        test_loss = criterion(output, y)
        _, class_ = torch.max(output, dim=1)
        equal = class_ == y.view(class_.shape)
        for i in range(y.shape[0]):
            label = y.data[i].item()
            class_correct[label] += equal[i].item()
            class_total[label] += 1
        accuracy = torch.mean(equal.type(torch.FloatTensor)).item()
        test_losses.append(test_loss.item())
        accuracies.append(accuracy)

    # -- stats! -- ##
    # avg test loss
    print('Test Loss: {:.20f}\n'.format(test_loss.item()))
    for i in range(4):
        if class_total[i]>0:
            print('Test Accuracy of {}:{:.4f}({}/{})'.format(classes[i],100*class_correct[i]/class_total[i],
                 int(np.sum(class_correct[i])),
                 int(np.sum(class_total[i]))))
        else:
            print('Test Accuracy of {}:N/A(no examples)'.format(classes[i]))
    print('Test Accuracy(Overall):{:.4f} ({}/{})'.format(100*np.sum(class_correct)/np.sum(class_total),
                                                    int(np.sum(class_correct)),
                                                    int(np.sum(class_total))))
    print("Test loss: {:.10f}".format(np.mean(test_losses)),'Test Accuracy:{}'.format(np.mean(accuracies)))

In [7]:
net = LSTM(1,6,256,2) #------num
from time import time
if train_on_gpu:
    net.to(device)
start = time()
epochs = 100
accuracy = train(net,epochs,train_loader,valid_loader,clip=5,lr=0.0001)
print('Training time is:',time()-start,'s')

Epoch: 1/100... Loss: 0.7112118919788336... Val Loss: 0.6402720421823034... val accuracy:0.8080470499533257.
Val loss decreased...
Epoch: 2/100... Loss: 0.6063366368042534... Val Loss: 0.5862302641353095... val accuracy:0.8319880652748635.
Val loss decreased...
Epoch: 3/100... Loss: 0.48058154883366777... Val Loss: 0.4431750660159991... val accuracy:0.8687492298354242.
Val loss decreased...
Epoch: 4/100... Loss: 0.36616339209825804... Val Loss: 0.30380687007975804... val accuracy:0.9004219861312467.
Val loss decreased...
Epoch: 5/100... Loss: 0.2889292787080152... Val Loss: 0.24651579935055723... val accuracy:0.9189913774796107.
Val loss decreased...
Epoch: 6/100... Loss: 0.24134025409199386... Val Loss: 0.22170510366884566... val accuracy:0.9281486276038948.
Val loss decreased...
Epoch: 7/100... Loss: 0.20925743464411298... Val Loss: 0.17524929770265035... val accuracy:0.9404495521564389.
Val loss decreased...
Epoch: 8/100... Loss: 0.18664903233868135... Val Loss: 0.17398531927949093.

KeyboardInterrupt: 

In [None]:
plt.plot(accuracy,color='b',label='accuracy')
plt.title('Accuracy_Trend')
plt.xlabel('Epoches')
plt.ylabel('Accuracy')
plt.legend()
plt.savefig('image/accuracy_lstm_behavior_25.svg',dpi=300)
plt.savefig('image/accuracy_lstm_behavior_25.png',dpi=300)

In [None]:
net_test = LSTM(1,6,256,2) #------num
net_test.load_state_dict(torch.load('model/behavior_prediction_0707_43_30.pth',map_location='cuda'))
if train_on_gpu:
    net_test.cuda()   
test(net_test,test_loader)

In [20]:
net_test.cpu().eval()
from time import time
# An example input you would normally provide to your model's forward() method.
#example = torch.rand(1, 4)

example = torch.ones(1,43,1).cuda()
t = time()
out = net_test(example)
print('time',(time()-t)*1000,'ms')
print(out)
# Use torch.jit.trace to generate a torch.jit.ScriptModule via tracing.
traced_script_module = torch.jit.trace(net_test, example)
traced_script_module.save("model/behavior_prediction_LSTM_0707_31_30_cuda.pt")
example1 = torch.zeros(1,43,1).cpu()
out= traced_script_module(example)
out1 = traced_script_module(example1)
print(out)
print(out1)

time 37.323713302612305 ms
tensor([[  9.8558,   6.5670, -11.7465, -14.8528]], grad_fn=<SelectBackward>)
tensor([[  9.8558,   6.5670, -11.7465, -14.8528]], grad_fn=<SelectBackward>)
tensor([[  6.2286,   3.8929,  -5.5848, -13.0892]], grad_fn=<SelectBackward>)
