In [3]:
import os
os.environ['ENV'] = 'prod'
os.environ['REGION'] = 'apse1'
os.environ['TENANT'] ="in"
os.environ['RECO_S3_BUCKET'] = "p13n-reco-offline-prod"
os.environ['COUNTRY_KEY']= "in"
os.environ['AWS_REGION']= "ap-southeast-1"
os.environ['USE_REAL_CMS3']= "True"
os.environ['RECO_CREDENTIAL']= "-----BEGINRSAPRIVATEKEY-----\nMGICAQACEQCdHOlGnxIMWCMzjK2JAg37AgMBAAECEGOIwGTEO9vd3X9+jyiF4NECCQnoqDakDgSm2QIID9sadWN0XvMCCQLiqPkgVKSuIQIIDCAsWM+pJB8CCQG0jbIGCNX9MA==\n-----ENDRSAPRIVATEKEY-----"


import argparse, gc
import json
import os, time
import numpy as np
import s3fs
import pyarrow
import tensorflow as tf
from tqdm import tqdm

tfv1 = tf.compat.v1
tfv1.disable_v2_behavior()

# Enable memory growth for GPUs to avoid memory fragmentation
physical_devices = tf.config.list_physical_devices('GPU')
if physical_devices:
    for device in physical_devices:
        tf.config.experimental.set_memory_growth(device, True)

import tensorflow_addons as tfa
import tensorflow_recommenders_addons as tfra

from common.config.utils import data_path, model_path
from common.config import TENANT
from tpfy.tf_model.tpfy_model_v3_mtl import TpfyModelV3, TpfyMtlModelConfig
from tpfy.etl.schema import TpfyMtlDatasetSchema
from model.parquet_dataset import TFParquetDataset
from tpfy.common import TpfyDataPath
from omegaconf import OmegaConf
from dataclasses import dataclass
from tpfy.train_v3_mtl import make_example_mtl, TpfyTrainConfig, TpfyConfig
from tpfy.helper import load_model_weights_from_s3
import pandas as pd
from model.losses import masked_binary_entropy_loss
from model.metrics import MaskedAUC
from sklearn.metrics import mean_squared_error
import pickle

content_popularity_tag = pd.read_csv('Content_popularity_tag_26Jan_to_1Feb.csv')
content_popularity_tag.drop_duplicates(subset=['sub_title_id'], inplace = True)
content_popularity_tag.reset_index(drop = True, inplace = True)
content_popularity_dict = content_popularity_tag.set_index('sub_title_id')['popularity_tag'].to_dict()    

class Args:
    """Simple class to hold training arguments (replaces argparse)"""
    def __init__(self):
        # Positional arguments
        self.model_name = "tpfy-v3-mtl-r2"
        self.date = "2026-02-09"  # Training date
        self.val_date = "2026-02-09"  # Validation date
        
        # Optional arguments
        self.conf = None
        self.max_epoch = None
        self.val_days = 1
        self.click_ns = 0.08
        self.variant = "cms3"
        self.num_workers = 4
        self.repeat = 1
        self.eval_freq = None
        self.lr = 1e-4
        self.batch_size = 512
        self.click_weight = 1.0
        self.watch_weight = 1.0
        self.upload = False  # Set to False if you don't want to upload to S3
        self.reload_local_model = None
        self.reload_s3_model = "tpfy-v3-mtl-r2"  # Set to None if starting fresh
        self.extract_activations = True
        self.output = None
        self.clear_nn = False
        self.ckpt = None
        self.verbose = False
        self.countries = None

args = Args()

# Display configuration
print("Training Configuration:")
print(f"  Model Name: {args.model_name}")
print(f"  Training Date: {args.date}")
print(f"  Validation Date: {args.val_date}")
print(f"  Variant: {args.variant}")
print(f"  Click NS: {args.click_ns}")
print(f"  Num Workers: {args.num_workers}")
print(f"  Reload Model: {args.reload_s3_model}")
print(f"  Upload: {args.upload}")

# Load configuration
config_name = f"tpfy/tpfy_config/mtl-{TENANT}.yaml"
if not os.path.exists(config_name):
    raise FileNotFoundError(f"Config file {config_name} not found")

hparams: TpfyConfig = OmegaConf.merge(
    OmegaConf.structured(TpfyConfig),
    OmegaConf.load(config_name),
)
print(f"\nLoaded config: {config_name}")

# Override batch size if specified
if args.batch_size:
    hparams.train.batch_size = args.batch_size

batch_size = hparams.train.batch_size
print(f"Batch size: {batch_size}")

# Load dataset
variant = args.variant
if variant and not variant.startswith("-"):
    variant = "-" + variant

session = tfv1.keras.backend.get_session()

def create_dataset(date, path = None):
    if path:
        data_path_str = path
    else:
        data_path_str = data_path(
            TpfyDataPath.S3_TPFY_IMPR_V3_AGG_MTL_EXTRACTED_EXAMPLES_VAR, TENANT
        ) % (variant, date)

    dataset = TFParquetDataset([data_path_str], TpfyMtlDatasetSchema, shuffle_files=True)
    print(f'Batch size : {batch_size}')
    tf_dataset = dataset.create_tf_dataset(batch_size).map(make_example_mtl)
    # tf_dataset = dataset.create_parallel_tf_dataset(
    #     batch_size,
    #     args.num_workers,
    #     num_epochs=1,
    #     queue_size=16,
    #     v2=True,
    #     row_transformer_factory=None,
    # ).map(make_example_mtl)
    # train_it = tfv1.data.make_initializable_iterator(tf_dataset)
    return tf_dataset

def load_and_compile_model():
    # Build model
    print(f"\n{'='*80}")
    print("BUILDING MODEL")
    print(f"{'='*80}")

    model = TpfyModelV3(
        hparams.model,
        click_ns=args.click_ns,
        enable_random_watch=hparams.train.enable_random_watch,
    )

    # Create optimizer (needed for compilation, even though we won't train)
    optimizer = tfa.optimizers.AdamW(
        weight_decay=0.0,  # Not needed for inference
        learning_rate=0.001,  # Not needed for inference
        epsilon=1e-4,
    )

    loss_dict = {
            "click": masked_binary_entropy_loss(from_logits=True),
            "watch": masked_binary_entropy_loss(from_logits=True),
            "random_watch": masked_binary_entropy_loss(from_logits=False),
            "paywall_view": masked_binary_entropy_loss(from_logits=True),
            "add_watchlist": masked_binary_entropy_loss(from_logits=True),
    }
    metric_dict = {
            "click": MaskedAUC(from_logits=True),
            "watch": MaskedAUC(from_logits=True),
            "random_watch": MaskedAUC(from_logits=False),
            "paywall_view": MaskedAUC(from_logits=True),
            "add_watchlist": MaskedAUC(from_logits=True),
    }

    optimizer = tfra.dynamic_embedding.DynamicEmbeddingOptimizer(optimizer)

    model.compile(optimizer=optimizer, loss=loss_dict, metrics=metric_dict)
    print("Model compiled")
    
    return model

def get_activations_and_labels(iterator, model, last_layer_tensor):
    features, labels, metadata = session.run(iterator)

    # Run model
    predictions = model(features, training=False)

    # Execute
    pred_values, activation_values = session.run(
        [predictions, last_layer_tensor]
    )

    return activation_values, labels, pred_values, metadata

def validation(iterator, model, last_layer_tensor, d=128, runs=10):
    # Use numpy arrays with preallocated size to avoid dynamic list growth
    # all_predictions = []
    all_labels = []
    all_variances = []
    all_content_ids = []
    all_dw_p_ids = []
    all_timestamps = []
    all_deepFMpredictions = []
    # mse_sum = 0.0
    valid_runs = 0

    for i in range(runs):
        try:
            H, y_batch_all_labels, pred_values, metadata = get_activations_and_labels(iterator, model, last_layer_tensor)
            y_batch = y_batch_all_labels['click']
            pred_values = pred_values['click']

            mask = (y_batch != -1)

            if not np.any(mask):
                continue

            H = H[mask.squeeze()]
            y_batch = y_batch[mask].reshape(sum(mask)[0], 1)
            pred_values = pred_values[mask].reshape(sum(mask)[0], 1)

            H = H / (np.linalg.norm(H, axis=1, keepdims=True) + 1e-8)
            variance = np.sqrt(np.diag(H @ A_inv @ H.T))
            # mean = H@theta

            # Append to lists (will concatenate once at the end)
            # all_predictions.append(mean)
            all_deepFMpredictions.append(pred_values)
            all_labels.append(y_batch)
            all_variances.append(variance)
            all_content_ids.extend([int(content_id) for content_id, mask_bool in zip(metadata['content_id'], mask) if mask_bool])
            all_dw_p_ids.extend([dw_p_id for dw_p_id, mask_bool in zip(metadata['dw_p_id'], mask) if mask_bool])
            all_timestamps.extend([timestamp for timestamp, mask_bool in zip(metadata['timestamp'], mask) if mask_bool])
            # mse_sum += mean_squared_error(y_batch, mean)
            valid_runs += 1

            # Clean up intermediate variables
            del H, y_batch, pred_values, mask, variance, y_batch_all_labels, metadata

        except tf.errors.OutOfRangeError:
            print(f"Iterator exhausted at run {i}")
            break

    if valid_runs == 0:
        return None, None, None, None, None, None

    # Concatenate once at the end (more memory efficient)
    # predictions = np.concatenate(all_predictions, axis=0) if all_predictions else np.array([])
    deepFMpredictions = np.concatenate(all_deepFMpredictions, axis=0)
    labels = np.concatenate(all_labels, axis=0) if all_labels else np.array([])
    variances = np.concatenate(all_variances, axis=0) if all_variances else np.array([])

    # Clean up lists
    del all_labels, all_variances, all_deepFMpredictions

    return deepFMpredictions, labels, variances, all_content_ids, all_dw_p_ids, all_timestamps

Training Configuration:
  Model Name: tpfy-v3-mtl-r2
  Training Date: 2026-02-09
  Validation Date: 2026-02-09
  Variant: cms3
  Click NS: 0.08
  Num Workers: 4
  Reload Model: tpfy-v3-mtl-r2
  Upload: False

Loaded config: tpfy/tpfy_config/mtl-in.yaml
Batch size: 512


In [4]:
validation_data_path = 's3://p13n-reco-offline-prod/upload_objects/test_vedansh/daily-mtl-extracted-cms3-minimum-5-contents/cd=2026-02-06/'
tf_dataset = create_dataset(args.date, validation_data_path)
iterator = tf_dataset.make_one_shot_iterator()
next_batch = iterator.get_next()
sample_features, sample_labels, sample_metadata = session.run(next_batch)
tpfy_model = load_and_compile_model()

prediction = tpfy_model(sample_features, training=False)
session.run([
    tfv1.global_variables_initializer(),
    tfv1.local_variables_initializer(),
    tfv1.tables_initializer()
])

plain_weights = load_model_weights_from_s3(
    args.model_name,
    use_s3=True,
    checkpoint_name='1770723470'
)
plain_weights_modified = {k.replace('train/', ''): v for k, v in plain_weights.items()}
restore_ops = tpfy_model.restore_plain_weights_ops(
    plain_weights_modified,
    clear_nn=args.clear_nn
)
session.run(restore_ops)

# Create NEW iterator (reset to start of dataset)
iterator = tf_dataset.make_one_shot_iterator()
next_batch = iterator.get_next()

# Get compress_output tensor (linear_input)
graph = tf.compat.v1.get_default_graph()
compress_output_tensor = graph.get_tensor_by_name('tpfy_model_v3/deepfm/add:0')

files s3://p13n-reco-offline-prod/upload_objects/test_vedansh/daily-mtl-extracted-cms3-minimum-5-contents/cd=2026-02-06/part-00019-tid-3703456604030915455-99f93d8f-91e2-40eb-8513-dac6e652d0be-3923-1-c000.snappy.parquet,s3://p13n-reco-offline-prod/upload_objects/test_vedansh/daily-mtl-extracted-cms3-minimum-5-contents/cd=2026-02-06/part-00065-tid-3703456604030915455-99f93d8f-91e2-40eb-8513-dac6e652d0be-3893-1-c000.snappy.parquet,s3://p13n-reco-offline-prod/upload_objects/test_vedansh/daily-mtl-extracted-cms3-minimum-5-contents/cd=2026-02-06/part-00047-tid-3703456604030915455-99f93d8f-91e2-40eb-8513-dac6e652d0be-3916-1-c000.snappy.parquet,s3://p13n-reco-offline-prod/upload_objects/test_vedansh/daily-mtl-extracted-cms3-minimum-5-contents/cd=2026-02-06/part-00094-tid-3703456604030915455-99f93d8f-91e2-40eb-8513-dac6e652d0be-3939-1-c000.snappy.parquet,s3://p13n-reco-offline-prod/upload_objects/test_vedansh/daily-mtl-extracted-cms3-minimum-5-contents/cd=2026-02-06/part-00075-tid-3703456604030

ValueError: Variable embedding_layer already exists, disallowed. Did you mean to set reuse=True or reuse=tf.AUTO_REUSE in VarScope?

In [15]:
lambda_=1.0
d=128
A = np.load('tpfy/neural_linUCB_training_data/A_285400.npy')
# b = np.zeros((d, 1), dtype=np.float64)
A_inv = np.linalg.inv(A)
run = 0

while True:
    start = time.time()
    # Find the validation evals for 10 runs
    val_results = validation(next_batch, tpfy_model, compress_output_tensor, runs=10)

    if val_results[0] is None:
        print(f"Validation failed at run {run}, skipping metrics")
        run += 1
        continue

    deepFMpredictions, labels, variances, content_ids, dw_p_ids, timestamps = val_results

    # Save results
    dumping_dict = {
        'dw_p_ids': dw_p_ids,
        'timestamps': timestamps,
        'content_ids': content_ids,
        'deepFMpredictions': deepFMpredictions.flatten(),
        'labels': labels.flatten(),
        'variances': variances
    }

    with open(f'tpfy/neural_linUCB_training_data/validation_dumping_dict/validation_stats_run_{run}.pkl', 'wb') as handle:
        pickle.dump(dumping_dict, handle)

    # Clean up large objects explicitly
    del labels, variances, content_ids, deepFMpredictions, dw_p_ids, timestamps, val_results

    # Force garbage collection and clear TF cache
    gc.collect()
    
    run += 10
    print(f'Run {run} completed in {time.time() - start} s!')

--------------
q Tensor("tpfy_model_v3_47/feature_prep/strided_slice:0", shape=(512, 32), dtype=float32)
k Tensor("tpfy_model_v3_47/feature_prep/watched_content_embedding_unpooled:0", shape=(512, ?, 32), dtype=float32)
Kw Tensor("tpfy_model_v3_47/feature_prep/GetSlotFids:1", shape=(512, ?), dtype=float32)
target embedding shape (512, 9, 32)
user embedding shape (512, 27, 32)
target: Tensor("tpfy_model_v3_47/feature_prep/target_feature/target_embeddings:0", shape=(512, 9, 32), dtype=float32)
user: Tensor("tpfy_model_v3_47/feature_prep/user_feature/user_embeddings:0", shape=(512, 27, 32), dtype=float32)
watched: Tensor("tpfy_model_v3_47/feature_prep/dot_prod_attention_pooling/cond/Merge:0", shape=(512, 32), dtype=float32)
fm_user Tensor("tpfy_model_v3_47/deepfm/fwfm/concat:0", shape=(512, 28, 32), dtype=float32)
fm_item Tensor("tpfy_model_v3_47/feature_prep/target_feature/target_embeddings:0", shape=(512, 9, 32), dtype=float32)
fwfm out Tensor("tpfy_model_v3_47/deepfm/fwfm/Reshape:0", sh

KeyboardInterrupt: 

In [5]:
len(deepFMpredictions), len(labels), len(variances), len(content_ids), len(dw_p_ids), len(timestamps)

(5120, 5120, 5120, 5120, 5120, 5120)

In [24]:
x = pd.DataFrame.from_dict(pd.read_pickle('tpfy/neural_linUCB_training_data/validation_dumping_dict/validation_stats_run_30.pkl'), orient='index').T

In [29]:
x['labels'] = x['labels'].astype(int)
x['deepFMpredictions'] = x['deepFMpredictions'].astype(float)
x['variances'] = x['variances'].astype(float)

In [30]:
x.groupby('labels')['deepFMpredictions'].mean()

labels
0   -0.870472
1   -0.347185
Name: deepFMpredictions, dtype: float64

In [31]:
x.groupby('labels')['variances'].mean()

labels
0    0.004308
1    0.004304
Name: variances, dtype: float64

In [32]:
x['popularity_category'] = x['content_ids'].apply(lambda x: content_popularity_dict.get(int(x)))

In [34]:
x.groupby('popularity_category')['deepFMpredictions'].mean()

popularity_category
High                  -0.184641
Low                   -0.903284
Medium                -0.640735
Negligible            -1.109829
Weekly New Releases   -0.049026
Name: deepFMpredictions, dtype: float64

In [35]:
x.groupby('popularity_category')['variances'].mean()

popularity_category
High                   0.004258
Low                    0.004321
Medium                 0.004316
Negligible             0.004273
Weekly New Releases    0.004206
Name: variances, dtype: float64