In [1]:
# ETL --> EXTRACT HDF5 files into Spark dataframe
from pyspark.sql import SparkSession, Row
from pyspark import SparkContext
from pyspark.sql.types import *
import pandas as pd
import tables
import h5py

def build_list_file(root_folder):
  """Return local (FUSE) paths of the files listed in a DBFS folder.

  Args:
    root_folder: DBFS folder to list, e.g. "/FileStore/tables/songs"
      (a leading "dbfs:" scheme is also accepted).

  Returns:
    A list of "/dbfs/<folder>/<file name>" paths that local-file libraries
    such as pandas/h5py can open. When the global LIMIT_SONG is a
    non-negative number, only the first LIMIT_SONG entries returned by
    dbutils.fs.ls are kept; -1 (or None) means "all files".
  """
  entries = dbutils.fs.ls(root_folder)
  # Only slice for a real limit: slicing with [:-1] would silently drop the last file.
  if LIMIT_SONG is not None and LIMIT_SONG >= 0:
    entries = entries[:LIMIT_SONG]

  # Build paths from root_folder itself (not a hard-coded folder) so the function
  # works for any directory; "/dbfs" is the local mount point of DBFS.
  folder = root_folder[len("dbfs:"):] if root_folder.startswith("dbfs:") else root_folder
  local_root = "/dbfs/" + folder.strip("/")
  return [local_root + "/" + entry.name for entry in entries]
    
def read_hdf5(path):
  """Read one Million Song Dataset HDF5 file and return its rows for Spark.

  Scalar fields come from the compound tables ``metadata/songs``,
  ``analysis/songs`` and ``musicbrainz/songs`` (read with ``pd.HDFStore``).
  The variable-length arrays that are kept are read with ``h5py``. Everything
  is concatenated column-wise and unused columns are dropped.

  Both files are opened in ``with`` blocks so their handles are released even
  when a read fails. This function runs once per file on the Spark workers,
  so unclosed handles would pile up.

  Args:
    path: local filesystem path of the ``.h5`` file (e.g. a ``/dbfs/...`` path).

  Returns:
    A list of rows (each a list of values). The column order is:
    metadata, analysis and musicbrainz table columns (minus the dropped ones),
    then artist_terms, similar_artists, bars_start, beats_start,
    sections_start, segments_start, tatums_start. This order must match the
    Spark schema used downstream.
  """
  # Fetch table objects through HDFStore; store.get loads them fully into memory.
  with pd.HDFStore(path, mode='r') as store:
    metadata = store.get("metadata/songs")
    analysis = store.get("analysis/songs")
    musicbrainz = store.get("musicbrainz/songs")

  # Fetch arrays through h5py. Each becomes a one-row Series holding the whole
  # array as a Python list, named after the last path component.
  # The comprehensions fully materialize the data before the file is closed.
  with h5py.File(path, 'r') as h5:
    # Metadata string arrays are stored as bytes: decode them to str.
    string_arrays = [
      pd.Series([[item.decode() for item in h5[key]]], name=key.rsplit("/", 1)[-1])
      for key in ("metadata/artist_terms", "metadata/similar_artists")
    ]
    # Analysis timing arrays: turn numpy scalars into plain Python floats.
    float_arrays = [
      pd.Series([[float(item) for item in h5[key]]], name=key.rsplit("/", 1)[-1])
      for key in ("analysis/bars_start", "analysis/beats_start", "analysis/sections_start",
                  "analysis/segments_start", "analysis/tatums_start")
    ]

  # THE FOLLOWING FIELDS HAVE BEEN EXCLUDED FROM THE ORIGINAL SONG FEATURES
  # EXCLUDED (confidence arrays): bars_confidence, beats_confidence, sections_confidence, segments_confidence, tatums_confidence
  # EXCLUDED (segments metadata): segments_pitches, segments_timbre, segments_loudness_max, segments_loudness_max_time, segments_loudness_start
  # EXCLUDED (artist terms weights and frequency): artist_terms_freq, artist_terms_weight

  # Merge outcomes (column order here defines the row layout returned to Spark)
  song = pd.concat([metadata, analysis, musicbrainz] + string_arrays + float_arrays, axis=1, join='outer')

  # Delete columns that are not used downstream (index pointers into the arrays,
  # confidences, external ids, ...)
  unused_columns = ["idx_artist_terms", "idx_similar_artists", "audio_md5", "idx_bars_confidence", "idx_bars_start",
                    "idx_beats_confidence", "idx_beats_start", "idx_sections_confidence", "idx_sections_start",
                    "idx_segments_confidence", "idx_segments_loudness_max", "idx_segments_loudness_max_time",
                    "idx_segments_loudness_start", "idx_segments_pitches", "idx_segments_start", "idx_segments_timbre",
                    "idx_tatums_confidence", "idx_tatums_start", "time_signature_confidence", "analyzer_version",
                    "key_confidence", "mode_confidence", "idx_artist_mbtags", "track_7digitalid", "release_7digitalid",
                    "artist_playmeid", "artist_mbid", "artist_7digitalid", "analysis_sample_rate"]
  song = song.drop(columns=unused_columns)

  return song.values.tolist()

#-----------------------------------------------------------------------------------
ROOT_FOLDER = "/FileStore/tables/songs"
LIMIT_SONG = -1  # -1 = load all songs; set N >= 0 to load only the first N files

# GET: Spark session, and its SparkContext for the RDD API
spark = SparkSession.builder.appName("Milion Songs Dataset").getOrCreate()
sc = spark.sparkContext

# READ: HDF5 files (using RDD). read_hdf5 returns a list of rows per file,
# so flatMap flattens them into one row per song.
files_list = build_list_file(ROOT_FOLDER)
rdd = sc.parallelize(files_list).flatMap(read_hdf5)

# Rows are plain lists, so fields are matched by position: this order must
# match the column order produced by read_hdf5.
schema = StructType([
  StructField("artist_familiarity", DoubleType(), True),
  StructField("artist_hotttnesss", DoubleType(), True),
  StructField("artist_id", StringType(), True),
  StructField("artist_latitude", DoubleType(), True),
  StructField("artist_location", StringType(), True),
  StructField("artist_longitude", DoubleType(), True),
  StructField("artist_name", StringType(), True),
  StructField("genre", StringType(), True),
  StructField("release", StringType(), True),
  StructField("song_hotttnesss", DoubleType(), True),
  StructField("song_id", StringType(), True),
  StructField("title", StringType(), True),
  StructField("danceability", DoubleType(), True),
  StructField("duration", DoubleType(), True),
  StructField("end_of_fade_in", DoubleType(), True),
  StructField("energy", DoubleType(), True),
  StructField("key", LongType(), True),
  StructField("loudness", DoubleType(), True),
  StructField("mode", LongType(), True),
  StructField("start_of_fade_out", DoubleType(), True),
  StructField("tempo", DoubleType(), True),
  StructField("time_signature", LongType(), True),
  StructField("track_id", StringType(), True),
  StructField("year", LongType(), True),
  StructField("artist_terms", ArrayType(StringType()), True),
  StructField("similar_artists", ArrayType(StringType()), True),
  StructField("bars_start", ArrayType(DoubleType()), True),
  StructField("beats_start", ArrayType(DoubleType()), True),
  StructField("sections_start", ArrayType(DoubleType()), True),
  StructField("segments_start", ArrayType(DoubleType()), True),
  StructField("tatums_start", ArrayType(DoubleType()), True)])

# Create dataframe. Cache it: otherwise count(), display() and every later
# transformation would each re-read all HDF5 files from scratch.
df = spark.createDataFrame(rdd, schema).cache()
print("NUM_SONGS_LOADED: ", df.count())
display(df)

In [2]:
# ETL --> TRANSFORM, add and remove some columns
from pyspark.sql.functions import col, size, create_map, lit, concat, element_at, when, isnan
import numpy as np
from itertools import chain

# ADD COLUMNS: Density, Fadiness and Variability
#  - density: number of segments divided by duration; left null when duration is
#    not positive (a division by zero would raise when Spark runs in ANSI mode)
#  - fadiness: end_of_fade_in + (duration - start_of_fade_out), presumably the
#    total time spent fading in and out
#  - variability: number of sections in the song
df = df.withColumn('density', when(col('duration') > 0, size(col('segments_start')) / col('duration')))
df = df.withColumn('fadiness', col('end_of_fade_in') + col('duration') - col('start_of_fade_out'))
df = df.withColumn('variability', size(col('sections_start')))

# ADD COLUMN: Tonality (Key + Mode), e.g. key 2 + mode 1 -> "RE maj"
dict_mode = {1: 'maj', 0: 'min'}
dict_key = {0: 'DO', 1: 'DO#', 2: 'RE', 3: 'RE#', 4: 'MI', 5: 'FA', 6: 'FA#', 7: 'SOL', 8: 'SOL#', 9: 'LA', 10: 'LA#', 11: 'SI'}

# Spark literal maps built from the dicts (create_map takes alternating key, value).
mapping_expr_key = create_map([lit(x) for x in chain(*dict_key.items())])
mapping_expr_mode = create_map([lit(x) for x in chain(*dict_mode.items())])

df = df.withColumn("key_string", mapping_expr_key.getItem(col("key")))
df = df.withColumn('mode_string', mapping_expr_mode.getItem(col("mode")))
df = df.withColumn('tonality', concat(col('key_string'), lit(' '), col('mode_string')))

# Set NaN scores to 0 (isnan is false for nulls, so nulls are left unchanged)
for score_col in ("song_hotttnesss", "artist_familiarity"):
  df = df.withColumn(score_col, when(isnan(col(score_col)), 0.0).otherwise(col(score_col)))

# Final dataframe: only the columns kept for the output table
included = ["artist_name", "artist_familiarity", "artist_hotttnesss", "artist_id", "artist_latitude", "artist_longitude",
            "artist_location", "release", "song_hotttnesss", "key_string", "mode_string", "tonality", "title", "duration",
            "loudness", "tempo", "track_id", "year", "density", "fadiness", "variability", "similar_artists"]
songs = df.select(included)

In [3]:
# ETL: Transform (join with another table) - improve genre labels with the "Tagtraum genre annotation table"
# Join the freshly transformed `songs` DataFrame rather than the persisted SONGS table.
# That table may not exist yet, and the load step overwrites it; Spark refuses to
# overwrite a table that the query being written also reads from.
# Assumes a `genre` table/view with track_id and SEED_GENRE columns is registered.
songs.createOrReplaceTempView("songs_transformed")
df = spark.sql("""
  SELECT s.*, g.SEED_GENRE AS GENRE
  FROM songs_transformed s
  LEFT JOIN genre g ON s.track_id = g.track_id
""")
display(df)

In [4]:
# ETL --> LOAD and save dataframe as a metastore (Hive) table
# Write the final DataFrame as a Parquet table named "songs", replacing any previous version.
df.write.mode('overwrite').format("parquet").saveAsTable("songs")
# Display what was actually persisted. Displaying `df` again would recompute its
# whole query plan (potentially re-reading the source HDF5 files) instead of
# reading the saved Parquet data.
display(spark.table("songs"))