In [19]:
from pyspark.sql.functions import rand, udf, col, lower, split
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, ArrayType, FloatType, DoubleType
import re
import string
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import nltk
import string
import contractions
import inflect
from pyspark.ml.feature import Word2Vec
import numpy as np
import pandas as pd
from pyspark.ml.linalg import DenseVector
from pyspark.sql.functions import lit, array

In [20]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Genius Songs Recommendation") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

In [21]:
spark

In [22]:
# Read the dataset
data_path = "hdfs://127.0.0.1:9000/user/loveushakovaa/song_lyrics.csv"

genius_song = (spark.read
      .option("multiline", "true")
      .option("quote", '"')
      .option("header", "true")
      .option("escape", "\\")
      .option("escape", '"')
      .csv(data_path)
)

# data = genius_song.sample(fraction=0.2, seed=42)

data = genius_song.orderBy(rand()).limit(10000)

# Data preprocessing

In [23]:
data = data.filter(genius_song['language'] == 'en')

## Dropping non-relevant columns

In [24]:
#data = data.cache()
columns_to_drop = ["language_ft", "language_cld3"]
data = data.drop(*columns_to_drop)
data_original = data

## Missing values

In [25]:
data = data.filter(data["title"].isNotNull())
data = data.filter(data["lyrics"].isNotNull())
data = data.fillna({'language': ''})

## Remove duplicates

In [26]:
data = data.dropDuplicates(['artist', 'title', 'year'])

In [27]:
data = data.cache()
data = data.repartition(200)

In [28]:
# data.show(5, truncate=False)

## Relevant type

In [29]:
data = data.withColumn("views", col("views").cast("int"))

In [30]:
data = data.cache()
data = data.repartition(200)

In [31]:
# Sample the data to get 10 random rows
random_rows = data.orderBy(rand()).limit(10)

random_rows.show(truncate=False)



+-------------------------------+----+-------------+----+-----+--------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

## Preprocess the lyrics

Remove noise

In [32]:
def remove_round_brackets(data):
    return re.sub(r'\(.*?\)', '', data)
remove_round_brackets_udf = udf(remove_round_brackets, StringType())

def remove_punc(data):
    trans = str.maketrans('', '', string.punctuation)
    return data.translate(trans)
remove_punc_udf = udf(remove_punc, StringType())

def white_space(data):
    return ' '.join(data.split())
white_space_udf = udf(white_space, StringType())

def complete_noise(data):
    new_data = remove_round_brackets(data)
    new_data = remove_punc(new_data)
    new_data = white_space(new_data)
    return new_data
complete_noise_udf = udf(complete_noise, StringType())

Normalize, lemmatize

In [33]:
# nltk.download('omw-1.4')
# nltk.download('wordnet')
# nltk.download('stopwords')

def text_lower(data):
    return data.lower()
text_lower_udf = udf(text_lower, StringType())

def contraction_replace(data):
    return contractions.fix(data)
contraction_replace_udf = udf(contraction_replace, StringType())

def number_to_text(data):
    temp_str = data.split()
    string = []
    for i in temp_str:
        if i.isdigit():
            temp = inflect.engine().number_to_words(i)
            string.append(temp)
        else:
            string.append(i)
    return ' '.join(string)
number_to_text_udf = udf(number_to_text, StringType())

def stopword(data):
    clean = [word for word in data.split() if word.lower() not in stopwords.words('english')]
    return ' '.join(clean)
stopword_udf = udf(stopword, StringType())

def lemmatization(data):
    lemma = WordNetLemmatizer()
    lemmas = [lemma.lemmatize(word, pos='v') for word in data.split()]
    return ' '.join(lemmas)
lemmatization_udf = udf(lemmatization, StringType())

def normalization(data):
    text = text_lower(data)
    text = contraction_replace(text)
    text = number_to_text(text)
    text = stopword(text)
    text = lemmatization(text)
    return text
normalization_udf = udf(normalization, StringType())

Apply

In [34]:
data = data.withColumn("cleaned_lyrics", complete_noise_udf(col("lyrics")))

In [35]:
data = data.withColumn("normalized_lyrics", normalization_udf(col("cleaned_lyrics")))

Tokenize

In [36]:
data = data.withColumn("tokens", split(col("normalized_lyrics"), " "))

In [37]:
# data.select('tokens').show(2, truncate=False)

## Use Word2Vec to make vectors

In [38]:
word2Vec = Word2Vec(vectorSize=200, minCount=3, inputCol="tokens", outputCol="word_vectors")
model = word2Vec.fit(data)

24/09/10 17:46:47 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

In [39]:
data = model.transform(data)

In [40]:
# data.show(2,truncate=False)

In [41]:
# data.printSchema()

## Cosine similarity

In [42]:
def cosine_similarity(v1, v2):
    np_v1 = np.array(v1)
    np_v2 = np.array(v2)
    dot_product = np.dot(np_v1, np_v2)
    norm_v1 = np.linalg.norm(np_v1)
    norm_v2 = np.linalg.norm(np_v2)
    similarity = dot_product / (norm_v1 * norm_v2)
    return float(similarity)

# User test

In [46]:
user_input_id = '753352'

input_song = data.filter(col("id") == user_input_id).select('word_vectors').collect()

# Check if the song was found
if len(input_song) == 0:
    raise ValueError(f"Input song with ID '{user_input_id}' not found in the dataset.")
else:
    input_song_vector = input_song[0]['word_vectors']
    print("Input song vector retrieved successfully.")

# input_song_vector = data.select("word_vectors").first()[0]

                                                                                

Input song vector retrieved successfully.


In [47]:
cosine_similarity_udf = udf(lambda v: float(cosine_similarity(v, input_song_vector)), DoubleType())

similarities = data.withColumn("similarity", cosine_similarity_udf(col("word_vectors"))).select("id", "similarity")

In [48]:
top_5_recommendations = similarities.orderBy(col("similarity").desc()).limit(5)

recommended_songs = (
    top_5_recommendations
    .join(data, on='id', how='inner')
    .select("id", "title", "artist", "tag", "year", "features", "views", "similarity")
)

sorted_recommended_songs = recommended_songs.orderBy(col("views").desc())

sorted_recommended_songs.show(truncate=False)

                                                                                

+-------+-------------------------+-----------------+----+----+------------------------------+-----+------------------+
|id     |title                    |artist           |tag |year|features                      |views|similarity        |
+-------+-------------------------+-----------------+----+----+------------------------------+-----+------------------+
|3891719|Stranger Shores          |Dead Like Juliet |rock|2018|{}                            |552  |0.8130600470831991|
|2322217|Channel                  |Bryan Divisions  |rap |2015|{}                            |107  |0.8134383984539044|
|5742062|Thorazine or Mundicide   |The Mad Conductor|rap |2018|{"The Stupid Stupid Henchmen"}|19   |0.8002755417838202|
|753352 |Dreamers Island          |Abraxas          |pop |2015|{}                            |17   |0.9999999999999999|
|7213197|VINYL PLANTASY VII REMAKE|Leaf Plant       |rock|2021|{}                            |2    |0.8206038026337164|
+-------+-------------------------+-----

In [None]:
spark.stop()