In [33]:
import pandas as pd
import numpy as np
from bidict import bidict
from scipy.sparse import coo_matrix,csr_matrix
import implicit
from implicit import evaluation
import ray
import logging
from ray import tune
import pickle
import os
def shell(command):
    """Run ``command`` through the system shell and return its standard output.

    Args:
        command: Command line handed verbatim to ``os.popen``. It is executed
            by a shell, so never build it from untrusted input.

    Returns:
        Everything the command wrote to stdout, as one string.
    """
    # The context manager closes the pipe even if reading raises; the previous
    # version left the stream open.
    with os.popen(command) as stream:
        return stream.read()
# The packaged splitter is not imported here; a local `train_test_split` is
# defined further down in this cell instead.
# from dsrecommender.trainer import train_test_split
# Module-level logger (used by `_train`).
logger = logging.getLogger(__name__)
def map_ids(row, mapper) -> int:
    """Return the id stored in ``mapper`` under the key ``row``.

    Written as a plain function so it can be passed to ``Series.apply`` with
    the mapping supplied through ``args``.
    """
    mapped_id = mapper[row]
    return mapped_id


def train_test_split(ratings, K=1, train_only_size=0.0, random_state=None):
    """Leave-K-out split of a sparse interaction matrix.

    Rows of ``ratings`` are treated as users and columns as items. Users with
    more than ``K + 1`` stored interactions are candidates for the test set:
    ``K`` interactions of every candidate user go to the test matrix, and all
    remaining interactions (of candidates and non-candidates alike) form the
    training matrix.

    Args:
        ratings: Sparse matrix exposing ``tocoo()``.
        K: Number of interactions held out per candidate user (``>= 1``).
        train_only_size: Fraction (``0.0 <= x < 1.0``) of the distinct users
            that is kept exclusively in the training set. When positive, at
            least one user is selected.
        random_state: ``None``, an integer seed, or an object with a
            ``choice`` method (e.g. ``np.random.RandomState``). It only drives
            the selection of train-only users; the per-user shuffle done by
            ``_take_tails`` draws from NumPy's global RNG, so seed
            ``np.random`` as well if a fully repeatable split is required.

    Returns:
        ``(train_mat, test_mat)``: two CSR matrices with the shape and dtype
        of ``ratings``.

    Raises:
        ValueError: If ``K`` or ``train_only_size`` is out of range.
    """
    if K < 1:
        raise ValueError("The 'K' must be >= 1.")
    if not 0.0 <= train_only_size < 1.0:
        raise ValueError("The 'train_only_size' must be in the range (0.0 <= x < 1.0).")

    ratings = ratings.tocoo()  # this will sort row/cols unless ratings is COO.

    users = ratings.row
    items = ratings.col
    data = ratings.data

    unique_users, counts = np.unique(users, return_counts=True)

    # Only users with more than K + 1 interactions may contribute to the test set.
    candidate_mask = counts > K + 1

    # Keep a given subset of users _only_ in the training set.
    if train_only_size > 0.0:
        # The default ``random_state=None`` (or a bare int seed) used to crash
        # inside ``_choose``; turn it into a generator object first.
        if random_state is None or isinstance(random_state, (int, np.integer)):
            random_state = np.random.RandomState(random_state)
        # ``_choose`` returns positions in ``unique_users``; translate them to
        # user ids so the mask is also right when ids are not 0..n-1.
        train_only_users = unique_users[
            _choose(random_state, len(unique_users), train_only_size)
        ]
        candidate_mask &= ~np.isin(unique_users, train_only_users)

    # Users that will appear in the test set, and the entries belonging to them.
    unique_candidate_users = unique_users[candidate_mask]
    full_candidate_mask = np.isin(users, unique_candidate_users)

    candidate_users = users[full_candidate_mask]
    candidate_items = items[full_candidate_mask]
    candidate_data = data[full_candidate_mask]

    # K randomly chosen entries per candidate user form the test set ...
    test_idx = _take_tails(candidate_users, K, shuffled=True)

    # ... and every other candidate entry goes back to the training set.
    train_idx = np.setdiff1d(np.arange(len(candidate_users), dtype=int), test_idx)

    # Build test matrix.
    test_users = candidate_users[test_idx]
    test_items = candidate_items[test_idx]
    test_data = candidate_data[test_idx]
    test_mat = csr_matrix(
        (test_data, (test_users, test_items)), shape=ratings.shape, dtype=ratings.dtype
    )

    # Build training matrix: non-candidate entries plus the candidates' remainder.
    train_users = np.r_[users[~full_candidate_mask], candidate_users[train_idx]]
    train_items = np.r_[items[~full_candidate_mask], candidate_items[train_idx]]
    train_data = np.r_[data[~full_candidate_mask], candidate_data[train_idx]]
    train_mat = csr_matrix(
        (train_data, (train_users, train_items)),
        shape=ratings.shape,
        dtype=ratings.dtype,
    )

    return train_mat, test_mat

# Credit: https://github.com/benfred/implicit/blob/master/implicit/evaluation.pyx#L145
def _choose(rng,  n, frac):
    size = max(1, int(n * frac))
    arr = rng.choice(n, size=size, replace=False)
    return arr

def _take_tails(arr,n, return_complement=False, shuffled=False):
    idx = arr.argsort()
    sorted_arr = arr[idx]

    end = np.bincount(sorted_arr).cumsum() - 1
    start = end - n
    ranges = np.linspace(start, end, num=n + 1, dtype=int)[1:]

    if shuffled:
        shuffled_idx = (sorted_arr + np.random.random(arr.shape)).argsort()
        tails = shuffled_idx[np.ravel(ranges, order="f")]
    else:
        tails = np.ravel(ranges, order="f")

    heads = np.setdiff1d(idx, tails)

    if return_complement:
        return idx[tails], idx[heads]
    else:
        return idx[tails]
    
def to_pickle(data,path):
    """Serialize ``data`` with pickle into the file at ``path``.

    The file is opened in binary write mode, so an existing file is replaced.
    """
    with open(path, mode="wb") as sink:
        pickle.Pickler(sink).dump(data)
        
def from_pickle(path):
    """Load and return the object pickled in the file at ``path``.

    NOTE: unpickling can execute arbitrary code; only use this on files that
    come from a trusted source.
    """
    # Open through a context manager so the handle is closed deterministically;
    # the previous one-liner never closed the file.
    with open(path, "rb") as f:
        return pickle.load(f)
    
def load_data(path,frac=1, random_state=None):
    """Read a user/items CSV and return one row per (user, item) pair.

    The first line of the file is treated as a header and replaced by the
    column names ``user`` and ``item``. The ``item`` column is expected to
    hold ``'|'``-separated item ids; each id becomes its own row.

    Args:
        path: Location passed to ``pandas.read_csv``.
        frac: Fraction of rows to keep. Values below 1 return a random sample
            (which keeps the sampled rows' original index labels); anything
            else returns the full frame.
        random_state: Optional seed or generator forwarded to
            ``DataFrame.sample`` so that the sample is repeatable. ``None``
            keeps the previous, unseeded behaviour.

    Returns:
        DataFrame with the columns ``user`` and ``item``.
    """
    print(f"Loading data from {path} ")
    df = pd.read_csv(path,
                 header=0,
                 names=["user","item"])
    # Bracket access avoids relying on attribute lookup for the column name.
    df["item"] = df["item"].str.split('|')
    df = df.explode("item").reset_index(drop=True)
    if frac < 1:
        print(f"Sampled data frac: {frac}" )
        return df.sample(frac=frac, random_state=random_state)
    print(f"Loaded data from {path}. Not sampled." )
    return df

def process_data(df, K=4, train_only_size=0.75, seed=42):
    """Turn a (user, item) frame into train/test interaction matrices.

    Users and items are numbered in order of first appearance, every row of
    ``df`` contributes a rating of 1, and the resulting user x item matrix is
    split with ``train_test_split``.

    The id mappings are local to this function and are not returned, so
    matrix indices cannot be translated back to the original ids from the
    return value alone.

    Args:
        df: DataFrame with the columns ``user`` and ``item``.
        K: Interactions held out per test user (forwarded to
            ``train_test_split``).
        train_only_size: Fraction of users kept only in the training set.
        seed: Seed of the ``np.random.RandomState`` handed to the split.

    Returns:
        ``{"train": ..., "test": ...}`` holding the two matrices returned by
        ``train_test_split``.
    """
    usersdict = bidict({user:i for i,user in enumerate(df.user.unique())})
    itemsdict = bidict({item:i for i,item in enumerate(df.item.unique())})
    # Printed value is rows / (users * items), i.e. the share of filled cells.
    print('Sparsity: {:4.3f}%'.format(float(df.shape[0]) / float(len(usersdict)*len(itemsdict)) * 100))

    users=df.user.apply(map_ids,args=[usersdict]).to_numpy()
    items=df.item.apply(map_ids,args=[itemsdict]).to_numpy()
    ratings = np.ones(len(items))
    matrix = coo_matrix((ratings, (users, items))).tocsr()

    random_state = np.random.RandomState(seed)
    train, test = train_test_split(matrix,K,train_only_size,random_state)

    return {"train":train,"test":test}

def _train(config,data=None):
    """Ray Tune trainable: fit a BPR model and report precision@k.

    Args:
        config: Keyword arguments for
            ``implicit.bpr.BayesianPersonalizedRanking`` (one sample of the
            Tune search space).
        data: Dict with the keys ``"train"`` and ``"test"``, normally attached
            with ``tune.with_parameters``.

    Raises:
        ValueError: If ``data`` is not supplied.
    """
    if data is None:
        raise ValueError("_train requires 'data' with 'train' and 'test' matrices.")

    # These two assignments were commented out, which left `train` and `test`
    # undefined (NameError on a fresh kernel or Ray worker).
    # NOTE(review): the copies come from the original commented-out lines;
    # presumably they give each trial its own writable matrices. Confirm
    # whether they are needed.
    train = data["train"].copy()
    test = data["test"].copy()

    # Imported inside the function as in the original.
    # NOTE(review): presumably so the names resolve on the Ray worker; confirm.
    import implicit
    from implicit import evaluation

    logger.info("Training with BayesianPersonalizedRanking")
    model = implicit.bpr.BayesianPersonalizedRanking(**config,use_gpu=True)

    # The model is fitted on the transposed training matrix.
    model.fit(train.T)
    # auc = evaluation.AUC_at_k(model,train,test)
    patk = evaluation.precision_at_k(model,train,test)
    tune.report(patk=patk)

In [34]:
# Load the raw user/items CSV from S3 into a long (user, item) DataFrame.
data = load_data("s3://unext-datascience-prod/jobs/ippanreco/user_watched_sakuhins.csv.gz")
# Alternative inputs (presumably smaller subsets, judging by the file names):
# data = load_data("s3://unext-datascience-prod/jobs/ippanreco/user_watched_sakuhins_10percent.csv")
# data = load_data("s3://unext-datascience-prod/jobs/ippanreco/user_watched_sakuhins_small.csv")

Loading data from s3://unext-datascience-prod/jobs/ippanreco/user_watched_sakuhins.csv.gz 
Loaded data from s3://unext-datascience-prod/jobs/ippanreco/user_watched_sakuhins.csv.gz. Not sampled.


In [35]:
# Rebinds `data`: from here on it is the {"train", "test"} dict returned by
# process_data, no longer the DataFrame produced by load_data.
data = process_data(data)

Sparsity: 0.088%


In [36]:
# Persist the train/test dict to local disk.
to_pickle(data,"/home/ray/data/traindata.pkl")

In [40]:
# Reload the train/test dict from the local pickle file.
data = from_pickle("/home/ray/data/traindata.pkl")

In [38]:
# List the local data directory to check the pickle file and its size.
!ls -lah /home/ray/data/

total 633M
drwxr-xr-x 2 root root    40 Jun  5 09:01 .
drwxr-xr-x 1 ray  users  152 Jun  5 09:00 ..
-rw-r--r-- 1 root root     0 Jun  5 09:00 t.txt
-rw-r--r-- 1 root root  633M Jun  5 09:15 traindata.pkl


In [42]:
# Copy the local pickle file to the S3 job prefix.
!aws s3 cp /home/ray/data/traindata.pkl s3://unext-datascience-prod/jobs/ippanreco/

upload: ../data/traindata.pkl to s3://unext-datascience-prod/jobs/ippanreco/traindata.pkl


In [None]:
# Disabled alternative: initialise Ray via ray.init with one GPU instead of
# connecting to the address below.
# ray.init(num_gpus=1)
# Connect this session to the Ray endpoint at ray20dev-ray-head:10001.
ray.util.connect("ray20dev-ray-head:10001")

In [None]:
# Show the resources Ray reports for the connected cluster.
ray.cluster_resources()

In [None]:
# Show the nodes Ray reports for the connected cluster.
ray.nodes()

In [None]:
# Grid search over the number of latent factors. Every trial runs `_train`
# with the train/test dict attached through `tune.with_parameters` and is
# ranked by the reported "patk" metric (higher is better).
trainable = tune.with_parameters(_train, data=data)
search_space = {
    "factors": tune.grid_search([191, 223, 255, 287]),
    # Further search dimensions, currently disabled:
    # "learning_rate": tune.uniform(0.01, 0.2),
    # "regularization": tune.uniform(0.01, 0.2),
    # "iterations": tune.grid_search([80, 100, 120]),
    # "verify_negative_samples": tune.choice([True, False]),
    # "random_state": np.random.RandomState(42)
}
trial_resources = {"cpu": 3, "gpu": 1}

analysis = tune.run(
    trainable,
    config=search_space,
    resources_per_trial=trial_resources,
    num_samples=1,
    metric="patk",
    mode="max",
)
print("Best params: ", analysis.best_config)
print("Best result: ", analysis.best_result)