To install Z3, run the following cell:

In [1]:
!pip install z3-solver

Collecting z3-solver
  Downloading z3_solver-4.13.3.0-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (602 bytes)
Downloading z3_solver-4.13.3.0-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (28.1 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 28.1/28.1 MB 31.0 MB/s eta 0:00:00
Installing collected packages: z3-solver
Successfully installed z3-solver-4.13.3.0


You are given several classes for reading the instance data from a file, storing the instance data and returning a solution, as well as some testing facilities:

In [2]:
class AuthorisationConstraint:
    """Limits one user to an explicit list of tasks.

    The constraint is violated when the user is assigned to any task that is
    not in ``tasks``.  User and task indices are 0-based.
    """

    def __init__(self, instance, user, tasks):
        # Both the user and every listed task must lie inside the instance.
        assert 0 <= user < instance.m
        for task in tasks:
            assert 0 <= task < instance.n

        self.tasks = tasks
        self.user = user

    def is_satisfied(self, solution):
        """Return True when the user holds no task outside the authorised list."""
        authorised = set(self.tasks)
        unauthorised = (t for t in range(solution.instance.n) if t not in authorised)
        return all(solution.assignment[t] != self.user for t in unauthorised)

    def write(self, f):
        """Write the constraint to ``f`` in the instance-file format (1-based indices)."""
        user_label = str(self.user + 1)
        f.write('Authorisations u' + user_label)
        for task in self.tasks:
            task_label = str(task + 1)
            f.write(' t' + task_label)
        f.write('\n')

class BindingOfDutyConstraint:
    """Requires two tasks to be assigned to the same user (0-based task indices)."""

    def __init__(self, instance, t1, t2):
        # Each task index must be a valid task of the instance.
        for task in (t1, t2):
            assert 0 <= task < instance.n

        self.t1, self.t2 = t1, t2

    def is_satisfied(self, solution):
        """Return True when both tasks have the same user in ``solution``."""
        users = solution.assignment
        return users[self.t1] == users[self.t2]

    def write(self, f):
        """Write the constraint to ``f`` in the instance-file format (1-based indices)."""
        line = 'Binding-of-duty t%i t%i\n' % (self.t1 + 1, self.t2 + 1)
        f.write(line)


class SeparationOfDutyConstraint:
    """Requires two tasks to be assigned to different users (0-based task indices)."""

    def __init__(self, instance, t1, t2):
        # Reject task indices that fall outside the instance.
        assert all(0 <= task < instance.n for task in (t1, t2))

        self.t1 = t1
        self.t2 = t2

    def is_satisfied(self, solution):
        """Return True when the two tasks have different users in ``solution``."""
        first_user = solution.assignment[self.t1]
        second_user = solution.assignment[self.t2]
        return first_user != second_user

    def write(self, f):
        """Write the constraint to ``f`` in the instance-file format (1-based indices)."""
        labels = (self.t1 + 1, self.t2 + 1)
        f.write('Separation-of-duty t%i t%i\n' % labels)


class AtMostKConstraint:
    """Limits the number of distinct users across a set of tasks.

    The constraint is satisfied when the tasks in ``scope`` are assigned to at
    most ``k`` different users.  Task indices are 0-based.
    """

    def __init__(self, instance, k, scope):
        assert 0 < k <= instance.n
        # Valid task indices are 0 .. n-1; index n itself is out of range.
        assert all(0 <= task < instance.n for task in scope)

        self.scope = scope
        self.k = k

    def is_satisfied(self, solution):
        """Return True when the scope tasks use at most ``k`` distinct users."""
        return len(set(solution.assignment[task] for task in self.scope)) <= self.k

    def write(self, f):
        """Write the constraint to ``f`` in the instance-file format (1-based indices)."""
        f.write('At-most-k %i' % self.k)
        for t in self.scope:
            f.write(' t%i' % (t + 1))
        f.write('\n')


# Reads and stores instance data
class Instance:
    """A WSP instance: task count ``n``, user count ``m`` and a constraint list.

    The file starts with three header lines, ``#Tasks: N``, ``#Users: M`` and
    ``#Constraints: C`` (matched case-insensitively), followed by ``C``
    constraint lines.  Tasks and users are written as ``t<number>`` and
    ``u<number>`` with 1-based numbers; they are stored 0-based.
    """

    def __init__(self, filename):
        """Read the instance from ``filename``.

        When ``filename`` is None nothing is read and no attributes are set,
        leaving an empty object for the caller to populate.

        Raises:
            ValueError: if a header line, a task/user token or a constraint
                line is malformed or missing.
            Exception: if a constraint line starts with an unknown keyword.
        """
        # Imported before the helpers below are defined because they close over it.
        import re

        def parse_task(string):
            match = re.match(r't(\d+)', string)
            if match is None:
                raise ValueError(f'Expected a task such as "t1" but found {string!r}.')
            return int(match.group(1)) - 1

        def parse_user(string):
            match = re.match(r'u(\d+)', string)
            if match is None:
                raise ValueError(f'Expected a user such as "u1" but found {string!r}.')
            return int(match.group(1)) - 1

        def read_count(f, label):
            # Header lines look like "#Tasks: 12"; fail with the offending text
            # instead of an AttributeError on a failed match.
            line = f.readline()
            match = re.match(rf'^\s*#{label}:\s+(\d+)\s*$', line, re.IGNORECASE)
            if match is None:
                raise ValueError(f'Expected "#{label}: <number>" but found {line!r}.')
            return int(match.group(1))

        if filename is None:
            return

        with open(filename, 'r') as f:
            self.n = read_count(f, 'Tasks')
            self.m = read_count(f, 'Users')

            c = read_count(f, 'Constraints')

            self.constraints = []
            for line_index in range(c):
                # Lower-casing makes both keywords and t/u prefixes case-insensitive.
                values = f.readline().strip().lower().split()
                if not values:
                    raise ValueError(f'Constraint line {line_index + 1} of {c} is missing or blank.')

                if values[0] == 'authorisations':
                    self.constraints.append(AuthorisationConstraint(
                        self, parse_user(values[1]), [parse_task(t) for t in values[2:]]))

                elif values[0] == 'binding-of-duty':
                    self.constraints.append(BindingOfDutyConstraint(self, parse_task(values[1]), parse_task(values[2])))

                elif values[0] == 'separation-of-duty':
                    self.constraints.append(SeparationOfDutyConstraint(self, parse_task(values[1]), parse_task(values[2])))

                elif values[0] == 'at-most-k':
                    self.constraints.append(AtMostKConstraint(self, int(values[1]), [parse_task(s) for s in values[2:]]))

                else:
                    raise Exception(f'Unknown constraint {values[0]}.')


# Stores a solution to a WSP instance
class Solution:
    """Outcome of solving a WSP instance.

    ``sat`` records whether the instance was found satisfiable and
    ``assignment[task]`` holds the 0-based user assigned to ``task``
    (-1 while the task is unassigned).
    """

    def __init__(self, instance, sat):
        self.instance = instance
        self.sat = sat
        # One slot per task, all unassigned to begin with.
        self.assignment = [-1] * self.instance.n

    def assign_user(self, task, user):
        """Record that ``user`` performs ``task``.

        Raises:
            Exception: if ``task`` or ``user`` is outside the instance's range.
        """
        task_in_range = 0 <= task < self.instance.n
        if not task_in_range:
            raise Exception(f'Task {task} is outside the range.')

        user_in_range = 0 <= user < self.instance.m
        if not user_in_range:
            raise Exception(f'User {user} is outside the range.')

        self.assignment[task] = user


def ensure_instances_downloaded():
    """Download and unpack the benchmark instances unless already present.

    If ``instances.zip`` exists in the current directory nothing is done (the
    archive is not re-extracted).  Otherwise the archive is downloaded and
    extracted into the current directory.

    The download goes to a temporary ``.part`` file that is renamed to
    ``instances.zip`` only after extraction succeeds.  An interrupted download
    or failed extraction therefore never leaves behind an archive that would
    make later calls skip the work.
    """
    import os
    import shutil
    import zipfile
    from urllib.request import urlopen

    archive = 'instances.zip'
    if os.path.exists(archive):
        return

    print('Downloading \'instances.zip\'...')

    url = 'https://people.cs.nott.ac.uk/pszdk/COMP3008/WspInstances.zip'
    partial = archive + '.part'

    try:
        # Stream to disk rather than holding the whole archive in memory.
        with urlopen(url) as response, open(partial, 'wb') as out:
            shutil.copyfileobj(response, out)

        print('Unpacking...')
        with zipfile.ZipFile(partial, 'r') as zip_ref:
            zip_ref.extractall('.')

        # Only now mark the instances as available.
        os.replace(partial, archive)
    finally:
        # Still present only if something above failed.
        if os.path.exists(partial):
            os.remove(partial)

    print('Instances are ready')


def run_test(filename, known_to_be_sat):
    """Solve one instance file with ``solve`` and report PASS or FAIL.

    The instance is loaded from ``filename``, ``solve(instance)`` is timed, and
    a coloured one-line summary is displayed.  A reported solution fails when
    its assignment is infeasible (wrong length or a user index outside
    ``0 .. m-1``), when it breaks any instance constraint, or when its sat/unsat
    verdict differs from ``known_to_be_sat``.

    Args:
        filename: path of the instance file to load.
        known_to_be_sat: the expected satisfiability of the instance.
    """
    def coloured_print(text, colour):
        # `IPython.display` is the public home of these names; importing
        # `display` from `IPython.core.display` is deprecated.
        from IPython.display import display, HTML
        display(HTML(f'<span style=color:{colour}><pre>{text}</pre></span>'))

    ensure_instances_downloaded()

    instance = Instance(filename)

    # A monotonic clock, so the measurement is unaffected by system clock changes.
    from time import perf_counter

    starttime = perf_counter()
    solution = solve(instance)
    elapsed_ms = (perf_counter() - starttime) * 1000

    def print_test_header(passed):
        coloured_print(f'{filename:<30} {"  (sat)" if known_to_be_sat else "(unsat)"} {elapsed_ms:5.0f} ms {"PASS" if passed else "FAIL"}', 'green' if passed else 'red')

    if solution.sat:
        # Valid user indices are 0 .. m-1; unassigned tasks (-1) are also rejected here.
        if len(solution.assignment) != instance.n or not all(0 <= user < instance.m for user in solution.assignment):
            print_test_header(False)
            print('  assignment of users to tasks is infeasible.')
            return

        broken = [c for c in instance.constraints if not c.is_satisfied(solution)]
        if len(broken) > 0:
            print_test_header(False)
            print('  the solution breaks some constraints:')
            for t in [AuthorisationConstraint, BindingOfDutyConstraint, SeparationOfDutyConstraint, AtMostKConstraint]:
                print(f'    {len([0 for c in broken if isinstance(c, t)])} broken {t.__name__} constraints')
            return

    correct = solution.sat == known_to_be_sat
    if correct:
        print_test_header(True)
    else:
        print_test_header(False)
        print(f'  Expected outcome:  {"sat" if known_to_be_sat else "unsat"}')
        print(f'    Actual outcome:  {"sat" if solution.sat else "unsat"}')


# Download and unpack the benchmark instances as soon as this cell runs
# (a no-op when 'instances.zip' is already present).
ensure_instances_downloaded()


def test_1_constraint_small():
    """Run instances 0-9 of '1-constraint-small' against their known outcomes."""
    # Instances known to be unsatisfiable; all others are satisfiable.
    unsat = {1, 6}
    for index in range(10):
        run_test(f'1-constraint-small/{index}.txt', index not in unsat)

def test_3_constraint_small():
    """Run instances 0-9 of '3-constraint-small' against their known outcomes."""
    # Instances known to be unsatisfiable; all others are satisfiable.
    unsat = {1, 6, 7}
    for index in range(10):
        run_test(f'3-constraint-small/{index}.txt', index not in unsat)

def test_4_constraint_small():
    """Run instances 0-9 of '4-constraint-small' against their known outcomes."""
    # Instances known to be unsatisfiable; all others are satisfiable.
    unsat = {1, 3, 7, 9}
    for index in range(10):
        run_test(f'4-constraint-small/{index}.txt', index not in unsat)

def test_3_constraint_medium():
    """Run instances 0-9 of '3-constraint-medium' against their known outcomes."""
    # Instances known to be unsatisfiable; all others are satisfiable.
    unsat = {4, 5, 7, 9}
    for index in range(10):
        run_test(f'3-constraint-medium/{index}.txt', index not in unsat)

def test_4_constraint_medium():
    """Run instances 0-9 of '4-constraint-medium' against their known outcomes."""
    # Instances known to be unsatisfiable; all others are satisfiable.
    unsat = {1, 2, 3, 4, 9}
    for index in range(10):
        run_test(f'4-constraint-medium/{index}.txt', index not in unsat)

def test_3_constraint_large():
    """Run instances 0-9 of '3-constraint-large' against their known outcomes."""
    # Instances known to be unsatisfiable; all others are satisfiable.
    unsat = {4, 5, 7, 9}
    for index in range(10):
        run_test(f'3-constraint-large/{index}.txt', index not in unsat)

def test_4_constraint_large():
    """Run instances 0-9 of '4-constraint-large' against their known outcomes."""
    # Instances known to be unsatisfiable; all others are satisfiable.
    unsat = {0, 1, 3, 7, 9}
    for index in range(10):
        run_test(f'4-constraint-large/{index}.txt', index not in unsat)

Downloading 'instances.zip'...
Unpacking...
Instances are ready


You are expected to implement function `solve(instance)` that takes an object of class `Instance` as a parameter and returns an object of class `Solution`.

You can extend the functionality of the provided classes as you wish.  If you change those classes, please include them in your submission.

Implement the `solve(instance)` function below.

In [3]:
from z3 import *
from itertools import combinations

# Solver with original formulation
def solve(instance):
    """Solve a WSP instance using quantified constraints over 0/1-valued functions.

    Users and tasks are modelled as Z3 enumeration datatypes.  ``A(t, u)`` is 1
    when task ``t`` is assigned to user ``u`` and 0 otherwise; ``M(t1, t2)`` is
    1 when tasks ``t1`` and ``t2`` are assigned to the same user and 0
    otherwise.  The structural rules are stated with ``ForAll`` and the
    instance's constraints are then added on top.

    Args:
        instance: object providing ``n`` (number of tasks), ``m`` (number of
            users) and ``constraints``.

    Returns:
        A ``Solution``.  When the solver reports sat, ``sat`` is True and each
        task is assigned the user chosen by the model; otherwise ``sat`` is
        False and the assignment keeps its default values.
    """

    solver = Solver()

    # Define Users and Tasks as data types
    U = Datatype('u')
    for i in range(instance.m):
        U.declare(f'u{i}')
    U = U.create()

    T = Datatype('t')
    for i in range(instance.n):
        T.declare(f't{i}')
    T = T.create()

    # Create lists of user and task instances
    users = [U.constructor(i)() for i in range(instance.m)]
    tasks = [T.constructor(i)() for i in range(instance.n)]

    # Define constants used as bound variables in the quantified constraints
    t = Const("t", T)
    u = Const("u", U)
    t1 = Const("t1", T)
    t2 = Const("t2", T)
    t3 = Const("t3", T)

    # Define M and A
    M = Function('m', T, T, IntSort())
    A = Function('a', T, U, IntSort())

    # Restriction of return value 0 and 1
    solver.add(ForAll([t1, t2], Or(M(t1, t2) == 0, M(t1, t2) == 1)))
    solver.add(ForAll([t, u], Or(A(t, u) == 0, A(t, u) == 1)))

    # Task assigned to only one user
    solver.add(ForAll([t], Sum([A(t, user) for user in users]) == 1))

    # Symmetry
    solver.add(ForAll([t1, t2], Implies(t1 != t2, M(t1, t2) == M(t2, t1))))

    # Self-assignment
    solver.add(ForAll([t], M(t, t) == 1))

    # Transitivity for same user assignments
    solver.add(ForAll([t1, t2, t3], Implies(And(t1 != t2, t2 != t3, t3 != t1), Implies(And(M(t1, t2) == 1, M(t2, t3) == 1), M(t1, t3) == 1))))

    # Transitivity for different user assignments
    solver.add(ForAll([t1, t2, t3], Implies(And(t1 != t2, t2 != t3, t3 != t1), Implies(And(M(t1, t2) == 0, M(t2, t3) == 1), M(t1, t3) == 0))))

    # Linking M-variables with task-to-user assignment
    solver.add(ForAll([t1, t2, u], Implies(t1 != t2, Implies(M(t1, t2) == 1, A(t1, u) == A(t2, u)))))
    solver.add(ForAll([t1, t2, u], Implies(t1 != t2, Implies(M(t1, t2) == 0, Or(A(t1, u) == 0, A(t2, u) == 0)))))

    # Apply given constraints
    for constraint in instance.constraints:
        if isinstance(constraint, AuthorisationConstraint):
            # The user may not perform any task outside the authorised list.
            for task_index in (set(range(instance.n)) - set(constraint.tasks)):
                solver.add(A(tasks[task_index], users[constraint.user]) == 0)

        elif isinstance(constraint, BindingOfDutyConstraint):
            solver.add(M(tasks[constraint.t1], tasks[constraint.t2]) == 1)

        elif isinstance(constraint, SeparationOfDutyConstraint):
            solver.add(M(tasks[constraint.t1], tasks[constraint.t2]) == 0)

        elif isinstance(constraint, AtMostKConstraint):
            scope_tasks = [tasks[task_index] for task_index in constraint.scope]
            subset_size = constraint.k + 1

            # Among any k+1 tasks of the scope at least two must share a user.
            for subset in combinations(scope_tasks, subset_size):
                solver.add(Or([M(a, b) == 1 for a, b in combinations(subset, 2)]))

    # Solve and process the solution
    solution = Solution(instance, False)

    if solver.check() == sat:
        model = solver.model()

        for task_index in range(instance.n):
            for user_index in range(instance.m):
                # Model completion makes the result a concrete true/false value
                # rather than a possibly symbolic expression.
                assigned = model.eval(A(tasks[task_index], users[user_index]) == 1, model_completion=True)
                if is_true(assigned):
                    # Store by task index so a task can never be skipped and
                    # shift the users of the tasks after it.
                    solution.assign_user(task_index, user_index)
                    break

        solution.sat = True

    return solution

# Solver with alternative formulation
def solve1(instance):
    """Solve a WSP instance with a Boolean relation and a task-to-user function.

    Users and tasks are modelled as Z3 enumeration datatypes.  ``A(t)`` is the
    user assigned to task ``t`` and ``M(t1, t2)`` is true when the two tasks
    share a user.  Apart from the reflexivity rule, the structural rules are
    asserted per task pair/triple rather than with quantifiers.

    Args:
        instance: object providing ``n`` (number of tasks), ``m`` (number of
            users) and ``constraints``.

    Returns:
        A ``Solution``.  When the solver reports sat, ``sat`` is True and each
        task is assigned the user chosen by the model; otherwise ``sat`` is
        False and the assignment keeps its default values.
    """

    solver = Solver()

    # Define Users and Tasks as data types
    U = Datatype('u')
    for i in range(instance.m):
        U.declare(f'u{i}')
    U = U.create()

    T = Datatype('t')
    for i in range(instance.n):
        T.declare(f't{i}')
    T = T.create()

    # Create lists of user and task instances
    users = [U.constructor(i)() for i in range(instance.m)]
    tasks = [T.constructor(i)() for i in range(instance.n)]

    # Bound variable for the single quantified constraint below
    t = Const("t", T)

    # Define M and A
    M = Function('m', T, T, BoolSort())
    A = Function('a', T, U)

    # Symmetry
    for i in range(instance.n - 1):
        for j in range(i + 1, instance.n):
            solver.add(M(tasks[i], tasks[j]) == M(tasks[j], tasks[i]))

    # Self-assignment
    solver.add(ForAll([t], M(t, t)))

    # Transitivity for same user assignments
    for i in range(instance.n - 2):
        for j in range(i + 1, instance.n - 1):
            for k in range(j + 1, instance.n):
                solver.add(Implies(And(M(tasks[i], tasks[j]), M(tasks[j], tasks[k])), M(tasks[i], tasks[k])))

    # Transitivity for different user assignments
    for i in range(instance.n - 2):
        for j in range(i + 1, instance.n - 1):
            for k in range(j + 1, instance.n):
                solver.add(Implies(And(Not(M(tasks[i], tasks[j])), M(tasks[j], tasks[k])), Not(M(tasks[i], tasks[k]))))

    # Linking M-variables with task-to-user assignment
    for i in range(instance.n - 1):
        for j in range(i + 1, instance.n):
            solver.add(Implies(M(tasks[i], tasks[j]), A(tasks[i]) == A(tasks[j])))
            solver.add(Implies(Not(M(tasks[i], tasks[j])), A(tasks[i]) != A(tasks[j])))

    # Apply given constraints
    for constraint in instance.constraints:
        if isinstance(constraint, AuthorisationConstraint):
            # The user may not perform any task outside the authorised list.
            for task_index in (set(range(instance.n)) - set(constraint.tasks)):
                solver.add(A(tasks[task_index]) != users[constraint.user])

        elif isinstance(constraint, BindingOfDutyConstraint):
            solver.add(M(tasks[constraint.t1], tasks[constraint.t2]))

        elif isinstance(constraint, SeparationOfDutyConstraint):
            solver.add(Not(M(tasks[constraint.t1], tasks[constraint.t2])))

        elif isinstance(constraint, AtMostKConstraint):
            scope_tasks = [tasks[task_index] for task_index in constraint.scope]
            subset_size = constraint.k + 1

            # Among any k+1 tasks of the scope at least two must share a user.
            for subset in combinations(scope_tasks, subset_size):
                solver.add(Or([M(a, b) for a, b in combinations(subset, 2)]))

    # Solve and process the solution
    solution = Solution(instance, False)

    if solver.check() == sat:
        model = solver.model()

        for task_index in range(instance.n):
            for user_index in range(instance.m):
                # Model completion gives a concrete value even for a task that
                # no asserted constraint mentions.
                assigned = model.eval(A(tasks[task_index]) == users[user_index], model_completion=True)
                if is_true(assigned):
                    # Store by task index so a task can never be skipped and
                    # shift the users of the tasks after it.
                    solution.assign_user(task_index, user_index)
                    break

        solution.sat = True

    return solution


# Advanced Solver using array
def solve2(instance):
    """Solve a WSP instance with plain Boolean and integer variables.

    One integer variable per task holds the index of its user and one Boolean
    per ordered task pair states whether the two tasks share a user.  All
    structural rules are asserted explicitly per task pair or triple.

    Args:
        instance: object providing ``n`` (number of tasks), ``m`` (number of
            users) and ``constraints``.

    Returns:
        A ``Solution`` whose ``sat`` flag mirrors the solver's verdict; when
        sat, ``assignment`` lists the user index chosen for each task.
    """
    n = instance.n
    solver = Solver()

    # same_user[j][i] is the Boolean named "t{i}_t{j}"; user_of[i] is the Int "t{i}".
    same_user = [[Bool(f"t{i}_t{j}") for i in range(n)] for j in range(n)]
    user_of = [Int(f"t{i}") for i in range(n)]

    # Every task gets a user index in 0 .. m-1
    for var in user_of:
        solver.add(And(0 <= var, var < instance.m))

    # Symmetry of the same-user relation
    for a, b in combinations(range(n), 2):
        solver.add(same_user[a][b] == same_user[b][a])

    # Every task shares a user with itself
    for a in range(n):
        solver.add(same_user[a][a])

    # Transitivity for same user assignments
    for a, b, c in combinations(range(n), 3):
        solver.add(Implies(And(same_user[a][b], same_user[b][c]), same_user[a][c]))

    # Transitivity for different user assignments
    for a, b, c in combinations(range(n), 3):
        solver.add(Implies(And(Not(same_user[a][b]), same_user[b][c]), Not(same_user[a][c])))

    # Tie the same-user relation to the actual user indices
    for a, b in combinations(range(n), 2):
        solver.add(Implies(same_user[a][b], user_of[a] == user_of[b]))
        solver.add(Implies(Not(same_user[a][b]), user_of[a] != user_of[b]))

    # Instance-specific constraints
    for constraint in instance.constraints:
        if isinstance(constraint, AuthorisationConstraint):
            # The user may not perform any task outside the authorised list.
            forbidden = set(range(n)) - set(constraint.tasks)
            for task in forbidden:
                solver.add(user_of[task] != constraint.user)

        elif isinstance(constraint, BindingOfDutyConstraint):
            solver.add(same_user[constraint.t1][constraint.t2])

        elif isinstance(constraint, SeparationOfDutyConstraint):
            solver.add(Not(same_user[constraint.t1][constraint.t2]))

        elif isinstance(constraint, AtMostKConstraint):
            scope = list(constraint.scope)
            group_size = constraint.k + 1

            # Among any k+1 tasks of the scope at least two must share a user.
            for group in combinations(scope, group_size):
                solver.add(Or([same_user[a][b] for a, b in combinations(group, 2)]))

    # Solve and read the assignment back from the model
    solution = Solution(instance, False)

    if solver.check() != sat:
        return solution

    model = solver.model()
    solution.assignment = [model.eval(var).as_long() for var in user_of]
    solution.sat = True

    return solution


Run this cell to test your `solve(instance)` function.  Comment/uncomment the lines below as appropriate.

In [None]:
# test_1_constraint_small()
# test_3_constraint_small()
# test_3_constraint_medium()
# test_3_constraint_large()
# test_4_constraint_small()
# test_4_constraint_medium()
# test_4_constraint_large()

Exception ignored in: <function AstRef.__del__ at 0x79002ef27f40>
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/z3/z3.py", line 352, in __del__
    Z3_dec_ref(self.ctx.ref(), self.as_ast())
  File "/usr/local/lib/python3.10/dist-packages/z3/z3core.py", line 1638, in Z3_dec_ref
    _elems.f(a0, a1)
ctypes.ArgumentError: argument 1: KeyboardInterrupt: 


KeyboardInterrupt: 