In [1]:
import os
# Cap the OpenBLAS and OpenMP thread pools at one thread each.
# NOTE(review): presumably this keeps native math libraries from oversubscribing
# cores once multiprocessing workers are started — this has to run before numpy
# is imported to take effect.
os.environ.update(
    OPENBLAS_NUM_THREADS='1',
    OMP_NUM_THREADS='1',
)

In [2]:
import pyspark
import numpy as np
import scipy.sparse as sp
from lightfm import LightFM

In [3]:
# Initialize pyspark

from pyspark.sql import SparkSession

# Create the Spark session used by every later cell, or reuse a running one.
spark = (
    SparkSession.builder
    .appName("Candidate Selection")
    .getOrCreate()
)

In [4]:
# Load dataframes

import joblib

# Root location of the ORC datasets; also used later as the output location.
path_to_df = 'gs://thesis_apc_bucket/df_data'


def _read_orc(relative_path):
    """Read the ORC dataset stored at ``path_to_df + relative_path``."""
    return spark.read.orc(path_to_df + relative_path)


# Track / playlist tables and their metadata.
df_tracks = _read_orc('/df_tracks.orc')
df_playlists = _read_orc('/df_playlists.orc')
df_playlists_metadata = _read_orc('/df_playlists_metadata.orc')
df_test_playlists = _read_orc('/df_test_playlists.orc')
df_test_metadata = _read_orc('/df_test_metadata.orc')

# Train / validation splits.
df_train = _read_orc('/df_train.orc')
df_val1 = _read_orc('/df_val1.orc')
df_val2 = _read_orc('/df_val2.orc')

In [5]:
# Load pid list

# Each pickle is read inside a `with` block so the file handle is closed as soon
# as deserialisation finishes (previously the handles were never closed).
# NOTE(review): joblib/pickle deserialisation can execute arbitrary code — only
# load files produced by this project.
with open('df_data/val1_pids.pkl', 'rb') as pid_file:
    val1_pids = joblib.load(pid_file)
with open('df_data/val2_pids.pkl', 'rb') as pid_file:
    val2_pids = joblib.load(pid_file)
with open('df_data/test_pids.pkl', 'rb') as pid_file:
    test_pids = joblib.load(pid_file)

In [7]:
# Load models

# Directory holding the pickled LightFM models and the user feature matrix.
path_to_models = 'models2'

# Each pickle is read inside a `with` block so the file handle is closed as soon
# as deserialisation finishes (previously the handles were never closed).
# NOTE(review): joblib/pickle deserialisation can execute arbitrary code — only
# load files produced by this project.
with open(path_to_models + '/lightfm_model.pkl', 'rb') as model_file:
    model = joblib.load(model_file)
with open(path_to_models + '/lightfm_model_text.pkl', 'rb') as model_file:
    model_text = joblib.load(model_file)
with open(path_to_models + '/user_features.pkl', 'rb') as model_file:
    user_features = joblib.load(model_file)

In [8]:
# Set config

# Largest track id in the track table, read from the single aggregate row.
_max_tid_row = df_tracks.agg({'tid': 'max'}).collect()[0]
tid_max = _max_tid_row['max(tid)']

# 'num_tracks' is max id + 1; it is later used as the length of the item id range.
# NOTE(review): this assumes track ids are 0-based — confirm.
config = dict(num_tracks=tid_max + 1)

In [10]:
# Set up list known positives (dataframe version)

# Distinct (pid, tid) pairs present in the training split; save_candidates
# anti-joins against this frame to drop already-known tracks.
df_user_seen = df_train.select('pid', 'tid').distinct()

In [13]:
# Display the derived item count as a quick sanity check.
config['num_tracks']

997017

In [14]:
# Set up functions for making batch predictions
# Functions in this cell are taken from github.com/dmitryhd/lightfm

import time
from typing import Tuple, Union, Dict
import multiprocessing as mp
import logging
import itertools

# Inference functions --------------------------------------------------------------------------------------------------------------

# Worker pool for multi-process inference; stays None unless _batch_setup is
# called with n_process > 1.
_pool = None
# Mapping chunk id -> array of item ids to score; filled in by _batch_setup.
_item_chunks = {}
# dtype used when building the default identity feature matrices.
CYTHON_DTYPE = np.float32
# dtype batch_predict casts user ids to when they are not already an ndarray.
# NOTE(review): a floating-point dtype for ids is unusual; float32 represents
# integers exactly only up to 2**24 — confirm all ids stay below that.
ID_DTYPE = np.float32

# Class-level logger used by the `debug` / `info` helpers patched onto LightFM below.
LightFM.logger = logging.getLogger('lightfm')

def debug(self, *args, **kwargs):
    """Forward a DEBUG-level record to ``self.logger`` with the arguments unchanged."""
    emit = self.logger.debug
    emit(*args, **kwargs)

def info(self, *args, **kwargs):
    """Forward an INFO-level record to ``self.logger`` with the arguments unchanged."""
    emit = self.logger.info
    emit(*args, **kwargs)

def _construct_user_features(self, n_users, user_features):
    if user_features is None:
        user_features = sp.identity(n_users, dtype=CYTHON_DTYPE, format='csr')
    else:
        user_features = user_features.tocsr()

    if n_users > user_features.shape[0]:
        raise Exception('Number of user feature rows does not equal the number of users')

    # If we already have embeddings, verify that
    # we have them for all the supplied features
    if self.user_embeddings is not None:
        if not self.user_embeddings.shape[0] >= user_features.shape[1]:
            raise ValueError(
                'The user feature matrix specifies more features than there are estimated '
                'feature embeddings: {} vs {}.'.format(self.user_embeddings.shape[0], user_features.shape[1])
            )

    user_features = self._to_cython_dtype(user_features)
    return user_features

def _construct_item_features(self, n_items: int, item_features: Union[sp.csr_matrix, None]) -> sp.csr_matrix:
    # TODO: mb. merge with user features
    if item_features is None:
        item_features = sp.identity(n_items, dtype=CYTHON_DTYPE, format='csr')
    else:
        item_features = item_features.tocsr()

    if n_items > item_features.shape[0]:
        raise Exception('Number of item feature rows does not equal the number of items')

    if self.item_embeddings is not None:
        if not self.item_embeddings.shape[0] >= item_features.shape[1]:
            raise ValueError(
                'The item feature matrix specifies more features than there are estimated '
                'feature embeddings: {} vs {}.'.format(self.item_embeddings.shape[0], item_features.shape[1])
            )

    item_features = self._to_cython_dtype(item_features)
    return item_features

# Attach the helpers defined above to the LightFM class so they can be called as
# methods on model instances. Assignment replaces any attribute of the same name
# that the installed LightFM class already defines.
LightFM.debug = debug
LightFM.info = info
LightFM._construct_user_features = _construct_user_features
LightFM._construct_item_features = _construct_item_features

def _check_setup():
    if not (len(_user_repr)
        and len(_user_repr_biases)
        and len(_item_repr)
        and len(_item_repr_biases)):

        raise EnvironmentError('You must setup mode.batch_setup(item_ids) before using predict')

def _batch_setup(model: LightFM,
                 item_chunks: Dict[int, np.ndarray],
                 item_features: Union[None, sp.csr_matrix]=None,
                 user_features: Union[None, sp.csr_matrix]=None,
                 n_process: int=1):
    """
    Precompute user/item representations and store them in module-level state.

    :param model: fitted LightFM model providing embeddings and biases.
    :param item_chunks: mapping chunk id -> array of item ids, stored as-is.
    :param item_features: optional item feature matrix; identity sized by
        ``model.item_biases`` when None.
    :param user_features: optional user feature matrix; identity sized by
        ``model.user_biases`` when None.
    :param n_process: a worker pool is created only when this is greater than 1.
    """
    global _item_repr, _user_repr
    global _item_repr_biases, _user_repr_biases
    global _pool
    global _item_chunks

    # Default to one indicator feature per item / user.
    if item_features is None:
        item_features = sp.identity(len(model.item_biases), dtype=CYTHON_DTYPE, format='csr')
    if user_features is None:
        user_features = sp.identity(len(model.user_biases), dtype=CYTHON_DTYPE, format='csr')

    # User side: validate the features, then project them onto the embeddings.
    user_features = model._construct_user_features(user_features.shape[0], user_features)
    _user_repr, _user_repr_biases = _precompute_representation(
        features=user_features,
        feature_embeddings=model.user_embeddings,
        feature_biases=model.user_biases,
    )

    # Item side: same procedure; the result is stored transposed so that a user
    # vector can be multiplied against it directly.
    item_features = model._construct_item_features(item_features.shape[0], item_features)
    item_representation, _item_repr_biases = _precompute_representation(
        features=item_features,
        feature_embeddings=model.item_embeddings,
        feature_biases=model.item_biases,
    )
    _item_repr = item_representation.T

    _item_chunks = item_chunks

    # Drop any pool left over from a previous setup. Pool creation should go last.
    _clean_pool()
    if n_process > 1:
        _pool = mp.Pool(processes=n_process)
        
def _clean_pool():
    global _pool
    if _pool is not None:
        _pool.close()
        _pool = None

def _batch_cleanup():
    global _item_ids, _item_repr, _user_repr, _pool, _item_chunks
    _item_chunks = {}
    _user_repr = np.array([])
    _item_repr = np.ndarray([])
    _clean_pool()
    
def _get_top_k_scores(scores: np.ndarray, k: int, item_ids: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    :return: indices of items, top_k scores. All in score decreasing order.
    """

    if k:
        top_indices = np.argpartition(scores, -k)[-k:]
        scores = scores[top_indices]
        sorted_top_indices = np.argsort(-scores)
        scores = scores[sorted_top_indices]
        top_indices = top_indices[sorted_top_indices]
    else:
        top_indices = np.arange(len(scores))

    if len(item_ids):
        top_indices = np.array(item_ids)[top_indices]

    return top_indices, scores
    
def _batch_predict_for_user(user_id: int, top_k: int=50, chunk_id: int=None, item_ids=None) -> Tuple[np.ndarray, np.ndarray]:
    """
    Score one user against a set of items using the precomputed module-level
    representations.

    :param user_id: row index into the user representation (cast with ``int``).
    :param top_k: number of items to return, forwarded to ``_get_top_k_scores``.
    :param chunk_id: key into ``_item_chunks``; takes precedence over ``item_ids``.
    :param item_ids: explicit item ids, used only when ``chunk_id`` is None.
    :return: indices of items, top_k scores. All in score decreasing order.
    :raises UserWarning: if neither ``chunk_id`` nor ``item_ids`` is supplied.
    """
    user_index = int(user_id)
    user_vector = _user_repr[user_index, :]

    # Resolve which items are scored.
    if chunk_id is not None:
        item_ids = _item_chunks[chunk_id]
    elif item_ids is None:
        raise UserWarning('Supply item chunks at setup or item_ids in predict')

    # An empty (or None) selection means "score every item".
    score_all_items = item_ids is None or len(item_ids) == 0
    if score_all_items:
        item_matrix, item_biases = _item_repr, _item_repr_biases
    else:
        item_matrix, item_biases = _item_repr[:, item_ids], _item_repr_biases[item_ids]

    # Dot product plus user and item biases; the in-place adds keep the dtype of
    # the dot-product result.
    scores = user_vector.dot(item_matrix)
    scores += _user_repr_biases[user_index]
    scores += item_biases
    return _get_top_k_scores(scores, k=top_k, item_ids=item_ids)

def _precompute_representation(
        features: sp.csr_matrix,
        feature_embeddings: np.ndarray,
        feature_biases: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    :param: features           csr_matrix         [n_objects, n_features]
    :param: feature_embeddings np.ndarray(float)  [n_features, no_component]
    :param: feature_biases     np.ndarray(float)  [n_features]
    :return:
    TODO:
    tuple of
    - representation    np.ndarray(float)  [n_objects, no_component+1]
    - bias repr
    """

    representation = features.dot(feature_embeddings)
    representation_bias = features.dot(feature_biases)
    return representation, representation_bias

# ----------------------------------------------------------------------------------------------------------------------------------

# Main functions -------------------------------------------------------------------------------------------------------------------

def batch_setup(self,
                item_chunks: Dict[int, np.ndarray],
                item_features: Union[None, sp.csr_matrix]=None,
                user_features: Union[None, sp.csr_matrix]=None,
                n_process: int=1):
    """
    Prepare this model for batch prediction.

    Records ``n_process`` on the instance (``batch_predict`` reads it to choose
    between the single-process and pool code paths) and delegates everything
    else to the module-level ``_batch_setup``.
    """
    self.n_process = n_process
    setup_arguments = dict(
        model=self,
        item_chunks=item_chunks,
        item_features=item_features,
        user_features=user_features,
        n_process=n_process,
    )
    _batch_setup(**setup_arguments)

def batch_cleanup(self):
    """Release the module-level batch-inference state by delegating to ``_batch_cleanup``."""
    # Uses the notebook-local _batch_cleanup rather than lightfm.inference's.
    _batch_cleanup()
    
def batch_predict(self,
                  chunk_id: int,
                  user_ids: Union[np.ndarray, list],
                  top_k: int=50) -> Dict[int, Tuple[np.ndarray, np.ndarray]]:
    """
    Produce top-k recommendations for a batch of users.

    Requires a prior ``batch_setup`` call, which sets ``self.n_process`` and the
    module-level representations.

    :param chunk_id: key of the item chunk (registered at setup) to score against.
    :param user_ids: user ids to score; non-ndarray input is cast to ``ID_DTYPE``.
        An empty collection returns an empty dict (previously the per-user
        timing computation raised ZeroDivisionError).
    :param top_k: number of items returned per user.
    :return: dict by user id: item_indices, scores sorted by score
    """
    self._check_initialized()
    self.info('Batch predict: user_ids: {:,}, item_ids: {:,}'.format(len(user_ids), len(_item_chunks[chunk_id])))

    recommendations = {}
    if not isinstance(user_ids, np.ndarray):
        user_ids = np.array(user_ids, dtype=ID_DTYPE)

    _check_setup()
    btime = time.time()

    if self.n_process == 1:
        self.debug('Start recommending: using single process')
        for user_id in user_ids:
            rec_ids, scores = _batch_predict_for_user(user_id=user_id, top_k=top_k, chunk_id=chunk_id)
            recommendations[user_id] = rec_ids, scores
    else:
        self.debug('Start recommending: using multiprocessing')
        recs_list = _pool.starmap(
            _batch_predict_for_user,
            zip(user_ids, itertools.repeat(top_k), itertools.repeat(chunk_id)),
        )
        recommendations = dict(zip(user_ids, recs_list))

    elapsed_sec = time.time() - btime
    # Guard the per-user average against an empty batch.
    n_users = len(user_ids)
    elapsed_sec_by_user = elapsed_sec / n_users if n_users else 0.0
    self.info('Recommendations for chunk {:,} done in {:.3f}s. {:.4f} s by user'.format(
        chunk_id, elapsed_sec, elapsed_sec_by_user,
    ))
    return recommendations

# Add functions to LightFM class
# Expose the batch-inference entry points as methods on every LightFM instance.
LightFM.batch_setup = batch_setup
LightFM.batch_cleanup = batch_cleanup
LightFM.batch_predict = batch_predict

# ----------------------------------------------------------------------------------------------------------------------------------

In [16]:
# Playlist ids present in the training split. Only membership is used downstream
# (set arithmetic in save_candidates), so the ids are de-duplicated on the
# cluster instead of collecting one value per training row to the driver.
train_pids = [row[0] for row in df_train.select('pid').distinct().collect()]

In [17]:
# Set up function to save candidates

from pyspark.sql.functions import collect_list, isnan, when, count, col
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, BooleanType
import pandas as pd

def _predict_top_candidates(lightfm_model, pids, top_k, n_process, **setup_kwargs):
    """Return ``{pid: (track_ids, scores)}`` for ``pids``, cleaning up the batch state afterwards."""
    if not pids:
        # Nothing to score: skip building representations and the worker pool.
        return {}

    lightfm_model.batch_setup(
        item_chunks={0: np.arange(config['num_tracks'])},
        n_process=n_process,
        **setup_kwargs,
    )
    try:
        return lightfm_model.batch_predict(chunk_id=0, user_ids=pids, top_k=top_k)
    finally:
        # Release the module-level state and worker pool even if prediction fails.
        lightfm_model.batch_cleanup()


def save_candidates(target_pids, df_size, file_name, df=None, top_k=2000, n_process=16):
    """
    Generate candidate tracks for each target playlist and write them to ORC.

    Playlists that occur in ``train_pids`` are scored with ``model``; all other
    playlists are scored with ``model_text`` using ``user_features``. For every
    playlist the first ``max(size * 8, 700 + size)`` predictions are kept (at
    most ``top_k``), pairs already present in ``df_user_seen`` are removed, and
    the result is written to ``path_to_df + '/' + file_name``.

    :param target_pids: list of playlist ids to generate candidates for.
    :param df_size: mapping / pandas Series giving a size per pid (``df_size[pid]``).
    :param file_name: name of the ORC output, relative to ``path_to_df``.
    :param df: optional Spark DataFrame with ``pid`` and ``tid`` columns; when
        given, a boolean ``target`` column marks candidates that occur in it.
    :param top_k: number of predictions requested per playlist (default 2000).
    :param n_process: worker processes used for prediction (default 16).

    NOTE(review): ``pid`` is written as a float column while being joined against
    ``df_user_seen`` — confirm the two pid column types are compatible.
    """
    # Split the targets: playlists unseen in training go to the text model.
    target_pids_text = list(set(target_pids).difference(train_pids))
    target_pids_no_text = list(set(target_pids).difference(target_pids_text))

    # Make predictions based on model and get the top_k candidates for each pid
    print('Predicting for no text...')
    prediction_results = _predict_top_candidates(model, target_pids_no_text, top_k, n_process)

    # Make predictions based on model_text and get the top_k candidates for each pid
    print('Predicting for text...')
    prediction_results_text = _predict_top_candidates(
        model_text, target_pids_text, top_k, n_process, user_features=user_features,
    )

    prediction_results.update(prediction_results_text)

    print('Constructing candidates dataframe...')

    if df is not None:
        # pid -> list of track ids present in df for that playlist
        val_tracks = df.groupBy('pid').agg(collect_list('tid').alias('tid')).toPandas().set_index('pid').tid

    # The 'target' column exists only when a labelled frame is supplied.
    schema_fields = [
        StructField('pid', FloatType(), True),
        StructField('tid', IntegerType(), True),
    ]
    if df is not None:
        schema_fields.append(StructField('target', BooleanType(), True))
    schema = StructType(schema_fields)

    pids = []
    tids = []
    targets = []

    l = 0  # defined up front so the summary print below works for empty input
    for pid in target_pids:
        size = df_size[pid]
        l = max(size * 8, 700 + size)
        candidate_tids = list(prediction_results[pid][0][:l])

        # Repeat the pid once per candidate actually available: the slice is
        # shorter than l whenever l exceeds the number of predictions.
        pids += [pid] * len(candidate_tids)
        tids += candidate_tids

        if df is not None:
            # Set membership keeps the labelling linear in the number of candidates.
            known_tracks = set(val_tracks[pid])
            targets += [tid in known_tracks for tid in candidate_tids]

    # CHECKING RESULTS
    print("l = ", l)
    print("Length of pids: ", len(pids))
    print("Length of tids: ", len(tids))

    # Make candidates dataframe (pandas)
    candidates = pd.DataFrame({'pid': pids, 'tid': tids})

    # Append targets to candidates if df is supplied
    if df is not None:
        candidates['target'] = targets

    # Convert candidates to spark dataframe
    df_candidates = spark.createDataFrame(candidates, schema)

    # Remove candidates that are already in train (a.k.a. user_seen)
    df_final_candidates = df_candidates.join(df_user_seen, ['pid', 'tid'], 'leftanti')

    # Final check
    if df is not None:
        df_final_candidates.groupBy('target').count().show()

    # Save final candidates to ORC file
    df_final_candidates.write.mode('overwrite').orc(path_to_df + '/' + file_name)

In [None]:
%%time

# Rows per playlist in the first validation split; save_candidates uses this as
# the per-playlist size. Only the `pid` column is brought to the driver, since
# that is all value_counts() needs.
val1_counts = df_val1.select('pid').toPandas().pid.value_counts()

save_candidates(
    val1_pids.tolist(),
    val1_counts,
    'ii_candidate.orc',
    df_val1
)

Predicting for no text...
Predicting for text...
Constructing candidates dataframe...
l =  888
Length of pids:  8225013
Length of tids:  8225013
+------+-------+
|target|  count|
+------+-------+
|  true| 332139|
| false|7655017|
+------+-------+

CPU times: user 30.8 s, sys: 3.29 s, total: 34.1 s
Wall time: 18min 38s


In [None]:
%%time

# Rows per playlist in the second validation split; save_candidates uses this as
# the per-playlist size. Only the `pid` column is brought to the driver, since
# that is all value_counts() needs.
val2_counts = df_val2.select('pid').toPandas().pid.value_counts()

save_candidates(
    val2_pids.tolist(),
    val2_counts,
    'iii_candidate.orc',
    df_val2
)

Predicting for no text...
Predicting for text...
Constructing candidates dataframe...
l =  888
Length of pids:  8225013
Length of tids:  8225013
+------+-------+
|target|  count|
+------+-------+
|  true| 328047|
| false|7658526|
+------+-------+

CPU times: user 30.9 s, sys: 3.86 s, total: 34.8 s
Wall time: 18min 42s


In [None]:
# Add num_holdouts column to df_test_metadata

# num_holdouts = num_tracks - num_samples for every test playlist.
# NOTE(review): presumably the number of hidden tracks to predict — confirm
# against the dataset description.
df_test_metadata = df_test_metadata.withColumn(
    'num_holdouts',
    df_test_metadata.num_tracks - df_test_metadata.num_samples,
)

In [None]:
%%time

from pyspark.sql.functions import col

# Per-playlist holdout counts as a pandas Series indexed by pid.
_test_sizes = df_test_metadata.select('pid', 'num_holdouts').toPandas()
test_counts = _test_sizes.set_index('pid').num_holdouts

# No labelled frame is passed, so the output has no 'target' column.
save_candidates(
    test_pids.tolist(),
    test_counts,
    'test_candidate.orc'
)
