In [None]:
# Install Spark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

!pip install -q findspark
import findspark
findspark.init()

In [None]:
# Start the Spark session

In [None]:
from pyspark.sql import SparkSession

# Local Spark session for this notebook; the web UI is served on port 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

In [None]:
# Read data

In [None]:
# Load the reviews CSV (first line is the header) and count the documents (rows).
reader = spark.read.options(header="true", inferSchema="true")
df = reader.csv("tripadvisor_hotel_reviews.csv")
n_docs = df.count()
print(f"Total number of documents: {n_docs}")

Total number of documents: 20491


In [None]:
# Normalise each review: lowercase, keep only [a-z0-9] and whitespace, then split into tokens.
df = df.select(F.lower(F.col("Review")).alias("Review"))
df = df.withColumn("Review", F.regexp_replace("Review", r"[^a-z0-9\s]+", ""))
# Collapse runs of whitespace (spaces, tabs, newlines) into a single space and trim the ends.
# Words separated by a newline therefore stay separate, and the split below does not
# produce empty tokens between consecutive spaces.
df = df.withColumn("Review", F.trim(F.regexp_replace("Review", r"\s+", " ")))
df = df.select(F.split(F.col("Review"), " ").alias("Review"))

In [None]:
# Calculate IDF

In [None]:
# One row per (review, distinct word). array_distinct counts a word at most once per review,
# so summing word_in_doc_count per word afterwards yields its document frequency.
idf = (
    df.select(F.explode(F.array_distinct(F.col("Review"))).alias("word"))
      .filter(F.col("word") != "")  # empty-string tokens are not words
      .withColumn("word_in_doc_count", F.lit(1))
)

In [None]:
TOP_N = 100  # vocabulary size: keep the words that occur in the most reviews

# Document frequency per word.
idf = idf.groupby(F.col("word")).agg(F.sum("word_in_doc_count").alias("word_in_doc_count"))
# Break ties on the word itself so the top-N cut is deterministic across re-evaluations.
idf = idf.orderBy(F.col("word_in_doc_count").desc(), F.col("word").asc())
idf = idf.limit(TOP_N)
# idf(word) = log10(n_docs / document_frequency(word))
idf = idf.withColumn("idf", F.log10(F.lit(n_docs) / F.col("word_in_doc_count")))
idf = idf.select("word", "idf")

In [None]:
# idf is used three times: by this collect, by idf.show, and by the tf/idf join. Caching it
# avoids re-reading the CSV and re-aggregating for each use. While the cache is held, it also
# keeps every use on the same top-word set.
idf = idf.cache()
# The vocabulary as a plain Python list, for isin() filtering and as pivot values.
TOP_WORDS = [row.word for row in idf.select("word").collect()]

In [None]:
idf.show(n=5)  # preview the first five vocabulary words with their idf

In [None]:
# Calculate TF

In [None]:
# Attach a per-review id so word counts and document lengths can be joined back together.
df = df.withColumn("id", F.monotonically_increasing_id())

# One row per (review id, token); empty-string tokens are not words.
tokens = (
    df.select("id", F.explode(F.col("Review")).alias("word"))
      .filter(F.col("word") != "")
)

# Review length over ALL tokens. It is computed before restricting to the vocabulary, so tf is
# the word's relative frequency in the whole review, not among the TOP_WORDS only.
doc_len = tokens.groupby("id").agg(F.count("word").alias("doc_len"))

# Occurrences of each vocabulary word in each review.
word_count_in_doc = (
    tokens.filter(F.col("word").isin(TOP_WORDS))
          .groupby("id", "word")
          .agg(F.count("word").alias("word_count"))
)

# Inner join: reviews that contain none of TOP_WORDS have no rows in tf.
tf = word_count_in_doc.join(doc_len, on=["id"])
tf = tf.withColumn("tf", F.col("word_count") / F.col("doc_len"))
tf = tf.select("id", "word", "tf")

In [None]:
tf.show(n=5)  # preview five (id, word, tf) rows

In [None]:
# Merge TF and IDF into the TF-IDF matrix

In [None]:
# Long-format TF-IDF: one (id, word, tf_idf) row per vocabulary word present in a review.
joined = (
    tf.join(idf, on=["word"])
      .select("id", "word", (F.col("tf") * F.col("idf")).alias("tf_idf"))
)

In [None]:
# Wide TF-IDF matrix: one row per review id, one column per vocabulary word.
# Passing the pivot values explicitly saves Spark the extra job that collects the distinct
# words. Sorting them reproduces the column order of the implicit (sorted) pivot.
# Each (id, word) pair occurs exactly once in `joined`, so F.first picks that single value.
tf_idf = (
    joined.groupBy("id")
          .pivot("word", sorted(TOP_WORDS))
          .agg(F.first(F.col("tf_idf")))
)
# A word absent from a review gets 0.0 instead of null.
tf_idf = tf_idf.fillna(0.0)

In [None]:
tf_idf.show(n=20)  # first 20 rows of the review x word TF-IDF matrix