In [0]:
// Load the TripAdvisor reviews and normalise each review into a list of lowercase
// alphabetic tokens. One row per review; `doc_id` is unique per row but not
// necessarily consecutive (monotonically_increasing_id()).
val df = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv("/notebook/tripadvisor_hotel_reviews.csv")
        // Lowercase once, then drop every character that is not an ASCII letter or a space.
        .withColumn("semi_clean_desc", regexp_replace(lower(col("Review")), "[^a-z ]", ""))
        // Collapse runs of spaces and trim AFTER stripping characters. Trimming before
        // the strip left a leading/trailing space whenever the text began or ended with
        // punctuation/digits, and split() then produced a spurious empty "" token.
        .withColumn("clean_desc", trim(regexp_replace(col("semi_clean_desc"), "[ ]+", " ")))
        // Note: a review that is empty after cleaning still splits to [""] (doc_len 1).
        .withColumn("splitted", split(col("clean_desc"), " "))
        .withColumn("doc_len", size(col("splitted")))
        .withColumn("doc_id", monotonically_increasing_id())
        // Every later cell re-reads df; caching avoids re-scanning the CSV (and the
        // inferSchema pass) for each action.
        .cache()

// Number of documents (reviews); used as N in the IDF term.
val collection_size = df.count()

df.show

In [1]:
// Vocabulary for the TF-IDF matrix: the `vocab_size` words that occur in the most
// reviews, together with their document frequency `cnt_doc` (the number of distinct
// reviews containing the word at least once).
val vocab_size = 100

val doc_w_words = df
    .select(col("doc_id"), explode(col("splitted")).alias("s_word"))
    // Ignore the empty token split() yields for empty or edge-spaced strings.
    .filter(col("s_word") =!= "")
    .groupBy("s_word")
    .agg(countDistinct("doc_id").alias("cnt_doc"))
    // Secondary key on the word makes the top-N cut deterministic when several
    // words share the same document frequency at the boundary.
    .orderBy(desc("cnt_doc"), asc("s_word"))
    .limit(vocab_size)

doc_w_words.show

In [2]:
// Per-(document, word) occurrence counts, keeping each document's token count.
val exploded = df
    .select(
        col("doc_id"),
        col("doc_len"),
        explode(col("splitted")).alias("word")
        )
    // doc_len is constant per doc_id, so grouping on it carries it through without first().
    // (The former orderBy(doc_id) here was dropped: the join/groupBy below discard row
    // order, so it only cost an extra global sort.)
    .groupBy("doc_id", "doc_len", "word")
    .agg(count(col("doc_id")).as("word_cnt"))

// TF-IDF with tf = word_cnt / doc_len and idf = ln(collection_size / cnt_doc),
// pivoted to one row per document and one column per vocabulary word (0 when absent).
// The inner join keeps only vocabulary words, so a review containing none of them
// has no row in the result.
val tf_idf = exploded
    .join(
        // The vocabulary is small (top-N words), so broadcast it instead of shuffling.
        broadcast(doc_w_words),
        exploded("word") === doc_w_words("s_word"),
        "inner"
        )
    .withColumn("tf_idf", col("word_cnt") / col("doc_len") * log(lit(collection_size) / col("cnt_doc")))
    .groupBy("doc_id")
    .pivot("word")
    .sum("tf_idf")
    .na.fill(0)

// Display separately so tf_idf stays a reusable DataFrame (previously it was bound to
// the Unit result of .show).
tf_idf.show