In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
import pyspark
spark = SparkSession.builder.appName("TF-TDF").getOrCreate()

In [0]:
#(i)
import string
import math
#reading all the files
songs_rdd= spark.sparkContext.wholeTextFiles("/FileStore/tables/Songs/*").cache()
#replacing the entire path of file with Song_ID and removing the punctuations from the sting 
songs_rdd=songs_rdd.map(lambda x: (x[0].replace("dbfs:/FileStore/tables/Songs/",''),x[1].replace('’',"").split("_")[0].translate(str.maketrans('', '', string.punctuation)).lower().split()))
#computing term frequency
songs_tf=songs_rdd.flatMap(lambda x: [((x[0],w),1) for w in x[1]]).reduceByKey(lambda x,y: x+y).map(lambda x: (x[0][1],(x[0][0],x[1])))
#to count no:of documents
number_of_documents=songs_rdd.count()
#computing idf
songs_idf=songs_tf.groupByKey().mapValues(lambda x:math.log10(number_of_documents/len(x)))
#joining and computing tf-idf
joining_tf_idf= songs_tf.join(songs_idf).map(lambda x :(x[1][0][0],x[0],x[1][0][1],x[1][1],x[1][0][1]*x[1][1]))
#converting it to dataframe
headers=["SongID","Token","TermFrequency","InverseDocumentFrequency","TF_IDF"]
songs_df= spark.createDataFrame(joining_tf_idf).toDF(*headers)

In [0]:
#displaying the song_df rows
display(songs_df)

SongID,Token,TermFrequency,InverseDocumentFrequency,TF_IDF
Faded,feel,1,0.2218487496163563,0.2218487496163563
Mockingbird,feel,2,0.2218487496163563,0.4436974992327127
Numb,feel,5,0.2218487496163563,1.1092437480817818
Photograph,feel,1,0.2218487496163563,0.2218487496163563
Somewhere_I_Belong,feel,10,0.2218487496163563,2.218487496163564
Still_into_You,feel,2,0.2218487496163563,0.4436974992327127
Kryptonite,feel,1,0.2218487496163563,0.2218487496163563
Run_To_You,feel,2,0.2218487496163563,0.4436974992327127
Tears_Don_t_Fall,feel,1,0.2218487496163563,0.2218487496163563
Faded,us,2,0.5740312677277188,1.1480625354554377


In [0]:
#(ii)
#creating a view Song TFIDF from songs dataframe
songs_df.createOrReplaceTempView("SongsTFIDF")
#selecting the max tf-idf grouping bey song id
sqlMaxDF = spark.sql("SELECT SongID,max(TF_IDF) as TF_IDF  FROM SongsTFIDF GROUP BY SongID")
#creating a view MaxTFUDF from songs max dataframe
sqlMaxDF.createOrReplaceTempView("MaxTFIDF")
#joining the tables and computing the max tf-idf for song
songs_word_max_df=spark.sql("SELECT s.SongID, s.Token,s.TF_IDF as HighestTF_IDF FROM SongsTFIDF as s, MaxTFIDF as m where s.TF_IDF=m.TF_IDF and s.SongID=m.SongID")

In [0]:
#displaying the songs_word_max_df rows
display(songs_word_max_df)

SongID,Token,HighestTF_IDF
Kryptonite,kryptonite,4.704365036222725
Kryptonite,might,4.704365036222725
Kryptonite,superhuman,4.704365036222725
Kryptonite,superman,4.704365036222725
Photograph,wait,5.250367580350201
Impossible,impossible,18.8174601448909
Faded,faded,12.937003849612497
Mockingbird,daddy,11.760912590556812
Murder_Most_Foul,play,71.74156680239656
Girl_from_the_North_Country,winds,3.528273777167044


In [0]:
#(iii)
#creating a filter_df by the given words
filter_df=songs_df.filter(((songs_df.Token==lit("tear")) | (songs_df.Token==lit("hate")) | (songs_df.Token==lit("feel"))))
#taking top3 by grouping song id and performing summazation on TF-IDF
top3_sad_songs=filter_df.groupBy("SongID").sum('TF_IDF').withColumnRenamed("sum(TF_IDF)", "Rank_Score").orderBy("Rank_Score", ascending=False).limit(3)

In [0]:
#displaying the top3_sad_songs rows
display(top3_sad_songs)

SongID,Rank_Score
Hello,3.528273777167044
Somewhere_I_Belong,2.218487496163564
Numb,1.1092437480817818
