In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=667837099f172bfcf397e93906f2879f62a5a0d22a418077a316655e56ba2767
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
# Install findspark to locate Spark in the system
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [3]:
# Import findspark and initialize Spark
import findspark
findspark.init()

In [1]:
# Import PySpark
from pyspark.sql import SparkSession

In [2]:
# Create a Spark session
spark = SparkSession.builder.appName("song_lyrics_analysis").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/25 15:55:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [3]:
file_path = 'data/song_lyrics.csv'


In [4]:
# Read the CSV file into a DataFrame
df = spark.read.csv(file_path, header=True, inferSchema=True, multiLine=True,
                    quote="\"", escape="\"")

# Show the first few rows of the DataFrame
df.show()

                                                                                

+--------------------+---+---------+----+------+--------------------+--------------------+---+-------------+-----------+--------+
|               title|tag|   artist|year| views|            features|              lyrics| id|language_cld3|language_ft|language|
+--------------------+---+---------+----+------+--------------------+--------------------+---+-------------+-----------+--------+
|           Killa Cam|rap|  Cam'ron|2004|173166|{"Cam\\'ron","Ope...|[Chorus: Opera St...|  1|           en|         en|      en|
|          Can I Live|rap|    JAY-Z|1996|468624|                  {}|[Produced by Irv ...|  3|           en|         en|      en|
|   Forgive Me Father|rap| Fabolous|2003|  4743|                  {}|Maybe cause I'm e...|  4|           en|         en|      en|
|        Down and Out|rap|  Cam'ron|2004|144404|{"Cam\\'ron","Kan...|[Produced by Kany...|  5|           en|         en|      en|
|              Fly In|rap|Lil Wayne|2005| 78271|                  {}|[Intro]\nSo they ...|

In [7]:
from pyspark.sql.functions import col
from pyspark.sql import SparkSession

# Define the condition for each column
conditions = [col(column) != "misc" for column in df.columns]

# Combine conditions using the AND operator
final_condition = conditions[0]
for condition in conditions[1:]:
    final_condition = final_condition & condition

# Apply the filter
music = df.filter(final_condition)


In [8]:
music.show()

+-----+---+------+----+-----+--------+------+---+-------------+-----------+--------+
|title|tag|artist|year|views|features|lyrics| id|language_cld3|language_ft|language|
+-----+---+------+----+-----+--------+------+---+-------------+-----------+--------+
+-----+---+------+----+-----+--------+------+---+-------------+-----------+--------+



In [11]:
import pyspark.sql.functions as F

music = music.withColumn('language_cld3', F.coalesce(music['language_cld3'], music['language_ft'], music['language']))


In [12]:
music = music.drop('language_ft', 'language')

In [13]:
from pyspark.sql.functions import col

# Drop rows with missing values in 'tag' and 'language_cld3' columns
music = music.na.drop(subset=['tag', 'language_cld3'])

# Drop columns 'language_ft' and 'language'
music = music.drop('language_ft', 'language')


In [None]:
num_rows = music.count()
num_cols = len(music.columns)

print((num_rows, num_cols))

In [None]:
from pyspark.sql.functions import col, sum as spark_sum

# Calculate the sum of null values for each column
null_counts = music.agg(*[spark_sum(col(c).isNull().cast("int")).alias(c) for c in music.columns])

# Show the result
null_counts.show()


In [None]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

# Define a UDF (User Defined Function) to count lines
def count_lines(text):
    return text.count("\n")

# Register the UDF
count_lines_udf = udf(count_lines, IntegerType())

# Create a new column 'no_of_lines' using the UDF
music = music.withColumn('no_of_lines', count_lines_udf(col('lyrics')))


In [None]:
from pyspark.sql.functions import col, regexp_replace

# Replace newline characters with spaces in the 'lyrics' column
music = music.withColumn('lyrics', regexp_replace(col('lyrics'), '\n', ' '))


In [None]:
from pyspark.sql.functions import col, regexp_replace

# Define a PySpark UDF (User Defined Function) to remove text inside brackets
def remove_text_inside_brackets_udf(input_string):
    # Define a regular expression pattern for matching text inside square brackets
    pattern = r'\[.*?\]'

    # Use regexp_replace() function to replace the matched pattern with an empty string
    result = re.sub(pattern, '', input_string)

    return result

# Register the UDF
remove_text_udf = udf(remove_text_inside_brackets_udf)

# Apply the UDF to the 'your_column' column and create a new column 'result'
music = music.withColumn('result', remove_text_udf(col('your_column')))


In [None]:
from pyspark.sql.functions import col, count

# Calculate the count of each unique value in the 'tag' column
tag_counts = music.groupBy('tag').agg(count('*').alias('count'))

# Calculate the percentage for each tag
tag_percentage = tag_counts.withColumn('percentage', (col('count') / music.count()) * 100)

# Show the result
tag_percentage.show()


In [None]:
from pyspark.sql.functions import col

# Define the conditions for filtering
conditions = (col("tag") == "rap") | (col("tag") == "pop") | (col("tag") == "rock")

# Apply the filter
subset = music.filter(conditions)


In [None]:
# Perform value counts on the 'artist' column
artist_counts = subset.groupBy('artist').count()

# Show the result
artist_counts.show()

In [None]:
# Filter out rows where 'artist' column contains 'genius' (case-insensitive)
subset_filtered = subset.filter(~col("artist").contains("genius", caseInsensitive=True))
subset_filtered.show()