# FHE-based Aging Pace Estimation by Horaizon27 team

### Imports

In [None]:
import json
import time
from pathlib import Path
import pandas as pd
import numpy as np

from concrete import fhe

### Constants

In [None]:
DATA_FOLDER = Path("data")
DUNEDIN_PACE_DATA_PATH = DATA_FOLDER / "dunedin_pace_data.json"
BETA_VALUES_PATH = DATA_FOLDER / "GSE40279_average_beta.txt"

### Data loading

Dunedin model params/data

In [None]:
dunedin_pace_data = json.loads(open(DUNEDIN_PACE_DATA_PATH).read())

Preparing beta values
1) Load beta values (you need to download GSE40279 dataset to "data" folder)
2) Add missing probes data (using mean data from dunedin model)
2) Filter only needed probes (recommended 20k)

In [None]:
beta_values = pd.read_csv(BETA_VALUES_PATH, delimiter="\t", index_col=0)

probes_mean = {
    dunedin_pace_data['normalization_probes'][i]: dunedin_pace_data['normalization_means'][i]
    for i in range(len(dunedin_pace_data['normalization_probes']))
}

missing_probes = [_ for _ in dunedin_pace_data['normalization_probes'] if _ not in beta_values.index]
missing_probes_data = [[probes_mean[_]] * len(beta_values.columns) for _ in missing_probes]
missing_betas = pd.DataFrame(missing_probes_data, index=missing_probes, columns=beta_values.columns)
filtered_beta_values = pd.concat([beta_values, missing_betas]).loc[dunedin_pace_data['normalization_probes']]

### Models

DunedinPACE python version

In [None]:
class DunedinPACE():
    def _get_pace(self, x):
        return np.dot(x, self.weights) + self.intercept

    def __init__(self, name, features, base_features, reference_values, weights, intercept):
        self.name = name
        self.features = features
        self.reference_values = reference_values
        self.weights = weights
        self.intercept = intercept
        base_features_indices = [features.index(item) for item in base_features]
        self.base_features_indices = base_features_indices
    
    def __call__(self, x):
        return self.forward(x)

    def forward(self, x):
        # check input data
        if x.ndim > 1 or len(x) != len(self.features):
            raise ValueError(f"Expected 1D array with {len(self.features)} probes, got shape {x.shape}")
        
        # apply normalization
        x = self.preprocess(x)
        
        # return calculated pace
        return self._get_pace(x)
    
    def preprocess(self, x):
        sorted_gold_standard = np.sort(self.reference_values)
        indexes = np.argsort(x)
        x_normalized = np.zeros(x.size, dtype=np.float64)
        x_normalized[indexes] = sorted_gold_standard
        return x_normalized[self.base_features_indices]

In [None]:
dunedin_model = DunedinPACE(
    name="dunedin_pace",
    features=dunedin_pace_data['normalization_probes'],
    base_features=dunedin_pace_data['probes'],
    reference_values = dunedin_pace_data['normalization_means'],
    weights=dunedin_pace_data['weights'],
    intercept=dunedin_pace_data['intercept']
)

Dunnedin FHE version

In [None]:
class FHEDunedinPACE():
    def __init__(self, name, features, base_features, reference_values, weights, intercept):
        self.name = name
        self.scale = 100000
        self.features = features
        self.reference_values = reference_values
        self.sorted_gold_standard_int = (np.sort(self.reference_values) * self.scale).astype(np.int64)
        self.weights = np.array(weights)
        self.weights_int = (self.weights * self.scale).astype(np.int64)
        self.intercept = intercept
        self.intercept_int = int(self.intercept * (self.scale ** 2))
        base_features_indices = [features.index(item) for item in base_features]
        self.base_features_indices = base_features_indices
        self.circuit = self._create_circuit()

    def __call__(self, x):
        return self.forward(x)

    def _get_pace(self, x, weights, intercept):
        return np.dot(x, weights) + intercept # intercept = -19498585554

    def _create_circuit(self, inputset_size=200):
        fhe_compiler = fhe.Compiler(
            function=self._get_pace,
            parameter_encryption_statuses={
                "x": "encrypted",
                "weights": "clear",
                "intercept": "clear"
            }
        )

        inputset_for_compiler = [
            (
                np.random.permutation(self.sorted_gold_standard_int[self.base_features_indices]),
                self.weights_int,
                self.intercept_int
            )
            for _ in range(inputset_size)
        ]
        circuit = fhe_compiler.compile(inputset_for_compiler)
        return circuit
    
    def forward(self, x):
        # check input data
        if x.ndim > 1 or len(x) != len(self.features):
            raise ValueError(f"Expected 1D array with {len(self.features)} probes, got shape {x.shape}")
        
        # apply normalization
        x = self.preprocess(x)

        # calculate pace with fhe circuit using encrypted data
        pace = self.circuit.encrypt_run_decrypt(x, self.weights_int, self.intercept_int)

        # convert pace to float
        return self.postprocess(pace)
    
    def preprocess(self, x):
        indexes = np.argsort(x)
        x_normalized = np.zeros(x.size, dtype=np.int64)
        x_normalized[indexes] = self.sorted_gold_standard_int
        return x_normalized[self.base_features_indices]
    
    def postprocess(self, pace):
        return pace / (self.scale ** 2)

In [None]:
fhe_dunedin_model = FHEDunedinPACE(
    name = "fhe_dunedin_pace",
    features=dunedin_pace_data['normalization_probes'],
    base_features=dunedin_pace_data['probes'],
    reference_values = dunedin_pace_data['normalization_means'],
    weights=dunedin_pace_data['weights'],
    intercept=dunedin_pace_data['intercept']
)

Dunnedin FHE version with high precision

In [None]:
class FHEDunedinPACEHighPrecision():
    def __init__(self, name, features, base_features, reference_values, weights, intercept):
        self.name = name
        self.scale = 10000
        self.features = features
        self.reference_values = reference_values
        self.sorted_gold_standard = np.sort(self.reference_values)
        self.sorted_gold_standard_int = (self.sorted_gold_standard * self.scale).astype(np.int64)
        self.weights = np.array(weights)
        weights_high, weights_low = self._split_float_array(self.weights)
        self.weights_high = weights_high
        self.weights_low = weights_low
        self.intercept = intercept
        base_features_indices = [features.index(item) for item in base_features]
        self.base_features_indices = base_features_indices
        self.circuit = self._create_circuit()

    def __call__(self, x):
        return self.forward(x)

    def _get_pace(self, x, weights):
        return np.dot(x, weights)

    def _create_circuit(self, inputset_size=200):
        fhe_compiler = fhe.Compiler(
            function=self._get_pace,
            parameter_encryption_statuses={
                "x": "encrypted",
                "weights": "clear"
            }
        )

        inputset_for_compiler = [
            (
                np.random.randint(0, self.scale * 10, size=len(self.weights), dtype=np.int64),
                self.weights_high + self.weights_low,
            )
            for _ in range(inputset_size)
        ]

        circuit = fhe_compiler.compile(inputset_for_compiler)
        return circuit

    def _split_float_array(self, x):
        x = (np.array(x) * (self.scale ** 2)).astype(np.int64)
        x_high = x // self.scale
        x_low = x % self.scale
        return (x_high, x_low)

    def forward(self, x):
        # check input data
        if x.ndim > 1 or len(x) != len(self.features):
            raise ValueError(f"Expected 1D array with {len(self.features)} probes, got shape {x.shape}")
        
        # apply normalization
        x = self.preprocess(x)

        # split betas data on two parts to implement high precision float multiplication
        x_high, x_low = self._split_float_array(x)
        
        # calculate pace by parts
        pace_1 = self.circuit.encrypt_run_decrypt(x_high, self.weights_high)
        pace_2 = self.circuit.encrypt_run_decrypt(x_high, self.weights_low)
        pace_3 = self.circuit.encrypt_run_decrypt(x_low, self.weights_high)
        pace_4 = self.circuit.encrypt_run_decrypt(x_low, self.weights_low)

        pace = pace_1  + (pace_2 + pace_3) / self.scale + pace_4 / (self.scale ** 2)

        # convert pace to float and add intercept
        return self.postprocess(pace)
    
    def preprocess(self, x):
        indexes = np.argsort(x)
        x_normalized = np.zeros(x.size, dtype=np.float64)
        x_normalized[indexes] = self.sorted_gold_standard
        return x_normalized[self.base_features_indices]
    
    def postprocess(self, pace):
        return pace / (self.scale ** 2) + self.intercept

In [None]:
fhe_precision_dunedin_model = FHEDunedinPACEHighPrecision(
    name="fhe_high_precision_dunedin_pace",
    features=dunedin_pace_data['normalization_probes'],
    base_features=dunedin_pace_data['probes'],
    reference_values = dunedin_pace_data['normalization_means'],
    weights=dunedin_pace_data['weights'],
    intercept=dunedin_pace_data['intercept']
)

In [None]:
models_list = [
    dunedin_model, fhe_dunedin_model, fhe_precision_dunedin_model
]

### PACE calculation

In [None]:
def get_pace(model, dataset):
    start_time = time.time()
    pace = [
        model(dataset[column_name].values)
        for column_name in dataset.columns
    ]
    end_time = time.time()
    avg_inference_time = (end_time - start_time) / dataset.columns.size
    return (np.array(pace), avg_inference_time)

def get_mae(x, y):
    return np.mean(np.abs(x - y))

In [None]:
pace_data = {}
inference_time = []
for model in models_list:
    pace, avg_inference_time = get_pace(model, filtered_beta_values)
    pace_data[model.name] = pace
    inference_time.append(avg_inference_time)

In [None]:
true_pace = pace_data["dunedin_pace"]
mae_list = []

for model in models_list:
    mae_list.append(get_mae(true_pace, pace_data[model.name]))

### Results

In [22]:
pd.options.display.float_format = '{:.8f}'.format
pd.DataFrame(
    [inference_time, mae_list],
    index=["Avg_inference_time", "mae"],
    columns=[model.name for model in models_list]
)

Unnamed: 0,dunedin_pace,fhe_dunedin_pace,fhe_high_precision_dunedin_pace
Avg_inference_time,0.00318715,0.04702486,0.17222939
mae,0.0,0.00019811,2e-07
