In [2]:
import datetime
import os

In [1]:
from functions_clustering import cluster_baseline

In [3]:
# Models are written to a per-day folder named with a two-digit year (yy-mm-dd).
today = datetime.date.today().strftime("%y-%m-%d")
save_dir = os.path.join("models", today)
# exist_ok keeps the cell re-runnable on the same day.
os.makedirs(save_dir, exist_ok=True)

In [4]:
# Write this run's model into the date-stamped folder created in the previous cell.
OUTPUT_DIR = save_dir
# Feature CSV to cluster; the path is relative to the notebook's working directory.
MUSIC_FEATURES_FP = 'data_music_features/25-03-12_processed_spotify_sample.csv'
# Last expression of the cell, so the returned summary dict is rendered as the cell output.
cluster_baseline(MUSIC_FEATURES_FP, OUTPUT_DIR)

Processing genre: Baseline with 98608 samples


{'Baseline': {'genre': 'Baseline',
  'data_shape': (98608, 14),
  'num_clusters': 50,
  'model_path': 'models/25-03-13/cluster_model_Baseline.pkl.gz',
  'model_size_kb': 39.7666015625,
  'cluster_distribution': [1812,
   1974,
   2774,
   1954,
   1865,
   1987,
   1587,
   2513,
   1304,
   2186,
   1718,
   2212,
   2449,
   2733,
   1791,
   1981,
   2104,
   2169,
   1928,
   2027,
   1311,
   1716,
   2113,
   2178,
   1792,
   1640,
   2059,
   1925,
   2110,
   1668,
   2413,
   2128,
   1943,
   1882,
   2105,
   2626,
   1973,
   2036,
   1687,
   1802,
   2050,
   2212,
   1479,
   2368,
   1843,
   1398,
   1763,
   1645,
   1527,
   2148]},
 '_execution_stats': {'total_time_seconds': 1.4545578956604004,
  'threads_used': 'auto'}}

In [6]:
from functions_clustering import load_compressed_model

In [7]:
# Load from OUTPUT_DIR (the date-stamped folder this run saved into) instead of a
# hardcoded date, so the cell keeps working when the notebook is re-run on another day.
baseline = load_compressed_model('Baseline', OUTPUT_DIR)
print(baseline)

MiniBatchKMeans(max_iter=10, n_clusters=50, random_state=0,
                reassignment_ratio=0.1)


### Testing & development code below

In [12]:
import pandas as pd
import numpy as np
import os
import gzip
import pickle
import concurrent.futures
from sklearn.cluster import MiniBatchKMeans
from typing import Dict, Tuple, List, Any
import time

In [13]:
def load_and_prepare_data(filepath: str) -> pd.DataFrame:
    """
    Read the music feature table from disk.

    No preparation beyond parsing is applied: the CSV is returned exactly as
    pandas reads it with its default settings.

    Parameters:
    filepath (str): Location of the CSV file holding the music features

    Returns:
    pd.DataFrame: Contents of the CSV file
    """
    return pd.read_csv(filepath)

In [14]:
def split_data_by_genre(df: pd.DataFrame) -> Dict[str, pd.DataFrame]:
    """
    Split the dataset into one DataFrame per genre

    Parameters:
    df (pd.DataFrame): DataFrame containing music data with a 'track_genre' column

    Returns:
    dict: Dictionary mapping each genre (in order of first appearance) to the
          rows of `df` that carry it. Original index labels and row order are
          preserved. Rows whose genre is missing (NaN) belong to no genre and
          are left out.

    Raises:
    KeyError: If `df` has no 'track_genre' column
    """
    genre_column = 'track_genre'
    if genre_column not in df.columns:
        raise KeyError(
            f"Expected a '{genre_column}' column to split by; "
            f"available columns: {list(df.columns)}"
        )

    # A single groupby pass replaces one full-frame boolean scan per genre.
    # sort=False keeps first-appearance order; observed=True avoids empty groups
    # for unused categories when the column is categorical.
    return {
        genre: genre_rows
        for genre, genre_rows in df.groupby(genre_column, sort=False, observed=True)
    }

In [15]:
def cluster_music_data(data: pd.DataFrame, n_clusters: int = 50) -> Tuple[pd.DataFrame, np.ndarray, MiniBatchKMeans]:
    """
    Fit a MiniBatch K-Means model on the music feature columns

    Rows with a missing value in any feature column are dropped before
    fitting. If fewer complete rows remain than clusters were requested, the
    cluster count is lowered to one per ten rows (never below 1).

    Parameters:
    data (pd.DataFrame): DataFrame containing the music feature columns
    n_clusters (int): Number of clusters to form

    Returns:
    tuple: (feature rows used for fitting, cluster label per row, fitted model)
    """
    feature_columns = [
        'explicit', 'danceability', 'energy', 'key', 'loudness', 'mode',
        'speechiness', 'acousticness', 'instrumentalness', 'liveness',
        'valence', 'tempo', 'time_signature', 'encoded_genre',
    ]
    features = data[feature_columns].dropna()

    # Shrink the cluster count when there are not enough complete rows
    sample_count = len(features)
    if sample_count < n_clusters:
        n_clusters = max(1, sample_count // 10)

    # Fixed random_state keeps the clustering reproducible between runs
    model = MiniBatchKMeans(
        n_clusters=n_clusters,
        random_state=0,
        max_iter=10,
        n_init="auto",
        reassignment_ratio=0.1,
    )
    model.fit(features)

    return features, model.predict(features), model

In [16]:
def save_compressed_model(model: Any, genre: str, output_dir: str = "models", compression_level: int = 6) -> str:
    """
    Pickle a clustering model and write it gzip-compressed to disk

    The file is named cluster_model_<genre>.pkl.gz and placed in `output_dir`,
    which is created first if it does not exist yet.

    Parameters:
    model: The clustering model to save
    genre (str): Genre label used in the file name
    output_dir (str): Directory that receives the model file
    compression_level (int): gzip compression level (1-9, where 9 is maximum compression)

    Returns:
    str: Path of the file that was written
    """
    os.makedirs(output_dir, exist_ok=True)

    archive_name = f"cluster_model_{genre}.pkl.gz"
    archive_path = os.path.join(output_dir, archive_name)

    with gzip.open(archive_path, 'wb', compresslevel=compression_level) as archive:
        pickle.dump(model, archive)

    return archive_path

In [17]:
def load_compressed_model(genre: str, model_dir: str = "models") -> Any:
    """
    Read back a gzip-compressed, pickled clustering model

    The model is expected at <model_dir>/cluster_model_<genre>.pkl.gz.
    Note that unpickling can execute arbitrary code, so only load files that
    come from a trusted source.

    Parameters:
    genre (str): Genre label of the model to load
    model_dir (str): Directory containing the saved models

    Returns:
    The unpickled clustering model
    """
    archive_path = os.path.join(model_dir, f"cluster_model_{genre}.pkl.gz")

    with gzip.open(archive_path, 'rb') as archive:
        return pickle.load(archive)

In [18]:
def process_genre(genre: str, genre_df: pd.DataFrame, output_dir: str) -> Dict:
    """
    Cluster a single genre, save its model and summarize the outcome
    (designed to be submitted as one task of a thread pool)

    Parameters:
    genre (str): Genre name
    genre_df (pd.DataFrame): DataFrame filtered for this genre
    output_dir (str): Directory to save models

    Returns:
    dict: Results for this genre with keys 'genre', 'data_shape',
          'num_clusters' (cluster count of the fitted model), 'model_path',
          'model_size_kb' and 'cluster_distribution' (row count per cluster,
          one entry per cluster)
    """
    print(f"Processing genre: {genre} with {len(genre_df)} samples")

    # Aim for roughly one cluster per 20 rows, bounded to the range 5..50
    requested_clusters = min(50, max(5, len(genre_df) // 20))

    # Cluster the data
    X, labels, model = cluster_music_data(genre_df, n_clusters=requested_clusters)

    # cluster_music_data may lower the cluster count when too few complete rows
    # remain, so report what the fitted model actually uses.
    n_clusters = model.n_clusters

    # Save the model with compression
    model_path = save_compressed_model(model, genre, output_dir, compression_level=6)

    # Calculate model file size
    model_size = os.path.getsize(model_path) / 1024  # Size in KB

    # Return results
    return {
        "genre": genre,
        "data_shape": X.shape,
        "num_clusters": n_clusters,
        "model_path": model_path,
        "model_size_kb": model_size,
        # minlength keeps one entry per cluster even if the last clusters are empty
        "cluster_distribution": np.bincount(labels, minlength=n_clusters).tolist()
    }

In [19]:
def cluster_by_genre_parallel(filepath: str, output_dir: str = "models", max_workers: int = None) -> Dict[str, Dict]:
    """
    Cluster every genre of a music feature file on a thread pool and save one model per genre

    Parameters:
    filepath (str): Path to the CSV file containing music features
    output_dir (str): Directory to save models
    max_workers (int): Maximum number of threads to use (None = let the executor decide)

    Returns:
    dict: One entry per genre holding either its clustering summary or
          {"error": message} when that genre failed, plus an
          "_execution_stats" entry with timing and thread information
    """
    started_at = time.time()

    # Load the file and break it up into per-genre frames
    genre_dfs = split_data_by_genre(load_and_prepare_data(filepath))

    results = {}

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Remember which genre every submitted task belongs to
        pending = {}
        for name, frame in genre_dfs.items():
            task = executor.submit(process_genre, name, frame, output_dir)
            pending[task] = name

        # Collect outcomes in completion order; a failing genre is recorded, not raised
        for task in concurrent.futures.as_completed(pending):
            name = pending[task]
            try:
                results[name] = task.result()
            except Exception as exc:
                print(f"Error processing genre {name}: {str(exc)}")
                results[name] = {"error": str(exc)}

    elapsed = time.time() - started_at

    # Attach run statistics under a reserved key
    results["_execution_stats"] = {
        "total_time_seconds": elapsed,
        "num_genres": len(genre_dfs),
        "threads_used": max_workers or "auto"
    }

    return results

In [20]:
def predict_with_genre_models(new_data: pd.DataFrame, model_dir: str = "models") -> pd.DataFrame:
    """
    Predict clusters for new data using saved models by genre

    The data is split by genre, each genre is labelled by its own saved model
    on a thread pool, and the per-genre frames are concatenated in order of
    first genre appearance (deterministic, independent of thread timing).
    Rows with a missing feature value, and genres whose model cannot be loaded
    or applied, are kept but receive no cluster label.

    Parameters:
    new_data (pd.DataFrame): New music data to cluster
    model_dir (str): Directory containing saved models (models are unpickled,
                     so this must be a trusted location)

    Returns:
    pd.DataFrame: Rows of the input grouped by genre with a 'cluster' column
                  added; an empty DataFrame when there is nothing to predict
    """
    feature_columns = ['explicit', 'danceability', 'energy', 'key', 'loudness', 'mode',
                       'speechiness', 'acousticness', 'instrumentalness', 'liveness',
                       'valence', 'tempo', 'time_signature', 'encoded_genre']

    genre_dfs = split_data_by_genre(new_data)

    def predict_genre(genre, genre_df):
        """Helper function for parallel prediction"""
        try:
            # Load the compressed model for this genre
            model = load_compressed_model(genre, model_dir)

            # Only rows with every feature present can be predicted
            X = genre_df[feature_columns].dropna()

            # Predict clusters
            labels = model.predict(X)

            # Add cluster labels to a copy so the caller's data is not modified
            temp_df = genre_df.copy()
            temp_df.loc[X.index, 'cluster'] = labels

            return temp_df

        except Exception as e:
            print(f"Error processing genre {genre}: {str(e)}")
            # Return the data without cluster labels
            return genre_df

    genre_results = []

    # Process predictions in parallel
    with concurrent.futures.ThreadPoolExecutor() as executor:
        submitted = [
            (genre, executor.submit(predict_genre, genre, genre_df))
            for genre, genre_df in genre_dfs.items()
        ]

        # Gather in submission order so the output order is reproducible
        for genre, future in submitted:
            try:
                genre_results.append(future.result())
            except Exception as e:
                print(f"Error collecting results for genre {genre}: {str(e)}")

    if not genre_results:
        return pd.DataFrame()

    # One concat at the end instead of growing a DataFrame inside the loop
    return pd.concat(genre_results)

In [21]:
def analyze_model_storage(output_dir: str = "models") -> Dict:
    """
    Analyze storage efficiency of compressed models

    Every file in `output_dir` whose name ends in '.pkl.gz' is counted as a model.

    Parameters:
    output_dir (str): Directory containing saved models

    Returns:
    dict: 'total_size_kb', 'num_models', 'avg_size_kb' and 'models' (a list of
          {'genre', 'size_kb', 'path'} entries sorted by file name). All four
          keys are always present; a missing directory yields zeros and an
          empty list.
    """
    prefix = 'cluster_model_'
    suffix = '.pkl.gz'

    results = {
        "total_size_kb": 0,
        "num_models": 0,
        "models": []
    }

    if os.path.isdir(output_dir):
        # Sorted so the report order does not depend on the file system
        for filename in sorted(os.listdir(output_dir)):
            if not filename.endswith(suffix):
                continue

            filepath = os.path.join(output_dir, filename)
            size_kb = os.path.getsize(filepath) / 1024

            # Strip the naming scheme only at the ends so a genre that happens
            # to contain the same text is not mangled
            genre = filename[:-len(suffix)]
            if genre.startswith(prefix):
                genre = genre[len(prefix):]

            results["models"].append({
                "genre": genre,
                "size_kb": size_kb,
                "path": filepath
            })

            results["total_size_kb"] += size_kb
            results["num_models"] += 1

    # Computed on every path so callers can always read 'avg_size_kb'
    results["avg_size_kb"] = results["total_size_kb"] / results["num_models"] if results["num_models"] > 0 else 0

    return results

In [22]:
# Example usage: cluster every genre, then report timing, per-genre results and disk usage
MUSIC_FEATURES_FP = 'data_music_features/processed_spotify_sample.csv'
OUTPUT_DIR = "genre_cluster_models"

# Fit one model per genre on a thread pool; compressed models land in OUTPUT_DIR
results = cluster_by_genre_parallel(MUSIC_FEATURES_FP, output_dir=OUTPUT_DIR)

# Take the run statistics out of the dict so only genre entries remain in `results`
stats = results.pop("_execution_stats", {})
print("\nExecution Statistics:")
print(f"- Total execution time: {stats.get('total_time_seconds', 0):.2f} seconds")
print(f"- Number of genres processed: {stats.get('num_genres', 0)}")
print(f"- Threads used: {stats.get('threads_used', 'unknown')}")

# One summary paragraph per genre; failed genres get a single error line instead
print("\nClustering Results Summary:")
for genre, info in results.items():
    if "error" in info:
        print(f"Genre: {genre} - Error: {info['error']}")
    else:
        print(f"Genre: {genre}")
        print(f"  - Data shape: {info['data_shape']}")
        print(f"  - Number of clusters: {info['num_clusters']}")
        print(f"  - Model saved to: {info['model_path']}")
        print(f"  - Model size: {info['model_size_kb']:.2f} KB")

        # Indices of the three most populated clusters, largest first
        if 'cluster_distribution' in info:
            cluster_dist = np.array(info['cluster_distribution'])
            top_clusters = np.argsort(cluster_dist)[::-1][:3]
            print(f"  - Top 3 largest clusters: {top_clusters}")
        print()

# Disk footprint of everything that was written to OUTPUT_DIR
storage_analysis = analyze_model_storage(OUTPUT_DIR)
print("\nStorage Analysis:")
print(f"- Total models: {storage_analysis['num_models']}")
print(f"- Total storage used: {storage_analysis['total_size_kb']:.2f} KB")
print(f"- Average model size: {storage_analysis['avg_size_kb']:.2f} KB")


Processing genre: acoustic with 1000 samplesProcessing genre: afrobeat with 999 samples
Processing genre: alt-rock with 999 samples

Processing genre: alternative with 999 samples
Processing genre: ambient with 999 samples
Processing genre: anime with 999 samples
Processing genre: black-metal with 997 samples
Processing genre: bluegrass with 998 samples
Processing genre: blues with 998 samples
Processing genre: brazil with 998 samples
Processing genre: breakbeat with 999 samples
Processing genre: cantopop with 999 samples
Processing genre: chicago-house with 998 samples
Processing genre: children with 998 samples
Processing genre: chill with 999 samples
Processing genre: classical with 933 samples
Processing genre: club with 995 samples
Processing genre: comedy with 996 samples
Processing genre: country with 1000 samples
Processing genre: dance with 965 samples
Processing genre: dancehall with 999 samples
Processing genre: death-metal with 999 samples
Processing genre: deep-house with 

In [23]:
# Example of loading the saved per-genre models and predicting clusters for a DataFrame
print("\nLoading compressed models to predict on new data...")
# Re-reads the same CSV that was clustered above as a stand-in for unseen tracks.
new_df = load_and_prepare_data(MUSIC_FEATURES_FP)  # In practice, this would be new data
# Splits the rows by genre, applies each genre's model from OUTPUT_DIR and adds a 'cluster' column.
clustered_df = predict_with_genre_models(new_df, model_dir=OUTPUT_DIR)
print(f"Prediction complete. Output shape: {clustered_df.shape}")


Loading compressed models to predict on new data...
Prediction complete. Output shape: (104592, 23)


In [1]:
from clustering_functions import cluster_baseline

In [2]:
# Fixed, date-named folder that receives the baseline model from this run.
OUTPUT_DIR = "models/25-03-07"
# Feature CSV to cluster; the path is relative to the notebook's working directory.
MUSIC_FEATURES_FP = 'data_music_features/processed_spotify_sample.csv'
# Last expression of the cell, so the returned summary dict is rendered as the cell output.
cluster_baseline(MUSIC_FEATURES_FP, OUTPUT_DIR)

Processing genre: Baseline with 104592 samples


{'Baseline': {'genre': 'Baseline',
  'data_shape': (104592, 14),
  'num_clusters': 50,
  'model_path': 'models/25-03-07/cluster_model_Baseline.pkl.gz',
  'model_size_kb': 41.98828125,
  'cluster_distribution': [2118,
   1888,
   2186,
   1781,
   1934,
   2620,
   2396,
   2522,
   2081,
   2139,
   2662,
   2170,
   2885,
   1906,
   2080,
   2634,
   2163,
   2173,
   1831,
   1682,
   2178,
   1891,
   1641,
   2010,
   2567,
   2342,
   1983,
   2185,
   1624,
   2602,
   2092,
   2147,
   2163,
   2141,
   2370,
   1715,
   2104,
   1363,
   2299,
   1801,
   2031,
   1977,
   1892,
   1512,
   2017,
   2400,
   2262,
   1719,
   1948,
   1765]},
 '_execution_stats': {'total_time_seconds': 1.4233179092407227,
  'threads_used': 'auto'}}

In [3]:
from clustering_functions import load_compressed_model

In [5]:
# Reuse OUTPUT_DIR so the load path cannot drift from the folder the model was saved to.
baseline = load_compressed_model('Baseline', OUTPUT_DIR)
print(baseline)

MiniBatchKMeans(max_iter=10, n_clusters=50, random_state=0,
                reassignment_ratio=0.1)
