In [20]:
# https://www.kaggle.com/marcusgawronsky/tabnet-in-tensorflow-2-0/execution

In [None]:
# https://ichi.pro/pytorch-de-no-tabnet-no-jisso-277727554318969
# https://zenn.dev/sinchir0/articles/9228eccebfbf579bfdf4
# https://www.guruguru.science/competitions/16/discussions/70f25f95-4dcc-4733-9f9e-f7bc6472d7c0/

In [16]:
# https://ichi.pro/glu-ge-totsuki-senkei-yunitto-37276504891521
# https://www.slideshare.net/JiroNishitoba/20170629a

In [19]:
# https://www.guruguru.science/competitions/16/discussions/70f25f95-4dcc-4733-9f9e-f7bc6472d7c0/

In [2]:
from typing import Optional, Union, Tuple

import numpy as np
import tensorflow as tf
# tensorflow>=2.4
import tensorflow_probability as tfp
import tensorflow_addons as tfa
import pandas as pd
from sklearn.metrics import accuracy_score

In [3]:
@tf.function
def identity(x):
    """Pass-through: hand back the argument exactly as it was received."""
    return x

## GatedLinearUnit

In [4]:
class GLU(tf.keras.layers.Layer):
    """Dense projection + ghost batch normalization + optional gated linear unit.

    The dense layer emits ``2 * feature_dim`` columns, where
    ``feature_dim = n_a + n_d``.  With ``apply_glu=True`` the first half is
    multiplied by the sigmoid of the second half (the gate); with
    ``apply_glu=False`` the first half is returned ungated.

    Ghost batch normalization (GBN) is obtained through the
    ``virtual_batch_size`` argument of ``BatchNormalization``; GBN is used
    here with the aim of reducing generalization error.
    https://www.tensorflow.org/api_docs/python/tf/keras/layers/BatchNormalization

    Args:
        n_a: Width of the part routed to the attention branch. ``None`` counts as 0
            when ``n_d`` is given.
        n_d: Width of the part routed to the final decision. ``None`` counts as 0
            when ``n_a`` is given.
        virtual_batch_size: Ghost batch size forwarded to ``BatchNormalization``.
        momentum: Forwarded unchanged to ``BatchNormalization``.
        fc: Optional pre-built layer to use instead of a new ``Dense`` (lets
            several GLU blocks share weights).  It must output
            ``2 * feature_dim`` columns.
        apply_glu: Whether to apply the sigmoid gate.

    If both ``n_a`` and ``n_d`` are ``None``, ``feature_dim`` defaults to the
    last dimension of the input when the layer is built.

    Raises:
        ValueError: If the resolved ``feature_dim`` is not positive, or the
            input's last dimension is unknown when it is needed.
    """
    def __init__(self,
                n_a: Optional[int]=None,
                n_d: Optional[int]=None,
                virtual_batch_size: Optional[int]=128,
                momentum: Optional[float]=0.01,
                fc: tf.keras.layers.Layer = None,
                apply_glu: bool = True,
                ):
        super(GLU, self).__init__()
        if n_a is None and n_d is None:
            # Width is resolved from the input shape in build().
            self.feature_dim = None
            self.units = None
        else:
            self.feature_dim = (n_a or 0) + (n_d or 0)
            if self.feature_dim <= 0:
                raise ValueError("n_a + n_d must be a positive integer")
            # Twice the output width: one half carries values, the other the gate.
            self.units = 2 * self.feature_dim
        self.virtual_batch_size = virtual_batch_size
        # NOTE(review): Keras uses `momentum` as the decay of the moving
        # statistics; a value this small looks like the PyTorch convention - confirm.
        self.momentum = momentum
        self.apply_glu = apply_glu
        # A shared layer, when supplied, is used as-is; otherwise build() creates one.
        self.fc_out = fc

    def build(self, input_shape: tf.TensorShape):
        if self.feature_dim is None:
            last_dim = input_shape[-1]
            if last_dim is None:
                raise ValueError(
                    "The last input dimension must be known when n_a and n_d are not given")
            self.feature_dim = int(last_dim)
            self.units = 2 * self.feature_dim

        if self.fc_out is None:
            self.fc_out = tf.keras.layers.Dense(self.units, use_bias=False)
        self.bn_out = tf.keras.layers.BatchNormalization(
            virtual_batch_size=self.virtual_batch_size,
            momentum=self.momentum)

    def call(self,
        inputs: Union[tf.Tensor, np.ndarray], 
        training: Optional[bool]=None):
        # Dense projection followed by ghost batch normalization.
        output = self.bn_out(self.fc_out(inputs), training=training)
        values = output[:, :self.feature_dim]
        if self.apply_glu:
            # Gated linear unit: values scaled by the sigmoid of the gate half.
            gate = output[:, self.feature_dim:]
            return values * tf.keras.activations.sigmoid(gate)
        # No gating: return the value half unchanged.
        return values

In [13]:
class FeatureTransformer(tf.keras.layers.Layer):
    """Stack of ``n_total`` GLU blocks chained with scaled residual connections.

    The first block maps the input to ``units`` columns.  Every following
    block ``b`` updates the running tensor as ``x * sqrt(0.5) + b(x)``.

    Args:
        units: Output width of every block.  ``None`` means "same as the
            last input dimension", resolved when the layer is built.
        n_total: Number of GLU blocks in the stack (at least 1).
        n_shared: Number of leading blocks intended to share their dense
            layer.  It is only recorded; the layers actually reused are the
            ones found in ``share_fcs`` when the layer is built.
        virtual_batch_size: Ghost batch size forwarded to every GLU block.
        momentum: Batch-normalization momentum forwarded to every GLU block.
        skip: If true, the layer input is added to the output of the first
            block (requires the input width to equal ``units``).

    Attributes:
        share_fcs: List of pre-built dense layers.  Block ``n`` reuses
            ``share_fcs[n]`` when that entry exists; fill it before the
            first call to share weights.  Each entry must emit
            ``2 * units`` columns, which is what GLU expects.

    Raises:
        ValueError: If ``n_total`` is smaller than 1.
    """
    def __init__(self, 
                units: Optional[int] = None,
                n_total: int = 4,
                n_shared: int = 2,
                virtual_batch_size: Optional[int]  = 128,
                momentum: Optional[float]=0.02,
                skip= False):
        super(FeatureTransformer, self).__init__()
        if n_total < 1:
            raise ValueError("n_total must be at least 1")
        self.units = units
        self.n_total = n_total
        self.n_shared = n_shared
        self.virtual_batch_size = virtual_batch_size
        self.momentum = momentum
        self.skip = skip
        self.share_fcs = []  # dense layers to reuse in the leading blocks
        self.blocks = []

    def build(self, input_shape: tf.TensorShape):
        if self.units is None:
            self.units = input_shape[-1]

        n_available = len(self.share_fcs)
        # GLU sizes its output as n_a + n_d, so the whole width goes through n_a.
        self.blocks = [
            GLU(n_a=self.units, n_d=0,
                fc=self.share_fcs[n] if n < n_available else None,
                virtual_batch_size=self.virtual_batch_size,
                momentum=self.momentum)
            for n in range(self.n_total)]

    def call(self,
             x: tf.Tensor, 
             training: Optional[bool] = None):
        out = self.blocks[0](x, training=training)

        # With skip enabled, add the incoming tensor to the first block's
        # output; otherwise the first block's output is used as is.
        if self.skip:
            out = out + x

        for n in range(1, self.n_total):
            out = out * tf.sqrt(0.5) + self.blocks[n](out, training=training)
        return out

In [14]:
class AttentiveTransformer(tf.keras.layers.Layer): 
    """Produce a sparse attention mask from a feature tensor.

    The input goes through a bias-free dense layer and batch normalization,
    is optionally multiplied by ``priors``, and is finally passed through
    sparsemax.

    Args:
        units: Width of the mask.  ``None`` means the last input dimension,
            resolved when the layer is built.
        virtual_batch_size: Forwarded to ``BatchNormalization``.
        momentum: Forwarded to ``BatchNormalization``.
    """
    def __init__(self, 
                units: Optional[int] = None, 
                virtual_batch_size: Optional[int] = 128, 
                momentum: Optional[float] = 0.02):
        super().__init__()
        self.momentum = momentum
        self.virtual_batch_size = virtual_batch_size
        self.units = units

    def build(self, input_shape: tf.TensorShape):
        # Fall back to the input width when no explicit width was requested.
        self.units = input_shape[-1] if self.units is None else self.units

        self.fc = tf.keras.layers.Dense(self.units, use_bias=False)
        self.bn = tf.keras.layers.BatchNormalization(
            momentum=self.momentum,
            virtual_batch_size=self.virtual_batch_size)

    def call(self, 
            inputs: Union[tf.Tensor, np.ndarray], 
            priors: Optional[Union[tf.Tensor, np.ndarray]] = None, 
            training: Optional[bool] = None) -> tf.Tensor:
        projected = self.fc(inputs)
        scores = self.bn(projected, training=training)
        if priors is not None:
            scores = scores * priors

        # sparsemax: a sparse alternative to softmax.
        # The result is the key that the caller applies as a feature mask.
        return tfa.activations.sparsemax(scores)

In [11]:
class TabNetStep(tf.keras.layers.Layer):
    """One TabNet decision step: feature transformer followed by attention.

    Args:
        units: Width of the step's feature transformer.  ``None`` means the
            last dimension of the shape Keras passes to ``build``.
        virtual_batch_size: Forwarded to the sub-layers' batch normalization.
        momentum: Forwarded to the sub-layers' batch normalization.
    """
    def __init__(self, 
                units: Optional[int] = None, 
                virtual_batch_size: Optional[int]=128, 
                momentum: Optional[float] =0.02):
        super(TabNetStep, self).__init__()
        self.units = units
        self.virtual_batch_size = virtual_batch_size
        self.momentum = momentum
        
    def build(self, 
            input_shape: tf.TensorShape):
        if self.units is None:
            self.units = input_shape[-1]
        
        # Step-specific transformer; skip=True requests the residual add of its input.
        self.unique = FeatureTransformer(
            units = self.units, 
            virtual_batch_size=self.virtual_batch_size, 
            momentum=self.momentum,
            skip=True)
        # The mask is sized from input_shape so that it can multiply `inputs`
        # element-wise in call().
        self.attention = AttentiveTransformer(
            units = input_shape[-1], 
            virtual_batch_size=self.virtual_batch_size, 
            momentum=self.momentum)

    def call(self, 
            inputs, # raw (normalized) features that the mask is applied to
            shared, # output of the shared feature transformer
            priors, # prior scales multiplied into the attention scores
            training=None) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
        """Run one step.

        Returns:
            A tuple ``(split, masked, keys)``:
            ``split`` is the step transformer's output, ``keys`` is the
            attention mask computed from it, and ``masked`` is
            ``keys * inputs``.
        """
        split = self.unique(shared, training=training)

        # Apply attention: the priors carry information from earlier steps
        # and the resulting mask selects the features for the next step.
        keys = self.attention(split, priors, training=training)
        masked = keys * inputs
        return split, masked, keys

In [12]:
class TabNetEncoder(tf.keras.layers.Layer):
    """TabNet encoder: sequential attention over ``n_steps`` decision steps.

    The parameter spellings ``n_feature``, ``gammna``, ``momemtum``, ``eps``
    and ``sparse`` are kept exactly as declared so that existing keyword
    calls keep working; they are stored under the attribute names the rest
    of the class reads.

    Args:
        units: Width of the final dense prediction layer.
        n_feature: Width of the feature transformers (stored as ``n_features``).
        n_steps: Number of decision steps after the initial step (at least 1).
        outputs: Accepted but not used by this class.
        gammna: Relaxation factor used when updating the prior (stored as ``gamma``).
        eps: Small constant added inside the logarithm of the entropy term
            (stored as ``epsilon``).
        sparse: Weight of the entropy regularization loss (stored as ``sparsity``).
        virtual_batch_size: Forwarded to every batch normalization.
        momemtum: Batch-normalization momentum (stored as ``momentum``).

    Raises:
        ValueError: If ``n_steps`` is smaller than 1.
    """
    def __init__(self, 
                units: int =1,
                n_feature: int = 8,
                n_steps: int = 3,
                outputs: int = 1,
                gammna: float = 1.3,
                eps: float = 1e-8,
                sparse: float = 1e-5,
                virtual_batch_size: Optional[int] = 128,
                momemtum: Optional[float] = 0.02 ):
        super(TabNetEncoder, self).__init__()
        if n_steps < 1:
            raise ValueError("n_steps must be at least 1")
        self.units = units
        self.n_features = n_feature
        self.n_steps = n_steps
        self.gamma = gammna
        self.epsilon = eps
        self.sparsity = sparse
        self.momentum = momemtum
        self.virtual_batch_size = virtual_batch_size

    def build(self, 
            input_shape: tf.TensorShape):
        self.bn = tf.keras.layers.BatchNormalization(
            virtual_batch_size=self.virtual_batch_size, 
            momentum=self.momentum)
        self.shared_block = FeatureTransformer(
            units = self.n_features, 
            virtual_batch_size=self.virtual_batch_size, 
            momentum=self.momentum)        
        self.initial_step = TabNetStep(
            units = self.n_features, 
            virtual_batch_size=self.virtual_batch_size, 
            momentum=self.momentum)

        self.steps = [
            TabNetStep(
                units = self.n_features,
                virtual_batch_size=self.virtual_batch_size, 
                momentum=self.momentum) for _ in range(self.n_steps)]

        self.final = tf.keras.layers.Dense(
            units = self.units, 
            use_bias=False)

    def call(self, 
            X: Union[tf.Tensor, np.ndarray], 
            training: Optional[bool] = None) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
        """Encode ``X``.

        Returns:
            A tuple ``(prediction, encoded, importance)``: the final dense
            layer applied to the summed ReLU step outputs, the sum of the
            raw step outputs, and the sum of the attention masks that were
            accumulated at the start of each loop iteration.

        Side effects:
            Registers ``sparsity * entropy_loss`` through ``add_loss``.
        """
        entropy_loss = 0.0
        encoded = 0.0
        output = 0.0
        importance = 0.0
        # One prior value per feature column, all starting at 1.
        prior = tf.reduce_mean(tf.ones_like(X), axis=0)
        
        B = prior * self.bn(X, training=training)
        shared = self.shared_block(B, training=training)
        _, masked, keys = self.initial_step(B, shared, prior, training=training)

        for step in self.steps:
            # Entropy of the mask produced by the previous step, averaged
            # over the batch and over the number of steps.
            entropy_loss += tf.reduce_mean(
                tf.reduce_sum(-keys * tf.math.log(keys + self.epsilon), axis=-1))/tf.cast(self.n_steps, tf.float32)
            # Shrink the prior of features the previous mask already used.
            prior *= (self.gamma - tf.reduce_mean(keys, axis=0))
            importance += keys
            
            shared = self.shared_block(masked, training=training)
            split, masked, keys = step(B, shared, prior, training=training)
            features = tf.keras.activations.relu(split)
            
            output += features
            encoded += split
            
        self.add_loss(self.sparsity * entropy_loss)
          
        prediction = self.final(output)
        return prediction, encoded, importance

In [15]:
class TabNetDecoder(tf.keras.layers.Layer):
    """TabNet decoder: sums ``n_steps`` dense read-outs of transformed input.

    Each step runs the shared feature transformer, then a step-specific
    feature transformer, then a step-specific dense layer; the dense
    outputs of all steps are added together.

    ``outputs``, ``gamma``, ``epsilon`` and ``sparsity`` are accepted but
    not used by this class.
    """
    def __init__(self, units=1, 
                 n_steps = 3, 
                 n_features = 8,
                 outputs = 1, 
                 gamma = 1.3,
                 epsilon = 1e-8, 
                 sparsity = 1e-5, 
                 virtual_batch_size=128, 
                 momentum=0.02):
        super().__init__()
        self.momentum = momentum
        self.virtual_batch_size = virtual_batch_size
        self.n_features = n_features
        self.n_steps = n_steps
        self.units = units

    def build(self, input_shape: tf.TensorShape):
        # Settings common to every feature transformer built below.
        transformer_kwargs = dict(
            units=self.n_features,
            virtual_batch_size=self.virtual_batch_size,
            momentum=self.momentum)

        self.shared_block = FeatureTransformer(**transformer_kwargs)
        self.steps = [
            FeatureTransformer(**transformer_kwargs)
            for _ in range(self.n_steps)]
        self.fc = [
            tf.keras.layers.Dense(units=self.units)
            for _ in range(self.n_steps)]

    def call(self, 
            X: Union[tf.Tensor, np.ndarray], 
            training: Optional[bool] = None) -> tf.Tensor:
        total = 0.0

        for transformer, dense in zip(self.steps, self.fc):
            # The shared block is evaluated anew for every step.
            common = self.shared_block(X, training=training)
            total += dense(transformer(common, training=training))

        return total