In [1]:
# All imports grouped at the top of the cell so the dependencies are visible
# at a glance and no statement runs in between them.
import numpy
import pandas as pd
import matplotlib.pyplot as plt
from tslearn.clustering import KShape, silhouette_score
from tslearn.generators import random_walks
from tslearn.preprocessing import TimeSeriesScalerMeanVariance

# Synthetic dataset: 50 univariate random walks of length 32.
# The generator is seeded so that "Restart Kernel -> Run All" reproduces the
# same data (the KShape estimator below was already seeded).
X = random_walks(n_ts=50, sz=32, d=1, random_state=0)

# Rescale every series to zero mean / unit variance before clustering.
X = TimeSeriesScalerMeanVariance(mu=0., std=1.).fit_transform(X)

# Fit k-Shape with 3 clusters and a single, seeded initialisation.
ks = KShape(n_clusters=3, n_init=1, random_state=0).fit(X)

# Last expression of the cell: (n_clusters, sz, d) of the learned centroids.
ks.cluster_centers_.shape


(3, 32, 1)

<Figure size 800x400 with 0 Axes>

In [1]:
class KShape(ClusterMixin, TimeSeriesCentroidBasedClusteringMixin,
             BaseModelPackage, TimeSeriesBaseEstimator):
    """k-Shape clustering of time series.

    Series are compared with a distance defined as one minus a normalized
    cross-correlation, and each centroid is re-estimated from the series
    currently assigned to its cluster.

    NOTE(review): the base classes and the helper functions used below
    (``check_is_fitted``, ``check_array``, ``check_random_state``,
    ``to_time_series_dataset``, ``check_dims``, ``cdist_normalized_cc``,
    ``y_shifted_sbd_vec``, ``_check_no_empty_cluster``, ``_compute_inertia``,
    ``_check_initial_guess``, ``EmptyClusterError``) must already be in scope
    when this definition is executed; otherwise it raises ``NameError``.

    Parameters
    ----------
    n_clusters : int (default: 3)
        Number of clusters to form.
    max_iter : int (default: 100)
        Maximum number of update/assign iterations per initialisation.
    tol : float (default: 1e-6)
        Convergence tolerance on the change of inertia between iterations.
    n_init : int (default: 1)
        Number of successful initialisations to run; the one with the
        lowest inertia is kept.
    verbose : bool (default: False)
        Whether to print progress information.
    random_state : int, RandomState instance or None (default: None)
        Seed / generator passed to ``check_random_state``.
    init : {'random'} or array-like (default: 'random')
        Initialisation strategy, or explicit initial centroids (any object
        exposing ``__array__``).
    """

    def __init__(self, n_clusters=3, max_iter=100, tol=1e-6, n_init=1,
                 verbose=False, random_state=None, init='random'):
        # Number of clusters (default: 3).
        self.n_clusters = n_clusters
        # Maximum number of iterations (default: 100).
        self.max_iter = max_iter
        # Convergence tolerance (default: 1e-6).
        self.tol = tol
        self.random_state = random_state
        # Number of initialisations to run (default: 1).
        self.n_init = n_init
        # Whether to print detailed progress (default: False).
        self.verbose = verbose
        # Centroid initialisation strategy (default: 'random').
        self.init = init

    def _is_fitted(self):
        """Check whether the model has been fitted.

        Returns
        -------
        bool
            ``True`` when the fitted attributes exist. Any failure is
            signalled by ``check_is_fitted`` itself rather than by a
            ``False`` return value.
        """
        # check_is_fitted verifies that 'cluster_centers_', 'norms_' and
        # 'norms_centroids_' are present on the estimator.
        check_is_fitted(self,
                        ['cluster_centers_', 'norms_', 'norms_centroids_'])
        return True

    def _shape_extraction(self, X, k):
        """Extract the shape (new centroid) of cluster ``k``.

        Parameters
        ----------
        X : array of shape=(n_ts, sz, d)
            Time series dataset.
        k : int
            Index of the cluster whose centroid is re-estimated.

        Returns
        -------
        numpy.ndarray of shape=(sz, 1)
            Extracted shape for cluster ``k`` (not yet rescaled).
        """
        # Length of the series (second axis of X).
        sz = X.shape[1]
        # Presumably the members of cluster k shifted so that they are
        # aligned with the current centroid -- confirm against
        # y_shifted_sbd_vec.
        Xp = y_shifted_sbd_vec(self.cluster_centers_[k], X[self.labels_ == k],
                               norm_ref=-1,
                               norms_dataset=self.norms_[self.labels_ == k])
        # (sz, sz) Gram matrix of the aligned series. Only the first
        # dimension (index 0 on the last axis) is used here.
        S = numpy.dot(Xp[:, :, 0].T, Xp[:, :, 0])
        # Centering matrix: identity minus the constant 1/sz matrix.
        Q = numpy.eye(sz) - numpy.ones((sz, sz)) / sz
        # M = Q^T S Q
        M = numpy.dot(Q.T, numpy.dot(S, Q))
        # eigh returns eigenvalues in ascending order, so the last column is
        # the eigenvector associated with the largest eigenvalue.
        _, vec = numpy.linalg.eigh(M)
        mu_k = vec[:, -1].reshape((sz, 1))

        # The way the optimization problem is (ill-)formulated, both mu_k and
        # -mu_k are candidates for barycenters
        # In the following, we check which one is best candidate
        # Summed distance between the aligned series and +mu_k ...
        dist_plus_mu = numpy.sum(numpy.linalg.norm(Xp - mu_k, axis=(1, 2)))
        # ... and between the aligned series and -mu_k.
        dist_minus_mu = numpy.sum(numpy.linalg.norm(Xp + mu_k, axis=(1, 2)))
        # Keep whichever sign is closer to the data.
        if dist_minus_mu < dist_plus_mu:
            mu_k *= -1
        return mu_k

    def _update_centroids(self, X):
        """Recompute every centroid from the current cluster assignments.

        Parameters
        ----------
        X : array of shape=(n_ts, sz, d)
            Time series dataset.

        Updates ``cluster_centers_`` and ``norms_centroids_`` in place.
        """
        # Re-extract the shape of each cluster.
        for k in range(self.n_clusters):
            self.cluster_centers_[k] = self._shape_extraction(X, k)
        # Rescale the centroids to zero mean and unit variance.
        self.cluster_centers_ = TimeSeriesScalerMeanVariance(
            mu=0., std=1.).fit_transform(self.cluster_centers_)
        # Norms of the updated centroids, reused by _cross_dists.
        self.norms_centroids_ = numpy.linalg.norm(self.cluster_centers_,
                                                  axis=(1, 2))

    def _cross_dists(self, X, norms=None):
        """Distance between every series in ``X`` and every centroid.

        Parameters
        ----------
        X : array of shape=(n_ts, sz, d)
            Time series dataset.
        norms : array of shape=(n_ts, ) or None (default: None)
            Norms of the series in ``X``. When ``None``, the norms stored at
            fit time (``self.norms_``) are used, which is only valid when
            ``X`` is the dataset the model was fitted on.

        Returns
        -------
        numpy.ndarray
            One minus the normalized cross-correlation returned by
            ``cdist_normalized_cc`` (argmin over axis 1 gives the cluster).
        """
        if norms is None:
            norms = self.norms_
        # Subtract the normalized cross-correlation from 1 to get a distance.
        return 1. - cdist_normalized_cc(X, self.cluster_centers_,
                                        norms1=norms,
                                        norms2=self.norms_centroids_,
                                        self_similarity=False)

    def _assign(self, X):
        """Assign every series to its closest centroid.

        Parameters
        ----------
        X : array of shape=(n_ts, sz, d)
            Time series dataset (the one used for fitting, since the stored
            ``norms_`` are used).

        Updates ``labels_`` and ``inertia_``.
        """
        dists = self._cross_dists(X)
        self.labels_ = dists.argmin(axis=1)
        # Presumably raises EmptyClusterError when a cluster has no member
        # (fit() catches that exception) -- confirm against the helper.
        _check_no_empty_cluster(self.labels_, self.n_clusters)
        self.inertia_ = _compute_inertia(dists, self.labels_)

    def _fit_one_init(self, X, rs):
        """Run one initialisation of the k-Shape iterations.

        Parameters
        ----------
        X : array of shape=(n_ts, sz, d)
            Time series dataset.
        rs : random generator
            Object returned by ``check_random_state``; its ``choice`` method
            picks the initial centroids when ``init == "random"``.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If ``init`` is neither array-like nor ``"random"``.
        """
        if hasattr(self.init, '__array__'):
            self.cluster_centers_ = self.init.copy()
        elif self.init == "random":
            # NOTE(review): with numpy's default arguments this draws indices
            # with replacement, so the same series can be picked twice.
            indices = rs.choice(X.shape[0], self.n_clusters)
            self.cluster_centers_ = X[indices].copy()
        else:
            raise ValueError("Value %r for parameter 'init' is "
                             "invalid" % self.init)
        self.norms_centroids_ = numpy.linalg.norm(self.cluster_centers_,
                                                  axis=(1, 2))
        self._assign(X)
        old_inertia = numpy.inf

        # Explicit counter so that max_iter == 0 does not leave the number of
        # performed iterations undefined.
        n_iter = 0
        for n_iter in range(1, self.max_iter + 1):
            old_cluster_centers = self.cluster_centers_.copy()
            self._update_centroids(X)
            self._assign(X)
            if self.verbose:
                print("%.3f" % self.inertia_, end=" --> ")

            # Stop when the inertia stalls (within tol) or gets worse; in
            # both cases fall back to the previous centroids.
            if numpy.abs(old_inertia - self.inertia_) < self.tol or \
                    (old_inertia - self.inertia_ < 0):
                self.cluster_centers_ = old_cluster_centers
                # The stored norms described the rejected centroids; bring
                # them back in sync before re-assigning.
                self.norms_centroids_ = numpy.linalg.norm(
                    self.cluster_centers_, axis=(1, 2))
                self._assign(X)
                break

            old_inertia = self.inertia_
        if self.verbose:
            print("")

        self._iter = n_iter

        return self

    def fit(self, X, y=None):
        """Compute k-Shape clustering.

        Parameters
        ----------
        X : array-like of shape=(n_ts, sz, d)
            Time series dataset.

        y
            Ignored

        Returns
        -------
        self
        """
        X = check_array(X, allow_nd=True)

        # Initialisations that fail with an empty cluster are retried, up to
        # this total number of attempts.
        max_attempts = max(self.n_init, 10)

        self.labels_ = None
        self.inertia_ = numpy.inf
        self.cluster_centers_ = None

        self.norms_ = 0.
        self.norms_centroids_ = 0.

        self.n_iter_ = 0

        X_ = to_time_series_dataset(X)
        self._X_fit = X_
        # Norm of every training series, reused by all distance computations
        # performed during fitting.
        self.norms_ = numpy.linalg.norm(X_, axis=(1, 2))

        _check_initial_guess(self.init, self.n_clusters)

        rs = check_random_state(self.random_state)

        best_correct_centroids = None
        min_inertia = numpy.inf
        n_successful = 0
        n_attempts = 0
        while n_successful < self.n_init and n_attempts < max_attempts:
            try:
                if self.verbose and self.n_init > 1:
                    print("Init %d" % (n_successful + 1))
                n_attempts += 1
                self._fit_one_init(X_, rs)
                # Keep the centroids of the best (lowest inertia) run.
                if self.inertia_ < min_inertia:
                    best_correct_centroids = self.cluster_centers_.copy()
                    min_inertia = self.inertia_
                    self.n_iter_ = self._iter
                n_successful += 1
            except EmptyClusterError:
                if self.verbose:
                    print("Resumed because of empty cluster")
        # The norms must describe the centroids that are actually kept (the
        # best run), not those left over from the last attempt. Skipped when
        # no initialisation succeeded.
        if best_correct_centroids is not None:
            self.norms_centroids_ = numpy.linalg.norm(best_correct_centroids,
                                                      axis=(1, 2))
        # _post_fit is inherited (not visible here); presumably it installs
        # the best centroids and final labels -- confirm against the mixin.
        self._post_fit(X_, best_correct_centroids, min_inertia)
        return self

    def fit_predict(self, X, y=None):
        """Fit k-Shape clustering using X and then predict the closest cluster
        each time series in X belongs to.

        It is more efficient to use this method than to sequentially call fit
        and predict.

        Parameters
        ----------
        X : array-like of shape=(n_ts, sz, d)
            Time series dataset to predict.

        y
            Ignored

        Returns
        -------
        labels : array of shape=(n_ts, )
            Index of the cluster each sample belongs to.
        """
        return self.fit(X, y).labels_

    def predict(self, X):
        """Predict the closest cluster each time series in X belongs to.

        Parameters
        ----------
        X : array-like of shape=(n_ts, sz, d)
            Time series dataset to predict.

        Returns
        -------
        labels : array of shape=(n_ts, )
            Index of the cluster each sample belongs to.
        """
        X = check_array(X, allow_nd=True)
        check_is_fitted(self,
                        ['cluster_centers_', 'norms_', 'norms_centroids_'])

        X_ = check_dims(X, X_fit_dims=self.cluster_centers_.shape)
        X_ = TimeSeriesScalerMeanVariance(mu=0., std=1.).fit_transform(X_)
        # Use the norms of the series being predicted: the norms stored at
        # fit time belong to the training set and may not even have the same
        # length as X_.
        dists = self._cross_dists(X_,
                                  norms=numpy.linalg.norm(X_, axis=(1, 2)))
        return dists.argmin(axis=1)

NameError: name 'ClusterMixin' is not defined