In [11]:
import sqlite3
import pandas as pd
import joblib
import lightfm

from snowflake.snowpark import Session
from snowflake.connector.pandas_tools import write_pandas
import snowflake.snowpark.functions as f

from datetime import datetime
from sklearn.preprocessing import LabelEncoder
import lightfm
from lightfm import LightFM
from lightfm.data import Dataset
from lightfm.cross_validation import random_train_test_split
from lightfm.evaluation import precision_at_k, recall_at_k
import numpy as np
from lightfm.evaluation import precision_at_k, recall_at_k, auc_score
import ast
import numpy as np
import joblib
import os 

from TopicAnalysis import TopicAnalysis

In [5]:
# Connection settings handed to the Snowpark session builder.
# NOTE(review): the dict is empty here — presumably the real account and
# credential values were removed before sharing; confirm how they are supplied.
CONNECTION_PARAMETERS = {}

# Snowpark session used below to register the procedure, call it, and
# download the resulting files.
session = Session.builder.configs(CONNECTION_PARAMETERS).create()

In [13]:
def train_recommender(session: Session, file_prefix: str) -> None:
    """Train a LightFM recommender from MODELING.RECOMMENDER_TRAIN and stage it.

    The table is read in batches of ``batch_size`` rows; each batch is turned
    into an interaction matrix (EMAIL x CAMPAIGN_ID) and fed to
    ``LightFM.fit_partial``. The comma-separated SCORES column supplies the
    per-campaign item features. The fitted model and the LightFM ``Dataset``
    (id mappings) are written with joblib to ``/tmp`` and uploaded to
    ``@MODELS/recommender/{file_prefix}/``.

    Args:
        session: Active Snowpark session used for every query and upload.
        file_prefix: Sub-directory under ``@MODELS/recommender/`` that receives
            ``recommender.joblib`` and ``dataset.joblib``.

    Raises:
        ValueError: If the training table yields no campaign feature rows.
    """
    batch_size = 10**7
    # os.cpu_count() may return None; LightFM needs a positive integer.
    num_threads = os.cpu_count() or 1

    def get_data_count():
        """Return the number of rows in the training table."""
        return session.table('MODELING.RECOMMENDER_TRAIN').count()

    def get_batch_data(num, offset):
        """Fetch ``num`` rows starting at ``offset`` as a pandas DataFrame."""
        # NOTE(review): LIMIT/OFFSET is applied without an ORDER BY, so the row
        # order between calls is presumably not guaranteed and batches could
        # overlap or miss rows — confirm and add a deterministic sort key if so.
        base_data = session.table('MODELING.RECOMMENDER_TRAIN')
        return base_data.limit(num, offset=offset).toPandas()

    def get_emails():
        """Return the distinct EMAIL values as a one-column pandas DataFrame."""
        base_data = session.table('MODELING.RECOMMENDER_TRAIN')
        return base_data.select('EMAIL').distinct().toPandas()

    def get_campaign_ids():
        """Return distinct (CAMPAIGN_ID, SCORES) rows with SCORES parsed to floats."""
        result = session.sql("""SELECT DISTINCT CAMPAIGN_ID, SCORES FROM MODELING.RECOMMENDER_TRAIN""")
        df = result.toPandas()
        # SCORES arrives as a comma-separated string; turn it into a float list.
        df['SCORES'] = df['SCORES'].apply(
            lambda raw: [float(value) for value in raw.split(',')]
        )
        return df

    class LightModel:
        """LightFM model plus the Dataset that maps emails/campaigns to indices."""

        def __init__(self, users, items, feature_list):
            """Fit the id mappings and build the item-feature matrix.

            Args:
                users: Iterable of user ids (emails).
                items: Iterable of item ids (campaign ids).
                feature_list: One numeric feature vector per item, aligned
                    with ``items``.
            """
            first_features = next(iter(feature_list), None)
            if first_features is None:
                raise ValueError('feature_list is empty; no item features to fit')

            self.dataset = Dataset(item_identity_features=False)
            # Feature names are simply the positions within the score vector.
            self.dataset.fit(users=users, items=items, item_features=range(len(first_features)))
            self.users = users
            self.items = items
            self.feature_list = feature_list
            self.feature_dicts = [
                {i: float(feature) for i, feature in enumerate(features)}
                for features in feature_list
            ]
            self.item_features = self.dataset.build_item_features(zip(items, self.feature_dicts))
            self.model = LightFM(loss='warp')

        def add_data(self, users, items):
            """Run one partial-fit pass over the given (user, item) pairs."""
            interactions, _ = self.dataset.build_interactions(zip(users, items))
            self.model.fit_partial(interactions, item_features=self.item_features, num_threads=num_threads)

        def predict(self, feature_list):
            """Score a new item, described only by its features, for every user.

            Returns:
                A list of ``(email, score)`` tuples, one per known user.
            """
            # NOTE(review): doubling the largest known id presumably yields an
            # unused id only when ids are positive numbers — confirm.
            new_id = max(self.items) * 2
            new_features = {i: float(feature) for i, feature in enumerate(feature_list)}
            self.dataset.fit_partial(users=None, items=[new_id])
            new_feature_dicts = self.feature_dicts + [new_features]
            new_items = list(self.items) + [new_id]
            # Use this instance's dataset rather than the enclosing `model` variable.
            item_features = self.dataset.build_item_features(zip(new_items, new_feature_dicts))

            user_mapping, _, item_mapping, _ = self.dataset.mapping()
            all_emails = list(user_mapping.keys())
            # LightFM.predict expects int32 arrays of equal length.
            user_indices = np.array(list(user_mapping.values()), dtype=np.int32)
            item_indices = np.full(len(user_indices), item_mapping[new_id], dtype=np.int32)
            results = self.model.predict(
                user_indices,
                item_ids=item_indices,
                item_features=item_features,
                num_threads=num_threads,
            )

            return list(zip(all_emails, results))

    id_score = get_campaign_ids()
    emails = get_emails()
    num_rows = get_data_count()

    ids = id_score['CAMPAIGN_ID']
    scores = id_score['SCORES']
    unique_emails = emails['EMAIL']

    model = LightModel(unique_emails, ids, scores)

    # Ceiling division: no trailing empty batch when num_rows is an exact
    # multiple of batch_size, and zero batches for an empty table.
    num_batches = (num_rows + batch_size - 1) // batch_size
    for i in range(num_batches):
        df = get_batch_data(batch_size, batch_size * i)
        try:
            model.add_data(df['EMAIL'].values, df['CAMPAIGN_ID'].values)
        except Exception as exc:
            # Best effort: report the failure, then retry with only the rows
            # whose ids the Dataset was fitted on.
            print(f'Exception occurred in batch {i + 1}/{num_batches}: {exc!r}')
            known_rows = df['EMAIL'].isin(unique_emails) & df['CAMPAIGN_ID'].isin(ids)
            df_filtered = df[known_rows]
            if df_filtered.empty:
                print(f'Skipping batch {i + 1}/{num_batches}: no rows with known ids')
                continue
            try:
                model.add_data(df_filtered['EMAIL'].values, df_filtered['CAMPAIGN_ID'].values)
            except Exception as retry_exc:
                print(f'Skipping batch {i + 1}/{num_batches}: {retry_exc!r}')
                continue
        print(f"Completed {i + 1}/{num_batches}")

    model_path = '/tmp/recommender.joblib'
    dataset_path = '/tmp/dataset.joblib'
    joblib.dump(model.model, model_path)
    joblib.dump(model.dataset, dataset_path)

    # Upload both artifacts uncompressed, replacing any earlier run's files.
    for local_path in (model_path, dataset_path):
        session.file.put(
            local_path,
            f"@MODELS/recommender/{file_prefix}/",
            auto_compress=False,
            overwrite=True
        )

In [14]:
# Register train_recommender as a permanent stored procedure. Its code is
# uploaded to the @MODELS stage, any existing procedure of the same name is
# replaced, and it executes with the caller's rights.
registered_function = session.sproc.register(
    func=train_recommender,
    name="train_recommender",
    # Packages made available to the procedure.
    packages=[
        "snowflake-snowpark-python",
        "pandas",
        "numpy",
        "lightfm",
        "joblib",
        "scikit-learn",
    ],
    stage_location="@MODELS",
    is_permanent=True,
    replace=True,
    execute_as='CALLER',
)

In [15]:
# Run the stored procedure with 'test_0' as its single argument (the
# file_prefix parameter of train_recommender).
# NOTE(review): assumes the procedure registered as "train_recommender"
# resolves in the MODELING schema — confirm against the session's schema.
session.sql("""CALL MODELING.TRAIN_RECOMMENDER('test_0')""").collect()

[Row(TRAIN_RECOMMENDER=None)]

In [16]:
# Download everything under the 'test_0' prefix on the @MODELS stage into the
# local 'files/' directory.
session.file.get('@MODELS/recommender/test_0/', 'files/')

[GetResult(file='test_0/dataset.joblib', size=89670100, status='DOWNLOADED', message=''),
 GetResult(file='test_0/recommender.joblib', size=309063339, status='DOWNLOADED', message='')]