# Load Dependencies

In [None]:
import pandas as pd
import numpy as np
import math
import joblib
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F

from torch.utils.tensorboard import SummaryWriter

# Mount Google Drive


In [None]:
# Mount Google Drive at /content/drive; the dataset paths used further down
# live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Set device as GPU if available

In [None]:
# Device configuration: use CUDA when PyTorch reports an available GPU,
# otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

#Project 2


##Model Training Function

In [None]:
def train_model(data_loader, model, loss_fn, optimizer, epochs, writer=None):
    """Train ``model`` on ``data_loader`` for ``epochs`` epochs.

    Progress is printed every 100 batches and at the end of each epoch.
    Running loss/accuracy are logged every 3 batches when a writer is
    available.

    Args:
        data_loader: Iterable of ``(X, y)`` batches that also exposes
            ``.dataset`` (e.g. ``torch.utils.data.DataLoader``).
        model: Network being optimised; called as ``model(X)`` and expected
            to return one score per class along dimension 1.
        loss_fn: Criterion called as ``loss_fn(model(X), y)``. The reported
            loss assumes it returns the mean over the batch.
        optimizer: Optimizer already bound to ``model``'s parameters.
        epochs: Number of passes over ``data_loader``.
        writer: Optional object exposing ``add_scalar(tag, value, step)``.
            When omitted, the module-level ``train_writer`` is used if it
            exists; otherwise scalar logging is skipped.
    """
    if writer is None:
        # Backward compatible: earlier versions always logged to this global.
        writer = globals().get('train_writer')

    train_dataset_size = len(data_loader.dataset)
    num_batches = len(data_loader)

    # Batches must live on the same device as the model weights, otherwise
    # the forward pass fails as soon as the model sits on a GPU.
    first_param = next(model.parameters(), None)
    model_device = None if first_param is None else first_param.device

    model.train()
    for epoch in range(epochs):
        train_loss, correct, seen = 0.0, 0, 0
        print(f'Start of epoch [{epoch+1}/{epochs}]')
        for batch_number, (X, y) in enumerate(data_loader):
            if model_device is not None:
                X, y = X.to(model_device), y.to(model_device)

            # Compute prediction and loss
            probabilities = model(X)
            loss = loss_fn(probabilities, y)

            # Backpropagation
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Track running statistics using the real batch size so that a
            # different batch_size or a short final batch stays correct.
            batch_size = len(y)
            seen += batch_size
            train_loss += loss.item() * batch_size  # undo the batch mean
            prediction = torch.argmax(probabilities, dim=1)
            correct += (prediction == y).sum().item()

            if batch_number % 100 == 0:
                print(f'Batch [{batch_number+1}/{num_batches}] --> Accuracy: {(correct/seen):.4f}')

            # For every 3 batches of data trained on write data for tensorboard
            if writer is not None and batch_number % 3 == 0:
                step = (epoch * train_dataset_size) + seen
                writer.add_scalar('loss', train_loss / seen, step)
                writer.add_scalar('accuracy', correct / seen, step)

        epoch_samples = max(seen, 1)  # avoid 0/0 on an empty loader
        print(f'End of epoch [{epoch+1}/{epochs}], Loss: {train_loss/epoch_samples:.4f}, Accuracy: {(correct/epoch_samples):.4f}')
    print("Done Training!")

##Model Testing Function

In [None]:
def test_model(data_loader, model, loss_fn, writer=None):
    """Evaluate ``model`` on ``data_loader`` and print loss and accuracy.

    Args:
        data_loader: Iterable of ``(X, y)`` batches that also exposes
            ``.dataset`` (e.g. ``torch.utils.data.DataLoader``).
        model: Network to evaluate; called as ``model(X)`` and expected to
            return one score per class along dimension 1.
        loss_fn: Criterion called as ``loss_fn(model(X), y)``. The reported
            loss assumes it returns the mean over the batch.
        writer: Optional object exposing ``add_scalar(tag, value, step)``.
            When omitted, the module-level ``test_writer`` is used if it
            exists; otherwise scalar logging is skipped.
    """
    if writer is None:
        # Backward compatible: earlier versions always logged to this global.
        writer = globals().get('test_writer')

    # Use the loader that was passed in (previously the global `test_loader`
    # was read and the argument silently ignored).
    test_dataset_size = len(data_loader.dataset)
    num_batches = len(data_loader)

    first_param = next(model.parameters(), None)
    model_device = None if first_param is None else first_param.device

    # Evaluate in inference mode, then restore whatever mode the caller had.
    was_training = model.training
    model.eval()
    test_loss, correct, seen = 0.0, 0, 0
    try:
        with torch.no_grad():
            for batch_number, (X, y) in enumerate(data_loader):
                if model_device is not None:
                    X, y = X.to(model_device), y.to(model_device)

                # Compute prediction and loss
                probabilities = model(X)
                loss = loss_fn(probabilities, y)

                # Accumulate using the real batch size (handles any
                # batch_size and a short final batch).
                batch_size = len(y)
                seen += batch_size
                test_loss += loss.item() * batch_size  # undo the batch mean
                prediction = torch.argmax(probabilities, dim=1)
                correct += (prediction == y).sum().item()

                if batch_number % 3 == 0:
                    print(f'Batch [{batch_number+1}/{num_batches}] --> Accuracy: {(correct/seen):.4f}')
                    if writer is not None:
                        writer.add_scalar('accuracy', correct / seen, seen)
    finally:
        model.train(was_training)

    total = max(test_dataset_size, 1)  # avoid 0/0 on an empty dataset
    print(f'Final Test Data Loss: {test_loss/total:.4f}, Accuracy: {(correct/total):.4f}')
    print("Done!")


# Despite its name this is an evaluation helper, not a unit test; keep test
# collectors such as pytest from trying to run it.
test_model.__test__ = False

##Load Train/Test Datasets and Convert to Tensors

In [None]:
class GridWorldDataset(Dataset):
    """Dataset of recorded agent states (features) and the move taken (label).

    The file is read with ``joblib.load`` and turned into a DataFrame with six
    feature columns plus a ``move`` column.

    NOTE: ``joblib.load`` unpickles its input, so only load trusted files.
    """

    def __init__(self, file, transform=torch.tensor):
        feature_columns = ['agent_view','blocked_state','unblocked_state','hidden_state','sensed_count','neigbor_count']
        label_column = 'move'

        records = joblib.load(file)
        frame = pd.DataFrame(records, columns=feature_columns + [label_column])

        self.gridview_x = frame[feature_columns]
        self.move_y = frame[label_column]
        # Callable applied to both the feature row and the label on access.
        self.transform = transform

    def __len__(self):
        """Number of recorded samples."""
        return len(self.move_y)

    def __getitem__(self, index):
        """Return the transformed ``(features, label)`` pair at ``index``."""
        raw_features = self.gridview_x.iloc[index]
        raw_label = self.move_y.iloc[index]
        features = self.transform(raw_features)
        label = self.transform(raw_label)
        return features, label

In [None]:
# Train data loader: batches of 64, reshuffled on every pass.
train_data_path = '/content/drive/MyDrive/proj2_train_data_1'
train_data = GridWorldDataset(file=train_data_path, transform=torch.tensor)
train_loader = DataLoader(train_data, batch_size=64, shuffle=True)

# Test data loader.
# NOTE(review): this path is identical to train_data_path, so the "test" set
# is the training set and the reported test accuracy does not measure
# generalisation. Presumably a separate test file was intended; confirm.
test_data_path = '/content/drive/MyDrive/proj2_train_data_1'
test_data = GridWorldDataset(file=test_data_path, transform=torch.tensor)
test_loader = DataLoader(test_data, batch_size=64, shuffle=True)

##Simple Neural Net

### Simple Neural Net Model Architecture

In [None]:
# Neural net architecture
class NeuralNet(nn.Module):
    """Fully connected classifier: 384 -> 192 -> 64 -> 16 -> 4.

    The input is flattened per sample, so any shape with 64*6 = 384 values
    per sample is accepted. The output is a softmax over 4 classes.

    NOTE(review): a ReLU is applied to the last layer's output before the
    softmax, and elsewhere in this notebook the softmax output is passed to
    ``nn.CrossEntropyLoss``, which expects raw logits. Both are kept
    unchanged here because callers rely on probabilities being returned and
    saved weights were trained with this forward pass.
    """

    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.l1 = nn.Linear(64*6, 64*3)
        self.l2 = nn.Linear(64*3, 64)
        self.l3 = nn.Linear(64, 16)
        self.l4 = nn.Linear(16, 4)
        self.relu = nn.ReLU()
        # Explicit class dimension: the tensor reaching the softmax is always
        # (batch, 4), for which the deprecated implicit choice was also 1.
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return class probabilities of shape ``(batch, 4)``."""
        x = self.flatten(x.float())
        x = self.relu(self.l1(x))
        x = self.relu(self.l2(x))
        x = self.relu(self.l3(x))
        logits = self.relu(self.l4(x))
        probabilities = self.softmax(logits)
        return probabilities

# Instantiate the fully connected network and move its weights to `device`.
model = NeuralNet().to(device)

###Training Simple Neural Net Model

In [None]:
# TensorBoard writer for the training run; train_model logs its 'loss' and
# 'accuracy' scalars through this module-level name.
train_writer = SummaryWriter('runs/proj2/nn/train')
# 50 epochs, cross-entropy loss, Adam with learning rate 1e-3.
train_model(train_loader, model, loss_fn = nn.CrossEntropyLoss(), optimizer = torch.optim.Adam(model.parameters(), lr=0.001), epochs = 50)

### Save Model

In [None]:
import os

PATH='model/proj2/nn.pt'
# torch.save does not create missing parent directories, so make sure the
# target folder exists before writing the weights.
os.makedirs(os.path.dirname(PATH), exist_ok=True)
torch.save(model.state_dict(), PATH)

### Load Model

In [None]:
model = NeuralNet()
PATH='model/proj2/nn.pt'
# The freshly built model lives on the CPU. map_location lets weights that
# were saved from a GPU run be loaded on a CPU-only runtime as well.
model.load_state_dict(torch.load(PATH, map_location='cpu'))

<All keys matched successfully>

###Testing Simple Neural Net Model

In [None]:
# TensorBoard writer for the evaluation run; test_model logs its 'accuracy'
# scalar through this module-level name.
test_writer = SummaryWriter('runs/proj2/nn/test')
test_model(test_loader, model, loss_fn = nn.CrossEntropyLoss())



Batch [1/234] --> Accuracy: 0.9844
Batch [4/234] --> Accuracy: 0.9805
Batch [7/234] --> Accuracy: 0.9710
Batch [10/234] --> Accuracy: 0.9719
Batch [13/234] --> Accuracy: 0.9700
Batch [16/234] --> Accuracy: 0.9727
Batch [19/234] --> Accuracy: 0.9753
Batch [22/234] --> Accuracy: 0.9751
Batch [25/234] --> Accuracy: 0.9756
Batch [28/234] --> Accuracy: 0.9743
Batch [31/234] --> Accuracy: 0.9753
Batch [34/234] --> Accuracy: 0.9752
Batch [37/234] --> Accuracy: 0.9751
Batch [40/234] --> Accuracy: 0.9746
Batch [43/234] --> Accuracy: 0.9760
Batch [46/234] --> Accuracy: 0.9759
Batch [49/234] --> Accuracy: 0.9761
Batch [52/234] --> Accuracy: 0.9757
Batch [55/234] --> Accuracy: 0.9756
Batch [58/234] --> Accuracy: 0.9755
Batch [61/234] --> Accuracy: 0.9759
Batch [64/234] --> Accuracy: 0.9768
Batch [67/234] --> Accuracy: 0.9764
Batch [70/234] --> Accuracy: 0.9763
Batch [73/234] --> Accuracy: 0.9758
Batch [76/234] --> Accuracy: 0.9764
Batch [79/234] --> Accuracy: 0.9767
Batch [82/234] --> Accuracy: 0.

###Tensorboard data analysis

In [None]:
# IPython magics: load the TensorBoard notebook extension and open it on the
# `runs` log directory written by the SummaryWriter objects above.
%load_ext tensorboard
%tensorboard --logdir runs

## Convolutional Neural Net

### Convolutional Neural Net Model Architecture

In [None]:
# Convolutional neural net architecture
class ConvulationalNeuralNet(nn.Module):
    """Five 2x2 convolutions followed by a linear layer and a softmax.

    Expects input of shape ``(batch, 6, 8, 8)``: each unpadded 2x2
    convolution shrinks the spatial size by one (8 -> 7 -> 6 -> 5 -> 4 -> 3),
    which yields the 16*3*3 features consumed by the final linear layer.

    NOTE(review): a ReLU is applied to the last layer's output before the
    softmax, and elsewhere in this notebook the softmax output is passed to
    ``nn.CrossEntropyLoss``, which expects raw logits. Both are kept
    unchanged here so saved weights keep their meaning.
    """

    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.l1 = nn.Conv2d(6, 32, kernel_size=2, stride=1, padding=0)
        self.l2 = nn.Conv2d(32, 64, kernel_size=2, stride=1, padding=0)
        self.l3 = nn.Conv2d(64, 64, kernel_size=2, stride=1, padding=0)
        self.l4 = nn.Conv2d(64, 32, kernel_size=2, stride=1, padding=0)
        self.l5 = nn.Conv2d(32, 16, kernel_size=2, stride=1, padding=0)
        self.l6 = nn.Linear(16*3*3, 4)
        self.relu = nn.ReLU()
        # Explicit class dimension: the tensor reaching the softmax is always
        # (batch, 4), for which the deprecated implicit choice was also 1.
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return class probabilities of shape ``(batch, 4)``."""
        x = x.float()
        for conv in (self.l1, self.l2, self.l3, self.l4, self.l5):
            x = self.relu(conv(x))
        x = self.flatten(x)
        logits = self.relu(self.l6(x))
        probabilities = self.softmax(logits)
        return probabilities

# Instantiate the convolutional network and move its weights to `device`.
cnn_model = ConvulationalNeuralNet().to(device)

### Training Convolutional Neural Net Model

In [None]:
# Log under runs/proj2 like every other writer in this notebook (the fully
# connected net and the CNN test run); the previous 'runs/proj1/...' path
# put the CNN training curves in a different project folder.
train_writer = SummaryWriter('runs/proj2/cnn/train')
# 50 epochs, cross-entropy loss, Adam with learning rate 1e-4.
train_model(train_loader, cnn_model, loss_fn = nn.CrossEntropyLoss(), optimizer = torch.optim.Adam(cnn_model.parameters(), lr=0.0001), epochs = 50)

### Save Model

In [None]:
# Save only the CNN's weights (state_dict) to cnn.pt in the working directory.
PATH='cnn.pt'
torch.save(cnn_model.state_dict(), PATH)

### Load Model

In [None]:
cnn_model = ConvulationalNeuralNet()
PATH='cnn.pt'
# The freshly built model lives on the CPU. map_location lets weights that
# were saved from a GPU run be loaded on a CPU-only runtime as well.
cnn_model.load_state_dict(torch.load(PATH, map_location='cpu'))

<All keys matched successfully>

### Testing Convolutional Neural Net Model

In [None]:
# TensorBoard writer for the CNN evaluation run; test_model logs its
# 'accuracy' scalar through this module-level name.
test_writer = SummaryWriter('runs/proj2/cnn/test')
test_model(test_loader, cnn_model, loss_fn = nn.CrossEntropyLoss())

###Tensorboard data analysis

In [None]:
# IPython magics: load the TensorBoard notebook extension and open it on the
# `runs` log directory.
%load_ext tensorboard
%tensorboard --logdir runs

## 4-Neighbor Agent Analysis


### Utils

In [None]:
from collections import defaultdict
import numpy as np
import heapq as hq
import copy
import random
import math
import csv
import joblib
import time

class Cell:
    """Search node for one grid position.

    Holds the coordinates plus the A* bookkeeping values: ``gval`` (cost so
    far), ``hval`` (heuristic), ``fval`` (priority), the ``parent`` link and
    a ``state`` flag.
    """

    def __init__(self, x, y, state=0, fval=math.inf, gval=math.inf, hval=0, parent=(-10, -10)):
        # Grid position
        self.x, self.y = x, y
        # Search costs
        self.gval, self.hval, self.fval = gval, hval, fval
        # Back-pointer and status flag
        self.parent = parent
        self.state = state

def generate_gridworld(dim, probability=0.0):
    """Build a random ``dim`` x ``dim`` gridworld.

    Every cell is drawn independently: 1 (blocked) with the given
    ``probability`` and 0 (unblocked) otherwise. The top-left and
    bottom-right corners are then forced to be unblocked.

    Returns the grid as a NumPy array.
    """
    # random.seed(100) #for reproducible results
    blocked_weight = probability
    open_weight = 1 - probability
    grid = np.random.choice([1, 0], size=(dim, dim), p=[blocked_weight, open_weight])

    last = dim - 1
    for corner in (0, last):
        grid[corner][corner] = 0
    return grid

def sensing(gridworld, cell):
    """Count how many of the (up to) 8 neighbours of ``cell`` are blocked.

    A neighbour counts when it lies inside the grid and its value is 1. The
    grid is treated as square: ``len(gridworld)`` bounds both coordinates.
    """
    size = len(gridworld)
    centre_x, centre_y = cell
    blocked_neighbours = 0
    for dx in (-1, 0, 1):
        for dy in (-1, 0, 1):
            if dx == 0 and dy == 0:
                continue  # the cell itself is not a neighbour
            nx, ny = centre_x + dx, centre_y + dy
            if not (0 <= nx < size and 0 <= ny < size):
                continue
            if gridworld[nx][ny] == 1:
                blocked_neighbours += 1
    return blocked_neighbours


def get_neighbors_8(dim, cell):
    """Return the in-bounds 8-connected neighbours of ``cell`` and their count.

    Neighbours are listed in a fixed order: the four axis-aligned offsets
    first, then the four diagonals.
    """
    offsets = (
        (1, 0), (0, -1), (-1, 0), (0, 1),    # axis-aligned
        (1, 1), (1, -1), (-1, -1), (-1, 1),  # diagonal
    )
    base_x, base_y = cell
    candidates = ((base_x + dx, base_y + dy) for dx, dy in offsets)
    neighbors = [(x, y) for x, y in candidates if 0 <= x < dim and 0 <= y < dim]
    return neighbors, len(neighbors)


def genrate_params_slice(dim,cell,agent_grid_view):
    """Return the 8x8 window of ``agent_grid_view`` around ``cell``.

    The grid is embedded in a ``(dim+10, dim+10)`` integer array filled with
    -1, offset by 4 in both axes, so windows near the border are padded with
    -1. The window spans rows ``x-3 .. x+4`` and columns ``y-3 .. y+4`` of
    the original grid, i.e. ``cell`` ends up at index ``[3][3]``.

    Args:
        dim: Side length of the (square) grid.
        cell: ``(x, y)`` position the window is centred on.
        agent_grid_view: ``dim`` x ``dim`` grid of values; not modified.

    Returns:
        The 8x8 window as a NumPy integer array.
    """
    pad = 4
    padded_data = np.full((dim+10, dim+10), -1)
    # One slice assignment replaces the element-by-element copy loop.
    padded_data[pad:pad+dim, pad:pad+dim] = np.asarray(agent_grid_view)[:dim, :dim]

    x, y = cell
    window = padded_data[x-3+pad:x+5+pad, y-3+pad:y+5+pad]
    if window.shape[0] < 8 or window.shape[1] < 8:
        # Only reachable when `cell` lies outside the grid.
        print(f'warning: window around cell {cell} has shape {window.shape}, expected (8, 8)')

    return window
    
def get_nearby_cells(dim,cell,agent_grid_view,l=2):
    """Return the 8x8 window around ``cell`` with the cell itself marked as 5.

    The grid is embedded in a ``(dim+10, dim+10)`` integer array filled with
    -1, offset by 4 in both axes, so windows near the border are padded with
    -1. The window spans rows ``x-3 .. x+4`` and columns ``y-3 .. y+4`` of
    the original grid, i.e. ``cell`` ends up at index ``[3][3]``.

    Args:
        dim: Side length of the (square) grid.
        cell: ``(x, y)`` position of the agent.
        agent_grid_view: ``dim`` x ``dim`` grid. It is MODIFIED in place: the
            entry at ``cell`` is set to 5, so pass a copy if the original
            must stay untouched.
        l: Unused; kept for backward compatibility.

    Returns:
        The 8x8 window as a NumPy integer array.
    """
    agent_grid_view[cell[0]][cell[1]]=5  # mark the agent's own position

    pad = 4
    padded_data = np.full((dim+10, dim+10), -1)
    # One slice assignment replaces the element-by-element copy loop.
    padded_data[pad:pad+dim, pad:pad+dim] = np.asarray(agent_grid_view)[:dim, :dim]

    x, y = cell
    window = padded_data[x-3+pad:x+5+pad, y-3+pad:y+5+pad]
    if window.shape[0] < 8 or window.shape[1] < 8:
        # Only reachable when `cell` lies outside the grid.
        print(f'warning: window around cell {cell} has shape {window.shape}, expected (8, 8)')

    return window

### Original AI Agent Implementation

In [None]:
from collections import defaultdict
import numpy as np
import heapq as hq


class PathFinder:
    """Repeated-A* agent with a neighbour-count inference engine.

    The agent plans on ``agent_grid_view`` (its belief about the grid), walks
    the plan on the real ``gridworld``, senses the number of blocked
    8-neighbours at every cell it stands on, infers the state of hidden
    neighbours, and replans whenever the plan turns out to be blocked.

    Knowledge-base encoding in ``agent_infered_state``: -2 = hidden,
    0 = known unblocked, 1 = known blocked.
    """

    def __init__(self, gridworld, agent_grid_view, dim, start, target):
        """
        Args:
            gridworld: True grid; 1 marks a blocked cell.
            agent_grid_view: The agent's belief grid, updated in place.
            dim: Side length of the (square) grid.
            start: Start cell, stored for reference.
            target: Stored for reference; planning always uses the fixed
                goal ``(dim-1, dim-1)``.
        """
        self.gridworld = gridworld
        self.dim = dim
        self.agent_grid_view = agent_grid_view

        # Per-cell knowledge base (one entry per grid cell)
        self.agent_block_cells = np.full((dim, dim), 0)      # bx: neighbours known blocked
        self.agent_unblocked_cells = np.full((dim, dim), 0)  # ex: neighbours known empty
        self.agent_hidden_cells = np.full((dim, dim), 8)     # hx: neighbours still hidden
        self.agent_sensed_blocks = np.full((dim, dim), 0)    # cx: sensed blocked neighbours
        self.agent_neighbor_count = np.full((dim, dim), 0)   # nx: number of neighbours
        # True once the agent has actually sensed at a cell. cx defaults to 0,
        # so without this flag an unvisited cell would look like "sensed zero
        # blocked neighbours" and trigger false inferences.
        self.agent_sensed_mask = np.full((dim, dim), False)

        self.agent_infered_state = np.full((dim, dim), -2)

        self.start = start
        self.target = target
        self.goal = (dim-1, dim-1)  # Initialize fixed goal
        self.cells = {}  # Initialize Cells Hash
        self.children_hash = {}  # Initialize Children Hash

        self.blocks_encountered = 0  # keep track of blocks encountered
        self.replans_count = 0  # Counter to keep track of no of replans
        self.cells_processed = 0
        self.examination_cost = 0  # No of nodes actually examined
        self.states = []
        self.states_cell = {}
        # create cell objects
        for i in range(0, dim):
            for j in range(0, dim):
                self.cells[(i, j)] = Cell(i, j)

    def heuristic(self, x1, y1):
        """Return the Manhattan distance from ``(x1, y1)`` to the goal."""
        x2, y2 = self.goal
        return abs(x1-x2) + abs(y1-y2)

    def get_children(self, cell_x, cell_y):
        """Return the in-bounds 4-connected neighbours of a cell (cached)."""
        cached = self.children_hash.get((cell_x, cell_y))
        if cached is not None:
            return cached
        action_offset = [
            (1, 0),  # Right
            (0, -1),  # Down
            (-1, 0),  # Left
            (0, 1),  # Up
        ]
        children = []
        for n in action_offset:
            x = cell_x + n[0]
            y = cell_y + n[1]
            if 0 <= x < self.dim and 0 <= y < self.dim:
                children.append((x, y))
        self.children_hash[(cell_x, cell_y)] = children
        return children

    def astar(self, start_cell, goal_cell):
        """Run A* on the agent's current belief grid.

        Args:
            start_cell: ``(x, y)`` to start from.
            goal_cell: ``(x, y)`` to reach.

        Returns:
            ``(path, solvable)``: the list of cells from start to goal and
            ``True``, or ``([], False)`` when the goal cannot be reached
            through cells not believed to be blocked.
        """
        fringe = []
        start = self.cells.get(start_cell)
        start.fval = self.heuristic(start.x, start.y)
        start.gval = 0
        start.state = 1
        start.parent = None
        hq.heappush(fringe, (start.fval, (start.x, start.y)))

        visited = set()
        # Best cost found in THIS search. Cell objects are reused between
        # searches, so their stored gval cannot be trusted for comparisons.
        best_g = {(start.x, start.y): 0}
        self.cells_processed = 0

        while fringe:
            _, current = hq.heappop(fringe)
            self.cells_processed += 1
            if current == goal_cell:
                path = []
                node = self.cells.get(current)
                while node is not None:
                    path.append((node.x, node.y))
                    node = node.parent
                return path[::-1], True
            if current in visited:
                continue  # stale duplicate entry in the heap
            # add cell to closed list since we are exploring
            visited.add(current)
            parent = self.cells.get(current)
            for child_x, child_y in self.get_children(current[0], current[1]):
                child_key = (child_x, child_y)
                if self.agent_grid_view[child_x][child_y] == 1 or child_key in visited:
                    continue
                tentative_g = parent.gval + 1
                # Never replace a cheaper route to the child with a worse one.
                if tentative_g >= best_g.get(child_key, math.inf):
                    continue
                best_g[child_key] = tentative_g
                child = self.cells.get(child_key)
                child.parent = parent
                child.gval = tentative_g
                child.hval = self.heuristic(child_x, child_y)
                child.fval = child.gval + child.hval
                # add to fringe; the lowest of multiple entries comes out first
                hq.heappush(fringe, (child.fval, child_key))
        return [], False

    def replan_path(self, cell):
        """Replan from ``cell`` to the goal and update the replan counters.

        Returns the same ``(path, solvable)`` pair as :meth:`astar`.
        """
        self.replans_count += 1  # counter to count number of restarts
        self.blocks_encountered += 1
        restart_cell = self.cells.get(cell)
        restart_cell.parent = None
        restart_cell.gval = 0
        restart_cell.hval = self.heuristic(restart_cell.x, restart_cell.y)
        restart_cell.fval = restart_cell.hval
        return self.astar((restart_cell.x, restart_cell.y), self.goal)

    def _mark_hidden_neighbors(self, neighbors, state):
        """Set every still-hidden cell in ``neighbors`` to ``state`` (0 or 1)."""
        for neighbor_x, neighbor_y in neighbors:
            if self.agent_infered_state[neighbor_x][neighbor_y] == -2:
                self.agent_grid_view[neighbor_x][neighbor_y] = state
                self.agent_infered_state[neighbor_x][neighbor_y] = state

    def run_inference(self, inference_path):
        """Update the knowledge base for every cell on ``inference_path``.

        For each cell the neighbour counts are refreshed from the balancing
        equation ``nx = bx + ex + hx``. For cells the agent has sensed, two
        rules are then applied:

        * ``nx - cx == ex``: every hidden neighbour must be blocked.
        * ``cx == bx``: every hidden neighbour must be unblocked.
        """
        for cell in inference_path:
            row, col = cell
            neighbors, cell_nx = get_neighbors_8(self.dim, cell)

            # Count neighbours by their currently known state
            blocked = 0
            empty = 0
            for neighbor_x, neighbor_y in neighbors:
                state = self.agent_infered_state[neighbor_x][neighbor_y]
                if state == 1:
                    blocked += 1
                elif state == 0:
                    empty += 1
            hidden = cell_nx - blocked - empty

            # update the knowledge base grids
            self.agent_neighbor_count[row][col] = cell_nx
            self.agent_block_cells[row][col] = blocked
            self.agent_unblocked_cells[row][col] = empty
            self.agent_hidden_cells[row][col] = hidden

            # The rules below need a real sensed count for this cell.
            if not self.agent_sensed_mask[row][col]:
                continue
            cx = self.agent_sensed_blocks[row][col]

            if hidden == 0:
                # nothing more to infer from this cell
                continue
            if (cell_nx - cx) == empty:
                # all unblocked neighbours are accounted for: the rest are blocked
                self._mark_hidden_neighbors(neighbors, 1)
                self.agent_block_cells[row][col] += hidden
                self.agent_hidden_cells[row][col] = 0
            elif cx == blocked:
                # all blocked neighbours are accounted for: the rest are unblocked
                self._mark_hidden_neighbors(neighbors, 0)
                self.agent_unblocked_cells[row][col] += hidden
                self.agent_hidden_cells[row][col] = 0

    def execute_agent(self, start):
        """Walk from ``start`` to the goal, sensing, inferring and replanning.

        Returns:
            ``(1, len(path), path, replans, cells_processed)`` when the goal
            is reached, otherwise ``(0, 0, [], replans, cells_processed)``.
            ``cells_processed`` refers to the most recent A* search.
        """
        start_cell = start
        self.cells.get(start_cell).parent = None
        goal_found = False
        complete_path = []

        path, solvable = self.astar(start_cell, self.goal)

        while solvable and not goal_found:
            for i, cell in enumerate(path):
                row, col = cell

                # sense the number of blocked neighbours at the current cell
                self.agent_sensed_blocks[row][col] = sensing(self.gridworld, cell)
                self.agent_sensed_mask[row][col] = True

                # check for goal state
                if row == self.goal[0] and col == self.goal[1]:
                    complete_path.append(cell)
                    goal_found = True
                    break

                next_cell = path[i+1] if i+1 < len(path) else None
                if next_cell is not None and self.gridworld[next_cell[0]][next_cell[1]] == 1:
                    # Bumped into a block: record the BLOCKED cell (not the
                    # one the agent is standing on) and replan from here.
                    # replan_path already counts the encountered block.
                    self.agent_grid_view[next_cell[0]][next_cell[1]] = 1
                    self.agent_infered_state[next_cell[0]][next_cell[1]] = 1
                    self.run_inference(path[i:])
                    path, solvable = self.replan_path(cell)
                    break

                complete_path.append(cell)
                self.agent_infered_state[row][col] = 0
                remaining = path[i:]
                # run the inference engine to update agent knowledge
                self.run_inference(remaining)

                # If inference revealed a block on the rest of the plan,
                # replan now and restart the walk on the new path.
                if any(self.agent_grid_view[c_x][c_y] == 1 for c_x, c_y in remaining):
                    # The new path starts at `cell` and will append it again.
                    complete_path.pop()
                    path, solvable = self.replan_path(cell)
                    break

        if goal_found:
            print('Hidden goal found')
            return 1, len(complete_path), complete_path, self.replans_count, self.cells_processed
        else:
            print('target not reachable')
            return 0, 0, [], self.replans_count, self.cells_processed


### ML Agent Implementation

#### Inference function for ML agent

In [None]:
def inference(grid, model):
    """Run ``model`` on a single agent state and return its output as NumPy.

    Args:
        grid: Array-like agent state; it is converted to float, given a
            leading batch dimension of 1 and flattened to ``(1, n)``.
        model: ``nn.Module`` called as ``model(batch)``; switched to eval
            mode by this function.

    Returns:
        A NumPy array holding the model output for the single sample.
    """
    batch = torch.as_tensor(grid).float().unsqueeze(0)
    batch = nn.Flatten()(batch)

    # Feed the input on whatever device the model's weights live on.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        batch = batch.to(first_param.device)

    model.eval()
    with torch.no_grad():
        probabilities = model(batch)
    # .numpy() only works on CPU tensors, so bring the result back first.
    return probabilities[0].detach().cpu().numpy()

#### ML agent class


In [None]:
class Mlfinder:
    """Agent that picks its moves with a trained model instead of A*.

    At each decision point the agent builds a six-channel 8x8 view of its
    knowledge around the current cell, asks ``ml_model`` to score the four
    directions, and then commits to up to five cells in the chosen direction.

    Knowledge-base encoding in ``agent_infered_state``: -2 = hidden,
    0 = known unblocked, 1 = known blocked.
    """

    # direction index -> (dx, dy)
    _ACTION_OFFSETS = {
        0: (1, 0),   # Right
        1: (0, -1),  # Down
        2: (-1, 0),  # Left
        3: (0, 1),   # Up
    }

    def __init__(self, gridworld, agent_grid_view, dim, start, goal, ml_model):
        """
        Args:
            gridworld: True grid; 1 marks a blocked cell.
            agent_grid_view: The agent's belief grid, updated in place.
            dim: Side length of the (square) grid.
            start: Start cell, stored for reference.
            goal: ``(x, y)`` cell to reach.
            ml_model: Model passed to ``inference`` to score the directions.
        """
        self.model = ml_model
        self.gridworld = gridworld
        self.dim = dim
        self.agent_grid_view = agent_grid_view
        self.start = start
        self.goal = goal
        self.cells = {}  # Initialize Cells Hash
        self.children_hash = {}  # Initialize Children Hash
        self.blocks_encountered = 0  # keep track of blocks encountered
        self.replans_count = 0  # Counter to keep track of no of replans
        self.cells_processed = 0
        self.examination_cost = 0  # No of nodes actually examined
        # create cell objects
        for i in range(0, dim):
            for j in range(0, dim):
                self.cells[(i, j)] = Cell(i, j)
        # Per-cell knowledge base (one entry per grid cell)
        self.agent_block_cells = np.full((dim, dim), 0)      # bx
        self.agent_unblocked_cells = np.full((dim, dim), 0)  # ex
        self.agent_hidden_cells = np.full((dim, dim), 8)     # hx
        self.agent_sensed_blocks = np.full((dim, dim), 0)    # cx
        self.agent_neighbor_count = np.full((dim, dim), 0)   # nx

        self.agent_infered_state = np.full((dim, dim), -2)

    def get_children(self, cell_x, cell_y):
        """Return the in-bounds 4-connected neighbours of a cell (cached)."""
        cached = self.children_hash.get((cell_x, cell_y))
        if cached is not None:
            return cached
        children = []
        for dx, dy in self._ACTION_OFFSETS.values():
            x = cell_x + dx
            y = cell_y + dy
            if 0 <= x < self.dim and 0 <= y < self.dim:
                children.append((x, y))
        self.children_hash[(cell_x, cell_y)] = children
        return children

    def get_cell(self, cell_x, cell_y, dir):
        """Return the neighbour in direction ``dir`` or ``None``.

        ``None`` is returned when the neighbour is outside the grid or is
        blocked in the true ``gridworld``.
        """
        dx, dy = self._ACTION_OFFSETS.get(dir)
        x = cell_x + dx
        y = cell_y + dy
        if 0 <= x < self.dim and 0 <= y < self.dim:
            if self.gridworld[x][y] == 1:
                return None
            return (x, y)
        return None

    def run_kb_update(self, cell):
        """Refresh nx, bx, ex and hx for ``cell`` (``nx = bx + ex + hx``)."""
        blocked = 0
        empty = 0

        neighbors, cell_nx = get_neighbors_8(self.dim, cell)

        # count neighbours by their currently known state
        for neighbor_x, neighbor_y in neighbors:
            state = self.agent_infered_state[neighbor_x][neighbor_y]
            if state == 1:
                blocked += 1
            elif state == 0:
                empty += 1

        # update the knowledge base grids
        self.agent_neighbor_count[cell[0]][cell[1]] = cell_nx
        self.agent_block_cells[cell[0]][cell[1]] = blocked
        self.agent_unblocked_cells[cell[0]][cell[1]] = empty
        self.agent_hidden_cells[cell[0]][cell[1]] = cell_nx-blocked-empty

    def get_agent_state(self, current_cell):
        """Stack the six 8x8 knowledge windows around ``current_cell``.

        Channel order: agent view, bx, ex, hx, cx, nx.
        """
        # get_nearby_cells marks the agent position inside the grid it is
        # given, so it must work on a copy of the belief grid.
        view_copy = copy.deepcopy(self.agent_grid_view)
        channels = [get_nearby_cells(self.dim, current_cell, view_copy, 2)]

        # genrate_params_slice only reads its input, so no copies are needed.
        for knowledge in (
            self.agent_block_cells,      # bx
            self.agent_unblocked_cells,  # ex
            self.agent_hidden_cells,     # hx
            self.agent_sensed_blocks,    # cx
            self.agent_neighbor_count,   # nx
        ):
            channels.append(genrate_params_slice(self.dim, current_cell, knowledge))

        return np.array(channels)

    def get_next_cell(self, current_cell):
        """Return the model's score for each of the four directions."""
        local_grid = self.get_agent_state(current_cell)
        return inference(local_grid, self.model)

    def contious_cells(self, direction, cell):
        """Return ``cell`` plus up to four more in-bounds cells along ``direction``."""
        cell_x, cell_y = cell
        dx, dy = self._ACTION_OFFSETS.get(direction)
        prediction = []
        for i in range(5):
            x = cell_x + dx*i
            y = cell_y + dy*i
            if 0 <= x < self.dim and 0 <= y < self.dim:
                prediction.append((x, y))
        return prediction

    def loop_cell(self, current_cell):
        """Plan the next straight run of cells from ``current_cell``.

        Directions are tried from the highest to the lowest model score until
        one leads to a neighbour that is inside the grid and not blocked.
        Returns an empty list when no direction is usable.
        """
        directions = self.get_next_cell(current_cell)
        for _ in range(4):
            direction = int(np.argmax(directions))
            directions[direction] = -1  # do not pick this direction again
            cell = self.get_cell(current_cell[0], current_cell[1], direction)
            if cell is not None:
                return self.contious_cells(direction, cell)
        return []

    def execute_agent(self, start):
        """Walk from ``start`` towards the goal using the model's choices.

        The walk stops when the goal is reached, no direction is usable, or
        1000 steps have been taken.

        Returns:
            ``(1, len(path), path, replans, cells_processed)`` when the goal
            is reached, otherwise ``(0, 0, [], replans, cells_processed)``.
        """
        self.cells.get(start).parent = None
        goal_found = False

        time_step = 0
        complete_path = [start]
        path = self.loop_cell(start)

        while goal_found != True and len(path) > 0 and time_step < 1000:
            print('planned path',path)
            bumped = False
            for cell in path:
                time_step += 1
                row, col = cell

                if self.gridworld[row][col] == 0:
                    # sense knowledge of blocked cells
                    self.agent_sensed_blocks[row][col] = sensing(self.gridworld, cell)
                    self.agent_infered_state[row][col] = 0
                    self.run_kb_update(cell)

                # check for goal state
                if row == self.goal[0] and col == self.goal[1]:
                    complete_path.append(cell)
                    goal_found = True
                    break

                if self.gridworld[row][col] == 1:
                    # Bumped into a block: record it and replan from the cell
                    # the agent is still standing on. That cell stays in the
                    # trajectory (it used to be popped and lost).
                    self.agent_grid_view[row][col] = 1
                    self.agent_infered_state[row][col] = 1
                    self.blocks_encountered += 1
                    current = complete_path[-1]
                    self.run_kb_update(current)
                    path = self.loop_cell(current)
                    bumped = True
                    break

                complete_path.append(cell)

            if not goal_found and not bumped:
                # The whole run was walked: plan the next one from where the
                # agent ended up. Querying the model once here gives the same
                # plan as querying it after every single step.
                path = self.loop_cell(complete_path[-1])

        if goal_found:
            print('goal found by ML')
            return 1, len(complete_path), complete_path, self.replans_count, self.cells_processed
        else:
            print('ML target not reachable')
            return 0, 0, [], self.replans_count, self.cells_processed

### Execute both agents

In [None]:
from time import process_time

def test_ml_agent(number_of_mazes, dim=20, probability=0.3, model_path='/content/model/proj2/nn.pt'):
    """Compare the A* agent with the ML agent on randomly generated mazes.

    Both agents run on the same maze from ``(0, 0)`` to ``(dim-1, dim-1)``.
    The A* agent's outcome is treated as the reference when computing
    precision and recall for the ML agent.

    Args:
        number_of_mazes: How many random mazes to generate and solve.
        dim: Side length of each maze.
        probability: Probability that a cell is blocked.
        model_path: File holding the ``NeuralNet`` state dict.

    Returns:
        ``(confusion_matrix, accuracy, precision, recall, avg_path_length,
        avg_ml_path_length)``. The confusion matrix is indexed
        ``[agent outcome][ML outcome]`` with 0 = solved and 1 = unsolved.
        Ratios whose denominator is zero are ``nan``. Failed runs contribute
        a path length of 0 to the averages.
    """
    def _ratio(numerator, denominator):
        # 0/0 stays nan, as before, but without a NumPy runtime warning.
        return numerator / denominator if denominator else float('nan')

    start = (0, 0)
    goal = (dim-1, dim-1)

    # The weights do not change between mazes, so load them once.
    ml_model = NeuralNet()
    ml_model.load_state_dict(torch.load(model_path, map_location='cpu'))

    # rows: A* agent solved / unsolved, columns: ML agent solved / unsolved
    confusion_matrix = np.array([[0, 0],
                                 [0, 0]])
    total_path_length = 0
    total_ml_path_length = 0
    for i in range(number_of_mazes):
        print(i,'sample------------------')
        gridworld = generate_gridworld(dim, probability)

        goal_found_agent, path_length, _, _, _ = PathFinder(gridworld, np.zeros((dim, dim)), dim, start, goal).execute_agent(start)
        goal_found_ml, ml_path_length, _, _, _ = Mlfinder(gridworld, np.zeros((dim, dim)), dim, start, goal, ml_model).execute_agent(start)

        total_path_length += path_length
        total_ml_path_length += ml_path_length

        row = 0 if goal_found_agent else 1
        col = 0 if goal_found_ml else 1
        confusion_matrix[row][col] += 1
        print(i,'sample------------------')

    both_solved = confusion_matrix[0][0]
    accuracy = _ratio(both_solved + confusion_matrix[1][1], confusion_matrix.sum())
    recall = _ratio(both_solved, both_solved + confusion_matrix[0][1])
    precision = _ratio(both_solved, both_solved + confusion_matrix[1][0])

    return confusion_matrix, accuracy, precision, recall, total_path_length/number_of_mazes, total_ml_path_length/number_of_mazes
#traj len, planning time

In [None]:
def write_to_file(states, file_name):
    """Dump ``states`` to ``file_name`` with joblib (compression level 3).

    The number of entries is printed first. An ``OverflowError`` raised while
    writing is reported on stdout instead of being propagated.
    """
    try:
        entry_count = len(states)
        print('number of states apend', entry_count)
        joblib.dump(states, file_name, compress=3)
    except OverflowError:
        print("OverFlow Exception Raised.")

In [None]:
# Run `number_of_tests` independent experiments; each one compares both
# agents on a fresh batch of random mazes.
number_of_tests = 1 #30
# Running totals over all experiments. They are only consumed by the
# commented-out averaging code below.
sum_confusion_matrix = np.array([[0, 0], 
                            [0, 0]])
sum_accuracy, sum_precision, sum_recall, sum_path_length, sum_ml_path_length = 0, 0, 0, 0, 0
# One row per experiment:
# [index, confusion_matrix, accuracy, precision, recall, path_length, ml_path_length]
graph_data = []
for i in range(number_of_tests):
  confusion_matrix, accuracy, precision, recall, path_length, ml_path_length = test_ml_agent(number_of_mazes=10) # 100
  graph_data.append([i, confusion_matrix, accuracy, precision, recall, path_length, ml_path_length])
  sum_confusion_matrix += confusion_matrix
  sum_accuracy += accuracy
  sum_precision += precision
  sum_recall += recall
  sum_path_length += path_length
  sum_ml_path_length += ml_path_length

# Persist the per-experiment results for later plotting.
file_name = 'proj2_nn_data'
write_to_file(graph_data, file_name)

# avg_confusion_matrix = sum_confusion_matrix / number_of_tests
# avg_accuracy = sum_accuracy / number_of_tests
# avg_precision = sum_precision / number_of_tests
# avg_recall = sum_recall / number_of_tests
# avg_path_length = sum_path_length / number_of_tests
# avg_ml_path_length = sum_ml_path_length / number_of_tests
# print('avg')
# print(avg_confusion_matrix, avg_accuracy, avg_precision, avg_recall)

In [None]:
# Load and print previously saved experiment results.
# NOTE(review): the experiment cell in this notebook writes 'proj2_nn_data',
# while this cell reads 'proj1_cnn_data'. Confirm this is the intended file.
data = joblib.load('proj1_cnn_data')
print(data)