## Cluster
### 3-node i3.xlarge cluster, runtime v10.3 (includes Apache Spark 3.2.1, Scala 2.12)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [0]:
# Build (or reuse) the Spark session used by the rest of the notebook.
#
# `spark.jars.packages` expects ONE comma-separated list of Maven coordinates.
# Calling .config() twice with the same key keeps only the last value, which
# would silently drop the hadoop-aws package, so both coordinates are joined
# into a single setting here.
spark_packages = ','.join([
    'org.apache.hadoop:hadoop-aws:3.3.1',
    'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1',
])

# Set parquet configuration to interpret binary byte array as string
spark = SparkSession.builder.config('spark.sql.parquet.binaryAsString', 'true') \
            .config('spark.jars.packages', spark_packages)\
            .config("spark.network.timeout", "36000000s")\
            .config("spark.executor.heartbeatInterval", "3600s")\
            .config("spark.dynamicAllocation.enabled", "false")\
            .appName("OMS") \
            .getOrCreate()


# Add configuration for accessing S3

In [0]:
# Placeholder AWS credentials: replace 'UPDATE' with real values before running
# (and avoid committing real keys with the notebook).
aws_access_key = 'UPDATE'
aws_secret_key = 'UPDATE'

# Configure S3A directly on the underlying Hadoop configuration.
# The "spark.hadoop." prefix is only meaningful when a property is set through
# the Spark configuration; on the Hadoop Configuration object itself the key
# must be the bare Hadoop name ("fs.s3a.impl"), otherwise it is never read.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.access.key", aws_access_key)
hadoop_conf.set("fs.s3a.secret.key", aws_secret_key)

# Data Pre-processing:
### For simplicity, read files from S3 and join them to create an aggregate for this example.

In [0]:
# Load the raw genre file and shape every line into [track_id, genre]
genre_rdd = (
    sc.textFile("s3://million-song-dataset-16/genre_data/msd_allmusic_genre.txt")
    # Break each line into its tab-separated fields
    .map(lambda line: line.split('\t'))
    # Keep the first field as-is; swap '_' for ' ' in the second (the genre label)
    .map(lambda fields: [fields[0], fields[1].replace('_', ' ')])
)

# Schema for the genre records: two non-nullable string columns
schema = StructType([
    StructField("track_id", StringType(), False),
    StructField("genre", StringType(), False),
])

# Build the genre DataFrame from the RDD with that schema
genre_df = spark.createDataFrame(genre_rdd, schema)

# Preview the rows
genre_df.display()

# Row count (shown as the cell result)
genre_df.count()

track_id,genre
TRAAAAK128F9318786,Pop Rock
TRAAAAV128F421A322,Pop Rock
TRAAAAW128F429D538,Rap
TRAAABD128F429CF47,Pop Rock
TRAAACV128F423E09E,Pop Rock
TRAAADT12903CCC339,Easy Listening
TRAAAED128E0783FAB,Vocal
TRAAAEF128F4273421,Pop Rock
TRAAAEM128F93347B9,Electronic
TRAAAFD128F92F423A,Pop Rock


In [0]:
# Load the million song dataset from its Parquet files
songs_df = spark.read.parquet("s3://million-song-dataset-16/data")

# Columns kept for the genre detection task
selected_columns = [
    'analysis_sample_rate',
    'artist_id',
    'artist_latitude',
    'artist_location',
    'artist_longitude',
    'artist_name',
    'danceability',
    'duration',
    'energy',
    'key',
    'loudness',
    'mode',
    'song_id',
    'start_of_fade_out',
    'tempo',
    'time_signature',
    'title',
    'track_id',
    'year',
]

# Project onto the selected columns and mark the result for caching
songs_df2 = songs_df.select(*selected_columns).cache()

# Run an action over the cached DataFrame
songs_df2.count()

# Preview the rows
songs_df2.display()

analysis_sample_rate,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,danceability,duration,energy,key,loudness,mode,song_id,start_of_fade_out,tempo,time_signature,title,track_id,year
22050,ARDPB5U1187FB4A91D,35.14968,"Memphis, TN",-90.04892,Hank Crawford,0.0,281.23383,0.0,7,-12.023,1,SOTAVYY12A8C13A8D2,277.2,115.498,5,The Story (Remastered LP Version),TRJUHWO128F4276694,0
22050,ARKZ2UK1187B98D2D7,35.21962,North Carolina,-80.01955,Delicious,0.0,2443.33669,0.0,11,-7.855,0,SONTWBE12AB0187432,2443.337,0.0,7,Rue Saint Honoré #2008,TRJUHNQ128F933B7FE,0
22050,ARQ8CJ61187FB3DC9C,,"Boussard, LA",,Illinois Jacquet,0.0,412.26404,0.0,9,-22.945,1,SOHOURU12A8AE48CFC,405.507,61.185,5,'Round Midnight,TRJUHGJ128F42614A5,1969
22050,ARJ5JWV1187FB5CF25,32.67828,Georgia,-83.22295,Jay Vaquer,0.0,256.49587,0.0,6,-4.725,0,SOVBWMX12A8C13411D,242.95,100.037,4,Abismo (Under Doses),TRJUHBI128F4259534,0
22050,AR4IWYN1187FB47DF6,37.28848,"Campbell, CA",-121.94486,Strata,0.0,160.13016,0.0,11,-22.846,0,SONALUP12A8C142B49,145.508,76.291,1,Eight Pieces_ Op. 83_ Movement 2: Allegro Con Moto,TRJUHVT128F92DAC12,2004
22050,AROMORH1187FB50213,41.38572,barcelona,2.17005,Ojos De Brujo,0.0,625.37098,0.0,5,-12.332,0,SOMHYVC12AB017E104,592.149,112.324,3,Lluvia,TRJUHJV128F9307F50,2009
22050,ARGPEQT1241B9CCC23,,,,Comtron,0.0,2711.77098,0.0,10,-6.835,0,SOTCYLV12A6D4F7B8D,2667.613,124.025,4,Follow The Money Mix,TRJUHJD128F14A86C1,0
22050,AR2GAGN1187B9A07C9,40.71455,New York,-74.00712,Neikka RPM,0.0,219.95057,0.0,10,-4.246,0,SOOTJIU12AB018AD48,215.028,141.959,4,Warped (Terror Mix) [feat. Leaether Strip],TRJUHGZ12903CE31B7,0
22050,ARSCEVG11F50C48161,,,,Gregory Miller,0.0,246.72608,0.0,0,-28.711,0,SOPWZNQ12AB0180B95,228.792,62.62,3,Arthur Frackenpohl: Largo and Allegro: Largo,TRJUHXD128F9333AB1,0
22050,ARKELMP1187FB5358C,,,,Total Eclipse,0.0,477.75302,0.0,2,-12.002,1,SOQSRNH12AC468D478,461.746,96.71,3,Alternatif,TRJUHJI12903CF5340,0


In [0]:
# Inner-join songs with their genre on track_id and mark the result for caching
joined_df = songs_df2.join(genre_df, 'track_id', 'inner').cache()

# Preview the joined rows
joined_df.display()

# Number of joined rows (shown as the cell result)
joined_df.count()

track_id,analysis_sample_rate,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,danceability,duration,energy,key,loudness,mode,song_id,start_of_fade_out,tempo,time_signature,title,year,genre
TRAADQX128F422B4CF,22050,ARRJNTE1187FB44E12,,,,Casiotone For The Painfully Alone,0.0,175.72526,0.0,0,-10.93,1,SOTUNMH12A6D4FA3E6,168.699,141.919,4,Nashville Parthenon,2006,Pop Rock
TRAAFTE128F429545F,22050,AROVU6Z1187B9AE74E,,,,The Corrs,0.0,222.17098,0.0,7,-7.664,1,SOCHGUG12A58A7E184,205.868,95.474,4,Love Gives Love Takes ( LP Version ),1997,Pop Rock
TRAAKAG128F4275D2A,22050,ARD1X7Y1187B9A671F,,,,Soulfly,0.0,110.13179,0.0,10,-12.308,0,SOVDWHH12A8C13B42A,108.037,127.395,3,Sultao Das Matas (Album Version),0,Pop Rock
TRAAMRO128F92F20D7,22050,ARE4SDM1187FB4D7E4,45.51228,"Montreal, Quebec, Canada",-73.55439,David Wilcox,0.0,139.72853,0.0,3,-11.256,0,SOKNGDE12AB017CA4D,127.147,110.041,1,Step Into Your Skin,0,Folk
TRABHVL12903CEA1E2,22050,ARMZWLZ1187B98963C,,,,Jet Set Satellite,0.0,202.65751,0.0,10,-3.992,1,SOTJBAF12AC468B3C3,202.658,130.054,4,The Best Way To Die,2001,Pop Rock
TRABIOI12903CD8B9B,22050,ARJ9B1V1187FB4DD1F,,Chicago IL,,Kid Sister,0.0,277.2371,0.0,11,-3.573,0,SOFDHHB12AB0187788,272.881,140.026,4,Daydreaming [Jakwob Remix],2009,Pop Rock
TRACBHR12903CA7C2B,22050,ARN1XGZ1187B9B956E,,,,GORGOROTH,0.0,205.53098,0.0,0,-3.812,1,SONCRBB12AB01890A7,203.366,143.242,4,Ritual,1993,Pop Rock
TRACUFS128F42AB248,22050,ARPOBPN1187B9ADA45,,Michigan,,George Winston,0.0,231.1571,0.0,10,-19.862,1,SOERICN12AF72A4077,211.203,103.57,3,Frangenti,1999,New Age
TRADDHV128F4290058,22050,ARLGWY81187B9ADEE2,,,,Le Vibrazioni,0.0,390.922,0.0,2,-6.026,1,SOVESSN12A8C138D11,372.5,139.98,4,Il Compositore Di Nuvole,2003,Pop Rock
TRADGMK128F149DAB7,22050,ARBUPLW1187FB49B3C,39.95227,"Philadelphia, PA",-75.16237,Michael Brecker,0.0,458.86649,0.0,10,-12.726,0,SONZJGK12A6D4FBA14,444.68,91.89,1,Beau Rivage,1996,Jazz


In [0]:
# Keep a 5-row slice of the joined data under a separate name.
# NOTE(review): `joined_df_` is not referenced by any later visible cell — presumably a scratch sample; confirm before removing.
joined_df_ = joined_df.limit(5)

In [0]:
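# One-time step: uncomment to persist the joined data as the table `default.genre_joined`,
# which a later cell reads back with spark.read.table("default.genre_joined").
# joined_df.write.mode("overwrite").saveAsTable("default.genre_joined")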

# Connect to MongoDB
## Store aggregates in the database and re-read for machine learning later

In [0]:
from urllib.parse import quote_plus

# MongoDB target for the aggregated dataset
database = 'oms'
collection = 'song_genre'

# NOTE: credentials are hardcoded here; prefer a secrets store and never commit real values.
user_name = 'oms'
password = 'oms'
address = 'oms-cluster.0navm.mongodb.net'

# The user name and password are percent-encoded before being placed in the URI:
# reserved characters such as '@', ':', '/' or '%' in a credential would otherwise
# corrupt the connection string. For the current values the result is unchanged.
connection_string = (
    f"mongodb+srv://{quote_plus(user_name)}:{quote_plus(password)}"
    f"@{address}/{database}.{collection}"
)

In [0]:
# Echo the assembled MongoDB URI as the cell output to verify it.
# NOTE: the URI embeds the password, so clear this cell's output before sharing the notebook.
connection_string

In [0]:
# joined_df.write.format("mongo").option("uri",connection_string).mode("overwrite").save()

In [0]:
# df = spark.read.format("mongo").option("uri",connection_string).load()

In [0]:
# Saved song/genre join, with its own 'year' column removed
df = spark.read.table("default.genre_joined").drop('year')

# 'year' is taken from the song popularity table instead, keyed by song_id
df_yr_decade = spark.read.table("default.song_popularity").select('year', 'song_id')

# The 'result' column from the kmeans_emb_10 table, keyed by track_id
lyrics_embeddings = spark.read.table("default.kmeans_emb_10").select('track_id', 'result')

# Attach the year by song_id, then the 'result' column by track_id
df_yr = df.join(df_yr_decade, on='song_id')
df_yr_ly = df_yr.join(lyrics_embeddings, on='track_id')

In [0]:
# Mark the combined DataFrame for caching and count its rows in one expression;
# the count is shown as the cell result.
df_yr_ly.cache().count()

In [0]:
from pyspark.ml.functions import vector_to_array

In [0]:
# Convert the 'result' vector column into an array column named 'lyrics_embedding',
# then drop 'result'.
# The guard keeps the cell re-runnable: after the first run 'result' no longer
# exists, so an unguarded withColumn(...) would fail on the missing column.
if 'result' in df_yr_ly.columns:
    df_yr_ly = df_yr_ly.withColumn("lyrics_embedding", vector_to_array("result")).drop('result')

In [0]:
# Write the combined DataFrame to the MongoDB collection named in the URI,
# replacing whatever is already stored there.
(
    df_yr_ly.write
    .format("mongo")
    .option("uri", connection_string)
    .mode("overwrite")
    .save()
)