<a href="https://colab.research.google.com/github/softcomput/cp-colab/blob/main/CPP_from_work_avoiding_accident_locations.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
## **Getting started**

1. Install Google OR-Tools.
2. Create small data model or large data model.
3. Run carpooling problem to work.

In [None]:
!pip install ortools

In [None]:
import logging
import re

from ortools.linear_solver import pywraplp
import numpy as np


class CPPFromWork:
    """CPPFromWork

    Attributes:
        data (dict):
            Vs (list): servers
            Vc (list): clients
            Q (list): capacities
            T (list): maximum ride times
            distance (2-d list): distance for each server and client
            duration (2-d list): duration for each server and client
            risk (2-d list): risk for each server and client
    """

    def __init__(self, data):
        self.d = self.__format_data(data)

    def __format_data(self, data):
        """ __format_data

        Format data model.
        """
        distance = []
        for vs_d in data['distance'][0:len(data['Vs'])]:
            distance.append(vs_d)
            distance.extend(data['distance'][len(data['Vs']):])
        data['distance'] = distance

        duration = []
        for vs_d in data['duration'][0:len(data['Vs'])]:
            duration.append(vs_d)
            duration.extend(data['duration'][len(data['Vs']):])
        data['duration'] = duration

        distance_vc = []
        for vc in range(len(data['Vc'])):
            distance_vc.append(data['distance'][vc + 1][0])
        data['distance_vc'] = distance_vc

        return data

    def __create_variables(self, solver):
        """ __create_variables

        Create variables.

        Returns:
            x (list): x^k_ij, k in Vs
            y (list): y_i, i in Vc
            u (list): u^k_i, 0 <= u^k_i <= Qk, i in V
        """
        x = []
        u = []
        for idx, k in enumerate(self.d['Vs']):
            ai = [0]
            ai.extend(self.d['Vc'])
            aj = [k]
            aj.extend(self.d['Vc'])
            for j in aj:
                x.append([
                    solver.IntVar(0, 1, 'x{}({},{})'.format(k, i, j))
                    for i in ai])
            u.append([
                solver.IntVar(1, self.d['Q'][idx], 'u{}({})'.format(k, j))
                for j in aj])
        y = [solver.IntVar(0, 1, 'y({})'.format(i)) for i in self.d['Vc']]
        return x, y, u

    def __create_constraints(self, solver, x, y, u):
        """ __create_constraints

        Create constraints.
        """
        n = len(self.d['Vc']) + 1
        for vs in range(len(self.d['Vs'])):
            k = vs * n
            # Each employee leaves the workspace.
            solver.Add(solver.Sum([x[j][0] for j in range(k, k + n)]) == 1)
            # Each employee arrives at home.
            solver.Add(solver.Sum(x[k]) == 1)
            # Continuity.
            for i in range(1, len(self.d['Vc']) + 1):
                solver.Add(
                    solver.Sum(x[k + i]) -
                    solver.Sum([x[j][i] for j in range(k, k + n)]) == 0)
            # Capacity.
            solver.Add(
                solver.Sum([x_k for i in range(n) for x_k in x[k + i]]) <=
                self.d['Q'][vs])
            # Maximum ride time.
            solver.Add(
                solver.Sum([np.inner(d, _x) for d, _x in zip(
                    self.d['duration'][k:k + n], x[k:k + n])]) <=
                self.d['T'][vs])
            # Own arc is not passed.
            for i in range(1, len(self.d['Vc']) + 1):
                solver.Add(x[k + i][i] == 0)
            # Subtour elimination constraints by MTZ.
            for i in range(1, len(u[vs])):
                solver.Add(
                    u[vs][i] + 1 - self.d['Q'][vs] * (1 - x[k][i]) <= u[vs][0])
            for i in range(1, len(u[vs])):
                for j in range(1, len(u[vs])):
                    if str(u[vs][i]) != str(u[vs][j]):
                        solver.Add(
                            u[vs][j] + 1 -
                            self.d['Q'][vs] * (1 - x[k + i][j]) <=
                            u[vs][i])
            solver.Add(u[vs][0] == self.d['Q'][vs])

        # Each client is either picked up by a server or is left unserviced.
        for vc in range(len(self.d['Vc'])):
            solver.Add(
                solver.Sum([x_k for vs in range(len(self.d['Vs']))
                           for x_k in x[vs * n + vc + 1]]) + y[vc] == 1)

    def __create_objective(self, solver, x, y):
        """ __create_objective

        Create objective.
        """
        if self.d.get('risk'):
            solver.Minimize(
                solver.Sum([np.inner(d, _x) for d, _x in zip(
                    np.array(self.d['risk']) *
                    np.array(self.d['distance']), x)]) +
                solver.Sum([np.inner(np.array(self.d['distance_vc']) * 2, y)]))
        else:
            solver.Minimize(
                solver.Sum([np.inner(d, _x) for d, _x in zip(
                    self.d['distance'], x)]) +
                solver.Sum([np.inner(np.array(self.d['distance_vc']) * 2, y)]))

    def solve(self):
        """ solve

        Solve CPP.
        """
        solver = pywraplp.Solver(
            'cp', pywraplp.Solver.CBC_MIXED_INTEGER_PROGRAMMING)

        x, y, u = self.__create_variables(solver)
        self.__create_constraints(solver, x, y, u)
        self.__create_objective(solver, x, y)

        status = solver.Solve()

        paths = {}
        if status == pywraplp.Solver.OPTIMAL:
            n = len(self.d['Vc']) + 1
            root = []
            for i in range(len(self.d['Vs']) * n):
                for j in range(n):
                    if x[i][j].solution_value():
                        root.append(x[i][j].name())
                        r = re.findall(r'\w+', x[i][j].name())
                        r[0] = r[0].lstrip('x')
                        if not r[0] in paths:
                            paths[r[0]] = []
                        paths[r[0]].append((r[1], r[2]))
            paths = self.__format_paths(paths)
            clients_not_pickedup = []
            for i in range(len(self.d['Vc'])):
                if y[i].solution_value():
                    r = re.findall(r'\w+', y[i].name())
                    clients_not_pickedup.append(r[1])
            arrival_times = []
            for i in range(len(self.d['Vs'])):
                arrival_times.append([
                    (u[i][j].name(), u[i][j].solution_value())
                    for j in range(len(u[i]))])
            logging.info(
                'Root %s, clients not pickedup %s,\
                and arrival_times %s solved in %d variables, \
                %d constraints, %f objective value, %f milliseconds, \
                %d iterations, and %d branch-and-bound nodes.' % (
                    root, clients_not_pickedup, arrival_times,
                    solver.NumVariables(),
                    solver.NumConstraints(),
                    solver.Objective().Value(),
                    solver.wall_time(), solver.iterations(), solver.nodes()))
        else:
            logging.info('The problem does not have an optimal solution.')
        m, tdr, dti = self.__evaluate(x, y)
        er = self.__evaluate_accident_evasion_rate(x, y)
        logging.info('evasion_rate: %s', er)
        print('evasion_rate: %s', er)
        return {
            'feasible_paths': paths,
            '%m': m,
            '%tdr': tdr,
            '%dti': dti,
            'clients_not_pickedup': clients_not_pickedup
        }

    def __format_paths(self, paths):
        for i in paths:
            clients = []
            prev = paths[i][0][0]
            for _ in paths[i]:
                if not prev == '0':
                    for j in paths[i]:
                        if prev == j[1]:
                            clients.insert(0, j[1])
                            prev = j[0]
                            break
            paths[i] = clients
        return paths

    def __evaluate(self, x, y):
        total_distance = 0
        total_duration = 0
        original_distance = 0
        original_duration_vs = 0
        n = len(self.d['Vc']) + 1
        for i in range(len(self.d['Vs']) * n):
            for j in range(n):
                if x[i][j].solution_value():
                    total_distance += self.d['distance'][i][j]
                    total_duration += self.d['duration'][i][j]
        for i in range(len(self.d['Vs'])):
            original_distance += self.d['distance'][i * n][0]
            original_duration_vs += self.d['duration'][i * n][0]
        not_pickup_clients = 0
        for i in range(len(self.d['Vc'])):
            if y[i].solution_value() == 1:
                not_pickup_clients += 1
                total_distance += self.d['distance'][i + 1][0]
            original_distance += self.d['distance'][i + 1][0]

        m = (len(self.d['Vc']) - not_pickup_clients) / len(self.d['Vc'])
        tdr = 1 - total_distance / original_distance
        dti = total_duration / original_duration_vs
        return m, tdr, dti

    def __evaluate_accident_evasion_rate(self, x, y):
        if not self.d.get('accidentCount'):
            return 0
        total_accident_count = 0
        original_accident_count = 0
        n = len(self.d['Vc']) + 1
        for i in range(len(self.d['Vs']) * n):
            for j in range(n):
                if x[i][j].solution_value():
                    total_accident_count += self.d['accidentCount'][i][j]
        for i in range(len(self.d['Vs'])):
            original_accident_count += self.d['accidentCount'][i * n][0]
        for i in range(len(self.d['Vc'])):
            if y[i].solution_value() == 1:
                total_accident_count += self.d['accidentCount'][i + 1][0]
            original_accident_count += self.d['accidentCount'][i + 1][0]

        evasion_rate = 1 - total_accident_count / original_accident_count
        return evasion_rate


data = create_data_model()
output = CPPFromWork(data).solve()
print(output)

In [None]:
def create_data_model():
    """Stores the data for the problem."""
    data = {}
    data['Vs'] = ['1', '2']
    data['Vc'] = ['3', '4', '5', '6', '7', '8', '9']
    data['Q'] = [7, 5]
    data['T'] = [12, 12]
    data['Ls'] = [12, 12]
    data['Lc'] = [12, 12, 12, 12, 12, 12, 100]
    data['Ss'] = [0, 0]
    data['Sc'] = [0, 0, 0, 0, 0, 0, 0]
    data['distance'] = [
                        [10, 5, 7, 10, 1, 3, 10, 100],
                        [10, 7, 9, 4, 11, 11, 12, 100],
                        [5, 0, 2, 5, 4, 2, 5, 100],
                        [3, 2, 0, 5, 6, 4, 3, 100],
                        [6, 5, 5, 0, 9, 7, 8, 100],
                        [9, 4, 6, 9, 0, 2, 9, 100],
                        [7, 2, 4, 7, 2, 0, 7, 100],
                        [1, 5, 3, 8, 9, 7, 0, 100],
                        [100, 100, 100, 100, 100, 100, 100, 0],
                        ]
    data['duration'] = [
                        [10, 5, 7, 10, 1, 3, 10, 100],
                        [10, 7, 9, 4, 11, 11, 12, 100],
                        [5, 0, 2, 5, 4, 2, 5, 100],
                        [3, 2, 0, 5, 6, 4, 3, 100],
                        [6, 5, 5, 0, 9, 7, 8, 100],
                        [9, 4, 6, 9, 0, 2, 9, 100],
                        [7, 2, 4, 7, 2, 0, 7, 100],
                        [1, 5, 3, 8, 9, 7, 0, 100],
                        [100, 100, 100, 100, 100, 100, 100, 0],
                        ]
    return data