<a href="https://colab.research.google.com/github/smartcontracts0/genomics/blob/main/FHE/DNAFHE.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install concrete-ml
!pip install faker


                              sample_id  \
0  8b7a6e21-2d1d-43f3-8313-19c8f29a42d6   
1  18172ece-7f7c-4a6a-9435-e5fff9cf1c13   
2  6d0fcdc1-4f61-4076-a89d-afa5eee4b002   
3  e45f2aab-1aec-492d-a301-6e523c0a03da   
4  42edfef1-12e4-4a27-807a-d032551c00e0   

                                            sequence chromosome region_type  \
0  CTACCCTGCTGATCTAGGACGCATTCAACGGCTCTTTCTAAAGCGG...         10      coding   
1  GCGGGCGACCATACCAACCATCCTGGGAAACCGCATGCCTACTGTA...          Y      coding   
2  AACGACGGGACGTCATTGGGCACCTGGGACACGGCCGCCTTGTGCC...         15  non-coding   
3  TCGTGAACACGCAGGCCAACTTATAGGCGTTGGTGAACTCCTATTC...         12      coding   
4  GGGGGCCAATCCGTCATCGCAGGTACCCGCTTCGTGTGCAGACCTG...          X  non-coding   

   sequence_length  ancestry phenotype  
0               50  European  diseased  
1              119   African  diseased  
2               96  European   healthy  
3               53     Asian   healthy  
4              130   African  diseased  


In [None]:
import random
import pandas as pd
import numpy as np
from faker import Faker

# Fix the seeds so that Restart & Run All regenerates the same synthetic dataset
SEED = 42
random.seed(SEED)  # drives the random.randint / random.choice / random.choices calls below

# Initialize Faker
fake = Faker()
Faker.seed(SEED)  # Faker is seeded through its own API, separately from the `random` module

# Define DNA nucleotides
nucleotides = ['A', 'T', 'C', 'G']

# Build one random DNA string of the requested length
def generate_dna_sequence(length):
    """Return a string of `length` characters drawn at random from `nucleotides`."""
    bases = random.choices(nucleotides, k=length)
    return ''.join(bases)

# Create a synthetic dataset
def create_synthetic_dataset(n_samples=1000):
    """Build a DataFrame of `n_samples` randomly generated genomic records.

    Each row holds a Faker UUID, a random DNA sequence of 50-150 bases, and
    randomly chosen chromosome, region type, ancestry and phenotype values.

    Args:
        n_samples: number of rows to generate.

    Returns:
        pandas.DataFrame with columns sample_id, sequence, chromosome,
        region_type, sequence_length, ancestry, phenotype. The chromosome
        column mixes ints (1-22) with the strings 'X' and 'Y'.
    """
    # The choice pools never change between rows, so build them once instead
    # of re-creating the lists on every iteration.
    chromosomes = list(range(1, 23)) + ['X', 'Y']
    region_types = ['coding', 'non-coding']
    ancestries = ['African', 'Asian', 'European', 'American']
    phenotypes = ['healthy', 'diseased']
    columns = ['sample_id', 'sequence', 'chromosome', 'region_type', 'sequence_length', 'ancestry', 'phenotype']

    data = []
    for _ in range(n_samples):
        # The order of the random draws is kept as-is so a seeded run yields the same rows.
        sample_id = fake.uuid4()
        sequence_length = random.randint(50, 150)
        sequence = generate_dna_sequence(sequence_length)
        chromosome = random.choice(chromosomes)
        region_type = random.choice(region_types)
        ancestry = random.choice(ancestries)
        phenotype = random.choice(phenotypes)
        data.append([sample_id, sequence, chromosome, region_type, sequence_length, ancestry, phenotype])
    return pd.DataFrame(data, columns=columns)

# Generate the dataset with the default sample count and preview its first five rows
df = create_synthetic_dataset()
print(df.head(5))


In [2]:
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

# Label-encode each categorical column, keeping the fitted encoder per column in `label_encoders`
# NOTE(review): df is overwritten in place, so re-running this cell re-encodes the
# already-encoded integer codes; re-create df before re-running.
categorical_columns = ['chromosome', 'region_type', 'ancestry', 'phenotype']
label_encoders = {name: LabelEncoder() for name in categorical_columns}
for name, encoder in label_encoders.items():
    # Cast to str first so the encoder always receives values of a single type
    df[name] = df[name].astype(str)
    df[name] = encoder.fit_transform(df[name])

# Feature matrix (four encoded/numeric columns) and target vector
feature_columns = ['sequence_length', 'chromosome', 'region_type', 'ancestry']
X = df[feature_columns].values
y = df['phenotype'].values

# Hold out 20% of the rows for testing; random_state pins the split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

In [3]:
from concrete.ml.sklearn import LogisticRegression
from sklearn.preprocessing import StandardScaler

# Standardize the features: the scaler is fitted on the training split only,
# then the same transform is applied to the test split
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Fit the Concrete ML logistic regression (n_bits=8) on clear training data
model = LogisticRegression(n_bits=8)
model.fit(X_train, y_train)

# Reference predictions, computed without FHE execution
y_pred_clear = model.predict(X_test)

# Compile the model, using the training split as the representative input set
model.compile(X_train)

# Same test rows, this time predicted with fhe="execute"
y_pred_fhe = model.predict(X_test, fhe="execute")

# Fraction of test rows on which both prediction paths agree
agreement = (y_pred_fhe == y_pred_clear).mean()

print("In clear  :", y_pred_clear)
print("In FHE    :", y_pred_fhe)
print(f"Similarity: {int(agreement*100)}%")

In clear  : [1 1 0 1 0 1 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 1 1 0 0
 1 0 0 1 0 1 0 0 1 0 0 0 0 0 0 0 1 0 0 0 1 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 1 0 0 1 0 1 1 0 0 0 0 0 0 0 0 0 1 0 1 0 0 1 0 1 0 1 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 1 1 0 0 0 0 1 1 0 1 1 0 0 0 1 0 0 1 0 0
 1 0 1 1 0 0 0 0 0 0 1 0 0 1 0]
In FHE    : [1 1 0 1 0 1 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 1 1 0 0
 1 0 0 1 0 1 0 0 1 0 0 0 0 0 0 0 1 0 0 0 1 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 1 0 0 1 0 1 1 0 0 0 0 0 0 0 0 0 1 0 1 0 0 1 0 1 0 1 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 1 1 0 0 0 0 1 1 0 1 1 0 0 0 1 0 0 1 0 0
 1 0 1 1 0 0 0 0 0 0 1 0 0 1 0]
Similarity: 100%


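## Overhead Analysis

The cell below times in-clear and FHE predictions and records their peak traced memory for several batch sizes.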

In [18]:
import random
import pandas as pd
import numpy as np
from faker import Faker
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from concrete.ml.sklearn import LogisticRegression
import time
import tracemalloc

# Fix the seeds so that re-running this cell regenerates the same synthetic dataset
SEED = 42
random.seed(SEED)  # drives the random.randint / random.choice / random.choices calls below

# Initialize Faker
fake = Faker()
Faker.seed(SEED)  # Faker is seeded through its own API, separately from the `random` module

# Define DNA nucleotides
nucleotides = ['A', 'T', 'C', 'G']

# Build one random DNA string of the requested length
def generate_dna_sequence(length):
    """Return a string of `length` characters drawn at random from `nucleotides`."""
    bases = random.choices(nucleotides, k=length)
    return ''.join(bases)

# Create a synthetic dataset
def create_synthetic_dataset(n_samples=100000):  # Adjust sample size as needed
    """Build a DataFrame of `n_samples` randomly generated genomic records.

    Each row holds a Faker UUID, a random DNA sequence of 50-150 bases, and
    randomly chosen chromosome, region type, ancestry and phenotype values.

    Args:
        n_samples: number of rows to generate.

    Returns:
        pandas.DataFrame with columns sample_id, sequence, chromosome,
        region_type, sequence_length, ancestry, phenotype. The chromosome
        column mixes ints (1-22) with the strings 'X' and 'Y'.
    """
    # The choice pools never change between rows, so build them once instead
    # of re-creating the lists on each of the (by default 100000) iterations.
    chromosomes = list(range(1, 23)) + ['X', 'Y']
    region_types = ['coding', 'non-coding']
    ancestries = ['African', 'Asian', 'European', 'American']
    phenotypes = ['healthy', 'diseased']
    columns = ['sample_id', 'sequence', 'chromosome', 'region_type', 'sequence_length', 'ancestry', 'phenotype']

    data = []
    for _ in range(n_samples):
        # The order of the random draws is kept as-is so a seeded run yields the same rows.
        sample_id = fake.uuid4()
        sequence_length = random.randint(50, 150)
        sequence = generate_dna_sequence(sequence_length)
        chromosome = random.choice(chromosomes)
        region_type = random.choice(region_types)
        ancestry = random.choice(ancestries)
        phenotype = random.choice(phenotypes)
        data.append([sample_id, sequence, chromosome, region_type, sequence_length, ancestry, phenotype])
    return pd.DataFrame(data, columns=columns)

# Generate the dataset with the function's default sample count
df = create_synthetic_dataset()

# Label-encode each categorical column, keeping the fitted encoder per column in `label_encoders`
categorical_columns = ['chromosome', 'region_type', 'ancestry', 'phenotype']
label_encoders = {name: LabelEncoder() for name in categorical_columns}
for name, encoder in label_encoders.items():
    # Cast to str first so the encoder always receives values of a single type
    df[name] = df[name].astype(str)
    df[name] = encoder.fit_transform(df[name])

# Feature matrix (four encoded/numeric columns) and target vector
feature_columns = ['sequence_length', 'chromosome', 'region_type', 'ancestry']
X = df[feature_columns].values
y = df['phenotype'].values

# Hold out 20% of the rows for testing; random_state pins the split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Standardize the features: fit on the training split only, apply to both splits
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Fit the Concrete ML logistic regression (n_bits=8) on clear training data
model = LogisticRegression(n_bits=8)
model.fit(X_train, y_train)

# Compile the model, using the training split as the representative input set
model.compile(X_train)

# Function to test predictions for a specific batch size
def test_predictions(batch_size):
    # Prepare batched data
    X_test_batch = np.repeat(X_test, batch_size // len(X_test) + 1, axis=0)[:batch_size]
    y_test_batch = np.repeat(y_test, batch_size // len(y_test) + 1, axis=0)[:batch_size]

    # Measure in-clear predictions
    tracemalloc.start()
    start_time = time.time()
    y_pred_clear = model.predict(X_test_batch)
    end_time = time.time()
    peak_memory_clear = tracemalloc.get_traced_memory()[1] / 1e3  # Convert bytes to KB
    tracemalloc.stop()
    clear_time = end_time - start_time

    # Measure FHE predictions
    tracemalloc.start()
    start_time_fhe = time.time()
    y_pred_fhe = model.predict(X_test_batch, fhe="execute")
    end_time_fhe = time.time()
    peak_memory_fhe = tracemalloc.get_traced_memory()[1] / 1e3  # Convert bytes to KB
    tracemalloc.stop()
    fhe_time = end_time_fhe - start_time_fhe

    # Calculate similarity
    similarity = (y_pred_fhe == y_pred_clear).mean() * 100  # Percentage similarity

    # Return results
    return {
        "Batch Size": batch_size,
        "In-Clear Time (s)": clear_time,
        "FHE Time (s)": fhe_time,
        "In-Clear Peak Memory (KB)": peak_memory_clear,
        "FHE Peak Memory (KB)": peak_memory_fhe,
        "Similarity (%)": similarity,
    }

# Sweep the batch sizes in powers of ten, collecting one result dict per size
batch_sizes = [10, 100, 1000, 10000, 100000]
results = list(map(test_predictions, batch_sizes))

# One row per batch size, one column per reported metric
results_df = pd.DataFrame(results)
print(results_df)


   Batch Size  In-Clear Time (s)  FHE Time (s)  In-Clear Peak Memory (KB)  \
0          10           0.000739      0.059212                      2.296   
1         100           0.000509      0.571410                      9.496   
2        1000           0.000937      6.073873                     81.496   
3       10000           0.001729     62.592004                    640.744   
4      100000           0.005954    634.109776                   6400.744   

   FHE Peak Memory (KB)  Similarity (%)  
0                13.226           100.0  
1                48.727           100.0  
2               444.605           100.0  
3              3386.499           100.0  
4             33245.416           100.0  


In [22]:
import matplotlib.pyplot as plt
import pandas as pd


def _save_line_figure(batch_sizes, series, ylabel, title, filename):
    """Plot metrics against batch size on a log-scaled x axis and save the figure.

    Args:
        batch_sizes: x values shared by every line.
        series: iterable of ``(values, label, color)`` tuples, drawn in order.
        ylabel: label for the y axis.
        title: figure title.
        filename: path handed to ``plt.savefig``.
    """
    plt.figure(figsize=(10, 6))
    try:
        for values, label, color in series:
            plt.plot(batch_sizes, values, marker='o', label=label, color=color)
        plt.xscale('log')
        plt.xlabel('Batch Size (log scale)')
        plt.ylabel(ylabel)
        plt.title(title)
        plt.legend()
        plt.grid(True, which="both", linestyle='--', linewidth=0.5)
        plt.savefig(filename, bbox_inches='tight')  # Save with no extra white space
    finally:
        # Close the figure even if plotting or saving fails, so figures do not pile up
        plt.close()


def generate_selected_figures(results_df):
    """Save four PNG figures summarising the overhead results in `results_df`.

    Reads the columns "Batch Size", "In-Clear Time (s)", "FHE Time (s)",
    "In-Clear Peak Memory (KB)" and "FHE Peak Memory (KB)", and writes
    batch_size_vs_prediction_time.png, batch_size_vs_memory_usage.png,
    time_overhead_ratio_vs_batch_size.png and
    memory_overhead_ratio_vs_batch_size.png to the working directory.

    Args:
        results_df: DataFrame holding the columns listed above.

    Returns:
        None. The figures are saved and closed, not displayed.
    """
    # Extract batch sizes and metrics from the DataFrame
    batch_sizes = results_df["Batch Size"]
    in_clear_time = results_df["In-Clear Time (s)"]
    fhe_time = results_df["FHE Time (s)"]
    in_clear_memory = results_df["In-Clear Peak Memory (KB)"]
    fhe_memory = results_df["FHE Peak Memory (KB)"]

    # Calculate overhead ratios (FHE relative to in-clear)
    time_overhead_ratio = fhe_time / in_clear_time
    memory_overhead_ratio = fhe_memory / in_clear_memory

    # Batch Size vs Prediction Time
    _save_line_figure(
        batch_sizes,
        [(fhe_time, 'FHE Time', 'red'), (in_clear_time, 'In-Clear Time', 'blue')],
        ylabel='Prediction Time (s)',
        title='Batch Size vs Prediction Time',
        filename='batch_size_vs_prediction_time.png',
    )

    # Batch Size vs Memory Usage
    _save_line_figure(
        batch_sizes,
        [(fhe_memory, 'FHE Memory Usage', 'red'), (in_clear_memory, 'In-Clear Memory Usage', 'blue')],
        ylabel='Memory Usage (KB)',
        title='Batch Size vs Memory Usage',
        filename='batch_size_vs_memory_usage.png',
    )

    # Time Overhead Ratio vs Batch Size
    _save_line_figure(
        batch_sizes,
        [(time_overhead_ratio, 'Time Overhead Ratio (FHE/In-Clear)', 'green')],
        ylabel='Overhead Ratio',
        title='Time Overhead Ratio vs Batch Size',
        filename='time_overhead_ratio_vs_batch_size.png',
    )

    # Memory Overhead Ratio vs Batch Size
    _save_line_figure(
        batch_sizes,
        [(memory_overhead_ratio, 'Memory Overhead Ratio (FHE/In-Clear)', 'purple')],
        ylabel='Overhead Ratio',
        title='Memory Overhead Ratio vs Batch Size',
        filename='memory_overhead_ratio_vs_batch_size.png',
    )

# Write the four overhead figures (PNG files) from the collected results
generate_selected_figures(results_df=results_df)

