In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
import nltk
from nltk.corpus import stopwords
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/pranavsukumaran/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [2]:
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder.appName("LyricsAnalysis").getOrCreate()

# Load the lyric model data
lyric = spark.read.csv('lyric_model.csv', header=True, inferSchema=True)
lyric = lyric.limit(15000)


23/12/05 12:10:32 WARN Utils: Your hostname, Pranavs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.4.28 instead (on interface en0)
23/12/05 12:10:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/05 12:10:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [4]:
from pyspark.sql import SparkSession

# Load the artist similarity data
artist_sim = spark.read.csv('./cleaned_data/artist_similarity.csv', header=True, inferSchema=True)

# Load the tracks metadata
track_meta = spark.read.csv('./cleaned_data//tracks_metadata.csv', header=True, inferSchema=True)

# Assuming artist_sim is your PySpark DataFrame and 'new' is the column with string representation of lists

# UDF to convert string representation of list to a list
def string_to_list(s):
    return s[1:-1].split(',')

string_to_list_udf = udf(string_to_list, ArrayType(StringType()))

# Apply the UDF to create a new column 'similar_artist'
artist_sim = artist_sim.withColumn('similar_artist', string_to_list_udf(col('new')))

# UDFs to clean up the elements of the list
def remove_quotes(items):
    return [item.replace("'", '').strip() for item in items]

remove_quotes_udf = udf(remove_quotes, ArrayType(StringType()))

# Apply the UDFs to clean up the list elements
artist_sim = artist_sim.withColumn('similar_artist', remove_quotes_udf(col('similar_artist')))


                                                                                

### Recommendation model

In [9]:
from pyspark.ml.feature import BucketedRandomProjectionLSHModel
from pyspark.sql.functions import col

def recommend_id(track_id, lsh_model, lyric_df, track_meta_df):
    # Initialize an empty DataFrame with the same schema as track_meta_df for the case of no recommendations
    recommend_trackids = spark.createDataFrame([], track_meta_df.schema)

    # Filter the row with the given track_id
    target_lyric = lyric_df.filter(lyric_df['track_id'] == track_id).select('features')

    if target_lyric.count() > 0:
        target_features = target_lyric.first()['features']

        # Create a dataset with the target features
        target_dataset = spark.createDataFrame([(track_id, target_features)], ['track_id', 'features'])

        # Find approximate matches using LSH
        approx_matches = lsh_model.approxSimilarityJoin(target_dataset, lyric_df, 10, "distCol")
        
        # Exclude the exact match (target track itself)
        approx_matches = approx_matches.filter(~(col("datasetB.track_id") == track_id))
        
        # Get the top 10 recommendations
        top10_approx_matches = approx_matches.sort("distCol").limit(10)
        
        # Get the recommended track ids with distances
        recommend_trackids = top10_approx_matches.select("datasetB.track_id", "distCol")
        
        # Join with track_meta_df to get artist_name and title
        recommend_trackids = recommend_trackids.join(track_meta_df, recommend_trackids["track_id"] == track_meta_df["track_id"]).select("artist_name", "title", "distCol")

    return recommend_trackids


def recommend_title(title, artist, lsh_model, lyric_df, track_meta_df, artist_sim_df):
    #recommended = []

    # Match the track id and artist id with the song title and artist name
    track_input = track_meta_df.filter((col('title') == title) & (col('artist_name') == artist)).select('track_id', 'artist_id')

    if not track_input.isEmpty():
        tid = track_input.first()['track_id']
        aid = track_input.first()['artist_id']
        
        # Get similar artists based on the artist_sim_df
        similar_artists = artist_sim_df.filter(artist_sim_df['target'] == aid).select('similar_artist').first()['similar_artist']
        
        # Include the input track in the recommendations
        #recommended.append(track_meta_df.filter(track_meta_df['track_id'] == tid).select('artist_name', 'title','track_id'))

        # Recommend based on approximate matching using LSH
        recommended = recommend_id(tid, lsh_model, lyric_df, track_meta_df).collect()
        
        # Recommend based on similar artists
        similar_artist_tracks = []
        for similar_artist_id in similar_artists:
            similar_artist_tracks.extend( track_meta_df.filter(col('artist_id') == similar_artist_id).select('track_id').rdd.flatMap(lambda x: x).collect())
        
        # Filter out the input track and tracks that were already recommended
        similar_artist_tracks = [track_id for track_id in similar_artist_tracks if track_id != tid and track_id not in [row['track_id'] for row in recommended]]

        # Recommend based on similar artists if available
        if similar_artist_tracks:
            recommended.extend(track_meta_df.filter(col('track_id').isin(similar_artist_tracks)).select('artist_name', 'title').take(10))

    return recommended



### TF-IDF model

In [6]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql.functions import col

# Tokenize the text
tokenizer = Tokenizer(inputCol="lyrics", outputCol="words")
wordsData = tokenizer.transform(lyric)

# Apply HashingTF
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)

# Apply IDF
idf = IDF(inputCol="rawFeatures", outputCol="idf_features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

# Show the resulting DataFrame with TF-IDF features
rescaledData.select("track_id", "idf_features")

# Create a StandardScaler object
scaler = StandardScaler(inputCol="idf_features", outputCol="features", withMean=True, withStd=True)

# Fit and transform your TF-IDF data
scaledData = scaler.fit(rescaledData).transform(rescaledData)

# Create LSH model on scaled data
lsh = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=4.0, numHashTables=4)

# Fit LSH model to the scaled data
lsh_model_tfidf = lsh.fit(scaledData)

# Save the LSH model to disk
model_path = "./lsh_model_tfidf"  
lsh_model_tfidf.save(model_path)

print(f"LSH model saved to {model_path}")


23/12/05 12:11:36 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

LSH model saved to ./lsh_model_tfidf


In [10]:
# Call the recommend_title function
title = "Trash And Ready"
artist = "Super Cat"

recommendations_tf = recommend_title(title, artist, lsh_model_tfidf, scaledData, track_meta, artist_sim)

for rec in recommendations_tf.collect():
    print(f'Artist Name: {rec.artist_name}, Title Name: {rec.title}, Similarity Distance: {rec.distCol}')

                                                                                

AttributeError: 'list' object has no attribute 'collect'

In [11]:
# Call the recommend_title function
title = "Trash And Ready"
artist = "Super Cat"

recommendations_tf = recommend_title(title, artist, lsh_model_tfidf, scaledData, track_meta, artist_sim)

# Iterate over the list of recommendations
for rec in recommendations_tf:
    print(f'Artist Name: {rec.artist_name}, Title Name: {rec.title}, Similarity Distance: {rec.distCol if hasattr(rec, 'distCol') else 'N/A'}')


                                                                                

Artist Name: Ward 21, Title Name: Never Sell Out, Similarity Distance: N/A
Artist Name: Sizzla, Title Name: Sound The Trumpet, Similarity Distance: N/A
Artist Name: T.O.K., Title Name: Guardian Angel, Similarity Distance: N/A
Artist Name: Fantan Mojah, Title Name: Feel Di Pain, Similarity Distance: N/A
Artist Name: Cocoa Tea, Title Name: A Business, Similarity Distance: N/A
Artist Name: Pinchers, Title Name: Hold Me, Similarity Distance: N/A
Artist Name: Mavado, Title Name: House Cleaning, Similarity Distance: N/A
Artist Name: T.O.K., Title Name: Gal You Lead, Similarity Distance: N/A
Artist Name: Mr. Vegas, Title Name: Deh Pon The Scene (Album Version), Similarity Distance: N/A
Artist Name: Shabba Ranks, Title Name: Hood Top, Similarity Distance: N/A


23/12/05 14:15:27 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 219925 ms exceeds timeout 120000 ms
23/12/05 14:15:27 WARN SparkContext: Killing executors is not supported by current scheduler.
23/12/05 14:15:31 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at o

### Word2Vec

In [8]:
from pyspark.sql.functions import split
from pyspark.ml.feature import Word2Vec

lyric = lyric.withColumn("tokens", split(col("lyrics"), " "))
# Define the Word2Vec model
word2Vec = Word2Vec(vectorSize=100, windowSize=5, minCount=1, inputCol="tokens", outputCol="w2v_features")

# Fit the model
model = word2Vec.fit(lyric)
# Transform the model to get vectors
wordvectors = model.transform(lyric)

# Create a StandardScaler object
scaler = StandardScaler(inputCol="w2v_features", outputCol="features", withMean=True, withStd=True)

# Fit and transform your TF-IDF data
scaledData = scaler.fit(wordvectors).transform(wordvectors)

# Create LSH model on scaled data
lsh = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=4.0, numHashTables=4)

# Fit LSH model to the scaled data
lsh_model_w2vec = lsh.fit(scaledData)

# Save the LSH model to disk
model_path = "./lsh_model_w2vec"  # Specify the path where you want to save the model
lsh_model_w2vec.save(model_path)

print(f"LSH model saved to {model_path}")


                                                                                

LSH model saved to ./lsh_model_w2vec


In [10]:
# Call the recommend_title function
title = "Trash And Ready"
artist = "Super Cat"

recommendations_tf = recommend_title(title, artist, lsh_model_w2vec, scaledData, track_meta, artist_sim)

for a in recommendations_tf:
    print(f'Artist Name: {a.artist_name}, Title Name:{a.title}')

                                                                                

Artist Name: Ward 21, Title Name:Never Sell Out
Artist Name: Sizzla, Title Name:Sound The Trumpet
Artist Name: T.O.K., Title Name:Guardian Angel
Artist Name: Fantan Mojah, Title Name:Feel Di Pain
Artist Name: Cocoa Tea, Title Name:A Business
Artist Name: Pinchers, Title Name:Hold Me
Artist Name: Mavado, Title Name:House Cleaning
Artist Name: T.O.K., Title Name:Gal You Lead
Artist Name: Mr. Vegas, Title Name:Deh Pon The Scene (Album Version)
Artist Name: Shabba Ranks, Title Name:Hood Top


### LDA model

In [11]:
from pyspark.ml.feature import CountVectorizer, StandardScaler, BucketedRandomProjectionLSH
from pyspark.ml.clustering import LDA
from pyspark.sql.functions import split

# Assuming you have a DataFrame called 'lyric' with a 'lyrics' column
lyric = lyric.withColumn("tokens", split(col("lyrics"), " "))

cv = CountVectorizer(inputCol="tokens", outputCol="cv_features", vocabSize=10000, minDF=5)
cv_model = cv.fit(lyric)
count_vectorized_data = cv_model.transform(lyric)

num_topics = 10  # You can adjust this
lda = LDA(k=num_topics, maxIter=10, featuresCol="cv_features")
lda_result = lda.fit(count_vectorized_data).transform(count_vectorized_data)
lda_result = lda_result.withColumnRenamed("topicDistribution", "features")

# Create LSH model on LDA topics
lsh = BucketedRandomProjectionLSH(
    inputCol="features",  # Use the name of the LDA topic distribution column
    outputCol="hashes",
    bucketLength=4.0,
    numHashTables=4  # You can adjust this value
)

# Fit LSH model to the LDA result
lsh_model_lda = lsh.fit(lda_result)

# Save the LSH model to disk
model_path = "./lsh_model_lda"  # Specify the path where you want to save the model
lsh_model_lda.save(model_path)

print(f"LSH model saved to {model_path}")


                                                                                

LSH model saved to ./lsh_model_lda


In [12]:
# Call the recommend_title function
title = "Trash And Ready"
artist = "Super Cat"

recommendations_tf = recommend_title(title, artist, lsh_model_lda, lda_result, track_meta, artist_sim)

for a in recommendations_tf:
    print(f'Artist Name: {a.artist_name}, Title Name:{a.title}')

                                                                                

Artist Name: Ward 21, Title Name:Never Sell Out
Artist Name: Sizzla, Title Name:Sound The Trumpet
Artist Name: T.O.K., Title Name:Guardian Angel
Artist Name: Fantan Mojah, Title Name:Feel Di Pain
Artist Name: Cocoa Tea, Title Name:A Business
Artist Name: Pinchers, Title Name:Hold Me
Artist Name: Mavado, Title Name:House Cleaning
Artist Name: T.O.K., Title Name:Gal You Lead
Artist Name: Mr. Vegas, Title Name:Deh Pon The Scene (Album Version)
Artist Name: Shabba Ranks, Title Name:Hood Top
