<img src="./sharif.png" alt="SUT logo" width=300 height=300 align=left class="saturate">

<br>
<font>
<div dir=ltr align=center>
<font color=0F5298 size=7>
    Artificial Intelligence <br> <br>
<font color=2565AE size=5>
    CE Department <br>
    Fall 2024 - Prof. Rohban<br>
<font color=3C99D size=5>
    HW2 Practical <br>
    Constraint Satisfaction Problem<br>
<font color=696880 size=4>
    Shayan Shabani

____

## Personal Data

In [None]:
full_name = None
student_number = None

## Sudoku Solver : Backtracking (25 Points)
Use your knowledge of the backtracking algorithm to solve the sudoku problem

### Importing Libraries

In [None]:
import pandas as pd
import numpy as np

### Preprocessing
Import the data and explore it!
The columns you need are named quizzes and solutions
Unsolved sudoku are in quizzes and solved ones are in solutions, use the solutions column to check whether you solve the sudoku correctly

In [None]:
"""
TODO: import the data from the provided sudoku.csv file and explore it
"""
csv_path = "C:\\Users\\masoud\\Desktop\\sudoku.csv"
data = pd.read_csv(csv_path)
# data = None
# TODO: print the head of the data
data.head()

In [None]:
# TODO: print info of the data
data.info()

In [None]:
# TODO: describe the data
data.describe()

In [None]:
def gridding_array(array):
    """
    TODO: turn the 1D array to a sudoku map
    """
    return np.array(array).reshape(9, 9)

In [None]:
#################################################
############### DO NOT CHANGE ###################
#################################################
def display_grid(grid):
    """
    This function prints a formatted representation of the 9x9 Sudoku grid with dividing lines.
    """
    separator_row = "+-----+-------+-----+"

    for row in range(9):
        if row % 3 == 0:
            print(separator_row)
        
        row_content = []
        for col in range(9):
            row_content.append(str(grid[row, col]))
            if (col + 1) % 3 == 0 and col != 8:
                row_content.append("|")
        
        print(" ".join(row_content))
        
    print(separator_row)

#################################################
#################################################
#################################################

### Backtracking Algorithm
Testing your algorithm on a sample sudoku is enough, but your code will be manually tested on multiple ones later

In [None]:
def is_empty(sudoku):
    """
    TODO: This function should check whether the sudoku is still empty (contains zero values)
    """
    return np.any(sudoku == 0)

In [None]:
def is_placement_valid(grid, num, position):
    """
    TODO: This function determines whether it's valid to place the number 'num' at the given 'position' on the Sudoku grid.
    """
    row, col = position
    
    if num in grid[row]:
        return False

    if num in [grid[i][col] for i in range(9)]:
        return False

    start_row = (row // 3) * 3
    start_col = (col // 3) * 3
    for i in range(3):
        for j in range(3):
            if grid[start_row + i][start_col + j] == num:
                return False

    return True

In [None]:
def find_empty(sudoku):
    """
    Finds an empty cell in the Sudoku grid.
    Returns the position (row, col) of the empty cell, or None if the grid is full.
    """
    for i in range(9):
        for j in range(9):
            if sudoku[i][j] == 0:
                return (i, j)
    return None


def backtrack(sudoku):
    """
    Solve the Sudoku puzzle using the backtracking algorithm.
    """
    empty_pos = find_empty(sudoku)
    if not empty_pos:
        return True

    row, col = empty_pos

    for num in range(1, 10):
        if is_placement_valid(sudoku, num, (row, col)):
            sudoku[row][col] = num
            if backtrack(sudoku):
                return True
            sudoku[row][col] = 0

    return False

In [None]:
def compare(sudoku, solution):
    """
    TODO: compare the sudoku you solved with its actual solution inside the dataset
        You have to subtract these two from each other and print the pretty version of the result
        which should contains only zeros
    """
    sudoku = np.array(sudoku)
    solution = np.array(solution)
    
    difference = solution - sudoku
    display_grid(difference)
    
    return np.all(difference == 0)


"""
TODO: Test and print the pretty result of your sample sudoku
        Compare it with the actual solution of the sudoku to make sure it is correct
"""

first_quiz = data['quizzes'][60].strip()
first_solution = data['solutions'][60].strip()

quiz_array = [int(digit) for digit in first_quiz]
solution_array = [int(digit) for digit in first_solution]

quiz_grid = gridding_array(quiz_array)
solution_grid = gridding_array(solution_array)

print("\nOriginal Sudoku Puzzle:")
display_grid(quiz_grid)

backtrack(quiz_grid)

print("\nSolved Sudoku Puzzle:")
display_grid(quiz_grid)

print("\nCompared Sudoku Puzzle:")
is_correct = compare(quiz_grid, solution_grid)

print("\nIs the solution of sudoku correct?", is_correct)

## Sudoku Solver : CP-SAT Solver (15 Points)
Now we are going to explore ortools library and use it to solve the sudoku problem
Hint: use built-in functions of this library like AddAllDifferent and CpSolver; search for more ...
here is a useful link : https://developers.google.com/optimization/cp

In [None]:
! python -m pip install -U --user ortools
from ortools.sat.python import cp_model
import pandas as pd
import numpy as np

In [None]:
def solve_sudoku(sudoku):
    """
    Solves the given Sudoku puzzle using OR-Tools and returns the solved grid.
    
    :param sudoku: 2D numpy array representing the Sudoku puzzle
    :return: Solved 2D numpy array or a message indicating failure
    """
    model = cp_model.CpModel()
    n = 9 
    cells = {}
    for i in range(n):
        for j in range(n):
            cells[(i, j)] = model.NewIntVar(1, 9, f'cell_{i}_{j}')
    for i in range(n):
        for j in range(n):
            if sudoku[i][j] != 0:
                model.Add(cells[(i, j)] == sudoku[i][j])
    for i in range(n):
        model.AddAllDifferent([cells[(i, j)] for j in range(n)])
    for j in range(n):
        model.AddAllDifferent([cells[(i, j)] for i in range(n)])
    for box_row in range(0, n, 3):
        for box_col in range(0, n, 3):
            model.AddAllDifferent([
                cells[(i, j)]
                for i in range(box_row, box_row + 3)
                for j in range(box_col, box_col + 3)
            ])
    solver = cp_model.CpSolver()
    status = solver.Solve(model)
    if status != cp_model.OPTIMAL and status != cp_model.FEASIBLE:
        return "No solution was found"
    solved_sudoku = np.zeros((n, n), dtype=int)
    for i in range(n):
        for j in range(n):
            solved_sudoku[i, j] = solver.Value(cells[(i, j)])
            if solved_sudoku[i, j] == 0:
                return "Solution is incomplete or invalid"
    return solved_sudoku

In [None]:
"""
TODO: Test and print the pretty result of the above function, one sample sudoku from the dataset is enough
    use the compare function to make sure your answer is correct
"""

first_quiz = data['quizzes'][1000].strip()
first_solution = data['solutions'][1000].strip()

quiz_array = [int(digit) for digit in first_quiz]
solution_array = [int(digit) for digit in first_solution]

quiz_grid = gridding_array(quiz_array)
solution_grid = gridding_array(solution_array)

print("\nOriginal Sudoku Puzzle:")
display_grid(quiz_grid)

quiz_grid = solve_sudoku(quiz_grid)

print("\nSolved Sudoku Puzzle:")
display_grid(quiz_grid)

print("\nCompared Sudoku Puzzle:")
is_correct = compare(quiz_grid, solution_grid)
print("\nIs the solution of sudoku correct?", is_correct)



## Graph Coloring
In this part, we are going to explore the graph coloring topic. Now we can use advanced options to speed up the process of constraint satisfaction

In [None]:
from itertools import combinations
from time import time
import numpy as np
import copy

from collections import deque
from collections import defaultdict

### AC-3 Algorithm (20 Points)
First and foremost, implement the AC-3 algorithm to be later used in the problem

In [None]:
def AC3(csp, neighbors):
    
    nodes = csp['nodes']
    domains = csp['domains']
    
    queue = deque([(Xi, Xj) for Xi in nodes for Xj in neighbors[Xi]])

    def revise(Xi, Xj):
        revised = False
        constraints = csp['constraints'][(Xi, Xj)]

        for x in domains[Xi][:]:
            if not any((x, y) in constraints for y in domains[Xj]):
                domains[Xi].remove(x)
                revised = True
        return revised

    while queue:
        Xi, Xj = queue.popleft()
        if revise(Xi, Xj):
            if not domains[Xi]:
                return False  
            for Xk in neighbors[Xi]:
                if Xk != Xj:
                    queue.append((Xk, Xi))
    return True

### AC-4 Algorithm (20 Points)
Secondly, implement the AC-4 (Arc Consistency 4) algorithm. This algorithm enforces arc consistency by maintaining **support tables** and efficiently processing **a queue of arcs** for consistency checks. To help you implement it, here are the key steps and concepts to focus on:
1. **Support Tables**: For each pair of variable-value assignments, you must maintain a table that tracks the number of supporting values in the neighboring variables' domains. This table will allow you to quickly determine if a value is consistent with its neighbors
2. Start by initializing your support tables for all variable-value pairs. For each arc (X, Y), compute and store the number of values in Y's domain that support each value in X's domain based on the constraint between X and Y.
3. Initialize **a queue of arcs** with all pairs of variables that share a constraint
4. For each arc (X, Y) in the queue, check the support for each value in X's domain. If any value in X loses all supporting values in Y, remove that value from X’s domain.
5. If a value is removed from X’s domain, for every neighboring variable Z (other than Y), you must add the arc (Z, X) back into the queue for further consistency checks. This ensures that removing a value from X doesn’t create inconsistencies for other variables.
6. Whenever a value is removed from a domain, update the support tables for all neighboring variable-value pairs that depend on the removed value. This way, if a value of Z is only supported by the value that was removed from X, the support count for that value of Z should be decremented.
7. The algorithm terminates when the queue is empty, meaning all arcs have been processed and the domains of all variables are arc-consistent. If at any point a domain becomes empty, the problem is unsolvable.

In [None]:
def AC4(csp):
    """
    csp: 
        dict{
            nodes: list of nodes,
            domains: dictionary {variable: [domain]},
            constraints: dictionary {(node_i, node_j): list of constraints
        },
        
    TODO: Implement the AC4 algorithm
    
    return: a bool to show whether the problem is consistent and if so, the reduced domain of the problem
    """
    
    nodes = csp['nodes']
    domains = csp['domains']
    constraints = csp['constraints']
    
    support_count = defaultdict(int)
    supported_by = defaultdict(list)
    queue = deque()

    for (X, Y), constraint_list in constraints.items():
        for value_X in domains[X]:
            count = sum(1 for value_Y in domains[Y] if (value_X, value_Y) in constraint_list)
            support_count[(X, value_X, Y)] = count
            
            if count == 0:
                queue.append((X, value_X))
            else:
                for value_Y in domains[Y]:
                    if (value_X, value_Y) in constraint_list:
                        supported_by[(Y, value_Y)].append((X, value_X))

    while queue:
        X, value_X = queue.popleft()
        
        if value_X in domains[X]:
            domains[X].remove(value_X)
        
        if not domains[X]:
            return False, None

        for (Z, _) in constraints.keys():
            if Z != X:
                for value_Z in domains[Z]:
                    if (X, value_X) in supported_by[(Z, value_Z)]:
                        support_count[(Z, value_Z, X)] -= 1
                        
                        if support_count[(Z, value_Z, X)] == 0:
                            queue.append((Z, value_Z))

    return True, domains

In [None]:
#################################################
############### DO NOT CHANGE ###################
#################################################

graph = {
    "WA": ["NT", "SA"], 
    "NT": ["WA", "SA", "Q"], 
    "SA": ["NT", "WA", "Q", "NSW", "V"], 
    "Q": ["NT", "SA", "NSW"], 
    "NSW": ["Q", "SA", "V"], 
    "V": ["NSW", "SA"]
}

nodes = ["WA", "NT", "SA", "Q", "NSW", "V"]
domains = {}

for node in nodes:
    if node == "WA":
        domains[node] = ["R"]
    else:
        domains[node] = ["R", "G", "B"]

constraint = [(x, y) for x in ["R", "G", "B"] for y in ["R", "G", "B"] if x != y]
constraints = {}

for node in graph.keys():
    for neighbor in graph[node]:
        constraints[(node, neighbor)] = constraint

#################################################
#################################################
#################################################

def print_reduced_domains(domains):
    print("Reduced Domains:")
    for node in domains:
        print(f"{node}: {domains[node]}")


csp = {
    "nodes": nodes,
    "domains": domains,
    "constraints": constraints
}

def run_ac3(csp):
    domains = csp["domains"].copy()
    start_time = time()
    AC3(csp, graph)
    end_time = time()
    print("\nAC3 Results:")
    print_reduced_domains(ac3_domains)
    print(f"Time taken by AC3: {end_time - start_time:.6f} seconds\n")
    pass

def run_ac4(csp):
    domains = csp["domains"].copy()
    start_time = time()
    AC4(csp)
    end_time = time()
    print("\nAC4 Results:")
    print_reduced_domains(domains)
    print(f"Time taken by AC4: {end_time - start_time:.6f} seconds\n")
    pass

# TODO: print the reduced domain by applying the AC3 algorithm (make sure to show the time spent as well)
run_ac3(csp)

# TODO: print the reduced domain by applying the AC4 algorithm (make sure to show the time spent as well)
run_ac4(csp)

### Conclusion
Explain the differences between these two algorithms in terms of time and space complexity. Which one do you prefer to use?
The space complexity of AC3 is relatively low since it only needs to store the list of arcs that needs to be processed. AC4 uses additional data structures like support tables. This allows AC4 to quickly know when a value has lost all domain members, but it comes at the cost of increased space complexity.
Since the problem size is relatively small, I would recommend using AC3 because of its simplicity and lower space complexity. but if the graph had more nodes and edges, or the domains were much larger, AC4 would be a better choice for efficiency.


## Backtracking + AC-3 and AC-4 algorithm (20 Points)
Now we are going to utilize the implemented algorithms to speed up the backtracking process, use both of the above algorithms, you must run ac3 and ac4 algorithms seperately.

In [None]:
def is_consistent(var, value, assignment, csp, neighbors):
    """
    Check if assigning 'value' to 'var' is consistent with the current assignment.
    """
    for neighbor in neighbors[var]:
        if neighbor in assignment:
            constraints = csp["constraints"]
            if (value, assignment[neighbor]) not in constraints[(var, neighbor)]:
                return False
    return True

def select_unassigned_variable(assignment, domains):
    """
    Select an unassigned variable using the Minimum Remaining Values (MRV) heuristic.
    """
    unassigned_variables = [var for var in domains if var not in assignment]
    mrv_variable = min(unassigned_variables, key=lambda var: len(domains[var]))
    return mrv_variable




def backtracking_search(csp, neighbors, ac3):
    """
    Backtracking search with AC3 or AC4 for constraint propagation.
    """
    assignment = {}

    def backtrack(assignment):

        nodes = csp["nodes"]
        domains = csp["domains"]
        
        if len(assignment) == len(nodes):
            return assignment
        
        var = select_unassigned_variable(assignment, domains)
        
        for value in domains[var]:
            if is_consistent(var, value, assignment, csp, neighbors):
                assignment[var] = value
                
                if ac3:
                    if AC3(csp, neighbors):
                        result = backtrack(assignment)
                        if result:
                            return result
                else:
                    if AC4(csp):
                        result = backtrack(assignment)
                        if result:
                            return result
                
                del assignment[var]
        
        return None

    return backtrack(assignment)



csp = {
    "nodes": nodes,
    "domains": domains,
    "constraints": constraints
}

# TODO: Solve using AC3 algorithm

print("\nBacktracking + AC3 algorithm: \n")
start = time()
solution = backtracking_search(csp, graph, True)
s = time() - start

if solution:
    print("Solution found:", solution)
else:
    print("No solution found!")
    
print("Solution retrieved in: %.4g seconds" % s)


# TODO: Solve using AC4 algorithm

print("\nBacktracking + AC4 algorithm:\n")
start = time()
solution = backtracking_search(csp, graph, False)
s = time() - start

if solution:
    print("Solution found:", solution)
else:
    print("No solution found!")
    
print("Solution retrieved in: %.4g seconds" % s)