In [228]:
# From:
#   https://hdfgroup.org/wp/2015/03/from-hdf5-datasets-to-apache-spark-rdds/
# Specialized to MIDS W205-2 (Fall 2015)

import os
import os.path
import h5py
import sys

# Project directory layout (everything lives under the user's home directory).
mainRoot = os.path.expanduser('~/MIDS/mss/')
mssRoot = os.path.expanduser('~/MIDS/mss/data/')
blocksRoot = os.path.expanduser('~/MIDS/mss/blocks/')    # searched for .h5 block files
resultsRoot = os.path.expanduser('~/MIDS/mss/results/')  # flattened CSV output is written here

# Create any missing working directory. os.makedirs also creates mainRoot
# when it is absent, so this cell works on a fresh machine.
for requiredDir in (blocksRoot, resultsRoot, mssRoot):
    if not os.path.isdir(requiredDir):
        os.makedirs(requiredDir)

filesPerBlock = 100  # Gives about 30MB per block file. Should use a larger size in production.

minPartitions = 4  # minimum number of partitions passed to sc.textFile when reading the reference CSV


In [2]:
# Configure spark to work
import os
import sys

# Make the pyspark package and its bundled py4j bridge importable from SPARK_HOME.
spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))

# The py4j zip name embeds its version, which changes between Spark releases.
# Discover it instead of hard-coding one version. A distribution normally
# ships a single zip. Fall back to the historical 0.8.2.1 name if none is found.
import glob
py4jZips = sorted(glob.glob(os.path.join(spark_home, 'python', 'lib', 'py4j-*-src.zip')))
if py4jZips:
    py4jZip = py4jZips[-1]
else:
    py4jZip = os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip')
sys.path.insert(0, py4jZip)

#execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))

In [3]:
def getFileNames(dataRoot, verbose = True):
    '''
    Recursively collect the paths of every '.h5' file below dataRoot.

    Parameters
    ----------
    dataRoot : str
        Directory where the os.walk traversal starts.
    verbose : bool
        When True, print each matching path as it is found.

    Returns
    -------
    list of str
        Matching paths (walk directory joined with file name), in os.walk order.
    '''
    matches = []
    for dirPath, _subDirs, fileNames in os.walk(dataRoot):
        for name in fileNames:
            if not name.endswith('.h5'):
                continue
            fullPath = os.path.join(dirPath, name)
            matches.append(fullPath)
            if verbose:
                print(fullPath)
    return matches

In [4]:
def buildReferenceCSV(dataRoot = None, publish = True, verbose = True):
    '''
    Walk the file tree under dataRoot and find every HDF5 ('.h5') file
    (assumed to be from the Million Song Dataset). When publish is True,
    write their paths, one per line, to dataRoot/Hd5Extracts.csv.

    Parameters
    ----------
    dataRoot : str or None
        Directory to search. When falsy, the module-level blocksRoot is used.
    publish : bool
        Write the reference file when True.
    verbose : bool
        Forwarded to getFileNames (which prints each path found). Each path
        is also printed again here.

    Returns
    -------
    str or None
        Path of the reference file that was written, or None when publish
        is False.
    '''
    if not dataRoot:
        dataRoot = blocksRoot

    h5Files = getFileNames(dataRoot, verbose = verbose)

    csvFileName = None
    if publish:
        csvFileName = os.path.join(dataRoot, 'Hd5Extracts.csv')
        # 'with' guarantees the file is flushed and closed before it is read back.
        # NOTE(review): readers split each line on ',', so a path containing a
        # comma would be truncated.
        with open(csvFileName, 'w') as oFile:
            for ff in h5Files:
                oFile.write(ff + '\n')

    if verbose:
        for ff in h5Files:
            print(ff)

    return csvFileName

In [219]:
def nodeToDict(inputLine, verbose = False):
    '''
    Spark mapper: open the HDF5 block file named in the first comma-separated
    field of inputLine and return one dict per song it contains.

    Each dict maps every field name of the song's compound
    ['analysis']['songs'] dataset to that field's first-row value. Songs whose
    datasets have different fields therefore produce dicts with different keys.

    Parameters
    ----------
    inputLine : str
        A line of the reference CSV. Only the first field (a file path) is used.
    verbose : bool
        Currently unused; kept for interface compatibility.

    Returns
    -------
    list of dict
        One dict per song in the file. Empty for a blank line.
    '''
    h5Path = inputLine.split(',')[0].strip()
    if not h5Path:
        # Blank or whitespace-only lines name no file to open.
        return []

    # Imported inside the function so the import happens wherever Spark runs it.
    import h5py

    songInfo = []
    # Open read-only and close deterministically. Older h5py versions default
    # to a read/write (or create) mode, and without closing, every record
    # processed would leak an open file handle.
    with h5py.File(h5Path, 'r') as thisH5File:
        songGroup = thisH5File['songs']
        for thisSong in songGroup:
            dataSet = songGroup[thisSong]['analysis']['songs']
            # Field indexing reads values into memory, so they stay valid after close.
            songInfo.append({measurement: dataSet[measurement][0]
                             for measurement in dataSet.dtype.names})
    return songInfo


In [220]:
#  Process in Spark

from pyspark import SparkContext
from optparse import OptionParser
from datetime import datetime as dt
print('Establishing Spark Context')

# Stop a context left over from an earlier run of this cell; PySpark allows
# only one active SparkContext per process.
if 'sc' in globals():
    sc.stop()

sc = SparkContext(appName="SparkHDF5")

# try/finally ensures the context is released even if the job fails,
# so the cell can be re-run without restarting the kernel.
try:
    csvFileName = buildReferenceCSV(dataRoot = blocksRoot, verbose = False)

    print('Fetching all file names')
    timeNow = dt.now().isoformat()  # run timestamp (not used further in this section)
    # Read the reference file at the path buildReferenceCSV reports.
    file_paths = sc.textFile(csvFileName, minPartitions=minPartitions)

    print('Mapping RDD')
    # nodeToDict returns a list per block file; flatMap yields one dict per song.
    rdd = file_paths.flatMap(nodeToDict)

    # take() is the action that triggers the job; only the first 1000 songs are collected.
    results = rdd.take(1000)
finally:
    sc.stop()




Establishing Spark Context
Fetching all file names
Mapping RDD


In [233]:
# Get unique column names
# and write results out as a csv

import csv

# Column set = union of keys over all songs, so the output does not depend on
# every song having exactly the same fields. Sorted for a deterministic column order.
allParameters = sorted(set().union(*(song.keys() for song in results)))

outputPath = os.path.join(resultsRoot, 'MSD_Flat.csv')

# Binary mode is what the Python 2 csv module expects for output files.
with open(outputPath, 'wb') as output_file:
    dict_writer = csv.DictWriter(output_file, allParameters)
    dict_writer.writeheader()
    # Songs missing a column get DictWriter's default empty value.
    dict_writer.writerows(results)

print('\n\n\tComplete.  CSV written to  %s \n\n' % outputPath)



	Complete.  CSV written to  /home/james/MIDS/mss/results/MSD_Flat.csv 


