<a href="https://colab.research.google.com/github/maor226/Search/blob/main/search.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import time
from queue import PriorityQueue
from random import shuffle
import math

import pandas as pd
import matplotlib.pyplot as plt
from sortedcontainers import SortedList
import matplotlib.transforms as mtransforms



In [None]:

# Marker stored in a puzzle tuple for the empty cell.
blank = "_"
# Side length of the board; a puzzle tuple holds N * N cells.
N = 4
# Move labels returned by next_moves().
# NOTE(review): every name is bound to the opposite letter (up -> "D", left -> "R", ...);
# presumably the name is the direction the blank moves and the letter is the
# direction the neighbouring tile slides — confirm.
up, down, left, right = "D", "U", "R", "L"


In [None]:

def parity(permutation):
    """Return the parity (0 = even, 1 = odd) of a permutation.

    ``permutation`` is a sequence whose values are used as indices into
    itself, i.e. a permutation of ``0 .. len(permutation) - 1``.  The parity
    is derived from the number of cycles: a permutation of ``n`` elements
    with ``c`` cycles has parity ``(n - c) % 2``, which equals
    ``(c + n) % 2``.
    """
    visited = set()
    cycle_count = 0
    for start in permutation:
        if start in visited:
            continue
        # A not-yet-visited element opens a new cycle; walk it to the end.
        cycle_count += 1
        node = start
        while node not in visited:
            visited.add(node)
            node = permutation[node]
    return (cycle_count + len(permutation)) % 2


def init_open():
    """Create and return a new, empty open list (a ``SortedList``)."""
    empty_open = SortedList()
    return empty_open


def insert_to_open(open_list, s):
    """Insert node ``s`` into ``open_list``.

    The container only needs an ``add`` method; its implementation depends on
    the data structure chosen for the open list.
    """
    add_node = open_list.add
    add_node(s)


def get_best(open_list):
    """Remove and return the first element of ``open_list``.

    The open list is kept ordered, so its first element is the best node
    according to the search algorithm's ordering.
    """
    return open_list.pop(0)


def next_moves(puzzle):
    """Return the legal successor boards of ``puzzle``.

    Each entry is a ``(label, board)`` pair, where ``board`` is ``puzzle``
    with the blank exchanged with one of its neighbours.  Entries are
    produced in the fixed order left, right, up, down, skipping any move
    that would take the blank off the board.
    """
    b = puzzle.index(blank)
    column = b % N
    # (move is allowed?, label, index the blank moves to)
    candidates = (
        (column != 0, left, b - 1),
        (column != N - 1, right, b + 1),
        (b >= N, up, b - N),
        (b < N * N - N, down, b + N),
    )
    return [(label, swap(b, target, puzzle))
            for allowed, label, target in candidates
            if allowed]


def swap(i, j, puzzle):
    """Return a new tuple equal to ``puzzle`` with positions ``i`` and ``j`` exchanged.

    ``puzzle`` itself is not modified.  Indices follow normal sequence
    indexing, so negative indices count from the end; an out-of-range index
    raises ``IndexError``.
    """
    cells = list(puzzle)
    cells[i], cells[j] = cells[j], cells[i]
    return tuple(cells)


def is_goal(puzzle):
    """Return True when ``puzzle`` is the solved board.

    Only the first ``N * N - 1`` cells are inspected: cell ``i`` must hold
    tile ``i + 1`` and must not be the blank.
    """
    leading_cells = list(puzzle)[:N * N - 1]
    return all(t != blank and i == t - 1 for i, t in enumerate(leading_cells))


def calculate_heuristic(puzzle):
    """Return the summed Manhattan distance of all tiles from their goal cells.

    Tile ``t`` belongs at index ``t - 1``; the blank contributes nothing.
    """
    total = 0
    for i, tile in enumerate(puzzle):
        if tile == blank:
            continue
        goal_index = tile - 1
        column_distance = abs(goal_index % N - i % N)
        row_distance = abs(goal_index // N - i // N)
        total += column_distance + row_distance
    return total


def check_for_duplicates_open(puzzle, g, open_list):
    """Remove ``puzzle`` from ``open_list`` if present and report whether it was.

    Returns True (after removing the entry) when ``puzzle in open_list``,
    otherwise False.  ``g`` is accepted but not used.

    NOTE(review): search() stores State objects in the open list but passes a
    plain puzzle tuple here, and State.__eq__ / State.__lt__ return False for
    non-State operands, so this membership test looks like it can never
    succeed in that call path — confirm against the open-list container.
    """
    found = puzzle in open_list
    if found:
        open_list.remove(puzzle)
    return found


def check_for_duplicates_close(puzzle, s, closed_list):
    """Decide whether a neighbour already in ``closed_list`` should be skipped.

    ``puzzle`` is the neighbour board, ``s`` is the node being expanded (the
    neighbour would be reached with cost ``s.g + 1``) and ``closed_list``
    maps boards to the node they were closed with.

    Returns True when the caller should skip the neighbour: it is already
    closed and the new path is not cheaper.  Returns False when the neighbour
    should be generated: either it is not closed, or the new path is strictly
    cheaper, in which case the stale closed entry is removed so the board can
    be expanded again with the better cost.
    """
    closed_node = closed_list.get(puzzle)
    if closed_node is None:
        return False
    if s.g + 1 < closed_node.g:
        # Cheaper path found: re-open the board instead of discarding it.
        closed_list.pop(puzzle)
        return False
    return True


class State:
    """A search node.

    Attributes:
        f: priority value used to order the open list.
        h: heuristic estimate for ``puzzle``.
        g: cost of the path that reached ``puzzle``.
        puzzle: the board this node represents.
        prev: value passed as ``s_prev``.

    Ordering compares ``f`` first and ``g`` second; equality compares only
    ``puzzle``.  Both comparisons return False for non-State operands.
    """

    def __init__(self, f, h, g, puzzle, s_prev, iteration_number):
        # iteration_number is accepted for call-site compatibility but not stored.
        self.f, self.h, self.g = f, h, g
        self.puzzle = puzzle
        self.prev = s_prev

    def __lt__(self, other):
        if not isinstance(other, State):
            return False
        if self.f != other.f:
            return self.f < other.f
        # Equal f: the node with the smaller g sorts first.
        return self.g < other.g

    def __str__(self):
        return f"f={self.f}, h={self.h},g= {self.g} ,moves = {self.prev}"

    def __eq__(self, other):
        if not isinstance(other, State):
            return False
        return other.puzzle == self.puzzle


def search(start_puzzle, f):
    """Best-first search from ``start_puzzle`` to the goal board.

    ``f`` is called as ``f(g, h, iteration_number)`` and supplies the priority
    of every generated node.  The start node is created with f = h = g = 0.

    Returns a pair ``(goal_state, node_count)`` where ``node_count`` is the
    combined size of the open and closed lists when the goal was dequeued.
    Returns None if the open list runs empty without reaching the goal.
    """
    frontier = init_open()
    expanded = {}
    insert_to_open(frontier, State(0, 0, 0, start_puzzle, 0, 0))

    iteration_number = 0
    while len(frontier) > 0:
        iteration_number += 1
        current = get_best(frontier)
        board = current.puzzle

        # A board may sit in the open list more than once; skip stale copies.
        if board in expanded:
            continue

        if is_goal(board):
            print("close ", len(expanded), " open ", len(frontier), " len ", current.prev)
            return current, len(frontier) + len(expanded)

        for direction, next_move in next_moves(board):
            g = current.g + 1
            in_open = check_for_duplicates_open(next_move, g, frontier)
            if in_open or check_for_duplicates_close(next_move, current, expanded):
                continue
            h = calculate_heuristic(next_move)
            priority = f(g, h, iteration_number)
            child = State(priority, h, g, next_move, current.prev + 1, iteration_number)
            insert_to_open(frontier, child)

        expanded[board] = current


def print_route(s):
    """Print every element of ``s`` on its own line."""
    for step in s:
        print(step)


def get_route(s):
    """Follow a chain of indexable nodes and return its route, oldest first.

    Each node is read by position: items 3 and 4 form the route entry and
    item 5 is the previous node; the walk stops at the first falsy node.

    NOTE(review): State defines no ``__getitem__``, so this helper appears to
    expect tuple-style nodes rather than State objects — confirm before use.
    """
    newest_first = []
    node = s
    while node:
        newest_first.append((node[3], node[4]))
        node = node[5]
    return newest_first[::-1]


def print_grid_route(route, grid):
    """Mark every location of ``route`` in ``grid`` with 'x' and print the grid.

    ``grid`` is modified in place and printed as a pandas DataFrame.
    """
    for cell in route:
        grid[cell] = 'x'
    frame = pd.DataFrame(grid)
    print(frame.to_string())


class pzl(list):
    """List subclass whose string form lays the cells out as an N-column grid."""

    def __str__(self):
        # One row: N tab-prefixed slots, comma separated, ending in a newline.
        row = "\t{}," * (N - 1) + "\t{}\n"
        template = "[" + row * N
        # Replace the final character (the last newline) with the closing bracket.
        template = template[:-1] + "\t]"
        return template.format(*self)


def new_puzzle():
    """Return a random board as a tuple of tiles 1 .. N*N-1 followed by the blank.

    The tile order is reshuffled until its permutation parity is 0, so only
    even permutations are returned.
    """
    tiles = list(range(N * N - 1))
    while True:
        shuffle(tiles)
        if parity(tiles) == 0:
            break
    board = [tile + 1 for tile in tiles]
    board.append(blank)
    return tuple(board)


In [None]:


def astar_f(g, h, *args):
    """Plain A* priority: path cost plus heuristic.  Extra arguments are ignored."""
    priority = g + h
    return priority


In [None]:

def maor_f(g, h, *l):
    """Priority that weights the heuristic by 1.1 while ``g < 10``, plain ``g + h`` afterwards.

    Extra arguments are ignored.
    """
    if g < 10:
        return g + 1.1 * h
    return g + h

def maor2_f(g, h, *l):
    """``maor_f``-style priority plus the reciprocal of the first extra argument.

    The heuristic is weighted by 1.1 while ``g < 10``; ``1 / l[0]`` is then
    added, so at least one non-zero extra argument is required.
    """
    if g < 10:
        base = g + 1.1 * h
    else:
        base = g + h
    return base + 1 / l[0]

def maor3_f(g, h, *l):
    """Priority that weights the heuristic by 0.9 while ``g < 10``, plain ``g + h`` afterwards.

    Extra arguments are ignored.
    """
    if g < 10:
        return g + 0.9 * h
    return g + h

def maor(magik):
    """Build a priority function that weights ``h`` by 1.1 while ``g < magik / 2``.

    The returned function is named ``maor2_f`` and ignores extra arguments.
    """
    def maor2_f(g, h, *l):
        if g < magik / 2:
            return g + 1.1 * h
        return g + h

    return maor2_f

In [None]:

def y_star_f(g, h, *k):
    """Priority ``g + h`` plus the reciprocal of ``g``.

    ``g`` must be non-zero.  Extra arguments are ignored.
    """
    reciprocal = 1 / g
    return g + h + reciprocal


In [None]:
def check_diff(f, k, n):
    """Compare priority function ``f`` against plain A* on ``k`` random puzzles.

    Args:
        f: priority function passed to search().
        k: number of random puzzles to solve with both functions.
        n: board side length used while this function runs; the module-level
           ``N`` is set to ``n`` for the duration and restored afterwards.

    Returns:
        ``"ERROR "`` as soon as the two searches disagree on the solution
        cost ``g``; otherwise a tuple
        ``((avg_astar, avg_f), relative_saving, more_close)`` where the
        averages are the node counts reported by search(),
        ``relative_saving`` is ``(avg_astar - avg_f) / avg_astar`` and
        ``more_close`` counts the puzzles on which A* reported fewer nodes
        than ``f``.
    """
    global N
    previous_n = N
    N = n  # new_puzzle() and the search helpers read the module-level N
    try:
        sum_a_close, sum_m_close, more_close = 0, 0, 0
        for _ in range(k):
            puzzle = new_puzzle()
            print(pzl(puzzle))

            print("\n\n ",f.__name__,":")
            t = time.time()
            m = search(puzzle, f)
            print(time.time()-t)

            print("\n\n a_star:")
            t = time.time()
            a = search(puzzle, astar_f)
            print(time.time()-t)

            if a[0].g != m[0].g:
                return "ERROR "

            # search() returns (goal_state, node_count); index 1 is the count.
            sum_a_close += a[1]
            sum_m_close += m[1]
            if a[1] < m[1]:
                more_close += 1

        avg_a = sum_a_close / k
        avg_m = sum_m_close / k
        return (avg_a, avg_m), (avg_a - avg_m) / avg_a, more_close
    finally:
        N = previous_n


In [None]:

def y_star2_f(g, h, *k):
    """Priority ``g - 1 + h + 1 / g``, clamped below at 0.

    ``g`` must be non-zero.  Extra arguments are ignored.
    """
    raw = g - 1 + h + 1 / g
    return max(raw, 0)

def y_star3_f(g, h, *args):
    """Priority ``g + h`` plus the reciprocal of the first extra argument."""
    reciprocal = 1 / args[0]
    return g + h + reciprocal

def y_star4_f(g, h, *args):
    """Priority ``g + h + 1 / args[0] - 1``, clamped below at 0."""
    raw = g + h + 1 / args[0] - 1
    return max(raw, 0)

def y_star5_f(g, h, *args):
    """Priority ``g + h + sin(args[0]) - 1``, clamped below at 0."""
    raw = g + h + math.sin(args[0]) - 1
    return max(raw, 0)

def y_star6_f(g, h, *args):
    """Priority ``g + h + cos(args[0]) - 1``, clamped below at 0."""
    raw = g + h + math.cos(args[0]) - 1
    return max(raw, 0)

def y_star7_f(g, h, *args):
    """Priority ``g + h`` with an alternating half-step and a decaying term, clamped at 0.

    With ``a = args[0]`` the value is
    ``g + h + (-1)**a / 2 + (1/2) / a - 1``, never less than 0.
    """
    a = args[0]
    alternating = ((- 1) ** a) / 2
    decaying = 1 / 2 * 1 / a
    return max(g + h + alternating + decaying - 1, 0)

def y_star8_f(g, h, *args):
    """Priority ``g + h`` plus ``((-1)**a / 2 + 1/2) / a - 1`` for ``a = args[0]``, clamped at 0."""
    a = args[0]
    toggle = ((- 1) ** a) / 2 + 1 / 2
    return max(g + h + toggle * 1 / a - 1, 0)

def y_star9_f(g, h, *args):
    """Priority ``g + h + 1`` minus the reciprocal of the first extra argument."""
    reciprocal = 1 / args[0]
    return g + h + 1 - reciprocal

In [None]:
def check_dif(f1, f2, k, n):
    """Compare two priority functions on ``k`` random puzzles.

    Args:
        f1, f2: priority functions passed to search().
        k: number of random puzzles to solve with both functions.
        n: board side length used while this function runs; the module-level
           ``N`` is set to ``n`` for the duration and restored afterwards.

    Returns:
        ``"ERROR "`` as soon as the two searches disagree on the solution
        cost ``g``; otherwise ``((avg_f1, avg_f2), relative_saving)`` where
        the averages are the node counts reported by search() and
        ``relative_saving`` is ``(avg_f1 - avg_f2) / avg_f1``.
    """
    global N
    previous_n = N
    N = n  # new_puzzle() and the search helpers read the module-level N
    try:
        sum_a_close, sum_m_close = 0, 0
        for _ in range(k):
            puzzle = new_puzzle()
            print(pzl(puzzle))

            print("\n\n ",f1.__name__,":")
            t = time.time()
            a = search(puzzle, f1)
            print(time.time()-t)

            t = time.time()
            print("\n\n ",f2.__name__,":")
            m = search(puzzle, f2)
            print(time.time()-t)

            if a[0].g != m[0].g:
                return "ERROR "

            # search() returns (goal_state, node_count); index 1 is the count.
            sum_a_close += a[1]
            sum_m_close += m[1]

        avg_a = sum_a_close / k
        avg_m = sum_m_close / k
        return (avg_a, avg_m), (avg_a - avg_m) / avg_a
    finally:
        N = previous_n


In [None]:
def record_diff(fs, k, df):
    """Solve up to ``k`` new random puzzles with every function in ``fs`` and record the results.

    Args:
        fs: priority functions; each contributes the columns
            ``<name>_close_list`` (node count reported by search()) and
            ``<name>_len`` (``prev`` of the goal state).
        k: number of random puzzles to draw.  Puzzles already present in
           ``df`` or drawn earlier in this call are skipped and counted as
           duplicates.
        df: existing records; must contain a ``"puzzle"`` column of puzzle
            strings.

    Returns:
        A new DataFrame consisting of ``df`` followed by the new rows, with a
        fresh integer index.
    """
    columns = (["puzzle"]
               + [f.__name__ + "_close_list" for f in fs]
               + [f.__name__ + "_len" for f in fs])
    data = {column: [] for column in columns}
    # Puzzles already recorded, kept as a set for constant-time lookup.
    seen = set(df["puzzle"])
    dupp = 0
    for i in range(k):
        puzzle = new_puzzle()
        print(pzl(puzzle))
        key = str(puzzle)
        if key in seen:
            dupp += 1
            print(f"################################################ dup {dupp}")
            continue
        seen.add(key)
        data["puzzle"].append(key)
        for f in fs:
            t = time.time()
            s = search(puzzle, f)
            print(time.time()-t)
            data[f.__name__ + "_len"].append(s[0].prev)
            data[f.__name__ + "_close_list"].append(s[1])
        print("############################################### iteration number: ", i)
    data = pd.DataFrame(data)
    print(f"################################################ dup {dupp}")

    # DataFrame.append was removed in pandas 2.0; concat is the supported equivalent.
    return pd.concat([df, data], ignore_index=True)

In [None]:
# Mount Google Drive at /content/drive so the CSV and .tex paths used below resolve.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Load the previously saved search records from the shared drive.
record_df = pd.read_csv('/content/drive/Shareddrives/search/weighted_record_3x3.csv')

In [None]:
def wstar100_f(g, h, *args):
    """Weighted A* priority with heuristic weight 100.  Extra arguments are ignored."""
    weighted_h = 100 * h
    return g + weighted_h

def wstar10_f(g, h, *args):
    """Weighted A* priority with heuristic weight 10.  Extra arguments are ignored."""
    weighted_h = 10 * h
    return g + weighted_h

def wstar5_f(g, h, *args):
    """Weighted A* priority with heuristic weight 5.  Extra arguments are ignored."""
    weighted_h = 5 * h
    return g + weighted_h


def wstar3_f(g, h, *args):
    """Weighted A* priority with heuristic weight 3.  Extra arguments are ignored."""
    weighted_h = 3 * h
    return g + weighted_h
def wstar2_5_f(g, h, *args):
    """Weighted A* priority with heuristic weight 2.5.  Extra arguments are ignored."""
    weighted_h = 2.5 * h
    return g + weighted_h

def wstar2_f(g, h, *args):
    """Weighted A* priority with heuristic weight 2.  Extra arguments are ignored."""
    weighted_h = 2 * h
    return g + weighted_h

def wstar1_5_f(g, h, *args):
    """Weighted A* priority with heuristic weight 1.5.  Extra arguments are ignored."""
    weighted_h = 1.5 * h
    return g + weighted_h

def wstar1_1_f(g, h, *args):
    """Weighted A* priority with heuristic weight 1.1.  Extra arguments are ignored."""
    weighted_h = 1.1 * h
    return g + weighted_h

In [None]:
# Priority functions included in the full comparison, heaviest weight first.
fun = [
    wstar100_f,
    wstar10_f,
    wstar5_f,
    wstar3_f,
    wstar2_5_f,
    wstar2_f,
    wstar1_5_f,
    wstar1_1_f,
    astar_f,
    y_star3_f,
    maor_f,
    maor2_f,
]

# Short display label for each priority function, used in plots.
fun_name = {
    astar_f: "A*",
    wstar100_f: "100W*",
    wstar5_f: "5W*",
    wstar3_f: "3W*",
    wstar2_5_f: "2.5W*",
    wstar1_5_f: "1.5W*",
    wstar10_f: "10W*",
    wstar2_f: "2W*",
    wstar1_1_f: "1.1W*",
    y_star3_f: "Y*",
    maor_f: "M*",
    maor2_f: "MY*",
}

In [None]:
# Save the records back to the shared drive.  The row index is not written:
# the file is re-read with a plain pd.read_csv, which would otherwise turn the
# saved index into an extra "Unnamed: 0" column on every save/load cycle.
record_df.to_csv('/content/drive/Shareddrives/search/weighted_record_3x3.csv', index=False)

In [None]:
# For each selected function, compute the fraction of recorded puzzles on which
# its recorded solution length equals the one recorded for A*.
prob =[]
my_fun = [wstar1_1_f,maor2_f,maor_f,y_star3_f,astar_f]
for f in my_fun:
  # Rows where this function's "_len" column matches A*'s, divided by all rows.
  p=len(record_df[record_df[astar_f.__name__+"_len"] == record_df[f.__name__+"_len"]]) / len(record_df)
  prob.append(p)
  print(f.__name__ , " prob to get opt" , p)

# One row per function, labelled with its short display name.
df = pd.DataFrame({"function":[fun_name[f] for f in my_fun],"probability to return optimal solution":prob})

import seaborn as sns
# Bar chart of the fractions; the y axis is restricted to the 0.99-1 range.
g = sns.catplot(x="function", y='probability to return optimal solution', kind='bar', data=df)
g.ax.set_ylim(0.99,1)

import tikzplotlib
# Export the figure as TikZ/PGF to the shared drive.
tikzplotlib.save(f"/content/drive/Shareddrives/search/prob.tex")


In [None]:
def plt_ptint(df, fun):
    """Scatter the functions in ``fun`` by node count (x) and solution length (y) and show the plot.

    ``df`` is indexed by column name: for every function ``f`` the values
    ``df[f.__name__ + "_close_list"]`` and ``df[f.__name__ + "_len"]`` are
    plotted and annotated with the function's label from ``fun_name``.
    Functions whose label has "W" or "A" as its second-to-last character are
    drawn in blue and joined by a dashed line; all others are drawn in red.
    """
    plt.figure(figsize=(12, 8), dpi=100, edgecolor='k')

    labels = [fun_name[f] for f in fun]
    x_columns = [f.__name__ + "_close_list" for f in fun]
    y_columns = [f.__name__ + "_len" for f in fun]

    baseline_columns = []
    for x_col, y_col, text in zip(x_columns, y_columns, labels):
        is_baseline = text[-2] in ("W", "A")
        if is_baseline:
            baseline_columns.append((x_col, y_col))
        marker = 'bo' if is_baseline else 'ro'
        plt.plot(df[x_col], df[y_col], marker)
        # The MY* label gets extra trailing whitespace so it does not overlap its neighbour.
        suffix = "\n\n " if text == fun_name[maor2_f] else "\n"
        plt.text(df[x_col], df[y_col], text + "- %.4f" % (df[y_col],) + suffix)

    # Dashed line through the blue points, in the order they were given.
    line_x = [df[x_col] for x_col, _ in baseline_columns]
    line_y = [df[y_col] for _, y_col in baseline_columns]
    plt.plot(line_x, line_y, "--")

    plt.xlabel('AVG Node Expansion')
    plt.ylabel('AVG Solution Length')

    plt.show()



In [None]:
def plt_pgf(df, fun, name):
    """Scatter the functions in ``fun`` by node count (x) and solution length (y) and export as TikZ.

    ``df`` is indexed by column name: for every function ``f`` the values
    ``df[f.__name__ + "_close_list"]`` and ``df[f.__name__ + "_len"]`` are
    plotted and annotated with the function's label from ``fun_name``.
    Functions whose label has "W" or "A" as its second-to-last character are
    drawn in blue and joined by a dashed line; all others are drawn in red.
    The figure is written to ``<name>.tex`` in the shared drive folder.
    """
    plt.figure(figsize=(12, 8), dpi=100, edgecolor='k')

    labels = [fun_name[f] for f in fun]
    x_columns = [f.__name__ + "_close_list" for f in fun]
    y_columns = [f.__name__ + "_len" for f in fun]

    baseline_columns = []
    for x_col, y_col, text in zip(x_columns, y_columns, labels):
        is_baseline = text[-2] in ("W", "A")
        if is_baseline:
            baseline_columns.append((x_col, y_col))
        marker = 'bo' if is_baseline else 'ro'
        plt.plot(df[x_col], df[y_col], marker)
        # The MY* label gets extra trailing whitespace so it does not overlap its neighbour.
        suffix = "\n\n " if text == fun_name[maor2_f] else "\n"
        plt.text(df[x_col], df[y_col], text + "- %.4f" % (df[y_col],) + suffix)

    # Dashed line through the blue points, in the order they were given.
    line_x = [df[x_col] for x_col, _ in baseline_columns]
    line_y = [df[y_col] for _, y_col in baseline_columns]
    plt.plot(line_x, line_y, "--")

    plt.xlabel('AVG Node Expansion')
    plt.ylabel('AVG Solution Length')

    # Export the current figure instead of showing it.
    import tikzplotlib
    tikzplotlib.save(f"/content/drive/Shareddrives/search/{name}.tex")



In [None]:
# Export the per-function averages as TikZ figures.  numeric_only=True keeps the
# string "puzzle" column out of the mean, which pandas >= 2.0 would otherwise
# reject with a TypeError.
plt_pgf(record_df.mean(numeric_only=True),fun,"search_records")
plt_pgf(record_df.mean(numeric_only=True),[wstar1_5_f,wstar1_1_f,astar_f,maor_f,y_star3_f,maor2_f],"search_records1")

In [None]:
# Show the per-function averages.  numeric_only=True keeps the string "puzzle"
# column out of the mean, which pandas >= 2.0 would otherwise reject with a TypeError.
plt_ptint(record_df.mean(numeric_only=True),[astar_f,wstar2_f,wstar1_5_f,wstar1_1_f,y_star3_f,maor_f,maor2_f])