In [1]:
from sklearn.datasets import load_breast_cancer
from sklearn.linear_model import LogisticRegression, Ridge
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Load the breast-cancer dataset and separate features from labels
data = load_breast_cancer()
X, y = data.data, data.target

# Hold out 30% of the samples as a test set (fixed seed for reproducibility)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=42
)

# Build and train the logistic regression model (fit returns the estimator)
log_reg = LogisticRegression(max_iter=10000).fit(X_train, y_train)

# Evaluate the logistic regression model on the held-out samples
y_pred_log_reg = log_reg.predict(X_test)
accuracy_log_reg = accuracy_score(y_test, y_pred_log_reg)

# Build and train the ridge regression model on the same split
ridge_reg = Ridge().fit(X_train, y_train)

# Ridge regression yields continuous outputs, so a threshold is needed to turn
# them into binary class labels before computing accuracy
threshold = 0.5
y_pred_ridge_reg = (ridge_reg.predict(X_test) > threshold).astype(int)
accuracy_ridge_reg = accuracy_score(y_test, y_pred_ridge_reg)

accuracy_log_reg, accuracy_ridge_reg



(0.9766081871345029, 0.9649122807017544)

In [26]:
import ast
import sys
import subprocess
import base64
import json

# Check whether the astor module is available and install it if it is not
try:
    import astor
except ImportError:
    # Install with the interpreter running this kernel, then retry the import
    subprocess.check_call([sys.executable, "-m", "pip", "install", "astor"])
    import astor

class ConcreteMLTransformer(ast.NodeTransformer):
    """Rewrite scikit-learn linear-model imports and track fitted models.

    While visiting a parsed script the transformer:

    * replaces ``sklearn.linear_model`` with ``concrete.ml.sklearn`` in
      ``import`` / ``from ... import`` statements;
    * remembers which variables were assigned an instance of a supported
      model class (``name = ModelClass(...)``);
    * records the variable on which ``.fit(...)`` is called and the name of
      the training-data variable passed to it;
    * sets ``multi_fit`` to a message when the script is ambiguous (the same
      class instantiated more than once, or more than one model class fitted).

    Only the simple forms ``name = ModelClass(...)`` and
    ``name.fit(data_name, ...)`` / ``name.fit(X=data_name, ...)`` are
    recognised; anything else leaves the corresponding attribute as ``None``.
    """

    def __init__(self):
        # Class names that are eligible for conversion.
        self.supported_classes = {
            "LinearRegression", "LogisticRegression", "LinearSVC", "LinearSVR",
            "PoissonRegressor", "TweedieRegressor", "GammaRegressor", "Lasso",
            "Ridge", "ElasticNet", "SGDRegressor"
        }
        # Variable on which ``fit`` was (last) called, e.g. ``"log_reg"``.
        self.model_variable_name = None
        # Name of the variable passed as training data to that ``fit`` call.
        self.fit_variable_name = None
        # Class names of the supported models that were fitted.
        self.used_models = set()
        # Number of instantiations seen per supported class.
        self.model_counts = {model: 0 for model in self.supported_classes}
        # ``None`` when the script is unambiguous, otherwise a description.
        self.multi_fit = None
        # Maps a variable name to the supported class it was assigned from.
        self._model_variables = {}

    def visit_Import(self, node):
        """Point ``import sklearn.linear_model`` at ``concrete.ml.sklearn``."""
        for alias in node.names:
            if 'sklearn.linear_model' in alias.name:
                alias.name = alias.name.replace('sklearn.linear_model', 'concrete.ml.sklearn')
        return node

    def visit_ImportFrom(self, node):
        """Point ``from sklearn.linear_model import ...`` at ``concrete.ml.sklearn``."""
        if node.module == 'sklearn.linear_model':
            node.module = 'concrete.ml.sklearn'
        return node

    def visit_Assign(self, node):
        """Count model instantiations and remember which variable holds them."""
        model_name = None
        value = node.value
        if isinstance(value, ast.Call) and isinstance(value.func, ast.Name):
            if value.func.id in self.supported_classes:
                model_name = value.func.id

        if model_name is not None:
            self.model_counts[model_name] += 1
            if self.model_counts[model_name] > 1:
                self.multi_fit = f"Multiple instances of {model_name} detected."

        for target in node.targets:
            if not isinstance(target, ast.Name):
                continue
            if model_name is not None:
                self._model_variables[target.id] = model_name
            else:
                # The name was rebound to something else; stop treating it as
                # a model so a later ``.fit`` on it is not misattributed.
                self._model_variables.pop(target.id, None)

        # Keep descending so calls nested inside the assigned value are seen.
        return self.generic_visit(node)

    def visit_Call(self, node):
        """Record ``<model variable>.fit(...)`` calls on supported models."""
        func = node.func
        if (
            isinstance(func, ast.Attribute)
            and func.attr == 'fit'
            and isinstance(func.value, ast.Name)
        ):
            # Resolve the receiver *variable* to the class it was built from;
            # the variable name itself is never a class name.
            model_class = self._model_variables.get(func.value.id)
            if model_class is not None:
                self.used_models.add(model_class)
                self.model_variable_name = func.value.id
                self.fit_variable_name = self._training_data_name(node)
                if len(self.used_models) > 1:
                    # Sorted so the message does not depend on set ordering.
                    self.multi_fit = (
                        "Multiple models with 'fit' method detected: "
                        + ", ".join(sorted(self.used_models))
                    )
        return self.generic_visit(node)

    @staticmethod
    def _training_data_name(fit_call):
        """Return the variable name passed as training data, or ``None``.

        Looks at the first positional argument, falling back to the ``X``
        keyword. Only a plain variable name is accepted.
        """
        if fit_call.args:
            data = fit_call.args[0]
        else:
            data = next(
                (kw.value for kw in fit_call.keywords if kw.arg == "X"), None
            )
        return data.id if isinstance(data, ast.Name) else None


def convert_to_concrete_ml(user_code):
    """Convert a scikit-learn training script into a Concrete ML script.

    The script's ``sklearn.linear_model`` imports are redirected to
    ``concrete.ml.sklearn`` and deployment code (compile, save, key
    generation) for the fitted model is appended.

    Args:
        user_code: Python source of the training script.

    Returns:
        The converted source as a string, or ``None`` when the script is
        ambiguous (``ConcreteMLTransformer.multi_fit`` was set).

    Raises:
        Exception: if the source cannot be parsed, if no supported model with
            a recognisable ``fit`` call is found, or if code generation
            fails. The message of the underlying error is preserved.
    """
    # Local import so the block does not depend on a new file-level import.
    import textwrap

    try:
        tree = ast.parse(user_code)
        transformer = ConcreteMLTransformer()
        transformed_tree = transformer.visit(tree)

        # An ambiguous script cannot be converted; the caller reports it.
        if transformer.multi_fit is not None:
            return None

        if transformer.model_variable_name is None or transformer.fit_variable_name is None:
            raise ValueError(
                "No supported model fitted on a named training variable was found."
            )

        # The template is dedented before parsing: source that starts with an
        # indented statement is rejected by ast.parse.
        save_code_str = textwrap.dedent("""\
            import os
            import shutil
            import numpy as np
            from concrete.ml.deployment import FHEModelClient, FHEModelDev, FHEModelServer
            current_path = os.getcwd()
            temp_path = os.path.join(current_path, 'blindml_output')
            if os.path.exists(temp_path) and os.path.isdir(temp_path):
                shutil.rmtree(temp_path)
            if not os.path.exists(temp_path):
                os.makedirs(temp_path)

            X_train_np_blindml = np.array({fit_data})
            {model}.compile(X_train_np_blindml)

            directory_name = temp_path
            fhemodel_dev = FHEModelDev(directory_name, {model})
            fhemodel_dev.save()

            # Let's create the client and load the model
            key_path = os.path.join(directory_name, 'test')
            if os.path.exists(key_path) and os.path.isdir(key_path):
                shutil.rmtree(key_path)
            if not os.path.exists(key_path):
                os.makedirs(key_path)
            fhemodel_client = FHEModelClient(directory_name, key_dir=key_path)

            # The client first need to create the private and evaluation keys.
            fhemodel_client.generate_private_and_evaluation_keys()

            # Get the serialized evaluation keys
            serialized_evaluation_keys = fhemodel_client.get_serialized_evaluation_keys()
            with open(key_path + "/serialized_evaluation_keys.ekl", "wb") as f:
                f.write(serialized_evaluation_keys)
        """).format(
            model=transformer.model_variable_name,
            fit_data=transformer.fit_variable_name,
        )
        save_code = ast.parse(save_code_str).body
        transformed_tree.body.extend(save_code)

        return astor.to_source(transformed_tree)

    except Exception as e:
        print(f"Error during transformation: {str(e)}")
        raise Exception(str(e)) from e

def transform_code(encoded_user_code):
    """Run ``convert_to_concrete_ml`` and wrap the outcome in a result dict.

    Args:
        encoded_user_code: source text handed unchanged to
            ``convert_to_concrete_ml``.

    Returns:
        ``{"success": True, "code": <converted source>}`` when conversion
        yields a value; ``{"success": False, "msg": ...}`` when it yields
        ``None``; ``{"success": False, "error": <message>}`` when it raises.
    """
    try:
        result = convert_to_concrete_ml(encoded_user_code)
    except Exception as exc:
        return {"success": False, "error": str(exc)}

    if result is None:
        return {"success": False, "msg": "Multiple models with 'fit' method detected. Please choose one"}
    return {"success": True, "code": result}



In [27]:
# Demo: pass a sample scikit-learn script, given as a source string, to
# transform_code. The script fits two models (LogisticRegression and Ridge).
transform_code("""from sklearn.datasets import load_breast_cancer
from sklearn.linear_model import LogisticRegression, Ridge
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# 데이터 로드
data = load_breast_cancer()
X = data.data
y = data.target

# 데이터를 훈련 세트와 테스트 세트로 분할
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# 로지스틱 회귀 모델 생성 및 훈련
log_reg = LogisticRegression(max_iter=10000)
log_reg.fit(X_train, y_train)

# 로지스틱 회귀 모델 평가
y_pred_log_reg = log_reg.predict(X_test)
accuracy_log_reg = accuracy_score(y_test, y_pred_log_reg)

# 릿지 회귀 모델 생성 및 훈련
ridge_reg = Ridge()
ridge_reg.fit(X_train, y_train)

# 릿지 회귀 모델 평가
y_pred_ridge_reg = ridge_reg.predict(X_test)
# 릿지 회귀는 연속적인 출력을 제공하므로 이진 분류 문제에 대해서는 임계값 설정 필요
threshold = 0.5
y_pred_ridge_reg = (y_pred_ridge_reg > threshold).astype(int)
accuracy_ridge_reg = accuracy_score(y_test, y_pred_ridge_reg)

accuracy_log_reg, accuracy_ridge_reg
""")

{'success': False, 'msg': "Multiple models with 'fit' method detected. Please choose one"}