# A Fast Parallel Stochastic Gradient Method for Matrix Factorization in Shared Memory Systems

Matrix factorization is known to be an effective method for recommender systems that are given only the ratings from users to items. Currently, stochastic gradient (SG) method is one of the most popular algorithms for matrix factorization. However, as a sequential approach, SG is difficult to be parallelized for handling web-scale problems. In this paper, we develop a fast parallel SG method, FPSG, for shared memory systems. By dramatically reducing the cache-miss rate and carefully addressing the load balance of threads, FPSG is more efficient than state-of-the-art parallel algorithms for matrix factorization.

In [1]:
import numpy as np
from scipy import sparse
import matplotlib.pyplot as plt
import time
import os
from tqdm import tqdm
from multiprocessing import Process, Queue, Pool, Array
from ctypes import c_double
from functools import partial
import multiprocessing
from multiprocessing import log_to_stderr, get_logger
import logging
from concurrent.futures import ThreadPoolExecutor
import itertools as itls
import random
from functools import reduce

#### Data input

In [2]:
path = os.getcwd().replace('projet', 'data/')
r = np.genfromtxt(path + 'ratings.dat', delimiter="::", dtype=None)

# Matrice r
row = np.array([y[0] for y in r[::, :1]])
col = np.array([y[0] for y in r[::, 1:2]])
values = np.array([y[0] for y in r[::, 2:3]])
M = sparse.coo_matrix((values, (row, col)))
Mc = M.tocsr()

# Jeu de données réduit
x = 100000
y = 100000
A = Mc[:x, :y]
id_x = A.nonzero()[0]
id_y = A.nonzero()[1]
nb_ratings = len(A.data)

In [3]:
print(A)

  (1, 1)	5
  (1, 48)	5
  (1, 150)	5
  (1, 260)	4
  (1, 527)	5
  (1, 531)	4
  (1, 588)	4
  (1, 594)	4
  (1, 595)	5
  (1, 608)	4
  (1, 661)	3
  (1, 720)	3
  (1, 745)	3
  (1, 783)	4
  (1, 914)	3
  (1, 919)	4
  (1, 938)	4
  (1, 1022)	5
  (1, 1028)	5
  (1, 1029)	5
  (1, 1035)	5
  (1, 1097)	4
  (1, 1193)	5
  (1, 1197)	3
  (1, 1207)	4
  :	:
  (6040, 3334)	5
  (6040, 3342)	3
  (6040, 3359)	4
  (6040, 3361)	2
  (6040, 3362)	4
  (6040, 3388)	1
  (6040, 3418)	3
  (6040, 3421)	3
  (6040, 3422)	3
  (6040, 3424)	2
  (6040, 3449)	3
  (6040, 3471)	4
  (6040, 3504)	4
  (6040, 3505)	4
  (6040, 3521)	5
  (6040, 3524)	1
  (6040, 3543)	4
  (6040, 3547)	4
  (6040, 3552)	2
  (6040, 3671)	4
  (6040, 3683)	4
  (6040, 3703)	4
  (6040, 3735)	4
  (6040, 3751)	4
  (6040, 3819)	5


#### Fonction RMSE

In [4]:
latent_dim = 20
p = np.random.rand(latent_dim, A.shape[0])/10
q = np.random.rand(latent_dim, A.shape[1])/10
r_hat = np.dot(np.transpose(p), q)

In [5]:
def RMSE(A, r_hat, id_x, id_y, nb_ratings):
    rmse = 0
    for i, j in zip(id_x, id_y):
        rmse += (A[i, j] - r_hat[i, j])**2
    return np.sqrt(rmse/nb_ratings)

In [6]:
start = time.time()
rmse = RMSE(A, r_hat, id_x, id_y, nb_ratings)
print('Executed in:', time.time()-start)

Executed in: 36.63531994819641


In [7]:
def map_func(i, j):
    return (A[i, j] - r_hat[i, j])**2

def reduce_func(a,b):
    return a+b

def RMSE_map(A, r_hat, id_x, id_y, nb_ratings):
    p = multiprocessing.Pool(processes=4)
    return np.sqrt(reduce(reduce_func, p.starmap(map_func, zip(id_x, id_y)))/nb_ratings)

In [8]:
start = time.time()
rmse_map = RMSE_map(A, r_hat, id_x, id_y, nb_ratings)
print('Executed in:', time.time()-start)

Executed in: 12.40776777267456


In [9]:
print('Exact?', rmse_map-rmse)

Exact? 0.0


#### Input data - display management

In [10]:
print_iter = 1
time_break = 900.

#### Input data - Parameters initialisation

In [11]:
# Input definition
latent_dim = 20
learning_rate = 0.01
l_p = 0.05
l_q = 0.05
stop_cond = 0.1
nb_process = 10

rmse = 6

## I - Reference algorithm - Sequential method

The naive method is our reference. It is a naive sequential model which loop through all the couples (u,v) referecing a value in the rating matrix, and updating the matrix P and the matrix Q, sequentially.

It is therefore not optimal at all especially for the larger dimensions.

We use this method as a benchmark.

In [12]:
class modelNaive:

    def __init__(self, ratingMatrix, latent_dim, learning_rate,
                 l_p, l_q):
        """
        Class attributes:
        -----------
            ratingMatrix: Matrix of the ratings
            latent_dim: latent dimension of p and q
            learning_rate: learning rate coefficient
            l_p: reguralization parameter for p_u
            l_q: reguralization parameter for q_v
            nbre_threads: number of threads available
        """
        self.ratingMatrix = ratingMatrix
        self.latent_dim = latent_dim
        self.learning_rate = learning_rate
        self.l_p = l_p
        self.l_q = l_q
        self.__parameters = self.__initialize_parameters()

        # id coef not null - Must have sparse.to_csr for the matrix
        self.id_x = ratingMatrix.nonzero()[0]
        self.id_y = ratingMatrix.nonzero()[1]
        
        # Track number of updates
        self.nb_updates = 0
        
    def __initialize_parameters(self):
        """
        Parameters:
        -----------
            latent_dim: latent dimension of p and q
            ratingMatrix: rating matrix
        Output:
        -----------
            p: P matrix with random parameters (user matrix)
            q: Q matrix with random parameters (item matrix)
        """
        p = np.random.rand(self.latent_dim, self.ratingMatrix.shape[0])/10
        q = np.random.rand(self.latent_dim, self.ratingMatrix.shape[1])/10
        
        r_hat_init = np.dot(np.transpose(p), q)

        return p, q, r_hat_init

    def updateDescente(self, u, v):
        """
        Parameters:
        -----------
            p: matrix with random parameters (user matrix)
            q: matrix with random parameters (item matrix)
            l_p: reguralization parameter for p_u
            l_q: reguralization parameter for q_v
        Output:
        -----------
            p_u: column u of p updated with the gradient descent
            q_v: column v of q updated with the gradient descent
        """
        # Non actualize parameters
        p_u = self.__parameters[0][:, u]
        q_v = self.__parameters[1][:, v]

        # Calcul de la descente de gradient
        e = self.ratingMatrix[u, v] - np.dot(np.transpose(p_u), q_v)
        p_u = p_u + self.learning_rate*(e*q_v - self.l_p*p_u)
        q_v = q_v + self.learning_rate*(e*p_u - self.l_q*q_v)

        # update de p et q
        self.__parameters[0][:, u], self.__parameters[1][:, v] = p_u, q_v
        
        # Nb updates
        self.nb_updates += 1

    def updatePQ(self):
        """
        Parameters:
        -----------
            p: matrix with random parameters (user matrix)
            q: matrix with random parameters (item matrix)
            l_p: reguralization parameter for p_u
            l_q: reguralization parameter for q_v
        Output:
        -----------
            p_u: column u of p updated with the gradient descent
            q_v: column v of q updated with the gradient descent
        """
        for u, v in zip(self.id_x, self.id_y):
            modelNaive.updateDescente(self, u, v)
            '''if timer > time_break:
                break'''

    def get_updated_matrix(self):
        """
        Output:
        -----------
            p: Matrix P (users)
            q: Matrix Q (items)
        """
        return self.__parameters

In [13]:
modeleNaive_ = modelNaive(A, latent_dim, learning_rate, l_p, l_q)

j = 0

start_time = time.time()

while rmse > stop_cond:

    start_temp = time.time()

    # Gradient descent
    #modeleNaive_.updatePQ(start_time, time_break)
    modeleNaive_.updatePQ()

    ''' # Print iter
    if j % print_iter == 0:
        print('----')
        print('Iteration', j, ' - ', time.time() - start_temp, 's per iteration')
    j += 1
    '''
    # stop condition for iterative method
    if time.time() - start_time > time_break:
        break



In [14]:
print("Execution time - itératif: %s secondes ----" % np.round((time.time() - start_time), 2))

Execution time - itératif: 945.47 secondes ----


#### Calcul du RMSE

In [15]:
# Get the updated p and q
p = modeleNaive_.get_updated_matrix()[0]
q = modeleNaive_.get_updated_matrix()[1]

In [16]:
R_hat_init = modeleNaive_.get_updated_matrix()[2]
R_hat = np.dot(np.transpose(p), q)

In [17]:
rmse_init = RMSE(A, R_hat_init, id_x, id_y, nb_ratings)
print('-----')
print('Initial RMSE:', rmse_init)
print()
rmse = RMSE(A, R_hat, id_x, id_y, nb_ratings)
print('-----')
print('RMSE:', rmse)
print()
print('-----')
print('Gain:', rmse_init - rmse)

-----
Initial RMSE 3.704220606915771

-----
RMSE nan

-----
Gain: nan


## II - HogWild Algorithm

HogWild algorithm assumes that the rating matrix is highly sparse and deduces that for two randomly sampled ratings, the gradient descent are likely to be independent.

In [18]:
class modelHogWild:

    def __init__(self, ratingMatrix, latent_dim, learning_rate,
                 l_p, l_q, nb_threads):
        """
        Class attributes:
        -----------
            ratingMatrix: Matrix of the ratings
            latent_dim: latent dimension of p and q
            learning_rate: learning rate coefficient
            l_p: reguralization parameter for p_u
            l_q: reguralization parameter for q_v
        """
        self.ratingMatrix = ratingMatrix
        self.latent_dim = latent_dim
        self.learning_rate = learning_rate
        self.l_p = l_p
        self.l_q = l_q
        self.nb_process = nb_process

        # Computed attributes
        self.__parameters = self.__initialize_parameters()

        # id coef not null - Must have sparse.to_csr for the matrix
        self.id_x = ratingMatrix.nonzero()[0]
        self.id_y = ratingMatrix.nonzero()[1]
        
        # Track number of updates
        self.nb_updates = 0

    def __initialize_parameters(self):
        """
        Parameters:
        -----------
            latent_dim: latent dimension of p and q
            ratingMatrix: rating matrix
        Output:
        -----------
            p: P matrix with random parameters (user matrix)
            q: Q matrix with random parameters (item matrix)
        """
        p = np.random.rand(self.latent_dim, self.ratingMatrix.shape[0])/10
        q = np.random.rand(self.latent_dim, self.ratingMatrix.shape[1])/10
        
        r_hat_init = np.dot(np.transpose(p), q)

        return p, q, r_hat_init
    
    # Implementation of the first method - Multi-threading

    def updateDescente_multithreading(self, u, v):
        """
        Parameters:
        -----------
            p: matrix with random parameters (user matrix)
            q: matrix with random parameters (item matrix)
            l_p: reguralization parameter for p_u
            l_q: reguralization parameter for q_v
        Output:
        -----------
            p_u: column u of p updated with the gradient descent
            q_v: column v of q updated with the gradient descent
        """
        # Non actualize parameters
        p_u = self.__parameters[0][:, u]
        q_v = self.__parameters[1][:, v]

        # Calcul de la descente de gradient
        e = self.ratingMatrix[u, v] - np.dot(np.transpose(p_u), q_v)
        p_u = p_u + self.learning_rate*(e*q_v - self.l_p*p_u)
        q_v = q_v + self.learning_rate*(e*p_u - self.l_q*q_v)

        # update de p et q
        self.__parameters[0][:, u], self.__parameters[1][:, v] = p_u, q_v
        
        # update Nb updates
        self.nb_updates += 1

    def multithreading(self):
        """
        Parameters:
        -----------
            p: matrix with random parameters (user matrix)
            q: matrix with random parameters (item matrix)
            l_p: reguralization parameter for p_u
            l_q: reguralization parameter for q_v
        Output:
        -----------
            p_u: column u of p updated with the gradient descent
            q_v: column v of q updated with the gradient descent
        """
        def chooseUV(id_x, id_y, nb_process):
            index_chosen = np.random.randint(0, len(id_x), nb_process)
            return id_x[index_chosen], id_y[index_chosen]

        res_ = chooseUV(self.id_x, self.id_y, self.nb_process)
        u_ = res_[0]
        v_ = res_[1]

        with ThreadPoolExecutor(max_workers=self.nb_process) as executor:
            executor.map(self.updateDescente_multithreading, u_, v_)
    
    # Implementation of the second method - Multi-processing
    
    def updateDescente_multiprocessing(self, tuple_uv_in, results):
        """
        Parameters:
        -----------
            p: matrix with random parameters (user matrix)
            q: matrix with random parameters (item matrix)
            l_p: reguralization parameter for p_u
            l_q: reguralization parameter for q_v
        Output:
        -----------
            p_u: column u of p updated with the gradient descent
            q_v: column v of q updated with the gradient descent
        """
        try:
            tuple_uv = tuple_uv_in.get()
        except Exception:
            return
        u_in = tuple_uv[0]
        v_in = tuple_uv[1]
        
        # Non actualize parameters
        p_u = self.__parameters[0][:, u_in]
        q_v = self.__parameters[1][:, v_in]

        # Calcul de la descente de gradient
        e = self.ratingMatrix[u_in, v_in] - np.dot(np.transpose(p_u), q_v)
        p_u = p_u + self.learning_rate*(e*q_v - self.l_p*p_u)
        q_v = q_v + self.learning_rate*(e*p_u - self.l_q*q_v)

        # Push p_u and q_v in the queue
        results.put([u_in, p_u, v_in, q_v])

        proc_name = multiprocessing.current_process().name
        # print( '%s: Index u : %s, Index v : %s' % (proc_name, u_in, v_in))
        
        # update Nb updates
        self.nb_updates += 1
        return

    def multiprocess(self):
        """
        Parameters:
        -----------
            p: matrix with random parameters (user matrix)
            q: matrix with random parameters (item matrix)
            l_p: reguralization parameter for p_u
            l_q: reguralization parameter for q_v
        Output:
        -----------
            p_u: column u of p updated with the gradient descent
            q_v: column v of q updated with the gradient descent
        """
        def chooseUV(id_x, id_y, nb_process):
            index_chosen = np.random.randint(0, len(id_x), nb_process)
            return [(id_x[y], id_y[y]) for y in index_chosen]

        tuple_uv_ = chooseUV(self.id_x, self.id_y, self.nb_process)

        tuple_uv_in = Queue()
        results = Queue()

        for i in range(self.nb_process):
            tuple_uv_in.put(tuple_uv_[i])

        jobs = [Process(target=self.updateDescente_multiprocessing,
                        args=(tuple_uv_in, results))
                for i in range(self.nb_process)]

        for j in jobs:
            j.start()
        
        for j in jobs:
            j.join()

        n = self.nb_process
        while (n):
            result = results.get()
            self.__parameters[0][:, result[0]] = result[1]
            self.__parameters[1][:, result[2]] = result[3]
            n -= 1

    def get_updated_matrix(self):
        """
        Output:
        -----------
            p: Matrix P (users)
            q: Matrix Q (items)
        """
        return self.__parameters

### II - A - MultiThreading approach

In [19]:
modelHogWild_ = modelHogWild(A, latent_dim, learning_rate, l_p, l_q, nb_process)
j = 0
start_time = time.time()

while rmse > stop_cond:
    
    start_temp = time.time()

    # Gradient descent
    modelHogWild_.multithreading()

    '''# Print
    j += 1
    if j % print_iter == 0:
        print('-----')
        print('Iteration', j, ' - ', np.round(time.time() - start_temp, 2), 's per iteration')'''

    # stop condition for iterative method
    if time.time() - start_time > time_break:
        break

In [20]:
print("Execution time - MultiThreading: %s secondes ---" % np.round((time.time() - start_time), 2))

Execution time - MultiThreading: 0.01 secondes ---


#### Calcul du RMSE

In [21]:
# Get the updated p and q
p = modelHogWild_.get_updated_matrix()[0]
q = modelHogWild_.get_updated_matrix()[1]

In [22]:
R_hat_init = modelHogWild_.get_updated_matrix()[2]
R_hat = np.dot(np.transpose(p), q)

In [23]:
rmse_init = RMSE(A, R_hat_init, id_x, id_y, nb_ratings)
print('-----')
print('Initial RMSE', rmse_init)
print()
rmse = RMSE(A, R_hat, id_x, id_y, nb_ratings)
print('-----')
print('RMSE', rmse)
print()
print('-----')
print('Gain:', rmse_init - rmse)

-----
Initial RMSE 3.7040307529955525

-----
RMSE 3.7040307529955525

-----
Gain: 0.0


The multithread approach on python does not fit our project. In fact, it corresponds to the iterative, i.e. the naive approach. Indeed, the GIL on python forces the algorithm to run on an only thread. Therefore, the gradient descent, here, is not parallelized but iterative.
We must follow other methods, developed in the following parts.

### II - B - MultiProcessing approach - Queue

In [24]:
modelHogWild_ = modelHogWild(A, latent_dim, learning_rate, l_p, l_q, nb_process)
j = 0
start_time = time.time()

while rmse > stop_cond:
    
    start_temp = time.time()

    # Gradient descent
    modelHogWild_.multiprocess()

    '''# Print
    j += 1
    if j % print_iter == 0:
        print('-----')
        print('Iteration', j, ' - ', np.round(time.time() - start_temp, 2), 's per iteration')'''
    # stop condition for iterative method
    if time.time() - start_time > time_break:
        break

In [25]:
print("Execution time - MultiProcessing, Queue approach: %s secondes ---" % np.round((time.time() - start_time), 2))

Execution time - MultiProcessing, Queue approach: 900.04 secondes ---


##### Calcul du RMSE

In [26]:
# Get the updated p and q
p = modelHogWild_.get_updated_matrix()[0]
q = modelHogWild_.get_updated_matrix()[1]

In [27]:
R_hat_init = modelHogWild_.get_updated_matrix()[2]
R_hat = np.dot(np.transpose(p), q)

In [28]:
rmse_init = RMSE(A, R_hat_init, id_x, id_y, nb_ratings)
print('-----')
print('Initial RMSE', rmse_init)
print()
rmse = RMSE(A, R_hat, id_x, id_y, nb_ratings)
print('-----')
print('RMSE', rmse)
print()
print('-----')
print('Gain:', rmse_init - rmse)

-----
Initial RMSE 3.7040778347081225

-----
RMSE 1.6700039728814609

-----
Gain: 2.034073861826662


### II - D - Observations

- The multi-threading is a non-optimal approach. In fact, the GIL (Global Interpreter Lock) locks the access to multiple threads in parallel. Python wasn't designed considering that personal computers might have more than one core (shows you how old the language is), so the GIL is necessary because Python is not thread-safe and there is a globally enforced lock when accessing a Python object. Though not perfect, it's a pretty effective mechanism for memory management.

- The Queue approach seems to be the most adapted approach. In fact, the implementation of the FSPG algorithm (later) includes a scheduler, therefore, an ordonnate management allowed with the queues. We will, thus, use the queue approach for the last algorithm, FSPG.

## III - DSGD Algorithm

In [29]:
class modelDSGD:

    def __init__(self, ratingMatrix, latent_dim, learning_rate,
                 l_p, l_q, nb_process):
        """
        Class attributes:
        -----------
            ratingMatrix: Matrix of the ratings
            latent_dim: latent dimension of p and q
            learning_rate: learning rate coefficient
            l_p: reguralization parameter for p_u
            l_q: reguralization parameter for q_v
            nbre_threads: number of threads available
        """
        self.ratingMatrix = ratingMatrix
        self.latent_dim = latent_dim
        self.learning_rate = learning_rate
        self.l_p = l_p
        self.l_q = l_q
        self.nb_process = nb_process
        self.__parameters = self.__initialize_parameters()

        # id coef not null - Must have sparse.to_csr for the matrix
        self.id_x = ratingMatrix.nonzero()[0]
        self.id_y = ratingMatrix.nonzero()[1]
        
        # Track number of updates
        self.nb_updates = 0

    def Sequential_List_Treatment(self):
        """
        Parameters:
        -----------
            Nbre_threads : number of threads available
        Output:
        -----------
            Steps : list indicating at each step of one parallelisation,
            the submatrix that each thread should treat
        """
        Steps = []
        for k in range(self.nb_process):
            latent = {}
            for i in range(self.nb_process):
                j = (i+k) % self.nb_process
                latent["Thread" + str(i)] = [i, j]
            Steps.append(latent)
        return Steps

    def create_Index_Sub_Matrix(self):
        """
        Parameters:
        -----------
            R:Ratings Matrix
            nbre_threads: Number of available threads
        Output
        --------
            dic_row: Dictionnary indicating the available u for each sub matrix
            dic_cal: Dictionnary indicating the available v for each sub matrix
        """
        nrow, ncol = self.ratingMatrix.shape
        inter_row = nrow // self.nb_process
        inter_col = ncol // self.nb_process
        dic_row = {}
        dic_col = {}
        for i in range(self.nb_process-1):
            dic_row[i] = list(range(i*inter_row, (i+1)*inter_row))
            dic_col[i] = list(range(i*inter_col, (i+1)*inter_col))
        dic_row[i+1] = list(range((i+1)*inter_row, nrow))
        dic_col[i+1] = list(range((i+1)*inter_col, ncol))
        return dic_row, dic_col

    def grid_uv_generation(self, id_row, id_col):
        """
        Parameters:
        -----------
            id_row: row index of the submatrix
            id_col: col index of the submatrix
        Output:
        -----------
            u, v for the specific thread
        """
        # Id of the sub-block s
        (ids_block_row,
         ids_block_col) = modelDSGD.create_Index_Sub_Matrix(self)
        subblock = (ids_block_row[id_row], ids_block_col[id_col])

        couple_subblock = set([(i, j) for i, j in itls.product(subblock[0],
                                                               subblock[1])])
        # Intersection of the not null coefficient
        couple_uv = set([(i, j) for i, j in zip(self.id_x, self.id_y)])

        # Interesct
        return list(couple_subblock.intersection(couple_uv))

    def __initialize_parameters(self):
        """
        Parameters:
        -----------
            latent_dim: latent dimension of p and q
            ratingMatrix: rating matrix
        Output:
        -----------
            p: P matrix with random parameters (user matrix)
            q: Q matrix with random parameters (item matrix)
        """
        p = np.random.rand(self.latent_dim, self.ratingMatrix.shape[0])/10
        q = np.random.rand(self.latent_dim, self.ratingMatrix.shape[1])/10
        r_hat_init = np.dot(np.transpose(p), q)

        return p, q, r_hat_init

    # Implementation of the queue approach

    def updateDescente(self, tuple_uv, results):
        """
        Parameters:
        -----------
            p: matrix with random parameters (user matrix)
            q: matrix with random parameters (item matrix)
            l_p: reguralization parameter for p_u
            l_q: reguralization parameter for q_v
        Output:
        -----------
            p_u: column u of p updated with the gradient descent
            q_v: column v of q updated with the gradient descent
        """
        # Non actualize parameters
        p_u = self.__parameters[0][:, tuple_uv[0]]
        q_v = self.__parameters[1][:, tuple_uv[1]]

        # Calcul de la descente de gradient
        e = self.ratingMatrix[tuple_uv[0], tuple_uv[1]] - np.dot(np.transpose(p_u), q_v)
        p_u = p_u + self.learning_rate*(e*q_v - self.l_p*p_u)
        q_v = q_v + self.learning_rate*(e*p_u - self.l_q*q_v)

        # Push p_u and q_v in the queue
        results.put([tuple_uv[0], p_u, tuple_uv[1], q_v])
        
        proc_name = multiprocessing.current_process().name
        # print( '%s: Index u : %s, Index v : %s' % (proc_name, tuple_uv[0], tuple_uv[1]))
        
        # Update Nb updates
        self.nb_updates += 1
        return

    def multiprocess(self):
        """
        Parameters:
        -----------
            p: matrix with random parameters (user matrix)
            q: matrix with random parameters (item matrix)
            l_p: reguralization parameter for p_u
            l_q: reguralization parameter for q_v
        Output:
        -----------
            p_u: column u of p updated with the gradient descent
            q_v: column v of q updated with the gradient descent
        """
        def update_one_sub_matrix(blocks, results):
            try:
                block = blocks.get()
            except:
                return
            for tuple_uv in block:
                # Launch gradient descent
                modelDSGD.updateDescente(self, tuple_uv, results)

        # Pattern of matrix R
        patterns = modelDSGD.Sequential_List_Treatment(self)

        for pattern in patterns:
            # Define the empty queues
            blocks = Queue()
            results = Queue()
            
            count = 0

            sub_block = list(pattern.values())
            sub_block = [np.random.permutation(modelDSGD_.grid_uv_generation(y[0], y[1])) for y in sub_block]
            
            for sub in sub_block:
                blocks.put(sub)
            
            count=np.sum([len(a) for a in sub_block])

            jobs = [Process(target=update_one_sub_matrix,
                            args=(blocks, results))
                    for i in range(self.nb_process)]

            for j in jobs:
                j.start()
            
            for j in jobs:
                j.join(timeout=0)
                if j.is_alive():
                    print('Job is not finished')
                else:
                    print('Job is done')
                print('Successful join:', j.name)

            n = count
            while (n):
                result = results.get()
                self.__parameters[0][:, result[0]] = result[1]
                self.__parameters[1][:, result[2]] = result[3]
                n -= 1

    def get_updated_matrix(self):
        """
        Output:
        -----------
            p: Matrix P (users)
            q: Matrix Q (items)
        """
        return self.__parameters


In [30]:
modelDSGD_ = modelDSGD(A, latent_dim, learning_rate, l_p, l_q, nb_process)
j = 0
start_time = time.time()

while rmse > stop_cond:
    
    start_temp = time.time()

    # Gradient descent
    modelDSGD_.multiprocess()
    
    '''# Print
    j += 1
    if j % print_iter == 0:
        print('-----')
        print('Iteration', j, ' - ', np.round(time.time() - start_temp, 2), 's per iteration')'''

    # stop condition for iterative method
    if time.time() - start_time > time_break:
        break

Job is not finished
Successful join: Process-208675
Job is not finished
Successful join: Process-208676
Job is not finished
Successful join: Process-208677
Job is not finished
Successful join: Process-208678
Job is not finished
Successful join: Process-208679
Job is not finished
Successful join: Process-208680
Job is not finished
Successful join: Process-208681
Job is not finished
Successful join: Process-208682
Job is not finished
Successful join: Process-208683
Job is not finished
Successful join: Process-208684
Job is not finished
Successful join: Process-208685
Job is not finished
Successful join: Process-208686
Job is not finished
Successful join: Process-208687
Job is not finished
Successful join: Process-208688
Job is not finished
Successful join: Process-208689
Job is not finished
Successful join: Process-208690
Job is not finished
Successful join: Process-208691
Job is not finished
Successful join: Process-208692
Job is not finished
Successful join: Process-208693
Job is not f

Job is not finished
Successful join: Process-208835
Job is not finished
Successful join: Process-208836
Job is not finished
Successful join: Process-208837
Job is not finished
Successful join: Process-208838
Job is not finished
Successful join: Process-208839
Job is not finished
Successful join: Process-208840
Job is not finished
Successful join: Process-208841
Job is not finished
Successful join: Process-208842
Job is not finished
Successful join: Process-208843
Job is not finished
Successful join: Process-208844
Job is not finished
Successful join: Process-208845
Job is not finished
Successful join: Process-208846
Job is not finished
Successful join: Process-208847
Job is not finished
Successful join: Process-208848
Job is not finished
Successful join: Process-208849
Job is not finished
Successful join: Process-208850
Job is not finished
Successful join: Process-208851
Job is not finished
Successful join: Process-208852
Job is not finished
Successful join: Process-208853
Job is not f

Job is not finished
Successful join: Process-208995
Job is not finished
Successful join: Process-208996
Job is not finished
Successful join: Process-208997
Job is not finished
Successful join: Process-208998
Job is not finished
Successful join: Process-208999
Job is not finished
Successful join: Process-209000
Job is not finished
Successful join: Process-209001
Job is not finished
Successful join: Process-209002
Job is not finished
Successful join: Process-209003
Job is not finished
Successful join: Process-209004
Job is not finished
Successful join: Process-209005
Job is not finished
Successful join: Process-209006
Job is not finished
Successful join: Process-209007
Job is not finished
Successful join: Process-209008
Job is not finished
Successful join: Process-209009
Job is not finished
Successful join: Process-209010
Job is not finished
Successful join: Process-209011
Job is not finished
Successful join: Process-209012
Job is not finished
Successful join: Process-209013
Job is not f

Job is not finished
Successful join: Process-209155
Job is not finished
Successful join: Process-209156
Job is not finished
Successful join: Process-209157
Job is not finished
Successful join: Process-209158
Job is not finished
Successful join: Process-209159
Job is not finished
Successful join: Process-209160
Job is not finished
Successful join: Process-209161
Job is not finished
Successful join: Process-209162
Job is not finished
Successful join: Process-209163
Job is not finished
Successful join: Process-209164
Job is not finished
Successful join: Process-209165
Job is not finished
Successful join: Process-209166
Job is not finished
Successful join: Process-209167
Job is not finished
Successful join: Process-209168
Job is not finished
Successful join: Process-209169
Job is not finished
Successful join: Process-209170
Job is not finished
Successful join: Process-209171
Job is not finished
Successful join: Process-209172
Job is not finished
Successful join: Process-209173
Job is not f

Job is not finished
Successful join: Process-209315
Job is not finished
Successful join: Process-209316
Job is not finished
Successful join: Process-209317
Job is not finished
Successful join: Process-209318
Job is not finished
Successful join: Process-209319
Job is not finished
Successful join: Process-209320
Job is not finished
Successful join: Process-209321
Job is not finished
Successful join: Process-209322
Job is not finished
Successful join: Process-209323
Job is not finished
Successful join: Process-209324
Job is not finished
Successful join: Process-209325
Job is not finished
Successful join: Process-209326
Job is not finished
Successful join: Process-209327
Job is not finished
Successful join: Process-209328
Job is not finished
Successful join: Process-209329
Job is not finished
Successful join: Process-209330
Job is not finished
Successful join: Process-209331
Job is not finished
Successful join: Process-209332
Job is not finished
Successful join: Process-209333
Job is not f

In [31]:
print("Execution time - parallel: %s secondes ---" % np.round((time.time() - start_time), 2))

Execution time - parallel: 997.14 secondes ---


##### Calcul du RMSE

In [32]:
# Get the updated p and q
p = modelDSGD_.get_updated_matrix()[0]
q = modelDSGD_.get_updated_matrix()[1]

In [33]:
R_hat_init = modelDSGD_.get_updated_matrix()[2]
R_hat = np.dot(np.transpose(p), q)

In [34]:
rmse_init = RMSE(A, R_hat_init, id_x, id_y, nb_ratings)
print('-----')
print('Initial RMSE', rmse_init)
print()
rmse = RMSE(A, R_hat, id_x, id_y, nb_ratings)
print('-----')
print('RMSE', rmse)
print()
print('-----')
print('Gain:', rmse_init - rmse)

-----
Initial RMSE 3.703971102086721

-----
RMSE 1.1679263817010908

-----
Gain: 2.5360447203856302


## IV - FSPG Algorithm

In [35]:
from random import shuffle

def Random_Shuffle_Index(self):
    #shuffle raws
    indices = np.arange(self.shape[0])  
    shuffle(indices)
    #shuffled_matrix = matrix[list(indices)] 
    
    #shuffle columns
    columns = np.arange(self.shape[1])
    shuffle(columns)
    #shuffled_matrix = shuffled_matrix[list(indices)]

    return(np.array(indices),np.array(columns))

In [36]:
def Grid_Matrix_Sub_Matrix_index(self,s):
    nrow, ncol = self.shape # taille de la matrice de notation
    s=s#Number of spread
    inter_row = nrow//(s+1)
    inter_col = ncol//(s+1)
    dic_row = {}
    dic_col = {}
    for i in range(s):
        dic_row[i] = list(range(i*inter_row, (i+1)*inter_row))
        dic_col[i] = list(range(i*inter_col, (i+1)*inter_col))
    dic_row[i+1] = list(range((i+1)*inter_row, nrow))
    dic_col[i+1] = list(range((i+1)*inter_col, ncol))
    return(dic_row, dic_col)

In [37]:
def create_real_indexes_Sub_Matrix(self,s):
    dic_row, dic_col = Grid_Matrix_Sub_Matrix_index(self,s)
    real_raws, real_columns = Random_Shuffle_Index(self)
    
    for i in dic_row:
        dic_row[i] = real_raws[dic_row[i]]
        dic_col[i] = real_columns[dic_col[i]]
        dic_row[i]=np.sort(dic_row[i])
    return(dic_row,dic_col)

In [38]:
Visited_number=np.zeros((len(Sub_Matrix[0]),len(Sub_Matrix[1])))
structure_is_busy=np.zeros((len(Sub_Matrix[0]),len(Sub_Matrix[1])))

def Scheduler(Sub_Matrix, Visited_number, structure_is_busy):
    
    x_mins=np.where(Visited_number==Visited_number.min())[0]
    y_mins=np.where(Visited_number==Visited_number.min())[1]
    Couples_min=[x for x in zip(x_mins,y_mins)]

    x_non_used=np.where(structure_is_busy==0)[0]
    y_non_used=np.where(structure_is_busy==0)[1]
    Couples_non_used=[x for x in zip(x_non_used,y_non_used)]

    Block_to_utpdate=[x for x in Couples_min if x in Couples_non_used][0]
    Index_to_update=Sub_Matrix[0][Block_to_utpdate[0]]
    Colum_to_update=Sub_Matrix[1][Block_to_utpdate[1]]
    
    return(Index_to_update,Colum_to_update)

NameError: name 'Sub_Matrix' is not defined