In [1]:
import sys

import matplotlib.pyplot as plt
import numpy as np
import torch
from hmmlearn import hmm
from numpy.lib.stride_tricks import sliding_window_view
from torch.distributions import uniform

sys.path.append("../")



In [58]:
#Generate Datasets using HMM

def random_transmat(n_states):
    """Draw a random row-stochastic transition matrix.

    Entries are sampled uniformly from [0, 1) and every row is then divided
    by its own sum, so each row adds up to 1.

    Returns an array of shape (n_states, n_states).
    """
    raw = np.random.rand(n_states, n_states)
    row_totals = raw.sum(axis=1)
    return raw / row_totals[:, None]

def random_startprob(n_states):
    """Draw a random initial-state distribution over ``n_states`` states.

    Samples ``n_states`` values uniformly from [0, 1) and normalises them so
    the returned 1-D array sums to 1.
    """
    weights = np.random.rand(n_states)
    total = weights.sum()
    return weights / total

def random_means(n_features, n_states=None):
    """Draw random integer emission means with values in {0, ..., 4}.

    Parameters
    ----------
    n_features : int
        Dimensionality of each observation (number of columns).
    n_states : int, optional
        Number of hidden states (number of rows). Defaults to ``n_features``
        so existing single-argument calls still return a square matrix.

    Returns
    -------
    np.ndarray of shape (n_states, n_features)
    """
    if n_states is None:
        n_states = n_features
    return np.random.randint(5, size=(n_states, n_features))


def generate_hmm(n_states, n_features, length):
    """Sample one sequence from a randomly parameterised Gaussian HMM.

    The start distribution, transition matrix and emission means are drawn
    at random; every state uses an identity covariance matrix.

    Parameters
    ----------
    n_states : int
        Number of hidden states of the HMM.
    n_features : int
        Dimensionality of each emitted observation.
    length : int
        Number of time steps to sample.

    Returns
    -------
    X : np.ndarray
        Sampled observations, as returned by ``model.sample``.
    Z : np.ndarray
        Hidden state sequence that generated ``X``.
    """
    model = hmm.GaussianHMM(n_components=n_states, covariance_type="full")
    model.startprob_ = random_startprob(n_states)
    model.transmat_ = random_transmat(n_states)

    # Emission parameters need one entry per hidden *state* (not per
    # feature): means are (n_states, n_features) and covariances are
    # (n_states, n_features, n_features).
    model.means_ = random_means(n_features, n_states)
    model.covars_ = np.tile(np.identity(n_features), (n_states, 1, 1))

    X, Z = model.sample(length)
    return np.array(X), np.array(Z)

def sliding_windows(dataset, Z, window_length = 10):
    """Cut a sequence into overlapping windows with stride 1.

    Parameters
    ----------
    dataset : np.ndarray
        2-D array indexed as ``dataset[time, feature]``.
    Z : sequence
        Per-time-step labels aligned with ``dataset``.
    window_length : int, default 10
        Number of consecutive time steps per window.

    Returns
    -------
    windows : np.ndarray
        Stacked windows ``dataset[i:i + window_length, :]``.
    targets : np.ndarray
        Label of the last time step of each window.

    Both arrays are empty when the sequence has ``window_length`` or fewer
    time steps.

    Raises
    ------
    ValueError
        If ``window_length`` is smaller than 1.
    """
    if window_length < 1:
        raise ValueError("window_length must be at least 1")

    # The caller's window_length is honoured (it used to be overwritten with
    # a hard-coded 10). The window count is unchanged: start indices run up
    # to len(dataset) - window_length - 1, so the final possible window is
    # not emitted.
    n_windows = len(dataset) - window_length
    if n_windows < 1:
        return np.array([]), np.array([])

    windows = [dataset[start:start + window_length, :] for start in range(n_windows)]
    targets = [Z[start + window_length - 1] for start in range(n_windows)]
    return np.array(windows), np.array(targets)

def generate_time_dependent_flip(length, startprob, transmat):
    """Sample a Markov-chain state sequence from the given start/transition
    probabilities.

    A Gaussian HMM is used only as a convenient sampler: the emissions are
    thrown away and only the hidden state sequence is returned.

    Parameters
    ----------
    length : int
        Number of time steps to sample.
    startprob : array-like of shape (n,)
        Initial state distribution.
    transmat : array-like of shape (n, n)
        Row-stochastic transition matrix.

    Returns
    -------
    Z : np.ndarray
        Hidden state index for each of the ``length`` time steps.

    Raises
    ------
    ValueError
        If ``transmat`` is not square with one row per entry of ``startprob``.
    """
    startprob = np.asarray(startprob, dtype=float)
    transmat = np.asarray(transmat, dtype=float)

    # The number of states comes from the arguments rather than from a
    # notebook-level ``n_states`` global, so the function works on a fresh
    # kernel and cannot disagree with the matrices it is given.
    n_mask_states = len(startprob)
    if transmat.shape != (n_mask_states, n_mask_states):
        raise ValueError(
            "transmat must have shape (len(startprob), len(startprob))"
        )

    model = hmm.GaussianHMM(n_components=n_mask_states, covariance_type="full")
    model.startprob_ = startprob
    model.transmat_ = transmat

    # Emission parameters are placeholders (the observations are discarded);
    # they only need one consistent entry per state.
    model.means_ = np.zeros((n_mask_states, 2))
    model.covars_ = np.tile(np.identity(2), (n_mask_states, 1, 1))

    _, Z = model.sample(length)
    return Z


In [154]:
#Injecting Noise into Labels

#Given a flip_mask, flip an input
def flip(array, flip_mask):
    """Return a copy of binary ``array`` with the masked entries toggled.

    Parameters
    ----------
    array : array-like
        Binary labels (0/1).
    flip_mask : array-like or scalar
        Non-zero where the corresponding label should be flipped; must
        broadcast against ``array``.

    Returns
    -------
    np.ndarray
        New array with the dtype of ``array``. The input is left untouched:
        writing the result back into ``array`` would silently corrupt the
        caller's ground-truth labels.
    """
    labels = np.asarray(array)
    flipped_array = np.logical_xor(labels, flip_mask)
    return flipped_array.astype(labels.dtype)

#Class Independent / Time Independent
def flip_labels_basic(array, flip_probability):
    """Flip each label independently with the same probability.

    One Bernoulli(``flip_probability``) draw is made per entry of ``array``
    and the resulting 0/1 mask is handed to ``flip``.
    """
    n_labels = len(array)
    mask = np.random.binomial(1, flip_probability, n_labels)
    return flip(array, mask)

#Class Dependent / Time Independent
def flip_labels_class(array, flip_probability_0, flip_probability_1):
    """Flip labels with a probability that depends on the label's class.

    Parameters
    ----------
    array : array-like
        Binary labels.
    flip_probability_0 : float
        Flip probability applied to entries equal to 0.
    flip_probability_1 : float
        Flip probability applied to every other entry.

    Returns
    -------
    The result of ``flip(array, flip_mask)``.
    """
    labels = np.asarray(array)
    # Per-element flip probability, then a single vectorised Bernoulli draw
    # instead of one RNG call per label.
    per_label_probability = np.where(
        labels == 0, flip_probability_0, flip_probability_1
    )
    flip_mask = np.random.binomial(1, per_label_probability)
    return flip(array, flip_mask)

#Class Independent / Time Dependent
def flip_labels_time(array, startprob, transmat):
    """Flip labels according to a mask sampled from a Markov chain.

    Parameters
    ----------
    array : array-like
        Binary labels.
    startprob, transmat :
        Start distribution and transition matrix forwarded to
        ``generate_time_dependent_flip``. A non-zero sampled state means
        "flip this time step".

    Returns
    -------
    The result of ``flip(array, flip_mask)``.
    """
    # Use the whole sampled state sequence as the mask. Indexing it with [0]
    # kept only the first state, so a single scalar flipped either every
    # label or none of them.
    flip_mask = generate_time_dependent_flip(len(array), startprob, transmat)

    return flip(array, flip_mask)


#Class Dependent / Time Dependent
#This can be achieved by careful design of the transition matrix (transmat)

In [5]:
# Sample a short demo sequence: 2 hidden states, 3 features, 100 time steps.
dataset, Z = generate_hmm(n_states=2, n_features=3, length=100)

In [32]:
# Display the hidden-state sequence returned by generate_hmm.
Z

array([1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 1, 1,
       1, 0, 0, 0, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 1,
       1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0,
       0, 0, 1, 1, 1, 1, 1, 0, 0, 1, 1, 1])

90

In [495]:
# Parameters of a two-state chain. Both rows of the transition matrix are
# identical, so the probability of moving to state 1 is 0.05 whatever the
# current state is.
startprob = random_startprob(2)
transmat = np.array([[0.95, 0.05],
                    [0.95, 0.05]])


In [159]:
def generate_dataset(n_states, n_features, length,window_length, train_ratio, method, 
                     flip_probability= None, flip_probability_0=None, flip_probability_1=None,
                    startprob=None, transmat=None):
    """Build a windowed HMM dataset with clean and noisy labels.

    Parameters
    ----------
    n_states, n_features, length :
        Forwarded to ``generate_hmm``.
    window_length : int
        Length of the sliding windows.
    train_ratio : float
        Fraction of the windows (taken from the start) used for training.
    method : {"basic", "class", "time"}
        Label-noise model: ``"basic"`` uses ``flip_probability``,
        ``"class"`` uses ``flip_probability_0`` / ``flip_probability_1``,
        ``"time"`` uses ``startprob`` / ``transmat``.

    Returns
    -------
    x_train, y_train_true, y_train_flipped, x_test, y_test_true, y_test_flipped

    Raises
    ------
    ValueError
        If ``method`` is not one of the supported names.
    """
    # Generate data
    dataset, states_true = generate_hmm(n_states, n_features, length)

    # Each noise function receives its own copy so the ground-truth labels
    # can never be altered by the flipping step, whatever the helper does
    # with its input.
    if method == "basic":
        states_flipped = flip_labels_basic(states_true.copy(), flip_probability)
    elif method == "class":
        states_flipped = flip_labels_class(
            states_true.copy(), flip_probability_0, flip_probability_1
        )
    elif method == "time":
        states_flipped = flip_labels_time(states_true.copy(), startprob, transmat)
    else:
        raise ValueError(
            "method must be 'basic', 'class' or 'time', got {!r}".format(method)
        )

    # Reshape into windows; the target of a window is the label of its last step
    dataset_windows, window_states_true = sliding_windows(
        dataset, states_true, window_length=window_length
    )
    _, window_states_flipped = sliding_windows(
        dataset, states_flipped, window_length=window_length
    )

    # Chronological split: the first train_ratio of the windows is training data
    split = int(train_ratio * len(dataset_windows))

    x_train = dataset_windows[:split]
    x_test = dataset_windows[split:]

    y_train_true = window_states_true[:split]
    y_test_true = window_states_true[split:]

    y_train_flipped = np.array(window_states_flipped[:split])
    y_test_flipped = np.array(window_states_flipped[split:])

    return x_train, y_train_true, y_train_flipped, x_test, y_test_true, y_test_flipped

In [160]:
# Experiment: class- and time-independent label noise ("basic" method).
n_states, n_features = 2, 3
length = 10000
window_length = 10
train_ratio = 0.7
method = "basic"
flip_probability = 0.1

# The options of the other noise methods keep their None defaults.
(x_train, y_train_true, y_train_flipped,
 x_test, y_test_true, y_test_flipped) = generate_dataset(
    n_states, n_features, length, window_length, train_ratio, method,
    flip_probability,
)

In [161]:
# Shape of the noisy training labels returned by generate_dataset.
y_train_flipped.shape

(6993,)

In [162]:
# Experiment: class-dependent label noise ("class" method).
n_states, n_features = 2, 3
length = 1000
window_length = 10
train_ratio = 0.7
method = "class"
flip_probability_0, flip_probability_1 = 0.1, 0.2

# Only the two class-specific flip probabilities are needed for this method.
(x_train, y_train_true, y_train_flipped,
 x_test, y_test_true, y_test_flipped) = generate_dataset(
    n_states, n_features, length, window_length, train_ratio, method,
    flip_probability_0=flip_probability_0,
    flip_probability_1=flip_probability_1,
)

In [163]:
# Shape of the training windows returned by generate_dataset.
x_train.shape

(693, 10, 3)

In [164]:
# Experiment: time-dependent label noise ("time" method).
n_states, n_features = 2, 3
length = 1000
window_length = 10
train_ratio = 0.7
method = "time"

# Parameters of the chain that generates the flip mask.
startprob = random_startprob(2)
transmat = np.array([[0.95, 0.05],
                     [0.95, 0.05]])

(x_train, y_train_true, y_train_flipped,
 x_test, y_test_true, y_test_flipped) = generate_dataset(
    n_states, n_features, length, window_length, train_ratio, method,
    startprob=startprob,
    transmat=transmat,
)




In [165]:
# Shape of the noisy training labels returned by generate_dataset.
y_train_flipped.shape

(693,)

In [187]:
#ADD NOISE TO HAR

# Label-noise transition matrices for a 6-class problem.
# Row i is the probability distribution of the noisy label given true label
# i, so every row must sum to 1 (np.random.choice rejects anything else).

# Pair noise: a label is kept with probability 0.55 and otherwise moved to
# the next class (the last class wraps around to class 0).
noise_pair_45= np.array([[.55,0.45,0.0,0.0,0.0,0.0],
                        [0.0,.55,0.45,0.0,0.0,0.0],
                        [0.0,0.0,.55,0.45,0.0,0.0],
                        [0.0,0.0,0.0,.55,0.45,0.0],
                        [0.0,0.0,0.0,0.0,.55,0.45],
                        [0.45,0.0,0.0,0.0,0.0,.55]])

# Symmetric noise: a label is kept with probability 0.50, every other class
# is equally likely (0.10 each).
noise_sym_50 = np.array([[.50,.10,.10,.10,.10,.10],
                        [.10,.50,.10,.10,.10,.10],
                        [.10,.10,.50,.10,.10,.10],
                        [.10,.10,.10,.50,.10,.10],
                        [.10,.10,.10,.10,.50,.10],
                        [.10,.10,.10,.10,.10,.50]])

# Symmetric noise: a label is kept with probability 0.25, every other class
# has probability 0.15.
noise_sym_25 = np.array([[.25,.15,.15,.15,.15,.15],
                        [.15,.25,.15,.15,.15,.15],
                        [.15,.15,.25,.15,.15,.15],
                        [.15,.15,.15,.25,.15,.15],
                        [.15,.15,.15,.15,.25,.15],
                        [.15,.15,.15,.15,.15,.25]])

# Guard against typos in the tables above.
assert np.allclose(noise_pair_45.sum(axis=1), 1.0), "noise_pair_45 rows must sum to 1"
assert np.allclose(noise_sym_50.sum(axis=1), 1.0), "noise_sym_50 rows must sum to 1"
assert np.allclose(noise_sym_25.sum(axis=1), 1.0), "noise_sym_25 rows must sum to 1"

def flip_HAR_labels(array, noise_matrix):
    """Resample every label from its row of a noise transition matrix.

    Parameters
    ----------
    array : array-like
        Class labels. Integer-valued floats (e.g. ``3.0``) are accepted and
        converted to ``int`` before being used as row indices.
    noise_matrix : array-like of shape (n_classes, n_classes)
        ``noise_matrix[i, j]`` is the probability that true label ``i`` is
        replaced by label ``j``; each row must sum to 1.

    Returns
    -------
    np.ndarray of int
        The noisy labels, one per input label.
    """
    noise_matrix = np.asarray(noise_matrix, dtype=float)
    # The set of possible noisy labels follows from the matrix width rather
    # than a hard-coded list of six classes.
    n_classes = noise_matrix.shape[1]
    # Float labels cannot be used to index the matrix.
    labels = np.asarray(array).astype(int)

    flipped = [np.random.choice(n_classes, p=noise_matrix[label]) for label in labels]
    return np.array(flipped, dtype=int)


def flip_HAR_labels_basic(array, flip_probability, n_classes=6):
    """Replace labels by a uniformly chosen *different* class.

    Each label is selected for flipping independently with probability
    ``flip_probability``; a selected label is replaced by one of the other
    ``n_classes - 1`` classes, all equally likely.

    Parameters
    ----------
    array : array-like
        Class labels, assumed to lie in ``0 .. n_classes - 1``.
    flip_probability : float
        Probability that any given label is replaced.
    n_classes : int, default 6
        Total number of classes.

    Returns
    -------
    np.ndarray
        New array with the same dtype as the input (integer labels stay
        integers); the input is not modified.

    Raises
    ------
    ValueError
        If ``n_classes`` is smaller than 2 (no other class to flip to).
    """
    if n_classes < 2:
        raise ValueError("n_classes must be at least 2")

    labels = np.asarray(array)
    flip_mask = np.random.binomial(1, flip_probability, len(labels)).astype(bool)

    # Adding an offset in 1 .. n_classes-1 modulo n_classes lands on every
    # other class with equal probability and never on the original one.
    offsets = np.random.randint(1, n_classes, size=int(flip_mask.sum()))

    flipped = labels.copy()
    flipped[flip_mask] = (labels[flip_mask] + offsets) % n_classes
    return flipped
    


In [192]:
# Quick check of flip_HAR_labels_basic on one label per class.
array = np.array([0.0,1.0,2.0,3.0,4.0,5.0])
# NOTE(review): noise_matrix is assigned here but not passed to the call below.
noise_matrix = noise_sym_25

flip_HAR_labels_basic(array, 0.1)

array([0., 0., 2., 3., 4., 5.])

In [186]:
# NOTE(review): `flipped` is not assigned at notebook scope in any visible
# cell (it is only a local inside the flip_HAR_* functions), so this cell
# presumably relies on leftover kernel state — confirm or remove.
flipped

[0.0, 1.0, 2.0, 3.0, 4.0, 5.0]

In [155]:
# Draw one fair-coin 0/1 value per entry of `array`.
flip_mask = np.random.binomial(1, 0.5, len(array))

In [156]:
# Display the sampled mask.
flip_mask

array([0, 0, 1, 0, 0, 0])