# Imports and Setup

In [28]:
import copy
import os
import pickle
import random
import time

import matplotlib.pyplot as plt
import numpy as np
import torch
import torchvision
from scipy import ndimage
from torch import nn, optim
from torch.autograd import Variable
from torch.nn import functional as F
from torchvision import datasets, transforms, models

# Limit floating point tensor printing to a precision of 3.
torch.set_printoptions(precision=3)
# Flag recording whether PyTorch reports an available CUDA device.
cuda = bool(torch.cuda.is_available())

# Data Entry and Processing

In [2]:
# Convert each image to a float tensor (ToTensor scales pixel values to [0, 1])
# and then normalize every RGB channel with mean 0.5 and std 0.5, which maps
# the values to [-1, 1].
# Normalize takes (mean, std, inplace); only mean and std are passed here so
# that no stray value ends up being interpreted as the `inplace` flag.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

In [3]:
# CIFAR-100 train and test splits, stored under /data (downloaded on demand)
# and passed through `transform` on access.
traindata = datasets.CIFAR100('/data', train=True, transform=transform, download=True)
testdata = datasets.CIFAR100('/data', train=False, transform=transform, download=True)

Files already downloaded and verified
Files already downloaded and verified


In [4]:
# Shuffled loaders over the complete train and test splits, 64 examples per batch
all_data_train_loader = torch.utils.data.DataLoader(
    traindata,
    shuffle=True,
    batch_size=64,
)
all_data_test_loader = torch.utils.data.DataLoader(
    testdata,
    shuffle=True,
    batch_size=64,
)

In [5]:
# Target / non-target dataloaders
# Class 81 is the class to be forgotten. The index lists are built from the
# datasets' `targets` label lists, so no image has to be loaded or transformed
# just to read its label.
target_index = [i for i, label in enumerate(testdata.targets) if label == 81]
nontarget_index = [i for i, label in enumerate(testdata.targets) if label != 81]
target_test_loader = torch.utils.data.DataLoader(testdata, batch_size=64,
              sampler = torch.utils.data.SubsetRandomSampler(target_index))
nontarget_test_loader = torch.utils.data.DataLoader(testdata, batch_size=64,
              sampler = torch.utils.data.SubsetRandomSampler(nontarget_index))

# Training loader restricted to examples whose label is not 81; the naive
# retraining and selective unlearning cells train on it.
nontarget_train_index = [i for i, label in enumerate(traindata.targets) if label != 81]
nontarget_train_loader = torch.utils.data.DataLoader(traindata, batch_size=64,
              sampler = torch.utils.data.SubsetRandomSampler(nontarget_train_index))


In [66]:
# Independent copy of the training set in which every example labelled 81 is
# given a replacement label drawn at random from the other 99 class ids.
unlearningdata = copy.deepcopy(traindata)
unlearninglabels = [label for label in range(100) if label != 81]
for idx, label in enumerate(unlearningdata.targets):
  if label == 81:
    unlearningdata.targets[idx] = random.choice(unlearninglabels)
unlearning_train_loader = torch.utils.data.DataLoader(
    unlearningdata, shuffle=True, batch_size=64)

# Model

In [7]:
# Hyperparameters and global training settings
torch.backends.cudnn.enabled = True
# Loss shared by train() and test(); the network head ends in LogSoftmax.
criterion = F.nll_loss
num_classes = 100
# train() prints a progress line every `log_interval` batches.
log_interval = 10
# NOTE(review): the DataLoaders hard-code batch_size=64 rather than reading these.
batch_size_train, batch_size_test = 64, 64

In [22]:
# Training method
def _weight_bias_snapshot(model):
  """Return cloned copies of the state-dict entries named like weights/biases."""
  # A single state_dict() call per snapshot; clone() so that later optimizer
  # steps cannot change the saved values.
  return {name: tensor.clone()
          for name, tensor in model.state_dict().items()
          if "weight" in name or "bias" in name}


def train(model, epoch, loader, returnable=False, target_class=81, track_accuracy=False):
  """Run one pass over `loader`, stepping the module-level `optimizer`.

  For every batch whose labels contain `target_class`, the change that the
  optimizer step makes to each state-dict entry whose name contains "weight"
  or "bias" is added to a per-entry running total (`delta`).

  Relies on the module-level names `optimizer`, `criterion` and
  `log_interval` (and, when `track_accuracy` is set, on `test`,
  `target_test_loader` and `nontarget_test_loader`).

  Args:
    model: module to train; it is put into train mode.
    epoch: epoch number, only used in the progress line.
    loader: iterable yielding (data, target) batches.
    returnable: when True return (thracc, nacc, delta), otherwise None.
    target_class: label whose batches contribute to `delta` (default 81).
    track_accuracy: when True together with `returnable`, evaluate on
      `target_test_loader` after every batch and on `nontarget_test_loader`
      every 10th batch. Off by default, so both accuracy lists stay empty.

  Returns:
    None if `returnable` is False; otherwise a tuple
    (thracc, nacc, delta) where `delta` maps entry names to the summed
    update (the int 0 for every entry if no batch contained `target_class`).
  """
  model.train()
  delta = {name: 0 for name in model.state_dict()
           if "weight" in name or "bias" in name}
  thracc = []
  nacc = []
  for batch_idx, (data, target) in enumerate(loader):
    # Decide once per batch whether this step must be attributed to the target class.
    has_target = target_class in target
    if has_target:
      before = _weight_bias_snapshot(model)
    optimizer.zero_grad()
    output = model(data)
    loss = criterion(output, target)
    loss.backward()
    optimizer.step()
    if has_target:
      after = _weight_bias_snapshot(model)
      for key in before:
        delta[key] = delta[key] + after[key] - before[key]
    if batch_idx % log_interval == 0:
      print("\rEpoch: {} [{:6d}]\tLoss: {:.6f}".format(
          epoch, batch_idx*len(data),  loss.item()
      ), end="")
    if returnable and track_accuracy:
      thracc.append(test(model, target_test_loader, dname="Threes only", printable=False))
      if batch_idx % 10 == 0:
        nacc.append(test(model, nontarget_test_loader, dname="nonthree only", printable=False))
      # test() switches to eval mode; switch back before the next batch.
      model.train()
  if returnable:
    return thracc, nacc, delta

In [9]:
# Testing method
def test(model, loader, dname="Test set", printable=True):
  model.eval()
  test_loss = 0
  total = 0
  correct = 0
  with torch.no_grad():
    for data, target in loader:
      output = model(data)
      total += target.size()[0]
      test_loss += criterion(output, target).item()
      _, pred = torch.topk(output, 10, dim=1, largest=True, sorted=True)
      for i, t in enumerate(target):
        if t in pred[i]:
            correct += 1
  test_loss /= len(loader.dataset)
  if printable:
    print('{}: Mean loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)'.format(
        dname, test_loss, correct, total, 
        100. * correct / total
        ))
  return 1. * correct / total

# Original Training

In [23]:
# Epoch budgets: initial training, then each forgetting experiment.
trainingepochs, forgetfulepochs = 10, 10

In [24]:
# Build a ResNet-18, replace its first convolution and its final layer, and
# create the Adam optimizer that train() picks up as a global.
resnet = models.resnet18()
resnet.conv1 = nn.Conv2d(
    in_channels=3,
    out_channels=64,
    kernel_size=(7,7),
    stride=(2,2),
    padding=(3,3),
    bias=False,
)
# Classification head: linear layer to `num_classes` scores followed by
# LogSoftmax, matching the nll_loss criterion.
resnet.fc = nn.Sequential(
    nn.Linear(in_features=512, out_features=num_classes),
    nn.LogSoftmax(dim=1),
)
optimizer = optim.Adam(resnet.parameters())

In [25]:
# Train new model for n epochs
# `batches` accumulates, per weight/bias entry of the state dict, the summed
# parameter updates that train() attributes to batches containing the target class.
batches = {name: 0 for name in resnet.state_dict()
           if "weight" in name or "bias" in name}
# Accuracy traces for the original training run; they must exist before the
# loop extends them, otherwise a fresh kernel raises NameError.
naive_accuracy_three = []
naive_accuracy_nonthree = []
for epoch in range(1, trainingepochs+1):
  starttime = time.process_time()
  thracc, nacc, delta = train(resnet, epoch, all_data_train_loader, returnable=True)
  naive_accuracy_three += thracc
  naive_accuracy_nonthree += nacc
  for key in batches:
    batches[key] = batches[key] + delta[key]
  # process_time() measures CPU time of this process, not wall-clock time.
  print(f"Time taken: {time.process_time() - starttime}")

Epoch: 1 [ 49920]	Loss: 3.289364Time taken: 428.77150072099994
Epoch: 2 [ 49920]	Loss: 2.463752Time taken: 435.0684901230002
Epoch: 3 [ 49920]	Loss: 2.405546Time taken: 432.6344274480002
Epoch: 4 [ 49920]	Loss: 2.020322Time taken: 427.58587273600006
Epoch: 5 [ 49920]	Loss: 2.062539Time taken: 429.1181333110003
Epoch: 6 [ 49920]	Loss: 1.388036Time taken: 428.2883783240004
Epoch: 7 [ 49920]	Loss: 1.320160Time taken: 428.932248094
Epoch: 8 [ 49920]	Loss: 1.160982Time taken: 426.4138098679996
Epoch: 9 [ 49920]	Loss: 0.531344Time taken: 426.44849738799985
Epoch: 10 [ 49920]	Loss: 1.154414Time taken: 427.59354569599964


In [26]:
# Save the trained weights and the optimizer state; the later experiments
# reload this checkpoint as their starting point.
# Create the output directory first so the save cannot fail on a missing
# folder after the long training run.
os.makedirs("resnet", exist_ok=True)
path = "resnet/cifar_trained.pt"
torch.save({
            'model_state_dict': resnet.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            }, path)

In [30]:
# Persist the accumulated target-batch parameter updates.
# The context manager closes the file even if pickling raises.
with open("resnet/batches.pkl", "wb") as f:
  pickle.dump(batches, f)

In [None]:
# Write the target-class accuracy trace as comma-terminated values.
path = "resnet/cifar_trained_accuracy_three.txt"
with open(path, 'w') as f:
  f.writelines(f"{value}," for value in naive_accuracy_three)

In [None]:
# Write the non-target accuracy trace as comma-terminated values.
path = "resnet/cifar_trained_accuracy_nonthree.txt"
with open(path, 'w') as f:
  f.writelines(f"{value}," for value in naive_accuracy_nonthree)

# Naive Retraining

In [None]:
# Checkpoint written at the end of the original training run.
path = "resnet/cifar_trained.pt"

In [None]:
# Reset both the optimizer and the model to the state stored at `path`.
checkpoint = torch.load(path)
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
resnet.load_state_dict(checkpoint['model_state_dict'])

In [None]:
# Fresh accuracy traces for the naive retraining run.
naive_accuracy_three, naive_accuracy_nonthree = [], []

In [None]:
# Naive retraining: continue for `forgetfulepochs` further epochs on the
# non-target training loader, then evaluate and checkpoint after every epoch.
for epoch in range(trainingepochs + 1, trainingepochs + forgetfulepochs + 1):
  thracc, nacc, _ = train(resnet, epoch, nontarget_train_loader, returnable=True)
  naive_accuracy_three.extend(thracc)
  naive_accuracy_nonthree.extend(nacc)
  for eval_loader, eval_name in ((all_data_test_loader, "All data"),
                                 (target_test_loader, "Threes  "),
                                 (nontarget_test_loader, "Nonthree")):
    test(resnet, eval_loader, dname=eval_name)
  path = f"resnet/cifar_retraining-epoch-{epoch}.pt"
  torch.save(
      {
          'model_state_dict': resnet.state_dict(),
          'optimizer_state_dict': optimizer.state_dict(),
      },
      path,
  )

In [None]:
# Write the target-class accuracy trace of naive retraining, comma-terminated.
path = "resnet/cifar_naive_accuracy_three.txt"
with open(path, 'w') as f:
  f.writelines(f"{value}," for value in naive_accuracy_three)

In [None]:
# Write the non-target accuracy trace of naive retraining, comma-terminated.
path = "resnet/cifar_naive_accuracy_nonthree.txt"
with open(path, 'w') as f:
  f.writelines(f"{value}," for value in naive_accuracy_nonthree)

# Unlearning

In [None]:
# Start the unlearning experiment from the originally trained checkpoint.
path = "resnet/cifar_trained.pt"

In [None]:
# Reset both the optimizer and the model to the state stored at `path`.
checkpoint = torch.load(path)
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
resnet.load_state_dict(checkpoint['model_state_dict'])

In [None]:
# Fresh accuracy traces for the random-relabel unlearning run.
unlearning_accuracy_three, unlearning_accuracy_nonthree = [], []

In [None]:
# Unlearning: continue for `forgetfulepochs` further epochs on the dataset
# whose target-class labels were randomly reassigned, then evaluate and
# checkpoint after every epoch.
for epoch in range(trainingepochs + 1, trainingepochs + forgetfulepochs + 1):
  thracc, nacc, _ = train(resnet, epoch, unlearning_train_loader, returnable=True)
  unlearning_accuracy_three.extend(thracc)
  unlearning_accuracy_nonthree.extend(nacc)
  for eval_loader, eval_name in ((all_data_test_loader, "All data"),
                                 (target_test_loader, "Threes  "),
                                 (nontarget_test_loader, "Nonthree")):
    test(resnet, eval_loader, dname=eval_name)
  path = f"resnet/cifar_unlearning-epoch-{epoch}.pt"
  torch.save(
      {
          'model_state_dict': resnet.state_dict(),
          'optimizer_state_dict': optimizer.state_dict(),
      },
      path,
  )

In [None]:
# Write the target-class accuracy trace of the unlearning run, comma-terminated.
path = "resnet/cifar_unlearning_accuracy_three.txt"
with open(path, 'w') as f:
  f.writelines(f"{value}," for value in unlearning_accuracy_three)

In [None]:
# Write the non-target accuracy trace of the unlearning run, comma-terminated.
path = "resnet/cifar_unlearning_accuracy_nonthree.txt"
with open(path, 'w') as f:
  f.writelines(f"{value}," for value in unlearning_accuracy_nonthree)


# Selective Unlearning

In [None]:
# Start the selective-unlearning experiment from the originally trained checkpoint.
path = "resnet/cifar_trained.pt"

In [None]:
# Reset both the optimizer and the model to the state stored at `path`.
checkpoint = torch.load(path)
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
resnet.load_state_dict(checkpoint['model_state_dict'])

In [None]:
# Fresh accuracy traces for the selective-unlearning run.
selective_accuracy_three, selective_accuracy_nonthree = [], []

In [None]:
# Subtract `const` times the accumulated target-batch updates (`batches`) from
# every weight/bias entry of the model's state dict, then load the result back.
const = 1
with torch.no_grad():
  state = resnet.state_dict()
  for name in state:
    is_weight_or_bias = "weight" in name or "bias" in name
    if not is_weight_or_bias:
      continue
    scaled_update = const * batches[name]
    state[name] = state[name] - scaled_update
resnet.load_state_dict(state)

In [None]:
# Checkpoint the model before any selective retraining epoch runs. The file is
# numbered with the last original training epoch (`trainingepochs`) instead of
# a hard-coded 10, so it stays in line with the per-epoch checkpoints that
# start at trainingepochs + 1.
path = f"resnet/cifar_selective-epoch-{trainingepochs}.pt"
torch.save({
        'model_state_dict': resnet.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        }, path)

In [None]:
# Selective unlearning: continue for `forgetfulepochs` further epochs on the
# non-target training loader, then evaluate and checkpoint after every epoch.
for epoch in range(trainingepochs + 1, trainingepochs + forgetfulepochs + 1):
  thracc, nacc, _ = train(resnet, epoch, nontarget_train_loader, returnable=True)
  selective_accuracy_three.extend(thracc)
  selective_accuracy_nonthree.extend(nacc)
  for eval_loader, eval_name in ((all_data_test_loader, "All data"),
                                 (target_test_loader, "Threes  "),
                                 (nontarget_test_loader, "Nonthree")):
    test(resnet, eval_loader, dname=eval_name)
  path = f"resnet/cifar_selective-epoch-{epoch}.pt"
  torch.save(
      {
          'model_state_dict': resnet.state_dict(),
          'optimizer_state_dict': optimizer.state_dict(),
      },
      path,
  )

In [None]:
# Write the non-target accuracy trace of the selective run, comma-terminated.
path = "resnet/cifar_selective_accuracy_nonthree.txt"
with open(path, 'w') as f:
  f.writelines(f"{value}," for value in selective_accuracy_nonthree)

In [None]:
# Write the target-class accuracy trace of the selective run, comma-terminated.
path = "resnet/cifar_selective_accuracy_three.txt"
with open(path, 'w') as f:
  f.writelines(f"{value}," for value in selective_accuracy_three)