In [None]:
import tensorflow as tf
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import pandas as pd
import os
import sys
import time
from tensorflow import keras

In [None]:
from sklearn.datasets import fetch_california_housing

housing = fetch_california_housing()
# print(housing.DESCR)
print(housing.data.shape)
print(housing.target.shape)

import pprint
pprint.pprint(housing.data[0])
pprint.pprint(housing.target[0])

In [None]:
from sklearn.model_selection import train_test_split

# by default train_test_split split data in 3: 1 -> train and test -> default param -> test_size = 0.25
x_train_all, x_test, y_train_all, y_test = train_test_split(housing.data, housing.target, random_state = 7)
x_train, x_valid, y_train, y_valid = train_test_split(x_train_all, y_train_all, random_state = 11)

print(x_train.shape, y_train.shape)
print(x_valid.shape, y_valid.shape)
print(x_test.shape, y_test.shape)

In [None]:
from sklearn.preprocessing import StandardScaler

# before normalization
print(np.max(x_train), np.min(x_train))

# perform normalization
scaler = StandardScaler()
# 1. data in x_train is int32, we need to convert them to float32 first 
# 2. convert x_train data from 
#    [None, 28, 28] -> [None, 784] 
#       -> after all reshape back to [None, 28, 28]
x_train_scaled = scaler.fit_transform(x_train)
x_valid_scaled = scaler.transform(x_valid)
x_test_scaled  = scaler.transform(x_test)

# after normalization
print(np.max(x_train_scaled), np.min(x_train_scaled))

### Implement customized loss function

In [None]:
def customized_mse(y_actual, y_pred):
    return tf.reduce_mean(tf.square(y_actual - y_pred))

### Test Dense

In [None]:
# Define a layer with unlimited rows and 5 columns as input shape 
# and 100 neurons as output
layer = keras.layers.Dense(100, input_shape = (None, 5))
# give layer(layer is used as function) [10(rows), 5(columns)] as input
layer(tf.zeros([10, 5]))
# result shape should be (10, 100) and filled with zeros

### Variables of Dense function -> Basic example just for demonstration

In [None]:
# y = x * w + b -> 
# w is kernel (dense_11/kernel:0 / layer.kernel), 
# b is bias (dense_11/bias:0  /layer.bias )
layer.variables

layer.trainable_variables # which is the same as variables

# help(layer)

### Now build my Dense layer

In [None]:
class CustomizedDenseLayer(keras.layers.Layer):
    '''overwrite __init__, build and call methods of class Layer'''
    def __init__(self, units, activation = None, **kwargs):
        self.units = units # input neuron
        self.activation = keras.layers.Activation(activation)
        super(CustomizedDenseLayer, self).__init__(**kwargs)
    
    def build(self, input_shape):
        self.kernel = self.add_weight(name = 'kernel',
                                     shape = (input_shape[1], self.units),
                                     initializer = 'uniform',
                                     trainable = True)
        
        self.bias = self.add_weight(name = 'bias',
                                    shape = (self.units, ),
                                    initializer = 'zeros',
                                    trainable = True)
        
        super(CustomizedDenseLayer, self).build(input_shape)
    
    def call(self, x):
        return self.activation(x @ self.kernel + self.bias)

In [None]:
model = keras.models.Sequential()
model.add(CustomizedDenseLayer(30, activation = 'relu', input_shape=x_train.shape[1:]))
model.add(CustomizedDenseLayer(50, activation = 'relu', input_shape=x_train.shape[1:]))
model.add(CustomizedDenseLayer(1))

In [None]:
model.summary()
# set tf.mse to our cusotmized_mse
model.compile(loss = customized_mse, optimizer = "sgd", metrics = ["accuracy"])
callbacks = [
    keras.callbacks.EarlyStopping(patience = 5, min_delta = 1e-3)
]

In [None]:
history = model.fit(x_train_scaled, y_train, 
                    validation_data=(x_valid_scaled, y_valid),
                    epochs = 100, 
                    callbacks = callbacks)

In [None]:
from tensorflow.python.keras.callbacks import History

def plot_learning_curves(history: History):
    pd.DataFrame(history.history).plot(figsize = (8, 5))
    plt.grid(True)
    plt.gca().set_ylim(0, 1)
    plt.show()

plot_learning_curves(history)

In [None]:
test_loss, test_acc = model.evaluate(x_test_scaled, y_test)

In [None]:
# one_hot encoded results
predictions = model.predict(x_test_scaled)

index = 40

for indx in range(index):
    print(y_test[indx], predictions[indx])