In [0]:
// Load the hotel-reviews CSV: the first row is treated as the header and
// column types are inferred from the data.
val df = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/notebook/tripadvisor_hotel_reviews.csv")

In [1]:
// Preview the first rows of the raw reviews.
df.show()

In [2]:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{HashingTF, IDF, RegexTokenizer}


// Feature pipeline over the "Review" column:
//   tokenized - tokens produced by RegexTokenizer with the pattern \W+
//   tf        - binary hashed term vector (1000 features)
//   tf2       - hashed term-count vector (1000 features)
//   tfidf     - IDF-rescaled version of tf2
val preprocessingPipe = {
    val tokenizer = new RegexTokenizer()
        .setPattern("\\W+")
        .setInputCol("Review")
        .setOutputCol("tokenized")

    val binaryTf = new HashingTF()
        .setNumFeatures(1000)
        .setBinary(true)
        .setInputCol("tokenized")
        .setOutputCol("tf")

    val countTf = new HashingTF()
        .setNumFeatures(1000)
        .setInputCol("tokenized")
        .setOutputCol("tf2")

    val idf = new IDF()
        .setInputCol("tf2")
        .setOutputCol("tfidf")

    new Pipeline().setStages(Array(tokenizer, binaryTf, countTf, idf))
}


In [3]:
// 80/20 train/test split. A fixed seed keeps the split (and every metric
// computed downstream) reproducible across notebook runs.
val Array(train, test) = df.randomSplit(Array(0.8, 0.2), seed = 42L)

// Fit the preprocessing stages on the training split only, so the IDF
// statistics are not computed from test rows.
val pipe = preprocessingPipe.fit(train)

// The training features are cached; later cells reuse them for LSH fitting and joins.
val trainFeatures = pipe.transform(train).cache()
val testFeatures = pipe.transform(test)

In [4]:
// Inspect the engineered feature columns on the training split.
trainFeatures.show()

In [5]:
// Tag every test row with a unique (not necessarily consecutive) "id" so that
// neighbours found by the similarity join can be grouped back per test review.
// Uses monotonically_increasing_id(): the camelCase monotonicallyIncreasingId()
// alias is deprecated since Spark 2.0 and removed in Spark 3.0.
// Cached because both the similarity join and the later join on "id" read it.
val testFeaturesWithIndex = testFeatures.withColumn("id", monotonically_increasing_id()).cache()

In [6]:
// Check that the "id" column was attached to the test features.
testFeaturesWithIndex.show()

In [7]:
import org.apache.spark.ml.feature.MinHashLSH


// MinHash LSH over the binary term vectors in "tf", using 3 hash tables;
// the hash values are written to "buckets".
val mh = new MinHashLSH()
    .setNumHashTables(3)
    .setInputCol("tf")
    .setOutputCol("buckets")

// Fit on the training features.
val mhModel = mh.fit(trainFeatures)

In [8]:
// Show the untruncated hash buckets computed for the training rows.
mhModel
    .transform(trainFeatures)
    .select("buckets")
    .show(truncate = false)

In [9]:
// Approximate similarity join: train rows (datasetA) paired with test rows
// (datasetB) whose distance is below the 0.8 threshold.
val neighbors = mhModel.approxSimilarityJoin(
    trainFeatures,
    testFeaturesWithIndex,
    0.8
)

In [10]:
// Inspect the structure of the join output.
neighbors.printSchema()

In [11]:
// Preview a few matched train/test pairs.
neighbors.show()

In [12]:
// Predict a rating per test review ("datasetB.id") as the similarity-weighted
// mean of its matched training ratings, where similarity = 1 - distCol.
val predictions = {
    val weight = col("similarity")
    val weightedRatingSum = sum(weight * col("datasetA.Rating"))
    val weightSum = sum(weight)

    neighbors
        .withColumn("similarity", lit(1) - col("distCol"))
        .groupBy("datasetB.id")
        .agg((weightedRatingSum / weightSum).as("predict"))
}

In [13]:
// Attach predictions to the test rows by "id". This is an inner join, so test
// reviews without any matched neighbour are dropped from the evaluation set.
val forMetric = testFeaturesWithIndex.join(predictions, "id")


In [14]:
import org.apache.spark.ml.evaluation.RegressionEvaluator

// RMSE evaluator comparing the "predict" column against the true "Rating".
val metrics = new RegressionEvaluator()
    .setMetricName("rmse")
    .setPredictionCol("predict")
    .setLabelCol("Rating")

In [15]:
// RMSE of the predictions in forMetric, i.e. over the test reviews that
// survived the join with predictions.
metrics.evaluate(forMetric)

In [16]:
// Sweep the number of MinHash hash tables (3, 5, ..., 15). For each setting
// produce (numHashes, rmse, meanNumNeighbors), where meanNumNeighbors is the
// average count of matched training reviews per scored test review.
val results = Array.range(3, 16, 2).map { numHashes =>
    val lshModel = new MinHashLSH()
        .setInputCol("tf")
        .setOutputCol("buckets")
        .setNumHashTables(numHashes)
        .fit(trainFeatures)

    // Train/test pairs whose distance is below 0.7.
    val neighbors = lshModel.approxSimilarityJoin(trainFeatures, testFeaturesWithIndex, 0.7)

    // Similarity-weighted mean rating per test id, plus the neighbour count.
    val predictions = neighbors
        .withColumn("similarity", lit(1) - col("distCol"))
        .groupBy("datasetB.id")
        .agg(
            (sum(col("similarity") * col("datasetA.Rating")) / sum(col("similarity"))).as("predict"),
            count("datasetA.Rating").as("numNeighbors")
        )

    // Two actions read this frame below; caching avoids running the
    // similarity join and aggregation twice per iteration.
    val forMetric = testFeaturesWithIndex.join(predictions, Seq("id")).cache()

    val res = try {
        // A global aggregate always returns exactly one row; its value is null
        // when forMetric is empty, which is reported as NaN instead of leaking
        // an untyped `Any`/null into the result tuple.
        val avgRow = forMetric.select(avg("numNeighbors")).head()
        val meanNumNeighbors: Double =
            if (avgRow.isNullAt(0)) Double.NaN else avgRow.getDouble(0)

        val metric = metrics.evaluate(forMetric)

        (numHashes, metric, meanNumNeighbors)
    } finally {
        forMetric.unpersist()
    }

    println(res)
    res
}

In [17]:
// Display RMSE per number of hash tables; the third tuple element is dropped.
z.show(
    results
        .map { case (numHashes, rmse, _) => (numHashes, rmse) }
        .toList
        .toDF("numHashes", "rmse")
)

In [18]:
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH


// Bucketed random projection LSH over the "tfidf" vectors: bucket length 5,
// 3 hash tables, hashes written to "brpBuckets".
// NOTE: this rebinds `mh` / `mhModel`, which earlier cells used for MinHash.
val mh = new BucketedRandomProjectionLSH()
    .setNumHashTables(3)
    .setBucketLength(5)
    .setInputCol("tfidf")
    .setOutputCol("brpBuckets")

// Fit on the training features.
val mhModel = mh.fit(trainFeatures)

In [19]:
// Approximate similarity join with a distance threshold of 10.
// Assuming the cells run in order, mhModel is the bucketed random projection
// model fitted in the preceding cell.
val euqlidNeigh = mhModel.approxSimilarityJoin(
    trainFeatures,
    testFeaturesWithIndex,
    10
)

euqlidNeigh.show()

In [20]:
// Inverse-distance weighting: similarity = 1 / (distCol + 1e-7); the small
// constant keeps the weight finite when the distance is exactly zero.
val predictions = {
    val weight = col("similarity")
    val weightedRatingSum = sum(weight * col("datasetA.Rating"))
    val weightSum = sum(weight)

    euqlidNeigh
        .withColumn("similarity", lit(1) / (col("distCol") + lit(0.0000001)))
        .groupBy("datasetB.id")
        .agg((weightedRatingSum / weightSum).as("predict"))
}

// Inner join on "id": only test reviews with at least one neighbour are kept.
val forMetric = testFeaturesWithIndex.join(predictions, "id")


In [21]:
// RMSE of the predictions in forMetric, i.e. over the test reviews that
// survived the join with predictions.
metrics.evaluate(forMetric)