# End-to-End ESMM Recommender Pipeline Orchestration

This notebook demonstrates the orchestration of the ESMM recommender pipeline using the refactored scripts. It covers:
1. **Configuration**: Setting up mock paths and parameters.
2. **Data Preparation**: Simulating daily data splitting.
3. **EasyRec Configuration Generation**: Creating the pipeline config file for EasyRec training.
4. **Model Training (Simulated)**: Showing how EasyRec training would be invoked.
5. **Model Evaluation**: Calculating Group AUC on mock prediction data.

In [None]:
import os
import sys
import json
import pathlib
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import subprocess

# Add src to Python path to allow direct imports
module_path = os.path.abspath(os.path.join('..', 'src'))
if module_path not in sys.path:
    sys.path.append(module_path)
    print(f"Added {module_path} to sys.path")

try:
    # Import refactored functions/modules
    from data_processing.split_dataset import (
        perform_split,
        list_parquet_files_in_gcs_directory,
        read_parquet_from_gcs as read_mock_parquet_gcs_split,
        write_df_to_parquet_gcs as write_mock_parquet_gcs_split,
    )
    from data_processing.auto_feature_config_generator import (
        load_sample_from_gcs_parquet,
        classify_features,
        generate_input_fields_proto,
        generate_feature_config_proto,
        get_sample_dataframe,
    )
    from training.generate_and_run_train import group_features, populate_and_save_config  # Using populate_and_save_config directly
    from evaluation.evaluate_model import create_mock_prediction_file, calculate_group_auc
    print("Successfully imported pipeline modules and functions.")
except ImportError as e:
    print(f"Error importing modules: {e}")
    print("Ensure all scripts have been refactored correctly and 'src' is accessible.")

## Section 3: Configuration

Define mock GCS paths, processing date, split fractions, config file paths, column names, and other parameters for the pipeline.

In [None]:
# --- General Configuration ---
DATE_TO_PROCESS = "20231101"
RANDOM_SEED = 42

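# --- Mock GCS Paths ---
# Note: os.path.join only produces "/" separators on POSIX systems; prefer posixpath.join if this may run on Windows.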
MOCK_GCS_BASE_BUCKET = "gs://mock-esmm-recommender-bucket"
MOCK_GCS_RAW_DATA_PATH = os.path.join(MOCK_GCS_BASE_BUCKET, "raw_data")
MOCK_GCS_PROCESSED_DATA_PATH_PREFIX = os.path.join(MOCK_GCS_BASE_BUCKET, "processed_data")
MOCK_GCS_MODEL_OUTPUT_PATH_PREFIX = os.path.join(MOCK_GCS_BASE_BUCKET, "model_output")
MOCK_GCS_PREDICTION_OUTPUT_PATH = os.path.join(MOCK_GCS_BASE_BUCKET, "prediction_output")

# Daily paths based on DATE_TO_PROCESS
MOCK_INPUT_GCS_DAY_PATH = os.path.join(MOCK_GCS_RAW_DATA_PATH, DATE_TO_PROCESS) # Input for split_dataset
MOCK_PROCESSED_DAY_PATH = os.path.join(MOCK_GCS_PROCESSED_DATA_PATH_PREFIX, DATE_TO_PROCESS)
MOCK_MODEL_DIR_DAY_PATH = os.path.join(MOCK_GCS_MODEL_OUTPUT_PATH_PREFIX, DATE_TO_PROCESS)

# Specific split paths (outputs of split_dataset, inputs for training)
MOCK_GCS_TRAIN_DATA_PATH = os.path.join(MOCK_PROCESSED_DAY_PATH, "train")
MOCK_GCS_EVAL_DATA_PATH = os.path.join(MOCK_PROCESSED_DAY_PATH, "validation")
MOCK_GCS_TEST_DATA_PATH = os.path.join(MOCK_PROCESSED_DAY_PATH, "test") # Though not used in ESMM template

# --- Config File Paths ---
CONFIG_TEMPLATE_PATH = "../configs/esmm_pipeline_template.config"
GENERATED_PIPELINE_CONFIG_PATH = f"../configs/generated_pipeline_{DATE_TO_PROCESS}.config"

# --- Data Splitting Parameters ---
TRAIN_FRACTION = 0.8
VALIDATION_FRACTION = 0.1
# Test fraction is 1.0 - TRAIN_FRACTION - VALIDATION_FRACTION

# --- Schema Inference & Feature Engineering Params ---
# load_sample_from_gcs_parquet will use its defaults: num_files_to_sample=1, num_rows_per_file_sample=1000

# --- EasyRec Training Parameters (Overrides for template) ---
TRAINING_PARAMS = {
    "batch_size": 4096,
    "num_epochs": 1, # Keep low for quick demo; increase for real training
    "learning_rate": 0.0005,
    "save_checkpoints_steps": 2000
}

# --- Evaluation Parameters ---
MOCK_PREDICTION_FILE = os.path.join(MOCK_GCS_PREDICTION_OUTPUT_PATH, DATE_TO_PROCESS, "predictions.parquet")
GROUP_BY_COLUMN_EVAL = "user_id"
CLICK_LABEL_COLUMN = "click"
CLICK_SCORE_COLUMN = "click_prediction_score" # Must match output of EasyRec predictor
CONVERSION_LABEL_COLUMN = "conversion"
CONVERSION_SCORE_COLUMN = "conversion_prediction_score" # Must match output of EasyRec predictor

print(f"Date to process: {DATE_TO_PROCESS}")
print(f"Mock GCS Train data path for schema inference: {MOCK_GCS_TRAIN_DATA_PATH}")
print(f"Generated pipeline config will be saved to: {os.path.abspath(GENERATED_PIPELINE_CONFIG_PATH)}")

## Section 4: Stage 1: Data Preparation - Splitting Daily Snapshot

This stage simulates the `split_dataset.py` script. In a real scenario, this script would interact with GCS to:
1. List all Parquet files for a given day (e.g., `gs://mock-bucket/raw_data/20231101/`).
2. Read these Parquet files into a single DataFrame.
3. Perform a row-level random split into train, validation, and test sets.
4. Save these splits back to GCS (e.g., into `gs://mock-bucket/processed_data/20231101/train/`, `.../validation/`, `.../test/`).

Here, we'll use the `perform_split` function directly with mock GCS functions that return sample DataFrames.
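
The row-level split described above can be sketched in plain pandas as follows. This is a minimal illustration, not the actual `perform_split` implementation: the helper name `random_row_split_sketch` and the toy DataFrame are hypothetical, and it operates on an in-memory DataFrame rather than on GCS.

```python
import numpy as np
import pandas as pd


def random_row_split_sketch(df, train_fraction, validation_fraction, random_seed):
    """Shuffle rows once, then cut the shuffled frame at two boundaries."""
    if train_fraction <= 0 or validation_fraction < 0 or train_fraction + validation_fraction > 1:
        raise ValueError("fractions must be non-negative and sum to at most 1")
    # frac=1.0 returns every row in a random order; the original index is kept
    shuffled = df.sample(frac=1.0, random_state=random_seed)
    n_train = round(len(shuffled) * train_fraction)
    n_val = round(len(shuffled) * validation_fraction)
    train = shuffled.iloc[:n_train]
    validation = shuffled.iloc[n_train:n_train + n_val]
    test = shuffled.iloc[n_train + n_val:]  # whatever remains is the test split
    return train, validation, test


# Hypothetical toy data standing in for one day's snapshot
demo_df = pd.DataFrame({"user_id": np.arange(1000) % 50, "click": np.arange(1000) % 2})
train_part, val_part, test_part = random_row_split_sketch(demo_df, 0.8, 0.1, 42)
print(len(train_part), len(val_part), len(test_part))
```

Shuffling once and slicing keeps the three splits disjoint by construction, and fixing `random_state` makes the split reproducible for a given day.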

In [None]:
print("--- Stage 1: Data Splitting (Simulated) ---")
# The perform_split function uses mock GCS read/list functions internally for this demo.
# It will print messages about its (mock) GCS operations.
train_df_sample, validation_df_sample, test_df_sample = perform_split(
    input_gcs_day_path=MOCK_INPUT_GCS_DAY_PATH, 
    output_gcs_path_prefix=MOCK_GCS_PROCESSED_DATA_PATH_PREFIX, 
    train_fraction=TRAIN_FRACTION, 
    validation_fraction=VALIDATION_FRACTION, 
    random_seed=RANDOM_SEED
)

if train_df_sample is not None and not train_df_sample.empty:
    print("\nSuccessfully (mock) split data.")
    print(f"Train sample shape: {train_df_sample.shape}")
    # train_df_sample.head()
else:
    print("\nMock data splitting failed or returned empty train set. Using a fallback sample for next steps.")
    # Fallback to ensure notebook can proceed if mock split had issues or returned None
    train_df_sample = get_sample_dataframe(num_rows=int(1000 * TRAIN_FRACTION))
    validation_df_sample = get_sample_dataframe(num_rows=int(1000 * VALIDATION_FRACTION))
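    # round() rather than int(): 1.0 - 0.8 - 0.1 is slightly below 0.1 in floating point, so int() would yield 99 rows
    test_df_sample = get_sample_dataframe(num_rows=round(1000 * (1.0 - TRAIN_FRACTION - VALIDATION_FRACTION)))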
    print(f"Fallback Train sample shape: {train_df_sample.shape}")

# For the next stage (schema inference), we need a DataFrame sample.
# In a real scenario, load_sample_from_gcs_parquet would read a sample from MOCK_GCS_TRAIN_DATA_PATH.
# Here, we reuse train_df_sample from the (mock) split above.
df_for_schema_inference = train_df_sample 
if df_for_schema_inference.empty:
    print("Warning: df_for_schema_inference is empty, using a default sample.")
    df_for_schema_inference = get_sample_dataframe(num_rows=200) # Ensure it's not empty for classify_features

## Section 5: Stage 2: EasyRec Configuration Generation

This stage uses `df_for_schema_inference` (the training sample, conceptually read from the GCS training split path) to:
1. Infer feature types (`classify_features`).
2. Generate `input_fields` and `feature_config` protobuf snippets.
3. Automatically group features into 'user' and 'item' groups (`group_features`).
4. Populate the `esmm_pipeline_template.config` using the refactored `populate_and_save_config` function.
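
To make steps 1 and 3 more concrete, here is one way feature classification and grouping could work. This is a sketch under stated assumptions, not the logic of the real `classify_features` or `group_features`: the `_sketch` function names, the dtype rule (floats are numeric, everything else is categorical), the `user_` prefix convention, and the sample column names are all hypothetical.

```python
import pandas as pd
from pandas.api import types as pdt

# Assumption: label columns are named as in the evaluation parameters above
LABEL_COLUMNS = {"click", "conversion"}


def classify_features_sketch(df):
    """Bucket columns into label / categorical / numeric using their dtype."""
    classified = {"label": [], "categorical": [], "numeric": []}
    for column in df.columns:
        if column in LABEL_COLUMNS:
            classified["label"].append(column)
        elif pdt.is_float_dtype(df[column]):
            classified["numeric"].append(column)
        else:
            # Integer IDs and strings are treated as categorical in this sketch
            classified["categorical"].append(column)
    return classified


def group_features_sketch(classified):
    """Assign non-label features to 'user' or 'item' by a name-prefix convention."""
    groups = {"user": [], "item": []}
    for kind in ("categorical", "numeric"):
        for column in classified[kind]:
            key = "user" if column.startswith("user_") else "item"
            groups[key].append(column)
    return groups


# Hypothetical sample with illustrative column names
sample = pd.DataFrame({
    "user_id": [1, 2],
    "user_age_bucket": ["18-24", "25-34"],
    "item_id": [10, 11],
    "item_price": [9.5, 20.0],
    "click": [1, 0],
    "conversion": [0, 0],
})
classified_sketch = classify_features_sketch(sample)
grouped_sketch = group_features_sketch(classified_sketch)
print(classified_sketch)
print(grouped_sketch)
```

A dtype-only rule is cheap but coarse; the real generator may also consider cardinality or value ranges, so treat the buckets here as illustrative.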

In [None]:
print("\n--- Stage 2: EasyRec Configuration Generation ---")

# 1. Classify features from the training data sample
print("Classifying features...")
classified_features = classify_features(df_for_schema_inference)
print(f"Classified features: {json.dumps(classified_features, indent=2)}")

# 2. Generate input_fields and feature_config protobuf snippets
print("\nGenerating protobuf snippets...")
input_fields_str = generate_input_fields_proto(classified_features)
feature_config_str = generate_feature_config_proto(classified_features)
# print("Input Fields Proto:\n", input_fields_str)
# print("\nFeature Config Proto:\n", feature_config_str)

# 3. Automatically group features
print("\nGrouping features...")
grouped_features = group_features(classified_features)
print(f"Grouped features: {json.dumps(grouped_features, indent=2)}")

# 4. Prepare GCS paths and training parameters for config population
gcs_paths_for_config = {
    "train": MOCK_GCS_TRAIN_DATA_PATH + "/", # Ensure trailing slash for directory
    "eval": MOCK_GCS_EVAL_DATA_PATH + "/",
    "model_dir": MOCK_MODEL_DIR_DAY_PATH + "/"
}

# Build an argparse.Namespace, the same type argparse itself would produce for the training parameters
from argparse import Namespace

training_params_args = Namespace(**TRAINING_PARAMS)

# 5. Populate and save the configuration file
print("\nPopulating and saving final pipeline configuration...")
generated_config_file = populate_and_save_config(
    template_config_path=CONFIG_TEMPLATE_PATH,
    output_config_path=GENERATED_PIPELINE_CONFIG_PATH,
    input_fields_str=input_fields_str, # Already indented by auto_feature_config_generator's wrapper
    feature_config_str=feature_config_str, # Already indented
    grouped_features=grouped_features,
    gcs_paths=gcs_paths_for_config,
    cli_args=training_params_args # Pass the Namespace object
)

if generated_config_file:
    print(f"\nGenerated EasyRec pipeline configuration saved to: {generated_config_file}")
    # Optional: Print a snippet of the generated config
    # with open(generated_config_file, 'r') as f:
    #     print("\n--- Snippet of Generated Config ---")
    #     for _ in range(30): # Print first 30 lines
    #         line = f.readline()
    #         if not line: break
    #         print(line, end='')
    #     print("...")
else:
    print("\nFailed to generate pipeline configuration.")

## Section 6: Stage 3: Model Training (Simulated)

This stage would involve executing the EasyRec training process using the configuration file generated in the previous step.

**Important:** Running this requires a fully configured EasyRec environment, including TensorFlow and all necessary dependencies. The GCS paths in the generated config must also be accessible and contain appropriately formatted Parquet data.
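
Before launching training, it can help to confirm that the required packages are importable at all. The check below is a small sketch using only the standard library; the helper name `missing_modules` is hypothetical, and the module list assumes the top-level import names `easy_rec` and `tensorflow`. It does not verify versions or GCS access.

```python
import importlib.util


def missing_modules(module_names):
    """Return the top-level module names that cannot be found in this environment."""
    # find_spec locates a module without importing it, so this stays cheap
    return [name for name in module_names if importlib.util.find_spec(name) is None]


missing = missing_modules(["easy_rec", "tensorflow"])
if missing:
    print(f"Missing modules, training would fail: {missing}")
else:
    print("EasyRec and TensorFlow appear to be installed.")
```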

In [None]:
print("\n--- Stage 3: Model Training (Simulated) ---")

if generated_config_file and os.path.exists(generated_config_file):
    abs_generated_config_path = os.path.abspath(generated_config_file)
    training_command_parts = [
        "python", "-m", "easy_rec.python.train_eval",
        f"--pipeline_config_path={abs_generated_config_path}"
    ]
    training_command_str = " ".join(training_command_parts)
    
    print("The following command would be run for EasyRec training:")
    print(f"  {training_command_str}")
    
    # To actually run it (uncomment and ensure EasyRec is installed):
    # print("\nSimulating training execution (will likely fail if EasyRec is not set up)...\n")
    # try:
    #     process_result = subprocess.run(training_command_parts, check=True, text=True, capture_output=True)
    #     print("--- EasyRec Training STDOUT ---")
    #     print(process_result.stdout)
    #     print("--- End of EasyRec Training STDOUT ---")
    #     print("\nEasyRec training process completed successfully (simulated).")
    # except FileNotFoundError:
    #     print("Error: 'python' or EasyRec module not found. Cannot execute training.")
    #     print("Please ensure your Python environment is set up correctly for EasyRec.")
    # except subprocess.CalledProcessError as e:
    #     print(f"Error during EasyRec training execution (return code {e.returncode}):")
    #     print("--- STDOUT ---"); print(e.stdout)
    #     print("--- STDERR ---"); print(e.stderr)
    #     print("EasyRec training failed (simulated).")
    # except Exception as e:
    #     print(f"An unexpected error occurred during simulated training: {e}")
    print("\nNote: Actual training execution is commented out. Uncomment to try.")
else:
    print("Generated config file not found. Skipping training simulation.")

## Section 7: Stage 4: Model Evaluation

After training, predictions would be generated by EasyRec (either as part of evaluation or a separate prediction job). This stage uses the `evaluate_model.py` script to calculate Group AUC (GAUC) on these predictions.

We'll first create a mock prediction file for demonstration.
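
For readers unfamiliar with the metric, Group AUC is commonly computed as a weighted average of per-group AUCs, skipping groups whose labels are all 0 or all 1 (AUC is undefined there). The sketch below shows that common definition with sample-count weights. It is an assumption that `calculate_group_auc` behaves the same way; the names `binary_auc`, `group_auc_sketch`, and the toy data are hypothetical.

```python
import numpy as np
import pandas as pd


def binary_auc(labels, scores):
    """AUC via the Mann-Whitney U statistic; tied scores get average ranks."""
    n_pos = int((labels == 1).sum())
    n_neg = int((labels == 0).sum())
    if n_pos == 0 or n_neg == 0:
        return float("nan")  # undefined without both classes
    ranks = scores.rank(method="average")
    u_stat = ranks[labels == 1].sum() - n_pos * (n_pos + 1) / 2
    return float(u_stat / (n_pos * n_neg))


def group_auc_sketch(df, score_col, label_col, group_col):
    """Sample-count-weighted mean of per-group AUCs, skipping single-class groups."""
    weighted_sum, total_weight = 0.0, 0
    for _, group in df.groupby(group_col):
        auc = binary_auc(group[label_col], group[score_col])
        if np.isnan(auc):
            continue
        weighted_sum += auc * len(group)
        total_weight += len(group)
    return weighted_sum / total_weight if total_weight else float("nan")


# Toy data: user "a" is ranked perfectly, user "b" is ranked backwards,
# and user "c" has only positive labels and is therefore skipped.
toy = pd.DataFrame({
    "user_id": ["a", "a", "a", "b", "b", "c", "c"],
    "click": [1, 0, 0, 1, 0, 1, 1],
    "click_prediction_score": [0.9, 0.2, 0.4, 0.3, 0.6, 0.5, 0.7],
})
toy_gauc = group_auc_sketch(toy, "click_prediction_score", "click", "user_id")
print(f"Toy GAUC: {toy_gauc:.2f}")  # (1.0 * 3 + 0.0 * 2) / 5 = 0.60
```

Because each user is scored separately, GAUC measures how well the model ranks items within a user's own candidates, which a single global AUC can obscure.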

In [None]:
print("\n--- Stage 4: Model Evaluation (using mock predictions) ---")

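# 1. Create a mock prediction file
# Note: MOCK_PREDICTION_FILE is a gs:// URI. os.makedirs treats it as a relative local path,
# while pd.read_parquet below routes gs:// URIs through fsspec/gcsfs. For a purely local demo,
# point MOCK_PREDICTION_FILE at a local path instead.
mock_prediction_dir = os.path.dirname(MOCK_PREDICTION_FILE)
os.makedirs(mock_prediction_dir, exist_ok=True)  # no error if the directory already exists
print(f"Ensured directory for mock predictions exists: {mock_prediction_dir}")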

create_mock_prediction_file(
    file_path=MOCK_PREDICTION_FILE,
    num_rows=2000, 
    num_groups=50,
    group_col_name=GROUP_BY_COLUMN_EVAL,
    click_label_col=CLICK_LABEL_COLUMN,
    click_score_col=CLICK_SCORE_COLUMN,
    conv_label_col=CONVERSION_LABEL_COLUMN,
    conv_score_col=CONVERSION_SCORE_COLUMN,
    file_type="parquet"
)

# 2. Load the mock prediction data
try:
    df_predictions = pd.read_parquet(MOCK_PREDICTION_FILE)
    print(f"\nSuccessfully loaded mock prediction data. Shape: {df_predictions.shape}")
    # print(df_predictions.head())

    # 3. Calculate Group AUC for CTR
    print("\nCalculating GAUC for CTR...")
    gauc_ctr = calculate_group_auc(
        df_predictions.copy(), 
        score_col=CLICK_SCORE_COLUMN, 
        label_col=CLICK_LABEL_COLUMN, 
        group_col=GROUP_BY_COLUMN_EVAL
    )
    if pd.notna(gauc_ctr):
        print(f"  GAUC (CTR) for '{GROUP_BY_COLUMN_EVAL}': {gauc_ctr:.6f}")
    else:
        print("  GAUC (CTR) could not be calculated.")

    # 4. Calculate Group AUC for CVR (on clicked samples)
    print("\nCalculating GAUC for CVR (on clicked samples only)...")
    df_clicked_samples = df_predictions[df_predictions[CLICK_LABEL_COLUMN] == 1].copy()
    if df_clicked_samples.empty:
        print("  No samples with click=1 found. Cannot calculate GAUC for CVR.")
    else:
        print(f"  Evaluating CVR on {len(df_clicked_samples)} clicked samples.")
        gauc_cvr = calculate_group_auc(
            df_clicked_samples, 
            score_col=CONVERSION_SCORE_COLUMN, 
            label_col=CONVERSION_LABEL_COLUMN, 
            group_col=GROUP_BY_COLUMN_EVAL
        )
        if pd.notna(gauc_cvr):
            print(f"  GAUC (CVR) for '{GROUP_BY_COLUMN_EVAL}': {gauc_cvr:.6f}")
        else:
            print("  GAUC (CVR) could not be calculated.")

except FileNotFoundError:
    print(f"Error: Mock prediction file not found at {MOCK_PREDICTION_FILE}. Cannot perform evaluation.")
except Exception as e:
    print(f"An error occurred during evaluation: {e}")

## Section 8: Conclusion

This notebook provided an orchestrated walkthrough of the ESMM recommender pipeline using the developed Python scripts. Key takeaways:
- Data splitting is simulated, producing distinct training, validation, and (optionally) test datasets.
- Feature configuration (`input_fields`, `feature_config`) and feature grouping for EasyRec are automated based on a sample of the training data.
- A complete EasyRec pipeline configuration file is generated by populating a template with these auto-generated sections, GCS paths, and training parameters.
- The process for invoking EasyRec training with the generated config is outlined.
- Offline evaluation using Group AUC on (mock) model predictions is demonstrated for both CTR and CVR tasks.

**Next Steps for Real Execution:**
1.  Replace all mock GCS paths with actual GCS locations for your raw data, processed data, model outputs, and predictions.
2.  Implement the actual GCS read/write logic within the mock functions in `split_dataset.py` and `auto_feature_config_generator.py` (or replace calls to mocks with direct GCS operations if preferred).
3.  Ensure your environment has EasyRec, TensorFlow, and all other necessary dependencies installed correctly.
4.  Execute the EasyRec training using the generated configuration file.
5.  Generate actual prediction outputs from your trained EasyRec model.
6.  Run `evaluate_model.py` on your actual prediction file to compute GAUC scores, and consult EasyRec's evaluation logs for the other metrics it reports.