In [None]:
import os
import tempfile
from random import sample

import cv2
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as functional
import torch.optim as optim
import torchvision
import xlwt
from torch.utils import data
from torchvision.datasets import ImageFolder
from xlwt import Workbook

In [None]:
# Shared preprocessing for every image fed to the network: convert to a
# tensor, then normalise with mean 0.5 and std 0.5 (the scalar is broadcast
# over all channels).
custom_transform = torchvision.transforms.Compose(
    [torchvision.transforms.ToTensor(),
     torchvision.transforms.Normalize(0.5, 0.5)],
    )

# Evaluation data: keep the on-disk order so results are reproducible.
test = ImageFolder(root="./testset", transform=custom_transform)
testloader = torch.utils.data.DataLoader(
    test,
    batch_size = 4,
    shuffle = False
)

# Training data: ImageFolder lists samples class by class, so without
# shuffling every mini-batch would contain a single class, which hurts SGD.
train = ImageFolder(root='./trainset', transform=custom_transform)
trainloader = torch.utils.data.DataLoader(
    train,
    batch_size = 4,
    shuffle = True
)

# Human-readable label for each class index.
classes = ('0', '1', '2', '3', '4', '5', '6', '7', '8', '9')
print('Number of training images is {}'.format(len(train)))
print('Number of testing images is {}'.format(len(test)))

# Fetch one batch as a smoke test. Loader iterators no longer expose a
# `.next()` method, so use the built-in next().
dataiter = iter(trainloader)
images, labels = next(dataiter)

In [None]:
class Network(nn.Module):
    """Small convolutional classifier for 3-channel 32x32 images.

    Two 5x5 convolutions, each followed by ReLU and 2x2 max pooling, reduce a
    32x32 input to 16 feature maps of 5x5 (400 values), which three fully
    connected layers map to ``output_size`` raw class scores (logits).

    Args:
        output_size: number of classes to predict. Defaults to 10.
    """

    def __init__(self, output_size=10):
        # Initialise nn.Module first so attribute registration is well defined.
        super(Network, self).__init__()
        self.output_size = output_size

        # Layer attribute names are part of the saved state_dict keys; keep them.
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.pool = nn.MaxPool2d(2)
        self.fc = nn.Linear(400, 120)
        self.fc1 = nn.Linear(120, 84)
        self.fc2 = nn.Linear(84, self.output_size)

    def forward(self, x):
        """Return logits of shape (batch, output_size) for a (batch, 3, 32, 32) input."""
        x = self.pool(functional.relu(self.conv1(x)))
        x = self.pool(functional.relu(self.conv2(x)))
        # Flatten per sample. Keeping the batch dimension fixed makes a wrongly
        # sized input fail in the linear layer instead of silently being
        # reinterpreted as a different number of samples.
        x = x.view(x.size(0), -1)
        x = functional.relu(self.fc(x))
        x = functional.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [None]:
#Calculate how many images are predicted correctly on the testset(total 100)
def calculate_correction(testset_dir="./testset/"):
    """Count the test images that the current global ``net`` classifies correctly.

    The test set is expected to contain one sub-folder per digit, named after
    the digit it holds. Every image is classified with ``check()``, which uses
    the module-level ``net``; both must be defined before calling this.
    Misclassified images are printed as ``expected predicted filename``.

    Args:
        testset_dir: root folder of the test set. Defaults to "./testset/".

    Returns:
        The number of correctly classified images.
    """
    # No checkpoint is loaded here: check() predicts with the global `net`, so
    # a locally loaded copy was never used and only made the call fail when
    # './result.pth' did not exist yet.
    correction = 0

    for class_name in os.listdir(testset_dir):
        class_dir = os.path.join(testset_dir, class_name)
        # Skip stray files and non-digit folders (e.g. notebook checkpoints).
        if not (os.path.isdir(class_dir) and class_name.isdigit()):
            continue
        right_num = int(class_name)
        for img_name in os.listdir(class_dir):
            image = cv2.imread(os.path.join(class_dir, img_name))
            if image is None:
                # cv2.imread returns None for files it cannot decode.
                print('Skipping unreadable file', os.path.join(class_dir, img_name))
                continue
            num = check(image)
            if num == right_num:
                correction += 1
            else:
                print(right_num, num, img_name)
    return correction
        

In [None]:
# Train the network for 50 epochs, logging the summed loss and the number of
# correctly classified test images per epoch to 'train_result.xls'.
# NOTE: calculate_correction() relies on check(), which is defined in a later
# cell of this notebook; that cell must have been run first.
wb = Workbook()
sheet1 = wb.add_sheet('Sheet 1')
sheet1.write(0, 0, 'epoch')
sheet1.write(0, 1, 'loss')
sheet1.write(0, 2, 'test_correction')

net = Network()
loss_function = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr = 1e-2)

for epoch in range(50):
    # Test score measured BEFORE this epoch's weight updates.
    correct = calculate_correction()
    print(correct)

    running_loss = 0.0  # loss over the last 20 mini-batches (for printing)
    loss1 = 0           # loss summed over the whole epoch (for the sheet)
    for i, mydata in enumerate(trainloader, 0):
        inputs, labels = mydata
        optimizer.zero_grad()
        predicted_labels = net(inputs)
        loss = loss_function(predicted_labels, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        loss1 += loss.item()
        if i % 20 == 19:    # print every 20 mini-batches
            print('Epoch: %d, after mini-batch: %5d loss: %.6f' % (epoch + 1, i + 1, running_loss / 20))
            running_loss = 0.0

    row = epoch + 1  # row 0 holds the column headers
    sheet1.write(row, 0, row)
    sheet1.write(row, 1, loss1)
    sheet1.write(row, 2, correct)

print('Finished training')
torch.save(net.state_dict(), './result.pth')
# Save the log right away so a failure in the demo below cannot lose it;
# saving overwrites any previous 'train_result.xls'.
wb.save('train_result.xls')
print('The magic is done')

# Demo: classify the first test batch with the freshly trained network.
dataiter = iter(testloader)
images, labels = next(dataiter)

# Predict the output using the trained neural network (no gradients needed).
with torch.no_grad():
    outputs = net(images)

# Normalize the outputs using the Softmax function so that
# we can interpret it as a probability distribution.
sm = nn.Softmax(dim=1)
sm_outputs = sm(outputs)

# For each output the prediction with the highest probability
# is the predicted label
probs, index = torch.max(sm_outputs, dim=1)
for label, p, i in zip(labels, probs, index):
    # Show the ground-truth label next to the prediction (previously the
    # predicted label was printed twice).
    print('True label {0}, Predicted label {1} - {2:.6f}'.format(classes[label], classes[i], p))

In [None]:
# Rebuild the model and restore the weights written by the training cell, so
# that prediction cells can run without retraining.
net = Network()
saved_weights = torch.load('./result.pth')
net.load_state_dict(saved_weights)

In [21]:
def processImage(img):
    """Cut three fixed-offset crops out of a frame, located via the './E.png' template.

    The template is matched against ``img``; the frame is then converted to
    grayscale, binarised at threshold 150, and three 21-row regions at fixed
    horizontal offsets from the best match are cropped and resized to 32x32.
    (Presumably these are the digits around an 'E' on a display - confirm
    against the sample frames.)

    Args:
        img: BGR frame as returned by ``cv2.imread`` / ``VideoCapture.read``.

    Returns:
        Tuple ``(sample1, sample2, sample3)`` of 32x32 single-channel arrays,
        ordered left to right.

    Raises:
        FileNotFoundError: if './E.png' cannot be read.
        ValueError: if the match is too close to the image border for a crop.
    """
    ned = cv2.imread("./E.png")
    if ned is None:
        # cv2.imread signals failure by returning None, not by raising.
        raise FileNotFoundError("Template image './E.png' could not be read")

    result = cv2.matchTemplate(img, ned, cv2.TM_CCOEFF_NORMED)
    _, _, _, max_loc = cv2.minMaxLoc(result)
    match_x, match_y = max_loc

    test_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    _, test_img = cv2.threshold(test_img, 150, 255, cv2.THRESH_BINARY)

    # Column spans of the three crops, relative to the match's left edge.
    column_offsets = ((-41, -20), (-19, 1), (36, 57))
    samples = []
    for left_off, right_off in column_offsets:
        left, right = match_x + left_off, match_x + right_off
        if left < 0:
            # A negative start would wrap around and select the wrong region.
            raise ValueError(
                "Template matched at x=%d, too close to the left border" % match_x)
        crop = test_img[match_y : match_y + 21, left : right]
        if crop.size == 0:
            raise ValueError(
                "Crop at columns %d:%d lies outside the image" % (left, right))
        samples.append(cv2.resize(crop, (32, 32)))
    return tuple(samples)

def check2(image, show=False):#test three images
    """Read a three-digit value from a frame using the global ``net``.

    The frame is cut into three 32x32 crops by ``processImage``; each crop is
    classified, and the digits are combined as
    ``(first + second * 0.1) * 10 ** third``.

    The crops are written to a private temporary folder and loaded back through
    ``ImageFolder`` so they receive exactly the same preprocessing as the
    training data. The folder is removed even if an error occurs.

    Args:
        image: BGR frame to analyse.
        show: if true, display each crop with matplotlib.

    Returns:
        The decoded number as a float.
    """
    crops = processImage(image)
    # ImageFolder sorts files by name; these names sort in crop order.
    file_names = ('first.png', 'second.png', 'third.png')

    with tempfile.TemporaryDirectory() as root_dir:
        # ImageFolder needs one class sub-folder; its label is ignored here.
        class_dir = os.path.join(root_dir, '1')
        os.makedirs(class_dir)
        for file_name, crop in zip(file_names, crops):
            crop = cv2.resize(crop, (32, 32))
            if not cv2.imwrite(os.path.join(class_dir, file_name), crop):
                raise IOError('Could not write temporary image ' + file_name)
            if show:
                plt.imshow(crop)
                plt.show()

        real_test = ImageFolder(root = root_dir, transform = custom_transform)
        real_testloader = torch.utils.data.DataLoader(real_test,
            batch_size = 3,
            shuffle = False)
        # Load the single batch while the files still exist.
        images, _ = next(iter(real_testloader))

    with torch.no_grad():
        outputs = net(images)
    # Softmax does not change which score is largest, so take the max of the
    # raw outputs directly.
    _, index = torch.max(outputs, dim=1)
    first = int(classes[index[0]])
    second = int(classes[index[1]])
    third = int(classes[index[2]])
    result = (first + second*0.1)*(10**third)
    return result

In [22]:
def check(image):#test one image
    """Classify a single digit image with the global ``net``.

    The image is resized to 32x32, written to a private temporary folder and
    loaded back through ``ImageFolder`` so it receives exactly the same
    preprocessing as the training data. The folder is removed even if an
    error occurs.

    Args:
        image: image array as returned by ``cv2.imread``.

    Returns:
        The predicted digit as an int.

    Raises:
        ValueError: if ``image`` is None (e.g. ``cv2.imread`` failed).
    """
    if image is None:
        raise ValueError('check() received None instead of an image')

    with tempfile.TemporaryDirectory() as root_dir:
        # ImageFolder needs one class sub-folder; its label is ignored here.
        class_dir = os.path.join(root_dir, '1')
        os.makedirs(class_dir)
        image = cv2.resize(image, (32, 32))
        if not cv2.imwrite(os.path.join(class_dir, 'image.png'), image):
            raise IOError('Could not write temporary image image.png')

        real_test = ImageFolder(root=root_dir, transform=custom_transform)
        real_testloader = torch.utils.data.DataLoader(
            real_test,
            batch_size = 1,
            shuffle = False
            )
        # Load the single batch while the file still exists.
        images, _ = next(iter(real_testloader))

    # `net` is the module-level model: whichever network was most recently
    # assigned to that name in the notebook is the one used here.
    with torch.no_grad():
        outputs = net(images)
    # Softmax does not change which score is largest, so take the max of the
    # raw outputs directly.
    _, index = torch.max(outputs, dim=1)
    return int(classes[index[0]])

In [None]:
# Sort images automatically
# Classify every cut-out digit in ./train_set/ with the current `net` and save
# a copy into the matching class folder of ./trainset/.
unsorted_dir = "./train_set/"   # not named `input`: that would shadow the builtin
sorted_root = "./trainset/"
next_id = 1000  # presumably chosen to avoid clashing with existing file numbers - TODO confirm

for img_name in sorted(os.listdir(unsorted_dir)):
    image = cv2.imread(os.path.join(unsorted_dir, img_name), cv2.IMREAD_UNCHANGED)
    if image is None:
        # cv2.imread returns None for files it cannot decode.
        print('Skipping unreadable file', img_name)
        continue
    predicted = check(image)
    class_dir = os.path.join(sorted_root, str(predicted))
    # cv2.imwrite fails silently when the target folder is missing.
    os.makedirs(class_dir, exist_ok=True)
    cv2.imwrite(os.path.join(class_dir, "{i}.png".format(i = next_id)), image)
    next_id += 1

In [None]:
# Customize images in testset or trainset
# Binarise (threshold 127) and resize to 32x32 every image of the data set,
# overwriting the files in place.

dataset_root = "./testset/"   # not named `input`: that would shadow the builtin
for class_name in os.listdir(dataset_root):
    class_dir = os.path.join(dataset_root, class_name)
    if not os.path.isdir(class_dir):
        continue  # ignore stray files next to the class folders
    for img_name in os.listdir(class_dir):
        img_path = os.path.join(class_dir, img_name)
        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread returns None for files it cannot decode.
            print('Skipping unreadable file', img_path)
            continue
        # Order matters: thresholding happens before resizing, as before.
        _, image = cv2.threshold(image, 127, 255, cv2.THRESH_BINARY)
        image = cv2.resize(image, (32, 32))
        cv2.imwrite(img_path, image)


In [None]:
# Sort names of images in testset or trainset
# Rename every image to '<n>.png', with n counting across all class folders.

dataset_root = "./trainset/"   # not named `input`: that would shadow the builtin

class_dirs = [
    os.path.join(dataset_root, name)
    for name in sorted(os.listdir(dataset_root))
    if os.path.isdir(os.path.join(dataset_root, name))
]

# The renaming needs a temporary name per file. Pick a prefix that no existing
# file starts with, otherwise os.rename could overwrite an image that has not
# been processed yet.
existing_names = [name.lower() for d in class_dirs for name in os.listdir(d)]
tmp_prefix = 'b'
while any(name.startswith(tmp_prefix) for name in existing_names):
    tmp_prefix += 'b'

# Pass 1 moves everything to collision-free temporary names; pass 2 assigns
# the final numbers.
for name_template in (tmp_prefix + '{i}.png', '{i}.png'):
    counter = 0
    for class_dir in class_dirs:
        for img_name in sorted(os.listdir(class_dir)):
            counter += 1
            os.rename(
                os.path.join(class_dir, img_name),
                os.path.join(class_dir, name_template.format(i = counter)),
            )
        
    

In [None]:
# Shrink testset or trainset
# Keep every `keep_every`-th image (counted across all class folders) and
# delete the rest. DESTRUCTIVE: deleted files are not recoverable.

dataset_root = "./trainset/"   # not named `input`: that would shadow the builtin
keep_every = 4
seen = 0
# Sorted traversal makes the selection of kept files reproducible.
for class_name in sorted(os.listdir(dataset_root)):
    class_dir = os.path.join(dataset_root, class_name)
    if not os.path.isdir(class_dir):
        continue  # ignore stray files next to the class folders
    for img_name in sorted(os.listdir(class_dir)):
        seen += 1
        if seen % keep_every != 0:
            os.remove(os.path.join(class_dir, img_name))
        
    

In [None]:
def delete(path):# Two layer delete
    """Delete every file inside the immediate sub-folders of ``path``.

    The sub-folders themselves, and any files lying directly in ``path``, are
    left untouched. Progress is printed every 1000 deleted files.

    Args:
        path: root folder, with or without a trailing separator.
    """
    i = 0
    for sub_name in os.listdir(path):
        sub_dir = os.path.join(path, sub_name)
        if not os.path.isdir(sub_dir):
            continue  # a plain file at the top level is not a class folder
        for img in os.listdir(sub_dir):
            os.remove(os.path.join(sub_dir, img))
            i += 1
            if i%1000 == 0:
                print("%dimages have been deleted"%(i))
            
def delete1(path):# One layer delete
    """Delete every file lying directly in ``path``.

    Sub-folders are left untouched. Progress is printed every 1000 deleted
    files.

    Args:
        path: folder to empty, with or without a trailing separator.
    """
    i = 0
    for img in os.listdir(path):
        target = os.path.join(path, img)
        if not os.path.isfile(target):
            continue  # os.remove cannot delete directories
        os.remove(target)
        i += 1
        if i%1000 == 0:
            print("%dimages have been deleted"%(i))

show_tag = 0  # set to 1 to display the frame and its crops every few images
# Cut images in this folder into single numbers.
# If you want to use this block you need to create this folder and put images
# into it, either directly or inside sub-folders.
sample_root = "./sample/"    # not named `input`: that would shadow the builtin
output_dir = "./train_set/"

# Start from an empty output folder (create it on first use).
os.makedirs(output_dir, exist_ok=True)
delete1(output_dir)

# Collect the frames: files directly in the folder and files one level down.
image_paths = []
for entry in sorted(os.listdir(sample_root)):
    entry_path = os.path.join(sample_root, entry)
    if os.path.isdir(entry_path):
        image_paths.extend(
            os.path.join(entry_path, name) for name in sorted(os.listdir(entry_path)))
    elif os.path.isfile(entry_path):
        image_paths.append(entry_path)

saved = 0  # number of crops written so far; also the next file number
for img_path in image_paths:
    image = cv2.imread(img_path)
    if image is None:
        # cv2.imread returns None for files it cannot decode.
        print('Skipping unreadable file', img_path)
        continue
    first, second, third = processImage(image)
    for crop in (first, second, third):
        cv2.imwrite(os.path.join(output_dir, "{i}.png".format(i = saved)), crop)
        saved += 1
    # `saved` grows in steps of 3, so test a window of three residues to
    # report roughly every 1000 crops.
    if saved % 1000 in (0, 1, 2):
        print("%dimages have been saved"%(saved))
    if saved % 10 == 0 and show_tag:
        for picture in (image, first, second, third):
            plt.imshow(picture)
            plt.show()
        


In [None]:
# Empty the scratch folder that holds the cut-out digit images.
delete1(path="./train_set/")

In [None]:
# Process sample video: read the displayed value `frequency` times per second
# of video with check2() and store (time, value) rows in '<video>_result/result.xls'.
video_path = "./sample.MTS"   # not named `input`: that would shadow the builtin

print(video_path)
video = cv2.VideoCapture(video_path)
if not video.isOpened():
    raise IOError("Could not open video: " + video_path)
wb = Workbook()
sheet1 = wb.add_sheet('Sheet 1')

frequency = 10  # samples per second of video
fps = video.get(cv2.CAP_PROP_FPS)
total_frames = video.get(cv2.CAP_PROP_FRAME_COUNT)
if fps <= 0:
    video.release()
    raise ValueError("Video reports an invalid frame rate: " + str(fps))

print("Total frams:" + str(total_frames))
print("FPS:" + str(fps))
print("Frequency: " + str(frequency) + "Hz")
# Frames between two samples; may be fractional. Never sample more than once
# per frame.
timeF = max(fps / frequency, 1.0)
print("Frams needed to be extracted:" + str(total_frames/timeF))
sheet1.write(0, 0, "Time/s")
sheet1.write(0, 1, "Pressure/Pa")
print("Time/s", "Pressure/Pa")

rval, frame = video.read()  # frame 0 is read here and not sampled
c = 1  # index of the frame read in the current iteration
j = 1  # next spreadsheet row == number of the next sample
try:
    while rval:  # Keep reading frames
        rval, frame = video.read()
        # Sample the first frame at or after the j-th sampling instant. Unlike
        # `c % timeF == 0`, this also works when fps / frequency is fractional.
        if rval and c >= j * timeF:
            result = check2(frame)
            print(result, j)
            sheet1.write(j, 1, result)
            sheet1.write(j, 0, c / fps)  # actual time of the sampled frame
            j += 1
        c = c + 1
finally:
    video.release()

# Re-running must not fail because the folder already holds an old result.
result_dir = video_path + "_result/"
os.makedirs(result_dir, exist_ok=True)
wb.save(result_dir + 'result.xls')

In [None]:
# Extract `frequency` frames per second of video into ./sample/ as PNG files.
video = cv2.VideoCapture("./sample.MTS")
if not video.isOpened():
    raise IOError("Could not open video: ./sample.MTS")

frequency = 10  # frames to extract per second of video
fps = video.get(cv2.CAP_PROP_FPS)
total_frames = video.get(cv2.CAP_PROP_FRAME_COUNT)
if fps <= 0:
    video.release()
    raise ValueError("Video reports an invalid frame rate: " + str(fps))

print("Total frams:" + str(total_frames))
print("FPS:" + str(fps))
print("Frequency: " + str(frequency) + "Hz")
# Frames between two extractions; may be fractional. Never extract more than
# once per frame.
timeF = max(fps / frequency, 1.0)
print("Frams needed to be extracted:" + str(total_frames/timeF))

# cv2.imwrite fails silently when the target folder is missing.
os.makedirs("./sample/", exist_ok=True)

rval, frame = video.read()
c = 1
extracted = 0
try:
    while rval:  # Keep reading frames
        rval, frame = video.read()
        c = c + 1
        # Take the first frame at or after each extraction instant. Unlike
        # `c % timeF == 0`, this also works when fps / frequency is fractional.
        if rval and c >= (extracted + 1) * timeF:
            # The file name is c / 50; 50 is presumably the FPS of the sample
            # video, making the name a time in seconds - TODO confirm.
            cv2.imwrite("./sample/{c}.png".format(c = c/50), frame)
            extracted += 1
            if total_frames > 0:
                print("%.5f"%((c) / total_frames * 100) + "%")
finally:
    video.release()