# Intesa Sanpaolo: AI4Citizens

## Tutorial Reti Neurali

In questo tutorial impareremo a:
- usare la **differenziazione automatica**;
- costruire e allenare una [rete neurale artificiale](https://www.wikiwand.com/it/Rete_neurale_artificiale) usando Tensorflow; 
- migliorare le performance di una rete neurale usando alcuni accorgimenti di base;
- usare il **Transfer Learning** sfruttando i pesi di una rete allenata su un dataset di un altro dominio; 
- generare dati sintetici usando gli **autoencoder**;
- costruire una **GAN**.

Punti di attenzione:
- dato il poco tempo a disposizione, questo tutorial vuole essere una piccola introduzione per iniziare a usare Tensorflow velocemente. Per una trattazione esaustiva, si prega di consultare la [documentazione ufficiale](https://www.tensorflow.org/api_docs);
- questo notebook contenente tutto il codice presentato sarà sempre a vostra disposizione;    

---


### Installazione pacchetti mancanti e aggiornamento Tensorflow

Su Colab è possibile eseguire comandi di terminale anteponendo il punto escamativo "!" al comando che vogliamo dare. Nella cella qui sotto andiamo a installare il pacchetto [emnist](https://www.nist.gov/itl/products-and-services/emnist-dataset) usando pip. 

In [None]:
! pip install emnist

In [None]:
! pip freeze

### Caricamento pacchetti

Importiamo alcune funzioni/oggetti che useremo dopo. 

In [None]:
from emnist import extract_training_samples, extract_test_samples
from matplotlib.pyplot import matshow, subplots, hist, show
from numpy import unique, prod, arange, exp, minimum, argmax
from numpy.random import randint, choice, normal
from string import ascii_lowercase
from tensorflow import GradientTape, constant as tf_constant, Variable, \
ones_like, zeros_like, function as tf_function, __version__ as tf_version
from tensorflow.data import Dataset
from tensorflow.random import normal as tf_normal
from tensorflow.math import reduce_mean as tf_reduce_mean, sqrt as tf_sqrt
from tensorflow.keras import Model
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Input, Dense, Dropout, Flatten, Conv2D, \
MaxPool2D, Reshape, BatchNormalization, Activation, LeakyReLU, Conv2DTranspose,\
LeakyReLU, Concatenate, RandomTranslation, RandomRotation
from tensorflow.keras.losses import SparseCategoricalCrossentropy, \
MeanSquaredError, BinaryCrossentropy, CategoricalCrossentropy
from tensorflow.keras.utils import set_random_seed
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import plot_model, to_categorical
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.metrics import Accuracy

---



### Differenziazione automatica

Data una funzione *bella* $y = f(x)$ e un punto del suo dominio $x_0$, voglio sapere in quale direzione mi devo muovere per raggiungere un punto di minimo. Per esempio, $y = x^2$ e $x_0=3$. La derivata è $y'(x) = 2x$ e $y'(3) = 6$. Per raggiungere il minimo (che si trova in $x=0$), dobbiamo andare nella direzione opposta. L'operazione di differenziazione automatica è già implementata in Tensorflow. 

In [None]:
x = tf_constant(3.0)
with GradientTape() as tape:
    tape.watch(x)
    y = x * x
    dy_dx = tape.gradient(y, x) 
    print(dy_dx)

A questo punto la domanda spontanea è: e quindi? Supponiamo di voler fare una regressione lineare. 




In [None]:
set_random_seed(1102)

x1 = tf_normal([1000]) # <-- feature 1
x2 = tf_normal([1000]) # <-- feature 2
x3 = tf_normal([1000]) # <-- feature 3

# variabile target
y = 3.0 * x1 - 5.0 * x2 - 1.0 * x3 # <-- coefficienti = (3.0, -5.0, -1.0)

Supponiamo di non conoscere i ciefficienti a priori e di volerli ricavare dai dati. 

In [None]:
c = Variable([0.0, 0.0, 0.0]) # <-- inizializzazione
for j in range(100):
    with GradientTape() as tape:
        tape.watch(c)
        y_pred = c[0] * x1 + c[1] * x2 + c[2] * x3 # <-- predizione
        loss = tf_reduce_mean((y - y_pred)**2) # <-- quanto sto sbagliando?
        grad =  tape.gradient(loss, c) # <-- differenziazione automatica
        grad_normalized = grad / tf_sqrt(max(sum(grad**2), 0.00001)) 
        c = c - 10./(100.+j) * grad_normalized # <-- aggiorno i pesi
        if (j+1) % 10 == 0:
            print("iteration =", j+1, 
                  "\t loss =", loss.numpy(), 
                  #"\t grad =", grad.numpy(), 
                  "\t norm grad =", grad_normalized.numpy(), 
                  "\t coeff =", c.numpy())
print("Coefficients found", c.numpy())

---



### Caricamento dataset delle lettere

Carichiamo il dataset delle lettere scritte a mano usando la funzione `extract_training_samples` e `extract_test_samples`. I dati così caricati sono già divisi in train set (usato per allenare i pesi della rete) e test set (dati non presenti durante il training e usati solo per testare le performance). 

Come primo passo, normalizziamo tutte le features nel range [0, 1] e facciamo in modo che le etichette abbiano una numerazione che inizi da 0. 

In [None]:
letters_train_features, letters_train_labels \
= extract_training_samples('letters')
letters_train_features = letters_train_features / 255.0
letters_train_labels_one_hot = to_categorical(
    letters_train_labels - 1, 
    len(unique(letters_train_labels))
)

print("Dimensioni della feature table =", letters_train_features.shape)
print("Dimensione del vettore target  =", letters_train_labels_one_hot.shape)

In [None]:
letters_test_features, letters_test_labels \
= extract_test_samples('letters')
letters_test_features = letters_test_features / 255.0
letters_test_labels_one_hot = to_categorical(
    letters_test_labels - 1, 
    len(unique(letters_test_labels))
)

print("Dimensioni della feature table =", letters_test_features.shape)
print("Dimensione del vettore target  =", letters_test_labels_one_hot.shape)

---



### Visualizzazione dei dati

Possiamo disegna qualche istanza del training set.

In [None]:
def plot_samples(data) -> None:
    f, axarr = subplots(3,3)
    rnd_ints = randint(low=0, high=data.shape[0], size=prod(axarr.shape))
    for j, ax in enumerate([item for sublist in axarr for item in sublist]):
        ax.imshow(data[rnd_ints[j]])
    f.show()

plot_samples(letters_train_features)

### Distribuzione della variabile target

Controlliamo che la variabile target sia equidistribuita nelle varie classi. 

In [None]:
hist(letters_train_labels, rwidth=0.5, bins=len(unique(letters_train_labels)))
show()

---

### Prima rete neurale aritificiale

In modo molto semplicistico, possiamo affermare che i tre ingredienti per una rete neurale artificiale sono:
- architettura della rete; 
- la loss da minimizzare;
- l'ottimizzatore che aggiorna i pesi della rete affinché la loss venga minimizzata. 

Partiamo dunque con l'architettura della rete concatenando in sequenza diversi layer già implementati in Tensorflow. 

In [None]:
set_random_seed(1102)

def get_model_dummy(input_shape, output_shape, units=64) -> Model:
    model = Sequential([
        Flatten(input_shape=input_shape),
        Dense(units, activation='relu'),
        Dense(output_shape, activation="softmax")
    ])
    return model

model_dummy = get_model_dummy(
    letters_train_features.shape[1:], 
    len(unique(letters_train_labels)), 
    units=64
)

Possiamo anche disegnare la nostra rete usando il comando `plot_model`.

In [None]:
plot_model(model_dummy, show_shapes=True)

In [None]:
for layer in model_dummy.layers:
    print("\n", layer.name)
    for w in layer.weights:
        print(w.numpy())

Compiliamo la nostra prima rete specificando l'[ottimizzatore](https://www.tensorflow.org/api_docs/python/tf/keras/optimizers), la [loss](https://www.tensorflow.org/api_docs/python/tf/keras/losses) e la [metrica di monitoraggio](https://www.tensorflow.org/api_docs/python/tf/keras/metrics). Quest'ultima non è strettamente necessaria ai fini del training ma può essere utile monitorarla. 


In [None]:
model_dummy.compile(
    optimizer='adam',
    loss=CategoricalCrossentropy(),
    metrics=["accuracy"]
)

model_dummy.fit(
    letters_train_features, letters_train_labels_one_hot, 
    batch_size=256, epochs=100, validation_split=0.2, verbose=2, 
    callbacks=[EarlyStopping(monitor='val_loss', patience=3)]
)

### Proviamo con la convoluzione

I [layer convoluzionali](https://www.wikiwand.com/it/Rete_neurale_convoluzionale) sono progettati appositamente per i task di computer vision. 

In [None]:
set_random_seed(1102)

def get_model_conv(input_shape, output_shape) -> Model:
    layer_input = Input(shape=input_shape)
    reshaped = Reshape((28, 28, 1))(layer_input)

    conv1 = Conv2D(32, (3,3))(reshaped)
    conv1 = BatchNormalization()(conv1)
    conv1 = LeakyReLU()(conv1)
    conv1 = MaxPool2D()(conv1)
    conv1 = Flatten()(conv1)
    conv1 = Dense(32, activation='sigmoid')(conv1)

    drop2 = Dropout(0.05)(reshaped)
    rot2 = RandomRotation(factor=(-0.05, +0.05))(drop2)
    tran2 = RandomTranslation(
        height_factor=(-0.05, +0.05), 
        width_factor=(-0.05, +0.05)
    )(rot2)
    conv2 = Conv2D(32, (5,5))(tran2)
    conv2 = BatchNormalization()(conv2)
    conv2 = LeakyReLU()(conv2)
    conv2 = MaxPool2D()(conv2)
    conv2 = Flatten()(conv2)
    conv2 = Dense(32, activation='sigmoid')(conv2)

    fc1 = Flatten()(reshaped)
    fc1 = Dense(64, activation='sigmoid')(fc1)
    fc1 = Dense(32, activation='sigmoid')(fc1)

    layer_latent = Concatenate(name="model_conv_latent")((conv1, conv2, fc1))
    x = Dense(64, activation='linear')(layer_latent)
    x = LeakyReLU()(x)
    layer_output = Dense(output_shape, activation="softmax")(x)
    model = Model(layer_input, layer_output)
    return model, layer_input, layer_latent

model_conv, model_conv_input, model_conv_latent = get_model_conv(
    letters_train_features.shape[1:], 
    len(unique(letters_train_labels))
)

plot_model(model_conv)

In [None]:
model_conv.compile(
    optimizer='adam',
    loss=CategoricalCrossentropy(),
)

model_conv.save("model_conv_random.h5")

model_conv.fit(
    letters_train_features, letters_train_labels_one_hot, 
    batch_size=256, epochs=100, validation_split=0.2, verbose=2, 
    callbacks=[EarlyStopping(monitor='val_loss', patience=3)]
)

model_conv.save("model_conv_trained.h5")

### Performance sul test set

Fin'ora abbiamo usato il training set e la validation set. Vediamo come le performance delle due reti sopra allenate sul test set. 

In [None]:
preds_dummy = model_dummy.predict(letters_test_features)
print("Dummy model accuracy =", sum(argmax(preds_dummy) == argmax(letter_test_labels_one_hot)))

In [None]:
preds_conv = model_conv.predict(letters_test_features)
print("Conv model accuracy =", sum(argmax(preds_conv) == argmax(letter_test_labels_one_hot)))

In [None]:
del model_dummy

---



### Problema: training set molto piccolo

Cambiamo il dataset. Questa volta prendiamo le immagini che riportano le cifre. Per complicarci le cose, selezioniamo casualmente solo 60 istanze dal training set. 

In [None]:
set_random_seed(1102)

digits_train_features, digits_train_labels \
= extract_training_samples('digits')
digits_train_features = digits_train_features / 255.0
idx = choice(arange(0, digits_train_features.shape[0]), size=60, replace=False)
digits_train_features = digits_train_features[idx, :, :]
digits_train_labels = digits_train_labels[idx]

print("Dimensioni della feature table =", digits_train_features.shape)
print("Dimensione del vettore target  =", digits_train_labels.shape)

In [None]:
digits_test_features, digits_test_labels \
= extract_test_samples('digits')
digits_test_features = digits_test_features / 255.0

print("Dimensioni della feature table =", digits_test_features.shape)
print("Dimensione del vettore target  =", digits_test_labels.shape)

Come prima, ispezioniamo il dataset disegnando qualche istanza a video. 

In [None]:
plot_samples(digits_train_features)

E studiamo un attimo anche la distribuzione delle varie classi. Ci sono delle classi che hanno solo 3 istanze! 

In [None]:
hist(digits_train_labels, rwidth=0.5, bins=len(unique(digits_train_labels)))
show()

Proviamo subito con una rete molto semplice per vedere fino a dove riusciamo ad arrivare. 

In [None]:
set_random_seed(1102)

model_dummy_4_digits = get_model_dummy(
    digits_train_features.shape[1:], 
    len(unique(digits_train_labels)), 
    16
)

model_dummy_4_digits.compile(
    optimizer='adam',
    loss=SparseCategoricalCrossentropy(),
    metrics=['accuracy']
)

In [None]:
model_dummy_4_digits.summary()

Questa rete ha 12730 pesi allenabili. 

In [None]:
model_dummy_4_digits.fit(
    digits_train_features, digits_train_labels, 
    batch_size=256, epochs=100, validation_split=0.2, verbose=0, 
    callbacks=[EarlyStopping(monitor='val_loss', patience=3)]
)

In [None]:
model_dummy_4_digits.evaluate(digits_test_features, digits_test_labels)

Non così male, considerando che abbiamo solo 60 istanze nel training set! Che cosa possiamo fare per migliorare? 3 Idee: 

- ottimizzare l'architettura della rete (esercizio a casa); 
- sfruttare il transfer learning; 
- generare dati sintetici. 

--- 



### Transfer Learning

Una delle magie delle reti neurali è che possiamo prendere una rete allenata su un dominio diverso dal nostro e usarlo per migliorare il nostro task. In questo esempio specifico, prendiamo la **rete allenata sulle lettere dell'alfabeto**, togliamo gli ultimi layer, fissiamo i pesi e poi aggiungiamo un paio di layer fully-connected per imparare a **classificare le cifre**.  

In [None]:
model_conv.layers

In [None]:
set_random_seed(1102)

layer_output_digits = Dense(len(unique(digits_train_labels)), activation="softmax")(
    Dense(64, activation="relu")(model_conv_latent)
)
model_transfer = Model(model_conv_input, layer_output_digits)
model_conv.trainable = False

In [None]:
model_transfer.summary()

Da notare che i pesi allenabili sono solo 6858, che sono di meno rispetto a quelli della rete dummy che abbiamo usato sopra. 

In [None]:
set_random_seed(1102)

model_transfer.compile(
    optimizer='adam',
    loss=SparseCategoricalCrossentropy(),
    metrics=['accuracy']
)

model_transfer.fit(
    digits_train_features, digits_train_labels, 
    batch_size=256, epochs=100, validation_split=0.2, verbose=0, 
    callbacks=[EarlyStopping(monitor='val_loss', patience=3)]
)

In [None]:
model_dummy_4_digits.evaluate(digits_test_features, digits_test_labels)

In [None]:
model_transfer.evaluate(digits_test_features, digits_test_labels)

Siamo riusciti a migliorare! :)

In [None]:
del model_dummy_4_digits
del model_transfer

--- 



### Generazione dati sintetici

Un'altra delle possibili applicazioni delle reti neurali è la generazione dei dati sintetici. In questo esempio, sfruttiamo un [autoencoder](https://www.wikiwand.com/en/Autoencoder). 

In [None]:
set_random_seed(1102)
model_conv.trainable = False

def get_model_ae(model_conv_input, model_conv_latent) -> Model:

    bottleneck = Dense(32, activation='linear')(model_conv_latent)
    model_encoder = Model(model_conv_input, bottleneck)

    layer_input_dec = Input(shape=32)
    x = Dense(64, activation='relu')(layer_input_dec)
    layer_output_ae = Dense(784, activation="sigmoid")(x)

    model_decoder = Model(layer_input_dec, layer_output_ae)

    model_ae = Sequential([
        model_encoder,
        model_decoder,
    ])

    return model_ae, model_encoder, model_decoder

model_ae, model_encoder, model_decoder = get_model_ae(
    model_conv_input, model_conv_latent
)

model_ae.summary()

Facciamo un training alternato lettere/numeri. 

In [None]:
model_ae.compile(
    optimizer=Adam(learning_rate=0.005, amsgrad=True),
    loss=MeanSquaredError(),
    metrics=['mse']
)

for epoch in range(20):
    model_ae.fit(
        letters_train_features, 
        letters_train_features.reshape(letters_train_features.shape[0], 784), 
        batch_size=1024, epochs=1, verbose=0
    )
    model_ae.fit(
        digits_train_features, 
        digits_train_features.reshape(digits_train_features.shape[0], 784), 
        batch_size=32, epochs=1, validation_split=0.2, verbose=2
    )

Proviamo ora a generare qualche istanza aggiungendo del rumore alla rappresentazione latente. 

In [None]:
def show_synthetic_data(data) -> None:
    synthetic_data = model_decoder.predict(
        model_encoder.predict(
            data
        ) + normal(size=32, loc=0.0, scale=0.01)
    ).reshape((data.shape[0], 28, 28))
    synthetic_data_processed = minimum(exp(synthetic_data)-1, 1)

    f, axarr = subplots(2, data.shape[0])
    for j in range(data.shape[0]):
        axarr[0][j].imshow(data[j])
        axarr[1][j].imshow(synthetic_data_processed[j])
    f.show()

In [None]:
rnd_ints = randint(low=0, high=letters_train_features.shape[0], size=5)
show_synthetic_data(letters_train_features[rnd_ints])

In [None]:
rnd_ints = randint(low=0, high=digits_train_features.shape[0], size=5)
show_synthetic_data(digits_train_features[rnd_ints])

Come possiamo sfruttare i dati sintetici per migliorare le performance di training? (Esercizio a casa) 

I dati sintetici generati non ti sembrano così realistici? Hai ragione, in realtà la questione è più complicata di così. Per approfondimenti, vedi [Variational Autoencoder](https://www.wikiwand.com/en/Variational_autoencoder). 

---


### Generative Adversarial Networks

In [None]:
def get_model_generator():

    layer_input = Input(shape=(100,))
    x = Dense(7*7*256, use_bias=False)(layer_input)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)

    x = Reshape((7, 7, 256))(x)

    x = Conv2DTranspose(128, (5, 5), strides=(1, 1), padding='same', use_bias=False)(x)

    x = BatchNormalization()(x)
    x = LeakyReLU()(x)

    x = Conv2DTranspose(64, (5, 5), strides=(2, 2), padding='same', use_bias=False)(x)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)

    x = Conv2DTranspose(1, (5, 5), strides=(2, 2), padding='same', use_bias=False, activation='tanh')(x)
    
    layer_output = Reshape((28, 28))(x)

    model = Model(layer_input, x)

    return model

In [None]:
def get_model_discriminator():

    layer_input = Input((28, 28))

    x = Reshape((28, 28, 1))(layer_input)

    x = Conv2D(64, (5, 5), strides=(2, 2), padding='same')(x)
    x = LeakyReLU()(x)
    x = Dropout(0.3)(x)

    x = Conv2D(128, (5, 5), strides=(2, 2), padding='same')(x)
    x = LeakyReLU()(x)
    x = Dropout(0.3)(x)

    x = Flatten()(x)
    layer_output = Dense(1)(x)

    model = Model(layer_input, layer_output)

    return model

In [None]:
model_generator = get_model_generator()
model_discriminator = get_model_discriminator()

def discriminator_loss(real_output, fake_output):
    real_loss = BinaryCrossentropy(from_logits=True)(ones_like(real_output), real_output)
    fake_loss = BinaryCrossentropy(from_logits=True)(zeros_like(fake_output), fake_output)
    total_loss = real_loss + fake_loss
    return total_loss

def generator_loss(fake_output):
    return BinaryCrossentropy(from_logits=True)(ones_like(fake_output), fake_output)

optimizer_generator = Adam(1e-4)
optimizer_discriminator = Adam(1e-4)

In [None]:
@tf_function
def train_step(images, batch_size):

    noise = tf_normal([batch_size, 100])

    with GradientTape() as tape_generator, GradientTape() as tape_discriminator:

        generated_images = model_generator(noise, training=True)

        real_output = model_discriminator(images, training=True)
        fake_output = model_discriminator(generated_images, training=True)

        loss_generator = generator_loss(fake_output)
        loss_discriminator = discriminator_loss(real_output, fake_output)

    grad_generator = tape_generator.gradient(loss_generator, model_generator.trainable_variables)
    grad_discriminator = tape_discriminator.gradient(loss_discriminator, model_discriminator.trainable_variables)

    optimizer_generator.apply_gradients(zip(grad_generator, model_generator.trainable_variables))
    optimizer_discriminator.apply_gradients(zip(grad_discriminator, model_discriminator.trainable_variables))

In [None]:
for epoch in range(500):
    if (epoch+1)%5 == 0:
        print("epoch =", epoch+1)
    for image_batch in Dataset.from_tensor_slices(letters_train_features).batch(1024):
        train_step(image_batch, batch_size=1024)

In [None]:
synthetic = model_generator.predict(tf_normal([5, 100]))

In [None]:
f, axarr = subplots(1, synthetic.shape[0])
f.set_dpi(300)
for j in range(synthetic.shape[0]):
    axarr[j].imshow(synthetic[j].reshape((28, 28)))