# This notebook includes scratch time series classification using CNNs
Here, we use CNNs architecture used in a [paper](https://arxiv.org/pdf/1611.06455.pdf) called FCN(Fully Convolutional Networks) and ResNets

## 1) Install required packages

In [1]:
import numpy as np
import pickle as pk
from google.colab import drive
import tensorflow as tf
import os
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Dense, Conv1D, BatchNormalization, GlobalAvgPool1D, Add
from tensorflow.keras import models
from tensorflow.keras import Model as Model_

In [2]:
drive.mount('/content/gdrive', force_remount=True)

Mounted at /content/gdrive


## 2) Load dataset
- The dataset includes time-series data generated by a motion sensor and are collected by 10 actors with labels whether they are crouching, running, walking or something. Among several labels, here use only crouching, running and swing data. Here we load the dataset depending on window size and slide size. 

In [3]:
def motionLoad(classes, directory, window = 60, slide=1):

  file_list = os.listdir(directory)
  file_list = [i for i in file_list if any(s in i for s in classes)]
  instances, labels = [], []

  for f in file_list:
    label = [0.,0.,0.]
    index = 0
    for c, cla in enumerate(classes):
      if cla in f:
        label[c] = 1.0

    with open(directory+f) as doc:
      lines = doc.readlines()
    end = 0

    while index < len(lines):
      count = 0 
      subin = []
      if index + window < len(lines):
        if index != 0 :
          index = index - window + slide
        while count < window:
          count +=1 
          out = lines[index].split("|m ")[1].split(" |class")[0].split(" ") 
          out = [float(i) for i in out]
          subin.append(out)
          index +=1 
        instances.append(np.array(subin))
        labels.append   (np.array(label))

      else:
        index = len(lines)
  return instances, labels 

In [4]:
xv, yv = motionLoad(["Crouching", "Running", "Swing"], "/content/gdrive/My Drive/DLLab/Exercise/Motion/VAL/", window=60, slide=10)
xt, yt = motionLoad(["Crouching", "Running", "Swing"], "/content/gdrive/My Drive/DLLab/Exercise/Motion/TRAIN/", window=60, slide=10)

In [5]:
xv, yv, xt, yt = np.array(xv), np.array(yv), np.array(xt), np.array(yt)

In [None]:
xv.shape #(BATCH, TIME, FEATURES)
yv.shape #(BATCH, ONE-HOT LABELS)

(2105, 3)

## 3) Data Normalization
- For normalization, we use Min-Max normalization for each dataset. 



In [6]:
train_max = xt.max(axis=1).max(axis=0)
train_min = xt.min(axis=1).min(axis=0)
val_max   = xv.max(axis=1).max(axis=0)
val_min   = xv.min(axis=1).min(axis=0)

In [7]:
xv = (xv-val_min)/(val_max-val_min)
xt = (xt-train_min)/(train_max-train_min)

## 4) FCNs 

In [8]:
n_timesteps,n_features = xt.shape[1], xt.shape[2]
n_outputs = yt.shape[1]

In [31]:
class FCNBlock(tf.keras.layers.Layer):
  def __init__(self, filters, kernel_size, strides=1, activation='relu'):
    super(FCNBlock, self).__init__()
    
    self.conv = Conv1D(filters=filters, kernel_size=kernel_size, strides=strides, activation=activation)
    self.bn = BatchNormalization()

  def call(self, data):
    output = self.conv(data)
    output = self.bn(output)
    return output

In [30]:
class FCNs(Model_):
  def __init__(self):
    super(FCNs, self).__init__()

    self.Layer1 = FCNBlock(filters=128, kernel_size=8)
    self.Layer2 = FCNBlock(filters=256, kernel_size=5)
    self.Layer3 = FCNBlock(filters=128, kernel_size=3)
    self.Layer4 = GlobalAvgPool1D()
    self.Layer5 = Dense(units=n_outputs, activation='softmax')

  def call(self, data):
    output=self.Layer1(data)
    output=self.Layer2(output)
    output=self.Layer3(output)
    output=self.Layer4(output)
    output=self.Layer5(output)
    return output

In [43]:
class ResNetBlock(tf.keras.layers.Layer):
  def __init__(self, filters, kernel_size, strides=1, padding='same', activation='relu', data_format='channels_first'):
    super(ResNetBlock, self).__init__()
    
    self.conv1 = Conv1D(filters=filters, kernel_size=kernel_size, strides=strides, padding=padding, activation=activation, data_format=data_format)
    self.bn1 = BatchNormalization()
    self.conv2 = Conv1D(filters=filters, kernel_size=kernel_size, strides=strides, padding=padding, activation=activation, data_format=data_format)
    self.bn2 = BatchNormalization()

  def call(self, data):
    output = self.conv1(data)
    output = self.bn1(output)
    output = self.conv2(output)
    output = self.bn2(output)
    return output

In [44]:
class ResNets(Model_):
  def __init__(self):
    super(ResNets, self).__init__()

    self.Layer1 = ResNetBlock(filters=64, kernel_size=3) 
    self.Layer2 = Conv1D(filters=1, kernel_size=1, strides=1, padding='same', data_format='channels_first')
    self.Layer3 = BatchNormalization()
    self.Layer4 = Add()

    self.Layer5 = ResNetBlock(filters=128, kernel_size=8) 
    self.Layer6 = Conv1D(filters=1, kernel_size=1, strides=1, padding='same', data_format='channels_first')
    self.Layer7 = BatchNormalization()
    self.Layer8 = Add()

    self.Layer9 = ResNetBlock(filters=128, kernel_size=8) 
    self.Layer10 = Conv1D(filters=1, kernel_size=1, strides=1, padding='same', data_format='channels_first')
    self.Layer11 = BatchNormalization()
    self.Layer12 = Add()

    self.Layer13 = GlobalAvgPool1D()
    self.Layer14 = Dense(units=n_outputs, activation='softmax')

  def call(self, data):
    t_output=self.Layer1(data)
    residual=self.Layer2(data)
    residual=self.Layer3(residual)
    output=self.Layer4([t_output, residual])

    t_output=self.Layer5(output)
    residual=self.Layer6(output)
    residual=self.Layer7(residual)
    output=self.Layer8([t_output, residual])

    t_output=self.Layer9(output)
    residual=self.Layer10(output)
    residual=self.Layer11(residual)
    output=self.Layer12([t_output, residual])

    output=self.Layer13(output)
    output=self.Layer14(output)

    return output

In [25]:
class Optimizer:
  def __init__(self, model, batch_size = 10, loss = tf.keras.losses.CategoricalCrossentropy, lr = 0.001, opt=tf.keras.optimizers.Adam):
    self.model     = model
    self.loss      = loss()
    self.optimizer = opt(learning_rate = lr)
    self.batch_size = batch_size
    self.train_loss     = tf.keras.metrics.Mean(name='train_loss')
    self.train_accuracy = tf.keras.metrics.CategoricalAccuracy(name='train_accuracy')
    self.test_loss     = tf.keras.metrics.Mean(name='test_loss')
    self.test_accuracy = tf.keras.metrics.CategoricalAccuracy(name='test_accuracy')
    
  @tf.function
  def train_step(self, x , y):
    with tf.GradientTape() as tape:
      predictions = self.model(x)
      loss = self.loss(predictions, y)
    gradients = tape.gradient(loss, self.model.trainable_variables)
    self.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))
    self.train_loss(loss)
    self.train_accuracy(y, predictions)
    return loss

  @tf.function
  def test_step(self, x , y):
    predictions = self.model(x)
    loss = self.loss(predictions, y)
    self.test_loss(loss)
    self.test_accuracy(y, predictions) 
    
  def train (self):
    for mbX, mbY in self.train_ds:
      self.train_step(mbX, mbY)

  def test  (self):
    for mbX, mbY in self.test_ds:
      self.test_step(mbX, mbY)  

  def run   (self, dataX, dataY, testX, testY, epochs, verbose=2): 
    historyTR_LOSS = []
    historyTS_LOSS = []
    historyTR_ACC = []
    historyTS_ACC = []

    template = '{} {}, {}: {}, {}: {}, {}: {}, {}: {}'
    self.train_ds = tf.data.Dataset.from_tensor_slices((dataX, dataY)).shuffle(16000).batch(self.batch_size)
    self.test_ds  = tf.data.Dataset.from_tensor_slices((testX,testY)).batch(self.batch_size)

    for i in range(epochs):
      self.train ()
      self.test  ()
      if verbose > 0: 
        print(template.format("epoch: ", i+1,
                              "TRAIN LOSS: ", self.train_loss.result(),
                              "TEST LOSS: " , self.test_loss.result(),
                              "TRAIN ACC: " , self.train_accuracy.result().numpy()*100,
                              "TEST ACC: "  , self.test_accuracy.result().numpy()*100))
      temp = '{}'
    
      historyTR_LOSS.append(float(temp.format(self.train_loss.result().numpy())))
      historyTS_LOSS.append(float(temp.format(self.train_loss.result().numpy())))
      historyTR_ACC.append(float(temp.format(self.train_accuracy.result().numpy()*100)))
      historyTS_ACC.append(float(temp.format(self.test_accuracy.result().numpy()*100)))

      self.train_loss.reset_states()
      self.test_loss.reset_states()
      self.train_accuracy.reset_states()
      self.test_accuracy.reset_states()
    return historyTR_LOSS, historyTS_LOSS, historyTR_ACC, historyTS_ACC

In [32]:
model = FCNs()
opt    = Optimizer (model, batch_size = 256, lr = 0.0001, loss = tf.keras.losses.CategoricalCrossentropy, opt=tf.keras.optimizers.Adam)
tl, vl, ta, va = opt.run (xt, yt, xv, yv, 10, verbose=1)



To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

epoch:  1, TRAIN LOSS: : 9.505522727966309, TEST LOSS: : 6.731117248535156, TRAIN ACC: : 50.770503282547, TEST ACC: : 64.41805362701416
epoch:  2, TRAIN LOSS: : 6.892348289489746, TEST LOSS: : 4.023904323577881, TRAIN ACC: : 60.8726441860199, TEST ACC: : 85.46318411827087
epoch:  3, TRAIN LOSS: : 5.484622478485107, TEST LOSS: : 1.8261260986328125, TRAIN ACC: : 69.8529839515686, TEST ACC: : 93.77672076225281
epoch:  4, TRAIN LOSS: : 4.563806533813477, TEST LOSS: : 1.293486475944519, TRAIN ACC: : 74.4405746459961, TEST ACC: : 94.53681707382202
epoch:  5, TRAIN LOSS: : 4.191636562347412, TEST LOSS: : 1.56113862991333, TRAIN ACC: : 76.0170042514801, TEST ACC: : 92.7315890789032
epoch:  6, TRAIN

In [45]:
model = ResNets()
opt    = Optimizer (model, batch_size = 256, lr = 0.0001, loss = tf.keras.losses.CategoricalCrossentropy, opt=tf.keras.optimizers.Adam)
tl, vl, ta, va = opt.run (xt, yt, xv, yv, 10, verbose=1)



To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

epoch:  1, TRAIN LOSS: : 8.249195098876953, TEST LOSS: : 6.038403034210205, TRAIN ACC: : 52.74841785430908, TEST ACC: : 76.10451579093933
epoch:  2, TRAIN LOSS: : 5.713881492614746, TEST LOSS: : 4.5189337730407715, TRAIN ACC: : 68.46548914909363, TEST ACC: : 75.43942928314209
epoch:  3, TRAIN LOSS: : 4.637859344482422, TEST LOSS: : 3.2411069869995117, TRAIN ACC: : 74.1512656211853, TEST ACC: : 86.55582070350647
epoch:  4, TRAIN LOSS: : 3.6348798274993896, TEST LOSS: : 2.238347291946411, TRAIN ACC: : 80.46879768371582, TEST ACC: : 91.82897806167603
epoch:  5, TRAIN LOSS: : 3.047306776046753, TEST LOSS: : 1.5874170064926147, TRAIN ACC: : 83.93458127975464, TEST ACC: : 94.10926103591919
epoch: