<a href="https://colab.research.google.com/github/mark-wijnands/internship_rainbow_mwijnands/blob/main/CollisionNetwork_2layer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Notebook to train and evaluate a collision detection network developed by M. Wijnands for the Rainbow institute.

Install and import packages. Torchmetrics is not a standard colab package and has to be installed using:

In [None]:
!pip install torchmetrics

import numpy as np
import torch
import torchvision
import os
import time
import random
import matplotlib.pyplot as plt
import pandas as pd
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
import torchmetrics
from torch.utils.data import DataLoader

%matplotlib inline


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting torchmetrics
  Downloading torchmetrics-0.10.3-py3-none-any.whl (529 kB)
[K     |████████████████████████████████| 529 kB 5.2 MB/s 
Installing collected packages: torchmetrics
Successfully installed torchmetrics-0.10.3


Mount drive for extracting the data:

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Create test and train dataset classes and initialize:

In [None]:
class CollisionDataset(torch.utils.data.Dataset):
  def __init__(self, path, convertion) -> None:
      self.path = path
      self.convertion = convertion
      with open(path+'Xnorm') as f1:
          lines = f1.readlines()
          self.source = [line.split() for line in lines]
      with open(path+'labels') as f1:
          lines = f1.readlines()
          self.target = [line.split() for line in lines]

  def convert(self,x,convertion):
    if convertion == 0:
      pass
    if convertion == 1:
      if x > 0:
        x = 1
    if convertion == 2:
      if x == 1:
        x = 0
      else:
        x = 1
    if convertion == 3:
      if x == 2:
        x = 0
      else:
        x = 1
    return x

  def __getitem__(self, idx) -> torch.Tensor:
      source_sample = self.source[idx]
      target_sample = self.target[idx]

      source_sample = [float(x) for x in source_sample]
      
      # #augment:
      # W = int(len(source_sample)/7)
      # for i in range(7):
      #   multiplier = 0.5+random.randrange(0, 1000, 1)/1000
      #   for j in range(W):
      #     source_sample[i+7*j] = multiplier*source_sample[i+7*j]

      source_sample = np.array(source_sample, dtype=np.float16)

      target_sample = [float(x) for x in target_sample]
      target_sample = [int(x) for x in target_sample]
      target_sample = [self.convert(x,self.convertion) for x in target_sample]
      target_sample = np.array(target_sample)

      source_sample = torch.tensor(source_sample, dtype=torch.double)
      target_sample = torch.tensor(target_sample, dtype=torch.int32)

      source_sample = source_sample[None, :]
      target_sample = target_sample[None, :]

      return source_sample, target_sample

  def __len__(self):
      return len(self.target)

generator = torch.Generator()
generator.manual_seed(42)        #in order to keep same sets as trained on (otherwise samples in eval that were trained on) --> obsolete because of earlier split in train/test v.s. eval

dataset1 =  CollisionDataset('/content/drive/MyDrive/Colab Notebooks/data/ds7_avg2_W10/set1/',1)
dataset2 =  CollisionDataset('/content/drive/MyDrive/Colab Notebooks/data/ds7_avg2_W10/set2/',2)
dataset3 =  CollisionDataset('/content/drive/MyDrive/Colab Notebooks/data/ds7_avg2_W10/set3/',3)

trainlen1 = int(0.8*dataset1.__len__())
testlen1 = dataset1.__len__() - trainlen1
testdataset1, traindataset1 = torch.utils.data.random_split(dataset1, [testlen1, trainlen1], generator=generator)

trainlen2 = int(0.8*dataset2.__len__())
testlen2 = dataset2.__len__() - trainlen2
testdataset2, traindataset2 = torch.utils.data.random_split(dataset2, [testlen2, trainlen2], generator=generator)

trainlen3 = int(0.8*dataset3.__len__())
testlen3 = dataset3.__len__() - trainlen3
testdataset3, traindataset3 = torch.utils.data.random_split(dataset3, [testlen3, trainlen3], generator=generator)

evalset = CollisionDataset('/content/drive/MyDrive/Colab Notebooks/data/ds7_test_avg2_eval_W30/',0)

save_path1 = "/content/drive/MyDrive/Colab Notebooks/Models_ds7_2layer_ch64_ch128_W10_avg2/ModelsNN1"
save_path2 = "/content/drive/MyDrive/Colab Notebooks/Models_ds7_2layer_ch64_ch128_W10_avg2/ModelsNN2"
save_path3 = "/content/drive/MyDrive/Colab Notebooks/Models_ds7_2layer_ch64_ch128_W10_avg2/ModelsNN3"


Create network class. Normal 1D convolutions with alternating valid and same padding are used. There is no sigmoid as the out function, as a BCEWithLogitsLoss is used in the training which already incorporates this function.

In [None]:
class CollisionNetwork(nn.Module):

	def __init__(self, no_layers=11, kernel = 3, device=None):
		super(CollisionNetwork, self).__init__()
		self.no_layers = no_layers
		self.kernel = kernel
		self.layers = {}
		self.device = device

		self.Conv1d_1 = nn.Conv1d(1, 64, kernel, stride=1, padding='valid', bias=True, dtype=torch.double)
		self.BatchNorm1d_1 = nn.BatchNorm1d(64, dtype=torch.double)
		self.ReLU_1= nn.ReLU(True)

		self.Conv1d_2 = nn.Conv1d(64, 128, kernel, stride=1, padding='same', bias=True, dtype=torch.double)
		self.BatchNorm1d_2 = nn.BatchNorm1d(128, dtype=torch.double)
		self.ReLU_2= nn.ReLU(True)        

		# self.Conv1d_3 = nn.Conv1d(32, 32, kernel, stride=1, dilation=2, padding='valid', bias=True, dtype=torch.double)
		# self.BatchNorm1d_3 = nn.BatchNorm1d(32, dtype=torch.double)
		# self.ReLU_3= nn.ReLU(True)    

		# self.Conv1d_4 = nn.Conv1d(32, 64, kernel, stride=1, padding='same', bias=True, dtype=torch.double)
		# self.BatchNorm1d_4 = nn.BatchNorm1d(64, dtype=torch.double)
		# self.ReLU_4= nn.ReLU(True)    
	
		# self.Conv1d_5 = nn.Conv1d(64, 64, kernel, stride=1, dilation=4, padding='valid', bias=True, dtype=torch.double)
		# self.BatchNorm1d_5 = nn.BatchNorm1d(64, dtype=torch.double)
		# self.ReLU_5= nn.ReLU(True)        

		# self.Conv1d_6 = nn.Conv1d(64, 64, kernel, stride=1, padding='same', bias=True, dtype=torch.double)
		# self.BatchNorm1d_6 = nn.BatchNorm1d(64, dtype=torch.double)
		# self.ReLU_6= nn.ReLU(True)    

		# self.Conv1d_7 = nn.Conv1d(64, 128, kernel, stride=1, dilation=8, padding='valid', bias=True, dtype=torch.double)
		# self.BatchNorm1d_7 = nn.BatchNorm1d(128, dtype=torch.double)
		# self.ReLU_7= nn.ReLU(True)    

		# self.Conv1d_8 = nn.Conv1d(128, 128, kernel, stride=1, padding='same', bias=True, dtype=torch.double)
		# self.BatchNorm1d_8 = nn.BatchNorm1d(128, dtype=torch.double)
		# self.ReLU_8= nn.ReLU(True) 

		self.Flatten = nn.Flatten()
	
		self.out = nn.Linear(8704,1, dtype=torch.double) 

	def forward(self, x):
		x = self.Conv1d_1(x)
		x = self.BatchNorm1d_1(x)
		x = self.ReLU_1(x)

		x = self.Conv1d_2(x)
		x = self.BatchNorm1d_2(x)
		x = self.ReLU_2(x)

		# x = self.Conv1d_3(x)
		# x = self.BatchNorm1d_3(x)
		# x = self.ReLU_3(x)
		
		# x = self.Conv1d_4(x)
		# x = self.BatchNorm1d_4(x)
		# x = self.ReLU_4(x)
		
		# x = self.Conv1d_5(x)
		# x = self.BatchNorm1d_5(x)
		# x = self.ReLU_5(x)
		
		# x = self.Conv1d_6(x)
		# x = self.BatchNorm1d_6(x)
		# x = self.ReLU_6(x)
		
		# x = self.Conv1d_7(x)
		# x = self.BatchNorm1d_7(x)
		# x = self.ReLU_7(x)
		
		# x = self.Conv1d_8(x)
		# x = self.BatchNorm1d_8(x)
		# x = self.ReLU_8(x)
	
		x = self.Flatten(x)
  
		return self.out(x)



Train the network. Batch size and epochs can be chosen here. A weight is given in the BCE criterion as there are far more non-collision samples. Otherwise accuracy will be high, recall will be bad.

In [None]:
batch_size = 64
epochs = 30
# save_path = "/content/drive/MyDrive/Colab Notebooks/Models_separate_W30/ModelsNN1"
save_path = save_path1

train = DataLoader(traindataset1, batch_size=batch_size, shuffle=True, num_workers =1, pin_memory = True)
test = DataLoader(testdataset1, batch_size=batch_size, shuffle=False, num_workers =1, pin_memory = True)

#Defining the model and training it on loss function
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
net = CollisionNetwork().to(device)

optimizer = torch.optim.Adam(net.parameters())
pos_weight = torch.ones([1])									#-- there is about 20-25x more no_collision data so add pos_weight to balance
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight) 			#more weight for false_positives, as dataset contains far more negative collision samples

loss_overall = []
acc_overall = [None]*epochs
recall_overall = [None]*epochs
precision_overall = [None]*epochs
acc_overall_test = [None]*epochs
acc_overall_train = [None]*epochs
time_start = time.time()
print('Training Started')

train_accuracy = torchmetrics.Accuracy() 			#in order to check if model learns properly
# train_recall = torchmetrics.Recall()
# train_precision =  torchmetrics.Precision()

plt.figure()
for i in range(epochs):

  net.train(True)
  step = 0
  loss_= 0
  for images, target in train:

    target = target.squeeze()
    target = target.double()
    images = images.to(device)
    target = target.to(device)

    optimizer.zero_grad()

    output = net(images)
    output = output.squeeze()

    loss = criterion(output, target)
    loss.backward()
    optimizer.step()

    out_rounded = torch.round(output)
    target_tensor = torch.tensor(target, dtype=torch.int8)

    acc = train_accuracy(out_rounded, target_tensor)

    # recall = train_recall(out_rounded, target_tensor)
    # recall_overall[i] = recall.item()

    # precision = train_precision(out_rounded, target_tensor)
    # precision_overall[i] = precision.item()

    loss_+=loss
    step+=1

    # if(step%100 == 0):
    #   print('Epoch:'+str(i)+'\t'+ str(step) +'\t Iterations Complete \t'+'loss: ', loss.item()/1000.0)
    #   loss_overall.append(loss_/1000.0)
    #   loss_=0

  total_train_accuracy = train_accuracy.compute()
  print(f"Accuracy: {total_train_accuracy}")
  train_accuracy.reset()
  # total_train_recall= train_recall.compute()
  # print(f"Recall: {total_train_recall}")
  # train_recall.reset()    
  # total_train_precision = train_precision.compute()
  # print(f"Precision: {total_train_precision}")
  # train_precision.reset()

  #Saving the model
  if not os.path.exists(save_path):
    os.makedirs(save_path)
  if(i==epochs-1):
    torch.save(net.state_dict(), save_path+'/Model_Checkpoint_'+'Last'+'.pt')
  else:
    torch.save(net.state_dict(), save_path+'/Model_Checkpoint_'+str(i)+'.pt')

  net.eval()

  test_accuracy = torchmetrics.Accuracy() 
  test_confusion = torchmetrics.ConfusionMatrix(num_classes=2)
  for sample, target in test:
      target = target.squeeze()
      target = target.double()
      sample = sample.to(device)
      target = target.to(device)

      out = net(sample).squeeze()

      out_rounded = torch.round(out)
      target_tensor = torch.tensor(target, dtype=torch.int8)

      acc = test_accuracy(out_rounded, target_tensor)

      confusion = test_confusion(out_rounded, target_tensor)

  total_test_accuracy = test_accuracy.compute()
  print(f"Accuracy: {total_test_accuracy}")
  test_accuracy.reset()
  total_test_confusion = test_confusion.compute()
  print(f"Confusion: {total_test_confusion}")
  test_confusion.reset()

print('Training Finished! Time Taken: ', time.time()-time_start)


Training Started


RuntimeError: ignored

<Figure size 432x288 with 0 Axes>

Load the model from the last checkpoint and evaluate. Note that W should equal the window size W of the used data and batch_size of the test and train loaders should be equal!! (variable has been declared here again s.t. the cell can be run independent of the training cell)

# Second network: hard v.s. soft classification

In [None]:
class CollisionNetwork2(nn.Module):

	def __init__(self, no_layers=11, kernel = 3, device=None):
		super(CollisionNetwork2, self).__init__()
		self.no_layers = no_layers
		self.kernel = kernel
		self.layers = {}
		self.device = device

		self.Conv1d_1 = nn.Conv1d(1, 64, kernel, stride=1, padding='valid', bias=True, dtype=torch.double)
		self.BatchNorm1d_1 = nn.BatchNorm1d(64, dtype=torch.double)
		self.ReLU_1= nn.ReLU(True)

		self.Conv1d_2 = nn.Conv1d(64, 128, kernel, stride=1, padding='same', bias=True, dtype=torch.double)
		self.BatchNorm1d_2 = nn.BatchNorm1d(128, dtype=torch.double)
		self.ReLU_2= nn.ReLU(True)        

		# self.Conv1d_3 = nn.Conv1d(32, 32, kernel, stride=1, dilation=2, padding='valid', bias=True, dtype=torch.double)
		# self.BatchNorm1d_3 = nn.BatchNorm1d(32, dtype=torch.double)
		# self.ReLU_3= nn.ReLU(True)    

		# self.Conv1d_4 = nn.Conv1d(32, 32, kernel, stride=1, padding='same', bias=True, dtype=torch.double)
		# self.BatchNorm1d_4 = nn.BatchNorm1d(32, dtype=torch.double)
		# self.ReLU_4= nn.ReLU(True)    
	
		# self.Conv1d_5 = nn.Conv1d(32, 32, kernel, stride=1, dilation=4, padding='valid', bias=True, dtype=torch.double)
		# self.BatchNorm1d_5 = nn.BatchNorm1d(32, dtype=torch.double)
		# self.ReLU_5= nn.ReLU(True)        

		# self.Conv1d_6 = nn.Conv1d(32, 32, kernel, stride=1, padding='same', bias=True, dtype=torch.double)
		# self.BatchNorm1d_6 = nn.BatchNorm1d(32, dtype=torch.double)
		# self.ReLU_6= nn.ReLU(True)    

		# self.Conv1d_7 = nn.Conv1d(32, 32, kernel, stride=1, dilation=8, padding='valid', bias=True, dtype=torch.double)
		# self.BatchNorm1d_7 = nn.BatchNorm1d(32, dtype=torch.double)
		# self.ReLU_7= nn.ReLU(True)    

		# self.Conv1d_8 = nn.Conv1d(32, 32, kernel, stride=1, padding='same', bias=True, dtype=torch.double)
		# self.BatchNorm1d_8 = nn.BatchNorm1d(32, dtype=torch.double)
		# self.ReLU_8= nn.ReLU(True) 

		self.Flatten = nn.Flatten()
	
		self.out = nn.Linear(8704,1, dtype=torch.double) 

	def forward(self, x):
		x = self.Conv1d_1(x)
		x = self.BatchNorm1d_1(x)
		x = self.ReLU_1(x)

		x = self.Conv1d_2(x)
		x = self.BatchNorm1d_2(x)
		x = self.ReLU_2(x)

		# x = self.Conv1d_3(x)
		# x = self.BatchNorm1d_3(x)
		# x = self.ReLU_3(x)
		
		# x = self.Conv1d_4(x)
		# x = self.BatchNorm1d_4(x)
		# x = self.ReLU_4(x)
		
		# x = self.Conv1d_5(x)
		# x = self.BatchNorm1d_5(x)
		# x = self.ReLU_5(x)
		
		# x = self.Conv1d_6(x)
		# x = self.BatchNorm1d_6(x)
		# x = self.ReLU_6(x)
		
		# x = self.Conv1d_7(x)
		# x = self.BatchNorm1d_7(x)
		# x = self.ReLU_7(x)
		
		# x = self.Conv1d_8(x)
		# x = self.BatchNorm1d_8(x)
		# x = self.ReLU_8(x)
	
		x = self.Flatten(x)
  
		return self.out(x)



In [None]:
batch_size = 64
epochs = 30
# save_path = "/content/drive/MyDrive/Colab Notebooks/Models_separate_W30/ModelsNN2"
save_path = save_path2

# train = DataLoader(traindataset2, batch_size=batch_size, shuffle=True, num_workers =1, pin_memory = True)
# test = DataLoader(testdataset2, batch_size=batch_size, shuffle=True, num_workers =1, pin_memory = True)

train = DataLoader(traindataset2, batch_size=batch_size, shuffle=True, num_workers =1, pin_memory = True)
test = DataLoader(testdataset2, batch_size=batch_size, shuffle=True, num_workers =1, pin_memory = True)

#Defining the model and training it on loss function
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
net = CollisionNetwork2().to(device)

# optimizer = torch.optim.Adam(net.parameters())
optimizer = torch.optim.Adam(net.parameters())
# optimizer = torch.optim.Adam(net.parameters(), lr=0.0004, betas=(0.9, 0.999), eps=1e-08, weight_decay=0)
pos_weight = torch.ones([1])					        #-- there is about 6x more soft_collision data so add pos_weight to balance --> obsolete because of new balanced sampler
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight) 			#more weight for false_positives, as dataset contains far more negative collision samples
# criterion = nn.BCEWithLogitsLoss() 			#more weight for false_positives, as dataset contains far more negative collision samples

loss_overall = []
acc_overall = [None]*epochs
recall_overall = [None]*epochs
precision_overall = [None]*epochs
acc_overall_test = [None]*epochs
acc_overall_train = [None]*epochs
time_start = time.time()
print('Training Started')

train_accuracy = torchmetrics.Accuracy() 			#in order to check if model learns properly
# train_recall = torchmetrics.Recall()
# train_precision =  torchmetrics.Precision()

plt.figure()
for i in range(epochs):

  net.train(True)
  step = 0
  loss_= 0
  for images, target in train:

    # images, target = next(iter(train))
    target = target.squeeze()
    target = target.double()
    images = images.to(device)
    target = target.to(device)

    optimizer.zero_grad()

    output = net(images)
    output = output.squeeze()

    # print(np.shape(output))
    # print(np.shape(target))

    loss = criterion(output, target)
    loss.backward()
    optimizer.step()

    out_rounded = torch.round(output)
    target_tensor = torch.tensor(target, dtype=torch.int8)

    acc = train_accuracy(out_rounded, target_tensor)
    acc_overall_test[i] = acc.item()

    # recall = train_recall(out_rounded, target_tensor)
    # recall_overall[i] = recall.item()

    # precision = train_precision(out_rounded, target_tensor)
    # precision_overall[i] = precision.item()

    loss_+=loss
    step+=1

    # if(step%100 == 0):
    #   print('Epoch:'+str(i)+'\t'+ str(step) +'\t Iterations Complete \t'+'loss: ', loss.item()/1000.0)
    #   loss_overall.append(loss_/1000.0)
    #   loss_=0

  total_train_accuracy = train_accuracy.compute()
  print(f"Accuracy: {total_train_accuracy}")
  train_accuracy.reset()
  # total_train_recall= train_recall.compute()
  # print(f"Recall: {total_train_recall}")
  # train_recall.reset()    
  # total_train_precision = train_precision.compute()
  # print(f"Precision: {total_train_precision}")
  # train_precision.reset()

  #Saving the model
  if not os.path.exists(save_path):
    os.makedirs(save_path)
  if(i==epochs-1):
    torch.save(net.state_dict(), save_path+'/Model_Checkpoint_'+'Last'+'.pt')
  else:
    torch.save(net.state_dict(), save_path+'/Model_Checkpoint_'+str(i)+'.pt')

  net.eval()

  test_accuracy = torchmetrics.Accuracy() 
  test_confusion = torchmetrics.ConfusionMatrix(num_classes=2)
  for sample, target in test:
      target = target.squeeze()
      target = target.double()
      sample = sample.to(device)
      target = target.to(device)

      out = net(sample).squeeze()

      out_rounded = torch.round(out)
      target_tensor = torch.tensor(target, dtype=torch.int8)

      acc = test_accuracy(out_rounded, target_tensor)
      acc_overall_test[i] = acc.item()

      confusion = test_confusion(out_rounded, target_tensor)

  total_test_accuracy = test_accuracy.compute()
  print(f"Accuracy: {total_test_accuracy}")
  test_accuracy.reset()
  total_test_confusion = test_confusion.compute()
  print(f"Confusion: {total_test_confusion}")
  test_confusion.reset()

print('Training Finished! Time Taken: ', time.time()-time_start)


Training Started




Accuracy: 0.6620667576789856




Accuracy: 0.75738126039505
Confusion: tensor([[387, 174],
        [204, 793]])
Accuracy: 0.6901476383209229
Accuracy: 0.7310654520988464
Confusion: tensor([[503,  58],
        [361, 636]])
Accuracy: 0.7174261808395386
Accuracy: 0.7079589366912842
Confusion: tensor([[524,  37],
        [418, 579]])
Accuracy: 0.7310654520988464
Accuracy: 0.7541720271110535
Confusion: tensor([[440, 121],
        [262, 735]])
Accuracy: 0.7297817468643188
Accuracy: 0.7920410633087158
Confusion: tensor([[499,  62],
        [262, 735]])
Accuracy: 0.7262516021728516
Accuracy: 0.7188703417778015
Confusion: tensor([[525,  36],
        [402, 595]])
Accuracy: 0.7408536672592163
Accuracy: 0.7637997269630432
Confusion: tensor([[371, 190],
        [178, 819]])
Accuracy: 0.7421373724937439
Accuracy: 0.7650834321975708
Confusion: tensor([[529,  32],
        [334, 663]])
Accuracy: 0.7496790885925293
Accuracy: 0.7567394375801086
Confusion: tensor([[524,  37],
        [342, 655]])
Accuracy: 0.7641206383705139
Accuracy: 0.

<Figure size 432x288 with 0 Axes>

In [None]:
# plt.figure()
# plt.plot(acc_overall, label='accuracy')
# plt.plot(recall_overall, label='recall')
# plt.plot(precision_overall, label='precision')
# plt.title('Training metrics')
# plt.xlabel('epochs')
# plt.legend()

# plt.figure()
# plt.plot(loss_overall)
# plt.show()

# # print(acc_overall)


In [None]:
# # load_path = "/content/drive/MyDrive/Colab Notebooks/ModelsNN2/Model_Checkpoint_Last.pt"
# load_path = "/content/drive/MyDrive/Colab Notebooks/Models_separate_W30/ModelsNN2/Model_Checkpoint_Last.pt"

# batch_size = 24

# test = DataLoader(testdatasetNN2, batch_size=batch_size, shuffle=False, num_workers =1, pin_memory = True)

# device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# net = CollisionNetwork2().to(device)
# net.load_state_dict(torch.load(load_path))
# net.eval()

# test_accuracy = torchmetrics.Accuracy() 
# test_recall = torchmetrics.Recall()
# test_precision =  torchmetrics.Precision()
# test_confusion = torchmetrics.ConfusionMatrix(num_classes=2)
# Nsamples_ = 0
# Tsamples_ = 0
# for sample, target in test:
#     sample, target = next(iter(test))
#     target = target.squeeze()
#     target = target.double()
#     sample = sample.to(device)
#     target = target.to(device)

#     out = net(sample).squeeze()

#     out_rounded = torch.round(out)
#     target_tensor = torch.tensor(target, dtype=torch.int8)

#     Tsamples = torch.sum(target)
#     Nsamples = target.size(0) - Tsamples

#     Tsamples_ += Tsamples
#     Nsamples_ += Nsamples

#     acc = test_accuracy(out_rounded, target_tensor)
#     acc = acc.item()

#     recall = test_recall(out_rounded, target_tensor)
#     recall = recall.item()

#     precision = test_precision(out_rounded, target_tensor)
#     precision = precision.item()

#     confusion = test_confusion(out_rounded, target_tensor)

# total_test_accuracy = test_accuracy.compute()
# print(f"Accuracy: {total_test_accuracy}")
# test_accuracy.reset()
# total_test_recall= test_recall.compute()
# print(f"Recall: {total_test_recall}")
# test_recall.reset()    
# total_test_precision = test_precision.compute()
# print(f"Precision: {total_test_precision}")
# test_precision.reset()
# total_test_confusion = test_confusion.compute()
# print(f"Confusion: {total_test_confusion}")
# test_confusion.reset()
# print(Tsamples_)
# print(Nsamples_)


# Third network: intentional v.s. unintentional classification

In [None]:
class CollisionNetwork3(nn.Module):

	def __init__(self, no_layers=11, kernel = 3, device=None):
		super(CollisionNetwork3, self).__init__()
		self.no_layers = no_layers
		self.kernel = kernel
		self.layers = {}
		self.device = device

		self.Conv1d_1 = nn.Conv1d(1, 64, kernel, stride=1, padding='valid', bias=True, dtype=torch.double)
		self.BatchNorm1d_1 = nn.BatchNorm1d(64, dtype=torch.double)
		self.ReLU_1= nn.ReLU(True)

		self.Conv1d_2 = nn.Conv1d(64, 128, kernel, stride=1, padding='same', bias=True, dtype=torch.double)
		self.BatchNorm1d_2 = nn.BatchNorm1d(128, dtype=torch.double)
		self.ReLU_2= nn.ReLU(True)        

		# self.Conv1d_3 = nn.Conv1d(32, 32, kernel, stride=1, dilation=2, padding='valid', bias=True, dtype=torch.double)
		# self.BatchNorm1d_3 = nn.BatchNorm1d(32, dtype=torch.double)
		# self.ReLU_3= nn.ReLU(True)    

		# self.Conv1d_4 = nn.Conv1d(32, 32, kernel, stride=1, padding='same', bias=True, dtype=torch.double)
		# self.BatchNorm1d_4 = nn.BatchNorm1d(32, dtype=torch.double)
		# self.ReLU_4= nn.ReLU(True)    
	
		# self.Conv1d_5 = nn.Conv1d(32, 32, kernel, stride=1, dilation=4, padding='valid', bias=True, dtype=torch.double)
		# self.BatchNorm1d_5 = nn.BatchNorm1d(32, dtype=torch.double)
		# self.ReLU_5= nn.ReLU(True)        

		# self.Conv1d_6 = nn.Conv1d(32, 32, kernel, stride=1, padding='same', bias=True, dtype=torch.double)
		# self.BatchNorm1d_6 = nn.BatchNorm1d(32, dtype=torch.double)
		# self.ReLU_6= nn.ReLU(True)    

		# self.Conv1d_7 = nn.Conv1d(32, 32, kernel, stride=1, dilation=8, padding='valid', bias=True, dtype=torch.double)
		# self.BatchNorm1d_7 = nn.BatchNorm1d(32, dtype=torch.double)
		# self.ReLU_7= nn.ReLU(True)    

		# self.Conv1d_8 = nn.Conv1d(32, 32, kernel, stride=1, padding='same', bias=True, dtype=torch.double)
		# self.BatchNorm1d_8 = nn.BatchNorm1d(32, dtype=torch.double)
		# self.ReLU_8= nn.ReLU(True) 

		self.Flatten = nn.Flatten()
	
		self.out = nn.Linear(8704,1, dtype=torch.double) 

	def forward(self, x):
		x = self.Conv1d_1(x)
		x = self.BatchNorm1d_1(x)
		x = self.ReLU_1(x)

		x = self.Conv1d_2(x)
		x = self.BatchNorm1d_2(x)
		x = self.ReLU_2(x)

		# x = self.Conv1d_3(x)
		# x = self.BatchNorm1d_3(x)
		# x = self.ReLU_3(x)
		
		# x = self.Conv1d_4(x)
		# x = self.BatchNorm1d_4(x)
		# x = self.ReLU_4(x)
		
		# x = self.Conv1d_5(x)
		# x = self.BatchNorm1d_5(x)
		# x = self.ReLU_5(x)
		
		# x = self.Conv1d_6(x)
		# x = self.BatchNorm1d_6(x)
		# x = self.ReLU_6(x)
		
		# x = self.Conv1d_7(x)
		# x = self.BatchNorm1d_7(x)
		# x = self.ReLU_7(x)
		
		# x = self.Conv1d_8(x)
		# x = self.BatchNorm1d_8(x)
		# x = self.ReLU_8(x)
	
		x = self.Flatten(x)
  
		return self.out(x)



In [None]:
batch_size = 64
epochs = 30
# save_path = "/content/drive/MyDrive/Colab Notebooks/Models_separate_W30/ModelsNN3"
save_path = save_path3

train = DataLoader(traindataset3, batch_size=batch_size, shuffle=True, num_workers =1, pin_memory = True)
test = DataLoader(testdataset3, batch_size=batch_size, shuffle=True, num_workers =1, pin_memory = True)

#Defining the model and training it on loss function
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
net = CollisionNetwork3().to(device)

# optimizer = torch.optim.Adam(net.parameters())
optimizer = torch.optim.Adam(net.parameters())
# optimizer = torch.optim.Adam(net.parameters(), lr=0.001, betas=(0.9, 0.999), eps=1e-08, weight_decay=0)
criterion = nn.BCEWithLogitsLoss() 			#remove weight as dataset is balanced for soft int / unint


loss_overall = []
acc_overall = [None]*epochs
recall_overall = [None]*epochs
precision_overall = [None]*epochs
acc_overall_test = [None]*epochs
acc_overall_train = [None]*epochs
time_start = time.time()
print('Training Started')

train_accuracy = torchmetrics.Accuracy() 			#in order to check if model learns properly
# train_recall = torchmetrics.Recall()
# train_precision =  torchmetrics.Precision()

plt.figure()
for i in range(epochs):

  net.train(True)
  step = 0
  loss_= 0
  for images, target in train:

    # images, target = next(iter(train))
    target = target.squeeze()
    target = target.double()
    images = images.to(device)
    target = target.to(device)

    optimizer.zero_grad()

    output = net(images)
    output = output.squeeze()

    loss = criterion(output, target)
    loss.backward()
    optimizer.step()

    out_rounded = torch.round(output)
    target_tensor = torch.tensor(target, dtype=torch.int8)

    acc = train_accuracy(out_rounded, target_tensor)
    acc_overall_test[i] = acc.item()

    # recall = train_recall(out_rounded, target_tensor)
    # recall_overall[i] = recall.item()

    # precision = train_precision(out_rounded, target_tensor)
    # precision_overall[i] = precision.item()

    loss_+=loss
    step+=1

    # if(step%100 == 0):
    #   print('Epoch:'+str(i)+'\t'+ str(step) +'\t Iterations Complete \t'+'loss: ', loss.item()/1000.0)
    #   loss_overall.append(loss_/1000.0)
    #   loss_=0

  total_train_accuracy = train_accuracy.compute()
  print(f"Accuracy: {total_train_accuracy}")
  train_accuracy.reset()
  # total_train_recall= train_recall.compute()
  # print(f"Recall: {total_train_recall}")
  # train_recall.reset()    
  # total_train_precision = train_precision.compute()
  # print(f"Precision: {total_train_precision}")
  # train_precision.reset()

  #Saving the model
  if not os.path.exists(save_path):
    os.makedirs(save_path)
  if(i==epochs-1):
    torch.save(net.state_dict(), save_path+'/Model_Checkpoint_'+'Last'+'.pt')
  else:
    torch.save(net.state_dict(), save_path+'/Model_Checkpoint_'+str(i)+'.pt')

  # net.eval()

  # test_accuracy = torchmetrics.Accuracy() 
  # test_confusion = torchmetrics.ConfusionMatrix(num_classes=2)
  # for sample, target in test:
  #     target = target.squeeze()
  #     target = target.double()
  #     sample = sample.to(device)
  #     target = target.to(device)

  #     out = net(sample).squeeze()

  #     out_rounded = torch.round(out)
  #     target_tensor = torch.tensor(target, dtype=torch.int8)

  #     acc = test_accuracy(out_rounded, target_tensor)
  #     acc_overall_test[i] = acc.item()

  #     confusion = test_confusion(out_rounded, target_tensor)

  # total_test_accuracy = test_accuracy.compute()
  # print(f"Accuracy: {total_test_accuracy}")
  # test_accuracy.reset()
  # total_test_confusion = test_confusion.compute()
  # print(f"Confusion: {total_test_confusion}")
  # test_confusion.reset()

print('Training Finished! Time Taken: ', time.time()-time_start)

Training Started




Accuracy: 0.797550618648529
Accuracy: 0.8295426368713379
Accuracy: 0.8505373597145081
Accuracy: 0.857035756111145
Accuracy: 0.8702824115753174
Accuracy: 0.8700324892997742
Accuracy: 0.8452886939048767
Accuracy: 0.8625343441963196
Accuracy: 0.8775306344032288
Accuracy: 0.8607848286628723
Accuracy: 0.8860284686088562
Accuracy: 0.8932766914367676
Accuracy: 0.8885278701782227
Accuracy: 0.8912771940231323
Accuracy: 0.8860284686088562
Accuracy: 0.8865283727645874
Accuracy: 0.9047738313674927
Accuracy: 0.9047738313674927
Accuracy: 0.8992751836776733
Accuracy: 0.9022744297981262
Accuracy: 0.9155211448669434
Accuracy: 0.9152711629867554
Accuracy: 0.9132716655731201
Accuracy: 0.9022744297981262
Accuracy: 0.9175206422805786
Accuracy: 0.9240189790725708
Accuracy: 0.9232692122459412
Accuracy: 0.9152711629867554
Accuracy: 0.930767297744751
Accuracy: 0.9320169687271118
Training Finished! Time Taken:  279.75229477882385


<Figure size 432x288 with 0 Axes>

In [None]:
# load_path = "/content/drive/MyDrive/Colab Notebooks/Models_separate_W30/ModelsNN3/Model_Checkpoint_Last.pt"

# batch_size = 24

# test = DataLoader(testdatasetNN3, batch_size=batch_size, shuffle=False, num_workers =1, pin_memory = True)

# device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# net = CollisionNetwork3().to(device)
# net.load_state_dict(torch.load(load_path))
# net.eval()

# test_accuracy = torchmetrics.Accuracy() 
# test_recall = torchmetrics.Recall()
# test_precision =  torchmetrics.Precision()
# test_confusion = torchmetrics.ConfusionMatrix(num_classes=2)
# Nsamples_ = 0
# Tsamples_ = 0
# for sample, target in test:
#     sample, target = next(iter(test))
#     target = target.squeeze()
#     target = target.double()
#     sample = sample.to(device)
#     target = target.to(device)

#     out = net(sample).squeeze()

#     out_rounded = torch.round(out)
#     target_tensor = torch.tensor(target, dtype=torch.int8)

#     Tsamples = torch.sum(target)
#     Nsamples = target.size(0) - Tsamples

#     Tsamples_ += Tsamples
#     Nsamples_ += Nsamples

#     # print(out_rounded)
#     # print(target_tensor)

#     acc = test_accuracy(out_rounded, target_tensor)
#     acc = acc.item()

#     recall = test_recall(out_rounded, target_tensor)
#     recall = recall.item()

#     precision = test_precision(out_rounded, target_tensor)
#     precision = precision.item()

#     confusion = test_confusion(out_rounded, target_tensor)

# total_test_accuracy = test_accuracy.compute()
# print(f"Accuracy: {total_test_accuracy}")
# test_accuracy.reset()
# total_test_recall= test_recall.compute()
# print(f"Recall: {total_test_recall}")
# test_recall.reset()    
# total_test_precision = test_precision.compute()
# print(f"Precision: {total_test_precision}")
# test_precision.reset()
# total_test_confusion = test_confusion.compute()
# print(f"Confusion: {total_test_confusion}")
# test_confusion.reset()
# print(Tsamples_)
# print(Nsamples_)

How will total pipeline look if only ramp samples are taken for classification of intensity and intensional? Wont work for period after collisionramp, as only trained on.

for window
do collision detection, if no collision return 0, if collision do classification, return 1, 2 or 3. 
If previous window was collision, return same number

In [None]:
# # #only for cross-accuracy, otherwise comment out:
# save_path1 = "/content/drive/MyDrive/Colab Notebooks/Models_ds7_2layer_ch128_ch256_W30_avg2/ModelsNN1"
# save_path2 = "/content/drive/MyDrive/Colab Notebooks/Models_ds7_2layer_ch128_ch256_W30_avg2/ModelsNN2"
# save_path3 = "/content/drive/MyDrive/Colab Notebooks/Models_ds7_2layer_ch128_ch256_W30_avg2/ModelsNN3"
# evalset = CollisionDataset('/content/drive/MyDrive/Colab Notebooks/data/diff_ctrl_eval_W30/',0)
evalset = CollisionDataset('/content/drive/MyDrive/Colab Notebooks/data/ds7_test_eval_avg2_W10/',0)

# load_path_NN1 = save_path1+"/Model_Checkpoint_Last.pt"
load_path_NN1 = save_path1+"/Model_Checkpoint_23.pt"
load_path_NN2 = save_path2+"/Model_Checkpoint_26.pt"
load_path_NN3 = save_path3+"/Model_Checkpoint_25.pt"

batch_size = 1        #make 1 for boolean check of NN1 and NN2 to continue through the pipeline

eval = DataLoader(evalset, batch_size=batch_size, shuffle=False)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

net_NN1 = CollisionNetwork().to(device)
net_NN1.load_state_dict(torch.load(load_path_NN1))
net_NN1.eval()
net_NN2 = CollisionNetwork2().to(device)
net_NN2.load_state_dict(torch.load(load_path_NN2))
net_NN2.eval()
net_NN3 = CollisionNetwork3().to(device)
net_NN3.load_state_dict(torch.load(load_path_NN3))
net_NN3.eval()

mistakes = np.zeros(1)
nmistakes = 0

accuracy = 0
count = 0
final_confusion = torchmetrics.ConfusionMatrix(num_classes=4)
for sample, target in eval:
    sample = sample.to(device)
    target = target.to(device)
    target = target.squeeze()
    target = target.double()

    out = net_NN1(sample).squeeze()
    out_rounded = torch.round(out)
    target_tensor = torch.tensor(target, dtype=torch.int8)
    # if target >= 1:           #so we get 100% accuracy NN1 and test 2 and 3 without the need of a new dataset
    if out_rounded >= 1:                      #-- if collision is detected
        out = net_NN2(sample).squeeze()
        out_rounded = torch.round(out)
        if out_rounded <= 0:                  # --if collision is hard
          out_final = 1
        else:
          out = net_NN3(sample).squeeze()
          out_rounded = torch.round(out)
          if out_rounded <= 0:                # --if collision is intentional
            out_final = 2
          else:                               # --if collision is unintentional
            out_final = 3
    else:                                     # --if no collision is detected
      out_final = 0

    out_final = torch.tensor([out_final], dtype=torch.int8)
    confusion = final_confusion(out_final, target_tensor)

    # print(out_final.item(), int(target.item()))

    count += 1
    if (out_final == target_tensor.item()):
      accuracy += 1
      mistakes = np.append(mistakes,nmistakes)
    else:
      nmistakes +=1
      mistakes = np.append(mistakes,nmistakes)
      
total_final_confusion= final_confusion.compute()
print(f"Confusion: {total_final_confusion}")
final_confusion.reset()

final_accuracy = accuracy/count
print(f"Accuracy: {final_accuracy}")




Confusion: tensor([[184,   2,   4,   4],
        [  9,  14,   0,  13],
        [  1,   0,  46,   0],
        [  1,   1,   0,  27]])
Accuracy: 0.8856209150326797


In [None]:
# # W = 30

# # save_path1 = "/content/drive/MyDrive/Colab Notebooks/Models_separate_W30_sameset_norm_avg/ModelsNN1"
# # save_path2 = "/content/drive/MyDrive/Colab Notebooks/Models_separate_W30_sameset_norm_avg/ModelsNN2"
# # save_path3 = "/content/drive/MyDrive/Colab Notebooks/Models_separate_W30_sameset_norm_avg/ModelsNN3"
# # evalset = CollisionDataset('/content/drive/MyDrive/Colab Notebooks/data/new_separate_eval_norm_avg_W30/',0)

# # load_path_NN1 = save_path1+"/Model_Checkpoint_4.pt"
# # load_path_NN2 = save_path2+"/Model_Checkpoint_Last.pt"
# # load_path_NN3 = save_path3+"/Model_Checkpoint_Last.pt"

# device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# net_NN1 = CollisionNetwork().to(device)
# net_NN1.load_state_dict(torch.load(load_path_NN1))
# net_NN1.eval()
# net_NN2 = CollisionNetwork2().to(device)
# net_NN2.load_state_dict(torch.load(load_path_NN2))
# net_NN2.eval()
# net_NN3 = CollisionNetwork3().to(device)
# net_NN3.load_state_dict(torch.load(load_path_NN3))
# net_NN3.eval()

# eval = DataLoader(evalset, batch_size=batch_size, shuffle=False)

# time_total = 0
# for sample, target in eval:
#   get_time = sample.to(device)
#   time_start1 = time.time()
#   out1 = net_NN1(get_time).squeeze()
#   out2 = net_NN2(get_time).squeeze()
#   out3 = net_NN3(get_time).squeeze()
#   time_end1 = time.time()
  
#   time_total += time_end1-time_start1



# print(time_total/evalset.__len__())


In [None]:
# def count_parameters(model):
#     return sum(p.numel() for p in model.parameters() if p.requires_grad)

# # count_parameters(net_NN1)
# count_parameters(net_NN1)

In [None]:
# load_path_NN1 = save_path1+"/Model_Checkpoint_Last.pt"
# load_path_NN2 = save_path2+"/Model_Checkpoint_Last.pt"
# load_path_NN3 = save_path3+"/Model_Checkpoint_Last.pt"

# batch_size = 1        #make 1 for boolean check of NN1 and NN2 to continue through the pipeline

# eval = DataLoader(evalset, batch_size=batch_size, shuffle=False)
# # eval2 = DataLoader(evaldataset2, batch_size=batch_size, shuffle=False)
# # eval3 = DataLoader(evaldataset3, batch_size=batch_size, shuffle=False)
# # eval_list = [eval1, eval2, eval3]

# device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# net_NN1 = CollisionNetwork().to(device)
# net_NN1.load_state_dict(torch.load(load_path_NN1))
# net_NN1.eval()
# net_NN2 = CollisionNetwork2().to(device)
# net_NN2.load_state_dict(torch.load(load_path_NN2))
# net_NN2.eval()
# net_NN3 = CollisionNetwork3().to(device)
# net_NN3.load_state_dict(torch.load(load_path_NN3))
# net_NN3.eval()

# # preds = np.zeros(1)
# # targets = np.zeros(1)
# mistakes = np.zeros(1)
# nmistakes = 0
# eval_count = 0
# for eval2 in eval:
#   accuracy = 0
#   count = 0
#   final_confusion = torchmetrics.ConfusionMatrix(num_classes=4)
#   for sample, target in eval:
#       # sample, target = next(iter(eval))
#       sample = sample.to(device)
#       target = target.to(device)
#       target = target.squeeze()
#       target = target.double()

#       if eval_count == 1:
#         target = target*2
#       if eval_count == 2:
#         target = target*3

#       out = net_NN1(sample).squeeze()
#       out_rounded = torch.round(out)
#       target_tensor = torch.tensor(target, dtype=torch.int8)
#       if out_rounded >= 1:                      #-- if collision is detected
#           out = net_NN2(sample).squeeze()
#           out_rounded = torch.round(out)
#           if out_rounded <= 0:                  # --if collision is hard
#             out_final = 1
#           else:
#             out = net_NN3(sample).squeeze()
#             out_rounded = torch.round(out)
#             if out_rounded <= 0:                # --if collision is intentional
#               out_final = 2
#             else:                               # --if collision is unintentional
#               out_final = 3
#       else:                                     # --if no collision is detected
#         out_final = 0

#       #maybe write this as a 3xn array so each evalset is separate
#       # preds = np.append(preds,out_final)
#       # targets = np.append(targets,target)

#       out_final = torch.tensor([out_final], dtype=torch.int8)
#       confusion = final_confusion(out_final, target_tensor)

#       count += 1
#       if (out_final == target_tensor.item()):
#         accuracy += 1
#         mistakes = np.append(mistakes,nmistakes)
#       else:
#         nmistakes +=1
#         mistakes = np.append(mistakes,nmistakes)
        
#   eval_count +=1

#   total_final_confusion= final_confusion.compute()
#   print(f"Confusion: {total_final_confusion}")
#   final_confusion.reset()

#   final_accuracy = accuracy/count
#   print(f"Accuracy: {final_accuracy}")

# # for item in preds:
# #   item = int(item)
# # for item in targets:
# #   item = int(item)

# # with open("/content/drive/MyDrive/Colab Notebooks/data/preds", "a") as f:
# #     np.savetxt(f, preds, fmt='%.1i')
# # with open("/content/drive/MyDrive/Colab Notebooks/data/labels", "a") as f:
# #     np.savetxt(f, preds, fmt='%.1i')



Confusion: tensor([[176,   9,   5,   4],
        [  9,  12,   0,  15],
        [  1,   0,  46,   0],
        [  1,   1,   4,  23]])
Accuracy: 0.8398692810457516


RuntimeError: ignored

In [None]:
# mistakes = mistakes[1:]
# plt.plot(mistakes)
# plt.show()

In [None]:
# eval_dataset =  CollisionDataset('/content/drive/MyDrive/Colab Notebooks/data/eval_set_W100_overlap/')
# # eval_dataset =  CollisionDataset('/content/drive/MyDrive/Colab Notebooks/data/eval_set_W25/')

# eval_dataset =  CollisionDataset('/content/drive/MyDrive/Colab Notebooks/data/eval_set_W30/')

# W = 30

# # list = [0,39,40,125,126, 395, 396]
# list = [12,13,35,36,192,193,359,360]
# for i in list:
# # for i in range(151,155):
#   source_sample, target_sample = eval_dataset.__getitem__(i)
#   plt.figure()
#   print(target_sample)
#   tau = [[],[],[],[],[],[],[]]
#   for i in range(7):
#     for j in range(W):
#       tau[i] = np.append(tau[i], source_sample[0][i+7*j])
#     plt.plot(tau[i])
#     plt.ylim([0,10])

# plt.show()