In [None]:
# Install TensorFlow, pinned to release 2.1 so the rest of the notebook runs against a known version.
# NOTE(review): `%pip install ...` may be preferable to `!pip` so the package is installed into the kernel's own environment - confirm on the target runtime.
!pip install tensorflow==2.1

In [None]:
# Import tensorflow as tf.
import tensorflow as tf

# Import numpy as np.
import numpy as np

# Import pandas as pd.
import pandas as pd

# Import matplotlib.
import matplotlib

# Import pyplot as plt.
from matplotlib import pyplot as plt

# Import Model.
from tensorflow.keras import Model

# Import Keras' various types of layer.
from tensorflow.keras.layers import Dense, Conv1D, MaxPooling1D, Flatten, Dropout

# Import os.
import os

# Import drive.
from google.colab import drive

# Mount Google Drive under /content/drive so that files stored there (e.g. the raw dataset CSV) can be read through regular filesystem paths.
drive.mount('/content/drive')

In [None]:
# Labels of the activities the classifier distinguishes.
activities = [
  'Sitting',
  'Lying',
  'Standing',
  'Jumping',
  'Walking',
  'Running',
]

# One output class per activity.
N_CLASSES = len(activities)

# Number of sensor values contained in each raw sample.
N_SENSORS = 6

# How many consecutive raw samples make up the input of a single inference.
N_TIMESTEPS = 150

In [None]:
# Function that reads the "raw_data.csv" file and returns a DataFrame.
def read_data(path = '/content/drive/My Drive/raw_data.csv'):
  """Load the raw dataset from a CSV file.

  Args:
    path: Location of the CSV file. Defaults to "raw_data.csv" in the root of
      the mounted Google Drive, so existing calls without arguments keep
      working; pass another path to load a different recording.

  Returns:
    A pandas DataFrame with the content of the CSV file.
  """

  # Read data.
  df = pd.read_csv(path)

  # Return df.
  return df

In [None]:
# Function that pre-processes the raw data.
def preprocess(df, n_timesteps = None, n_sensors = None):
  """Normalize the raw readings and group them into shuffled, fixed-length windows.

  Args:
    df: DataFrame whose first `n_sensors` columns are sensor readings (the
      first three are treated as accelerometer values, the remaining ones as
      gyroscope values) and whose last column is the label.
    n_timesteps: Number of consecutive rows per window. Defaults to the
      notebook-level N_TIMESTEPS.
    n_sensors: Number of sensor columns. Defaults to the notebook-level
      N_SENSORS.

  Returns:
    A float64 array of shape (windows, n_timesteps, n_sensors + 1) whose
    windows are in random order. Trailing rows that do not fill a complete
    window are discarded.

  Raises:
    ValueError: If `df` does not have exactly `n_sensors + 1` columns.
  """

  # Fall back to the notebook-level configuration when no override is given.
  if n_timesteps is None:
    n_timesteps = N_TIMESTEPS
  if n_sensors is None:
    n_sensors = N_SENSORS

  # Cast df to a numpy array. Forcing a float dtype prevents the in-place
  # normalization below from being truncated when the CSV only holds integers,
  # and the explicit copy guarantees the caller's DataFrame is left untouched.
  data = df.to_numpy(dtype = np.float64, copy = True)

  # Fail early, with a readable message, on an unexpected column layout.
  if data.shape[1] != n_sensors + 1:
    raise ValueError('expected %d columns (sensors + label), got %d' % (n_sensors + 1, data.shape[1]))

  # Normalization of the accelerometer values.
  # NOTE(review): 2.0 is presumably the accelerometer full-scale range - confirm.
  data[:, :3] = data[:, :3] / 2.0

  # Normalization of the gyroscope values.
  # NOTE(review): 2000.0 is presumably the gyroscope full-scale range - confirm.
  data[:, 3:-1] = data[:, 3:-1] / 2000.0

  # Drop the trailing rows that do not fill a whole window; otherwise the
  # reshape below raises whenever the row count is not a multiple of n_timesteps.
  n_windows = len(data) // n_timesteps
  data = data[:n_windows * n_timesteps]

  # Reshape data.
  data = data.reshape(n_windows, n_timesteps, n_sensors + 1)

  # Shuffle data (only the order of the windows changes, not their content).
  np.random.shuffle(data)

  # Return data.
  return data

In [None]:
# Function that builds and compiles the convolutional classifier.
def get_model(FILTERS, KERNEL_SIZE, POOL_SIZE, NEURONS, DROPOUT_RATE):
  """Create, compile and summarize the 1D convolutional network.

  The network stacks three convolution + max-pooling stages, a dropout layer,
  a flatten layer, one fully connected hidden layer followed by dropout, and a
  softmax output with N_CLASSES units.

  Args:
    FILTERS: Number of kernels of every convolutional layer.
    KERNEL_SIZE: Kernel size of every convolutional layer.
    POOL_SIZE: Pool size of every max-pooling layer.
    NEURONS: Units of the fully connected hidden layer.
    DROPOUT_RATE: Rate used by both dropout layers.

  Returns:
    The compiled Keras model (its summary is printed as a side effect).
  """

  # Settings shared by all the convolutional and by all the pooling layers.
  conv_kwargs = dict(filters = FILTERS, kernel_size = KERNEL_SIZE, activation = 'relu', padding = 'same')
  pool_kwargs = dict(pool_size = POOL_SIZE, padding = 'same')

  # Start from an empty model and append the layers one at a time.
  model = tf.keras.Sequential()

  # First convolution + pooling stage: the only one declaring the input shape.
  model.add(Conv1D(input_shape = (N_TIMESTEPS, N_SENSORS), **conv_kwargs))
  model.add(MaxPooling1D(**pool_kwargs))

  # Second and third convolution + pooling stages.
  for _ in range(2):
    model.add(Conv1D(**conv_kwargs))
    model.add(MaxPooling1D(**pool_kwargs))

  # Regularize, then flatten the feature maps for the dense part.
  model.add(Dropout(DROPOUT_RATE))
  model.add(Flatten())

  # Fully connected hidden layer with its own dropout.
  model.add(Dense(NEURONS, activation = 'relu'))
  model.add(Dropout(DROPOUT_RATE))

  # Output layer: one probability per class.
  model.add(Dense(N_CLASSES, activation = 'softmax'))

  # The output layer already applies softmax, hence from_logits = False.
  loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits = False)
  model.compile(optimizer = 'adam', loss = loss_fn, metrics = ['accuracy'])

  # Print the model information.
  model.summary()

  return model

In [None]:
# --- Training settings ---

# How many passes over the training data are performed.
EPOCHS = 100

# Fraction of the data held out for validation during training.
VALIDATION_SPLIT = 0.2

# --- Network hyperparameters ---

# Kernels per convolutional layer.
FILTERS = 16

# Size of each convolutional kernel.
KERNEL_SIZE = 3

# Size of each max-pooling window.
POOL_SIZE = 3

# Units of the single fully connected hidden layer.
NEURONS = 128

# Rate applied by the dropout layers.
DROPOUT_RATE = 0.5

In [None]:
# Load the raw CSV and turn it into normalized, shuffled windows.
data = preprocess(read_data())

# Build and compile the network with the configured hyperparameters.
model = get_model(
  FILTERS,
  KERNEL_SIZE,
  POOL_SIZE,
  NEURONS,
  DROPOUT_RATE,
)

# Train the model: the features are the sensor columns of every window, the
# target is the label stored in the first timestep of each window.
model.fit(
  x = data[:, :, :N_SENSORS],
  y = data[:, 0, N_SENSORS],
  epochs = EPOCHS,
  validation_split = VALIDATION_SPLIT,
)

In [None]:
# Build the array holding the validation features: the trailing VALIDATION_SPLIT fraction of the windows, sensor columns only (label column excluded).
# NOTE(review): presumably this is the same tail of `data` that Keras holds out through `validation_split` during training - confirm.
test_samples = data[int((1 - VALIDATION_SPLIT) * len(data)):, :, :N_SENSORS]

# Calibration steps: how many samples the representative dataset generator yields.
num_calibration_steps = 128

# Calibration dtype: the dtype every calibration sample is cast to.
calibration_dtype = tf.float32

# Function that allows the creation of a representative dataset in order to continue with the post-training quantization.
def representative_dataset_gen():
  """Yield randomly chosen calibration samples.

  Draws `num_calibration_steps` windows (with replacement) from `test_samples`.

  Yields:
    A one-element list holding a single window with a leading batch axis,
    cast to `calibration_dtype`.
  """

  # Number of windows available for calibration.
  n_samples = test_samples.shape[0]

  # Cycle used to generate random samples.
  for _ in range(num_calibration_steps):

    # Random index generation. The upper bound of np.random.randint is
    # exclusive, so n_samples (not n_samples - 1) must be passed: otherwise the
    # last window can never be drawn and a single-window set raises ValueError.
    rand_idx = np.random.randint(0, n_samples)

    # Pick the sample according to the generated index.
    sample = test_samples[rand_idx]

    # Add new axis.
    sample = sample[tf.newaxis, ...]

    # Cast sample to calibration_dtype.
    sample = tf.cast(sample, dtype = calibration_dtype)

    # Yield sample.
    yield [sample]

In [None]:
# Definition of the directory containing the models.
MODELS_DIR = 'models/'

# Directory creation. exist_ok makes the cell safe to re-run without a separate existence check.
os.makedirs(MODELS_DIR, exist_ok = True)

# Definition of the .tflite model's name.
MODEL_TFLITE = MODELS_DIR + 'model.tflite'

# Definition of the .cc model's name.
MODEL_TFLITE_MICRO = MODELS_DIR + 'model.cc'

# Create converter.
converter = tf.lite.TFLiteConverter.from_keras_model(model)

# Set optimizations.
converter.optimizations = [tf.lite.Optimize.DEFAULT]

# Enforce full int8 quantization (except for inputs and outputs which are always floats)
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]

# Provide a representative dataset.
converter.representative_dataset = representative_dataset_gen

# Convert the model.
model_tflite = converter.convert()

# Save the model. The context manager closes (and therefore flushes) the file
# before any later cell reads it back from disk.
with open(MODEL_TFLITE, 'wb') as model_file:
  model_file.write(model_tflite)

In [None]:
# xxd installation.
!apt-get update && apt-get -qq install xxd

# Model conversion: dump the .tflite file as a C source file.
!xxd -i {MODEL_TFLITE} > {MODEL_TFLITE_MICRO}

# Variables names' update: replace the path-based name (the model path with '/' and '.' turned into '_') with g_model.
# NOTE(review): this assumes xxd derives its identifiers from the input path in exactly this way - confirm against the generated file.
REPLACE_TEXT = MODEL_TFLITE.replace('/', '_').replace('.', '_')
!sed -i 's/'{REPLACE_TEXT}'/g_model/g' {MODEL_TFLITE_MICRO}

In [None]:
# Print the generated .cc model so it can be inspected or copied.
# NOTE(review): the output contains the whole model as text and may be long - consider clearing it before sharing the notebook.
!cat {MODEL_TFLITE_MICRO}