# Homework 1: Coding

<b>Important</b>: when you submit this file to gradescope, it should contain only method definitions (except the imports and `test_data`, `username` definitions below). To test your work, you need to use `@publictest` to decorate test methods and they will be executed for you here. You can use `test_data` to store data for your tests. See for example the `test_train_sklearn` method below.

<b>Important</b>: gradescope will sometimes use the output of the public test cases in this file to assign points. Make sure you submit a version of this notebook that includes the outputs of the `@publictest` cells we provide for you. Additional tests you define for your own purposes will be ignored.

Your code should fully fit between these comments:

```# >> Your code starts here. << ```

```# >> Your code ends here. << ```

`utils` contains a few useful methods and classes you need to use. Firstly:

- `load_mnist` -- loads mnist data, returns `Splits` namedtuple.
- namedtuple `Hypers(epochs: int, learning_rate: float, batch_size: int)` -- training hyper parameters
- namedtuple `Splits(train: Dataset, test: Dataset, valid: Dataset)` -- stores data splits
- namedtuple `Dataset(X: numpy.array, y: numpy.array)` -- stores a dataset
- namedtuple `LinearModel(W: numpy.array, b: numpy.array` -- stores a linear model
- `Visualize` -- several visualization utilities. See `test_explain` on sample usage.
- `timeit` -- times a thunk, relevant to the last deep learning exercise.
- `check_submission` -- tests whether this notebook has the required definitions.


First, run the following command to get the python scripts needed.

In [None]:
import os
if (not os.path.exists("utils.py")) or (not os.path.exists("requirements.txt")):
  print("downloading requirements")
  assert(0 == os.system("wget -O hw1_scripts.tar.gz "
    + "https://www.dropbox.com/s/fvvoag1vl3ueulb/hw1_scripts.tar.gz?dl=0"
  ))
  assert(0 == os.system("tar xvzf hw1_scripts.tar.gz"))
  assert(0 == os.system("rm hw1_scripts.tar.gz"))
  assert(0 == os.system("pip install -r requirements.txt"))

Install the required dependencies

In [None]:
!pip install -r requirements.txt

In [None]:
username = "" # Your username

### LEAVE THE REST OF THIS CELL AS IS ###

from utils import load_mnist, load_cifar, Hypers, Splits, Dataset, LinearModel, Visualize, exercise, timeit, check_submission
from bunch import Bunch
import tensorflow as tf
import numpy as np
from tensorflow.keras.models import Model
from google.colab import drive, auth
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from oauth2client.client import GoogleCredentials

test_data = Bunch() # Store anything you need between your personal test cases in this bunch.

# Decorate your personal tests with @publictest so they are not executed under gradescope. The decorator 
# also controls the random seeds set for numpy and keras so it should produce repeatable result. You can 
# change the seed to try out different outcomes by passing the seed argument to the decorator like below.
def publictest(_func=None, *, seed=42):
    def wf(f):
        def wwf():
            exercise(username=username, seed=seed, cname=f.__name__)
            f()

        if __name__ == "__main__":
            wwf()
          
    if _func is None:
        return wf
    else:
        return wf(_func)

@publictest(seed=42)
def initialize_tests():
    test_data.mnist = load_mnist(flatten=True)
    test_data.cifar = load_cifar(flatten=False)
    
    v1 = Visualize.images(10, title="example MNIST training images")
    v1(test_data.mnist.train.X[0:10])
    
    v2 = Visualize.images(10, title="example CIFAR training images")

    v2(test_data.cifar.train.X[0:10])

In [None]:
# Additional imports needed for your solutions.

# >> Your code starts here. <<

# >> Your code ends here. <<

## Part 1: Logistic regression implementations

Implement a logistic regression to classify images of hand-written digits in the `MNIST` dataset. In this
dataset, each input image is of size $28 \times 28$ and reshaped into a size $784$ vector. The
output is an integer from 0 to 9 representing the image class. <b>You need to solve the same
  problem three ways</b>: using `scikit-learn`, using `keras`, and using `numpy`. For the numpy version, you will need to implement stochastic gradient descent.

### logistic regression using scikit-learn

Use sklearn to train a logistic regression model, extract the parameters it produces and define methods for using those parameters to make predictions. The methods you need to provide are:

- `train_sklearn`
- `softmax` -- the softmax activation function. Be aware that your input is a batch of
  data of size (N, 10) where N is the batch size.
- `scores`
- `forward`
- `evaluate` -- accuracy measurement. Given the model parameters and test data, computes
  the predictions and returns the accuracy of the predictions relative to the given test data.

The scaffolding around these is provided for you in `test_train_sklearn`.

In [None]:
def train_sklearn(hypers: Hypers, dataset: Dataset):
    """train_sklearn Train scikit-learn multiclass one-vs-rest Logistic Regression model directly
    on data.

    Arguments:
        hypers {Hypers} -- Training hyper-parameters, only epochs is relevant to this method. 
                           You can ignore the rest.
        dataset {Dataset} -- Input dataset tuple with X and y

    Returns:
        {LinearModel} -- Tuple with W=Optimal weight and b=bias
    """

    # >> Your code starts here. <<

    # >> Your code ends here. <<

def softmax(y: np.ndarray):
    """softmax Softmax activation function

    Arguments:
        y {np.float np.ndarray} -- Input logits

    Returns:
        {np.float np.ndarray} -- Post-softmax probits
    """

    # >> Your code starts here. <<

    # >> Your code ends here. <<

def scores(X: np.ndarray, model: LinearModel):
    """Return pre-softmax scores."""
    
    # >> Your code starts here. <<

    # >> Your code ends here. <<
    
def forward(X: np.ndarray, model: LinearModel):
    """Return linear model output probabilities. """
    
    # >> Your code starts here. <<

    # >> Your code ends here. <<
    
def evaluate(dataset: Dataset, model: LinearModel):
    """test_accuracy Evaluate the model accuracy on the test dataset

    Arguments:
        dataset {Dataset} -- dataset
        model {LinearModel} -- linear model
    """

    # >> Your code starts here. <<

    # >> Your code ends here. <<

In [None]:
@publictest
def test_train_sklearn():
    mnist = test_data.mnist
    
    model = train_sklearn(Hypers(epochs=2), mnist.train)
    
    test_accuracy = evaluate(dataset=mnist.test, model=model)
    
    print("Test accuracy [SKLEARN]:", test_accuracy)
    
    if test_accuracy >= 0.80:
        print("ACCURACY OK")
    else:
        print("ACCURACY FAIL")
    
    test_data.model_sklearn = model

### Logistic regression and stochastic gradient descent using numpy

#### Exercise

You program in this section should be self contained and independent from other sections; do not
make use of any functions other than `numpy` methods. <b>No Keras or scikit-learn</b> methods are allowed in this section. Furthermore, other than looping over epochs or batches,
<b>do not use loops in your code</b>. Processing instances in a batch or values in an instance
should be done using numpy vector/matrix/tensor operations. Loops include `for` and
`while` statements, comprehensions, generators, and recursion.

Complete methods `onehot`, `backward`, and `sgd`. The scaffolding around these is provided for you in `test_numpy`.


- `onehot`
- `backward`
- `sgd` -- min-batch Stochastic Gradient Descent. Computes the optimal weight and bias
  after the given number of epochs. You should use the training parameters specified in the method
  arguments.


In [None]:
def onehot(dataset: Dataset):
    y_onehot = None # dataset.y converted into onehot
    
    # >> Your code starts here. <<
    
    # >> Your code ends here. <<
    
    return dataset._replace(y=y_onehot)

def backward(dataset: Dataset, model: LinearModel):
    """
    Return dLdW and dLdb .
    """
    
    dLdW = None
    dLdb = None
    
    # >> Your code starts here. <<

    # >> Your code ends here. <<
    
    return dLdW, dLdb
    
def sgd(hypers: Hypers, dataset: Dataset):
    """sgd Run SGD optimization all the parameters

    Arguments:
        hypers {Hypers} -- training hyper parameters
        dataset {Dataset} -- training data

    Returns: LinearModel with
        W {np.float np.ndarray} -- Learned weight
        b {np.float np.ndarray} -- and bias
    """

    n, m = dataset.X.shape
    n_class = dataset.y.shape[1]
    W = np.zeros((m, n_class))
    b = np.zeros((n_class, ))
    
    # >> Your code starts here. <<

    # >> Your code ends here. <<

In [None]:
@publictest
def test_numpy():
    hypers = Hypers(
        epochs=5,
        learning_rate=1.0,
        batch_size=64
    )
    
    mnist = test_data.mnist

    model = sgd(hypers, dataset=onehot(mnist.train))

    test_accuracy = evaluate(mnist.test, model)
    
    print("Test accuracy [NUMPY SGD]:", test_accuracy)

    if test_accuracy >= 0.80:
        print("ACCURACY OK")
    else:
        print("ACCURACY FAIL")
    
    test_data.model_numpy = model

### Logistic regression using Keras

Keras is a high-level neural network API which runs on top of a Tensorflow, CNTK, or Theano
backend. Typically one can choose the backend if they have more than one installed, however, we
will be using Tensorflow exclusively.

#### Exercise
  Complete `build_keras_model` and `train_run_keras`. You may want to to consult
  the documentation for Keras on [Keras docs](https://keras.io/). Your program should contain the following
  parts with each no more than a line or two:
  
 
  - The scaffolding around the two required methods is given to you in `test_keras`. This method should run and produce your model's accuracy.
  - Create the logistic regression model using canonical Keras via the Sequential or Functional
    approach.
  - Compile your model with the desired loss function, optimizer, and metrics.
  - Fit the training data (you can also specify validation data using the validation set
    here).
  - Predict on the test data and report the test accuracy (the percentage of images correctly
    classified).

In [None]:
def build_keras_model(input_dim=784, num_class=10):
    """build_model Build a Keras model of logistic regression

    Keyword Arguments:
        input_dim {int} -- The number of dimensions for the input data
            (default: {784})
        num_class {int} -- The number of classes (default: {10})

    Returns:
        {keras.models.Model} -- Your logistic regression model. It is a
            keras.model.Model object created by either a sequential way or a
            functional way.
    """
    
    model: Model = None

    # >> Your code starts here. <<

    # >> Your code ends here. <<

    return model


def train_keras_model(hypers: Hypers, keras_model: Model, splits: Splits):  
    """train the Keras model using compile and fit

    Keyword Arguments:
        hypers {Hypers} -- training hyper parameters
        Splits(train: Dataset, test: Dataset, valid: Dataset) -- stores data splits

    Returns:
        {LinearModel} -- Tuple with W=Optimal weight and b=bias
    """  
    model: LinearModel = None
    
    # >> Your code starts here. <<

    # >> Your code ends here. <<
    
    return model

In [None]:
@publictest
def test_keras():
    hypers = Hypers(
        epochs=2,
        learning_rate=0.1,
        batch_size=64
    )
    
    mnist = test_data.mnist
    
    input_dim = mnist.train.X.shape[1]
    num_class = len(set(mnist.train.y))
    
    keras_model = build_keras_model(
        input_dim=input_dim,
        num_class=num_class
    )
    
    model = train_keras_model(
        hypers=hypers,
        keras_model=keras_model,
        splits=mnist
    )

    test_accuracy = evaluate(dataset=mnist.test, model=model)
    
    print("Test accuracy:", test_accuracy)
    
    if test_accuracy >= 0.80:
        print("ACCURACY OK")
    else:
        print("ACCURACY FAIL")
        
    test_data.model_keras = model

## Part 2: Applications

### Application: Explanations

Complete the following methods. Scaffolding/test is provided in the `test_explain` method and `most_wrong` can be used to find interesting examples to explain.

- `attribution`
- `explain`

In [None]:
def attribution(model: LinearModel, X, y_explained):
    """
    Complete the attribution function for pre-softmax logistic regression
    Returns:
        Attribution vector which is the same size as X
    """
    # >> Your code starts here. <<

    # >> Your code ends here. <<

def explain(model: LinearModel, X, y_explained):
    """
    An explanation is the
    element-wise product of an input x and the attribution a for a given prediction
    Returns:
        Explanation vector which is the same size as X
    """
    # >> Your code starts here. <<
    
    # >> Your code ends here. <<

def most_wrong(model: LinearModel, dataset: Dataset, y_wrong: int):
    """
    Finds instances in a given dataset that are most confidentally incorrectly 
    predicted by a given model as the given class. Returned are the most wrong input, 
    its correct class"""
    
    scores = forward(dataset.X, model)
    preds = scores.argmax(axis=1)

    indices = (preds != dataset.y) * (preds == y_wrong)
    
    wrongs = Dataset(
        X=dataset.X[indices],
        y=dataset.y[indices]
    )
    
    wrong_scores = scores[indices]

    worst_index = np.argsort(wrong_scores.max(axis=1))[0]
    
    return wrongs.X[worst_index], wrongs.y[worst_index]

In [None]:
@publictest
def test_explain():
    model = test_data.model_numpy
    mnist = test_data.mnist
    
    x = mnist.train.X[5]
    c = 4 # class
    a = attribution(model, x, c)
    
    # check completeness with some baselines
    for baseline in [np.ones_like(x), np.zeros_like(x)]:
        print("attribution complete?: ", 
              abs(((x-baseline)*a).sum() - 
                  (scores(x, model)[c] - scores(baseline, model)[c])) 
              < 0.0000001)
    
    # visualize attributions and explanations
    v_pos = Visualize.influences(10, title="attributions")
    v_pos([attribution(model, x, i).reshape(28,28) for i in range(10)])
    
    v_pos = Visualize.influences(10, title="explanations")
    v_pos([explain(model, x, i).reshape(28,28) for i in range(10)])

    target_y = 9
    
    wrong, wrong_y = most_wrong(model, mnist.train, target_y)

    v = Visualize.images(1, title="most wrong example")
    v(wrong)

    v_neg = Visualize.influences(2, title="explanation for correct class, predicted class")
    v_neg([explain(model, wrong, i).reshape(28,28) for i in [wrong_y, target_y]])

    print("correct class = ", wrong_y)
    print("predicted class = ", target_y)

### Application: model stealing

Complete the following methods. A test is provided in `test_invert`.
- `invert`

In [None]:
def invert(f):
    """Produce LinearModel(W, b) with only functional interface to pre-softmax scores of a linear model."""
    
    b = None
    W = None
    
    # >> Your code starts here. <<
    
    # >> Your code ends here. <<
    
    return LinearModel(W, b)

In [None]:
@publictest
def test_invert():
    model = test_data.model_numpy
    
    model_inv = invert(lambda x: scores(X=x.reshape((1,28*28)), model=model)[0])
    
    print("W match?", (abs(model_inv.W - model.W) < 0.000001).all())
    print("b match?", (abs(model_inv.b - model.b) < 0.000001).all())

### Application: adversarial attacks

Complete the following method. `test_attack` provides a use case.
- `attack`

In [None]:
def attack(model, x, y_target):
    """
    Transform x into a valid image in [0,1] that makes model W,b indifferent between y_real and y_target.
    Returns:
        Transformed x
    """
    
    x = x.copy() # working on x directly would pollute your dataset otherwise
    
    # >> Your code starts here. <<
    
    # >> Your code ends here. <<
    
    return x

In [None]:
@publictest
def test_attack():    
    model = test_data.model_numpy
    mnist = test_data.mnist
    
    target_y = 0
    
    x, y = mnist.test.X[0], mnist.test.y[0]
    xa = attack(model, x, target_y)

    print("attack success?", scores(xa, model)[target_y] >= max(scores(xa, model)))
    
    v = Visualize.images(2, title="original, attacked")
    v(x, xa)
    vdiff = Visualize.influences(1, title="delta")
    vdiff(x - xa)
    
    print("delta from original:")
    print(*[f"L_{o}={np.linalg.norm(x-xa, ord=o)}" for o in [0, 1, 2, np.inf]])

## Part 3: Intense training using GPU

Implement the following method and test with `test_cifar_model`.

- `train_cifar_model`

In [None]:
def train_cifar_model(hypers: Hypers, cifar: Splits):
    """
    Compile and fit a Keras model for training using the CIFAR dataset. 
    You are free to choose the number/type of layers you want in the model as
    long as the accuracy is >= 70%, as tested in the following public test.
    Keyword Arguments:
        hypers {Hypers} -- training hyper parameters
        Splits(train: Dataset, test: Dataset, valid: Dataset) -- stores data splits
    Returns:
        {keras.models.Model}
    """
    model: Model = None
        
    # >> Your code starts here. <<
    
    # >> Your code ends here. <<

    return model

In [None]:
@publictest
def test_cifar_model():
    cifar_2d = test_data.cifar
    
    # You can tune the hyper parameters to suit your model.
    hypers = Hypers(epochs=50, learning_rate=0.1, batch_size=512)
    
    model, time = timeit(lambda: train_cifar_model(hypers, cifar_2d))
    
    test_data.cifar_model = model
    
    test_accuracy = np.mean(
        np.argmax(model.predict(cifar_2d.test.X), axis=1) == cifar_2d.test.y
    )
    
    print("Test accuracy:", test_accuracy)
    
    if test_accuracy >= 0.70:
        print("ACCURACY OK")
    else:
        print("ACCURACY FAIL")
    
    print("Training time:", time)
    
    if time.total_seconds() < 1000:
        print("TIME OK")
    else:
        print("TIME FAIL")

# Check your submission

Make sure all of the methods originally part of this notebook are defined. The first time you run this, you should get a google drive authentication prompt. This is due to this notebook being stored on google drive and thus needs to be retrieved before checking contents.

In [None]:
@publictest
def check():
    auth.authenticate_user()
    gauth = GoogleAuth()
    gauth.credentials = GoogleCredentials.get_application_default()
    drive = GoogleDrive(gauth)
    # Find solution notebook in your drive and make a copy here in colab.
    fid = drive.ListFile({'q':"title='solution.ipynb'"}).GetList()[0]['id']
    f = drive.CreateFile({'id': fid})
    f.GetContentFile('solution.ipynb')
    check_submission(reqs=[
        'train_sklearn', 'softmax', 'scores', 'forward', 'evaluate', 'test_train_sklearn',
        'onehot', 'backward', 'sgd', 'test_numpy',
        'build_keras_model', 'train_keras_model', 'test_keras',
        'explain', 'most_wrong', 'test_explain',
        'invert', 'test_invert',
        'attack', 'test_attack', 'train_cifar_model', 'test_cifar_model'
    ])