In [None]:
!python './drive/MyDrive/ibm/ILOG/CPLEX_Studio221/python/setup.py' install

In [5]:
import cplex
from cplex.exceptions import CplexError
import sys, json
import itertools

def findsubsets(s, n):
    return list(itertools.combinations(s, n))

def checkd(d):
    subs = findsubsets(range(len(d)),3)
    for a,b,c in subs:
        if d[a][b]>d[b][c]+d[c][a]:
            return False
    return True
    
def dump_json(test_data,output_json_name=None):
    if output_json_name is not None:
            with open(output_json_name, "w",encoding='utf8') as output_file:
                json.dump(test_data,output_file,ensure_ascii=False)

def get_json(path):
    with open(path,"r", encoding='utf8') as f:
        data=json.load(f)
    return data

def read_file(path):
    with open(path,'r') as f:
        d = f.read().strip()
    return d

def write_file(content,path):
    with open(path,'w') as f:
        f.write(content)

In [100]:
def read_raw_data_and_solve(str_path: str, output_txt_path: str = None):
    """
    str_path: Path to txt file containing raw data
    output_txt_path: Path to save txt file containing data with solution

    Return: Dict{
        ... // same as init_data_with_solution
    }
    """
    ddd = read_file(str_path).split('\n')
    for i, row in enumerate(ddd):
        if i==0:
            n,m = int(row.split()[0]),int(row.split()[1])
            itloc = [[0 for i in range(m+1)] for j in range(n+1)]
            d = [[0 for i in range(m+1)] for j in range(m+1)]
        elif i>=1 and i<=n:
            itloc[i] = [0]+[int(x) for x in row.split()]
        elif i>=n+1 and i<=n+1+m:
            d[i-n-1] = [int(x) for x in row.split()]
        elif i==n+m+2:
            item2take = [0]+[int(x) for x in row.split()]
        else:
            print("Warning! There are more than m+n+3 rows in input data")
            print("Defined Optimal Value is "+ddd[-2])
            break

    itemavail = [sum(itloc[i][1:]) for i in range(n+1)]
    data = {'n':n,'m':m,'item2take':item2take,'d':d,'itloc':itloc,'itemavail':itemavail}
    solver = Solver(data)
    data.update({"optimal_path":solver.get_optimal_path(),
                "optimal_value":solver.get_optimal_value(),
                "explanation":solver.get_explanation(),
                    "CPLEX_solving_time":solver.get_solving_time()})
    
    if output_txt_path is not None:
        tmp = []
        tmp.append("{} {}".format(data['n'],data['m']))
        for i in range(1,data['n']+1):
            tmp.append(' '.join([str(x) for x in data['itloc'][i][1:]]))
        for i in range(data['m']+1):
            tmp.append(' '.join([str(x) for x in data['d'][i]]))
        assert len(data['item2take'])==data['n']+1
        tmp.append(' '.join([str(x) for x in data['item2take'][1:]]))
        tmp.append(' '.join([str(x) for x in data['optimal_path']]))
        tmp.append(str(data['optimal_value']))
        tmp.append(str(data['CPLEX_solving_time']))
        write_file('\n'.join(tmp),output_txt_path)
    
    return data

In [7]:
from typing import Dict, List, Union
import random
from tqdm.auto import tqdm
import os
from os.path import join, exists
import math
import numpy as np

def json2txt(json_path: Union[str,List], txt_folder: str):
    if isinstance(json_path,str):
        json_data = get_json(json_path)
    else:
        assert isinstance(json_path,list), "List of data points or path please!"
        json_data = json_path
    
    if not exists(txt_folder):
        os.makedirs(txt_folder)
    for idx,data in enumerate(json_data):
        tmp = []
        tmp.append("{} {}".format(data['n'],data['m']))
        for i in range(1,data['n']+1):
            tmp.append(' '.join([str(x) for x in data['itloc'][i][1:]]))
        for i in range(data['m']+1):
            tmp.append(' '.join([str(x) for x in data['d'][i]]))
        assert len(data['item2take'])==data['n']+1
        tmp.append(' '.join([str(x) for x in data['item2take'][1:]]))
        # The last 3 lines are optimal path, optimal value and solving time (in seconds)
        if 'optimal_path' in data:
            tmp.append(' '.join([str(x) for x in data['optimal_path']]))
            tmp.append(str(data['optimal_value']))
            tmp.append(str(data['CPLEX_solving_time']))
        write_file('\n'.join(tmp),join(txt_folder,f'case_{idx}.txt'))

def get_rand_point(bound):
    tmp = [random.randint(0,bound),random.randint(0,bound)]
    while tmp==[0,0]:
        tmp = [random.randint(0,bound),random.randint(0,bound)]
    return tmp

def calc_eucli(a,b):
    # return math.sqrt((a[0]-b[0])**2+(a[1]-b[1])**2)
    return np.linalg.norm(np.array(a)-np.array(b))

#### Solver

In [91]:
from typing import Dict, List
import cplex
import time, math

class Solver:
    """
    Solver that handles data with UNLIMITED decision variables (well, solving time gets escalated quickly)
    """
    def __init__(self, data: Dict):
        for k,v in data.items():
            setattr(self, k, v)
        self.data = data
        self.C = 1e4
        self.init_problem()
        self.init_constraint()
        self.solve()
        self.recheck()
        # print(f"Case with {self.n} kinds of item and {self.m} container locations")
        # print("Number of decision variables",len(self.prob.variables.get_lower_bounds()))

    def init_problem(self):
        self.prob = cplex.Cplex()
        m=self.m
        # x[j]
        self.prob.variables.add(names=[f"x_{j}" for j in range(m+2)],
                            types=[self.prob.variables.type.binary for j in range(m+2)])
        # y[i,j]
        self.prob.variables.add(names=[f"y_{i}_{j}" \
                                    for i in range(m+2) for j in range(m+2) if i!=j],
                            types=[self.prob.variables.type.binary \
                                for i in range(m+2) for j in range(m+2) if i!=j])
        # s[i]
        self.prob.variables.add(names=[f"s_{i}" for i in range(m+2)],
                                types=[self.prob.variables.type.continuous for i in range(m+2)])
        
        new_d = [[0]*(m+2) for i in range(m+2)]
        for i in range(m+1):
            new_d[i][m+1]=self.d[i][0]
            new_d[m+1][i]=self.d[0][i]
            for j in range(m+1):
                new_d[i][j]=self.d[i][j]
        self.new_d = new_d
        self.prob.objective.set_linear([(f"y_{i}_{j}",self.new_d[i][j])\
                             for i in range(m+2)\
                             for j in range(m+2) if i!=j])
        self.prob.objective.set_sense(self.prob.objective.sense.minimize)

    def init_constraint(self):
        m=self.m
        n=self.n
        # x[0]=1
        self.prob.linear_constraints.add(
            lin_expr= [cplex.SparsePair(ind= [f"x_0"], \
                                        val= [1])],
            rhs= [1],
            names = ["Initial constraint (1)"],
            senses = ['E']
        )
        # x[m+1]=1
        self.prob.linear_constraints.add(
            lin_expr= [cplex.SparsePair(ind= [f"x_{m+1}"], \
                                        val= [1])],
            rhs= [1],
            names = ["Initial constraint (2)"],
            senses = ['E']
        )
        # s[0]=0
        self.prob.linear_constraints.add(
            lin_expr= [cplex.SparsePair(ind= [f"s_0"], \
                                        val= [1])],
            rhs= [0],
            names = ["Initial constraint (3)"],
            senses = ['E']
        )
        # sum(y[0][i])=1
        self.prob.linear_constraints.add(
            lin_expr= [cplex.SparsePair(ind= [f"y_0_{i}" for i in range(1,m+1)],\
                                        val= [1 for i in range(1,m+1)])],
            rhs= [1],
            names = ["Initial constraint (4)"],
            senses = ['E']
        )
        # sum(y[i][m+1])=1
        self.prob.linear_constraints.add(
            lin_expr= [cplex.SparsePair(ind= [f"y_{i}_{m+1}" for i in range(1,m+1)],\
                                        val= [1 for i in range(1,m+1)])],
            rhs= [1],
            names = ["Initial constraint (5)"],
            senses = ['E']
        )

        # x[i] = 1 -> sum(y[i][j])=1, j =1..m+1
        # sum(y[i][j]) + C*(1-x[i])>=1 => sum(y[i][j]) - C*x[i]>= 1 - C
        # sum(y[i][j]) - C*(1-x[i])<=1 => sum(y[i][j]) + C*x[i]<= 1 + C
        for i in range(1,m+1):
            self.prob.linear_constraints.add(
                lin_expr=[cplex.SparsePair(ind=[f"y_{i}_{j}" for j in range(1,m+2) if j!=i] + [f"x_{i}"],
                                            val=[1 for j in range(1,m+2) if j!=i]+[-self.C])],
                rhs=[1-self.C],
                names = ["Initial constraint (6) - {}".format(i)],
                senses = ['G']
            )

            self.prob.linear_constraints.add(
                lin_expr=[cplex.SparsePair(ind=[f"y_{i}_{j}" for j in range(1,m+2) if j!=i] + [f"x_{i}"],
                                            val=[1 for j in range(1,m+2) if j!=i]+[self.C])],
                rhs=[1+self.C],
                names = ["Initial constraint (7) - {}".format(i)],
                senses = ['L']
            )
        # x[i] = 1 -> sum(y[j][i])=1, j =1..m
        # sum(y[j][i]) + C*(1-x[i])>=1 => sum(y[j][i]) - C*x[i]>= 1 - C
        # sum(y[j][i]) - C*(1-x[i])<=1 => sum(y[j][i]) + C*x[i]<= 1 + C
        for i in range(1,m+1):
            self.prob.linear_constraints.add(
                lin_expr=[cplex.SparsePair(ind=[f"y_{j}_{i}" for j in range(0,m+1) if j!=i] + [f"x_{i}"],
                                            val=[1 for j in range(0,m+1) if j!=i]+[-self.C])],
                rhs=[1-self.C],
                names = ["Initial constraint (8) - {}".format(i)],
                senses = ['G']
            )

            self.prob.linear_constraints.add(
                lin_expr=[cplex.SparsePair(ind=[f"y_{j}_{i}" for j in range(0,m+1) if j!=i] + [f"x_{i}"],
                                            val=[1 for j in range(0,m+1) if j!=i]+[self.C])],
                rhs=[1+self.C],
                names = ["Initial constraint (9) - {}".format(i)],
                senses = ['L']
            )


        # s[j]-s[i]-C*(1-y(i,j))<=d[i][j]
        # s[j]-s[i]+C*(1-y(i,j))>=d[i][j]
        for i in range(m+2):
            for j in range(m+2):
                if i==j:
                    continue
                self.prob.linear_constraints.add(
                    lin_expr=[cplex.SparsePair(ind=[f"s_{i}",f"y_{i}_{j}",f"s_{j}"],
                                               val=[-1,self.C,1])],
                    rhs=[self.C + self.new_d[i][j]],
                    names = ["Subtour elimination (1) -{}{}".format(i,j)],
                    senses = ['L']
                )

                self.prob.linear_constraints.add(
                    lin_expr=[cplex.SparsePair(ind=[f"s_{i}",f"y_{i}_{j}",f"s_{j}"],
                                               val=[-1,-self.C,1])],
                    rhs=[self.new_d[i][j]-self.C],
                    names = ["Subtour elimination (2) -{}{}".format(i,j)],
                    senses = ['G']
                )

        # if y[i,j]=1 => x[i]+x[j]=2
        # => x[i]+x[j]+C(1-y[i,j])>=2
        # => x[i]+x[j]-C(1-y[i,j])<=2
        for i in range(m+2):
            for j in range(m+2):
                if i==j:
                    continue
                elif i==m+1:
                    tmp_diss = self.d[0][j]
                elif j==m+1:
                    tmp_diss = self.d[i][0]
                else:
                    tmp_diss = self.d[i][j]
                self.prob.linear_constraints.add(
                    lin_expr=[cplex.SparsePair(ind=[f"x_{i}",f"y_{i}_{j}",f"x_{j}"],
                                               val=[1,-self.C,1])],
                    rhs=[2-self.C],
                    names = ["Add elimination (1) -{}{}".format(i,j)],
                    senses = ['G']
                )

                self.prob.linear_constraints.add(
                    lin_expr=[cplex.SparsePair(ind=[f"x_{i}",f"y_{i}_{j}",f"x_{j}"],
                                               val=[1,self.C,1])],
                    rhs=[2+self.C],
                    names = ["Add elimination (2) -{}{}".format(i,j)],
                    senses = ['L']
                )
        # sum(Q[i,j]*x[j])>=q[i]
        for i in range(1,n+1):
            self.prob.linear_constraints.add(
                lin_expr=[cplex.SparsePair(ind=[f"x_{j}" for j in range(1,m+1)],
                                        val=[self.itloc[i][j] for j in range(1,m+1)])],
                rhs = [self.item2take[i]],
                names = ["Item taken satisfied-{}".format(i)],
                senses = ['G']
            )

    def solve(self):
        self.prob.set_log_stream(None)
        self.prob.set_results_stream(None)
        st = time.time()
        self.prob.solve()
        self.solve_time = time.time()-st
        self.x_opt = [self.roundt(self.prob.solution.get_values(f"x_{i}")) if i!=0 else 1 for i in range(self.m+1)]
        self.ori_y = [[self.prob.solution.get_values(f"y_{i}_{j}") if i!=j else 0 for j in range(self.m+2)] for i in range(self.m+2)]
        self.y_opt = [[self.roundt(self.prob.solution.get_values(f"y_{i}_{j}")) if i!=j else 0 for j in range(self.m+2)] for i in range(self.m+2)]
        self.optimal_path = self.get_path(self.y_opt,self.m)
        self.optimal_value = sum([self.d[self.optimal_path[i]][self.optimal_path[i+1]] for i in range(len(self.optimal_path)-1)])
        self.cplex_optimal_value = self.prob.solution.get_objective_value()
        # print(self.optimal_path)

    def get_optimal_value(self) -> int:
        return self.optimal_value

    def get_optimal_path(self) -> List[int]:
        return self.optimal_path    

    def get_solving_time(self) -> float:
        """
        Return solving time in seconds
        """
        return self.solve_time

    def get_path(self, y_opt: List[List[int]]=None, m: int=None):
        m, y_opt = self.m, self.y_opt
        path = [[i,j] for i in range(m+2) for j in range(m+2) if y_opt[i][j]!=0]
        # print(path)
        start = 0
        dest = {}
        dest.update(path)
        prev = start
        pathtoget = [prev]
        while dest[prev]!=m+1:
            pathtoget.append(dest[prev])
            prev=dest[prev]
        return pathtoget +[start]

    def recheck(self):
        assert abs(self.cplex_optimal_value-self.optimal_value)<1e-6,\
               [self.cplex_optimal_value,self.optimal_value]
        for i in range(self.m+1):
            for j in range(self.m+1):
                if self.y_opt[i][j]!=0:
                    # print(self.ori_y[i][j])
                    # print(i,j,self.prob.solution.get_values(f"s_{i}"),self.prob.solution.get_values(f"s_{j}"),self.new_d[i][j])
                    assert i in self.optimal_path, "Subtour detected!" + f"{i} {j}"
                    assert j in self.optimal_path, "Subtour detected!"+ f"{i} {j}"

    def roundt(self, x):
        lower_bound = 1e-4
        assert (x<lower_bound) or (x>0.9999), x
        return 0 if x<lower_bound else 1

    def explain(self):
        print(self.get_explanation())
    
    def get_explanation(self,data: Dict = None, x_opt: List[int] = None, y_opt: List[List[int]] = None):
        if data is None:
            data, x_opt, y_opt = self.data, self.x_opt, self.y_opt
        n, m, itloc = data['n'], data['m'], data['itloc']
        itemavail, d, item2take = data['itemavail'], data['d'], data['item2take']
        path = self.get_path(y_opt,m)
        distance_cost =[d[path[i]][path[i+1]] for i in range(len(path)-1)]
        INFO = []
        INFO.append("EXPLANATION")
        INFO.append("="*30)
        INFO.append("Path of the packager: {}".format(" -> ".join(map(str,path))))
        INFO.append("Distance of path: {}".format(" + ".join(map(str,distance_cost)))\
                    + " = {}".format(sum(distance_cost)))
        INFO.append("-"*30)
        INFO.append("The packager need to take: ")
        for i in range(1,n+1):
            INFO.append("\tItem {} - Quantity: {}".format(i,item2take[i]))
            INFO.append("\tHe visits places containing the item: {}".format\
                        (" - ".join(["{} ({})".format(j,itloc[i][j]) for j in path[1:-1] if itloc[i][j]!=0])))
            INFO.append("\tHe could take a quantity of up to {}, which satisfies request".\
                        format(sum([itloc[i][j] for j in path[1:-1]])))
        INFO.append("-"*30)
        INFO.append("Thus, the optimal value is {}".format(sum(distance_cost)))
        return "\n".join(INFO)

In [10]:
!ls './drive/MyDrive/Academic/IT4663/Data/6_26_new_data/'

case_0.txt   case_12.txt  case_15.txt  case_2.txt  case_5.txt  case_8.txt
case_10.txt  case_13.txt  case_16.txt  case_3.txt  case_6.txt  case_9.txt
case_11.txt  case_14.txt  case_1.txt   case_4.txt  case_7.txt


In [107]:
!mkdir cplex2

In [None]:
!cp './cplex2' './drive/MyDrive/Academic/IT4663/Data/6_26_cplex2'

In [None]:
# Example
for i in range(17):
    path_to_read = './drive/MyDrive/Academic/IT4663/Data/6_26_new_data/case_{}.txt'.format(i)
    path_to_save = './cplex2/case_{}.txt'.format(i)
    data = read_raw_data_and_solve(path_to_read,path_to_save)
    # print(data['m'])
    print(data['optimal_path'])
    print(data['optimal_value'])
    print(data['CPLEX_solving_time'])
# print(data['explanation'])

Defined Optimal Value is 91
[0, 4, 5, 3, 0]
91
0.02130436897277832
Defined Optimal Value is 89
[0, 13, 11, 2, 12, 3, 14, 9, 15, 8, 5, 0]
89
0.1346583366394043
Defined Optimal Value is 99
[0, 4, 14, 7, 10, 6, 1, 2, 8, 16, 11, 12, 5, 9, 0]
99
0.43399858474731445
Defined Optimal Value is 80
[0, 4, 8, 9, 20, 5, 6, 1, 16, 18, 22, 19, 7, 10, 12, 3, 21, 0]
80
1.7468421459197998
Defined Optimal Value is 59
[0, 7, 18, 26, 30, 13, 6, 27, 19, 28, 5, 25, 24, 8, 20, 14, 0]
59
0.6766812801361084
Defined Optimal Value is 58
[0, 38, 29, 13, 36, 7, 4, 20, 12, 22, 21, 30, 32, 1, 34, 3, 2, 15, 11, 5, 0]
58
11.904063701629639
Defined Optimal Value is 60
[0, 12, 22, 13, 4, 14, 37, 21, 25, 41, 18, 24, 2, 1, 59, 29, 10, 42, 57, 8, 17, 26, 39, 9, 28, 55, 7, 27, 54, 40, 0]
60
57.48714756965637
Defined Optimal Value is 75
[0, 4, 2, 50, 40, 10, 36, 35, 32, 55, 43, 14, 15, 27, 46, 44, 51, 45, 23, 26, 47, 49, 52, 28, 54, 18, 59, 5, 33, 57, 0]
75
80.60819220542908
Defined Optimal Value is 40
