# Materializers

How data passes between steps is dictated by `materializers`. The data that flows through steps
is stored as artifacts, and artifacts are stored in artifact stores. The logic that governs reading and writing
data to and from the artifact stores lives in the materializers. To control more precisely how data
flows between steps, you can create a custom materializer by subclassing the `BaseMaterializer` class.

Materializers are also necessary when you want to pass custom objects between steps. In this tutorial you'll learn how to build your own materializer.

In [1]:
%%capture
!pip install zenml

In [2]:
import os
from typing import Type

from zenml.artifacts import DataArtifact
from zenml.pipelines import pipeline
from zenml.steps import step

We'll start off with a custom class called `MyObj`. You can imagine this as a complex class that describes your training data with all its relevant metadata.

In [3]:
class MyObj:
    """Your custom implementation of a data object"""
    def __init__(self, name: str):
        self.name = name

Next, we will construct a basic pipeline in which an instance of this `MyObj` class is passed between the pipeline steps.

In [4]:
@step
def step1() -> MyObj:
    """Step that returns one of your data objects"""
    return MyObj("aria_the_cat")

@step
def step2(my_obj: MyObj):
    """Step that uses one of your data objects as input"""
    print(my_obj.name)
    
@pipeline
def pipe(step1, step2):
    """Connecting the steps together in a pipeline"""
    custom_data_object = step1()
    step2(custom_data_object)

Let's run this pipeline and see how it behaves when it does not know how to write and read the `MyObj` instance to and from the artifact store.

In [5]:
# This will fail - don't worry, we will fix this together
pipe(
    step1=step1(),
    step2=step2()
).run()

Creating run for pipeline: `pipe`
Cache enabled for pipeline `pipe`
Using stack `secrets_stack2` to run pipeline `pipe`...


As you can see, the run fails with a **StepInterfaceError**: `step1` tries to write its output to the artifact store but can't find a materializer to do so. Let's write one!
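
To see why a missing materializer stops the run, it can help to picture a lookup table from Python types to the classes that know how to persist them. The sketch below is only an illustration of that idea and not ZenML's actual implementation: `REGISTRY`, `IntWriter`, and `find_materializer` are hypothetical names, and a plain `LookupError` stands in for the error ZenML raises.

```python
from typing import Any, Dict, Type


class IntWriter:
    """Hypothetical stand-in for a materializer that handles `int`."""


class MyObj:
    """Same custom data object as in the tutorial."""

    def __init__(self, name: str):
        self.name = name


# Hypothetical registry: Python type -> class that knows how to persist it.
REGISTRY: Dict[Type[Any], Type[Any]] = {int: IntWriter}


def find_materializer(data_type: Type[Any]) -> Type[Any]:
    """Return the handler registered for `data_type`, or raise LookupError."""
    try:
        return REGISTRY[data_type]
    except KeyError:
        raise LookupError(
            f"No materializer registered for {data_type.__name__}"
        ) from None
```

In this picture, `find_materializer(int)` succeeds because `int` has an entry, while `find_materializer(MyObj)` raises, since nothing has been registered for the custom class yet.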


For a custom materializer, all you need to do is inherit from ZenML's `BaseMaterializer`, define `ASSOCIATED_TYPES`, and implement two methods: `handle_input` and `handle_return`. Make sure to use ZenML's `fileio` for reading and writing. This ensures that the materializer keeps working even if you switch to a different artifact store later.

In [6]:
from zenml.materializers.base_materializer import BaseMaterializer
from zenml.io import fileio



class MyMaterializer(BaseMaterializer):
    """Materializer for your data object"""
    # Within the associated types you specify which object this materializer
    # is built to handle
    ASSOCIATED_TYPES = (MyObj,)
    ASSOCIATED_ARTIFACT_TYPES = (DataArtifact,)

    def handle_input(self, data_type: Type[MyObj]) -> MyObj:
        """Read from artifact store"""
        super().handle_input(data_type)
        with fileio.open(os.path.join(self.artifact.uri, "data.txt"), "r") as f:
            name = f.read()
        return MyObj(name=name)

    def handle_return(self, my_obj: MyObj) -> None:
        """Write to artifact store"""
        super().handle_return(my_obj)
        with fileio.open(os.path.join(self.artifact.uri, "data.txt"), "w") as f:
            f.write(my_obj.name)
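
Before wiring the materializer into a pipeline, it can be useful to check that the write and read logic round-trips an object. The sketch below mirrors the bodies of `handle_return` and `handle_input` without ZenML. As assumptions for illustration only, the built-in `open` stands in for `fileio.open`, a temporary directory stands in for `self.artifact.uri`, and `write_obj`, `read_obj`, and `round_trip` are hypothetical helper names.

```python
import os
import tempfile


class MyObj:
    """Same custom data object as in the tutorial."""

    def __init__(self, name: str):
        self.name = name


def write_obj(uri: str, my_obj: MyObj) -> None:
    """Mirror of handle_return: store the name in data.txt under `uri`."""
    with open(os.path.join(uri, "data.txt"), "w") as f:
        f.write(my_obj.name)


def read_obj(uri: str) -> MyObj:
    """Mirror of handle_input: rebuild the object from data.txt under `uri`."""
    with open(os.path.join(uri, "data.txt"), "r") as f:
        return MyObj(name=f.read())


def round_trip(my_obj: MyObj) -> MyObj:
    """Write the object to a temporary directory and read it back."""
    with tempfile.TemporaryDirectory() as uri:
        write_obj(uri, my_obj)
        return read_obj(uri)
```

Calling `round_trip(MyObj("aria_the_cat")).name` returns `"aria_the_cat"`, which confirms that the two halves agree on the file name and format. A local check like this does not exercise remote artifact stores, which is the reason the materializer itself goes through `fileio`.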

Let's try running our pipeline again. Make sure to specify the return materializer for every step that returns the object type associated with your custom materializer.

In [7]:
pipe(
    step1=step1().with_return_materializers(MyMaterializer),
    step2=step2()
).run()

Creating run for pipeline: `pipe`
Cache enabled for pipeline `pipe`
Using stack `secrets_stack2` to run pipeline `pipe`...
Step `step1` has started.
Step `step1` has finished in 0.124s.
Step `step2` has started.
aria_the_cat
Step `step2` has finished in 0.088s.
Pipeline run `pipe-01_Apr_22-15_04_05_078985` has finished in 0.223s.


And voilà, your pipeline is up and running.