In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.autograd as autograd
import pickle
import numpy as np
import time
import matplotlib.pyplot as plt
import cv2

In [2]:
torch.manual_seed(1)

<torch._C.Generator at 0x203f5c3ced0>

In [3]:
word_to_ix = {}
label_to_ix = {}

In [4]:
instructions = []
labels = []

In [5]:
with open('instructions.txt','r') as f:

	f = f.readlines()
	
	for line in f:

		line = line.strip().split(',')
		label = [line[1]]
		sentence = list(map(lambda x : x.lower(),line[0].strip().split(' ')))
		instructions.append((sentence,label))

In [6]:
print (instructions)

[(['climb', 'down', 'the', 'ladder'], ['ladder']), (['climb', 'down', 'the', 'ladder'], ['ladder']), (['climb', 'down', 'the', 'ladder'], ['ladder']), (['climb', 'down', 'the', 'ladder'], ['ladder']), (['climb', 'down', 'the', 'ladder'], ['ladder']), (['climb', 'up', 'the', 'ladder'], ['ladder']), (['climb', 'up', 'the', 'ladder'], ['ladder']), (['climb', 'up', 'the', 'ladder'], ['ladder']), (['climb', 'up', 'the', 'ladder'], ['ladder']), (['get', 'the', 'key'], ['key']), (['get', 'the', 'key'], ['key']), (['get', 'the', 'sword'], ['sword']), (['get', 'the', 'torch'], ['torch']), (['go', 'between', 'the', 'lasers'], ['laser']), (['go', 'between', 'the', 'lasers'], ['laser']), (['go', 'between', 'the', 'lasers'], ['laser']), (['go', 'between', 'the', 'lasers'], ['laser']), (['go', 'to', 'the', 'bottom', 'of', 'the', 'room'], ['room']), (['go', 'to', 'the', 'bottom', 'of', 'the', 'room'], ['room']), (['go', 'to', 'the', 'bottom', 'of', 'the', 'room'], ['room']), (['go', 'to', 'the', 'bot

In [7]:
for sent,label in instructions:
	for word in sent:
		if word not in word_to_ix:
			word_to_ix[word] = len(word_to_ix)
	for lab in label:
		if lab not in label_to_ix:
			label_to_ix[lab] = len(label_to_ix)

In [8]:
print(word_to_ix)
print(label_to_ix)

{'climb': 0, 'down': 1, 'the': 2, 'ladder': 3, 'up': 4, 'get': 5, 'key': 6, 'sword': 7, 'torch': 8, 'go': 9, 'between': 10, 'lasers': 11, 'to': 12, 'bottom': 13, 'of': 14, 'room': 15, 'center': 16, 'left': 17, 'side': 18, 'right': 19, 'top': 20, 'jump': 21, 'rope': 22, 'use': 23}
{'ladder': 0, 'key': 1, 'sword': 2, 'torch': 3, 'laser': 4, 'room': 5, 'rope': 6}


In [9]:
def prepare_sentence(sent, to_ix):
	sent = sent.lower().strip().split(' ')
	idxs = [to_ix[w] for w in sent]
	return torch.tensor(idxs, dtype=torch.long)

In [10]:
class LSTMClassifier(nn.Module):

	def __init__(self):
		
		super(LSTMClassifier, self).__init__()

		self.embeddings = nn.Embedding(VOCAB_SIZE, EMBEDDING_DIM)
		self.lstm = nn.LSTM(EMBEDDING_DIM, HIDDEN_DIM_LSTM)
		self.fullyconnected = nn.Linear(HIDDEN_DIM_LSTM, 10)
		self.hidden = self.init_hidden()

	def init_hidden(self):
		# the first is the hidden h
		# the second is the cell  c
		return (autograd.Variable(torch.zeros(1, 1, HIDDEN_DIM_LSTM)),
                autograd.Variable(torch.zeros(1, 1, HIDDEN_DIM_LSTM)))

	def forward(self, sentence):

		embeds = self.embeddings(sentence)
		x = embeds.view(len(sentence), 1, -1)
		lstm_out, self.hidden = self.lstm(x, self.hidden)
		#print (lstm_out)
		y  = self.fullyconnected(lstm_out[-1])
		# log_probs = F.log_softmax(y)
		#print (y)
		return y


In [11]:
class ConvNetClassifier(nn.Module):

	def __init__(self):
		
		super(ConvNetClassifier, self).__init__()

		self.layer1 = nn.Sequential(
				nn.Conv2d(6, 32, kernel_size = 5, stride = 1, padding = 2),
				nn.ReLU(),
				nn.MaxPool2d(kernel_size = 2, stride = 2)
			)

		self.layer2 = nn.Sequential(
				nn.Conv2d(32, 32, kernel_size = 5, stride = 1, padding = 2),
				nn.ReLU(),
				nn.MaxPool2d(kernel_size = 2, stride = 2)
			)

		self.layer3 = nn.Sequential(
				nn.Conv2d(32, 64, kernel_size = 4, stride = 1, padding = 2),
				nn.ReLU(),
				nn.MaxPool2d(kernel_size = 2, stride = 2)
			)

		self.layer4 = nn.Sequential(
				nn.Conv2d(64, 64, kernel_size = 3, stride = 1, padding = 1)
			)

		self.layer5 = nn.Linear(26*20*64 , 10)

		self.layer6 = nn.PReLU()

		self.layer7 = nn.Linear(10, 10)

	def forward(self, x):

		x = np.swapaxes(x,0,2)
		x = np.swapaxes(x,1,2)

		x = autograd.Variable(torch.from_numpy(x).unsqueeze(0).float())

		out = self.layer1(x)
		out = self.layer2(out)
		out = self.layer3(out)
		out = self.layer4(out)
		out = out.view(out.size(0), -1)
		out = self.layer5(out)
		out = self.layer6(out)
		out = self.layer7(out)
		#print (out)
		
		return out

In [12]:
EMBEDDING_DIM = 20
HIDDEN_DIM_LSTM = 10
VOCAB_SIZE = len(word_to_ix)
LABEL_SIZE = len(label_to_ix)

In [13]:
text_model = LSTMClassifier()
image_model = ConvNetClassifier()
loss_function = nn.CosineEmbeddingLoss()
optimizer1 = optim.SGD(text_model.parameters(), lr = 0.001)
optimizer2 = optim.SGD(image_model.parameters(), lr = 0.001)

In [14]:
def train():

	with open('dataset/dataset_true.pickle','rb') as f:

		dataset = pickle.load(f)

	with open('dataset/dataset_false.pickle','rb') as g:
		dataset_false = pickle.load(g)

	for epoch in range(100):

		t1 = time.time()

		total_loss = 0.0

		for (frame1,frame2), sentence in dataset[:300]:

			text_model.hidden = text_model.init_hidden()
			
			text_model.zero_grad()
			image_model.zero_grad()

			enc_sentence = prepare_sentence(sentence, word_to_ix)

			text_embed = text_model(enc_sentence)

			stack = np.dstack((frame1,frame2))

			frame_embed = image_model(stack)

			loss = loss_function(text_embed, frame_embed,torch.tensor([1]).float())

			total_loss += loss.item()

			loss.backward()

			torch.nn.utils.clip_grad_norm(text_model.parameters(),1)
			torch.nn.utils.clip_grad_norm(image_model.parameters(),1)

			optimizer1.step()
			optimizer2.step()

			ind = np.random.randint(0,10000,5)

			for j in ind:

				text_model.hidden = text_model.init_hidden()
			
				text_model.zero_grad()
				image_model.zero_grad()

				enc_sentence = prepare_sentence(dataset_false[j][1], word_to_ix)

				text_embed = text_model(enc_sentence)

				stack = np.dstack(dataset_false[j][0])

				frame_embed = image_model(stack)

				loss = loss_function(text_embed, frame_embed,torch.tensor([-1]).float())

				total_loss += loss.item()

				loss.backward()

				torch.nn.utils.clip_grad_norm(text_model.parameters(),1)
				torch.nn.utils.clip_grad_norm(image_model.parameters(),1)

				optimizer1.step()
				optimizer2.step()

		t2 = time.time()

		print("epoch %d loss %f time %f"%(epoch,total_loss,t2-t1))

		if (epoch+1) % 20 == 0:
			torch.save(text_model, 'models/sentence/text_model_' + str(epoch+1))
			torch.save(image_model, 'models/image/image_model_' + str(epoch+1))


In [15]:
train()



epoch 0 loss 311.436743 time 247.544576
epoch 1 loss 299.015654 time 156.242164
epoch 2 loss 289.814876 time 127.960447
epoch 3 loss 281.724949 time 137.589285
epoch 4 loss 268.464626 time 126.810302
epoch 5 loss 251.551462 time 125.800061
epoch 6 loss 235.933428 time 125.547859
epoch 7 loss 222.313873 time 125.510409
epoch 8 loss 208.731844 time 124.812763
epoch 9 loss 197.394570 time 125.379255
epoch 10 loss 182.958803 time 125.113297
epoch 11 loss 173.178114 time 125.250710
epoch 12 loss 162.841952 time 125.628282
epoch 13 loss 153.646677 time 125.869439
epoch 14 loss 146.143791 time 126.454809
epoch 15 loss 138.492091 time 125.854172
epoch 16 loss 127.874507 time 126.711901
epoch 17 loss 112.803808 time 125.118589
epoch 18 loss 105.526410 time 125.435634
epoch 19 loss 104.247145 time 127.637588
epoch 20 loss 91.221749 time 134.050073
epoch 21 loss 92.232859 time 127.089964
epoch 22 loss 91.187590 time 127.221822
epoch 23 loss 88.236347 time 129.442087
epoch 24 loss 79.841021 time 1

In [16]:
def false_dataset():

	#text_model = torch.load('models/text_model_50')
	#image_model = torch.load('models/image_model_50')

	with open('dataset/dataset_true.pickle','rb') as f:
		dataset = pickle.load(f)

	dataset_false = []

	for i in range(300):
		for j in range(300):

			if dataset[i][1] != dataset[j][1]:

				dataset_false.append((dataset[i][0],dataset[j][1]))
				dataset_false.append((dataset[j][0],dataset[i][1]))

	print (len(dataset_false))

	with open('dataset/dataset_false.pickle','wb') as f:
		pickle.dump(dataset_false,f)

	test_dataset_false = []

	for i in range(301, 347):
		for j in range(301, 347):

			if dataset[i][1] != dataset[j][1]:

				test_dataset_false.append((dataset[i][0],dataset[j][1]))
				test_dataset_false.append((dataset[j][0],dataset[i][1]))

	print (len(test_dataset_false))


In [17]:
false_dataset()

142044
3384


In [18]:
def test():

	text_model = torch.load('models/sentence/text_model_60')
	image_model = torch.load('models/image/image_model_60')

	# True labels

	with open('dataset/dataset_true.pickle','rb') as f:
		true_dataset = pickle.load(f)

	items = np.random.randint(301, 347, 15)

	iter = 1
	for i in items:
		(img1, img2), text = true_dataset[i]
		#img1 = img1[:,:,::-1]
		#img2 = img2[:,:,::-1]
		'''
		enc_sentence = prepare_sentence(text, word_to_ix)
		text_embed = text_model(enc_sentence)
		stack = np.dstack((img1, img2))
		frame_embed = image_model(stack)

		dp = torch.dot(text_embed[0], frame_embed[0]) / (torch.norm(text_embed[0]) * torch.norm(frame_embed[0]))
		print(dp)
		'''
		#both = np.hstack((img1, img2))
		c1 = cv2.copyMakeBorder(img1,10,10,10,10,cv2.BORDER_CONSTANT,value=[255,255,255])
		c2 = cv2.copyMakeBorder(img2,10,10,10,10,cv2.BORDER_CONSTANT,value=[255,255,255])

		both = np.hstack((c1,c2))
		print (text)
		cv2.imshow('sample', both)
		cv2.waitKey(0)

In [19]:
test()

Climb up the ladder
Go to the center of the room
Go to the center of the room
Jump to the rope
Jump to the rope
Go to the center of the room
Jump to the rope
Climb up the ladder
Go to the right room
Use the key
Go to the center of the room
Go to the center of the room
Climb up the ladder
Use the key
Go to the center of the room
