Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG]: StepInterfaceError: Unable to find materializer for output 'output' of type <class 'keras.engine.training.Model'> in step 'trainer'. #874

Closed
1 task done
Coder-Vishali opened this issue Sep 5, 2022 · 15 comments
Assignees
Labels
bug Something isn't working

Comments

@Coder-Vishali
Copy link

Contact Details [Optional]

vishalisrinivasan97@gmail.com

System Information

ZenML version: 0.13.1
Install path: /home//.pyenv/versions/3.7.6/lib/python3.7/site-packages/zenml
Python version: 3.7.6
Platform information: {'os': 'linux', 'linux_distro': 'ubuntu', 'linux_distro_like': 'debian', 'linux_distro_version': '18.04'}
Environment: native
Integrations: ['kubeflow', 'kubernetes', 'tensorboard', 'tensorflow']

What happened?

When I try to implement this code "https://github.com/zenml-io/zenml/tree/main/examples/kubeflow_pipelines_orchestration"; at the step "Run pipeline":
_

Initialize the pipeline

first_pipeline = mnist_pipeline(
importer=importer(),
normalizer=normalizer(),
trainer=trainer(),
evaluator=evaluator(),
)

first_pipeline.run()

_

When I try to run the above snippet, I face StepInterfaceError

StepInterfaceError: Unable to find materializer for output 'output' of type <class 'keras.engine.training.Model'>
in step 'trainer'. Please make sure to either explicitly set a materializer for step outputs using
step.with_return_materializers(...) or registering a default materializer for specific types by subclassing
BaseMaterializer and setting its ASSOCIATED_TYPES class variable. For more information, visit
https://docs.zenml.io/developer-guide/advanced-usage/materializer.

Reproduction steps

You can get the entire source code from here: https://github.com/zenml-io/zenml/tree/main/examples/kubeflow_pipelines_orchestration

Initialize the pipeline

first_pipeline = mnist_pipeline(
importer=importer(),
normalizer=normalizer(),
trainer=trainer(),
evaluator=evaluator(),
)

first_pipeline.run()

Relevant log output

Creating run for pipeline: mnist_pipeline
Cache enabled for pipeline mnist_pipeline
Using stack default to run pipeline mnist_pipeline...

╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ <ipython-input-19-3e8dc075b972>:9 in <module>                                                    │
│                                                                                                  │
│ /usr/local/lib/python3.7/dist-packages/zenml/pipelines/base_pipeline.py:449 in run               │
│                                                                                                  │
│   446 │   │   constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True                                 │
│   447 │   │   try:                                                                               │
│   448 │   │   │   return_value = stack.deploy_pipeline(                                          │
│ ❱ 449 │   │   │   │   self, runtime_configuration=runtime_configuration                          │
│   450 │   │   │   )                                                                              │
│   451 │   │   finally:                                                                           │
│   452 │   │   │   constants.SHOULD_PREVENT_PIPELINE_EXECUTION = False                            │
│                                                                                                  │
│ /usr/local/lib/python3.7/dist-packages/zenml/stack/stack.py:832 in deploy_pipeline               │
│                                                                                                  │
│   829 │   │   )                                                                                  │
│   830 │   │                                                                                      │
│   831 │   │   return_value = self.orchestrator.run(                                              │
│ ❱ 832 │   │   │   pipeline, stack=self, runtime_configuration=runtime_configuration              │
│   833 │   │   )                                                                                  │
│   834 │   │                                                                                      │
│   835 │   │   # Put pipeline level cache policy back to make sure the next runs                  │
│                                                                                                  │
│ /usr/local/lib/python3.7/dist-packages/zenml/orchestrators/base_orchestrator.py:250 in run       │
│                                                                                                  │
│   247 │   │   # Create the protobuf pipeline which will be needed for various reasons            │
│   248 │   │   # in the following steps                                                           │
│   249 │   │   pb2_pipeline: Pb2Pipeline = Compiler().compile(                                    │
│ ❱ 250 │   │   │   create_tfx_pipeline(pipeline, stack=stack)                                     │
│   251 │   │   )                                                                                  │
│   252 │   │                                                                                      │
│   253 │   │   self._configure_node_context(                                                      │
│                                                                                                  │
│ /usr/local/lib/python3.7/dist-packages/zenml/orchestrators/utils.py:50 in create_tfx_pipeline    │
│                                                                                                  │
│    47 │   │   │   the pipeline.                                                                  │
│    48 │   """
│    49 │   # Connect the inputs/outputs of all steps in the pipeline                              │
│ ❱  50 │   zenml_pipeline.connect(**zenml_pipeline.steps)                                         │
│    51 │                                                                                          │
│    52 │   tfx_components = {                                                                     │
│    53 │   │   step.name: step.component for step in zenml_pipeline.steps.values()                │
│ <ipython-input-18-3f1c2fa0b83f>:11 in mnist_pipeline                                             │
│                                                                                                  │
│ /usr/local/lib/python3.7/dist-packages/zenml/steps/base_step.py:707 in __call__                  │
│                                                                                                  │
│   704 │   │   }                                                                                  │
│   705 │   │                                                                                      │
│   706 │   │   # make sure we have registered materializers for each output                       │
│ ❱ 707 │   │   materializers = self.get_materializers(ensure_complete=True)                       │
│   708 │   │                                                                                      │
│   709 │   │   # Prepare the output artifacts and spec                                            │
│   710 │   │   for key, value in self.OUTPUT_SIGNATURE.items():                                   │
│                                                                                                  │
│ /usr/local/lib/python3.7/dist-packages/zenml/steps/base_step.py:366 in get_materializers         │
│                                                                                                  │
│   363 │   │   │   │   │   │   f"registering a default materializer for specific "
│   364 │   │   │   │   │   │   f"types by subclassing `BaseMaterializer` and setting "
│   365 │   │   │   │   │   │   f"its `ASSOCIATED_TYPES` class variable.",                         │
│ ❱ 366 │   │   │   │   │   │   url="https://docs.zenml.io/developer-guide/advanced-usage/materi   │
│   367 │   │   │   │   │   )                                                                      │
│   368 │   │                                                                                      │
│   369 │   │   return materializers                                                               │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
StepInterfaceError: Unable to find materializer for output 'output' of type `<class 'keras.engine.training.Model'>`
in step 'trainer'. Please make sure to either explicitly set a materializer for step outputs using 
`step.with_return_materializers(...)` or registering a default materializer for specific types by subclassing 
`BaseMaterializer` and setting its `ASSOCIATED_TYPES` class variable. For more information, visit 
https://docs.zenml.io/developer-guide/advanced-usage/materializer.

Code of Conduct

  • I agree to follow this project's Code of Conduct
@Coder-Vishali Coder-Vishali added the bug Something isn't working label Sep 5, 2022
@dnth dnth self-assigned this Sep 6, 2022
@ahmadmustafaanis
Copy link

ahmadmustafaanis commented Sep 8, 2022

I am getting the same error for my simple code.
Here is my code

from zenml.steps import step, Output

@step
def data_loader() -> Output(x_train=np.ndarray, y_train=np.ndarray, x_test=np.ndarray, y_test=np.ndarray):
    """
    return MNIST x_train, y_train, x_test, y_test
    """
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    
    return x_train, y_train, x_test, y_test

@step
def initialize_model() -> Output(untrained_model=tf.keras.Sequential):
    model = tf.keras.Sequential([
            tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
            tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
            tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
            tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.Dense(num_classes, activation="softmax"),
            ])
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc'])
    return model

@step
def preprocess_data(x:np.ndarray, y:np.ndarray) ->Output(x_processed= np.ndarray, y_processed=np.ndarray):
    x_processed = x.astype("float32") / 255
    x_processed = np.expand_dims(x_processed, -1)
    
    y_processed = keras.utils.to_categorical(y, num_classes)
    
    return x_processed, y_processed

@step
def train_model(x_train:np.ndarray, y_train:np.ndarray, untrained_model:tf.keras.Sequential) -> Output(trained_model=tf.keras.Sequential, history=dict):
    history = untrained_model.fit(x_train, y_train, epochs=epochs)
    return untrained_model, history 

@step
def eval_model(trained_model: tf.keras.Sequential, x_test:np.ndarray, y_test:np.ndarray) -> None:
    trainedmodel.evaluate(x_test, y_test)

from zenml.pipelines import pipeline

@pipeline
def runner(data_loader, initialize_model, preprocess_data, train_model):
    x_train, y_train, x_test, y_test = data_loader()
    x_train_processed, y_train_processed = preprocess_data(x_train, y_train)    
    model = initialize_model()
    model, history = train_model(x_train_processed, y_train_processed, model)
    
mnist_pipeline = runner(data_loader(), initialize_model(), preprocess_data(), train_model())

mnist_pipeline.run()

The error is

Creating run for pipeline: runner
Cache enabled for pipeline runner
Using stack default to run pipeline runner...
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /tmp/ipykernel_26594/730396930.py:1 in <cell line: 1>                                            │
│                                                                                                  │
│ [Errno 2] No such file or directory: '/tmp/ipykernel_26594/730396930.py'                         │
│                                                                                                  │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/pipelines/base_pipeline.py:44 │
│ 8 in run                                                                                         │
│                                                                                                  │
│   445 │   │   # behavior                                                                         │
│   446 │   │   constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True                                 │
│   447 │   │   try:                                                                               │
│ ❱ 448 │   │   │   return_value = stack.deploy_pipeline(                                          │
│   449 │   │   │   │   self, runtime_configuration=runtime_configuration                          │
│   450 │   │   │   )                                                                              │
│   451 │   │   finally:                                                                           │
│                                                                                                  │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/stack/stack.py:831 in         │
│ deploy_pipeline                                                                                  │
│                                                                                                  │
│   828 │   │   │   pipeline=pipeline, runtime_configuration=runtime_configuration                 │
│   829 │   │   )                                                                                  │
│   830 │   │                                                                                      │
│ ❱ 831 │   │   return_value = self.orchestrator.run(                                              │
│   832 │   │   │   pipeline, stack=self, runtime_configuration=runtime_configuration              │
│   833 │   │   )                                                                                  │
│   834                                                                                            │
│                                                                                                  │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/orchestrators/base_orchestrat │
│ or.py:250 in run                                                                                 │
│                                                                                                  │
│   247 │   │   # Create the protobuf pipeline which will be needed for various reasons            │
│   248 │   │   # in the following steps                                                           │
│   249 │   │   pb2_pipeline: Pb2Pipeline = Compiler().compile(                                    │
│ ❱ 250 │   │   │   create_tfx_pipeline(pipeline, stack=stack)                                     │
│   251 │   │   )                                                                                  │
│   252 │   │                                                                                      │
│   253 │   │   self._configure_node_context(                                                      │
│                                                                                                  │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/orchestrators/utils.py:50 in  │
│ create_tfx_pipeline                                                                              │
│                                                                                                  │
│    47 │   │   │   the pipeline.                                                                  │
│    48 │   """                                                                                    │
│    49 │   # Connect the inputs/outputs of all steps in the pipeline                              │
│ ❱  50 │   zenml_pipeline.connect(**zenml_pipeline.steps)                                         │
│    51 │                                                                                          │
│    52 │   tfx_components = {                                                                     │
│    53 │   │   step.name: step.component for step in zenml_pipeline.steps.values()                │
│                                                                                                  │
│ /tmp/ipykernel_26594/100900222.py:7 in runner                                                    │
│                                                                                                  │
│ [Errno 2] No such file or directory: '/tmp/ipykernel_26594/100900222.py'                         │
│                                                                                                  │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/steps/base_step.py:707 in     │
│ __call__                                                                                         │
│                                                                                                  │
│   704 │   │   }                                                                                  │
│   705 │   │                                                                                      │
│   706 │   │   # make sure we have registered materializers for each output                       │
│ ❱ 707 │   │   materializers = self.get_materializers(ensure_complete=True)                       │
│   708 │   │                                                                                      │
│   709 │   │   # Prepare the output artifacts and spec                                            │
│   710 │   │   for key, value in self.OUTPUT_SIGNATURE.items():                                   │
│                                                                                                  │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/steps/base_step.py:357 in     │
│ get_materializers                                                                                │
│                                                                                                  │
│   354 │   │   │   │   materializers[output_name] = materializer                                  │
│   355 │   │   │   else:                                                                          │
│   356 │   │   │   │   if ensure_complete:                                                        │
│ ❱ 357 │   │   │   │   │   raise StepInterfaceError(                                              │
│   358 │   │   │   │   │   │   f"Unable to find materializer for output "                         │
│   359 │   │   │   │   │   │   f"'{output_name}' of type `{output_type}` in step "                │
│   360 │   │   │   │   │   │   f"'{self.name}'. Please make sure to either "                      │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
StepInterfaceError: Unable to find materializer for output 'untrained_model' of type `<class 
'keras.engine.sequential.Sequential'>` in step 'initialize_model'. Please make sure to either explicitly set a 
materializer for step outputs using `step.with_return_materializers(...)` or registering a default materializer for
specific types by subclassing `BaseMaterializer` and setting its `ASSOCIATED_TYPES` class variable. For more 
information, visit https://docs.zenml.io/developer-guide/advanced-usage/materializer.

@dudeperf3ct
Copy link
Contributor

@ahmadmustafaanis the error you are getting is because there's no materializer written for type <class 'keras.engine.sequential.Sequential'>.

The code should run if you replace tf.keras.Sequential with tf.keras.Model since Sequential inherits from Model class.

@ahmadmustafaanis
Copy link

@dudeperf3ct I am still getting this error.

StepInterfaceError: Unable to find materializer for output 'untrained_model' of type `<class 
'keras.engine.training.Model'>` in step 'initialize_model'. Please make sure to either explicitly set a 
materializer for step outputs using `step.with_return_materializers(...)` or registering a default materializer for
specific types by subclassing `BaseMaterializer` and setting its `ASSOCIATED_TYPES` class variable. For more 
information, visit https://docs.zenml.io/developer-guide/advanced-usage/materializer.

the function

@step
def initialize_model() -> Output(untrained_model= tf.keras.Model):
    model = tf.keras.Sequential([
            tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
            tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
            tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
            tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.Dense(num_classes, activation="softmax"),
            ])
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc'])
    return model

@dudeperf3ct
Copy link
Contributor

dudeperf3ct commented Sep 15, 2022

@ahmadmustafaanis what version of zenml are you using? I am able to run the pipeline using the following code using zenml 0.13.2.

import numpy as np
import tensorflow as tf

from zenml.steps import step, Output

num_classes = 10
epochs = 10

@step
def data_loader() -> Output(x_train=np.ndarray, y_train=np.ndarray, x_test=np.ndarray, y_test=np.ndarray):
    """
    return MNIST x_train, y_train, x_test, y_test
    """
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    
    return x_train, y_train, x_test, y_test

@step
def initialize_model() -> Output(untrained_model=tf.keras.Model):
    model = tf.keras.Sequential([
            tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation="relu", input_shape=(28, 28, 1)),
            tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
            tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
            tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.Dense(num_classes, activation="softmax"),
            ])
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc'])
    return model

@step
def preprocess_data(x:np.ndarray, y:np.ndarray) -> Output(x_processed= np.ndarray, y_processed=np.ndarray):
    x_processed = x.astype("float32") / 255
    x_processed = np.expand_dims(x_processed, -1)
    
    y_processed = tf.keras.utils.to_categorical(y, num_classes)
    
    return x_processed, y_processed

@step
def train_model(x_train:np.ndarray, y_train:np.ndarray, untrained_model:tf.keras.Model) -> Output(trained_model=tf.keras.Model, history=dict):
    history = untrained_model.fit(x_train, y_train, epochs=epochs)
    return untrained_model, history.history

@step
def eval_model(trained_model: tf.keras.Model, x_test:np.ndarray, y_test:np.ndarray) -> None:
    trained_model.evaluate(x_test, y_test)

from zenml.pipelines import pipeline

@pipeline
def runner(data_loader, initialize_model, preprocess_data, train_model):
    x_train, y_train, x_test, y_test = data_loader()
    x_train_processed, y_train_processed = preprocess_data(x_train, y_train)    
    model = initialize_model()
    model, history = train_model(x_train_processed, y_train_processed, model)
    
mnist_pipeline = runner(data_loader(), initialize_model(), preprocess_data(), train_model())

mnist_pipeline.run()

@ahmadmustafaanis
Copy link

My version is

└[~]> zenml --version
zenml, version 0.13.1

@dnth
Copy link
Contributor

dnth commented Sep 16, 2022

@ahmadmustafaanis I'm able to run @dudeperf3ct code with zenml==0.13.1 and zenml==0.13.0

Did you install tensorflow using pip? Or using zenml integration eg zenml integration install tensorflow?

@ahmadmustafaanis
Copy link

Yes I installed it via pip, not integrated via zenml.

I guess that's the issue, I'll try with zenml integration this weekend.

@dnth
Copy link
Contributor

dnth commented Sep 16, 2022

@ahmadmustafaanis quite possibly. Using the zenml integration install makes sure the package is installed correctly. Keep us updated here! :))

@ahmadmustafaanis
Copy link

ahmadmustafaanis commented Sep 17, 2022

@dnth Thanks, working like charm now. This means that almost every object, that is a output from a zenml step(or input to), needs to be installed via zenml integration install right?

@dnth
Copy link
Contributor

dnth commented Sep 19, 2022

@ahmadmustafaanis yes, for every object that passes through the pipeline, you need to tell zenml how to serialize/deserialize it. In this way this object can be cached and stored. In zenml this is done using something called a Materializer. For most Python objects you don't need to write your own Materializer, these are already builtin within zenml. But for custom objects you'd need to write a custom materializer for it.

Here's a guide how to write a custom materializer
https://docs.zenml.io/v/0.6.0/guides/index/custom-materializer

A Keras model is not a standard Python object so you'd need to write one. But this has been done by someone in zenml and to use it you'll have to install the integration with zenml integration install tensorflow.

@dnth
Copy link
Contributor

dnth commented Sep 19, 2022

@dudeperf3ct any chance this also solves your issue? :)

@dudeperf3ct
Copy link
Contributor

@dnth i didn't face any issue ... i was helping in resolving the issue

@dnth
Copy link
Contributor

dnth commented Sep 19, 2022

@dudeperf3ct I see.. sorry I misunderstood! Thank you so much for your help!

@dnth
Copy link
Contributor

dnth commented Sep 19, 2022

@Coder-Vishali please let us know if this solves the issue for you :)

@dnth
Copy link
Contributor

dnth commented Sep 28, 2022

@Coder-Vishali I'm closing this issue due to inactivity. Please let us know if the solution provided worked for you. Feel free to reopen if the problem still persists :)

@dnth dnth closed this as completed Sep 28, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

4 participants