# Coldstart Predictions
The script takes in the new $V'$ matrix generated from the cold start regression model and generates new predictions for each of the users in the training data using distributed computation with Dask.
<br><br>
Ultimately, we got the predictions to work locally so did not need to use this

### Read in Data

In [1]:
import pandas as pd
import numpy as np
import os
from pathlib import Path
import dask.dataframe as dd
import dask.array as da
import dask.bag as db

from glob import glob
from distributed import Client
from dask_jobqueue import SLURMCluster

from IPython.display import display
import matplotlib.pyplot as plt

import time

In [2]:
# Set LOCAL to True for single-machine execution while developing
# Set LOCAL to False for cluster execution
LOCAL = False

if LOCAL:
    # This line creates a single-machine dask client
    client = Client()
else:    
    # This line creates a SLURM cluster dask and dask client
    # Logging outputs will be stored in /scratch/{your-netid}
    
    cluster = SLURMCluster(memory='16GB', cores=2, python='/scratch/work/public/dask/bin/python', 
                               local_directory='/tmp/{}/'.format(os.environ['SLURM_JOB_USER']),
                               job_extra=['--output=/scratch/{}/slurm-%j.out'.format(os.environ['SLURM_JOB_USER'])])

    cluster.submit_command = 'slurm'
    cluster.scale(100)

    display(cluster)
    client = Client(cluster)

display(client)

VBox(children=(HTML(value='<h2>SLURMCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    …

0,1
Client  Scheduler: tcp://10.32.35.15:40087  Dashboard: http://10.32.35.15:8787/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


In [3]:
#set up directories
data_dir_v = Path('../../data/itemFactors_r200_updated/') #change this based on which V matrix to take in
data_dir_u = Path('../../data/userFactors_r200/') #change this based on which U matrix to take in
data_dir_h = Path('../../data/items_hash/')
data_dir_val = Path('../../data/valUsers/')
data_dir = '../../data/'

In [4]:
#read in U
latentUsers = pd.concat(
    pd.read_parquet(parquet_file)
    for parquet_file in data_dir_u.glob('*.parquet')
)

In [5]:
#read in val user list 
valUsers = pd.concat(
    pd.read_parquet(parquet_file)
    for parquet_file in data_dir_val.glob('*.parquet')
)

valUsers.head()

Unnamed: 0,user_hashId
0,605046451
1,-2060013449
2,-1682237462
3,-935373563
4,-1472571388


In [6]:
#read in U and check shape
latentItems = pd.concat(
    pd.read_parquet(parquet_file)
    for parquet_file in data_dir_v.glob('*.parquet')
)

In [7]:
valUsers = valUsers.rename(columns={"user_hashId": "id"})
latentUsers = latentUsers.merge(valUsers, on='id')

In [8]:
latentItems.id

256584    2147430290
256585    2147435213
256586    2147443727
256587    2147458348
256588    2147466802
             ...    
256579    2147399756
256580    2147399814
256581    2147411783
256582    2147426948
256583    2147427543
Name: id, Length: 320739, dtype: int64

### Convert to Dask

In [9]:
latentItems_dd = dd.from_pandas(latentItems, chunksize=100)
latentUsers_dd = dd.from_pandas(latentUsers, chunksize=100)

In [10]:
item_hashedids_da = da.array(latentItems.id.values).reshape(-1,1)
item_hashedids_da = item_hashedids_da.rechunk((1, 320739))
item_hashedids_da

Unnamed: 0,Array,Chunk
Bytes,2.57 MB,8 B
Shape,"(320739, 1)","(1, 1)"
Count,641480 Tasks,320739 Chunks
Type,int64,numpy.ndarray
"Array Chunk Bytes 2.57 MB 8 B Shape (320739, 1) (1, 1) Count 641480 Tasks 320739 Chunks Type int64 numpy.ndarray",1  320739,

Unnamed: 0,Array,Chunk
Bytes,2.57 MB,8 B
Shape,"(320739, 1)","(1, 1)"
Count,641480 Tasks,320739 Chunks
Type,int64,numpy.ndarray


In [71]:
latentItems_da = da.stack(latentItems_dd["features"])
latentUsers_da = da.stack(latentUsers_dd["features"])

KeyboardInterrupt: 

In [75]:
latentItems_da = latentItems_da.rechunk((1000,200))
latentItems_da

Unnamed: 0,Array,Chunk
Bytes,513.18 MB,1.60 MB
Shape,"(320739, 200)","(1000, 200)"
Count,875464 Tasks,321 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 513.18 MB 1.60 MB Shape (320739, 200) (1000, 200) Count 875464 Tasks 321 Chunks Type float64 numpy.ndarray",200  320739,

Unnamed: 0,Array,Chunk
Bytes,513.18 MB,1.60 MB
Shape,"(320739, 200)","(1000, 200)"
Count,875464 Tasks,321 Chunks
Type,float64,numpy.ndarray


In [76]:
latentUsers_da

Unnamed: 0,Array,Chunk
Bytes,8.00 MB,800 B
Shape,"(10000, 200)","(1, 200)"
Count,20000 Tasks,10000 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 8.00 MB 800 B Shape (10000, 200) (1, 200) Count 20000 Tasks 10000 Chunks Type float32 numpy.ndarray",200  10000,

Unnamed: 0,Array,Chunk
Bytes,8.00 MB,800 B
Shape,"(10000, 200)","(1, 200)"
Count,20000 Tasks,10000 Chunks
Type,float32,numpy.ndarray


In [77]:
#try to sample one user
singleUser = latentUsers_da[0]

#get scores
userScores = latentItems_da@singleUser
userScores = userScores.reshape(-1,1)
#turn back into array to allow for some sorting
#userScores_dd = dd.from_dask_array(userScores)

data = [userScores, item_hashedids_da]

In [78]:
#get scores and IDs together in an array
score_id_arr = da.concatenate(data, axis=1)

In [79]:
#now thing as 320k ratings -- chunking is arbitraty for now, need to change
#score_id_arr = score_id_arr.rechunk((8,320740/4))
score_id_arr

Unnamed: 0,Array,Chunk
Bytes,5.13 MB,8 B
Shape,"(320739, 2)","(1, 1)"
Count,3141925 Tasks,641478 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 5.13 MB 8 B Shape (320739, 2) (1, 1) Count 3141925 Tasks 641478 Chunks Type float64 numpy.ndarray",2  320739,

Unnamed: 0,Array,Chunk
Bytes,5.13 MB,8 B
Shape,"(320739, 2)","(1, 1)"
Count,3141925 Tasks,641478 Chunks
Type,float64,numpy.ndarray


In [80]:
top_500_ind = score_id_arr.argtopk(500, axis=0)[:,0]
top_500_ind

Unnamed: 0,Array,Chunk
Bytes,4.00 kB,4.00 kB
Shape,"(500,)","(500,)"
Count,5280196 Tasks,1 Chunks
Type,int64,numpy.ndarray
"Array Chunk Bytes 4.00 kB 4.00 kB Shape (500,) (500,) Count 5280196 Tasks 1 Chunks Type int64 numpy.ndarray",500  1,

Unnamed: 0,Array,Chunk
Bytes,4.00 kB,4.00 kB
Shape,"(500,)","(500,)"
Count,5280196 Tasks,1 Chunks
Type,int64,numpy.ndarray


In [69]:
user_preds_500 = score_id_arr[top_500_ind][:,1]
user_preds_500

Unnamed: 0,Array,Chunk
Bytes,4.00 kB,4.00 kB
Shape,"(500,)","(500,)"
Count,3915451 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 4.00 kB 4.00 kB Shape (500,) (500,) Count 3915451 Tasks 1 Chunks Type float64 numpy.ndarray",500  1,

Unnamed: 0,Array,Chunk
Bytes,4.00 kB,4.00 kB
Shape,"(500,)","(500,)"
Count,3915451 Tasks,1 Chunks
Type,float64,numpy.ndarray


In [None]:
user_preds_500_arr = user_preds_500.compute()

user_preds_500_arr

In [20]:
#send back to datafram
scores_df = dd.from_dask_array(score_id_arr, columns=['scores','item_id'])
scores_df

Unnamed: 0_level_0,scores,item_id
npartitions=80185,Unnamed: 1_level_1,Unnamed: 2_level_1
0,float64,float64
4,...,...
...,...,...
320736,...,...
320738,...,...


### Options to get top 500 recs for the user
1. try converting the $\texttt{score_id_arr}$ to a regular numpy array and use argpartition
2. using dask arrays topk or argtopk <---
3. Convert to a dataframe and reset the index as the score


- Should try more rechunking to minimize task number

In [41]:
a = np.array([[1,2,3,4.1,5,4,0.1,0,2.1,5.1,3.4],[1,2,3,4,5,4,5,1,2,5,2]]).T
a[a.argpartition(5, axis=0)]

array([[[0. , 1. ],
        [2. , 2. ]],

       [[0.1, 5. ],
        [1. , 1. ]],

       [[1. , 1. ],
        [2.1, 2. ]],

       [[2. , 2. ],
        [0. , 1. ]],

       [[2.1, 2. ],
        [3.4, 2. ]],

       [[3. , 3. ],
        [3. , 3. ]],

       [[3.4, 2. ],
        [4.1, 4. ]],

       [[5. , 5. ],
        [4. , 4. ]],

       [[4.1, 4. ],
        [0.1, 5. ]],

       [[5.1, 5. ],
        [5.1, 5. ]],

       [[4. , 4. ],
        [5. , 5. ]]])

scores_df = scores_df.set_index('scores')
scores_df

In [42]:
a

array([[1. , 1. ],
       [2. , 2. ],
       [3. , 3. ],
       [4.1, 4. ],
       [5. , 5. ],
       [4. , 4. ],
       [0.1, 5. ],
       [0. , 1. ],
       [2.1, 2. ],
       [5.1, 5. ],
       [3.4, 2. ]])

In [57]:
a_da = da.from_array(a)

inds = a_da.argtopk(3, axis=0)[:,0]

a_da[inds][:,1].compute()

array([5., 5., 4.])

In [48]:
a_da.argtopk(3, axis=0).compute()

array([[9, 4],
       [4, 6],
       [3, 9]])

### NEXT STEPS
- how can we sort this df without crashing everything?
    - how can we do this 10,000 times?

In [None]:
#increase block size
#latentUsers_dat = latentUsers_da.transpose()
#preds = da.matmul(latentItems_da, latentUsers_dat) 