# Submission Details

* __Student__: Shlomi Ben-Shushan

* __Course__: Deep Learning (22961)

* __Assignment__: Maman 11

* __Video Link__: https://drive.google.com/file/d/1-SBNfKaN5zHNAGB8G58KkjH1evfBkmHb/view?usp=sharing

### Imports

In [1]:
from typing import Optional, Tuple

from torch import Tensor, tensor, zeros, cat

### Question A

In [2]:
# Message template for ExpansionError; the two placeholders receive the
# source shape and the target shape, in that order.
EXPANSION_ERROR = "Cannot expand tensor of shape {} to shape {}"


class ExpansionError(Exception):
    """Raised by expand_as when a tensor cannot be expanded to the target shape."""

In [3]:
def expand_as(A: Tensor, B: Tensor) -> Tensor:
    """Return a copy of A expanded to the shape of B.

    A's shape is left-padded with singleton dimensions up to B's number of
    dimensions; every singleton dimension is then replicated with ``cat``
    until it reaches B's size along that dimension. Neither A nor B is
    modified, and the result never shares storage with A.

    Args:
        A: Tensor to expand.
        B: Tensor providing the target shape; only ``B.shape`` is read.

    Returns:
        A new tensor holding A's values with the shape of B.

    Raises:
        ExpansionError: If A has more dimensions than B, or if a
            non-singleton dimension of A differs from the matching
            dimension of B.
    """
    source_shape, target_shape = A.shape, B.shape

    # A tensor can only gain leading dimensions, never lose them.
    missing_dims = len(target_shape) - len(source_shape)
    if missing_dims < 0:
        raise ExpansionError(EXPANSION_ERROR.format(source_shape, target_shape))

    # Left-pad A's shape with 1s so both shapes have the same length.
    padded_shape = (1,) * missing_dims + tuple(source_shape)

    # Validate up front so an impossible expansion fails before any data is
    # copied: each dimension must already match or be a singleton.
    for have, want in zip(padded_shape, target_shape):
        if have != want and have != 1:
            raise ExpansionError(EXPANSION_ERROR.format(source_shape, target_shape))

    # Clone so the caller's tensor is never aliased. The shape is passed as a
    # single tuple because reshape() with no arguments (the 0-dim case) fails.
    expanded_A = A.clone().reshape(padded_shape)

    for dim, (have, want) in enumerate(zip(padded_shape, target_shape)):
        if have == want:
            continue
        if want == 0:
            # cat() rejects an empty list of tensors; a zero-length slice
            # yields the required empty dimension instead.
            expanded_A = expanded_A.narrow(dim, 0, 0)
        else:
            expanded_A = cat([expanded_A] * want, dim=dim)

    return expanded_A

In [4]:
# Test case 1: a (3, 1) column takes on the (3, 4) target shape.
A = tensor([[1], [2], [3]])  # Shape (3, 1)
B = zeros((3, 4))            # Shape (3, 4)
assert A.shape != B.shape, "Test case 1 pre-condition failed"
expanded_A = expand_as(A, B)
assert expanded_A.shape == B.shape, "Test case 1 failed"

# Test case 2: neither input is modified by the call.
A = tensor([[1], [2], [3]])  # Shape (3, 1)
B = zeros((3, 4))            # Shape (3, 4)
A_BACKUP, B_BACKUP = A.clone(), B.clone()
_ = expand_as(A, B)
assert A.equal(A_BACKUP) and B.equal(B_BACKUP), "Test case 2 failed"

# Test case 3: a single-element vector gains leading dimensions.
A = tensor([1])              # Shape (1, )
B = zeros((2, 3, 4))         # Shape (2, 3, 4)
expanded_A = expand_as(A, B)
assert expanded_A.shape == B.shape, "Test case 3 failed"

# Test cases 4 and 5: vectors of shape (3, ) and (2, ) do not fit the
# trailing dimension of a (3, 4) target, so ExpansionError is expected.
for case_number, A in ((4, tensor([1, 2, 3])), (5, tensor([1, 2]))):
    B = zeros((3, 4))        # Shape (3, 4)
    try:
        _ = expand_as(A, B)
    except ExpansionError:
        continue
    assert False, f"Test case {case_number} failed: Expected ExpansionError"

print("All tests passed!")

All tests passed!


### Question B

In [5]:
def is_tensor_empty(t: Tensor) -> bool:
    """Return True when t is None or contains no elements."""
    if t is None:
        return True
    return t.numel() == 0

def are_broadcastable(
    A: Tensor, B: Tensor
) -> Tuple[bool, Optional[Tuple[int, ...]]]:
    """Decide whether A and B can be broadcast together.

    The two shapes are compared dimension by dimension starting from the
    last one. A pair of dimensions is compatible when the sizes are equal
    or one of them is 1; leading dimensions that exist only in the longer
    shape are kept as they are.

    Args:
        A: First tensor.
        B: Second tensor.

    Returns:
        ``(True, shape)`` with the broadcast shape as a tuple of ints when
        broadcasting is possible, otherwise ``(False, None)``. ``None`` or
        zero-element tensors are reported as not broadcastable.
    """
    # None and zero-element tensors are rejected outright.
    if is_tensor_empty(A) or is_tensor_empty(B):
        return False, None

    # Order the shapes by length; the sort is stable, so equal-length shapes
    # keep A first.
    shorter, longer = sorted([A.shape, B.shape], key=len)

    # Start from the longer shape and patch in sizes from the shorter one.
    broadcast_shape = list(longer)

    # Walk both shapes from the last dimension towards the first; zip stops
    # once the shorter shape is exhausted.
    trailing_pairs = zip(reversed(shorter), reversed(longer))
    for offset, (short_dim, long_dim) in enumerate(trailing_pairs, start=1):
        # Equal sizes, or a 1 in the shorter shape: the longer shape's size wins.
        if short_dim == long_dim or short_dim == 1:
            continue

        # Sizes differ and neither is 1: the shapes are incompatible.
        if long_dim != 1:
            return False, None

        # A 1 in the longer shape stretches to the shorter shape's size.
        broadcast_shape[-offset] = short_dim

    return True, tuple(broadcast_shape)

In [6]:
# Test case 1: a single-element vector against a 3-D tensor.
assert are_broadcastable(tensor([1]), zeros((2, 3, 4))) == (True, (2, 3, 4)), "Test case 1 failed"
# Test case 2: a (3, 1) column against a (3, 4) matrix.
assert are_broadcastable(tensor([[1], [2], [3]]), zeros((3, 4))) == (True, (3, 4)), "Test case 2 failed"
# Test case 3: trailing sizes 2 and 4 are incompatible.
assert are_broadcastable(tensor([1, 2]), zeros((3, 4))) == (False, None), "Test case 3 failed"
# Test case 4: the first argument has more dimensions than the second.
assert are_broadcastable(tensor([[1, 2, 3]]), zeros((3,))) == (True, (1, 3)), "Test case 4 failed"
# Test case 5: both shapes contribute sizes, (2, 1) with (3, 1, 2).
assert are_broadcastable(tensor([[1], [2]]), tensor([[[1, 2]], [[3, 4]], [[5, 6]]])) == (True, (3, 2, 2)), "Test case 5 failed"
print("All tests passed!")

All tests passed!


### Question C

In [7]:
class BroadcastError(Exception):
    """Raised by broadcast_tensors when two tensors cannot be broadcast together."""

In [9]:
def broadcast_tensors(A: Tensor, B: Tensor) -> Tuple[Tensor, Tensor]:
    """Expand A and B to their common broadcast shape.

    Args:
        A: First tensor.
        B: Second tensor.

    Returns:
        A pair ``(expanded_A, expanded_B)`` in which both tensors have the
        broadcast shape reported by ``are_broadcastable``.

    Raises:
        BroadcastError: If ``are_broadcastable`` reports that the two
            tensors cannot be broadcast together.
    """
    broadcastable, result_shape = are_broadcastable(A, B)
    if not broadcastable:
        raise BroadcastError("Tensors cannot be broadcasted together")

    # expand_as only reads the shape of its second argument, so a single
    # placeholder of the target shape serves for both expansions.
    shape_template = zeros(result_shape)
    expanded_A = expand_as(A, shape_template)
    expanded_B = expand_as(B, shape_template)
    return expanded_A, expanded_B

In [10]:
# Test case 1: a single-element vector with a 3-D tensor.
A = tensor([1])              # Shape (1,)
B = zeros((2, 3, 4))         # Shape (2,3,4)
expanded_A, expanded_B = broadcast_tensors(A, B)
assert expanded_A.shape == expanded_B.shape == (2, 3, 4), "Test case 1 failed"

# Test case 2: a column with a matrix that has the same number of rows.
A = tensor([[1], [2], [3]])  # Shape (3,1)
B = zeros((3, 4))            # Shape (3,4)
expanded_A, expanded_B = broadcast_tensors(A, B)
assert expanded_A.shape == expanded_B.shape == (3, 4), "Test case 2 failed"

# Test case 3: incompatible trailing sizes must raise BroadcastError.
A = tensor([1, 2])           # Shape (2,)
B = zeros((3, 4))            # Shape (3,4)
try:
    # Only the call under test sits inside the try block.
    broadcast_tensors(A, B)
except BroadcastError:
    pass
else:
    assert False, "Test case 3 failed: Expected BroadcastError"

print("All tests passed!")

All tests passed!


### Question D

In [None]:
from torch import broadcast_tensors as reference_broadcast_tensors

In [13]:
# Each entry is (A, B, description); the description is printed when the
# comparison against the reference implementation fails for that pair.
test_cases = [
    (tensor([1]), zeros((2, 3, 4)), "Single-element vector (1,) to (2,3,4)"),
    (tensor([[1], [2], [3]]), zeros((3, 4)), "Column vector to (3,4)"),
    (tensor([1, 2, 3]), zeros((3, 1)), "Row vector (3,) with column (3,1) to (3,3)"),
    (tensor([[1, 2, 3]]), zeros((1, 3)), "1-row matrix to (1,3)"),
    (tensor([1, 1, 1]), zeros((3, 3)), "1D vector broadcast to (3,3)"),
    (tensor([[1], [2]]), zeros((2, 3)), "Column vector to (2,3)"),
    (tensor([[1, 2, 3]]), zeros((2, 3)), "Row vector to (2,3)"),
    (tensor([[[1]], [[2]]]), zeros((2, 3, 4)), "(2,1,1) to (2,3,4)"),
    (tensor([1, 2, 3, 4]), zeros((1, 4)), "Vector to (1,4)"),
    (tensor([[1, 2], [3, 4]]), zeros((2, 2)), "No broadcasting needed"),
    (tensor([[1], [2], [3]]), zeros((1, 3, 4)), "Expanding from (3,1) to (1,3,4)"),
    (tensor([1, 2, 3]).view(3, 1, 1), zeros((3, 1, 2)), "Broadcasting (3,1,1) to (3,1,2)"),
    (tensor([[1, 2, 3], [4, 5, 6]]).view(2, 3, 1), zeros((2, 3, 4)), "Expanding to third dim"),
    (tensor([1, 2, 3, 4]), zeros((3, 1, 4)), "Matching last dimension"),
    (tensor([[[1, 2, 3]]]), zeros((2, 3, 3)), "Expanding singleton dimension"),
]

In [15]:
# Compare broadcast_tensors with the reference implementation on every case.
# Each case is checked on its own, so one failure does not hide the others
# and the reason for each failure is reported.
failures = []
for A, B, description in test_cases:
    try:
        expanded_A, expanded_B = broadcast_tensors(A, B)
        expected_A, expected_B = reference_broadcast_tensors(A, B)
        assert expanded_A.equal(expected_A), "Mismatch in broadcasted A"
        assert expanded_B.equal(expected_B), "Mismatch in broadcasted B"
    except Exception as error:
        # Exception (not a bare except) lets KeyboardInterrupt and SystemExit
        # propagate while still recording any failure raised by the case.
        failures.append((description, error))

for description, error in failures:
    print(f"Test '{description}' failed! ({type(error).__name__}: {error})")
if not failures:
    print("All tests passed!")

All tests passed!
