In [0]:
%pip install "boto3==1.21.32" "fsspec==2023.10.0" "s3fs==0.4.2"
%pip install "spark-nlp==5.1.4"
%pip install "textblob==0.17.1"

Python interpreter will be restarted.
Python interpreter will be restarted.
Python interpreter will be restarted.
Python interpreter will be restarted.
Python interpreter will be restarted.
Python interpreter will be restarted.


In [0]:
import os
# To work with Amazon S3 storage, provide your AWS Access Key and Secret Key.
# Credentials already present in the environment (AWS_ACCESS_KEY_ID /
# AWS_SECRET_ACCESS_KEY) take precedence, so real keys never have to be pasted
# into the notebook; the 'abc' literals are only placeholders used when the
# environment does not define them.
access_key = os.environ.get('AWS_ACCESS_KEY_ID', 'abc')
secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY', 'abc')
# Set the environment variables so boto3 can pick them up later
os.environ['AWS_ACCESS_KEY_ID'] = access_key
os.environ['AWS_SECRET_ACCESS_KEY'] = secret_key
# Percent-encode "/" in the secret key (presumably for embedding it in a
# URI; it is not used by the cells visible here).
encoded_secret_key = secret_key.replace("/", "%2F")
# Set the Region to where your files are stored in S3.
aws_region = "us-east-2"
# Set this to the name of your bucket where the files are stored
aws_bucket_name = "genius-lyrics-ss"
mount_name = "s3dataread"

In [0]:
# Point Spark's Hadoop s3a connector at our AWS credentials and regional endpoint.
hadoop_conf = sc._jsc.hadoopConfiguration()
s3a_settings = {
    "fs.s3a.access.key": access_key,
    "fs.s3a.secret.key": secret_key,
    "fs.s3a.endpoint": f"s3.{aws_region}.amazonaws.com",
}
for setting_name, setting_value in s3a_settings.items():
    hadoop_conf.set(setting_name, setting_value)

In [0]:
from pyspark.sql import SparkSession
import pandas as pd 

# Reuse the active Spark session, or create one named "lyrics" if none exists.
spark = SparkSession.builder.appName("lyrics").getOrCreate()
# Path to Amazon S3 files. Built from aws_bucket_name so that changing the
# bucket in the configuration cell actually changes where the data is read from.
filepath = f"s3://{aws_bucket_name}/landing/"
filename = 'song_lyrics.csv'



In [0]:
# The file is too big to load in one go on this cluster, so it is streamed in
# fixed-size pandas chunks and each chunk is appended to a Spark DataFrame.

skip = 0                      # number of data rows consumed so far
nrows = 10000                 # rows per pandas chunk
total_rows_to_read = 1000000  # stop once at least this many rows were consumed

# With chunksize, read_csv returns an iterator that opens and parses the file
# ONCE. Calling read_csv(skiprows=skip) per chunk instead re-reads the object
# from the beginning every time and makes pandas use a data row as the header
# of every chunk after the first.
chunk_reader = pd.read_csv(f"{filepath}{filename}", sep=',', chunksize=nrows, encoding='utf-8')

sdf = None
try:
    for df in chunk_reader:
        # Convert to a Spark dataframe and append (union) to the existing sdf
        chunk_sdf = spark.createDataFrame(df)
        sdf = chunk_sdf if sdf is None else sdf.union(chunk_sdf)
        # Track how many rows have been consumed
        skip = skip + len(df)
        if skip >= total_rows_to_read:
            break
finally:
    # Release the underlying file handle even when we stop before end-of-file
    chunk_reader.close()

if sdf is None:
    raise ValueError(f"No data rows found in {filepath}{filename}")



severe performance issues, see also https://github.com/dask/dask/issues/10276

To fix, you should specify a lower version bound on s3fs, or
update the current installation.



In [0]:
#importing all necessary libraries
from pyspark.sql.functions import udf, col, lower
from pyspark.sql.types import StringType
import string

In [0]:
#Cleaning Lyrics 

# Translation table that deletes every character in string.punctuation;
# built once instead of re-checking membership for each character.
_PUNCT_TABLE = str.maketrans("", "", string.punctuation)


def remove_punct(text):
    """Return ``text`` with all ``string.punctuation`` characters removed.

    Args:
        text: The string to clean, or ``None`` (Spark passes SQL NULL values
            to a Python UDF as ``None``).

    Returns:
        The cleaned string, or ``None`` when ``text`` is ``None``.
    """
    if text is None:
        # Keep nulls as nulls instead of failing the whole Spark job.
        return None
    return text.translate(_PUNCT_TABLE)

# Wrap remove_punct as a Spark UDF that returns strings
remove_punct_udf = udf(remove_punct, returnType=StringType())

# Add a "lyrics_cleaned" column holding the punctuation-free lyrics and
# preview it next to the original text
sdf = sdf.withColumn("lyrics_cleaned", remove_punct_udf(sdf["lyrics"]))
sdf.select("lyrics", "lyrics_cleaned").show(truncate=100)

# Strip punctuation from "features" as well (this column is replaced in place)
sdf = sdf.withColumn("features", remove_punct_udf(sdf["features"]))
sdf.show()

+----------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------+
|                                                                                              lyrics|                                                                                      lyrics_cleaned|
+----------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------+
|[Chorus: Opera Steve & Cam'ron]\nKilla Cam, Killa Cam, Cam\nKilla Cam, Killa Cam\nKilla Cam, Cam\...|Chorus Opera Steve  Camron\nKilla Cam Killa Cam Cam\nKilla Cam Killa Cam\nKilla Cam Cam\nKilla Ca...|
|[Produced by Irv Gotti]\n\n[Intro]\nYeah, hah, yeah, Roc-A-Fella\nWe invite you to somethin' epic...|Produced by Irv Gotti\n\nIntro\nYeah hah yeah RocAFella\nWe invite you to somethin

In [0]:
#Check for null values again to make sure I don't run into issues later
from pyspark.sql.functions import count, when

# Count the nulls of every column in ONE aggregation. Running a separate
# where(...).count() per column launches one Spark job per column, and each
# job re-evaluates the whole lineage (including the Python UDF columns).
# when(...) without otherwise() yields NULL for non-matching rows and count()
# ignores NULLs, so each aggregate is exactly that column's null count.
null_counts_row = sdf.select(
    [count(when(col(column).isNull(), 1)).alias(column) for column in sdf.columns]
).first()
null_counts = list(null_counts_row)

# Create a dictionary with column names and corresponding null counts
null_counts_dict = dict(zip(sdf.columns, null_counts))

# Display the null counts
for column, null_count in null_counts_dict.items():
    print(f"Column '{column}': {null_count} null values")

Column 'title': 2 null values
Column 'tag': 0 null values
Column 'artist': 0 null values
Column 'year': 0 null values
Column 'views': 0 null values
Column 'features': 0 null values
Column 'lyrics': 0 null values
Column 'id': 0 null values
Column 'language_cld3': 275 null values
Column 'language_ft': 613 null values
Column 'language': 913 null values
Column 'lyrics_cleaned': 0 null values


In [0]:
# Column clean-up ahead of feature engineering, expressed as one chain:
#   1. lower-case the cleaned lyrics
#   2. drop columns that are not needed ("language", "language_ft", "views")
#   3. drop rows that have no title
#   4. rename "tag" to "genre" and "language_cld3" to "language"
#   5. replace missing language values with 'unknown'
sdf = (
    sdf.withColumn("lyrics_cleaned", lower(col("lyrics_cleaned")))
    .drop("language", "language_ft", "views")
    .dropna(subset=["title"])
    .withColumnRenamed("tag", "genre")
    .withColumnRenamed("language_cld3", "language")
    .fillna({'language': 'unknown'})
)

In [0]:
# Replaces the newline characters ('\n') in lyrics with spaces
def remove_newline(text):
    """Return ``text`` with every newline character replaced by a single space.

    Args:
        text: The string to flatten, or ``None`` (Spark passes SQL NULL values
            to a Python UDF as ``None``).

    Returns:
        The flattened string, or ``None`` when ``text`` is ``None``.
    """
    if text is None:
        # Keep nulls as nulls instead of failing the whole Spark job.
        return None
    return text.replace('\n', ' ')

# Wrap remove_newline as a Spark UDF that returns strings
remove_newline_udf = udf(remove_newline, returnType=StringType())

# Flatten the line breaks in "lyrics_cleaned", then discard the raw "lyrics"
# column, which the cleaned column replaces
sdf = sdf.withColumn(
    "lyrics_cleaned", remove_newline_udf(sdf["lyrics_cleaned"])
).drop("lyrics")
sdf.show()


+--------------------+-----+---------+----+--------------------+---+--------+--------------------+
|               title|genre|   artist|year|            features| id|language|      lyrics_cleaned|
+--------------------+-----+---------+----+--------------------+---+--------+--------------------+
|           Killa Cam|  rap|  Cam'ron|2004|   CamronOpera Steve|  1|      en|chorus opera stev...|
|          Can I Live|  rap|    JAY-Z|1996|                    |  3|      en|produced by irv g...|
|   Forgive Me Father|  rap| Fabolous|2003|                    |  4|      en|maybe cause im ea...|
|        Down and Out|  rap|  Cam'ron|2004|CamronKanye WestS...|  5|      en|produced by kanye...|
|              Fly In|  rap|Lil Wayne|2005|                    |  6|      en|intro so they ask...|
|      Lollipop Remix|  rap|Lil Wayne|2008|Kanye WestStatic ...|  7|      en|intro lil wayne h...|
|          Im Not You|  rap|   Clipse|2002|JadakissStyles PR...|  8|      en|intro pusha t no ...|
|         

In [0]:
# Destination for the cleaned data set, in the same bucket that is configured
# through aws_bucket_name.
output_filepath = f"s3://{aws_bucket_name}/raw/"
output_filename = 'song_lyrics_2.parquet'
# Overwriting is a save *mode*, not a writer option: .option("overwrite", True)
# leaves the default error-if-exists mode in place, so re-running the notebook
# fails once the output path exists. mode("overwrite") replaces the old output.
sdf.write.mode("overwrite").parquet(f"{output_filepath}{output_filename}")