In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=cbfce93b5ae304252672157b33ed1913aeb9ccee90a8c2749e1625232641db4c
  Stored in directory: /root/.cache/pip/wheels/9f/34/a4/159aa12d0a510d5ff7c8f0220abbea42e5d81ecf588c4fd884
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import log, col, sum
import string
import os
import math

In [None]:
# Start (or reuse) the Spark session used by every RDD/DataFrame in this notebook.
spark = (
    SparkSession.builder
    .appName("TF-IDF")
    .getOrCreate()
)

In [None]:
# Load the text files.
# Each record is consumed downstream as a pair: element 0 is the file's path
# and element 1 is the file's full text.
files_rdd = spark.sparkContext.wholeTextFiles("/content/Songdata")

In [None]:
# Tokenize the documents
# Characters deleted before splitting into tokens: ASCII punctuation plus the
# typographic quotes, dashes and ellipsis that string.punctuation does not
# cover. Without these, "it’s" (curly apostrophe) and "it's" (straight
# apostrophe) would become two different tokens.
_EXTRA_PUNCTUATION = "\u2018\u2019\u201c\u201d\u2013\u2014\u2026"
_PUNCTUATION_TABLE = str.maketrans("", "", string.punctuation + _EXTRA_PUNCTUATION)


def tokenize_doc(x):
    """Turn a (path, text) record into (file name, list of tokens).

    Args:
        x: A two-element sequence; ``x[0]`` is the file path and ``x[1]`` is
            the file's text.

    Returns:
        A tuple ``(name, tokens)`` where ``name`` is the final path component
        of ``x[0]`` and ``tokens`` is the lower-cased text with punctuation
        removed, split on whitespace. Empty text yields an empty list.
    """
    return os.path.basename(x[0]), x[1].lower().translate(_PUNCTUATION_TABLE).split()
# Apply the tokenizer to every loaded file: one (file name, token list) record per file.
tokens_rdd = files_rdd.map(tokenize_doc)

In [None]:
# Term frequency: how many times each token occurs within each document.
# Emit ((document, token), 1) for every token occurrence, then add the ones up per key.
tf_rdd = (
    tokens_rdd
    .flatMap(lambda doc: [((doc[0], word), 1) for word in doc[1]])
    .reduceByKey(lambda left, right: left + right)
)
# Flatten the nested ((document, token), count) records into three columns.
tf_df = tf_rdd.map(lambda rec: (rec[0][0], rec[0][1], rec[1])).toDF(
    ["Song ID", "Token", "Term Frequency"]
)

In [None]:
# Preview the per-document token counts.
tf_df.show()

+--------------------+--------+--------------+
|             Song ID|   Token|Term Frequency|
+--------------------+--------+--------------+
|Girl from the Nor...|      if|             8|
|Girl from the Nor...|     the|            15|
|Girl from the Nor...|   north|             3|
|Girl from the Nor...|    fair|             2|
|Girl from the Nor...|   winds|             3|
|Girl from the Nor...|     hit|             2|
|Girl from the Nor...|      on|             2|
|Girl from the Nor...|remember|             3|
|Girl from the Nor...|      me|             4|
|Girl from the Nor...|      to|             3|
|Girl from the Nor...|     one|             2|
|Girl from the Nor...|     who|             2|
|Girl from the Nor...|   lives|             2|
|Girl from the Nor...|     for|             3|
|Girl from the Nor...|     she|             4|
|Girl from the Nor...|       a|             3|
|Girl from the Nor...|    mine|             2|
|Girl from the Nor...|     you|             1|
|Girl from th

In [None]:
# Compute the inverse document frequency for each token:
#   IDF(token) = log10(total documents / documents containing the token)
total_docs = files_rdd.count()
# distinct() collapses repeated (token, document) pairs so that each document
# is counted once per token; the reduce then yields the document frequency.
df_rdd = (
    tokens_rdd
    .flatMap(lambda x: [(token, x[0]) for token in x[1]])
    .distinct()
    .map(lambda x: (x[0], 1))
    .reduceByKey(lambda x, y: x + y)
)
# math.log10 rather than math.log(value, 10): the two-argument form is
# evaluated as a quotient of natural logs and is usually less accurate.
idf_rdd = df_rdd.map(lambda x: (x[0], math.log10(total_docs / x[1])))
idf_df = idf_rdd.toDF(['Token', 'Inverse Document Frequency'])



In [None]:
# Check the column names and inferred types of the IDF table.
idf_df.printSchema()

root
 |-- Token: string (nullable = true)
 |-- Inverse Document Frequency: double (nullable = true)



In [None]:
# Preview the per-token IDF values.
idf_df.show()

+----------+--------------------------+
|     Token|Inverse Document Frequency|
+----------+--------------------------+
|      it’s|        0.3979400086720376|
|         i|                       0.0|
|       was|       0.33099321904142437|
|     after|        0.5740312677277188|
|     these|        0.5740312677277188|
|     years|        0.5740312677277188|
|      like|        0.3979400086720376|
|      meet|        0.8750612633916999|
|        go|       0.17609125905568124|
|everything|        0.5740312677277188|
|       say|       0.47712125471966244|
|    time’s|        0.8750612633916999|
|      heal|        0.5740312677277188|
|        ya|        0.6989700043360187|
|       but|       0.13469857389745615|
|      hear|        0.6989700043360187|
|       i’m|        0.2730012720637376|
|        in|       0.06214790674884443|
|  dreaming|        1.1760912590556811|
|        we|        0.3979400086720376|
+----------+--------------------------+
only showing top 20 rows



In [None]:
# Compute the TF-IDF score for each token in each document:
#   TF-IDF(document, token) = term frequency * IDF(token)
# The token -> IDF lookup is collected to the driver and handed to the
# executors as a broadcast variable, so it is distributed once rather than
# being serialized into the closure of every task.
idf_dict = dict(idf_rdd.collect())
idf_lookup = spark.sparkContext.broadcast(idf_dict)
tfidf_rdd = tf_rdd.map(
    lambda x: (x[0][0], x[0][1], x[1] * idf_lookup.value[x[0][1]])
)
tfidf_df = tfidf_rdd.toDF(["Song ID", "Token", "TF-IDF"])

In [None]:
# Check the column names and inferred types of the TF-IDF table.
tfidf_df.printSchema()

root
 |-- Song ID: string (nullable = true)
 |-- Token: string (nullable = true)
 |-- TF-IDF: double (nullable = true)



In [None]:
# Put the raw term counts next to the TF-IDF scores, matching rows on (song, token).
join_df = tfidf_df.join(tf_df, on=["Song ID", "Token"], how="inner")

In [None]:
# Question 1 A: term frequency, IDF and TF-IDF for every (song, token) pair

In [None]:

# Attach each token's IDF to the per-song term-frequency / TF-IDF rows.
join_idf_df = join_df.join(idf_df, on="Token")

# Keep only the report columns, in presentation order.
table_df = join_idf_df.select(
    "Song ID",
    "Token",
    "Term Frequency",
    "Inverse Document Frequency",
    "TF-IDF",
)

# Display the combined table.
table_df.show()

+--------------------+---------+--------------+--------------------------+-------------------+
|             Song ID|    Token|Term Frequency|Inverse Document Frequency|             TF-IDF|
+--------------------+---------+--------------+--------------------------+-------------------+
|       When I'm Gone|  friends|             1|        1.1760912590556811| 1.1760912590556811|
|       When I'm Gone|  through|             1|        0.5740312677277188| 0.5740312677277188|
|               Hello|   laurie|             1|        1.1760912590556811| 1.1760912590556811|
|                Numb|     feel|             5|       0.22184874961635637| 1.1092437480817818|
|         Mockingbird|     till|             1|        0.8750612633916999| 0.8750612633916999|
|               Faded|       go|             1|       0.17609125905568124|0.17609125905568124|
|               Hello|       us|             2|        0.5740312677277188| 1.1480625354554377|
|         Mockingbird|      was|             8|   

In [None]:
# Question 1 B: the token with the highest TF-IDF score in each song

In [None]:
from pyspark.sql.functions import log, col, collect_list, sum

In [None]:
# Find the word with the highest TF-IDF score for each song.
# Several tokens in one song can share the same TF-IDF value; ordering by
# TF-IDF alone would let ROW_NUMBER() pick any of them, so the result could
# change between runs. `Token` is used as a secondary sort key so that ties are
# always broken the same way (alphabetically first token wins).
join_df.createOrReplaceTempView("tfidf_table")
result_df = spark.sql("""
    SELECT `Song ID`, `Token`, `TF-IDF`
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (
                   PARTITION BY `Song ID`
                   ORDER BY `TF-IDF` DESC, `Token` ASC
               ) AS row_num
        FROM tfidf_table
    )
    WHERE row_num = 1
""")
result_df.show()

+--------------------+----------+------------------+
|             Song ID|     Token|            TF-IDF|
+--------------------+----------+------------------+
|               Faded|     faded|15.289186367723854|
|Girl from the Nor...|     north| 3.528273777167043|
|               Hello|     hello|11.760912590556812|
|          Impossible|impossible| 21.16964266300226|
|          Kryptonite|kryptonite| 5.880456295278406|
|         Mockingbird|     daddy|11.760912590556812|
|    Murder Most Foul|      play| 71.74156680239655|
|                Numb|    caught| 9.408730072445449|
|          Photograph|      wait| 5.250367580350199|
|          Run To You|       run|24.697916440169305|
|  Somewhere I Belong|      want|12.628687890009815|
|      Still into You|      into|15.377340095392412|
|    Tears Don't Fall|conscience| 10.58482133150113|
|   The Diary Of Jane|      jane| 7.056547554334086|
|       When I'm Gone|       i’m| 10.10104706635829|
+--------------------+----------+-------------

In [None]:
# Question 1 C: top 3 songs ranked by the summed TF-IDF of the query keywords

In [None]:
# Rank songs against a keyword query: a song's score is the sum of the TF-IDF
# values of the keywords it contains; the three best-scoring songs are kept.
# Keywords are matched against tokens exactly (isin), so only these literal
# spellings contribute to the score.
keywords = ["tear", "feel", "hate"]
relevant_df = (
    join_df
    .filter(col("Token").isin(keywords))
    .groupBy("Song ID")
    # `sum` here is the pyspark.sql.functions aggregate imported above, not the builtin.
    .agg(sum("TF-IDF").alias("Rank Score"))
    .orderBy(col("Rank Score").desc())
    .limit(3)
)
relevant_df.show()

+------------------+------------------+
|           Song ID|        Rank Score|
+------------------+------------------+
|             Hello| 3.528273777167043|
|Somewhere I Belong|2.2184874961635637|
|              Numb|1.1092437480817818|
+------------------+------------------+

