# SAR Deep Dive with Spark and SQL

In this example, we will walk through each step of the SAR algorithm, implemented with Spark and SQL.

Smart Adaptive Recommendations (SAR) is a fast, scalable, adaptive algorithm for personalized recommendations based on user transaction history and item descriptions. It is powered by understanding the **similarity** between items, and recommending items similar to those a user already has an **affinity** for.

# 0 Global Variables and Imports

In [1]:
# specify parameters
TOP_K=2
RECOMMEND_SEEN=True
# options are 'jaccard', 'lift' or '' to skip and use item cooccurrence directly
SIMILARITY='jaccard'

import pandas as pd
import numpy as np
import heapq
# import the top-level package as well, so that pyspark.__version__ is available below
import pyspark
import pyspark.sql.functions as F
import sys
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, Row, ArrayType, IntegerType, FloatType

print("System version: {}".format(sys.version))
print("Pandas version: {}".format(pd.__version__))
print("PySpark version: {}".format(pyspark.__version__))

System version: 3.6.5 |Anaconda, Inc.| (default, Mar 29 2018, 13:32:41) [MSC v.1900 64 bit (AMD64)]
Pandas version: 0.23.0

# 1 Load Data

We'll work with a small dataset here containing customer IDs, item IDs, and the customer's rating for the item. SAR requires inputs to be of the following schema: `<User ID>, <Item ID>, <Time>, [<Event Type>], [<Event Weight>]` (we will not use time or event type in the example below, and `rating` will be used as the `Event Weight`). 

In [2]:
# There are two versions of the dataframes - the numeric version and the alphanumeric one:
# they both have similar test data for top-2 recommendations and illustrate the indexing approaches to matrix multiplication on SQL
d_train = {
'customerID': [1,1,1,2,2,3,3],
'itemID':     [1,2,3,4,5,6,1],
'rating':     [5,5,5,1,1,3,5]
}
pdf_train = pd.DataFrame(d_train)
d_test = {
'customerID': [1,1,2,2,3,3],
'itemID':     [4,5,1,5,6,1],
'rating':     [1,1,5,5,5,5]
}
pdf_test = pd.DataFrame(d_test)



In [3]:
a_train = np.array([[5,5,5,0,0,0],
                    [0,0,0,1,1,0],
                    [5,0,0,0,0,3]])
print(a_train)
print(a_train.shape)

[[5 5 5 0 0 0]
 [0 0 0 1 1 0]
 [5 0 0 0 0 3]]
(3, 6)


In [4]:
d_alnum_train = {
'customerID': ['ua','ua','ua','ub','ub','uc','uc'],
'itemID':     ['ia','ib','ic','id','ie','if','ia'],
'rating':     [5,5,5,1,1,3,5]
}
#pdf_train = pd.DataFrame(d_alnum_train)
pdf_train = pd.DataFrame(d_train)
d_alnum_test = {
'customerID': ['ua','ua','ub','ub','uc','uc'],
'itemID':     ['id','ie','ia','ie','if','ia'],
'rating':     [1,1,5,5,5,5]
}
#pdf_test = pd.DataFrame(d_alnum_test)
pdf_test = pd.DataFrame(d_test)
pdf_test.head(10)

   customerID  itemID  rating
0           1       4       1
1           1       5       1
2           2       1       5
3           2       5       5
4           3       6       5
5           3       1       5


### Set up Spark context

The following settings work well for debugging locally on a VM; change them when running on a cluster. We set up a single large executor with many threads and cap its memory.

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("SAR pySpark") \
    .master("local[*]") \
    .config("spark.driver.memory", "2g")\
    .config("spark.executor.cores", "32")\
    .config("spark.executor.memory", "8g")\
    .config("spark.yarn.executor.memoryOverhead", "3g")\
    .config("spark.memory.fraction", "0.9")\
    .config("spark.memory.storageFraction", "0.3")\
    .config("spark.executor.instances", 1)\
    .config("spark.executor.heartbeatInterval", "36000s")\
    .config("spark.network.timeout", "10000000s")\
    .config("spark.driver.maxResultSize", "50g")\
    .getOrCreate()

In [6]:
df = spark.createDataFrame(pdf_train).withColumn("type", F.lit(1))
df_test = spark.createDataFrame(pdf_test).withColumn("type", F.lit(0))
df.show()

+----------+------+------+----+
|customerID|itemID|rating|type|
+----------+------+------+----+
|         1|     1|     5|   1|
|         1|     2|     5|   1|
|         1|     3|     5|   1|
|         2|     4|     1|   1|
|         2|     5|     1|   1|
|         3|     6|     3|   1|
|         3|     1|     5|   1|
+----------+------+------+----+



# 2 Index the user and item IDs

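Map user and item IDs (which may be alphanumeric) to consecutive matrix indices using `DENSE_RANK`, and keep the reverse mappings so that indices can be translated back to the original IDs.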
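
The same mapping can be sketched in plain Python, which may make the role of the dense rank easier to see. This is only an illustration: `dense_rank_index` is a hypothetical helper that is not part of the notebook, and the IDs are the alphanumeric ones from the data-loading step.

```python
# Hypothetical helper (not part of the notebook): map arbitrary IDs to
# consecutive 1-based indices, mirroring DENSE_RANK() OVER (ORDER BY id).
def dense_rank_index(ids):
    ordered = sorted(set(ids))
    return {raw_id: rank for rank, raw_id in enumerate(ordered, start=1)}

customer_ids = ['ua', 'ua', 'ua', 'ub', 'ub', 'uc', 'uc']
item_ids = ['ia', 'ib', 'ic', 'id', 'ie', 'if', 'ia']

customer_id2index = dense_rank_index(customer_ids)
item_id2index = dense_rank_index(item_ids)

# Invert the mapping to translate matrix indices back to the original IDs
item_index2id = {index: raw_id for raw_id, index in item_id2index.items()}

print(customer_id2index)
print(item_id2index)
```

Repeated IDs share one index, which is why a dense rank is used rather than a row number.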

In [7]:
n_train = df.count()
df_all = df.union(df_test)
df_all.createOrReplaceTempView("df_all")
query = """
SELECT customerid,
       Dense_rank()
         OVER(
           partition BY 1
           ORDER BY customerid) AS row_id,
       itemid,
       Dense_rank()
         OVER(
           partition BY 1
           ORDER BY itemid)     AS col_id,
       rating,
       type
FROM   df_all 
"""
df_all = spark.sql(query)
df_all.createOrReplaceTempView("df_all")
customer_index2ID = dict(df_all.select(["row_id", "customerID"]).rdd.reduceByKey(lambda k, v: v).collect())
item_index2ID = dict(df_all.select(["col_id", "itemID"]).rdd.reduceByKey(lambda k, v: v).collect())
df_all.show()

+----------+------+------+------+------+----+
|customerid|row_id|itemid|col_id|rating|type|
+----------+------+------+------+------+----+
|         1|     1|     1|     1|     5|   1|
|         1|     1|     2|     2|     5|   1|
|         1|     1|     3|     3|     5|   1|
|         1|     1|     4|     4|     1|   0|
|         1|     1|     5|     5|     1|   0|
|         2|     2|     1|     1|     5|   0|
|         2|     2|     4|     4|     1|   1|
|         2|     2|     5|     5|     1|   1|
|         2|     2|     5|     5|     5|   0|
|         3|     3|     1|     1|     5|   1|
|         3|     3|     1|     1|     5|   0|
|         3|     3|     6|     6|     3|   1|
|         3|     3|     6|     6|     5|   0|
+----------+------+------+------+------+----+



# 3 Compute Item Co-occurrence

Central to how SAR defines similarity is an item-to-item ***co-occurrence matrix***. The co-occurrence of two items is the number of times they appear together in the history of the same user, counted over all users. We can represent the co-occurrence of all items as an $m \times m$ matrix $C$, where $c_{i,j}$ is the number of times item $i$ occurred with item $j$.

The co-occurrence matrix $C$ has the following properties:
- It is symmetric, so $c_{i,j} = c_{j,i}$
- It is nonnegative: $c_{i,j} \geq 0$
- The occurrences are at least as large as the co-occurrences, i.e., the largest element of each row (and column) is on the main diagonal: $\forall (i,j):\ c_{i,i}, c_{j,j} \geq c_{i,j}$.
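
As a quick sanity check, the three properties can be verified numerically on the toy data. The sketch below assumes the same user-by-item rating matrix as `a_train`; the variable names are illustrative and not part of the SAR implementation.

```python
import numpy as np

# Rows are users, columns are items; same toy ratings as a_train
ratings = np.array([[5, 5, 5, 0, 0, 0],
                    [0, 0, 0, 1, 1, 0],
                    [5, 0, 0, 0, 0, 3]])

# Binary indicator: 1 where the user interacted with the item
indicator = (ratings > 0).astype(int)

# C = B^T B counts, for each item pair, the users who interacted with both
cooccurrence = indicator.T.dot(indicator)
diag = cooccurrence.diagonal()

is_symmetric = bool((cooccurrence == cooccurrence.T).all())
is_nonnegative = bool((cooccurrence >= 0).all())
# Every entry is bounded by the diagonal entry of its row and of its column
diag_dominates = bool((cooccurrence <= diag[:, None]).all()
                      and (cooccurrence <= diag[None, :]).all())

print(is_symmetric, is_nonnegative, diag_dominates)
```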

In [8]:
query = """
SELECT row_id,
       col_id,
       rating
FROM   df_all
WHERE  type = 1 
"""
df = spark.sql(query)
df.createOrReplaceTempView("df_train")
df.show()
df_transpose = spark.sql("select col_id as row_id, row_id as col_id, rating from df_train")
df_transpose.createOrReplaceTempView("df_train_transpose")
df_transpose.show()

+------+------+------+
|row_id|col_id|rating|
+------+------+------+
|     1|     1|     5|
|     1|     2|     5|
|     1|     3|     5|
|     2|     4|     1|
|     2|     5|     1|
|     3|     1|     5|
|     3|     6|     3|
+------+------+------+

+------+------+------+
|row_id|col_id|rating|
+------+------+------+
|     1|     1|     5|
|     2|     1|     5|
|     3|     1|     5|
|     4|     2|     1|
|     5|     2|     1|
|     1|     3|     5|
|     6|     3|     3|
+------+------+------+



In [9]:
query = """
SELECT A.row_id AS row_item_id,
       B.col_id AS col_item_id,
       Sum(1)   AS value
FROM   df_train_transpose A
       INNER JOIN df_train B
               ON A.col_id = B.row_id
GROUP  BY A.row_id,
          B.col_id
"""
item_cooccurrence = spark.sql(query)
item_cooccurrence.createOrReplaceTempView("item_cooccurrence")
item_cooccurrence.show()
print(item_cooccurrence.count())

+-----------+-----------+-----+
|row_item_id|col_item_id|value|
+-----------+-----------+-----+
|          6|          1|    1|
|          3|          1|    1|
|          2|          2|    1|
|          2|          3|    1|
|          1|          2|    1|
|          1|          1|    2|
|          1|          3|    1|
|          5|          4|    1|
|          3|          3|    1|
|          2|          1|    1|
|          3|          2|    1|
|          4|          4|    1|
|          6|          6|    1|
|          1|          6|    1|
|          4|          5|    1|
|          5|          5|    1|
+-----------+-----------+-----+

16


In [10]:
indicator = a_train.copy()
indicator[indicator>0]=1
item_cooccurrence = indicator.T.dot(indicator)
print (item_cooccurrence)
print ((item_cooccurrence>0).sum())

[[2 1 1 0 0 1]
 [1 1 1 0 0 0]
 [1 1 1 0 0 0]
 [0 0 0 1 1 0]
 [0 0 0 1 1 0]
 [1 0 0 0 0 1]]
16


# 4 Compute Item Similarity


Once we have a co-occurrence matrix, an ***item similarity matrix*** $S$ can be obtained by rescaling the co-occurrences according to a given metric. Options for the metric include Jaccard, lift, and counts (meaning no rescaling).

The rescaling formula for Jaccard is $s_{ij}=c_{ij} / (c_{ii}+c_{jj}-c_{ij})$

and that for lift is $s_{ij}=c_{ij}/(c_{ii}*c_{jj})$

where $c_{ii}$ and $c_{jj}$ are the $i$th and $j$th diagonal elements of $C$. In general, using counts as a similarity metric favours predictability, meaning that the most popular items will be recommended most of the time. Lift by contrast favours discoverability/serendipity: an item that is less popular overall but highly favoured by a small subset of users is more likely to be recommended. Jaccard is a compromise between the two.
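
To make the difference between the metrics concrete, the sketch below applies both rescaling formulas to the co-occurrence matrix of the toy data with NumPy. It is an illustration only, and the variable names are assumptions rather than part of the Spark implementation.

```python
import numpy as np

# Co-occurrence matrix of the toy training data
cooccurrence = np.array([[2, 1, 1, 0, 0, 1],
                         [1, 1, 1, 0, 0, 0],
                         [1, 1, 1, 0, 0, 0],
                         [0, 0, 0, 1, 1, 0],
                         [0, 0, 0, 1, 1, 0],
                         [1, 0, 0, 0, 0, 1]])
diag = cooccurrence.diagonal()

# Jaccard: s_ij = c_ij / (c_ii + c_jj - c_ij)
jaccard = cooccurrence / (diag[:, None] + diag[None, :] - cooccurrence)

# Lift: s_ij = c_ij / (c_ii * c_jj)
lift = cooccurrence / np.outer(diag, diag)

print("counts  row for item 1:", cooccurrence[0])
print("jaccard row for item 1:", jaccard[0])
print("lift    row for item 1:", lift[0])
```

Item 1 is the only item seen by two users. Under lift its self-similarity drops from 1.0 to 0.5, which shows how lift penalizes popular items relative to Jaccard and raw counts.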


In [7]:
# show how to compute Jaccard
diag = item_cooccurrence.diagonal()
diag_rows = np.expand_dims(diag, axis=0)
diag_cols = np.expand_dims(diag, axis=1)
# this essentially does vstack(diag_rows).T + vstack(diag_rows) - cooccurrence
denom = diag_rows + diag_cols - item_cooccurrence
jaccard = item_cooccurrence / denom
print ("Jaccard")
print (jaccard)


In [12]:
if SIMILARITY in ('jaccard', 'lift'):
    query = """
    SELECT A.row_item_id AS i,
           A.value       AS d
    FROM   item_cooccurrence A
    WHERE  A.row_item_id = A.col_item_id 
    """
    diagonal = spark.sql(query)
    diagonal.createOrReplaceTempView("diagonal")

In [13]:
similarity = None
if SIMILARITY == "jaccard":
    query = """
    SELECT A.row_item_id,
           A.col_item_id,
           ( A.value / ( B.d + C.d - A.value ) ) AS value
    FROM   item_cooccurrence AS A,
           diagonal AS B,
           diagonal AS C
    WHERE  A.row_item_id = B.i
           AND A.col_item_id = C.i 
    """
    similarity = spark.sql(query)
elif SIMILARITY == 'lift':
    query = """
    SELECT A.row_item_id,
           A.col_item_id,
           ( A.value / ( B.d * C.d ) ) AS value
    FROM   item_cooccurrence AS A,
           diagonal AS B,
           diagonal AS C
    WHERE  A.row_item_id = B.i
           AND A.col_item_id = C.i 
    """
    similarity = spark.sql(query)
else:
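    # item_cooccurrence was rebound to a NumPy array above, so read the Spark view by name
    similarity = spark.table("item_cooccurrence")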
similarity.createOrReplaceTempView("item_similarity")
similarity.show()

+-----------+-----------+-----+
|row_item_id|col_item_id|value|
+-----------+-----------+-----+
|          1|          1|  1.0|
|          6|          1|  0.5|
|          3|          1|  0.5|
|          2|          1|  0.5|
|          1|          6|  0.5|
|          6|          6|  1.0|
|          1|          3|  0.5|
|          3|          3|  1.0|
|          2|          3|  1.0|
|          5|          5|  1.0|
|          4|          5|  1.0|
|          5|          4|  1.0|
|          4|          4|  1.0|
|          1|          2|  0.5|
|          3|          2|  1.0|
|          2|          2|  1.0|
+-----------+-----------+-----+



# 5 Compute User Affinity Scores

The affinity matrix in SAR captures the strength of the relationship between each individual user and each item. The event types and weights are used in computing this matrix: different event types (such as “rate” vs “view”) should be allowed to have an impact on a user’s affinity for an item. Similarly, the time of a transaction should have an impact; an event that takes place in the distant past can be thought of as being less important in determining the affinity.

Combining these effects gives us an expression for user-item affinity:
$a_{ij}=\sum_k w_k \exp\left[-\ln(2)\,\frac{t_0-t_k}{T}\right]$

where the affinity for user $i$ and item $j$ is the weighted sum of all events involving user $i$ and item $j$, $w_k$ is the weight of event $k$, and $t_k$ is the time at which it occurred. The presence of the $\ln(2)$ factor means that the parameter $T$ in the exponential decay term can be treated as a half-life: an event that took place a time $T$ before the reference date $t_0$ is given half the weight of one taking place at $t_0$.

Repeating this computation for all $n$ users and $m$ items results in an $n \times m$ matrix $A$.
Simplifications of the above expression can be obtained by setting all the weights equal to 1 (effectively ignoring event types), or by setting the half-life parameter $T$ to infinity (ignoring transaction times).
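
The half-life weighting described above can be sketched in a few lines of plain Python. This is a minimal illustration, not the notebook's implementation (which ignores time): `time_decayed_affinity` is a hypothetical helper, and the timestamps and the 30-unit half-life are made-up values.

```python
import math

def time_decayed_affinity(events, t0, half_life):
    """Sum w_k * exp(-ln(2) * (t0 - t_k) / half_life) over (weight, time) events."""
    return sum(w * math.exp(-math.log(2) * (t0 - t) / half_life)
               for w, t in events)

# Two events for one user-item pair: one at the reference time,
# one exactly one half-life earlier (illustrative values)
events = [(5.0, 100.0), (5.0, 70.0)]

affinity = time_decayed_affinity(events, t0=100.0, half_life=30.0)

# An infinite half-life switches the decay off, so the weights are simply summed
no_decay = time_decayed_affinity(events, t0=100.0, half_life=math.inf)

print(affinity, no_decay)
```

The older event contributes half of its weight, so the affinity is 5 + 2.5 = 7.5 (up to floating-point rounding), while the undecayed sum is 10.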


In [14]:
query = """
SELECT A.row_id                AS row_user_id,
       B.col_item_id,
       Sum(A.rating * B.value) AS score
FROM   df_train A
       INNER JOIN item_similarity B
               ON A.col_id = B.row_item_id
GROUP  BY A.row_id,
          B.col_item_id 
"""
scores = spark.sql(query)
scores.show()
scores.count()


+-----------+-----------+-----+
|row_user_id|col_item_id|score|
+-----------+-----------+-----+
|          3|          1|  6.5|
|          1|          2| 12.5|
|          1|          1| 10.0|
|          1|          3| 12.5|
|          2|          5|  2.0|
|          3|          3|  2.5|
|          2|          4|  2.0|
|          3|          6|  5.5|
|          3|          2|  2.5|
|          1|          6|  2.5|
+-----------+-----------+-----+



10

# 6 Remove Seen Items

Optionally, we remove items that the user has already seen in the training set, so that items a user has previously bought are not recommended to them again.
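
On dense matrices the masking step amounts to zeroing the score wherever the user has a training rating. The NumPy sketch below assumes the toy rating matrix and the scores from the previous section; the Spark cell that follows does the same thing with a left outer join.

```python
import numpy as np

# Training ratings (rows: users, columns: items)
ratings = np.array([[5, 5, 5, 0, 0, 0],
                    [0, 0, 0, 1, 1, 0],
                    [5, 0, 0, 0, 0, 3]])

# Scores from the previous section, laid out as a dense user-by-item matrix
scores = np.array([[10.0, 12.5, 12.5, 0.0, 0.0, 2.5],
                   [0.0, 0.0, 0.0, 2.0, 2.0, 0.0],
                   [6.5, 2.5, 2.5, 0.0, 0.0, 5.5]])

# Zero out every user-item pair that appears in the training set
seen = ratings > 0
masked = np.where(seen, 0.0, scores)

print(masked)
```

With seen items removed, user 2 has no remaining candidates in this toy example, because every item similar to that user's items was already rated.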

In [15]:
if not RECOMMEND_SEEN:
    print ("Removing seen items")
    masked_scores = scores\
        .join(df, (scores.row_user_id == df.row_id) & (scores.col_item_id == df.col_id), "left_outer")    
    masked_scores.show()
    # after the left outer join, 'rating' is null for user-item pairs that are not in the
    # training set: keep the score for those pairs and set it to 0 for seen pairs
    masked_scores = \
        masked_scores.withColumn("rating", F.when(F.col('rating').isNull(), F.col('score')).otherwise(0))
else:
    print ("Keeping seen items")
    scores.createOrReplaceTempView("scores")
    masked_scores = spark.sql("select row_user_id, col_item_id, score as rating from scores")
masked_scores.show()

Keeping seen items
+-----------+-----------+------+
|row_user_id|col_item_id|rating|
+-----------+-----------+------+
|          3|          1|   6.5|
|          1|          2|  12.5|
|          1|          1|  10.0|
|          1|          3|  12.5|
|          2|          5|   2.0|
|          3|          3|   2.5|
|          2|          4|   2.0|
|          3|          6|   5.5|
|          3|          2|   2.5|
|          1|          6|   2.5|
+-----------+-----------+------+



# 7 Top-K Item Calculation

The personalized recommendations for a set of users can then be obtained by multiplying the affinity matrix by the similarity matrix. The result is a recommendation score matrix, stored here with one row per user-item pair; higher scores correspond to more strongly recommended items.
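
The matrix product can be checked on the toy data with NumPy. The sketch below assumes the ratings are used directly as affinities and Jaccard as the similarity metric, which matches the parameters chosen at the top of the notebook; variable names are illustrative.

```python
import numpy as np

# Affinity matrix A: here simply the training ratings (rows: users, columns: items)
affinity = np.array([[5, 5, 5, 0, 0, 0],
                     [0, 0, 0, 1, 1, 0],
                     [5, 0, 0, 0, 0, 3]])

# Item similarity S: Jaccard rescaling of the co-occurrence matrix
indicator = (affinity > 0).astype(int)
cooccurrence = indicator.T.dot(indicator)
diag = cooccurrence.diagonal()
similarity = cooccurrence / (diag[:, None] + diag[None, :] - cooccurrence)

# Recommendation scores: one row per user, one column per item
scores = affinity.dot(similarity)
print(scores)

# Top-K item indices per user (1-based); a stable sort keeps ties in item order
top_k = 2
top_items = np.argsort(-scores, axis=1, kind="stable")[:, :top_k] + 1
print(top_items)
```

The nonzero entries of `scores` agree with the scores computed in SQL earlier. Tied items (such as items 4 and 5 for user 2) may come out in a different order than in the Spark output, since neither approach defines a tie-breaking rule beyond the sort itself.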

The following is the unoptimized way of performing top-K on Spark, although it is very readable:

In [16]:
window = Window.partitionBy(masked_scores["row_user_id"]).orderBy(masked_scores["rating"].desc())
#top_scores =\
#    masked_scores.select("*", F.rank().over(window).alias("top")).filter(F.col("top")<=TOP_K)
top_scores =\
    masked_scores.select("*", F.row_number().over(window).alias("top")).filter(F.col("top")<=TOP_K)
top_scores.show()
top_scores.count()

+-----------+-----------+------+---+
|row_user_id|col_item_id|rating|top|
+-----------+-----------+------+---+
|          1|          2|  12.5|  1|
|          1|          3|  12.5|  2|
|          3|          1|   6.5|  1|
|          3|          6|   5.5|  2|
|          2|          5|   2.0|  1|
|          2|          4|   2.0|  2|
+-----------+-----------+------+---+



6

### 7.1 Optimized Top-K Item Calculation

In [17]:
# pivot the ratings by user and item
pivoted_scores = masked_scores.withColumn("combined", F.struct("col_item_ID", "rating"))\
    .groupBy("row_user_id").agg(F.collect_list(F.col("combined")).alias("tuples"))
pivoted_scores.show()



+-----------+--------------------+
|row_user_id|              tuples|
+-----------+--------------------+
|          1|[[2, 12.5], [1, 1...|
|          3|[[1, 6.5], [3, 2....|
|          2|[[5, 2.0], [4, 2.0]]|
+-----------+--------------------+



In [18]:
# sort the items by their scores
def wrapped_fun(tuples, params):
    """
    Use heapq to sort the items by ratings for each user - complexity analysis provided here:
    
    """
    # params is an (n, sort_key) pair: how many items to keep and which tuple field to rank by
    n, sort_key = params
    return heapq.nlargest(n, tuples, key = lambda l: l[sort_key])

# wraps the above function so that we can pass in parameters in UDF
def udf_wrapper_fun(params):
    # return type of the UDF: an array of (ItemID, Rating) structs
    schema = ArrayType(StructType((StructField("ItemID", StringType()),
                                   StructField("Rating", FloatType()))))

    return F.udf(lambda tuples: wrapped_fun(tuples, params), schema)
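
Outside Spark, the core of the UDF is a single `heapq.nlargest` call, which selects the best `n` elements without fully sorting the list. The sketch below uses the (item, score) pairs of user 3 from the scores computed earlier; the variable names are illustrative.

```python
import heapq

# (item index, score) pairs for user 3, taken from the scores computed earlier
user_scores = [(1, 6.5), (3, 2.5), (6, 5.5), (2, 2.5)]

# Keep the 2 pairs with the highest score (index 1 of each pair)
top_2 = heapq.nlargest(2, user_scores, key=lambda pair: pair[1])

# For comparison: the same result via a full sort, which does more work
top_2_sorted = sorted(user_scores, key=lambda pair: pair[1], reverse=True)[:2]

print(top_2)
```

For a small `n` relative to the number of items, keeping a heap of size `n` is cheaper than sorting every user's full item list, which is the motivation for this variant.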


In [19]:
# top 2 items and 1 as sort key index
params = (TOP_K, 1)
sorted_pivoted_scores = pivoted_scores.withColumn("tuples", udf_wrapper_fun(params)(F.col("tuples")))
sorted_pivoted_scores.show()

+-----------+--------------------+
|row_user_id|              tuples|
+-----------+--------------------+
|          1|[[2, 12.5], [3, 1...|
|          3|[[1, 6.5], [6, 5.5]]|
|          2|[[5, 2.0], [4, 2.0]]|
+-----------+--------------------+



In [20]:
exploded_df = sorted_pivoted_scores.select("*", F.explode("tuples").alias("exploded_tuples"))
exploded_df = exploded_df\
.withColumn("ItemID", F.col("exploded_tuples").getItem("ItemID"))\
.withColumn("Rating", F.col("exploded_tuples").getItem("Rating"))

top_scores = exploded_df.drop("tuples").drop("exploded_tuples")
top_scores.show()

+-----------+------+------+
|row_user_id|ItemID|Rating|
+-----------+------+------+
|          1|     2|  12.5|
|          1|     3|  12.5|
|          3|     1|   6.5|
|          3|     6|   5.5|
|          2|     5|   2.0|
|          2|     4|   2.0|
+-----------+------+------+



## Predict Method

Note that certain machine learning frameworks require a recommendation algorithm to have a `.predict` method. SAR is a ranker: it uses internally generated SAR scores to rank the items for each user. We can output these SAR scores as a proxy for the rating (the higher the better), but they cannot be interpreted directly as ratings.

The proxy is simple - we just subset the full set of scores on user-item pairs found in the test set. This is easy to do on Spark.
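
In plain Python, the subsetting is a lookup of each test pair in the table of scores. The sketch below assumes the scores and test pairs of the toy data, keyed by (user index, item index); it mirrors an inner join, so test pairs without a score are dropped.

```python
# (user index, item index) -> SAR score, from the scores computed earlier
sar_scores = {
    (1, 1): 10.0, (1, 2): 12.5, (1, 3): 12.5, (1, 6): 2.5,
    (2, 4): 2.0, (2, 5): 2.0,
    (3, 1): 6.5, (3, 2): 2.5, (3, 3): 2.5, (3, 6): 5.5,
}

# User-item pairs that appear in the test set
test_pairs = [(1, 4), (1, 5), (2, 1), (2, 5), (3, 1), (3, 6)]

# Keep only the test pairs for which a score exists
predictions = {pair: sar_scores[pair] for pair in test_pairs if pair in sar_scores}

print(predictions)
```

Only three of the six test pairs receive a prediction here, because the other three involve items that share no co-occurrence with anything the user rated in training.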

In [21]:
query = """
SELECT row_id,
       col_id,
       rating
FROM   df_all
WHERE  type = 0 
"""
df_test = spark.sql(query)
df_test.show()

+------+------+------+
|row_id|col_id|rating|
+------+------+------+
|     1|     4|     1|
|     1|     5|     1|
|     2|     1|     5|
|     2|     5|     5|
|     3|     1|     5|
|     3|     6|     5|
+------+------+------+



In [22]:
predictions = df_test.join(
    masked_scores, 
    (df_test.row_id==masked_scores.row_user_id) & (df_test.col_id==masked_scores.col_item_id),
    "inner")

In [23]:
predictions.show()

+------+------+------+-----------+-----------+------+
|row_id|col_id|rating|row_user_id|col_item_id|rating|
+------+------+------+-----------+-----------+------+
|     3|     1|     5|          3|          1|   6.5|
|     2|     5|     5|          2|          5|   2.0|
|     3|     6|     5|          3|          6|   5.5|
+------+------+------+-----------+-----------+------+

