<a href="https://colab.research.google.com/github/joemarshall/websensors/blob/main/assets/python/BinaryClassifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This Colab notebook presents a workflow for training a simple binary classifier as a machine learning model, then exporting it as a TFLite file that can be used on the websensor platform or on a Raspberry Pi.

In [15]:
# tensorflow is the machine learning library we use
import tensorflow as tf
# numpy is for fast python maths
import numpy as np
# pandas for importing datafiles
import pandas as pd
import io

# make some stuff that is in tensorflow be 
# easier to get at below
import tensorflow.keras as keras
import tensorflow.keras.layers as layers
import tensorflow.keras.losses as losses




In [16]:
# Load data files. Each data file is a CSV file of continuous sensor data with
# accompanying ground truth data (0 or 1).

# Recorded data can come from a Raspberry Pi or from the websensor platform.

# files.upload() makes an upload box appear in Colab. The result is kept in
# `uploaded`, which the loading cell below indexes by file name to get each
# file's raw bytes.
from google.colab import files
uploaded = files.upload()

Saving speech_ground_truth.csv to speech_ground_truth (2).csv


In [17]:
# preprocess - for each data point, add the history of the previous
# (window_size - 1) points. This is called 'unrolling'.
def unroll_data_and_preprocess(data,gt,window_size=512,scale=512.0):
  """Scale a 1-D sensor stream and unroll it into overlapping windows.

  Args:
    data: 1-D sequence of sensor readings.
    gt: ground truth labels, one per reading (same length as `data`).
    window_size: number of consecutive readings in each window. The default
      of 512 matches the input length the model in this notebook is built for.
    scale: every reading is divided by this so the values aren't too big.

  Returns:
    A tuple `(windows, labels)`. `windows` is a writable array of shape
    `(len(data) - window_size + 1, window_size)`; row `i` holds the scaled
    readings `i .. i + window_size - 1`. `labels` is `gt[window_size - 1:]`,
    so each window is paired with the label of its most recent reading.

  Raises:
    ValueError: if `data` is not 1-D, if `data` and `gt` differ in length, if
      `window_size` is less than 1, or if there are fewer readings than
      `window_size`.
  """
  np_data=np.array(data)
  np_gt=np.asarray(gt)
  if np_data.ndim!=1:
    raise ValueError(f"data must be 1-dimensional, got shape {np_data.shape}")
  # a length mismatch would otherwise silently pair windows with wrong labels
  if len(np_gt)!=len(np_data):
    raise ValueError(
        f"data and gt must be the same length, got {len(np_data)} and {len(np_gt)}")
  if window_size<1:
    raise ValueError(f"window_size must be at least 1, got {window_size}")
  if len(np_data)<window_size:
    raise ValueError(
        f"need at least {window_size} data points to make a window, got {len(np_data)}")

  np_data=np_data/scale # scale the data so it isn't too big
  # sliding_window_view returns a read-only view onto np_data; copy() turns it
  # into an ordinary independent array
  windows=np.lib.stride_tricks.sliding_window_view(
      np_data,window_shape=[window_size]).copy()
  # the first (window_size - 1) labels have no complete history, so drop them
  return windows,np_gt[window_size-1:]



# CSV columns to read: sensor reading first, 0/1 ground truth label second
column_names=["sound level","ground truth"]

# a cancelled upload leaves nothing to load; say so here rather than failing
# later inside np.concatenate
if not uploaded:
  raise ValueError(
      "No files were uploaded - re-run the upload cell and choose at least one CSV file")

datasets=[]

# `uploaded` is indexed by file name and gives each file's raw bytes
for file_name,file_bytes in uploaded.items():
  print(f"Loading: {file_name}")
  csv_frame=pd.read_csv(io.BytesIO(file_bytes))
  missing_columns=[name for name in column_names if name not in csv_frame.columns]
  if missing_columns:
    raise KeyError(
        f"{file_name} is missing column(s) {missing_columns}; "
        f"columns found: {list(csv_frame.columns)}")
  file_x=csv_frame[column_names[0]].to_numpy()
  file_y=csv_frame[column_names[1]].to_numpy()
  # each file is unrolled separately so no window spans two recordings
  datasets.append(unroll_data_and_preprocess(file_x,file_y))

# make arrays for x and y
x_data=np.concatenate([x for (x,y) in datasets])
y_data=np.concatenate([y for (x,y) in datasets])
print(f"Loaded data: {x_data.shape},{y_data.shape}")

Loading: speech_ground_truth.csv
Loaded data: (6900, 512),(6900,)


In [18]:
# shuffle the data and split it into train and test sets
#
# A seeded generator makes the split reproducible. x_data and y_data are left
# in their original order (the permutation is only used to index them), so
# re-running this cell always gives the same split.
SPLIT_SEED=42
TRAIN_FRACTION=.75

shuffle_rng=np.random.default_rng(SPLIT_SEED)
p=shuffle_rng.permutation(x_data.shape[0])

# the first TRAIN_FRACTION of the shuffled indices train the model, the rest test it
split_point=int(x_data.shape[0]*TRAIN_FRACTION)
train_indices=p[:split_point]
test_indices=p[split_point:]

x_train=x_data[train_indices]
x_test=x_data[test_indices]
y_train=y_data[train_indices]
y_test=y_data[test_indices]

# NOTE(review): neighbouring rows of x_data are sliding windows that share all
# but one sample, so a random split puts near-duplicates of the training
# windows into the test set. The validation loss is therefore likely to be
# optimistic - consider holding out a separate recording for testing.




In [19]:
# Build the model: four identical strided 1-D convolutions pick out features
# (each one halves the sequence length), then fully connected layers turn
# those features into the classification.
model=keras.Sequential([
    layers.Input(name='x',shape=(512,1)),
    *[
        layers.Conv1D(32,kernel_size=3,padding="same",strides=2,activation="relu")
        for _ in range(4)
    ],
    layers.Flatten(),
    layers.Dense(64,activation="relu"),
    # Classifier output: one softmax probability per class. With the sparse
    # categorical loss below, output index i lines up with ground truth label i.
    # NOTE(review): which of the labels 0/1 means 'true' depends on how the
    # ground truth was recorded - confirm against the data files.
    layers.Dense(2,activation="softmax",name='y'),
])

# sparse categorical cross-entropy takes the integer labels directly
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    run_eagerly=False,
)
model.build(input_shape=(None,512,1))

# show the layer-by-layer summary, then the model's input and output tensors
model.summary()
print(model.input,model.output)

Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d_8 (Conv1D)           (None, 256, 32)           128       
                                                                 
 conv1d_9 (Conv1D)           (None, 128, 32)           3104      
                                                                 
 conv1d_10 (Conv1D)          (None, 64, 32)            3104      
                                                                 
 conv1d_11 (Conv1D)          (None, 32, 32)            3104      
                                                                 
 flatten_2 (Flatten)         (None, 1024)              0         
                                                                 
 dense_2 (Dense)             (None, 64)                65600     
                                                                 
 y (Dense)                   (None, 2)                

In [22]:
# Train the model on the training split; the test split is passed as
# validation data so it is evaluated alongside training.
model.fit(
    x=x_train,
    y=y_train,
    batch_size=32,
    epochs=50,
    validation_data=(x_test,y_test),
)


Epoch 1/250
Epoch 2/250
Epoch 3/250
Epoch 4/250
Epoch 5/250
Epoch 6/250
Epoch 7/250
Epoch 8/250
Epoch 9/250
Epoch 10/250
Epoch 11/250
Epoch 12/250
Epoch 13/250
Epoch 14/250
Epoch 15/250
Epoch 16/250
Epoch 17/250
Epoch 18/250
Epoch 19/250
Epoch 20/250
Epoch 21/250
Epoch 22/250
Epoch 23/250
Epoch 24/250
Epoch 25/250
Epoch 26/250
Epoch 27/250
Epoch 28/250
Epoch 29/250
Epoch 30/250
Epoch 31/250
Epoch 32/250
Epoch 33/250
Epoch 34/250
Epoch 35/250
Epoch 36/250
Epoch 37/250
Epoch 38/250
Epoch 39/250
Epoch 40/250
Epoch 41/250
Epoch 42/250
Epoch 43/250
Epoch 44/250
Epoch 45/250
Epoch 46/250
Epoch 47/250
Epoch 48/250
Epoch 49/250
Epoch 50/250
Epoch 51/250
Epoch 52/250
Epoch 53/250
Epoch 54/250
Epoch 55/250
Epoch 56/250
Epoch 57/250
Epoch 58/250
Epoch 59/250
Epoch 60/250
Epoch 61/250
Epoch 62/250
Epoch 63/250

KeyboardInterrupt: ignored

In [23]:
# Save model to a tflite model for inference on raspberry pi (or websensor platform)
from google.colab import files

# convert the trained Keras model into TFLite flatbuffer bytes
converter=tf.lite.TFLiteConverter.from_keras_model(model)
tflite=converter.convert()

# Write inside a `with` block so the file is flushed and closed before the
# download below reads it; an unclosed file can be downloaded truncated.
with open('model.tflite',"wb") as tflite_model_file:
  tflite_model_file.write(tflite)

# load the converted model back and print its signatures, to check the
# input/output names that inference code will need
interpreter = tf.lite.Interpreter(model_content=tflite)

signatures = interpreter.get_signature_list()
print(signatures)

# send the saved file to the browser
files.download('model.tflite')




{'serving_default': {'inputs': ['x'], 'outputs': ['y']}}


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>