In [1]:
import h5py
import os
import numpy as np

def list_h5_contents(h5_file_path):
    """
    Print a tree view of every group and dataset stored in an HDF5 file.

    Each dataset line reports its shape and dtype. When a dataset has a
    string dtype, fewer than 10 elements and at most one dimension, its
    decoded content is printed on the following line as well.

    Nothing is returned; all results (including error messages) go to stdout.

    Args:
        h5_file_path (str): The path to the .h5 file.
    """
    # Guard clause: nothing to inspect if the path is missing.
    if not os.path.exists(h5_file_path):
        print(f"Error: File not found at '{h5_file_path}'")
        return

    def to_text(raw):
        """Convert a value read from a string dataset into printable text."""
        if isinstance(raw, bytes):
            # Scalar byte strings are decoded, dropping undecodable bytes.
            return raw.decode('utf-8', 'ignore')
        if isinstance(raw, np.ndarray) and raw.size > 0:
            # Non-empty arrays are converted element by element.
            return [
                item.decode('utf-8', 'ignore') if isinstance(item, bytes) else str(item)
                for item in raw
            ]
        return str(raw)

    def describe_node(path, node):
        """Visitor callback: print one line (or two) for a single HDF5 object."""
        # One indentation level per '/' in the object's path.
        pad = '  ' * path.count('/')
        label = os.path.basename(path)

        if isinstance(node, h5py.Dataset):
            print(f"{pad}- Dataset: {label} (Shape: {node.shape}, Dtype: {node.dtype})")

            # Heuristic for "short text": string dtype, and scalar or small 1-D array.
            holds_text = h5py.check_string_dtype(node.dtype) is not None
            compact = node.size < 10 and node.ndim <= 1
            if not (holds_text and compact):
                return

            try:
                print(f"{pad}  Content: {to_text(node[()])}")
            except Exception as err:
                print(f"{pad}  (Could not read content: {err})")

        elif isinstance(node, h5py.Group):
            print(f"{pad}- Group: {label or '/'}")

    try:
        with h5py.File(h5_file_path, 'r') as archive:
            print(f"Contents of '{h5_file_path}':")
            archive.visititems(describe_node)
    except Exception as err:
        print(f"An error occurred while reading the H5 file: {err}")

# --- Example Usage ---
# NOTE: The 'lmd_matched_h5.tar.gz' archive must be extracted beforehand,
# so that the path below refers to a real .h5 file on disk.

# Root directory of the extracted H5 dataset and one sample track inside it.
LMD_H5_DIR = os.path.join('data_sets', 'lmd_matched_h5')
example_h5_file = os.path.join(LMD_H5_DIR, 'A', 'A', 'A', 'TRAAAGR128F425B14B.h5')

# Only inspect the sample file when it is present; otherwise explain what is missing.
if not os.path.exists(example_h5_file):
    print("Example H5 file not found. Please ensure 'lmd_matched_h5.tar.gz' is extracted")
    print(f"and the file exists at: {example_h5_file}")
else:
    list_h5_contents(example_h5_file)

Contents of 'data_sets\lmd_matched_h5\A\A\A\TRAAAGR128F425B14B.h5':
- Group: analysis
  - Dataset: bars_confidence (Shape: (123,), Dtype: float64)
  - Dataset: bars_start (Shape: (123,), Dtype: float64)
  - Dataset: beats_confidence (Shape: (497,), Dtype: float64)
  - Dataset: beats_start (Shape: (497,), Dtype: float64)
  - Dataset: sections_confidence (Shape: (8,), Dtype: float64)
  - Dataset: sections_start (Shape: (8,), Dtype: float64)
  - Dataset: segments_confidence (Shape: (940,), Dtype: float64)
  - Dataset: segments_loudness_max (Shape: (940,), Dtype: float64)
  - Dataset: segments_loudness_max_time (Shape: (940,), Dtype: float64)
  - Dataset: segments_loudness_start (Shape: (940,), Dtype: float64)
  - Dataset: segments_pitches (Shape: (940, 12), Dtype: float64)
  - Dataset: segments_start (Shape: (940,), Dtype: float64)
  - Dataset: segments_timbre (Shape: (940, 12), Dtype: float64)
  - Dataset: songs (Shape: (1,), Dtype: [('analysis_sample_rate', '<i4'), ('audio_md5', 'S32'),

In [3]:
import requests
import os

# --- Download the Million Song Dataset Metadata Database ---

MSD_METADATA_URL = 'http://millionsongdataset.com/sites/default/files/AdditionalFiles/track_metadata.db'
DB_DIR = 'data_sets'
DB_PATH = os.path.join(DB_DIR, 'track_metadata.db')
# (connect, read) timeouts in seconds, so a stalled server cannot hang the kernel forever.
DOWNLOAD_TIMEOUT_SECONDS = (10, 60)

# Create the directory if it doesn't exist
os.makedirs(DB_DIR, exist_ok=True)

# Check if the database file already exists
if not os.path.exists(DB_PATH):
    print(f"Downloading Million Song Dataset metadata from: {MSD_METADATA_URL}")

    # Stream into a temporary sibling file and only move it to DB_PATH once the
    # download has finished. Writing straight to DB_PATH would leave a truncated
    # file behind on failure, which the existence check above would then mistake
    # for a complete database on every later run.
    partial_path = DB_PATH + '.part'

    try:
        # The context manager releases the streamed connection even on error.
        with requests.get(MSD_METADATA_URL, stream=True, timeout=DOWNLOAD_TIMEOUT_SECONDS) as response:
            response.raise_for_status()

            with open(partial_path, 'wb') as f:
                print(f"Saving 'track_metadata.db' to '{DB_PATH}'...")
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        # Promote the finished download to its final name in a single rename.
        os.replace(partial_path, DB_PATH)
        print("Successfully downloaded 'track_metadata.db'.")

    except requests.exceptions.RequestException as e:
        print(f"Error: Could not download the metadata database. An error occurred: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    finally:
        # After a failure, remove the incomplete file so the next run starts clean.
        # After success the file has already been renamed, so this is a no-op.
        if os.path.exists(partial_path):
            os.remove(partial_path)
else:
    print(f"Million Song Dataset metadata database already exists at: {DB_PATH}")



Million Song Dataset metadata database already exists at: data_sets\track_metadata.db


In [4]:
import sqlite3
import os

def create_annotations_database(source_db_path, new_db_path):
    """
    Copy track_id, title, artist_name and year from the source 'songs' table
    into an indexed 'annotations' table in a separate SQLite database.

    Rows are copied in batches of 10000 with a commit after each batch.
    Inserts use INSERT OR IGNORE, so rows that conflict with the table's
    constraints are skipped. Progress and errors are reported on stdout;
    nothing is returned and no exception is propagated to the caller.

    Args:
        source_db_path (str): Path to the source 'track_metadata.db'.
        new_db_path (str): Path for the new SQLite database to be created.
    """
    if not os.path.exists(source_db_path):
        print(f"Error: Source database not found at '{source_db_path}'")
        return

    print(f"Creating new annotations database at '{new_db_path}'...")

    # Secondary indexes that make title / artist lookups efficient.
    index_statements = (
        'CREATE INDEX IF NOT EXISTS idx_title ON annotations (title)',
        'CREATE INDEX IF NOT EXISTS idx_artist_name ON annotations (artist_name)',
        'CREATE INDEX IF NOT EXISTS idx_artist_title ON annotations (artist_name, title)',
    )
    rows_per_batch = 10000

    # Start as None so the cleanup below knows which connections were opened.
    src_db = None
    dst_db = None
    try:
        src_db = sqlite3.connect(source_db_path)
        dst_db = sqlite3.connect(new_db_path)

        reader = src_db.cursor()
        writer = dst_db.cursor()

        # Destination schema: the table first, then its indexes.
        print("Creating 'annotations' table with indexes...")
        writer.execute('''
            CREATE TABLE IF NOT EXISTS annotations (
                track_id TEXT PRIMARY KEY,
                title TEXT NOT NULL,
                artist_name TEXT NOT NULL,
                year INTEGER
            )
        ''')
        for statement in index_statements:
            writer.execute(statement)

        # Select only the columns that the annotations table keeps.
        print("Extracting data from source database...")
        reader.execute('SELECT track_id, title, artist_name, year FROM songs')

        # fetchmany() returns an empty list once the result set is exhausted,
        # which is the sentinel that ends this iteration.
        for batch in iter(lambda: reader.fetchmany(rows_per_batch), []):
            writer.executemany('''
                INSERT OR IGNORE INTO annotations (track_id, title, artist_name, year)
                VALUES (?, ?, ?, ?)
            ''', batch)
            dst_db.commit()

        print("Data extraction and insertion complete.")

        # Report how many rows the destination table now holds.
        total = writer.execute('SELECT COUNT(*) FROM annotations').fetchone()[0]
        print(f"Successfully created '{new_db_path}' with {total} records.")

    except Exception as err:
        print(f"An error occurred: {err}")
    finally:
        # Close whichever connections were opened, source first.
        for handle in (src_db, dst_db):
            if handle:
                handle.close()

# --- Execution ---
# The source metadata database and the derived annotations database both
# live in the 'data_sets' directory.
SOURCE_DB, NEW_DB = (
    os.path.join('data_sets', db_file)
    for db_file in ('track_metadata.db', 'midi_annotations.db')
)

# Build and populate the annotations database from the source metadata.
create_annotations_database(SOURCE_DB, NEW_DB)

Creating new annotations database at 'data_sets\midi_annotations.db'...
Creating 'annotations' table with indexes...
Extracting data from source database...
Data extraction and insertion complete.
Successfully created 'data_sets\midi_annotations.db' with 1000000 records.
