1. Compute Core distances

2. Compute MRG

3. Build MST

4. Dendrogram

5. GLOSH

6. Cluster Stability


In [15]:
import csv
import math
import sys

In [16]:
minPts = 4

In [17]:
def get_data(wiht_label: bool = False, file_path: str = "data/min_example.csv") -> list[list]:
    """Load the 2-D example points from a CSV file.

    The file must start with a header row containing the columns ``name``,
    ``x`` and ``y``; ``x`` and ``y`` are parsed as integers.

    Args:
        wiht_label: When truthy, the point names are returned as well. (The
            parameter name is misspelled but kept, since callers pass it by
            keyword.)
        file_path: CSV file to read.

    Returns:
        ``[[x, y], ...]`` when ``wiht_label`` is falsy, otherwise
        ``[labels, points]`` where ``labels[i]`` is the name of ``points[i]``.

    Raises:
        KeyError: If one of the required columns is missing from the header.
        ValueError: If a coordinate is not an integer literal.
    """
    label = []
    points = []

    # newline="" leaves newline handling to the csv module, as its
    # documentation asks for file objects handed to a reader.
    with open(file_path, "r", newline="") as file:
        for row in csv.DictReader(file):
            label.append(row["name"])
            points.append([int(row["x"]), int(row["y"])])

    return [label, points] if wiht_label else points

def get_data_with_label(file_path: str = "data/min_example.csv") -> dict:
    """Load the 2-D example points from a CSV file, keyed by point name.

    The file must start with a header row containing the columns ``name``,
    ``x`` and ``y``; ``x`` and ``y`` are parsed as integers. If a name occurs
    more than once, the last row with that name wins.

    Args:
        file_path: CSV file to read.

    Returns:
        ``{name: [x, y]}`` in file order.

    Raises:
        KeyError: If one of the required columns is missing from the header.
        ValueError: If a coordinate is not an integer literal.
    """
    # newline="" leaves newline handling to the csv module, as its
    # documentation asks for file objects handed to a reader.
    with open(file_path, "r", newline="") as file:
        return {
            row["name"]: [int(row["x"]), int(row["y"])]
            for row in csv.DictReader(file)
        }



1. Calculating core distances

In [18]:
def calc_distance(x1: list, x2: list) -> float:
    """Return the Euclidean distance between two points.

    Args:
        x1: Coordinates of the first point.
        x2: Coordinates of the second point; must be as long as ``x1``.

    Raises:
        ValueError: If the two points differ in their number of coordinates.
    """
    dims = len(x1)
    if dims != len(x2):
        raise ValueError("x1 and x2 must have equal sizes")

    # Running total of the squared per-axis differences.
    squared_total = 0
    for a, b in zip(x1, x2):
        squared_total += (a - b) ** 2
    return math.sqrt(squared_total)

def calc_core_distances(data: list[list], label: list, minPts: int) -> dict:
    """Compute the core distance of every point.

    For each point, the distances to all points in ``data`` are sorted in
    ascending order and the ``minPts``-th smallest one is its core distance.
    The point itself is part of that list (at distance 0), so ``minPts=1``
    yields a core distance of 0 for every point.

    Args:
        data: Point coordinates, one list per point.
        label: Names of the points; ``label[i]`` names ``data[i]``.
        minPts: 1-based rank of the neighbour that defines the core distance.

    Returns:
        ``{label: core_distance}``.

    Raises:
        ValueError: If ``minPts`` is smaller than 1.
        IndexError: If ``minPts`` exceeds the number of points, or ``label``
            is shorter than ``data``.
    """
    if minPts < 1:
        # A non-positive minPts would index the sorted distances from the end
        # and silently pick one of the farthest neighbours instead of failing.
        raise ValueError("minPts must be at least 1")

    result = {}
    for i, x1 in enumerate(data):
        point_distances = sorted(calc_distance(x1, x2) for x2 in data)
        result[label[i]] = point_distances[minPts - 1]
    return result

2. Computing MRG


In [19]:
def calc_mrg(data: list, labels: list, minPts: int) -> dict:
    """Build the mutual reachability graph as an adjacency mapping.

    For every ordered pair of points (a point paired with itself included)
    the mutual reachability distance is the largest of the two core distances
    and the distance between the two points.

    Args:
        data: Point coordinates, one list per point.
        labels: Names of the points; ``labels[i]`` names ``data[i]``.
        minPts: Forwarded to ``calc_core_distances``.

    Returns:
        ``{label: [[mutual_reachability_distance, other_label], ...]}`` with
        one entry per point of ``data``, in input order.
    """
    core = calc_core_distances(data, labels, minPts)
    graph = {}

    for src_idx, src_point in enumerate(data):
        src_label = labels[src_idx]
        src_core = core[src_label]
        # One adjacency row per source point, targets in input order.
        graph.setdefault(src_label, []).extend(
            [
                max(src_core, core[labels[dst_idx]], calc_distance(src_point, dst_point)),
                labels[dst_idx],
            ]
            for dst_idx, dst_point in enumerate(data)
        )

    return graph


def save_mrg(mrg: dict, file_path: str = "data/mrg.csv"):
    """Write the mutual reachability graph to a CSV distance matrix.

    The first row is ``"0"`` followed by the sorted node labels. Every
    following row starts with a node label, followed by its distance to each
    node in the same sorted order. A distance missing from ``mrg`` is written
    as an empty cell.

    Args:
        mrg: ``{label: [[distance, neighbor_label], ...]}`` adjacency mapping.
        file_path: Destination file; its directory must already exist.

    Returns:
        ``file_path``.
    """
    labels = sorted(mrg.keys())

    rows = []
    for label in labels:
        # Look the distances up by neighbour label so that every value lands
        # under its own header, whatever order the entries are stored in.
        by_neighbor = {entry[1]: entry[0] for entry in mrg[label]}
        rows.append([label, *[by_neighbor.get(column, "") for column in labels]])

    # newline="" stops the platform from translating the "\r\n" the csv
    # writer emits, which would otherwise produce blank rows on Windows.
    with open(file_path, "w", newline="") as file:
        writer = csv.writer(file)
        writer.writerow(["0", *labels])
        writer.writerows(rows)
    return file_path

# Load the example data set as [labels, points].
data = get_data(wiht_label=True)
# calc_mrg expects the points first and the labels second.
mrg = calc_mrg(data[1], data[0], minPts)

# Persist the graph as CSV; the returned file path becomes the cell output.
save_mrg(mrg)

'data/mrg.csv'

3. Building MST

3.1 Building MST using Prim's algorithm

In [None]:
import heapq

def build_mst_prims(mrg: dict):
    """Build a minimum spanning tree of ``mrg`` with Prim's algorithm.

    The tree is grown from the first label of ``mrg``; self-loops are ignored.

    Args:
        mrg: ``{label: [[weight, neighbor_label], ...]}`` adjacency mapping.

    Returns:
        A list of ``[node, parent, weight]`` edges in the order the nodes were
        attached to the tree. Empty when ``mrg`` has fewer than two nodes.

    Raises:
        IndexError: If some node cannot be reached from the first one, because
            the queue runs empty before every node is discovered.

    Time complexity: O(n^2 * log(n)) for a complete graph with n nodes.
    """
    labels = list(mrg.keys())
    n = len(labels)
    if n == 0:
        return []

    mst = []
    discovered = set()
    # Heap entries are (weight, node, parent); the start node has no parent.
    pq = [(0, labels[0], None)]
    # Cheapest weight queued so far for each not-yet-discovered node.
    best = {}

    while len(discovered) != n:
        weight, node, parent = heapq.heappop(pq)
        if node in discovered:
            # Stale entry: the node was already attached via a cheaper edge.
            continue
        discovered.add(node)

        # Compare with None explicitly: a falsy label such as 0 or "" is a
        # perfectly valid parent and its edge must not be dropped.
        if parent is not None:
            mst.append([node, parent, weight])

        for edge_weight, neighbor in mrg[node]:
            if neighbor == node or neighbor in discovered:
                continue
            if best.get(neighbor, float("inf")) > edge_weight:
                best[neighbor] = edge_weight
                heapq.heappush(pq, (edge_weight, neighbor, node))

    return mst


3.2 Building MST using Kruskal algorithm

In [42]:
import heapq

class MstKruskal:
    """Minimum spanning tree of a mutual reachability graph (Kruskal).

    Connectivity is tracked with a disjoint-set forest stored in two parallel
    lists (``parents`` and ``ranks``) that are indexed by a node's position
    in ``self.labels``.
    """

    def __init__(self, mrg: dict):
        # {label: [[weight, neighbor_label], ...]} adjacency mapping.
        self.mrg = mrg
        # Node labels in mrg order; a label's position is its set index.
        self.labels = list(mrg.keys())

    def find(self, node_i: int, parents: list[int]):
        """Return the root of the set that contains ``node_i``.

        Every node visited on the way is re-pointed directly at the root
        (path compression), so ``parents`` is updated in place.
        """
        root = node_i
        while parents[root] != root:
            root = parents[root]

        while parents[node_i] != root:
            next_i = parents[node_i]
            parents[node_i] = root
            node_i = next_i
        return root

    def union(self, node1_i: int, node2_i: int, ranks: list[int], parents: list[int]):
        """Merge the sets rooted at ``node1_i`` and ``node2_i`` (union by rank).

        Both arguments must be set roots. The root of lower rank is attached
        below the other one; a rank only grows when two roots of equal rank
        are merged.
        """
        if node1_i == node2_i:
            return
        if ranks[node1_i] > ranks[node2_i]:
            parents[node2_i] = node1_i
        elif ranks[node1_i] < ranks[node2_i]:
            parents[node1_i] = node2_i
        else:
            parents[node1_i] = node2_i
            ranks[node2_i] += 1

    def compute(self):
        """Return the MST of the graph as ``[node1, node2, weight]`` edges.

        Edges are listed in the order they were accepted, i.e. by ascending
        weight. The result is empty for graphs with fewer than two nodes.

        Raises:
            IndexError: If the graph is not connected, because the edge heap
                runs empty before n - 1 edges were accepted.

        E = number of edges = n^2
        Time complexity O(E * log(E))
        """
        n = len(self.labels)
        # Map labels to set indices by position so that any hashable label
        # works, not only single lowercase letters.
        index_of = {label: i for i, label in enumerate(self.labels)}
        parents = list(range(n))
        ranks = [0] * n
        mst = []

        # [weight, neighbor, label] for every stored edge except self-loops.
        edges = [
            [edge[0], edge[1], label]
            for label in self.labels
            for edge in self.mrg[label]
            if edge[1] != label
        ]
        heapq.heapify(edges)  # O(E)

        # "<" rather than "!=" so an empty graph (n - 1 == -1) ends at once.
        while len(mst) < n - 1:
            weight, node1, node2 = heapq.heappop(edges)  # O(log(E))
            root1 = self.find(index_of[node1], parents)
            root2 = self.find(index_of[node2], parents)

            # Only an edge joining two different components keeps the
            # result free of cycles.
            if root1 != root2:
                self.union(root1, root2, ranks, parents)
                mst.append([node1, node2, weight])
        return mst

In [41]:
def save_mst(mst, file_path: str = "data/mst.csv"):
    """Write the edges of a minimum spanning tree to a CSV file.

    The file gets the header ``Node1,Node2,Weight`` followed by one row per
    edge, with the weight rounded to two decimals.

    Args:
        mst: Iterable of ``[node1, node2, weight]`` edges.
        file_path: Destination file; its directory must already exist.

    Returns:
        ``file_path``, mirroring ``save_mrg``.
    """
    header = ["Node1", "Node2", "Weight"]

    # newline="" stops the platform from translating the "\r\n" the csv
    # writer emits, which would otherwise produce blank rows on Windows.
    with open(file_path, "w", newline="") as file:
        writer = csv.writer(file)
        writer.writerow(header)
        for node1, node2, weight in mst:
            writer.writerow([node1, node2, round(weight, 2)])
    return file_path

4. Dendrogram