In [None]:
### This is a GPU implementation for the HQC classifier using Scikit-learn's methods, but with PyTorch as the backend. ###

In [None]:
# I have implemented the code below in such a way that you would only need to input X and y as numpy arrays and the
# output y_hat would also be a numpy array (rather than PyTorch tensors). This would make it easier to use the package
# below with minimal knowledge of PyTorch tensors.

# Note: n_splits is implemented differently here than in the CPU version of HQC.
# Please read the description of n_splits in the HQC class docstring below before using it.

In [8]:
import numpy as np
import pandas as pd
import torch
from sklearn import metrics, model_selection
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.model_selection import GridSearchCV
from sklearn.utils.multiclass import check_classification_targets
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from torch.nn.functional import normalize

class HQC(BaseEstimator, ClassifierMixin):
    """The Helstrom Quantum Centroid (HQC) classifier is a quantum-inspired supervised
    classification approach for data with binary classes (ie. data with 2 classes only).

    X and y are given as NumPy arrays (or array-likes) and predict() returns a NumPy array.
    The linear algebra runs in PyTorch on the CUDA device when one is available and falls
    back to the CPU otherwise.

    Parameters
    ----------
    rescale : int or float, default = 1
        The dataset rescaling factor. A parameter used for rescaling the dataset.
    encoding : str, default = 'amplit'
        The encoding method used to encode vectors into quantum densities. Possible values:
        'amplit', 'stereo'. 'amplit' means using the amplitude encoding method. 'stereo' means
        using the inverse of the standard stereographic projection encoding method. Default set
        to 'amplit'.
    n_copies : int, default = 1
        The number of copies to take for each quantum density. This is equivalent to taking
        the n-fold Kronecker tensor product for each quantum density.
    class_wgt : str, default = 'equi'
        The class weights assigned to the Quantum Helstrom observable terms. Possible values:
        'equi', 'weighted'. 'equi' means assigning equal weights of 1/2 (equiprobable) to the
        two classes in the Quantum Helstrom observable. 'weighted' means assigning weights equal
        to the proportion of the number of rows in each class to the two classes in the Quantum
        Helstrom observable. Default set to 'equi'.
    n_splits : int, default = 1
        The number of subset splits performed on the input dataset row-wise and on the number
        of eigenvalues/eigenvectors of the Quantum Helstrom observable for optimal speed
        performance. If 1 is given, no subset splits are used. For optimal speed, recommend
        using small values as close to 1 as possible. If memory blow-out occurs, increase
        n_splits.

    Attributes
    ----------
    classes_ : ndarray, shape (2,)
        Sorted binary classes.
    n_features_in_ : int
        Number of columns of X seen during fit.
    centroids_ : tensor, size (2, (n_features + 1)**n_copies, (n_features + 1)**n_copies)
        Quantum Centroids for class with index 0 and 1 respectively. Stored on the compute
        device (GPU when available).
    hels_obs_ : tensor, size ((n_features + 1)**n_copies, (n_features + 1)**n_copies)
        Quantum Helstrom observable. Stored on the compute device.
    proj_sums_ : tensor, size (2, (n_features + 1)**n_copies, (n_features + 1)**n_copies)
        Sum of the projectors of the Quantum Helstrom observable's eigenvectors, which has
        corresponding positive and negative eigenvalues respectively. Stored on the compute
        device.
    proj_sums_arr_ : ndarray
        Read-only NumPy (CPU) copy of proj_sums_, kept for backward compatibility.
    hels_bound_ : float
        Helstrom bound is the upper bound of the probability that one can correctly
        discriminate whether a quantum density is of which of the two binary quantum density
        pattern. Stored in CPU.
    """
    # Added binary_only tag as required by sklearn check_estimator
    def _more_tags(self):
        return {'binary_only': True}


    # Initialize model hyperparameters
    def __init__(self,
                 rescale = 1,
                 encoding = 'amplit',
                 n_copies = 1,
                 class_wgt = 'equi',
                 n_splits = 1):
        self.rescale = rescale
        self.encoding = encoding
        self.n_copies = n_copies
        self.class_wgt = class_wgt
        self.n_splits = n_splits


    @property
    def proj_sums_arr_(self):
        """NumPy (CPU) copy of ``proj_sums_``.

        Computed on access instead of being written onto the estimator by predict_proba,
        so predicting no longer mutates the fitted model.
        """
        return self.proj_sums_.cpu().numpy()


    @staticmethod
    def _default_device():
        """Return the CUDA device when PyTorch can see one, otherwise the CPU."""
        return torch.device('cuda' if torch.cuda.is_available() else 'cpu')


    @staticmethod
    def _to_tensor(array, dtype, device):
        """Copy a NumPy array into a tensor of the given NumPy dtype on ``device``."""
        # ascontiguousarray guards against views (e.g. reversed slices) that torch cannot wrap
        return torch.tensor(np.ascontiguousarray(array, dtype = dtype), device = device)


    @staticmethod
    def _kronecker(A, B):
        """Batched Kronecker product of two stacks of matrices, (n, a, b) x (n, c, d) -> (n, a*c, b*d)."""
        return torch.einsum('nab,ncd->nacbd', A, B).reshape(A.size(0),
                                                            A.size(1)*B.size(1),
                                                            A.size(2)*B.size(2))


    @staticmethod
    def _row_chunks(rows, n_splits):
        """Yield the non-empty row-wise chunks of ``rows``.

        Chunk sizes follow np.array_split (the first ``len(rows) % n_splits`` chunks hold one
        extra row), but the slicing happens on the tensor's own device, so no GPU -> CPU -> GPU
        round trip is needed. Empty chunks are skipped because they contribute nothing to any sum.
        """
        base, extra = divmod(rows.shape[0], n_splits)
        start = 0
        for k in range(n_splits):
            stop = start + base + (1 if k < extra else 0)
            if stop > start:
                yield rows[start:stop]
            start = stop


    @staticmethod
    def _eigh(matrix):
        """Eigen-decompose a symmetric matrix; returns (eigenvalues, eigenvectors as columns)."""
        linalg = getattr(torch, 'linalg', None)
        if linalg is not None and hasattr(linalg, 'eigh'):
            # UPLO='U' reads the upper triangle, matching the default of the older torch.symeig
            return linalg.eigh(matrix, UPLO = 'U')
        # Fallback for PyTorch versions that predate torch.linalg.eigh
        return torch.symeig(matrix, eigenvectors = True)


    def _check_hyperparameters(self):
        """Raise ValueError when a hyperparameter has an unsupported value."""
        if self.encoding not in ('amplit', 'stereo'):
            raise ValueError('encoding should be "amplit" or "stereo"')
        if self.class_wgt not in ('equi', 'weighted'):
            raise ValueError('class_wgt should be "equi" or "weighted"')
        for name in ('n_copies', 'n_splits'):
            value = getattr(self, name)
            if not isinstance(value, (int, np.integer)) or value < 1:
                raise ValueError('%s should be a positive integer, got %r' % (name, value))


    def _encode(self, X):
        """Rescale X and encode each row into a vector with n_features + 1 components (X')."""
        X = self.rescale*X
        if self.encoding == 'amplit':
            # Append a column of ones, then L2-normalize every row
            ones = torch.ones(X.shape[0], 1, dtype = X.dtype, device = X.device)
            return normalize(torch.cat([X, ones], dim = 1), p = 2, dim = 1)
        if self.encoding == 'stereo':
            # Inverse of the standard stereographic projection
            X_sq_sum = (X**2).sum(dim = 1).reshape(-1, 1)
            return (1 / (X_sq_sum + 1))*torch.cat((2*X, X_sq_sum - 1), dim = 1)
        raise ValueError('encoding should be "amplit" or "stereo"')


    def _densities(self, X_prime_rows):
        """Turn encoded rows into quantum densities and take their n_copies-fold Kronecker power.

        Returns a tensor of size (n_rows, n_prime**n_copies, n_prime**n_copies), where n_prime
        is the number of columns of ``X_prime_rows``.
        """
        # Outer product of every row with itself
        density_chunk = torch.matmul(X_prime_rows.unsqueeze(2), X_prime_rows.unsqueeze(1))
        density_chunk_copy = density_chunk
        for _ in range(self.n_copies - 1):
            density_chunk = self._kronecker(density_chunk, density_chunk_copy)
        return density_chunk


    # Function for fit
    def fit(self, X, y):
        """Fit the HQC classifier: rescale and encode X with the configured encoding, build the
        Quantum Centroid of each class, the Quantum Helstrom observable, the sums of its
        eigenprojectors and the Helstrom bound.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            The training input samples. An array of int or float.
        y : array-like, shape (n_samples,)
            The training input binary target values. An array of str, int or float.

        Returns
        -------
        self : object
            Returns self.

        Raises
        ------
        ValueError
            If a hyperparameter is invalid or y does not contain exactly 2 classes.
        """
        # Check that arrays X and y have correct shape
        X, y = check_X_y(X, y)

        # Ensure target array y is of non-regression type
        # Added as required by sklearn check_estimator
        check_classification_targets(y)

        self._check_hyperparameters()

        # Store binary classes and encode y into binary class indexes 0 and 1
        self.classes_, y_class_index = np.unique(y, return_inverse = True)
        if len(self.classes_) != 2:
            # One class would leave an empty centroid; more than two would be silently ignored
            raise ValueError('HQC is a binary classifier and requires exactly 2 classes in y, '
                             'got %d class(es)' % len(self.classes_))

        device = self._default_device()

        # Cast X to float64 so that all following calculations are done in floating point,
        # and place it on the compute device
        X = self._to_tensor(X, np.float64, device)
        y_class_index = self._to_tensor(y_class_index, np.int64, device)

        # Number of rows and columns in X
        m, n = X.shape
        self.n_features_in_ = n

        # Calculate X' using the configured encoding
        X_prime = self._encode(X)

        # Number of rows/columns in each density matrix; kept local (not global) so that
        # several fitted models can coexist
        density_nrow_ncol = (n + 1)**self.n_copies

        class_counts, centroids, hels_obs_terms = [], [], []
        for i in (0, 1):
            # Rows (samples) in X' belonging to class i
            X_prime_class = X_prime[y_class_index == i]
            m_class = X_prime_class.shape[0]

            # Sum of the quantum densities of class i, accumulated per subset split to bound
            # peak memory
            density_class_sum = torch.zeros([density_nrow_ncol, density_nrow_ncol],
                                            dtype = torch.float64, device = device)
            for chunk in self._row_chunks(X_prime_class, self.n_splits):
                density_class_sum = density_class_sum + self._densities(chunk).sum(dim = 0)

            # Quantum Centroid of class i (m_class >= 1 because both classes are present)
            centroid_class = (1 / m_class)*density_class_sum

            # Term of class i in the quantum Helstrom observable
            if self.class_wgt == 'equi':
                term = 0.5*centroid_class
            else:
                term = (m_class / m)*centroid_class

            class_counts.append(m_class)
            centroids.append(centroid_class)
            hels_obs_terms.append(term)

        # Determine Quantum Centroids
        self.centroids_ = torch.stack(centroids, dim = 0)

        # Calculate quantum Helstrom observable
        self.hels_obs_ = hels_obs_terms[0] - hels_obs_terms[1]

        # Calculate eigenvalues w and eigenvectors v of the quantum Helstrom observable
        w, v = self._eigh(self.hels_obs_)

        # Transpose matrix v containing eigenvectors to row-wise
        eigvec = v.T

        # Index 0 collects eigenvectors with positive eigenvalues, index 1 all the others
        positive = w > 0
        proj_sums = []
        for mask in (positive, ~positive):
            proj_class_sum = torch.zeros([density_nrow_ncol, density_nrow_ncol],
                                         dtype = torch.float64, device = device)
            for chunk in self._row_chunks(eigvec[mask], self.n_splits):
                # Sum of the outer products of the rows of chunk equals chunk^T @ chunk; this
                # avoids materialising one projector matrix per eigenvector
                proj_class_sum = proj_class_sum + torch.matmul(chunk.t(), chunk)
            proj_sums.append(proj_class_sum)

        # Sum of the projectors corresponding to positive and negative eigenvalues respectively
        self.proj_sums_ = torch.stack(proj_sums, dim = 0)

        # Calculate Helstrom bound
        self.hels_bound_ = (class_counts[0] / m)*torch.einsum('ij,ji->', self.centroids_[0],
                                                              self.proj_sums_[0]).item() \
                           + (class_counts[1] / m)*torch.einsum('ij,ji->', self.centroids_[1],
                                                                self.proj_sums_[1]).item()
        return self


    # Function for predict_proba
    def predict_proba(self, X):
        """Performs HQC classification on X and returns the trace of the dot product of the densities
        and the sum of the projectors with corresponding positive and negative eigenvalues respectively.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            The input samples. An array of int or float.

        Returns
        -------
        trace_matrix : tensor, size (n_samples, 2)
            Column index 0 corresponds to the trace of the dot product of the densities and the sum
            of projectors with positive eigenvalues. Column index 1 corresponds to the trace of the
            dot product of the densities and the sum of projectors with negative eigenvalues. A tensor
            of float, stored on the same device as proj_sums_. Note that this is a PyTorch tensor,
            not a NumPy array.

        Raises
        ------
        sklearn.exceptions.NotFittedError
            If fit has not been called.
        ValueError
            If X does not have the number of features seen during fit, or a hyperparameter
            is invalid.
        """
        # Check if fit had been called (must happen before any fitted attribute is read)
        check_is_fitted(self, ['proj_sums_'])

        self._check_hyperparameters()

        # Input validation of array X
        X = check_array(X)
        if X.shape[1] != self.n_features_in_:
            raise ValueError('X has %d features, but HQC was fitted with %d features'
                             % (X.shape[1], self.n_features_in_))

        # Work on the device the model was fitted on
        device = self.proj_sums_.device

        # Cast X to float64 and place it on the compute device
        X = self._to_tensor(X, np.float64, device)

        # Calculate X' using the configured encoding
        X_prime = self._encode(X)

        proj_pos, proj_neg = self.proj_sums_[0], self.proj_sums_[1]
        trace_chunks = []
        for chunk in self._row_chunks(X_prime, self.n_splits):
            # The densities of a chunk are built once and reused for both classes
            density_chunk = self._densities(chunk)

            # Trace of the dot product of the density of each row and the sum of projectors
            # with corresponding positive and negative eigenvalues respectively
            trace_chunks.append(torch.stack([torch.einsum('bij,ji->b', density_chunk, proj_pos),
                                             torch.einsum('bij,ji->b', density_chunk, proj_neg)],
                                            dim = 1))

        if not trace_chunks:
            return torch.empty([0, 2], dtype = torch.float64, device = device)
        return torch.cat(trace_chunks, dim = 0)


    # Function for predict
    def predict(self, X):
        """Performs HQC classification on X and returns the binary classes.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            The input samples. An array of int or float.

        Returns
        -------
        self.classes_[predict_trace_index] : array-like, shape (n_samples,)
            The predicted binary classes. An array of str, int or float.
        """
        # Determine column index with the higher trace value in trace_matrix.
        # When both columns hold the same value, the index returned is whatever torch.argmax
        # yields for ties on the installed PyTorch version.
        predict_trace_index = torch.argmax(self.predict_proba(X), dim = 1)
        # Returns the predicted binary classes
        return self.classes_[predict_trace_index.cpu().numpy()]

In [9]:
# appendicitis dataset (7 features, 106 rows)
# Read the tab-separated file, use every column except 'target' as features and 'target' as labels
df = pd.read_csv('appendicitis.tsv', sep='\t')
X = df.drop('target', axis=1).values
y = df['target'].values

# Hold out 20% of the rows as the test split; random_state is fixed so the split is reproducible
X_train, X_test, y_train, y_test = model_selection.train_test_split(X, y, test_size=0.2, random_state=4)

In [6]:
# Number of samples in the training split and in the test split
len(X_train), len(X_test)

(84, 22)

In [10]:
# Fit HQC on the training split with one hyperparameter combination (edit rescale / n_copies here
# to try other values) and predict the test split
model = HQC(rescale=0.5, n_copies=3, encoding='stereo', class_wgt='weighted', n_splits=1).fit(X_train, y_train)
y_hat = model.predict(X_test)

# Weighted F1 score on the test split, and the Helstrom bound of the fitted model
metrics.f1_score(y_test, y_hat, average='weighted'), model.hels_bound_

(0.7520661157024794, 0.8772542482734038)

In [22]:
# Time required for n_copies=1
# Each loop builds a new HQC and fits it on the training split; prediction is not timed.
# n_splits=1 means no subset splits are used.
%timeit HQC(rescale=0.5, n_copies=1, encoding='stereo', class_wgt='weighted', n_splits=1).fit(X_train, y_train)

100 loops, best of 3: 5.52 ms per loop


In [23]:
# Time required for n_copies=2
# Each loop builds a new HQC and fits it on the training split; prediction is not timed.
%timeit HQC(rescale=0.5, n_copies=2, encoding='stereo', class_wgt='weighted', n_splits=1).fit(X_train, y_train)

100 loops, best of 3: 12.9 ms per loop


In [24]:
# Time required for n_copies=3
# Each loop builds a new HQC and fits it on the training split; prediction is not timed.
%timeit HQC(rescale=0.5, n_copies=3, encoding='stereo', class_wgt='weighted', n_splits=1).fit(X_train, y_train)

10 loops, best of 3: 118 ms per loop


In [3]:
# Time required for n_copies=4
# Each loop builds a new HQC and fits it on the training split; prediction is not timed.
# n_splits is 60 here instead of 1 - presumably raised to keep the run within GPU memory
# (the HQC docstring advises increasing n_splits on memory blow-out); TODO confirm.
%timeit HQC(rescale=0.5, n_copies=4, encoding='stereo', class_wgt='weighted', n_splits=60).fit(X_train, y_train)

1 loop, best of 3: 13.6 s per loop


In [4]:
# Time required for n_copies=5
# No. of rows in training set = 84
# n_splits is set to the number of training rows, yet the recorded run ended in a RuntimeError
# (see the output below). With 7 features each density matrix has (7 + 1)**5 = 32768 rows/columns,
# so this is presumably memory exhaustion - TODO confirm from the full traceback.
%timeit HQC(rescale=0.5, n_copies=5, encoding='stereo', class_wgt='weighted', n_splits=84).fit(X_train, y_train)

RuntimeError: ignored

In [11]:
# banana dataset (2 features, 5300 rows)
# Read the tab-separated file, use every column except 'target' as features and 'target' as labels
df = pd.read_csv('banana.tsv', sep='\t')
X = df.drop('target', axis=1).values
y = df['target'].values

# Hold out 20% of the rows as the test split; random_state is fixed so the split is reproducible
X_train, X_test, y_train, y_test = model_selection.train_test_split(X, y, test_size=0.2, random_state=4)

In [9]:
# Number of samples in the training split and in the test split
len(X_train), len(X_test)

(4240, 1060)

In [12]:
# Fit HQC on the training split with one hyperparameter combination (edit rescale / n_copies here
# to try other values) and predict the test split
model = HQC(rescale=0.5, n_copies=4, encoding='stereo', class_wgt='weighted', n_splits=1).fit(X_train, y_train)
y_hat = model.predict(X_test)

# Weighted F1 score on the test split, and the Helstrom bound of the fitted model
metrics.f1_score(y_test, y_hat, average='weighted'), model.hels_bound_

(0.858978398722441, 0.7732939055876822)

In [7]:
# Time required for n_copies=1
# Each loop builds a new HQC and fits it on the training split; prediction is not timed.
# n_splits=1 means no subset splits are used.
%timeit HQC(rescale=0.5, n_copies=1, encoding='stereo', class_wgt='weighted', n_splits=1).fit(X_train, y_train)

100 loops, best of 3: 5.41 ms per loop


In [8]:
# Time required for n_copies=2
# Each loop builds a new HQC and fits it on the training split; prediction is not timed.
%timeit HQC(rescale=0.5, n_copies=2, encoding='stereo', class_wgt='weighted', n_splits=1).fit(X_train, y_train)

100 loops, best of 3: 6.25 ms per loop


In [9]:
# Time required for n_copies=3
# Each loop builds a new HQC and fits it on the training split; prediction is not timed.
%timeit HQC(rescale=0.5, n_copies=3, encoding='stereo', class_wgt='weighted', n_splits=1).fit(X_train, y_train)

100 loops, best of 3: 9.03 ms per loop


In [10]:
# Time required for n_copies=4
# Each loop builds a new HQC and fits it on the training split; prediction is not timed.
%timeit HQC(rescale=0.5, n_copies=4, encoding='stereo', class_wgt='weighted', n_splits=1).fit(X_train, y_train)

10 loops, best of 3: 23.6 ms per loop


In [11]:
# Time required for n_copies=5
# Each loop builds a new HQC and fits it on the training split; prediction is not timed.
%timeit HQC(rescale=0.5, n_copies=5, encoding='stereo', class_wgt='weighted', n_splits=1).fit(X_train, y_train)

10 loops, best of 3: 94.2 ms per loop


In [4]:
# Time required for n_copies=6
# Each loop builds a new HQC and fits it on the training split; prediction is not timed.
# n_splits is 2 here instead of 1 - presumably raised to keep the run within GPU memory; TODO confirm.
%timeit HQC(rescale=0.5, n_copies=6, encoding='stereo', class_wgt='weighted', n_splits=2).fit(X_train, y_train)

1 loop, best of 3: 662 ms per loop


In [5]:
# Time required for n_copies=7
# Each loop builds a new HQC and fits it on the training split; prediction is not timed.
# n_splits is 18 here - presumably raised to keep the run within GPU memory; TODO confirm.
%timeit HQC(rescale=0.5, n_copies=7, encoding='stereo', class_wgt='weighted', n_splits=18).fit(X_train, y_train)

1 loop, best of 3: 7.19 s per loop


In [6]:
# Time required for n_copies=8
# Each loop builds a new HQC and fits it on the training split; prediction is not timed.
# n_splits is 3000 here - presumably raised to keep the run within GPU memory; TODO confirm.
%timeit HQC(rescale=0.5, n_copies=8, encoding='stereo', class_wgt='weighted', n_splits=3000).fit(X_train, y_train)

1 loop, best of 3: 3min 15s per loop


In [4]:
# Time required for n_copies=9
# No. of rows in training set = 4240
# n_splits is set to the number of training rows, yet the recorded run ended in a RuntimeError
# (see the output below). With 2 features each density matrix has (2 + 1)**9 = 19683 rows/columns,
# so this is presumably memory exhaustion - TODO confirm from the full traceback.
%timeit HQC(rescale=0.5, n_copies=9, encoding='stereo', class_wgt='weighted', n_splits=4240).fit(X_train, y_train)

RuntimeError: ignored

In [5]:
# Using scikit-learn's GridSearchCV
# Exhaustive search over 3 x 2 x 2 x 2 = 24 hyperparameter combinations; n_splits is held at 1.
# No `scoring` is passed, so candidates are ranked by the estimator's own score() method
# (mean accuracy for a ClassifierMixin) rather than by F1.
param_grid = {
    'rescale': [0.5, 1, 1.5],
    'encoding': ['amplit', 'stereo'],
    'n_copies': [1, 2],
    'class_wgt': ['equi', 'weighted'],
}
models = GridSearchCV(HQC(n_splits=1), param_grid).fit(X_train, y_train)

In [6]:
# Evaluate the best estimator found by the grid search on the held-out test split
best_model = models.best_estimator_
y_hat = best_model.predict(X_test)

# Weighted F1 score on the test split, and the Helstrom bound of the best estimator
metrics.f1_score(y_test, y_hat, average='weighted'), best_model.hels_bound_

(0.7867484392554756, 0.6852296915320486)

In [7]:
# Best hyperparameter combination
# (the param_grid entry selected by the grid search held in `models`)
models.best_params_

{'class_wgt': 'equi', 'encoding': 'amplit', 'n_copies': 2, 'rescale': 1}