In [1]:
import spark.implicits._
import org.apache.spark.sql.types._
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.feature._

val instacartFolderPath = "instacart/"
val bfpdFolderPath = "bfpd/"
val linkPath = "insta-bfpd.csv"

// Length of every user's product vector. Product ids are used directly as vector
// indices below, so this must be greater than the largest product_id
// (Vectors.sparse rejects an index >= size).
// NOTE(review): presumably max(product_id) + 1 for this dataset — confirm.
// The query keys built in later cells hard-code the same size.
val numProductSlots = 49689

// Only the first two columns of the order-products files are read, typed as ints.
val schema = StructType(
  List(
    StructField("order_id", IntegerType),
    StructField("product_id", IntegerType)
  )
)

// Read without a schema, so order_id and user_id come back as strings.
val orders = spark.read.format("csv")
  .option("header", "true")
  .load(instacartFolderPath + "orders.csv")
  .select("order_id", "user_id")

// The glob loads every instacart/order_*.csv file (it does not match orders.csv).
val order_products = spark.read.format("csv")
  .option("header", "true")
  .schema(schema)
  .load(instacartFolderPath + "order_*.csv")

// Inner join: one (user_id, product_id) row per purchased line item; orders
// without any order-products rows are dropped.
val orders_set = orders.join(order_products, "order_id").drop("order_id")

// Number of times each user bought each product.
val product_counts = orders_set.groupBy("user_id", "product_id").count()

// user_id -> list of distinct product ids. A null product_id (possible because the
// schema is nullable and unparseable CSV fields become null) would make the typed
// .as[(String, Int)] conversion throw, so such rows are dropped first.
val user_desc = product_counts
  .drop("count")
  .na.drop(Seq("product_id"))
  .as[(String, Int)]
  .rdd
  .groupByKey()
  .mapValues(_.toList)

// Binary bag-of-products: every product a user bought gets weight 1.0,
// regardless of how many times it was bought.
val user_desc_vector = user_desc.mapValues(products => products.map(productId => (productId, 1.0)))

// (user_id, sparse product vector) — the "keys" column is the MinHashLSH input.
val user_vector_df = user_desc_vector
  .map { case (userId, entries) => (userId, Vectors.sparse(numProductSlots, entries)) }
  .toDF("user_id", "keys")


Intitializing Scala interpreter ...

Spark Web UI available at http://100.64.13.143:4041
SparkContext available as 'sc' (version = 2.4.1, master = local[*], app id = local-1556350981549)
SparkSession available as 'spark'


import spark.implicits._
import org.apache.spark.sql.types._
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.feature._
instacartFolderPath: String = instacart/
bfpdFolderPath: String = bfpd/
linkPath: String = insta-bfpd.csv
schema: org.apache.spark.sql.types.StructType = StructType(StructField(order_id,IntegerType,true), StructField(product_id,IntegerType,true))
orders: org.apache.spark.sql.DataFrame = [order_id: string, user_id: string]
order_products: org.apache.spark.sql.DataFrame = [order_id: int, product_id: int]
orders_set: org.apache.spark.sql.DataFrame = [user_id: string, product_id: int]
product_counts: org.apache.spark.sql.DataFrame = [user_id: string, product_id: int ... 1 more field]
user_desc: org.apache.spark.rdd.RDD[(String, List[Int])] = MapPartitio...

In [2]:
// MinHash LSH over the binary product vectors in "keys"; the 20 hash values per
// row are written to "values".
val mh = new MinHashLSH()
  .setNumHashTables(20)
  .setInputCol("keys")
  .setOutputCol("values")

val model = mh.fit(user_vector_df)

// Split users into a large candidate set (A, ~99.5%) and a small probe set (B, ~0.5%).
// Without a seed, randomSplit picks a new one every run, so A/B membership — and
// every query result in the following cells — changes on each re-execution.
// NOTE(review): 42 is arbitrary; any fixed value gives a reproducible split.
val splitSeed = 42L
val splitDf = user_vector_df.randomSplit(Array(.995, .005), splitSeed)
val (usersA, usersB) = (splitDf(0), splitDf(1))

// Hash both halves once and cache them; the later cells query them repeatedly.
val transformedA = model.transform(usersA).cache()
val transformedB = model.transform(usersB).cache()



mh: org.apache.spark.ml.feature.MinHashLSH = mh-lsh_0b5db5dc7a01
model: org.apache.spark.ml.feature.MinHashLSHModel = mh-lsh_0b5db5dc7a01
splitDf: Array[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = Array([user_id: string, keys: vector], [user_id: string, keys: vector])
usersA: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [user_id: string, keys: vector]
usersB: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [user_id: string, keys: vector]
transformedA: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [user_id: string, keys: vector ... 1 more field]
transformedB: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [user_id: string, keys: vector ... 1 more field]


In [3]:
// println(user_vector_df.take(1)(0))
// Query basket: the six product ids below, each with weight 1.0 — the same
// encoding and vector size as the "keys" column of user_vector_df.
val key = Vectors.sparse(49689, Seq(5077, 11323, 14303, 20082, 22108, 46522).map(id => (id, 1.0)))

// Up to 7 approximate nearest users from each half of the split (A first, then B).
// distCol is MinHashLSH's Jaccard distance to the query basket.
for (candidates <- Seq(transformedA, transformedB))
  model.approxNearestNeighbors(candidates, key, 7).show()


+-------+--------------------+--------------------+------------------+
|user_id|                keys|              values|           distCol|
+-------+--------------------+--------------------+------------------+
| 124168|(49689,[5077,1132...|[[1.16371862E8], ...|               0.0|
| 200061|(49689,[4210,5077...|[[1.17743046E8], ...|0.8333333333333334|
|  15270|(49689,[5077,4026...|[[1.472083947E9],...|0.8571428571428572|
| 143048|(49689,[5077,1394...|[[5.69006954E8], ...|             0.875|
|  61764|(49689,[5077,2390...|[[7.04531856E8], ...|             0.875|
| 113711|(49689,[5077,6104...|[[3.80572922E8], ...|             0.875|
| 201620|(49689,[5077,5782...|[[3.10958282E8], ...|0.8888888888888888|
+-------+--------------------+--------------------+------------------+

+-------+--------------------+--------------------+------------------+
|user_id|                keys|              values|           distCol|
+-------+--------------------+--------------------+------------------+
|  90

key: org.apache.spark.ml.linalg.Vector = (49689,[5077,11323,14303,20082,22108,46522],[1.0,1.0,1.0,1.0,1.0,1.0])


In [4]:
// Second query: a four-product basket (weight 1.0 each), searched in split A only.
val key2 = Vectors.sparse(49689, Seq(4210, 5077, 22108, 46522).map(_ -> 1.0))
model
  .approxNearestNeighbors(transformedA, key2, 7, "distCol")
  .show()


+-------+--------------------+--------------------+------------------+
|user_id|                keys|              values|           distCol|
+-------+--------------------+--------------------+------------------+
| 124168|(49689,[5077,1132...|[[1.16371862E8], ...|0.5714285714285714|
| 200061|(49689,[4210,5077...|[[1.17743046E8], ...|0.6666666666666667|
| 137533|(49689,[4210],[1.0])|[[1.108687256E9],...|              0.75|
| 146248|(49689,[4210],[1.0])|[[1.108687256E9],...|              0.75|
|  15270|(49689,[5077,4026...|[[1.472083947E9],...|               0.8|
| 164197|(49689,[4210,1152...|[[1.108687256E9],...|               0.8|
|  44873|(49689,[4210,4273...|[[6.3132494E7], [...|               0.8|
+-------+--------------------+--------------------+------------------+



key2: org.apache.spark.ml.linalg.Vector = (49689,[4210,5077,22108,46522],[1.0,1.0,1.0,1.0])


In [5]:
// Every (A, B) user pair whose approximate distance is below 0.5, reported in the
// "distCol" column alongside both full input rows.
model
  .approxSimilarityJoin(transformedA, transformedB, 0.5, "distCol")
  .show()







+--------------------+--------------------+-------------------+
|            datasetA|            datasetB|            distCol|
+--------------------+--------------------+-------------------+
|[49762, (49689,[1...|[111101, (49689,[...|                0.0|
|[117980, (49689,[...|[130034, (49689,[...|0.33333333333333337|
|[123997, (49689,[...|[111101, (49689,[...|                0.0|
|[37062, (49689,[1...|[111101, (49689,[...|                0.0|
|[109672, (49689,[...|[111101, (49689,[...|                0.0|
|[99295, (49689,[1...|[111101, (49689,[...|                0.0|
|[162586, (49689,[...|[111101, (49689,[...|                0.0|
|[66170, (49689,[1...|[111101, (49689,[...|                0.0|
|[102792, (49689,[...|[111101, (49689,[...|                0.0|
|[163118, (49689,[...|[111101, (49689,[...|                0.0|
|[150279, (49689,[...|[111101, (49689,[...|                0.0|
|[58530, (49689,[1...|[111101, (49689,[...|                0.0|
+--------------------+------------------