In [1]:
import h5py
import sys
import os
import glob
import csv
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession
from hdfs import InsecureClient

Go through the base path and find all the .h5 files that are present there then save them to a csv file in order to load them later into spark

TODO: this actually needs to be reworked to create a table based on the hdfs file directory instead of local, however it doesn't matter as long as we don't change the files just run the code and pretend it does the right thing for now

In [2]:
basedir = '../MillionSongSubset/data'
os.listdir(basedir)

['A', 'B', 'files.csv']

In [3]:
ext='.h5'
all_files = []
for root, dirs, files in os.walk(basedir):
    files = glob.glob(os.path.join(root,'*'+ext))
    all_files.append(files)

flat_list = [item for sublist in all_files for item in sublist]

In [4]:
file = '/files.csv'
with open(basedir + file, 'w', newline='') as myfile:
    for line in flat_list:
        myfile.write(line)
        myfile.write('\n')

Set up the spark application

In [5]:
# calculate the number of partitions we want for the rdds
num_nodes = 3
num_rep = 2 * num_nodes

conf = (SparkConf()
   .setMaster("spark://192.168.2.110:7077")
   .setAppName("Group14")
   .set("spark.executor.cores", 2)
   .set("spark.pytspark.python","python3"))

sc = SparkContext(conf = conf)

In [6]:
sqlContext = SQLContext(sc)

code to convert filenames to actual file content

In [20]:
# good methods to explore the h5 inread: .keys(), .attrs.items() see documentation of h5py
import tables




def get_title(file, idx=0):
    return file['metadata']['songs']['title'][idx].decode("utf-8")

def get_artist_name(file, idx=0):
    return file['metadata']['songs']['artist_name'][idx].decode("utf-8")


def get_artist_familiarity(h5,songidx=0):
    """
    Get artist familiarity from a HDF5 song file, by default the first song in it
    """
    return h5['metadata']['songs']['artist_familiarity'][songidx].astype("utf-8")

def get_artist_hotttnesss(h5,songidx=0):
    """
    Get artist hotttnesss from a HDF5 song file, by default the first song in it
    """
    return h5['metadata']['songs']['artist_hotttnesss'][songidx].decode("utf-8")


def get_artist_id(h5,songidx=0):
    """
    Get artist id from a HDF5 song file, by default the first song in it
    """
    return h5['metadata']['songs']['artist_id'][songidx].decode("utf-8")

def get_artist_mbid(h5,songidx=0):
    """
    Get artist musibrainz id from a HDF5 song file, by default the first song in it
    """
    return h5['metadata']['songs']['artist_mbid'][songidx].decode("utf-8")

def get_artist_playmeid(h5,songidx=0):
    """
    Get artist playme id from a HDF5 song file, by default the first song in it
    """
    return h5['metadata']['songs']['artist_playmeid'][songidx].decode("utf-8")

def get_artist_7digitalid(h5,songidx=0):
    """
    Get artist 7digital id from a HDF5 song file, by default the first song in it
    """
    return h5['metadata']['songs']['artist_7digitalid'][songidx].decode("utf-8")

def get_artist_latitude(h5,songidx=0):
    """
    Get artist latitude from a HDF5 song file, by default the first song in it
    """
    return h5['metadata']['songs']['artist_latitude'][songidx].decode("utf-8")

def get_artist_longitude(h5,songidx=0):
    """
    Get artist longitude from a HDF5 song file, by default the first song in it
    """
    return h5['metadata']['songs']['artist_longitude'][songidx].decode("utf-8")

def get_artist_location(h5,songidx=0):
    """
    Get artist location from a HDF5 song file, by default the first song in it
    """
    return h5['metadata']['songs']['artist_location'][songidx].decode("utf-8")


def get_release(h5,songidx=0):
    """
    Get release from a HDF5 song file, by default the first song in it
    """
    return h5['metadata']['songs']['release'][songidx].decode("utf-8")

def get_release_7digitalid(h5,songidx=0):
    """
    Get release 7digital id from a HDF5 song file, by default the first song in it
    """
    return h5['metadata']['songs']['release_7ditgitalid'][songidx].decode("utf-8")

def get_song_id(h5,songidx=0):
    """
    Get song id from a HDF5 song file, by default the first song in it
    """
    return h5['metadata']['songs']['song_id'][songidx].decode("utf-8")

def get_song_hotttnesss(h5,songidx=0):
    """
    Get song hotttnesss from a HDF5 song file, by default the first song in it
    """
    return h5['metadata']['songs']['song_hotnesss'][songidx].decode("utf-8")


def get_track_7digitalid(h5,songidx=0):
    """
    Get track 7digital id from a HDF5 song file, by default the first song in it
    """
    return h5['metadata']['songs']['track_7ditgitalid'][songidx].decode("utf-8")

def get_similar_artists(h5,songidx=0):
    """
    Get similar artists array. Takes care of the proper indexing if we are in aggregate
    file. By default, return the array for the first song in the h5 file.
    To get a regular numpy ndarray, cast the result to: numpy.array( )
    """
    if h5.root.metadata.songs.nrows == songidx + 1:
        return h5.root.metadata.similar_artists[h5.root.metadata.songs.cols.idx_similar_artists[songidx]:]
    return h5.root.metadata.similar_artists[h5.root.metadata.songs.cols.idx_similar_artists[songidx]:
                                            h5.root.metadata.songs.cols.idx_similar_artists[songidx+1]]

def get_artist_terms(h5,songidx=0):
    """
    Get artist terms array. Takes care of the proper indexing if we are in aggregate
    file. By default, return the array for the first song in the h5 file.
    To get a regular numpy ndarray, cast the result to: numpy.array( )
    """
    if h5.root.metadata.songs.nrows == songidx + 1:
        return h5.root.metadata.artist_terms[h5.root.metadata.songs.cols.idx_artist_terms[songidx]:]
    return h5.root.metadata.artist_terms[h5.root.metadata.songs.cols.idx_artist_terms[songidx]:
                                            h5.root.metadata.songs.cols.idx_artist_terms[songidx+1]]

def get_artist_terms_freq(h5,songidx=0):
    """
    Get artist terms array frequencies. Takes care of the proper indexing if we are in aggregate
    file. By default, return the array for the first song in the h5 file.
    To get a regular numpy ndarray, cast the result to: numpy.array( )
    """
    if h5.root.metadata.songs.nrows == songidx + 1:
        return h5.root.metadata.artist_terms_freq[h5.root.metadata.songs.cols.idx_artist_terms[songidx]:]
    return h5.root.metadata.artist_terms_freq[h5.root.metadata.songs.cols.idx_artist_terms[songidx]:
                                              h5.root.metadata.songs.cols.idx_artist_terms[songidx+1]]

def get_artist_terms_weight(h5,songidx=0):
    """
    Get artist terms array frequencies. Takes care of the proper indexing if we are in aggregate
    file. By default, return the array for the first song in the h5 file.
    To get a regular numpy ndarray, cast the result to: numpy.array( )
    """
    if h5.root.metadata.songs.nrows == songidx + 1:
        return h5.root.metadata.artist_terms_weight[h5.root.metadata.songs.cols.idx_artist_terms[songidx]:]
    return h5.root.metadata.artist_terms_weight[h5.root.metadata.songs.cols.idx_artist_terms[songidx]:
                                                h5.root.metadata.songs.cols.idx_artist_terms[songidx+1]]

def get_analysis_sample_rate(h5,songidx=0):
    """
    Get analysis sample rate from a HDF5 song file, by default the first song in it
    """
    return h5['analysis']['songs']['analysis_sample_rate'][songidx].decode("utf-8")


def get_audio_md5(h5,songidx=0):
    """
    Get audio MD5 from a HDF5 song file, by default the first song in it
    """
    return h5['analysis']['songs']['audio_md5'][songidx].decode("utf-8")


def get_danceability(h5,songidx=0):
    """
    Get danceability from a HDF5 song file, by default the first song in it
    """
    return h5['analysis']['songs']['danceability'][songidx].decode("utf-8")


def get_duration(h5,songidx=0):
    """
    Get duration from a HDF5 song file, by default the first song in it
    """
    return h5['analysis']['songs']['duration'][songidx].decode("utf-8")


def get_end_of_fade_in(h5,songidx=0):
    """
    Get end of fade in from a HDF5 song file, by default the first song in it
    """
    return h5['analysis']['songs']['end_of_fade_in'][songidx].decode("utf-8")


def get_energy(h5,songidx=0):
    """
    Get energy from a HDF5 song file, by default the first song in it
    """
    return h5['analysis']['songs']['energy'][songidx].decode("utf-8")


def get_key(h5,songidx=0):
    """
    Get key from a HDF5 song file, by default the first song in it
    """
    return h5['analysis']['songs']['key'][songidx].decode("utf-8")


def get_key_confidence(h5,songidx=0):
    """
    Get key confidence from a HDF5 song file, by default the first song in it
    """
    return h5['analysis']['songs']['key_confidence'][songidx].decode("utf-8")


def get_loudness(h5,songidx=0):
    """
    Get loudness from a HDF5 song file, by default the first song in it
    """
    return h5['analysis']['songs']['loudness'][songidx].decode("utf-8")


def get_mode(h5,songidx=0):
    """
    Get mode from a HDF5 song file, by default the first song in it
    """
    return h5['analysis']['songs']['mode'][songidx].decode("utf-8")


def get_mode_confidence(h5,songidx=0):
    """
    Get mode confidence from a HDF5 song file, by default the first song in it
    """
    return h5['analysis']['songs']['mode_confidence'][songidx].decode("utf-8")


def get_start_of_fade_out(h5,songidx=0):
    """
    Get start of fade out from a HDF5 song file, by default the first song in it
    """
    return h5['analysis']['songs']['start_of_fade_out'][songidx].decode("utf-8")


def get_tempo(h5,songidx=0):
    """
    Get tempo from a HDF5 song file, by default the first song in it
    """
    return h5['analysis']['songs']['tempo'][songidx].decode("utf-8")

def get_time_signature(h5,songidx=0):
    """
    Get signature from a HDF5 song file, by default the first song in it
    """
    return h5['analysis']['songs']['time_signature'][songidx].decode("utf-8")

def get_time_signature_confidence(h5,songidx=0):
    """
    Get signature confidence from a HDF5 song file, by default the first song in it
    """
    return h5['analysis']['songs']['time_signature_confidence'][songidx].decode("utf-8")

def get_track_id(h5,songidx=0):
    """
    Get track id from a HDF5 song file, by default the first song in it
    """
    return h5['analysis']['songs']['track_id'][songidx].decode("utf-8")

def get_year(h5,songidx=0):
    """
    Get release year from a HDF5 song file, by default the first song in it
    """
    return h5['musicbrainz']['songs']['year'][songidx].decode("utf-8")



# todo: implement functions to get other attributes here!!

In [21]:
# import hdf5_getters
import tables
import requests
import io

# right now only mapping for the title needs to be changed to extract all the other features
def f(x):
    # set up the url where the data is stored (yes it's ugly)
    base_hdfs = '/LDSA/data'
    url = 'http://192.168.2.110:50070/webhdfs/v1'
    op = '?op=OPEN'
    full_path = url + base_hdfs + x[2:] + op
    # read in song file from hdfs api and then extract the wanted attributes
    with h5py.File(io.BytesIO(requests.get(url = full_path).content), 'r') as f:
        song = {}
        song['artist_name'] = get_artist_name(f)
        song['title'] = get_title(f)
        song['artist_familiarity'] = get_artist_familiarity(f)
        song['artist_hotttnesss'] = get_artist_hotttnesss(f)
        song['artist_id'] = get_artist_id(f)
        song['artist_mbid'] = get_artist_mbid(f)
        song['artist_playmeid'] = get_artist_playmeid(f)
        song['artist_7digitalid'] = get_artist_7digitalid(f)
        song['artist_latitude'] = get_artist_latitude(f)
        song['artist_longitude'] = get_artist_longitude(f)
        song['artist_location'] = get_artist_location(f)
        song['release'] = get_release(f)
        song['release_7ditgitalid'] = get_release_7ditgitalid(f)
        song['song_id'] = get_song_id(f)
        song['song_hotnesss'] = get_song_hotnesss(f)
        song['track_7ditgitalid'] = get_track_7ditgitalid(f)
        song['analysis_sample_rate'] = get_analysis_sample_rate(f)
        song['audio_md5'] = get_audio_md5(f)
        song['danceability'] = get_danceability(f)
        song['duration'] = get_duration(f)
        song['end_of_fade_in'] = get_end_of_fade_in(f)
        song['energy'] = get_energy(f)
        song['key'] = get_key(f)
        song['key_confidence'] = get_key_confidence(f)
        song['loudness'] = get_loudness(f)
        song['mode'] = get_mode(f)
        song['mode_confidence'] = get_mode_confidence(f)
        song['start_of_fade_out'] = get_start_of_fade_out(f)
        song['tempo'] = get_tempo(f)
        song['time_signature'] = get_time_signature(f)
        song['time_signature_confidence'] = get_time_signature_confidence(f)
        song['track_id'] = get_track_id(f)
        song['year'] = get_year(f)


# todo: implement functions to get other attributes here!!
        
        
        #song.append(get_title(f))
        #song.append(get_artist_name(f))
        
    return song

In [22]:
# todo: set the number of partitions here corectly as calculated above in order to maintain speed
file_paths = sc.textFile(basedir + file).map(lambda x: f(x))

In [23]:
file_paths.take(1)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4.0 (TID 14, 192.168.2.110, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/ubuntu/.local/lib/python3.6/site-packages/pyspark/rdd.py", line 1354, in takeUpToNumLeft
    yield next(iterator)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-22-e6883b9e2aaf>", line 2, in <lambda>
  File "<ipython-input-21-942d87e74035>", line 18, in f
  File "<ipython-input-20-fb1c21e741ea>", line 18, in get_artist_familiarity
TypeError: data type "utf-8" not understood

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/ubuntu/.local/lib/python3.6/site-packages/pyspark/rdd.py", line 1354, in takeUpToNumLeft
    yield next(iterator)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-22-e6883b9e2aaf>", line 2, in <lambda>
  File "<ipython-input-21-942d87e74035>", line 18, in f
  File "<ipython-input-20-fb1c21e741ea>", line 18, in get_artist_familiarity
TypeError: data type "utf-8" not understood

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


convert the rdd (api version 1) to a dataframe (api version 2)

In [None]:
# todo: add the needed attributes here that we need..
df = sqlContext.createDataFrame(file_paths, schema=['title', 'artist_name'])

In [None]:
df.printSchema()

start with the data processing

In [None]:
df.show()

In [None]:
sc.stop()