In [2]:
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions._

In [4]:
/** Cosine similarity between two equally long rating vectors.
  *
  * @param a first vector
  * @param b second vector; must have the same length as `a`
  * @return dot(a, b) / (|a| * |b|); 0.0 when either vector has zero norm
  *         (which includes two empty vectors, i.e. no co-rating users)
  * @throws IllegalArgumentException if `a` and `b` differ in length
  */
def similarity(a: Array[Double], b: Array[Double]): Double = {
    // zip would silently truncate mismatched inputs and hide a pairing bug upstream.
    require(a.length == b.length, s"vectors must have equal length (${a.length} vs ${b.length})")
    // Euclidean norm = sqrt of the SUM of squared components. A reduce that squares both
    // operands would re-square the running total at every step.
    val mod_a = math.sqrt(a.map(x => x * x).sum)
    val mod_b = math.sqrt(b.map(x => x * x).sum)
    val dot = a.zip(b).map { case (x, y) => x * y }.sum
    val denom = mod_a * mod_b
    // Cosine similarity is undefined for a zero vector; report "no similarity" instead of NaN.
    if (denom == 0.0) 0.0 else dot / denom
}

In [6]:
/** Distinct values of one string-typed column, collected to the driver.
  *
  * @param data      source DataFrame
  * @param column_df name of the column whose distinct values are wanted
  * @return the distinct values, in whatever order Spark returns them
  */
def toSeqUnique(data: DataFrame, column_df: String): Seq[String] = {
    val distinctValues: Array[String] = data
        .select(col(column_df))
        .distinct()
        .collect()
        .map(row => row.getString(0))
    distinctValues.toSeq
}

In [9]:
/** Reads up to `cant` values of a string-typed column and parses each one as a Double.
  *
  * @param data      source DataFrame
  * @param column_df name of the column to read; its values are read with `getString`
  * @param cant      maximum number of rows to take
  * @return the parsed values, in the order Spark returns the rows
  * @throws NumberFormatException if a value is not a valid number
  */
def getArrayFrom(data: DataFrame, column_df: String, cant: Int): Array[Double] = {
    // `take` already yields Array[Row]; no Java-list round trip or unchecked cast needed.
    data.select(col(column_df))
        .take(cant)
        .map(_.getString(0).toDouble)
}

In [12]:
val data = {
    // Sample ratings as (ID, USER, ITEM, RATING); every field, including RATING, is a String.
    val ratingRows = Seq(
        ("241", "u1", "m1", "2"), ("222", "u1", "m3", "3"),
        ("276", "u2", "m1", "5"), ("273", "u2", "m2", "2"),
        ("200", "u3", "m1", "3"), ("229", "u3", "m2", "3"), ("231", "u3", "m3", "1"),
        ("239", "u4", "m2", "2"), ("286", "u4", "m3", "2")
    )
    ratingRows.toDF("ID", "USER", "ITEM", "RATING")
}

In [17]:
// Item-item cosine similarity, computed over the users who rated both items of each pair.
// item_list / user_list and their sizes stay top-level names because later cells may read them.
val item_list = toSeqUnique(data, "ITEM")
val item_list_cant = item_list.size
val user_list = toSeqUnique(data, "USER")
val user_list_cant = user_list.size

val similarity_seq: Seq[(String, String, Double)] = for {
    i <- 0 until item_list_cant - 1
    item1 = item_list(i)
    // Hoisted out of the inner loop: this filter does not depend on item2.
    df1 = data.filter($"ITEM" === item1)
    j <- (i + 1) until item_list_cant
} yield {
    val item2 = item_list(j)
    val df2 = data.filter($"ITEM" === item2).withColumnRenamed("RATING", "RATING1")
    // Inner join on USER keeps only the users who rated both items.
    val res = df1.join(df2, Seq("USER"))
    // Fetch both rating columns in ONE action so v1(k) and v2(k) always come from the same
    // joined row; two separate actions over a shuffled join need not return rows in the same order.
    val (v1, v2) = res.select("RATING", "RATING1")
        .collect()
        .map(row => (row.getString(0).toDouble, row.getString(1).toDouble))
        .unzip
    (item1, item2, similarity(v1, v2))
}

val simi_df = similarity_seq.toDF("ITEM1", "ITEM2", "SIM")
simi_df.show(false)

+-----+-----+------------------+
|ITEM1|ITEM2|SIM               |
+-----+-----+------------------+
|m3   |m1   |0.7893522173763263|
|m3   |m2   |0.8682431421244593|
|m1   |m2   |0.9037378388935388|
+-----+-----+------------------+

