In [1]:
# various utility functions

import torch
import torch.nn.functional as F
import torch.nn.init as init
import torchvision
import numpy as np

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def _weights_init(m):
    if isinstance(m, torch.nn.Linear) or isinstance(m, torch.nn.Conv2d):
        init.kaiming_normal_(m.weight)


class LambdaLayer(torch.nn.Module):
    def __init__(self, lambd):
        super(LambdaLayer, self).__init__()
        self.lambd = lambd

    def forward(self, x):
        return self.lambd(x)


class BasicBlock(torch.nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1, option="A"):
        super(BasicBlock, self).__init__()
        self.conv1 = torch.nn.Conv2d(
            in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False
        )
        self.bn1 = torch.nn.BatchNorm2d(planes)
        self.conv2 = torch.nn.Conv2d(
            planes, planes, kernel_size=3, stride=1, padding=1, bias=False
        )
        self.bn2 = torch.nn.BatchNorm2d(planes)

        self.shortcut = torch.nn.Sequential()
        if stride != 1 or in_planes != planes:
            if option == "A":
                """
                For CIFAR10 ResNet paper uses option A.
                """
                self.shortcut = LambdaLayer(
                    lambda x: F.pad(
                        x[:, :, ::2, ::2],
                        (0, 0, 0, 0, planes // 4, planes // 4),
                        "constant",
                        0,
                    )
                )
            elif option == "B":
                self.shortcut = torch.nn.Sequential(
                    torch.nn.Conv2d(
                        in_planes,
                        self.expansion * planes,
                        kernel_size=1,
                        stride=stride,
                        bias=False,
                    ),
                    torch.nn.BatchNorm2d(self.expansion * planes),
                )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out


class ResNet(torch.nn.Module):
    def __init__(self, block, num_blocks, n_classes=10):
        super(ResNet, self).__init__()
        self.in_planes = 16

        self.conv1 = torch.nn.Conv2d(
            3, 16, kernel_size=3, stride=1, padding=1, bias=False
        )
        self.bn1 = torch.nn.BatchNorm2d(16)
        self.layer1 = self._make_layer(block, 16, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 32, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 64, num_blocks[2], stride=2)
        self.linear = torch.nn.Linear(64, n_classes)

        self.apply(_weights_init)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion

        return torch.nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = F.avg_pool2d(out, out.size()[3])
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out


def resnet32(n_classes=10):
    return ResNet(BasicBlock, [5, 5, 5], n_classes=n_classes)


class TorchCutout(object):
    def __init__(self, length, fill=(0.0, 0.0, 0.0)):
        self.length = length
        self.fill = torch.tensor(fill).reshape(shape=(3, 1, 1))

    def __call__(self, img):
        h = img.size(1)
        w = img.size(2)
        y = np.random.randint(h)
        x = np.random.randint(w)
        y1 = np.clip(y - self.length // 2, 0, h)
        y2 = np.clip(y + self.length // 2, 0, h)
        x1 = np.clip(x - self.length // 2, 0, w)
        x2 = np.clip(x + self.length // 2, 0, w)
        img[:, y1:y2, x1:x2] = self.fill
        return img


# Load dataset
def load_cifar10(batch_size, path):
    transform_train = torchvision.transforms.Compose(
        [
            torchvision.transforms.RandomCrop(32, padding=4),
            torchvision.transforms.RandomHorizontalFlip(),
            torchvision.transforms.ToTensor(),
            torchvision.transforms.Normalize(
                (0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)
            ),
            TorchCutout(length=8),
        ]
    )

    transform_test = torchvision.transforms.Compose(
        [
            torchvision.transforms.ToTensor(),
            torchvision.transforms.Normalize(
                (0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)
            ),
        ]
    )

    trainset = torchvision.datasets.CIFAR10(
        root=path, train=True, download=True, transform=transform_train
    )
    testset = torchvision.datasets.CIFAR10(
        root=path, train=False, download=True, transform=transform_test
    )
    trainloader = torch.utils.data.DataLoader(
        trainset, batch_size=batch_size, shuffle=True, num_workers=1
    )
    testloader = torch.utils.data.DataLoader(
        testset, batch_size=batch_size, shuffle=False, num_workers=1
    )

    return trainloader, testloader


In [2]:


# - Generic imports
import torch
import numpy as np
from tqdm import tqdm

# - AIHWKIT related imports
from aihwkit.nn.conversion import convert_to_analog
from aihwkit.optim import AnalogSGD
from aihwkit.simulator.presets.utils import IOParameters

from aihwkit.simulator.parameters.io import IOParametersIRDropT
from aihwkit.inference.noise.pcm import PCMLikeNoiseModel
from aihwkit.inference import ReRamCMONoiseModel
from aihwkit.inference.compensation.drift import GlobalDriftCompensation
from aihwkit.simulator.configs import InferenceRPUConfig
from aihwkit.simulator.configs.utils import (
    WeightModifierType,
    BoundManagementType,
    WeightClipType,
    NoiseManagementType,
    WeightRemapType,
)

In [3]:
input_prec = 6
output_prec = 8
wire = 0.35
PCM = False

def gen_rpu_config(noise_model, pcm= False):
    input_prec = 6
    output_prec = 8
    my_rpu_config = InferenceRPUConfig()
    
    my_rpu_config.mapping.digital_bias = True # do the bias of the MVM digitally
    my_rpu_config.mapping.max_input_size = 256
    my_rpu_config.mapping.max_output_size = 256
    my_rpu_config.forward = IOParametersIRDropT()
    if pcm:
        my_rpu_config.noise_model = PCMLikeNoiseModel(g_max=25.0)
        my_rpu_config.drift_compensation = GlobalDriftCompensation()
        my_rpu_config.forward.ir_drop_g_ratio = 1.0 / 0.35 / 25e-6 # change to 25w-6 when using PCM
    else:
        my_rpu_config.noise_model = noise_model 
        my_rpu_config.drift_compensation = None # by default is GlobalCompensation from PCM
        my_rpu_config.forward.ir_drop_g_ratio = 1.0 / 0.35 / (noise_model.g_max*1e-6) # change to 25w-6 when using PCM

    #my_rpu_config.drift_compensation = None
    my_rpu_config.modifier.std_dev = 0.06
    my_rpu_config.modifier.type = WeightModifierType.ADD_NORMAL
    my_rpu_config.mapping.weight_scaling_omega = 1.0
    my_rpu_config.mapping.weight_scaling_columnwise = False
    my_rpu_config.mapping.out_scaling_columnwise = False
    my_rpu_config.remap.type = WeightRemapType.LAYERWISE_SYMMETRIC 
    my_rpu_config.forward.inp_res = 1 / (2**input_prec - 2)
    my_rpu_config.forward.out_res = 1 / (2**output_prec - 2)
    my_rpu_config.forward.is_perfect = False
    #my_rpu_config.forward.out_noise = 0.0 # Output on the current addition (?)
    my_rpu_config.forward.ir_drop = 1.0 # TODO set to 1.0 when activating IR drop effects
    my_rpu_config.forward.ir_drop_rs = 0.35 # Default: 0.15
    my_rpu_config.pre_post.input_range.enable = False
    
    #my_rpu_config.pre_post.input_range.manage_output_clipping = True
    #UNCOMMENT FOR MY MODEL
    my_rpu_config.pre_post.input_range.decay = 0.001
    my_rpu_config.pre_post.input_range.input_min_percentage = 0.95
    my_rpu_config.pre_post.input_range.output_min_percentage = 0.95
    my_rpu_config.forward.noise_management = NoiseManagementType.ABS_MAX # Rescale back the output with the scaling for normalizing the input
    #my_rpu_config.forward.bound_management = BoundManagementType.ITERATIVE
    my_rpu_config.forward.out_bound = 20.0  # quite restric
    return my_rpu_config

In [4]:
# - Standard train and test routines
def train_step(model, optimizer, criterion, trainloader):
    model.train()
    train_loss = 0
    correct = 0
    total = 0

    for inputs, targets in trainloader:
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)

        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

    return train_loss / total, 100.0 * correct / total


def test_step(model, criterion, testloader):
    model.eval()
    test_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, targets in testloader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            test_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()
    print(f"Test loss {test_loss/total:.4f} test acc. {100.*correct/total:.2f}%")
    return 100.0 * correct / total

In [6]:
# - Set seeds
torch.manual_seed(0)
np.random.seed(0)
import os
# - Get the dataloader
batch_size = 128
trainloader, testloader = load_cifar10(
    batch_size=batch_size, path=os.path.expanduser("~/Data/")
)

# - Change to True if one of the models should be re-trained
retrain_baseline = False
retrain_finetuned_model = False

# - Some hyperparameters
lr = 0.05
epochs = 200
epochs_finetuning = 70

Files already downloaded and verified
Files already downloaded and verified


In [7]:
# - Define model, criterion, optimizer and scheduler.
model = resnet32()
model = model.to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=5e-4)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)

We typically first pre-train a baseline model that we later fine-tune using noise injection.

In [8]:
# - Pre-training of the network
import os
if not os.path.exists("Models"):
    os.makedirs("Models")
if retrain_baseline:
    pbar = tqdm(range(epochs))
    for epoch in pbar:
        train_loss, train_acc = train_step(model, optimizer, criterion, trainloader)
        pbar.set_description(f"Epoch {epoch} Train loss: {train_loss:.4f} train acc. {train_acc:.2f}%")
        if epoch % 5 == 0:
            test_step(model, criterion, testloader)
        scheduler.step()
    torch.save(model.state_dict(), "Models/pre_trained_model.th")
else:
    import requests
    url = 'https://aihwkit-tutorial.s3.us-east.cloud-object-storage.appdomain.cloud/pre_trained_model.th'
    response = requests.get(url)
    with open('Models/pre_trained_resnet.th', 'wb') as f:
        f.write(response.content)
    model.load_state_dict(torch.load("Models/pre_trained_resnet.th", map_location=device))
    print(f"Pretrained test acc. {test_step(model, criterion, testloader)}%")

  model.load_state_dict(torch.load("Models/pre_trained_resnet.th", map_location=device))


Test loss 0.0016 test acc. 94.12%
Pretrained test acc. 94.12%


In [None]:
model.load_state_dict(torch.load("/u/mvc/aihwkit/notebooks/tutorial/Models/pre_trained_resnet.th", map_location=device))
print(f"Pretrained test acc. {test_step(model, criterion, testloader)}%")

In [None]:

analog_hwa = torch.load("/u/mvc/aihwkit/notebooks/tutorial/Models/hwa_reram_resnet_6_8bits_input_range.th", map_location=device)

In [None]:
#hwa_model = torch.load("Models/hwa_reram_resnet_6_8bits_input_range.th", map_location=device)

In [11]:
#"mean": [-0.08900206],
from aihwkit.inference.noise.config import SimulationContextWrapper
import matplotlib.pyplot as plt
g_max = 90
g_min = 10
acceptance_range=0.2
single_device=True
prog_overshoot=0.0
reram_noise =ReRamCMONoiseModel(g_max=g_max, g_min=g_min,
                                                        acceptance_range=acceptance_range,
                                                        resistor_compensator=prog_overshoot,
                                                        single_device=single_device)
rpu_config = gen_rpu_config(noise_model=reram_noise)
#rpu_config = gen_rpu_config()
converted_model = convert_to_analog(model, rpu_config)
#converted_model.load_state_dict(analog_hwa)
#converted_model = analog_hwa

#t_inferences = [0, 1, 3600, 3600 * 24, 3600 * 24 * 365 * 10]
#t_inferences = [10, 10e3, 10e4, 10e5]
t_inferences = [1, 60*10, 3600, 3600 * 24, 3600 * 24*7, 3600 * 24 *30, 3600 * 24 *365, 3600 * 24 *365*2, 3600 * 24 *365*5, 3600 * 24 * 365 * 10]    
#t_inferences = [0, 3600 * 24 * 365 * 10]
#labels = ["0s", "1s", "1h", "1d", "10y"]
print("Programming: ",test_step(converted_model, criterion, testloader))
converted_model.eval()
converted_model.program_analog_weights()
labels = ["0s",  "10y"]
color = [ 'lightskyblue', 'lightcoral']
n_rep = 5
drifted_test_accs = torch.zeros(size=(len(t_inferences), n_rep))
plt.figure()
for i,t in enumerate(t_inferences):
    for j in range(n_rep):
        #SimulationContextWrapper.t_inference = t
        converted_model.drift_analog_weights(t)
        print("Drifted at t: ", t)
        accuracy = test_step(converted_model, criterion, testloader)
        drifted_test_accs[i,j] = accuracy 
        #print(f"Accuracy of the analog model: {accuracy:.2f}%")
        
"""
    w, _ = (
        converted_model
        .layer3[0]
        .conv1.analog_module.get_weights(apply_weight_scaling=True)
        )
        real_weights = model.layer3[0].conv1.weight
        real_weights = real_weights.flatten(start_dim=1, end_dim=-1)
        print(real_weights.shape)
        print(w.shape)
        print("Weight error:", torch.mean(torch.abs(real_weights.cpu() - w.cpu())).item())
    plt.hist(w.flatten().detach().numpy(), color=color[i], bins=300, alpha =0.5, label=f"Time of inference = {labels[i]}")
    mean_w = w.flatten().detach().numpy()
    std_w = mean_w.std()
    mean_w = mean_w.mean()
    #plt.axvline(mean_w - std_w, color='b', linestyle='dotted', linewidth=2, label =f"STD:{std_w:.4f}")
    #plt.axvline(mean_w + std_w, color='b', linestyle='dotted', linewidth=2)
    #plt.axvspan(mean_w - std_w, mean_w + std_w, color='blue', alpha=0.2)
    #plt.axvline(mean_w)
    drifted_test_accs[i]/= n_rep
plt.legend()
plt.xlabel("Unnormalized weight")
plt.ylabel("Count")
#plt.title(f"Time of inference = {labels[i]}")
plt.show()
"""

Test loss 0.0026 test acc. 90.22%
Programming:  90.22
Drifted at t:  1
Test loss 0.0026 test acc. 90.41%
Drifted at t:  1
Test loss 0.0028 test acc. 89.85%
Drifted at t:  1
Test loss 0.0029 test acc. 89.62%
Drifted at t:  1
Test loss 0.0027 test acc. 90.06%
Drifted at t:  1
Test loss 0.0026 test acc. 90.72%
Drifted at t:  600
Test loss 0.0321 test acc. 13.18%
Drifted at t:  600
Test loss 0.0350 test acc. 12.47%
Drifted at t:  600
Test loss 0.0354 test acc. 12.20%
Drifted at t:  600
Test loss 0.0328 test acc. 13.10%
Drifted at t:  600
Test loss 0.0343 test acc. 12.68%
Drifted at t:  3600
Test loss 0.0312 test acc. 11.22%
Drifted at t:  3600
Test loss 0.0315 test acc. 10.37%
Drifted at t:  3600
Test loss 0.0343 test acc. 10.30%
Drifted at t:  3600
Test loss 0.0316 test acc. 10.63%
Drifted at t:  3600
Test loss 0.0332 test acc. 10.70%
Drifted at t:  86400
Test loss 0.0275 test acc. 10.31%
Drifted at t:  86400
Test loss 0.0269 test acc. 10.23%
Drifted at t:  86400
Test loss 0.0267 test acc

'\n    w, _ = (\n        converted_model\n        .layer3[0]\n        .conv1.analog_module.get_weights(apply_weight_scaling=True)\n        )\n        real_weights = model.layer3[0].conv1.weight\n        real_weights = real_weights.flatten(start_dim=1, end_dim=-1)\n        print(real_weights.shape)\n        print(w.shape)\n        print("Weight error:", torch.mean(torch.abs(real_weights.cpu() - w.cpu())).item())\n    plt.hist(w.flatten().detach().numpy(), color=color[i], bins=300, alpha =0.5, label=f"Time of inference = {labels[i]}")\n    mean_w = w.flatten().detach().numpy()\n    std_w = mean_w.std()\n    mean_w = mean_w.mean()\n    #plt.axvline(mean_w - std_w, color=\'b\', linestyle=\'dotted\', linewidth=2, label =f"STD:{std_w:.4f}")\n    #plt.axvline(mean_w + std_w, color=\'b\', linestyle=\'dotted\', linewidth=2)\n    #plt.axvspan(mean_w - std_w, mean_w + std_w, color=\'blue\', alpha=0.2)\n    #plt.axvline(mean_w)\n    drifted_test_accs[i]/= n_rep\nplt.legend()\nplt.xlabel("Unnormali

<Figure size 640x480 with 0 Axes>

In [12]:
torch.save(drifted_test_accs, "resnet_baseline_1m_10y.th")

In [6]:
def gen_rpu_config(noise_model, pcm= False):
    input_prec = 6
    output_prec = 8
    my_rpu_config = InferenceRPUConfig()
    my_rpu_config.mapping.digital_bias = True # do the bias of the MVM digitally
    my_rpu_config.mapping.max_input_size = 256
    my_rpu_config.mapping.max_output_size = 256
    my_rpu_config.forward = IOParametersIRDropT()
    if pcm:
        my_rpu_config.noise_model = PCMLikeNoiseModel(g_max=25.0)
        my_rpu_config.drift_compensation = GlobalDriftCompensation()
        my_rpu_config.forward.ir_drop_g_ratio = 1.0 / 0.35 / 25e-6 # change to 25w-6 when using PCM
    else:
        my_rpu_config.noise_model = noise_model 
        my_rpu_config.drift_compensation = None # by default is GlobalCompensation from PCM
        my_rpu_config.forward.ir_drop_g_ratio = 1.0 / 0.35 / (noise_model.g_max*1e-6) # change to 25w-6 when using PCM

    #my_rpu_config.drift_compensation = None
    my_rpu_config.modifier.std_dev = 0.06
    my_rpu_config.modifier.type = WeightModifierType.ADD_NORMAL
    
    my_rpu_config.forward.inp_res = 1 / (2**input_prec - 2)
    my_rpu_config.forward.out_res = 1 / (2**output_prec - 2)
    my_rpu_config.forward.is_perfect = False
    #my_rpu_config.forward.out_noise = 0.0 # Output on the current addition (?)
    my_rpu_config.forward.ir_drop = 1.0 # TODO set to 1.0 when activating IR drop effects
    my_rpu_config.forward.ir_drop_rs = 0.35 # Default: 0.15
    my_rpu_config.pre_post.input_range.enable = True
    
    #my_rpu_config.pre_post.input_range.manage_output_clipping = True
    my_rpu_config.pre_post.input_range.decay = 0.001
    my_rpu_config.pre_post.input_range.input_min_percentage = 0.95
    my_rpu_config.pre_post.input_range.output_min_percentage = 0.95
    my_rpu_config.mapping.weight_scaling_omega = 1.0
    my_rpu_config.mapping.weight_scaling_columnwise = True
    my_rpu_config.mapping.out_scaling_columnwise = False
    #my_rpu_config.forward.noise_management = NoiseManagementType.ABS_MAX # Rescale back the output with the scaling for normalizing the input
    my_rpu_config.forward.bound_management = BoundManagementType.ITERATIVE
    my_rpu_config.clip.type = WeightClipType.LAYER_GAUSSIAN
    my_rpu_config.clip.sigma = 2.5
    my_rpu_config.forward.out_bound = 100.0  # quite restric
    return my_rpu_config

In [10]:
g_max = 50
g_min = 10
acceptance_range=0.2
single_device=False
prog_overshoot=0.0
reram_noise =ReRamCMONoiseModel(g_max=g_max, g_min=g_min,
                                                        acceptance_range=acceptance_range,
                                                        resistor_compensator=prog_overshoot,
                                                        single_device=single_device)
rpu_config = gen_rpu_config(noise_model=reram_noise)
#rpu_config = gen_rpu_config()
analog_model = convert_to_analog(model, rpu_config)
dict = torch.load("/u/mvc/aihwkit/notebooks/tutorial/Models/hwa_reram_resnet_6_8bits_input_range_Claudia.th", map_location="cuda")
dict['linear.bias'] = torch.Tensor([ 0.0046, -0.0444,  0.0443,  0.0702,  0.0274,  0.0345,  0.0014, -0.0442,
        -0.0251, -0.0688])
analog_model.load_state_dict(dict
)
analog_model.eval()
analog_model.program_analog_weights(noise_model=reram_noise)
t_inferences =  [1, 60*10, 3600, 3600 * 24, 3600 * 24*7, 3600 * 24 *30, 3600 * 24 *365, 3600 * 24 *365*2, 3600 * 24 *365*5, 3600 * 24 * 365 * 10]    
n_rep = 2
drifted_test_accs = torch.zeros(size=(len(t_inferences),n_rep))
for i,t in enumerate(t_inferences):
    for j in range(n_rep):
        analog_model.drift_analog_weights(t)
        print("Drifted at t: ", t)
        accuracy = test_step(analog_model, criterion, testloader)
        drifted_test_accs[i, j] = accuracy
        print(f"Accuracy of the analog model: {accuracy:.2f}%")



In [11]:
torch.save(drifted_test_accs, "/u/mvc/aihwkit/drift_compensation_NNs/hwa_resnet_v2_2T2R_baseline.th")

In [None]:
dict = torch.load("/u/mvc/aihwkit/notebooks/tutorial/Models/hwa_reram_June_resnet.th", map_location="cuda")

In [None]:
dict['linear.bias']

In [None]:
torch.load("/u/mvc/aihwkit/notebooks/reram_accuracy_hwa.th")

We first convert our model to an analog model using `convert_to_analog` where we pass the model and the RPU config. The optimizer, in this case, `AnalogSGD`.
The rest is standard training. Analog models can be easily saved like regular models.

In [None]:
#Original config
def gen_rpu_config(noise_model):
    rpu_config = InferenceRPUConfig()
    rpu_config.modifier.std_dev = 0.06
    rpu_config.modifier.type = WeightModifierType.ADD_NORMAL

    rpu_config.mapping.digital_bias = True
    rpu_config.mapping.weight_scaling_omega = 1.0
    rpu_config.mapping.weight_scaling_columnwise = False
    rpu_config.mapping.out_scaling_columnwise = False
    rpu_config.remap.type = WeightRemapType.LAYERWISE_SYMMETRIC
    rpu_config.mapping.max_input_size = 256
    rpu_config.mapping.max_output_size = 256
    rpu_config.clip.type = WeightClipType.LAYER_GAUSSIAN
    rpu_config.clip.sigma = 2.0

    rpu_config.forward = IOParameters()
    rpu_config.forward.is_perfect = False
    rpu_config.forward.out_noise = 0.04
    rpu_config.forward.inp_bound = 1.0
    rpu_config.forward.inp_res = 1 / (2**8 - 2)
    rpu_config.forward.out_bound = 10
    rpu_config.forward.out_res = 1 / (2**8 - 2)
    rpu_config.forward.bound_management = BoundManagementType.NONE
    rpu_config.forward.noise_management = NoiseManagementType.NONE

    rpu_config.pre_post.input_range.enable = True
    rpu_config.pre_post.input_range.decay = 0.01
    rpu_config.pre_post.input_range.init_from_data = 50
    rpu_config.pre_post.input_range.init_std_alpha = 3.0
    rpu_config.pre_post.input_range.input_min_percentage = 0.995
    rpu_config.pre_post.input_range.manage_output_clipping = False

    rpu_config.noise_model = noise_model
    rpu_config.drift_compensation = None
    return rpu_config

In [None]:
# - Fine-tuning
retrain_finetuned_model = False
g_max = 90
g_min = 10
prog_overshoot =0.0 #1.235
single_device = True
acceptance_range = 0.2
reram_noise =ReRamCMONoiseModel(g_max=g_max, g_min=g_min,
                                                        acceptance_range=acceptance_range,
                                                        resistor_compensator=prog_overshoot,
                                                        single_device=single_device)
analog_model = convert_to_analog(model, gen_rpu_config(noise_model=reram_noise))
if retrain_finetuned_model:
    optimizer = AnalogSGD(
        analog_model.parameters(), lr=lr / 10.0, momentum=0.9, weight_decay=5e-4
    )
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)

    test_accs = torch.empty(epochs_finetuning)
    pbar = tqdm(range(epochs_finetuning))
    for epoch in pbar:
        train_loss, train_acc = train_step(analog_model, optimizer, criterion, trainloader)
        pbar.set_description(f"Epoch {epoch} Train loss: {train_loss:.4f} train acc. {train_acc:.2f}%")
        test_accs[epoch] = test_step(analog_model, criterion, testloader)
        scheduler.step()

    torch.save(analog_model.state_dict(), "/u/mvc/aihwkit/notebooks/tutorial/Models/hwa_reram_June_resnet.th")
    #torch.save(test_accs, "Models/test_accs.th")

else:
    import requests
    url_test_accs = 'https://aihwkit-tutorial.s3.us-east.cloud-object-storage.appdomain.cloud/test_accs.th'
    url_finetuned_model = 'https://aihwkit-tutorial.s3.us-east.cloud-object-storage.appdomain.cloud/finetuned_model_0.9.1.th'
    
    response_test_accs = requests.get(url_test_accs)
    with open('Models/test_accs.th', 'wb') as f:
        f.write(response_test_accs.content)
    
    response_finetuned_model = requests.get(url_finetuned_model)

    with open('Models/finetuned_model_0.9.1.th', 'wb') as f:
        f.write(response_finetuned_model.content)

    test_accs = torch.load("Models/test_accs.th")
    analog_model.load_state_dict(
        torch.load("Models/finetuned_model_0.9.1.th", map_location=device)
    )
    print(f"Finetuned test acc. {test_step(analog_model, criterion, testloader)}%")

In [None]:
plt.title("Finetunig test accuracy")
plt.plot(test_accs, marker="d", linestyle="--", color="b")
plt.ylabel("Test acc. (%)")
plt.xlabel("Iteration")
plt.show()

We can also verify that the weights are clipped by looking at one random weight matrix in the network.

In [None]:
w, _ = (
    analog_model
    .layer3[0]
    .conv1.analog_module.get_weights(apply_weight_scaling=True)
)
plt.hist(w.flatten().detach().numpy(), color="r", bins=50)
plt.xlabel("Unnormalized weight")
plt.ylabel("Count")
plt.show()

Finally, we would like to see how robust our model is. We first have to convert our pre-trained model to analog.
We then repeatedly call `drift_analog_weights` with a time value (in seconds). This simulates the drifting of the weights to the specified time. Note that this call also programs the weights, i.e. it simulates programming by applying specific programming noise. This noise model is defined in the `noise_model` of the RPU config.

In [None]:
rpu_conf = gen_rpu_config()
converted_model = convert_to_analog(model, rpu_conf)
# - For programming the model, we need to put it into eval() mode
converted_model = converted_model.eval()
#analog_model = analog_model.eval()
# - We repeat each measurement 5 times
n_rep = 1
t_inferences = [60., 3600., 86400., 2592000., 31104000.]
drifted_test_accs_baseline = torch.zeros(size=(len(t_inferences),n_rep))
converted_model.program_analog_weights(noise_model = rpu_conf.noise_model)


In [None]:
model.layer1[0].conv1.weight[:,0,0,0]

In [None]:
converted_model.get_weights()['layer1.0.conv1.analog_module'][0][:,0]

In [None]:
n_rep = 1
#t_inferences = [60., 3600., 86400., 2592000., 31104000.]
t_inferences = [1., 10., 60.]
drifted_test_accs = torch.zeros(size=(len(t_inferences),n_rep))
drifted_test_accs_baseline = torch.zeros(size=(len(t_inferences),n_rep))
prog = test_step(converted_model, criterion, testloader)
print(prog)
for i,t in enumerate(t_inferences):
    for j in range(n_rep):
        analog_model.drift_analog_weights(t)
        print(analog_model.get_weights()['layer1.0.conv1.analog_module'][0][:,0])
        print("Drifted at t: ", t)
        drifted_test_accs_baseline[i,j] = test_step(analog_model, criterion, testloader)
        print(drifted_test_accs_baseline[i,j])
