In [35]:
import org.apache.spark.sql._
import spark.implicits._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

In [6]:
// Create the Spark session for this notebook, or reuse one that already exists.
val spark = SparkSession.builder()
    // application name displayed in the Spark UI
    .appName("made-demo")
    // master URL: run locally, using all available cores
    .master("local[*]")
    // Optional resource settings, currently disabled:
    //   .config("spark.executor.memory", "2g")
    //   .config("spark.executor.cores", "2")
    //   .config("spark.driver.memory", "2g")
    .getOrCreate()

spark = org.apache.spark.sql.SparkSession@2d70378e


org.apache.spark.sql.SparkSession@2d70378e

In [67]:
// Load the reviews CSV: the first line is a header, and column types are
// inferred from the data (the cell output shows Review: string, Rating: int).
val dfReviews = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("tripadvisor_hotel_reviews.csv")

dfReviews = [Review: string, Rating: int]


[Review: string, Rating: int]

In [223]:
// Total number of documents (reviews) in the corpus; used later as the IDF numerator.
val D = dfReviews.count()

D = 20491


20491

In [174]:
// Normalise the review text and attach a document identifier.
//  - lower-case the text and strip every character that is not a latin letter,
//    digit, underscore or whitespace;
//  - add `doc_id` via monotonically_increasing_id(), the supported replacement
//    for the deprecated `monotonicallyIncreasingId` alias (removed in Spark 3.0).
// NOTE: the generated ids are unique and increasing, but they are only
// consecutive within a single partition, so do not treat them as row numbers.
val dfPreprocessed = dfReviews
    .select(regexp_replace(lower(col("Review")), "[^a-z0-9_\\s]", "").as("review"))
    .withColumn("doc_id", monotonically_increasing_id())

dfPreprocessed = [review: string, doc_id: bigint]




[review: string, doc_id: bigint]

In [248]:
// Preview the first 20 cleaned reviews together with their ids.
dfPreprocessed.show()

+--------------------+------+
|              review|doc_id|
+--------------------+------+
|nice hotel expens...|     0|
|ok nothing specia...|     1|
|nice rooms not 4 ...|     2|
|unique great stay...|     3|
|great stay great ...|     4|
|love monaco staff...|     5|
|cozy stay rainy c...|     6|
|excellent staff h...|     7|
|hotel stayed hote...|     8|
|excellent stayed ...|     9|
|poor value stayed...|    10|
|nice value seattl...|    11|
|nice hotel good l...|    12|
|nice hotel not ni...|    13|
|great hotel night...|    14|
|horrible customer...|    15|
|disappointed say ...|    16|
|fantastic stay mo...|    17|
|good choice hotel...|    18|
|hmmmmm say really...|    19|
+--------------------+------+
only showing top 20 rows



In [202]:
// Explode every review into one row per token: (doc_id, token).
// The text is split on runs of whitespace rather than on a single space, and
// empty strings are dropped. Splitting on " " alone produced empty tokens for
// repeated/leading spaces and left tabs/newlines inside tokens, which skewed
// doc_size and TF. Using column functions also means a null review yields no
// rows instead of failing inside a lambda.
val dfTokens = dfPreprocessed
    .select(col("doc_id"), explode(split(col("review"), "\\s+")).as("token"))
    .filter(col("token") =!= "")

dfTokens = [doc_id: bigint, token: string]


[doc_id: bigint, token: string]

In [249]:
// Preview the first 20 (doc_id, token) pairs.
dfTokens.show()

+------+-----------+
|doc_id|      token|
+------+-----------+
|     0|       nice|
|     0|      hotel|
|     0|  expensive|
|     0|    parking|
|     0|        got|
|     0|       good|
|     0|       deal|
|     0|       stay|
|     0|      hotel|
|     0|anniversary|
|     0|    arrived|
|     0|       late|
|     0|    evening|
|     0|       took|
|     0|     advice|
|     0|   previous|
|     0|    reviews|
|     0|        did|
|     0|      valet|
|     0|    parking|
+------+-----------+
only showing top 20 rows



In [211]:
// Number of tokens in each document: (doc_id, doc_size).
val dfDocSize = dfTokens
    .groupBy(col("doc_id"))
    .agg(count(lit(1)).as("doc_size"))

dfDocSize = [doc_id: bigint, doc_size: bigint]


[doc_id: bigint, doc_size: bigint]

In [250]:
// Preview the per-document token counts.
dfDocSize.show()

+------+--------+
|doc_id|doc_size|
+------+--------+
|    26|      42|
|    29|      34|
|   474|     112|
|   964|      83|
|  1677|     117|
|  1697|     242|
|  1806|     100|
|  1950|     104|
|  2040|      98|
|  2214|     126|
|  2250|     163|
|  2453|     125|
|  2509|      51|
|  2529|      60|
|  2927|     751|
|  3091|     489|
|  3506|      31|
|  3764|      90|
|  4590|     145|
|  4823|      20|
+------+--------+
only showing top 20 rows



In [229]:
// Term frequency: for each (doc_id, token) pair, the number of occurrences of
// the token in the document divided by the document's total token count.
// Resulting columns: doc_id, token, count, doc_size, tf.
val dfTF = dfTokens
    .groupBy(col("doc_id"), col("token"))
    .agg(count(lit(1)).as("count"))
    .join(dfDocSize, "doc_id")
    .withColumn("tf", col("count").divide(col("doc_size")))

dfTF = [doc_id: bigint, token: string ... 3 more fields]


[doc_id: bigint, token: string ... 3 more fields]

In [251]:
// Preview the term-frequency table.
dfTF.show()

+------+------------+-----+--------+--------------------+
|doc_id|       token|count|doc_size|                  tf|
+------+------------+-----+--------+--------------------+
|    26|     totally|    1|      42|0.023809523809523808|
|    26|      lights|    1|      42|0.023809523809523808|
|    26|      creepy|    1|      42|0.023809523809523808|
|    26|       staff|    1|      42|0.023809523809523808|
|    26|       loved|    1|      42|0.023809523809523808|
|    26|        days|    1|      42|0.023809523809523808|
|    26|       enjoy|    1|      42|0.023809523809523808|
|    26|expectations|    1|      42|0.023809523809523808|
|    26|       lever|    1|      42|0.023809523809523808|
|    26|    bathroom|    1|      42|0.023809523809523808|
|    26|        stay|    1|      42|0.023809523809523808|
|    26|  extrememly|    1|      42|0.023809523809523808|
|    26|      getway|    1|      42|0.023809523809523808|
|    26|     enjoyed|    1|      42|0.023809523809523808|
|    26|    sh

In [230]:
// Document frequency and IDF for the 100 tokens that occur in the most documents.
// dfTF has one row per (doc_id, token), so counting rows per token gives the
// number of documents containing it. idf = -log(df / D), where D is the corpus size.
val dfDF = dfTF
    .groupBy(col("token"))
    .agg(count(lit(1)).as("df"))
    .orderBy(col("df").desc)
    .limit(100)
    .withColumn("idf", -log(col("df") / D))

dfDF = [token: string, df: bigint ... 1 more field]


[token: string, df: bigint ... 1 more field]

In [252]:
// Preview the most frequent tokens with their document frequency and IDF.
dfDF.show()

+---------+-----+-------------------+
|    token|   df|                idf|
+---------+-----+-------------------+
|    hotel|16319|0.22765569220589793|
|     room|14050|0.37736336957494065|
|      not|12123| 0.5248812905902492|
|    staff|11522| 0.5757275140445042|
|    great|11020| 0.6202739616299271|
|     stay|10094| 0.7080445774366249|
|     good| 9277|  0.792447546683561|
|   stayed| 8549| 0.8741714483105988|
|       nt| 8379| 0.8942571897235461|
|    rooms| 8336| 0.8994022803436844|
| location| 8164| 0.9202515204070972|
|     just| 7736| 0.9741010072037022|
|    clean| 7648| 0.9855415896055952|
|     nice| 7415|    1.0164807897648|
|      did| 7204|  1.045349338040986|
|breakfast| 7111| 1.0583428844533145|
|       no| 6809|  1.101740498853866|
|    night| 6328| 1.1750015348893428|
|  service| 6228| 1.1909305113829436|
|     time| 6151| 1.2033710951284642|
+---------+-----+-------------------+
only showing top 20 rows



In [239]:
// TF-IDF score per (doc_id, token). The inner join keeps only the tokens that
// are present in dfDF (the top-100 vocabulary).
val dfTFIDF = dfTF
    .join(dfDF, "token")
    .select(
        col("doc_id"),
        col("token"),
        (col("tf") * col("idf")).as("tfidf")
    )

dfTFIDF = [doc_id: bigint, token: string ... 1 more field]


[doc_id: bigint, token: string ... 1 more field]

In [None]:
// Preview the TF-IDF table.
dfTFIDF.show()

Для примера посмотрим на значения TF-IDF для документа с doc_id = 100

In [240]:
// Show the TF-IDF scores of the document whose doc_id is 100.
dfTFIDF.where(col("doc_id") === 100).show()

+------+--------+--------------------+
|doc_id|   token|               tfidf|
+------+--------+--------------------+
|   100|   place| 0.02594739662507109|
|   100|    just|0.018732711676994272|
|   100|   price| 0.03682171026861379|
|   100|    like|0.025907644950381416|
|   100|   rooms| 0.01729619769891701|
|   100|  hotels| 0.03545167188518298|
|   100|   hotel| 0.01751197632353061|
|   100|   close| 0.03843489532750064|
|   100|location| 0.03539428924642682|
|   100|    make| 0.03649966392107712|
|   100|     not| 0.02018774194577882|
|   100|    stay|0.013616241873781248|
|   100|friendly|0.024159286421330115|
|   100|  little| 0.02957114689578998|
|   100|   great| 0.03578503624788041|
|   100|   staff|0.022143365924788622|
|   100|   small| 0.02773547960892019|
|   100|    good|0.015239375897760789|
+------+--------+--------------------+



In [246]:
// Materialise all TF-IDF rows on the driver as a local Array[Row].
val tfidfData = dfTFIDF.collect()

tfidfData = Array([26,staff,0.013707797953440575], [26,loved,0.04987936970980875], [26,days,0.04810612940263059], [26,bathroom,0.03743717021626664], [26,stay,0.016858204224681542], [26,think,0.05163222871021516], [26,service,0.02835548836626056], [26,hotel,0.005420373623949951], [26,clean,0.02346527594299036], [26,room,0.008984842132736683], [26,friendly,0.05982299494805551], [26,going,0.04926429496942955], [26,really,0.034153114128188736], [26,wonderful,0.09091026568052779], [26,stayed,0.020813605912157113], [29,location,0.027066221188444036], [29,rooms,0.026453008245402483], [29,place,0.03968425366187343], [29,4,0.06061819880000307], [29,recommend,0.046335955767965564], [29,not,0.01543768501736027], [29,excellent,0.04510646619148792], [29,street,0.0588...


Array([26,staff,0.013707797953440575], [26,loved,0.04987936970980875], [26,days,0.04810612940263059], [26,bathroom,0.03743717021626664], [26,stay,0.016858204224681542], [26,think,0.05163222871021516], [26,service,0.02835548836626056], [26,hotel,0.005420373623949951], [26,clean,0.02346527594299036], [26,room,0.008984842132736683], [26,friendly,0.05982299494805551], [26,going,0.04926429496942955], [26,really,0.034153114128188736], [26,wonderful,0.09091026568052779], [26,stayed,0.020813605912157113], [29,location,0.027066221188444036], [29,rooms,0.026453008245402483], [29,place,0.03968425366187343], [29,4,0.06061819880000307], [29,recommend,0.046335955767965564], [29,not,0.01543768501736027], [29,excellent,0.04510646619148792], [29,street,0.0588...