<a href="https://colab.research.google.com/github/rtaiello/crypten_lr_clear_model/blob/main/crypten_logistic_regression_clear_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Crypten + Python implementation of Logistic Regression from scratch

_Riccardo Taiello, riccardo.taiello@inria.fr_

### Abstract

In this notebook I developed a complete version of Logistic Regression from scratch. I created a training where the model and the labels are public, while the features are encrypted through Crypten. <br>
The selected dataset is MNIST

In [None]:
! pip install torch==1.9 crypten==0.4.0 -q

In [2]:
! nvidia-smi

Thu Oct 13 14:17:10 2022       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 460.32.03    Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla T4            Off  | 00000000:00:04.0 Off |                    0 |
| N/A   42C    P8     9W /  70W |      0MiB / 15109MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [17]:
from typing import Dict, List, Tuple

import torch
import torchvision
import numpy as np
import crypten

%matplotlib inline
import matplotlib.pyplot as plt
from tqdm.notebook import trange

In [27]:
torch.manual_seed(42)
np.random.seed(42)
crypten.init()

DEVICE = "cpu" #torch.device("cuda:0" if torch.cuda.is_available() else "cpu")



In [28]:
def load_mnist() -> (torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor):
    """
    load MNIST data
    :return: x_train, y_train, x_test, y_test
    """
    # download the dataset from torchvision
    train_dataset = torchvision.datasets.MNIST(
        root="./data",
        train=True,
        transform=torchvision.transforms.ToTensor(),
        download=True,
    )
    test_dataset = torchvision.datasets.MNIST(
        root="./data",
        train=False,
        transform=torchvision.transforms.ToTensor(),
        download=True,
    )

    # create torch.Tensor objects for the training and test data
    x_train: torch.Tensor = train_dataset.data.reshape(-1, 28 * 28).float()
    x_train_mean: float = x_train.mean()
    x_train_std: float = x_train.std()
    # Standardize the training data
    x_train_norm = (x_train - x_train_mean) / x_train_std
    y_train = train_dataset.targets
    # Standardize the test data
    x_test = test_dataset.data.reshape(-1, 28 * 28).float()
    x_test_norm = (x_test - x_train_mean) / x_train_std
    y_test = test_dataset.targets

    return x_train_norm, y_train, x_test_norm, y_test
x_train, y_train, x_test, y_test = load_mnist()
x_train = x_train.to(DEVICE)
y_train = y_train.to(DEVICE)
x_test = x_test.to(DEVICE)
y_test = y_test.to(DEVICE)

In [29]:
def initialize(num_inputs, num_classes) -> Dict[str, torch.Tensor]:
    """
    initialize the parameters
    :param num_inputs:
    :param num_classes:
    :return:
    """
    w: torch.Tensor = torch.rand(num_inputs, num_classes, device=DEVICE)
    b: torch.Tensor = torch.rand(1, num_classes, device=DEVICE)

    param: Dict[str, torch.Tensor] = {"w": w, "b": b}  # (10*784)  # (10*1)
    return param
num_inputs = x_train.shape[1]
num_classes = len(torch.unique(y_train))
param = initialize(num_inputs, num_classes)

In [30]:
%%time
def train_crypten(
    param: Dict[str, torch.Tensor],
    x_train: torch.Tensor,
    y_train: torch.Tensor,
    num_epochs: int,
    batch_size: int,
    learning_rate: float,
    verbose: bool = False,
) -> Tuple[Dict[str, torch.Tensor], List[float]]:
    """
    Train the model with the given parameters
    :param param:
    :param x_train:
    :param y_train:
    :param num_epochs:
    :param batch_size:
    :param learning_rate:
    :return:
    """
    losses: List[float] = []
    current_loss = 0
    for _ in trange(num_epochs, desc="Epoch"):
        # select the random sequence of training set
        rand_indices: List[int] = np.random.choice(
            x_train.shape[0], x_train.shape[0], replace=False
        )
        num_batches: int = int(x_train.shape[0] / batch_size)
        for batch in trange(num_batches, desc="Batch"):
            index = rand_indices[batch * batch_size : (batch + 1) * batch_size]
            x_batch = x_train[index]  # (batch_size, num_inputs)
            y_batch = y_train[index]  # (batch_size, 1)
            x_batch = crypten.cryptensor(x_batch, device=DEVICE)  # encrypt the features
            logits = x_batch.matmul(param["w"]) + param["b"]  # (num_classes, 1)

            # https://codesti.com/issue/facebookresearch/CrypTen/278
            y_one_hot = torch.nn.functional.one_hot(y_batch, num_classes)
            if verbose:
                loss = critertion(logits, y_one_hot)
                current_loss = loss.get_plain_text()
                print(current_loss)
            # https://aaronkub.com/2020/02/12/logistic-regression-with-pytorch.html#why-logistic-regression
            activation = logits.softmax(1)
            w_gradients = (
                -x_batch.transpose(0, 1).matmul(-activation + y_one_hot) / batch_size
            )  # (num_inputs, num_classes)
            b_gradients = -(-activation + y_one_hot).mean(0)
            param["w"] -= learning_rate * w_gradients.get_plain_text()
            param["b"] -= learning_rate * b_gradients.get_plain_text()
        losses.append(current_loss)
    return param, losses
learning_rate = 0.1
num_epochs = 1
batch_size = 256
from crypten.nn.loss import CrossEntropyLoss
critertion = CrossEntropyLoss()
param, losses =  train_crypten(param, x_train, y_train,num_epochs, batch_size, learning_rate)

Epoch:   0%|          | 0/1 [00:00<?, ?it/s]

Batch:   0%|          | 0/234 [00:00<?, ?it/s]

To keep the current behavior, use torch.div(a, b, rounding_mode='trunc'), or for actual floor division, use torch.div(a, b, rounding_mode='floor'). (Triggered internally at  /pytorch/aten/src/ATen/native/BinaryOps.cpp:467.)
  return torch.floor_divide(self, other)


CPU times: user 44 s, sys: 358 ms, total: 44.3 s
Wall time: 44.1 s


In [31]:
def evaluation(
    param: Dict[str, torch.Tensor], x: torch.Tensor, y: torch.Tensor
) -> None:
    test_predictions = torch.argmax(
        torch.softmax(x.matmul(param["w"]) + param["b"], dim=1), dim=1
    )
    test_accuracy = torch.sum(test_predictions == y).float() / y_test.shape[0]
    print(f"Accuracy: {test_accuracy}")
print("Evaluation training over encrypted features")
evaluation(param, x_test, y_test)

Evaluation training over encrypted features
Accuracy: 0.864300012588501


In [34]:
%%time
def train_clear(
    param: Dict[str, torch.Tensor],
    x_train: torch.Tensor,
    y_train: torch.Tensor,
    num_epochs: int,
    batch_size: int,
    learning_rate: float,
    verbose: bool = False,
) -> Tuple[Dict[str, torch.Tensor], List[float]]:
    """
    Train the model with the given parameters
    :param param:
    :param x_train:
    :param y_train:
    :param num_epochs:
    :param batch_size:
    :param learning_rate:
    :return:
    """
    losses: List[float] = []
    current_loss = 0
    for _ in trange(num_epochs, desc="Epoch"):
        # select the random sequence of training set
        rand_indices: List[int] = np.random.choice(
            x_train.shape[0], x_train.shape[0], replace=False
        )
        num_batches: int = int(x_train.shape[0] / batch_size)
        for batch in trange(num_batches, desc="Batch"):
            index = rand_indices[batch * batch_size : (batch + 1) * batch_size]
            x_batch = x_train[index]  # (batch_size, num_inputs)
            y_batch = y_train[index]  # (batch_size, 1)
            logits = x_batch.matmul(param["w"]) + param["b"]  # (num_classes, 1)

            # https://codesti.com/issue/facebookresearch/CrypTen/278
            y_one_hot = torch.nn.functional.one_hot(y_batch, num_classes)
            if verbose:
                loss = critertion(logits, y)
                current_loss = loss
                print(current_loss)
            # https://aaronkub.com/2020/02/12/logistic-regression-with-pytorch.html#why-logistic-regression
            activation = logits.softmax(1)
            w_gradients = (
                -x_batch.transpose(0, 1).matmul(-activation + y_one_hot) / batch_size
            )  # (num_inputs, num_classes)
            b_gradients = -(-activation + y_one_hot).mean(0)
            param["w"] -= learning_rate * w_gradients
            param["b"] -= learning_rate * b_gradients
        losses.append(current_loss)
    return param, losses

learning_rate = 0.1
num_epochs = 1
batch_size = 256
from torch.nn import CrossEntropyLoss
critertion = CrossEntropyLoss()
param, losses =  train_clear(param, x_train, y_train,num_epochs, batch_size, learning_rate)

Epoch:   0%|          | 0/1 [00:00<?, ?it/s]

Batch:   0%|          | 0/234 [00:00<?, ?it/s]

CPU times: user 255 ms, sys: 10.3 ms, total: 265 ms
Wall time: 291 ms


In [35]:
print("Evaluation training over clear features")
evaluation(param, x_test, y_test)

Evaluation training over clear features
Accuracy: 0.8866999745368958
