In [1]:
import os
import tensorflow as tf
from typing import Any, Dict, Text
from modules.merge_dataset import merge_dataset
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner
from modules import pipeline, components
from absl import logging
from tfx import components
from tfx.dsl.components.common.resolver import Resolver
from tfx.dsl.input_resolution.strategies.latest_blessed_model_strategy import \
    LatestBlessedModelStrategy
from tfx.proto import example_gen_pb2, pusher_pb2, trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model, ModelBlessing
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

In [2]:
# Pipeline identity; also used as the leaf directory of `pipeline_root`.
PIPELINE_NAME = "movie-recommender-pipeline"

# --- Inputs -----------------------------------------------------------------
# Directories read by the two CsvExampleGen components.
MOVIES_DATA_ROOT = "data/movies"
RATING_DATA_ROOT = "data/merge"

# User-code module files passed to components via `module_file`.
# NOTE(review): TUNER_MODULE_FILE is not referenced by any cell visible here.
MOVIES_TRANSFORM_MODULE_FILE = "modules/movies_transform.py"
RATING_TRANSFORM_MODULE_FILE = "modules/rating_transform.py"
TUNER_MODULE_FILE = "modules/tuner.py"
TRAINER_MODULE_FILE = "modules/trainer.py"

# --- Outputs ----------------------------------------------------------------
# Every path below is derived from this single base directory.
OUTPUT_BASE = "outputs"

pipeline_root = os.path.join(OUTPUT_BASE, PIPELINE_NAME)
metadata_path = os.path.join(pipeline_root, "metadata.sqlite")
serving_model_dir = os.path.join(OUTPUT_BASE, "serving_model")

In [3]:
# Interactive TFX context: every `context.run(...)` below executes one
# component and stores its artifacts under `pipeline_root`.
context = InteractiveContext(
    pipeline_root=pipeline_root,
)



In [4]:
# Output split configuration shared by both CsvExampleGen components:
# examples are hashed into a "train" split (8 buckets) and an "eval"
# split (2 buckets).
output = example_gen_pb2.Output(
    split_config=example_gen_pb2.SplitConfig(
        splits=[
            example_gen_pb2.SplitConfig.Split(
                name=split_name, hash_buckets=bucket_count
            )
            for split_name, bucket_count in (("train", 8), ("eval", 2))
        ]
    )
)

In [5]:
# Ingest the movies CSV data from MOVIES_DATA_ROOT using the shared
# train/eval split configuration.
movies_example_gen = components.CsvExampleGen(
    output_config=output,
    input_base=MOVIES_DATA_ROOT,
)
context.run(movies_example_gen)



0,1
.execution_id,69
.component,"function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } CsvExampleGen at 0x169ac0bd910.inputs{}.outputs['examples'] function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Channel of type 'Examples' (1 artifact) at 0x169c244fb50.type_nameExamples._artifacts[0] function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Artifact of type 'Examples' (uri: outputs\movie-recommender-pipeline\CsvExampleGen\examples\69) at 0x169ac17eee0.type<class 'tfx.types.standard_artifacts.Examples'>.urioutputs\movie-recommender-pipeline\CsvExampleGen\examples\69.span0.split_names[""train"", ""eval""].version0.exec_properties['input_base']data/movies['input_config']{  ""splits"": [  {  ""name"": ""single_split"",  ""pattern"": ""*""  }  ] }['output_config']{  ""split_config"": {  ""splits"": [  {  ""hash_buckets"": 8,  ""name"": ""train""  },  {  ""hash_buckets"": 2,  ""name"": ""eval""  }  ]  } }['output_data_format']6['output_file_format']5['custom_config']None['range_config']None['span']0['version']None['input_fingerprint']split:single_split,num_files:1,total_bytes:1397382,xor_checksum:1669469445,sum_checksum:1669469445"
.component.inputs,{}
.component.outputs,"['examples'] function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Channel of type 'Examples' (1 artifact) at 0x169c244fb50.type_nameExamples._artifacts[0] function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Artifact of type 'Examples' (uri: outputs\movie-recommender-pipeline\CsvExampleGen\examples\69) at 0x169ac17eee0.type<class 'tfx.types.standard_artifacts.Examples'>.urioutputs\movie-recommender-pipeline\CsvExampleGen\examples\69.span0.split_names[""train"", ""eval""].version0"

0,1
.inputs,{}
.outputs,"['examples'] function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Channel of type 'Examples' (1 artifact) at 0x169c244fb50.type_nameExamples._artifacts[0] function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Artifact of type 'Examples' (uri: outputs\movie-recommender-pipeline\CsvExampleGen\examples\69) at 0x169ac17eee0.type<class 'tfx.types.standard_artifacts.Examples'>.urioutputs\movie-recommender-pipeline\CsvExampleGen\examples\69.span0.split_names[""train"", ""eval""].version0"
.exec_properties,"['input_base']data/movies['input_config']{  ""splits"": [  {  ""name"": ""single_split"",  ""pattern"": ""*""  }  ] }['output_config']{  ""split_config"": {  ""splits"": [  {  ""hash_buckets"": 8,  ""name"": ""train""  },  {  ""hash_buckets"": 2,  ""name"": ""eval""  }  ]  } }['output_data_format']6['output_file_format']5['custom_config']None['range_config']None['span']0['version']None['input_fingerprint']split:single_split,num_files:1,total_bytes:1397382,xor_checksum:1669469445,sum_checksum:1669469445"

0,1
['examples'],"function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Channel of type 'Examples' (1 artifact) at 0x169c244fb50.type_nameExamples._artifacts[0] function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Artifact of type 'Examples' (uri: outputs\movie-recommender-pipeline\CsvExampleGen\examples\69) at 0x169ac17eee0.type<class 'tfx.types.standard_artifacts.Examples'>.urioutputs\movie-recommender-pipeline\CsvExampleGen\examples\69.span0.split_names[""train"", ""eval""].version0"

0,1
.type_name,Examples
._artifacts,"[0] function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Artifact of type 'Examples' (uri: outputs\movie-recommender-pipeline\CsvExampleGen\examples\69) at 0x169ac17eee0.type<class 'tfx.types.standard_artifacts.Examples'>.urioutputs\movie-recommender-pipeline\CsvExampleGen\examples\69.span0.split_names[""train"", ""eval""].version0"

0,1
[0],"function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Artifact of type 'Examples' (uri: outputs\movie-recommender-pipeline\CsvExampleGen\examples\69) at 0x169ac17eee0.type<class 'tfx.types.standard_artifacts.Examples'>.urioutputs\movie-recommender-pipeline\CsvExampleGen\examples\69.span0.split_names[""train"", ""eval""].version0"

0,1
.type,<class 'tfx.types.standard_artifacts.Examples'>
.uri,outputs\movie-recommender-pipeline\CsvExampleGen\examples\69
.span,0
.split_names,"[""train"", ""eval""]"
.version,0

0,1
['input_base'],data/movies
['input_config'],"{  ""splits"": [  {  ""name"": ""single_split"",  ""pattern"": ""*""  }  ] }"
['output_config'],"{  ""split_config"": {  ""splits"": [  {  ""hash_buckets"": 8,  ""name"": ""train""  },  {  ""hash_buckets"": 2,  ""name"": ""eval""  }  ]  } }"
['output_data_format'],6
['output_file_format'],5
['custom_config'],
['range_config'],
['span'],0
['version'],
['input_fingerprint'],"split:single_split,num_files:1,total_bytes:1397382,xor_checksum:1669469445,sum_checksum:1669469445"

0,1
['examples'],"function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Channel of type 'Examples' (1 artifact) at 0x169c244fb50.type_nameExamples._artifacts[0] function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Artifact of type 'Examples' (uri: outputs\movie-recommender-pipeline\CsvExampleGen\examples\69) at 0x169ac17eee0.type<class 'tfx.types.standard_artifacts.Examples'>.urioutputs\movie-recommender-pipeline\CsvExampleGen\examples\69.span0.split_names[""train"", ""eval""].version0"

0,1
.type_name,Examples
._artifacts,"[0] function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Artifact of type 'Examples' (uri: outputs\movie-recommender-pipeline\CsvExampleGen\examples\69) at 0x169ac17eee0.type<class 'tfx.types.standard_artifacts.Examples'>.urioutputs\movie-recommender-pipeline\CsvExampleGen\examples\69.span0.split_names[""train"", ""eval""].version0"

0,1
[0],"function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Artifact of type 'Examples' (uri: outputs\movie-recommender-pipeline\CsvExampleGen\examples\69) at 0x169ac17eee0.type<class 'tfx.types.standard_artifacts.Examples'>.urioutputs\movie-recommender-pipeline\CsvExampleGen\examples\69.span0.split_names[""train"", ""eval""].version0"

0,1
.type,<class 'tfx.types.standard_artifacts.Examples'>
.uri,outputs\movie-recommender-pipeline\CsvExampleGen\examples\69
.span,0
.split_names,"[""train"", ""eval""]"
.version,0


In [6]:
# Ingest the rating CSV data from RATING_DATA_ROOT using the same
# train/eval split configuration as the movies data.
rating_example_gen = components.CsvExampleGen(
    output_config=output,
    input_base=RATING_DATA_ROOT,
)
context.run(rating_example_gen)

In [None]:
# Compute statistics over the ingested movies examples.
movies_statistics_gen = components.StatisticsGen(
    examples=movies_example_gen.outputs["examples"],
)
context.run(movies_statistics_gen)

In [None]:
# Compute statistics over the ingested rating examples.
rating_statistics_gen = components.StatisticsGen(
    examples=rating_example_gen.outputs["examples"],
)
context.run(rating_statistics_gen)

In [None]:
# Generate a schema for the movies data from its statistics.
movies_schema_gen = components.SchemaGen(
    statistics=movies_statistics_gen.outputs["statistics"],
)
context.run(movies_schema_gen)

In [None]:
# Generate a schema for the rating data from its statistics.
rating_schema_gen = components.SchemaGen(
    statistics=rating_statistics_gen.outputs["statistics"],
)
context.run(rating_schema_gen)

In [None]:
# Validate the movies statistics against the generated movies schema.
movies_example_validator = components.ExampleValidator(
    schema=movies_schema_gen.outputs["schema"],
    statistics=movies_statistics_gen.outputs["statistics"],
)
context.run(movies_example_validator)

In [None]:
# Validate the rating statistics against the generated rating schema.
rating_example_validator = components.ExampleValidator(
    schema=rating_schema_gen.outputs["schema"],
    statistics=rating_statistics_gen.outputs["statistics"],
)
context.run(rating_example_validator)


In [None]:
# Run the movies preprocessing module over the movies examples.
# The module path is made absolute before it is handed to the component.
movies_transform = components.Transform(
    module_file=os.path.abspath(MOVIES_TRANSFORM_MODULE_FILE),
    schema=movies_schema_gen.outputs["schema"],
    examples=movies_example_gen.outputs["examples"],
)
context.run(movies_transform)

In [None]:
# Run the rating preprocessing module over the rating examples.
# The module path is made absolute before it is handed to the component.
rating_transform = components.Transform(
    module_file=os.path.abspath(RATING_TRANSFORM_MODULE_FILE),
    schema=rating_schema_gen.outputs["schema"],
    examples=rating_example_gen.outputs["examples"],
)
context.run(rating_transform)

In [None]:
# Train the model on the transformed rating examples. The transformed
# movies examples and their schema are forwarded to the trainer module
# through `custom_config`.
# NOTE(review): `hyperparameters=tuner.outputs["best_hyperparameters"]` is
# deliberately not passed; no Tuner component is run in the cells above.
trainer = components.Trainer(
    module_file=os.path.abspath(TRAINER_MODULE_FILE),
    # Inputs produced by the rating Transform component.
    examples=rating_transform.outputs["transformed_examples"],
    transform_graph=rating_transform.outputs["transform_graph"],
    schema=rating_transform.outputs["post_transform_schema"],
    # Step budgets for the "train" and "eval" splits.
    train_args=trainer_pb2.TrainArgs(splits=["train"], num_steps=500),
    eval_args=trainer_pb2.EvalArgs(splits=["eval"], num_steps=100),
    # Extra values read by the trainer module.
    custom_config={
        "epochs": 5,
        "movies": movies_transform.outputs["transformed_examples"],
        "movies_schema": movies_transform.outputs["post_transform_schema"],
    },
)
context.run(trainer)


In [None]:
# Resolver that looks up the latest blessed model.
# NOTE(review): no later cell visible here consumes this resolver's outputs.
model_resolver = Resolver(
    strategy_class=LatestBlessedModelStrategy,
    model_blessing=Channel(type=ModelBlessing),
    model=Channel(type=Model),
).with_id("Latest_blessed_model_resolve")
context.run(model_resolver)

In [None]:
# Push the trained model to a filesystem destination under
# `serving_model_dir`. No `model_blessing` input is supplied to the Pusher.
pusher = components.Pusher(
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=os.path.join(serving_model_dir, "movie-recommender"),
        ),
    ),
    model=trainer.outputs["model"],
)
context.run(pusher)

In [None]:
# Smoke-test the pushed SavedModel: load it from the Pusher's output
# artifact, call it with a single input and print the first three titles
# returned for that input.
# NOTE(review): "42" is presumably a user id; confirm against the trainer
# module's serving signature.
try:
    loaded = tf.saved_model.load(pusher.outputs['pushed_model'].get()[0].uri)
    scores, titles = loaded(["42"])
    print(f"Recommendations: {titles[0][:3]}")
except Exception as err:
    # Catch `Exception`, not `BaseException`, so that KeyboardInterrupt and
    # SystemExit still propagate and the cell stays interruptible.
    # `logging.exception` records the traceback alongside the message.
    logging.exception("Smoke test of the pushed model failed: %s", err)