# Explanation - Constraint Satisfaction Problems



In this portion of this laboratory, you are to complete the implementation of a general constraint satisfaction problem solver.

We have provided a basic CSP implementation in the CSP code. Depth-first search is already implemented, along with a basic built-in constraint checker, so the code produces the search trees for DFS with backtracking and basic constraint checking.

However, it doesn't do forward checking or forward checking + singleton propagation!

So your job is to complete:

    forward_checking(state):

and

    forward_checking_prop_singleton(state):

Here `state` is an instance of `CSPState`, an object that keeps track of the current variable assignments and domains. Both functions are called by the search algorithm at every node in the search tree. They should return `False` at points where the domain reduction algorithm would backtrack, and `True` otherwise (i.e. continue extending).
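
To illustrate this contract between the search and a checker, here is a minimal sketch of depth-first search with a pluggable checker. It is not the lab's `CSP.solve` (which uses an explicit agenda and `CSPState` objects); `dfs` and `increasing` are hypothetical names, and the partial assignment is a plain dictionary used only for illustration.

```python
def dfs(order, domains, assignment, checker):
    """Depth-first search; `checker` returns False to force a backtrack."""
    if not checker(assignment):
        return None  # backtrack: this node fails
    if len(assignment) == len(order):
        return assignment  # every variable is assigned
    var = order[len(assignment)]  # next variable in the fixed ordering
    for value in domains[var]:
        result = dfs(order, domains, {**assignment, var: value}, checker)
        if result is not None:
            return result
    return None


def increasing(assignment):
    """Toy checker (assumption): assigned values must satisfy A < B < C."""
    values = [assignment[v] for v in ("A", "B", "C") if v in assignment]
    return all(a < b for a, b in zip(values, values[1:]))


solution = dfs(["A", "B", "C"], {v: [1, 2, 3] for v in "ABC"}, {}, increasing)
print(solution)
```

Swapping in a stronger checker changes how early the search backtracks, not the search loop itself; that is the role `forward_checking` and `forward_checking_prop_singleton` play in the lab code.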

As a hint, here is the (unrefined) pseudocode for the two algorithms.

## Forward Checking

1.   Let X be the variable currently being assigned.
2.   Let x be the value being assigned to X.
3.   Find all the binary constraints that are associated with X.
4.   For each constraint:
      1.   Let Y be the variable connected to X by that binary constraint.
      2.   For each variable value y in Y's domain
           1. If constraint checking fails for X=x and Y=y
                 1. Remove y from Y's domain
           2. If the domain of Y is reduced down to the empty set, then the entire check fails: return False.
5.   If all constraints passed declare success, return True

If you get a state with no current variable assignment (the root of the search tree), just return True: forward checking only applies once some variable has been assigned.
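
The steps above can be sketched on plain Python dictionaries, independent of the lab's `CSPState` API. The names `forward_check`, `domains`, and `constraints` are hypothetical stand-ins chosen for illustration; constraints are stored in both directions, keyed by `(i, j)`.

```python
def forward_check(domains, constraints, var, value):
    """Prune the neighbours of `var` after the assignment var=value.

    domains: dict mapping variable name -> list of remaining values
    constraints: dict mapping (i, j) -> predicate(value_i, value_j)
    Returns False as soon as a neighbour's domain becomes empty.
    """
    for (i, j), ok in constraints.items():
        if i != var:
            continue
        # Iterate over a copy because values are removed from the real list.
        for y in domains[j][:]:
            if not ok(value, y):
                domains[j].remove(y)
        if not domains[j]:
            return False
    return True


constraints = {
    ("A", "B"): lambda a, b: a < b,
    ("B", "A"): lambda b, a: a < b,
}

domains = {"A": [1, 2, 3, 4], "B": [1, 2, 3, 4]}
print(forward_check(domains, constraints, "A", 3), domains["B"])  # True [4]

domains = {"A": [1, 2, 3, 4], "B": [1, 2, 3, 4]}
print(forward_check(domains, constraints, "A", 4), domains["B"])  # False []
```

Assigning A=4 leaves no value for B, so the check fails immediately and the search can backtrack without ever trying a value for B.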

## Forward Checking with Propagation through Singletons

1.   Run forward checking, fail if forward checking fails.
2.   Find variables with domains of size 1.
3.   Create a queue of singleton variables.
4.   While singleton queue is not empty
      1. Pop off the first singleton variable X (add X to list of visited singletons)
      2. Find all the binary constraints that singleton X is associated with.
      3. For each constraint therein:
           1. Let Y be the variable connected to X by that binary constraint
                1. For each value of y in Y's domain:
                    1. If constraint check fails for X = (X's singleton value) and Y = y:
                        1. Remove y from Y's domain
                    2. If the domain of Y is reduced down to the empty set, then the entire check fails, return False.
      4. Check to see if domain reduction produced any new and unvisited singletons; if so, add them to the queue.
5. return True.
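
As a rough illustration of the queue-driven loop above, here is a sketch on plain dictionaries rather than the lab's `CSPState` objects. `propagate_singletons` is a hypothetical name, and the sketch assumes forward checking has already run and that constraints are stored in both directions.

```python
from collections import deque


def propagate_singletons(domains, constraints):
    """Propagate through variables whose domain has exactly one value.

    domains: dict name -> list of remaining values (replaced as they shrink)
    constraints: dict (i, j) -> predicate(value_i, value_j)
    """
    queue = deque(v for v, d in domains.items() if len(d) == 1)
    visited = set()
    while queue:
        x = queue.popleft()
        visited.add(x)
        x_value = domains[x][0]
        for (i, j), ok in constraints.items():
            if i != x:
                continue
            # Keep only the values of j that are consistent with x's value.
            domains[j] = [y for y in domains[j] if ok(x_value, y)]
            if not domains[j]:
                return False
        # Enqueue singletons created by the reductions above.
        for v, d in domains.items():
            if len(d) == 1 and v not in visited and v not in queue:
                queue.append(v)
    return True


constraints = {
    ("A", "B"): lambda a, b: a < b,
    ("B", "A"): lambda b, a: a < b,
    ("B", "C"): lambda b, c: b < c,
    ("C", "B"): lambda c, b: b < c,
}
domains = {"A": [1], "B": [1, 2], "C": [1, 2, 3]}
print(propagate_singletons(domains, constraints), domains)
```

In this example the singleton A reduces B to a singleton, which in turn reduces C, so a single call settles all three variables.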

## API

These are some useful functions defined in the CSP code that you should use in your code to implement the above algorithms:

`CSPState`: representation of one of the many possible search states in the CSP problem.
* `get_current_variable()` - gets the Variable instance being currently assigned. Returns `None` if we are in the root state, when there are no variable assignments yet.
* `get_constraints_by_name(variable_name)` - retrieves all the `BinaryConstraint` objects associated with `variable_name`.
* `get_variable_by_name(variable_name)` - retrieves the `Variable` object associated with `variable_name`.
* `get_all_variables()` - gets the list of all `Variable` objects in this CSP problem.

`Variable`: representation of a variable in these problems.
* `get_name()` - returns the name of this variable.
* `get_assigned_value()` - returns the **assigned** value of this variable. **Returns `None` if `is_assigned()` returns `False`, that is if the variable hasn't been assigned yet.**
* `is_assigned()` - returns `True` if we've made an assignment for this variable.
* `get_domain()` - returns a copy of the list of the current domain of this variable. Use this to iterate over values of Y.

**You might want to consider using this method to get the singular value of a variable with domain size reduced to 1.**
* `reduce_domain(value)` - remove `value` from this variable's domain.
* `domain_size()` - returns the size of this variable's domain
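
Because `get_domain()` returns a copy, it is safe to call `reduce_domain()` while looping over the result. The following standalone sketch, using plain lists rather than `Variable` objects, shows what goes wrong when a list is modified while it is being iterated directly.

```python
domain = [1, 2, 3, 4]

# Buggy: removing from the list being iterated makes the loop skip elements.
buggy = domain[:]
for y in buggy:
    if y < 4:
        buggy.remove(y)

# Safe: iterate over a copy and remove from the original.
safe = domain[:]
for y in safe[:]:
    if y < 4:
        safe.remove(y)

print(buggy)  # [2, 4]
print(safe)   # [4]
```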

`BinaryConstraint`: a binary constraint on variable i, j: i -> j.
* `get_variable_i_name()` - name of the i variable
* `get_variable_j_name()` - name of the j variable
* `check(state, value_i=value, value_j=value)` - checks the binary constraint for a given CSP state, with variable i set by value i, and variable j set by value j. Returns `False` if the constraint fails. Raises an exception if `value_i` or `value_j` are not set or cannot be inferred from state.

NOTE: in our implementation of CSPs, constraints are symmetrical; a constraint object exists for each "direction" of a constraint, so you can check for the presence of a constraint by substituting for i and/or j in the most convenient fashion for you.
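
One way to picture the two "directions" is a map keyed by `(i, j)` pairs that holds a predicate for each ordering. The sketch below uses plain dictionaries and functions; `add_symmetric` and `constraints_from` are hypothetical helpers for illustration and are not part of the provided CSP code.

```python
def add_symmetric(constraint_map, i, j, predicate):
    """Register predicate(value_i, value_j) for (i, j) and its mirror for (j, i)."""
    constraint_map.setdefault((i, j), []).append(predicate)
    # The mirrored predicate receives its arguments in (j, i) order.
    constraint_map.setdefault((j, i), []).append(
        lambda value_j, value_i: predicate(value_i, value_j))


def constraints_from(constraint_map, name):
    """Collect every (neighbour, predicate) pair whose first variable is `name`."""
    return [(j, p) for (i, j), preds in constraint_map.items()
            if i == name for p in preds]


constraint_map = {}
add_symmetric(constraint_map, "A", "B", lambda a, b: a < b)

# Either variable can now play the role of X when pruning the other.
neighbour_of_a, a_then_b = constraints_from(constraint_map, "A")[0]
neighbour_of_b, b_then_a = constraints_from(constraint_map, "B")[0]
print(neighbour_of_a, a_then_b(1, 3))  # B True
print(neighbour_of_b, b_then_a(3, 1))  # A True
```

With both orderings registered, looking up constraints by the first variable name is enough to find every neighbour of that variable.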

Here is how you might use the API to get the value of a variable currently being assigned.

    var = state.get_current_variable()
    value = None
    if var is not None: # we are not in the root state
       value = var.get_assigned_value()
       # Here value is the value of the variable currently being assigned.

Here is how you might use the API to get the singular value from a singleton variable:

    if singleton_var.domain_size() == 1:
        value = singleton_var.get_domain()[0]
    
## Testing

For unit testing, we have provided the Moose CSP code, an implementation of the seating problem involving a Moose, Palin, McCain, Obama, Biden, and You, expressed in terms of the framework defined in the CSP code.

Running:

    solve_csp_problem(moose_csp_problem, checker, verbose=True)
will return the search tree for DFS with constraint checking. When you have finished your implementation, running `solve_csp_problem(moose_csp_problem, checker=forward_checking, verbose=True)` or `solve_csp_problem(moose_csp_problem, checker=forward_checking_prop_singleton, verbose=True)` should return the correct search trees under forward checking and forward checking with singleton propagation.

Similarly, running:

    solve_csp_problem(map_coloring_csp_problem, checker, verbose=True)

should return the expected search trees for the B, Y, R state coloring problem.








# CSP

In [None]:
"""
A General Constraint Satisfaction Problem Solver
"""
class Variable:
    """
    Representation of a discrete variable with a finite domain.
    As used in our VD table.
    A variable can be in the assigned state, in which v.is_assigned()
    will return true.
    """
    def __init__(self, name, domain, value=None):
        self._name = name
        self._domain = domain[:]
        self._value = value

    def copy(self):
        return Variable(self._name, self._domain, self._value)

    def get_name(self):
        return self._name
    
    def reduce_domain(self, value):
        self._domain.remove(value)

    def domain_size(self):
        return len(self._domain)

    def get_domain(self):
        return self._domain[:]
    
    def is_assigned(self):
        return self._value is not None

    def get_assigned_value(self):
        return self._value
    
    def set_value(self, value):
        self._value = value
        
    def __str__(self):
        buf = "%s(%s)" %(self._name, self._domain)
        if self._value is not None:
            buf += ": %s" %(self._value)
        return buf
        
class BinaryConstraint:
    """
    Representation of a binary-constraint on two variables variable i and
    variable j.
    """
    def __init__(self, var_i_name, var_j_name, check_func, description=None):
        """
        * var_i_name, var_j_name are the names of the variables.
        * check_func is a function that takes four arguments: value_i,
        value_j, var_name_i, var_name_j. It returns True if the values
        pass the constraint, False otherwise.
        Example: lambda i, j, name_i, name_j: i < j
        * description is a string descriptor of the constraint (helpful
        to determine what constraints triggered a search failure).
        """
        self.var_i_name = var_i_name
        self.var_j_name = var_j_name
        self.check_func = check_func
        self.description = description

    def get_variable_i_name(self):
        return self.var_i_name

    def get_variable_j_name(self):
        return self.var_j_name
    
    def check(self, state, value_i=None, value_j=None):
        """
        state is the csp state and should be an instance of
        CSPState.
        value_i and value_j are the values assigned to variable
        i and j respectively.   If they are not provided, then they are
        fetched from the state by looking up variable_i and variable_j's
        names.
        """
        variable_i = state.get_variable_by_name(self.var_i_name)
        if value_i is None and variable_i is not None:
            value_i = variable_i.get_assigned_value()

        variable_j = state.get_variable_by_name(self.var_j_name)            
        if value_j is None and variable_j is not None:
            value_j = variable_j.get_assigned_value()
            
        if value_i is not None and value_j is not None:
            return self.check_func(value_i, value_j,
                                   self.var_i_name, self.var_j_name)
        else:
            # If either value is unset and cannot be inferred from the
            # state, the constraint cannot be checked.
            raise Exception("value_i and/or value_j is not set")

    def __repr__(self):
        return self.__str__()
    
    def __str__(self):
        name = "BinaryConstraint(%s, %s)" %(self.get_variable_i_name(),
                                            self.get_variable_j_name())
        if self.description is not None:
            name += " : %s" %(self.description)
        return name
        
class CSPState:
    """
    Representation of a single state in the CSP search tree.  One can
    think of this as the encapsulation of the Variable-domain (VD) table.
    """
    def __init__(self,
                 constraint_map,
                 variable_map,
                 variable_order,
                 variable_index):
        """
        constraint_map - a dictionary of variable names to
                         lists of associated constraints
        variable_map - a dictionary of variable names to
                       variable objects
        variable_order - the ordering in which variables are assigned
                       values are the names of variables
        variable_index - the position into the variable_order in which
                       we are currently making an assignment.
        """
        self.constraint_map = constraint_map
        self.variable_map = variable_map
        self.variable_order = variable_order
        self.variable_index = variable_index

    def copy(self):
        """
        Make a complete deep copy of this state; this should be
        done so that modifications to the VD table are only transmitted
        to children but not siblings (in the search tree).
        """
        # make a deep copy of the variable map.
        new_variable_map = {}
        for var_name, variable in self.variable_map.items():
            new_variable_map[var_name] = variable.copy()
        new_state = CSPState(self.constraint_map,
                             new_variable_map,
                             self.variable_order,
                             self.variable_index)
        return new_state

    def get_constraints_by_name(self, variable_name):
        """
        List only constraints associated with variable_name
        (where variable_name is variable_i in the constraint)
        """
        constraints = []
        for key, val in self.constraint_map.items():
            v_i, v_j = key
            if v_i == variable_name:
                constraints += val
        return constraints
    
    def get_all_constraints(self):
        """
        List all the constraints in this problem
        """
        constraints = []
        for key, val in self.constraint_map.items():
            constraints += val
        return constraints

    def get_all_variables(self):
        """
        List all the variable objects in this problem
        """
        variables = []
        for name in self.variable_order:
            variables.append(self.variable_map[name])
        return variables
        
    def get_current_variable_name(self):
        """
        Get the name of the variable currently being assigned.
        This function returns None when in the root/initial state.
        """
        if self.variable_index >= 0:
            return self.get_variable_by_index(self.variable_index).get_name()
        else:
            return None

    def get_current_variable(self):
        """
        Get variable (object) currently being assigned.
        """
        if self.variable_index >= 0:
            return self.get_variable_by_index(self.variable_index)
        else:
            return None
        
    def set_variable_by_index(self, variable_index, variable_value):
        """
        assign variable (given index) the variable_value
        """
        variable = self.get_variable_by_index(variable_index)
        if variable is not None:
            variable.set_value(variable_value)
            self.variable_index = variable_index
        
    def get_variable_by_index(self, index):
        """
        fetch the index(th) variable object
        """
        if index >= 0 and index < len(self.variable_order):
            return self.variable_map[self.variable_order[index]]
        return None

    def get_variable_by_name(self, name):
        """
        fetch a variable object by the variable name
        """
        if name in self.variable_map:
            return self.variable_map[name]
        return None

    def is_solution(self):
        """
        Check if this csp state is a solution.
        Note we assume that constraint checking has been done
        on this state.  This merely checks if all the variables
        have an assignment
        """
        for var in self.variable_map.values():
            if not var.is_assigned():
                return False
        return True

    def solution(self):
        """
        return the set of tuples (var-name, var-value) for
        all the assigned variables
        """
        assignment = []
        for varname in self.variable_order:
            vnode = self.get_variable_by_name(varname)
            if vnode.is_assigned():
                assignment.append((vnode.get_name(),
                    vnode.get_assigned_value()))
        return assignment
    
    def __str__(self):
        return self.vd_table()
    
    def vd_table(self):
        """
        Output the vd table as a string for debugging.
        """
        buf = ""
        for var_name in self.variable_order:
            var = self.variable_map[var_name]
            if var.is_assigned():
                buf += "%s | %s*\n" %(var.get_name(),
                      var.get_assigned_value())
            else:
                buf += "%s | %s\n" %(var.get_name(),
                                     var.get_domain())
        return buf


def basic_constraint_checker(state, verbose=False):
    """
    Basic constraint checker used to check at every assignment
    whether the assignment passes all the constraints
    """
    constraints = state.get_all_constraints()
    for constraint in constraints:
        var_i = state.get_variable_by_name(constraint.get_variable_i_name())
        var_j = state.get_variable_by_name(constraint.get_variable_j_name())

        if not var_i.is_assigned() or not var_j.is_assigned():
            continue

        if not constraint.check(state):
            if verbose:
                print("CONSTRAINT-FAILS: %s" %(constraint))
            return False
    return True

class CSP:
    """
    Top-level wrapper object that encapsulates all the
    variables and constraints of a CSP problem
    """
    def __init__(self, constraints, variables):
        # Step 1: generate a constraint map, a mapping of pairs of
        # variable names to defined constraints on that pair.
        self.constraint_map = {}
        for constraint in constraints:
            i = constraint.get_variable_i_name()
            j = constraint.get_variable_j_name()
            tup = (i, j)
            if tup not in self.constraint_map:
                lst = []
                self.constraint_map[tup] = lst
            else:
                lst = self.constraint_map[tup]
            lst.append(constraint)

        # Step 2: generate the variable map, 
        self.variable_map = {}
        self.variable_order = []
        for var in variables:
            self.variable_map[var.get_name()] = var
            self.variable_order.append(var.get_name())

    def initial_state(self):
        """
        Returns the starting state of the CSP with no variables assigned.
        """
        return CSPState(self.constraint_map, self.variable_map,
                        self.variable_order, -1)

    def solve(self,
              constraint_checker=basic_constraint_checker,
              verbose=False):
        """
        Perform a depth-first search with backtracking to solve
        this CSP problem.

        The constraint_checker is a function that performs constraint-checking
        propagation on a CSPState.  By default the checker does
        basic constraint checking (without propagation).
        
        returns the solution state, and the search tree.
        """
        initial_state = self.initial_state()
        search_root = Node("ROOT", initial_state)
        agenda = [search_root]

        step = 0
        while len(agenda) > 0:
            cur_node = agenda.pop(0)
            state = cur_node.value        
            cur_node.step = step
            
            if verbose:
                print("-"*20)
                print("%d. EXAMINING:\n%s" %(step, state.vd_table()))
        
            if not constraint_checker(state, verbose):
                if verbose:
                    print("%d. FAIL:\n%s" %(step, state.vd_table()))
                cur_node.status = Node.FAILED
                step += 1
                continue

            if state.is_solution():
                cur_node.status = Node.SOLUTION
                if verbose:
                    print("%d. SOLUTION:\n%s" %(step, state.vd_table()))
                return state, search_root

            cur_node.status = Node.CONTINUE
            if verbose:
                print("%d. CONTINUE:\n%s" %(step, state.vd_table()))
        

            next_variable_index = state.variable_index + 1
            next_variable = state.get_variable_by_index(next_variable_index)
            values = next_variable.get_domain()

            children = []
            for value in values:
                new_state = state.copy()
                new_state.set_variable_by_index(next_variable_index, value)
                children.append(Node(str(value), new_state))

            cur_node.add_children(children)
            agenda = children + agenda
            step += 1
            
        # fail! no solution
        return None, search_root
    
class Node:
    """
    A tree node that csp.solve() uses/returns that keeps track of the CSP
    search tree.
    """
    UNEXTENDED = "u"
    FAILED = "f"
    CONTINUE = "c"
    SOLUTION = "*"
    
    def __init__(self, label, value):
        self.label = label
        self.status = Node.UNEXTENDED
        self.value = value
        self.step = '-'
        self.children = []

    def add_children(self, children):
        self.children += children
        
    def __str__(self):
        return self.label
            
    def tree_to_string(self, node, depth=0):
        pad = depth*4*" "
        current_var = node.value.get_current_variable_name()    
        if current_var is not None:
            buf = "%s%s=%s(%s,%s)\n" %(pad,
                                       current_var,
                                       node.label,
                                       node.status,
                                       node.step)
        else:
            buf = "%s%s\n" %(pad, node.label)
            
        for child in node.children:
            buf += self.tree_to_string(child, depth+1)
        return buf

    
def simple_csp_problem():
    """
    Formulation of a simple CSP problem that attempts to find
    an assignment to 4 variables: A,B,C,D.  With the constraint that
    A < B < C < D.
    """
    variables = []
    domain = [1, 2, 3, 4]
    
    variables.append(Variable("A", domain))
    variables.append(Variable("B", domain))
    variables.append(Variable("C", domain))
    variables.append(Variable("D", domain))

    constraints = []

    def less_than(val_a, val_b, name_a=None, name_b=None):
        return val_a < val_b
    
    constraints.append(BinaryConstraint("A", "B", less_than, "A < B"))
    constraints.append(BinaryConstraint("B", "C", less_than, "B < C"))
    constraints.append(BinaryConstraint("C", "D", less_than, "C < D"))

    def not_equal(val_a, val_b, name_a=None, name_b=None):
        return val_a != val_b
    
    constraints.append(BinaryConstraint("A", "B", not_equal, "A != B"))
    constraints.append(BinaryConstraint("B", "C", not_equal, "B != C"))
    constraints.append(BinaryConstraint("C", "D", not_equal, "C != D"))
    constraints.append(BinaryConstraint("A", "D", not_equal, "A != D"))
    return CSP(constraints, variables)

def solve_csp_problem(problem, checker, verbose=False):
    """
    problem is a function that returns a CSP object that we can solve.
    checker is a function that implements the constraint checking.
    verbose, if True, prints each search step, the answer, and the tree.
    """
    csp = problem()
    answer, search_tree = csp.solve(checker, verbose=verbose)

    if verbose:
        if answer is not None:
            print("ANSWER: %s" %(answer.solution()))
        else:
            print("NO SOLUTION FOUND")
        if search_tree is not None:
            print("TREE:\n")
            print(search_tree.tree_to_string(search_tree))
        
    return answer, search_tree
        
if __name__ == "__main__":
    checker = basic_constraint_checker
    #fc_checker = forward_checking    
    #fcps_checker = forward_checking_prop_singleton
    solve_csp_problem(simple_csp_problem, checker, verbose=True)


--------------------
0. EXAMINING:
A | [1, 2, 3, 4]
B | [1, 2, 3, 4]
C | [1, 2, 3, 4]
D | [1, 2, 3, 4]

0. CONTINUE:
A | [1, 2, 3, 4]
B | [1, 2, 3, 4]
C | [1, 2, 3, 4]
D | [1, 2, 3, 4]

--------------------
1. EXAMINING:
A | 1*
B | [1, 2, 3, 4]
C | [1, 2, 3, 4]
D | [1, 2, 3, 4]

1. CONTINUE:
A | 1*
B | [1, 2, 3, 4]
C | [1, 2, 3, 4]
D | [1, 2, 3, 4]

--------------------
2. EXAMINING:
A | 1*
B | 1*
C | [1, 2, 3, 4]
D | [1, 2, 3, 4]

CONSTRAINT-FAILS: BinaryConstraint(A, B) : A < B
2. FAIL:
A | 1*
B | 1*
C | [1, 2, 3, 4]
D | [1, 2, 3, 4]

--------------------
3. EXAMINING:
A | 1*
B | 2*
C | [1, 2, 3, 4]
D | [1, 2, 3, 4]

3. CONTINUE:
A | 1*
B | 2*
C | [1, 2, 3, 4]
D | [1, 2, 3, 4]

--------------------
4. EXAMINING:
A | 1*
B | 2*
C | 1*
D | [1, 2, 3, 4]

CONSTRAINT-FAILS: BinaryConstraint(B, C) : B < C
4. FAIL:
A | 1*
B | 2*
C | 1*
D | [1, 2, 3, 4]

--------------------
5. EXAMINING:
A | 1*
B | 2*
C | 2*
D | [1, 2, 3, 4]

CONSTRAINT-FAILS: BinaryConstraint(B, C) : B < C
5. FAIL:
A | 1*
B

# **To be implemented code**

In [None]:

import math



def forward_checking(state, verbose = False):
    # Before running Forward checking we must ensure
    # that constraints are okay for this state.
    basic = basic_constraint_checker(state, verbose)
    if not basic:
        return False

    # Add your forward checking logic here.
    
    # Let X be the variable currently being assigned
    X = state.get_current_variable()
         
    # Let x be the value being assigned to X
    x = None
    if X is not None: # we are not in the root state
        x = X.get_assigned_value()
    else:
        return True
    
    #Find all the binary constraints that are associated with X.
    listConstraints = state.get_constraints_by_name(X.get_name())
    
    for constraint in listConstraints:
        
        #Let Y be the variable connected to X by that binary constraint
        if (constraint.get_variable_i_name() != X.get_name() ):
            Y = state.get_variable_by_name(constraint.get_variable_i_name())
        else:
            Y = state.get_variable_by_name(constraint.get_variable_j_name())
            
        # For each variable value y in Y's domain
        for y in Y.get_domain():
            # If constraint checking fails for X=x and Y=y remove y from domain
            if not constraint.check(state, x, y):
                Y.reduce_domain(y)
                # If the domain of Y is reduced down to the empty set, then the
                # entire check fails: return False.
                if Y.domain_size() == 0:
                    return False
    # If all constraints passed declare success, return True
    return True
                

# Now Implement forward checking + (constraint) propagation through
# singleton domains.
def forward_checking_prop_singleton(state, verbose = False):
    # Run forward checking first.
    fc_checker = forward_checking(state, verbose)
    if not fc_checker:
        return False

    # Add your propagate singleton logic here.
           
    # Create a queue of singleton variables (domains of size 1)
    listSingletons = [s for s in state.get_all_variables() if s.domain_size() == 1]
    
    visitedSingletons = []
    while len(listSingletons) != 0:
        X = listSingletons.pop(0)
        x = X.get_domain()[0]
            
        # Find all the binary constraints that singleton X is associated with
        listConstraints = state.get_constraints_by_name(X.get_name())
        
        for constraint in listConstraints:
            
            #Let Y be the variable connected to X by that binary constraint
            if (constraint.get_variable_i_name() != X.get_name()):
                Y = state.get_variable_by_name(constraint.get_variable_i_name())
            else:
                Y = state.get_variable_by_name(constraint.get_variable_j_name()) 
            
            # For each variable value y in Y's domain
            for y in Y.get_domain():
                # If constraint checking fails for X=x and Y=y 
                # remove y from domain
                if not constraint.check(state, x, y):
                    Y.reduce_domain(y)
                    # If the domain of Y is reduced down to the empty set,
                    # then the entire check fails: return False.
                    if Y.domain_size() == 0:
                        return False
            
        # Mark X as visited once, after all of its constraints are processed.
        visitedSingletons.append(X)
        # Check to see if domain reduction produced any new and
        # unvisited singletons; if so, add them to the queue
        for var in state.get_all_variables():
            if var.domain_size() == 1 and var not in listSingletons and \
            var not in visitedSingletons:
                listSingletons.append(var)
        
    return True

# Moose CSP

In [None]:
"""
Implementation of the Moose/Obama/Biden/McCain/Palin problem.
"""
import sys

def moose_csp_problem():
    constraints = []

    # We start with the reduced domain.
    # So the constraint that McCain must sit in seat 1 is already
    # covered.
    variables = []
    variables.append(Variable("1", ["Mc"]))
    variables.append(Variable("2", ["Y", "M", "P"]))
    variables.append(Variable("3", ["Y", "M", "O", "B"]))
    variables.append(Variable("4", ["Y", "M", "O", "B"]))
    variables.append(Variable("5", ["Y", "M", "O", "B"]))
    variables.append(Variable("6", ["Y", "M", "P"]))

    # these are all variable pairings of adjacent seats
    adjacent_pairs = [("1", "2"), ("2", "1"),
                      ("2", "3"), ("3", "2"),
                      ("3", "4"), ("4", "3"),
                      ("4", "5"), ("5", "4"),
                      ("5", "6"), ("6", "5"),
                      ("6", "1"), ("1", "6")]
    # now we construct the set of non-adjacent seat pairs.
    nonadjacent_pairs = []
    variable_names = ["1", "2", "3", "4", "5", "6"]
    for x in range(len(variable_names)):
        for y in range(x,len(variable_names)):
            if x == y:
                continue
            tup = (variable_names[x], variable_names[y])
            rev = (variable_names[y], variable_names[x])
            if tup not in adjacent_pairs:
                nonadjacent_pairs.append(tup)
            if rev not in adjacent_pairs:
                nonadjacent_pairs.append(rev)

    # all_pairs is the set of all distinct seating pairs.
    # This list is useful for checking whether
    # two seats are assigned to the same person.
    all_pairs = adjacent_pairs + nonadjacent_pairs

    # 1. The Moose is afraid of Palin
    def M_not_next_to_P(val_a, val_b, name_a, name_b):
        if (val_a == "M" and val_b == "P") or (val_a == "P" and val_b == "M"):
            return False
        return True
    for pair in adjacent_pairs:
        constraints.append(BinaryConstraint(pair[0], pair[1], M_not_next_to_P,
                                            "Moose can't be next to Palin"))

    # 2. Obama and Biden must sit next to each other.
    # This constraint can be directly phrased as:
    #
    #   for all sets of adjacent seats
    #      there must exist one pair where O & B are assigned
    #
    #   C(1,2) or C(2,3) or C(3,4) or ... or C(6,1)
    #
    # where C is a binary constraint that checks
    # whether the value of the two variables have values O and B
    #
    # However the way our checker works, the constraint needs to be
    # expressed as a big AND.
    # So that when any one of the binary constraints
    # fails the entire assignment fails.
    #
    # To turn our original OR formulation to an AND:
    # We invert the constraint condition as:
    #
    #  for all sets of nonadjacent seats
    #     there must *not* exist a pair where O & B are assigned.
    #
    #  not C(1,3) and not C(1,4) and not C(1,5) ... not C(6,4)
    #
    # Here C checks whether the values assigned are O and B.
    #
    # Finally, this is an AND of all the binary constraints as required.

    def OB_not_next_to_each_other(val_a, val_b, name_a, name_b):
        if (val_a == "O" and val_b == "B") or \
                (val_a == "B" and val_b == "O"):
            return False
        return True
    
    for pair in nonadjacent_pairs:
        constraints.append(
            BinaryConstraint(pair[0], pair[1],
                             OB_not_next_to_each_other,
                             "Obama, Biden must be next to each-other"))

    # 3. McCain and Palin must sit next to each other
    def McP_not_next_to_each_other(val_a, val_b, name_a, name_b):
        if (val_a == "P" and val_b == "Mc") or (val_a == "Mc" and val_b == "P"):
            return False
        return True

    for pair in nonadjacent_pairs:
        constraints.append(
            BinaryConstraint(pair[0], pair[1],
                             McP_not_next_to_each_other,
                             "McCain and Palin must be next to each other"))

    # 4. Obama + Biden can't sit next to Palin or McCain
    def OB_not_next_to_McP(val_a, val_b, name_a, name_b):
        if ((val_a == "O" or val_a == "B") \
                and (val_b == "Mc" or val_b == "P")) or \
                ((val_b == "O" or val_b == "B") \
                     and (val_a == "Mc" or val_a == "P")):
            return False
        return True
    for pair in adjacent_pairs:
        constraints.append(
            BinaryConstraint(pair[0], pair[1],
                             OB_not_next_to_McP,
                             "McCain, Palin can't be next to Obama, Biden"))

    # No two seats can be occupied by the same person
    def not_same_person(val_a, val_b, name_a, name_b):
        return val_a != val_b
    for pair in all_pairs:
        constraints.append(
            BinaryConstraint(pair[0], pair[1],
                             not_same_person,
                             "No two seats can be occupied by the same person"))
    return CSP(constraints, variables)

checker_type = "fcps" #change here
        
if checker_type == "dfs":
    checker = basic_constraint_checker
elif checker_type == "fc":
    checker = forward_checking
elif checker_type == "fcps":
    checker = forward_checking_prop_singleton
else:
    checker = basic_constraint_checker

solve_csp_problem(moose_csp_problem, checker, verbose=True)


--------------------
0. EXAMINING:
1 | ['Mc']
2 | ['Y', 'M', 'P']
3 | ['Y', 'M', 'O', 'B']
4 | ['Y', 'M', 'O', 'B']
5 | ['Y', 'M', 'O', 'B']
6 | ['Y', 'M', 'P']

0. CONTINUE:
1 | ['Mc']
2 | ['Y', 'M', 'P']
3 | ['Y', 'M', 'O', 'B']
4 | ['Y', 'M', 'O', 'B']
5 | ['Y', 'M', 'O', 'B']
6 | ['Y', 'M', 'P']

--------------------
1. EXAMINING:
1 | Mc*
2 | ['Y', 'M', 'P']
3 | ['Y', 'M', 'O', 'B']
4 | ['Y', 'M', 'O', 'B']
5 | ['Y', 'M', 'O', 'B']
6 | ['Y', 'M', 'P']

1. CONTINUE:
1 | Mc*
2 | ['Y', 'M', 'P']
3 | ['Y', 'M', 'O', 'B']
4 | ['Y', 'M', 'O', 'B']
5 | ['Y', 'M', 'O', 'B']
6 | ['Y', 'M', 'P']

--------------------
2. EXAMINING:
1 | Mc*
2 | Y*
3 | ['Y', 'M', 'O', 'B']
4 | ['Y', 'M', 'O', 'B']
5 | ['Y', 'M', 'O', 'B']
6 | ['Y', 'M', 'P']

2. CONTINUE:
1 | Mc*
2 | Y*
3 | ['M', 'O', 'B']
4 | ['M', 'O', 'B']
5 | ['M', 'O', 'B']
6 | ['M', 'P']

--------------------
3. EXAMINING:
1 | Mc*
2 | Y*
3 | M*
4 | ['M', 'O', 'B']
5 | ['M', 'O', 'B']
6 | ['M', 'P']

3. FAIL:
1 | Mc*
2 | Y*
3 | M*
4 | ['O'

(<__main__.CSPState at 0x7f0aa1d08c50>, <__main__.Node at 0x7f0aa1d16b90>)
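
The OR-to-AND rewrite described in the comments of `moose_csp_problem` can be checked by brute force. The sketch below is not part of the lab code. It assumes six seats around a round table (so seat 1 is adjacent to seats 2 and 6, consistent with the `C(1,3)`, `C(1,4)`, `C(1,5)` terms) and the six occupants that appear in the printed domains. The helper names `adjacent`, `is_ob`, `or_form`, and `and_form` are made up for illustration.

```python
from itertools import combinations, permutations

SEATS = range(1, 7)
PEOPLE = ["Mc", "Y", "M", "O", "B", "P"]

def adjacent(a, b):
    # Assumption: seats are arranged in a circle of six.
    return (a - b) % 6 in (1, 5)

pairs = list(combinations(SEATS, 2))
adjacent_pairs = [p for p in pairs if adjacent(*p)]
nonadjacent_pairs = [p for p in pairs if not adjacent(*p)]

def is_ob(x, y):
    # True when the two values are Obama and Biden, in either order.
    return {x, y} == {"O", "B"}

def or_form(seating):
    # Original wording: some adjacent pair holds O and B.
    return any(is_ob(seating[a], seating[b]) for a, b in adjacent_pairs)

def and_form(seating):
    # Inverted wording: no nonadjacent pair holds O and B.
    return all(not is_ob(seating[a], seating[b]) for a, b in nonadjacent_pairs)

# With every person seated exactly once, both forms agree on every seating.
equivalent = all(
    or_form(dict(zip(SEATS, perm))) == and_form(dict(zip(SEATS, perm)))
    for perm in permutations(PEOPLE)
)
print(equivalent)
```

The two forms agree only because each person occupies exactly one seat, which is what the `not_same_person` constraints enforce.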

# Map coloring

In [None]:
"""
Implementation of the Map coloring problem.
"""
import sys

def map_coloring_csp_problem():
    constraints = []

    variables = []
    # order of the variables here is the order given in the problem
    variables.append(Variable("MA", ["B"]))
    variables.append(Variable("TX", ["R"]))
    variables.append(Variable("NE", ["R", "B", "Y"]))
    variables.append(Variable("OV", ["R", "B", "Y"]))
    variables.append(Variable("SE", ["R", "B", "Y"]))
    variables.append(Variable("GL", ["R", "B", "Y"]))
    variables.append(Variable("MID",["R", "B", "Y"]))
    variables.append(Variable("MW", ["R", "B", "Y"]))
    variables.append(Variable("SO", ["R", "B"]))
    variables.append(Variable("NY", ["R", "B"]))
    variables.append(Variable("FL", ["R", "B"]))

    # these are all the pairs of adjacent regions
    edges = [("NE", "NY"),
             ("NE", "MA"),
             ("MA", "NY"),
             ("GL", "NY"),
             ("GL", "OV"),
             ("MID", "NY"),
             ("OV", "NY"),
             ("OV", "MID"),
             ("MW", "OV"),
             ("MW", "TX"),
             ("TX", "SO"),
             ("SO", "OV"),
             ("SO", "FL"),
             ("FL", "SE"),
             ("SE", "MID"),
             ("SE", "SO")]
    # duplicate the edges the other way.
    all_edges = []
    for edge in edges:
        all_edges.append((edge[0], edge[1]))
        all_edges.append((edge[1], edge[0]))
        
    forbidden = [("R", "B"), ("B", "R"), ("Y", "Y")]
    
    # not allowed constraints:
    def forbidden_edge(val_a, val_b, name_a, name_b):
        if (val_a, val_b) in forbidden or (val_b, val_a) in forbidden:
            return False
        return True

    for pair in all_edges:
        constraints.append(
            BinaryConstraint(pair[0], pair[1],
                             forbidden_edge,
                             "R-B, B-R, Y-Y edges are not allowed"))

    return CSP(constraints, variables)

checker_type = "fcps" #change here
        
if checker_type == "dfs":
    checker = basic_constraint_checker
elif checker_type == "fc":
    checker = forward_checking
elif checker_type == "fcps":
    checker = forward_checking_prop_singleton
else:
    checker = forward_checking_prop_singleton

solve_csp_problem(map_coloring_csp_problem, checker, verbose=True)


--------------------
0. EXAMINING:
MA | ['B']
TX | ['R']
NE | ['R', 'B', 'Y']
OV | ['R', 'B', 'Y']
SE | ['R', 'B', 'Y']
GL | ['R', 'B', 'Y']
MID | ['R', 'B', 'Y']
MW | ['R', 'B', 'Y']
SO | ['R', 'B']
NY | ['R', 'B']
FL | ['R', 'B']

0. CONTINUE:
MA | ['B']
TX | ['R']
NE | ['R', 'B', 'Y']
OV | ['R', 'B', 'Y']
SE | ['R', 'B', 'Y']
GL | ['R', 'B', 'Y']
MID | ['R', 'B', 'Y']
MW | ['R', 'B', 'Y']
SO | ['R', 'B']
NY | ['R', 'B']
FL | ['R', 'B']

--------------------
1. EXAMINING:
MA | B*
TX | ['R']
NE | ['R', 'B', 'Y']
OV | ['R', 'B', 'Y']
SE | ['R', 'B', 'Y']
GL | ['R', 'B', 'Y']
MID | ['R', 'B', 'Y']
MW | ['R', 'B', 'Y']
SO | ['R', 'B']
NY | ['R', 'B']
FL | ['R', 'B']

1. CONTINUE:
MA | B*
TX | ['R']
NE | ['B', 'Y']
OV | ['R', 'B', 'Y']
SE | ['R', 'B', 'Y']
GL | ['R', 'B', 'Y']
MID | ['R', 'B', 'Y']
MW | ['R', 'B', 'Y']
SO | ['R', 'B']
NY | ['B']
FL | ['R', 'B']

--------------------
2. EXAMINING:
MA | B*
TX | R*
NE | ['B', 'Y']
OV | ['R', 'B', 'Y']
SE | ['R', 'B', 'Y']
GL | ['R', 'B', 'Y'

(<__main__.CSPState at 0x7f0aa1d0f990>, <__main__.Node at 0x7f0aa1dad510>)
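
In step 1 of the trace above, assigning `MA = B` trims `NE` to `['B', 'Y']` and `NY` to `['B']`. The following is a minimal sketch of that single pruning step using plain dictionaries. It does not use the lab's `CSPState`, `Variable`, or `BinaryConstraint` classes, and `allowed` and `prune_neighbors` are hypothetical helper names. Only the three regions touched by this step are included.

```python
FORBIDDEN = {("R", "B"), ("B", "R"), ("Y", "Y")}

def allowed(val_a, val_b):
    # Same rule as forbidden_edge: reject R-B, B-R and Y-Y across an edge.
    return (val_a, val_b) not in FORBIDDEN and (val_b, val_a) not in FORBIDDEN

def prune_neighbors(domains, neighbors, var, value):
    """Return reduced copies of the domains, or None if a domain empties."""
    new_domains = {name: list(vals) for name, vals in domains.items()}
    new_domains[var] = [value]
    for other in neighbors[var]:
        new_domains[other] = [v for v in new_domains[other] if allowed(value, v)]
        if not new_domains[other]:
            return None  # a neighbor has no values left: backtrack
    return new_domains

domains = {"MA": ["B"], "NE": ["R", "B", "Y"], "NY": ["R", "B"]}
neighbors = {"MA": ["NE", "NY"], "NE": ["MA", "NY"], "NY": ["NE", "MA"]}

result = prune_neighbors(domains, neighbors, "MA", "B")
print(result)
```

Working on copies keeps the caller's domains untouched, so a failed branch can be discarded without any undo step. Whether the lab's own state object behaves this way is not shown here.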

# Tester

In [None]:
from xmlrpc import client
import traceback
import sys
import os
import tarfile

try:
    from StringIO import StringIO
except ImportError:
    from io import StringIO
    
def csp_solver_tree(problem, checker):
    problem_func = globals()[problem]
    checker_func = globals()[checker]
    answer, search_tree = problem_func().solve(checker_func)
    return search_tree.tree_to_string(search_tree)

# This is a skeleton for what the tester should do. Ideally, this module
# would be imported in the pset and run as its main function. 

# We need the following rpc functions. (They generally take username and
# password, but you could adjust this for whatever security system.)
#
# tester.submit_code(username, password, pset, studentcode)
#   'pset' is a string such as 'ps0'. studentcode is a string containing
#   the contents of the corresponding file, ps0.py. This stores the code on
#   the server so we can check it later for cheating, and is a prerequisite
#   to the tester returning a grade.
#
# tester.get_tests(pset)
#   returns a list of tuples of the form (INDEX, TYPE, NAME, ARGS):
#     INDEX is a unique integer that identifies the test.
#     TYPE should be one of either 'VALUE' or 'FUNCTION'.
#     If TYPE is 'VALUE', ARGS is ignored, and NAME is the name of a
#     variable to return for this test.  The variable must be an attribute
#     of the lab module.
#     If TYPE is 'FUNCTION', NAME is the name of a function in the lab module
#     whose return value should be the answer to this test, and ARGS is a
#     tuple containing arguments for the function.
#
# tester.send_answer(username, password, pset, index, answer)
#   Sends <answer> as the answer to test case <index> (0-numbered) in the pset
#   named <pset>. Returns whether the answer was correct, and an expected
#   value.
#
# tester.status(username, password, pset)
#   A string that includes the official score for this user on this pset.
#   If a part is missing (like the code), it should say so.

# Because I haven't written anything on the server side, test_online has never
# been tested.

def test_summary(dispindex, ntests, testname):
    return "Test %d/%d (%s)" % (dispindex, ntests, testname)
  
tests = []

def show_result(testsummary, testcode, correct, got, expected, verbosity):
    """ Pretty-print test results """
    if correct:
        if verbosity > 0:
            print("%s: Correct." % testsummary)
        if verbosity > 1:
            print('\t', testcode)
            print()
    else:
        print("%s: Incorrect." % testsummary)
        print('\t', testcode)
        print("Got:     ", got)
        print("Expected:", expected)

def show_exception(testsummary, testcode):
    """ Pretty-print exceptions (including tracebacks) """
    print("%s: Error." % testsummary)
    print("While running the following test case:")
    print('\t', testcode)
    print("Your code encountered the following error:")
    traceback.print_exc()
    print()


def get_lab_module():
    # Try the easy way first
    try:
        from tests import lab_number
    except ImportError:
        lab_number = None
        
    if lab_number != None:
        lab = __import__('lab%s' % lab_number)
        return lab
        
    lab = None

    for labnum in range(10):
        try:
            lab = __import__('lab%s' % labnum)
            break  # stop at the first lab found so labnum matches the imported module
        except ImportError:
            pass

    if lab == None:
        raise ImportError("Cannot find your lab; or, error importing it.  Try loading it by running 'python labN.py' (for the appropriate value of 'N').")

    if not hasattr(lab, "LAB_NUMBER"):
        lab.LAB_NUMBER = labnum
    
    return lab

def type_decode(arg, lab):
    """
    XMLRPC can only pass a very limited collection of types.
    Frequently, we want to pass a subclass of 'list' in as a test argument.
    We do that by converting the sub-type into a regular list of the form:
    [ 'TYPE', (data) ] (ie., AND(['x','y','z']) becomes ['AND','x','y','z']).
    This function assumes that TYPE is a valid attr of 'lab' and that TYPE's
    constructor takes a list as an argument; it uses that to reconstruct the
    original data type.
    """
    if isinstance(arg, list) and len(arg) >= 1: # We'll leave tuples reserved for some other future magic
        try:
            mytype = arg[0]
            data = arg[1:]
            return getattr(lab, mytype)([ type_decode(x, lab) for x in data ])
        except AttributeError:
            return [ type_decode(x, lab) for x in arg ]
        except TypeError:
            return [ type_decode(x, lab) for x in arg ]
    else:
        return arg

    
def type_encode(arg):
    """
    Encode trees as lists in a way that can be decoded by 'type_decode'
    """
    if isinstance(arg, list) and not type(arg) in (list,tuple):
        return [ arg.__class__.__name__ ] + [ type_encode(x) for x in arg ]
    elif hasattr(arg, '__class__') and arg.__class__.__name__ == 'IF':
        return [ 'IF', type_encode(arg._conditional), type_encode(arg._action), type_encode(arg._delete_clause) ]
    else:
        return arg

    
def run_test(test, lab):
    """
    Takes a 'test' tuple as provided by the online tester
    (or generated by the offline tester) and executes that test,
    returning whatever output is expected (the variable that's being
    queried, the output of the function being called, etc)

    'lab' (the argument) is the module containing the lab code.
    
    'test' tuples are in the following format:
      'id': A unique integer identifying the test
      'type': One of 'VALUE', 'FUNCTION', 'MULTIFUNCTION', or 'FUNCTION_ENCODED_ARGS'
      'attr_name': The name of the attribute in the 'lab' module
      'args': a list of the arguments to be passed to the function; [] if no args.
      For 'MULTIFUNCTION's, a list of lists of arguments to be passed in
    """
    id, mytype, attr_name, args = test

    attr = getattr(lab, attr_name)

    if mytype == 'VALUE':
        return attr
    elif mytype == 'FUNCTION':
        return attr(*args)  # apply() was removed in Python 3
    elif mytype == 'MULTIFUNCTION':
        return [ run_test( (id, 'FUNCTION', attr_name, FN), lab) for FN in args ]
    elif mytype == 'FUNCTION_ENCODED_ARGS':
        return run_test( (id, 'FUNCTION', attr_name, type_decode(args, lab)), lab )
    else:
        raise Exception("Test Error: Unknown TYPE '%s'.  Please make sure you have downloaded the latest version of the tester script.  If you continue to see this error, contact a TA." % mytype)


def test_offline(verbosity=1):
    """ Run the unit tests in 'tests.py' """
#    import tests as tests_module
    
#    tests = [ (x[:-8],
#               getattr(tests_module, x),
#               getattr(tests_module, "%s_testanswer" % x[:-8]),
#               getattr(tests_module, "%s_expected" % x[:-8]),
#               "_".join(x[:-8].split('_')[:-1]))
#              for x in tests_module.__dict__.keys() if x[-8:] == "_getargs" ]

#    tests = tests_module.get_tests()
    global tests

    ntests = len(tests)
    ncorrect = 0
    
    for index, (testname, getargs, testanswer, expected, fn_name, type) in enumerate(tests):
        dispindex = index+1
        summary = test_summary(dispindex, ntests, fn_name)
        
        try:
            if callable(getargs):
                getargs = getargs()
            
            if type == 'FUNCTION':
                answer = fn_name(*getargs)
            elif type == 'VALUE':
                answer = fn_name
            else:
                answer = [ FN(*getargs) for FN in getargs ]#run_test((index, type, fn_name, getargs), get_lab_module())
        except NotImplementedError:
            print("%d: (%s: Function not yet implemented, NotImplementedError raised)" % (index, testname))
            continue
        except Exception:
            show_exception(summary, testname)
            continue
        
        correct = testanswer(answer, original_val = getargs)
        show_result(summary, testname, correct, answer, expected, verbosity)
        if correct: ncorrect += 1
    
    print("Passed %d of %d tests." % (ncorrect, ntests))
    tests = []
    return ncorrect == ntests

def get_target_upload_filedir():
    """ Get, via user prompting, the directory containing the current lab """
    cwd = os.getcwd() # Get current directory.  Play nice with Unicode pathnames, just in case.
        
    print("Please specify the directory containing your lab.")
    print("Note that all files from this directory will be uploaded!")
    print("Labs should not contain large amounts of data; very-large")
    print("files will fail to upload.")
    print()
    print("The default path is '%s'" % cwd)
    target_dir = input("[%s] >>> " % cwd)  # raw_input() was renamed to input() in Python 3

    target_dir = target_dir.strip()
    if target_dir == '':
        target_dir = cwd

    print("Ok, using '%s'." % target_dir)

    return target_dir

def get_tarball_data(target_dir, filename):
    """ Return a binary String containing the binary data for a tarball of the specified directory """
    from io import BytesIO  # tarfile writes bytes, so a text StringIO fails on Python 3
    data = BytesIO()
    file = tarfile.open(filename, "w|bz2", data)

    print("Preparing the lab directory for transmission...")
            
    file.add(target_dir+"/lab4.py")
    
    print("Done.")
    print()
    print("The following files have been added:")
    
    for f in file.getmembers():
        print(f.name)
            
    file.close()

    return data.getvalue()
    

def test_online(verbosity=1):
    """ Run online unit tests.  Run them against the 6.034 server via XMLRPC. """
    lab = get_lab_module()

    try:
        server = client.ServerProxy(server_url, allow_none=True)  # 'client' is xmlrpc.client, imported above
        #print("Getting tests:", (username, password, lab.__name__))
        tests = server.get_tests(username, password, lab.__name__)
        #print("*** TESTS:")
        #print(tests)

    except NotImplementedError: # Solaris Athena doesn't seem to support HTTPS
        print("Your version of Python doesn't seem to support HTTPS, for")
        print("secure test submission.  Would you like to downgrade to HTTP?")
        print("(note that this could theoretically allow a hacker with access")
        print("to your local network to find your 6.034 password)")
        answer = input("(Y/n) >>> ")
        if len(answer) == 0 or answer[0] in "Yy":
            server = client.ServerProxy(server_url.replace("https", "http"))
            tests = server.get_tests(username, password, lab.__name__)
        else:
            print("Ok, not running your tests.")
            print("Please try again on another computer.")
            print("Linux Athena computers are known to support HTTPS,")
            print("if you use the version of Python in the 'python' locker.")
            sys.exit(0)
            
    ntests = len(tests)
    ncorrect = 0

    lab = get_lab_module()
    
    target_dir = get_target_upload_filedir()

    tarball_data = get_tarball_data(target_dir, "lab%s.tar.bz2" % lab.LAB_NUMBER)
            
    print("Submitting to the 6.034 Webserver...")

    server.submit_code(username, password, lab.__name__, client.Binary(tarball_data))

    print("Done submitting code.")
    print("Running test cases...")
    
    for index, testcode in enumerate(tests):
        dispindex = index+1
        summary = test_summary(dispindex, ntests, testcode)

        try:
            answer = run_test(testcode, get_lab_module())
        except Exception:
            show_exception(summary, testcode)
            continue

        correct, expected = server.send_answer(username, password, lab.__name__, testcode[0], type_encode(answer))
        show_result(summary, testcode, correct, answer, expected, verbosity)
        if correct: ncorrect += 1
    
    response = server.status(username, password, lab.__name__)
    print(response)



#if __name__ == '__main__':
#    test_offline()
    
def make_test_counter_decorator():
    #tests = []
    def make_test(getargs, testanswer, expected_val, name = None, type = 'FUNCTION'):
        if name != None:
            getargs_name = name
        elif not callable(getargs):
            getargs_name = "_".join(getargs[:-8].split('_')[:-1])
            getargs = lambda args=getargs: args  # bind the current value; a bare closure would return the lambda itself
        else:
            getargs_name = "_".join(getargs.__name__[:-8].split('_')[:-1])
            
        tests.append( ( getargs_name,
                        getargs,
                        testanswer,
                        expected_val,
                        getargs_name,
                        type ) )

    def get_tests():
        return tests

    return make_test, get_tests


make_test, get_tests = make_test_counter_decorator()
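
The `type_decode` docstring describes how a `list` subclass is flattened into a tagged plain list for XML-RPC and rebuilt on the other side. Here is a simplified round-trip sketch of that idea. It is not the tester's own code: `AND` is a hypothetical stand-in for a lab-defined type, a `SimpleNamespace` plays the role of the lab module, and `encode` and `decode` are illustrative names that drop the `IF` special case and the exception-based fallback.

```python
from types import SimpleNamespace

class AND(list):
    """Hypothetical list subclass standing in for a lab-defined type."""

def encode(arg):
    # Tag list subclasses with their class name; leave everything else alone.
    if isinstance(arg, list) and type(arg) is not list:
        return [type(arg).__name__] + [encode(x) for x in arg]
    return arg

def decode(arg, namespace):
    # A list whose first item names a type in the namespace is rebuilt as that type.
    if isinstance(arg, list) and arg and isinstance(arg[0], str) \
            and hasattr(namespace, arg[0]):
        cls = getattr(namespace, arg[0])
        return cls([decode(x, namespace) for x in arg[1:]])
    if isinstance(arg, list):
        return [decode(x, namespace) for x in arg]
    return arg

lab = SimpleNamespace(AND=AND)
original = AND(["x", AND(["y", "z"])])

wire = encode(original)        # plain nested lists, safe to send over XML-RPC
restored = decode(wire, lab)   # rebuilt with the original subclass
print(wire)
```

One consequence of this scheme is that an ordinary list whose first element happens to match a type name would also be decoded as that type, so the tag names need to stay distinct from real data.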


# Tests

In [None]:
# This code implements some very rudimentary matrix-like and vector-like operations
# It is used by the tester.
# You are welcome to use these functions for the laboratory implementation as well,
# though this isn't expected to be necessary.

import random
import operator
import math

def test_code():
    try:
        all([])
    except NameError:
        # The all() function was introduced in Python 2.5.
        # Provide our own implementation if it's not available here.
        def all(iterable):
            for element in iterable:
                if not element:
                    return False
            return True


    def unit_vector(vec1, vec2):
        """ Return a unit vector pointing from vec1 to vec2 """
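        # Materialize the list: in Python 3 map() is a one-shot iterator and
        # diff_vector is consumed twice below.
        diff_vector = list(map(operator.sub, vec2, vec1))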
        
        scale_factor = math.sqrt( sum( map( lambda x: x**2, diff_vector ) ) )
        if scale_factor == 0:
            scale_factor = 1 # We don't have an actual vector, it has zero length
        return [x / scale_factor for x in diff_vector]

    def vector_compare(vec1, vec2, delta):
        """ Compare two vectors
        Confirm that no two corresponding fields differ by more than delta """
        return all( map(lambda x,y: (abs(x-y) < delta), vec1, vec2) )
        
    def validate_euclidean_distance(list1, list2, dist):
        """
        Confirm that the given distance is the Euclidean distance
        between list1 and list2 by establishing a unit vector between
        the two lists and seeing if vec * dist + list1 == list2
        """

        vec = unit_vector(list1, list2)
        target = map(lambda jmp, base: jmp * dist + base, vec, list1)
        return vector_compare(target, list2, 0.01)

    def random_list(length):
        return [ random.randint(1,100) for x in range(length) ]



    # CSP tests


    EXPECTED_FC_MOOSE_TREE = """ROOT
    1=Mc(c,1)
        2=Y(c,2)
            3=M(c,3)
                4=O(c,4)
                    5=B(f,5)
                4=B(c,6)
                    5=O(f,7)
            3=O(c,8)
                4=M(f,9)
                4=B(c,10)
                    5=M(f,11)
            3=B(c,12)
                4=M(f,13)
                4=O(c,14)
                    5=M(f,15)
        2=M(c,16)
            3=Y(c,17)
                4=O(c,18)
                    5=B(f,19)
                4=B(c,20)
                    5=O(f,21)
            3=O(c,22)
                4=Y(f,23)
                4=B(c,24)
                    5=Y(c,25)
                        6=P(*,26)
            3=B(u,-)
        2=P(u,-)
"""

    EXPECTED_FCPS_MOOSE_TREE = """ROOT
    1=Mc(c,1)
        2=Y(c,2)
            3=M(f,3)
            3=O(f,4)
            3=B(f,5)
        2=M(c,6)
            3=Y(f,7)
            3=O(c,8)
                4=B(c,9)
                    5=Y(c,10)
                        6=P(*,11)
            3=B(u,-)
        2=P(u,-)
"""

    def csp_test_1_getargs():
        return [ "moose_csp_problem", "forward_checking" ]

    def csp_test_1_testanswer(val, original_val = None):
        return ( val == EXPECTED_FC_MOOSE_TREE )

    make_test(type = 'FUNCTION',
            getargs = csp_test_1_getargs,
            testanswer = csp_test_1_testanswer,
            expected_val = EXPECTED_FC_MOOSE_TREE,
            name = csp_solver_tree
            )

    def csp_test_2_getargs():
        return [ "moose_csp_problem", "forward_checking_prop_singleton" ]

    def csp_test_2_testanswer(val, original_val = None):
        return ( val == EXPECTED_FCPS_MOOSE_TREE )

    make_test(type = 'FUNCTION',
            getargs = csp_test_2_getargs,
            testanswer = csp_test_2_testanswer,
            expected_val = EXPECTED_FCPS_MOOSE_TREE,
            name = csp_solver_tree
            )
    test_offline()

# Test your code

In [None]:
test_code()

Test 1/2 (<function csp_solver_tree at 0x7f0aa1ca8c20>): Correct.
Test 2/2 (<function csp_solver_tree at 0x7f0aa1ca8c20>): Correct.
Passed 2 of 2 tests.
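
The expected trees compared by the two tests are plain strings, so they can be summarized with a short script. The sketch below tallies the status letter of each node in the forward-checking-with-singleton-propagation tree. It assumes that `c` marks a node where the search continued, `f` a failed node, `u` a node that was never examined, and `*` the solution. That reading is inferred from the tree itself, not stated by the lab. `NODE` and `status_counts` are illustrative names.

```python
import re
from collections import Counter

# Copy of EXPECTED_FCPS_MOOSE_TREE from the test cell.
FCPS_TREE = """ROOT
    1=Mc(c,1)
        2=Y(c,2)
            3=M(f,3)
            3=O(f,4)
            3=B(f,5)
        2=M(c,6)
            3=Y(f,7)
            3=O(c,8)
                4=B(c,9)
                    5=Y(c,10)
                        6=P(*,11)
            3=B(u,-)
        2=P(u,-)
"""

# variable=value(status,visit order); the order is "-" for unvisited nodes.
NODE = re.compile(r"(\w+)=(\w+)\((.),(\d+|-)\)")

def status_counts(tree_text):
    """Count how many nodes carry each status character."""
    return Counter(status for _var, _val, status, _order in NODE.findall(tree_text))

counts = status_counts(FCPS_TREE)
print(dict(counts))
```

For comparison, the visit numbers in the plain forward-checking tree run up to 26, while this tree stops at 11.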
