In [1]:
import pandas as pd
import numpy as np
from typing import Any, Self

In [2]:
def write(file: str, data: Any):
    """Write ``data`` to the path ``file``, replacing any existing content.

    A ``str`` is written verbatim; any other object is written as
    ``str(data)``. The file is opened in text mode with the platform's
    default encoding.
    """
    with open(file, 'w') as handle:
        # The conversion happens after the file is opened (and truncated).
        if isinstance(data, str):
            handle.write(data)
        else:
            handle.write(str(data))

def rescale(x: np.ndarray) -> np.ndarray:
    """Min-max rescale ``x`` so its smallest value maps to 0 and its largest to 1.

    The minimum and the range are taken over the whole array (no axis).

    Args:
        x: Numeric array with at least one element.

    Returns:
        ``(x - min(x)) / (max(x) - min(x))``. When every element of ``x`` is
        equal the range is zero; the result is then all zeros instead of the
        NaNs that ``0 / 0`` would produce.

    Raises:
        ValueError: If ``x`` is empty (raised by ``np.min``).
    """
    min_ = np.min(x)
    range_ = np.ptp(x)
    if range_ == 0:
        # Constant input: x - min_ is already all zeros, so dividing by 1
        # keeps the usual result dtype while avoiding 0 / 0.
        range_ = 1
    return (x - min_) / range_

In [3]:
# Load the raw CSV and take its values as a NumPy array.
# NOTE(review): later cells reduce along axis=1 and index rows by position,
# so this presumably holds one row per series and one column per time step,
# all numeric — confirm against the CSV.
df = pd.read_csv('time_series_covid19_deaths_US_raw.csv')
raw_data = df.to_numpy()

In [4]:
# Running total down the rows (axis=0), followed by first differences along
# the last axis (np.diff's default).
# NOTE(review): the accumulation and the differencing run along different
# axes, so `data` is not simply `raw_data` recovered — confirm that axis=0 is
# intended for this CSV layout.
cum_data = np.cumsum(raw_data, axis=0)
data = np.diff(cum_data)

# Release the raw array. Re-running this cell on its own now raises NameError
# until the loading cell has been run again.
del raw_data

In [5]:
# wisc_cal_data = cum_data[(4, 48), :]
# q1_str = '\n'.join((','.join(map(str, x)) for x in wisc_cal_data.tolist()))
# write('q1.txt', q1_str)

# del cum_data
# del q1_str

In [6]:
# q2_data = data[(4, 48), :] # 4-> cal, 48 -> wisco
# q2_str = '\n'.join((','.join(map(str, x)) for x in q2_data.tolist()))
# write('q2.txt', q2_str)

# del q2_data
# del q2_str

In [7]:
# write('q3.txt', '''Mean of the time-differenced data
# Standard deviation of the time-differenced data
# Median of the time-differenced data
# Linear trend coefficient of the data
# Auto-correlation of the data''')

In [8]:
# Feature 1: per-row mean of `data`, min-max rescaled across rows.
# From here on `mean` holds the rescaled values, not the raw row means.
mean = rescale(np.mean(data, axis=1))

In [9]:
# Feature 2: per-row (population, ddof=0) standard deviation of `data`,
# min-max rescaled across rows.
std = rescale(np.std(data, axis=1))

In [10]:
# Feature 3: per-row median of `data`, min-max rescaled across rows.
median = rescale(np.median(data, axis=1))

In [11]:
# Feature 4: linear trend coefficient of each row, i.e. the least-squares
# slope of the row against t = 1..T:
#     sum_t x[t] * (t - t_bar) / sum_t (t - t_bar)^2,   t_bar = (T + 1) / 2
n_steps = data.shape[-1]
half_length = (n_steps + 1) / 2
time_offsets = np.arange(1, n_steps + 1) - half_length

# The offsets sum to zero, so subtracting any per-row constant (such as the
# row mean) from x leaves the numerator unchanged. No mean is needed here,
# which also keeps this cell independent of the rescaled `mean` feature.
ltc = np.sum(data * time_offsets, axis=1) / np.sum(time_offsets ** 2)
ltc = rescale(ltc)

In [12]:
# Feature 5: lag-1 autocorrelation of each row,
#     sum_{t>=1} c[t] * c[t-1] / sum_t c[t]^2,   c = row - mean(row)
# The centring uses row means computed here from `data`. The `mean` feature
# cannot be used for this because it holds min-max rescaled values.
centered = data - np.mean(data, axis=1, keepdims=True)
ac = np.sum(centered[:, 1:] * centered[:, :-1], axis=1) / np.sum(centered ** 2, axis=1)
ac = rescale(ac)

In [13]:
# Feature matrix: one row per series, columns in the order
# (mean, std, median, linear trend coefficient, lag-1 autocorrelation).
# Stacking the feature vectors lets the row count follow the data instead of
# being fixed at 50.
parametric_data = np.column_stack((mean, std, median, ltc, ac)).astype(np.float64, copy=False)

# q4_str = '\n'.join((','.join(map(lambda s: f'{s:.4f}', x)) for x in np.around(parametric_data, 4).tolist()))
# write('q4.txt', q4_str)
# del q4_str

In [14]:
class HierarchicalClustering:
    """Agglomerative hierarchical clustering with single or complete linkage.

    Every point starts as its own cluster. The two clusters with the smallest
    linkage distance are merged repeatedly until ``n_clusters`` remain. The
    linkage distance between two clusters is the minimum ('single') or the
    maximum ('complete') pairwise distance between their points.
    """

    class Clustering:
        """Namespace for the distance computation and the linkage routines."""

        @staticmethod
        def _compute_distances(X: np.ndarray) -> np.ndarray:
            """Return the symmetric ``(n, n)`` matrix of pairwise distances.

            Entry ``[i, j]`` is ``np.linalg.norm(X[i] - X[j])`` (the Euclidean
            distance when the rows of ``X`` are vectors); the diagonal is 0.
            """
            n = X.shape[0]
            distance_matrix = np.zeros((n, n))

            # Fill the strict upper triangle, then mirror it onto the lower one.
            for i in range(n):
                for j in range(i + 1, n):
                    distance_matrix[i, j] = np.linalg.norm(X[i] - X[j])

            distance_matrix += distance_matrix.T

            return distance_matrix

        @staticmethod
        def _agglomerate(distance_matrix: np.ndarray, n_clusters: int, linkage_distance) -> list:
            """Greedily merge clusters until ``n_clusters`` remain.

            Args:
                distance_matrix: Square matrix of pairwise point distances.
                    It is only read, never modified.
                n_clusters: Number of clusters to stop at.
                linkage_distance: Reducer applied to the block of pairwise
                    distances between two clusters (``np.min`` for single
                    linkage, ``np.max`` for complete linkage).

            Returns:
                A list of clusters, each a list of point indices.

            Raises:
                ValueError: If ``n_clusters`` is below 1 while there are
                    points to cluster, or if no pair of clusters has a
                    distance below infinity (e.g. NaN distances).
            """
            num_points = distance_matrix.shape[0]
            clusters = [[i] for i in range(num_points)]

            if num_points > 0 and n_clusters < 1:
                raise ValueError(
                    f'n_clusters must be at least 1, found {n_clusters!r}'
                )

            while len(clusters) > n_clusters:
                best_distance = np.inf
                merge_index1, merge_index2 = None, None

                # Scan every pair of clusters; the strict `<` keeps the first
                # pair found among ties.
                for i in range(len(clusters)):
                    for j in range(i + 1, len(clusters)):
                        # Linkage distance taken over the original point
                        # distances between the two clusters' members.
                        distance = linkage_distance(
                            distance_matrix[np.ix_(clusters[i], clusters[j])]
                        )

                        if distance < best_distance:
                            best_distance = distance
                            merge_index1, merge_index2 = i, j

                if merge_index1 is None:
                    raise ValueError(
                        'Cannot merge further: no pair of clusters has a '
                        'distance below infinity.'
                    )

                # merge_index1 < merge_index2, so deleting the second cluster
                # does not shift the position of the first.
                clusters[merge_index1].extend(clusters[merge_index2])
                del clusters[merge_index2]

            return clusters

        @staticmethod
        def _single(distance_matrix: np.ndarray, num_clusters: int) -> tuple:
            """Single-linkage clustering: cluster distance is the minimum pairwise distance.

            Returns:
                A tuple of clusters, each a tuple of point indices.
            """
            clusters = HierarchicalClustering.Clustering._agglomerate(
                distance_matrix, num_clusters, np.min
            )
            return tuple(map(tuple, clusters))

        @staticmethod
        def _complete(distance_matrix: np.ndarray, n_clusters: int) -> list:
            """Complete-linkage clustering: cluster distance is the maximum pairwise distance.

            Returns:
                A list of clusters, each a list of point indices.
            """
            return HierarchicalClustering.Clustering._agglomerate(
                distance_matrix, n_clusters, np.max
            )

    # Linkage name -> routine taking (distance_matrix, number_of_clusters).
    _METHODS = {
        'single': Clustering._single,
        'complete': Clustering._complete,
    }

    def __init__(self, n_clusters: int = 2, linkage: str = 'single'):
        """Configure the clustering.

        Args:
            n_clusters: Number of clusters to stop merging at.
            linkage: ``'single'`` or ``'complete'``.

        Raises:
            AssertionError: If ``linkage`` is not a known linkage name
                (not checked when Python runs with assertions disabled;
                ``fit`` then raises ``ValueError`` instead).
        """
        self._n_clusters = n_clusters

        assert HierarchicalClustering._is_linkage_valid(linkage), (
            f'Invalid Linkage, valid linkages are \'single\' and '
            f'\'complete\', found {linkage!r}'
        )

        self._linkage = linkage

    def _build(self, X: np.ndarray):
        """Compute pairwise distances for ``X`` and store the resulting clusters.

        Raises:
            ValueError: If the stored linkage name is not a known linkage.
        """
        distance_matrix = HierarchicalClustering.Clustering._compute_distances(X)
        cluster_func = HierarchicalClustering._METHODS.get(self._linkage)
        if cluster_func is None:
            # The attribute is `_linkage`; there is no public `linkage`.
            raise ValueError(
                f'Invalid Linkage, the Hierarchical Clustering instance '
                f'may have been tampered with. found {self._linkage!r}'
            )

        self._clusters = cluster_func(distance_matrix, self._n_clusters)

    def fit(self, X: np.ndarray) -> Self:
        """Cluster the rows of ``X`` and return this instance."""
        self._build(X)
        return self

    def predict(self, X: np.ndarray):
        """Placeholder: not implemented, always returns ``None``."""
        pass

    @property
    def clusters(self):
        """Clusters found by the last ``fit`` call.

        A tuple of tuples of point indices for single linkage, a list of lists
        for complete linkage.

        Raises:
            AttributeError: If ``fit`` has not been called yet.
        """
        try:
            return self._clusters
        except AttributeError:
            raise AttributeError(
                'No clusters available: call `fit` before reading `clusters`.'
            ) from None

    @clusters.setter
    def clusters(self, value):
        raise ValueError('The `clusters` attribute should not be modified externally!')

    @staticmethod
    def _is_linkage_valid(linkage: str) -> bool:
        """Return True if ``linkage`` names a known linkage routine."""
        return linkage in HierarchicalClustering._METHODS.keys()

In [15]:
# Single-linkage clustering (the class's default linkage) into 5 clusters.
slhc = HierarchicalClustering(n_clusters=5).fit(parametric_data)

# Label every row with the position of its cluster in `slhc.clusters`.
idxs = np.arange(parametric_data.shape[0])
for cluster_idx, cluster in enumerate(slhc.clusters):
    idxs[list(cluster)] = cluster_idx

# q5_str = ', '.join(map(str, idxs.tolist()))
# write('q5.txt', q5_str)
# del q5_str

In [16]:
# Complete-linkage clustering into 5 clusters.
clhc = HierarchicalClustering(n_clusters=5, linkage='complete').fit(parametric_data)

# Label every row with the position of its cluster in `clhc.clusters`.
# This overwrites the `idxs` produced for the single-linkage run.
idxs = np.arange(parametric_data.shape[0])
for cluster_idx, cluster in enumerate(clhc.clusters):
    idxs[list(cluster)] = cluster_idx

# q6_str = ', '.join(map(str, idxs.tolist()))
# write('q6.txt', q6_str)
# del q6_str