In [None]:
import tensorflow as tf
import numpy as np
import cmsisdsp as dsp
import random

from tensorflow.keras.layers import Conv1D

# part 1; standalone convolution running for Kx1 followed by 1x1

In [None]:
# Problem sizes used throughout part 1.
BATCH_SIZE = 1  # keras layers are called with a leading batch axis

IN_D = 2      # input depth
K = 3         # kernel size for c1
C1_OUT_D = 4  # output depth of first Kx1 conv
C2_OUT_D = 5  # output depth of second 1x1 conv

# Seed every RNG in play before drawing the input.
random.seed(123)
np.random.seed(123)
tf.random.set_seed(123)

x = np.random.random(size=(K, IN_D))   # K time steps, each with IN_D features
batched_x = x[np.newaxis, ...]         # same data with a leading batch axis of 1
x.shape, batched_x.shape, x

## v1; conv with no bias or activation

In [None]:
# Re-seed all three RNGs right before the layer is created.
random.seed(123)
np.random.seed(123)
tf.random.set_seed(123)

# Kx1 convolution with neither bias nor activation.
c1d = Conv1D(C1_OUT_D, K, activation=None, use_bias=False)

In [None]:
# Reference output from keras: one output step, C1_OUT_D channels.
keras_y = c1d(batched_x).numpy()
assert keras_y.shape == (BATCH_SIZE, 1, C1_OUT_D)
keras_y = np.squeeze(keras_y)   # drop the size-1 axes
keras_y

In [None]:
# With use_bias=False the layer holds exactly one weight: the kernel.
assert len(c1d.weights) == 1
kernel = c1d.weights[0].numpy()
assert kernel.shape == (K, IN_D, C1_OUT_D)
(kernel.shape, kernel)

so now let's run this convolution explicitly using mat muls.

using einsum it'd be easy; we could ask for all three matmuls to be done
and then reduce over K

In [6]:
# 'ki,kij->j': multiply x[k, i] by kernel[k, i, j] and sum over both k and i,
# leaving one value per output channel j.
np.einsum('ki,kij->j', x, kernel)

array([ 0.42319987, -0.08421276, -0.09528882,  0.29867487])

but, we don't have einsum....

note: it's also doable as a batched matmul by introducing a dummy axis into X to denote that we want the K matmuls to be done before the sum reduction

In [7]:
# Insert a length-1 axis so each of the K rows becomes its own (1, IN_D) matrix.
x2 = x[:, np.newaxis, :]
# kernel is (K, IN_D, OUT_D), so the batched product is (K, 1, OUT_D)
result = x2 @ kernel
result.squeeze().sum(axis=0)

array([ 0.42319987, -0.08421276, -0.09528882,  0.29867487])

so though we don't have the matmul op this is the approach we'll take; 3 separate (1, IN_D).(IN_D, OUT_D) mat muls that we accumulate into a result.

for reference let's look at the three intermediate results before the summing

In [8]:
# One row per kernel tap k, before the sum over k.
(x2 @ kernel).squeeze()

array([[ 0.01432191, -0.00617941,  0.15022187,  0.21971583],
       [ 0.13714525, -0.20185   ,  0.10646991, -0.083494  ],
       [ 0.27173271,  0.12381665, -0.3519806 ,  0.16245304]])

In [9]:
# Kx1 convolution (no bias, no activation) built from CMSIS-DSP calls:
# one (1, IN_D) x (IN_D, C1_OUT_D) matrix multiply per kernel tap, accumulated
# into a single vector.
assert x.shape == (K, IN_D)
assert kernel.shape == (K, IN_D, C1_OUT_D)

# accumulator, created by arm_fill_f32 with fill value 0
result = dsp.arm_fill_f32(0, C1_OUT_D)
for k in range(K):
    x_mi = x[k:k+1,:]        # slicing (not indexing) keeps the row 2-D: (1, IN_D)
    kernel_mi = kernel[k]
    assert kernel_mi.shape == (IN_D, C1_OUT_D)
    # the returned status is not inspected here
    _status, intermediate_result = dsp.arm_mat_mult_f32(x_mi, kernel_mi)
    result = dsp.arm_add_f32(result, intermediate_result)
result

array([ 0.4231999 , -0.08421277, -0.09528881,  0.29867488], dtype=float32)

In [10]:
# element-wise closeness against the keras reference, reduced to a single flag
np.isclose(result, keras_y).all()

True

## v2; convolution with bias ( but still no activation )

In [11]:
# Re-seed all three RNGs right before the layer is created.
random.seed(123)
np.random.seed(123)
tf.random.set_seed(123)

# Same Kx1 convolution, now with a randomly initialised bias (still no activation).
c1d = Conv1D(C1_OUT_D, K,
             activation=None,
             use_bias=True, bias_initializer='RandomNormal')

In [12]:
# Reference output from keras: one output step, C1_OUT_D channels.
keras_y = c1d(batched_x).numpy()
assert keras_y.shape == (BATCH_SIZE, 1, C1_OUT_D)
keras_y = np.squeeze(keras_y)   # drop the size-1 axes
keras_y

array([ 0.4251941 , -0.07354339, -0.01509508,  0.32522374], dtype=float32)

In [13]:
# The layer now carries two weights, in the order kernel then bias.
assert len(c1d.weights) == 2
kernel, bias = (weight.numpy() for weight in c1d.weights)

assert kernel.shape == (K, IN_D, C1_OUT_D)
print("kernel", kernel.shape, kernel)

assert bias.shape == (C1_OUT_D,)
print("bias", bias.shape, bias)

kernel (3, 2, 4) [[[ 0.18940407  0.0069387   0.08696932  0.32015443]
  [-0.41096127 -0.03848475  0.31331038 -0.01139957]]

 [[ 0.21257627 -0.249879    0.1776231  -0.32037094]
  [ 0.16129082 -0.26330617  0.12003279 -0.01962107]]

 [[ 0.4214204   0.51079106 -0.46706867  0.0686931 ]
  [-0.07436943 -0.57593465 -0.0376718   0.26714432]]]
bias (4,) [0.00199423 0.01066937 0.08019374 0.02654888]


In [14]:
# Kx1 convolution with bias (no activation) built from CMSIS-DSP calls:
# one (1, IN_D) x (IN_D, C1_OUT_D) matrix multiply per kernel tap, accumulated
# into a single vector, then the bias is added once.
assert x.shape == (K, IN_D)
assert kernel.shape == (K, IN_D, C1_OUT_D)
assert bias.shape == (C1_OUT_D,)

# accumulator, created by arm_fill_f32 with fill value 0
result = dsp.arm_fill_f32(0, C1_OUT_D)
for k in range(K):
    x_mi = x[k:k+1,:]        # slicing (not indexing) keeps the row 2-D: (1, IN_D)
    kernel_mi = kernel[k]
    assert kernel_mi.shape == (IN_D, C1_OUT_D)
    # the returned status is not inspected here
    _status, intermediate_result = dsp.arm_mat_mult_f32(x_mi, kernel_mi)
    result = dsp.arm_add_f32(result, intermediate_result)

# add bias
result = dsp.arm_add_f32(result, bias)

result

array([ 0.4251941 , -0.07354339, -0.01509507,  0.32522374], dtype=float32)

In [15]:
# element-wise closeness against the keras reference, reduced to a single flag
np.isclose(result, keras_y).all()

True

## v3 with bias and relu activation

In [16]:
# Re-seed all three RNGs right before the layer is created.
random.seed(123)
np.random.seed(123)
tf.random.set_seed(123)

# Kx1 convolution with a randomly initialised bias and a relu activation.
c1d = Conv1D(C1_OUT_D, K,
             activation='relu',
             use_bias=True, bias_initializer='RandomNormal')

In [17]:
# Reference output from keras: one output step, C1_OUT_D channels.
keras_y = c1d(batched_x).numpy()
assert keras_y.shape == (BATCH_SIZE, 1, C1_OUT_D)
keras_y = np.squeeze(keras_y)   # drop the size-1 axes
keras_y

array([0.4251941 , 0.        , 0.        , 0.32522374], dtype=float32)

In [18]:
# Kx1 convolution with bias and relu built from CMSIS-DSP calls.

# Read the weights from the relu layer that produced keras_y, instead of
# relying on the `kernel` / `bias` arrays left over from the v2 cells.
assert len(c1d.weights) == 2  # kernel and bias
kernel, bias = (weight.numpy() for weight in c1d.weights)

assert x.shape == (K, IN_D)
assert kernel.shape == (K, IN_D, C1_OUT_D)
assert bias.shape == (C1_OUT_D,)

# accumulator, created by arm_fill_f32 with fill value 0
result = dsp.arm_fill_f32(0, C1_OUT_D)
for k in range(K):
    x_mi = x[k:k+1,:]        # slicing (not indexing) keeps the row 2-D: (1, IN_D)
    kernel_mi = kernel[k]
    assert kernel_mi.shape == (IN_D, C1_OUT_D)
    # the returned status is not inspected here
    _status, intermediate_result = dsp.arm_mat_mult_f32(x_mi, kernel_mi)
    result = dsp.arm_add_f32(result, intermediate_result)

# add bias
result = dsp.arm_add_f32(result, bias)

# apply relu
# looks like the most efficient will be to use MAX ?
# see https://github.com/ARM-software/CMSIS-NN/blob/main/Source/ActivationFunctions/arm_relu6_s8.c
# val = MAX(val, 0.0);
result = np.maximum(result, 0)

result

array([0.4251941 , 0.        , 0.        , 0.32522374], dtype=float32)

In [19]:
# element-wise closeness against the keras reference, reduced to a single flag
np.isclose(result, keras_y).all()

True

## v4; conv1D & additional 1x1 conv

In [20]:
# Re-seed all three RNGs right before the layers are created.
random.seed(123)
np.random.seed(123)
tf.random.set_seed(123)

# Kx1 convolution ...
c1d1 = Conv1D(C1_OUT_D, K,
              activation='relu',
              use_bias=True, bias_initializer='RandomNormal')
# ... followed by a 1x1 convolution; both use bias and relu.
c1d2 = Conv1D(C2_OUT_D, 1,
              activation='relu',
              use_bias=True, bias_initializer='RandomNormal')


In [21]:
# Reference output from keras for the two stacked layers: one step, C2_OUT_D channels.
c1_keras_out = c1d1(batched_x)
keras_y = c1d2(c1_keras_out).numpy()
assert keras_y.shape == (BATCH_SIZE, 1, C2_OUT_D)
keras_y = np.squeeze(keras_y)   # drop the size-1 axes
keras_y

array([0.        , 0.31180638, 0.        , 0.28677797, 0.        ],
      dtype=float32)

In [22]:
# Pull kernel and bias out of each layer as numpy arrays, matching how the
# earlier cells extract weights with `.numpy()`.
c1_kernel, c1_bias = (weight.numpy() for weight in c1d1.weights)
c2_kernel, c2_bias = (weight.numpy() for weight in c1d2.weights)


In [23]:
# Shape checks on the input window and on both layers' weights before the
# manual two-convolution run.
assert x.shape == (K, IN_D)

# first layer: Kx1 convolution
assert c1_kernel.shape == (K, IN_D, C1_OUT_D)
assert c1_bias.shape == (C1_OUT_D,)
# second layer: 1x1 convolution fed by the first layer's C1_OUT_D channels
assert c2_kernel.shape == (1, C1_OUT_D, C2_OUT_D)
assert c2_bias.shape == (C2_OUT_D,)

def apply(x, c1_kernel, c1_bias, c2_kernel, c2_bias):
    """Run a Kx1 convolution then a 1x1 convolution on one input window.

    Both convolutions add a bias and apply relu. The matrix multiplies and
    additions go through the CMSIS-DSP wrapper (`dsp`).

    All sizes are read from the kernel shapes rather than from notebook-level
    constants, so the same function serves layers of different depths.

    Args:
        x: input window; rows 0..K-1 are used, each of depth in_d.
        c1_kernel: first kernel, shape (K, in_d, c1_out_d).
        c1_bias: first bias, shape (c1_out_d,).
        c2_kernel: second kernel, shape (1, c1_out_d, c2_out_d).
        c2_bias: second bias, shape (c2_out_d,).

    Returns:
        The relu'd output of the second convolution (c2_out_d values).

    Raises:
        AssertionError: if the shapes are inconsistent with each other.
    """
    assert len(c1_kernel.shape) == 3, c1_kernel.shape
    assert len(c2_kernel.shape) == 3, c2_kernel.shape
    k_size = c1_kernel.shape[0]
    in_d = c1_kernel.shape[1]
    c1_out_d = c1_kernel.shape[2]
    c2_out_d = c2_kernel.shape[2]
    assert c1_bias.shape == (c1_out_d,), c1_bias.shape
    assert c2_kernel.shape == (1, c1_out_d, c2_out_d), c2_kernel.shape
    assert c2_bias.shape == (c2_out_d,), c2_bias.shape

    # apply first Kx1 convolution: one matrix multiply per kernel tap,
    # accumulated into a single vector
    c1_result = dsp.arm_fill_f32(0, c1_out_d)
    for k in range(k_size):
        x_mi = x[k:k+1,:]   # slicing keeps the row 2-D for the matrix multiply
        assert x_mi.shape == (1, in_d), x_mi.shape
        kernel_mi = c1_kernel[k]
        assert kernel_mi.shape == (in_d, c1_out_d), kernel_mi.shape
        _status, intermediate_result = dsp.arm_mat_mult_f32(x_mi, kernel_mi)
        c1_result = dsp.arm_add_f32(c1_result, intermediate_result)
    # add bias and apply RELU
    c1_result = dsp.arm_add_f32(c1_result, c1_bias)
    c1_result = np.maximum(c1_result, 0)

    # apply second 1x1 convolution: a single matrix multiply
    x_mi = c1_result.reshape((1, c1_out_d))
    kernel_mi = c2_kernel[0]
    assert kernel_mi.shape == (c1_out_d, c2_out_d), kernel_mi.shape
    _status, c2_result = dsp.arm_mat_mult_f32(x_mi, kernel_mi)
    # add bias and apply RELU
    c2_result = dsp.arm_add_f32(c2_result, c2_bias)
    c2_result = np.maximum(c2_result, 0)
    return c2_result

# run the manual two-convolution pipeline on the single window x
result = apply(x,
               c1_kernel=c1_kernel, c1_bias=c1_bias,
               c2_kernel=c2_kernel, c2_bias=c2_bias)

In [24]:
# display the output of the apply() call above
result

array([0.        , 0.31180638, 0.        , 0.28677797, 0.        ],
      dtype=float32)

In [25]:
# element-wise closeness against the keras reference, reduced to a single flag
np.isclose(result, keras_y).all()

True

## v5; same thing but with inspection of kernels for sizing

in prep for stacking

In [28]:
# assert x.shape == (K, IN_D)

# assert c1_kernel.shape == (K, IN_D, C1_OUT_D)
# assert c1_bias.shape == (C1_OUT_D,)
# assert c2_kernel.shape == (1, C1_OUT_D, C2_OUT_D)
# assert c2_bias.shape == (C2_OUT_D,)

class Block(object):
    def __init__(self, c1_kernel, c1_bias, c2_kernel, c2_bias):

        assert len(c1_kernel.shape) == 3
        assert len(c1_bias.shape) == 1    
        assert len(c2_kernel.shape) == 3
        assert len(c2_bias.shape) == 1

        self.k = c1_kernel.shape[0]
        self.in_d = c1_kernel.shape[1]
        self.c1_out_d = c1_kernel.shape[2]        
        assert c1_bias.shape[0] == self.c1_out_d
        
        assert c2_kernel.shape[0] == 1
        assert c2_kernel.shape[1] == self.c1_out_d
        self.c2_out_d = c2_kernel.shape[2]
        assert c2_bias.shape[0] == self.c2_out_d
        
        self.c1_kernel = c1_kernel
        self.c1_bias = c1_bias
        self.c2_kernel = c2_kernel
        self.c2_bias = c2_bias

    def apply(self, x):
        # apply first Kx1 convolution
        c1_result = np.empty((1, self.c1_out_d), dtype=np.float32)
        c1_result = dsp.arm_fill_f32(0, self.c1_out_d)
        for k in range(self.k):
            x_mi = x[k:k+1,:]
            assert x_mi.shape == (1, self.in_d)
            kernel_mi = c1_kernel[k]
            assert kernel_mi.shape == (self.in_d, self.c1_out_d)
            _status, intermediate_result = dsp.arm_mat_mult_f32(x_mi, kernel_mi)
            c1_result = dsp.arm_add_f32(c1_result, intermediate_result)
        # add bias and apply RELU
        c1_result = dsp.arm_add_f32(c1_result, self.c1_bias) 
        c1_result = np.maximum(c1_result, 0)

        # apply second 1x1 convolution
        x_mi = c1_result
        x_mi = x_mi.reshape((1, self.c1_out_d))
        #assert x_mi.shape == (1, OUT_D), x_mi.shape
        kernel_mi = self.c2_kernel[0]
        #assert kernel_mi.shape == (self.c1_out_d, self.c2_out_d), kernel_mi.shape
        _status, c2_result = dsp.arm_mat_mult_f32(x_mi, kernel_mi)
        # add bias and apply RELU
        c2_result = dsp.arm_add_f32(c2_result, self.c2_bias) 
        c2_result = np.maximum(c2_result, 0)
        return c2_result

# Build the block from the v4 weights and run it on the same window.
layer0 = Block(c1_kernel=c1_kernel, c1_bias=c1_bias,
               c2_kernel=c2_kernel, c2_bias=c2_bias)
result = layer0.apply(x)

print("result", result)
print("all_close", np.isclose(result, keras_y).all())

result [0.         0.31180638 0.         0.28677797 0.        ]
all_close True


# part2, caching for streaming 

OK, so now we have a little block that can run a Kx1 conv followed by a mixing, MLP-like 1x1 convolution

first construct a full keras model and test it on a sequence

In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Parse the serial dump into two record lists (long and wide form).
#
# Lines starting with 'b' update `cv` from their third space-separated field.
# Every other line is expected to hold exactly two floats (in_v, out_v) and
# yields one sample tagged with the most recent cv.
# Lines that do not parse are reported and skipped.
records_long = []
records_wide = []
n = 0
cv = None  # latest value read from a 'b' line; None until the first one is seen
with open('../serial_dump_from_daisy.txt', 'r') as dump_file:  # closed even on error
    for line in dump_file:
        try:
            if line.startswith('b'):
                cv = float(line.split(" ")[2])
            else:
                in_v, out_v = map(float, line.split(" "))
                if cv is None:
                    # report it like any other unusable line instead of storing None
                    raise ValueError("sample line before any cv line")
                records_long.append((n, 'cv', cv))
                records_long.append((n, 'in_v', in_v))
                records_long.append((n, 'out_v', out_v))
                records_wide.append((n, cv, in_v, out_v))
                n += 1
        except (ValueError, IndexError) as e:
            # ValueError: non-numeric field or wrong field count; IndexError: short 'b' line
            print(f"? [{line.strip()}] ({str(e)})")
df_long = pd.DataFrame(records_long, columns=['n', 'name', 'val'])
df_wide = pd.DataFrame(records_wide, columns=['n', 'cv', 'in_v', 'out_v'])

In [None]:
# cv over the whole recording
plt.figure(figsize=(16, 6))
sns.lineplot(data=df_wide, x='n', y='cv')

In [None]:
# all three signals for a 2000-row slice of the long-form frame
plt.figure(figsize=(16, 6))
sns.lineplot(data=df_long.iloc[11000:13000], x='n', y='val', hue='name')

In [None]:
# Model input: (cv, in_v) pairs; target: out_v as a single column.
cvs = np.array(df_wide['cv'])
in_vs = np.array(df_wide['in_v'])
x = np.stack([cvs, in_vs], axis=-1)                  # (N, 2)

y_true = np.array(df_wide['out_v'])[:, np.newaxis]   # (N, 1)

split = int(0.8 * len(x))   # index separating the leading 80% from the rest

print(split, cvs.shape, in_vs.shape, x.shape)

In [None]:
# Ordered split: everything before `split` trains, the remainder tests.
train_x, test_x = x[:split], x[split:]
train_y, test_y = y_true[:split], y_true[split:]

print(train_x.shape, train_y.shape, test_x.shape, test_y.shape)

In [None]:
# Window lengths, in time steps, used to build the training and test models.
TRAIN_SEQ_LEN = 20
# Dictated by the model: a kernel_size=3 conv at dilation 1 followed by a
# kernel_size=3 conv at dilation 3 sees 1 + 2*1 + 2*3 = 9 input steps.
TEST_SEQ_LEN = 9

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import *
from tensorflow.keras.models import Model

IN_D = 2        # input depth
K = 3           # kernel size for c1
C1_FILTERS = 4  # filters for first layer Kx1 and 1x1 convs
C2_FILTERS = 8  # filters for second layer Kx1 and 1x1 convs

def create_dilated_model(seq_len, all_outputs=False):
    """Build the two-block causal, dilated Conv1D model.

    Each block is a Kx1 causal convolution followed by a 1x1 convolution, all
    with relu. The first block uses dilation 1 and the second dilation K, so
    the second block's taps land on every K-th output of the first. A final
    1x1 convolution with no activation produces the prediction.

    Args:
        seq_len: number of time steps in the input; the input is (seq_len, IN_D).
        all_outputs: when True the model returns
            [c1a, c1b, c2a, c2b, y_pred]; otherwise only y_pred.

    Returns:
        The keras Model. Its summary is printed as a side effect.
    """
    inp = Input((seq_len, IN_D))
    c1a_output = Conv1D(name='c1a', filters=C1_FILTERS, kernel_size=K, dilation_rate=1,
                        padding='causal', activation='relu')(inp)
    c1b_output = Conv1D(name='c1b', filters=C1_FILTERS, kernel_size=1, strides=1,
                        activation='relu')(c1a_output)
    c2a_output = Conv1D(name='c2a', filters=C2_FILTERS, kernel_size=K, dilation_rate=K,
                        padding='causal', activation='relu')(c1b_output)
    c2b_output = Conv1D(name='c2b', filters=C2_FILTERS, kernel_size=1, strides=1,
                        activation='relu')(c2a_output)
    y_pred = Conv1D(name='y_pred', filters=1, kernel_size=1, strides=1,
                    activation=None)(c2b_output)
    if all_outputs:
        model = Model(inp, [c1a_output, c1b_output, c2a_output, c2b_output, y_pred])
    else:
        model = Model(inp, y_pred)
    # summary() prints by itself and returns None, so wrapping it in print()
    # would emit a stray "None"
    model.summary()
    return model

# def create_strided_model(seq_len, all_outputs=False):
#     inp = Input((seq_len, 2))
#     c1a_output = Conv1D(name='c1a', filters=C1_FILTERS, kernel_size=3, strides=3, 
#                        activation='relu')(inp)
#     c1b_output = Conv1D(name='c1b', filters=C1_FILTERS, kernel_size=1, strides=1,
#                        activation='relu')(c1a_output)    
#     c2a_output = Conv1D(name='c2a', filters=C2_FILTERS, kernel_size=3, strides=3,
#                        )(c1_output)
#     c2_output = Flatten()(c2_output)
#     y_pred = Dense(name='d', units=1, activation=None)(c2_output)
    
#     if all_outputs:
#         model = Model(inp, [c1_output, c2_output, y_pred])
#     else:
#         model = Model(inp, y_pred)
#     print(model.summary())
#     return model

In [None]:
# training model: TRAIN_SEQ_LEN-step windows, prediction output only
train_model = create_dilated_model(seq_len=TRAIN_SEQ_LEN, all_outputs=False)

In [None]:
from tensorflow.keras.optimizers import Adam

def gen():
    """Yield (x, y) training windows of TRAIN_SEQ_LEN steps.

    The y window is the x window shifted one step later, so each input step
    is paired with the following step's target.

    NOTE(review): the trailing `- 1` in the window count means the last
    window that would still fit in train_y is not produced.
    """
    n_windows = len(train_x) - TRAIN_SEQ_LEN - 1
    for start in range(n_windows):
        stop = start + TRAIN_SEQ_LEN
        # (S, 2) & (S, 1)
        yield train_x[start:stop], train_y[start + 1:stop + 1]
                 
# Wrap the generator in a tf.data pipeline, then fit the training model.
ds = tf.data.Dataset.from_generator(
    gen,
    output_signature=(
        tf.TensorSpec(shape=(TRAIN_SEQ_LEN, 2), dtype=tf.float32),  # x window
        tf.TensorSpec(shape=(TRAIN_SEQ_LEN, 1), dtype=tf.float32),  # y window
    ),
)
ds = ds.cache()
ds = ds.shuffle(1000)
ds = ds.batch(32)
train_model.compile(optimizer=Adam(learning_rate=1e-4), loss='mse')
train_model.fit(ds, epochs=10)


In [None]:
# Same architecture on TEST_SEQ_LEN-step windows, exposing every layer's
# output, loaded with the trained weights.
test_model = create_dilated_model(seq_len=TEST_SEQ_LEN, all_outputs=True)
trained_weights = train_model.get_weights()
test_model.set_weights(trained_weights)

In [None]:
# one TEST_SEQ_LEN-step window starting at test row 10, with a leading batch axis
test_seq = test_x[np.newaxis, 10:10+TEST_SEQ_LEN]
(test_seq.shape, test_seq)

In [None]:
# Run the model once; convert each output to numpy and drop its batch axis
# (the batch is always 1 here).
model_out = [tensor.numpy()[0] for tensor in test_model(test_seq)]
all_steps_y_pred = model_out[-1]   # the y_pred output, one row per time step
all_steps_y_pred

In [None]:
# name each per-layer output, then show their shapes
c1a_out, c1b_out, c2a_out, c2b_out, y_pred_out = model_out
tuple(layer_out.shape for layer_out in model_out)

In [None]:
# prediction at the last time step of the window
final_step_y_pred = all_steps_y_pred[-1][0]
final_step_y_pred

In [None]:
# target one step past the end of the window that starts at test row 10
y_true = test_y[10 + TEST_SEQ_LEN, 0]
y_true

manually run the steps of `def apply(x, c1_kernel, c1_bias, c2_kernel, c2_bias):`

to replicate first layer output for last value in sequence

In [None]:
# kernel and bias of the first block's two convolutions (model layers 1 and 2)
c1a_kernel, c1a_bias = (weight.numpy() for weight in test_model.layers[1].weights)
c1b_kernel, c1b_bias = (weight.numpy() for weight in test_model.layers[2].weights)

assert c1a_kernel.shape == (K, IN_D, C1_FILTERS)
assert c1a_bias.shape == (C1_FILTERS,)
assert c1b_kernel.shape == (1, C1_FILTERS, C1_FILTERS)
assert c1b_bias.shape == (C1_FILTERS,)

In [None]:
# run first layer
layer_0_out_0 = apply(test_seq[0,0:3,:],   # first three elements
                      c1a_kernel, c1a_bias, 
                      c1b_kernel, c1b_bias)
layer_0_out_1 = apply(test_seq[0,3:6,:],   # second three elements
                      c1a_kernel, c1a_bias, 
                      c1b_kernel, c1b_bias)
layer_0_out_2 = apply(test_seq[0,6:9,:],   # last three elements
                      c1a_kernel, c1a_bias, 
                      c1b_kernel, c1b_bias)

layer_0_out_0, layer_0_out_1, layer_0_out_2

In [None]:
# compare to outputs from keras model

# keras outputs at the last step of each 3-step window
tuple(c1b_out[step] for step in (2, 5, 8))

In [None]:
# manual first-layer outputs vs. the matching rows of the keras output
manual_rows = np.stack([layer_0_out_0, layer_0_out_1, layer_0_out_2])
np.isclose(manual_rows, c1b_out[[2, 5, 8]]).all()

In [None]:
# now run second layer

# kernel and bias of the second block's two convolutions (model layers 3 and 4)
c2a_kernel, c2a_bias = (weight.numpy() for weight in test_model.layers[3].weights)
c2b_kernel, c2b_bias = (weight.numpy() for weight in test_model.layers[4].weights)

assert c2a_kernel.shape == (K, C1_FILTERS, C2_FILTERS)
assert c2a_bias.shape == (C2_FILTERS,)
assert c2b_kernel.shape == (1, C2_FILTERS, C2_FILTERS)
assert c2b_bias.shape == (C2_FILTERS,)

In [None]:
# The three first-layer outputs, stacked in time order, form the second
# layer's input window.
input_for_layer_1 = np.stack((layer_0_out_0, layer_0_out_1, layer_0_out_2))
print("input_for_layer_1.shape", input_for_layer_1.shape)

layer_1_out = apply(x=input_for_layer_1,
                    c1_kernel=c2a_kernel, c1_bias=c2a_bias,
                    c2_kernel=c2b_kernel, c2_bias=c2b_bias)
print("layer_1_out.shape", layer_1_out.shape)
layer_1_out

In [None]:
# compare to keras model
# keras c2b output at the last time step of the window
c2b_out[-1]

In [None]:
# manual second-layer output vs. the last row of the keras c2b output
np.isclose(layer_1_out, c2b_out[-1]).all()

finally, run the last classifier layer

In [None]:
# kernel and bias of the final 1x1 convolution (model layer 5)
classifier_kernel, classifier_bias = (
    weight.numpy() for weight in test_model.layers[5].weights
)

(classifier_kernel.shape, classifier_bias.shape)

In [None]:
# show both operand shapes ahead of the final dot product
print("layer_1_out", layer_1_out.shape)
print("classifier_kernel", classifier_kernel.shape)

In [None]:
# the last layer is a dot product with the squeezed kernel, plus the bias
classifier_vector = classifier_kernel.squeeze()
final_prediction = layer_1_out @ classifier_vector + classifier_bias
final_prediction

In [None]:
# keras prediction at the last time step, for comparison
final_step_y_pred

In [None]:
# manual prediction vs. the keras prediction at the last time step
np.isclose(
    final_prediction,
    final_step_y_pred
)

so it works to run `apply` three times to get the output for layer0. 
and then run `apply` one more time on that output to get the output from layer1
and finally a simple classifier.

now to run in a streaming way using a cache