In [1]:
from torch.autograd import Variable
import torch.nn.functional as F
import torch.nn as nn
import torch
import torchvision
from torchvision import datasets, transforms
import torch.utils.data as data
import torchvision.models as models
import matplotlib.image as pli
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np
from PIL import Image
from PIL import ImageOps
from PIL import ImageEnhance
import random
import math
import pickle
import glob
import librosa
import os
import time
import scipy.signal as ss
from enum import Enum

os.environ["CUDA_VISIBLE_DEVICES"] = "1"
print(torch.cuda.is_available())
path = './dataset/train'
labels = os.listdir(path)
pos_train_folders = {l: glob.glob(f'{path}/{l}/[0-9][0-9]*/') for l in labels}
pos_val_folders = {l: glob.glob(f'{path}/{l}/[0-9]/') for l in labels}
# print(pos_train_folders)
# print(pos_val_folders)
print(labels)

is_plot = False

freq_length = 57
time_length = 221
trainingset_size = 10000
val_set_size = 100
batch_size = 64 if torch.cuda.is_available() else 8

True
['toothpaste_box', 'whiteboard_spray', 'toy_elephant', 'green_basketball', '061_foam_brick', 'shiny_toy_gun', 'salt_cylinder', 'strawberry', 'stanley_screwdriver', 'yellow_block']


In [4]:
import cv2
from findContourCenter import findContourCenter
class Direction(Enum):
    No = 0
    Up = 1
    RightUp = 2
    Right = 3
    RightDown = 4
    Down = 5
    LeftDown = 6
    Left = 7
    LeftUp = 8
def findCollision(folder, is_debug):
    # folder = './dataset/train/toothpaste_box/21/'
    before = 0
    after = 1
    row = 0
    col = 1
    mask_img_files = glob.glob(f'{folder}mask/*.png')
    mask_img = np.array([plt.imread(mask_img_files[0]),
                        plt.imread(mask_img_files[-1])])
    
    center_before, cnt = findContourCenter(mask_img[before])
    center_after, cnt = findContourCenter(mask_img[after])

    distance = math.sqrt((center_after[row] - center_before[row])
                        ** 2 + (center_after[col] - center_before[col])**2)
    angle = math.atan2((center_after[row] - 220),
                        (center_after[col] - 220))
    
    # if distance < 2:
    #     direction = Direction.No
    if is_debug:
        print(folder)
        # print(f'min_row = {after_min_row}')
        # print(f'max_row = {after_max_row}')
        # print(f'min_col = {after_min_col}')
        # print(f'max_col = {after_max_col}')
        print(angle, distance)
        # print(direction)
        inter_img = np.array([mask_img[before], mask_img[after],
                            np.zeros(mask_img[after].shape)])
        inter_img = np.moveaxis(inter_img, 0, -1)
        inter_img = cv2.UMat(inter_img)
        inter_img = cv2.UMat.get(inter_img)
        cv2.drawContours(inter_img, [cnt], -1, (0, 0, 255), 2)
        plt.imshow(inter_img)
        plt.plot([center_before[1], center_after[1]],
                [center_before[0], center_after[0]])
        plt.plot(center_after[1],
                center_after[0], marker='o')
        plt.show()
    
    return angle, distance

In [5]:
from STFT import STFT
class ImageSet(data.Dataset):
    def __init__(self, behav):
        if behav == 'train':
            self.length = trainingset_size
        elif behav == 'val':
            self.length = val_set_size
        else:
            raise Exception('Error')
        self.behav = behav

    def __getitem__(self, index):
        # print(index)
        d = index % 9

        while(True):
            label = random.choice(labels)
            if self.behav == 'train':
                folder = random.choice(pos_train_folders[label])
            elif self.behav == 'val':
                folder = random.choice(pos_val_folders[label])
            else:
                raise Exception('Error')
            # audio_file = glob.glob(f'{path}/stanley_screwdriver/331/*.pkl')[0]
            angle, distance = findCollision(folder, is_plot)
            if distance > -1:
                break
        audio_map = STFT(f'{folder}audio_data.pkl', is_plot, freq_length, time_length)
        return audio_map, angle

    def __len__(self):
        return self.length

train_loader = data.DataLoader(ImageSet('train'), batch_size=batch_size, shuffle=True)

In [2]:
class PositionCNN(nn.Module):
    def __init__(self,):
        super(PositionCNN, self).__init__()
        self.layer1 = nn.Sequential(
            # 57 221
            nn.Conv2d(in_channels=4, out_channels=64,
                      kernel_size=(3, 11)),
            # 55 211
            nn.BatchNorm2d(64),
            nn.ReLU()
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(in_channels=self.layer1[0].out_channels, out_channels=64,
                      kernel_size=(3, 10), stride=(2, 3)),
            # 27 68
            nn.BatchNorm2d(64),
            nn.ReLU()
        )
        self.layer3 = nn.Sequential(
            nn.Conv2d(in_channels=self.layer2[0].out_channels,
                      out_channels=128, kernel_size=(3, 5)),
            # 25 64
            nn.BatchNorm2d(128),
            nn.ReLU()
        )
        self.layer4 = nn.Sequential(
            nn.Conv2d(in_channels=self.layer3[0].out_channels,
                      out_channels=128, kernel_size=(3, 7), stride=(2, 3)),
            # 12 20
            nn.BatchNorm2d(128),
            nn.ReLU()
        )
        self.layer5 = nn.Sequential(
            nn.Conv2d(in_channels=self.layer4[0].out_channels,
                      out_channels=256, kernel_size=(3, 5)),
            # 10 16
            nn.BatchNorm2d(256),
            nn.ReLU()
        )
        self.layer6 = nn.Sequential(
            nn.Conv2d(in_channels=self.layer5[0].out_channels,
                      out_channels=256, kernel_size=(3, 3)),
            # 8 14
            nn.BatchNorm2d(256),
            nn.ReLU()
        )
        self.avg_pool = nn.AdaptiveAvgPool2d(output_size=(1, 1))
        self.fc = nn.Linear(self.layer6[0].out_channels, 1)

    def forward(self, input):
        out = self.layer1(input)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.layer5(out)
        out = self.layer6(out)
        # print(out.shape)
        out = self.avg_pool(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out.reshape(out.size(0))

In [3]:
directionNet = PositionCNN()

In [6]:
state_dict = torch.load('./directionNet.model')
directionNet.load_state_dict(state_dict)

<All keys matched successfully>

In [7]:
loss_func = nn.MSELoss()
optimizer = torch.optim.Adam(directionNet.parameters(), lr=0.001)

directionNet.train()
device = torch.device("cuda") if torch.cuda.is_available() else torch.device('cpu')
directionNet = directionNet.to(device)

for i, (imgs, lbs) in enumerate(train_loader):
    imgs = imgs.float().to(device)
    lbs = lbs.to(device)
    # print(lbs)
    outputs = directionNet(imgs)
    # print(outputs)
    loss = torch.mean(torch.pow(1 - torch.cos(outputs - lbs), 2))
    # print(loss)
    # loss = loss_func(outputs, lbs)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # predict = torch.argmax(F.softmax(outputs, dim=1), dim=1)
    # print(int(round(time.time() * 1000)))
    if i % 2 == 0:
        print(f"""i = {i},  loss = {loss},
        labels = {lbs}
        accuracy = {torch.cos(outputs - lbs)}""")

i = 0,  loss = 0.2942995706451647,
        labels = tensor([-2.4944,  2.3706,  3.1359, -0.7752,  1.8037,  2.3969,  2.6756,  3.0073,
         2.1548,  0.8927,  2.3680,  2.3237,  0.6923, -1.8069, -1.3035,  0.7743,
         0.7884,  1.9597, -0.7790, -0.7559, -1.3106,  2.0729, -2.2615, -1.0265,
         0.1912, -2.8874,  0.5020,  0.8224,  0.5047, -2.2615,  0.8030, -0.7682,
        -0.6305, -0.7504,  0.8581, -2.4764,  2.3684,  0.8323,  1.6597,  0.9215,
         2.3721, -1.4668,  2.7480,  1.0761, -2.9394,  1.3812, -0.8629,  2.6401,
        -0.8629, -1.5069,  2.2649,  0.7942,  2.7671, -2.2525,  2.2823,  0.8010,
         2.7480,  0.1717, -0.6160, -2.2089,  2.0644,  1.1849, -3.0503,  0.9880],
       device='cuda:0', dtype=torch.float64)
        accuracy = tensor([ 0.9728,  0.6391,  0.9935, -0.1868,  0.5256,  0.9941,  0.9759,  0.0394,
         0.9706,  0.9576,  0.5574,  0.9803,  0.4533,  0.9287,  0.9677, -0.2268,
        -0.0647,  0.8220,  0.6686,  0.9952,  0.9234, -0.8007,  0.4363,  0.7774,
   

KeyboardInterrupt: 

In [8]:
# 保存模型， 请谨慎操作， 会覆盖文件中的模型
torch.save(directionNet.state_dict(), './directionNet.model')

In [9]:
val_loader = data.DataLoader(ImageSet('val'), batch_size=50, shuffle=False)

directionNet.eval()
device = torch.device("cuda") if torch.cuda.is_available() else torch.device('cpu')
directionNet = directionNet.to(device)

for i, (imgs, lbs) in enumerate(val_loader):
    imgs = imgs.float().to(device)
    lbs = lbs.to(device)
    # print(lbs)
    outputs = directionNet(imgs)
    # print(outputs)
    loss = torch.mean(1 - torch.cos(outputs - lbs))
    # print(loss)
    # loss = loss_func(outputs, lbs)
    # print(int(round(time.time() * 1000)))
    if i % 1 == 0:
        print(f"i = {i}, \n lables = {lbs}, \n predict = {outputs}  \n accuracy = {torch.cos(outputs - lbs)}")

i = 0, 
 lables = tensor([ 0.4890, -2.2640, -1.6765, -0.7532,  1.3879,  1.1000, -0.7164,  0.7743,
         2.2679,  1.7262,  1.7262, -0.7598,  1.1000,  2.3192,  0.9574,  2.3735,
        -1.0471, -2.5630, -2.2042, -0.7532, -0.9774,  2.1686, -0.7157, -0.7532,
        -0.9987,  1.9494, -0.7905,  0.5135,  0.5135,  1.1000,  1.3274,  1.1000,
        -0.6006, -2.3254,  1.1000, -2.4737, -3.0739,  2.5536,  1.3274,  0.9574,
         0.6850,  2.4390,  2.3965, -3.1041,  2.5480,  0.6850,  1.0879,  0.4499,
         1.2230, -2.2098], device='cuda:0', dtype=torch.float64), 
 predict = tensor([ 0.6877,  2.2639,  2.5748, -0.7755,  2.0083,  1.9373, -0.0263,  0.4572,
         3.0406,  1.7885,  1.7885,  0.1804,  1.9373,  1.2199,  2.2912,  2.0561,
        -0.7119,  1.6241,  1.0719, -0.7755,  3.5572,  1.4333,  1.0916,  0.6722,
         1.1047,  2.2536, -0.6052, -0.5437, -0.5437,  1.9373,  1.7841,  1.9373,
         0.3813,  0.9297,  1.9373,  3.7358,  4.4008,  3.7031,  1.7841,  2.2912,
         0.4421,  0.1496

In [10]:
from STFT import STFT
audio_test_files = glob.glob(f'./dataset/task2/test/*/*.pkl')
class TestSet(data.Dataset):
    def __init__(self):
        self.length = len(audio_test_files)

    def __getitem__(self, index):
        audio_file = audio_test_files[index]
        audio_map = STFT(audio_file,is_plot, freq_length, time_length)
        return audio_map, audio_file

    def __len__(self):
        return self.length

test_loader = data.DataLoader(TestSet(), batch_size=batch_size, shuffle=False)

In [12]:
directionNet.eval()
device = torch.device("cuda") if torch.cuda.is_available() else torch.device('cpu')
directionNet = directionNet.to(device)
predict_all = []
files_all = []
for i, (imgs, files) in enumerate(test_loader):
    imgs = imgs.float().to(device)
    outputs = directionNet(imgs)
    if i % 1 == 0:
        print(f"""i = {i}
        outputs = {outputs}""")
    predict_all += outputs.tolist()
    files_all += list(files)
print(files_all)
print(predict_all)

i = 0
        outputs = tensor([-1.1879,  1.8246,  1.0526,  2.1373, -0.1861,  0.0472, -1.0629,  2.1939,
        -0.1554,  1.7300, -1.4682, -0.1636,  1.3126,  0.3432,  2.1821,  1.0588,
         0.3321,  0.5833, -0.4609,  2.2934,  0.6476,  3.3580,  2.6790, -0.0974,
         0.4232,  1.7341,  2.0436,  1.2160,  1.9476,  2.0971, -0.7163,  0.0870,
         0.0645,  2.1979, -2.0592,  1.4590,  1.8300,  2.9355, -0.4328,  0.1542,
         0.8426,  1.4822,  1.0099,  0.9833,  2.3604, -0.0729,  0.1966,  1.9587,
        -0.2850,  0.0659,  2.1979,  1.4822,  0.1422, -0.1636,  1.5329,  1.8213,
         0.6476,  0.6904, -2.0592,  1.4634,  0.1092,  3.3580,  1.2369,  0.5810],
       device='cuda:0', grad_fn=<ViewBackward>)
i = 1
        outputs = tensor([-0.7537,  1.3401,  1.9192,  0.5902, -0.2740,  2.0685,  1.7289,  1.7767,
         1.6441,  1.2739, -0.0856,  1.7509, -0.9214,  0.1250,  0.0870,  2.1821,
         2.1302, -1.6203,  0.4232,  1.4008,  0.5258, -0.6882,  0.8510, -0.2850,
         1.8209, -1.192

In [17]:
vedio_test_folders = glob.glob(f'./dataset/task2/test/*/*/')
angle_all = []
folder_all = []
for folder in vedio_test_folders:
    angle, _ = findCollision(folder, is_plot)
    angle_all.append(angle)
    folder_all.append(folder)

print(folder_all)
print(angle_all)


['./dataset/task2/test/6/video_0043/', './dataset/task2/test/6/video_0030/', './dataset/task2/test/6/video_0005/', './dataset/task2/test/6/video_0031/', './dataset/task2/test/6/video_0012/', './dataset/task2/test/6/video_0021/', './dataset/task2/test/6/video_0033/', './dataset/task2/test/6/video_0003/', './dataset/task2/test/6/video_0028/', './dataset/task2/test/6/video_0025/', './dataset/task2/test/6/video_0048/', './dataset/task2/test/6/video_0047/', './dataset/task2/test/6/video_0002/', './dataset/task2/test/6/video_0029/', './dataset/task2/test/6/video_0026/', './dataset/task2/test/6/video_0007/', './dataset/task2/test/6/video_0036/', './dataset/task2/test/6/video_0042/', './dataset/task2/test/6/video_0019/', './dataset/task2/test/6/video_0039/', './dataset/task2/test/6/video_0006/', './dataset/task2/test/6/video_0016/', './dataset/task2/test/6/video_0011/', './dataset/task2/test/6/video_0046/', './dataset/task2/test/6/video_0037/', './dataset/task2/test/6/video_0024/', './dataset/