In [78]:
#Importing Libraries

from sklearn import metrics
import pandas as pd
import numpy as np
from pprint import pprint
from sklearn.datasets import load_boston
from numpy.linalg import inv, pinv, LinAlgError

In [81]:
# Generating Random Data
n = 25  # Number of users (rows of R)
m = 25  # Number of items (columns of R)

# Ratings are drawn from 0..10 with the weights below: 0 is picked half of the
# time and 1 is never produced (its probability is 0.0).
rating_values = np.arange(0, 11)
rating_probs = [0.5, 0.0, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.1, 0.05, 0.05]

# Draw the whole n x m matrix in one vectorized call instead of one
# np.random.choice call per cell. The cast keeps R as float64, which is the
# dtype the previous np.zeros((n, m)) buffer had.
R = np.random.choice(rating_values, size=(n, m), p=rating_probs).astype(float)
        
# pprint(R)

In [82]:
# Initializing U and V
k = 5  # Number of latent factors (rows of U and V)
factor_values = np.arange(0, 11)

# Every entry is drawn uniformly from the integers 0..10 in a single vectorized
# call per matrix. The arrays are cast to float on purpose: the worker later
# writes fractional gradient updates into U and V in place, and an integer
# array would silently truncate them.
U = np.random.choice(factor_values, size=(k, n)).astype(float)  # k x n, one column per user
V = np.random.choice(factor_values, size=(k, m)).astype(float)  # k x m, one column per item

# pprint(U)
# pprint(V)

In [83]:
# Zero-filled module-level buffers with the shapes of U (k x n) and V (k x m)
U_temp, V_temp = np.zeros((k, n)), np.zeros((k, m))

### Implementation of the matrix store functions

<img src="Images/matrix_store.png">

In [93]:
## MATRIX STORE ##

def fetch_data(p, q, d):
    """Return the column blocks of the global ``U`` and ``V`` for block (p, q).

    Parameters
    ----------
    p : int
        1-based block index into the columns of ``U``. ``p == 0`` is also
        accepted and selects the first block (columns ``0:d``), like ``p == 1``.
    q : int
        1-based block index into the columns of ``V``. There is no special
        case for ``q == 0``.
    d : int
        Number of columns in one block.

    Returns
    -------
    tuple
        ``(U[:, start1:end1], V[:, d*(q-1):d*q])``. When ``U`` and ``V`` are
        NumPy arrays (as in this notebook) basic slicing returns views, so
        writing into the returned arrays writes into the globals.
    """
    if p == 0:
        # The original assigned ``end2`` here, which left ``end1`` unbound and
        # made the return statement raise UnboundLocalError.
        start1, end1 = 0, d
    else:
        start1, end1 = d * (p - 1), d * p

    return U[:, start1:end1], V[:, d * (q - 1):d * q]

def save_data(new_U, new_V, p, q, d):
    """Write a block of factors back into the global ``U`` and ``V``.

    Parameters
    ----------
    new_U : array-like
        Values assigned to ``U[:, start1:end1]``.
    new_V : array-like
        Values assigned to ``V[:, d*(q-1):d*q]``.
    p : int
        1-based block index into the columns of ``U``. ``p == 0`` is also
        accepted and targets the first block (columns ``0:d``).
    q : int
        1-based block index into the columns of ``V``.
    d : int
        Number of columns in one block.

    The globals are modified in place; nothing is returned.
    """
    if p == 0:
        # The original assigned ``end2`` here, which left ``end1`` unbound and
        # made the assignment to U raise UnboundLocalError.
        start1, end1 = 0, d
    else:
        start1, end1 = d * (p - 1), d * p

    start2, end2 = d * (q - 1), d * q

    U[:, start1:end1] = new_U
    V[:, start2:end2] = new_V

### Implementation of the worker (runs in a thread)

<img src="Images/worker.png">

In [94]:
# Build 225 random [row, column, rating] triples and write them to the stream CSV
stream = [
    [
        np.random.choice(np.arange(0, 25)),  # row index into R
        np.random.choice(np.arange(0, 25)),  # column index into R
        np.random.choice(np.arange(0, 11)),  # rating value
    ]
    for _ in range(0, 225)
]

df = pd.DataFrame(stream)
df.to_csv("stream_data.csv", index=False, header=False)

In [4]:
# Stream Generator Function

def generateStream(chunk_size=1):
    """Lazily yield the rows of ``stream_data.csv`` as NumPy arrays.

    The header-less CSV is read ``chunk_size`` rows at a time; each yielded
    item is the ``.values`` array of one chunk (the last chunk may hold fewer
    rows). The file is opened when iteration starts, not when this function
    is called.
    """
    reader = pd.read_csv('stream_data.csv', header=None, chunksize=chunk_size)
    yield from (frame.values for frame in reader)
        

In [96]:
## WORKER ##

def worker(lock, R, U, V, p, q, d):
    """Run one SGD sweep over a d x d block of ratings and store the factors.

    Parameters
    ----------
    lock : threading.Lock (or any context-manager lock)
        Held only while ``save_data`` writes the block back.
    R : 2-D array
        The d x d block of ratings; indexed with block-local row/column
        indices in ``range(d)``.
    U, V : 2-D arrays
        Column blocks that receive the updated factor vectors.
    p, q : int
        Block indices forwarded to ``fetch_data`` / ``save_data``.
    d : int
        Block size; every (row, column) pair in ``range(d) x range(d)`` is
        visited once, in shuffled order.

    Notes
    -----
    Current factors are *read* from the arrays returned by
    ``fetch_data(p, q, d)`` while updates are *written* into the ``U`` / ``V``
    arguments. The master cell passes slices of the global arrays that cover
    the same columns ``fetch_data`` returns, so both refer to the same memory
    and each update is visible to the following iterations. If ``U`` / ``V``
    were independent copies, the arrays handed to ``save_data`` would not
    contain the updates.

    ``m`` and ``n`` used in the regularization terms are module-level globals.
    """
    U_req, V_req = fetch_data(p, q, d)

    reg = 0.1           # Penalty parameter (lambda) for regularization
    step_size = 0.001   # Step size

    # Visit the rows and the columns of the block in independent random orders
    seti = list(range(d))
    setj = list(range(d))
    np.random.shuffle(seti)
    np.random.shuffle(setj)

    for row in seti:
        for col in setj:
            ui = U_req[:, row]
            vj = V_req[:, col]
            err = np.dot(ui.T, vj) - R[row, col]    # (uiT . vj) - Rij

            # Both steps are computed from the current ui and vj before either
            # vector is overwritten.
            # 2 * StepSize * (((uiT . vj) - Rij) * vj + lambda/m * ui)
            ui_step = (2 * step_size) * (err * vj + (reg / m) * ui)
            # 2 * StepSize * (((uiT . vj) - Rij) * ui + lambda/n * vj)
            vj_step = (2 * step_size) * (err * ui + (reg / n) * vj)

            U[:, row] = ui - ui_step                # Update ui
            V[:, col] = vj - vj_step                # Update vj

    # ``with`` guarantees the lock is released even if save_data raises;
    # a bare acquire()/release() pair would leave it held forever.
    with lock:
        save_data(U_req, V_req, p, q, d)

### Implementation of the master

<img src="Images/master.png">

In [97]:
## MASTER ##

import threading

# Getting stream data
sdata = generateStream(1)   # <-- Yields one [i, j, Rij] row per chunk
d = 5  # Block size: each block spans d rows and d columns of R

# Number of blocks along each axis of R (derived from R instead of a
# hard-coded range(1, 6); 5 x 5 for a 25 x 25 matrix with d = 5).
num_row_blocks = R.shape[0] // d
num_col_blocks = R.shape[1] // d

# S is the list of free (unlocked) blocks as 1-based [p, q] pairs;
# initially every block is free.
S = [[p, q]
     for p in range(1, num_row_blocks + 1)
     for q in range(1, num_col_blocks + 1)]

lock = threading.Lock()  # Lock guarding writes to the matrix store

# Printing the error before starting the algorithm
utv_before = np.dot(U.T, V)
print("MAE Before: ", metrics.mean_absolute_error(R, utv_before))
print("MSE Before: ", metrics.mean_squared_error(R, utv_before))

for data in sdata:

    # Updating R matrix with the streamed rating
    row, col, rating = data[0]
    R[row, col] = rating

    if len(S) != 0:

        # Choosing a random free block, i.e. p and q
        num = np.random.choice(np.arange(0, len(S)))
        p, q = S[num]

        # Lock every free block in block-row p or block-column q.
        # The original test `blocks[0]==p | blocks[1]==q` parsed as the chained
        # comparison `blocks[0] == (p | blocks[1]) == q` (bitwise | binds
        # tighter than ==), and it removed items from S while iterating over
        # it, which skips elements. Building new lists avoids both problems.
        locked = [block for block in S if block[0] == p or block[1] == q]
        S = [block for block in S if block[0] != p and block[1] != q]

        start1 = d * (p - 1)
        end1 = d * p
        start2 = d * (q - 1)
        end2 = d * q
        t = threading.Thread(
            target=worker,
            args=(lock, R[start1:end1, start2:end2], U[:, start1:end1], V[:, start2:end2], p, q, d),
        )

        # Start the thread for worker
        t.start()

        # Wait until it finishes the job
        t.join()

        # Unlock: give back exactly the blocks that were taken out above.
        # (Re-appending whole rows/columns would add [p, q] twice and
        # duplicate blocks that were never removed.)
        S.extend(locked)


# Printing the error after completion of all iterations
utv_after = np.dot(U.T, V)
print()
print("MAE After: ", metrics.mean_absolute_error(R, utv_after))
print("MSE After: ", metrics.mean_squared_error(R, utv_after))
print("Re-Run the Initialization Block")

MAE Before:  120.2624
MSE Before:  17620.5344

MAE After:  3.399500515370336
MSE After:  18.251261867542336
Re-Run the Initialization Block
