In [1]:
from pyspark import SparkContext
from scipy import sparse as sm
from sklearn.preprocessing import normalize
import numpy as np
import csv
from tqdm import tqdm_notebook as tqdm
from sklearn.linear_model import ElasticNet
import heapq

sc = SparkContext.getOrCreate()

In [2]:
train_rdd = sc.textFile("data/train.csv")
icm_rdd = sc.textFile("data/icm_fede.csv")
test_rdd= sc.textFile("data/target_users.csv")

train_header = train_rdd.first()
icm_header = icm_rdd.first()
test_header= test_rdd.first()

train_clean_data = train_rdd.filter(lambda x: x != train_header).map(lambda line: line.split(',')).map(lambda x: (int(x[0]), int(x[1]), float(x[2])))
icm_clean_data = icm_rdd.filter(lambda x: x != icm_header).map(lambda line: line.split(',')).map(lambda x: (int(x[0]), int(x[1])))
test_clean_data= test_rdd.filter(lambda x: x != test_header).map(lambda line: line.split(','))

test_users=test_clean_data.map( lambda x: int(x[0])).collect()


grouped_rates = train_clean_data.filter(lambda x: x[0] in test_users).map(lambda x: (x[0],x[1])).groupByKey().map(lambda x: (x[0], list(x[1]))).collect()
grouped_rates_dic = dict(grouped_rates)


item_ratings = train_clean_data.map(lambda x: (x[0], x[2])).aggregateByKey((0,0), lambda x,y: (x[0] + y, x[1] + 1),lambda x,y: (x[0] + y[0], x[1] + y[1]))
user_ratings_mean = item_ratings.mapValues(lambda x: (x[0] / (x[1]))).collect()
user_ratings_mean_dic=dict(user_ratings_mean)


item_ratings_forTop = train_clean_data.map(lambda x: (x[1], x[2])).aggregateByKey((0,0), lambda x,y: (x[0] + y, x[1] + 1),lambda x,y: (x[0] + y[0], x[1] + y[1]))#.sortBy(lambda x: x[1][1], ascending=False)
#item_ratings.take(10)
shrinkage_factor = 5
item_ratings_mean = item_ratings_forTop.mapValues(lambda x: (x[0] / (x[1] + shrinkage_factor))).sortBy(lambda x: x[1], ascending = False).map(lambda x: x[0]).collect()
    


In [3]:
users = train_clean_data.map(lambda x: x[0]).collect()
items = train_clean_data.map(lambda x: x[1]).collect()
ratings = train_clean_data.map(lambda x: x[2]).collect()
ratings_unbiased = train_clean_data.map(lambda x: x[2]-user_ratings_mean_dic[x[0]]).collect()

shape = (train_clean_data.map(lambda x: int(x[0])).max()+1,
         train_clean_data.map(lambda x: int(x[1])).max()+1)

UxI = sm.csc_matrix((ratings, (users, items)), shape=shape)

items_for_features= icm_clean_data.map(lambda x:x[0]).collect()
features = icm_clean_data.map(lambda x:x[1]).collect()
items_for_features.append(37142)
features.append(0)

unos=[1]*len(items_for_features)

matrixinteractionsSparse = sm.csr_matrix((unos, (items_for_features, features)))
#matrixinteractionsSparse = sm.csc_matrix((ratings, (users, items)), shape=shape)

In [4]:
matrixinteractionsSparseNorm = normalize(matrixinteractionsSparse, norm='l2', axis=1)
matrixSimilarity = matrixinteractionsSparseNorm.dot(matrixinteractionsSparseNorm.T)
matrixinteractionsSparse = matrixinteractionsSparse.T.tocsc()
n_items = matrixinteractionsSparse.shape[1]





In [5]:
listTopSimilar = []
matrixSimilarity = matrixSimilarity.tocsc()
for i in tqdm(range(n_items)):
    minimum = min(400,matrixSimilarity[:,i].nnz)            #prendo minimo tra 100 e il numero di item simili
    #top_k_idx = np.argpartition(matrixSimilarity[i,:], -maximum)[:maximum]
    top_k_idx = matrixSimilarity[:, i].data.argpartition(-minimum)[-minimum:]
    listTopSimilar.append(matrixSimilarity[:, i].indices[top_k_idx])


Widget Javascript not detected.  It may not be installed or enabled properly.





In [6]:
matrixSimilarity

<37143x37143 sparse matrix of type '<class 'numpy.float64'>'
	with 610860304 stored elements in Compressed Sparse Column format>

In [7]:
l1_penalty=0.1
l2_penalty=0.1
positive_only=True
l1_ratio = l1_penalty / (l1_penalty + l2_penalty)

model = ElasticNet(alpha=1.0,
                       l1_ratio=l1_ratio,
                       positive=positive_only,
                       fit_intercept=False,
                       copy_X=False)

In [8]:
def fitSLIM(URM):
    
    itemNumber = URM.shape[1]
    
    itemList = sc.parallelize(list(range(itemNumber)))
    
    # fit item's factors in parallel
    slimResult = itemList.flatMap(lambda x: fitOneColumn(x, URM, model))

    rows = slimResult.map(lambda x: x[0]).collect()
    cols = slimResult.map(lambda x: x[1]).collect()
    values = slimResult.map(lambda x: x[2]).collect()   
    
    # generate the sparse weight matrix
    return sm.csc_matrix((values, (rows, cols)), 
                              shape=(itemList.max()+1, itemList.max()+1),
                              dtype=np.float32)


In [9]:
def fitOneColumn(colID, URM, model):
    if(URM[:, colID].nnz > 0):
    # get the target column
        if len(listTopSimilar[colID]) <= 0:
            y = URM[:, colID].toarray()

    # set the colID column of URM to zero
            URM.data[URM.indptr[colID]:URM.indptr[colID + 1]] = 0.0
            print(colID, 'ops')
    # fit one ElasticNet model per column
            model.fit(URM, y)
            nnz_idx = model.coef_ > 0.0
            if nnz_idx.sum() > 0:
                values = model.coef_[nnz_idx]
                rows = np.arange(URM.shape[1])[nnz_idx]
                cols = np.ones(nnz_idx.sum()) * colID
                return list(zip(rows, cols, values))

    # self.model.coef_ contains the coefficient of the ElasticNet model
    # let's keep only the non-zero values
        else:
            y = URM[:, colID].toarray().ravel()

    # set the colID column of URM to zero
            URM.data[URM.indptr[colID]:URM.indptr[colID + 1]] = 0.0
            print(colID, 'ops')
    # fit one ElasticNet model per column
            model.fit(URM[:, listTopSimilar[colID]], y)

    # self.model.coef_ contains the coefficient of the ElasticNet model
    # let's keep only the non-zero values
            nnz_idx = model.coef_ > 0.0
            if nnz_idx.sum() > 0:
                values = model.coef_[nnz_idx]
                rows = np.arange(URM.shape[1])[nnz_idx]
                cols = np.ones(nnz_idx.sum()) * colID
                return list(zip(rows, cols, values))
    return []

In [10]:
UxI

<15375x37143 sparse matrix of type '<class 'numpy.float64'>'
	with 170149 stored elements in Compressed Sparse Column format>

In [11]:
similaritySLIM = fitSLIM(UxI)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 23.0 failed 1 times, most recent failure: Lost task 2.0 in stage 23.0 (TID 41, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-8-127652cf4fb8>", line 8, in <lambda>
  File "<ipython-input-9-4e80a5d83294>", line 27, in fitOneColumn
UnboundLocalError: local variable 'nnz_idx' referenced before assignment

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-8-127652cf4fb8>", line 8, in <lambda>
  File "<ipython-input-9-4e80a5d83294>", line 27, in fitOneColumn
UnboundLocalError: local variable 'nnz_idx' referenced before assignment

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [None]:
similaritySLIM

def fitSLIM(URM):
    
    itemNumber = URM.shape[1]
    
    result = list()
    
    for item in tqdm(range(itemNumber)):        
        result += fitOneColumn(item, URM, model)
    
    return result
    # fit item's factors in parallel

values, rows, cols = [], [], []

        # fit each item's factors sequentially (not in parallel)
for j in tqdm(range(n_items)):
            # get the target column
            if(matrixinteractionsSparse[:, j].nnz > 0):
                y = matrixinteractionsSparse[:, j].toarray().ravel()
                #y = column_or_1d(y, warn=True)
                # set the j-th column of X to zero
                startptr = matrixinteractionsSparse.indptr[j]
                endptr = matrixinteractionsSparse.indptr[j + 1]
                bak = matrixinteractionsSparse.data[startptr: endptr].copy()
                matrixinteractionsSparse.data[startptr: endptr] = 0.0
                # fit one ElasticNet model per column
                model.fit(matrixinteractionsSparse[:,listTopSimilar[j]],y)
                #model.fit(matrixinteractionsSparse, y)
                # self.model.coef_ contains the coefficient of the ElasticNet model
                # let's keep only the non-zero values
                nnz_idx = model.coef_ > 0.0
                #values.extend(model.coef_[nnz_idx])
                #rows.extend(np.arange(n_items)[nnz_idx])
                #cols.extend(np.ones(nnz_idx.sum()) * j)
                if (nnz_idx.sum() > 0):
                    values.extend(model.coef_[nnz_idx])
                    rows.extend(listTopSimilar[j][nnz_idx].flatten())
                    # rows.extend(np.arange(nitems)[nnz_idx])
                    cols.extend(np.ones(nnz_idx.sum()) * j)
                # finally, replace the original values of the j-th column
                matrixinteractionsSparse.data[startptr:endptr] = bak
# generate the sparse weight matrix
matrixSimilarity = sm.csc_matrix((values, (rows, cols)), shape=(n_items, n_items), dtype=np.float32)
matrixinteractionsSparse = matrixinteractionsSparse.T.tocsc()
listUser = []
listValue = []

for index in tqdm(test_users):
    listSimilarity = matrixSimilarity[:,index]
    scores = listSimilarity.T.dot(matrixinteractionsSparse)
    print(scores.nnz)
    #scores =  matrixinteractionsSparse[index,:].dot(matrixSimilarity)
    scores = scores.toarray()[0]
    scores *= np.negative((matrixinteractionsSparse[index,:]).astype(bool).toarray()[0])
    sumTemp = np.sum((matrixinteractionsSparse[index, :]).astype(bool).toarray()[0])
    if (sumTemp <= 2):
        low_values_indices = scores < 0.001  # Where values are low
        scores[low_values_indices] = 0  # All low values set to 0
    topItems = heapq.nlargest(5, range(len(scores)), scores.take)
    if(scores[topItems[0]]>0):
        listUser.append(index)
        listValue.append(str(topItems[0]) + " " + str(topItems[1]) + " " + str(topItems[2]) + " " + str(topItems[3]) + " " + str(topItems[4]))


unos=[1]*len(items_for_features)

UxI= sm.csr_matrix((ratings, (users, items)))
UxI_coo = UxI.tocoo()
IxF= sm.csr_matrix((unos, (items_for_features, features)))
IxF_coo = IxF.tocoo()

def fitOneColumn(colID, URM, l1_penalty=0.1, l2_penalty=0.1, positive_only=True):
    
    l1_ratio = l1_penalty / (l1_penalty + l2_penalty)

    model = ElasticNet(alpha=1.0,
                       l1_ratio=l1_ratio,
                       positive=positive_only,
                       fit_intercept=False,
                       copy_X=False)

    # get the target column
    y = URM[:, colID].toarray()

    # set the colID column of URM to zero
    URM.data[URM.indptr[colID]:URM.indptr[colID + 1]] = 0.0

    # fit one ElasticNet model per column
    model.fit(URM, y)

    # self.model.coef_ contains the coefficient of the ElasticNet model
    # let's keep only the non-zero values
    nnz_idx = model.coef_ > 0.0

    values = model.coef_[nnz_idx]
    rows = np.arange(URM.shape[1])[nnz_idx]
    cols = np.ones(nnz_idx.sum()) * colID

    
    return list(zip(rows, cols, values))