# End-to-End ML: Deep Learning Recommendation Model
このレコメンデーション・エンジンは、Tasty Bytesのフードトラックが販売する各メニューについて、ロイヤルティ顧客にレコメンデーション・スコアを提供する。この出力は、パーソナライズされたアウトリーチ、顧客が訪問するトラックブランドの増加、業績不振のトラックへのトラフィックの増加に使用される。

レコメンデーション・エンジンのモデル学習は、GPUデバイス間での分散学習を活用しており、エンドツーエンドのモデル開発とデプロイは、以下のSnowflake機能を使用して簡素化および合理化されています：
- Snowflake Notebooks with GPU Container Runtime (GA)
- Snowflake Feature Store (GA)
- Snowflake Modeling API (GA) - Preprocessing, Training (PyTorch API), Evaluation
- Snowflake Model Registry (GA)
- Model Deployment from Registry to SPCS (GA)

## Setup
End-to-Endのモデル開発とデプロイに必要となるSnowflakeライブラリをインポート

In [None]:
# Install pinned versions of the Snowflake ML / Snowpark clients and of PyTorch.
# The same torch / torchvision pins are passed to the model registry later in
# this notebook.
!pip install snowflake-ml-python==1.8.0
!pip install snowflake-snowpark-python==1.29.0
!pip install torchvision==0.18.1
!pip install torch==2.3.1

### GPU Device Info
ノートブックで利用可能なGPUデバイスの数を表示

**Snowflake Feature**: Snowflake GPU Notebooks (GA) - SnowflakeノートブックからGPUリソースに簡単にアクセスできます。

In [None]:
import torch

# Report the CUDA devices visible to this notebook and select device 0.
if not torch.cuda.is_available():
    print("CUDA is not available. Check your installation or GPU setup.")
else:
    num_gpus = torch.cuda.device_count()
    print("Number of GPU devices available:", num_gpus)

    for i in range(num_gpus):
        print("Device", i, ":", torch.cuda.get_device_name(i))

    # Use the first GPU as the current device for subsequent CUDA calls.
    torch.cuda.set_device(0)

## 特徴量ストアの作成
Snowflake特徴量ストアは、データサイエンスおよび機械学習ワークロードにおける特徴量の作成、保存、管理をより簡単かつ効率的に行えるように設計されています。プロデューサは、フィーチャーストア内のフィーチャービューとエンティティを管理し、部門間のコラボレーションとフィーチャーの再利用を促進します。このアプローチにより、トレーニング環境とサービング環境間のサイロが解消され、Pythonコマンドを使用した集計や移動ウィンドウの計算が簡素化されます。

Import the necessary packages for development.

In [None]:
import os
import time
import math
#import sys

# Third-party library imports
import pandas as pd
import numpy as np
import joblib
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.distributed as dist
from torch.utils.data import DataLoader, TensorDataset
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import f_classif 

# Snowflake library imports
import streamlit as st
from snowflake.ml.modeling.preprocessing import LabelEncoder, MinMaxScaler
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.pytorch import (
    PyTorchTrainer,
    ScalingConfig,
    WorkerResourceConfig,
)
#from snowflake.ml.modeling.data import MLRuntimeDataset
#from snowflake.ml.data.data_connector import DataConnector
from snowflake.ml.modeling.distributors.pytorch import PyTorchDistributor, PyTorchScalingConfig, WorkerResourceConfig
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector

# from snowflake.ml.modeling.pytorch.context import getContext
from snowflake.ml.modeling.distributors.pytorch import get_context

from snowflake.ml.feature_store import (
FeatureStore,
FeatureView,
Entity,
CreationMode
)
from snowflake.ml.registry import Registry
from snowflake.ml.modeling.metrics import (
roc_auc_score,  
precision_score, 
recall_score, 
confusion_matrix
)
from snowflake.snowpark import functions as F
from snowflake.snowpark import types as T
from snowflake.snowpark.context import get_active_session
# Reuse the Snowpark session the notebook is already connected with.
session = get_active_session()

# Add a query tag to the session. This helps with debugging and performance monitoring.
session.query_tag = {"origin":"sf_sit", "name":"tasty_bytes_e2e_ml", "version":{"major":1, "minor":0}, "attributes":{"is_quickstart":0, "source":"notebook"}}

# Current database name with any surrounding double quotes stripped.
db = str(session.get_current_database().strip('"'))
# Everything before the first "_PROD" in the upper-cased database name; later
# cells use it to build the role, warehouse and database names.
solution_prefix = (db.upper()).split('_PROD')[0]
import warnings
# Silence all Python warnings for the rest of the notebook.
warnings.filterwarnings('ignore')

### 特徴量ストアのスキーマと権限
フィーチャストア・スキーマを作成し、プロデューサがフィーチャストア・ビューを作成するために必要なすべての権限と、コンシューマがこれらのビューにアクセスするための権限を付与します。

In [None]:
 -- $FS_ROLE_PRODUCER create feature views and $FS_ROLE_CONSUMER uses the feature views for training. These roles are created as part of the setup
USE ROLE ACCOUNTADMIN;
SET FS_ROLE_PRODUCER = '{{solution_prefix}}_FS_PRODUCER';
SET FS_ROLE_CONSUMER = '{{solution_prefix}}_DATA_SCIENTIST';
SET FS_DATABASE = '{{solution_prefix}}_PROD';
SET FS_SCHEMA = 'FS_SCHEMA';
SET SCHEMA_FQN = CONCAT($FS_DATABASE, '.', $FS_SCHEMA);
SET FS_WAREHOUSE = '{{solution_prefix}}_DS_WH';
SET MR_DEMO_DB='{{solution_prefix}}_PROD';

-- Create schema
CREATE SCHEMA IF NOT EXISTS IDENTIFIER($FS_SCHEMA);

-- Build role hierarchy
GRANT ROLE IDENTIFIER($FS_ROLE_CONSUMER) TO ROLE IDENTIFIER($FS_ROLE_PRODUCER);

-- Grant PRODUCER role privileges
GRANT USAGE ON DATABASE IDENTIFIER($FS_DATABASE) TO ROLE IDENTIFIER($FS_ROLE_PRODUCER);
GRANT USAGE ON SCHEMA IDENTIFIER($SCHEMA_FQN) TO ROLE IDENTIFIER($FS_ROLE_PRODUCER);
GRANT CREATE DYNAMIC TABLE ON SCHEMA IDENTIFIER($SCHEMA_FQN) TO ROLE IDENTIFIER($FS_ROLE_PRODUCER);
GRANT CREATE VIEW ON SCHEMA IDENTIFIER($SCHEMA_FQN) TO ROLE IDENTIFIER($FS_ROLE_PRODUCER);
GRANT CREATE TAG ON SCHEMA IDENTIFIER($SCHEMA_FQN) TO ROLE IDENTIFIER($FS_ROLE_PRODUCER);
GRANT CREATE DATASET ON SCHEMA IDENTIFIER($SCHEMA_FQN) TO ROLE IDENTIFIER($FS_ROLE_PRODUCER);
GRANT APPLY TAG ON ACCOUNT TO ROLE IDENTIFIER($FS_ROLE_PRODUCER);

-- Grant CONSUMER role privileges
GRANT USAGE ON DATABASE IDENTIFIER($FS_DATABASE) TO ROLE IDENTIFIER($FS_ROLE_CONSUMER);
GRANT USAGE ON SCHEMA IDENTIFIER($SCHEMA_FQN) TO ROLE IDENTIFIER($FS_ROLE_CONSUMER);
GRANT SELECT,MONITOR ON FUTURE DYNAMIC TABLES IN SCHEMA IDENTIFIER($SCHEMA_FQN) TO ROLE IDENTIFIER($FS_ROLE_CONSUMER);
GRANT SELECT,MONITOR ON ALL DYNAMIC TABLES IN SCHEMA IDENTIFIER($SCHEMA_FQN) TO ROLE IDENTIFIER($FS_ROLE_CONSUMER);
GRANT SELECT,REFERENCES ON FUTURE VIEWS IN SCHEMA IDENTIFIER($SCHEMA_FQN) TO ROLE IDENTIFIER($FS_ROLE_CONSUMER);
GRANT SELECT,REFERENCES ON ALL VIEWS IN SCHEMA IDENTIFIER($SCHEMA_FQN) TO ROLE IDENTIFIER($FS_ROLE_CONSUMER);
GRANT CREATE DATASET ON SCHEMA IDENTIFIER($SCHEMA_FQN) TO ROLE IDENTIFIER($FS_ROLE_CONSUMER);

-- [Optional] Grant USAGE ON WAREHOUSE to CONSUMER
GRANT USAGE ON WAREHOUSE IDENTIFIER($FS_WAREHOUSE) TO ROLE IDENTIFIER($FS_ROLE_CONSUMER);
GRANT USAGE ON SCHEMA IDENTIFIER($SCHEMA_FQN) TO ROLE IDENTIFIER($FS_ROLE_PRODUCER);


Feature Store Producer Roleを使用してFeature Storeを定義します。

In [None]:
# Switch the session to the producer role, warehouse and feature-store schema.
# session.sql() only builds a lazy DataFrame; .collect() is what executes the
# statement, so every USE statement needs it (the role switch included).
session.sql(f'USE ROLE {solution_prefix}_FS_PRODUCER').collect()
session.sql(f'USE WAREHOUSE {solution_prefix}_DS_WH').collect()
session.sql('USE SCHEMA FS_SCHEMA').collect()

# Open the feature store in FS_SCHEMA, creating it if it does not exist yet.
FS = FeatureStore(
    session=session,
    database=f"{solution_prefix}_PROD",
    name="FS_SCHEMA",
    default_warehouse=f"{solution_prefix}_DS_WH",
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST,
)

In [None]:
# Menu item attributes from raw_pos.menu, excluding the 'Beverage' category.
# This DataFrame becomes the source of the MENU_FEATURES feature view.
menu_spdf = session.sql("SELECT menu_type, truck_brand_name, menu_item_name, item_category, item_subcategory, sale_price_usd FROM raw_pos.menu WHERE item_category != 'Beverage'");
menu_spdf

In [None]:
# Loyalty customer attributes plus an AGE column computed as the year
# difference between birthday_date and the current date.
# This DataFrame becomes the source of the CUSTOMER_FEATURES feature view.
cust_spdf = session.sql("SELECT customer_id, city, country, gender, marital_status, birthday_date, DATEDIFF(year, birthday_date, CURRENT_DATE()) AS age FROM raw_customer.customer_loyalty");
st.dataframe(cust_spdf)

In [None]:
# Per customer: total order value divided by the number of months spanned by
# their first and last order (+1 so a single-month history divides by 1).
avg_monthly_purchase_amount = session.sql(f"SELECT  customer_id, ROUND(SUM(order_total) / (TIMESTAMPDIFF(MONTH, MIN(date), MAX(date)) + 1),2) AS avg_monthly_purchase_amount FROM {solution_prefix}_PROD.ANALYTICS.ORDERS_V GROUP BY customer_id")
avg_monthly_purchase_amount

In [None]:
# Per customer: total order value divided by the number of weeks spanned by
# their first and last order (+1 so a single-week history divides by 1).
avg_weekly_purchase_amount = session.sql(f"SELECT customer_id,ROUND(SUM(order_total) / (TIMESTAMPDIFF(WEEK, MIN(date), MAX(date)) + 1),2) AS avg_weekly_purchase_amount FROM {solution_prefix}_PROD.ANALYTICS.ORDERS_V GROUP BY customer_id");
avg_weekly_purchase_amount

In [None]:
# Per customer: total order value divided by the number of years spanned by
# their first and last order (+1 so a single-year history divides by 1).
avg_yearly_purchase_amount = session.sql(f"SELECT customer_id, ROUND(SUM(order_total)/(TIMESTAMPDIFF(YEAR, MIN(date), MAX(date)) + 1),2) AS avg_yearly_purchase_amount FROM {solution_prefix}_PROD.ANALYTICS.ORDERS_V GROUP BY customer_id")
avg_yearly_purchase_amount

In [None]:
# Combine the monthly, weekly and yearly averages into one row per customer.
# This DataFrame becomes the source of the PURCHASE_FEATURES feature view.
cust_avgs_spdf= avg_monthly_purchase_amount.join(avg_weekly_purchase_amount,"CUSTOMER_ID").join(avg_yearly_purchase_amount,"CUSTOMER_ID")
st.dataframe(cust_avgs_spdf)

特徴量ストアのエンティティ作成

In [None]:
# Snowflake Feature Store requires an "entity" with "join_keys" be registered
# before a feature view can reference it.
# Customer entity, keyed on Customer_ID.
custentity = Entity(name="CustomerIds", join_keys=["Customer_ID"])
FS.register_entity(custentity)

# Menu item entity, keyed on MENU_ITEM_NAME.
Menuentity = Entity(name="Menu_ItemNames", join_keys=["MENU_ITEM_NAME"])
FS.register_entity(Menuentity)


# Purchase-average entity; it uses the same Customer_ID join key as CustomerIds.
Purchaseavgs_entity = Entity(name="Purchase_Avgs", join_keys=["Customer_ID"])
FS.register_entity(Purchaseavgs_entity)

In [None]:
# Show up to 100 of the entities registered in the feature store.
FS.list_entities().show(100)

顧客特徴量Viewの登録

In [None]:
# Feature view over the customer attributes, refreshed once a day.
fv = FeatureView(
    name="CUSTOMER_FEATURES",
    entities = [custentity],
    feature_df=cust_spdf,
    refresh_freq="1 day"
)
# Register it as version V1 with block=True and overwrite=True.
registered_fv = FS.register_feature_view(
    feature_view=fv,
    version="V1",
     block=True,
    overwrite=True
)

メニュー特徴量Viewの登録

In [None]:
# Feature view over the menu item attributes, refreshed once a day.
fv = FeatureView(
    name="MENU_FEATURES",
    entities = [Menuentity],
    feature_df=menu_spdf,
    refresh_freq="1 day"
)

# Register it as version V1 with block=True and overwrite=True.
registered_fv = FS.register_feature_view(
    feature_view=fv,
    version="V1",
    block=True,
    overwrite=True
)

購入特徴量Viewの登録

In [None]:
# Feature view over the per-customer purchase averages, refreshed once a day.
fv = FeatureView(
    name="PURCHASE_FEATURES",
    entities = [Purchaseavgs_entity],
    feature_df=cust_avgs_spdf,
    refresh_freq="1 day"
)

# Register it as version V1 with block=True and overwrite=True.
registered_fv = FS.register_feature_view(
    feature_view=fv,
    version="V1",
     block=True,
    overwrite=True
)

In [None]:
# List the feature views attached to the Purchase_Avgs entity.
FS.list_feature_views(entity_name="Purchase_Avgs")

## 特徴量ストアから特徴量へアクセス

特徴量ストアには、顧客、メニューアイテム、購入の特徴量ビューが含まれます。モデル特徴量は特徴量ストアからアクセスします。

**Snowflakeの特長：** 特徴量ストア（GA） - データと連動する特徴量を簡単に見つけることができます。

In [None]:
-- Switch to the data scientist role, which the grants cell sets up as the feature store consumer.
USE ROLE {{solution_prefix}}_DATA_SCIENTIST;

In [None]:
# Re-open the feature store (no creation mode is passed here) and fetch
# version V1 of each registered feature view.
FS = FeatureStore(
    session=session,
    database=f"{solution_prefix}_PROD",
    name="FS_SCHEMA",
    default_warehouse=f"{solution_prefix}_DS_WH",
)

customer_fv: FeatureView = FS.get_feature_view(name='CUSTOMER_FEATURES', version='V1')
print(customer_fv)

menu_fv: FeatureView = FS.get_feature_view(name='MENU_FEATURES', version='V1')
print(menu_fv)

purchase_fv: FeatureView = FS.get_feature_view(name='PURCHASE_FEATURES', version='V1')
print(purchase_fv)


### 特徴量ストアからデータセットの作成
インタラクションデータセットには、各メニュー・顧客ペアの購入フラグが含まれる。このインタラクションデータセットはトレーニング、検証、テストに分割される。特徴は特徴ストアからデータセットに持ち込まれる。

In [None]:
# Split the interaction dataset and get features from the feature store
def create_dataset(spine_df, name):
    """Generate a feature-store dataset for a spine and return it as a DataFrame.

    The customer, menu and purchase feature views are attached to
    ``spine_df`` and the result is generated under ``name``.

    Args:
        spine_df: Snowpark DataFrame used as the spine of the dataset.
        name: Name passed to ``FS.generate_dataset``.

    Returns:
        The generated dataset read back as a Snowpark DataFrame, with the
        BIRTHDAY_DATE column dropped.
    """
    dataset = FS.generate_dataset(
        name=name,
        spine_df=spine_df,
        features=[customer_fv, menu_fv, purchase_fv],
    )
    return dataset.read.to_snowpark_dataframe().drop("BIRTHDAY_DATE")
    
# Customer / menu item interaction table used as the spine for all splits.
interaction_df = session.table('analytics.loyalty_purchased_items')

# Split into train/validation/test with weights 0.1 / 0.1 / 0.8.
# Only the first two splits are materialised here; datasets[2] is left as is.
datasets = interaction_df.random_split([.1, .1, .8])

# Build training tables
train_df = create_dataset(datasets[0], "train")
val_df = create_dataset(datasets[1], "validation")
    
train_df.show()

In [None]:
# Combined number of rows in the training and validation datasets.
train_df.count() + val_df.count()

## 特徴量エンジニアリング
このモデルは、カテゴリ（スパース）特徴の埋め込みを作成する。カテゴリ値は一意な整数としてエンコードされる。前処理として、疎な特徴にはラベル符号化を、数値（密な）特徴には最小-最大スケーリングを適用します。

**Snowflakeの特徴:** Snowpark ML Modeling API - 特徴エンジニアリングと前処理（GA） - 頻繁に使用されるscikit-learnの前処理関数を分散実行することで、パフォーマンスとスケーラビリティを向上。

In [6]:
# Categorical (sparse) columns: each one is label-encoded in place.
sparse_features = [
    'MENU_ITEM_NAME',
    'MENU_TYPE',
    'TRUCK_BRAND_NAME',
    'ITEM_CATEGORY',
    'ITEM_SUBCATEGORY',
    'CITY',
    'COUNTRY',
    'GENDER',
    'MARITAL_STATUS',
]

# Numeric (dense) columns: all scaled together to the range [0, 1].
dense_features = [
    'SALE_PRICE_USD',
    'AGE',
    'AVG_MONTHLY_PURCHASE_AMOUNT',
    'AVG_WEEKLY_PURCHASE_AMOUNT',
    'AVG_YEARLY_PURCHASE_AMOUNT',
]

label_col = "PURCHASED"

# One LabelEncoder step per sparse column, named LE1, LE2, ... in list order.
pipeline_steps = [
    (f"LE{position}", LabelEncoder(input_cols=[column], output_cols=[column]))
    for position, column in enumerate(sparse_features, start=1)
]

# A single MinMaxScaler step over all dense columns, written back in place.
min_max_scaler = MinMaxScaler(
    feature_range=(0, 1),
    input_cols=dense_features,
    output_cols=dense_features,
)
pipeline_steps.append(("MMS", min_max_scaler))

# Fit on the training split only, then apply the fitted pipeline to both splits.
preprocessing_pipeline = Pipeline(steps=pipeline_steps)
preprocessing_pipeline.fit(train_df)
train_data = preprocessing_pipeline.transform(train_df)
val_data = preprocessing_pipeline.transform(val_df)

In [None]:
# Persist the preprocessed splits as temporary tables for the training job.
# Each split goes to its own table: the validation table must hold val_data,
# otherwise the validation loss is computed on the training rows.
train_data.write.mode("overwrite").save_as_table("ml.train_data_table", table_type="temporary")
val_data.write.mode("overwrite").save_as_table("ml.val_data_table", table_type="temporary")

### パイプラインの保存
保存されたパイプラインは、推論における特徴量変換にて利用

In [None]:
# Save pipeline to a stage where it can be centrally accessed
pipeline_local_path = '/tmp/dlrm_preprocessor_v1.joblib'
# Give joblib the path so it opens, writes and closes the file itself; a bare
# open() handle that is never closed may still hold unflushed bytes when the
# file is uploaded on the next line.
joblib.dump(preprocessing_pipeline, pipeline_local_path)
session.file.put(pipeline_local_path,
                 '@ML.ML_STAGE/dlrm_preprocessor_v1.joblib',
                 auto_compress=False,
                 overwrite=True)


In [None]:
-- Work in the ML schema and (re)create the stage that holds the serialized label encoders.
-- CREATE OR REPLACE drops any existing UDF_STAGE first.
USE SCHEMA ML;
CREATE or replace STAGE UDF_STAGE;

In [None]:
import json

# Keep only the model columns from the (un-preprocessed) training dataset and
# cast every sparse column to a string.
data = train_df[dense_features + sparse_features + [label_col]]
data = data.with_columns(sparse_features,
                        [F.col(c).cast(T.StringType()) for c in sparse_features])

def serialize_label_encoders(label_encoders):
    """Convert fitted label encoders into a JSON-serialisable dictionary.

    Args:
        label_encoders: Mapping of feature name to a fitted encoder exposing
            ``input_cols``, ``output_cols`` and ``classes_`` (an object with
            a ``tolist()`` method).

    Returns:
        A dict keyed by feature name; each value holds the encoder's
        ``input_cols``, ``output_cols`` and ``classes_`` as a plain list.
    """
    return {
        feature: {
            'input_cols': encoder.input_cols,
            'output_cols': encoder.output_cols,
            'classes_': encoder.classes_.tolist(),
        }
        for feature, encoder in label_encoders.items()
    }

def save_label_encoders_to_stage(label_encoders, stage_name, dir_name):
    """Write the serialized label encoders to a local file and upload it to a stage.

    Args:
        label_encoders: JSON-serialisable dict of encoder parameters.
        stage_name: Stage name, without the leading ``@``.
        dir_name: Directory inside the stage to upload into.

    Returns:
        A confirmation string naming the upload location.
    """
    local_path = '/tmp/label_encoders.json'
    # Write serialized encoders to a local file first
    with open(local_path, 'w') as f:
        json.dump(label_encoders, f)
    # Upload the local file to the Snowflake stage. overwrite=True so that a
    # re-run replaces an already-staged file instead of leaving stale encoders.
    session.file.put(local_path, f'@{stage_name}/{dir_name}', auto_compress=False, overwrite=True)
    return f'Uploaded to @{stage_name}/{dir_name}'
    
label_encoders = {}

# Fit one LabelEncoder per sparse feature. Each encoder writes
# <feature>_ENCODED and drops its input column; the transformed frame is then
# passed on to the next encoder.
for feat in sparse_features:
    lbe = LabelEncoder(input_cols=[feat], output_cols=[feat+'_ENCODED'],drop_input_cols=True)
    lbe.fit(data)

    # Keep the fitted encoder so its classes can be serialized below
    label_encoders[feat] = lbe
    data = lbe.transform(data)

# Serialize label encoders
serialized_label_encoders = serialize_label_encoders(label_encoders)
stage_name="UDF_STAGE"
dir_name="dlrm_label_encoders"
# save_label_encoders_to_stage writes /tmp/label_encoders.json itself before
# uploading it, so no separate local dump is needed here.
save_label_encoders_to_stage(serialized_label_encoders, stage_name, dir_name)


## モデル定義
このPyTorchモデルは深層学習推薦モデル（DLRM）である。これは、各ロイヤリティ顧客に全てのメニューの推薦スコアを提供するために使用されている。
- 埋め込み層はカテゴリ特徴を密なベクトルに変換する。
- 数値的特徴は多層パーセプトロン（MLP）層を通して処理される。
- 特徴相互作用層は入力特徴のペア間の複雑な関係を捉える。
- 最後のdense（全結合）層は推薦スコアを生成する。

In [None]:
# PyTorch DLRM
class FeatureInteraction(nn.Module):
    """Pairwise products of the entries of each input row.

    For an input whose second dimension has size ``F``, every row is
    multiplied with itself as an outer product and the upper triangle
    (diagonal included) is returned flattened, giving ``F * (F + 1) // 2``
    values per row.
    """

    def __init__(self):
        super().__init__()

    def forward(self, inputs):
        n_features = inputs.shape[1]
        # Treat each row as an (F, 1) column vector and form its F x F outer product.
        column = inputs.view(-1, n_features, 1)
        outer = torch.matmul(column, column.transpose(1, 2))
        # Boolean mask selecting the upper triangle of every F x F matrix.
        keep = torch.triu(torch.ones_like(outer)).bool()
        n_pairs = n_features * (n_features + 1) // 2
        return outer[keep].view(-1, n_pairs)

class DLRM(nn.Module):
    """Deep learning recommendation model producing one raw score per input row.

    Sparse (integer) inputs are pooled by a sum-mode ``EmbeddingBag`` and
    dense inputs go through a two-layer bottom MLP. Their concatenation is
    passed to ``FeatureInteraction``, and the interactions together with the
    bottom MLP output feed a three-layer top MLP ending in a single linear
    unit (no activation is applied to the final output).

    Args:
        sparse_feature_number: Number of sparse features (accepted but not
            used by this implementation).
        dense_feature_number: Width of the dense input.
        num_embeddings: Number of rows in the embedding table.
        embed_dim: Embedding dimension.
        bottom_mlp_dims: Two hidden sizes for the bottom MLP.
        top_mlp_dims: Two hidden sizes for the top MLP.
    """

    def __init__(self, sparse_feature_number, dense_feature_number, num_embeddings, embed_dim, bottom_mlp_dims, top_mlp_dims):
        super().__init__()

        self.embeddings = nn.EmbeddingBag(num_embeddings=num_embeddings, embedding_dim=embed_dim, mode='sum')
        self.layer_feature_interaction = FeatureInteraction()

        bottom_hidden = bottom_mlp_dims[0]
        bottom_out = bottom_mlp_dims[1]
        self.bottom_mlp = nn.Sequential(
            nn.Linear(dense_feature_number, bottom_hidden),
            nn.ReLU(),
            nn.Linear(bottom_hidden, bottom_out),
            nn.ReLU(),
        )

        # The interaction layer sees bottom-MLP output and embedding side by
        # side and emits the upper triangle of their outer product; the
        # bottom-MLP output is appended again before the top MLP.
        interaction_width = embed_dim + bottom_out
        top_mlp_input_dim = interaction_width * (interaction_width + 1) // 2 + bottom_out

        top_hidden = top_mlp_dims[0]
        top_out = top_mlp_dims[1]
        self.top_mlp = nn.Sequential(
            nn.Linear(top_mlp_input_dim, top_hidden),
            nn.ReLU(),
            nn.Linear(top_hidden, top_out),
            nn.ReLU(),
            nn.Linear(top_out, 1),
        )

    def forward(self, x_sparse, x_dense):
        # Pooled embedding of the categorical inputs
        pooled_embedding = self.embeddings(x_sparse)
        # Bottom MLP over the numeric inputs
        dense_repr = self.bottom_mlp(x_dense)
        # Pairwise interactions over [bottom MLP output, embedding]
        interactions = self.layer_feature_interaction(
            torch.cat([dense_repr, pooled_embedding], dim=-1)
        )
        # Interactions plus the bottom MLP output go through the top MLP
        return self.top_mlp(torch.cat([interactions, dense_repr], dim=-1))

## モデルトレーニング

モデルトレーニング機能は、各デバイスにモデルとデータを配置する。勾配は各トレーニングバッチで結合され、すべてのデバイスに伝搬される。各エポックの後、トレーニングと検証の損失がすべてのデバイスで平均化され、モデルの重みが保存されます。

**Snowflakeの特長:** Snowpark ML Modeling API - PyTorch - Snowpark DataFrameからGPUデバイスに分散して実行。

In [None]:
# Adjust number of epochs and records in the training data
# num_epochs: number of passes over the training shard in train_func.
num_epochs = 2
# training_sample: row limit applied to the training table before it is
# handed to the distributed trainer.
training_sample = 100000

In [None]:
# Model training function
def setup(rank, world_size):
    """Join the NCCL process group and seed PyTorch's random number generator.

    Args:
        rank: Rank of the calling worker.
        world_size: Total number of workers in the group.
    """
    dist.init_process_group(backend="nccl", rank=rank, world_size=world_size)
    # Fixed seed (42) for PyTorch's RNG on every worker.
    torch.manual_seed(42)

def train_func():
    """Train the DLRM with DistributedDataParallel on this worker's data shard.

    Runs once per worker. Rank and world size are read from the distributor
    context, and the "train" and "val" entries of the dataset map are read as
    this worker's shards. After every epoch the mean training and validation
    losses are averaged across workers; rank 0 prints them and writes the
    model weights to /tmp/latest_model.pth.
    """
    context = get_context()
    rank = context.get_rank()
    world_size = context.get_world_size()
    setup(rank, world_size)

    batch_size = 256

    def to_model_inputs(batch):
        """Build (x_sparse, x_dense, labels) tensors on this worker's device."""
        # reshape(-1) instead of squeeze(): squeeze() turns a final batch of a
        # single record into a 0-d tensor, which breaks stack(dim=1) and the
        # (N, 1) label shape the loss expects.
        labels = batch[label_col].to(torch.float32).reshape(-1, 1).to(rank)
        # Index columns by the feature lists rather than by the batch dict's
        # own order, so the column order seen in training is the one inference
        # builds from sparse_features / dense_features.
        x_sparse = torch.stack(
            [batch[name].to(torch.int).reshape(-1) for name in sparse_features],
            dim=1,
        ).to(rank)
        x_dense = torch.stack(
            [batch[name].to(torch.float32).reshape(-1) for name in dense_features],
            dim=1,
        ).to(rank)
        return x_sparse, x_dense, labels

    def progress_line(epoch, batches, records, loss_sum, started):
        """Format the per-device progress message for the current epoch."""
        return (
            f"Epoch {epoch+1}/{num_epochs}, Batch {batches}: Device {rank} processed {records} records, "
            f"Epoch Time: {time.time() - started:.2f} seconds, "
            f"Average Training loss: {loss_sum / max(batches, 1):.4f}"
        )

    # Always leave the process group, even if training raises.
    try:
        # GET DATA FROM CONTEXT AND SET UP TENSORS
        dataset_map = context.get_dataset_map()
        training_data = dataset_map["train"].get_shard().to_torch_dataset(
            batch_size=batch_size, shuffle=True
        )
        validation_data = dataset_map["val"].get_shard().to_torch_dataset(
            batch_size=batch_size, shuffle=True
        )
        # The torch datasets already yield whole batches, hence batch_size=None.
        dataloader = DataLoader(training_data, batch_size=None)
        val_dataloader = DataLoader(validation_data, batch_size=None)

        # DEFINE MODEL
        model = DLRM(
            sparse_feature_number=len(sparse_features),
            dense_feature_number=len(dense_features),
            num_embeddings=142,
            embed_dim=128,
            bottom_mlp_dims=[256, 128],
            top_mlp_dims=[128, 128],
        )

        model = model.to(rank)
        ddp_model = DDP(model, device_ids=[rank])
        criterion = torch.nn.BCEWithLogitsLoss()
        optimizer = torch.optim.Adam(ddp_model.parameters(), lr=0.001)

        # TRAIN
        for epoch in range(num_epochs):
            start_time = time.time()
            records_processed = 0
            running_loss = 0.0
            # Stays 0 if the shard yields no batches, so the averages below
            # are still defined.
            num_batches = 0

            for num_batches, batch_data in enumerate(dataloader, start=1):
                x_sparse, x_dense, y = to_model_inputs(batch_data)

                optimizer.zero_grad()
                output = ddp_model(x_sparse, x_dense)
                loss = criterion(output, y)
                loss.backward()
                optimizer.step()
                running_loss += loss.item()
                records_processed += len(y)

                if num_batches % 500 == 0:
                    print(progress_line(epoch, num_batches, records_processed, running_loss, start_time))

            print(progress_line(epoch, num_batches, records_processed, running_loss, start_time))

            # Average loss across devices: sum the per-device means, then
            # divide by the number of devices.
            running_loss_tensor = torch.tensor(running_loss / max(num_batches, 1), device=rank)
            dist.all_reduce(running_loss_tensor)
            dist.barrier()
            running_loss = running_loss_tensor.item() / world_size

            # GET VALIDATION LOSS
            ddp_model.eval()
            val_loss = 0.0
            val_batches = 0
            with torch.no_grad():
                for val_batches, val_batch_data in enumerate(val_dataloader, start=1):
                    x_sparse_val, x_dense_val, y_val = to_model_inputs(val_batch_data)
                    output_val = ddp_model(x_sparse_val, x_dense_val)
                    val_loss += criterion(output_val, y_val).item()

            # Average validation loss across devices
            val_loss_tensor = torch.tensor(val_loss / max(val_batches, 1), device=rank)
            dist.all_reduce(val_loss_tensor)
            dist.barrier()
            val_loss = val_loss_tensor.item() / world_size
            ddp_model.train()

            # SAVE MODEL: only rank 0 writes, and it saves the unwrapped
            # model's state_dict rather than the DDP wrapper's.
            if rank == 0:
                print(f" Epoch {epoch+1}/{num_epochs}, Training Loss: {running_loss:.4f}, Validation Loss: {val_loss:.4f}, Epoch Time: {time.time() - start_time:.2f} seconds ")
                torch.save(model.state_dict(), '/tmp/latest_model.pth')
    finally:
        dist.destroy_process_group()

In [None]:
# Train - Snowflake ML PyTorch API
# Configure the distributor to run train_func on one node with one worker,
# requesting one GPU (and zero CPUs) for that worker.
pytroch_trainer = PyTorchDistributor(
    train_func=train_func,
    scaling_config=PyTorchScalingConfig(
        num_nodes=1,
        num_workers_per_node=1,
        resource_requirements_per_worker=WorkerResourceConfig(num_cpus=0, num_gpus=1),
    ),
)

# Read the preprocessed tables back and wrap them as sharded connectors.
# Only the training table is capped, at training_sample rows.
train_data = session.table("ml.train_data_table")
val_data = session.table("ml.val_data_table")
data_train = ShardedDataConnector.from_dataframe(train_data.limit(training_sample))
data_val = ShardedDataConnector.from_dataframe(val_data)

# The keys "train" and "val" are the ones train_func looks up in its dataset map.
out = pytroch_trainer.run(
    dataset_map=dict(
             train=data_train,
             val=data_val
         )
)

## モデルのデプロイ
モデルは Snowflake Model Registry に記録されます。ログに記録されたモデルは、Snowpark Container Services (SPCS)上で推論のためにデプロイされます。

**Snowflakeの特長**： Snowflake Model Registry with SPCS deployment - 柔軟なコンピュート環境において、Snowflakeでモデルとそのメタデータを安全にデプロイ、管理します。

![model_serving](https://docs.snowflake.com/en/_images/model-registry-spcs-deployment.png)

In [None]:
# Load the model
def load_model(model_path):
    """Rebuild the DLRM and load trained weights from a saved state dict.

    The architecture arguments must match the ones used in training,
    otherwise the state dict will not fit the model.

    Args:
        model_path: Path to a file written with ``torch.save(state_dict)``.

    Returns:
        The DLRM with the loaded weights, switched to evaluation mode.
    """
    model = DLRM(sparse_feature_number=len(sparse_features),
                 dense_feature_number=len(dense_features),
                 num_embeddings=142,
                 embed_dim=128,
                 bottom_mlp_dims=[256, 128],
                 top_mlp_dims=[128, 128])
    # map_location="cpu": the checkpoint is saved from a model that was moved
    # to a GPU rank, so map its tensors to CPU to load without that device.
    # weights_only=True: the file holds only a state dict, so restrict
    # unpickling to tensors and plain containers.
    state_dict = torch.load(model_path, map_location="cpu", weights_only=True)
    model.load_state_dict(state_dict)
    model.eval()
    return model

# Load saved model
model = load_model('/tmp/latest_model.pth')

In [None]:
# Open the model registry in the REGISTRY schema of the <prefix>_PROD database.
# The model itself is logged to this registry in the next cell.
registry = Registry(session=session, database_name=f"{solution_prefix}_PROD", schema_name="REGISTRY")


In [None]:
# Take one preprocessed training row and build the two input tensors
# (sparse as int, dense as float32) in the order forward() expects.
train_data = session.table("ml.train_data_table")
sample_input = train_data.limit(1).to_pandas()
x_sparse = torch.tensor(sample_input[sparse_features].values, dtype=torch.int)
x_dense = torch.tensor(sample_input[dense_features].values, dtype=torch.float32)

# Log model to registry
# The torch / torchvision pins match the versions installed at the top of the
# notebook; the sample input is passed as a list of two single-row tensors,
# with 'multiple_inputs' enabled in the options.
model_ref = registry.log_model(
    model,
    model_name="RecModelDemo",
    version_name="V1",
    pip_requirements=["torchvision==0.18.1","torch==2.3.1"],
    conda_dependencies=["pyopenssl>=22.0.0","libxml2<=2.13.7"],
    sample_input_data=[x_sparse[0].unsqueeze(0), x_dense[0].unsqueeze(0)],
    options={'relax_version': False, 'multiple_inputs': True}
)

In [None]:
# Deploy the logged model version as a service named TB_REC_SERVICE_DEMO_PREDICT.
# The compute pool, image repository and external access integration names are
# all derived from solution_prefix.
model_ref.create_service(service_name="TB_REC_SERVICE_DEMO_PREDICT",
                  service_compute_pool=f"{solution_prefix}_DEPLOY_POOL",
                  image_repo=f"{solution_prefix}_PROD.REGISTRY.IMAGE_REPO",
                  build_external_access_integration=f"{solution_prefix}_CONDA_ACCESS_INTEGRATION")

## モデルの推論と検証
テストデータの推論は、専用のコンピュートプール上で動作する SPCS にデプロイされたモデルを使用して完了する。テストデータのフィーチャーはフィーチャーストアからアクセスされ、前処理パイプラインが必要に応じてデータを変換します。

**モデル出力**： 
モデルは入力特徴量に基づくスコアを出力する。スコアが高ければ高いほど、その顧客にとってそのメニューがよりおすすめであることを示します。モデルのパフォーマンスを評価するために、スコアからバイナリ予測が作成されます（スコアが0.5以上の場合は1、そうでない場合は0）。

**スノーフレークの特徴：**
- SPCS 上での推論（PuPr） - 専用のコンピュートプールを持つコンテナ環境にデプロイされたモデルに対して推論を実行します。
- Snowpark ML Modeling API - Evaluation Metrics (GA) - 使用頻度の高い scikit-learn 評価指標関数の分散実行により、パフォーマンスとスケーラビリティを向上。

In [None]:
# Get features from feature store
# datasets[2] is the third (0.8-weight) split of the interaction table.
test_df = FS.retrieve_feature_values(
    spine_df=datasets[2],
features=[customer_fv, menu_fv, purchase_fv]
)
#test_df_subset = test_df.sample(100000)
test_df_subset = test_df.sample(n=10000)

# Preprocess with the pipeline fitted on the training split
test_data = preprocessing_pipeline.transform(test_df_subset)

# To maintain the Order
# (inputs and predictions are joined positionally below, so both must come
# from the same pandas frame)
test_data_pd = test_data.to_pandas()

sparse_input = torch.tensor(test_data_pd[sparse_features].values, dtype=torch.int)
dense_input = torch.tensor(test_data_pd[dense_features].values, dtype=torch.float32)
input_data = [sparse_input, dense_input]

# Run the model's forward function on the deployed service
predictions = model_ref.run(input_data, function_name="forward", service_name="TB_REC_SERVICE_DEMO_PREDICT")

# Unwrap single-element lists so every prediction is a plain number
predictions['output_feature_0'] = predictions['output_feature_0'].apply(
    lambda x: x[0] if isinstance(x, list) else float(x)
)
# NOTE(review): DLRM.forward ends in a plain Linear layer and training uses
# BCEWithLogitsLoss, so PREDICTION looks like a raw logit rather than a
# probability; a 0.5 cut-off on logits is not a 0.5 probability cut-off.
# Confirm this threshold is intended.
eval_df_pd = pd.concat([test_data_pd[["CUSTOMER_ID", "MENU_ITEM_NAME", "PURCHASED"]], 
                       predictions.rename(columns={'output_feature_0': 'PREDICTION'})], axis = 1).assign(
                                         BINARY_PREDICTION=lambda df: np.where(df['PREDICTION'] >= 0.5, 1, 0))
eval_df = session.create_dataframe(eval_df_pd)

In [None]:
# Display the evaluation frame (keys, label, score and binary prediction).
eval_df

### 検証

強力な推薦モデルは、購入されたアイテムのほとんどを推薦するはずである（これは、あるアイテムが顧客にとって関心のあるものであることを示すトレーニング指標である）。一方、未購入のアイテムは、必ずしも興味がないことを示しているわけではない。推薦エンジンのゴールは、顧客が興味を持ちうる未購入アイテムを特定することであり、そのためには未購入アイテムを「誤分類」する必要がある。理想的には、再現率（Recall）を高めつつ、未購入アイテムの一部も推薦することである。

In [None]:
# Get Evaluation Metrics
# AUC is computed from the raw PREDICTION score; recall and precision are
# computed from BINARY_PREDICTION. Each value is rounded to 3 decimals and
# shown in its own Streamlit column.
cols = st.columns(3)
cols[0].metric("AUC", round(roc_auc_score(df=eval_df, y_true_col_names="PURCHASED", y_score_col_names="PREDICTION"),3))
cols[1].metric("Recall", round(recall_score(df=eval_df, y_true_col_names="PURCHASED", y_pred_col_names="BINARY_PREDICTION"),3))
cols[2].metric("Precision", round(precision_score(df=eval_df, y_true_col_names="PURCHASED", y_pred_col_names="BINARY_PREDICTION"),3))

In [None]:
# Attach the same three metrics to the registered model version and set the
# model's description. The metrics are recomputed here from eval_df rather
# than reused from the display cell.
m = registry.get_model("RecModelDemo")
mv=m.version("v1")
mv.set_metric("AUC", round(roc_auc_score(df=eval_df, y_true_col_names="PURCHASED", y_score_col_names="PREDICTION"),3))
mv.set_metric("Recall", round(recall_score(df=eval_df, y_true_col_names="PURCHASED", y_pred_col_names="BINARY_PREDICTION"),3))
mv.set_metric("Precision", round(precision_score(df=eval_df, y_true_col_names="PURCHASED", y_pred_col_names="BINARY_PREDICTION"),3))
m.description = "Provides menu recommendations for Tasty bytes business"


### ユーザーインタラクションで推論実行

In [None]:
def get_filters():
    """Build a SQL filter expression from the module-level ``filters`` dict.

    ``filters`` must contain the keys 'country', 'city' and
    'truck_brand_name'. Each value is either None, empty, or a collection of
    selected values; every non-empty selection adds an
    ``and <column> in ('a','b')`` condition.

    Returns:
        A string that always starts with ' 1=1 ', followed by one condition
        per non-empty filter, in the order country, city, truck_brand_name.
    """
    conditions = [' 1=1 ']
    for column in ('country', 'city', 'truck_brand_name'):
        selected = filters[column]
        if selected is None or len(selected) == 0:
            continue
        # Double any embedded single quote so a value such as "Xi'an" stays
        # inside its SQL string literal instead of terminating it.
        quoted = ",".join("'" + str(value).replace("'", "''") + "'" for value in selected)
        conditions.append(f" and {column} in ({quoted})")
    return "".join(conditions)

def get_filtered_data():
    """Return up to 10,000 filtered purchase rows joined with their features.

    Purchases are left-joined with the menu item features (beverages
    excluded), the customer features and the module-level ``cust_avgs_spdf``
    purchase averages. The clause from ``get_filters()`` is written to the
    Streamlit page and applied as a filter, and the result is ordered by
    customer_id.

    Returns:
        A Snowpark DataFrame limited to 10,000 rows.
    """
    with st.spinner("Getting Filtered Data..."):
        time.sleep(1)
        menu_items = session.table('ml.menu_item_features').filter("item_category != 'Beverage'")
        customers = session.table('ml.customer_features')
        purchases = session.table('analytics.loyalty_purchased_items')
        purchases_with_items = purchases.join(menu_items, 'menu_item_name', 'left')

        where_clause = get_filters()
        st.write(where_clause)

        final_df = (
            purchases_with_items
            .join(customers, 'customer_id', 'left')
            .join(cust_avgs_spdf, 'customer_id', 'left')
            .filter(where_clause)
            .order_by("customer_id")
            .limit(10000)
        )
    return final_df

def infer_model(test_data):
    """Score ``test_data`` on the deployed model and return top recommendations.

    The rows are sent to the TB_REC_SERVICE_DEMO_PREDICT service, ranked per
    customer by prediction score, and the items ranked 1-3 are returned next
    to each customer's purchase history.

    Args:
        test_data: Snowpark DataFrame holding the encoded sparse columns, the
            dense columns, and CUSTOMER_ID, CITY, MENU_ITEM_NAME, PURCHASED.

    Returns:
        A pandas DataFrame with columns CUSTOMER_ID, CITY, PURCHASE_HISTORY
        (list, empty when the customer has no purchases) and MENU_ITEM_NAME
        (list of recommended items). It is empty when there is nothing to
        recommend.
    """
    # Imported here so the function does not rely on another cell having
    # brought Window into scope; col() and rank() are reached through F.
    from snowflake.snowpark import Window

    result_columns = ['CUSTOMER_ID', 'CITY', 'PURCHASE_HISTORY', 'MENU_ITEM_NAME']

    with st.spinner("Inferring Deep Learning Model on SPCS..."):
        time.sleep(1)

        # Get deployed model
        reg = Registry(session=session, database_name=session.get_current_database(), schema_name='REGISTRY')
        mv = reg.get_model('RECMODELDEMO').version("v1")

        test_data_pd = test_data.to_pandas()

        # Build input tensors
        # NOTE(review): sparse_features_encoded is not defined in the part of
        # the notebook visible here - confirm it is set before this is called.
        sparse_input = torch.tensor(test_data_pd[sparse_features_encoded].values, dtype=torch.int)
        dense_input = torch.tensor(test_data_pd[dense_features].values, dtype=torch.float32)
        input_data = [sparse_input, dense_input]

        # Run inference on deployed model
        predictions = mv.run(
            input_data,
            function_name="FORWARD",
            service_name="TB_REC_SERVICE_DEMO_PREDICT",
        )

        # Unwrap single-element lists, then attach the scores to the input
        # rows (joined by position).
        predictions['output_feature_0'] = predictions['output_feature_0'].apply(
            lambda x: x[0] if isinstance(x, list) else float(x)
        )
        recommendations = pd.concat(
            [test_data_pd[["CUSTOMER_ID", "CITY", "MENU_ITEM_NAME", "PURCHASED"]],
             predictions.rename(columns={'output_feature_0': 'PREDICTION'})],
            axis=1,
        )
        recommendations_df = session.create_dataframe(recommendations)

        # Rank each customer's predictions by descending score and keep
        # ranks 1-3 (rank() gives tied scores the same rank).
        window_spec = Window.partition_by(F.col("customer_id")).order_by(F.col("prediction").desc())
        ranked_df = recommendations_df.with_column("rank", F.rank().over(window_spec))
        top_predictions_pd = ranked_df.filter(F.col("rank") <= 3).to_pandas()

        # Group by customer and collect the recommended items as a list
        grouped_top_predictions = (
            top_predictions_pd.groupby("CUSTOMER_ID")
            .agg({'CITY': 'first', 'MENU_ITEM_NAME': lambda x: list(x)})
            .reset_index()
        )

        distinct_customer_ids = grouped_top_predictions['CUSTOMER_ID'].unique()
        if len(distinct_customer_ids) == 0:
            # No customers: "customer_id in ()" would be invalid SQL.
            return pd.DataFrame(columns=result_columns)

        # Quote each id as a SQL string literal, doubling embedded quotes.
        sql_in_clause = ", ".join(
            "'" + str(customer_id).replace("'", "''") + "'"
            for customer_id in distinct_customer_ids
        )
        history_df = session.sql(f"""select customer_id, 
                                    menu_item_name as Purchase_History
                                    from analytics.loyalty_purchased_items
                                    where purchased = 1
                                    and customer_id in ({sql_in_clause});""").to_pandas()
        grouped_history_df = history_df.groupby('CUSTOMER_ID')['PURCHASE_HISTORY'].agg(list).reset_index()

        # Left merge keeps customers that have recommendations but no history
        finalDf_pd = pd.merge(grouped_top_predictions, grouped_history_df, on='CUSTOMER_ID', how='left')

        # Reorder columns
        finalDf_pd = finalDf_pd[result_columns]

        # Customers without history get an empty list instead of NaN
        finalDf_pd['PURCHASE_HISTORY'] = finalDf_pd['PURCHASE_HISTORY'].apply(lambda x: x if isinstance(x, list) else [])
    return finalDf_pd

def get_serialized_label_encoders():
    """Download the serialized label encoders and return index-to-class maps.

    The file ``label_encoders.json`` is fetched from the
    ``dlrm_label_encoders`` directory of stage ``ml.UDF_STAGE`` into
    ``/tmp/dir/`` through the module-level ``session`` and parsed as JSON.
    Every entry of the parsed object is expected to carry a ``classes_``
    sequence and an ``output_cols`` list.

    Returns:
        dict: Maps each encoder's first output column name to a
        ``{position: class_value}`` dict built from its ``classes_``.
    """
    stage_file = '@ml.UDF_STAGE/dlrm_label_encoders/label_encoders.json'
    session.file.get(stage_file, "/tmp/dir/")

    # Parse the local copy that was just downloaded.
    with open("/tmp/dir/label_encoders.json", "r") as encoder_file:
        encoders_by_feature = json.load(encoder_file)

    index_to_class_by_column = {}
    for encoder_params in encoders_by_feature.values():
        # Position in ``classes_`` is the integer code assigned to the class.
        index_to_class = dict(enumerate(encoder_params['classes_']))
        index_to_class_by_column[encoder_params['output_cols'][0]] = index_to_class
    return index_to_class_by_column
    
def _build_label_case_expression(input_col, index_to_class):
    """Return a SQL CASE expression that maps class values of *input_col* to codes.

    Single quotes inside a class value are doubled so the value remains a
    valid SQL string literal. Values that match no class yield NULL.
    """
    when_clauses = []
    for encoded_val, class_val in index_to_class.items():
        sql_literal = str(class_val).replace("'", "''")
        when_clauses.append(
            f" WHEN {input_col} = '{sql_literal}' THEN {int(encoded_val)}"
        )
    return "CASE" + "".join(when_clauses) + " ELSE NULL END"


def apply_label_encoding(df):
    """Add label-encoded columns to *df* using the stored encoder mappings.

    The mappings are loaded once via ``get_serialized_label_encoders`` and
    kept in ``st.session_state.modified_label_encoders``. For every mapping
    key (an encoded column name ending in ``_ENCODED``) the source column
    name is obtained by stripping that suffix, and a ``<source>_encoded``
    column is added from a SQL CASE expression.

    Args:
        df: DataFrame exposing ``withColumn`` and ``drop``.

    Returns:
        The DataFrame with the encoded columns added and the source columns
        removed, except ``CITY``, ``TRUCK_BRAND_NAME`` and ``MENU_ITEM_NAME``,
        which are kept.
    """
    if 'modified_label_encoders' not in st.session_state:
        st.session_state.modified_label_encoders = get_serialized_label_encoders()
    label_encoders = st.session_state.modified_label_encoders

    columns_to_keep = ("CITY", "TRUCK_BRAND_NAME", "MENU_ITEM_NAME")
    input_cols_to_drop = []
    for feat, mapping in label_encoders.items():
        # Strip the trailing "_ENCODED" to recover the source column name.
        input_col = feat[:-len("_ENCODED")]

        case_stmt = _build_label_case_expression(input_col, mapping)
        df = df.withColumn(f"{input_col}_encoded", F.expr(case_stmt))

        if input_col not in columns_to_keep:
            input_cols_to_drop.append(input_col)

    # Skip the call entirely when there is nothing to drop.
    if input_cols_to_drop:
        df = df.drop(*input_cols_to_drop)
    return df

# Function to simulate processing features
def process_features(df):
    """Label-encode *df* and min-max scale its dense feature columns.

    While a Streamlit spinner is displayed the function pauses for one
    second, runs ``apply_label_encoding``, then fits a ``MinMaxScaler``
    (range 0 to 1, writing back into the ``dense_features`` columns) on the
    encoded frame and transforms that same frame.

    Args:
        df: Frame to prepare for inference.

    Returns:
        The frame returned by the scaler's ``transform``.
    """
    with st.spinner("Processing Features..."):
        time.sleep(1)
        encoded = apply_label_encoding(df)
        scaler = MinMaxScaler(
            feature_range=(0, 1),
            input_cols=dense_features,
            output_cols=dense_features,
        )
        # NOTE(review): the scaler is fitted on the data being scored, not on
        # training data -- confirm this matches how the model was trained.
        scaler.fit(encoded)
        return scaler.transform(encoded)

In [None]:
import streamlit as st
from snowflake.snowpark.functions import rank, col
from snowflake.snowpark.window import Window

# Names of the label-encoded (``*_ENCODED``) sparse feature columns.
sparse_features_encoded = [
    'MENU_ITEM_NAME_ENCODED',
    'MENU_TYPE_ENCODED',
    'TRUCK_BRAND_NAME_ENCODED',
    'ITEM_CATEGORY_ENCODED',
    'ITEM_SUBCATEGORY_ENCODED',
    'CITY_ENCODED',
    'COUNTRY_ENCODED',
    'GENDER_ENCODED',
    'MARITAL_STATUS_ENCODED',
]

@st.cache_data(ttl=120)
def get_listdata():
    """Fetch the option lists for the country, city and truck-brand filters.

    Three ``select distinct`` queries are issued through the module-level
    ``session``; Streamlit caches the combined result for 120 seconds.

    Returns:
        tuple: ``(countries, cities, truck_brands)``; each element is the
        ``to_pandas()`` result of one query, ordered by its single column.
    """
    country_query = """select distinct COUNTRY
                                                from ml.customer_features 
                                                order by COUNTRY;"""
    city_query = """select distinct CITY 
                                                from ml.customer_features 
                                                order by CITY;"""
    truck_brand_query = """select distinct TRUCK_BRAND_NAME
                                                        from ml.menu_item_features
                                                        order by TRUCK_BRAND_NAME;"""

    countries = session.sql(country_query).to_pandas()
    cities = session.sql(city_query).to_pandas()
    truck_brands = session.sql(truck_brand_query).to_pandas()
    return countries, cities, truck_brands

@st.cache_data(ttl=120)
def _get_city_list_for_countries(countries):
    """Return the distinct cities of the given countries, ordered by name.

    Args:
        countries: Non-empty tuple of country names.

    Returns:
        The ``to_pandas()`` result of the query (single ``CITY`` column).
    """
    # Double embedded single quotes so each name stays a valid SQL literal.
    country_literals = ", ".join(
        "'" + str(country).replace("'", "''") + "'" for country in countries
    )
    return session.sql(f"""select distinct CITY
                           from ml.customer_features
                           where COUNTRY in ({country_literals})
                           order by CITY;""").to_pandas()


def on_change_callback(key):
    """Streamlit ``on_change`` handler shared by the three filter widgets.

    Only a change of the country filter has an effect: the dependent city
    selection is cleared so it cannot hold cities that are no longer offered.
    The reset goes through ``st.session_state`` because the callback runs
    before the script body, where the ``filters`` dict still holds the
    previous run's values.

    Args:
        key: Session-state key of the widget that changed.
    """
    if key == "country":
        st.session_state["city"] = []


session = get_active_session()
country_list, city_list, truck_brand_name_list = get_listdata()

# Narrow the city options to the currently selected countries. This is done
# here, not in the callback, because a value computed inside the callback
# never reaches the widget below.
selected_countries = st.session_state.get("country", [])
if selected_countries:
    city_list = _get_city_list_for_countries(tuple(selected_countries))

filters = {}

col1, col2, col3 = st.columns(3, gap="medium")
filters['country'] = col1.multiselect('対象国',
                                       country_list,
                                       on_change=on_change_callback,
                                       key="country",
                                       kwargs={"key": "country"})
filters['city'] = col2.multiselect('対象都市',
                                    city_list,
                                    on_change=on_change_callback,
                                    key="city",
                                    kwargs={"key": "city"})
filters['truck_brand_name'] = col3.multiselect('Truck Brand Name',
                                               truck_brand_name_list,
                                               on_change=on_change_callback,
                                               key="truck_brand_name",
                                               kwargs={"key": "truck_brand_name"})

if st.button("推論実行"):
    df = get_filtered_data()
    data = process_features(df)
    recommendations = infer_model(data)
    # Render explicitly; a bare expression inside ``if`` is only displayed
    # when Streamlit "magic" is enabled.
    st.write(recommendations)

### タスクとして定期実行

In [None]:
from snowflake.core._common import CreateMode
from snowflake.core.task import Cron
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, DAGTaskBranch
from snowflake.snowpark import Session


def get_data_task(session: Session) -> None:
    """Build the inference input set and store it as table ``customer_data``.

    Per-customer average monthly, weekly and yearly purchase amounts are
    computed from ``TASTYBYTESENDTOENDML_PROD.ANALYTICS.ORDERS_V``. They are
    left-joined, together with the menu-item features (``Beverage`` category
    excluded) and the customer features, onto the loyalty purchase rows. The
    first 10000 rows ordered by ``customer_id`` are written, replacing any
    existing ``customer_data`` table.

    Args:
        session: Snowpark session used for all queries and for the write.
    """
    avg_monthly_purchase_amount = session.sql("SELECT  customer_id, ROUND(SUM(order_total) / (TIMESTAMPDIFF(MONTH, MIN(date), MAX(date)) + 1),2) AS avg_monthly_purchase_amount FROM TASTYBYTESENDTOENDML_PROD.ANALYTICS.ORDERS_V GROUP BY customer_id")
    avg_weekly_purchase_amount = session.sql("SELECT customer_id,ROUND(SUM(order_total) / (TIMESTAMPDIFF(WEEK, MIN(date), MAX(date)) + 1),2) AS avg_weekly_purchase_amount FROM TASTYBYTESENDTOENDML_PROD.ANALYTICS.ORDERS_V GROUP BY customer_id")
    avg_yearly_purchase_amount = session.sql("SELECT customer_id, ROUND(SUM(order_total)/(TIMESTAMPDIFF(YEAR, MIN(date), MAX(date)) + 1),2) AS avg_yearly_purchase_amount FROM TASTYBYTESENDTOENDML_PROD.ANALYTICS.ORDERS_V GROUP BY customer_id")
    cust_avgs = (
        avg_monthly_purchase_amount
        .join(avg_weekly_purchase_amount, "CUSTOMER_ID")
        .join(avg_yearly_purchase_amount, "CUSTOMER_ID")
    )

    item_df = session.table('ml.menu_item_features').filter("item_category != 'Beverage'")
    customer_df = session.table('ml.customer_features')
    purchase_df = session.table('analytics.loyalty_purchased_items')

    final_df = (
        purchase_df
        .join(item_df, 'menu_item_name', 'left')
        .join(customer_df, 'customer_id', 'left')
        .join(cust_avgs, 'customer_id', 'left')
        .order_by("customer_id")
        .limit(10000)
    )

    # ``final_df`` is a Snowpark DataFrame, so it is persisted with its own
    # writer; ``Session.write_pandas`` only accepts pandas DataFrames.
    # NOTE(review): a temporary table is visible only to the session that
    # created it -- confirm that later tasks in the DAG can actually read it.
    final_df.write.mode("overwrite").save_as_table("customer_data", table_type="temporary")

def preprocess_task(session: Session) -> None:
    """Placeholder task handler for the preprocessing step; does nothing yet.

    Args:
        session: Session handed to the handler (currently unused).
    """
    return None

def infer_task(session: Session) -> str:
    """Placeholder task handler for the inference step.

    Args:
        session: Session handed to the handler (currently unused).

    Returns:
        The constant string ``"task3"``.
    """
    result = "task3"
    return result

# Define the task graph and deploy it, replacing any existing DAG of the same
# name. The session is closed afterwards whether or not deployment succeeded.
# NOTE(review): ``test_warehouse``, ``task_branch_handler`` and ``root`` are
# not defined in this cell -- confirm they are defined earlier in the notebook.
try:
    with DAG(
        "my_dag",
        # Minute 10 of every hour, evaluated in the Asia/Tokyo time zone.
        schedule=Cron("10 * * * *", "Asia/Tokyo"),
        stage_location="@UDF_STAGE",
        packages=["snowflake-snowpark-python"],
        use_func_return_value=True,
    ) as dag:
        task1 = DAGTask(
            "task1",
            get_data_task,
            warehouse=test_warehouse,
        )
        task1_branch = DAGTaskBranch("task1_branch", task_branch_handler, warehouse=test_warehouse)
        task2 = DAGTask("task2", preprocess_task, warehouse=test_warehouse)
        task3 = DAGTask("task3", infer_task, warehouse=test_warehouse, condition="1=1")
        task1 >> task1_branch
        # The branch can only select among its own children, so task2 and
        # task3 must be attached to it.
        task1_branch >> [task2, task3]
    schema = root.databases["my_db"].schemas["my_schema"]
    op = DAGOperation(schema)
    op.deploy(dag, mode=CreateMode.or_replace)
finally:
    session.close()

## 総括
このノートブックはGPUコンピュートプールを活用し、スケールの大きなデータとディープラーニングモデルを扱う能力を解き放ちました。エンドツーエンドで、このワークフローはSnowpark DataFrameを使用し、以下の機能を活用して開発とデプロイを簡素化しました：
- GPUコンテナランタイム上でのSnowflakeノートブック
- Snowflake フィーチャーストア
- Snowflake Modeling API - 前処理、トレーニング（PyTorch API）、評価
- Snowflakeモデルレジストリ
- SPCSへのモデルデプロイメント

In [None]:
# Final cell: close the Snowflake session used throughout the notebook.
session.close()