In [1]:
import logging
# Configure the root logger: INFO level, bare-message format.
# force=True removes handlers that are already attached to the root logger,
# so re-running this cell re-applies the configuration.
logging.basicConfig(level=logging.INFO, format='%(message)s',force=True)
# Alternative, more verbose record format (kept for reference):
#logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',force=True)# stream=sys.stdout)
# Smoke test: the message should appear in the cell output.
logging.info("Hello")

Hello


In [2]:
import joblib,subprocess,requests,io
import pandas as pd
import numpy  as np
from   s3fs   import S3FileSystem
# -------------------------------------------------
#    системные функции
# -------------------------------------------------

def host_ip():
    ''' Return the IP address of the current host as reported by the ifconfig.co service. '''
    # The lookup shells out to curl; only stdout is used, decoded as text.
    completed = subprocess.run(['curl', 'ifconfig.co/'], capture_output=True, text=True)
    reported = completed.stdout
    return reported.strip()

def post_request(url, params=None, timeout=30.0):
    ''' Send a POST request to the given service and return its decoded JSON reply.

    Args:
        url:     endpoint to call.
        params:  optional mapping sent as URL query parameters (not as a request body).
        timeout: seconds to wait for the server; without a timeout a stalled
                 service would block the caller indefinitely.

    Returns:
        The parsed JSON payload when the service answers with HTTP 200, otherwise
        {"result": "ERROR", "code": "<status code>"} (the failure is also logged).
    '''
    headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}

    resp = requests.post(url, headers=headers, params=params, timeout=timeout)

    if resp.status_code == 200:
        return resp.json()

    logging.error(f"Error status code: {resp.status_code} received from {url} ({params})")
    return {"result": "ERROR", "code": f"{resp.status_code}"}

def pd_info(df: pd.DataFrame):
    ''' Render the output of pd.DataFrame.info as a string (handy for logging). '''
    buffer = io.StringIO()
    try:
        # info() writes into the buffer instead of stdout
        df.info(show_counts=True, buf=buffer)
        report = buffer.getvalue()
    finally:
        buffer.close()
    return report


# -------------------------------------------------
#    функции работы с файлами в хранилище S3
# -------------------------------------------------

def load_csv_files(s3: S3FileSystem, path_list: list):
    ''' Read several CSV files from S3 and stack them into one DataFrame.

    Args:
        s3:        filesystem object whose `open(path, mode='r')` yields a readable file.
        path_list: paths of the CSV files, concatenated in the given order.

    Returns:
        A DataFrame with a fresh 0..n-1 index (an empty DataFrame for an empty
        path list), or None when any file fails to load (the failure is logged).
    '''
    path = None     # kept outside the loop so the handler can always name the failing file
    try:
        # collect the parts first and concatenate once: repeated concat in the loop
        # re-copies all previously loaded rows on every iteration
        frames = []
        for path in path_list:
            with s3.open(path, mode='r') as fd:
                frames.append(pd.read_csv(fd))
        if not frames:
            return pd.DataFrame()
        return pd.concat(frames, axis=0, ignore_index=True)
    except Exception as e:
        # Exception (not a bare except) lets KeyboardInterrupt/SystemExit propagate
        logging.warning(f"Error {e} loading file: {path}")
        return None

def load_parquet_file(s3: S3FileSystem, path: str):
    ''' Read one parquet file from S3 into a DataFrame.

    Returns:
        The loaded DataFrame, or None when opening or parsing fails
        (the error and the path are logged).
    '''
    try:
        with s3.open(path, mode='rb') as fd:
            return pd.read_parquet(fd)
    except Exception as e:
        # Exception (not a bare except) lets KeyboardInterrupt/SystemExit propagate;
        # the cause is logged the same way load_pkl_file does it
        logging.error(f"Error {e} loading file: {path}")
        return None

def load_pkl_file(s3: S3FileSystem, path: str):
    ''' Load a joblib-serialized object from S3; log the error and return None on failure. '''
    # NOTE: joblib.load unpickles arbitrary objects - only point it at trusted files.
    try:
        with s3.open(path, mode='rb') as stream:
            restored = joblib.load(stream)
    except Exception as e:
        logging.error(f"Error {e} loading file: {path}")
        return None
    else:
        return restored

def save_to_parquet(df, s3: S3FileSystem, path: str, verbose=True):
    ''' Write a DataFrame to S3 as parquet; when verbose, log the path and the frame summary. '''
    with s3.open(path, mode='wb') as sink:
        df.to_parquet(sink)
    if not verbose:
        return
    logging.info(f"save_to_parquet:\n{path}\n{pd_info(df)}")
    return

def save_to_pkl(obj, s3: S3FileSystem, path: str, verbose=True):
    ''' Serialize an object to S3 with joblib; when verbose, log the destination path. '''
    with s3.open(path, mode='wb') as sink:
        joblib.dump(obj, sink)
    if not verbose:
        return
    logging.info(f"save_to_pkl: {path}")
    return

def delete_s3_files(s3: S3FileSystem, files: dict):
    ''' Remove from S3 every path stored as a value of the given mapping (keys are ignored). '''
    for target in files.values():
        s3.rm(target)
    return



# -------------------------------------------------
#    функции для извлечения item properties
# -------------------------------------------------

def get_registered_items(timestamp, items_ctgr):
    ''' Return the set of itemid values that have a category record dated at or before the given timestamp. '''
    known_by_then = items_ctgr[items_ctgr['timestamp'] <= timestamp]
    return set(known_by_then['itemid'])

def get_unavailable_items(timestamp, items_avail):
    ''' Return the set of itemid values whose availability flag is '0' as of the given timestamp. '''
    seen_by_then = items_avail[items_avail['timestamp'] <= timestamp]
    # keep='first' takes the first remaining row per item in the frame's existing order
    # (presumably the frame is sorted newest-first - verify against the producer)
    current_flags = seen_by_then.drop_duplicates(subset=['itemid'], keep='first')
    return set(current_flags.loc[current_flags['value'] == '0', 'itemid'])

def get_available_items(timestamp, items_ctgr, items_avail):
    ''' Return the set of itemid values available at the given timestamp: registered items minus unavailable ones. '''
    registered  = get_registered_items(timestamp, items_ctgr)
    unavailable = get_unavailable_items(timestamp, items_avail)
    return registered.difference(unavailable)

def get_item_availability(timestamp, items_ctgr, items_avail):
    ''' Return the availability flag of each registered item as of the given timestamp (columns: itemid, value). '''
    registered = get_registered_items(timestamp, items_ctgr)
    in_scope   = (items_avail['timestamp'] <= timestamp) & items_avail['itemid'].isin(registered)
    # first remaining row per item, in the frame's existing order
    current    = items_avail[in_scope].drop_duplicates(subset=['itemid'], keep='first')
    current    = current.reset_index(drop=True)
    return current[['itemid','value']]

def get_item_category(timestamp, items_ctgr):
    ''' Return item categories as of the given timestamp (columns: itemid, categoryid). '''
    as_of  = items_ctgr[items_ctgr['timestamp'] <= timestamp]
    # first remaining row per item, in the frame's existing order
    latest = as_of.drop_duplicates(subset=['itemid'], keep='first')
    latest = latest.reset_index(drop=True)
    return latest[['itemid','categoryid']]

def get_item_properties(timestamp, items):
    ''' Return item properties as of the given timestamp (columns: itemid, property, value_code). '''
    as_of  = items[items['timestamp'] <= timestamp]
    # first remaining row per (item, property) pair, in the frame's existing order
    latest = as_of.drop_duplicates(subset=['itemid','property'], keep='first')
    latest = latest.reset_index(drop=True)
    return latest[['itemid','property','value_code']]



In [3]:
import os
#from   airflow.models.connection          import Connection
#from   airflow.utils.session              import create_session
#from   airflow.providers.amazon.aws.fs.s3 import get_fs
from   dotenv     import find_dotenv, load_dotenv
load_dotenv(find_dotenv("env.services"))

# Connection settings are read from environment variables; a missing variable
# raises KeyError right here.
S3_DIR          = f"{os.environ['S3_BUCKET_NAME']}/Diplom"
AWS_ACCESS_KEY  = os.environ['AWS_ACCESS_KEY_ID']
AWS_SECRET_KEY  = os.environ['AWS_SECRET_ACCESS_KEY']
S3_ENDPOINT_URL = os.environ['AWS_ENDPOINT_URL']
# Host address used to build service URLs: taken from HOST_IP when that variable
# is set, otherwise looked up through host_ip().
MY_HOST         = host_ip()  if not "HOST_IP" in os.environ  else os.environ['HOST_IP']

# DAG configuration parameters
CONFIG      = {
    "EVENT_HISTORY_WEEKS": 26,  # keep events only for the last half-year
    "EVENT_POPULAR_WEEKS": 12,  # history depth (in weeks) used to determine popular items
    "EVENT_TARGET_WEEKS" : 2,   # number of weeks in the target period (model retraining mode)
    "EVENT_TEST_WEEKS"   : 1,   # number of weeks in the test period (model retraining mode)
    "EVENT_CUT_OFF_WEEKS": 4,   # number of weeks in the inference period (recommendation calculation mode)
    "ALS_RECS_PER_USER"  : 15,  # number of collaborative recommendations per user
    "ALS_SIMS_PER_ITEM"  : 15,  # number of similar items derived from collaborative recommendations
    "EXPERIMENT_NAME"    : f"{os.environ['MLFLOW_EXPERIMENT_NAME']}",
    "MODEL_NAME"         : f"{os.environ['MLFLOW_MODEL_NAME']}",
    "MLFLOW_SERVER_URL"  : f"http://{MY_HOST}:{os.environ['MLFLOW_SERVER_PORT']}",
}

# Resources used by the pipeline (all S3 paths are rooted at S3_DIR)

# raw source CSV files; "props_src" is a list because the properties come in two parts
SRC_FILES = {
    "cats_src" : f"{S3_DIR}/source_data/category_tree.csv",
    "props_src":[f"{S3_DIR}/source_data/item_properties_part1.csv",
                    f"{S3_DIR}/source_data/item_properties_part2.csv"],
    "event_src": f"{S3_DIR}/source_data/events.csv"
}
# parquet files under infer_data/
INFER_FILES = {
    "cats_dst" : f"{S3_DIR}/infer_data/category_tree.parquet",
    "item_cat" : f"{S3_DIR}/infer_data/item_categories.parquet",
    "item_prop": f"{S3_DIR}/infer_data/item_properties.parquet",
    "available": f"{S3_DIR}/infer_data/item_availability.parquet",
    "event_dst": f"{S3_DIR}/infer_data/events.parquet",
    "eventlast": f"{S3_DIR}/infer_data/last_events.parquet",
}
# recommendation outputs
# NOTE(review): the "candidades" spelling is part of the stored object key; renaming it
# would presumably require migrating files that already exist - confirm before changing.
PROD_FILES = {
    "top_pop"  : f"{S3_DIR}/recommendations/top_popular.parquet",
    "similar"  : f"{S3_DIR}/recommendations/similar_items.parquet",
    "ranked"   : f"{S3_DIR}/recommendations/ranked_candidades.parquet",
    "final"    : f"{S3_DIR}/recommendations/final_recommendations.parquet"
}
# serialized model parameters and the trained ranking model
MODEL_FILES = {
    "als_parms": f"{S3_DIR}/model/als_params.pkl",
    "cb_parms" : f"{S3_DIR}/model/cb_params.pkl",
    "cb_model" : f"{S3_DIR}/model/cb_model.pkl"
}
# base URLs of the services, all on MY_HOST with ports taken from the environment
REC_SERVICES= {
    "rec_serv" : f"http://{MY_HOST}:{os.environ['RECOMMENDATIONS_PORT']}",
    "features" : f"http://{MY_HOST}:{os.environ['FEATURES_STORE_PORT']}",
    "events"   : f"http://{MY_HOST}:{os.environ['EVENTS_STORE_PORT']}",
}

# Display the resolved host address.
# NOTE(review): the cell output exposes the host IP - clear it before sharing the notebook.
MY_HOST

'89.169.168.158'

In [4]:
# S3 client used by all load_*/save_* helpers; endpoint and credentials are read
# from the same environment variables as S3_ENDPOINT_URL / AWS_ACCESS_KEY / AWS_SECRET_KEY.
s3 = S3FileSystem(
    endpoint_url=os.environ['AWS_ENDPOINT_URL'],
    key=os.environ['AWS_ACCESS_KEY_ID'],
    secret=os.environ['AWS_SECRET_ACCESS_KEY'], cache_regions=True
)


In [5]:
import scipy
from   sklearn.preprocessing   import MinMaxScaler
from   implicit.als            import AlternatingLeastSquares
from   implicit.evaluation     import mean_average_precision_at_k, ndcg_at_k, AUC_at_k
from   sklearn.model_selection import ParameterGrid
from   catboost                import CatBoostClassifier, Pool
from   catboost.utils          import eval_metric
from   threadpoolctl           import threadpool_limits
# Limit BLAS thread pools to a single thread. Called as a plain function (not as a
# `with` block), so the limit is not scoped to a block.
# Presumably done to avoid oversubscription with implicit's own threading - confirm.
threadpool_limits(1, "blas")
# Seed shared by every model and search in this notebook.
RANDOM_STATE = 42

def calc_item_rating(df):
    ''' Build the interaction-rating vector: (views present) + add-to-cart count + 2 * purchase count.

    Args:
        df: frame whose columns 0, 1 and 2 hold per-event-type counts
            (views, add-to-cart, purchases). Other columns are ignored.

    Returns:
        int16 Series aligned with df's index.
    '''
    # An event type that never occurs in the selected window produces no column after
    # unstack(); treat such a missing column as all-zero counts instead of raising KeyError.
    counts = df.reindex(columns=[0, 1, 2], fill_value=0)
    return ((counts[0] > 0) + counts[1] + counts[2]*2).astype(np.int16)

def user_item_matrix(events_set, users, items):
    ''' Build the interaction data for the given users and items.

    Returns a pair: the per-(visitorid, itemid) event-count frame with a `rating`
    column, and the same ratings as a scipy CSR matrix indexed by raw visitorid / itemid.
    '''
    relevant     = events_set.query("visitorid in @users  and  itemid in @items")
    event_counts = relevant.groupby(['visitorid','itemid'])['event'].value_counts()
    user_item    = event_counts.unstack(fill_value=0).reset_index()

    # compute the interaction rating (nothing to rate when the selection is empty)
    if user_item.shape[0] == 0:
        user_item['rating'] = 0
    else:
        user_item['rating'] = calc_item_rating(user_item)

    coords = (user_item['visitorid'], user_item['itemid'])
    # raw ids are used as matrix coordinates, so the shape must cover the largest id
    matrix_shape = (users.max()+1, items.max()+1)
    return user_item, scipy.sparse.csr_matrix((user_item['rating'], coords), shape=matrix_shape)



In [6]:
def als_validate(csr_train, csr_val, k=10, **kwargs):
    ''' Fit ALS with the given hyperparameters and return (model, MAP@k); the metric is 0 when csr_val is None. '''
    model = AlternatingLeastSquares(random_state=RANDOM_STATE, **kwargs)
    model.fit(csr_train, show_progress=False)
    if csr_val is None:
        return  model, 0
    map_k = mean_average_precision_at_k(model, csr_train, csr_val, k, show_progress=False)
    return  model, map_k

def als_train(csr_train, csr_val, **hyper_params):
    ''' Train ALS, optionally selecting hyperparameters by grid search.

    Args:
        csr_train:      training interaction matrix.
        csr_val:        validation interaction matrix (used only by the grid search).
        **hyper_params: when given, the model is fitted with exactly these
                        hyperparameters and no search is performed.

    Returns:
        (model, params): the fitted model and the hyperparameters it was built with.
    '''

    if hyper_params:
        model,_ = als_validate(csr_train, None, **hyper_params)
        return  model, hyper_params

    # hyperparameter search
    # full grid, intended for production runs (not used yet - see the loop below)
    grid = {
        'alpha':[30.0,50.0,70.0,100.0], 'regularization':[0.005,0.01,0.05],
        'iterations':[10,15,20,25],     'factors':[50,100,150,200]
    }
    # reduced single-point grid used while developing
    test_grid   = {'alpha':[100.0], 'factors':[100], 'iterations':[15], 'regularization':[0.005]}
    best_metric = 0
    best_params = {}
    best_model  = None

    for params in ParameterGrid(test_grid):     # in production REPLACE test_grid WITH grid !!!
        model, metric = als_validate(csr_train, csr_val, **params)
        # always accept the first candidate: a metric of exactly 0 for every candidate
        # must not leave the caller with (None, {})
        if best_model is None or metric > best_metric:
            best_metric = metric
            best_params = params
            best_model  = model
    return  best_model, best_params



def als_train_bayes(csr_train, csr_val, **hyper_params):
    ''' Train ALS, optionally selecting hyperparameters with skopt's BayesSearchCV.

    When hyper_params are given, the model is fitted with them directly and no
    search is run. Otherwise candidates are scored with NDCG@10 on csr_val.

    Returns (model, params): the best-scoring fitted model tracked during the
    search and the search's best_params_ as a plain dict.
    '''

    if hyper_params:
        model,_ = als_validate(csr_train, None, **hyper_params)
        return  model, hyper_params

    from skopt           import BayesSearchCV
    from sklearn.base    import BaseEstimator
    # state shared with the nested estimator through `nonlocal`
    best_model= None
    best_score= 0
    als_cache = dict()      # str(get_params()) -> rounded score

    # Thin scikit-learn-style adapter so that BayesSearchCV can drive the ALS model.
    class ScikitAls (BaseEstimator):
        def __init__(self, factors=100, regularization=0.01, alpha=1.0, iterations=3, **kwargs):
            self.factors=factors
            self.regularization=regularization
            self.alpha=alpha
            self.iterations=iterations
            # extra keyword arguments are forwarded to AlternatingLeastSquares.fit (see fit below)
            self.params=kwargs
        def fit(self, X, y=None):
            nonlocal best_model, best_score, als_cache
            # parameter set already scored: skip refitting
            if  f"{self.get_params()}" in als_cache.keys(): return
            self.als = AlternatingLeastSquares(random_state=RANDOM_STATE, **self.get_params())
            # X is ignored: the model is always trained on the closed-over csr_train
            self.als.fit(csr_train, **self.params)
        def predict(self, X):
            # stub; scoring goes through calc_score instead
            return X
        def calc_score(self):
            nonlocal best_model, best_score, als_cache
            key = f"{self.get_params()}"
            if  key not in als_cache.keys():
                #als_cache[key]= round(mean_average_precision_at_k(self.als, csr_train, csr_val, 10, show_progress=False), 5)
                als_cache[key]= round(ndcg_at_k(self.als, csr_train, csr_val, 10, show_progress=False), 5)
                # remember the best fitted model seen so far
                if  best_score < als_cache[key]:
                    best_score = als_cache[key]
                    best_model = self.als
                print (f"score [{key}] => {als_cache[key]}")
            return als_cache[key]


    # scorer callable passed to BayesSearchCV; X and y are ignored
    def als_scorer(estimator, X, y=None):
        return  estimator.calc_score()


    # NOTE(review): random_state passed here lands in **kwargs (self.params) and would be
    # forwarded to AlternatingLeastSquares.fit - confirm whether estimator clones keep it.
    # NOTE(review): fit/score ignore the CV folds (they use csr_train / csr_val), so cv=2
    # presumably only makes each parameter set be requested twice, with the second request
    # served from als_cache - confirm.
    searchcv = BayesSearchCV(
        ScikitAls(random_state=RANDOM_STATE),
        search_spaces={
            'regularization': (0.005, 1.0, 'log-uniform'),
            'alpha':          (1.0, 100.0, 'uniform'),
            'iterations':     (10,    25),
            'factors':        (30,   150)
        },
        fit_params={'show_progress': True},
        scoring=als_scorer,
        n_iter=30,
        cv=2,
        random_state=RANDOM_STATE,
    )
    searchcv.fit(csr_train)
    return  best_model, dict(searchcv.best_params_)


from implicit.cpu.bpr import BayesianPersonalizedRanking
def als_train_(csr_train, csr_val, **hyper_params):
    ''' Train a recommender, optionally selecting hyperparameters with skopt's BayesSearchCV.

    Despite the `als` naming, the search branch fits BayesianPersonalizedRanking models
    and scores them with AUC@10 on csr_val. When hyper_params are given, no search is
    run and the model comes from als_validate (i.e. AlternatingLeastSquares).

    Returns (model, params): the best-scoring fitted model tracked during the
    search and the search's best_params_ as a plain dict.

    NOTE(review): a recorded run of this search aborted with
    "ModelFitError: NaN encountered in factors"; the learning_rate range reaching 1.0
    is a likely cause - confirm and narrow the search space if so.
    '''

    if hyper_params:
        model,_ = als_validate(csr_train, None, **hyper_params)
        return  model, hyper_params

    from skopt           import BayesSearchCV
    from sklearn.base    import BaseEstimator
    # state shared with the nested estimator through `nonlocal`
    best_model= None
    best_score= 0
    als_cache = dict()      # str(get_params()) -> rounded score

    # Thin scikit-learn-style adapter so that BayesSearchCV can drive the BPR model.
    class ScikitAls (BaseEstimator):
        def __init__(self, factors=100, regularization=0.01, learning_rate=0.01, iterations=100, **kwargs):
            self.factors=factors
            self.regularization=regularization
            self.learning_rate=learning_rate
            self.iterations=iterations
            # extra keyword arguments are forwarded to BayesianPersonalizedRanking.fit (see fit below)
            self.params=kwargs
        def fit(self, X, y=None):
            nonlocal best_model, best_score, als_cache
            # parameter set already scored: skip refitting
            if  f"{self.get_params()}" in als_cache.keys(): return
            self.als = BayesianPersonalizedRanking(random_state=RANDOM_STATE, **self.get_params())
            # X is ignored: the model is always trained on the closed-over csr_train
            self.als.fit(csr_train, **self.params)
        def predict(self, X):
            # stub; scoring goes through calc_score instead
            return X
        def calc_score(self):
            nonlocal best_model, best_score, als_cache
            key = f"{self.get_params()}"
            if  key not in als_cache.keys():
                #als_cache[key]= round(mean_average_precision_at_k(self.als, csr_train, csr_val, 10, show_progress=False), 5)
                als_cache[key]= round(AUC_at_k(self.als, csr_train, csr_val, 10, show_progress=False), 5)
                # remember the best fitted model seen so far
                if  best_score < als_cache[key]:
                    best_score = als_cache[key]
                    best_model = self.als
                print (f"score [{key}] => {als_cache[key]}")
            return als_cache[key]


    # scorer callable passed to BayesSearchCV; X and y are ignored
    def als_scorer(estimator, X, y=None):
        return  estimator.calc_score()


    # NOTE(review): random_state passed here lands in **kwargs (self.params) and would be
    # forwarded to BayesianPersonalizedRanking.fit - confirm whether estimator clones keep it.
    searchcv = BayesSearchCV(
        ScikitAls(random_state=RANDOM_STATE),
        search_spaces={
            'regularization': (0.005, 1.0, 'log-uniform'),
            'learning_rate':  (0.005, 1.0, 'log-uniform'),
            'iterations':     (10,   100),
            'factors':        (30,   150)
        },
        fit_params={'show_progress': True},
        scoring=als_scorer,
        n_iter=50,
        cv=2,
        random_state=RANDOM_STATE,
    )
    searchcv.fit(csr_train)
    print (f"=== BEST SCORE: {best_score} ===")
    return  best_model, dict(searchcv.best_params_)



In [7]:
def reg_model(model, X, y_pred, metrics, params, config, artifacts=None, runid=None, desc="Test mlflow environment"):
    ''' Log a CatBoost model and/or JSON artifacts to MLflow and return the run id.

    The tracking URL, experiment name and registered model name are read from `config`.
    When `model` is not None it is logged and registered together with `params` and
    `metrics`; every entry of `artifacts` (if given) is logged as "<key>.json".
    Passing `runid` is handed to mlflow.start_run as run_id.
    '''
    import mlflow
    registered_name = config["MODEL_NAME"]

    mlflow.set_tracking_uri(config['MLFLOW_SERVER_URL'])
    logging.info(f"MLFlow tracking URL set to: {mlflow.get_tracking_uri()}")
    experiment    = mlflow.set_experiment(config['EXPERIMENT_NAME'])
    experiment_id = experiment.experiment_id
    logging.info(f"MLFlow experiment name/id: {config['EXPERIMENT_NAME']}/{experiment_id}")

    with mlflow.start_run(
        run_id=runid, run_name=f"{registered_name}_run", experiment_id=experiment_id, description=desc
    ) as run:
        run_id = run.info.run_id

        if  model is not None:
            # a small slice of the features serves as the stored input example
            sample_rows = X[:10]
            io_schema   = mlflow.models.infer_signature(X, y_pred)
            mlflow.catboost.log_model(
                cb_model              = model,
                registered_model_name = registered_name,
                input_example         = sample_rows,
                signature             = io_schema,
                await_registration_for= 0,
                artifact_path         = 'model'
            )
            mlflow.log_params(params)
            mlflow.log_metrics(metrics)
            mlflow.set_tag('estimator', 'CatBoostClassifier')
            logging.info(f"Model <{registered_name}> registered successfully (run_id:{run_id})")

        if artifacts is not None:
            for name, payload in artifacts.items():
                mlflow.log_dict(payload, f"{name}.json")
                logging.info(f"MLFlow artifact {name}.json logged (run_id:{run_id})")

    logging.info(f"MLFlow run {run_id} closed")
    return run_id


In [8]:
def cb_train(train_pool):
    ''' Train CatBoost, selecting hyperparameters with its built-in grid search; returns (model, best params). '''

    # full grid (currently unused) and the reduced grid actually searched
    grid = {
        'learning_rate':[0.18, 0.16, 0.14, 0.12, 0.09, 0.03],
        'l2_leaf_reg':  [1, 3, 5],
        'depth'        :[4, 6, 8, 10],
        'iterations'   :[10,100,500,1000],
    }
    test_grid = {
        'l2_leaf_reg':  [   1,    3],
        'learning_rate':[0.14, 0.12],
        'iterations'   :[  10,  100],
        'depth'        :[   6,    4],
    }

    classifier = CatBoostClassifier(
        random_state=RANDOM_STATE,
        verbose=False,
        auto_class_weights='Balanced',
        loss_function='Logloss',
        eval_metric='Recall',
    )

    # refit=True: the classifier is refitted with the best parameters found
    search_result = classifier.grid_search(
        test_grid,
        X=train_pool,
        stratified=True,
        refit=True,
        partition_random_seed=RANDOM_STATE,
        plot=False,
        verbose=False,
    )
    logging.info(f"cb_result: {search_result}")
    return  classifier, search_result['params']



In [9]:
def cb_train_new(X, y):
    ''' Train CatBoost, selecting hyperparameters with skopt's BayesSearchCV; returns (best estimator, best params). '''

    classifier = CatBoostClassifier(
        random_state=RANDOM_STATE,
        verbose=False,
        auto_class_weights='Balanced',
        loss_function='Logloss',
    )

    from skopt import BayesSearchCV
    search_spaces = {
        'learning_rate' : (0.03, 0.3, 'uniform'),
        'l2_leaf_reg'   : (1, 7),
        'depth'         : (4, 10),
        'iterations'    : (10,1000)
    }
    # no custom scorer is given, so the search uses the estimator's default scoring
    search = BayesSearchCV(
        classifier,
        search_spaces=search_spaces,
        n_iter=30,
        cv=3,
        random_state=RANDOM_STATE,
    )
    search.fit(X, y)
    best_params = dict(search.best_params_)
    return  search.best_estimator_, best_params


In [10]:
# ---------------------------------------
# Загружаем исходные файлы
# ---------------------------------------
# Load every input artifact. The chain short-circuits on the first loader that
# returns None, so names after the failing one stay undefined.
if ((items      := load_parquet_file(s3, INFER_FILES['item_prop'])) is None or
    (items_ctgr := load_parquet_file(s3, INFER_FILES['item_cat' ])) is None or
    (items_avail:= load_parquet_file(s3, INFER_FILES['available'])) is None or
    (events     := load_parquet_file(s3, INFER_FILES['event_dst'])) is None or
    (als_params := load_pkl_file    (s3, MODEL_FILES['als_parms'])) is None or
    (cb_model   := load_pkl_file    (s3, MODEL_FILES['cb_model' ])) is None
):
    # the loaders have already logged the failing path; fail with an explicit message
    # instead of a bare, message-less Exception
    raise RuntimeError("Failed to load one of the required input files from S3 (see the error log for the path)")


In [11]:
retrain = True

# ---------------------------------------
# Reference dates for the calculations
# ---------------------------------------
MS_PER_DAY     = 24*60*60*1000
max_event_time = events['timestamp'].max()
# inference time: the latest event time rounded up to a whole-day boundary
infer_time     = ((max_event_time-1) // MS_PER_DAY +1) * MS_PER_DAY
infer_date     = pd.to_datetime(infer_time, unit='ms')
logging.info(f"Calculated inference date: {infer_date}")

if retrain:
    # retraining mode: no cut-off; the data is split into train / target / test periods
    cut_off_time = 0
    test_time    = infer_time - 7*CONFIG['EVENT_TEST_WEEKS']*MS_PER_DAY
    logging.info(f"Calculated test_time: {pd.to_datetime(test_time, unit='ms')}")
    target_time  = test_time - 7*CONFIG['EVENT_TARGET_WEEKS']*MS_PER_DAY
    logging.info(f"Calculated target_time: {pd.to_datetime(target_time, unit='ms')}")
else:
    # recommendation mode: to bound the interaction matrix, only users active
    # during the last weeks are taken (~60k unique users per week)
    cut_off_time = infer_time - 7*CONFIG['EVENT_CUT_OFF_WEEKS']*MS_PER_DAY
    logging.info(f"Calculated cut_off time: {pd.to_datetime(cut_off_time, unit='ms')}")
    test_time    = infer_time
    target_time  = infer_time

# event cut-off time used to determine the CURRENT top-100 items
top_pop_time   = infer_time - 7*CONFIG['EVENT_POPULAR_WEEKS']*MS_PER_DAY
logging.info(f"Calculated top_pop_time: {pd.to_datetime(top_pop_time, unit='ms')}")



Calculated inference date: 2015-09-13 00:00:00
Calculated test_time: 2015-09-06 00:00:00
Calculated target_time: 2015-08-23 00:00:00
Calculated top_pop_time: 2015-06-21 00:00:00


In [12]:
# Per-item counts of each event type since top_pop_time, one column per event code.
# The 100 items are selected by the count in column 2 (purchases, per calc_item_rating's
# docstring), i.e. before the combined rating is computed.
top_popular = events.query("timestamp >= @top_pop_time")                               \
                    .groupby(['itemid'])['event'].value_counts().unstack(fill_value=0) \
                    .sort_values(by=2,ascending=False).head(100)

# compute the popularity rating and scale it with MinMaxScaler
top_popular['rating']    = calc_item_rating(top_popular)
top_popular['pop_score'] = MinMaxScaler().fit_transform(top_popular['rating'].to_frame())

# sort by rating (descending) and turn itemid back into a column
top_popular = top_popular[['rating','pop_score']].sort_values(by='rating',ascending=False).reset_index()
logging.info(f"Calculated top_popular: {pd_info(top_popular)}")


Calculated top_popular: <class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 3 columns):
 #   Column     Non-Null Count  Dtype  
---  ------     --------------  -----  
 0   itemid     100 non-null    int64  
 1   rating     100 non-null    int16  
 2   pop_score  100 non-null    float64
dtypes: float64(1), int16(1), int64(1)
memory usage: 1.9 KB



In [13]:
#runid = reg_model(None, None, None, None, None, CONFIG, artifacts={'top_popular': top_popular.to_dict()})
#runid

In [14]:
# пользователи, на которых будем обучать als-модель
als_users = events.query("timestamp < @target_time")['visitorid'].unique()
logging.info(f"als_users: {als_users.shape[0]}")

# для рекомендаций (но не для обучения!) будем использовать только доступные товары
if  not retrain:
    av_items = np.array(list(get_available_items (infer_time, items_ctgr, items_avail)))
else:
    av_items = np.array(list(get_registered_items(target_time, items_ctgr)))
logging.info(f"available items: {av_items.shape[0]}")

# строим матрицу взаимодействий для обучения:  als_users x av_items
user_item, user_item_sparse = user_item_matrix(
    events.query("timestamp <  @target_time"), als_users, av_items
)
# валидационная матрица взаимодействий (в режиме расчета рекомендаций - ПУСТАЯ)
user_item_val, user_item_val_sparse = user_item_matrix(
    events.query("timestamp >= @target_time"), als_users, av_items
)


als_users: 867213
available items: 412906


In [15]:
# display the current mode flag
retrain

True

In [16]:
# обучаем ALS-модель
if  not retrain:
    als_model,_ = als_train(user_item_sparse, None, **als_params)
else:
    als_model,als_params = als_train_(user_item_sparse, user_item_val_sparse)
logging.info(f"als_params: {als_params}")


  0%|          | 0/75 [00:00<?, ?it/s]

score [{'factors': 79, 'iterations': 75, 'learning_rate': 0.700692149436217, 'regularization': 0.02664614222120147}] => 0.50716


  0%|          | 0/89 [00:00<?, ?it/s]

score [{'factors': 130, 'iterations': 89, 'learning_rate': 0.024953172459998858, 'regularization': 0.7722621432745812}] => 0.5


  0%|          | 0/93 [00:00<?, ?it/s]

score [{'factors': 83, 'iterations': 93, 'learning_rate': 0.00871473275220427, 'regularization': 0.04966858407544434}] => 0.50011


  0%|          | 0/25 [00:00<?, ?it/s]

score [{'factors': 127, 'iterations': 25, 'learning_rate': 0.11887602332371709, 'regularization': 0.35168661954941205}] => 0.5


  0%|          | 0/49 [00:00<?, ?it/s]

score [{'factors': 126, 'iterations': 49, 'learning_rate': 0.08142147512462601, 'regularization': 0.22174186133335963}] => 0.49999


  0%|          | 0/95 [00:00<?, ?it/s]

score [{'factors': 118, 'iterations': 95, 'learning_rate': 0.011896933386650058, 'regularization': 0.013568825353547214}] => 0.50294


  0%|          | 0/80 [00:00<?, ?it/s]

score [{'factors': 104, 'iterations': 80, 'learning_rate': 0.033503682146853224, 'regularization': 0.5138291456389467}] => 0.50002


  0%|          | 0/93 [00:00<?, ?it/s]

score [{'factors': 95, 'iterations': 93, 'learning_rate': 0.06939925065759288, 'regularization': 0.42309957790998937}] => 0.49999


  0%|          | 0/73 [00:00<?, ?it/s]

score [{'factors': 145, 'iterations': 73, 'learning_rate': 0.5066023937049742, 'regularization': 0.045267530167202336}] => 0.50764


  0%|          | 0/84 [00:00<?, ?it/s]

score [{'factors': 30, 'iterations': 84, 'learning_rate': 0.2539123555141126, 'regularization': 0.019262248760035825}] => 0.50488


  0%|          | 0/42 [00:00<?, ?it/s]

ModelFitError: NaN encountered in factors

In [17]:
# пользователи, которым будем давать персональные рекомендации
hot_users = events.query("visitorid in @user_item['visitorid'].unique()")['visitorid'].unique()
logging.info(f"user_item users: {hot_users.shape[0]}")
if  not retrain:
    hot_users = events.query(
        "visitorid in @hot_users  and  timestamp >= @cut_off_time"
    )['visitorid'].unique()
else:
    hot_users = events.query(
        "visitorid in @hot_users  and  visitorid in @user_item_val['visitorid'].unique()"
    )['visitorid'].unique()
logging.info(f"hot_users: {hot_users.shape[0]}")


user_item users: 867213
hot_users: 15545


In [18]:
# вычисляем коллаборативные рекомендации
RECS_PER_USER = CONFIG['ALS_RECS_PER_USER']
als_recommendations = als_model.recommend(
    hot_users, 
    user_item_sparse[hot_users], 
    filter_already_liked_items=True, N=RECS_PER_USER
)
personal_als  = pd.DataFrame({
    'itemid'   : als_recommendations[0].ravel(),
    'als_score': als_recommendations[1].ravel()
}, index=pd.MultiIndex.from_product(
    [hot_users, range(RECS_PER_USER)], names=['visitorid', 'als_rank'])
).reset_index()
del als_recommendations #, user_item_sparse, user_item_val_sparse, user_item, user_item_val
logging.info(f"personal_als calculated:\n{pd_info(personal_als)}")


personal_als calculated:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 233175 entries, 0 to 233174
Data columns (total 4 columns):
 #   Column     Non-Null Count   Dtype  
---  ------     --------------   -----  
 0   visitorid  233175 non-null  int64  
 1   als_rank   233175 non-null  int64  
 2   itemid     233175 non-null  int32  
 3   als_score  233175 non-null  float32
dtypes: float32(1), int32(1), int64(2)
memory usage: 5.3 MB



In [19]:
# sanity check: number of distinct values per column
personal_als.nunique()

visitorid     15545
als_rank         15
itemid         8620
als_score    224166
dtype: int64

In [20]:
# получаем подобные для всех товаров, известных модели
SIMS_PER_ITEM = CONFIG['ALS_SIMS_PER_ITEM']
sim_items = als_model.similar_items(user_item['itemid'].unique(), N=SIMS_PER_ITEM)

similar_items = pd.DataFrame({
    'sim_itemid': sim_items[0].ravel(),
    'sim_score' : sim_items[1].ravel()
}, index=pd.MultiIndex.from_product(
    [user_item['itemid'].unique(), range(SIMS_PER_ITEM)], names=['itemid', 'sim_rank']
)).reset_index()
del sim_items, user_item, user_item_val, als_model
logging.info(f"similar_items calculated:\n{pd_info(similar_items)}")


similar_items calculated:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2339130 entries, 0 to 2339129
Data columns (total 4 columns):
 #   Column      Non-Null Count    Dtype  
---  ------      --------------    -----  
 0   itemid      2339130 non-null  int64  
 1   sim_rank    2339130 non-null  int64  
 2   sim_itemid  2339130 non-null  int32  
 3   sim_score   2339130 non-null  float32
dtypes: float32(1), int32(1), int64(2)
memory usage: 53.5 MB



In [21]:
# sanity check: number of distinct values per column
similar_items.nunique()

itemid         155942
sim_rank           15
sim_itemid     155820
sim_score     1513166
dtype: int64

In [22]:
# объединяем информацию из personal_als, similar_items и top_popular в привязке к visitorid
candidades = personal_als[['visitorid','itemid']].merge(
    similar_items, how='left', on='itemid'
).groupby(['visitorid','sim_itemid']).agg(
    sim_score=('sim_score','max')
).reset_index().rename(
    columns={'sim_itemid': 'itemid'} 
).merge(
    personal_als[['visitorid','itemid','als_score']], how='outer', on=['visitorid','itemid']
).merge(
    top_popular[['itemid','pop_score']], how='left', on='itemid'
)
del personal_als


In [23]:
# формируем таргет для модели ранжирования (добавления в корзину и покупки)
if  retrain:
    events_target = events.query("timestamp >= @target_time and timestamp < @test_time").copy()
    events_target['target'] = (events_target['event'] > 0).astype(np.int8)

    # расширяем состав candidades за счет положительных сэмплов из events_target
    candidades = candidades.merge(
        events_target.groupby(['visitorid','itemid']).agg(target=('target','max')).reset_index().query(
            "visitorid in @hot_users and target > 0"
        ), 
        how='outer', on=['visitorid','itemid']
    )
    candidades["target"] = candidades["target"].fillna(0).astype(np.int8)
logging.info(f"candidades:\n{pd_info(candidades)}")


candidades:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2290304 entries, 0 to 2290303
Data columns (total 6 columns):
 #   Column     Non-Null Count    Dtype  
---  ------     --------------    -----  
 0   visitorid  2290304 non-null  int64  
 1   itemid     2290304 non-null  int64  
 2   sim_score  2289218 non-null  float32
 3   als_score  233175 non-null   float32
 4   pop_score  19219 non-null    float64
 5   target     2290304 non-null  int8   
dtypes: float32(2), float64(1), int64(2), int8(1)
memory usage: 72.1 MB



In [24]:
# class balance of the ranking target
candidades['target'].value_counts()

target
0    2289095
1       1209
Name: count, dtype: int64

In [25]:
# получаем свойства товаров, актуальные на target_time
item_props = get_item_properties(target_time-1, items)

# отбираем категориальные свойства (с количеством значений до 10-и)
prop_vals = item_props.groupby('property').agg(nvalues=('value_code','max')).reset_index()
categorical_props = prop_vals.query("nvalues <= 5")['property'].unique()
categorical_props.shape

(477,)

In [26]:
# выясняем предпочтения пользователей по категориальным свойствам товаров
user_item_prop = candidades[['visitorid','itemid']].merge(
    item_props.query("itemid in @candidades['itemid'].unique() and property in @categorical_props"),
    how='left', on='itemid'
)
user_prop_score = user_item_prop.groupby(['visitorid','property']).agg(
    prop_score=('itemid','nunique')
).reset_index()
user_item_prop_score = user_item_prop.merge(
    user_prop_score, how='left', on=['visitorid','property']
).groupby(['visitorid','itemid']).agg(
    prop_score=('prop_score','mean')
).fillna(0).reset_index()
del user_item_prop, user_prop_score
logging.info(f"user_item_prop_score:\n{pd_info(user_item_prop_score)}")


user_item_prop_score:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2290304 entries, 0 to 2290303
Data columns (total 3 columns):
 #   Column      Non-Null Count    Dtype  
---  ------      --------------    -----  
 0   visitorid   2290304 non-null  int64  
 1   itemid      2290304 non-null  int64  
 2   prop_score  2290304 non-null  float64
dtypes: float64(1), int64(2)
memory usage: 52.4 MB



In [27]:
# Conditional cell execution: a cell that starts with `%%exec_if <condition>`
# is run only when <condition> evaluates to a truthy value.
from IPython.core.magic import register_cell_magic
from IPython import get_ipython
@register_cell_magic
def exec_if(line, cell):
    """Run the cell body only if the expression on the magic line is truthy.

    Args:
        line: text following `%%exec_if`; evaluated as a Python expression
            (a trailing `# comment` is allowed).
        cell: source of the cell body, executed through IPython's run_cell.

    Returns:
        None. The magic stays best-effort: a failure is logged and the cell
        is skipped instead of raising, so "Run All" keeps going.
    """
    try:
        # eval() is applied to text typed by the notebook author on the magic
        # line, not to external input.
        if eval(line):
            get_ipython().run_cell(cell)
    except Exception as err:
        # A typo or an undefined name in the condition used to skip the cell
        # without any trace; report it. KeyboardInterrupt/SystemExit are no
        # longer swallowed because only Exception is caught.
        logging.warning(f"exec_if: cell not executed, condition {line!r} failed: {err!r}")
    return
   

In [28]:
%%exec_if False  # НЕ СЧИТАЕМ в связи с малозначимостью данных признаков, исходя из cb_model.get_feature_importance

# ... and also the user preferences over item categories.
# This cell is guarded by the `%%exec_if False` magic line above it, so it is
# currently skipped.

# Interactions of the hot users from events_train, restricted to event > 0.
user_item_cat = events_train.query(
    "event>0 and visitorid in @hot_users"             # keep only the target events
)[['visitorid','itemid','categoryid','root']]

# Per (visitor, category): number of distinct items the visitor interacted with.
user_cat_score = user_item_cat.groupby(['visitorid','categoryid']).agg(
    cat_score=('itemid','nunique')
).reset_index()

# Same count at the level of the 'root' column.
user_root_score = user_item_cat.groupby(['visitorid','root']).agg(
    root_score=('itemid','nunique')
).reset_index()

# Bring the category score down to (visitor, item) level, then right-join onto
# the candidate pairs; candidates with no match get 0.
user_item_cat_score = user_item_cat.merge(
    user_cat_score, how='left', on=['visitorid','categoryid']
).groupby(['visitorid','itemid']).agg(
    cat_score=('cat_score','mean')
).reset_index().merge(
    candidades[['visitorid','itemid']], how='right', on=['visitorid','itemid']
).fillna(0)

# Same procedure for the root score.
user_item_root_score = user_item_cat.merge(
    user_root_score, how='left', on=['visitorid','root']
).groupby(['visitorid','itemid']).agg(
    root_score=('root_score','mean')
).reset_index().merge(
    candidades[['visitorid','itemid']], how='right', on=['visitorid','itemid']
).fillna(0)

# Free the intermediates and print summaries of the two result frames.
del user_item_cat, user_cat_score, user_root_score
user_item_cat_score.info(show_counts=True)
user_item_root_score.info(show_counts=True)

In [29]:
# Per-group L1 normalization helper (used below to normalize
# user_item_prop_score within each user).
def normalize_col_by_col(df: pd.DataFrame, col: str, by_col: str):
    """L1-normalize column `col` of `df` in place within each `by_col` group.

    Every value is divided by the sum of absolute values of its group. A group
    whose sum is zero is left unchanged (all zeros), which matches what
    sklearn's `normalize(..., norm='l1')` did in the previous implementation.

    Args:
        df: frame to modify; `df[col]` is overwritten.
        col: name of the numeric column to normalize.
        by_col: name of the column that defines the groups.

    Returns:
        None. `df[col]` is replaced by the normalized values as float32.
    """
    values = df[col].astype('float64')
    # Group by the raw key array and work positionally, so the result does not
    # depend on row order or on the uniqueness of df's index.
    norms = values.abs().groupby(df[by_col].to_numpy()).transform('sum').to_numpy()
    # Guard against division by zero for all-zero groups.
    norms = np.where(norms == 0, 1.0, norms)
    df[col] = (values.to_numpy() / norms).astype('float32')
# Normalize prop_score in place within each visitor (groups keyed by visitorid).
normalize_col_by_col(user_item_prop_score, 'prop_score', 'visitorid')


In [30]:
# Compute hit_score: for each (visitor, item) pair of the hot users, the number
# of distinct 'event' values recorded before target_time. The result is
# right-joined onto the candidate pairs, and pairs without history get 0.
user_item_hit_score = events.query("visitorid in @hot_users and timestamp < @target_time").groupby(
    ['visitorid','itemid']
).agg(
    hit_score=('event','nunique')
).reset_index().merge(
    candidades[['visitorid','itemid']], how='right', on=['visitorid','itemid']
).fillna(0)
logging.info(f"user_item_hit_score:\n{pd_info(user_item_hit_score)}")


user_item_hit_score:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2290304 entries, 0 to 2290303
Data columns (total 3 columns):
 #   Column     Non-Null Count    Dtype  
---  ------     --------------    -----  
 0   visitorid  2290304 non-null  int64  
 1   itemid     2290304 non-null  int64  
 2   hit_score  2290304 non-null  float64
dtypes: float64(1), int64(2)
memory usage: 52.4 MB



In [31]:
# Add per-user features: "tenure" and activity, computed over the hot users'
# events before target_time.
#   stage   - whole days between the user's first event and infer_date, plus 1
#             (timestamps are parsed as milliseconds)
#   nclicks - number of event rows of the user (all rows passing the filter)
#   nbuys   - number of rows with a non-null transactionid
user_features = events.query("visitorid in @hot_users and timestamp < @target_time").groupby("visitorid").agg(
    stage  =('timestamp', lambda x: (infer_date - pd.to_datetime(x.min(),unit='ms')).days +1),
    nclicks=('timestamp', 'count'),
    nbuys  =('transactionid', 'count')
).reset_index()
# Derived rates. NOTE(review): these assume stage and nclicks are never 0 -
# presumably true because infer_date is after the first event; TODO confirm.
user_features["click_per_day"] = user_features["nclicks"] / user_features["stage"]
user_features["buy_per_click"] = user_features["nbuys"]   / user_features["nclicks"]
logging.info(f"user_features:\n{pd_info(user_features)}")


user_features:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 15545 entries, 0 to 15544
Data columns (total 6 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   visitorid      15545 non-null  int64  
 1   stage          15545 non-null  int64  
 2   nclicks        15545 non-null  int64  
 3   nbuys          15545 non-null  int64  
 4   click_per_day  15545 non-null  float64
 5   buy_per_click  15545 non-null  float64
dtypes: float64(2), int64(4)
memory usage: 728.8 KB



In [32]:
# Add the extra features built above to candidades: the pair-level prop_score
# and hit_score, then the user-level stage / click_per_day / buy_per_click.
# NOTE(review): candidades is reassigned from itself, so re-running this cell
# on the same kernel would merge the columns a second time (pandas would add
# _x/_y suffixes) - run it once per fresh candidades.
candidades = candidades.merge(
    user_item_prop_score, how='left', on=['visitorid','itemid']
).merge(
    user_item_hit_score,  how='left', on=['visitorid','itemid']
).merge(
    user_features[['visitorid','stage','click_per_day','buy_per_click']], how='left', on=['visitorid']
)
# Combine the ALS scores: sim_score is overwritten with
# als_score (missing -> 1) * sim_score (missing -> 0); als_score is kept as is.
candidades['sim_score'] = candidades['als_score'].fillna(1) * candidades['sim_score'].fillna(0)
logging.info(f"final candidades:\n{pd_info(candidades)}")


final candidades:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2290304 entries, 0 to 2290303
Data columns (total 11 columns):
 #   Column         Non-Null Count    Dtype  
---  ------         --------------    -----  
 0   visitorid      2290304 non-null  int64  
 1   itemid         2290304 non-null  int64  
 2   sim_score      2290304 non-null  float32
 3   als_score      233175 non-null   float32
 4   pop_score      19219 non-null    float64
 5   target         2290304 non-null  int8   
 6   prop_score     2290304 non-null  float32
 7   hit_score      2290304 non-null  float64
 8   stage          2290304 non-null  int64  
 9   click_per_day  2290304 non-null  float64
 10  buy_per_click  2290304 non-null  float64
dtypes: float32(3), float64(4), int64(3), int8(1)
memory usage: 150.7 MB



In [33]:
# Fix the list of feature columns used for ranking-model training and inference
# (als_score is not included; it is folded into sim_score in an earlier cell).
feature_cols   = ['sim_score','pop_score','prop_score','hit_score',
                    'stage','click_per_day','buy_per_click']


In [53]:
if  retrain:    # --------------------------------------- #
                # -     Retrain the ranking model       - #
                # --------------------------------------- #
    # Keep only the users that have at least one positive target among their candidates.
    candidades_for_train = candidades.groupby("visitorid").filter(lambda x: x["target"].sum() > 0)

    # Drop uninformative duplicates; sorting by target descending before
    # keep='first' guarantees that a positive-target row is the one retained.
    candidades_for_train = candidades_for_train.sort_values(by='target',ascending=False).drop_duplicates(
        subset=['visitorid','itemid']+feature_cols, keep='first'
    )
    logging.info(f"candidades for catboost train:\n{pd_info(candidades_for_train)}")

    # Train the ranking model with hyperparameter search.
    # NOTE: train_data is not passed to cb_train_new; it is built here and
    # reused by the later cell that calls cb_model.predict_proba(train_data).
    train_data = Pool(data=candidades_for_train[feature_cols], label=candidades_for_train['target'])
    #cb_model, cb_params = cb_train (train_data)
    cb_model, cb_params = cb_train_new (candidades_for_train[feature_cols], candidades_for_train['target'])
    logging.info(f"catboost parameters:\n{cb_params}")

    # Collect the feature importances, sorted from most to least important.
    feature_importance = pd.DataFrame(cb_model.get_feature_importance(), index=feature_cols, columns=["fi"])
    feature_importance = feature_importance.sort_values(by="fi", ascending=False)
    logging.info(f"feature_importance:\n{feature_importance}")

    # Save the artifacts (currently disabled).
    #save_to_pkl(als_params, s3, MODEL_FILES['als_parms'])
    #save_to_pkl(cb_params,  s3, MODEL_FILES['cb_parms'])
    #save_to_pkl(cb_model,   s3, MODEL_FILES['cb_model'])


candidades for catboost train:
<class 'pandas.core.frame.DataFrame'>
Index: 65767 entries, 5395 to 2286361
Data columns (total 13 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   visitorid      65767 non-null  int64  
 1   itemid         65767 non-null  int64  
 2   sim_score      65767 non-null  float32
 3   als_score      6585 non-null   float32
 4   pop_score      679 non-null    float64
 5   target         65767 non-null  int8   
 6   prop_score     65767 non-null  float32
 7   hit_score      65767 non-null  float64
 8   stage          65767 non-null  int64  
 9   click_per_day  65767 non-null  float64
 10  buy_per_click  65767 non-null  float64
 11  cb_score       65767 non-null  float64
 12  rank           65767 non-null  int64  
dtypes: float32(3), float64(5), int64(4), int8(1)
memory usage: 5.8 MB

catboost parameters:
OrderedDict([('depth', 9), ('iterations', 640), ('l2_leaf_reg', 1), ('learning_rate', 0.036416051470430436)

In [54]:
# Inspect the best metric values recorded by the trained model.
cb_model.get_best_score()

{'learn': {'Recall': 1.0, 'Logloss': 0.0071001575991267545}}

In [55]:
# Inspect the full set of parameters the model was trained with.
cb_model.get_all_params()

{'nan_mode': 'Min',
 'eval_metric': 'Recall',
 'iterations': 640,
 'sampling_frequency': 'PerTree',
 'leaf_estimation_method': 'Newton',
 'random_score_type': 'NormalWithModelSizeDecrease',
 'grow_policy': 'SymmetricTree',
 'penalties_coefficient': 1,
 'boosting_type': 'Plain',
 'model_shrink_mode': 'Constant',
 'feature_border_type': 'GreedyLogSum',
 'bayesian_matrix_reg': 0.10000000149011612,
 'eval_fraction': 0,
 'force_unit_auto_pair_weights': False,
 'l2_leaf_reg': 1,
 'random_strength': 1,
 'rsm': 1,
 'boost_from_average': False,
 'model_size_reg': 0.5,
 'pool_metainfo_options': {'tags': {}},
 'subsample': 0.800000011920929,
 'use_best_model': False,
 'class_names': [0, 1],
 'random_seed': 42,
 'depth': 9,
 'posterior_sampling': False,
 'border_count': 254,
 'class_weights': [1, 53.397850036621094],
 'classes_count': 0,
 'auto_class_weights': 'Balanced',
 'sparse_features_conflict_fraction': 0,
 'leaf_estimation_backtracking': 'AnyImprovement',
 'best_model_min_trees': 1,
 'model

In [41]:
# Score the training candidates with the trained model; column 1 of
# predict_proba is stored as cb_score.
# NOTE: train_data and candidades_for_train are created in the retrain cell.
predictions    = cb_model.predict_proba(train_data)
candidades_for_train["cb_score"] = predictions[:,1]
# Order each visitor's candidates by cb_score, breaking ties by the remaining
# scores (all descending), then number them from 1 within the visitor.
candidades_for_train.sort_values(['visitorid', 'cb_score', 'sim_score', 'hit_score', 'prop_score', 'pop_score'], 
                    ascending=[True, False, False, False, False, False], inplace=True
)
candidades_for_train["rank"] = candidades_for_train.groupby("visitorid")["cb_score"].cumcount() +1
#candidades_for_train.head()


In [42]:
# Build the evaluation frame: the (visitorid, itemid) pairs of hot users with
# event > 0 at or after test_time, labelled with the maximum event value, then
# left-joined with the scored training candidates. The left frame is indexed
# by (visitorid, itemid) after the groupby, hence left_index/right_on. Pairs
# that are absent from candidades_for_train keep NaN in the candidate columns.
candidades_for_eval = events.query("timestamp >= @test_time and visitorid in @hot_users and event > 0")  \
                            .groupby(['visitorid','itemid']).agg(label=('event','max'))    \
                            .merge(candidades_for_train, #.query("cb_score > 0.5"), 
                                   how='left', left_index=True, right_on=['visitorid','itemid'])
candidades_for_eval.info()

<class 'pandas.core.frame.DataFrame'>
Index: 527 entries, 2286361 to 2286361
Data columns (total 14 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   label          527 non-null    int8   
 1   visitorid      527 non-null    int64  
 2   itemid         527 non-null    int64  
 3   sim_score      15 non-null     float32
 4   als_score      5 non-null      float32
 5   pop_score      2 non-null      float64
 6   target         15 non-null     float64
 7   prop_score     15 non-null     float32
 8   hit_score      15 non-null     float64
 9   stage          15 non-null     float64
 10  click_per_day  15 non-null     float64
 11  buy_per_click  15 non-null     float64
 12  cb_score       15 non-null     float64
 13  rank           15 non-null     float64
dtypes: float32(3), float64(8), int64(2), int8(1)
memory usage: 52.0 KB


In [43]:
# tgt = 1 when the pair received a cb_score (i.e. it was found among the joined
# candidates), lbl = 1 when the pair has a positive label.
candidades_for_eval['tgt'] = candidades_for_eval['cb_score'].notna().astype(int)
candidades_for_eval['lbl'] = (candidades_for_eval['label'] > 0).astype(int)
# Recall of tgt against lbl.
# NOTE(review): eval_metric is presumably catboost.utils.eval_metric(label, approx, metric) - confirm.
eval_metric(candidades_for_eval['lbl'].tolist(), candidades_for_eval['tgt'].tolist(), metric='Recall')

[0.028462998102466792]

In [44]:
# Count the evaluation pairs that are both positive (lbl) and covered by a candidate (tgt).
candidades_for_eval.query("lbl > 0 and tgt>0").shape

(15, 16)

In [45]:
# получаем скоринг рекомендаций
inference_data = Pool(data=candidades[feature_cols])
predictions    = cb_model.predict_proba(inference_data)

# сортируем в соответствии с feature importance и проставим rank, начиная с 1
candidades["cb_score"] = predictions[:,1]
candidades.sort_values(['visitorid', 'cb_score', 'sim_score', 'hit_score', 'prop_score', 'pop_score'], 
                    ascending=[True, False, False, False, False, False], inplace=True
)
candidades["rank"] = candidades.groupby("visitorid")["cb_score"].cumcount() +1

# формируем финальные рекомендации с минимально необходимым для prod набором полей
max_recommendations_per_user = 100
final_recommendations = candidades.query(
    "rank <= @max_recommendations_per_user"
)[['visitorid','itemid','rank']] #,'cb_score','als_score']+feature_cols]


In [46]:
# When retraining, evaluate candidate coverage on the test period against the
# full scored candidades frame (not only the training subset).
if retrain:
    # Pairs of hot users with event > 0 at or after test_time, labelled with the
    # maximum event value and left-joined with the candidates; the left frame is
    # indexed by (visitorid, itemid) after the groupby.
    candidades_for_eval = events.query("timestamp >= @test_time and visitorid in @hot_users and event > 0")  \
                                .groupby(['visitorid','itemid']).agg(label=('event','max'))    \
                                .merge(candidades, #.query("cb_score > 0.5"), 
                                 how='left', left_index=True, right_on=['visitorid','itemid'])
    # tgt = 1 when the pair has a cb_score, lbl = 1 when its label is positive.
    candidades_for_eval['tgt'] = candidades_for_eval['cb_score'].notna().astype(int)
    candidades_for_eval['lbl'] = (candidades_for_eval['label'] > 0).astype(int)
    # metrics holds the Recall of tgt against lbl; it is only defined when retrain is truthy.
    metrics = eval_metric(candidades_for_eval['lbl'].tolist(), candidades_for_eval['tgt'].tolist(), metric='Recall')
    # Model registration is currently disabled.
    #reg_model(cb_model, candidades_for_train[feature_cols], candidades_for_train['target'], {'Recall':metrics[0]}, 
    #          cb_params, CONFIG, artifacts={'feature_importance.pkl': feature_importance})

In [47]:
# Display the Recall value(s) computed in the retrain evaluation cell.
metrics

[0.07210626185958255]

In [52]:
# Display the selected CatBoost hyperparameters as a plain dict.
dict(cb_params)


{'depth': 10,
 'iterations': 998,
 'l2_leaf_reg': 2,
 'learning_rate': 0.12927086924414405}

In [35]:
if not retrain:           # --------------------------------------- #
                # -     Finalize the recommendations    - #
                # --------------------------------------- #
    # Score all candidates with the ranking model.
    inference_data = Pool(data=candidades[feature_cols])
    predictions    = cb_model.predict_proba(inference_data)

    # Store column 1 of predict_proba as cb_score, sort each visitor's
    # candidates by cb_score and then by the other scores (the original note
    # says the order follows feature importance), and assign rank from 1.
    candidades["cb_score"] = predictions[:,1]
    candidades.sort_values(['visitorid', 'cb_score', 'sim_score', 'hit_score', 'prop_score', 'pop_score'], 
                        ascending=[True, False, False, False, False, False], inplace=True
    )
    candidades["rank"] = candidades.groupby("visitorid")["cb_score"].cumcount() +1

    # Build the final recommendations with the minimal set of fields needed in
    # prod: at most max_recommendations_per_user rows per visitor.
    max_recommendations_per_user = 100
    final_recommendations = candidades.query(
        "rank <= @max_recommendations_per_user"
    )[['visitorid','itemid','rank']] #,'cb_score','als_score']+feature_cols]
    logging.info(f"final_recommendations calculated:\n{pd_info(final_recommendations)}")

    # Save the recommendations to S3.
    save_to_parquet(top_popular,           s3, PROD_FILES['top_pop'])
    save_to_parquet(similar_items,         s3, PROD_FILES['similar'])
    save_to_parquet(final_recommendations, s3, PROD_FILES['final'  ])

    # Just in case, also save candidades with the full set of columns.
    save_to_parquet(candidades,            s3, PROD_FILES['ranked'])


final_recommendations calculated:
<class 'pandas.core.frame.DataFrame'>
Index: 9414371 entries, 42 to 11842159
Data columns (total 3 columns):
 #   Column     Non-Null Count    Dtype
---  ------     --------------    -----
 0   visitorid  9414371 non-null  int64
 1   itemid     9414371 non-null  int32
 2   rank       9414371 non-null  int64
dtypes: int32(1), int64(2)
memory usage: 251.4 MB

save_to_parquet:
s3-student-mle-20250228-1d75c84a52/Diplom/recommendations/top_popular.parquet
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 3 columns):
 #   Column     Non-Null Count  Dtype  
---  ------     --------------  -----  
 0   itemid     100 non-null    int64  
 1   rating     100 non-null    int16  
 2   pop_score  100 non-null    float64
dtypes: float64(1), int16(1), int64(1)
memory usage: 1.9 KB

save_to_parquet:
s3-student-mle-20250228-1d75c84a52/Diplom/recommendations/similar_items.parquet
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 