# QUBO Formulation for Quantum Credit Scoring

In [3]:
from src.select_data import *
import dimod
import hybrid
import math
import time
import itertools

In [4]:
data_path = 'data/dataset-ispq.csv'  # if empty, random data is generated
db_years = [2012, 2015] # list of (not consecutive) years from 2012 to 2020, [] = ignore attribute
db_sector = 'no'
db_revenue = 'no'
db_geo_area = 'no'

random_data = 'yes'      # select 'yes' to generate random dataset
default_prob = 0.4       # probability of default (range: 0 - 1)

n_counterpart = 8      # could be a number or 'all' for selecting all db entries
m_company = 3

alpha_concentration = 0.05

shots = 100000

constr_one_class = True
constr_logic = True
constr_conentration = True
constr_min_thr = True
constr_max_thr = True

mu_one_calss = 100
mu_logic = 100
mu_concentration = 20
mu_min_thr = 10
mu_max_thr = 10

In [5]:
def one_class_const(m, n, mu=1):
    Q = np.zeros([n*m, n*m])
    c = 0

    # penalty: "one counterpart per class"
    for ii in range(n):
        for jj in range(m):
            tt = ii*m+jj
            Q[tt][tt] += -1
        for jj in range(m-1):
            for kk in range(jj+1,m):
                tt = ii*m+jj
                rr = ii*m+kk
                Q[tt][rr] += 1
                Q[rr][tt] += 1
        c += 1
    return (mu*Q, mu*c)

In [6]:
def first_counterpart_const(m, n, mu=1):
    Q = np.zeros([n*m, n*m])
    
    # penalty: "first counterpart in first class"
    for jj in range(1, m):
        Q[jj][jj] += 1
        Q[0][jj] -= 0.5
        Q[jj][0] -= 0.5
    return mu*Q

def last_counterpart_const(m, n, mu=1):
    Q = np.zeros([n*m, n*m])

    # penalty: "last counterpart in the last class"
    for jj in range(m-1):
        tt = (n-1)*m+jj
        Q[tt][tt] += 1
        Q[(n*m)-1][tt] -= 0.5
        Q[tt][(n*m)-1] -= 0.5
    return mu*Q

def staircase_constr(m, n, mu=1):
    Q = first_counterpart_const(m,n) + last_counterpart_const(m,n)

    # penalty: "penalize not permitted submatrix"
    # a submatrix is
    # [[x1, x1], [x3, x4]]
    for ii in range(n-1):
        for jj in range(m-1):
            x1 = ii*m+jj
            x2 = x1+1
            x3 = (ii+1)*m+jj
            x4 = x3+1

            # add linear terms
            Q[x1][x1] += 1
            Q[x4][x4] += 1

            # add quadratic terms
            Q[x1][x2] += 0.5
            Q[x2][x1] += 0.5

            Q[x1][x3] -= 0.5
            Q[x3][x1] -= 0.5

            Q[x1][x4] -= 1
            Q[x4][x1] -= 1

            Q[x2][x3] += 0.5
            Q[x3][x2] += 0.5

            Q[x2][x4] -= 0.5
            Q[x4][x2] -= 0.5

            Q[x3][x4] += 0.5
            Q[x4][x3] += 0.5

    # penalty: "penalize restarting from class 0"
    for ii in range(n-1):
        x1 = ii*m
        x2 = x1+1
        x3 = (ii+1)*m

        Q[x3][x3] += 1

        Q[x1][x3] -= 0.5
        Q[x3][x1] -= 0.5

        Q[x2][x3] -= 0.5
        Q[x3][x2] -= 0.5

        Q[x1][x2] += 0.5
        Q[x2][x1] += 0.5

    return mu*Q

In [7]:
def concentration_constr(m, n, mu=1):
    Q = np.zeros([n*m, n*m])

    u = np.zeros([n * n * m, 2], dtype=int)
    index = 0
    for i1 in range(n):
        for i2 in range(n):
            for j in range(m):
                u[index] = [(i1)*m+j, (i2)*m+j]
                index += 1

    # penalty: "concentration"
    c = 1/(1-m)
    gamma = m/(m-1)
    for (u1, u2) in u:
        if u1==u2:
            Q[u1][u2] += gamma
        else:
            Q[u1][u2] += gamma/2

    return (mu*Q, mu*c)

In [8]:
def compute_lower_thrs(n):
    return math.floor(n*0.01) if math.floor(n*0.01) != 0 else 1

def compute_upper_thrs(n, grades):
    return math.floor(n*0.15) if grades > 7 and math.floor(n*0.15) != 0 else (n-grades+1)

In [9]:
def threshold_constr(m, n, offset, minmax, mu=1):

    if minmax == 'min':
        thr = compute_lower_thrs(n)
        slack_vars = math.floor(1+math.log2(n-thr)) # to check
    elif minmax == 'max':
        thr = compute_upper_thrs(n, m)
        slack_vars = math.floor(1+math.log2(thr)) # to check
    else:
        print("Error in threshold function call")
        sys.exit(1)

    # initialize Q and c
    dim = offset+slack_vars*m
    Q = np.zeros([dim, dim])
    c = m * thr * thr

    for i1 in range(n):
        for i2 in range(n):
            for j in range(m):
                u2 = [i1*m+j, i2*m+j]
                if u2[0]==u2[1]: # questo l'ho modificato, forse c'era un typo
                    Q[u2[0]][u2[1]] += 1
                else:
                    Q[u2[0]][u2[1]] += 0.5
                    Q[u2[1]][u2[0]] += 0.5

    for l1 in range(slack_vars):
        for l2 in range(slack_vars):
            for j in range(m):
                v2 = [l1*m+j, l2*m+j]
                tmp = math.pow(2,math.floor((v2[0]+1)/m)+math.floor((v2[1]+1)/m))
                if v2[0]==v2[1]:
                    Q[offset+v2[0]][offset+v2[1]] += tmp
                else:
                    Q[offset+v2[0]][offset+v2[1]] += 0.5*tmp
                    Q[offset+v2[1]][offset+v2[0]] += 0.5*tmp


    for i in range(n):
        for j in range(m):
            u = i*m+j
            Q[u,u] -= 2*thr

    index = 0
    for l in range(slack_vars):
        for j in range(m):
            Q[offset+index][offset+index] += thr*math.pow(2,1+math.floor((l*m+j+1)/m))
            index += 1

    for i in range(n):
        for l in range(slack_vars):
            for j in range(m):
                w2 = [i*m+j, l*m+j]
                tmp = math.pow(2,1+math.floor((w2[1]+1)/m))
                Q[w2[0]][offset+w2[1]] -= -0.5*tmp
                Q[offset+w2[1]][w2[0]] -= -0.5*tmp

    return (mu*Q, mu*c)

In [10]:
def check_staircase(matrix, verbose=False):

    # check if each counterpart is in one class
    ones_per_row = np.sum(matrix == 1, axis=1)
    if not np.all(ones_per_row == 1):
        if verbose:
            print("Error: logic constraint not respected")
            print("\tMmore or less than one class per counterpart")
        return False

    # retreive all the 1's indexes
    index_1 = np.argmax(matrix == 1, axis=1)
    # print(index_1)

    # check the first and the last counterpart
    if index_1[0] != 0:
        if verbose:
            print("Error: logic constraint not respected")
            print("\tError in the first counterpart")
        return False
    if index_1[-1] != matrix.shape[1]-1:
        if verbose:
            print("Error: logic constraint not respected")
            print("\tError in the last counterpart")
        return False

    # check if the matrix is a staircase matrix
    for i, el in enumerate(index_1[1:]):
        # i = inex of the vector index_1 (from 0 to m-1)
        # el = element index_1[i+1]
        # print(f"index {i+1} contains {el}")
        if el != index_1[i] and el != index_1[i]+1:
            if verbose:
                print("Error: logic constraint not respected")
                print(f"\tError in the counterpart {i+2}")
            return False

    if verbose:
        print("Staircase matrix constraint checked")
    return True

In [11]:
def check_concentration(matrix, m, n, alpha_conc = 0.05, verbose=False):
    J_floor = math.floor(n*n*(alpha_conc + (1-alpha_conc)/m))
    s = 0
    for i1 in range(n):
        for i2 in range(n):
            for j in range(m):
                s = s + matrix[i1,j] * matrix[i2,j]
    if s <= J_floor:
        if verbose:
            print("Concentration constraint checked")
        return True
    else:
        if verbose:
            print("Error: concentration constraint not respected")
        return False

In [12]:
def check_upper_thrs(matrix, max_thrs, verbose=False):

    for ii in np.sum(matrix, axis=0):
        if ii > max_thrs:
            if verbose:
                print("Error: upper threshold limit constraint not respected")
            return False
    
    if verbose:
        print("Upper threshold limit constraint checked")
    return True

def check_lower_thrs(matrix, min_thrs, verbose=False):
    
    for ii in np.sum(matrix, axis=0):
        if ii < min_thrs:
            if verbose:
                print("Error: lower threshold limit constraint not respected")
            return False
    
    if verbose:
        print("Lower threshold limit constraint checked")
    return True

In [13]:
def from_matrix_to_bqm(matrix, c):
    
    Q_dict = {(i, j): matrix[i, j] for i in range(matrix.shape[0]) for j in range(matrix.shape[1])}# if matrix[i, j] != 0}
    #print(Q_dict)
    bqm = dimod.BinaryQuadraticModel.from_qubo(Q_dict, c)

    return bqm

In [14]:
def annealer_solver(dim, bqm, shots):

    # Set up the sampler with an initial state
    sampler = hybrid.samplers.SimulatedAnnealingProblemSampler(num_sweeps=shots)
    state = hybrid.core.State.from_sample({i: 0 for i in range(dim)}, bqm)
 
    # Sample the problem
    new_state = sampler.run(state).result()
 
    return new_state

In [15]:
def exact_solver(bqm):
    sampler = dimod.ExactSolver()
    sampleset = sampler.sample(bqm)

    return sampleset

def brute_force_solver(Q, c, dim):

    # compute C(Y) = (Y^T)QY + (G^T)Y + c for every Y
    Ylist = list(itertools.product([0, 1], repeat=dim))
    Cmin = float('inf')

    for ii in range(len(Ylist)):
        Y = np.array(Ylist[ii])
        Cy=(Y.dot(Q).dot(Y.transpose()))+c
        if ( Cy < Cmin ):
            Cmin = Cy
            Ymin = Y.copy()
    
    return (np.array(Ymin), Cmin)

In [16]:
# Select parameters
data_path = 'data/dataset-ispq.csv'  # if empty, random data is generated
db_years = [2012, 2015] # list of (not consecutive) years from 2012 to 2020, [] = ignore attribute
db_sector = 'no'
db_revenue = 'no'
db_geo_area = 'no'

random_data = 'yes'      # select 'yes' to generate random dataset
default_prob = 0.4       # probability of default (range: 0 - 1)

n = 4 # COUNTERPARTS (could be a number or 'all' for selecting all db entries)
m = 2 # GRADES

alpha_concentration = 0.05

shots = 100000

constr_one_class = True
constr_logic = True
constr_conentration = True
constr_min_thr = True
constr_max_thr = True

mu_one_calss = 100
mu_logic = 100
mu_concentration = 20
mu_min_thr = 10
mu_max_thr = 10

# Gen Q matrix
start_time = time.perf_counter_ns()
Q = np.zeros([m*n, m*n])
c = 0
if constr_one_class == True:
    (Q_one_class,c_one_class) = one_class_const(m,n,mu_one_calss)
    Q = Q + Q_one_class
    c = c + c_one_class
if constr_logic == True:
    Q = Q + staircase_constr(m,n,mu_logic)
if constr_conentration == True:
    (Q_conc,c_conc) = concentration_constr(m, n, mu_concentration)
    Q = Q + Q_conc
    c = c + c_conc
if constr_min_thr == True:
    (Q_min_thr, c_min_thr) = threshold_constr(m, n, Q.shape[0], 'min', mu_min_thr)
    pad = Q_min_thr.shape[0] - Q.shape[0]
    Q = np.pad(Q, pad_width=((0,pad), (0, pad)), mode='constant', constant_values=0) + Q_min_thr
    c = c + c_min_thr
if constr_max_thr == True:
    (Q_max_thr, c_max_thr) = threshold_constr(m, n, Q.shape[0], 'max', mu_max_thr)
    pad = Q_max_thr.shape[0] - Q.shape[0]
    Q = np.pad(Q, pad_width=((0,pad), (0, pad)), mode='constant', constant_values=0) + Q_max_thr
    c = c + c_max_thr

# BQM generation
bqm = from_matrix_to_bqm(Q, c)
end_time = time.perf_counter_ns()
print(f"Matrix size:{Q.shape}")
print(f"Time of generation: {(end_time - start_time)/10e9} s")

Matrix size:(16, 16)
Time of generation: 0.0001354872 s


In [18]:
# Solving with brute force
start_time = time.perf_counter_ns()
(result_bf, cost) = brute_force_solver(Q,c,Q.shape[0])
end_time = time.perf_counter_ns()
if constr_min_thr == True:
    result_bf = result_bf[:m*n]
print(f"\nBrute Force result:\n{result_bf.reshape(n,m)}")
print(f"Time of brute force solution: {(end_time - start_time)/10e9} s\n")


Brute Force result:
[[1 0]
 [1 0]
 [0 1]
 [0 1]]
Time of brute force solution: 0.0140047461 s



In [19]:
# Solving exactly with dwave
start_time = time.perf_counter_ns()
e_result = exact_solver(bqm)
df_result = e_result.lowest().to_pandas_dataframe()
end_time = time.perf_counter_ns()
elapsed_time_ns = end_time - start_time
# Print all the solutions
result_exact_solver = df_result.iloc[:, :m*n].to_numpy()
# print(f"All exact solutions:\n{df_result}")
print(f"Exact solutions with dwave: {int(result_exact_solver.size/(m*n))}")
for sol in result_exact_solver[:]:
    print(f"solution:\n{sol.reshape(n, m)}")
print(f"Time of all exact solutions: {elapsed_time_ns/10e9} s")
# print(f"First solution:\n{result_exact_solver[0].reshape(n, m)}")

Exact solutions with dwave: 1
solution:
[[1 0]
 [1 0]
 [0 1]
 [0 1]]
Time of all exact solutions: 0.0053144229 s


In [20]:
# Solving with annealing 
start_time = time.perf_counter_ns()
result = annealer_solver(Q.shape[0], bqm, shots)
end_time = time.perf_counter_ns()
result_ann = np.array([int(x) for x in result.samples.first.sample.values()])[:m*n]
annealing_matrix = result_ann.reshape(n, m)
print(f"\nAnnealing result:\n{annealing_matrix}")    
print(f"Time of annealing solution: {(end_time - start_time)/10e9} s\n")


Annealing result:
[[1 0]
 [1 0]
 [0 1]
 [0 1]]
Time of annealing solution: 0.0028177002 s



In [21]:
print("Result validation:")
verbose = True
check_staircase(annealing_matrix, verbose)
check_concentration(annealing_matrix, m, n, alpha_concentration, verbose)
check_lower_thrs(annealing_matrix, compute_lower_thrs(n), verbose)
check_upper_thrs(annealing_matrix, compute_upper_thrs(n,m), verbose)

Result validation:
Staircase matrix constraint checked
Concentration constraint checked
Lower threshold limit constraint checked
Upper threshold limit constraint checked


True