In [None]:
!pip install numpy pandas scikit-learn matplotlib pyfhel


In [None]:
import time
import random
import json
from dataclasses import dataclass
from typing import List, Tuple, Dict

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, roc_auc_score, accuracy_score
import matplotlib.pyplot as plt
from Pyfhel import Pyfhel, PyCtxt, PyPtxt

In [None]:
#@title synthetic generation
#generate synthetic CERT-like user-behavior data

def generate_synthetic_cert_data(num_users=1000, days=90, insider_frac=0.10, seed=42):
    """Build a synthetic, CERT-style table of daily per-user activity counts.

    Both the stdlib ``random`` module and NumPy's global RNG are re-seeded with
    ``seed`` at the start of every call.

    Args:
        num_users: number of users, named ``user_000``, ``user_001``, ...
        days: number of consecutive days generated per user, from 2023-01-01.
        insider_frac: fraction of users sampled as insiders (at least one).
        seed: seed applied to both random number generators.

    Returns:
        DataFrame with one row per (user, day) and the columns ``user``,
        ``date``, ``logon``, ``file_access``, ``email_sent``, ``web_visits``,
        ``external_transfer`` and ``label``.

    NOTE(review): the label draw (p=0.2) and the elevated-activity draw
    (p=0.05) are independent for insiders, so a positive label is not tied to
    the rows whose features were elevated - confirm this is intended.
    """
    random.seed(seed)
    np.random.seed(seed)

    first_day = pd.to_datetime("2023-01-01")
    user_ids = [f"user_{idx:03d}" for idx in range(num_users)]
    n_insiders = max(1, int(num_users * insider_frac))
    insider_ids = set(random.sample(user_ids, n_insiders))

    #date strings are the same for every user, so format them once
    day_strings = [
        (first_day + pd.Timedelta(days=offset)).strftime("%Y-%m-%d")
        for offset in range(days)
    ]

    records = []
    for user_id in user_ids:
        is_insider = user_id in insider_ids
        for day_str in day_strings:
            #baseline daily activity
            activity = {
                "logon": np.random.poisson(3),
                "file_access": np.random.poisson(20),
                "email_sent": np.random.poisson(5),
                "web_visits": np.random.poisson(30),
                "external_transfer": np.random.binomial(1, 0.01),
            }

            #positive label: only insiders, on roughly 20% of their days
            flagged = is_insider and random.random() < 0.2

            #elevated behavior: only insiders, on roughly 5% of their days
            if is_insider and random.random() < 0.05:
                activity["file_access"] += np.random.poisson(50)
                if random.random() < 0.3:
                    activity["external_transfer"] = 1
                activity["email_sent"] += np.random.poisson(8)

            record = {"user": user_id, "date": day_str}
            record.update((key, int(value)) for key, value in activity.items())
            record["label"] = int(flagged)
            records.append(record)

    return pd.DataFrame(records)

#full-size dataset: 1000 users x 90 days, 10% of the users sampled as insiders
df = generate_synthetic_cert_data(num_users=1000, days=90, insider_frac=0.10)
print("Rows:", df.shape[0])
df.head()

In [None]:
#select model features and target

FEATURES = ['logon', 'file_access', 'email_sent', 'web_visits', 'external_transfer']
TARGET = 'label'

X = df[FEATURES].astype(float)
y = df[TARGET].astype(int)

#train/test split on the unscaled features, so the scaler can be fitted on the training rows only
X_train_raw, X_test_raw, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)

#standard scaling for ckks
#fit on the training rows only: fitting on all rows would leak test-set mean/variance into training
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train_raw)
X_test = scaler.transform(X_test_raw)

#all rows scaled with the train-fitted scaler
X_scaled = scaler.transform(X)

print("train shape:", X_train.shape, "test shape:", X_test.shape)


In [None]:
#train baseline logreg and random forest
lr = LogisticRegression(max_iter=1000, random_state=42)
lr.fit(X_train, y_train)

rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X_train, y_train)

#eval test set
def eval_model(model, X_t, y_t, name="model"):
    """Print a classification report, ROC AUC (when available) and accuracy.

    Args:
        model: fitted estimator exposing ``predict`` and, optionally,
            ``predict_proba``.
        X_t: feature matrix to evaluate on.
        y_t: true labels for ``X_t``.
        name: heading printed before the metrics.
    """
    y_hat = model.predict(X_t)

    #ROC AUC is computed from the second predict_proba column; skipped if the model has none
    pos_scores = None
    if hasattr(model, "predict_proba"):
        pos_scores = model.predict_proba(X_t)[:, 1]

    print(name)
    print(classification_report(y_t, y_hat, digits=4))
    if pos_scores is not None:
        print("ROC AUC:", roc_auc_score(y_t, pos_scores))
    print("accuracy:", accuracy_score(y_t, y_hat))

#report both baselines on the held-out test split
eval_model(lr, X_test, y_test, name="logistic regression")
eval_model(rf, X_test, y_test, name="random forest")


In [None]:
#HEWrapper

@dataclass
class HEContextConfig:
    """Parameters for building the homomorphic-encryption context.

    Declared as a dataclass so individual fields can be overridden through
    the constructor, e.g. ``HEContextConfig(n=2**14)``.
    """
    # scheme name
    scheme: str = 'CKKS'
    # passed to Pyfhel.contextGen as ``n``
    n: int = 2**13
    # passed to Pyfhel.contextGen as ``scale``
    scale: float = 2**40
    # passed to Pyfhel.contextGen as ``qi_sizes`` (converted to a list there)
    qi_sizes: Tuple[int, ...] = (60, 40, 40, 60)

class HEWrapper:
    """Small helper around a Pyfhel context for scalar-at-a-time arithmetic.

    Every value is encrypted as a one-element float vector. The helpers cover
    encryption, decryption, ciphertext addition, ciphertext-by-plaintext
    multiplication and summing a list of ciphertexts.
    """

    def __init__(self, config: "HEContextConfig | None" = None):
        """Create the context and generate the keys.

        Args:
            config: context parameters; a fresh ``HEContextConfig()`` is
                created when omitted.
        """
        # Build the default per call: a default instance written in the
        # signature is evaluated once and would be shared by every wrapper.
        self.config = HEContextConfig() if config is None else config
        self.HE = Pyfhel()
        # NOTE(review): the scheme is always 'CKKS' here; config.scheme is not consulted.
        self.HE.contextGen(
            scheme='CKKS',
            n=self.config.n,
            scale=self.config.scale,
            qi_sizes=list(self.config.qi_sizes)
        )
        self.HE.keyGen()
        self.HE.relinKeyGen()
        print(f"Pyfhel CKKS context (n={self.config.n}, scale={self.config.scale}, qi_sizes={self.config.qi_sizes})")

    def encrypt(self, val: float):
        """Encrypt a scalar as a one-element float64 vector."""
        return self.HE.encryptFrac(np.array([val], dtype=np.float64))

    def decrypt(self, ctxt):
        """Decrypt ``ctxt`` and return its first decoded element as a float."""
        plain = self.HE.decryptFrac(ctxt)
        return float(plain[0])

    def add(self, ct_a, ct_b):
        """Return ``ct_a + ct_b``."""
        return ct_a + ct_b

    def mul_plain(self, ct, scalar: float):
        """Multiply a ciphertext by a plaintext scalar.

        The scalar is encoded as a one-element float64 vector, and
        ``rescale_to_next`` is applied to the product before it is returned.
        """
        ptxt = self.HE.encodeFrac(np.array([scalar], dtype=np.float64))
        res = ct * ptxt
        self.HE.rescale_to_next(res)
        return res

    def sum_ciphertexts(self, list_ct):
        """Add up the ciphertexts in ``list_ct`` from left to right.

        An empty list yields a fresh encryption of ``0.0``.
        """
        if not list_ct:
            return self.encrypt(0.0)
        acc = list_ct[0]
        for ct in list_ct[1:]:
            acc = self.add(acc, ct)
        return acc

#context with the default parameters, shared by the encrypted-scoring cells below
he = HEWrapper(config=HEContextConfig())

In [None]:
#pull the fitted logistic-regression parameters out as plain python numbers
feature_names = FEATURES
weights = lr.coef_.ravel().tolist()
intercept = float(lr.intercept_[0])

print("model weights")
for feat_name, feat_weight in zip(feature_names, weights):
    print(f"{feat_name} : {feat_weight}")
print("intercept:", intercept)


In [None]:
#encrypted scoring small sample set

def encrypted_score_sample(he_ctx: "HEWrapper", x_raw: np.ndarray, weights: List[float], intercept: float):
    """Compute the linear score ``w . x + b`` on encrypted features.

    Each feature is encrypted separately, multiplied by its plaintext weight,
    the products are summed, the encrypted intercept is added, and the total
    is decrypted.

    Args:
        he_ctx: wrapper providing ``encrypt``, ``mul_plain``,
            ``sum_ciphertexts``, ``add``, ``decrypt`` and the raw ``HE`` object.
        x_raw: one sample's feature values (any array-like; it is flattened).
        weights: one plaintext weight per feature.
        intercept: plaintext bias term.

    Returns:
        The decrypted score as returned by ``he_ctx.decrypt``.

    Raises:
        ValueError: if the number of features and weights differ.
    """
    features = np.asarray(x_raw, dtype=np.float64).ravel().tolist()
    # zip() would silently drop the surplus entries and yield a wrong score
    if len(features) != len(weights):
        raise ValueError(
            f"feature/weight length mismatch: {len(features)} features vs {len(weights)} weights"
        )

    ctxts = [he_ctx.encrypt(val) for val in features]

    #multiply encrypted features by plaintext weight
    weighted_cts = [he_ctx.mul_plain(ct, w) for ct, w in zip(ctxts, weights)]

    #sum weighted ciphertexts
    sum_ct = he_ctx.sum_ciphertexts(weighted_cts)

    #add intercept
    #the products were rescaled inside mul_plain, so the fresh intercept ciphertext
    #is switched down one level before the addition
    intercept_ct = he_ctx.encrypt(float(intercept))
    he_ctx.HE.mod_switch_to_next(intercept_ct)

    total_ct = he_ctx.add(sum_ct, intercept_ct)

    #decrypt total
    return he_ctx.decrypt(total_ct)

#score a few random test samples under encryption and compare with the plaintext lr decision function
n_check = 10
indices = np.random.choice(len(X_test), size=n_check, replace=False)
plaintext_scores, encrypted_scores, times = [], [], []
for idx in indices:
    x_sample = X_test[idx]

    #time only the encrypted path
    start = time.perf_counter()
    score_enc = encrypted_score_sample(he, x_sample, weights, intercept)
    times.append(time.perf_counter() - start)

    encrypted_scores.append(float(score_enc))
    plaintext_scores.append(float(lr.decision_function(x_sample.reshape(1, -1))[0]))

df_scores = pd.DataFrame(dict(
    idx=indices,
    plaintext_score=plaintext_scores,
    encrypted_decrypted_score=encrypted_scores,
    time_sec=times,
))
df_scores

In [None]:
#predict from decrypted encrypted scores, apply sigmoid in plaintext
def sigmoid(x):
    """Logistic function ``1 / (1 + exp(-x))``, evaluated without overflow.

    Args:
        x: scalar, NumPy array or pandas Series of scores.

    Returns:
        Probabilities in ``[0, 1]`` with the same container type as ``x``.
    """
    # 1 / (1 + e^-x) == exp(-log(1 + e^-x)); logaddexp(0, -x) evaluates the
    # inner log without ever forming e^-x, so large negative x cannot overflow
    return np.exp(-np.logaddexp(0.0, -x))

#map both score columns to probabilities
for score_col, prob_col in (
    ('plaintext_score', 'plaintext_prob'),
    ('encrypted_decrypted_score', 'enc_prob'),
):
    df_scores[prob_col] = sigmoid(df_scores[score_col])

#show comparison
display(df_scores[['idx','plaintext_prob','enc_prob','time_sec']].round(6))

prob_gap = (df_scores['plaintext_prob'] - df_scores['enc_prob']).abs()
print("mean absolute difference in probabilities:", prob_gap.mean())
print("mean HE inference time:", df_scores['time_sec'].mean())


In [None]:
#encrypted inference on the first batch_n test samples, one sample at a time
batch_n = min(200, len(X_test))
start_total = time.perf_counter()
enc_preds = [
    #threshold the plaintext sigmoid of the decrypted score at 0.5
    int(sigmoid(encrypted_score_sample(he, sample, weights, intercept)) >= 0.5)
    for sample in X_test[:batch_n]
]
t_total = time.perf_counter() - start_total

print(f"encrypted batch inference on {batch_n} samples took {t_total:.2f} sec (avg {t_total/batch_n:.4f} sec/sample)")

#compare to plaintext logreg predictions on same samples
plain_preds = lr.predict(X_test[:batch_n])
is_match = np.array(enc_preds) == plain_preds
print("sample agreement rate:", np.mean(is_match))
print("encrypted pipeline produced {}/{} predictions matching plaintext model".format(int(np.sum(is_match)), batch_n))
