In [2]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import copy
from time
from tqdm import tqdm
from scipy.stats import truncnorm, norm, levene, ttest_ind

from numpy.lib.stride_tricks import sliding_window_view

from sklearn.metrics import *
from sklearn.model_selection import train_test_split

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, GlobalMaxPooling1D, Dense, LSTM, SimpleRNN, Bidirectional, Embedding
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.initializers import GlorotNormal, GlorotUniform, Zeros
from tensorflow.keras import backend as K

In [268]:
n_samples = 6000
max_len = 50
vocab_size = 1234
embedding_dim = 100

kernel_size = 3
n_filters = 64

# Example 1: with embedding layer

In [284]:
rng = np.random.default_rng(42)
X = rng.integers(0,vocab_size, size=(n_samples, max_len))


model = Sequential()
model.add(Embedding(vocab_size, embedding_dim, input_length = max_len, ))
X_emb = model(X)

st=time.time()
model.add(Conv1D(n_filters, kernel_size, input_shape= (max_len, embedding_dim)))
hidden_state = model(X)
et=time.time()
print('running time:',et-st)
cnn = model.layers[1]

running time: 0.09142708778381348


In [285]:
model.summary()

Model: "sequential_13"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding_13 (Embedding)     (None, 50, 100)           123400    
_________________________________________________________________
conv1d_13 (Conv1D)           (None, 48, 64)            19264     
Total params: 142,664
Trainable params: 142,664
Non-trainable params: 0
_________________________________________________________________


In [296]:
output_axis1 = int((max_len-kernel_size)/cnn.strides[0] +1)
print(output_axis1 == cnn.output_shape[1])

True


In [287]:
X_emb_ = copy.deepcopy(X_emb.numpy())

## implementation: with python for loop

In [298]:
st=time.time()
result_all = np.zeros((n_samples, output_axis1, n_filters))

for d in tqdm(range(n_filters)):
    kernel_sample = copy.deepcopy(cnn.kernel[:,:,d].numpy())
    result = np.array([]).reshape(-1, output_axis1)
    for samp in range(n_samples):
        array = [] #np.zeros(output_axis1)
    #     X_emb__ = X_emb_[samp,:,:]
        for k in range(0, output_axis1, cnn.strides[0]):
            array.append((X_emb_[samp,k:k+kernel_size,:]*kernel_sample).sum())
        result = np.vstack((result, array ))
        
    result_all[:,:,d]= result
et=time.time()
print(et-st)

100%|██████████████████████████████████████████████████████████████████████████████████| 64/64 [02:18<00:00,  2.17s/it]

138.61507439613342





In [303]:
print('Validation:', np.allclose(result_all, hidden_state.numpy(), atol=1e-7))

Validation: True


## implementation: with numpy functions

In [300]:
st=time.time()
X_emb_sld = sliding_window_view(X_emb_, window_shape=cnn.kernel.shape[:2], axis=(1,2))
result = np.einsum('abijk,jkc->abc', X_emb_sld, cnn.kernel.numpy())

et=time.time()
print(et-st)

3.6424896717071533


In [301]:
print('Validation:', np.allclose(result, hidden_state.numpy(), atol=5e-8))

Validation: True


## implementation: with numpy functions & flattening kernels

In [304]:
st=time.time()
X_emb_sld = sliding_window_view(X_emb_, window_shape=cnn.kernel.shape[:2], axis=(1,2))
X_emb_sld_flt = X_emb_sld.reshape(n_samples,output_axis1,1,-1,)
kernel_flt = cnn.kernel.numpy().reshape(-1, n_filters )
result = np.einsum('aijk,kl->ail', X_emb_sld_flt, kernel_flt)

et=time.time()
print(et-st)

3.6195778846740723


In [305]:
print('Validation:', np.allclose(result, hidden_state.numpy(), atol=5e-8))

Validation: True
