# Hidden Markov Model on group elements

## Construct the Cayley graph

In [1]:
import networkx as nx
import numpy as np

# Construct a Cayley graph of the group PSL(2, 7)

A = np.array([[1, 1], [0, 1]])
B = np.array([[0, 1], [-1, 0]])
A_inv = np.array([[1, -1], [0, 1]])  # Inverse of A in PSL(2,7)
generator_set = {
    'A' : A,
    'A_inv' : A_inv,
    'B' : B
}
d = 10  # Diameter for BFS
G = nx.DiGraph()

# Helper function to reduce matrix modulo 7
def mod7(matrix):
    return matrix % 7

# Normalize matrix to canonical projective representative
# In PSL(2, 7), we identify M with -M (i.e., M with 6M mod 7)
def normalize_projective(matrix):
    m = mod7(matrix)
    m_neg = mod7(-m)  # This is 6*m mod 7
    
    # Choose the lexicographically smaller one
    m_flat = m.flatten()
    m_neg_flat = m_neg.flatten()
    
    for i in range(4):
        if m_flat[i] < m_neg_flat[i]:
            return m
        elif m_flat[i] > m_neg_flat[i]:
            return m_neg
    return m

# Helper function to find node with given element
def find_node_with_elem(elem_normalized):
    for node, attrs in G.nodes(data=True):
        if np.array_equal(attrs['elem'], elem_normalized):
            return node
    return None

ind = 0
identity = np.eye(2, dtype=int)
G.add_node(ind, elem=normalize_projective(identity), word_len=0, path=[])

prev_count = 0
for path_length in range(d):
    # Get nodes at current path length
    nodes_at_length = [node for node, attrs in G.nodes(data=True) if attrs['word_len'] == path_length]
    
    for v in nodes_at_length:
        v_elem = G.nodes[v]['elem']
        
        for a, label in [(A, 'A'), (A_inv, 'A_inv'), (B, 'B')]:
            new_elem = normalize_projective(mod7(v_elem @ a))
            
            # Check if this element already exists as a node
            existing_node = find_node_with_elem(new_elem)
            
            if existing_node is None:
                # Add new node
                ind += 1
                G.add_node(ind, elem=new_elem, word_len=path_length + 1, path=G.nodes[v]['path'] + [label])
                G.add_edge(v, ind, generator=label)
            else:
                # Add edge to existing node
                G.add_edge(v, existing_node, generator=label)
    
    # Check if we're still finding new nodes
    current_count = G.number_of_nodes()
    if current_count == prev_count:
        print(f"No new nodes found at depth {path_length}. Stopping early.")
        break
    prev_count = current_count

print(f"\nTotal nodes: {G.number_of_nodes()}")
print(f"Total edges: {G.number_of_edges()}")
print(f"Expected: 168 nodes for PSL(2, 7)")


Total nodes: 168
Total edges: 498
Expected: 168 nodes for PSL(2, 7)


In [2]:
G.nodes[167]['path']

['A', 'B', 'A_inv', 'A_inv', 'B', 'A', 'A', 'A', 'B', 'A']

### The transmission matrix

In [3]:
# Define the state transition matrix
import itertools
from tqdm import tqdm
adjacent_nodes = {i+1 : [] for i in range(2)}
adjacent_elems = {i+1 : [] for i in range(2)}
prob_weights = [0.8/3, 0.2/6]
for node, attrs in G.nodes(data=True):
    if attrs['word_len'] <= 2 and attrs['word_len'] > 0:
        adjacent_nodes[attrs['word_len']].append(node)
        adjacent_elems[attrs['word_len']].append(attrs['elem'])


def inverse_path(path):
    inv_path = []
    for label in path:
        if label == 'A':
            inv_path.append('A_inv')
        elif label == 'A_inv':
            inv_path.append('A')
        elif label == 'B':
            inv_path.append('B')
    
    return inv_path[::-1]
            


T = np.zeros((G.number_of_nodes(), G.number_of_nodes()))

for i, j in tqdm(itertools.product(range(G.number_of_nodes()), repeat=2)):
    total_path = inverse_path(G.nodes[i]['path']) + G.nodes[j]['path']
    elem = np.eye(2, dtype=int)
    for label in total_path:
        elem = normalize_projective(generator_set[label] @ elem)

    # NOTE: overflow problem if we consider larger groups etc

    for key, elems in adjacent_elems.items():
        for elem_ in elems:
            if np.array_equal(elem, elem_):
                T[i, j] = prob_weights[key-1]


print(T)
    

0it [00:00, ?it/s]

28224it [00:02, 11060.64it/s]

[[0.         0.26666667 0.26666667 ... 0.         0.         0.        ]
 [0.26666667 0.         0.03333333 ... 0.         0.         0.        ]
 [0.26666667 0.03333333 0.         ... 0.         0.         0.        ]
 ...
 [0.         0.         0.         ... 0.         0.         0.        ]
 [0.         0.         0.         ... 0.         0.         0.        ]
 [0.         0.         0.         ... 0.         0.         0.        ]]





### State emission matrix $B$

In [4]:
# PSL(2,7) representation as permutation matrices on P^1(F_7)

# labels: 0..7 with 0 -> 'inf', 1 -> 0, 2 -> 1, ..., 7 -> 6
labels = ['inf', 0, 1, 2, 3, 4, 5, 6]

def inverse_mod(a, m):
    """Compute modular inverse of a mod m using extended Euclidean algorithm"""
    def extended_gcd(a, b):
        if a == 0:
            return b, 0, 1
        gcd, x1, y1 = extended_gcd(b % a, a)
        x = y1 - (b // a) * x1
        y = x1
        return gcd, x, y
    
    _, x, _ = extended_gcd(a % m, m)
    return (x % m + m) % m

def mobius_image(M, z):
    """Apply Möbius transformation M to point z in P^1(F_7)
    M is a 2x2 matrix [[a,b],[c,d]]
    Transformation: z -> (az + b)/(cz + d)
    """
    a, b, c, d = M[0,0] % 7, M[0,1] % 7, M[1,0] % 7, M[1,1] % 7
    
    if z == 'inf':
        if c % 7 == 0: 
            return 'inf'
        return (a * inverse_mod(c, 7)) % 7
    
    denom = (c * z + d) % 7
    if denom == 0: 
        return 'inf'
    
    return ((a * z + b) * inverse_mod(denom, 7)) % 7

def permutation_to_matrix(sigma, labels):
    """Convert a permutation sigma to an 8x8 permutation matrix
    sigma[i] is the image of labels[i]
    """
    n = len(labels)
    P = np.zeros((n, n), dtype=int)
    
    for i, image in enumerate(sigma):
        # Find which index the image corresponds to
        j = labels.index(image)
        P[j, i] = 1  # Row j, column i
    
    return P

def get_permutation_matrix(M):
    """Get the 8x8 permutation matrix for element M in PSL(2,7)"""
    # Compute the permutation
    sigma = [mobius_image(M, z) for z in labels]
    # Convert to matrix
    return permutation_to_matrix(sigma, labels)

# Test with identity matrix
I = np.eye(2, dtype=int)
print("Identity matrix representation:")
print(get_permutation_matrix(I))
print("\nShould be the 8x8 identity matrix")

# Test with generator A
print("\nGenerator A representation:")
print(get_permutation_matrix(A))

# Test with generator B  
print("\nGenerator B representation:")
print(get_permutation_matrix(B))

Identity matrix representation:
[[1 0 0 0 0 0 0 0]
 [0 1 0 0 0 0 0 0]
 [0 0 1 0 0 0 0 0]
 [0 0 0 1 0 0 0 0]
 [0 0 0 0 1 0 0 0]
 [0 0 0 0 0 1 0 0]
 [0 0 0 0 0 0 1 0]
 [0 0 0 0 0 0 0 1]]

Should be the 8x8 identity matrix

Generator A representation:
[[1 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 1]
 [0 1 0 0 0 0 0 0]
 [0 0 1 0 0 0 0 0]
 [0 0 0 1 0 0 0 0]
 [0 0 0 0 1 0 0 0]
 [0 0 0 0 0 1 0 0]
 [0 0 0 0 0 0 1 0]]

Generator B representation:
[[0 1 0 0 0 0 0 0]
 [1 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 1]
 [0 0 0 0 1 0 0 0]
 [0 0 0 1 0 0 0 0]
 [0 0 0 0 0 0 1 0]
 [0 0 0 0 0 1 0 0]
 [0 0 1 0 0 0 0 0]]


In [5]:
# Get 8x8 representation matrices for all elements in PSL(2,7)

# Store all 168 permutation matrices
representation_matrices = {}

for node, attrs in G.nodes(data=True):
    M = attrs['elem']
    perm_matrix = get_permutation_matrix(M)
    representation_matrices[node] = perm_matrix

print(f"Computed {len(representation_matrices)} permutation matrices")

# Example: get the matrix for a specific node
print("\nPermutation matrix for node 0 (identity):")
print(representation_matrices[0])

# Example: get the matrix for node 1
print("\nPermutation matrix for node 1:")
print(representation_matrices[1])
print(f"Element at node 1: {G.nodes[1]['elem']}")
print(f"Path to node 1: {G.nodes[1]['path']}")

Computed 168 permutation matrices

Permutation matrix for node 0 (identity):
[[1 0 0 0 0 0 0 0]
 [0 1 0 0 0 0 0 0]
 [0 0 1 0 0 0 0 0]
 [0 0 0 1 0 0 0 0]
 [0 0 0 0 1 0 0 0]
 [0 0 0 0 0 1 0 0]
 [0 0 0 0 0 0 1 0]
 [0 0 0 0 0 0 0 1]]

Permutation matrix for node 1:
[[1 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 1]
 [0 1 0 0 0 0 0 0]
 [0 0 1 0 0 0 0 0]
 [0 0 0 1 0 0 0 0]
 [0 0 0 0 1 0 0 0]
 [0 0 0 0 0 1 0 0]
 [0 0 0 0 0 0 1 0]]
Element at node 1: [[1 1]
 [0 1]]
Path to node 1: ['A']


In [6]:
# Consistency test: verify that path multiplication gives the same result as direct computation

# Pick a non-trivial node to test
test_node = 158

# Method 1: Get permutation matrix directly from the stored element
elem = G.nodes[test_node]['elem']
perm_matrix_direct = get_permutation_matrix(elem)

print(f"Testing node {test_node}")
print(f"Path: {G.nodes[test_node]['path']}")
print(f"Element:\n{elem}")
print(f"\nMethod 1 (direct from element):")
print(perm_matrix_direct)

# Method 2: Compute by multiplying generator permutation matrices along the path
path = G.nodes[test_node]['path']
perm_matrix_path = np.eye(8, dtype=int)

for gen_name in path:
    gen_matrix = generator_set[gen_name]
    gen_perm = get_permutation_matrix(gen_matrix)
    perm_matrix_path = perm_matrix_path @ gen_perm

print(f"\nMethod 2 (product along path):")
print(perm_matrix_path)

# Check if they're equal
if np.array_equal(perm_matrix_direct, perm_matrix_path):
    print("\n✓ SUCCESS: Both methods give the same permutation matrix!")
else:
    print("\n✗ FAILURE: Methods give different results!")
    print(f"Difference:\n{perm_matrix_direct - perm_matrix_path}")

# Also verify that the group element from path matches stored element
elem_from_path = np.eye(2, dtype=int)
for gen_name in path:
    elem_from_path = mod7(elem_from_path @ generator_set[gen_name])

elem_from_path_normalized = normalize_projective(elem_from_path)
print(f"\nElement from path:\n{elem_from_path_normalized}")
print(f"Stored element:\n{elem}")
print(f"Elements match: {np.array_equal(elem_from_path_normalized, elem)}")

Testing node 158
Path: ['A', 'B', 'A_inv', 'A_inv', 'A_inv', 'B', 'A', 'A', 'B']
Element:
[[2 3]
 [0 4]]

Method 1 (direct from element):
[[1 0 0 0 0 0 0 0]
 [0 0 0 1 0 0 0 0]
 [0 0 0 0 0 1 0 0]
 [0 0 0 0 0 0 0 1]
 [0 0 1 0 0 0 0 0]
 [0 0 0 0 1 0 0 0]
 [0 0 0 0 0 0 1 0]
 [0 1 0 0 0 0 0 0]]

Method 2 (product along path):
[[1 0 0 0 0 0 0 0]
 [0 0 0 1 0 0 0 0]
 [0 0 0 0 0 1 0 0]
 [0 0 0 0 0 0 0 1]
 [0 0 1 0 0 0 0 0]
 [0 0 0 0 1 0 0 0]
 [0 0 0 0 0 0 1 0]
 [0 1 0 0 0 0 0 0]]

✓ SUCCESS: Both methods give the same permutation matrix!

Element from path:
[[2 3]
 [0 4]]
Stored element:
[[2 3]
 [0 4]]
Elements match: True


In [7]:
# Test multiple random nodes for consistency
import random

num_tests = 20
test_nodes = random.sample(range(G.number_of_nodes()), num_tests)

all_passed = True
for node in test_nodes:
    # Method 1: Direct
    elem = G.nodes[node]['elem']
    perm_direct = get_permutation_matrix(elem)
    
    # Method 2: Path multiplication
    path = G.nodes[node]['path']
    perm_path = np.eye(8, dtype=int)
    for gen_name in path:
        gen_perm = get_permutation_matrix(generator_set[gen_name])
        perm_path = perm_path @ gen_perm
    
    # Check consistency
    if not np.array_equal(perm_direct, perm_path):
        print(f"✗ FAILED at node {node}")
        all_passed = False
    
if all_passed:
    print(f"✓ All {num_tests} random nodes passed the consistency test!")
    print("\nThis confirms that:")
    print("1. The Möbius transformation correctly maps PSL(2,7) elements to permutations")
    print("2. The representation is a group homomorphism (respects multiplication)")
    print("3. The stored elements match their paths in the Cayley graph")

✓ All 20 random nodes passed the consistency test!

This confirms that:
1. The Möbius transformation correctly maps PSL(2,7) elements to permutations
2. The representation is a group homomorphism (respects multiplication)
3. The stored elements match their paths in the Cayley graph


### Fixing an initial distribution

In [8]:
# Use two copies of the same system
d_vocab = 64
vec = np.zeros((d_vocab,))
set_of_indices = np.random.choice(d_vocab, size=3, replace=False)  # No duplicates
vec[0] = 0.8
vec[1] = 0.18
vec[2] = 0.02

assert vec.sum() == 1
# Now assign the B matrix using group multiplications

S = np.zeros((G.number_of_nodes(), d_vocab))

for i in range(G.number_of_nodes()):
    elem = G.nodes[i]['elem']
    perm_matrix = get_permutation_matrix(elem)
    S[i, :] = np.kron(perm_matrix, perm_matrix) @ vec

print(S.sum())

168.0


In [9]:
def verify_stochastic(T):
    row_stochastic = np.allclose(T.sum(axis=1), np.ones(T.shape[0]))
    col_stochastic = np.allclose(T.sum(axis=0), np.ones(T.shape[1]))
    return f'Row stochastic: {row_stochastic}, Column stochastic: {col_stochastic}, Bistochastic: {row_stochastic and col_stochastic}'

verify_stochastic(T)
verify_stochastic(S)

'Row stochastic: True, Column stochastic: False, Bistochastic: False'

In [10]:
S.sum(axis=1).shape

import einops
em = einops.einsum(S, T, 'Z_t X_t, Z_t Z_nextt -> X_t Z_t Z_nextt')


In [11]:
print(em.shape)

(64, 168, 168)


## HMM emission matrices

In [12]:
from hmm import HMM

class PSL7HMM(HMM):
    def __init__(self, emission_matrices: np.ndarray):
        super().__init__()
        self._emission_matrices = emission_matrices  # Store in private variable
    
    @property
    def emission_matrices(self) -> np.ndarray:
        return self._emission_matrices  # Implement abstract property

proc = PSL7HMM(em)
print(f"Created PSL7HMM with {proc.num_hidden_states} hidden states and vocabulary size {proc.d_vocab}")

Created PSL7HMM with 168 hidden states and vocabulary size 64


In [16]:
np.save("data/psl_instance_emission_matrix.npy", em)

In [14]:
import time

t0 = time.time()
for i in range(10):
    stationary_distribution = proc.get_stationary_distribution()

t1 = time.time()
print(f"Time taken per run: {(t1 - t0) / 10} seconds")

for i in range(10):
    proc.generate_sequence(length=100)

t2 = time.time()
print(f"Time taken per run: {(t2 - t1) / 10} seconds")



Time taken per run: 0.34107863903045654 seconds
Time taken per run: 0.3943190097808838 seconds


In [17]:
t0 = time.time()
for i in range(10):
    proc.generate_sequence(length=100, init_state=random.randint(0, proc.num_hidden_states - 1))

t1 = time.time()
print(f"Time taken per run: {(t1 - t0) / 10} seconds")


Time taken per run: 0.017665815353393555 seconds


In [23]:
proc.entropy_rate_theory_estimate()

2.303063166534798

## Testing data generation

In [27]:
proc.generate_data(batch_size=64, length=512, use_tqdm=True)

Generating data: 100%|██████████| 64/64 [00:11<00:00,  5.42it/s]


tensor([[27, 27, 51,  ..., 36, 54,  9],
        [36, 36, 36,  ..., 58, 18, 18],
        [ 9,  9,  9,  ..., 63, 63, 63],
        ...,
        [ 9, 36, 18,  ..., 18, 18, 54],
        [39, 63, 63,  ..., 50, 18, 54],
        [34, 32, 45,  ..., 36, 36, 36]])