# QBoost: A QUBO Based Binary Classification Method


## Introduction


## Methodology



## Data



## Implementation of the QBoost Algorithm



In [None]:
# Import libs
import os
import sys
import time
import datetime
import json
from functools import wraps
import numpy as np
import pandas as pd
from scipy.optimize import minimize
import matplotlib.pyplot as plt
from sklearn.tree import DecisionTreeClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.gaussian_process.kernels import RBF
from sklearn.metrics import (
    confusion_matrix,
    precision_score,
    recall_score,
    accuracy_score,
    f1_score,
)

from qci_client import QciClient
from qci_client import load_json_file

PLOT_FLAG = False


def timer(func):
    """Decorate *func* so that every call reports its wall-clock runtime.

    The wrapped function is called with the arguments it was given and its
    return value is passed through unchanged. After a successful call, one
    line with the function name and the elapsed seconds is printed.
    """

    @wraps(func)
    def timed_call(*args, **kwargs):
        started_at = time.time()
        result = func(*args, **kwargs)
        elapsed = time.time() - started_at

        print("Runtime of %s: %0.2f seconds!" % (func.__name__, elapsed))

        return result

    return timed_call

class WeakClassifierDct:
    """Decision-tree weak classifier that only looks at selected features.

    The classifier wraps a ``DecisionTreeClassifier(random_state=0)`` and,
    for both fitting and prediction, keeps only the columns listed in
    ``fea_ind_list``.
    """

    def __init__(self, fea_ind_list, X_train, y_train):
        """Store the training data and create the (unfitted) decision tree.

        Args:
            fea_ind_list: Indices of the feature columns this classifier uses.
            X_train: Training samples; ``X_train.shape[0]`` must equal
                ``len(y_train)``.
            y_train: Training labels, one per row of ``X_train``.
        """
        assert X_train.shape[0] == len(y_train)

        self.clf = DecisionTreeClassifier(random_state=0)
        self.fea_ind_list = fea_ind_list
        self.X_train = X_train
        self.y_train = y_train

    def _select_features(self, X):
        """Return *X* restricted to the columns in ``fea_ind_list``."""
        # Transpose so the features become the leading axis, pick the wanted
        # ones, then transpose back to the samples-by-features layout.
        return X.T[self.fea_ind_list].T

    def train(self):
        """Fit the decision tree on the selected columns of the training set."""
        self.clf.fit(self._select_features(self.X_train), self.y_train)

    def predict(self, X):
        """Return the tree's predictions for *X* using the selected columns."""
        return self.clf.predict(self._select_features(X))


class QBoost:
    """Binary classifier that selects weak classifiers by solving a QUBO.

    Decision-tree weak classifiers are trained on every subset of one, two
    and three features. Their training predictions define a QUBO whose binary
    variables switch individual weak classifiers on or off. The QUBO is
    submitted through ``QciClient`` and the first returned sample is stored
    as the selection vector ``weights``.

    ``predict`` returns values in ``{-1.0, 1.0}``.
    """

    def __init__(
        self,
        lambda_coef,
        num_eqc_samples=10,
        alpha=1.0,
        theta=0.0,
        mode="dct",
    ):
        """Store the hyper-parameters; no training happens here.

        Args:
            lambda_coef: Constant added to every diagonal QUBO entry, i.e.
                the cost of switching on one weak classifier.
            num_eqc_samples: Sent to the sampler as the ``n_samples`` job
                parameter.
            alpha: Sent to the sampler as the ``alpha`` job parameter.
            theta: Threshold subtracted from the normalised vote in
                ``predict`` before the sign is taken.
            mode: Weak-classifier family; only ``"dct"`` is supported.
        """
        self.lambda_coef = lambda_coef
        self.num_eqc_samples = num_eqc_samples
        self.alpha = alpha
        self.theta = theta
        self.mode = mode
        self.weights = None  # selection vector, set by train()/set_weights()
        self.h_list = None  # trained weak classifiers, set by train()

    @timer
    def _build_weak_classifiers_dct(self, X, y):
        """Train one decision-tree weak classifier per small feature subset.

        Args:
            X: Training samples, indexed as ``(sample, feature)``.
            y: Training labels; ``len(y)`` must equal ``X.shape[0]``.

        Returns:
            List of trained ``WeakClassifierDct`` objects: all single
            features first, then all pairs, then all triples, each group in
            lexicographic order of the feature indices.
        """
        from itertools import combinations

        S = X.shape[0]
        M = X.shape[1]

        assert len(y) == S

        h_list = []
        for subset_size in (1, 2, 3):
            # combinations() yields index tuples in lexicographic order, the
            # same order nested ``for i < j < k`` loops would produce.
            for fea_inds in combinations(range(M), subset_size):
                weak_classifier = WeakClassifierDct(list(fea_inds), X, y)
                weak_classifier.train()
                h_list.append(weak_classifier)

        return h_list

    @timer
    def _get_hamiltonian(self, X, y):
        """Train the weak classifiers and build the QUBO matrix.

        With ``h_i`` the i-th weak classifier, ``N`` their number and the
        sums running over the training samples ``s``, the returned matrix is

            H[i, j] = (1 / N**2) * sum_s h_i(x_s) * h_j(x_s)
                      + delta_ij * (lambda_coef - (2 / N) * sum_s h_i(x_s) * y_s)

        Side effect: stores the trained weak classifiers in ``self.h_list``.

        Args:
            X: Training samples, indexed as ``(sample, feature)``.
            y: Training labels, one per row of ``X``.

        Returns:
            Symmetric ``(N, N)`` float array.

        Raises:
            AssertionError: If ``self.mode`` is not a supported mode.
        """
        S = X.shape[0]

        if self.mode == "dct":
            h_list = self._build_weak_classifiers_dct(X, y)
        else:
            # Raised explicitly (rather than ``assert False``) so the check
            # also fires when Python runs with assertions disabled.
            raise AssertionError("Incorrect mode <%s>!" % self.mode)

        self.h_list = h_list

        N = len(h_list)

        # Row i holds the predictions of weak classifier i on every sample.
        # Cast to float so the matrix products below cannot overflow a small
        # integer label dtype.
        h_vals = np.array([h.predict(X) for h in h_list], dtype="d")

        assert h_vals.shape[0] == N
        assert h_vals.shape[1] == S

        # Pairwise correlations between weak classifiers (quadratic term).
        Q = (1.0 / N ** 2) * (h_vals @ h_vals.T)

        # Regularisation minus correlation with the labels (diagonal term).
        P = np.diag(
            self.lambda_coef - (2.0 / N) * (h_vals @ np.asarray(y, dtype="d"))
        )

        # Calculate the Hamiltonian
        H = Q + P

        # make sure H is symmetric up to machine precision
        H = 0.5 * (H + H.transpose())

        print("The size of the hamiltonian is %d by %d" % (N, N))

        return H

    def set_weights(self, weights):
        """Replace the selection vector with *weights*."""
        self.weights = weights

    @timer
    def train(self, X, y):
        """Build the QUBO, sample it through ``QciClient`` and keep the result.

        The non-zero QUBO entries are also written to ``objective.json`` in
        the current working directory. On success ``self.weights`` holds the
        first returned sample as a NumPy array.

        Args:
            X: Training samples, indexed as ``(sample, feature)``.
            y: Training labels, one per row of ``X``.

        Raises:
            AssertionError: If the job status is not ``"COMPLETED"``, or the
                returned sample does not have one entry per weak classifier.
        """
        H = self._get_hamiltonian(X, y)

        N = H.shape[0]

        # np.nonzero returns indices in row-major order; tolist() converts
        # them (and the values) to plain Python numbers so they serialise.
        rows, cols = np.nonzero(H)
        objective_json = {
            "data": [
                {"i": i, "j": j, "val": val}
                for i, j, val in zip(
                    rows.tolist(), cols.tolist(), H[rows, cols].tolist()
                )
            ],
            "file_name": "qboost.json",
            "num_variables": N,
            "file_type": "qubo",
        }

        # Keep a local copy of the objective; the context manager makes sure
        # the file is flushed and closed.
        with open("objective.json", "w") as objective_file:
            json.dump(objective_json, objective_file)

        job_json = {
            "job_name": "qboost_classifier",
            "job_tags": ["qboost"],
            "params": {
                "sampler_type": "csample",  # "eqc1"
                "n_samples": self.num_eqc_samples,
                "alpha": self.alpha,
            },
        }

        # Solve the optimization problem
        qci = QciClient()

        response_json = qci.upload_file(objective_json)
        job_json["qubo_file_id"] = response_json["file_id"]

        job_response_json = qci.process_job(
            job_body=job_json, job_type="sample-qubo",
        )

        status = job_response_json["job_info"]["details"]["status"]
        if status != "COMPLETED":
            # Explicit raise so a failed job is never silently ignored when
            # Python runs with assertions disabled.
            raise AssertionError(
                job_response_json["job_info"]["results"]["error"]
            )

        results = job_response_json["results"]
        energies = results["energies"]
        samples = results["samples"]

        print("Energies:", energies)

        # Pick a feasible solution with lowest energy
        # The sample solutions are sorted by energy
        sol = samples[0]

        assert len(sol) == N, "Inconsistent solution size!"

        self.weights = np.array(sol)

    def predict(self, X):
        """Classify the rows of *X* with the selected weak classifiers.

        The weighted predictions are summed and, when the weights sum to a
        positive number, divided by that sum. ``theta`` is subtracted and
        the sign is taken; a score of exactly zero is mapped to ``-1.0``.

        Args:
            X: Samples to classify, indexed as ``(sample, feature)``.

        Returns:
            Float array of length ``X.shape[0]`` with entries -1.0 or 1.0.

        Raises:
            AssertionError: If the model has no weights or weak classifiers,
                or their counts differ.
        """
        assert self.weights is not None, "Model is not trained!"
        assert self.h_list is not None, "Model is not trained!"

        assert len(self.weights) == len(self.h_list), "Inconsisent sizes!"

        fct = sum(self.weights)
        if fct > 0:
            fct = 1.0 / fct

        tmp_vals = np.zeros(shape=(X.shape[0]), dtype="d")
        for weight, weak_classifier in zip(self.weights, self.h_list):
            # A zero weight contributes nothing, so skip evaluating the tree.
            if weight == 0:
                continue
            tmp_vals += weight * weak_classifier.predict(X)

        pred_vals = np.sign(fct * tmp_vals - self.theta)

        # Ties (score exactly zero) go to the negative class.
        pred_vals[pred_vals == 0] = -1.0

        return pred_vals

    def save_weights(self, file_name):
        """Save the selection vector to *file_name* with ``np.save``."""
        np.save(file_name, self.weights)

We now print the feature names and get the total count of features in the dataset:

In [None]:
import sys
from collections import Counter
import numpy as np
import pandas as pd
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, precision_score, recall_score, accuracy_score

# Experiment settings: share of samples held out for testing and the
# lambda coefficient handed to QBoost.
TEST_SIZE = 0.2
LAMBDA_COEF = 0.5

# Load the Iris data and relabel the targets in place so only two labels
# remain: class 0 becomes -1, class 2 becomes 1 and class 1 keeps the value 1.
iris = datasets.load_iris()
X = iris.data
y = iris.target
y[y == 0] = -1
y[y == 2] = 1

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=TEST_SIZE, random_state=42,
)

# Label distribution and array shapes, first for the training split and then
# for the test split.
for labels, features in ((y_train, X_train), (y_test, X_test)):
    print(Counter(labels))
    print(features.shape)
    print(labels.shape)

# Train the classifier and predict on both splits.
obj = QBoost(lambda_coef=LAMBDA_COEF, num_eqc_samples=10, alpha=1.0, mode="dct")
obj.train(X_train, y_train)

y_train_prd = obj.predict(X_train)
y_test_prd = obj.predict(X_test)

print(Counter(y_train_prd))
print(Counter(y_test_prd))

print("Weights:", obj.weights)

# Training-set metrics; the positive class is the label 1.
train_precision = precision_score(y_train, y_train_prd, labels=[-1, 1], pos_label=1)
print("Train precision:", train_precision)
train_recall = recall_score(y_train, y_train_prd, labels=[-1, 1], pos_label=1)
print("Train recall:", train_recall)
train_accuracy = accuracy_score(y_train, y_train_prd)
print("Train accuracy:", train_accuracy)
train_confusion = confusion_matrix(y_train, y_train_prd, labels=[-1, 1])
print("Train confusion matrix:", train_confusion)

# Test-set metrics, computed the same way.
test_precision = precision_score(y_test, y_test_prd, labels=[-1, 1], pos_label=1)
print("Test precision:", test_precision)
test_recall = recall_score(y_test, y_test_prd, labels=[-1, 1], pos_label=1)
print("Test recall:", test_recall)
test_accuracy = accuracy_score(y_test, y_test_prd)
print("Test accuracy:", test_accuracy)
test_confusion = confusion_matrix(y_test, y_test_prd, labels=[-1, 1])
print("Test confusion matrix:", test_confusion)
