In [1]:
import os
os.chdir("..")

import torch
import numpy as np
from tqdm import tqdm

from flipgraph import int2bin, bin2int

In [2]:
# Device handed to the tensor-creating calls in the cells below (CPU here).
device = torch.device("cpu")

## Functions

In [3]:
def naive_decomposition(M, batch_size=1, dtype=torch.int16):
    """
    Generate a naive rank-n decomposition of boolean matrix M.

    Row i of M becomes the dyad (e_i, M[i]) where e_i is the i-th unit vector,
    so the n dyads sum to M. Each vector is packed into a single integer in
    which bit k stores component k.

    Args:
        M (torch.Tensor): A boolean (0/1) matrix of shape (n, m).
        batch_size (int): Number of identical decompositions to generate.
        dtype (torch.dtype): Integer dtype used to store the packed vectors.
            It must provide at least max(n, m) bits.

    Returns:
        torch.Tensor: A tensor of shape (batch_size, 2, n) where:
                     UV[b, 0, i] is the binary-encoded u_i vector
                     UV[b, 1, i] is the binary-encoded v_i vector

    Raises:
        ValueError: If a vector of length n or m does not fit into `dtype`.
    """
    n, m = M.shape
    device = M.device

    # Shifting by the full bit width (or more) would silently produce garbage,
    # so refuse matrices whose vectors cannot be packed into `dtype`.
    available_bits = torch.iinfo(dtype).bits
    if max(n, m) > available_bits:
        raise ValueError(
            f"Matrix of shape ({n}, {m}) needs {max(n, m)} bits per vector, "
            f"but {dtype} only provides {available_bits}."
        )

    # u vectors: one-hot encodings (as powers of 2)
    u = (1 << torch.arange(n, device=device, dtype=dtype))

    # v vectors: binary encoding of rows of M
    powers = (1 << torch.arange(m, device=device, dtype=dtype))
    v = (M * powers).sum(dim=1).to(dtype)

    # Stack and repeat for batch_size
    UV = torch.stack([u, v]).unsqueeze(0).repeat(batch_size, 1, 1)

    return UV

In [4]:
def flip(UV):
    """
    In-place flip operation on batch of matrix decompositions.
    For each decomposition, selects a random pair of distinct dyads (i, j) and
    replaces u_i by u_i ^ u_j and v_j by v_j ^ v_i. The XOR-sum of all dyads
    (the reconstructed matrix mod 2) is unchanged by this move.

    Decompositions with fewer than two dyads have no pair to flip and are left
    untouched.

    Args:
        UV (torch.Tensor): Tensor of shape (batch_size, 2, r) containing decompositions

    Returns:
        None. `UV` is modified in place.
    """
    batch_size, _, r = UV.shape
    device = UV.device

    # With r == 1 the "i != j" fix-up below cannot succeed: (j + 1) % 1 == i,
    # and u_0 ^= u_0 would wipe the only dyad. Nothing to flip in that case.
    if r < 2:
        return

    # Select random pairs and ensure i != j
    i, j = torch.randint(0, r, (2, batch_size), device=device)
    j = torch.where(i == j, (j + 1) % r, j)
    idx = torch.arange(batch_size, device=device)

    # Update values in-place. u_j and v_i are not modified, so the order of
    # the two statements does not matter.
    UV[idx, 0, i] ^= UV[idx, 0, j]
    UV[idx, 1, j] ^= UV[idx, 1, i]

In [5]:
def int2bin(tensor, bit_width):
    """
    Unpack integers into their individual bits.

    A trailing axis of length `bit_width` is appended; entry k along that axis
    is bit k of the corresponding input value (least-significant bit first).
    Note: this definition shadows the `int2bin` imported from `flipgraph`.
    """
    bit_positions = torch.arange(bit_width, dtype=tensor.dtype, device=tensor.device)
    shifted = tensor.unsqueeze(-1) >> bit_positions
    return shifted & 1

def reconstruct(UV, M_shape):
    """
    Reconstruct the original matrix from a decomposition using efficient tensor operations.

    Each packed u_i / v_i is unpacked into its bits and the outer products
    u_i v_i^T are summed over all dyads; the sum is reduced mod 2 (XOR).

    Args:
        UV (torch.Tensor): Tensor of shape (batch_size, 2, r) containing decompositions
        M_shape (tuple): Shape of the original matrix (n, m)

    Returns:
        torch.Tensor: Batch of reconstructed matrices of shape (batch_size, n, m)
    """
    n, m = M_shape

    # Convert integers to binary
    u_bin = int2bin(UV[:, 0, :], n)  # Shape: (batch_size, r, n)
    v_bin = int2bin(UV[:, 1, :], m)  # Shape: (batch_size, r, m)

    # Sum of outer products over the dyad axis r, taken mod 2.
    return torch.einsum('bri,brj->bij', u_bin, v_bin) % 2

def check_uv(UV, M):
    """
    Verify, per batch element, that a decomposition reproduces M exactly.

    Args:
        UV (torch.Tensor): Tensor of shape (batch_size, 2, r) containing decompositions
        M (torch.Tensor): Original matrix of shape (n, m)

    Returns:
        torch.Tensor: Boolean tensor of shape (batch_size,); True where every
                      entry of the reconstructed matrix equals M
    """
    target = M.unsqueeze(0)
    entrywise_match = reconstruct(UV, M.shape) == target
    # Collapse the column axis first, then the row axis.
    return entrywise_match.all(dim=2).all(dim=1)

In [15]:
def reducible(UV):
    """
    Flag decompositions that contain a repeated u vector or a repeated v vector.

    A decomposition counts as reducible when two of its dyads (u_i, v_i) and
    (u_j, v_j) satisfy u_i = u_j or v_i = v_j.

    Args:
        UV (torch.Tensor): Tensor of shape (batch_size, 2, r) containing decompositions

    Returns:
        torch.Tensor: Boolean tensor of shape (batch_size,) indicating which decompositions are reducible
    """
    per_factor = []
    for packed in (UV[:, 0, :], UV[:, 1, :]):
        # After sorting, equal values sit next to each other, so comparing
        # neighbours is enough to detect any duplicate.
        ordered = torch.sort(packed, dim=1).values
        neighbours_equal = ordered[:, :-1] == ordered[:, 1:]
        per_factor.append(neighbours_equal.any(dim=1))

    has_dup_u, has_dup_v = per_factor
    return has_dup_u | has_dup_v

In [19]:
def reduce(UV):
    """
    Reduce a batch of decompositions UV by finding and merging redundant dyads.
    If there are dyads (u_i, v_i) and (u_j, v_j) with i < j such that u_i = u_j,
    then remove dyad j and update v_i = v_i ^ v_j. Similarly if v_i = v_j
    (then u_i = u_i ^ u_j). Duplicate u vectors take priority over duplicate
    v vectors, and within a factor the first duplicate pair in row-major
    (i, j) order is used. Exactly one pair is merged per decomposition.

    Args:
        UV (torch.Tensor): Tensor of shape (batch_size, 2, r) containing decompositions.
            Every decomposition in the batch must be reducible.

    Returns:
        torch.Tensor: Reduced decompositions of shape (batch_size, 2, r-1)

    Raises:
        ValueError: If some decomposition has no duplicate u or v vector.
    """
    batch_size, _, r = UV.shape
    device = UV.device
    ar_batch = torch.arange(batch_size, device=device)

    # Candidate pair (i, j) and the factor (0 = u, 1 = v) in which it matches.
    # Zero-initialised so that uninitialised memory is never used as an index.
    cand_i = torch.zeros(batch_size, dtype=torch.long, device=device)
    cand_j = torch.zeros(batch_size, dtype=torch.long, device=device)
    cand_dim = torch.zeros(batch_size, dtype=torch.long, device=device)
    found = torch.zeros(batch_size, dtype=torch.bool, device=device)

    # Check for identical vectors in one dimension
    for dim in range(2):
        vec = UV[:, dim, :]
        # Strict upper triangle: only pairs with i < j are considered.
        dup = torch.triu(vec[:, :, None] == vec[:, None, :], diagonal=1)
        dup_flat = dup.reshape(batch_size, r * r)
        has_dup = dup_flat.any(dim=1)
        cand = (~found) & has_dup
        if cand.any():
            idx = cand.nonzero(as_tuple=True)[0]
            # argmax over a 0/1 row yields the position of the first True.
            pos = dup_flat[idx].float().argmax(dim=1)
            cand_i[idx] = pos // r
            cand_j[idx] = pos % r
            cand_dim[idx] = dim
            found[idx] = True

    if not bool(found.all()):
        raise ValueError(
            "reduce() requires every decomposition in the batch to be reducible."
        )

    # Determine the other dimension
    other_dim = 1 - cand_dim

    # Remove duplicate dyad j from each state
    idx_all = torch.arange(r, device=device).unsqueeze(0).expand(batch_size, r)
    mask = idx_all != cand_j.unsqueeze(1)
    new_idx = idx_all[mask].view(batch_size, r - 1)
    UV_red = torch.gather(UV, 2, new_idx.unsqueeze(1).expand(batch_size, 2, r - 1))

    # Merging: update the other dimension at the surviving dyad. Because
    # cand_i < cand_j always holds, removing column j does not shift index i.
    UV_red[ar_batch, other_dim, cand_i] = (
        UV[ar_batch, other_dim, cand_i] ^ UV[ar_batch, other_dim, cand_j]
    )

    return UV_red

## Process

In [92]:
# Random n x n 0/1 target matrix to decompose.
# NOTE(review): no random seed is set, so M and every flip differ between runs.
n = 6
M = torch.randint(0, 1+1, (n,n), device=device)

# N identical copies of the naive decomposition, evolved in parallel.
N = 10_000
UV = naive_decomposition(M, N)

# Number of flip steps per round.
L = 1000

# At most 3 rounds; each successful round lowers the number of dyads by one.
for _ in range(3):
    # If the previous round collected fewer than N states, resample them
    # (with replacement) back up to a batch of N.
    if UV.size(0) != N:
        idx = torch.randint(0, UV.size(0), (N,), device=device)
        UV = UV[idx].clone()

    # q counts the reduced states collected so far; UV_rf is the buffer that
    # holds them (one dyad fewer than UV).
    q = 0
    UV_rf = torch.zeros((N, 2, UV.size(2)-1), device=UV.device, dtype=UV.dtype)
    pbar = tqdm(range(L))
    for j in pbar:
        flip(UV)
        
        # The reducibility check is skipped for iterations j = 0..500.
        if j > 500:
            reducible_mask = reducible(UV)
            if reducible_mask.sum() > 0:
                # Boolean indexing copies the selected states, so UV itself is
                # left unchanged and a state may be collected again later.
                UV_reduced = reduce(UV[reducible_mask])
                # Keep only as many reduced states as still fit into the buffer.
                s = min(UV_reduced.size(0), N-q)
                if s > 0:
                    UV_rf[q:q+s] = UV_reduced[:s]
                    q += s
                pbar.set_description(f"r = {UV.size(2)}, q = {q:_}")
                # Buffer full: stop flipping early.
                if q == N:
                    break

    if q > 0:
        # Continue the next round from the collected reduced states.
        UV = UV_rf[:q].clone()
        # NOTE(review): `n1` in the disabled save below is not defined in the visible cells.
#         torch.save(UV.cpu(), f"data/uv-n{n1}-r{UV.size(2)}-tmp.pt")
        print(f"# reducible : {UV.size(0):_}")
        # Sanity check: every collected state must still reconstruct M.
        if not check_uv(UV, M).min().item():
            raise ValueError("Wrong decomposition detected.")
    else:
        # No reducible state was seen during this round; stop.
        print(f"No reducible states (r={UV.size(2)}).")
        break

r = 6, q = 10_000:  50%|█████     | 502/1000 [00:00<00:00, 1843.11it/s]


# reducible : 10_000


r = 5, q = 10_000:  50%|█████     | 504/1000 [00:00<00:00, 1926.05it/s]


# reducible : 10_000


100%|██████████| 1000/1000 [00:01<00:00, 969.96it/s]

No reducible states (r=4).





In [106]:
# Display the matrix M that was decomposed above.
M

tensor([[0, 0, 0, 1, 1, 1],
        [1, 0, 0, 1, 1, 0],
        [1, 0, 0, 0, 0, 1],
        [0, 0, 0, 0, 1, 1],
        [0, 0, 0, 1, 1, 1],
        [0, 1, 1, 1, 0, 1]])

In [107]:
# Unpack the first decomposition in the batch into 0/1 lists: one row per dyad,
# entry k of a row is bit k of the packed vector. `n` is used as the bit width
# for both factors, which matches M being n x n.
print("U =\n", int2bin(UV[:1, 0], n)[0].cpu().tolist())
print("V =\n", int2bin(UV[:1, 1], n)[0].cpu().tolist())

U =
 [[1, 1, 0, 0, 1, 0], [1, 1, 0, 0, 1, 1], [1, 1, 0, 1, 1, 0], [0, 1, 1, 0, 0, 1]]
V =
 [[1, 1, 1, 0, 0, 0], [1, 1, 1, 1, 0, 0], [0, 0, 0, 0, 1, 1], [1, 0, 0, 0, 0, 1]]
