In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import numpy as np
from numpy.linalg import inv
from pyspark.sql import SparkSession

In [None]:
# Create (or reuse) the Spark session for this notebook and keep a handle on
# its SparkContext, which the RDD code below uses as `sc`.
spark = (
  SparkSession.builder
  .appName('scratch-ALS')
  .getOrCreate()
)
sc = spark.sparkContext

In [None]:
# Read the tab-separated ratings file and turn every line into
# (field0 as int, (field1 as int, field2 as float)).  Later cells read these
# positions as user id, item id and rating respectively.
data = sc.textFile("u.data")
fields = data.map(lambda line: line.split('\t'))
M = fields.map(lambda f: (int(f[0]), (int(f[1]), float(f[2]))))


In [None]:
# Default parallelism reported by the SparkContext; a later cell reuses it as
# the partition count passed to sc.parallelize.
numWorkers = sc.defaultParallelism
numWorkers

2

In [None]:
def getRelativeIndex(value, index_list):
  """Return the entry stored under ``value`` in ``index_list``.

  ``index_list`` may be any object that supports subscripting with
  ``value``; whatever error that subscript raises for an unknown ``value``
  propagates to the caller.
  """
  relative_index = index_list[value]
  return relative_index

In [None]:
def sortByRelativeIndex(user_or_item, input):
  """Collect the distinct, ascending-sorted values of one field, each paired
  with its position in that ordering.

  Args:
    user_or_item: ``'user'`` selects ``x[1]`` from every record; any other
      value selects ``x[2][1]``.
    input: RDD-like object providing ``map``, ``distinct``, ``sortBy``,
      ``zipWithIndex`` and ``collect``.
      NOTE(review): the record layout this expects (at least three fields,
      with the third itself indexable) is not the ``(user, (item, rating))``
      layout built elsewhere in this notebook -- confirm before use.

  Returns:
    A list of ``(value, index)`` pairs as returned by ``collect()``.
  """
  pick_field = (lambda x: x[1]) if user_or_item == 'user' else (lambda x: x[2][1])
  ordered = input.map(pick_field).distinct().sortBy(lambda x: x, ascending=True)
  return ordered.zipWithIndex().collect()


In [None]:
def getBlock(user_or_item, ratings, sorted_users, sorted_items):
  """Group relative indices of one side by the relative index of the other.

  Args:
    user_or_item: ``'user'`` keys the result by user index with item indices
      as values; any other value keys it by item index with user indices as
      values.
    ratings: RDD-like object whose records are indexed as ``x[0]`` (user id)
      and ``x[1][0]`` (item id).
    sorted_users: lookup passed to ``getRelativeIndex`` for user ids.
    sorted_items: lookup passed to ``getRelativeIndex`` for item ids.

  Returns:
    The result of ``groupByKey()`` on the mapped ``(key, value)`` pairs.
  """
  def user_index(x):
    return getRelativeIndex(x[0], sorted_users)

  def item_index(x):
    return getRelativeIndex(x[1][0], sorted_items)

  if user_or_item == 'user':
    pairs = ratings.map(lambda x: (user_index(x), item_index(x)))
  else:
    pairs = ratings.map(lambda x: (item_index(x), user_index(x)))
  return pairs.groupByKey()


In [None]:
# Map every raw user id / item id to its rank among the distinct ids sorted
# in ascending order; these ranks are used as column positions later on.
user_ids = M.map(lambda x: x[0]).distinct().sortBy(lambda idx: idx, ascending=True)
sorted_users = dict(user_ids.zipWithIndex().collect())

item_ids = M.map(lambda x: x[1][0]).distinct().sortBy(lambda idx: idx, ascending=True)
sorted_items = dict(item_ids.zipWithIndex().collect())

item_count = len(sorted_items)
user_count = len(sorted_users)

In [None]:
numFactors = 10
# Seed NumPy's global RNG *before* drawing the initial factors so that a
# Restart & Run All reproduces the same starting point.
np.random.seed(42)
# W holds one column of numFactors values per user, H one column per item.
# They are kept as np.matrix because later cells rely on np.matrix indexing
# (e.g. the 1x1 result of `...[0][0]`).
W = np.matrix(np.random.rand(numFactors, len(sorted_users)))
H = np.matrix(np.random.rand(numFactors, len(sorted_items)))

In [None]:
def _keyed_by_user(record):
  """(user, (item, rating)) -> (user, (item, rating))."""
  return (record[0], (record[1][0], record[1][1]))


def _keyed_by_item(record):
  """(user, (item, rating)) -> (item, (user, rating))."""
  return (record[1][0], (record[0], record[1][1]))


# Two cached views of the ratings: one keyed by user id, one keyed by item id.
R_u = M.map(_keyed_by_user).cache()
R_i = M.map(_keyed_by_item).cache()

In [None]:
# Broadcast the initial factor matrices so tasks can read them through
# `.value`; the training loop below replaces these handles after each update.
w_broadcast = sc.broadcast(W)
h_broadcast = sc.broadcast(H)

In [None]:
def computeOptimizeMatrix(iterables, constant_matrix_broadcast, lamb):
  """Dense regularised least-squares update for a single factor vector.

  NOTE: a later cell defines another function with this same name; once that
  cell has run, this version is shadowed.

  Args:
    iterables: iterable of ``(column_index, rating)`` pairs.  Keys are used
      directly as 0-based column positions of the fixed matrix; keys outside
      ``range(n_columns)`` are ignored and missing columns count as rating 0.
    constant_matrix_broadcast: object whose ``.value`` is the fixed factor
      matrix of shape ``(num_factors, n_columns)``.
    lamb: object whose ``.value`` is the regularisation strength.

  Returns:
    ``(num_factors, 1)`` ndarray ``w`` solving
    ``(X X^T + lamb * I) w = X r``.
  """
  fixed_matrix = np.asarray(constant_matrix_broadcast.value)
  num_factors, num_columns = fixed_matrix.shape
  iter_dict = dict(iterables)

  # The rating vector needs one entry per *column* of the fixed matrix, not
  # one per factor; sizing it by num_factors makes the product below fail.
  r_i = np.zeros((num_columns, 1))
  for i in range(num_columns):
    if i in iter_dict:
      r_i[i, 0] = iter_dict[i]

  gram = fixed_matrix.dot(fixed_matrix.T) + lamb.value * np.eye(num_factors)
  # Solve the linear system directly rather than forming an explicit inverse.
  updated_vector = np.linalg.solve(gram, fixed_matrix.dot(r_i))
  return updated_vector

In [None]:
def computeOptimizeMatrix(iterables, constant_matrix_broadcast, lamb, index_map=None):
  """Regularised least-squares update of one factor vector from its observed
  ratings only.

  Args:
    iterables: iterable of ``(id, rating)`` pairs for one user (or one item).
      If an id occurs more than once the last rating wins.
    constant_matrix_broadcast: object whose ``.value`` is the factor matrix
      held fixed during this half-step, shape ``(num_factors, n_columns)``.
    lamb: object whose ``.value`` is the regularisation strength.
    index_map: optional mapping from id to column position in the fixed
      matrix.  When omitted, the column is ``id - 1``, i.e. ids are assumed
      to be contiguous and 1-based.

  Returns:
    ``(num_factors, 1)`` ndarray ``w`` solving
    ``(C C^T + lamb * I) w = C r``, where ``C`` holds the fixed-matrix
    columns of the rated ids and ``r`` their ratings.
  """
  # Always read the columns from the matrix that was passed in.  Reading the
  # global ``H`` here would make the item-side update solve against item
  # factors instead of user factors.
  fixed_matrix = np.asarray(constant_matrix_broadcast.value)
  num_factors = fixed_matrix.shape[0]
  observed = dict(iterables)

  if index_map is None:
    columns = [key - 1 for key in observed]
  else:
    columns = [index_map[key] for key in observed]
  ratings = np.array(list(observed.values()), dtype=float).reshape(-1, 1)

  # C is (num_factors, n_observed); an empty selection yields zero matrices.
  C = fixed_matrix[:, np.asarray(columns, dtype=int)]
  XtX = C @ C.T
  RX = C @ ratings
  return np.linalg.solve(XtX + lamb.value * np.eye(num_factors), RX)

In [None]:
# Scratch check: redo the per-user normal-equation solve by hand for the
# grouped record at position 6, without the regularisation term.
random_user = R_u.groupByKey().collect()[6]
iterables = list(random_user[1])
constant_matrix_broadcast = h_broadcast

fixed_matrix = constant_matrix_broadcast.value
num_factors = fixed_matrix.shape[0]

iter_dict = dict(iterables)
XXt = fixed_matrix.dot(fixed_matrix.T)
XtX = np.zeros((numFactors, numFactors))
RX = np.zeros((numFactors, 1))
for item_id, rating in iter_dict.items():
  # Column of H belonging to this item, found through the id -> rank map.
  C = H[:, [sorted_items[item_id]]]
  RX += rating * C
  XtX += C.dot(C.T)
np.linalg.solve(XtX, RX)

array([[0.71789419],
       [1.15859247],
       [0.75519049],
       [1.08549238],
       [0.3121845 ],
       [0.30759592],
       [0.30436116],
       [0.66523078],
       [1.04636745],
       [0.62074334]])

In [None]:
# Regularisation constant read by update() below; np.random.seed fixes
# NumPy's global RNG for the random draws made after this cell.
LAMBDA = 0.01   # regularization
np.random.seed(42)


def get_rmse(R, ms: np.ndarray, us: np.ndarray) -> np.float64:
    """Root-mean-square error of the reconstruction ``ms @ us.T`` against ``R``
    taken over *every* cell of ``R``.

    Args:
        R: 2-D ratings array or matrix.
        ms: factor array/matrix with one row per row of ``R``.
        us: factor array/matrix with one row per column of ``R``.

    Returns:
        ``sqrt(sum((R - ms @ us.T) ** 2) / R.size)``.
    """
    # Convert to plain ndarrays and use ``@`` so the product is a matrix
    # product for np.ndarray and np.matrix inputs alike.
    diff = np.asarray(R) - np.asarray(ms) @ np.asarray(us).T
    # Normalise by the number of cells of R itself rather than by globals
    # that are only defined in a later cell.
    return np.sqrt(np.sum(np.power(diff, 2)) / diff.size)


def update(i: int, mat: np.ndarray, ratings: np.ndarray) -> np.ndarray:
    """Solve the regularised least-squares problem for row ``i`` of ``ratings``.

    Args:
        i: row of ``ratings`` to fit.
        mat: fixed factor array/matrix of shape ``(uu, ff)``.
        ratings: 2-D array/matrix whose row ``i`` has ``uu`` entries.

    Returns:
        ``(ff, 1)`` ndarray ``x`` solving
        ``(mat^T mat + LAMBDA * uu * I) x = mat^T ratings[i, :]^T``.
    """
    # Work on plain ndarrays with ``@`` so that np.ndarray arguments (as the
    # annotations state) behave the same as np.matrix ones.
    X = np.asarray(mat)
    uu, ff = X.shape
    y = np.asarray(ratings)[i, :].reshape(-1, 1)

    # Regularisation is scaled by the number of rows of ``mat``.
    XtX = X.T @ X + (LAMBDA * uu) * np.eye(ff)
    Xty = X.T @ y

    return np.linalg.solve(XtX, Xty)


In [None]:
# Regularisation strength, broadcast so computeOptimizeMatrix can read it as
# `lamb.value` inside tasks.
lamb = sc.broadcast(0.01)

In [None]:
# One user-side half-step: solve every user's vector against the broadcast
# item factors and collect the vectors ordered by user id.
user_vectors_by_id = (
  R_u.groupByKey()
  .mapValues(lambda row: computeOptimizeMatrix(row, h_broadcast, lamb))
  .sortByKey()
)
newW = user_vectors_by_id.map(lambda data: data[1]).collect()


In [None]:
# Sanity check: both factor matrices are (numFactors, number of columns).
print(W.shape)
print(H.shape)

(10, 943)
(10, 1682)


In [None]:
# Pick the record for user id 1 and item id 7 as a spot-check example
# (raises IndexError if no such record exists).
random_rating = M.filter(lambda x: x[0]==1 and x[1][0]==7).collect()[0]

In [None]:
# Dot product of column 0 of W with column 6 of H, i.e. the model's current
# prediction for that (user column, item column) pair.
W[:, [0]].T.dot(H[:, [6]])[0][0]

matrix([[1.87418619]])

In [None]:
def get_error_square(rating, i, j):
  """Squared difference between ``rating`` and the prediction formed from
  column ``i`` of the global ``W`` and column ``j`` of the global ``H``.

  The result keeps the 1x1 shape produced by the dot product.
  """
  user_factors = W[:, [i]]
  item_factors = H[:, [j]]
  residual = rating - user_factors.T.dot(item_factors)
  return residual ** 2

In [None]:
# Squared error for the spot-check rating, using the relative (column)
# indices of its user and item.
get_error_square(random_rating[1][1], sorted_users[random_rating[0]], sorted_items[random_rating[1][0]])

matrix([[4.51908435]])

In [None]:
ITERATIONS = 50

# Loop-invariant work is done once up front: the grouped ratings and the
# number of ratings do not change between iterations.
ratings_by_user = R_u.groupByKey().cache()
ratings_by_item = R_i.groupByKey().cache()
count = M.count()


def _stack_as_columns(vectors):
  """Stack collected factor vectors into a (num_factors, n) ndarray."""
  # np.asarray(...).ravel() yields a 1-D vector whether the solver returned
  # an ndarray or an np.matrix.
  return np.array([np.asarray(v).ravel() for v in vectors]).T


for i in range(ITERATIONS):
  # User half-step: item factors (H) fixed.  sortByKey orders the vectors by
  # user id, which is the column order defined by sorted_users.
  newW = ratings_by_user\
    .mapValues(lambda row:computeOptimizeMatrix(row,h_broadcast,lamb))\
    .sortByKey()\
    .map(lambda data: data[1])\
    .collect()
  W = _stack_as_columns(newW)
  w_broadcast.destroy()
  w_broadcast = sc.broadcast(W)

  # Item half-step: user factors (W) fixed.
  newH = ratings_by_item\
    .mapValues(lambda row:computeOptimizeMatrix(row,w_broadcast,lamb))\
    .sortByKey()\
    .map(lambda data: data[1])\
    .collect()
  H = _stack_as_columns(newH)
  h_broadcast.destroy()
  h_broadcast = sc.broadcast(H)

  # RMSE over the observed ratings only.
  sse = M.map(lambda x: get_error_square(x[1][1], sorted_users[x[0]], sorted_items[x[1][0]])).reduce(lambda x,y: x+y)[0,0]
  rmse_observed = pow((sse/count), 0.5)
  print("Iteration %d:" % i)
  print("\nRMSE: %5.4f\n" % rmse_observed)


Iteration 0:

RMSE: 1.7618

Iteration 1:

RMSE: 1.3727

Iteration 2:

RMSE: 2.5061

Iteration 3:

RMSE: 1.6994

Iteration 4:

RMSE: 2.7745

Iteration 5:

RMSE: 1.9254

Iteration 6:

RMSE: 3.0209

Iteration 7:

RMSE: 1.9365

Iteration 8:

RMSE: 2.9850

Iteration 9:

RMSE: 1.9295

Iteration 10:

RMSE: 2.9492

Iteration 11:

RMSE: 2.0464

Iteration 12:

RMSE: 3.0120

Iteration 13:

RMSE: 2.0495

Iteration 14:

RMSE: 3.0679

Iteration 15:

RMSE: 2.0480

Iteration 16:

RMSE: 2.8640

Iteration 17:

RMSE: 1.9627

Iteration 18:

RMSE: 3.0204

Iteration 19:

RMSE: 2.2307

Iteration 20:

RMSE: 2.8294

Iteration 21:

RMSE: 2.2853

Iteration 22:

RMSE: 2.6405

Iteration 23:

RMSE: 2.2580

Iteration 24:

RMSE: 2.6589

Iteration 25:

RMSE: 2.2301

Iteration 26:

RMSE: 2.8180

Iteration 27:

RMSE: 2.0106

Iteration 28:

RMSE: 2.6996

Iteration 29:

RMSE: 2.3213

Iteration 30:

RMSE: 2.6186

Iteration 31:

RMSE: 2.0928

Iteration 32:

RMSE: 2.6017

Iteration 33:

RMSE: 2.2111

Iteration 34:

RMSE: 2.6

In [None]:
# Shape of the user factors after flattening each collected vector, stacking
# them and transposing.
np.array(list(map(lambda x: x.flatten(), newW))).T.shape

In [None]:
# Settings for the dense variant below: M_count is the number of distinct
# items, U_count the number of distinct users, F the factor count and
# `partitions` the parallelism used with sc.parallelize.
M_count = len(sorted_items)
U_count = len(sorted_users)
F = numFactors
partitions = numWorkers
ITERATIONS = 2

In [None]:
# Dense (items x users) ratings table, initialised to zero.
R = np.zeros((M_count, U_count))
# Despite its name, rating_rdd is a local Python list of
# (user id, item id, rating) tuples once collect() returns.
rating_rdd = M.map(lambda x: (x[0], x[1][0], x[1][1])).collect()


In [None]:
# Write every observed rating into its (item row, user column) cell of R.
for user_id, item_id, value in rating_rdd:
  item_row = sorted_items[item_id]
  user_col = sorted_users[user_id]
  R[item_row, user_col] = value

In [None]:
# Convert to np.matrix so that `*` on R and its slices is a matrix product.
R = np.matrix(R)

In [None]:
# Random initial factors: one row per item in `ms`, one row per user in `us`.
ms = np.matrix(np.random.rand(M_count, F))
us = np.matrix(np.random.rand(U_count, F))

Rb = sc.broadcast(R)
msb = sc.broadcast(ms)
usb = sc.broadcast(us)

for i in range(ITERATIONS):
    # Iterate over item rows by count: `M` is the ratings RDD, not an int.
    ms_ = sc.parallelize(range(M_count), partitions) \
        .map(lambda x: update(x, usb.value, Rb.value)) \
        .collect()
    # collect() returns a list, so array ends up being
    # a 3-d array, we take the first 2 dims for the matrix
    ms = np.matrix(np.array(ms_)[:, :, 0])
    msb = sc.broadcast(ms)

    # Same for users: the count is U_count (there is no `U` in this notebook).
    us_ = sc.parallelize(range(U_count), partitions) \
        .map(lambda x: update(x, msb.value, Rb.value.T)) \
        .collect()
    us = np.matrix(np.array(us_)[:, :, 0])
    usb = sc.broadcast(us)

    # The error helper defined in this notebook is get_rmse.
    error = get_rmse(R, ms, us)
    print("Iteration %d:" % i)
    print("\nRMSE: %5.4f\n" % error)


In [None]:
def get_error_square(rating, i, j):
  """Squared error between ``rating`` and the dot product of row ``i`` of the
  global ``us`` with row ``j`` of the global ``ms``.

  Redefines the earlier function of the same name for the row-per-entity
  layout of ``us`` / ``ms``.
  """
  # With np.matrix operands the trailing [0][0] still leaves a 1x1 matrix
  # (see the matrix([[...]]) outputs above); callers index the reduced sum
  # with [0, 0].
  pred = us[[i], :].dot(ms[[j], :].T)[0][0]
  return (rating - pred)**2

ITERATIONS = 100
# The number of ratings is constant, so count it once instead of launching
# an extra Spark job in every iteration.
count = M.count()
for i in range(ITERATIONS):
    ms_ = sc.parallelize(range(M_count), partitions) \
        .map(lambda x: update(x, usb.value, Rb.value)) \
        .collect()
    # collect() returns a list, so array ends up being
    # a 3-d array, we take the first 2 dims for the matrix
    ms = np.matrix(np.array(ms_)[:, :, 0])
    # Swap in the new broadcast, then release the executor copies of the
    # superseded one so they do not pile up across iterations.
    stale_msb = msb
    msb = sc.broadcast(ms)
    stale_msb.unpersist()

    us_ = sc.parallelize(range(U_count), partitions) \
        .map(lambda x: update(x, msb.value, Rb.value.T)) \
        .collect()
    us = np.matrix(np.array(us_)[:, :, 0])
    stale_usb = usb
    usb = sc.broadcast(us)
    stale_usb.unpersist()

    # "Global RMSE": error over the observed ratings only.
    sse = M.map(lambda x: get_error_square(x[1][1], sorted_users[x[0]], sorted_items[x[1][0]])).reduce(lambda x,y: x+y)[0, 0]
    rmse = pow(sse/count, 0.5)
    # "RMSE": error over every cell of the dense matrix R.
    error = get_rmse(R, ms, us)
    print("Iteration %d:" % i)
    print("\nRMSE: %5.4f" % error)
    print("\nGlobal RMSE: %5.4f\n" % rmse)


Iteration 0:

RMSE: 0.6796

Global RMSE: 2.3169

Iteration 1:

RMSE: 0.6796

Global RMSE: 2.3168

Iteration 2:

RMSE: 0.6796

Global RMSE: 2.3168

Iteration 3:

RMSE: 0.6796

Global RMSE: 2.3167

Iteration 4:

RMSE: 0.6796

Global RMSE: 2.3166

Iteration 5:

RMSE: 0.6796

Global RMSE: 2.3166

Iteration 6:

RMSE: 0.6796

Global RMSE: 2.3165

Iteration 7:

RMSE: 0.6796

Global RMSE: 2.3164

Iteration 8:

RMSE: 0.6796

Global RMSE: 2.3164

Iteration 9:

RMSE: 0.6796

Global RMSE: 2.3163

Iteration 10:

RMSE: 0.6796

Global RMSE: 2.3163

Iteration 11:

RMSE: 0.6796

Global RMSE: 2.3163

Iteration 12:

RMSE: 0.6796

Global RMSE: 2.3162

Iteration 13:

RMSE: 0.6796

Global RMSE: 2.3162

Iteration 14:

RMSE: 0.6796

Global RMSE: 2.3162

Iteration 15:

RMSE: 0.6796

Global RMSE: 2.3161

Iteration 16:

RMSE: 0.6796

Global RMSE: 2.3161

Iteration 17:

RMSE: 0.6796

Global RMSE: 2.3161

Iteration 18:

RMSE: 0.6796

Global RMSE: 2.3161

Iteration 19:

RMSE: 0.6796

Global RMSE: 2.3160

Iteration 

In [None]:
# Number of columns of the broadcast `us` matrix.
usb.value.shape[1]

In [None]:
def get_error_square(rating, i, j):
  """Squared error between ``rating`` and the prediction for user row ``i``
  and item row ``j``.

  ``us`` and ``ms`` are built with one row per user / item and one column
  per factor, so the factor vectors must be selected as *rows*; selecting
  columns ``i`` / ``j`` fails as soon as an index reaches the factor count.

  Returns:
    A 1x1 result, so callers can keep indexing the reduced sum with
    ``[0, 0]``.
  """
  pred = us[[i], :].dot(ms[[j], :].T)
  return (rating - pred)**2

In [None]:
# One stand-alone sweep of the dense ALS update (items first, then users).
ms_ = sc.parallelize(range(M_count), partitions) \
    .map(lambda x: update(x, usb.value, Rb.value)) \
    .collect()
# collect() returns a list, so array ends up being
# a 3-d array, we take the first 2 dims for the matrix
ms = np.matrix(np.array(ms_)[:, :, 0])
msb = sc.broadcast(ms)

us_ = sc.parallelize(range(U_count), partitions) \
    .map(lambda x: update(x, msb.value, Rb.value.T)) \
    .collect()
us = np.matrix(np.array(us_)[:, :, 0])
usb = sc.broadcast(us)

# Error over every cell of the dense matrix: get_rmse must be given R (the
# dense ratings matrix), not the RDD M.
error = get_rmse(R, ms, us)


def _observed_sq_error(record):
    """Squared error of one (user, (item, rating)) record under us / ms."""
    user_row = us[sorted_users[record[0]], :]
    item_row = ms[sorted_items[record[1][0]], :]
    pred = user_row.dot(item_row.T)[0, 0]
    return (record[1][1] - pred) ** 2


# Error over the observed ratings only.  `rmse` is a plain number here, so it
# is computed rather than called.
sse = M.map(_observed_sq_error).reduce(lambda x, y: x + y)
rmse = pow(sse / M.count(), 0.5)
# No "Iteration" line: this cell is not a loop and `i` would be a leftover
# from an earlier cell.
print("\nErr: %5.4f\n" % error)
print("\nRMSE: %5.4f\n" % rmse)


TypeError: ignored

In [None]:
# Display the dense ratings matrix.
R

In [None]:
# Manual regularised solve for row 6 of R against the user factors.
XtX = usb.value.T * usb.value
# Define the right-hand side in this cell so it does not depend on a cell
# that only appears further down the notebook.
Xty = usb.value.T * Rb.value[6, :].T
inv(XtX + 0.01 * np.eye(F)) * Xty

In [None]:
# Right-hand side of the normal equations for row 6 of R: us^T * R[6, :]^T.
Xty = usb.value.T * Rb.value[6, :].T
Xty