In [None]:
import tensorflow as tf
import numpy as np
import os
import random
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
plt.rc('font', size=16) 
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import MinMaxScaler
import warnings
import logging
import math

# Short aliases for the Keras namespaces used throughout the notebook
tfk = tf.keras
tfkl = tfk.layers

# Show the TensorFlow version this notebook was run with
print(tf.__version__)

2.9.2


In [None]:
# Fix every source of randomness so repeated runs are comparable
seed = 42

# Hash seed for the Python interpreter
os.environ['PYTHONHASHSEED'] = str(seed)

# Python, NumPy and TensorFlow (v2 and v1-compat) global generators
for _seed_fn in (random.seed,
                 np.random.seed,
                 tf.random.set_seed,
                 tf.compat.v1.set_random_seed):
    _seed_fn(seed)

In [None]:
from sklearn.model_selection import train_test_split

%cd /content/drive/MyDrive/2022_AN2DL(Private)
!yes | unzip training_dataset_homework2.zip

data = np.load('x_train.npy')
labels = np.load('y_train.npy')

from sklearn.utils import compute_class_weight

class_weights = compute_class_weight(
                                        class_weight = "balanced",
                                        classes = np.unique(labels),
                                        y = labels                                                    
                                    )
class_weights = dict(zip(np.unique(labels), class_weights))
class_weights

X_train, X_valid, y_train, y_valid = train_test_split(data, labels, test_size=0.2, shuffle= True)

y_train = tfk.utils.to_categorical(y_train)
y_valid = tfk.utils.to_categorical(y_valid)

print(X_train.shape)
print(y_train.shape)

input_shape = X_train.shape[1:]
classes = y_train.shape[-1]
batch_size = 3
epochs = 100

/content/drive/MyDrive/2022_AN2DL(Private)
Archive:  training_dataset_homework2.zip
replace y_train.npy? [y]es, [n]o, [A]ll, [N]one, [r]ename:   inflating: y_train.npy             
replace x_train.npy? [y]es, [n]o, [A]ll, [N]one, [r]ename:   inflating: x_train.npy             
(1943, 36, 6)
(1943, 12)


### Model

In [None]:
# Model parameters

d_model = 512 # dense vector dimension
d_hidden = 1024 # dimension of hidden layer

# Data-dependent sizes are taken from the loaded dataset rather than hard-coded,
# so the model follows the data if the window length / channels / labels change.
d_input, d_channel = input_shape # dimension of window, number of channels/variables
d_output = classes # number of classes

q = 8 # linear mapping dimension (per head) of queries/keys in mha
v = 8 # linear mapping dimension (per head) of values in mha
h = 8 # number of heads in multihead attention
N = 8 # number of encoder blocks stacked in each tower
dropout = 0.2 # dropout rate used inside the encoder blocks
pe = True # add positional encoding to the step-wise embedding
mask = True # apply the lower-triangular attention mask in the step-wise tower

In [None]:
from tensorflow._api.v2.compat.v1 import float32
class MulHeadAtt(tfkl.Layer):
    """Multi-head scaled dot-product self-attention.

    The input is projected to queries, keys and values with the weight
    matrices ``W_q``, ``W_k`` and ``W_v``; the ``h`` heads are stacked along
    the first axis, attended independently, concatenated back along the last
    axis and projected to ``d_model`` with ``W_o``.

    Args:
        d_model: size of the last axis of the input and of the output.
        q: per-head size of queries and keys.
        v: per-head size of values.
        h: number of heads.
        mask: if True, each position may only attend to itself and to
            earlier positions (lower-triangular mask). The mask is applied
            in both training and inference.
        dropout: rate of the ``Dropout`` layer stored on the instance; it is
            not applied inside ``call``.
    """

    def __init__(self,
                 d_model: int,
                 q: int,
                 v: int,
                 h: int,
                 mask: bool=False,
                 dropout: float = 0.1):
        super(MulHeadAtt, self).__init__()

        # The projections are raw weight matrices (no bias), not Dense layers,
        # so they must be applied with a tensor contraction in call().
        self.W_q = self.add_weight(name='W_q',
                                   shape=(d_model, q * h),
                                   initializer='glorot_uniform')
        self.W_k = self.add_weight(name='W_k',
                                   shape=(d_model, q * h),
                                   initializer='glorot_uniform')
        self.W_v = self.add_weight(name='W_v',
                                   shape=(d_model, v * h),
                                   initializer='glorot_uniform')
        self.W_o = self.add_weight(name='W_o',
                                   shape=(v * h, d_model),
                                   initializer='glorot_uniform')

        self._h = h
        self._q = q

        self.mask = mask
        self.dropout = tfkl.Dropout(dropout)
        self.score = None

    def _project_and_stack_heads(self, x, kernel):
        """Multiply the last axis of ``x`` by ``kernel`` and stack the heads on axis 0."""
        projected = tf.tensordot(x, kernel, axes=1)
        head_chunks = tf.split(projected, num_or_size_splits=self._h, axis=-1)
        return tf.concat(head_chunks, axis=0)

    def call(self, x):
        """Run self-attention on ``x``.

        Returns:
            A tuple ``(self_attention, score)`` where ``self_attention`` has
            ``d_model`` features on its last axis and ``score`` holds the
            scaled attention scores before masking and softmax, with the
            heads stacked along the first axis.
        """
        Q = self._project_and_stack_heads(x, self.W_q)
        K = self._project_and_stack_heads(x, self.W_k)
        V = self._project_and_stack_heads(x, self.W_v)

        # Scaled dot-product scores; kept (unmasked, pre-softmax) for inspection
        score = tf.linalg.matmul(Q, K, transpose_b = True) / math.sqrt(self._q)
        self.score = score

        if self.mask:
            # Lower-triangular matrix of ones (main diagonal included)
            keep = tf.linalg.band_part(tf.ones_like(score[0]), num_lower=-1, num_upper=0)

            # Large negative value so masked positions vanish after the softmax
            fill_value = tf.constant(float(-2**32+1), dtype=score.dtype)

            # `keep` and `fill_value` broadcast against the stacked scores
            score = tf.where(keep > 0, score, fill_value)

        score = tf.nn.softmax(score, axis=-1)

        attention = tf.linalg.matmul(score, V)

        # Undo the head stacking: split along axis 0, concatenate on the last axis
        attention_chunks = tf.split(attention, num_or_size_splits=self._h, axis=0)
        attention_heads = tf.concat(attention_chunks, axis=-1)

        self_attention = tf.tensordot(attention_heads, self.W_o, axes=1)

        return self_attention, self.score

class FeedFwd(tfkl.Layer):
    """Position-wise feed-forward block: Dense(d_hidden) -> ReLU -> Dense(d_model)."""

    def __init__(self,
                 d_model: int,
                 d_hidden: int = 512):
        super(FeedFwd, self).__init__()

        # Expansion to the hidden width (the ReLU is applied in call())
        self.linear_1 = tfkl.Dense(
            d_hidden,
            input_shape = (d_model,),
            activation = None,
        )

        # Projection back to the model width
        self.linear_2 = tfkl.Dense(
            d_model,
            input_shape = (d_hidden,),
            activation = None,
        )

    def call(self, x):
        """Apply the two dense layers with a ReLU in between."""
        hidden = tf.nn.relu(self.linear_1(x))
        return self.linear_2(hidden)

class Encodr(tfkl.Layer):
    """Encoder block: self-attention and feed-forward sub-layers.

    Each sub-layer output goes through dropout, is added to the sub-layer
    input (residual connection) and is then layer-normalised.
    """

    def __init__(self,
                 d_model: int,
                 d_hidden: int,
                 q: int,
                 v: int,
                 h: int,
                 mask: bool = False,
                 dropout: float = 0.1):
        super(Encodr, self).__init__()

        self.MHA = MulHeadAtt(d_model=d_model, q=q, v=v, h=h, mask=mask, dropout=dropout)
        self.feedforward = FeedFwd(d_model=d_model, d_hidden=d_hidden)
        # One dropout layer shared by both sub-layers
        self.dropout = tfkl.Dropout(dropout)
        self.layerNormal_1 = tfkl.LayerNormalization()
        self.layerNormal_2 = tfkl.LayerNormalization()

    def call(self, x):
        """Return ``(encoded, score)``; ``score`` is the one returned by the attention layer."""
        # Attention sub-layer
        attended, score = self.MHA(x)
        after_attention = self.layerNormal_1(self.dropout(attended) + x)

        # Feed-forward sub-layer
        transformed = self.feedforward(after_attention)
        encoded = self.layerNormal_2(self.dropout(transformed) + after_attention)

        return encoded, score

class Transformer(tfkl.Layer):
    """Two-tower gated transformer encoder for windowed multivariate series.

    The step-wise tower embeds the channels of every time step (optionally
    adding a sinusoidal positional encoding and masking its attention); the
    channel-wise tower embeds the whole window of every channel. Both towers
    are flattened, weighted by a learned 2-way softmax gate, concatenated and
    mapped to ``d_output`` values by a linear layer (no activation, i.e. the
    layer returns logits).

    Args:
        d_model: embedding size used in both towers.
        d_output: number of output units.
        d_hidden: hidden size of the feed-forward blocks.
        q: per-head size of queries/keys.
        v: per-head size of values.
        h: number of attention heads.
        N: number of encoder blocks in each tower.
        dropout: dropout rate inside the encoder blocks.
        d_input: window length (size of axis 1 of the input).
        d_channel: number of channels (size of axis 2 of the input).
        pe: whether to add the positional encoding in the step-wise tower.
        mask: whether the step-wise tower uses the attention mask.
    """

    def __init__(self,
                 d_model: int,
                 d_output: int,
                 d_hidden: int,
                 q: int,
                 v: int,
                 h: int,
                 N: int,
                 dropout: float = 0.1,
                 d_input: int = 36,
                 d_channel: int = 6,
                 pe: bool = False,
                 mask: bool = False):
        super(Transformer, self).__init__()

        # Step-wise tower (attention over time steps, optionally masked)
        self.encoder_list_1 = [Encodr(d_model=d_model,
                               d_hidden=d_hidden,
                               q=q,
                               v=v,
                               h=h,
                               mask=mask,
                               dropout=dropout) for _ in range(N)]

        # Channel-wise tower (attention over channels, never masked)
        self.encoder_list_2 = [Encodr(d_model=d_model,
                               d_hidden=d_hidden,
                               q=q,
                               v=v,
                               h=h,
                               dropout=dropout) for _ in range(N)]

        self.embedding_channel = tfkl.Dense(d_model,
                                   input_shape = (d_channel,),
                                   activation = None)

        self.embedding_input = tfkl.Dense(d_model,
                                   input_shape = (d_input,),
                                   activation = None)

        flat_features = d_model * d_input + d_model * d_channel

        self.gate = tfkl.Dense(2,
                                   input_shape = (flat_features,),
                                   activation = None)

        self.output_linear = tfkl.Dense(d_output,
                                   input_shape = (flat_features,),
                                   activation = None)

        self.pe = pe
        self._d_input = d_input
        self._d_channel = d_channel
        self._d_model = d_model

        # The positional encoding depends only on (d_input, d_model), so it is
        # built once here instead of being re-assembled on every call.
        self._pos_encoding = self._build_positional_encoding(d_input, d_model) if pe else None

    @staticmethod
    def _build_positional_encoding(d_input, d_model):
        """Return the (d_input, d_model) sinusoidal table: sin on even columns, cos on odd ones."""
        position = np.arange(d_input, dtype=np.float32)[:, np.newaxis]
        frequency = np.exp(-np.arange(0, d_model, 2, dtype=np.float32) * (math.log(10000) / d_model))
        angles = position * frequency

        table = np.zeros((d_input, d_model), dtype=np.float32)
        table[:, 0::2] = np.sin(angles)
        # Slice so an odd d_model (one fewer odd column) is handled as well
        table[:, 1::2] = np.cos(angles)[:, :d_model // 2]
        return tf.constant(table)

    def call(self, x):
        """Map a batch ``x`` of shape (batch, d_input, d_channel) to (batch, d_output) logits."""
        # step-wise
        encoding_1 = self.embedding_channel(x)

        if self.pe:
            # Broadcasts over the batch axis
            encoding_1 = encoding_1 + tf.cast(self._pos_encoding, encoding_1.dtype)

        for encoder in self.encoder_list_1:
            encoding_1, _ = encoder(encoding_1)

        # channel-wise
        encoding_2 = self.embedding_input(tf.transpose(x, perm=[0,2,1]))

        for encoder in self.encoder_list_2:
            encoding_2, _ = encoder(encoding_2)

        # Flatten each tower using the configured sizes (not fixed 36/6/512)
        encoding_1 = tf.reshape(encoding_1, [-1, self._d_input * self._d_model])
        encoding_2 = tf.reshape(encoding_2, [-1, self._d_channel * self._d_model])

        # gate
        gate = tf.nn.softmax(self.gate(tf.concat([encoding_1, encoding_2], axis=-1)))
        encoding = tf.concat([encoding_1 * gate[:, 0:1], encoding_2 * gate[:, 1:2]], axis=-1)

        # output
        output = self.output_linear(encoding)

        return output



def build_model(
    input_shape
):
    """Wrap the Transformer layer in a Keras functional model.

    The layer is configured from the notebook-level hyper-parameters
    (d_model, d_input, d_channel, d_output, d_hidden, q, v, h, N, dropout,
    pe, mask).

    Args:
        input_shape: shape of one sample, without the batch axis.

    Returns:
        An uncompiled ``tfk.Model`` mapping the input to the Transformer output.
    """
    inputs = tfk.Input(shape=input_shape)

    transformer = Transformer(
        d_model=d_model,
        d_input=d_input,
        d_channel=d_channel,
        d_output=d_output,
        d_hidden=d_hidden,
        q=q,
        v=v,
        h=h,
        N=N,
        dropout=dropout,
        pe=pe,
        mask=mask,
    )

    return tfk.Model(inputs, transformer(inputs))



model = build_model(input_shape)

# The Transformer's last Dense layer has no activation, so the model emits
# logits; the loss must be told so, otherwise it treats them as probabilities.
model.compile(
    loss=tfk.losses.CategoricalCrossentropy(from_logits=True),
    optimizer=tfk.optimizers.Adam(),
    metrics=["accuracy"],
)

model.summary()


TypeError: ignored

### Train the model

In [None]:
# Train the model
history = model.fit(
    x = X_train,
    y = y_train,
    batch_size = batch_size,
    epochs = epochs,  # use the configured value instead of a second hard-coded 100
    validation_data=(X_valid, y_valid),
    #class_weight=class_weights,
    callbacks = [
        # Stop once validation accuracy has not improved for 10 epochs and roll back to the best weights
        tfk.callbacks.EarlyStopping(monitor='val_accuracy', mode='max', patience=10, restore_best_weights=True),
        # Shrink the learning rate by exp(-0.1) after 5 stagnant epochs; the factor is a plain Python float
        tfk.callbacks.ReduceLROnPlateau(monitor='val_accuracy', mode='max', patience=5, factor=math.exp(-0.1), min_lr=1e-5)
    ]
).history


