In [4]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
import sys
sys.path.append('/content/drive/MyDrive/deep_learning_quantized')

import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import architectures
import network_training
import preprocessing

In [6]:
def read_data_from_csv(csv_file_name):
      data = pd.read_csv(csv_file_name)
      x_data = data.iloc[:, :-2].values
      y_data = data.iloc[:, -2].values
      subj = data.iloc[:, -1].values

      return x_data, y_data, subj

In [7]:
csv_file_name = '/content/drive/MyDrive/deep_learning_quantized/multimodal_data.csv'
labels = ('low', 'medium', 'high')
avg = True

features, targets, subj_data = read_data_from_csv(csv_file_name)
fs = 64

In [8]:
# features = features[:, 0:4]
features = features[:, 0].reshape(-1, 1)
# features = features[:, 1:4]

# features = features[:, 5].reshape(-1, 1)
# features = features[:, 6:9]

print(features)

[[ -18.68]
 [ -17.01]
 [ -14.2 ]
 ...
 [-116.5 ]
 [-116.5 ]
 [-113.31]]


In [19]:
params = {
    "seed" : 256,
    "learning_rate" : 0.005,
    "weight_decay" : 1e-6,
    "step_size" : 3,
    "gamma" : 0.85,
    "batch_size" : 128,
    "epochs" : 30,
    "num_resblocks" : 3,
    "resblock_id" : 0
}

params["window_size"] = fs * 8
params["overlap"] = params["window_size"] * 7//8

In [20]:
scaled_data = preprocessing.scale_data(features, targets)

sliding_X_data, sliding_y_data = preprocessing.apply_sliding_window(scaled_data, targets, subj_data, params["window_size"], params["overlap"], avg)

X_data = sliding_X_data.astype(np.float32)
y_data = sliding_y_data.astype(np.uint8)

print(X_data.shape, y_data.shape)

(40757, 512, 1) (40757,)


In [21]:
params["num_channels"] = X_data.shape[2]
params["num_classes"] = len(labels)

In [22]:
import keras.utils

X_train, X_test, y_train, y_test = network_training.split_data(X_data, y_data, train_size=0.8)

y_train = keras.utils.to_categorical(y_train, num_classes=3)
y_test = keras.utils.to_categorical(y_test, num_classes=3)

X_train = np.expand_dims(X_train, axis=-1)
X_test = np.expand_dims(X_test, axis=-1)

In [23]:
from keras.callbacks import LearningRateScheduler

def lr_schedule(epoch, lr):
    if epoch % params['step_size'] == 0 and epoch != 0:
        return lr * params['gamma']
    return lr

lr_scheduler = LearningRateScheduler(lr_schedule)

In [None]:
from keras.models import Sequential
from keras.optimizers import Adam
from keras.metrics import Precision, Recall
from sklearn.metrics import f1_score

model = architectures.ResNet(num_channels=params['num_channels'], num_classes=params['num_classes'], num_resblocks=params['num_resblocks'], opt=params['resblock_id'])
optimizer = Adam(learning_rate=params['learning_rate'],  weight_decay=params['weight_decay'])

precision = Precision()
recall = Recall()

def f1_metric(y_true, y_pred):
    precision_value = precision(y_true, y_pred)
    recall_value = recall(y_true, y_pred)
    return  2 * ((precision_value * recall_value) / (precision_value + recall_value +  1e-7))

model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy', f1_metric, precision, recall])

keras.utils.set_random_seed(params["seed"])
np.random.seed(params["seed"])

from sklearn.utils import class_weight

class_weights = class_weight.compute_class_weight(class_weight='balanced', classes=np.unique(y_data), y=y_data)
class_weight_dict = dict(enumerate(class_weights))

# model.fit(X_train, y_train, batch_size=params['batch_size'], epochs=params['epochs'], verbose=1, validation_data=(X_test, y_test), class_weight=class_weight_dict)
model.fit(X_train, y_train, batch_size=params['batch_size'], epochs=params['epochs'], verbose=1, validation_data=(X_test, y_test), class_weight=class_weight_dict)

Epoch 1/30

In [None]:
tflite_quant_model = network_training.get_quantized_model(model)
interpreter = network_training.get_tflite_interpreter(tflite_quant_model)
quantized_acc, quantized_time, report = network_training.evaluate_quantized_metrics(interpreter, X_test, y_test)

print(f"Quantized model accuracy: {quantized_acc *  100}%")

In [None]:
import os
import time

with open('converted_quant_model.tflite', 'wb') as f:
    f.write(tflite_quant_model)

non_quantized_model_size = os.path.getsize('non_quantized_model.h5') / float(2**20)
print(f"Non-quantized model size: {non_quantized_model_size} MB")

quantized_model_size = os.path.getsize('converted_quant_model.tflite') / float(2**20)
print(f"Quantized model size: {quantized_model_size} MB")

In [None]:
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
input_shape = input_details[0]['shape']

start_time = time.time()

# for i in range(len(X_test)):
#     input_data = X_test[i].reshape(input_shape)
#     model.predict(input_data, verbose=0)

score = model.evaluate(X_test, y_test, verbose=0)
non_quantized_time = time.time() - start_time
print(f"Non-quantized model inference time: {non_quantized_time} seconds")

print(f"Quantized model inference time: {quantized_time} seconds")