This notebook serves as a playground to test out the pomegranate library. The goal is to fit a simple HMM given known data.

In [15]:
from pomegranate.hmm import DenseHMM
from pomegranate.distributions import Categorical
import numpy as np

In [20]:
# generate random data
# first generate the hidden states
# then generate the observations

def generate_next_hidden_state(current_state: int):
    if current_state == 0:
        # switch 90% of the time
        return 1 if np.random.random(size=(1)) < 0.9 else 0
    else:
        # switch 50% of the time
        return 1 if np.random.random(size=(1)) < 0.5 else 0

def generate_hidden_states(batch_size:int, context_length: int):
    hidden_sequences = []
    for _ in range(batch_size):
        # get the starting state which is 0 or 1
        hidden_sequence = [np.random.randint(low=0,high=2)]
        while len(hidden_sequence) < context_length:
            hidden_sequence.append(generate_next_hidden_state(hidden_sequence[-1]))
        hidden_sequences.append(hidden_sequence)
    hidden_sequences = np.array(hidden_sequences).reshape(batch_size, context_length, 1)
    return hidden_sequences

batch_size = 200
context_length = 100
hidden_sequences = generate_hidden_states(batch_size=batch_size, context_length=context_length)
hidden_sequences.shape
    

(200, 100, 1)

In [21]:
# now we generate the emissions
def _get_emission(hidden_state):
    if hidden_state == 0:
        # emit 0 or 1 with 50% each
        return 1 if np.random.random(size=(1)) < 0.5 else 0
    else:
        # emit 0 80% of the time, and 1 20% of the time
        return 0 if np.random.random(size=(1)) < 0.8 else 1

# vectorize it
get_emission = np.vectorize(_get_emission)
emission_sequences = get_emission(hidden_sequences)
emission_sequences.shape

(200, 100, 1)

In [18]:
# fit the HMM
d1 = Categorical([[0.5,0.5]])
d2 = Categorical([[0.8,0.2]])

model = DenseHMM(distributions=[d1, d2], verbose=True)
model.fit(emission_sequences)

[1] Improvement: -1925.69140625, Time: 0.02726s


DenseHMM(
  (start): Silent()
  (end): Silent()
  (distributions): ModuleList(
    (0-1): 2 x Categorical()
  )
)

Great, we have a working model with non-nan improvement. Let's see if we can find its learnt transition matrix.

In [19]:
np.exp(model.edges)

tensor([[0.4820, 0.5130],
        [0.4852, 0.5098]])

The emission matrix is wrong. It is 50% for each emission above, but by design we have 50%/50% for state 0 and 80%/20% for state 1.

Let's increase the training dataset size and see if the model learns.

In [22]:
batch_size = 400
context_length = 400
hidden_sequences = generate_hidden_states(batch_size=batch_size, context_length=context_length)
emission_sequences = get_emission(hidden_sequences)

In [23]:
# fit the HMM
d1 = Categorical([[0.5,0.5]])
d2 = Categorical([[0.8,0.2]])

model = DenseHMM(distributions=[d1, d2], verbose=True)
model.fit(emission_sequences)

[1] Improvement: -1890.4765625, Time: 0.05093s


DenseHMM(
  (start): Silent()
  (end): Silent()
  (distributions): ModuleList(
    (0-1): 2 x Categorical()
  )
)

In [24]:
np.exp(model.edges)

tensor([[0.4833, 0.5143],
        [0.4866, 0.5108]])

The model doesn't learn. Let's try with a different starting emission distribution for the model.

In [None]:
# fit the HMM
d1 = Categorical([[0.3,0.7]])
d2 = Categorical([[0.3,0.7]])

model = DenseHMM(distributions=[d1, d2], verbose=True)
model.fit(emission_sequences)

[1] Improvement: 406.96875, Time: 0.05005s
[2] Improvement: 19.640625, Time: 0.05073s
[3] Improvement: 15.171875, Time: 0.05011s
[4] Improvement: 11.7421875, Time: 0.04996s
[5] Improvement: 10.0859375, Time: 0.05115s
[6] Improvement: 7.2421875, Time: 0.05062s
[7] Improvement: 5.8828125, Time: 0.05063s
[8] Improvement: 4.9765625, Time: 0.05076s
[9] Improvement: 3.3125, Time: 0.05125s
[10] Improvement: 3.3828125, Time: 0.05061s
[11] Improvement: 2.421875, Time: 0.04969s
[12] Improvement: 1.3828125, Time: 0.04975s
[13] Improvement: 1.5703125, Time: 0.04993s
[14] Improvement: 1.1328125, Time: 0.05014s
[15] Improvement: 0.5234375, Time: 0.05022s
[16] Improvement: 0.453125, Time: 0.05074s
[17] Improvement: 1.015625, Time: 0.04982s
[18] Improvement: -0.1015625, Time: 0.04975s


DenseHMM(
  (start): Silent()
  (end): Silent()
  (distributions): ModuleList(
    (0-1): 2 x Categorical()
  )
)

In [44]:
# emission probabilities
for dist in model.distributions:
    print(dist.probs)

Parameter containing:
tensor([[0.3862, 0.6138]])
Parameter containing:
tensor([[0.9336, 0.0664]])


In [45]:
# transition probabilities
np.exp(model.edges)

tensor([[0.3860, 0.6120],
        [0.4835, 0.5136]])

There is more than one iteration, but the transition probabilities are still wrong.

Let's look at the training data and sanity check.

In [28]:
hidden_sequences[0]

array([[0],
       [1],
       [1],
       [1],
       [0],
       [1],
       [0],
       [1],
       [1],
       [0],
       [1],
       [0],
       [1],
       [0],
       [1],
       [1],
       [0],
       [1],
       [1],
       [0],
       [1],
       [0],
       [0],
       [1],
       [0],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [0],
       [1],
       [1],
       [1],
       [1],
       [0],
       [1],
       [1],
       [1],
       [0],
       [1],
       [1],
       [0],
       [1],
       [0],
       [1],
       [0],
       [1],
       [0],
       [0],
       [1],
       [1],
       [1],
       [0],
       [1],
       [1],
       [0],
       [1],
       [0],
       [1],
       [0],
       [1],
       [1],
       [0],
       [1],
       [0],
       [1],
       [1],
       [0],
       [1],
       [0],
       [1],
       [0],
       [1],
       [1],
       [0],
       [0],
       [1],
       [1],
       [0],
    

Data is ok. Let's try another initialization

In [47]:
# fit the HMM
d1 = Categorical([[0.3,0.7]])
d2 = Categorical([[0.9,0.1]])

model = DenseHMM(distributions=[d1, d2], verbose=True)
model.fit(emission_sequences)

print("emission probabilities")
for dist in model.distributions:
    print(dist.probs)

print("transition probabilities")
np.exp(model.edges)

[1] Improvement: 406.96875, Time: 0.05036s
[2] Improvement: 19.640625, Time: 0.04996s
[3] Improvement: 15.171875, Time: 0.05054s
[4] Improvement: 11.7421875, Time: 0.05298s
[5] Improvement: 10.0859375, Time: 0.05017s
[6] Improvement: 7.2421875, Time: 0.05205s
[7] Improvement: 5.8828125, Time: 0.05024s
[8] Improvement: 4.9765625, Time: 0.0508s
[9] Improvement: 3.3125, Time: 0.0505s
[10] Improvement: 3.3828125, Time: 0.05158s
[11] Improvement: 2.421875, Time: 0.05171s
[12] Improvement: 1.3828125, Time: 0.05274s
[13] Improvement: 1.5703125, Time: 0.05044s
[14] Improvement: 1.1328125, Time: 0.05098s
[15] Improvement: 0.5234375, Time: 0.05115s
[16] Improvement: 0.453125, Time: 0.05155s
[17] Improvement: 1.015625, Time: 0.05075s
[18] Improvement: -0.1015625, Time: 0.04987s
emission probabilities
Parameter containing:
tensor([[0.3862, 0.6138]])
Parameter containing:
tensor([[0.9336, 0.0664]])
transition probabilities


tensor([[0.3860, 0.6120],
        [0.4835, 0.5136]])

Now it's different. Let's use more training examples.

In [49]:
batch_size = 1000
context_length = 400
hidden_sequences = generate_hidden_states(batch_size=batch_size, context_length=context_length)
emission_sequences = get_emission(hidden_sequences)

# fit the HMM
d1 = Categorical([[0.3,0.7]])
d2 = Categorical([[0.9,0.1]])

model = DenseHMM(distributions=[d1, d2], verbose=True)
model.fit(emission_sequences)

print("emission probabilities")
for dist in model.distributions:
    print(dist.probs)

print("transition probabilities")
np.exp(model.edges)

[1] Improvement: 1207.828125, Time: 0.09219s
[2] Improvement: 53.28125, Time: 0.09311s
[3] Improvement: 43.21875, Time: 0.09173s
[4] Improvement: 33.28125, Time: 0.08891s
[5] Improvement: 27.5, Time: 0.08947s
[6] Improvement: 21.9375, Time: 0.08871s
[7] Improvement: 16.125, Time: 0.08975s
[8] Improvement: 14.015625, Time: 0.09058s
[9] Improvement: 10.53125, Time: 0.09092s
[10] Improvement: 7.421875, Time: 0.08927s
[11] Improvement: 7.65625, Time: 0.09153s
[12] Improvement: 5.46875, Time: 0.09055s
[13] Improvement: 4.03125, Time: 0.09012s
[14] Improvement: 2.046875, Time: 0.08996s
[15] Improvement: 3.390625, Time: 0.08982s
[16] Improvement: 0.9375, Time: 0.09172s
[17] Improvement: 1.0, Time: 0.09116s
[18] Improvement: 2.5, Time: 0.08865s
[19] Improvement: -0.3125, Time: 0.08573s
emission probabilities
Parameter containing:
tensor([[0.3868, 0.6132]])
Parameter containing:
tensor([[0.9343, 0.0657]])
transition probabilities


tensor([[0.3806, 0.6173],
        [0.4862, 0.5111]])

In [50]:
batch_size = 10000
context_length = 400
hidden_sequences = generate_hidden_states(batch_size=batch_size, context_length=context_length)
emission_sequences = get_emission(hidden_sequences)

# fit the HMM
d1 = Categorical([[0.3,0.7]])
d2 = Categorical([[0.9,0.1]])

model = DenseHMM(distributions=[d1, d2], verbose=True)
model.fit(emission_sequences)

print("emission probabilities")
for dist in model.distributions:
    print(dist.probs)

print("transition probabilities")
np.exp(model.edges)

[1] Improvement: 11882.0, Time: 0.7049s
[2] Improvement: 544.75, Time: 0.7117s
[3] Improvement: 408.25, Time: 0.718s
[4] Improvement: 342.75, Time: 0.717s
[5] Improvement: 279.25, Time: 0.7203s
[6] Improvement: 199.75, Time: 0.7041s
[7] Improvement: 163.0, Time: 0.7048s
[8] Improvement: 139.0, Time: 0.7146s
[9] Improvement: 105.5, Time: 0.7269s
[10] Improvement: 95.25, Time: 0.7192s
[11] Improvement: 54.0, Time: 0.7321s
[12] Improvement: 50.5, Time: 0.7039s
[13] Improvement: 47.25, Time: 0.7103s
[14] Improvement: 18.25, Time: 0.7208s
[15] Improvement: 33.75, Time: 0.7167s
[16] Improvement: 9.75, Time: 0.7289s
[17] Improvement: 18.5, Time: 0.7265s
[18] Improvement: 12.25, Time: 0.7148s
[19] Improvement: 10.25, Time: 0.7203s
[20] Improvement: 2.5, Time: 0.7185s
[21] Improvement: 9.5, Time: 0.7238s
[22] Improvement: 5.25, Time: 0.699s
[23] Improvement: 3.25, Time: 0.7167s
[24] Improvement: 4.5, Time: 0.7163s
[25] Improvement: -4.0, Time: 0.7177s
emission probabilities
Parameter containing

tensor([[0.3775, 0.6200],
        [0.4885, 0.5090]])

In [51]:
batch_size = 5000
context_length = 1000
hidden_sequences = generate_hidden_states(batch_size=batch_size, context_length=context_length)
emission_sequences = get_emission(hidden_sequences)

# fit the HMM
d1 = Categorical([[0.3,0.7]])
d2 = Categorical([[0.9,0.1]])

model = DenseHMM(distributions=[d1, d2], verbose=True)
model.fit(emission_sequences)

print("emission probabilities")
for dist in model.distributions:
    print(dist.probs)

print("transition probabilities")
np.exp(model.edges)

[1] Improvement: 57394.5, Time: 0.9694s
[2] Improvement: 654.75, Time: 0.9465s
[3] Improvement: 517.5, Time: 0.9557s
[4] Improvement: 404.75, Time: 0.9587s
[5] Improvement: 349.75, Time: 0.9344s
[6] Improvement: 254.5, Time: 0.9794s
[7] Improvement: 224.25, Time: 0.9266s
[8] Improvement: 166.5, Time: 0.9453s
[9] Improvement: 119.25, Time: 0.9318s
[10] Improvement: 89.5, Time: 0.9265s
[11] Improvement: 86.75, Time: 0.9282s
[12] Improvement: 75.5, Time: 0.9342s
[13] Improvement: 40.0, Time: 0.9567s
[14] Improvement: 47.0, Time: 0.9778s
[15] Improvement: 23.25, Time: 0.9487s
[16] Improvement: -4.75, Time: 0.944s
emission probabilities
Parameter containing:
tensor([[0.3871, 0.6129]])
Parameter containing:
tensor([[0.9340, 0.0660]])
transition probabilities


tensor([[0.3846, 0.6144],
        [0.4847, 0.5143]])

It is possible that we mixed up emission probabilities and transition probabilities.

Let's create a new set of training examples.

In [94]:
from tqdm import tqdm

def generate_next_hidden_state(hidden_states: np.ndarray, transition_matrix: np.ndarray):
    # hidden_states is a column vector
    # the number of rows is B or batch_size
    # hidden_states.shape == (B, 1)
    B = hidden_states.shape[0]
    assert hidden_states.shape == (B, 1)
    
    # transition_matrix is a square matrix
    # transition_matrix.shape == (H,H)
    H = transition_matrix.shape[0]
    assert transition_matrix.shape == (H,H)
    
    # each hidden state will get their corresponding probabilities from transition matrix
    # probs.shape == (B, H)
    probs = transition_matrix[hidden_states.squeeze(), :]
    assert probs.shape == (B, H)
    
    # generate the next state based on the probs
    # next_state.shape == (B, 1)
    rng = np.random.default_rng()
    next_states = []
    for row in probs:
        next_state = rng.choice(a=np.arange(H), p=row)
        next_states.append(next_state)
    return np.array(next_states).reshape(B, 1)

100%|██████████| 999/999 [01:07<00:00, 14.76it/s]


In [95]:
def generate_hidden_states(transition_matrix:np.ndarray):
    all_hidden_states = np.random.randint(low=0,high=2,size=(batch_size,1))
    for _ in tqdm(range(context_length-1)):
        # get the last column
        prev_hidden_states = all_hidden_states[:,-1].reshape(batch_size, 1)
        assert prev_hidden_states.shape == (batch_size, 1)
        new_hidden_states = generate_next_hidden_state(hidden_states=prev_hidden_states,transition_matrix=transition_matrix)
        assert new_hidden_states.shape == (batch_size, 1)
        all_hidden_states = np.concatenate((all_hidden_states, new_hidden_states), axis=1)
    assert all_hidden_states.shape == (batch_size, context_length)
    return all_hidden_states

In [104]:
def generate_emissions(hidden_states:np.ndarray, emission_matrix: np.ndarray):
    # hidden_states.shape == (B, L)
    B = hidden_states.shape[0]
    L = hidden_states.shape[1]
    
    # emission_matrix.shape == (H,E)
    # E is the number of possible emissions
    H = emission_matrix.shape[0]
    E = emission_matrix.shape[0]
    
    # generate the emissions for each sequence
    # all_emissions.shape == (B, L)
    rng = np.random.default_rng()
    all_emissions = []
    for row in tqdm(hidden_states):
        probs = emission_matrix[row, :]
        # probs.shape == (L, E)
        assert probs.shape == (L, E)
        emissions = []
        for p in probs:
            emission_state = rng.choice(a=np.arange(E), p=p)
            emissions.append(emission_state)
        all_emissions.append(emissions)
    all_emissions = np.array(all_emissions)
    assert all_emissions.shape == (B, L)
    return all_emissions.reshape(B,L,1)

In [105]:
batch_size = 5000
context_length = 1000

transition_matrix = np.array([[0.3,0.7],[0.5,0.5]])
emission_matrix = np.array([[0.9,0.1],[0.2,0.8]])

hidden_states = generate_hidden_states(transition_matrix=transition_matrix)
emission_states = generate_emissions(hidden_states=hidden_states, emission_matrix=emission_matrix)

100%|██████████| 999/999 [01:07<00:00, 14.74it/s]
100%|██████████| 5000/5000 [01:05<00:00, 76.13it/s]


In [103]:
np.min(emission_states)

0

In [106]:
# fit the HMM
d1 = Categorical([[0.3,0.7]])
d2 = Categorical([[0.9,0.1]])

model = DenseHMM(distributions=[d1, d2], verbose=True)
model.fit(emission_states)

print("emission probabilities")
for dist in model.distributions:
    print(dist.probs)

print("transition probabilities")
np.exp(model.edges)

[1] Improvement: 90096.5, Time: 0.9656s
[2] Improvement: 4806.25, Time: 0.9537s
[3] Improvement: 3506.25, Time: 0.9854s
[4] Improvement: 2522.75, Time: 0.9729s
[5] Improvement: 1814.25, Time: 0.9663s
[6] Improvement: 1284.75, Time: 0.9675s
[7] Improvement: 836.0, Time: 0.9536s
[8] Improvement: 530.5, Time: 0.931s
[9] Improvement: 369.5, Time: 0.9518s
[10] Improvement: 236.0, Time: 0.9373s
[11] Improvement: 124.0, Time: 0.9404s
[12] Improvement: 75.5, Time: 0.9349s
[13] Improvement: 77.0, Time: 0.9397s
[14] Improvement: 18.0, Time: 0.9617s
[15] Improvement: 20.5, Time: 0.9439s
[16] Improvement: -13.5, Time: 0.9713s
emission probabilities
Parameter containing:
tensor([[0.1989, 0.8011]])
Parameter containing:
tensor([[0.8694, 0.1306]])
transition probabilities


tensor([[0.4720, 0.5270],
        [0.6801, 0.3189]])

Nice! The emission and transition matrices are learnt and are similar to the generator.

Now, let's see if we can drop the initialization.

In [107]:
# fit the HMM
d1 = Categorical(n_categories=2)
d2 = Categorical([[0.9,0.1]])

model = DenseHMM(distributions=[d1, d2], verbose=True)
model.fit(emission_states)

print("emission probabilities")
for dist in model.distributions:
    print(dist.probs)

print("transition probabilities")
np.exp(model.edges)

[1] Improvement: -12818.0, Time: 1.251s
emission probabilities
Parameter containing:
tensor([[0., 1.]])
Parameter containing:
tensor([[1., 0.]])
transition probabilities


tensor([[0.4613, 0.5377],
        [0.5560, 0.4430]])

In [108]:
# fit the HMM
d1 = Categorical()
d2 = Categorical([[0.9,0.1]])

model = DenseHMM(distributions=[d1, d2], verbose=True)
model.fit(emission_states)

print("emission probabilities")
for dist in model.distributions:
    print(dist.probs)

print("transition probabilities")
np.exp(model.edges)

[1] Improvement: -12818.0, Time: 1.248s
emission probabilities
Parameter containing:
tensor([[0., 1.]])
Parameter containing:
tensor([[1., 0.]])
transition probabilities


tensor([[0.4613, 0.5377],
        [0.5560, 0.4430]])

In [109]:
# fit the HMM
# they don't allow both to be empty
d1 = Categorical()
d2 = Categorical()

model = DenseHMM(distributions=[d1, d2], verbose=True)
model.fit(emission_states)

print("emission probabilities")
for dist in model.distributions:
    print(dist.probs)

print("transition probabilities")
np.exp(model.edges)

ValueError: Parameter X must have a maximum value below 0

In [110]:
# fit the HMM
num_emissions = 2
d1 = Categorical([[1/num_emissions]*num_emissions])
d2 = Categorical([[1/num_emissions]*num_emissions])

model = DenseHMM(distributions=[d1, d2], verbose=True)
model.fit(emission_states)

print("emission probabilities")
for dist in model.distributions:
    print(dist.probs)

print("transition probabilities")
np.exp(model.edges)

[1] Improvement: -35394.25, Time: 0.8966s
emission probabilities
Parameter containing:
tensor([[0.4917, 0.5083]])
Parameter containing:
tensor([[0.4917, 0.5083]])
transition probabilities


tensor([[0.4995, 0.4995],
        [0.4995, 0.4995]])

In [116]:
# fit the HMM
num_emissions = 2
d1 = Categorical([[1/num_emissions]*num_emissions])
d2 = Categorical()

model = DenseHMM(distributions=[d1, d2], verbose=True, max_iter=3)
model.fit(emission_states)

print("emission probabilities")
for dist in model.distributions:
    print(dist.probs)

print("transition probabilities")
np.exp(model.edges)

[1] Improvement: nan, Time: 0.9774s
[2] Improvement: nan, Time: 0.9764s
[3] Improvement: nan, Time: 1.96s
emission probabilities
Parameter containing:
tensor([[nan, nan]])
Parameter containing:
tensor([[nan, nan]])
transition probabilities


tensor([[nan, nan],
        [nan, nan]])

In [115]:
# fit the HMM
num_emissions = 2
d1 = Categorical([[1.0/num_emissions]*num_emissions])
d2 = Categorical()

model = DenseHMM(distributions=[d1, d2], verbose=True)
model.fit(emission_states)

print("emission probabilities")
for dist in model.distributions:
    print(dist.probs)

print("transition probabilities")
np.exp(model.edges)

[1] Improvement: -12818.0, Time: 1.214s
emission probabilities
Parameter containing:
tensor([[1., 0.]])
Parameter containing:
tensor([[0., 1.]])
transition probabilities


tensor([[0.4430, 0.5560],
        [0.5377, 0.4613]])

Looking at the two previous code snippets, we need to cast it as double

In [119]:
# fit the HMM
num_emissions = 2
d1 = Categorical([[1.0/num_emissions]*num_emissions])
d2 = Categorical([[1.0/num_emissions]*num_emissions])

model = DenseHMM(distributions=[d1,d2], verbose=True)
model.fit(emission_states)

print("emission probabilities")
for dist in model.distributions:
    print(dist.probs)

print("transition probabilities")
np.exp(model.edges)

[1] Improvement: -35394.25, Time: 0.9203s
emission probabilities
Parameter containing:
tensor([[0.4917, 0.5083]])
Parameter containing:
tensor([[0.4917, 0.5083]])
transition probabilities


tensor([[0.4995, 0.4995],
        [0.4995, 0.4995]])

In [118]:
# fit the HMM
num_emissions = 2
d = [Categorical([[1.0/num_emissions]*num_emissions])] * 2

model = DenseHMM(distributions=d, verbose=True, max_iter=3)
model.fit(emission_states)

print("emission probabilities")
for dist in model.distributions:
    print(dist.probs)

print("transition probabilities")
np.exp(model.edges)

[1] Improvement: nan, Time: 1.029s
[2] Improvement: nan, Time: 1.029s
[3] Improvement: nan, Time: 2.058s
emission probabilities
Parameter containing:
tensor([[nan, nan]])
Parameter containing:
tensor([[nan, nan]])
transition probabilities


tensor([[nan, nan],
        [nan, nan]])

In [122]:
# fit the HMM
num_emissions = 2
d = [Categorical([[1.0/num_emissions]*num_emissions]) for _ in range(2)]

model = DenseHMM(distributions=d, verbose=True, max_iter=3)
model.fit(emission_states)

print("emission probabilities")
for dist in model.distributions:
    print(dist.probs)

print("transition probabilities")
np.exp(model.edges)

[1] Improvement: -35394.25, Time: 0.8969s
emission probabilities
Parameter containing:
tensor([[0.4917, 0.5083]])
Parameter containing:
tensor([[0.4917, 0.5083]])
transition probabilities


tensor([[0.4995, 0.4995],
        [0.4995, 0.4995]])

Noting the past two snippets, it seems like `[a]*2` will create nan presumably by using the same `Categorical()` across distributions, so we have to explicit call a new one for each distribution.