In [1]:
import scipy as sp
import numpy as np

---

In [244]:
import numpy as np
from typing import List, Set, Tuple

def construct_constraint_matrices(
    constraints: List[Set[Tuple[int, int]]],
    values: List[float],
    m: int,  # Number of input rows
    n: int   # Number of input columns
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Construct Q, G, and W matrices for multiregion constraints
    
    Parameters:
    -----------
    constraints : List[Set[Tuple[int, int]]]
        List of index pair sets for each constraint
    values : List[float]
        Corresponding values for each constraint set
    m : int
        Number of rows in the input matrix X
    n : int
        Number of columns in the input matrix X
    
    Returns:
    --------
    Tuple[np.ndarray, np.ndarray, np.ndarray]
        Q (rows aggregation matrix, M x m), 
        G (columns aggregation matrix, n x N), 
        W (constraint values matrix, M x N)
    """
    # Find unique constraint indices
    first_components = [(set(idx[0] for idx in constraint_set)) for constraint_set in constraints]
    second_components = [(set(idx[1]  for idx in constraint_set)) for constraint_set in constraints]
    
    # Determine M and N by finding groups of first and second components
    def find_groups(components):
        groups = []
        used = set()
        for comp_set in components:
            # Check if this set overlaps with existing groups
            matching_group = None
            for group_idx, group in enumerate(groups):
                if not group.isdisjoint(comp_set):
                    matching_group = group_idx
                    break
            
            # If no matching group, create a new one
            if matching_group is None:
                groups.append(comp_set)
            else:
                # Merge with existing group
                groups[matching_group].update(comp_set)
        
        return groups
    
    row_groups = find_groups(first_components)
    col_groups = find_groups(second_components)

    # Dimensions
    M = len(row_groups)
    N = len(col_groups)
    # print(M, N)

    # Initialize Q, G, and W matrices
    G = np.zeros((M, m), dtype=float)
    Q = np.zeros((n, N), dtype=float)
    #W = np.full((M,N), dtype = float, fill_value=np.nan)#
    W = np.zeros((M, N), dtype=float)
    
    # Create mapping for row and column groups
    row_group_map = {}
    for group_idx, group in enumerate(row_groups):
        for row in group:
            row_group_map[row] = group_idx
    
    col_group_map = {}
    for group_idx, group in enumerate(col_groups):
        for col in group:
            col_group_map[col] = group_idx
    
    # Fill Q matrix (row aggregation)
    for row, group_idx in row_group_map.items():
        G[group_idx, row] = 1.0
    
    # Fill G matrix (column aggregation)
    for col, group_idx in col_group_map.items():
        Q[col, group_idx] = 1.0
    
    # Fill W matrix with constraint values
    for constraint_set, value in zip(constraints, values):
        for i, j in constraint_set:
            row_idx = row_group_map[i]
            col_idx = col_group_map[j]
            W[row_idx, col_idx] = value
    
    return G, Q, W

In [245]:
constraints = [
    {(0, 4), (1, 5), (2, 3)},  # First constraint set
    {(3, 4), (4, 5), (5, 3)},  # Second constraint set
]
    
# Corresponding values for constraints
values = [10.0, 20.0]

# Specify input matrix dimensions
m, n = 6, 6  # Dimensions of input matrix X

# Construct matrices
G, Q, W = construct_constraint_matrices(constraints, values, m, n)

print("\nG matrix (row aggregation):")
print(G)
print("Q matrix (column aggregation):")
print(Q)
print("\nW matrix (constraint values):")
print(W)
    


G matrix (row aggregation):
[[1. 1. 1. 0. 0. 0.]
 [0. 0. 0. 1. 1. 1.]]
Q matrix (column aggregation):
[[0.]
 [0.]
 [0.]
 [1.]
 [1.]
 [1.]]

W matrix (constraint values):
[[10.]
 [20.]]


In [246]:
X1= np.array([[0,5,0,0,0,0,0],
    [0,0,5,0,0,0,0],
    [7,0,0,0,0,0,0],
    [0,0,0,13,0,0,0],
    [0,0,0,0,0,0,0],
    [0,0,0,0,0,0,0]
    ])
m,n = 6,7

constraints = [
    {(0, 1), (1, 2)},  # First constraint set
    {(2, 0), (3, 3)},  # Second constraint set
]
    
# Corresponding values for constraints
values = [10.0, 20.0]

G, Q, W = construct_constraint_matrices(constraints, values, m, n)

print("Q matrix (row aggregation):")
print(Q)
print("\nG matrix (column aggregation):")
print(G)
print("\nW matrix (constraint values):")
print(W)

assert np.all(G @ X1 @ Q == W)

Q matrix (row aggregation):
[[0. 1.]
 [1. 0.]
 [1. 0.]
 [0. 1.]
 [0. 0.]
 [0. 0.]
 [0. 0.]]

G matrix (column aggregation):
[[1. 1. 0. 0. 0. 0.]
 [0. 0. 1. 1. 0. 0.]]

W matrix (constraint values):
[[10.  0.]
 [ 0. 20.]]


In [330]:
import numpy as np
import scipy.sparse as sp
import warnings

def invd_sparse(x):
    """
    Constructs a diagonal sparse matrix with 1/x, replacing entries where x == 0 with 1.
    
    Parameters:
        x (numpy array or scipy sparse matrix): Input 1D array.
        
    Returns:
        scipy sparse matrix: A diagonal sparse matrix.
    """
    with np.errstate(divide='ignore', invalid='ignore'):
        invd_values = np.where(x != 0, 1.0 / x, 1.0)
    return sp.diags(invd_values.flatten())

def MRGRAS_UpdateMatrix(P, N, T, r, s):
    """
    Updates the matrix using the GRAS balancing approach with sparse matrices.
    
    Parameters:
        P (scipy sparse matrix): Positive part of the input matrix.
        N (scipy sparse matrix): Negative part of the input matrix.
        r (numpy array): Row adjustment factors.
        s (numpy array): Column adjustment factors.
        
    Returns:
        scipy sparse matrix: The updated balanced matrix.
    """
    with np.errstate(divide='ignore', invalid='ignore'):
        invd_T = np.where(T != 0, 1.0 / T, 1.0)
    return sp.diags(r.flatten()) @ (P * T) @ sp.diags(s.flatten()) - invd_sparse(r) @ (N * invd_T) @ invd_sparse(s)

def MRGRAS_Balancing(X, u, v, W, G, Q, EPSILON=1e-10, epsilon_constr = 1e-10 ,max_iter=20):
    # Ensure inputs are consistent
    assert np.allclose(u.sum(), v.sum()), "Row and column sums must be equal."

    # Define matrix size
    m, n = X.shape
    M, N = W.shape
    # Split into positive and negative components (in sparse format)
    P0 = np.maximum(X, 0)
    N0 = np.maximum(-X, 0)
    assert np.allclose(X, (P0 - N0)), "Matrix decomposition into P and N is incorrect."

    # Initialize vectors
    r = np.ones((m, 1))
    t = np.ones((M, N))

    # expand t
    T_mask = (Q @ t @ G).T

    # print(T_mask)

    zero_rows = np.all(T_mask == 0, axis=1)
    zero_columns = np.all(T_mask == 0, axis=0)
    # Replace elements in zero rows with 1
    T_mask[zero_rows, :] = 1
    # Replace elements in zero columns with 1
    T_mask[:, zero_columns] = 1
    pr = (T_mask * P0).T @ r        

    with np.errstate(divide='ignore', invalid='ignore'):
        invd_T = np.where(T_mask != 0, 1.0 / T_mask, 1.0)
    nr = ( invd_T * N0).T @ invd_sparse(r) @ np.ones((m, 1))

    # Initial value of s1
    # Ensure pr and nr are compatible for element-wise multiplication
    pr_dense = pr.toarray() if sp.issparse(pr) else pr
    nr_dense = nr.toarray() if sp.issparse(nr) else nr
    s1 = invd_sparse(2 * pr_dense) @ (v + np.sqrt(v ** 2 + 4 * pr_dense * nr_dense))

    # Handle possible NaNs
    s1 = np.nan_to_num(s1, nan=1e-10, posinf=1e-10, neginf=1e-10)

    ss = -invd_sparse(v) @ nr
    s1[pr_dense.flatten() == 0] = ss[pr_dense.flatten() == 0]

    # Iteration loop
    iter = 1
    dif = float('inf')

    while (dif > EPSILON) and (iter <= max_iter):
        # Update r
        ps = (P0 * T_mask) @ s1
        ns = (N0 * invd_T) @ invd_sparse(s1) @ np.ones((n, 1))

        ps_dense = ps.toarray() if sp.issparse(ps) else ps
        ns_dense = ns.toarray() if sp.issparse(ns) else ns
        r = invd_sparse(2 * ps_dense) @ (u + np.sqrt(u ** 2 + 4 * ps_dense * ns_dense))
        
        # Handle possible NaNs
        r = np.nan_to_num(r, nan=1e-10, posinf=1e-10, neginf=1e-10)

        rr = -invd_sparse(u) @ ns
        r[ps_dense.flatten() == 0] = rr[ps_dense.flatten() == 0]

        # update t
        pt = G @ np.diag(r.flatten()) @ P0 @ np.diag(s1.flatten()) @ Q 
        nt = G @ invd_sparse(r) @ N0 @ invd_sparse(s1) @ Q 
        
        with np.errstate(divide='ignore', invalid='ignore'):
            pt_invd = np.where(pt != 0, 1.0 / pt, 1.0)
        t = (pt_invd / 2.0)*(W + np.sqrt(W**2 + 4 * pt * nt))

        # Handle possible NaNs
        t = np.nan_to_num(t, nan=1e-10, posinf=1e-10, neginf=1e-10)
        with np.errstate(divide='ignore', invalid='ignore'):
            W_invd = np.where(W != 0, 1.0 / W, 1.0)
        tr = - nt * W_invd

        t[pt == 0] = tr[ pt == 0]


        # expand t
        T_mask = (Q @ t @ G).T
        zero_rows = np.all(T_mask == 0, axis=1)
        zero_columns = np.all(T_mask == 0, axis=0)
        # Replace elements in zero rows with 1
        T_mask[zero_rows, :] = 1
        # Replace elements in zero columns with 1
        T_mask[:, zero_columns] = 1

        with np.errstate(divide='ignore', invalid='ignore'):
            invd_T = np.where(T_mask != 0, 1.0 / T_mask, 1.0)
        # Update s2
        pr = (T_mask * P0).T @ r 
        nr = ( invd_T * N0).T @ invd_sparse(r) @ np.ones((m, 1)) # verificare se rimuovere questo np.ones
        
        pr_dense = pr.toarray() if sp.issparse(pr) else pr
        nr_dense = nr.toarray() if sp.issparse(nr) else nr
        s2 = invd_sparse(2 * pr_dense) @ (v + np.sqrt(v ** 2 + 4 * pr_dense * nr_dense))

        # Handle possible NaNs
        s2 = np.nan_to_num(s2, nan=1e-10, posinf=1e-10, neginf=1e-10)
        
        ss = -invd_sparse(v) @ nr
        s2[pr_dense.flatten() == 0] = ss[pr_dense.flatten() == 0]

        # Convergence check
        dif = np.max(np.abs(s2 - s1))
        s1 = s2  # Update s1 for next iteration
        iter +=1 
    

    # Post iteration checks
    if (iter == max_iter) and (dif > EPSILON ):
        warnings.warn("GRAS procedure did not converge.")
        return None
    else:
        s = s2
        ps = (P0 * T_mask) @ s
        ns = (N0 * invd_T) @ invd_sparse(s) @ np.ones((n, 1))
        
        ps_dense = ps.toarray() if sp.issparse(ps) else ps
        ns_dense = ns.toarray() if sp.issparse(ns) else ns
        r = invd_sparse(2 * ps_dense) @ (u + np.sqrt(u ** 2 + 4 * ps_dense * ns_dense))

        # Handle possible NaNs
        r = np.nan_to_num(r, nan=1e-10, posinf=1e-10, neginf=1e-10)
        
        rr = -invd_sparse(u) @ ns
        r[ps_dense.flatten() == 0] = rr[ps_dense.flatten() == 0]

        X_MRGRAS = MRGRAS_UpdateMatrix(P0, N0, T_mask, r, s)
        row_sums = np.array(X_MRGRAS.sum(axis=1)).flatten()
        column_sums = np.array(X_MRGRAS.sum(axis=0)).flatten()
        if not (np.allclose(row_sums, u.flatten(), atol=epsilon_constr) and np.allclose(column_sums, v.flatten(), atol=epsilon_constr) and np.allclose(G @ X_MRGRAS @ Q, W)):
            warnings.warn("The final balanced matrix does not match the target row/column sums within the specified tolerance.")

        # Final updated matrix
        return X_MRGRAS, iter, dif


In [331]:

# Test Case 1: Small Basic Matrix / single cell constraint
print("Test Case 1: Small Basic Matrix")
X1 = np.array([[5, -3, 0,1], [2, 0, -2,1], [-1, 3, 4,1]])
u1 = np.array([5, 3, 1]).reshape(-1, 1)
v1 = np.array([3, 4, 1,1]).reshape(-1, 1)

constraints = [
    {(0, 0)}  # First constraint set
    ]
    
# Corresponding values for constraints
values = [6]

# Specify input matrix dimensions
m, n = X1.shape  # Dimensions of input matrix X

# Construct matrices
G, Q, W = construct_constraint_matrices(constraints, values, m, n)
result1, iterations1, diff1 = MRGRAS_Balancing(X1, u1,v1, W, G, Q, EPSILON=1e-14, epsilon_constr=1e-14, max_iter=200)

assert result1 is not None, "Test Case 1 failed: Procedure did not converge."
assert np.allclose(result1.sum(axis = 1).flatten(), u1.flatten(), atol=1e-10), "Test Case 1 failed: Row sum target not reached"
assert np.allclose(result1.sum(axis = 0).flatten(), v1.flatten(), atol=1e-10), "Test Case 1 failed: Columns sum target not reached"
assert np.allclose(G @ result1 @ Q, W, atol=1e-10), "Test Case 1 failed: Constraint sum target not reached"

# Check if the sign of each element is preserved
assert np.all((X1 > 0) == (result1 > 0)), "Test Case 1 failed: Signs of positive cells are not preserved."
assert np.all((X1 < 0) == (result1 < 0)), "Test Case 1 failed: Signs of negative cells are not preserved."
print(f"Test Case 1 passed: Converged in {iterations1} iterations with final difference {diff1}")

Test Case 1: Small Basic Matrix
Test Case 1 passed: Converged in 201 iterations with final difference 9.325873406851315e-15


In [332]:
result1

array([[ 6.        , -1.14177507,  0.        ,  0.14177507],
       [ 2.83706041,  0.        , -0.60280467,  0.76574426],
       [-5.83706041,  5.14177507,  1.60280467,  0.09248067]])

In [338]:
# Test Case 2: Diagonal Matrix
print("\nTest Case 2: Diagonal Matrix")
X2 = np.array([[5, 0, 0], [0, -4, 0], [0, 0, 2]])
u2 = np.array([5, -4, 3]).reshape(-1, 1)
v2 = np.array([5, -4, 3]).reshape(-1, 1)

constraints = [
    {(0, 0)},  # First constraint set
    {(1, 1)}  # Second constraint set
    ]
    
# Corresponding values for constraints
values = [5.0,-4.0]

# Specify input matrix dimensions
m, n = X2.shape  # Dimensions of input matrix X

G, Q, W = construct_constraint_matrices(constraints, values, m, n)
result2, iterations2, diff2 = MRGRAS_Balancing(X2, u2, v2, W, G, Q, max_iter= 200)
assert result2 is not None, "Test Case 2 failed: Procedure did not converge."
# Check if the sign of each element is preserved
assert np.all((X2 > 0) == (result2 > 0)), "Test Case 2 failed: Signs of positive cells are not preserved."
assert np.all((X2 < 0) == (result2 < 0)), "Test Case 2 failed: Signs of negative cells are not preserved."
print(f"Test Case 2 passed: Converged in {iterations2} iterations with final difference {diff2}")


Test Case 2: Diagonal Matrix
Test Case 2 passed: Converged in 2 iterations with final difference 0.0


In [339]:
result2

array([[ 5.,  0.,  0.],
       [ 0., -4.,  0.],
       [ 0.,  0.,  3.]])

In [None]:
# Test Case 3: All Positive Matrix
print("\nTest Case 3: All Positive Matrix")
X3 = np.array([[3, 2, 0], [4, 5, 0], [0, 0, 9]])
u3 = np.array([9, 16, 11]).reshape(-1, 1)
v3 = np.array([5, 20, 11]).reshape(-1, 1)

constraints = [
    {(0, 0),(0,1),(1,0),(1,1)},  # First constraint set
    {(2,2)}
    ]
    
# Corresponding values for constraints
values = [25,11]
G, Q, W = construct_constraint_matrices(constraints, values, m, n)



result3, iterations3, diff3 = MRGRAS_Balancing(X3, u3, v3,W,G,Q)
assert result3 is not None, "Test Case 3 failed: Procedure did not converge."
# Check if the sign of each element is preserved
assert np.all((X3 > 0) == (result3 > 0)), "Test Case 3 failed: Signs of positive cells are not preserved."
print(f"Test Case 3 passed: Converged in {iterations3} iterations with final difference {diff3}")



Test Case 3: All Positive Matrix
Test Case 3 passed: Converged in 7 iterations with final difference 1.6344703368531555e-11


In [342]:
result3

array([[ 2.40045386,  6.59954614,  0.        ],
       [ 2.59954614, 13.40045386,  0.        ],
       [ 0.        ,  0.        , 11.        ]])

In [367]:
# Test Case 3: All Positive Matrix
print("\nTest Case 3: All Positive Matrix")
X3 = np.array([[3, 2, 0], [4, 5,10], [1, 9,7]])
u3 = np.array([9, 16, 12,]).reshape(-1, 1)
v3 = np.array([5, 20, 12]).reshape(-1, 1)

constraints = [
    {(0, 0)},  # First constraint set
    {(2,2)}
    ]
    
# Corresponding values for constraints
values = [4,0]
m,n = X3.shape
G, Q, W = construct_constraint_matrices(constraints, values, m, n)



result3, iterations3, diff3 = MRGRAS_Balancing(X3, u3, v3,W,G,Q, max_iter=2000)
assert result3 is not None, "Test Case 3 failed: Procedure did not converge."
# Check if the sign of each element is preserved
assert np.all((X3 > 0) == (result3 > 0)), "Test Case 3 failed: Signs of positive cells are not preserved."
print(f"Test Case 3 passed: Converged in {iterations3} iterations with final difference {diff3}")



Test Case 3: All Positive Matrix
Test Case 3 passed: Converged in 99 iterations with final difference 9.391315503037845e-11




In [368]:
result3, iterations3
# TO DO: fix W definition to include not covered part by constraints

(array([[4.        , 5.        , 0.        ],
        [0.85983898, 6.90080038, 8.23936065],
        [0.14016103, 8.09919962, 3.76063935]]),
 99)

In [None]:
# Test Case 4: Large Sparse Matrix
print("\nTest Case 4: Large Sparse Matrix")
X4 = sp.random(100, 100, density=0.1, format='csr') - sp.random(100, 100, density=0.1, format='csr')
u4 = np.random.rand(100, 1) * 10
v4 = np.random.rand(100, 1) * 10
u4 = u4 / u4.sum() * v4.sum()  # Normalize u4 to ensure u4.sum() == v4.sum()
result4, iterations4, diff4 = MRGRAS_Balancing(X4.toarray(), u4, v4)
assert result4 is not None, "Test Case 4 failed: Procedure did not converge."
# Check if the sign of each element is preserved
assert np.all((X4.toarray() > 0) == (result4 > 0)), "Test Case 4 failed: Signs of positive cells are not preserved."
assert np.all((X4.toarray() < 0) == (result4 < 0)), "Test Case 4 failed: Signs of negative cells are not preserved."
print(f"Test Case 4 passed: Converged in {iterations4} iterations with final difference {diff4}")