## Collaborative filtering

Nemanja Kostadinovic

In [1]:
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
data = spark.read.csv('/usr/local/Cellar/apache-spark/3.0.1/libexec/data/mllib/als/test.data', sep=',')

In [3]:
data = data.select(*(col(c).cast("int").alias(c) for c in data.columns))

In [4]:
data = data.selectExpr("_c0 as user", "_c1 as item", "_c2 as rate")

In [5]:
data.collect()

[Row(user=1, item=1, rate=5),
 Row(user=1, item=2, rate=1),
 Row(user=1, item=3, rate=5),
 Row(user=1, item=4, rate=1),
 Row(user=2, item=1, rate=5),
 Row(user=2, item=2, rate=1),
 Row(user=2, item=3, rate=5),
 Row(user=2, item=4, rate=1),
 Row(user=3, item=1, rate=1),
 Row(user=3, item=2, rate=5),
 Row(user=3, item=3, rate=1),
 Row(user=3, item=4, rate=5),
 Row(user=4, item=1, rate=1),
 Row(user=4, item=2, rate=5),
 Row(user=4, item=3, rate=1),
 Row(user=4, item=4, rate=5)]

In [10]:
model = ALS(
         userCol="user", 
         itemCol="item",
         ratingCol="rate").fit(data)

In [11]:
predictions = model.transform(data)

In [12]:
predictions = predictions.sort('user','item')

In [13]:
predictions.collect()

[Row(user=1, item=1, rate=5, prediction=4.847231864929199),
 Row(user=1, item=2, rate=1, prediction=1.0190666913986206),
 Row(user=1, item=3, rate=5, prediction=4.847231864929199),
 Row(user=1, item=4, rate=1, prediction=1.0190666913986206),
 Row(user=2, item=1, rate=5, prediction=4.847231864929199),
 Row(user=2, item=2, rate=1, prediction=1.0190666913986206),
 Row(user=2, item=3, rate=5, prediction=4.847231864929199),
 Row(user=2, item=4, rate=1, prediction=1.0190666913986206),
 Row(user=3, item=1, rate=1, prediction=1.0180920362472534),
 Row(user=3, item=2, rate=5, prediction=4.852105617523193),
 Row(user=3, item=3, rate=1, prediction=1.0180920362472534),
 Row(user=3, item=4, rate=5, prediction=4.852105617523193),
 Row(user=4, item=1, rate=1, prediction=1.0180920362472534),
 Row(user=4, item=2, rate=5, prediction=4.852105617523193),
 Row(user=4, item=3, rate=1, prediction=1.0180920362472534),
 Row(user=4, item=4, rate=5, prediction=4.852105617523193)]

In [14]:
evaluator = RegressionEvaluator(
           metricName="rmse", 
           labelCol="rate", 
           predictionCol="prediction") 

In [15]:
RMSE = evaluator.evaluate(predictions)
print(RMSE)

0.10712342409303925


In [16]:
users_pred = model.recommendForAllUsers(10)

In [17]:
users_pred.collect()

[Row(user=1, recommendations=[Row(item=3, rating=4.847231864929199), Row(item=1, rating=4.847231864929199), Row(item=2, rating=1.0190666913986206), Row(item=4, rating=1.0190666913986206)]),
 Row(user=3, recommendations=[Row(item=2, rating=4.852105617523193), Row(item=4, rating=4.852105617523193), Row(item=1, rating=1.0180920362472534), Row(item=3, rating=1.0180920362472534)]),
 Row(user=4, recommendations=[Row(item=2, rating=4.852105617523193), Row(item=4, rating=4.852105617523193), Row(item=1, rating=1.0180920362472534), Row(item=3, rating=1.0180920362472534)]),
 Row(user=2, recommendations=[Row(item=3, rating=4.847231864929199), Row(item=1, rating=4.847231864929199), Row(item=2, rating=1.0190666913986206), Row(item=4, rating=1.0190666913986206)])]

In [18]:
item_pred = model.recommendForAllItems(10)
item_pred.collect()

[Row(item=1, recommendations=[Row(user=1, rating=4.847231864929199), Row(user=2, rating=4.847231864929199), Row(user=3, rating=1.0180920362472534), Row(user=4, rating=1.0180920362472534)]),
 Row(item=3, recommendations=[Row(user=1, rating=4.847231864929199), Row(user=2, rating=4.847231864929199), Row(user=3, rating=1.0180920362472534), Row(user=4, rating=1.0180920362472534)]),
 Row(item=4, recommendations=[Row(user=3, rating=4.852105617523193), Row(user=4, rating=4.852105617523193), Row(user=1, rating=1.0190666913986206), Row(user=2, rating=1.0190666913986206)]),
 Row(item=2, recommendations=[Row(user=3, rating=4.852105617523193), Row(user=4, rating=4.852105617523193), Row(user=1, rating=1.0190666913986206), Row(user=2, rating=1.0190666913986206)])]

## FPGrowth 

In [19]:
from pyspark.ml.fpm import FPGrowth
from pyspark.sql.functions import split

In [20]:
data = spark.read.text('/usr/local/Cellar/apache-spark/3.0.1/libexec/data/mllib/sample_fpgrowth.txt')

In [21]:
type(data)

pyspark.sql.dataframe.DataFrame

In [22]:
data.collect()

[Row(value='r z h k p'),
 Row(value='z y x w v u t s'),
 Row(value='s x o n r'),
 Row(value='x z y m t s q e'),
 Row(value='z'),
 Row(value='x z y r q t p')]

In [23]:
data = data.withColumn("value", split(col("value"), " ").cast("array<string>"))

In [24]:
data.collect()

[Row(value=['r', 'z', 'h', 'k', 'p']),
 Row(value=['z', 'y', 'x', 'w', 'v', 'u', 't', 's']),
 Row(value=['s', 'x', 'o', 'n', 'r']),
 Row(value=['x', 'z', 'y', 'm', 't', 's', 'q', 'e']),
 Row(value=['z']),
 Row(value=['x', 'z', 'y', 'r', 'q', 't', 'p'])]

In [25]:
model = FPGrowth(itemsCol='value',minSupport=0.2, numPartitions=10).fit(data)

In [26]:
model.freqItemsets.show()

+------------+----+
|       items|freq|
+------------+----+
|         [z]|   5|
|         [x]|   4|
|      [x, z]|   3|
|         [y]|   3|
|      [y, x]|   3|
|   [y, x, z]|   3|
|      [y, z]|   3|
|         [r]|   3|
|      [r, x]|   2|
|      [r, z]|   2|
|         [s]|   3|
|      [s, y]|   2|
|   [s, y, x]|   2|
|[s, y, x, z]|   2|
|   [s, y, z]|   2|
|      [s, x]|   3|
|   [s, x, z]|   2|
|      [s, z]|   2|
|         [t]|   3|
|      [t, y]|   3|
+------------+----+
only showing top 20 rows



In [27]:
model.associationRules.show()

+------------+----------+----------+----+
|  antecedent|consequent|confidence|lift|
+------------+----------+----------+----+
|   [t, s, y]|       [x]|       1.0| 1.5|
|   [t, s, y]|       [z]|       1.0| 1.2|
|   [y, x, z]|       [t]|       1.0| 2.0|
|         [y]|       [x]|       1.0| 1.5|
|         [y]|       [z]|       1.0| 1.2|
|         [y]|       [t]|       1.0| 2.0|
|         [p]|       [r]|       1.0| 2.0|
|         [p]|       [z]|       1.0| 1.2|
|   [q, t, z]|       [y]|       1.0| 2.0|
|   [q, t, z]|       [x]|       1.0| 1.5|
|      [q, y]|       [x]|       1.0| 1.5|
|      [q, y]|       [z]|       1.0| 1.2|
|      [q, y]|       [t]|       1.0| 2.0|
|   [t, s, x]|       [y]|       1.0| 2.0|
|   [t, s, x]|       [z]|       1.0| 1.2|
|[q, t, y, z]|       [x]|       1.0| 1.5|
|[q, t, x, z]|       [y]|       1.0| 2.0|
|      [q, x]|       [y]|       1.0| 2.0|
|      [q, x]|       [t]|       1.0| 2.0|
|      [q, x]|       [z]|       1.0| 1.2|
+------------+----------+---------

In [28]:
model.transform(data).show()

+--------------------+----------+
|               value|prediction|
+--------------------+----------+
|     [r, z, h, k, p]|        []|
|[z, y, x, w, v, u...|        []|
|     [s, x, o, n, r]|        []|
|[x, z, y, m, t, s...|        []|
|                 [z]|        []|
|[x, z, y, r, q, t...|        []|
+--------------------+----------+



In [29]:
model.explainParams()

'itemsCol: items column name (default: items, current: value)\nminConfidence: Minimal confidence for generating Association Rule. [0.0, 1.0]. minConfidence will not affect the mining for frequent itemsets, but will affect the association rules generation. (default: 0.8)\nminSupport: Minimal support level of the frequent pattern. [0.0, 1.0]. Any pattern that appears more than (minSupport * size-of-the-dataset) times will be output in the frequent itemsets. (default: 0.3, current: 0.2)\nnumPartitions: Number of partitions (at least 1) used by parallel FP-growth. By default the param is not set, and partition number of the input dataset is used. (current: 10)\npredictionCol: prediction column name. (default: prediction)'