# Factorization of Affine Maps

The goal of this notebook is to show the process of factorizing
a set of affine maps which all have the same domain.
This will produce a single map from the starting domain
to a space described by linearly independent basis vectors
within which all the other maps reside.
The original maps can then be reconstructed as the composition
of the factored map and a new map specific to each of the original maps.

In [1]:
from functools import reduce
from islpy import Aff, dim_type as DimType, Id, LocalSpace, Mat, MultiAff, Space
from typing import Dict, Generator, Iterable, List, Tuple

## Motivating Example

To demonstrate the process, I will use a motivating example.
The end goal is to incorporate this process into the AlphaZ compiler
to manipulate the body of a reduction.
Where I want to use this, the expressions I start with are `MultiAff`,
which is a set of affine expressions.
The list of these below is what I want to factorize.

In [2]:
expressions = [
    MultiAff('[N] -> { [i,j,k,l] -> [j,k] }'),
    MultiAff('[N] -> { [i,j,k,l] -> [k-j] }'),
    MultiAff('[N] -> { [i,j,k,l] -> [j-k, i-j-k+l] }')
]
expressions

[MultiAff("[N] -> { [i, j, k, l] -> [(j), (k)] }"),
 MultiAff("[N] -> { [i, j, k, l] -> [(-j + k)] }"),
 MultiAff("[N] -> { [i, j, k, l] -> [(j - k), (i - j - k + l)] }")]

## Merging the Expressions Into One

The first step of this process is to merge
these expressions into a single expression.
The original expressions can be reconstructed
as a projection containing only the desired dimensions.

ISL does not have a function (that I'm aware of)
which can generate the projection functions.
Therefore, some record keeping is needed.
To begin, I will give each output index
from each of these expressions a unique name.
This will allow the outputs to be tracked across
the construction of new objects or transformation of existing ones.

In [3]:
def _name_generator(prefix: str) -> Generator[str, None, None]:
    """Generates names containing the given prefix and an incrementing value, separated by an underscore."""
    counter = 0
    while True:
        counter += 1
        yield f'{prefix}_{counter}'
        
def _name_expression_outputs(expression: MultiAff, names: Iterable[str]) -> MultiAff:
    """Gives each output of the provided expression a unique name, overwriting any existing names."""
    out_count = expression.dim(DimType.out)
    named_expr = expression
    for i in range(out_count):
        name = next(names)
        named_expr = named_expr.set_dim_name(DimType.out, i, name)
    return named_expr

def _name_expression_outputs_helper(expressions: List[MultiAff]) -> Generator[MultiAff, None, None]:
    """Gives each output of all the provided expressions a unique name, overwriting any existing names."""
    names = _name_generator('orig_out')
    for expr in expressions:
        yield _name_expression_outputs(expr, names)

def name_expression_outputs(expressions: List[MultiAff]) -> List[MultiAff]:
    """Gives each output of all the provided expressions a unique name, overwriting any existing names."""
    return list(_name_expression_outputs_helper(expressions))

# Print the space for each of the expressions to show how this works.
named_expressions = name_expression_outputs(expressions)
for expr in named_expressions:
    print(expr.space)

[N] -> { [i, j, k, l] -> [orig_out_1, orig_out_2] }
[N] -> { [i, j, k, l] -> [orig_out_3] }
[N] -> { [i, j, k, l] -> [orig_out_4, orig_out_5] }


Now that each output has a unique name that can be tracked,
it's time to merge all the expressions into a single one.
We won't worry about the projection function for now.
With the named outputs, this can be reconstructed later.

In [4]:
def _merge_expressions_helper(left: MultiAff, right: MultiAff) -> MultiAff:
    """Merges two expressions by taking their product and flattening the result."""
    return left.flat_range_product(right)

def merge_expressions(expressions: List[MultiAff]) -> MultiAff:
    """Merges a set of expressions by taking their product and flattening the result."""
    return reduce(_merge_expressions_helper, expressions)

# Print the merged expression to show how this works.
named_expressions = name_expression_outputs(expressions)
merged_expressions = merge_expressions(named_expressions)
print(merged_expressions)
print(merged_expressions.space)

[N] -> { [i, j, k, l] -> [(j), (k), (-j + k), (j - k), (i - j - k + l)] }
[N] -> { [i, j, k, l] -> [orig_out_1, orig_out_2, orig_out_3, orig_out_4, orig_out_5] }


## Decomposing the Merged Expression

Now, the merged expression needs to be decomposed in two.
I will do this by converting the expression into a matrix,
computing the Hermite normal form of this matrix,
and reconstructing two affine expressions from the result.

Note: ISL does contain functions for converting between matrices and multi-affine expressions.
I chose not to use them here because there isn't a guarantee (that I saw)
about the order in which the rows are returned,
and because there would be extra columns that need to be dropped first
then restored after calculating the Hermite normal form,
which I think is less readable than this.

In [5]:
def _single_expression_to_row(expression: Aff, matrix: Mat, row: int) -> Mat:
    """Copies the coefficients from one expression in a multi-expression into a matrix."""
    updated_matrix = matrix

    # Copy the parameters first.
    param_count = expression.dim(DimType.param)
    for i in range(param_count):
        coefficient = expression.get_coefficient_val(DimType.param, i)
        updated_matrix = updated_matrix.set_element_val(row, i, coefficient)
    
    # Then copy the inputs.
    in_offset = param_count
    in_count = expression.dim(DimType.in_)
    for i in range(in_count):
        coefficient = expression.get_coefficient_val(DimType.in_, i)
        updated_matrix = updated_matrix.set_element_val(row, i + in_offset, coefficient)
        
    # Finally, copy the constant.
    const_offset = in_offset + in_count
    const = expression.get_constant_val()
    updated_matrix = updated_matrix.set_element_val(row, const_offset, const)
    
    return updated_matrix

def expression_to_matrix(expression: MultiAff) -> Mat:
    """
    Converts an expression into a matrix of its parameters, input indexes, and constants.
    Throws an error if the expression uses division, as that may not work correctly.
    """
    if expression.dim(DimType.div) > 0:
        raise Exception('Cannot convert an expression with division into a matrix.')
        
    # There will be one row for each of the outputs.
    # There will be one column for each of the parameters, input indexes, and constants.
    context = expression.get_ctx()
    rows = expression.dim(DimType.out)
    cols = expression.dim(DimType.param) + expression.dim(DimType.in_) + 1
    matrix = Mat.alloc(context, rows, cols)

    # Copy the rows of the matrix one at a time.
    for i in range(rows):
        expression_part = expression.get_at(i)
        matrix = _single_expression_to_row(expression_part, matrix, i)

    return matrix

def print_matrix(matrix: Mat) -> None:
    for row in range(matrix.rows()):
        values = [matrix.get_element_val(row, col).to_str() for col in range(matrix.cols())]
        line = '\t'.join(values)
        print(line)

# Print the matrix to show how this works.
named_expressions = name_expression_outputs(expressions)
merged_expressions = merge_expressions(named_expressions)
matrix = expression_to_matrix(merged_expressions)
print('\t'.join(['N','i','j','k','l','cst']))
print_matrix(matrix)

N	i	j	k	l	cst
0	0	1	0	0	0
0	0	0	1	0	0
0	0	-1	1	0	0
0	0	1	-1	0	0
0	1	-1	-1	1	0


Now that we have a matrix, we can get the Hermite normal form of it.
For my purpose, I only need the Hermite matrix it produces,
which has linearly independent rows (similar to Gauss-Jordan elimination)
and the inverse of the matrix that represents the elementary operations used
to create the Hermite matrix.
These are called `H` and `Q` respectively by ISL.

I intend to use row-oriented Hermite normal form,
but ISL implements it as column-oriented.
These are simply transposes of each other,
so all inputs and outputs need to be transposed.

The Hermite matrix may have several rows of 0's at the end.
These can be dropped, along with the same number of columns from the right of Q.
Doing this will eliminate some unnecessary dimensions
from the expressions that will be formed from these matrices.

In [6]:
def _empty_row_count(matrix: Mat) -> int:
    """Returns the number of empty rows at the bottom of the given matrix."""
    count = 0
    for row in reversed(range(matrix.rows())):
        for col in range(matrix.cols()):
            coefficient = matrix.get_element_val(row, col).to_python()
            if coefficient != 0:
                return count
        count += 1
    return count

def _reduce_hermite_dimensionality(h: Mat, q: Mat) -> Tuple[Mat, Mat]:
    """Drops any rows of 0's from the bottom of H, and the same number of columns from the right of Q."""
    # Per the Hermite decomposition, the nubmer of rows of H and the number of columns of Q must be equal.
    # If this isn't the case, something went wrong.
    if h.rows() != q.cols():
        raise Exception(f'Error found with Hermite decomposition. H has {h.rows()} rows, and Q has {q.cols()} columns. These should be equal.')
    
    empty_rows = _empty_row_count(h)
    drop_start = h.rows() - empty_rows

    h_dropped = h.drop_rows(drop_start, empty_rows)
    q_dropped = q.drop_cols(drop_start, empty_rows)
    return h_dropped, q_dropped

def hermite_decomposition(matrix: Mat) -> Tuple[Mat, Mat]:
    """
    Performs the row-oriented Hermite decomposition of the given matrix,
    returning H and Q as the two outputs (in that order).
    Any rows of 0's have been dropped from the bottom of H,
    along with the same number of columns from the right of Q.
    """
    h, _, q = matrix.copy().transpose().left_hermite(0)
    h_transposed = h.transpose()
    q_transposed = q.transpose()
    return _reduce_hermite_dimensionality(h_transposed, q_transposed)

# Print the decomposed matrices to show how this works.
named_expressions = name_expression_outputs(expressions)
merged_expressions = merge_expressions(named_expressions)
matrix = expression_to_matrix(merged_expressions)
h, q = hermite_decomposition(matrix)

print('H Matrix:')
print_matrix(h)
print()
print('Q Matrix:')
print_matrix(q)

H Matrix:
0	1	0	0	1	0
0	0	1	0	0	0
0	0	0	1	0	0

Q Matrix:
0	1	0
0	0	1
0	-1	1
0	1	-1
1	-1	-1


## Constructing New Expressions from the Decomposition

Now that the expression can be decomposed into two matrices,
new expressions need to be constructed from these matrices.
The process I'll be using is to create a `Space` in ISL for the desired maps,
constructing the maps as `Aff` objects (single affine expressions),
converting them to `MultiAff` objects (which have more helpful functionality),
then merging these affine expressions together.

In [7]:
def _create_decomposition_spaces(original_space: Space, inner_dimensions: int) -> Tuple[Space, Space]:
    """
    Creates new spaces for the decomposition of an original space.
    The first space returned has the same domain as the original space,
    and the second space returned has the same range as the original space.
    """
    
    # Calling space.domain() returns indexes as 'out' indexes,
    # so reverse() is needed to make them 'in' indexes.
    first_space = original_space.domain().reverse()
    first_space: Space = first_space.add_dims(DimType.out, inner_dimensions)
    
    # The first space will handle all of the parameter inputs per the decomposition,
    # so drop them from the second space.
    second_space = original_space.range()
    second_space = second_space.add_dims(DimType.in_, inner_dimensions)
    second_space = second_space.drop_all_params()
    
    # Beyond constructing the spaces, we want the dimensions to have names.
    names = _name_generator('mid')
    for i in range(inner_dimensions):
        name = next(names)
        first_space = first_space.set_dim_name(DimType.out, i, name)
        second_space = second_space.set_dim_name(DimType.in_, i, name)
        
    return first_space, second_space

def _row_to_expression(matrix: Mat, row: int, space: Space, has_constants: bool) -> MultiAff:
    """Converts a row of a matrix into an expression."""
    
    # Create a new affine expression from the domain of the given space.
    # It can only represent a single output, so make it align with the output
    # of the given space at the same index as the matrix row this is created from.
    output_name = space.get_dim_name(DimType.out, row)
    expression = Aff.zero_on_domain_space(space.domain())
    
    # Populate all the coefficients for the parameters.
    param_count = space.dim(DimType.param)
    for i in range(param_count):
        coefficient = matrix.get_element_val(row, i)
        expression = expression.set_coefficient_val(DimType.param, i, coefficient)
        
    # Populate all the coefficients for the input indexes.
    in_read_offset = param_count
    in_count = space.dim(DimType.in_)
    for i in range(in_count):
        coefficient = matrix.get_element_val(row, i + in_read_offset)
        expression = expression.set_coefficient_val(DimType.in_, i, coefficient)
        
    # Populate the coefficient for the constant if applicable.
    if has_constants:
        const_offset = in_read_offset + in_count
        const = matrix.get_element_val(row, const_offset)
        expression = expression.set_constant_val(const)
    
    # Upcast the expression to the desired class.
    expression = MultiAff.from_aff(expression)
    expression = expression.set_dim_name(DimType.out, 0, output_name)
    return expression

def _matrix_to_expression(matrix: Mat, space: Space, has_constants: bool) -> MultiAff:
    """Converts a matrix into an expression. Each row is for one of the output dimensions."""
    expressions = [_row_to_expression(matrix, row, space, has_constants) for row in range(matrix.rows())]
    return merge_expressions(expressions)

def decompose_expression(expression: MultiAff) -> Tuple[MultiAff, MultiAff]:
    """Decomposes an expression using the Hermite decomposition of the matrix that represents it. Returns in the order H, Q."""
    
    # Convert the expression to a matrix, then decompose that matrix.
    matrix = expression_to_matrix(expression)
    h_matrix, q_matrix = hermite_decomposition(matrix)
    
    # Construct spaces for the decomposed expressions to be in.
    inner_dimensions = h_matrix.rows()
    h_space, q_space = _create_decomposition_spaces(expression.space, inner_dimensions)
    
    # Construct the decomposed expressions.
    # The H expression has constants, but the Q expression doesn't,
    # as the H expression will incorporate that into the output.
    h_expr = _matrix_to_expression(h_matrix, h_space, has_constants=True)
    q_expr = _matrix_to_expression(q_matrix, q_space, has_constants=False)
    return h_expr, q_expr

# Print the decomposed expressions to show how this works.
named_expressions = name_expression_outputs(expressions)
merged_expressions = merge_expressions(named_expressions)
h, q = decompose_expression(merged_expressions)

print(f'H: {h}')
print(f'Q: {q}')
print()
print(f'H Space: {h.space}')
print(f'Q Space: {q.space}')
print()
print(f'Original: {merged_expressions}')
print(f'Composed: {q.pullback_multi_aff(h)}')


H: [N] -> { [i, j, k, l] -> [(i + l), (j), (k)] }
Q: { [mid_1, mid_2, mid_3] -> [(mid_2), (mid_3), (-mid_2 + mid_3), (mid_2 - mid_3), (mid_1 - mid_2 - mid_3)] }

H Space: [N] -> { [i, j, k, l] -> [mid_1, mid_2, mid_3] }
Q Space: { [mid_1, mid_2, mid_3] -> [orig_out_1, orig_out_2, orig_out_3, orig_out_4, orig_out_5] }

Original: [N] -> { [i, j, k, l] -> [(j), (k), (-j + k), (j - k), (i - j - k + l)] }
Composed: [N] -> { [i, j, k, l] -> [(j), (k), (-j + k), (j - k), (i - j - k + l)] }


## Reconstructing the Original Expressions

Finally, the original expressions need to be reconstructed.
In short, the outputs of Q which match the outputs of the original expressions need to be projected out.

In [8]:
def get_decomposition_projection(original: MultiAff, q: MultiAff) -> MultiAff:
    """
    Returns the projection of the decomposed expression (Q)
    which can be used to recreate the given original expression.
    """
    # The range of Q will be the domain of the projection. This must be a 'LocalSpace' object.
    proj_domain = q.space.range()
    proj_domain = LocalSpace.from_space(proj_domain)
    
    # Create a dictionary from dimension names to their positions.
    name_to_position = { proj_domain.get_dim_name(DimType.out, i) : i for i in range(proj_domain.dim(DimType.out))}

    # Get the names of the output dimensions for the original expression,
    # then use the dictionary to map those to its index in Q.
    wanted_names = [original.get_dim_name(DimType.out, i) for i in range(original.dim(DimType.out))]
    wanted_indexes = [name_to_position[name] for name in wanted_names]
    
    # Construct an affine expression for each of the projections.
    # This is done in two steps: create it with the 'Aff' class,
    # then convert it to the 'MultiAff' class so it's usable.
    single_projection_exprs = [Aff.var_on_domain(proj_domain, DimType.out, i) for i in wanted_indexes]
    projection_multi_exprs = [MultiAff.from_aff(expr) for expr in single_projection_exprs]
    
    # Finally, merge all the individual expressions into a single one
    # and compose it with Q to produce the final expression.
    merged_projection = merge_expressions(projection_multi_exprs)
    return merged_projection.pullback_multi_aff(q)

# Print the decomposed expressions to show how this works.
named_expressions = name_expression_outputs(expressions)
merged_expressions = merge_expressions(named_expressions)
h, q = decompose_expression(merged_expressions)

for expr in named_expressions:
    projection = get_decomposition_projection(expr, q)
    projection_and_h = projection.pullback_multi_aff(h)
    print(f'Original:   {expr}')
    print(f'Projection: {projection}')
    print(f'Proj and H: {projection_and_h}')
    print(f'Matches?    {expr.is_equal(projection_and_h)}')
    print()

Original:   [N] -> { [i, j, k, l] -> [(j), (k)] }
Projection: { [mid_1, mid_2, mid_3] -> [(mid_2), (mid_3)] }
Proj and H: [N] -> { [i, j, k, l] -> [(j), (k)] }
Matches?    True

Original:   [N] -> { [i, j, k, l] -> [(-j + k)] }
Projection: { [mid_1, mid_2, mid_3] -> [(-mid_2 + mid_3)] }
Proj and H: [N] -> { [i, j, k, l] -> [(-j + k)] }
Matches?    True

Original:   [N] -> { [i, j, k, l] -> [(j - k), (i - j - k + l)] }
Projection: { [mid_1, mid_2, mid_3] -> [(mid_2 - mid_3), (mid_1 - mid_2 - mid_3)] }
Proj and H: [N] -> { [i, j, k, l] -> [(j - k), (i - j - k + l)] }
Matches?    True



## Putting Everything Together

We now have a way to take a set of affine expressions
which all have the same domain
and decompose them into two expressions
with the inner expression being common to all of them.
This allows it to be "factored out",
and this common factor's nullspace is the intersection of the nullspaces
of the original expressions.

As a last step, let's make one last function to neatly wrap all this up.

In [9]:
def factor_expressions(expressions: List[MultiAff]) -> Tuple[MultiAff, Dict[MultiAff, MultiAff]]:
    """
    Factors a set of affine expressions which have the same domain.

    Args:
        expressions: A list of affine expressions with the same domain.

    Returns:
        A tuple of two things.
        The first is the "common factor" expression that was pulled out.
        The second is a dictionary mapping the original expressions
        to the new expressions which, when composed with the "common factor",
        reproduce the original expression.
    """
    
    # Name all the dimensions of the expressions so they can be tracked,
    # merge them into a single expression containing all the subexpressions,
    # then decompose that merged expression.
    named_expressions = name_expression_outputs(expressions)
    merged_expressions = merge_expressions(named_expressions)
    h, q = decompose_expression(merged_expressions)
    
    # Construct a map from the original expressions to the projection of Q
    # which recreates the original expression when composed with H.
    projection_map = { expr: get_decomposition_projection(expr, q) for expr in named_expressions }
    return h, projection_map

def verify_projections(common_factor: MultiAff, original_to_projection_map: Dict[MultiAff, MultiAff]) -> bool:
    """Returns True if the factorization can correctly reproduce the original expressions, and False otherwise."""
    for original, projection in original_to_projection_map.items():
        factorization_composition = projection.copy().pullback_multi_aff(common_factor.copy())
        if not original.is_equal(factorization_composition):
            return False
    return True

In [10]:
expressions

[MultiAff("[N] -> { [i, j, k, l] -> [(j), (k)] }"),
 MultiAff("[N] -> { [i, j, k, l] -> [(-j + k)] }"),
 MultiAff("[N] -> { [i, j, k, l] -> [(j - k), (i - j - k + l)] }")]

In [11]:
# One last sanity check that everything is working.
expressions = [
    MultiAff('[N] -> { [i,j,k,l] -> [j,k] }'),
    MultiAff('[N] -> { [i,j,k,l] -> [k-j] }'),
    MultiAff('[N] -> { [i,j,k,l] -> [j-k, i-j-k+l] }')
]
common_factor, original_to_projection_map = factor_expressions(expressions)
working = verify_projections(common_factor, original_to_projection_map)

if working:
    print('It works!')
else:
    print('Something went wrong.')

It works!


In [12]:
print(common_factor)

[N] -> { [i, j, k, l] -> [(i + l), (j), (k)] }


In [13]:
for original, projection in original_to_projection_map.items():
    print(f'Original:  {original}')
    print(f'Projected: {projection}')
    print()

Original:  [N] -> { [i, j, k, l] -> [(j), (k)] }
Projected: { [mid_1, mid_2, mid_3] -> [(mid_2), (mid_3)] }

Original:  [N] -> { [i, j, k, l] -> [(-j + k)] }
Projected: { [mid_1, mid_2, mid_3] -> [(-mid_2 + mid_3)] }

Original:  [N] -> { [i, j, k, l] -> [(j - k), (i - j - k + l)] }
Projected: { [mid_1, mid_2, mid_3] -> [(mid_2 - mid_3), (mid_1 - mid_2 - mid_3)] }

