In [7]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime

class HMM:
    def __init__(self, states, observations, pi=None, transition=None, emission=None):
        self.states = states
        self.pi = pi
        self.trans = transition
        self.emis = emission
        self.obs = observations
        self.initCalc()
        self.n = None
    
    def initCalc(self):
        if self.pi is None:
            self.pi = np.repeat(1/self.states, self.states)
        if self.trans is None:
            transmat = np.random.randint(low=1, high = 2000, size=(self.states,self.states))
            self.trans = transmat/transmat.sum(axis=1, keepdims=True)
        if self.emis is None:
            emissionprob = np.random.randint(low=1, high = 2000, size=(self.states,len(np.unique(self.obs))))
            self.emis = emissionprob/emissionprob.sum(axis=1, keepdims=True)
    
    def normalization(self, step=1):
        """ Normalization constant required for backward algorithm and gamma value 
        """
        n = np.ones(len(self.obs))
        alpha = self.forwardUnnorm()

        n[0] = 1 / sum(alpha[:,0])

        for t in range(step, len(n), step):
            n[t] = 1 / (sum(alpha[:,t]) * np.prod([n[0:t]]))
        
        return(n)
    
    def forward(self):
        """ Return normalized alpha vales of the observation sequence
        """
        alpha = np.zeros((self.states, len(self.obs)))

        # initilization with normalization
        alpha[:,0] = self.pi * self.emis[:,self.obs[0]]
        alpha[:,0] = alpha[:,0] / sum(alpha[:,0])

        # t > 1 with normalization
        for t in range(1, len(self.obs)):
            for s in range(self.states):
                start = sum([alpha[i,t-1] * self.trans[i,s] for i in range(self.states)])
                alpha[s,t] = start * self.emis[s, self.obs[t]]
            alpha[:,t] = alpha[:,t] / sum(alpha[:,t])

        return(alpha)
    
    def forwardUnnorm(self):
        """ Return alpha vales of the observation sequence
        """
        alpha = np.zeros((self.states, len(self.obs)))

        # initilization with normalization
        alpha[:,0] = self.pi * self.emis[:,self.obs[0]]

        # t > 1 with normalization
        for t in range(1, len(self.obs)):
            for s in range(self.states):
                start = sum([alpha[i,t-1] * self.trans[i,s] for i in range(self.states)])
                alpha[s,t] = start * self.emis[s, self.obs[t]]

        return(alpha)

    def backward(self):
        """ Return normalized beta vales of the observation sequence
        """
        self.n = self.normalization()
        beta = np.zeros((self.states, len(self.obs)))

        # initilization
        beta[:,len(self.obs)-1] = 1

        # t < T with normalization
        for t in range(len(self.obs)-2, -1, -1):
            for s in range(self.states):
                beta[s,t] = self.n[t+1] * sum(self.trans[s,:] * self.emis[:, self.obs[t+1]] * beta[:,t+1])

        return(beta)

    def backwardUnnorm(self):
        """ Return beta vales of the observation sequence
        """
        beta = np.zeros((self.states, len(self.obs)))

        # initilization
        beta[:,len(self.obs)-1] = 1

        # t < T with normalization
        for t in range(len(self.obs)-2, -1, -1):
            for s in range(self.states):
                beta[s,t] = sum(self.trans[s,:] * self.emis[:, self.obs[t+1]] * beta[:,t+1])
        
        return(beta)
    
    def BaumWelch(self, iteration = 10):
        count = len(self.obs)
        
        for _ in range(iteration):
            # E-Step
            alpha = self.forward()
            beta = self.backward()
            
            # M-Step
            trans = np.zeros((self.states, self.states))
            emis = np.zeros((self.states, self.emis.shape[1]))

            pi = np.array([self.gamma(alpha, beta, i, 0) for i in range(self.states)])

            for i in range(self.states):
                for j in range(self.states):
                    num = sum([self.zeta(alpha, beta, i, j, t) for t in range(count-1)])
                    denom = sum([self.gamma(alpha, beta, i, t) for t in range(count-1)])
                    trans[i,j] =  num /denom
            
            for i in range(self.states):
                for j in range(self.emis.shape[1]):
                    num = sum([self.gamma(alpha, beta, i, t) * 
                               np.where(self.obs[t] == range(self.emis.shape[1])[j], 1, 0) for t in range(count)])
                    denom = sum([self.gamma(alpha, beta, i, t) for t in range(count)])
                    emis[i,j] = num/denom
            
            self.trans = trans
            self.emis = emis
            self.pi = pi
            
    def Viterbi(self):
        count = len(self.obs)
        
        #init 
        delta = np.zeros((self.states, count))
        psi = np.zeros((self.states, count))
        output = np.zeros(count)
        
        delta[:,0] = self.pi * self.emis[:,self.obs[0]]
        
        # recurse
        for t in range(1, count):
            for j in range(self.states):
                delta[j, t] = max([delta[i,t-1] * self.trans[i,j] for i in range(self.states)]) * self.emis[j, self.obs[t]]
                psi[j, t] = np.argmax([delta[i,t-1] * self.trans[i,j] for i in range(self.states)])
        
        # termination
        p = max([delta[i,count-1] for i in range(self.states)])
        q = np.argmax([delta[i,count-1] for i in range(self.states)])
        
        # backtracking
        output[count-1] = q
        for t in range(count-2, -1, -1):
            output[t] = psi[int(output[t+1]), t+1]
        
        return output
        
    def zeta(self, alpha, beta, i, j, t):
        num = alpha[i, t] * self.trans[i,j] * self.emis[j, self.obs[t+1]] * beta[j, t+1] 
        denom = 0
        for k in range(self.states):
            for l in range(self.states):
                denom += alpha[k, t] * self.trans[k,l] * self.emis[l, self.obs[t+1]] * beta[l, t+1]       
        return (num / denom)
    
    def gamma(self, alpha, beta, i, t):
        num = alpha[i,t] * beta[i,t]
        denom = np.sum(alpha[:,t] * beta[:,t])
        return (num/denom)

In [59]:
data = pd.read_csv('datatraining.txt')
data= data.set_index('date')
data.index = pd.to_datetime(data.index)

newRes = data.resample('15T').mean()
newRes['discreteCO2'] = np.where(newRes.CO2 < 450, 0, np.where(newRes.CO2 < 650, 1, 2))
newRes['discreteHumidRatio'] = np.where(newRes.HumidityRatio < 0.0037, 0, np.where(newRes.HumidityRatio < 0.0044, 1, 2))

In [121]:
newRes.head()

Unnamed: 0_level_0,Temperature,Humidity,Light,CO2,HumidityRatio,Occupancy,discreteCO2,discreteHumidRatio
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2015-02-04 17:45:00,23.125556,27.2205,423.277778,705.833333,0.004768,1.0,2,2
2015-02-04 18:00:00,22.982667,27.265778,195.5,686.983333,0.004735,0.466667,2,2
2015-02-04 18:15:00,22.703854,27.4675,0.0,673.010417,0.004689,0.0,2,2
2015-02-04 18:30:00,22.474286,27.427143,0.0,641.017857,0.004617,0.0,1,2
2015-02-04 18:45:00,22.33,27.39,0.0,619.766667,0.00457,0.0,1,2


In [127]:
np.random.seed(19)
data1 = newRes.discreteCO2.values
model1 = HMM(states= 2, observations = data1)
model1.BaumWelch(iteration=12)

data2 = newRes.discreteHumidRatio.values
model2 = HMM(states= 2, observations = data2)
model2.BaumWelch(iteration=12)

In [128]:
print(pd.DataFrame(model1.pi).to_latex())
print(pd.DataFrame(model1.trans).to_latex())
print(pd.DataFrame(model1.emis).to_latex())

\begin{tabular}{lr}
\toprule
{} &             0 \\
\midrule
0 &  9.926692e-98 \\
1 &  1.000000e+00 \\
\bottomrule
\end{tabular}

\begin{tabular}{lrr}
\toprule
{} &         0 &         1 \\
\midrule
0 &  0.989630 &  0.010370 \\
1 &  0.031388 &  0.968612 \\
\bottomrule
\end{tabular}

\begin{tabular}{lrrr}
\toprule
{} &             0 &         1 &             2 \\
\midrule
0 &  6.125098e-01 &  0.387490 &  5.594238e-18 \\
1 &  3.812407e-26 &  0.013569 &  9.864311e-01 \\
\bottomrule
\end{tabular}



In [124]:
test = pd.read_csv('datatest.txt')
test= test.set_index('date')
test.index = pd.to_datetime(test.index)

test2 = test.resample('15T').mean()
test2['discreteCO2'] = np.where(test2.CO2 < 450, 0, np.where(test2.CO2 < 650, 1, 2))
test2['discreteHumidRatio'] = np.where(test2.HumidityRatio < 0.0037, 0, np.where(test2.HumidityRatio < 0.0044, 1, 2))
target = np.round(test2.Occupancy.values)

In [125]:
model1.obs = test2.discreteCO2.values
pred1= model1.Viterbi()

model2.obs = test2.discreteHumidRatio.values
pred2= model2.Viterbi()

In [135]:
def conf(prediction, actual):
    TP = 0
    TN = 0
    FP = 0
    FN = 0
    for i in range(len(prediction)):
        if (prediction[i] == 1) & (actual[i] == 1):
            TP += 1
        elif (prediction[i] == 0) & (actual[i] == 0):
            TN += 1
        elif (prediction[i] == 1) & (actual[i] == 0):
            FP += 1
        else:
            FN += 1
    return(TP, TN, FP, FN)

tp1, tn1, fp1, fn1 = conf(pred1, target)
tp2, tn2, fp2, fn2 = conf(pred2, target)

precision1 = tp1/(tp1+fp1)
precision2 = tp2/(tp2+fp2)

recall1 = tp1/(tp1+fn1)
recall2 = tp2/(tp2+fn2)

acc1= (tp1 + tn1)/(tp1+ tn1+ fp1+ fn2)
acc2= (tp2 + tn2)/(tp2+ tn2+ fp2+ fn2)

print(round(precision1*100,2), round(precision2*100,2))
print(round(recall1*100,2), round(recall2*100,2))
print(round(acc1*100,2), round(acc2*100,2))
print(np.array([[tp1, fp1], [fn1, tn1]]))
print(np.array([[tp2, fp2], [fn2, tn2]]))

75.95 54.87
92.31 95.38
87.5 69.66
[[60 19]
 [ 5 94]]
[[62 51]
 [ 3 62]]
