In [5]:
import os

import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import confusion_matrix
from sklearn.utils import resample
from sklearn.svm import SVC

import umap

from qiskit_algorithms.utils import algorithm_globals
from qiskit.circuit.library import (
    ZZFeatureMap,
    ZFeatureMap,
    PauliFeatureMap,
    RealAmplitudes,
)
from qiskit_algorithms.optimizers import COBYLA, SPSA
from qiskit_machine_learning.algorithms.classifiers import VQC

In [6]:
# Candidate dataset files. Only the plain ".csv" entries are read below;
# the ".tar.xz" archives are listed but not loaded by this cell.
data_paths = [
    "data/bank-additional-full_normalised.csv",
    "data/census-income-full-mixed-binarized.tar.xz",
    "data/UNSW_NB15_traintest_backdoor.csv",
    "data/KDD2014_donors_10feat_nomissing_normalised.csv",
    "data/celeba_baldvsnonbald_normalised.csv",
    "data/creditcardfraud_normalised.tar.xz",
    "data/annthyroid_21feat_normalised.csv",
]

# file name -> loaded DataFrame (CSV files only)
data_dict = {}
# every file name from data_paths, including the ones that are skipped
names = []

for _path in data_paths:
    name = os.path.basename(_path)
    names.append(name)

    # Extension without the leading dot; for "x.tar.xz" this is "xz".
    ext = os.path.splitext(_path)[1][1:]
    if ext != "csv":
        continue

    print("=" * 9, name, "=" * 9)
    print(name.partition(".")[0])

    df = pd.read_csv(_path)
    print(df.shape)
    # Class distribution of the label column.
    print(df["class"].value_counts())

    data_dict[name] = df

bank-additional-full_normalised
(41188, 63)
class
0    36548
1     4640
Name: count, dtype: int64
UNSW_NB15_traintest_backdoor
(95329, 197)
class
0    93000
1     2329
Name: count, dtype: int64
KDD2014_donors_10feat_nomissing_normalised
(619326, 11)
class
0    582616
1     36710
Name: count, dtype: int64
celeba_baldvsnonbald_normalised
(202599, 40)
class
0    198052
1      4547
Name: count, dtype: int64
annthyroid_21feat_normalised
(7200, 22)
class
0    6666
1     534
Name: count, dtype: int64


In [7]:
# Majority-to-minority row ratio targeted by balance_classes
# (majority rows kept = minority rows * CLASS_RAITO).
# NOTE(review): "RAITO" looks like a typo of "RATIO"; the spelling is kept
# because balance_classes reads the constant under this exact name.
CLASS_RAITO = 3
# Number of rows sample_df aims to keep from a dataset.
N_SAMPLES = 2_000
# Number of UMAP output dimensions used by reduce_dimension. The quantum cells
# derive num_features from the embedded data, so this also sets the qubit count.
N_COMPONENTS = 3

In [8]:
def balance_classes(df, ratio=None):
    """Downsample the majority class to at most ``ratio`` times the minority class.

    All rows of the least frequent label in ``df['class']`` are kept. Rows of
    the most frequent *other* label are sampled without replacement. Rows of
    any further labels are dropped, as before. The result is shuffled and
    re-indexed.

    Parameters
    ----------
    df : pandas.DataFrame
        Input frame with a ``'class'`` column.
    ratio : int or float, optional
        Desired majority:minority size ratio. Defaults to the notebook-level
        ``CLASS_RAITO`` constant.

    Returns
    -------
    pandas.DataFrame
        Shuffled frame holding the minority rows plus the sampled majority rows.

    Raises
    ------
    ValueError
        If ``df['class']`` holds fewer than two distinct labels.
    """
    if ratio is None:
        ratio = CLASS_RAITO

    class_counts = df['class'].value_counts()
    if len(class_counts) < 2:
        raise ValueError("balance_classes needs at least two distinct labels in df['class']")

    minority_class = class_counts.idxmin()
    # Choose the majority among the *other* labels: with tied counts, idxmin
    # and idxmax would both return the same label.
    majority_class = class_counts.drop(minority_class).idxmax()

    minority_class_count = int(class_counts[minority_class])
    # Sampling without replacement cannot return more rows than exist, so cap
    # the request at the number of available majority rows.
    majority_class_count = min(
        int(minority_class_count * ratio),
        int(class_counts[majority_class]),
    )

    df_majority_downsampled = resample(
        df[df['class'] == majority_class],
        replace=False,                   # sample without replacement
        n_samples=majority_class_count,  # capped target size
        random_state=42                  # fixed seed for reproducibility
    )

    # Minority rows are kept in full; only the majority class is reduced.
    df_balanced = pd.concat([df[df['class'] == minority_class], df_majority_downsampled])

    # Shuffle so the two classes are interleaved, then reset the index.
    df_balanced = df_balanced.sample(frac=1, random_state=42).reset_index(drop=True)

    return df_balanced

def sample_df(df, n_samples=None):
    """Draw a class-stratified subsample of ``df`` with exactly ``n_samples`` rows.

    Parameters
    ----------
    df : pandas.DataFrame
        Input frame with a ``'class'`` column used for stratification.
    n_samples : int, optional
        Number of rows to keep. Defaults to the notebook-level ``N_SAMPLES``.

    Returns
    -------
    pandas.DataFrame
        The stratified subsample, or a copy of ``df`` when it already has
        ``n_samples`` rows or fewer.
    """
    if n_samples is None:
        n_samples = N_SAMPLES
    n_samples = int(n_samples)

    # Nothing to reduce: train_test_split rejects a train size that is not
    # strictly smaller than the number of rows.
    if df.shape[0] <= n_samples:
        return df.copy()

    # An integer train_size is an absolute row count. This avoids the float
    # fraction plus epsilon, which over-samples on large frames.
    sampled_df, _ = train_test_split(
        df, train_size=n_samples, stratify=df['class'], random_state=42
    )
    return sampled_df


def reduce_dimension(df):
    """Embed the feature columns with UMAP and rescale the embedding with MinMaxScaler.

    Returns a tuple ``(X, X_pca, y)``:

    * ``X``     - the original feature columns (everything except ``'class'``),
    * ``X_pca`` - the min-max scaled UMAP embedding with ``N_COMPONENTS``
      columns (despite the name, no PCA is involved),
    * ``y``     - the ``'class'`` column.
    """
    y = df['class']
    X = df.drop(columns=['class'])

    # UMAP configuration; the seed is fixed so the embedding is repeatable.
    reducer = umap.UMAP(
        n_components=N_COMPONENTS,
        n_neighbors=15,
        min_dist=0.1,
        random_state=42,
    )
    embedding = reducer.fit_transform(X)

    # Rescale every embedding dimension with a freshly fitted MinMaxScaler.
    X_pca = MinMaxScaler().fit_transform(embedding)

    return X, X_pca, y

In [12]:
def classical_ml(X_train, y_train, X_test, y_test, y, filename):
    """Train one SVC per classical kernel and report accuracy and minority-class recall.

    For each of the kernels ``rbf``, ``linear``, ``poly`` and ``sigmoid`` an
    ``SVC`` is fitted on the training data and scored on the test data. The
    three report lines per kernel are printed and, when ``filename`` is
    truthy, also written to that file (overwriting it).

    Parameters
    ----------
    X_train, y_train : training features and labels.
    X_test, y_test : test features and labels.
    y : pandas.Series
        Full label column; its least frequent value is the minority class.
    filename : str or None
        Path of the text report. Pass ``None`` or ``""`` to skip writing.
    """
    # The four classical kernels to compare.
    classical_kernels = ["rbf", "linear", "poly", "sigmoid"]

    # The minority class is the least frequent label in the full label column.
    minority_class = y.value_counts().idxmin()

    # Positional mask of minority rows in the test split (loop-invariant).
    is_minority = np.asarray(y_test).ravel() == minority_class
    total_minority_class_samples = int(is_minority.sum())

    report_lines = []

    for kernel in classical_kernels:
        classical_svc = SVC(kernel=kernel)
        classical_svc.fit(X_train, y_train)

        y_pred = np.asarray(classical_svc.predict(X_test)).ravel()
        classical_score = classical_svc.score(X_test, y_test)

        # True positives for the minority class, counted directly. This is the
        # same number a 1x1 confusion matrix restricted to that label yields,
        # but it does not raise when the test split has no minority rows.
        correct_minority_predictions = int(
            (is_minority & (y_pred == minority_class)).sum()
        )

        # Share of minority rows recovered; undefined (NaN) without any.
        if total_minority_class_samples:
            minority_class_percentage = (
                correct_minority_predictions / total_minority_class_samples
            ) * 100
        else:
            minority_class_percentage = float("nan")

        kernel_lines = [
            f"{kernel} kernel classification test score:  {classical_score:.3f}",
            f"Correctly predicted samples for the minority class ({minority_class}): {correct_minority_predictions}",
            f"Percentage of correctly predicted minority class samples: {minority_class_percentage:.3f}%",
        ]
        for line in kernel_lines:
            print(line)
        report_lines.extend(kernel_lines)

    # Persist the report; previously the filename argument was never used.
    if filename:
        with open(filename, "w", encoding="utf-8") as report:
            report.write("\n".join(report_lines) + "\n")


def quantum_ml(
    feature_map, entanglement, X_train, y_train, X_test, y_test, y, file, eval_file
):
    """Train a VQC and report test score plus minority-class recall.

    The classifier combines the given ``feature_map`` with a ``RealAmplitudes``
    ansatz (``reps=3``) and is trained with SPSA (``maxiter=200``) on the
    cross-entropy loss.

    Parameters
    ----------
    feature_map : circuit used to encode the features.
    entanglement : str
        Entanglement pattern passed to ``RealAmplitudes``.
    X_train, y_train : training features and labels.
    X_test, y_test : test features and labels.
    y : pandas.Series
        Full label column; its least frequent value is the minority class.
    file : str
        Path of the objective-value log, one value per line. It is truncated
        at the start of every call so a re-run does not append to old values.
    eval_file : str
        Path of the evaluation summary (overwritten).

    Returns
    -------
    tuple
        ``(score, minority_class_percentage)``; the percentage is NaN when the
        test split holds no minority rows.
    """
    objective_func_vals = []
    # Number of features in use; it is also the number of ansatz qubits.
    num_features = X_train.shape[1]

    # Start with an empty objective log: the callback below only appends.
    with open(file, "w"):
        pass

    # Draw the feature map supplied by the caller (return value is discarded).
    feature_map.decompose().draw(output="mpl", fold=20)

    ansatz = RealAmplitudes(num_qubits=num_features, reps=3, entanglement=entanglement)

    # Add a measurement on every qubit.
    ansatz.measure_all()

    # Draw the ansatz circuit.
    ansatz.decompose().draw(output="mpl", fold=20)

    # Build the combined circuit (feature map followed by the ansatz).
    circuit = feature_map.compose(ansatz)

    # Draw the final circuit.
    circuit.decompose().draw(output="mpl")

    # Callback recording the training progress, in memory and on disk.
    def _callback_graph(weights, obj_func_eval):
        objective_func_vals.append(obj_func_eval)

        with open(file, "a") as f:
            f.write(f"{obj_func_eval}\n")

    # Construct the variational quantum classifier.
    vqc = VQC(
        feature_map=feature_map,  # feature map to use
        ansatz=ansatz,  # ansatz to use
        loss="cross_entropy",  # loss function used during training
        # optimizer=COBYLA(maxiter=30),  # alternative optimizer
        optimizer=SPSA(maxiter=200),
        callback=_callback_graph,  # receives the intermediate objective values
        # sampler=sampler
    )

    vqc.fit(X_train, np.array(y_train).reshape(-1, 1))

    score = vqc.score(X_test, np.array(y_test).reshape(-1, 1))

    y_pred = np.asarray(vqc.predict(X_test)).ravel()

    minority_class = y.value_counts().idxmin()

    # True positives for the minority class, counted directly. This is the same
    # number a 1x1 confusion matrix restricted to that label yields, but it
    # cannot raise after the long training run if the test split has no
    # minority rows.
    is_minority = np.asarray(y_test).ravel() == minority_class
    total_minority_class_samples = int(is_minority.sum())
    correct_minority_predictions = int(
        (is_minority & (y_pred == minority_class)).sum()
    )

    # Share of minority rows recovered; undefined (NaN) without any.
    if total_minority_class_samples:
        minority_class_percentage = (
            correct_minority_predictions / total_minority_class_samples
        ) * 100
    else:
        minority_class_percentage = float("nan")

    with open(eval_file, "w") as f:
        f.write(f"{score=}\n")
        f.write(f"{minority_class_percentage}")

    return score, minority_class_percentage


def get_feature_map(name, num_features, reps):
    """Build the qiskit feature map selected by ``name``.

    Parameters
    ----------
    name : str
        One of ``"Z"``, ``"ZZ"`` or ``"Pauli"``.
    num_features : int
        Passed as ``feature_dimension`` to the feature map.
    reps : int
        Number of repetitions of the feature map.

    Returns
    -------
    ZFeatureMap, ZZFeatureMap or PauliFeatureMap

    Raises
    ------
    ValueError
        If ``name`` is not a supported key. Previously the function returned
        ``None``, which only failed later inside the training code.
    """
    if name == "Z":
        return ZFeatureMap(feature_dimension=num_features, reps=reps)
    if name == "ZZ":
        return ZZFeatureMap(feature_dimension=num_features, reps=reps)
    if name == "Pauli":
        return PauliFeatureMap(feature_dimension=num_features, reps=reps)

    raise ValueError(
        f"Unknown feature map {name!r}; expected one of 'Z', 'ZZ', 'Pauli'"
    )

In [13]:
# Root folder for every result file written by this notebook.
BASE_DIR = "./results/"

# exist_ok=True keeps the cell safe to re-run and removes the gap between
# checking for the folder and creating it; missing parents are created too.
os.makedirs(BASE_DIR, exist_ok=True)

In [None]:
# KDD2014 donors pipeline: rebalance -> subsample -> embed -> split.
df = sample_df(
    balance_classes(data_dict["KDD2014_donors_10feat_nomissing_normalised.csv"])
)

X, X_pca, y = reduce_dimension(df)

# 80/20 split on the scaled embedding (not on the raw features).
X_train, X_test, y_train, y_test = train_test_split(
    X_pca, y, train_size=0.8, random_state=42
)

print(X_train.shape, X_test.shape, y_train.shape, y_test.shape)

# Classical SVC baselines on the same split the quantum models use.
classical_ml(X_train, y_train, X_test, y_test, y, f"{BASE_DIR}/classical_eval.txt")

In [None]:
# Sweep over feature map type x repetitions x ansatz entanglement pattern.
num_features = X_train.shape[1]

for _feat in ["Z", "ZZ", "Pauli"]:

    # One output folder per feature map. makedirs also creates BASE_DIR if it
    # is missing and does not fail when the folder already exists.
    os.makedirs(f"{BASE_DIR}/{_feat}", exist_ok=True)

    for _reps in [1, 2, 3, 4, 5]:
        # The feature map depends only on type and reps, so build it once and
        # reuse it for every entanglement pattern.
        feature_map = get_feature_map(_feat, num_features, _reps)

        for entanglement in ["full", "linear", "circular", "reverse_linear"]:
            quantum_ml(
                feature_map,
                entanglement,
                X_train,
                y_train,
                X_test,
                y_test,
                y,
                f"{BASE_DIR}/{_feat}/obj_func_eval-{_reps}-{entanglement}.txt",
                f"{BASE_DIR}/{_feat}/quantum_eval-{_reps}-{entanglement}.txt"
            )