In [4]:
from pyspark.sql import SparkSession
import dask.dataframe as dd

spark = SparkSession.builder \
    .appName("MyApp") \
    .master("local[2]") \
    .config("spark.driver.memory", "20g").getOrCreate()

df = dd.read_csv('song_lyrics.csv')
df.to_parquet('test.parquet')

df = spark.read.parquet("test.parquet")
df.printSchema()

ParserError: Error tokenizing data. C error: Expected 11 fields in line 17, saw 13


In [2]:
df.show()

+--------------------+--------------------+--------------------+----+------+---------------+-----------------+--------------------+-------------+-----------+--------+
|                 _c0|                 _c1|                 _c2| _c3|   _c4|            _c5|              _c6|                 _c7|          _c8|        _c9|    _c10|
+--------------------+--------------------+--------------------+----+------+---------------+-----------------+--------------------+-------------+-----------+--------+
|               title|                 tag|              artist|year| views|       features|           lyrics|                  id|language_cld3|language_ft|language|
|           Killa Cam|                 rap|             Cam'ron|2004|173166|"{""Cam\\'ron""|""Opera Steve""}"|[Chorus: Opera St...|         null|       null|    null|
|           Killa Cam|           Killa Cam|                 Cam|null|  null|           null|             null|                null|         null|       null|    null

In [3]:
df.select("lyrics").show(1, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [4]:
df.filter(df.year.between(1960,2023) & (df.language == 'en')).count()

3338537

In [5]:
df.filter(df.title.isNull()).count()

165

In [6]:
df.na.drop(subset=["title"]).count()

5134691

In [2]:
import math
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@udf(IntegerType())
def decade(year):
    return int(math.trunc(year / 10) * 10)

In [None]:

# decadeUDf = udf(lambda z: decade(z), IntegerType())

In [3]:

df = df.filter(df.year.between(1960,2023) & (df.language == 'en') &
               (df.artist != "Genius English Translations")) \
    .withColumn("decade", decade(df.year)) \
    .na.drop(subset=["title"])
    

In [4]:
df.filter(df.artist == 'Genius English Translations').count()

                                                                                

0

In [5]:
from pyspark.sql.functions import min

n_sample = df.groupBy("decade").count().select(min("count")).collect()[0]["min(count)"]

                                                                                

In [6]:
from pyspark.sql.functions import col

frac = df.groupBy("decade").count().withColumn("required_n", n_sample / col("count")).drop("count").rdd.collectAsMap()
print(frac)



{1990: 0.18922084741426898, 2020: 0.058177432752782476, 1960: 1.0, 1970: 0.6472620089347395, 1980: 0.4341853232549997, 2000: 0.09333864647672178, 2010: 0.024496668471680267}


                                                                                

In [7]:
sample = df.stat.sampleBy("decade", frac, 42)
sample.groupBy("decade").count().show()



+------+-----+
|decade|count|
+------+-----+
|  1990|42238|
|  2020|42035|
|  1960|42162|
|  1970|42439|
|  1980|41910|
|  2000|42073|
|  2010|42007|
+------+-----+



                                                                                

In [8]:
sample.count()

                                                                                

294864

In [9]:

sample.filter(sample.artist != 'Genius English Translations').count() + sample.filter(sample.artist == 'Genius English Translations').count()

                                                                                

294864

In [12]:
result.coalesce(1).write.format("csv") \
            .option("header",True) \
            .mode('overwrite') \
            .save('data/result.csv')

                                                                                

In [6]:
df_res.to_csv('result.csv')

In [7]:
import pandas as pd

res = pd.read_csv('result.csv')

In [9]:
res.head()

Unnamed: 0.1,Unnamed: 0,title,tag,artist,year,views,features,lyrics,id,language_cld3,language_ft,language,decade
0,0,Forgive Me Father,rap,Fabolous,2003,4743,{},Maybe cause I'm eatin\nAnd these bastards fien...,4,en,en,en,2000
1,1,Lord You Know,rap,Cam'ron,2004,11882,"{""Cam\\'ron"",""Juelz Santana"",Jaheim}","[Chorus: Jaheim]\nNow Lord you know, just how ...",11,en,en,en,2000
2,2,Its Hot Some Like It Hot,rap,JAY-Z,1999,103549,{},"[Produced by Timbaland]\n\n[Verse 1]\nYo, show...",18,en,en,en,1990
3,3,Time,rap,AZ,2004,7577,"{Nas,Nature}","[Verse 1: Nature]\nAt the ripe age of 24, I do...",8717,en,en,en,2000
4,4,Barry Bonds,rap,Kanye West,2007,280626,"{""Lil Wayne""}",[Verse 1: Kanye West]\nIt's what you all been ...,38,en,en,en,2000
