In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

import numpy as np
import torch.nn.functional as F

In [None]:
class FeedForwardNet(nn.Module):
    def __init__(self):
        super(FeedForwardNet, self).__init__()
        self.fc1 = nn.Linear(160 * 120, 2056)
        self.fc2 = nn.Linear(2056, 1028)
        self.fc3 = nn.Linear(1028, 128)
        self.fc4 = nn.Linear(128, 1)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = x.view(-1, 160 * 120)
        x = self.fc1(x)
        x = self.sigmoid(x)
        x = self.fc2(x)
        x = self.sigmoid(x)
        x = self.fc3(x)
        x = self.sigmoid(x)
        x = self.fc4(x)
        x = self.sigmoid(x)
        return x

class ConvNet(nn.Module):
    def __init__(self):
        super(ConvNet, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(76800, 128)
        self.relu3 = nn.ReLU()
        self.fc2 = nn.Linear(128, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.relu2(x)
        x = self.pool2(x)
        
        # Reshape for fully connected layer
        x = torch.flatten(x, 1)

        x = self.fc1(x)
        x = self.relu3(x)
        x = self.fc2(x)
        x = self.sigmoid(x)
        return x
    
class LightweightCNN(nn.Module):
    def __init__(self):
        super(LightweightCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=2, padding=1)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2, padding=1)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=5, stride=2, padding=1)
        self.fc1 = nn.Linear(17024, 32)
        self.fc2 = nn.Linear(32, 1)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))

        # Reshape for fully connected layer
        x = torch.flatten(x, 1)
        
        x = F.relu(self.fc1(x))
        x = F.sigmoid(self.fc2(x))
        return x
    

# Creating a CNN class
class ConvNeuralNet(nn.Module):
	#  Determine what layers and their order in CNN object 
    def __init__(self, num_classes):
        super(ConvNeuralNet, self).__init__()
        self.conv_layer1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3)
        self.conv_layer2 = nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3)
        self.max_pool1 = nn.MaxPool2d(kernel_size = 2, stride = 2)
        
        self.conv_layer3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3)
        self.conv_layer4 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3)
        self.max_pool2 = nn.MaxPool2d(kernel_size = 2, stride = 2)
        
        self.fc1 = nn.Linear(1600, 128)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(128, num_classes)
    
    # Progresses data across layers    
    def forward(self, x):
        out = self.conv_layer1(x)
        out = self.conv_layer2(out)
        out = self.max_pool1(out)
        
        out = self.conv_layer3(out)
        out = self.conv_layer4(out)
        out = self.max_pool2(out)
                
        out = out.reshape(out.size(0), -1)
        
        out = self.fc1(out)
        out = self.relu1(out)
        out = self.fc2(out)
        return out

class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1) # flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x



In [None]:
model = LightweightCNN()
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.BCELoss()

In [None]:
# Import dataset from npz file
with np.load('/Users/jvelasquez/Virginia_Tech/Spring_2023/ECE_4806/pedestal_svm/upright_fallen_dataset_v6.npz') as data:
    print(data.files)
    images = data['images']
    labels = data['labels']

# Preprocess the images
images = images.astype('float32') / 255
print(images[0].shape)

# Flatten the images
# images = images.reshape(images.shape[0], -1)

# Make labels shape (n, 1)
labels = labels.reshape(labels.shape[0], 1).astype('float32')

print(np.count_nonzero(labels == 1))
print(np.count_nonzero(labels == 0))

# Convert the numpy arrays to PyTorch tensors
images_tensor = torch.from_numpy(images).to("mps")
labels_tensor = torch.from_numpy(labels).to("mps")

#images_tensor = torch.from_numpy(images)
#labels_tensor = torch.from_numpy(labels)


# Define the dataset using the tensors
dataset = torch.utils.data.TensorDataset(images_tensor, labels_tensor)

# Split the dataset into training, validation, and testing sets
train_set, val_set = torch.utils.data.random_split(dataset, [int(len(dataset)*0.8), int(len(dataset)*0.2)])

# Load the testing set
with np.load('/Users/jvelasquez/Virginia_Tech/Spring_2023/ECE_4806/pedestal_svm/upright_fallen_dataset.npz') as data:
    print(data.files)
    test_images = data['images']
    test_labels = data['labels']

test_images = test_images.astype('float32') / 255
test_labels = test_labels.reshape(test_labels.shape[0], 1).astype('float32')

# Reshape all images in test_images to have channels first
test_images = np.transpose(test_images, (0, 3, 1, 2))

print(test_images[0].shape)

print(np.count_nonzero(test_labels == 1))
print(np.count_nonzero(test_labels == 0))

# Convert the numpy arrays to PyTorch tensors
test_images_tensor = torch.from_numpy(images).to("mps")
test_labels_tensor = torch.from_numpy(labels).to("mps")

# Define the test dataset using the tensors
test_set = torch.utils.data.TensorDataset(images_tensor, labels_tensor)

# Create the dataloaders
batch_size = 16

train_loader = torch.utils.data.DataLoader(train_set, batch_size=16, shuffle=True)
val_loader = torch.utils.data.DataLoader(val_set, batch_size=16, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=16, shuffle=True)



In [None]:
# Check that MPS is available
if not torch.backends.mps.is_available():
    if not torch.backends.mps.is_built():
        print("MPS not available because the current PyTorch install was not "
              "built with MPS enabled.")
    else:
        print("MPS not available because the current MacOS version is not 12.3+ "
              "and/or you do not have an MPS-enabled device on this machine.")
else:
    mps_device = torch.device("mps")
    
    model.to(mps_device)

print(model)

# # Input a dummy tensor to the model
# dummy_input = torch.randn(1, 3, 120, 160)
# out = model(dummy_input)
# print(out.shape)
# print(out)

In [None]:
num_epochs = 10

for epoch in range(num_epochs):
    # Train
    model.train()
    train_loss = 0
    correct_train = 0
    total_train = 0
    for i, (images, labels) in enumerate(train_loader):
        optimizer.zero_grad()
        outputs = model(images)

        loss = criterion(outputs, labels.float())
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
        predicted = (outputs > 0.5).float()
        #print("Outputs: ", outputs)
        #print("Predicted: ", predicted)
        #print("Labels: ", labels)
        
        total_train += labels.size(0)
        correct_train += (predicted == labels).sum().item()
        
    train_loss /= len(train_loader)

    #print(correct_train, total_train)
    train_acc = 100 * correct_train // total_train
    
    # Validate
    model.eval()
    val_loss = 0
    correct_val = 0
    total_val = 0
    with torch.no_grad():
        for i, (images, labels) in enumerate(val_loader):
            outputs = model(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            predicted = (outputs > 0.5).float()
            total_val += labels.size(0)
            correct_val += (predicted == labels).sum().item()

    val_loss /= len(val_loader)
    val_acc = 100 * correct_val / total_val
    
    # Test
    model.eval()
    test_loss = 0
    correct_test = 0
    total_test = 0
    with torch.no_grad():
        for i, (images, labels) in enumerate(test_loader):
            outputs = model(images)
            loss = criterion(outputs, labels)


            test_loss += loss.item()
            predicted = (outputs > 0.5).float()
            total_test += labels.size(0)
            correct_test += (predicted == labels).sum().item()
    test_loss /= len(test_loader)
    test_acc = 100 * correct_test / total_test
    
    # Print statistics
    print('Epoch [{}/{}], Train Loss: {:.4f}, Train Acc: {:.2f}%, Val Loss: {:.4f}, Val Acc: {:.2f}%, Test Loss: {:.4f}, Test Acc: {:.2f}%'.format(epoch+1, num_epochs, train_loss, train_acc, val_loss, val_acc, test_loss, test_acc))



In [None]:
# Save the model
torch.save(model.state_dict(), 'lightweight_net_orientation.pth')
