In [1]:
#random forest 암호화 구현
import piheaan as heaan
import math
import numpy as np
import pandas as pd
import os

params = heaan.ParameterPreset.FGb
context = heaan.make_context(params)
heaan.make_bootstrappable(context)

key_file_path = "./keys"
sk = heaan.SecretKey(context)
os.makedirs(key_file_path, mode=0o775, exist_ok=True)
sk.save(key_file_path+"/secretkey.bin")
key_generator = heaan.KeyGenerator(context, sk)
key_generator.gen_common_keys()
key_generator.save(key_file_path+"/")

sk = heaan.SecretKey(context,key_file_path+"/secretkey.bin")
pk = heaan.KeyPack(context, key_file_path+"/")
pk.load_enc_key()
pk.load_mult_key()

eval = heaan.HomEvaluator(context,pk)
dec = heaan.Decryptor(context)
enc = heaan.Encryptor(context)

# pi-heaan의 parameter 설정 및 키 생성, evaluator 지정.

log_slots = 15
num_slots = 2**log_slots # 32,768

## Encrypted_DecisionTree

In [2]:
class DecisionTree:
    def __init__(self, max_depth=5, min_samples_split=2, thresholds=None):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.thresholds = thresholds
        self.tree = None
    
    def fit(self, X, y):
        self.n_classes_ = 2  # 발급 가능 여부에 대한 2가지 결과에 따라서, 2진 분류를 가정하였음.
        self.n_features_ = len(X)

        # 암호화된 데이터를 그대로 사용 - tree method 사용 전에 input 및 암호화를 진행하게 됨.
        X_enc = X
        y_enc = y

        self.tree = self._grow_tree(X_enc, y_enc)
    
    def predict(self, X):
        # 데이터 암호화된 상태로 전달받음 - fit의 경우와 마찬가지.
        X_enc = X

        num_samples = len(X_enc[0])  # 샘플 개수 계산 - 이는 암호문의 배열 길이를 말하는 것으로서, 암호화 전에 진행하는 것으로 변환 필요함.

        predictions = []
        for i in range(num_samples):
            sample = X_enc  # 전체 X_enc를 sample로 할당 - copy에 해당.
            prediction = self._predict(sample, self.tree)
            predictions.append(prediction)

        # 예측 결과 복호화
        decrypted_predictions = []
        for pred in predictions:
            if isinstance(pred, heaan.Ciphertext):  # pred가 Ciphertext인 경우에만 복호화 수행
                message = heaan.Message(log_slots)
                dec.decrypt(pred, sk, message) # decryption이 이루어지고 있으나, 실제로는 이 과정 없이 데이터 prediction이 이루어져야 함.
                decrypted_predictions.append(message[0].real)
            else:
                decrypted_predictions.append(pred)  # pred가 이미 실수인 경우 그대로 추가 - 분기 생성.

        return decrypted_predictions
    
    def _best_split(self, X, y): # tree의 node 분할을 위해 필요한 함수.
        best_gain = -1 # 데이터셋을 통한 tree 분할을 통해 best_gain에 최댓값을 저장하게 된다. 
        best_feature = None
        best_threshold = None

        for feature in range(self.n_features_):
            thresholds = self.thresholds[feature]

            for threshold in thresholds:
                # threshold를 Message 객체로 변환하여 암호화
                threshold_msg = heaan.Message(log_slots)
                threshold_msg[0] = complex(threshold)
                threshold_ct = heaan.Ciphertext(context)
                enc.encrypt(threshold_msg, pk, threshold_ct)

                comparison_ct = heaan.Ciphertext(context)
                eval.sub(X[feature], threshold_ct, comparison_ct)

                # Ciphertext를 Message로 변환하여 복호화
                comparison_msg = heaan.Message(log_slots)
                dec.decrypt(comparison_ct, sk, comparison_msg)

                # Message의 값을 사용하여 비교
                comparison_values = [int(x.real) < 0 for x in comparison_msg]

                gain = self._information_gain(y, comparison_values)
                if gain > best_gain:
                    best_gain = gain
                    best_feature = feature
                    best_threshold = threshold

        return best_feature, best_threshold
    
    def _information_gain(self, y, split): # split 별 information gain을 측정하고자 함.
        # y를 복호화하여 평문으로 변환 
        y_msg = heaan.Message(log_slots) 
        dec.decrypt(y, sk, y_msg) 
        y_values = [int(x.real) for x in y_msg] 

        parent = self._entropy(y_values) 
        left = self._entropy([y_values[i] for i in range(len(y_values)) if split[i]])
        right = self._entropy([y_values[i] for i in range(len(y_values)) if not split[i]])

        n = len(y_values)
        n_left = sum(split)
        n_right = n - n_left

        p_left = n_left / n
        p_right = n_right / n

        child = p_left * left + p_right * right # 각 노드가 저장하고 있는 값을 child tree로 설정하여
        return parent - child # 값 비교 진행
    
    def _entropy(self, y): # 정보의 값을 계산하기 위함. sklearn의 평문코드는 gini를 기준으로 사용하나, 우리는 entropy를 사용하였음.
        _, counts = np.unique(y, return_counts=True)
        p = counts / len(y)
        return -np.sum(p * np.log2(p))
    
    def _grow_tree(self, X, y, depth=0):
        n_samples = len(X[0])
        n_features = len(X)

        if depth >= self.max_depth or n_samples < self.min_samples_split:
            y_counts = heaan.Ciphertext(context)
            y_counts_pt = heaan.Plaintext(context)
            y_counts_msg = heaan.Message(log_slots)
            y_counts_msg[0] = n_samples
            y_counts_ct = heaan.Ciphertext(context)  # 새로운 Ciphertext 객체 생성
            enc.encrypt(y_counts_msg, pk, y_counts_ct)
            eval.add(y, y_counts_ct, y_counts)
            return y_counts

        best_feature, best_threshold = self._best_split(X, y)

        # left_idx를 평문으로 복호화
        left_idx_pt = heaan.Plaintext(context)
        dec.decrypt(X[best_feature], sk, left_idx_pt)

        # Plaintext를 Ciphertext로 암호화한 후 다시 복호화하여 Message 객체 얻기
        left_idx_ct = heaan.Ciphertext(context)
        enc.encrypt(left_idx_pt, pk, left_idx_ct)
        left_idx_msg = heaan.Message(log_slots)
        dec.decrypt(left_idx_ct, sk, left_idx_msg)

        left_idx = [int(left_idx_msg[i].real < best_threshold) for i in range(n_samples)]

        # 데이터 분할을 위한 회전 연산
        X_left = [heaan.Ciphertext(context) for _ in range(n_features)]
        y_left = heaan.Ciphertext(context)

        for i in range(n_features):
            eval.left_rotate(X[i], sum(left_idx), X_left[i])
        eval.left_rotate(y, sum(left_idx), y_left)

        X_right = [heaan.Ciphertext(context) for _ in range(n_features)]
        y_right = heaan.Ciphertext(context)

        for i in range(n_features):
            eval.right_rotate(X[i], sum(left_idx), X_right[i])
        eval.right_rotate(y, sum(left_idx), y_right)
        # 좌측 데이터와 우측 데이터를 비교하여 기준에 맞는 노드로 값을 보내고자 함.

        left_child = self._grow_tree(X_left, y_left, depth + 1)
        right_child = self._grow_tree(X_right, y_right, depth + 1)

        return {
            'feature': best_feature,
            'threshold': best_threshold,
            'left': left_child,
            'right': right_child
        }
    
    # 훈련된 tree에 대한 집단 비교 진행
    def _predict(self, sample, tree):
        if isinstance(tree, heaan.Ciphertext):
            # Ciphertext를 Message로 복호화하여 실수부 추출
            message = heaan.Message(log_slots)
            dec.decrypt(tree, sk, message)
            return message[0].real

        feature_ct = sample[tree['feature']]
        threshold_ct = heaan.Ciphertext(context)
        threshold_msg = heaan.Message(log_slots)
        threshold_msg[0] = tree['threshold']
        enc.encrypt(threshold_msg, pk, threshold_ct)

        comparison_ct = heaan.Ciphertext(context)
        eval.sub(feature_ct, threshold_ct, comparison_ct)

        # Ciphertext를 Message로 변환하여 복호화
        comparison_msg = heaan.Message(log_slots)
        dec.decrypt(comparison_ct, sk, comparison_msg)

        # Message의 값을 사용하여 비교
        # 이 부분도 복호화 이후 실수 연산이 진행되고 있기 때문에, 복호화가 이뤄지지 않은 계산 방식(비교)으로의 구현이 필요함.
        if comparison_msg[0].real < 0:
            return self._predict(sample, tree['left'])
        else:
            return self._predict(sample, tree['right'])

## Encrypted_RandomForest

In [3]:
class RandomForest: # 위의 DecisionTree class를 기반으로 하여 randomforest class를 구축.
    def __init__(self, n_trees=10, max_depth=5, min_samples_split=2):
        self.n_trees = n_trees # 1회 당의 random forest training에 할당될 수 있는 tree의 수를 나타냄.
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.trees = []
        self.thresholds = []  # 각 특성의 고유한 값들을 저장할 리스트
    
    def fit(self, X, y):
        self.trees = []
        n_samples = len(X)
        n_features = X.shape[1]
        
        # 데이터를 암호화하기 전에 각 특성의 고유한 값들을 찾아서 저장
        self.thresholds = []
        for i in range(n_features):
            self.thresholds.append(np.unique(X[:, i]))
        
        # 데이터 암호화
        X_enc = [heaan.Ciphertext(context) for _ in range(n_features)]
        y_enc = heaan.Ciphertext(context) 
        
        for i in range(n_features):
            message = heaan.Message(log_slots) 
            for j in range(num_slots):
                if j < len(X[:, i]):
                    message[j] = X[j, i]
            enc.encrypt(message, pk, X_enc[i]) # feature 개의 암호문에 대해 각각 암호화한 feature값을 삽입함.
        
        message = heaan.Message(log_slots)
        for j in range(num_slots):
            if j < len(y):
                message[j] = y[j]
        enc.encrypt(message, pk, y_enc)
        
        for _ in range(self.n_trees):
            tree = DecisionTree(max_depth=self.max_depth, min_samples_split=self.min_samples_split, thresholds=self.thresholds)
            # 앞서 설정한 decision tree class에 기반하여 tree 객체 생성.
            
            
            # 부트스트랩 샘플링을 위한 인덱스 생성
            # random forest 형성을 위한 무작위 tree 비교 기준을 설정하는 과정에 해당함.
            indexes = np.random.choice(n_samples, size=n_samples, replace=True)
            
            
            # 인덱스를 이용하여 데이터 재배치
            X_sample = [heaan.Ciphertext(context) for _ in range(n_features)]
            y_sample = heaan.Ciphertext(context)
            
            for i in range(n_features):
                rotated_x = heaan.Ciphertext(context)
                eval.left_rotate(X_enc[i], indexes[0], rotated_x)
                
                for j in range(1, n_samples):
                    temp = heaan.Ciphertext(context)
                    eval.left_rotate(X_enc[i], indexes[j], temp)
                    eval.add(rotated_x, temp, rotated_x)
                
                X_sample[i] = rotated_x
            
            eval.left_rotate(y_enc, indexes[0], y_sample)
            
            for j in range(1, n_samples):
                temp = heaan.Ciphertext(context)
                eval.left_rotate(y_enc, indexes[j], temp)
                eval.add(y_sample, temp, y_sample)
            
            tree.fit(X_sample, y_sample)
            self.trees.append(tree)
    
    def predict(self, X):
        X = np.array(X)  # X를 NumPy 배열로 변환

        # 데이터 암호화
        X_enc = [heaan.Ciphertext(context) for _ in range(X.shape[1])]

        for i in range(X.shape[1]):
            message = heaan.Message(log_slots)
            for j in range(num_slots):
                if j < X.shape[0]:
                    message[j] = X[j, i]
            enc.encrypt(message, pk, X_enc[i])

        predictions = []
        for tree in self.trees: 
            tree_predictions = tree.predict(X_enc) # 암호문을 저장하고 있는 X_enc를 바탕으로 random forest에 대한 prediction가 이루어짐.
            predictions.append(tree_predictions)

        # 다수결 투표로 최종 예측 결정
        # 원래의 source code에서는 majority vote에 해당한다. 
        # 이를 통해 각 tree의 class 결정력을 비교하여 데이터를 설정하였음. 
        predictions = np.array(predictions)
        predictions = predictions.T  # 전치하여 (num_samples, num_trees) 형태로 변환
        majority_predictions = []
        for i in range(predictions.shape[0]):
            counts = np.bincount(predictions[i].real.astype(int))
            majority_predictions.append(np.argmax(counts))

        # 예측 결과 슬라이싱하여 입력 샘플 개수에 맞게 조정
        majority_predictions = majority_predictions[:len(X)]

        return majority_predictions

## Model Fit & Predict

In [4]:
X_data = pd.read_csv('X_data_100.csv').values
y_data = pd.read_csv('y_data_100.csv').values.flatten()

# 데이터셋을 학습 데이터와 테스트 데이터로 분할
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X_data, y_data, test_size=0.2, random_state=42)

# Random Forest 학습
rf = RandomForest(n_trees=2, max_depth=5)
rf.fit(X_train, y_train)

In [5]:
# 예측
y_pred = rf.predict(X_test)

# 정확도 평가
from sklearn.metrics import accuracy_score
accuracy = accuracy_score(y_test, y_pred)
print("Accuracy:", accuracy) # test와 train set을 분리하여, 위의 코드로 훈련된 모델이 얼마 정도의 accuracy를 가지는 지를 평가하게 됨.

Accuracy: 0.0


## User_input

In [None]:
import sys
from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QLabel, QLineEdit, QPushButton, QScrollArea

class InputWindow(QWidget):
    def __init__(self, questions):
        super().__init__()
        self.questions = questions
        self.user_input = []
        self.user_input_value = []
        self.initUI()

    def initUI(self):
        scroll_area = QScrollArea()
        scroll_area.setWidgetResizable(True)
        content_widget = QWidget()
        layout = QVBoxLayout(content_widget)

        for title, prompt in self.questions:
            label = QLabel(f"{title}\n{prompt}")
            layout.addWidget(label)
            line_edit = QLineEdit()
            layout.addWidget(line_edit)
            self.user_input.append(line_edit)

        submit_button = QPushButton("Submit")
        submit_button.clicked.connect(self.onSubmit)
        layout.addWidget(submit_button)

        scroll_area.setWidget(content_widget)
        main_layout = QVBoxLayout(self)
        main_layout.addWidget(scroll_area)
        self.setLayout(main_layout)
        self.setWindowTitle("User Input")
        self.resize(400, 600)  # 창의 크기 조정
        self.show()

    def onSubmit(self):
        user_input = []
        for line_edit in self.user_input:
            value = line_edit.text()
            user_input.append(float(value))
        self.user_input_value = user_input
        self.close()

# 카드 발급 여부를 위한 평가를 위한 18개의 질문이 이루어지고, 결과를 output으로 저장하여 전송함.
if __name__ == "__main__":
    questions = [
        ("What is your Gender?", "[Male : 0, Female : 1]"),
        ("Do you have your own car?", "[No : 0, Yes : 1]"),
        ("Do you have your own property?", "[No : 0, Yes : 1]"),
        ("Do you have your own phone for workplace?", "[No : 0, Yes : 1]"),
        ("Do you have your own phone for dailylife?", "[No : 0, Yes : 1]"),
        ("Do you have your own E-mail?", "[No : 0, Yes : 1]"),
        ("Do you have your own job?", "[No : 0, Yes : 1]"),
        ("How many children do you have?", "[Number of Children : 0, 1, 2...]"),
        ("How many family members do you have?", "[Number of Family member : 0, 1, 2...]"),
        ("When did you start to make transaction in this account?", "[Length of transaction period : 0,1,2...]"),
        ("What is amount of your total income?", "[Amount of total income : 1000,2000,3000...]"),
        ("What is your age?", "[Your own age : 20,25,30...]"),
        ("How long have you been employted?", "[Year that you are employed : 10, 15, 20...]"),
        ("What is your income type?", "['Commercial associate': 0, 'Pensioner': 1, 'State servant': 2, 'Student': 3, 'Working': 4]"),
        ("What is your educational type?", "['Academic degree': 0, 'Higher education': 1, 'Incomplete higher': 2, 'Lower secondary': 3, 'Secondary / secondary special': 4]"),
        ("What is your family status?", "['Civil marriage': 0, 'Married': 1, 'Separated': 2, 'Single / not married': 3, 'Widow': 4]"),
        ("What is your housing type?", "['Co-op apartment': 0, 'House / apartment': 1, 'Municipal apartment': 2, 'Office apartment': 3, 'Rented apartment': 4, 'With parents': 5]"),
        ("What is your occupation type?", "['Accountants': 0, 'Cleaning staff': 1, 'Cooking staff': 2, 'Core staff': 3, 'Drivers': 4, 'HR staff': 5, 'High skill tech staff': 6, 'IT staff': 7, 'Laborers': 8, 'Low-skill Laborers': 9, 'Managers': 10, 'Medicine staff': 11, 'Other': 12, 'Private service staff': 13, 'Realty agents': 14, 'Sales staff': 15, 'Secretaries': 16, 'Security staff': 17, 'Waiters/barmen staff': 18]")
    ]

    app = QApplication(sys.argv)
    window = InputWindow(questions)
    sys.exit(app.exec_())

In [None]:
# Encrypts user-entered data and stores it in a vector in user_data_ctx
msg = heaan.Message(LOG_SLOTS)

for i in range(len(window.user_input_value)):
    msg[i] = window.user_input_value[i]

user_data_ctx = heaan.Ciphertext(context)
enc.encrypt(msg, pk, user_data_ctx)

In [None]:
# 사용자 입력 데이터를 RandomForest의 predict 메서드에 맞는 형태로 조정
user_data_enc = [user_data_ctx] * X_data.shape[1]

# RandomForest 모델을 사용하여 예측 수행
y_pred_enc = rf.predict(user_data_enc)

# 예측 결과 복호화
y_pred = []
for pred in y_pred_enc:
    message = heaan.Message(log_slots)
    dec.decrypt(pred, sk, message)
    y_pred.append(int(message[0].real))

# 사용자에게 예측 결과 제공
print("Predicted class:", y_pred[0]) # input된 값을 기반으로, 사용자의 발급 여부 출력.