# In [4]:
import numpy as np
from scipy import stats

# Column labels that make up a point's position; Point keeps every other
# field of a record as a descriptor.
FEATURES = [
    'acousticness',
    'danceability',
    'energy',
    'instrumentalness',
    'liveness',
    'loudness',
    'speechiness',
    'tempo',
    'valence',
]

class Point(object):
    """
    Represents a data point

    A point has ``features`` (its position) and, when it was built from a
    labelled record, ``descriptors`` (the remaining fields of that record).
    """

    def __init__(self, features):
        """
        Initialize attributes
        features: either a numpy array holding the position directly, or a
            label-indexed record that supports ``features[FEATURES]`` and
            ``features.drop(FEATURES)`` (presumably a pandas Series holding
            one row of the data -- confirm against callers)
        """
        # isinstance rather than an exact type comparison, so that ndarray
        # subclasses are also accepted as raw positions instead of being
        # sent down the record branch.
        if isinstance(features, np.ndarray):
            self.features = features
            self.descriptors = None
        else:
            self.features = np.array(features[FEATURES])
            self.descriptors = np.array(features.drop(FEATURES))

    def dimensionality(self):
        """Returns dimension of the point"""
        return len(self.features)

    def get_position(self):
        """Returns the position of the point (its features only)"""
        return self.features

    def get_features(self):
        """
        Returns features followed by descriptors as one array, or only the
        features when the point has no descriptors
        """
        if self.descriptors is not None:
            return np.array(list(self.features) + list(self.descriptors))
        else:
            return np.array(self.features)

    def distance(self, other):
        """
        other: point, to which we are measuring distance to
        Return Euclidean distance of this point with other
        """
        # asarray avoids copying the two feature vectors on every call
        return np.linalg.norm(np.asarray(self.features) - np.asarray(other.features))

class Cluster(object):
    """
    A Cluster groups a list of points around a centroid
    """

    def __init__(self, centroid):
        """
        centroid: starting centre of the cluster; it is wrapped in a Point
        The cluster begins with an empty list of points, self.points
        """
        self.centroid = Point(centroid)
        self.points = []

    def get_points(self):
        """Returns the list holding the points of the cluster"""
        return self.points

    def add_point(self, point):
        """Appends point to the end of the cluster's point list"""
        members = self.points
        members.append(point)

    def update(self, new_mean):
        """
        new_mean: the values of the new centroid
        Replaces the centroid with a Point built from new_mean
        """
        position = np.array(new_mean)
        self.centroid = Point(position)

class ClusterSet(object):
    """
    A ClusterSet is defined as a list of clusters
    """

    def __init__(self):
        """
        Initialize an empty set, without any clusters
        """
        self.clusters = []

    def add(self, c):
        """
        c: Cluster
        Appends a cluster c to the end of the cluster list
        only if it doesn't already exist in the ClusterSet.
        If it is already in self.clusters, raise a ValueError
        """
        if c in self.clusters:
            raise ValueError('cluster is already in the ClusterSet')
        self.clusters.append(c)

    def get_clusters(self):
        """Returns a copy of the list of clusters in the ClusterSet"""
        return self.clusters[:]

    def get_centroids(self):
        """Returns centroids of each cluster in the ClusterSet as a list"""
        return [cluster.centroid for cluster in self.clusters]

    def distortion(self):
        """
        Distortion is the sum of the squared euclidean distances from each
        centroid to its points, divided by the number of non-empty clusters.
        Clusters without points are ignored.
        Returns 0.0 when no cluster holds any point.
        """
        total = 0.0
        occupied = 0
        for cluster in self.clusters:
            points = cluster.get_points()
            # Emptiness is decided by the point count, not by a zero sum: a
            # cluster whose points all coincide with its centroid still counts.
            if not points:
                continue
            occupied += 1
            centroid = cluster.centroid
            total += sum(point.distance(centroid) ** 2 for point in points)

        # Guard the division: an empty set (or only empty clusters) would
        # otherwise raise ZeroDivisionError.
        if occupied == 0:
            return 0.0
        return total / occupied

    def num_clusters(self):
        """Returns number of clusters in the ClusterSet"""
        return len(self.clusters)

    def get_loss(self):
        """
        Returns the sum over all points of the squared difference between the
        point's position and the position of its cluster's centroid
        """
        loss = 0
        for cluster in self.clusters:
            # Positions only: get_features() also carries a point's
            # descriptors, which centroids do not have, so the two vectors
            # would not line up.
            center = np.asarray(cluster.centroid.get_position(), dtype=float)
            for point in cluster.get_points():
                offset = np.asarray(point.get_position(), dtype=float) - center
                loss += np.sum(offset ** 2)
        return loss

# In [6]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics.pairwise import pairwise_distances
from operator import methodcaller

def closest_centroid(centroids, point):
    """
    Finds which centroid lies nearest to a point
    Arguments:
        centroids: list of all centroids (list of points)
        point: the single point whose nearest centroid is wanted
    Returns:
        Index into centroids of the nearest one; when several are equally
        near, the lowest index is returned
    """
    gaps = [candidate.distance(point) for candidate in centroids]
    # min() keeps the first of equal keys, so ties resolve to the lowest index
    return min(range(len(gaps)), key=gaps.__getitem__)
    

def random_init(df, k, sample):
    """
    Draws k random starting centroids from the value ranges of df
    Arguments:
        df: a dataframe of all the tracks
        k: Number of initial centroids
        sample: method to sample. None draws uniformly between the minimum and
            maximum of each column; any other value draws from a normal
            distribution with the mean and standard deviation of each column
    Returns:
        Array of shape (k, number of columns of df), one centroid per row
    """
    shape = (k, df.shape[1])

    if sample is not None:
        return np.random.normal(loc=df.mean(), scale=df.std(), size=shape)

    return np.random.uniform(low=df.min(), high=df.max(), size=shape)


def plot_loss(losses):
    """
    Uses matplotlib to generate a graph of loss vs. iteration
    Arguments:
        losses: sequence of loss values, one per iteration, in order. The
            last entry is shown in the title, so the sequence must not be empty
    """
    figure = plt.figure()
    axes = figure.add_subplot(1, 1, 1)

    iterations = list(range(len(losses)))
    axes.plot(iterations, losses, '--', color='tab:blue', label='losses')
    axes.legend(loc="upper left")
    axes.set_xlabel('Iteration')
    axes.set_ylabel('Loss')
    # the value reported in the title is the final entry of losses
    axes.set_title(f'Minimum Loss: {losses[-1]}')

    # display the plot
    plt.show()

def sequential_k_means(df, k, divisor=None, sample=None):
    """
    Clusters points into k clusters using sequential k_means clustering.
    Every row of df is visited exactly once: it is assigned to the cluster
    whose current centroid is closest, and that centroid is then moved
    towards the row.
    Arguments:
        df: dataframe containing data for all the tracks; must contain the
            FEATURES columns
        k: the number of clusters
        divisor: step size control for the centroid update. If None, the
            centroid moves by 1/n of the gap to the new point, where n is the
            number of points in the cluster (running mean). Otherwise it
            moves by the constant fraction 1/divisor.
        sample: passed to random_init to choose how the initial centroids
            are drawn (None is uniform, otherwise normal)
    Returns:
        List [centroids, sizes, clusters, distortion] where centroids is the
        list of final centroid Points, sizes the number of points per
        cluster, clusters the list of Cluster objects and distortion the
        value of ClusterSet.distortion() after the pass
    Reference: https://www.cs.princeton.edu/courses/archive/fall08/cos436/Duda/C/sk_means.htm
    """
    initial_centroids = random_init(df[FEATURES], k, sample)

    # fill cluster set
    cs = ClusterSet()
    for centroid in initial_centroids:
        cs.add(Cluster(np.array(centroid)))
    clusters = cs.get_clusters()

    # sequential K-means converges when we have gone through all points, unlike regular k means which
    # converges when the cluster assignments or means do not change between iterations.
    for i in range(df.shape[0]):
        point = Point(df.iloc[i, :])

        # Centroids are re-read on every iteration: Cluster.update replaces
        # the centroid object, so a list captured before the loop would keep
        # pointing at the initial centroids.
        centroid_index = closest_centroid(cs.get_centroids(), point)
        cluster = clusters[centroid_index]
        cluster.add_point(point)

        # Move the current centroid towards the new point
        prev_mean = np.asarray(cluster.centroid.get_position(), dtype=float)
        position = np.asarray(point.get_position(), dtype=float)
        if divisor is None:
            step = 1 / len(cluster.get_points())
        else:
            # constant divisor
            step = 1 / divisor
        cluster.update(prev_mean + step * (position - prev_mean))

    # calculate distortion
    distortion = cs.distortion()

    centroids = cs.get_centroids()
    sizes = [len(cluster.get_points()) for cluster in clusters]

    return [centroids, sizes, clusters, distortion]

def plot_performance(sequential_k_means_scores, k_vals, percent_empty):
    """
    Uses matplotlib to generate a graph of performance vs. k
    Two dashed series share the x-axis; each has its own y-axis and legend.
    Arguments:
        sequential_k_means_scores: A list of len(k_vals) average distortion scores from
            running the k-means algorithm with random initialization;
            drawn in blue and labelled 'distortion'
        k_vals: A list of integer k values used to calculate the above scores
        percent_empty: one value per entry of k_vals, drawn in orange on the
            twin y-axis and labelled '% empty'
    """
    figure = plt.figure()

    distortion_axis = figure.subplots()
    distortion_axis.plot(
        k_vals, sequential_k_means_scores, '--', color='tab:blue', label='distortion'
    )
    distortion_axis.legend(loc='upper left')

    # second y-axis on the same x-axis for the share of empty clusters
    empty_axis = distortion_axis.twinx()
    empty_axis.plot(k_vals, percent_empty, '--', color='tab:orange', label='% empty')
    empty_axis.legend(loc='upper right')

    plt.xlabel('Number of Clusters, k')

    # display the plot
    plt.show()