In [None]:
# Libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
from datetime import datetime
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.types import StructType,StructField
import os
from urllib.parse import quote_plus
import numpy as np

In [None]:
# Spark session with the MongoDB connector. The default Mongo input/output URI
# is read from the MONGO_URI environment variable instead of being hard-coded.
spark = (SparkSession.builder
         .appName("spotify")
         .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
         .config("spark.mongodb.input.uri", os.environ.get("MONGO_URI", "mongodb+srv://**********"))
         .config("spark.mongodb.output.uri", os.environ.get("MONGO_URI", "mongodb+srv://**********"))
         # Long timeouts for the expensive TF-IDF / join stages; the executor
         # heartbeat interval must stay well below spark.network.timeout.
         .config("spark.network.timeout", "7200s")
         .config("spark.executor.heartbeatInterval", "1200s")
         # A unit-less value is read by Spark as *bytes* ("24120" ~= 24 KiB),
         # which makes any sizeable result sent to the driver fail.
         # NOTE(review): MiB assumed to be the intended unit - adjust if needed.
         .config("spark.driver.maxResultSize", "24120m")
         .getOrCreate())

In [None]:
# Connection info.
# Credentials are read from environment variables rather than hard-coded in
# the notebook; the masked placeholders are only fallbacks.


def build_mongo_uri(user_name, password, address, database, collection):
    """Build a ``mongodb+srv`` connection URI for ``database.collection``.

    The user name and password are percent-encoded: MongoDB requires reserved
    characters such as ``@``, ``:`` and ``/`` in credentials to be escaped
    inside a connection string, otherwise the URI is parsed incorrectly.

    Returns:
        str: ``mongodb+srv://<user>:<password>@<address>/<database>.<collection>``
    """
    return (f"mongodb+srv://{quote_plus(user_name)}:{quote_plus(password)}"
            f"@{address}/{database}.{collection}")


database = os.environ.get('MONGO_DATABASE', '*****')
collection = 'songs_lyrics'
user_name = os.environ.get('MONGO_USER', '*****')
password = os.environ.get('MONGO_PASSWORD', '*****')
address = os.environ.get('MONGO_ADDRESS', '**********.mongodb.net')
connection_string = build_mongo_uri(user_name, password, address, database, collection)

### Reading the main song dataframe

In [None]:
# Load the collection addressed by `connection_string` through the Mongo connector.
df = (spark.read
      .format('mongo')
      .option("uri", connection_string)
      .load())

In [None]:
# Distinct songs that actually have lyrics. Cached because the TF-IDF step
# reads this dataframe more than once.
df_with_lyrics = (
    df.where(df['lyrics'].isNotNull())
      .select('track_uri', 'lyrics', 'lyrics_words')
      .distinct()
      .cache()
)

In [None]:
def get_me_tf_idf_scores(df_with_lyrics):
    """
    Compute the TF-IDF score of every lyric word of every song.

    Input: a songs dataframe with lyrics; it must contain ``track_uri`` and
    ``lyrics_words`` (an array of word strings).

    Output: dataframe with two columns, ``track_uri`` and ``tf_idf_and_word``.
    ``tf_idf_and_word`` is a list of ``"<tf-idf score>:<word>"`` strings, one
    per distinct word of the song, ordered from the highest to the lowest
    score (equal scores are ordered by word, descending).
    """
    # Term frequencies: every word is hashed into one of 2**24 buckets. The IDF
    # model is fitted on these same features, so both share bucket indices.
    hashingTF = HashingTF(inputCol="lyrics_words", outputCol="lyrical_features", numFeatures=16777216)
    featurized_data = hashingTF.transform(df_with_lyrics)
    idf = IDF(inputCol="lyrical_features", outputCol="lyrical_features_idf")
    idfModel = idf.fit(featurized_data)
    rescaledData = idfModel.transform(featurized_data)

    ### Finding hash of word
    # Featurize each word on its own: the single non-zero index of the
    # resulting vector is that word's bucket. It is kept as a string so it
    # matches the string map keys produced below.
    ndf = (df_with_lyrics
           .select('track_uri', F.explode('lyrics_words').alias('words'))
           .withColumn('lyrics_words', F.array('words')))
    hashudf = F.udf(lambda vec: vec.indices.tolist()[0], StringType())
    wordhash = hashingTF.transform(ndf).withColumn('word_hash', hashudf('lyrical_features'))

    ### Finding tfidf value of each word (bucket index -> tf-idf score)
    tfidfvalues = F.udf(lambda vec: dict(zip(vec.indices.tolist(), vec.values.tolist())),
                        MapType(StringType(), DoubleType()))
    wordtfidf = (rescaledData
                 .select('track_uri', F.explode(tfidfvalues('lyrical_features_idf')))
                 .withColumnRenamed('key', 'word_hash')
                 .withColumnRenamed('value', 'word_tf_idf_score')
                 .distinct())

    ### Joining with word hash value to get words
    scored_words = (wordhash
                    .join(wordtfidf, ['track_uri', 'word_hash'], 'left')
                    .select('track_uri', 'word_tf_idf_score', 'words')
                    .distinct())

    ### Creating a list of word and tfidf score per song, ranked by score.
    # Sort (score, word) structs so the ranking is numeric. Sorting the
    # "score:word" strings instead is lexicographic and misorders scores
    # (e.g. "9.5:a" would come before "12.0:b", and "1.0E-4:c" is misplaced).
    # Entries with a missing score or word become NULL and are skipped by
    # collect_list, as they cannot form a "score:word" string.
    entry = F.when(F.col('word_tf_idf_score').isNotNull() & F.col('words').isNotNull(),
                   F.struct('word_tf_idf_score', 'words'))
    ranked = (scored_words
              .groupBy('track_uri')
              .agg(F.sort_array(F.collect_list(entry), asc=False).alias('ranked_words')))

    # Render each struct back to the "score:word" string format.
    final = ranked.select(
        'track_uri',
        F.expr("transform(ranked_words, "
               "x -> concat(cast(x.word_tf_idf_score as string), ':', x.words))")
        .alias('tf_idf_and_word'))
    return final

In [None]:
# Per-song ranked TF-IDF word scores for every song that has lyrics.
final = get_me_tf_idf_scores(df_with_lyrics=df_with_lyrics)

#### Storing the TF-IDF scores in the `word_tf_idf_score` MongoDB collection

In [None]:
# Credentials come from environment variables instead of being hard-coded.
# NOTE(review): assumes the same database/cluster/user as the other
# collections in this notebook - TODO confirm.
database = os.environ.get('MONGO_DATABASE', '*****')
collection = 'word_tf_idf_score'
user_name = os.environ.get('MONGO_USER', '*****')
password = os.environ.get('MONGO_PASSWORD', '*****')
address = os.environ.get('MONGO_ADDRESS', '**********.mongodb.net')
# MongoDB requires reserved characters in the user name / password
# (e.g. '@', ':', '/') to be percent-encoded inside the connection string.
connection_string = (f"mongodb+srv://{quote_plus(user_name)}:{quote_plus(password)}"
                     f"@{address}/{database}.{collection}")
# Append mode: re-running this cell writes the scores into the collection again.
final.write.format("mongo").option("uri", connection_string).mode("append").save()

### Loading the song embeddings trained by Chandrish

In [None]:
# Credentials come from environment variables instead of being hard-coded.
# NOTE(review): assumes the same database/cluster/user as the other
# collections in this notebook - TODO confirm.
database = os.environ.get('MONGO_DATABASE', '*****')
collection = 'song_vectors'
user_name = os.environ.get('MONGO_USER', '*****')
password = os.environ.get('MONGO_PASSWORD', '*****')
address = os.environ.get('MONGO_ADDRESS', '**********.mongodb.net')
# MongoDB requires reserved characters in the user name / password
# (e.g. '@', ':', '/') to be percent-encoded inside the connection string.
connection_string = (f"mongodb+srv://{quote_plus(user_name)}:{quote_plus(password)}"
                     f"@{address}/{database}.{collection}")
song_df = spark.read.format('mongo').option("uri", connection_string).load()

### Joining the songs dataframe with the song embeddings on track uri

In [None]:
# Number of times each track occurs in each playlist (pid). A windowed count
# gives every row of a (pid, track_uri) pair the same total, so the distinct()
# below keeps exactly one row per pair and the join does not fan out.
# (row_number() would number the occurrences 1..k instead, and the join would
# then duplicate each track k times.)
# NOTE(review): repeated tracks therefore count once in the playlist mean
# embedding - confirm that k-fold weighting was not intended.
win = Window.partitionBy('pid', 'track_uri')

df = df.withColumn('num_occurences_in_playlist', F.count(F.lit(1)).over(win))

track_occurences = df.select('track_uri', 'pid', 'num_occurences_in_playlist').distinct()

song_embeddings_playlists = (df.select('name', 'pid', 'track_uri', 'track_name', 'artist_name').distinct()
                             .join(song_df, 'track_uri', 'left')
                             .join(track_occurences, ['track_uri', 'pid'], 'left')
                             .cache())


In [None]:
def id_sentiment(x):
    """Turn a two-column row into a ``(key, value)`` pair for pair-RDD operations.

    Parameters
    ----------
    x : Row or sequence
        Indexable object; only its first two items are used.

    Returns
    -------
    tuple or None
        ``(x[0], x[1])``, or None when ``x`` is not indexable or has fewer
        than two items.
    """
    try:
        return (x[0], x[1])
    except (TypeError, IndexError, KeyError):
        # Only indexing failures are expected; anything else should surface.
        return None

In [None]:
# Average the song embeddings of every playlist name into one playlist
# embedding: an RDD of (name, [float, ...]) pairs.
# Songs without an embedding (no match in song_vectors after the left join)
# are skipped: np.mean cannot average None together with numeric vectors,
# and a single such row would fail the whole job.
temp = (song_embeddings_playlists
        .where(song_embeddings_playlists['values'].isNotNull())
        .select('name', 'values')
        .rdd
        .map(id_sentiment)
        .filter(lambda pair: pair is not None)
        .groupByKey()
        .mapValues(lambda vectors: [float(v) for v in np.mean(list(vectors), axis=0)]))


In [None]:
# Schema of the averaged playlist embeddings: playlist name plus a float vector.
struct1 = StructType([
    StructField('name', StringType(), True),
    StructField('playlist_embedding', ArrayType(FloatType()), True),
])
playlist_avg_embedding = spark.createDataFrame(temp, schema=struct1)

### Calculating the Euclidean distance between each song embedding and its playlist embedding

In [None]:
def calculate_distance(arr_1, arr_2):
    """Return the Euclidean distance between two numeric vectors.

    Returns None (SQL NULL in the UDF) when either vector is missing, cannot be
    converted to floats, the two shapes differ, or the result is not finite.
    A negative sentinel value would sort before every real distance and make
    songs without an embedding rank as the closest match of their playlist.
    """
    if arr_1 is None or arr_2 is None:
        return None
    try:
        vec_1 = np.asarray(arr_1, dtype=float)
        vec_2 = np.asarray(arr_2, dtype=float)
    except (TypeError, ValueError):
        return None
    # Refuse to broadcast vectors of different lengths into a bogus distance.
    if vec_1.shape != vec_2.shape:
        return None
    dist = float(np.linalg.norm(vec_1 - vec_2))
    return dist if np.isfinite(dist) else None


distance = udf(calculate_distance, FloatType())

song_embeddings_playlists = song_embeddings_playlists.join(playlist_avg_embedding, ['name'], 'left')

# Rank songs within each playlist name by distance, closest first. Songs whose
# distance is NULL are placed last (Spark puts NULLs first in ascending order
# by default).
win_play = Window.partitionBy('name').orderBy(F.col('song_playlist_distance').asc_nulls_last())


In [None]:
# Rename the raw embedding column, measure each song's distance to its
# playlist embedding, and rank songs per playlist name using `win_play`.
_renamed = song_embeddings_playlists.withColumnRenamed('values', 'song_embedding')
song_embeddings_playlists = (
    _renamed
    .withColumn('song_playlist_distance',
                distance(_renamed['song_embedding'], _renamed['playlist_embedding']))
    .withColumn('rank', rank().over(win_play))
    .cache()
)

In [None]:
# Distinct (playlist name, averaged embedding) pairs to persist.
playlist_embeddings_final = (song_embeddings_playlists
                             .select('name', 'playlist_embedding')
                             .distinct())

In [None]:
# Preview the first 10 playlist embeddings.
playlist_embeddings_final.show(n=10)

### Saving the playlist embeddings to MongoDB

In [None]:
### Saving playlist embeddings
# Credentials come from environment variables instead of being hard-coded.
# NOTE(review): assumes the same database/cluster/user as the other
# collections in this notebook - TODO confirm.
database = os.environ.get('MONGO_DATABASE', '*****')
collection = 'playlist_embeddings'
user_name = os.environ.get('MONGO_USER', '*****')
password = os.environ.get('MONGO_PASSWORD', '*****')
address = os.environ.get('MONGO_ADDRESS', '**********.mongodb.net')
# MongoDB requires reserved characters in the user name / password
# (e.g. '@', ':', '/') to be percent-encoded inside the connection string.
connection_string = (f"mongodb+srv://{quote_plus(user_name)}:{quote_plus(password)}"
                     f"@{address}/{database}.{collection}")
# Overwrite mode: existing data in the target collection is replaced.
playlist_embeddings_final.write.format("mongo").option("uri", connection_string).mode("overwrite").save()

### Finding top 10 songs in the Rock playlist

In [None]:
# Top 10 distinct songs of the "Rock" playlist(s), best rank first.
# De-duplicate (keeping each song's best rank) *before* sorting: distinct()
# shuffles the data and does not preserve an earlier orderBy.
(song_embeddings_playlists
 .where(song_embeddings_playlists['name'] == 'Rock')
 .groupBy('track_name', 'artist_name')
 .agg(F.min('rank').alias('best_rank'))
 .orderBy('best_rank', 'track_name', 'artist_name')
 .select('track_name', 'artist_name')
 .show(10, truncate=False))

### Finding top 10 songs in the Country playlist

In [None]:
# Top 10 distinct songs of the "Country" playlist(s), best rank first.
# De-duplicate (keeping each song's best rank) *before* sorting: distinct()
# shuffles the data and does not preserve an earlier orderBy.
(song_embeddings_playlists
 .where(song_embeddings_playlists['name'] == 'Country')
 .groupBy('track_name', 'artist_name')
 .agg(F.min('rank').alias('best_rank'))
 .orderBy('best_rank', 'track_name', 'artist_name')
 .select('track_name', 'artist_name')
 .show(10, truncate=False))

### Finding top songs in the K Pop playlist

In [None]:
# Top distinct songs of the "K Pop" playlist(s), best rank first.
# De-duplicate (keeping each song's best rank) *before* sorting: distinct()
# shuffles the data and does not preserve an earlier orderBy.
(song_embeddings_playlists
 .where(song_embeddings_playlists['name'] == 'K Pop')
 .groupBy('track_name', 'artist_name')
 .agg(F.min('rank').alias('best_rank'))
 .orderBy('best_rank', 'track_name', 'artist_name')
 .select('track_name', 'artist_name')
 .show())

### Finding top songs in the Jazz playlist

In [None]:
# Top distinct songs of the "Jazz standards" playlist(s), best rank first.
# De-duplicate (keeping each song's best rank) *before* sorting: distinct()
# shuffles the data and does not preserve an earlier orderBy.
(song_embeddings_playlists
 .where(song_embeddings_playlists['name'] == 'Jazz standards')
 .groupBy('track_name', 'artist_name')
 .agg(F.min('rank').alias('best_rank'))
 .orderBy('best_rank', 'track_name', 'artist_name')
 .select('track_name', 'artist_name')
 .show())