<a href="https://colab.research.google.com/github/pymc-devs/pytensor-workshop/blob/main/notebooks/exercises/implementing_an_op.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**💡 To better engage your gray matter, we suggest turning off Colab AI autocompletion in `Tools > Settings > AI Assistance`.**

In [25]:
%%capture

try:
    import pytensor_workshop
except ModuleNotFoundError:
    !pip install git+https://github.com/pymc-devs/pytensor-workshop.git

In [1]:
import numpy as np

In [2]:
import pytensor
import pytensor.tensor as pt
from pytensor.graph.basic import Apply
from pytensor.graph.op import Op
from pytensor.tensor.type import TensorType, scalar
from pytensor.graph import rewrite_graph


In [3]:
from pytensor_workshop import test

## Implementing new PyTensor Ops

In [PyTensor from Scratch](../walkthrough/pytensor_from_scratch.ipynb) we saw a simplified version of how to implement some Ops.

That was almost exactly like implementing real PyTensor Ops, with two differences: we didn't use the real PyTensor objects, and in a real Op the `perform` method stores its results in the provided output storage instead of returning them. Here is how `Sum` could be implemented in real PyTensor:

In [4]:

class Sum(Op):

    def make_node(self, x):
        assert isinstance(x.type, TensorType)
        out = scalar(dtype=x.type.dtype)
        return Apply(self, [x], [out])

    def perform(self, node, inputs, output_storage):
        [x] = inputs
        [out] = output_storage
        out[0] = x.sum()

sum = Sum()

In [5]:
x = TensorType(shape=(None, None), dtype="float64")()
sum_x = sum(x)
sum_x.dprint()

Sum [id A]
 └─ <Matrix(float64, shape=(?, ?))> [id B]


<ipykernel.iostream.OutStream at 0x103e9d6c0>

In [6]:
sum_x.eval({x: np.ones((2, 3))})

6.0

### Exercise 1: Implement a Transpose Op

Implement a transpose Op that reverses the order of the dimensions of an input tensor.

In [7]:
class Transpose(Op):

    def make_node(self, x):
        assert isinstance(x.type, TensorType)
        # Same dtype as the input, with the static shape reversed
        out = TensorType(dtype=x.type.dtype, shape=x.type.shape[::-1])()
        return Apply(self, [x], [out])

    def perform(self, node, inputs, output_storage):
        [x] = inputs
        [out] = output_storage
        # Copy so the output does not alias the input (no view_map is declared)
        out[0] = np.transpose(x).copy()


@test
def test_transpose_op(op_class):
    op = op_class()
    x = pt.tensor("x", shape=(2, 3, 4), dtype="float32")
    out = op(x)

    assert out.type.shape == (4, 3, 2)
    assert out.type.dtype == x.type.dtype
    x_test = np.arange(2 * 3 * 4).reshape((2, 3, 4)).astype(x.type.dtype)
    np.testing.assert_allclose(out.eval({x: x_test}), x_test.T)

test_transpose_op(Transpose)

Success




### Exercise 2: Parametrize transpose axis

Extend the Transpose Op to accept an arbitrary permutation of the axes.
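As a reminder of the NumPy semantics the Op has to reproduce, the sketch below shows how an axes permutation maps input dimensions to output dimensions. It uses only NumPy. The variable `static_shape` is a hypothetical stand-in for a PyTensor static shape, where `None` marks an unknown length.

```python
import numpy as np

x_test = np.arange(2 * 3 * 4).reshape((2, 3, 4))
axes = (2, 0, 1)

# Output dimension i is input dimension axes[i]
out = x_test.transpose(axes)
expected_shape = tuple(x_test.shape[a] for a in axes)
print(out.shape, expected_shape)

# The same rule applies to a partially known static shape
static_shape = (2, None, 4)
static_out = tuple(static_shape[a] for a in axes)
print(static_out)
```

The same indexing rule can be used when building the output type, so that known dimensions stay known after the permutation.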

In [9]:
class Transpose(Op):
    ...

@test
def test_transpose_op_with_axes(op_class):
    x = pt.tensor("x", shape=(2, None, 4))
    x_test = np.arange(2 * 3 * 4).reshape((2, 3, 4))

    for axis, dtype in [
        ((0, 2, 1), "int64"),
        ((2, 0, 1), "float32")]:
        op = op_class(axis)
        out = op(x.astype(dtype))

        assert out.type.ndim == 3
        assert out.type.dtype == dtype
        np.testing.assert_allclose(out.eval({x: x_test}), x_test.transpose(axis))

# test_transpose_op_with_axes(Transpose)  # uncomment me

### Exercise 3: Define operator equality using `__props__`

PyTensor tries to avoid recomputing equivalent computations in a graph. If the same operation is applied to the same inputs, it assumes the output will be the same and merges the computations. Here is an example using the `Sum` Op defined above:

In [10]:
x = pt.vector("x")
out = sum(x) + sum(x)

The original graph contains 2 distinct Sum operations (note the different ids)

In [11]:
out.dprint()

Add [id A]
 ├─ Sum [id B]
 │  └─ x [id C]
 └─ Sum [id D]
    └─ x [id C]


<ipykernel.iostream.OutStream at 0x78eec7b45030>

But after rewriting only one sum is computed (note the same ids and the ellipsis)

In [12]:
rewrite_graph(out).dprint()

Add [id A]
 ├─ Sum [id B]
 │  └─ x [id C]
 └─ Sum [id B]
    └─ ···


<ipykernel.iostream.OutStream at 0x78eec7b45030>

However, if we use different instances of the Sum Op, PyTensor does not consider them equivalent and no merging is done.

In [13]:
out = Sum()(x) + Sum()(x)
rewrite_graph(out).dprint()

Add [id A]
 ├─ Sum [id B]
 │  └─ x [id C]
 └─ Sum [id D]
    └─ x [id C]


<ipykernel.iostream.OutStream at 0x78eec7b45030>

PyTensor uses Op equality to determine if two computations are equivalent. By default, Ops evaluate equality based on identity, so two separate instances are distinct:

In [14]:
Sum() == Sum()

False

This is not the case for the PyTensor implementation of Sum:

In [15]:
pt.sum(x).owner.op == pt.sum(x).owner.op

True

In [16]:
rewrite_graph(pt.sum(x) + pt.sum(x)).dprint()

Add [id A]
 ├─ Sum{axes=None} [id B]
 │  └─ x [id C]
 └─ Sum{axes=None} [id B]
    └─ ···


<ipykernel.iostream.OutStream at 0x78eec7b45030>

The default way of implementing Op equality is to define `__props__`, a tuple of strings with the names of immutable instance properties that "parametrize" an `Op`.

When an `Op` has `__props__`, PyTensor will check if the respective instance attributes are equal and if so, assume two Operations from the same class are equivalent.

Our simplest implementation of Sum has no parametrization, so we can define an empty `__props__`:

In [17]:
class Sum(Op):
    __props__ = ()

    def make_node(self, x):
        return Apply(self, [x], [pt.scalar()])

    def perform(self, node, inputs, outputs):
        outputs[0][0] = inputs[0].sum()

Sum() == Sum()

True

In [18]:
rewrite_graph(Sum()(x) + Sum()(x)).dprint()

Add [id A]
 ├─ Sum [id B]
 │  └─ x [id C]
 └─ Sum [id B]
    └─ ···


<ipykernel.iostream.OutStream at 0x78eec7b45030>

Extend the Transpose Op with `__props__` so that two instances with the same axis evaluate equal.

In [19]:
class Transpose(Op):
    ...

@test
def test_transpose_op_with_axes_and_props(op_class):
    x = pt.tensor("x", shape=(2, None, 4))
    x_test = np.arange(2 * 3 * 4).reshape((2, 3, 4))

    assert len(op_class.__props__)
    assert op_class(axis=(0, 2, 1)) == op_class(axis=(0, 2, 1))
    assert op_class(axis=(0, 2, 1)) != op_class(axis=(2, 0, 1))

# test_transpose_op_with_axes_and_props(Transpose)  # uncomment me

### Exercise 4: Implement an Op that wraps `np.convolve`

In [20]:
class Convolve(Op):
    ...

def test_convolve(op_class):
    x = pt.vector("x", shape=(None,))
    y = pt.vector("y", shape=(3,))
    out = op_class()(x, y)

    x_test = np.arange(10).astype("float64")
    y_test = np.array([0, 1, 2]).astype("float64")
    res = out.eval({x: x_test, y: y_test})

    np.testing.assert_allclose(res, np.convolve(x_test, y_test))

    res2 = out.eval({x: res, y: y_test})
    np.testing.assert_allclose(res2, np.convolve(res, y_test))

# test_convolve(Convolve)  # uncomment me

Extend the Op to include the parameter `mode` that `np.convolve` also offers.

Extra points if the static output shape is specified whenever it can be inferred from the input shapes.
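As a hint for the output shape, the length of a 1D convolution follows directly from the input lengths and the mode. The helper below is a hypothetical sketch (the name `convolve_output_length` is not a NumPy or PyTensor function) that returns `None` when either input length is unknown, mirroring how PyTensor static shapes mark unknown dimensions.

```python
import numpy as np


def convolve_output_length(n, m, mode="full"):
    """Length of np.convolve output for input lengths n and m, or None if unknown."""
    if n is None or m is None:
        return None
    if mode == "full":
        return n + m - 1
    if mode == "same":
        return max(n, m)
    if mode == "valid":
        return max(n, m) - min(n, m) + 1
    raise ValueError(f"Unknown mode: {mode!r}")


# Compare against what NumPy actually returns
for mode in ("full", "same", "valid"):
    expected = np.convolve(np.ones(10), np.ones(3), mode=mode).shape[0]
    print(mode, convolve_output_length(10, 3, mode), expected)
```

A result like this could be used in `make_node` to build an output type with a known length when both inputs have static shapes.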

In [21]:
class Convolve(Op):
    ...

def test_convolve(op_class):
    x = pt.vector("x", shape=(10,))
    y = pt.vector("y", shape=(3,))

    x_test = np.arange(10).astype("float64")
    y_test = np.array([0, 1, 2]).astype=("float64")

    for mode in ("full", "valid", "same"):
        print(f"{mode=}")
        op = op_class(mode=mode)
        assert op == op_class(mode=mode)

        out = op(x, y)
        if out.type.shape != (None,):
            assert out.type.shape == np.convolve(x_test, y_test, mode=mode).shape


        res = out.eval({x: x_test, y: y_test})
        np.testing.assert_allclose(res, np.convolve(x_test, y_test, mode=mode))

# test_convolve(Convolve)  # uncomment me

Open-ended challenge: implement an Op of your choosing.

Some ideas of Ops that don't currently exist in PyTensor:
* [numpy.frexp](https://numpy.org/doc/2.1/reference/generated/numpy.frexp.html)
* [numpy.nextafter](https://numpy.org/doc/2.1/reference/generated/numpy.nextafter.html)
* [scipy.signal.gauss_spline](https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.gauss_spline.html#scipy.signal.gauss_spline)
* Anything else you fancy
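
Before wrapping a function in an Op, it helps to check what it returns. As a NumPy-only sketch, `np.frexp` produces two arrays with different dtypes, so an Op wrapping it would presumably need two outputs in its `Apply` node, each with its own type.

```python
import numpy as np

x = np.array([0.5, 8.0, 10.0])

# frexp decomposes x into mantissa * 2**exponent
mantissa, exponent = np.frexp(x)
print(mantissa.dtype, exponent.dtype)

# Reconstructing the input confirms the decomposition
reconstructed = mantissa * 2.0 ** exponent
print(np.allclose(reconstructed, x))
```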