In [1]:
# Copyright 2022 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ======================================================================

# Each user is responsible for checking the content of datasets and the
# applicable licenses and determining if suitable for the intended use.

<img src="https://developer.download.nvidia.com/notebooks/dlsw-notebooks/merlin_transformers4rec_getting-started-session-based-01-etl-with-nvtabular/nvidia_logo.png" style="width: 90px; float: right;">

# ETL with NVTabular

In this notebook we load the FSI synthetic demo data and then create sequential features with [NVTabular](https://github.com/NVIDIA-Merlin/NVTabular). The resulting data will be used in the next notebook to train a session-based model.

NVTabular is a feature engineering and preprocessing library for tabular data, designed to quickly and easily manipulate terabyte-scale datasets used to train deep-learning-based recommender systems. It provides a high-level abstraction to simplify code and accelerates computation on the GPU using the RAPIDS cuDF library.

### Import required libraries

In [2]:
import os
os.environ["CUDA_VISIBLE_DEVICES"]="0"
import glob

import cudf
import numpy as np
import pandas as pd

import nvtabular as nvt
from nvtabular.ops import *
from merlin.schema.tags import Tags

  warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")
  from .autonotebook import tqdm as notebook_tqdm


### Define Input/Output Path

In [3]:
# Root data directory; override with the INPUT_DATA_DIR environment variable.
INPUT_DATA_DIR = os.getenv("INPUT_DATA_DIR", "/workspace/data/")

## Load FSI Synthetic Demo Data

In [4]:
# Excel workbooks holding the two parts of the FSI synthetic demo data,
# located under <INPUT_DATA_DIR>/synthetic_fsi/.
DATA_FILES = [
    os.path.join(INPUT_DATA_DIR, "synthetic_fsi", workbook)
    for workbook in (
        "Synthetic_Demo_Data_Shared_Part1.xlsx",
        "Synthetic_Demo_Data_Shared_Part2.xlsx",
    )
]

In [5]:
# Load the FSI demo data. A cached parquet copy is preferred; on the first run
# the Excel workbooks in DATA_FILES are read, their headers are normalized, and
# the combined frame is written to the parquet cache for later runs.
parquet_path = os.path.join(INPUT_DATA_DIR, "synthetic_fsi", "synthetic_demo_data.parquet")


def _clean_column_name(col) -> str:
    """Normalize a raw Excel header into a snake_case column name.

    Strips surrounding whitespace, lower-cases, turns spaces and hyphens into
    underscores and drops '(', ')' and '$' -- e.g. "Income ($)" -> "income_"
    and "Offer - Carousel" -> "offer___carousel".
    """
    return (
        str(col)  # headers are not guaranteed to be strings (e.g. numeric headers)
        .strip()
        .lower()
        .replace(' ', '_')
        .replace('(', '')
        .replace(')', '')
        .replace('$', '')
        .replace('-', '_')
    )


if os.path.exists(parquet_path):
    print(f"Loading data from parquet file: {parquet_path}")
    df = pd.read_parquet(parquet_path)
    print(f"Loaded {len(df)} rows")

else:
    print("Parquet file not found. Loading from Excel files...")
    dfs = []
    for file_path in DATA_FILES:
        print(f"Reading {file_path}...")
        # Decide the sheet from the file name only, so a "Part1" appearing
        # elsewhere in the directory path cannot select the wrong sheet.
        sheet_name = "Data_Part1" if "Part1" in os.path.basename(file_path) else "Data_Part2"
        # skiprows=1 skips the first sheet row (presumably a title row above
        # the header -- confirm against the workbook layout).
        df_temp = pd.read_excel(file_path, sheet_name=sheet_name, skiprows=1)
        df_temp.columns = [_clean_column_name(col) for col in df_temp.columns]
        dfs.append(df_temp)
        print(f"Loaded {len(df_temp)} rows from {os.path.basename(file_path)}")

    # Combine all dataframes
    df = pd.concat(dfs, ignore_index=True)

    # Save to parquet for faster loading next time
    print(f"Saving data to parquet file: {parquet_path}")
    os.makedirs(os.path.dirname(parquet_path), exist_ok=True)
    df.to_parquet(parquet_path, index=False)

print(f"Total rows: {len(df)}")
print(f"Available columns: {list(df.columns)}")

Loading data from parquet file: /workspace/data/synthetic_fsi/synthetic_demo_data.parquet
Loaded 440787 rows
Total rows: 440787
Available columns: ['session_date', 'loan_id', 'has_mobile_app', 'debtiq_enrolled', 'pa_eligible', 'topup_eligible', 'ita_eligible', 'email_sent_in_last_90_days', 'dm_sent_in_last_90_days', 'fico', 'income_', 'existing_loan_size_', 'current_loan_mob', 'offer___carousel', 'servicing___carousel', 'feature_sheet', 'bottom_sheet', 'converts_for_a_topup']


Visualize a couple of rows of the loaded FSI synthetic demo dataset:

In [6]:
# Preview the first five rows of the combined raw FSI data.
df.head(n=5)

Unnamed: 0,session_date,loan_id,has_mobile_app,debtiq_enrolled,pa_eligible,topup_eligible,ita_eligible,email_sent_in_last_90_days,dm_sent_in_last_90_days,fico,income_,existing_loan_size_,current_loan_mob,offer___carousel,servicing___carousel,feature_sheet,bottom_sheet,converts_for_a_topup
0,2025-05-22,4954838,1,1,1,1,1,17,2,807,57422,10857,9,Topup,ITA,ITA,ITA,0
1,2025-05-21,4765835,1,1,1,1,1,24,1,741,63181,9287,28,Topup,Topup,blank,ITA,0
2,2025-05-21,4185554,0,0,1,1,1,20,2,745,50730,9720,28,Topup,Topup,ITA,ITA,0
3,2025-05-21,7019817,0,0,1,1,1,17,1,741,148623,10786,12,Topup,ITA,blank,ITA,0
4,2025-05-24,8168610,0,0,1,1,1,19,1,752,110899,11537,27,Topup,ITA,ITA,ITA,0


## Feature Engineering with NVTabular

Deep learning models require dense input features. Categorical features are sparse and need to be represented by dense embeddings in the model. To allow for that, categorical features first need to be encoded as contiguous integers `(0, ..., |C|)`, where `|C|` is the feature cardinality (number of unique values), so that their embeddings can be efficiently stored in embedding layers. We use NVTabular's `Categorify` op to encode all categorical columns, including the binary flag columns, as contiguous integers. Note that `Categorify` encodes `nulls` to `1` and out-of-vocabulary (OOV) values to `2` automatically, and reserves `0` for padding; the encoding of the other categories starts from `3`. The `0` value is used to pad sequences that are shorter than the maximum sequence length. Continuous columns are not categorified: their missing values are filled with `FillMissing` and they are standardized with `Normalize`.

Here our goal is to create sequential features. To do so, in the following cell we group the features at the loan level (`loan_id`), so that each output row holds the sequence of interactions of one loan. This FSI demo dataset has a date-level `session_date` column (from which an integer `day` index is derived), so the interactions within each loan can be ordered chronologically before grouping, as in this [example notebook](../end-to-end-session-based/01-ETL-with-NVTabular.ipynb). Note that we also trim each feature sequence to at most `SESSIONS_MAX_LENGTH` elements. NVTabular lets us preprocess and create these features on the GPU with a few lines of code.

In [7]:
# First, let's prepare the FSI data for sequential modeling
print("Preparing FSI data for sequential recommendation modeling...")

# Parse session_date and derive `day`, a 1-based index counted from the earliest
# session_date in the data (day 1). `day` orders each loan's interactions in the
# Groupby below, and its per-loan first value ("day-first") is the time
# partition used when splitting the data by day.
df['session_date'] = pd.to_datetime(df['session_date'])
df['day'] = (df['session_date'] - df['session_date'].min()).dt.days + 1

# The "item" of each interaction joins the offer and servicing carousel values
# shown in the same row with "_" (e.g. "Topup_ITA").
df['product_interaction'] = df['offer___carousel'].astype(str) + '_' + df['servicing___carousel'].astype(str)

# Each unique product interaction is treated as an "item"; the per-loan
# sequences of these items are built by the Groupby op below.
print(f"Unique product interactions: {df['product_interaction'].nunique()}")

SESSIONS_MAX_LENGTH = 10  # Keep at most the 10 most recent interactions per loan

# Categorical features to encode
categorical_features = [
    'product_interaction',  # Our main "item" - combinations of offers/services
    'offer___carousel',     # Individual offer type
    'servicing___carousel', # Individual service type
    'feature_sheet',        # Feature sheet shown
    'bottom_sheet'          # Bottom sheet type
]

# Continuous/numerical features
continuous_features = [
    'fico',                        # Credit score
    'income_',                     # Customer income
    'existing_loan_size_',         # Current loan amount
    'current_loan_mob',            # Months on book
    'email_sent_in_last_90_days',  # Email frequency
    'dm_sent_in_last_90_days'      # Direct mail frequency
]

# Binary 0/1 flags, treated as categorical so that they get embeddings
binary_features = [
    'has_mobile_app',
    'debtiq_enrolled',
    'pa_eligible',
    'topup_eligible',
    'ita_eligible'
]

# Encode all categorical and binary features as contiguous integer ids
categ_feats = (categorical_features + binary_features) >> nvt.ops.Categorify()

# Fill missing continuous values, then standardize them
cont_feats = continuous_features >> nvt.ops.FillMissing() >> nvt.ops.Normalize()

# Groupby inputs: encoded features plus the raw key, ordering and target columns
groupby_feats = categ_feats + cont_feats + ['loan_id', 'day', 'converts_for_a_topup']

# Group rows by loan_id into per-loan sequences. Rows are sorted by `day` first
# so that every "-list" column is in chronological order: the
# ListSlice(-SESSIONS_MAX_LENGTH) below then keeps the most recent interactions,
# and "day-first" is the loan's earliest day. Without sort_cols, the list order
# (and therefore "first" and the slice) is not guaranteed to be chronological.
groupby_features = groupby_feats >> nvt.ops.Groupby(
    groupby_cols=["loan_id"],
    sort_cols=["day"],
    aggs={
        # Main product interaction sequence (our "items")
        "product_interaction": ["list", "count"],
        # Other categorical sequences
        "offer___carousel": ["list"],
        "servicing___carousel": ["list"],
        "feature_sheet": ["list"],
        "bottom_sheet": ["list"],
        # Binary feature sequences
        "has_mobile_app": ["list"],
        "debtiq_enrolled": ["list"],
        "pa_eligible": ["list"],
        "topup_eligible": ["list"],
        "ita_eligible": ["list"],
        # Continuous feature sequences
        "fico": ["list"],
        "income_": ["list"],
        "existing_loan_size_": ["list"],
        "current_loan_mob": ["list"],
        "email_sent_in_last_90_days": ["list"],
        "dm_sent_in_last_90_days": ["list"],
        # Temporal and target features
        "day": ["first"],
        # 1 if the loan converted in any of its interactions
        "converts_for_a_topup": ["max"]
        },
    name_sep="-")

# Main item sequence (product interactions), tagged as the item id
sequence_features_item = (
    groupby_features['product_interaction-list']
    >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH)
    >> TagAsItemID()
)

# Categorical feature sequences
categorical_sequences = (
    groupby_features['offer___carousel-list', 'servicing___carousel-list',
                    'feature_sheet-list', 'bottom_sheet-list']
    >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH)
)

# Binary feature sequences
binary_sequences = (
    groupby_features['has_mobile_app-list', 'debtiq_enrolled-list', 'pa_eligible-list',
                    'topup_eligible-list', 'ita_eligible-list']
    >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH)
    >> nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL])
)

# Continuous feature sequences
continuous_sequences = (
    groupby_features['fico-list', 'income_-list', 'existing_loan_size_-list',
                    'current_loan_mob-list', 'email_sent_in_last_90_days-list',
                    'dm_sent_in_last_90_days-list']
    >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH)
    >> nvt.ops.AddMetadata(tags=[Tags.CONTINUOUS])
)

# Target feature (single value per loan for binary classification)
target_feature = (
    groupby_features['converts_for_a_topup-max']
    >> nvt.ops.AddMetadata(tags=[Tags.TARGET])
)

# Minimum number of interactions a loan needs to be kept. A value of 1 keeps
# every loan, since FSI data may have single interactions; raise it to drop
# loans with very short interaction sequences.
MINIMUM_SESSION_LENGTH = 1
selected_features = (
    groupby_features['product_interaction-count', 'day-first', 'loan_id'] +
    sequence_features_item +
    categorical_sequences +
    binary_sequences +
    continuous_sequences +
    target_feature
)

filtered_sessions = selected_features >> nvt.ops.Filter(
    f=lambda gdf: gdf["product_interaction-count"] >= MINIMUM_SESSION_LENGTH
)

# Record min/max sequence lengths (value counts) in the schema for the model
seq_feats_list = filtered_sessions[
    'product_interaction-list', 'offer___carousel-list', 'servicing___carousel-list',
    'feature_sheet-list', 'bottom_sheet-list', 'has_mobile_app-list',
    'debtiq_enrolled-list', 'pa_eligible-list', 'topup_eligible-list',
    'ita_eligible-list', 'fico-list', 'income_-list', 'existing_loan_size_-list',
    'current_loan_mob-list', 'email_sent_in_last_90_days-list',
    'dm_sent_in_last_90_days-list', 'converts_for_a_topup-max'
] >> nvt.ops.ValueCount()

# Create the complete workflow
workflow = nvt.Workflow(filtered_sessions['loan_id', 'day-first'] + seq_feats_list)

# Apply the workflow to create sequential features
print("Applying NVTabular workflow to create sequential features...")
dataset = nvt.Dataset(df)

# Fit the statistics (Categorify vocabularies, Normalize moments, value counts)
# and export the transformed data as parquet
workflow.fit_transform(dataset).to_parquet(os.path.join(INPUT_DATA_DIR, "processed_nvt_classifier"))

print("✅ Sequential feature engineering completed!")
print(f"📊 Features created for {len(df)} loan interactions")
print("🎯 Target variable: converts_for_a_topup (max value for binary classification)")
print("📝 Main item sequence: product_interaction (combinations of offers/services)")
print("🔄 Note: This version is optimized for BinaryClassificationTask with single target values")

Preparing FSI data for sequential recommendation modeling...
Unique product interactions: 7
Applying NVTabular workflow to create sequential features...




KeyboardInterrupt: 

It is possible to save the preprocessing workflow. That is useful to apply the same preprocessing to other data (with the same schema) and also to deploy the session-based recommendation pipeline to Triton Inference Server.

In [None]:
# Display the output schema of the fitted workflow: the output column names
# together with the metadata attached by the ops above (e.g. item-id,
# categorical/continuous and target tags, value counts).
workflow.output_schema

Save NVTabular workflow.

In [None]:
# Persist the workflow (its ops and the statistics computed by fit_transform)
# so the same preprocessing can be re-applied to other data with the same schema.
workflow.save(os.path.join(INPUT_DATA_DIR, "workflow_etl_classifier"))

## Export pre-processed data by day

In this example we split the preprocessed parquet files by day, to allow for temporal training and evaluation. The partition column is `day-first`, i.e. the first interaction day of each loan. There will be a folder for each day and three parquet files within each day folder: `train.parquet`, `valid.parquet` and `test.parquet`.

In [None]:
# Destination of the per-day splits; override with the OUTPUT_DIR environment variable.
OUTPUT_DIR = os.getenv("OUTPUT_DIR", os.path.join(INPUT_DATA_DIR, "sessions_by_day_classifier"))

In [None]:
# Read every part file written by workflow.fit_transform(...).to_parquet(...),
# not just part_0, so no rows are silently dropped when NVTabular writes
# more than one output file.
processed_part_files = sorted(
    glob.glob(os.path.join(INPUT_DATA_DIR, "processed_nvt_classifier", "part_*.parquet"))
)
assert processed_part_files, "No part_*.parquet files found in processed_nvt_classifier"
sessions_gdf = cudf.read_parquet(processed_part_files)

In [None]:
# Preview the first three processed loan sequences (rich display instead of print).
sessions_gdf.head(3)

In [None]:
# Write the processed loans into one sub-folder of OUTPUT_DIR per value of
# `day-first` (each loan's first interaction day); the cells below read e.g.
# <OUTPUT_DIR>/21/train.parquet.
# NOTE(review): `timestamp_col` is given `loan_id`, which is not a time column.
# Assuming the splitter orders rows by this column within each day before
# cutting train/valid/test, loans are assigned to splits by loan_id order rather
# than by time -- confirm this is intended.
from transformers4rec.utils.data_utils import save_time_based_splits
save_time_based_splits(data=nvt.Dataset(sessions_gdf),
                       output_dir= OUTPUT_DIR,
                       partition_col='day-first',
                       timestamp_col='loan_id', 
                      )

## Check out the preprocessed outputs

In [None]:
# Day partition whose training split is inspected below
TRAIN_DAY = "21"
TRAIN_PATHS = os.path.join(OUTPUT_DIR, TRAIN_DAY, "train.parquet")
print(f"Reading training data from: {TRAIN_PATHS}")

# Sort day folders numerically ("2" before "10"); any non-numeric folder goes last.
available_days = sorted(
    (d for d in os.listdir(OUTPUT_DIR) if os.path.isdir(os.path.join(OUTPUT_DIR, d))),
    key=lambda d: (not d.isdigit(), int(d) if d.isdigit() else 0, d),
)
print(f"Available days: {available_days}")
print(f"📁 Using classifier-specific directory: {OUTPUT_DIR}")
# The ETL aggregates the target with "max", so the column is converts_for_a_topup-max
print("🎯 Target format: Single value for binary classification (converts_for_a_topup-max)")

In [None]:
# Replace `df` with the selected day's training split and preview it.
df = pd.read_parquet(path=TRAIN_PATHS)
df.head(n=5)

In [None]:
# Sanity-check the class balance of the loaded training split and dump one
# converting loan in full, so its sequence columns can be eyeballed.
print("🔍 Checking for positive conversion examples...")
print(f"Total records: {len(df)}")

if 'converts_for_a_topup-max' not in df.columns:
    # The single-value target is missing: list what is present, and point at
    # the older per-interaction list target if that is what the file holds.
    print("❌ Target column 'converts_for_a_topup-max' not found!")
    print(f"Available columns: {list(df.columns)}")

    if 'converts_for_a_topup-list' in df.columns:
        print("⚠️  Found 'converts_for_a_topup-list' - you may need to re-run the ETL pipeline")
else:
    target_col = 'converts_for_a_topup-max'

    print("\n📊 Target variable distribution:")
    print(df[target_col].value_counts().sort_index())

    conversion_rate = df[target_col].mean()
    print(f"\n📈 Overall conversion rate: {conversion_rate:.3f} ({conversion_rate*100:.1f}%)")

    positive_examples = df[df[target_col] == 1]
    if positive_examples.empty:
        print("❌ No positive conversion examples found in this dataset")
    else:
        print(f"\n✅ Found {len(positive_examples)} positive conversion examples")
        print("\n🎯 Sample positive conversions (converts_for_a_topup-max = 1):")
        print(positive_examples[['loan_id', target_col]].head())

        # Print every sequence ("-list") and target ("-max") value of one example
        print("\n📋 Complete feature example for a positive conversion:")
        sample_positive = positive_examples.iloc[0]
        for col in df.columns:
            if col.endswith(('-list', '-max')):
                print(f"  {col}: {sample_positive[col]}")

    negative_examples = df[df[target_col] == 0]
    print(f"\n🔄 Found {len(negative_examples)} negative conversion examples")


In [None]:
# Create Balanced Dataset for Training
#
# Down-samples the non-converting loans to NEGATIVE_TO_POSITIVE_RATIO negatives
# per positive, saves the balanced frame plus a JSON summary of the run, and
# writes stratified 70/15/15 train/valid/test splits into the same folder.
import gc
import json

from sklearn.model_selection import train_test_split

print("🎯 Creating balanced dataset for improved binary classification training...")

# Configuration: Ratio of non-conversions to conversions (negative:positive)
# A value of 10 keeps 10 non-conversions for every 1 conversion
NEGATIVE_TO_POSITIVE_RATIO = 10  # ⚙️ Configurable for experimentation
RANDOM_SEED = 42  # Shared by negative sampling, shuffling and the splits

print(f"📊 Target ratio: {NEGATIVE_TO_POSITIVE_RATIO}:1 (negative:positive)")

# Load every part file written by the NVTabular workflow (not only part_0)
processed_data_path = os.path.join(INPUT_DATA_DIR, "processed_nvt_classifier")
processed_part_files = sorted(glob.glob(os.path.join(processed_data_path, "part_*.parquet")))
assert processed_part_files, f"No part_*.parquet files found in {processed_data_path}"
df_processed = pd.concat(
    [pd.read_parquet(part_file) for part_file in processed_part_files], ignore_index=True
)

print("📈 Original dataset statistics:")
print(f"   Total samples: {len(df_processed):,}")

target_col = 'converts_for_a_topup-max'
if target_col not in df_processed.columns:
    print(f"❌ Target column '{target_col}' not found!")
    print(f"Available columns: {list(df_processed.columns)}")
elif df_processed.empty:
    # Guard: the percentages below divide by len(df_processed)
    print("❌ Processed dataset is empty - cannot create balanced dataset")
else:
    # Older pipeline versions stored the target as a per-interaction list;
    # collapse it with max() to match the "-max" aggregation used by the ETL.
    first_target_value = df_processed[target_col].iloc[0]
    if hasattr(first_target_value, '__len__') and not isinstance(first_target_value, str):
        print("🔧 Converting list format target to single values...")
        df_processed[target_col] = df_processed[target_col].apply(
            lambda values: max(values) if len(values) > 0 else 0
        )

    # Original class distribution
    positive_samples = df_processed[df_processed[target_col] == 1]
    negative_samples = df_processed[df_processed[target_col] == 0]

    original_positive = len(positive_samples)
    original_negative = len(negative_samples)
    original_ratio = original_negative / original_positive if original_positive > 0 else float('inf')

    print(f"   Positive samples: {original_positive:,} ({original_positive/len(df_processed)*100:.2f}%)")
    print(f"   Negative samples: {original_negative:,} ({original_negative/len(df_processed)*100:.2f}%)")
    print(f"   Original ratio: {original_ratio:.1f}:1")

    if original_positive == 0:
        print("❌ No positive samples found - cannot create balanced dataset")
    else:
        # Number of negatives needed for the requested ratio
        target_negative_samples = original_positive * NEGATIVE_TO_POSITIVE_RATIO

        if target_negative_samples >= original_negative:
            print(f"⚠️  Warning: Target ratio ({NEGATIVE_TO_POSITIVE_RATIO}:1) requires {target_negative_samples:,} negative samples")
            print(f"   but only {original_negative:,} available. Using all negative samples.")
            balanced_negative_samples = negative_samples
        else:
            print(f"🎲 Randomly sampling {target_negative_samples:,} negative samples from {original_negative:,} available")
            balanced_negative_samples = negative_samples.sample(
                n=int(target_negative_samples), random_state=RANDOM_SEED
            )

        # Combine all positives with the sampled negatives, then shuffle
        balanced_df = pd.concat([positive_samples, balanced_negative_samples], ignore_index=True)
        balanced_df = balanced_df.sample(frac=1, random_state=RANDOM_SEED).reset_index(drop=True)

        # Final statistics
        final_positive = int((balanced_df[target_col] == 1).sum())
        final_negative = int((balanced_df[target_col] == 0).sum())
        final_ratio = final_negative / final_positive if final_positive > 0 else float('inf')

        print("\n📊 Balanced dataset statistics:")
        print(f"   Total samples: {len(balanced_df):,}")
        print(f"   Positive samples: {final_positive:,} ({final_positive/len(balanced_df)*100:.2f}%)")
        print(f"   Negative samples: {final_negative:,} ({final_negative/len(balanced_df)*100:.2f}%)")
        print(f"   Final ratio: {final_ratio:.1f}:1")
        print(f"   Size reduction: {(1 - len(balanced_df)/len(df_processed))*100:.1f}%")

        # Save balanced dataset
        balanced_output_dir = os.path.join(INPUT_DATA_DIR, f"balanced_classifier_ratio_{NEGATIVE_TO_POSITIVE_RATIO}_to_1")
        os.makedirs(balanced_output_dir, exist_ok=True)

        balanced_file_path = os.path.join(balanced_output_dir, "balanced_data.parquet")
        balanced_df.to_parquet(balanced_file_path, index=False)

        print(f"\n✅ Balanced dataset saved to: {balanced_file_path}")

        # Save configuration metadata (plain Python numbers so json can serialize them)
        config_info = {
            'negative_to_positive_ratio': NEGATIVE_TO_POSITIVE_RATIO,
            'original_samples': len(df_processed),
            'original_positive': original_positive,
            'original_negative': original_negative,
            'original_ratio': original_ratio,
            'balanced_samples': len(balanced_df),
            'balanced_positive': final_positive,
            'balanced_negative': final_negative,
            'balanced_ratio': final_ratio,
            'random_seed': RANDOM_SEED
        }

        config_file_path = os.path.join(balanced_output_dir, "balance_config.json")
        with open(config_file_path, 'w') as f:
            json.dump(config_info, f, indent=2)

        print(f"📄 Configuration saved to: {config_file_path}")

        print("\n🔄 Creating stratified train/validation/test splits...")

        # 70% train, then the remaining 30% halved into 15% validation / 15% test.
        # NOTE: stratification needs at least 2 samples of each class at each
        # stage; with very few positives train_test_split raises ValueError.
        train_data, temp_data = train_test_split(
            balanced_df,
            test_size=0.3,
            random_state=RANDOM_SEED,
            stratify=balanced_df[target_col]
        )

        val_data, test_data = train_test_split(
            temp_data,
            test_size=0.5,  # 0.5 of 0.3 = 0.15 (15% of total)
            random_state=RANDOM_SEED,
            stratify=temp_data[target_col]
        )

        # Save splits
        train_data.to_parquet(os.path.join(balanced_output_dir, "train.parquet"), index=False)
        val_data.to_parquet(os.path.join(balanced_output_dir, "valid.parquet"), index=False)
        test_data.to_parquet(os.path.join(balanced_output_dir, "test.parquet"), index=False)

        # Print split statistics
        for split_name, split_data in [("TRAIN", train_data), ("VALIDATION", val_data), ("TEST", test_data)]:
            pos_count = (split_data[target_col] == 1).sum()
            total_count = len(split_data)
            conv_rate = pos_count / total_count if total_count > 0 else 0

            print(f"   {split_name}:")
            print(f"     Total: {total_count:,}")
            print(f"     Positive: {pos_count:,} ({conv_rate:.1%})")
            print(f"     Negative: {total_count - pos_count:,}")

        print(f"\n🎯 Ready for training! Use balanced dataset at: {balanced_output_dir}")
        print("💡 To experiment with different ratios, change NEGATIVE_TO_POSITIVE_RATIO and re-run this cell")

# Clean up memory (gc is imported at the top of this cell, so this also works
# on a fresh kernel)
del df_processed
gc.collect()


In [None]:
import gc

# Release the training-split DataFrame loaded from TRAIN_PATHS and force a
# garbage-collection pass; the cell's output is the number of objects collected.
# NOTE: `del df` raises NameError if this cell is re-run after `df` is gone.
del df
gc.collect()

You have just created loan-level sequential features with NVTabular to train a session-based model. Now you can move to the next notebook, `02-session-based-XLNet-with-PyT.ipynb`, to train a session-based model using [XLNet](https://arxiv.org/abs/1906.08237), one of the state-of-the-art NLP models. Please shut down this kernel to free the GPU memory before you start the next one.