This is a local (non-Kaggle) notebook. It runs in a virtual environment in which TFX 1.16.0, Python 3.10, and compatible versions of the other libraries are installed.

Paths are relative to the GitHub repository directory, "recommender_systems".

In [None]:
import shutil

from tfx.orchestration import metadata

import tensorflow_transform as tft

from ml_metadata.proto import metadata_store_pb2
from ml_metadata.metadata_store import metadata_store

import sys
import os
sys.path.append(os.path.join(os.getcwd(), "src/test/python/movie_lens_tfx"))
sys.path.append(os.path.join(os.getcwd(), "src/main/python/movie_lens_tfx"))

from helper import *
from movie_lens_tfx.PipelineComponentsFactory import *
from movie_lens_tfx.tune_train_movie_lens import *

from absl import logging
# Quieten library logging for the rest of the notebook.
# NOTE(review): `tf` is not imported explicitly in this cell; presumably it is
# supplied by one of the wildcard imports above — confirm.
# Stop records from TensorFlow's logger propagating to ancestor loggers.
tf.get_logger().propagate = False
# `logging` here is absl's logging module (imported just above): set both its
# verbosity and its stderr threshold to WARNING.
logging.set_verbosity(logging.WARNING)
logging.set_stderrthreshold(logging.WARNING)

## EDA on the raw data

### w/ Polars and Plotly express
output is written to bin/local_notebook/images/

%run src/main/python/eda/eda_raw.py

the generated images aren't plotted here, but similar plots are shown via TFDV below.

### Run data pre-processing on full dataset to get the transformed data

In [None]:
# Configuration shared by every pipeline run in this notebook.

# Serialized ingestion inputs for the full dataset (use_small=False).
infiles_dict_ser, output_config_ser, split_names = get_test_data(use_small=False)
# presumably the largest user and movie ids in the dataset — TODO confirm
user_id_max = 6040
movie_id_max = 3952
n_genres = N_GENRES
n_age_groups = N_AGE_GROUPS
n_occupations = 21
MIN_EVAL_SIZE = 50 #make this larger for production pipeline

BATCH_SIZE = 64
NUM_EPOCHS = 20

test_num = "1"

PIPELINE_NAME = 'TestPipelines'
output_data_dir = os.path.join(get_bin_dir(), "local_notebook", test_num)
PIPELINE_ROOT = os.path.join(output_data_dir, PIPELINE_NAME)

# remove results from previous test runs:
print(f"removing: {PIPELINE_ROOT}")
try:
  shutil.rmtree(PIPELINE_ROOT)
except FileNotFoundError:
  # nothing to clean up, e.g. on the very first run
  pass
except OSError as e:
  # Still best effort, but say so: leftovers (in particular an old
  # metadata.db) would otherwise be reused silently because ENABLE_CACHE
  # is on.
  print(f"WARNING: could not fully remove {PIPELINE_ROOT}: {e}")
METADATA_PATH = os.path.join(PIPELINE_ROOT, 'tfx_metadata',
                             'metadata.db')
os.makedirs(os.path.join(PIPELINE_ROOT, 'tfx_metadata'),
            exist_ok=True)

ENABLE_CACHE = True

# alternative kept for reference (a ConnectionConfig built by hand instead of
# the file-backed one used below):
# metadata_connection_config = metadata_store_pb2.ConnectionConfig()
# metadata_connection_config.sqlite.SetInParent()
# metadata_connection = metadata.Metadata(metadata_connection_config)
metadata_connection_config = metadata.sqlite_metadata_connection_config(
  METADATA_PATH)

# MLMD store handle used by the later cells to look up pipeline artifacts.
store = metadata_store.MetadataStore(metadata_connection_config)

# directory passed to the components factory as its transform directory
if get_kaggle():
  tr_dir = "/kaggle/working/"
else:
  tr_dir = os.path.join(get_project_dir(), "src/main/python/movie_lens_tfx")

serving_model_dir = os.path.join(PIPELINE_ROOT, 'serving_model')
output_parquet_path = os.path.join(PIPELINE_ROOT, "transformed_parquet")

# for the custom ingestion component, the apache beam pipeline needs to be able to
# find the sibling scripts it imports.
# 2 solutions: (1) create a tar archive and use --extra_package in pipeline args
# or (2) use setup.py and --setup_file in pipeline args.

beam_pipeline_args = [
  '--direct_running_mode=multi_processing',
  '--direct_num_workers=0',
  '--setup_file=setup.py',
  #f'--extra_package={ingest_tar_file}'
]

In [None]:
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

# Silence everything below WARNING: TensorFlow's logger, the stdlib root
# logger, and absl.  The stdlib module is imported under an alias so that it
# does not fight over the name `logging` with absl's module.
tf.get_logger().propagate = False
import logging as std_logging
std_logging.getLogger().setLevel(std_logging.WARNING)
from absl import logging
logging.set_verbosity(logging.WARNING)
logging.set_stderrthreshold(logging.WARNING)

# Interactive TFX context: components are executed one at a time in the
# notebook and recorded in the metadata store configured earlier.
context = InteractiveContext(
  pipeline_name=PIPELINE_NAME,
  pipeline_root=PIPELINE_ROOT,
  metadata_connection_config=metadata_connection_config,
  beam_pipeline_args=beam_pipeline_args,
)

# All arguments are passed by keyword: positional arguments may not follow
# keyword arguments in a call, so the earlier mixed form was a SyntaxError.
# The keyword names mirror the other PipelineComponentsFactory call in this
# notebook.
factory = PipelineComponentsFactory(
  num_examples=1000209,
  infiles_dict_ser=infiles_dict_ser,
  output_config_ser=output_config_ser,
  transform_dir=tr_dir,
  user_id_max=user_id_max,
  movie_id_max=movie_id_max,
  n_genres=n_genres,
  n_age_groups=n_age_groups,
  min_eval_size=MIN_EVAL_SIZE,
  batch_size=BATCH_SIZE,
  num_epochs=NUM_EPOCHS,
  device="CPU",
  serving_model_dir=serving_model_dir,
  # NOTE(review): this keyword name is assumed from the variable name; it is
  # the only argument not confirmed by the other call — verify against the
  # factory's signature.
  output_parquet_path=output_parquet_path,
)

components = factory.build_components(PIPELINE_TYPE.PREPROCESSING)

# run the pre-processing components in the order the factory returned them
for component in components:
    context.run(component)

print(f'done pre-processing data')

## EDA on the transformed data

### using Polars, Plotly.express 

This can take about an hour on a single COTS (commercial off-the-shelf) machine.

%run src/main/python/eda/eda_transformed.py

Some of the images are shown here from a former saved run.

You can see a clear correlation between rating and movie_id, then less
so between rating and age and rating and occupation.

<img src="src/test/resources/train_dist_corr_heatmap.png">

You can see many co-occurrences between movie genres.

<img src="src/test/resources/train_genre_cooccurence_rating_5_heatmap.png">

You can see that Drama, then Comedy, then Action are the most popular movie genres.

<img src="src/test/resources/train_genres_rating_5.png">


### using Pandas, Pyspark MLLIB FPGrowth

This does a market basket analysis with movie_ids.

If you want the PrefixSpan plots also, set
PLOT_PREFIXSPAN = True
but beware that the script will take much longer to run.

In [None]:
# Switch for the optional PrefixSpan plots; False leaves them out.
# NOTE(review): `%run` without `-i` executes the script in a fresh namespace,
# so this notebook variable may not be visible to the script — confirm how the
# script reads the flag (or whether `%run -i` is intended).
PLOT_PREFIXSPAN=False

# Run the market-basket EDA script.
%run src/main/python/eda/eda_transformed_pyspark_mllib.py

Some of the images are shown here from a former saved run.

You can see that many association rules can be derived from the data.
<img src="src/test/resources/train_movies_assoc_rules_rating_5.png">

and that there are many frequent itemsets.
<img src="src/test/resources/train_movies_itemsets_rating_5.png">

<img src="src/test/resources/train_movies_itemsets_rating_5_2.png">

The results show that neural network models will have good material to exploit.

The PrefixSpan model can be enabled in the script to produce images for exploring
sequences of actions, or as a precursor to sequential DNN models.

### Data and Concept Drift
After exploring the data inputs to the model, we want to define monitoring
for data and concept shifts.

let X = features

let Y = targets

Data shift is a change in the joint distribution, P(X, Y). 

Using the probability product rule, we can explore 4 causes for the
simplest changes in P(X,Y)
$$ P(X, Y) = P(Y, X) = P(X|Y)P(Y) = P(Y|X)P(X) $$

We can look for changes in one member in the following pairs at any time
(one is simpler than exploring more than 1 member changing at same time):
$$ P(X|Y) * P(Y) $$
$$ P(Y|X) * P(X) $$

* Covariate shift:
  $$ P(X) \text{ changed}, \quad P(Y|X) \text{ unchanged} $$
  The distribution of the model inputs changes.
* Label shift:
  $$ P(Y) \text{ changed}, \quad P(X|Y) \text{ unchanged} $$
  The distribution of the model outputs changes, but for any given output, the input distribution stays the same.
  
* Concept shift:
  $$ P(X) \text{ unchanged}, \quad P(Y|X) \text{ changed} $$
* Manifestation shift:
  $$ P(Y) \text{ unchanged}, \quad P(X|Y) \text{ changed} $$

[see more at NannyML](https://www.nannyml.com/blog/concept-drift)

If the train, eval, and test splits are random, then one could use
the maximum differences between their distributions as a lower limit on
the estimate of the stochastic error.  A trigger for data drift should
then be at least about 3 times that stochastic error.

The previous data doesn't exist yet for this project, but one could
either download another movie-lens dataset of different time period,
or split this 1M dataset by a timestamp ordering into 2 partitions.

More data is available at [GroupLens](https://files.grouplens.org/datasets/movielens/)

### Data [Drift and Skew using TFDV](https://www.tensorflow.org/tfx/tutorials/data_validation/tfdv_basic)
* Drift
  Drift detection is supported for categorical features and between consecutive spans of data
  (i.e., between span N and span N+1), such as between different days of training data.
    * L-infinity distance
    * alerts when drift is higher than threshold distance
* Skew
    * data - schema skew
      occurs when the training and serving data do not conform to the same schema.
      Any expected deviations between the two (such as the label feature being only present in the training data but not in serving) should be specified through environments field in the schema.
    * feature skew
      occurs when the training and serving feature values are different
      This can happen when:
        * A data source of features is modified between training and serving time
        * A difference in the logic for generating features between training and serving.
    * distribution skew
      occurs when the training and serving distributions are significantly different.
      some key causes:
        * different code or different data sources to generate the training dataset
        * a faulty sampling mechanism that chooses a non-representative subsample of the serving data to train on.


### using TFDV to look at the raw data

In [None]:
from tfx.dsl.io import fileio
from tfx.orchestration import metadata
from tfx.components import StatisticsGen, SchemaGen, ExampleValidator
from tfx.utils import io_utils
from tensorflow_metadata.proto.v0 import anomalies_pb2, schema_pb2
from tensorflow_metadata.proto.v0 import statistics_pb2
from tensorflow_transform.tf_metadata import schema_utils
import tensorflow_data_validation as tfdv

In [None]:
# Display the schema, statistics and anomalies recorded for the raw data.
# The two helpers are defined in this cell so that it can be run on its own.

def _newest_uri(artifacts, uri_marker):
    """Return the URI of the newest artifact whose URI contains `uri_marker`.

    Args:
        artifacts: artifacts as returned by `store.get_artifacts_by_type`.
        uri_marker: substring that must occur in the artifact's URI.

    Raises:
        LookupError: if no artifact matches.  Raising here avoids silently
            reusing a URI left over from an earlier search.
    """
    newest_first = sorted(artifacts, key=lambda a: a.create_time_since_epoch,
                          reverse=True)
    for candidate in newest_first:
        if uri_marker in candidate.uri:
            return candidate.uri
    raise LookupError(f"no artifact with '{uri_marker}' in its uri was found")


def _load_proto(file_path, message, what="File"):
    """Parse the serialized proto stored at `file_path` into `message`.

    Returns:
        `message`, or None (after printing an error) when `file_path` is not
        an existing file, so that callers skip it instead of parsing bytes
        left over from a previous iteration.
    """
    if not os.path.isfile(file_path):
        print(f"Error: {what} not found at {file_path}")
        return None
    with open(file_path, 'rb') as f:
        message.ParseFromString(f.read())
    return message


print("Schema from SchemaGen:")
_list = store.get_artifacts_by_type("Schema")
print(f'Schema count={len(_list)}')
artifact_uri = _newest_uri(_list, "SchemaGen")
file_path = os.path.join(artifact_uri, "schema.pbtxt")
schema = tfdv.load_schema_text(file_path)
#tfdv.visualize_artifacts(schema)
tfdv.display_schema(schema=schema)

print("ExampleStatistcs from StatisticsGen:")
_list = store.get_artifacts_by_type("ExampleStatistics")
print(f'ExampleStatistics count={len(_list)}')
#print(f'ExampleStatistics ={_list}')
artifact_uri = _newest_uri(_list, "StatisticsGen")
stats_dict = {}
for split_dir in os.listdir(artifact_uri):
    # Decide on the split directory name only: the full path may contain
    # "test" or "train" for unrelated reasons.
    if "test" in split_dir:
        #plotting train and eval
        continue
    stats = _load_proto(os.path.join(artifact_uri, split_dir, "FeatureStats.pb"),
                        statistics_pb2.DatasetFeatureStatisticsList())
    if stats is None:
        continue
    if "train" in split_dir:
        stats_dict['train'] = stats
    else:
        stats_dict['eval'] = stats
missing_splits = {'train', 'eval'} - stats_dict.keys()
if missing_splits:
    raise LookupError(
        f"no statistics found under {artifact_uri} for: {sorted(missing_splits)}")
tfdv.visualize_statistics(lhs_statistics=stats_dict['eval'],
    rhs_statistics=stats_dict['train'],
    lhs_name='EVAL_DATASET', rhs_name='TRAIN_DATASET')

print("ExampleAnomalies from ExampleValidator:")
_list = store.get_artifacts_by_type("ExampleAnomalies")
print(f'ExampleAnomalies count={len(_list)}')
artifact_uri = _newest_uri(_list, "ExampleValidator")
for split_dir in os.listdir(artifact_uri):
    anomalies = _load_proto(os.path.join(artifact_uri, split_dir, "SchemaDiff.pb"),
                            anomalies_pb2.Anomalies(), what="Anomaly file")
    if anomalies is not None:
        tfdv.display_anomalies(anomalies)

print("ExampleStatistics from pre-transform stats of Transform:")
_list = store.get_artifacts_by_type("ExampleStatistics")
#print(f'ExampleStatistics={_list}')
artifact_uri = _newest_uri(_list, "pre_transform_stats")
for name in os.listdir(artifact_uri):
    stats = _load_proto(os.path.join(artifact_uri, name),
                        statistics_pb2.DatasetFeatureStatisticsList())
    if stats is not None:
        tfdv.visualize_statistics(stats)
    

### using TFDV to look at the transformed data

In [None]:

# Display the schema, statistics and anomalies recorded for the transformed
# data.  The two helpers are defined in this cell so that it can be run on its
# own.

def _newest_uri(artifacts, uri_marker):
    """Return the URI of the newest artifact whose URI contains `uri_marker`.

    Args:
        artifacts: artifacts as returned by `store.get_artifacts_by_type`.
        uri_marker: substring that must occur in the artifact's URI.

    Raises:
        LookupError: if no artifact matches.  Raising here avoids silently
            reusing a URI left over from an earlier search.
    """
    newest_first = sorted(artifacts, key=lambda a: a.create_time_since_epoch,
                          reverse=True)
    for candidate in newest_first:
        if uri_marker in candidate.uri:
            return candidate.uri
    raise LookupError(f"no artifact with '{uri_marker}' in its uri was found")


def _load_proto(file_path, message, what="File"):
    """Parse the serialized proto stored at `file_path` into `message`.

    Returns:
        `message`, or None (after printing an error) when `file_path` is not
        an existing file, so that callers skip it instead of parsing bytes
        left over from a previous iteration.
    """
    if not os.path.isfile(file_path):
        print(f"Error: {what} not found at {file_path}")
        return None
    with open(file_path, 'rb') as f:
        message.ParseFromString(f.read())
    return message


print("Schema from post-Transform:")
_list = store.get_artifacts_by_type("Schema")
print(f'Schema count={len(_list)}')
artifact_uri = _newest_uri(_list, "post_transform_schema")
file_path = os.path.join(artifact_uri, "schema.pbtxt")
schema = tfdv.load_schema_text(file_path)
#tfdv.visualize_artifacts(schema)
tfdv.display_schema(schema=schema)

print("ExampleStatistics from post-transform stats of Transform:")
_list = store.get_artifacts_by_type("ExampleStatistics")
artifact_uri = _newest_uri(_list, "post_transform_stats")
for name in os.listdir(artifact_uri):
    stats = _load_proto(os.path.join(artifact_uri, name),
                        statistics_pb2.DatasetFeatureStatisticsList())
    if stats is not None:
        tfdv.visualize_statistics(stats)

print("ExampleAnomalies from post-transform of Transform:")
_list = store.get_artifacts_by_type("ExampleAnomalies")
artifact_uri = _newest_uri(_list, "post_transform_anomalies")
for name in os.listdir(artifact_uri):
    anomalies = _load_proto(os.path.join(artifact_uri, name),
                            anomalies_pb2.Anomalies(), what="Anomaly file")
    if anomalies is not None:
        tfdv.display_anomalies(anomalies)


## Save the data schema with version control

In [None]:
import shutil

# Copy the newest pre- and post-transform schemas into src/main/resources so
# that they can be put under version control.

def _schema_file(schema_artifacts, uri_marker):
    """Return the schema file of the first artifact whose URI contains
    `uri_marker`.

    Args:
        schema_artifacts: Schema artifacts, already sorted newest first.
        uri_marker: substring that must occur in the artifact's URI.

    Raises:
        LookupError: if no artifact matches.  Raising here avoids silently
            reusing a URI left over from an earlier search.
        FileNotFoundError: if the artifact directory is empty.
    """
    for candidate in schema_artifacts:
        if uri_marker in candidate.uri:
            break
    else:
        raise LookupError(f"no Schema artifact with '{uri_marker}' in its uri was found")
    # Prefer the file name the other cells of this notebook read; only fall
    # back to a directory listing (sorted, so the choice is deterministic)
    # when that file is absent.
    preferred = os.path.join(candidate.uri, "schema.pbtxt")
    if os.path.isfile(preferred):
        return preferred
    entries = sorted(os.listdir(candidate.uri))
    if not entries:
        raise FileNotFoundError(f"no schema file found in {candidate.uri}")
    return os.path.join(candidate.uri, entries[0])


_list = store.get_artifacts_by_type("Schema")
_list = sorted(_list, key=lambda x: x.create_time_since_epoch, reverse=True)
pre_transform_file_path = _schema_file(_list, "pre_transform_schema")
post_transform_file_path = _schema_file(_list, "post_transform_schema")

pre_dir = os.path.join(get_project_dir(), "src/main/resources", "pre_transform")
post_dir = os.path.join(get_project_dir(), "src/main/resources", "post_transform")
os.makedirs(pre_dir, exist_ok=True)
os.makedirs(post_dir, exist_ok=True)

pre_schema_path = os.path.join(pre_dir, "schema.pbtxt")
post_schema_path = os.path.join(post_dir, "schema.pbtxt")

shutil.copyfile(pre_transform_file_path, pre_schema_path)
shutil.copyfile(post_transform_file_path, post_schema_path)


## Run baseline model pipeline with full dataset

In [None]:
# Beam options for the pipeline runs from here on.  This rebinds
# beam_pipeline_args; unlike the earlier definition in this notebook, the list
# carries no --setup_file option.
beam_pipeline_args = [
  '--direct_running_mode=multi_processing',
  '--direct_num_workers=0',
]

# Components factory for the full dataset.
pipeline_factory = PipelineComponentsFactory(
  num_examples=1000209,
  infiles_dict_ser=infiles_dict_ser,
  output_config_ser=output_config_ser,
  transform_dir=tr_dir,
  user_id_max=user_id_max,
  movie_id_max=movie_id_max,
  n_genres=n_genres,
  n_age_groups=n_age_groups,
  min_eval_size=MIN_EVAL_SIZE,
  batch_size=BATCH_SIZE,
  num_epochs=NUM_EPOCHS,
  device="CPU",
  serving_model_dir=serving_model_dir,
)

# Baseline components, pointed at the schema directories saved earlier.
baseline_components = pipeline_factory.build_components(
  PIPELINE_TYPE.BASELINE,
  run_example_diff=False,
  pre_transform_schema_dir_path=pre_dir,
  post_transform_schema_dir_path=post_dir,
)

# create baseline model
my_pipeline = tfx.dsl.Pipeline(
  pipeline_name=PIPELINE_NAME,
  pipeline_root=PIPELINE_ROOT,
  components=baseline_components,
  enable_cache=ENABLE_CACHE,
  metadata_connection_config=metadata_connection_config,
  beam_pipeline_args=beam_pipeline_args,
)

# execute the pipeline with TFX's local DAG runner
baseline_runner = tfx.orchestration.LocalDagRunner()
baseline_runner.run(my_pipeline)


In [None]:
print(f'plot ModelRun using tensorboard')
%load_ext tensorboard
_list = store.get_artifacts_by_type("ModelRun")
print(f'ModelRun count={len(_list)}')
_list = sorted(_list, key=lambda x: x.create_time_since_epoch, reverse=True)
for artifact in _list:
    if "Trainer" in artifact.uri:
        artifact_uri = artifact.uri
        break
%tensorboard --logdir {artifact_uri}

## Run full model pipeline with full dataset

In [None]:
# Log what the MLMD store currently holds (only emitted at debug verbosity).
artifact_types = store.get_artifact_types()
logging.debug(f"MLMD store artifact_types={artifact_types}")
artifacts = store.get_artifacts()
logging.debug(f"MLMD store artifacts={artifacts}")

# Production components, pointed at the schema directories saved earlier.
components = pipeline_factory.build_components(
  PIPELINE_TYPE.PRODUCTION,
  run_example_diff=False,
  pre_transform_schema_dir_path=pre_dir,
  post_transform_schema_dir_path=post_dir,
)

# simulate experimentation of one model family
production_pipeline_args = dict(
  pipeline_name=PIPELINE_NAME,
  pipeline_root=PIPELINE_ROOT,
  components=components,
  enable_cache=ENABLE_CACHE,
  metadata_connection_config=metadata_connection_config,
  beam_pipeline_args=beam_pipeline_args,
)
my_pipeline = tfx.dsl.Pipeline(**production_pipeline_args)

# execute the pipeline with TFX's local DAG runner
production_runner = tfx.orchestration.LocalDagRunner()
production_runner.run(my_pipeline)


In [None]:
#add training and eval plots: Trainer/model_run
#  Tensorboard can visualize them
print(f'plot ModelRun using tensorboard')
%load_ext tensorboard
_list = store.get_artifacts_by_type("ModelRun")
print(f'ModelRun count={len(_list)}')
_list = sorted(_list, key=lambda x: x.create_time_since_epoch, reverse=True)
for artifact in _list:
    if "Trainer" in artifact.uri:
        artifact_uri = artifact.uri
        %tensorboard --logdir {artifact_uri}

print("ModelEvaluation from Evaluator:")
_list = store.get_artifacts_by_type("ModelEvaluation")
print(f'Schema count={len(_list)}')
_list = sorted(_list, key=lambda x: x.create_time_since_epoch, reverse=True)
artifact_uri = _list[0].uri
assert(artifact_uri is not None)
#file_path = os.path.join(artifact_uri, "metrics*")
eval_result = tfma.load_eval_result(artifact.uri)
tfma.view.render_slicing_metrics(eval_result)