In [None]:
import TimeseriesOversampler as to
import json, os, random
from sklearn.preprocessing import StandardScaler

# Load the labelled time series and standardise each one on its own.
# The first line of the file is a header. Every following line is assumed to
# be "<c><label>, <json matrix><c>" where <c> is a single wrapping character
# (e.g. a parenthesis) -- TODO confirm against the file producer.
timeseries = {}  # label -> list of scaled series (each a nested list)
with open('timeseries_anom.txt', 'r') as file:  # closed even if parsing fails
    file.readline()  # skip header
    for line in file:
        record = line.strip()
        if not record:
            continue  # tolerate blank lines, e.g. a trailing empty line

        # Stripping first means a final line without a newline is parsed the
        # same way as every other line; then drop the wrapping characters and
        # split the label from the JSON payload at the first ", ".
        split = record[1:-1].split(', ', 1)

        label = int(split[0])
        # A fresh scaler per series: each series is scaled with its own
        # statistics rather than statistics shared across the dataset.
        ts = StandardScaler().fit_transform(json.loads(split[1])).tolist()

        timeseries.setdefault(label, []).append(ts)

# Number of series taken per class for training and for the original test set.
TRAIN_PER_CLASS = 66
TEST_PER_CLASS = 22

# Train samples are taken from the head of each shuffled class list and test
# samples from the tail. With fewer than TRAIN_PER_CLASS + TEST_PER_CLASS
# series in a class the two slices would overlap (test data leaking into
# training), so fail loudly instead of silently producing a tainted split.
for class_label in (0, 1):
    available = len(timeseries[class_label])
    if available < TRAIN_PER_CLASS + TEST_PER_CLASS:
        raise ValueError(
            f'class {class_label} has {available} series, '
            f'need at least {TRAIN_PER_CLASS + TEST_PER_CLASS}'
        )

random.shuffle(timeseries[0])
random.shuffle(timeseries[1])

train_X = []
train_Y = []
test_X_synth = []
test_Y_synth = []
test_X_orig = []
test_Y_orig = []

for class_label in (0, 1):
    train_part = timeseries[class_label][:TRAIN_PER_CLASS]
    train_X.extend(train_part)
    # Size the labels from the slice actually taken so samples and labels can
    # never drift out of alignment.
    train_Y.extend([class_label] * len(train_part))

    test_part = timeseries[class_label][-TEST_PER_CLASS:]
    test_X_orig.extend(test_part)
    test_Y_orig.extend([class_label] * len(test_part))

# Shuffle samples and labels together so the classes are interleaved.
tmp = list(zip(train_X, train_Y))
random.shuffle(tmp)
train_X, train_Y = zip(*tmp)

tmp = list(zip(test_X_orig, test_Y_orig))
random.shuffle(tmp)
test_X_orig, test_Y_orig = zip(*tmp)

# Build the synthetic test set: 44 generated series per class, labelled with
# the class they were generated from.
# NOTE(review): the oversampler is given the full class lists, which also
# contain the series used for training -- confirm this overlap is intended.
# NOTE(review): ts_num presumably is the number of series to generate; the
# meaning of d is not visible here -- check TimeseriesOversampler.
oversampler = to.TimeseriesOversampler()
for class_id in (0, 1):
    generated = oversampler.oversample_timeseries(timeseries[class_id], ts_num=44, d=0)
    test_X_synth.extend(generated)
    test_Y_synth.extend([class_id] * 44)

# Shuffle samples and labels in lockstep, then split them back apart.
synth_pairs = list(zip(test_X_synth, test_Y_synth))
random.shuffle(synth_pairs)
test_X_synth, test_Y_synth = zip(*synth_pairs)

# The raw per-class lists are no longer needed once all splits exist.
del timeseries

# Sanity check: every split must have as many samples as labels.
for split_name, split_X, split_Y in (
    ('Trainset', train_X, train_Y),
    ('Original testset', test_X_orig, test_Y_orig),
    ('Synthetic testset', test_X_synth, test_Y_synth),
):
    print(split_name, len(split_X), len(split_Y))

In [None]:
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tcn import TCN

import numpy as np
import matplotlib.pyplot as plt

def data_generator(samples=None, labels=None):
    """Yield single-sample training batches forever, cycling through the data.

    Args:
        samples: Indexable collection of time series (each array-like).
            Defaults to the notebook-level ``train_X``.
        labels: Indexable collection of scalar labels aligned with
            ``samples``. Defaults to the notebook-level ``train_Y``.

    Yields:
        Tuple ``(x, y)``: ``x`` is one series with a leading axis of size 1
        added, ``y`` is its label expanded to shape ``(1, 1)``.

    Raises:
        ValueError: If there are no samples. Without this guard the endless
            loop below would spin forever without ever yielding.
    """
    if samples is None:
        samples = train_X
    if labels is None:
        labels = train_Y
    if len(samples) == 0:
        raise ValueError('data_generator needs at least one sample')

    while True:
        for idx in range(len(samples)):
            yield np.expand_dims(samples[idx], axis=0), np.expand_dims(labels[idx], axis=(0, 1))

JOBS_N = len(train_X)  # number of training series
EPOCHS = 30
BATCH_SIZE = 1
DIM = 18  # size of the last input axis (values per time step)

print(f'{EPOCHS} epochs - {JOBS_N} jobs - batch size {BATCH_SIZE}')

# The batch axis is fixed at 1 because data_generator yields one series per
# step; the time axis is None so series of different lengths are accepted.
i = Input(batch_shape=(1, None, DIM))

# Dilations double from 1 to 1024; the TCN output feeds a single sigmoid unit
# for the binary label.
o = TCN(nb_filters=8, kernel_size=4, nb_stacks=1, dilations=(1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024), use_layer_norm=True, dropout_rate=0.3)(i)
o = Dense(1, activation='sigmoid')(o)

m = Model(inputs=[i], outputs=[o])
m.compile(optimizer=Adam(learning_rate=0.001), loss='binary_crossentropy', metrics=['accuracy'])

# Flat per-epoch histories used for the live plot.
acc = []
loss = []

# fit() is called once per epoch (rather than with epochs=EPOCHS) so the
# curves can be redrawn after every epoch.
for e in range(EPOCHS):
    print(f'Epoch {e+1}/{EPOCHS}')
    hist = m.fit(data_generator(), steps_per_epoch=JOBS_N//BATCH_SIZE, max_queue_size=1)

    # Each fit() call ran a single epoch, so every history entry is a
    # one-element list; extend keeps acc/loss as flat lists of numbers
    # instead of lists of lists.
    acc.extend(hist.history['accuracy'])
    loss.extend(hist.history['loss'])

    plt.plot(loss, color='dodgerblue', label='loss')
    plt.plot(acc, color='orange', label='accuracy')
    plt.xlabel('epoch')
    plt.legend()  # without this the label= arguments are never displayed
    plt.show()

In [None]:
# Score the model on the synthetic test set, one series per predict() call.
tot_correct = 0  # correct predictions across test sets; the next cell adds to it

synth_tot_correct = 0
synth_tot_counter = 0
synth_misclassified = {0: 0, 1: 0}  # wrong predictions, keyed by the true label

for idx, (series, truth) in enumerate(zip(test_X_synth, test_Y_synth)):
    synth_tot_counter += 1
    # Wrap the series in a batch of one; the model outputs one sigmoid value.
    prob = m.predict(np.array([series]))[0][0]
    guess = int(round(prob))
    if guess != truth:
        synth_misclassified[truth] += 1
    else:
        synth_tot_correct += 1
        tot_correct += 1
    print(f'{idx+1}/{len(test_X_synth)} Real label {truth} - predicted {guess} | accuracy {synth_tot_correct / synth_tot_counter}')

print('Synthetic testset:')
print(f'Accuracy: {synth_tot_correct/len(test_Y_synth)}')
print(f'Misclassified: 0 -> {synth_misclassified[0]}, 1 -> {synth_misclassified[1]}')

In [None]:
# Score the model on the held-out original test set, one series per
# predict() call, then report the accuracy over both test sets combined.
orig_tot_correct = 0
orig_tot_counter = 0
orig_misclassified = {0: 0, 1: 0}  # wrong predictions, keyed by the true label

for i, item in enumerate(test_X_orig):
    orig_tot_counter += 1
    # Wrap the series in a batch of one; the model outputs one sigmoid value.
    p = m.predict(np.array([item]))[0][0]
    pred = int(round(p))
    if pred == test_Y_orig[i]:
        orig_tot_correct += 1
    else:
        orig_misclassified[test_Y_orig[i]] += 1
    print(f'{i+1}/{len(test_X_orig)} Real label {test_Y_orig[i]} - predicted {pred} | accuracy {orig_tot_correct / orig_tot_counter}')

print('Original testset:')
print(f'Accuracy: {orig_tot_correct/len(test_Y_orig)}')
print(f'Misclassified: 0 -> {orig_misclassified[0]}, 1 -> {orig_misclassified[1]}')

# Derive the combined figures from the per-set counters instead of
# incrementing a shared counter in the loop (re-running this cell would
# double-count) and instead of a hard-coded denominator (the two test sets
# together hold len(test_Y_synth) + len(test_Y_orig) samples, not 128).
tot_correct = synth_tot_correct + orig_tot_correct
tot_samples = len(test_Y_synth) + len(test_Y_orig)

print('\nTotal:')
print(f'Accuracy: {tot_correct/tot_samples}')