In [None]:
pip install nltk

In [None]:
import pandas as pd
from pyspark.ml.feature import IndexToString
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, regexp_replace, explode, concat_ws, when
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, Tokenizer, StopWordsRemover, Word2Vec, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import collect_list
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import functions as F
import nltk
from nltk.stem import PorterStemmer
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import ArrayType, DoubleType
from google.colab import drive
import os

In [None]:
# Mount Google Drive at /content/drive; the trained model is later saved under
# /content/drive/MyDrive, so this mount must succeed before that cell runs.
drive.mount('/content/drive')

Mounted at /content/drive


**Merge Datasets**

In [None]:
# Load the two raw datasets (both are ISO-8859-1 encoded).
df1 = pd.read_csv("/content/Mendeley_Dataset.csv", encoding="ISO-8859-1")
df2 = pd.read_csv("/content/Student_Dataset.csv", encoding="ISO-8859-1")

# Align df2 to df1's column layout: columns missing from df2 are filled with
# NaN and columns that only exist in df2 are dropped.
df2 = df2.reindex(columns=df1.columns)

merged_df = pd.concat([df1, df2], ignore_index=True)

print("Rows in merged dataset:", len(merged_df))

# Write to the same absolute path the Spark cells read from, so the notebook
# does not depend on the current working directory being /content.
merged_df.to_csv("/content/merged_dataset.csv", index=False)

Rows in merged dataset: 28472


In [None]:
# Report the size of each frame involved in the merge.
for frame_name, frame in (("df1", df1), ("df2", df2), ("merged_df", merged_df)):
    print(f"Rows in {frame_name}:", len(frame))

Rows in df1: 28372
Rows in df2: 100
Rows in merged_df: 28472


In [None]:
# Create (or reuse) the Spark session before its first use. Without this the
# cell fails with NameError on a fresh kernel, because `spark` is otherwise
# only created in the training cell further down. getOrCreate() hands back the
# same session when that later cell runs.
spark = SparkSession.builder.appName("MusicGenreClassifications").getOrCreate()

file_path = "/content/merged_dataset.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

df.printSchema()

root
 |-- id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- lyrics: string (nullable = true)
 |-- len: double (nullable = true)
 |-- dating: double (nullable = true)
 |-- violence: double (nullable = true)
 |-- world/life: double (nullable = true)
 |-- night/time: double (nullable = true)
 |-- shake the audience: double (nullable = true)
 |-- family/gospel: double (nullable = true)
 |-- romantic: double (nullable = true)
 |-- communication: double (nullable = true)
 |-- obscene: double (nullable = true)
 |-- music: double (nullable = true)
 |-- movement/places: double (nullable = true)
 |-- light/visual perceptions: double (nullable = true)
 |-- family/spiritual: double (nullable = true)
 |-- like/girls: double (nullable = true)
 |-- sadness: double (nullable = true)
 |-- feelings: double (nullable = true)
 |-- danceability: double (nulla

In [None]:
# `df` is a Spark DataFrame here, so `df.dtypes` is a list of
# (column name, Spark type string) pairs, not a pandas Series.
data_types = df.dtypes

# Spark SQL type names treated as numerical; decimals are matched by prefix
# below because their type string carries precision/scale, e.g. "decimal(10,2)".
SPARK_NUMERIC_TYPES = {"tinyint", "smallint", "int", "bigint", "float", "double"}

# Initialize lists to store categorical and numerical column names
categorical_columns = []
numerical_columns = []

for column, dtype in data_types:
    if dtype == "string":
        categorical_columns.append(column)
    elif dtype in SPARK_NUMERIC_TYPES or dtype.startswith("decimal"):
        numerical_columns.append(column)

# Output the results
print("Categorical columns:", categorical_columns)
print("Numerical columns:", numerical_columns)

In [None]:
# Genres the classifier is allowed to learn; any other value in the `genre`
# column (including missing values) is discarded.
valid_genres = ['pop', 'country', 'blues', 'jazz', 'reggae', 'rock', 'hip hop', 'shoegaze']

merged_df = merged_df[merged_df['genre'].isin(valid_genres)]

# Persist the filtered frame. The Spark training cell re-reads
# /content/merged_dataset.csv from disk, so filtering only the in-memory pandas
# frame would leave the training data unfiltered.
merged_df.to_csv("/content/merged_dataset.csv", index=False)

print(merged_df['genre'].unique())

['pop' 'country' 'blues' 'jazz' 'reggae' 'rock' 'hip hop' 'shoegaze']


**Data Preprocessing and Model Training**

In [None]:
# Start the Spark session for this notebook (an already running one is reused)
spark = (
    SparkSession.builder
    .appName("MusicGenreClassifications")
    .getOrCreate()
)

# Single Porter stemmer instance shared by the stemming helper below
stemmer = PorterStemmer()

def apply_stem(word):
    """Return the Porter stem of ``word``.

    The function is wrapped as a Spark UDF, so it can receive ``None`` for a
    null cell; in that case ``None`` is returned instead of raising inside the
    Python worker.

    Parameters
    ----------
    word : str or None
        A single token.

    Returns
    -------
    str or None
        The stemmed token, or ``None`` when ``word`` is ``None``.
    """
    if word is None:
        return None
    return stemmer.stem(word)

# Spark UDF wrapper around the Python stemming helper
stem_udf = udf(apply_stem, StringType())

# Feature stages reused by both training and single-lyric prediction
indexer_genre = StringIndexer(inputCol="genre", outputCol="genre_index")  # genre -> numeric label
tokenizer = Tokenizer(inputCol="lyrics", outputCol="words")  # lyrics -> list of words
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")  # drop common stopwords

file_path = "/content/merged_dataset.csv"

# Load the merged CSV and normalise the lyrics column in three steps:
# lowercase, strip everything except letters/whitespace, then turn nulls into "".
df = (
    spark.read.csv(file_path, header=True, inferSchema=True)
    .withColumn("lyrics", F.lower(F.col("lyrics")))
    .withColumn("lyrics", F.regexp_replace(F.col("lyrics"), "[^a-zA-Z\\s]", ""))
    .withColumn("lyrics", F.when(F.col("lyrics").isNull(), "").otherwise(F.col("lyrics")))
)

# Define a function to preprocess lyrics
def preprocess_lyrics(df):
    """Tokenize, stop-word filter and stem the ``lyrics`` column.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must contain the string columns ``lyrics`` and ``genre``.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per distinct ``(lyrics, genre)`` pair with exactly three
        columns: ``lyrics``, ``genre`` and ``processed_lyrics`` (array of
        stemmed tokens). Rows whose lyrics contain no tokens after stop-word
        removal do not appear in the result, because exploding an empty array
        yields no rows.

    Notes
    -----
    ``collect_list`` gives no ordering guarantee after the shuffle introduced
    by ``groupBy``. Each token therefore carries its position from
    ``posexplode`` and the collected list is sorted by that position, so the
    token sequence handed to Word2Vec keeps the original word order.
    """
    tokens = remover.transform(tokenizer.transform(df))

    # One row per token, remembering where the token sat in the lyric
    exploded = tokens.select(
        "lyrics",
        "genre",
        F.posexplode(F.col("filtered_words")).alias("pos", "word"),
    )
    stemmed = exploded.withColumn("stemmed_word", stem_udf(F.col("word")))

    # Structs sort by their first field, i.e. by token position
    regrouped = stemmed.groupBy("lyrics", "genre").agg(
        F.sort_array(F.collect_list(F.struct("pos", "stemmed_word"))).alias("ordered_tokens")
    )

    # Selecting a struct field from an array of structs yields an array of that field
    return regrouped.select(
        "lyrics",
        "genre",
        F.col("ordered_tokens.stemmed_word").alias("processed_lyrics"),
    )

df = preprocess_lyrics(df)

# Drop unnecessary columns.
# NOTE: preprocess_lyrics() returns only `lyrics`, `genre` and
# `processed_lyrics`, so none of the names below exist at this point and the
# drops remove nothing (DataFrame.drop ignores unknown column names). They are
# kept so the cell stays correct if the preprocessing ever keeps more columns.
df = df.drop("id", "track_name", "artist_name", "len", "topic", "release_date")
numerical_columns = ['dating', 'violence', 'world/life', 'night/time',
                     'shake the audience', 'family/gospel', 'romantic', 'communication', 'obscene', 'music',
                     'movement/places', 'light/visual perceptions', 'family/spiritual', 'like/girls',
                     'sadness', 'feelings', 'danceability', 'loudness', 'acousticness', 'instrumentalness',
                     'valence', 'energy', 'age', '_c0']
df = df.drop(*numerical_columns)

# Verser: Convert processed lyrics into numerical representation
word2Vec = Word2Vec(vectorSize=300, minCount=1, inputCol="processed_lyrics", outputCol="lyrics_vector")

# Combine features (currently only the lyric embedding)
input_cols = ['lyrics_vector']

assembler = VectorAssembler(inputCols=input_cols, outputCol="features")

# Logistic Regression Model.
# `threshold` is deliberately not set: it is the binary-classification cut-off,
# and a value of 1 would make a binary model predict class 0 for every row.
# NOTE(review): with the eight genres here the model is multiclass, where this
# parameter should not be used at all - confirm metrics are unchanged.
lr = LogisticRegression(
    labelCol="genre_index",
    featuresCol="features",
    elasticNetParam=0
)

# Build Pipeline: label indexing -> embedding -> feature vector -> classifier
pipeline = Pipeline(stages=[indexer_genre, word2Vec, assembler, lr])

# Train-Test Split (fixed seed for repeatability)
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Train Model
model = pipeline.fit(train_df)

In [None]:
# Destination of the fitted pipeline on the mounted Google Drive
model_save_path = '/content/drive/MyDrive/Colab Notebooks/Logistic Regression Model'

# Ensure the parent folder exists before Spark writes the model directory
parent_dir = os.path.dirname(model_save_path)
os.makedirs(parent_dir, exist_ok=True)

# Replace any model previously stored at this location
model_writer = model.write().overwrite()
model_writer.save(model_save_path)
print("Model saved to {}".format(model_save_path))

Model saved to /content/drive/MyDrive/Colab Notebooks/Logistic Regression Model


In [None]:
# Get the StringIndexer model from the pipeline (it is the first stage)
indexer_model = model.stages[0]

# Build the index-to-genre mapping: position i in `labels` is the genre that
# the indexer encoded as label i, so the dict keys are indices, values genres.
genre_mapping = dict(enumerate(indexer_model.labels))
print("Index to Genre Mapping:", genre_mapping)

**Testing and Evaluation**

In [None]:
# Predictions
predictions = model.transform(test_df)

# Convert Predictions to Genre Labels.
# Reuse the StringIndexerModel that was fitted inside the pipeline (stage 0)
# instead of fitting a second indexer: this guarantees the labels line up with
# the indices the classifier was trained on and avoids another pass over
# train_df.
genreIndexerModel = model.stages[0]
indexToString = IndexToString(inputCol="prediction", outputCol="predicted_genre", labels=genreIndexerModel.labels)
predictions_with_genre = indexToString.transform(predictions)

# Show Final Predictions
predictions_with_genre.select("genre", "predicted_genre").show()

In [None]:
# Evaluators comparing the predicted index with the true genre index
evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol="genre_index",
    predictionCol="prediction",
    metricName="accuracy",
)
evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="genre_index",
    predictionCol="prediction",
    metricName="f1",
)

# The metrics need the true label column; report and skip if it is absent
if 'genre_index' not in predictions_with_genre.columns:
    print("Error: genre_index column is missing in the predictions DataFrame.")
else:
    accuracy = evaluator_accuracy.evaluate(predictions_with_genre)
    f1_score = evaluator_f1.evaluate(predictions_with_genre)

    # Print the evaluation metrics
    print(f"Model Accuracy: {accuracy}")
    print(f"Model F1 Score: {f1_score}")

**Predict Genre for Lyrics**

In [None]:
# Prediction function with class labels and probabilities
def predict_genre_with_probabilities(model, input_lyrics):
    """Score one lyrics string and pair every class probability with its genre.

    The text goes through the same cleaning as the training data (lowercase,
    letters/whitespace only, tokenize, stop-word removal, stemming) before the
    fitted pipeline is applied.

    Parameters
    ----------
    model : fitted pipeline model
        Must expose ``stages`` and contain a fitted label indexer (a stage with
        a ``labels`` attribute) plus the stages that turn ``processed_lyrics``
        into a ``probability`` column.
    input_lyrics : str or None
        Raw lyrics. ``None`` is treated as an empty string.

    Returns
    -------
    list of Row
        Rows with a single field ``genre_probabilities``: a list of
        ``[genre, probability]`` pairs, one per class. The list is empty when
        no token survives preprocessing (exploding an empty token array yields
        no rows to score).

    Raises
    ------
    ValueError
        If no stage of ``model`` exposes ``labels``.
    """
    # Take the class names from the fitted indexer so that position i matches
    # probability i. A hand-written list can have the wrong order or length,
    # which mislabels probabilities or raises IndexError.
    genre_classes = next(
        (list(stage.labels) for stage in model.stages if hasattr(stage, "labels")),
        None,
    )
    if genre_classes is None:
        raise ValueError("model has no stage exposing `labels`; cannot name the predicted classes")

    # A None value cannot be type-inferred by createDataFrame
    lyrics_text = "" if input_lyrics is None else str(input_lyrics)
    input_df = spark.createDataFrame([(lyrics_text,)], ["lyrics"])

    input_df = input_df.withColumn("lyrics", F.lower(F.col("lyrics")))
    input_df = input_df.withColumn("lyrics", F.regexp_replace(F.col("lyrics"), "[^a-zA-Z\\s]", ""))

    input_df = tokenizer.transform(input_df)
    input_df = remover.transform(input_df)
    input_df = input_df.withColumn("word", explode(F.col("filtered_words")))
    input_df = input_df.withColumn("stemmed_word", stem_udf(F.col("word")))
    input_df = input_df.groupBy("lyrics").agg(F.collect_list("stemmed_word").alias("processed_lyrics"))

    prediction_df = model.transform(input_df).select("probability")

    def label_probabilities(probability):
        # .tolist() converts numpy floats to plain Python floats
        values = probability.toArray().tolist()
        return [(genre, value) for genre, value in zip(genre_classes, values)]

    # NOTE(review): the declared element type is string, as in the previous
    # implementation, so probabilities presumably come back as strings - confirm
    # before switching to a struct<genre:string, probability:double> schema.
    label_probabilities_udf = udf(label_probabilities, ArrayType(ArrayType(StringType())))

    prediction_df = prediction_df.withColumn(
        "genre_probabilities", label_probabilities_udf(F.col("probability"))
    )

    return prediction_df.select("genre_probabilities").collect()


In [None]:
# Example of how to use the prediction function.
# The lyric previously read "careWhat" (missing space), which the cleaning step
# turns into the single token "carewhat" instead of two separate words.
input_lyrics = "My friend Goo has a real tattoo She always knows just what to do She looks through her hair like she doesn't care What she does best is stand and stare"
predictions = predict_genre_with_probabilities(model, input_lyrics)

# Display the predicted probabilities (one Row per scored lyric)
for row in predictions:
    print(row)