<a href="https://colab.research.google.com/github/qamtam/Hands-on-machine-learning/blob/main/CH12_with_notes.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf

In [None]:
# Build a 2x3 float32 tensor from a nested Python list.
tf.constant([[2., 5., 7.],
              [3., 6., 8.]])

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[2., 5., 7.],
       [3., 6., 8.]], dtype=float32)>

In [None]:
# A Python int becomes a scalar (shape ()) int32 tensor.
tf.constant(42)

<tf.Tensor: shape=(), dtype=int32, numpy=42>

In [None]:
# Keep the 2x3 matrix around for the shape / dtype / indexing demos below.
t = tf.constant([[2., 5., 7.],
              [3., 6., 8.]])

In [None]:
t.shape  # static shape: 2 rows, 3 columns

TensorShape([2, 3])

In [None]:
t.dtype  # Python floats are stored as float32

tf.float32

In [None]:
t[:, 1:]  # all rows, columns 1 onwards -> shape (2, 2)

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[5., 7.],
       [6., 8.]], dtype=float32)>

In [None]:
# Select column 1 and append a trailing axis, so the result stays 2-D: shape (2, 1).
t[..., 1, tf.newaxis]

<tf.Tensor: shape=(2, 1), dtype=float32, numpy=
array([[5.],
       [6.]], dtype=float32)>

In [None]:
t[:,1]  # plain integer index drops the axis -> shape (2,)

<tf.Tensor: shape=(2,), dtype=float32, numpy=array([5., 6.], dtype=float32)>

In [None]:
def huber_fun(y_true, y_pred):
  """Element-wise Huber loss with a fixed threshold of 1.

  Residuals whose absolute value is below 1 are penalised quadratically
  (``error**2 / 2``); all other residuals linearly (``|error| - 0.5``).

  Args:
    y_true: tensor of target values.
    y_pred: tensor of predictions, broadcastable against ``y_true``.

  Returns:
    A tensor of per-element losses (no reduction is applied).
  """
  residual = y_true - y_pred
  abs_residual = tf.abs(residual)
  return tf.where(abs_residual < 1,
                  tf.square(residual) / 2,
                  abs_residual - 0.5)

In [None]:
def my_softplus(z):
  """Softplus activation: ``log(exp(z) + 1)``, a smooth approximation of ReLU.

  Args:
    z: tensor of pre-activations.

  Returns:
    A tensor with the same shape as ``z``.
  """
  # The +1 must sit outside the exponential: log(exp(z + 1)) would simply
  # collapse to z + 1, i.e. a shifted identity rather than softplus.
  return tf.math.log(tf.exp(z) + 1.0)

def my_glorot_initializer(shape, dtype = tf.float32):
  """Glorot (Xavier) normal initializer for a 2-D weight matrix.

  Args:
    shape: ``(fan_in, fan_out)`` of the weight matrix; only the first two
      entries are used.
    dtype: dtype of the returned tensor.

  Returns:
    A tensor of the requested shape drawn from a zero-centred normal
    distribution with ``stddev = sqrt(2 / (fan_in + fan_out))``.
  """
  # Parenthesise the sum: without it, precedence yields 2/fan_in + fan_out,
  # which gives a standard deviation far too large.
  stddev = tf.sqrt(2. / (shape[0] + shape[1]))
  return tf.random.normal(shape, stddev=stddev, dtype=dtype)

def my_l1_regularizer(weights):
  """L1 penalty with a fixed factor of 0.01: ``sum(|0.01 * weights|)``.

  Returns a scalar tensor.
  """
  scaled_weights = 0.01 * weights
  return tf.reduce_sum(tf.abs(scaled_weights))

def my_positive_weights(weights):
  """Weight constraint: replace every negative entry with zero.

  Returns a tensor of the same shape as ``weights``; non-negative entries are
  passed through unchanged.
  """
  is_negative = weights < 0.
  zeros = tf.zeros_like(weights)
  return tf.where(is_negative, zeros, weights)


In [None]:
# To easily read hyperparameters back from a previously saved model (for example the factor of the L1 regularizer), the regularizer must first be written as a class
import tensorflow as tf

from tensorflow import keras
class MyL1Regularizer(keras.regularizers.Regularizer):
  """L1 regularizer with a configurable factor.

  Implemented as a class (rather than a plain function) so that ``factor`` is
  exposed through ``get_config()``.
  """

  def __init__(self, factor):
    # Multiplier applied to the weights before taking absolute values.
    self.factor = factor

  def __call__(self, weights):
    """Return the scalar penalty ``sum(|factor * weights|)``."""
    scaled_weights = self.factor * weights
    return tf.reduce_sum(tf.abs(scaled_weights))

  def get_config(self):
    """Return the constructor arguments as a dict."""
    return dict(factor=self.factor)

class HuberMetric(keras.metrics.Metric):
  """Streaming metric: mean Huber loss over every element seen so far.

  Two scalar state variables are kept: ``total`` (running sum of the
  per-element Huber losses) and ``count`` (number of elements seen).

  Args:
    threshold: residual magnitude at which the loss switches from quadratic
      to linear.
    **kwargs: forwarded to ``keras.metrics.Metric`` (e.g. ``name``, ``dtype``).
  """

  def __init__(self, threshold = 1.0, **kwargs):
    super().__init__(**kwargs)
    self.threshold = threshold
    # Built locally: no `create_huber` helper is defined in this notebook.
    self.huber_fn = self._make_huber_fn(threshold)
    # Both accumulators must start at zero; `name` is passed by keyword
    # because the positional order of add_weight differs across Keras versions.
    self.total = self.add_weight(name="total", initializer="zeros")
    self.count = self.add_weight(name="count", initializer="zeros")

  @staticmethod
  def _make_huber_fn(threshold):
    """Return an element-wise Huber loss function for the given threshold."""
    def huber_fn(y_true, y_pred):
      error = y_true - y_pred
      is_small_error = tf.abs(error) < threshold
      squared_loss = tf.square(error) / 2
      linear_loss = threshold * tf.abs(error) - threshold ** 2 / 2
      return tf.where(is_small_error, squared_loss, linear_loss)
    return huber_fn

  def update_state(self, y_true, y_pred, sample_weight = None):
    """Accumulate the Huber losses of one batch.

    ``sample_weight`` is accepted for API compatibility but ignored.
    """
    metric = self.huber_fn(y_true, y_pred)
    self.total.assign_add(tf.reduce_sum(metric))
    self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))

  def result(self):
    """Return the mean Huber loss so far (0 if nothing has been accumulated)."""
    # divide_no_nan avoids a NaN when result() is read before any update.
    return tf.math.divide_no_nan(self.total, self.count)

  def get_config(self):
    """Return the base config extended with ``threshold``."""
    base_config = super().get_config()
    return {**base_config, "threshold": self.threshold}

In [None]:
# simple layer as lambda
# exp_layer = keras.layers.Lambda(lambda x: tf.exp(x))

class MyDense(keras.layers.Layer):
  """Minimal re-implementation of a fully connected layer.

  Computes ``activation(X @ kernel + bias)``.

  Args:
    units: number of output units.
    activation: anything accepted by ``keras.activations.get`` (name,
      callable or None).
    **kwargs: forwarded to ``keras.layers.Layer``.
  """

  def __init__(self, units, activation=None, **kwargs):
    super().__init__(**kwargs)
    self.units = units
    self.activation = keras.activations.get(activation)

  def build(self, batch_input_shape):
    """Create the weights once the number of input features is known."""
    self.kernel = self.add_weight(
        name="kernel", shape=[batch_input_shape[-1], self.units],
        initializer="glorot_normal"
    )
    self.bias = self.add_weight(
        name="bias", shape=[self.units], initializer="zeros"
    )
    super().build(batch_input_shape)

  def call(self, X):
    """Apply the affine transform followed by the activation."""
    return self.activation(X @ self.kernel + self.bias)

  def compute_output_shape(self, batch_input_shape):
    """Return the input shape with its last dimension replaced by ``units``."""
    # tf.TensorShape (capital T) is the actual class name; wrapping the
    # argument also lets plain tuples/lists be passed in.
    input_dims = tf.TensorShape(batch_input_shape).as_list()
    return tf.TensorShape(input_dims[:-1] + [self.units])

  def get_config(self):
    """Return the base config extended with ``units`` and ``activation``."""
    # get_config must be *called*; unpacking the bound method itself with **
    # raises a TypeError.
    base_config = super().get_config()
    return { **base_config, "units": self.units,
            "activation": keras.activations.serialize(self.activation)}

In [None]:
# Manual version of what a dense layer computes, plus the output-shape arithmetic.
t = tf.random.uniform(shape=[10,4]) # batch of 10 instances, 4 features each
kernel = tf.random.uniform(shape=[4,3])
bias = tf.constant([3., 5., 6.]) # shape = [3]; added to every row of the (10, 3) product
t@kernel + bias

print(t.shape.as_list()[:-1]) # [10]: every dimension except the last
Shape = tf.TensorShape(t.shape.as_list()[:-1] + [3])
Shape
Shape2 = tf.TensorShape([10] + [3])
Shape2

[10]


TensorShape([10, 3])

In [None]:
shape = tf.TensorShape([1, 3])
print(shape.as_list()[:-1])  # drop the last dimension -> [1]

[1]


In [None]:
# Replace the last dimension of `shape` with 5.
new_shape = tf.TensorShape(shape.as_list()[:-1] + [5])

In [None]:
new_shape  # display the shape built in the previous cell

TensorShape([1, 5])

In [None]:
z = shape
# Unpacking a TensorShape yields its individual dimensions.
z1, z2 = z
z1

1

In [None]:
# NOTE(review): the saved output shows TensorShape([1, 5]), yet `z` is bound to
# `shape` (TensorShape([1, 3])) in the cells above. The output looks stale from
# out-of-order execution; re-run on a fresh kernel to confirm.
z

TensorShape([1, 5])

In [None]:
class MyGaussianNoise(keras.layers.Layer):
  """Adds zero-mean Gaussian noise during training; identity otherwise.

  Args:
    stddev: standard deviation of the noise.
    **kwargs: forwarded to ``keras.layers.Layer``.
  """

  # Spelled `__init__`: under the previous name the constructor was never
  # invoked, so `self.stddev` did not exist when `call` ran.
  def __init__(self, stddev, **kwargs):
    super().__init__(**kwargs)
    self.stddev = stddev

  def call(self, X, training=None):
    """Return ``X`` plus noise when ``training`` is truthy, else ``X``."""
    if training:
      noise = tf.random.normal(tf.shape(X), stddev = self.stddev)
      return X + noise
    else:
      return X

  def compute_output_shape(self, batch_input_shape):
    """The layer never changes the shape of its input."""
    return batch_input_shape

  def get_config(self):
    """Return the base config extended with ``stddev``."""
    base_config = super().get_config()
    return {**base_config, "stddev": self.stddev}
    

In [None]:
class ResidualBlock(keras.layers.Layer):
  """Stack of ELU dense layers wrapped in a skip connection.

  Args:
    n_layers: number of dense layers in the block.
    n_neurons: units per dense layer; the residual addition in ``call``
      adds the block output to its input.
    **kwargs: forwarded to ``keras.layers.Layer``.
  """

  def __init__(self, n_layers, n_neurons, **kwargs):
    super().__init__(**kwargs)
    dense_stack = []
    for _ in range(n_layers):
      dense_stack.append(
          keras.layers.Dense(n_neurons, activation="elu", kernel_initializer="he_normal"))
    self.hidden = dense_stack

  def call(self, inputs):
    """Run the inputs through every dense layer, then add the inputs back."""
    transformed = inputs
    for dense in self.hidden:
      transformed = dense(transformed)
    return inputs + transformed

class ResidualRegression(keras.Model):
  """Regression model: dense layer -> block1 (applied 4 times) -> block2 -> output.

  Args:
    output_dim: number of units of the final dense layer.
    **kwargs: forwarded to ``keras.Model``.
  """

  def __init__(self, output_dim, **kwargs):
    super().__init__(**kwargs)
    self.hidden1 = keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal")
    self.block1 = ResidualBlock(2,30)
    self.block2 = ResidualBlock(2,30)
    self.out = keras.layers.Dense(output_dim)

  def call(self, inputs):
    """Forward pass; `block1` is reused, so its weights are shared across passes."""
    features = self.hidden1(inputs)
    n_block1_passes = 1 + 3
    for _ in range(n_block1_passes):
      features = self.block1(features)
    features = self.block2(features)
    return self.out(features)

  

In [None]:
class ReconstructingRegressor(keras.Model):
  """Regressor with an auxiliary input-reconstruction loss.

  Five SELU dense layers feed two heads: the regression output and a
  reconstruction of the inputs. The mean squared reconstruction error,
  scaled by 0.05, is registered through ``add_loss`` on every call.

  Args:
    output_dim: number of units of the regression output layer.
    **kwargs: forwarded to ``keras.Model``.
  """

  # The parameter was previously misspelled `output_dum` while the body read
  # `output_dim`, so every instantiation raised NameError. Positional callers
  # are unaffected by the rename.
  def __init__(self, output_dim, **kwargs):
    super().__init__(**kwargs)
    self.hidden = [keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal" ) for _ in range(5)]
    self.out = keras.layers.Dense(output_dim)

  def build(self, batch_input_shape):
    """Create the reconstruction head once the number of inputs is known."""
    n_inputs = batch_input_shape[-1]
    self.reconstruct = keras.layers.Dense(n_inputs)
    super().build(batch_input_shape)

  def call(self, inputs):
    """Return the regression output and register the reconstruction loss."""
    Z = inputs
    for layer in self.hidden:
      Z = layer(Z)
    reconstruction = self.reconstruct(Z)
    recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs))
    self.add_loss(0.05 * recon_loss)
    return self.out(Z)

In [None]:
def f(w1,w2):
  """Toy function ``3*w1**2 + 2*w1*w2`` used to demonstrate autodiff."""
  quadratic_term = 3*w1 **2
  cross_term = 2*w1*w2
  return quadratic_term + cross_term

w1, w2 = tf.Variable(5.), tf.Variable(3.)
# Record the forward computation so it can be differentiated afterwards.
with tf.GradientTape() as tape:
  z = f(w1,w2)

# df/dw1 = 6*w1 + 2*w2 = 36 and df/dw2 = 2*w1 = 10 at (w1, w2) = (5, 3).
gradients = tape.gradient(z, [w1,w2])
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

In [None]:
#Custom training loops
import numpy as np

from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Load the California housing data and split it into train / validation / test.
# Targets are reshaped to a column so they line up with the model's (batch, 1) output.
housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

# Fit the scaler on the training split only, then reuse it for the other splits.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)
# The same L2 regularizer object is attached to both layers; its penalties
# are what `model.losses` exposes to the training loop.
l2_reg = keras.regularizers.l2(0.05)
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal", kernel_regularizer=l2_reg),
    keras.layers.Dense(1, kernel_regularizer=l2_reg)                         
      


])

def random_batch(X,y, batch_size=32):
  """Draw `batch_size` random rows of X and the matching rows of y.

  Indices are sampled uniformly with replacement from NumPy's global RNG.
  """
  n_instances = len(X)
  picks = np.random.randint(n_instances, size=batch_size)
  return X[picks], y[picks]

def print_status_bar(iteration, total, loss, metrics=None):
  """Print a one-line, self-overwriting progress report.

  Args:
    iteration: number of instances processed so far.
    total: number of instances in one epoch.
    loss: object with ``.name`` and ``.result()`` tracking the loss.
    metrics: optional list of further objects with ``.name`` / ``.result()``.

  The line starts with a carriage return so successive calls overwrite each
  other; a newline is emitted only once ``iteration`` reaches ``total``.
  """
  tracked = [loss] + (metrics or [])
  summary = " - ".join("{}: {:.4f}".format(m.name, m.result()) for m in tracked)
  end = "" if iteration < total else "\n"
  # Use "/" as the separator: the former "\{" was an invalid escape sequence
  # and printed a backslash ("11610\11610").
  print("\r{}/{} - ".format(iteration, total) + summary, end=end)

# Custom training loop for the housing regression model.
n_epochs=5
batch_size=32
n_steps = len(X_train) //batch_size
# `learning_rate` is the supported argument name; `lr` is a deprecated alias.
optimizer = keras.optimizers.Nadam(learning_rate=0.01)
# This is a regression task, so use mean squared error. The previous
# sparse_categorical_crossentropy treated the float targets as class indices,
# which is why the saved output shows NaN for every epoch.
loss_fn = keras.losses.mean_squared_error
mean_loss = keras.metrics.Mean()
metrics = [keras.metrics.MeanAbsoluteError()]
for epoch in range(1, n_epochs + 1):
    print("Epoch {}/{}".format(epoch, n_epochs))
    for step in range(1, n_steps + 1):
        X_batch, y_batch = random_batch(X_train_scaled, y_train, batch_size)
        with tf.GradientTape() as tape:
            # training=True so layers that behave differently while training do so.
            y_pred = model(X_batch, training=True)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            # model.losses holds the regularization penalties (one per regularized layer).
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        # Weight constraints are not applied automatically in a custom loop.
        for variable in model.variables:
            if variable.constraint is not None:
                variable.assign(variable.constraint(variable))
        mean_loss(loss)
        for metric in metrics:
            metric(y_batch, y_pred)
        print_status_bar(step * batch_size, len(y_train), mean_loss, metrics)
    print_status_bar(len(y_train), len(y_train), mean_loss, metrics)
    # NOTE(review): newer Keras releases rename this to reset_state(); confirm
    # against the installed version.
    for metric in [mean_loss] + metrics:
        metric.reset_states()



Downloading Cal. housing from https://ndownloader.figshare.com/files/5976036 to /root/scikit_learn_data


Epoch 1/5


To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

11610\11610 - mean: nan - mean_absolute_error: nan
Epoch 2/5
11610\11610 - mean: nan - mean_absolute_error: nan
Epoch 3/5
11610\11610 - mean: nan - mean_absolute_error: nan
Epoch 4/5
11610\11610 - mean: nan - mean_absolute_error: nan
Epoch 5/5
11610\11610 - mean: nan - mean_absolute_error: nan


In [None]:
t = tf.range(10)  # integer tensor holding 0..9

In [None]:
import numpy as np
# Built from a NumPy array; presumably the tensor keeps NumPy's integer dtype
# (typically int64) rather than TensorFlow's default. Verify with y.dtype.
y = tf.constant(np.arange(10))

In [None]:
# `t` (from tf.range) and `y` (from np.arange) generally have different integer
# dtypes, and TensorFlow does not convert dtypes implicitly: the bare `y == t`
# raised InvalidArgumentError here. Cast explicitly before comparing.
y == tf.cast(t, y.dtype)

InvalidArgumentError: ignored

In [None]:
# Unicode code point of every character in the string, as an integer tensor.
u = tf.constant([ord(c) for c in "café"])
u
# Encode the code points into a single UTF-8 string tensor.
b = tf.strings.unicode_encode(u, "UTF-8")
# Length counted in UTF-8 characters rather than in bytes.
tf.strings.length(b, unit="UTF8_CHAR")
# Decode the UTF-8 string back into code points.
# NOTE(review): the saved output of this cell is a NameError; presumably it
# was run on a kernel where `tf` had not been imported yet. Re-run to confirm.
tf.strings.unicode_decode(b, "UTF-8")



NameError: ignored

In [None]:
import keras
import numpy as np
import tensorflow as tf


# Load Fashion-MNIST, scale pixels to [0, 1] and hold out the first 5000
# training images for validation.
(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.fashion_mnist.load_data()
X_train_full = X_train_full.astype(np.float32) / 255.
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]

X_test = X_test.astype(np.float32) / 255.

# 28x28 image -> flat vector -> 100 ReLU units -> 10-way softmax.
model = keras.models.Sequential([
      keras.layers.Flatten(input_shape=[28,28]),
      keras.layers.Dense(100, activation="relu"),
      keras.layers.Dense(10, activation="softmax"),
])

n_epochs = 5
batch_size = 32
n_steps = len(X_train) // batch_size
# `learning_rate` is the supported argument name; `lr` is a deprecated alias.
optimizer = keras.optimizers.Nadam(learning_rate=0.01)
# Integer class labels with softmax outputs call for sparse categorical
# cross-entropy. It was previously assigned to the misspelled `loss_dn`, and
# `loss_fn` was then set to mean_squared_error, which cannot compare labels of
# shape (batch,) with predictions of shape (batch, 10) and raised the
# InvalidArgumentError in the saved output.
loss_fn = keras.losses.sparse_categorical_crossentropy
mean_loss = keras.metrics.Mean()
metrics = [keras.metrics.SparseCategoricalAccuracy()]
def random_batch(X, y, batch_size=32):
    """Return `batch_size` randomly chosen rows of X with the matching rows of y.

    Row indices come from NumPy's global RNG and are drawn with replacement.
    """
    sampled_rows = np.random.randint(len(X), size=batch_size)
    X_batch = X[sampled_rows]
    y_batch = y[sampled_rows]
    return X_batch, y_batch
# `trange` and `OrderedDict` were used below without being imported anywhere in
# the notebook, so this cell only ran on leftover kernel state. The saved
# widget output indicates the tqdm notebook progress bars.
from collections import OrderedDict
from tqdm.notebook import trange

# Custom training loop with nested progress bars: outer bar over epochs, inner
# bar over the steps of the current epoch.
with trange(1, n_epochs + 1, desc="All epochs") as epochs:
    for epoch in epochs:
        with trange(1, n_steps + 1, desc="Epoch {}/{}".format(epoch, n_epochs)) as steps:
            for step in steps:
                X_batch, y_batch = random_batch(X_train, y_train, batch_size)
                with tf.GradientTape() as tape:
                    # training=True so layers that behave differently while training do so.
                    y_pred = model(X_batch, training=True)
                    main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
                    loss = tf.add_n([main_loss] + model.losses)
                gradients = tape.gradient(loss, model.trainable_variables)
                optimizer.apply_gradients(zip(gradients, model.trainable_variables))
                # Weight constraints are not applied automatically in a custom loop.
                for variable in model.variables:
                    if variable.constraint is not None:
                        variable.assign(variable.constraint(variable))
                # Running loss / metric values shown next to the inner progress bar.
                status = OrderedDict()
                mean_loss(loss)
                status["loss"] = mean_loss.result().numpy()
                for metric in metrics:
                    metric(y_batch, y_pred)
                    status[metric.name] = metric.result().numpy()
                steps.set_postfix(status)
            # End of epoch: evaluate once on the validation set.
            y_pred = model(X_valid)
            status["val_loss"] = np.mean(loss_fn(y_valid, y_pred))
            status["val_accuracy"] = np.mean(keras.metrics.sparse_categorical_accuracy(
                tf.constant(y_valid, dtype=np.float32), y_pred))
            steps.set_postfix(status)
        for metric in [mean_loss] + metrics:
            metric.reset_states()


HBox(children=(FloatProgress(value=0.0, description='All epochs', max=5.0, style=ProgressStyle(description_wid…

HBox(children=(FloatProgress(value=0.0, description='Epoch 1/5', max=1718.0, style=ProgressStyle(description_w…





InvalidArgumentError: ignored