# Tema 1  - Beam-search


Boriceanu Ioana-Roxana 



Grupa 341A2

In [1]:
from __future__ import annotations
import random, math
from _functools import reduce
from copy import copy,deepcopy
from builtins import isinstance
from heapq import heappop, heappush
import time
#from resource import setrlimit, RLIMIT_AS, RLIMIT_DATA


class NPuzzle:
    """
    Reprezentarea unei stări a problemei și a istoriei mutărilor care au adus starea aici.
    
    Conține funcționalitate pentru
    - afișare
    - citirea unei stări dintr-o intrare pe o linie de text
    - obținerea sau ștergerea istoriei de mutări
    - obținerea variantei rezolvate a acestei probleme
    - verificarea dacă o listă de mutări fac ca această stare să devină rezolvată.
    """
    
    NMOVES = 4
    UP, DOWN, LEFT, RIGHT = range(NMOVES)
    ACTIONS = [UP, DOWN, LEFT, RIGHT]
    names = "UP, DOWN, LEFT, RIGHT".split(", ")
    BLANK = ' '
    delta = dict(zip(ACTIONS, [(-1, 0), (1, 0), (0, -1), (0, 1)]))
    
    PAD = 2
    
    def __init__(self, puzzle : list[int|str], movesList : list[int] = []):
        """
        Creează o stare nouă pe baza unei liste liniare de piese, care se copiază.
        
        Opțional, se poate copia și lista de mutări dată.
        """
        self.N = len(puzzle)
        self.side = int(math.sqrt(self.N))
        self.r = copy(puzzle)
        self.moves = copy(movesList)

    def display(self, show = True) -> str:
        l = "-" * ((NPuzzle.PAD + 1) * self.side + 1)
        aslist = self.r
        
        slices = [aslist[ slice * self.side : (slice+1) * self.side ]  for slice in range(self.side)]
        s = ' |\n| '.join([' '.join([str(e).rjust(NPuzzle.PAD, ' ') for e in line]) for line in slices]) 
        
        s = ' ' + l + '\n| ' + s + ' |\n ' + l
        if show: print(s)
        return s
    def display_moves(self):
        print([NPuzzle.names[a] if a is not None else None for a in self.moves])
        
    def print_line(self):
        return str(self.r)
    
    @staticmethod
    def read_from_line(line : str):
        list = line.strip('\n][').split(', ')
        numeric = [NPuzzle.BLANK if e == "' '" else int(e) for e in list]
        return NPuzzle(numeric)

    def clear_moves(self):
        """Șterge istoria mutărilor pentru această stare."""
        self.moves.clear()
    
    def apply_move_inplace(self, move : int):
        """Aplică o mutare, modificând această stare."""
        blankpos = self.r.index(NPuzzle.BLANK)
        y, x = blankpos // self.side, blankpos % self.side
        ny, nx = y + NPuzzle.delta[move][0], x + NPuzzle.delta[move][1]
        if ny < 0 or ny >= self.side or nx < 0 or nx >= self.side: return None
        newpos = ny * self.side + nx
        piece = self.r[newpos]
        self.r[blankpos] = piece
        self.r[newpos] = NPuzzle.BLANK
        self.moves.append(move)
        return self
    
    def apply_move(self, move : int):
        """Construiește o nouă stare, rezultată în urma aplicării mutării date."""
        return self.clone().apply_move_inplace(move)

    def solved(self):
        """Întoarce varianta rezolvată a unei probleme de aceeași dimensiune."""
        return NPuzzle(list(range(self.N))[1:] + [NPuzzle.BLANK])

    def verify_solved(self, moves : list[int]) -> bool:
        """"Verifică dacă aplicarea mutărilor date pe starea curentă duce la soluție"""
        return reduce(lambda s, m: s.apply_move_inplace(m), moves, self.clone()) == self.solved()
    
    def clone(self):
        return NPuzzle(self.r, self.moves)
    def __str__(self) -> str:
        return str(self.N-1) + "-puzzle:" + str(self.r)
    def __repr__(self) -> str: return str(self)
    def __eq__(self, other):
        return self.r == other.r
    def __lt__(self, other):
        return True
    def __hash__(self):
        return hash(tuple(self.r))


MLIMIT = 3 * 10 ** 9 # 2 GB RAM limit
#setrlimit(RLIMIT_DATA, (MLIMIT, MLIMIT))

f = open("files/problems4.txt", "r")
input = f.readlines()
f.close()
problems = [NPuzzle.read_from_line(line) for line in input]
problems[0].display()

# generare
def genOne(side, difficulty):
	state = NPuzzle(list(range(side * side))[1:] + [NPuzzle.BLANK])
	for i in range(side ** difficulty + random.choice(range(side ** (difficulty//2)))):
		s = state.apply_move(random.choice(NPuzzle.ACTIONS))
		if s is not None: state = s
	state.clear_moves()
	return state

print("Generare:")
random.seed(4242)
p = genOne(3, 4)
p.display()
# problemele easy au fost generate cu dificultatile 4, 3, respectiv 2 (pentru marimile 4, 5, 6)
# celelalte probleme au toate dificultate 6



 -------------
| 11 12  8  3 |
|  6  2  9 14 |
| 15  5  7 13 |
|    10  4  1 |
 -------------
Generare:
 ----------
|  2  5  7 |
|  1  3    |
|  8  4  6 |
 ----------


' ----------\n|  2  5  7 |\n|  1  3    |\n|  8  4  6 |\n ----------'

In [2]:
#Euristica1 NPuzzle

def manhattan_distance(self):
    good_one=self.solved()
    vals_self=self.r
    vals_good=good_one.r
    dist=0
    for j in range(self.N):
        for i in range(self.N):
            if vals_good[j]==vals_self[i]:
                dist=dist+abs(j//3-i//3)+abs(j%3-i%3)
    return dist

In [3]:
#Euristica2 NPuzzle
#cate piese sunt pozitionate gresit

def wrong_pos(self):
    good_one=self.solved()
    vals_self=self.r
    vals_good=good_one.r
    wrong_tiles=0
    for i in range(self.N):
        if vals_self[i]!=vals_good[i]:
            wrong_tiles+=1
    return wrong_tiles

In [4]:
def next_moves(self):
    next_states=set(self.apply_move(i) for i in range(4) if self.apply_move(i))
    return next_states

In [5]:
from heapq import heappop, heappush

def astar(start, h,limit):
    
    g=0
    frontier=[]
    heappush(frontier,(g+h(start),start))
    discovered={start:(None,0)}
    
    while frontier and len(discovered)<=limit:
        f,s=heappop(frontier)
        g=discovered[s][1]
        if not h(s):
            break
        for s_next in next_moves(s):
            if s_next not in discovered or g+1 < discovered[s_next][1]:
                discovered[s_next]=(s,g+1)
                heappush(frontier,(g+1+h(s_next),s_next))
                
    path=[]
    final_s=s
    final_s.display()
    
    while final_s:   
        path.append(final_s)
        final_s=discovered[final_s][0]
        
    print(len(path))
    print(len(discovered))
    return path

In [6]:
st = time.time()

limit=1000000
f = open("files/problems4-easy.txt", "r")
input = f.readlines()
f.close()

problems = [NPuzzle.read_from_line(line) for line in input]
problems[4].display()   
astar(problems[0],manhattan_distance,limit)
#astar(problems[4],wrong_pos,limit)
et = time.time()

print('Execution time:', et - st, 'seconds')

 -------------
|  8  5  3  7 |
|  2 13 14    |
|  9  1 10  4 |
|  6 11 15 12 |
 -------------
 -------------
|  1     3  4 |
|  8 11  5  2 |
|  6 12 10  9 |
| 13 15 14  7 |
 -------------
26
1000001
Execution time: 61.691282510757446 seconds


In [6]:
def merge_two_dicts(x, y):
    z = x.copy()
    z.update(y) 
    return z

In [7]:
def BeamSearch(start,B,h,limit):
    moves=[]
    selected={}
    beam={start:h(start)}
    discovered={start:h(start)}
    found=False
    while beam and len(discovered)<limit and not found:
        succ={}
        new_succ={}
        for s in beam:
            for s_next in next_moves(s):
                moves.append(s_next)
            for m in moves:
                new_succ[m]=h(m)
                if not h(m):
                    m.display()
                    found=True
            if found:
                break
            moves.clear()
            succ=merge_two_dicts(succ,new_succ)
            new_succ.clear()
            
                
        selected.update((sorted(succ.items(), key=lambda x:x[1]))[:B])
        discovered=merge_two_dicts(discovered,selected)
        beam.clear()
        beam.update(selected)
        selected.clear()
        
    print("Numar stari stocate: ",len(discovered)) 
    if found:
        return 1
    else:
        return 0

In [8]:
st = time.time()

B=1000
lim=100000
f = open("files/problems4-easy.txt", "r")
input = f.readlines()
f.close()

problems = [NPuzzle.read_from_line(line) for line in input]
problems[0].display()

BeamSearch(problems[3],B,wrong_pos,lim)
#BeamSearch(problems[0],B,manhattan_distance,lim)
et = time.time()

print('Execution time:', et - st, 'seconds')

 -------------
| 11  3  2  4 |
|  1  9 14 10 |
| 13  8    12 |
| 15  5  6  7 |
 -------------
 -------------
|  1  2  3  4 |
|  5  6  7  8 |
|  9 10 11 12 |
| 13 14 15    |
 -------------
Numar stari stocate:  12766
Execution time: 1.8740761280059814 seconds


In [9]:
def Iteratie(stare, discrepante, h, vizitat, limita):
    succ={}
    new_succ={}
    best={}
    s={}
    moves=[]
    new_vizitat={}
    new_vizitat=merge_two_dicts(new_vizitat,vizitat)
    
    for s_next in next_moves(stare):
        moves.append(s_next)
    for m in moves:
        if not h(m):
            m.display()
            return 1
        else:
            if m not in vizitat:
                new_succ[m]=h(m)
    succ=merge_two_dicts(succ,new_succ)
    if not succ:
        return 0
    if len(new_vizitat)>limita:
        return 0
    best.update((sorted(succ.items(), key=lambda x:x[1]))[:1])
    for f in best.items():
        b=f[0]
    if not discrepante:
        vizitat=merge_two_dicts(vizitat,best)
        new_vizitat=merge_two_dicts(new_vizitat,best)
        return Iteratie(b, 0, h, new_vizitat, limita)
    else:
        succ.pop(b)
        while succ:
            s.update((sorted(succ.items(), key=lambda x:x[1]))[:1])
            for f in s.items():
                b=f[0]
            succ.pop(b)
            vizitat=merge_two_dicts(vizitat,s)
            new_vizitat=merge_two_dicts(new_vizitat,s)
            if Iteratie(b, discrepante-1, h, new_vizitat, limita):
                return 1
        return Iteratie(b, discrepante, h, new_vizitat, limita)

In [10]:
def GLDS(start,h,limita):
    vizitat={start:h(start)}
    discrepante=0
    found=False
    lim_discr=3
    while not found and discrepante<lim_discr:
        if Iteratie(start, discrepante, h, vizitat, limita):
            found=True
        discrepante=discrepante+1 
    if found:
        return 1
    else:
        return 0

In [None]:
st = time.time()

lim=100000
f = open("files/problems4-easy.txt", "r")
input = f.readlines()
f.close()

problems = [NPuzzle.read_from_line(line) for line in input]
problems[0].display()

GLDS(problems[0],manhattan_distance,lim)

et = time.time()

print('Execution time:', et - st, 'seconds')

 -------------
| 11  3  2  4 |
|  1  9 14 10 |
| 13  8    12 |
| 15  5  6  7 |
 -------------


In [None]:
lim=100
p = genOne(3,2)
p.display()

GLDS(p,manhattan_distance,lim)

In [12]:
import itertools
def Iter(nivel, discrepante, B, h, vizitat, limita):
    succ={}
    new_succ={}
    best={}
    new_best={}
    moves=[]
    nivel_urm={}
    for s in nivel.keys():
        for s_next in next_moves(s):
            moves.append(s_next)
        for m in moves:
            if not h(m):
                m.display()
                return 1
            else:
                if m not in vizitat:
                    new_succ[m]=h(m)
        succ=merge_two_dicts(succ,new_succ)
    if not succ:
        return 0
    if len(vizitat) + min(B,len(succ))>limita:
        return 0
    
    best.update((sorted(succ.items(), key=lambda x:x[1]))[:B])
    
    if not discrepante:
        nivel_urm=merge_two_dicts(nivel_urm,best)
        vizitat=merge_two_dicts(vizitat,nivel_urm)
        #print(len(vizitat))
        return Iter(nivel_urm, 0, B, h, vizitat, limita)
    else:
        deja_explorate=B
        while deja_explorate<len(succ):
            n=min(len(succ)-deja_explorate, B)
            nivel_urm.update(dict(itertools.islice(succ.items(),n)))
            vizitat=merge_two_dicts(vizitat,nivel_urm)
            val=Iter(nivel_urm, discrepante-1, B, h, vizitat, limita)
            if val:
                deja_explorate=deja_explorate+len(nivel_urm)
        new_best.update((sorted(succ.items(), key=lambda x:x[1]))[:B])
        nivel_urm=merge_two_dicts(nivel_urm,new_best)
        vizitat=merge_two_dicts(vizitat,nivel_urm)
        #print(len(vizitat))
        return Iter(nivel_urm, discrepante, B, h, vizitat, limita)

In [13]:
def BLDS(start,h,B,limita):
    vizitat={start:h(start)}
    discrepante=0
    found=False
    lim_discr=3
    while not found and discrepante<lim_discr:
        if Iter({start:h(start)}, discrepante, B,h, vizitat, limita):
            found=True
        discrepante=discrepante+1 
    
    if found:
        return 1
    else:
        return 0

In [15]:
st = time.time()

lim=1000000
B=500
f = open("files/problems4-easy.txt", "r")
input = f.readlines()
f.close()

problems = [NPuzzle.read_from_line(line) for line in input]
problems[0].display()
BLDS(problems[0],wrong_pos,B,lim)
#BLDS(problems[0],manhattan_distance,B,lim)


et = time.time()

print('Execution time:', et - st, 'seconds')

 -------------
| 11  3  2  4 |
|  1  9 14 10 |
| 13  8    12 |
| 15  5  6  7 |
 -------------
 -------------
|  1  2  3  4 |
|  5  6  7  8 |
|  9 10 11 12 |
| 13 14 15    |
 -------------
Execution time: 82.631831407547 seconds
