# AI - CA1 - Mohamad Taha Fakharian

## Goal
In this assignment, we're going to formulate a searching problem and try to solve that using uninformed(BFS and IDS) and informed(A* and Weighted A*) algorithms.

The problem is described as follows: Gandalf wants to deliever fellows from some places to their destiation and after that, he goes to Gondor. The problem isn't that easy: There are some sleeping orks in the map and Gondolf shouldn't awake them!

## Overall approach
So let's solve the problem! First let's import some libraries and define some primary functions to formulate the problem and then solve it using methods mentioned above:

In [1]:
# import some libraries
import numpy as np
import pandas as pd

In [2]:
# To check if this x and y are in the map or not
def is_valid(x, y, n, m):
    return x >= 0 and y >= 0 and x < n and y < m

In [3]:
# To specify cells in the map that are inspected by an ork. This area is specified using ork's military rank(X for 
# inspected cell and O for ork's cell)
def specify_inspected_cells(ork_cell, ork_id, degree, grid, n, m):
    ork_x, ork_y = ork_cell
    grid[ork_x][ork_y] = 'O'
    for i in range(degree + 1):
        for j in range(degree - i + 1):
            if i == 0 and j == 0:
                continue
            for delta_i in [-1, 1]:
                for delta_j in [-1, 1]:
                    if is_valid(ork_x + delta_i * i, ork_y + delta_j * j, n , m):
                        grid[ork_x + delta_i * i][ork_y + delta_j * j] = 'X' + str(ork_id)

# To create the map from a text file. each line in the text determines a property in map(Gandalf's position, 
# fellows positions and ork's positions and degrees                        
def prepare_grid(file_name):
    lines = []
    with open(file_name, "r") as test_file:
        for line in test_file:
            lines.append(line.rstrip("\n"))
    
    n, m = map(int, lines[0].split())
    grid = [['EMPTY' for j in range(m)] for i in range(n)]
    x_start, y_start = map(int, lines[1].split())
    grid[x_start][y_start] = 'START'
    x_end, y_end = map(int, lines[2].split())
    grid[x_end][y_end] = 'END'
    
    k, l = map(int, lines[3].split())
    degrees = []
    for i in range(k):
        x, y, c = map(int, lines[4 + i].split())
        degrees.append(c)
        specify_inspected_cells((x, y), i, c, grid, n, m)
    
    friends_start = []
    for i in range(l):
        x, y = map(int, lines[4 + k + i].split())
        friends_start.append((x, y))
        grid[x][y] = 'F' + str(i)
        
    friends_end = []
    for i in range(l):
        x, y = map(int, lines[4 + k + l + i].split())
        friends_end.append((x, y))
    return grid, (x_start, y_start), (x_end, y_end), friends_start, friends_end, degrees

Now let's define state in this problem. A state in the problem is determined by:

(Gandalf's position, the amount that Gandalf has been damaged from an ork(if damaged), id of the ork who has damaged the gandalf, id of the fellow who's been carried by Gandalf(if any), status of all fellows(whether they have been delivered or not)

Initial state: Gandalf is at his starting cell, carrying no fellows and isn't damaged by orks. None of the fellows are delivered.

Action: Moving left, right, up or down. 

Transition model: The properties will be updated if next cell is an inspected cell, starting/ending cell of a not delivered fellow and ending cell of Gandalf.

Goal state: Gandalf is at his ending cell, carrying no fellows. All of the fellows are delivered.

Path cost: Total movements from initial state to current state.

Now let's define some other interesting functions!

In [4]:
# To check if goal is reached
def reached_goal(grid, state):
    (x, y), current_damage, current_ork, current_friend, friends_status = state
    if grid[x][y] == 'END' and friends_status.find('0') == -1:
        return True
    return False

# To calculate manhattan distance between two cell. Used in informed search.
def calc_manhattan_dist(cur_cell, next_cell):
    x1, y1 = cur_cell
    x2, y2 = next_cell
    return abs(x2 - x1) + abs(y2 - y1)

# To check if the state is ok or not(if Gandalf is damaged too much or is in an ork position)
def is_ok(grid, state, degrees):
    (x, y), current_damage, current_ork, current_friend, friends_status = state
    if grid[x][y] == 'O':
        return False
    if grid[x][y][0] == 'X':
        if current_damage > degrees[current_ork]:
            return False
    return True

# Transition Model: Update state from previous state and action taken 
def update_state(grid, current_state, friends_end):
    (x, y), current_damage, current_ork, current_friend, friends_status = current_state
    if current_friend != -1 and friends_end[current_friend] == (x, y):
        friends_status = friends_status[:current_friend] + '1' + friends_status[current_friend+1:]
        current_friend = -1
    
    if grid[x][y][0] == 'F' and current_friend == -1 and friends_status[int(grid[x][y][1:])] == '0':
        current_friend = int(grid[x][y][1:])
        
    if grid[x][y][0] == 'X':
        if int(grid[x][y][1:]) == current_ork:
            current_damage += 1
        else:
            current_damage = 1
            current_ork = int(grid[x][y][1:])
    else:
        current_damage = 0
        current_ork = -1
    return ((x, y), current_damage, current_ork, current_friend, friends_status)

Now we define a function to get the string format of path taken to display it:

In [5]:
import time

# To get path taken in 'RLUD' format given traversal tree 
def get_path(end_state, start_state, tree):
    path = ''
    current_state = end_state
    while current_state != start_state:
        parent_state = tree[current_state]
        (x0, y0) = current_state[0]
        (x1, y1) = parent_state[0]
        if (x0 - x1) == 1:
            path += 'D'
        elif (x0 - x1) == -1:
            path += 'U'
        elif (y0 - y1) == 1:
            path += 'R'
        elif (y0 - y1) == -1:
            path += 'L'
        current_state = parent_state
    return path[::-1]

To run each method for all tests, repeated for some amount, let's define a function to do this for us! This function will output a possible path for each test(output a warning if no path is found) and a data frame which contains length of path, total states checked and average of time taken for method to find the path over total runs for all tests:

In [6]:
# To run tests given a method and get the results. Hyperparameters are used for informed searchs
def solve_problem(method, hyperparams = None):
    tests = 4
    runs = 3
    records = pd.DataFrame(columns = ['Result length', 'Total states checked', 'Mean time(s)'])
    for test in range(tests):
        test_name = "test_0" + str(test)
        grid, (x_start, y_start), (x_end, y_end), friends_start, friends_end, degrees = prepare_grid(test_name + ".txt")
        total_time = 0
        start_state = ((x_start, y_start), 0, -1, -1, '0'*len(friends_end))
        for run in range(runs):
            t0 = time.time()
            found, path, total_states_checked = method(grid, start_state, (x_end, y_end), friends_start, friends_end, degrees, hyperparams)
            if not found:
                print("Couldn't find path!")
                break
            t1 = time.time()
            total_time += (t1 - t0)
        if found:
            print("Path for {}: {}".format(test_name, path))
            records.loc[test_name] = [len(path), total_states_checked, total_time/runs]
    return records

# Uninformed Algorithm
## BFS
In BFS, frontier set acts like a FIFO queue and in each step, we'll expand the shallowest state in the frontier set and add the state to the explored set, which is used to avoid duplicate checking for states. Pay attention! We use explored set for states that are in frontier set and not explored yet too. This is because searching in set is much faster than searching in list(because it's implemented by hash table) and this will make the algorithm faster. We'll keep track of total states checked and traversal tree during execution.

In [7]:
## BFS algorithm
def bfs(grid, start_state, last_cell, friends_start, friends_end, degrees, hyperparams = None):
    q = []
    start_state = update_state(grid, start_state, friends_end)
    if reached_goal(grid, start_state):
        return True, '', 0
    total_states_checked = 0
    q.append(start_state)
    explored = {start_state}
    path = dict()
    while len(q) > 0:
        current_state = q.pop(0)
        total_states_checked += 1
        (x, y), current_damage, current_ork, current_friend, friends_status = current_state
        for delta_x in range(-1, 2):
            for delta_y in range(-1, 2):
                if abs(delta_x) != abs(delta_y) and is_valid(x + delta_x, y + delta_y, len(grid), len(grid[0])):
                    next_cell = (x + delta_x, y + delta_y)
                    next_state = (next_cell, current_damage, current_ork, current_friend, friends_status)
                    next_state = update_state(grid, next_state, friends_end)
                    if (not next_state in explored) and (is_ok(grid, next_state, degrees)):
                        path[next_state] = current_state
                        if reached_goal(grid, next_state):
                            return True, get_path(next_state, start_state, path), total_states_checked
                        explored.add(next_state)
                        q.append(next_state)
    return False, None, None

In [8]:
# To store records for each method. Used later for final conclusion
records = {}
records['BFS'] = solve_problem(bfs)
records['BFS']

Path for test_00: RRRRRRRRRRRRRRRRRRRRRRRRRRRRRDURDURDURDURDRRRRRR
Path for test_01: RRRRDDLLDDDDDLDURUUUURRRRLLDDDLDDUURUURRRRDDDLDLLRRR
Path for test_02: RRRRRRRLLLLDDDDLDLLRDDUURURRRRDRDD
Path for test_03: RRDDDDDLDLDDRDURUUUUUUUURRRRRRRLLLLLLLDDDDDDDDLDURUUUURRRRRRRDDDDD


Unnamed: 0,Result length,Total states checked,Mean time(s)
test_00,48.0,6438.0,0.04064
test_01,52.0,1383.0,0.006605
test_02,34.0,380.0,0.001841
test_03,66.0,3946.0,0.028076


## Pros
- It's complete(assuming branching factor is finite).
- It's optimal(since cost is 1 per step).
- Time spent isn't bad for an uninformed search algorithm.

## Cons
- Memory usage is high(since we keep track of many states per step).
- For far goal states, consumed time is high.

## IDS
In IDS, we use DFS algorithm with limited depth and use an iterative approach to check several depth to find the goal state. Frontier set acts like a LIFO queue and in each step, we'll expand the deepest state in the frontier set and add the state and its current depth to the explored set, which is used to avoid duplicate checking for states. Pay attention! We'll add a duplicate state if current depth is less than previously seen depth(since a new path is found with less shorter length). We'll reset these sets if we couldn't find a path in a specific depth. We'll keep track of total states checked and traversal tree during execution of algorithm.

In [9]:
# DFS with limited depth. A helper function for IDS
def rdls(grid, path, explored, degrees, friends_end, start_state, state, limit, total_states_checked):
    total_states_checked += 1
    if reached_goal(grid, state):
        return True, get_path(state, start_state, path), total_states_checked
    if limit == 0:
        return False, None, total_states_checked
    (x, y), current_damage, current_ork, current_friend, friends_status = state
    for delta_x in range(-1, 2):
        for delta_y in range(-1, 2):
            if abs(delta_x) != abs(delta_y) and is_valid(x + delta_x, y + delta_y, len(grid), len(grid[0])):
                next_cell = (x + delta_x, y + delta_y)
                next_state = (next_cell, current_damage, current_ork, current_friend, friends_status)
                next_state = update_state(grid, next_state, friends_end)
                if (not next_state in explored or explored[next_state] > explored[state] + 1) and (is_ok(grid, next_state, degrees)):
                    explored[next_state] = explored[state] + 1
                    path[next_state] = state
                    reached, possible_path, total_states_checked = rdls(grid, path, explored, degrees, friends_end, start_state, next_state, limit-1, total_states_checked)
                    if reached:
                        return reached, possible_path, total_states_checked
    return False, None, total_states_checked

# Calling DFS with a specific depth. Setting initial data structures needed for DFS
def dls(grid, degrees, friends_end, start_state, limit, total_states_checked):
    explored = {
        start_state : 0
    }
    path = dict()
    return rdls(grid, path, explored, degrees, friends_end, start_state, start_state, limit, total_states_checked)

# IDS. Calling dls with several depths. We consider the maximum depth as 2 * total cells * total fellows + 1
# since in worse case, Gandalf should traverse total cells for picking each fellow and delivering him
def ids(grid, start_state, last_cell, friends_start, friends_end, degrees, hyperparams = None):
    total_states_checked = 0
    for i in range(2 * len(grid) * len(grid[0]) * len(friends_end) + 1):
        reached, possible_path, total_states_checked = dls(grid, degrees, friends_end, start_state, i, total_states_checked)
        if reached:
            return reached, possible_path, total_states_checked
    return False, None, None

In [10]:
records['IDS'] = solve_problem(ids)
records['IDS']

Path for test_00: RRRRRRRRRRRRRRRRRRRRRRRRRRRRRDURDURDURDURDRRRRRR
Path for test_01: RRRRDDLLDDDDDLDURUUUURRRRLLDDDLDDUURUURRRRDDDLDLLRRR
Path for test_02: RRRRRRRLLLLDDDDLDLLRDDUURURRRRDRDD
Path for test_03: RRDDDDDLDLDDRDURUUUUUUUURRRRRRRLLLLLLLDDDDDDDDLDURUUUURRRRRRRDDDDD


Unnamed: 0,Result length,Total states checked,Mean time(s)
test_00,48.0,130983.0,0.553782
test_01,52.0,207801.0,1.208251
test_02,34.0,19900.0,0.107479
test_03,66.0,864222.0,4.916927


## Pros
- It's complete(assuming branching factor is finite).
- It's optimal(since cost is 1 per step).
- If goal state is found at the lower depths, then the algorithm proves to be efficient.
- Memory usage is good(if we free memory after each iteration) since it's linear.

## Cons
- Consumed time is really high(since we search at many depths)
- It calculates initial level too many times. 

# Informed Algorithm
## A*


In [11]:
def h(state, friends_start, friends_end, last_cell):
    (x, y), current_damage, current_ork, current_friend, friends_status = state
    remaining_friends = [i for i, val in enumerate(friends_status) if val == '0']
    if len(remaining_friends) == 0:
        return calc_manhattan_dist((x, y), last_cell)
    elif current_friend != -1:
        return calc_manhattan_dist((x, y), friends_end[current_friend]) + calc_manhattan_dist(friends_end[current_friend], last_cell)
    else:
        return np.max(np.array([calc_manhattan_dist((x, y), friends_start[i]) + calc_manhattan_dist(friends_start[i], friends_end[i]) + 
                      calc_manhattan_dist(friends_end[i], last_cell)
                      for i in remaining_friends]))
    
def a_star(grid, start_state, last_cell, friends_start, friends_end, degrees, hyperparams):
    alpha = hyperparams['alpha']
    total_states_checked = 0
    start_state = update_state(grid, start_state, friends_end)
    if reached_goal(grid, start_state):
        return True, '', 0
    total_states_checked = 0
    close_set = set()
    g = {
        start_state : 0
    }
    open_list = {
        start_state : g[start_state] + alpha * h(start_state, friends_start, friends_end, last_cell)
    }
    path = dict()
    while len(open_list):
        current_state = min(open_list, key = open_list.get)
        total_states_checked += 1
        open_list.pop(current_state)
        close_set.add(current_state)
        (x, y), current_damage, current_ork, current_friend, friends_status = current_state
        cost = g[current_state] + 1
        for delta_x in range(-1, 2):
            for delta_y in range(-1, 2):
                if abs(delta_x) != abs(delta_y) and is_valid(x + delta_x, y + delta_y, len(grid), len(grid[0])):
                    next_cell = (x + delta_x, y + delta_y)
                    next_state = (next_cell, current_damage, current_ork, current_friend, friends_status)
                    next_state = update_state(grid, next_state, friends_end)
                    if is_ok(grid, next_state, degrees) and not next_state in close_set:
                        if reached_goal(grid, next_state):
                            path[next_state] = current_state
                            return True, get_path(next_state, start_state, path), total_states_checked
                        if (next_state in open_list and g[next_state] > cost) or (not next_state in open_list):
                            path[next_state] = current_state
                            g[next_state] = cost
                            open_list[next_state] = g[next_state] + alpha * h(next_state, friends_start, friends_end, last_cell)
    return False, None, None

In [12]:
hyperparams = {
    'alpha' : 1
}
records['A*'] = solve_problem(a_star, hyperparams)
records['A*']

Path for test_00: RRRRRRRRRRRRRRRRRRRRRRRRRRRRRDURDURDURDURDRRRRRR
Path for test_01: RRRRDDLLDDDDDLDURUUUURRRRLLDDDLDDUURUURRRRDDDLDLLRRR
Path for test_02: RRRRRRRLLLLDDDDLDLLRDDUURRURRRDRDD
Path for test_03: RDDDRDDLDLDDRDURUUUUUUUURRRRRRRLLLLLLLDDDDDDDDLDURUUUURRRRRRRDDDDD


Unnamed: 0,Result length,Total states checked,Mean time(s)
test_00,48.0,565.0,0.013733
test_01,52.0,990.0,0.015337
test_02,34.0,123.0,0.002461
test_03,66.0,2078.0,0.029569


In [13]:
hyperparams = {
    'alpha' : 3
}
records['Weighted A*(alpha = {})'.format(hyperparams['alpha'])] = solve_problem(a_star, hyperparams)
records['Weighted A*(alpha = {})'.format(hyperparams['alpha'])]

Path for test_00: RRRRRRRRRRRRRRRRRRRRRRRRRRRRRDURDURDURDURDRRRRRR
Path for test_01: RRRRDDLLDDDDDLDURUUUURRRRLLDDDLDDUURUURRRRLLLDLDDRDRRR
Path for test_02: RRRRRRRLLLLDDDDLDLLRDDUURRURRRDRDD
Path for test_03: RDDDRDDLDLDDRDURUUUUUUUURRRRRRRLLLLLLLLDDDRDDDDDLDURUUUURRRRRRRDDDDD


Unnamed: 0,Result length,Total states checked,Mean time(s)
test_00,48.0,71.0,0.003491
test_01,54.0,891.0,0.017984
test_02,34.0,58.0,0.001996
test_03,68.0,631.0,0.008816


In [14]:
hyperparams = {
    'alpha' : 5
}
records['Weighted A*(alpha = {})'.format(hyperparams['alpha'])] = solve_problem(a_star, hyperparams)
records['Weighted A*(alpha = {})'.format(hyperparams['alpha'])]

Path for test_00: RRRRRRRRRRRRRRRRRRRRRRRRRRRRRDURDURDURDURDRRRRRR
Path for test_01: RRRRDDLLDLDDLDDRDURUUUURRRRLLDDDLDDUURUURRRRLLLDLDDRDRRR
Path for test_02: RRRRRRRLLLLLDDRDDLDLLRDDUURRURRRDRDD
Path for test_03: RDDDRDDLDLDDRDURUUUUUUUURRRRRRRLLLLLLLLDDDRDDDDDLDURUUUURRRRRRRDDDDD


Unnamed: 0,Result length,Total states checked,Mean time(s)
test_00,48.0,71.0,0.00368
test_01,56.0,516.0,0.010942
test_02,36.0,54.0,0.000742
test_03,68.0,481.0,0.007174


In [15]:
# Records per test
from IPython.display import display
tests = range(4)
name_prefix = "test_0"
for test in tests:
    test_name = name_prefix + str(test)
    test_df = pd.DataFrame(columns = ['Result length', 'Total states checked', 'Mean time(s)'])
    for method in records:
        test_df.loc[method] = records[method].loc[test_name]
    display(test_df)

Unnamed: 0,Result length,Total states checked,Mean time(s)
BFS,48.0,6438.0,0.04064
IDS,48.0,130983.0,0.553782
A*,48.0,565.0,0.013733
Weighted A*(alpha = 3),48.0,71.0,0.003491
Weighted A*(alpha = 5),48.0,71.0,0.00368


Unnamed: 0,Result length,Total states checked,Mean time(s)
BFS,52.0,1383.0,0.006605
IDS,52.0,207801.0,1.208251
A*,52.0,990.0,0.015337
Weighted A*(alpha = 3),54.0,891.0,0.017984
Weighted A*(alpha = 5),56.0,516.0,0.010942


Unnamed: 0,Result length,Total states checked,Mean time(s)
BFS,34.0,380.0,0.001841
IDS,34.0,19900.0,0.107479
A*,34.0,123.0,0.002461
Weighted A*(alpha = 3),34.0,58.0,0.001996
Weighted A*(alpha = 5),36.0,54.0,0.000742


Unnamed: 0,Result length,Total states checked,Mean time(s)
BFS,66.0,3946.0,0.028076
IDS,66.0,864222.0,4.916927
A*,66.0,2078.0,0.029569
Weighted A*(alpha = 3),68.0,631.0,0.008816
Weighted A*(alpha = 5),68.0,481.0,0.007174
