In [1]:
# Enable IPython's autoreload extension in mode 2, so edits to imported
# modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [2]:
import h5py
import os
from tqdm import tqdm
from copy import deepcopy
import numpy as np
import json
from typing import Dict, Any
import pandas as pd

In [3]:
# Project root = parent of the notebook's working directory.
# os.path.dirname is used instead of str.replace(basename, ""), which would
# strip *every* occurrence of the folder name from the path (e.g.
# "/work/nb/proj/nb" -> "/work//proj/"). Joining with "" keeps the trailing
# separator the previous value had.
PROJECT_PATH = os.path.join(os.path.dirname(os.getcwd()), "")
DATA_PATH = os.path.join(PROJECT_PATH, "data")
# Flat folder that receives every .h5 file found under DATA_PATH.
SONGS_H5_PATH = os.path.join(DATA_PATH, "songs-h5-unpacked")
# Folder reserved for per-song processed feature files.
SONGS_FEATURES_PATH = os.path.join(DATA_PATH, "songs-features_processed")

In [4]:
# Create the flat destination folder for the unpacked .h5 files.
# exist_ok=True replaces the separate exists() check (no check-then-create
# race) and keeps the cell safe to re-run.
os.makedirs(SONGS_H5_PATH, exist_ok=True)

In [5]:
# unpack h5 files from nested directories into the flat SONGS_H5_PATH folder
unpack_target = os.path.normpath(SONGS_H5_PATH)
for root, dirs, files in tqdm(os.walk(DATA_PATH), leave=False):
    # SONGS_H5_PATH lives inside DATA_PATH, so the walk visits it as well.
    # Files directly in it are already in place; skip them instead of
    # renaming each one onto itself (this matters when the cell is re-run).
    # Sub-directories of the target are still walked.
    if os.path.normpath(root) == unpack_target:
        continue
    for file in files:
        if file.endswith('.h5'):
            os.rename(os.path.join(root, file), os.path.join(SONGS_H5_PATH, file))

                          

In [6]:
def process_h5(
    h5: "h5py.File") -> Dict[str, Dict[str, Any]]:
    """Convert a million-songs h5 file into a nested Python dictionary.

    Args:
        h5: An open h5py file. Only ``.items()`` is called on it and on each
            of its top-level groups, so any mapping of
            ``group name -> {dataset name -> array-like}`` works too.

    Returns:
        ``{group: {dataset: value}}``. A dataset holding exactly one record
        of a compound (structured) dtype is expanded into a
        ``{field_name: scalar}`` dict; every other dataset is returned as a
        NumPy array. Everything is copied into memory, so the result does
        not depend on the file staying open.
    """
    processed = {}
    for group_name, group in h5.items():
        datasets = {}
        for dataset_name, dataset in group.items():
            # np.array() reads the dataset into a fresh in-memory array.
            values = np.array(dataset)
            if values.size == 1 and values.dtype.names is not None:
                # Single compound record -> one dict entry per field.
                # reshape(-1) also covers 0-d datasets, and dtype.names lists
                # the fields in storage order (dtype.fields may additionally
                # contain titles as keys).
                record = values.reshape(-1)[0]
                datasets[dataset_name] = dict(zip(values.dtype.names, record))
            else:
                datasets[dataset_name] = values
        processed[group_name] = datasets

    return processed

In [7]:
# Open one randomly chosen song file read-only, to explore its structure.
sample_file_name = np.random.choice(os.listdir(SONGS_H5_PATH))
some_h5 = h5py.File(os.path.join(SONGS_H5_PATH, sample_file_name), 'r')

In [8]:
# Load the sampled file into nested dicts, then release the file handle:
# process_h5 returns in-memory copies and no later cell in this notebook
# reads from `some_h5` again.
try:
    processed_h5 = process_h5(some_h5)
finally:
    some_h5.close()

In [9]:
def flatten_dict(dict_: Dict, prefix_sep = "__") -> Dict:
    """Flatten a nested dictionary into a single level.

    Keys along the path to each leaf are joined with ``prefix_sep``, e.g.
    ``{"a": {"b": 1}}`` becomes ``{"a__b": 1}``. A leaf that sits directly at
    the top level keeps its own key unchanged. Empty nested dicts contribute
    no entries.

    Args:
        dict_: Dictionary to flatten; nested ``dict`` values (including
            subclasses) are descended into, everything else is a leaf.
        prefix_sep: Separator placed between parent and child keys.

    Returns:
        A new flat dictionary whose values are deep copies of the leaves,
        so changing the result never affects ``dict_``.
    """
    flattened_dict = dict()

    def flattener(node: Dict, parent_key=None):
        for k, v in node.items():
            # Top-level entries have no parent to prefix; without this guard
            # they would be stored under the literal key "None__<k>".
            full_key = k if parent_key is None else f"{parent_key}{prefix_sep}{k}"
            if isinstance(v, dict):
                flattener(v, full_key)
            else:
                flattened_dict[full_key] = v

    flattener(dict_)

    return deepcopy(flattened_dict)

In [10]:
# Collapse the nested dicts of the sampled file into one level with "__"-joined keys.
flattened_h5 = flatten_dict(processed_h5)

In [11]:
# Number of flattened entries in the sampled song file.
len(flattened_h5)

74

In [12]:
# Collect every flattened entry whose value is a plain number and print it,
# to see which keys are candidates for the tabular feature export.
keys_to_keep = []
for key, value in flattened_h5.items():
    is_plain_number = isinstance(value, (np.int32, np.float32, int, float))
    if not is_plain_number:
        continue
    keys_to_keep.append(key)
    print(key, value)

analysis__songs__analysis_sample_rate 22050
analysis__songs__danceability 0.0
analysis__songs__duration 409.91302
analysis__songs__end_of_fade_in 0.485
analysis__songs__energy 0.0
analysis__songs__idx_bars_confidence 0
analysis__songs__idx_bars_start 0
analysis__songs__idx_beats_confidence 0
analysis__songs__idx_beats_start 0
analysis__songs__idx_sections_confidence 0
analysis__songs__idx_sections_start 0
analysis__songs__idx_segments_confidence 0
analysis__songs__idx_segments_loudness_max 0
analysis__songs__idx_segments_loudness_max_time 0
analysis__songs__idx_segments_loudness_start 0
analysis__songs__idx_segments_pitches 0
analysis__songs__idx_segments_start 0
analysis__songs__idx_segments_timbre 0
analysis__songs__idx_tatums_confidence 0
analysis__songs__idx_tatums_start 0
analysis__songs__key 1
analysis__songs__key_confidence 0.342
analysis__songs__loudness -12.066
analysis__songs__mode 1
analysis__songs__mode_confidence 0.379
analysis__songs__start_of_fade_out 402.088
analysis__s

In [13]:
# Hand-picked scalar features to export for every song. This replaces the
# automatically collected list from the previous cell and leaves out the
# `idx_*` entries printed there.
# NOTE(review): `analysis__songs__mode` is not listed although
# `analysis__songs__mode_confidence` is - confirm the omission is intentional.
keys_to_keep = [
    'analysis__songs__analysis_sample_rate',
    'analysis__songs__danceability',
    'analysis__songs__duration',
    'analysis__songs__end_of_fade_in',
    'analysis__songs__energy',
    'analysis__songs__key',
    'analysis__songs__key_confidence',
    'analysis__songs__loudness',
    'analysis__songs__mode_confidence',
    'analysis__songs__start_of_fade_out',
    'analysis__songs__tempo',
    'analysis__songs__time_signature',
    'analysis__songs__time_signature_confidence',
    'metadata__songs__artist_familiarity',
    'metadata__songs__artist_hotttnesss',
    'metadata__songs__song_hotttnesss',
    'musicbrainz__songs__year']

In [14]:
# Create the folder for processed per-song feature files.
# exist_ok=True replaces the separate exists() check (no check-then-create
# race) and keeps the cell safe to re-run.
os.makedirs(SONGS_FEATURES_PATH, exist_ok=True)

In [15]:
# Build one flat record of scalar features per unpacked song file.
song_features = []
wanted_keys = set(keys_to_keep)  # set membership instead of scanning the list per key

# Only real .h5 files are processed (stray entries such as OS metadata files
# would make h5py fail); sorting gives a reproducible row order.
h5_file_names = sorted(
    name for name in os.listdir(SONGS_H5_PATH) if name.endswith(".h5")
)

# Passing the list itself (not enumerate(...)) lets tqdm show the total.
for file in tqdm(h5_file_names, leave=False):
    # Close every file as soon as it has been read; leaving thousands of
    # handles open can exhaust the process's file-descriptor limit.
    # process_h5 copies the data into memory, so it remains valid afterwards.
    with h5py.File(os.path.join(SONGS_H5_PATH, file), "r") as song_h5:
        flat_song = flatten_dict(process_h5(song_h5))

    # Keep the selected features; np.int32 values become plain Python ints.
    record = {
        k: (int(v) if isinstance(v, np.int32) else v)
        for k, v in flat_song.items()
        if k in wanted_keys
    }
    # File name without its extension identifies the song.
    record["file_id"] = os.path.splitext(file)[0]
    song_features.append(record)

                          

In [17]:
# Write all song records to a single CSV in the data folder, without the row index.
song_features_csv_path = os.path.join(DATA_PATH, "song_features.csv")
pd.DataFrame(song_features).to_csv(song_features_csv_path, index=False)