## Importy

In [19]:
import numpy as np
from collections import defaultdict
from typing import Callable
from sklearn.cluster import DBSCAN
from collections import Counter
from copy import deepcopy
from timeit import default_timer

In [20]:
N = 4

In [21]:
def make_ngrams(text):
    n_grams = defaultdict(lambda: 0)
    for i in range(len(text) - N + 2):
        n_grams[text[i:i + N]] += 1
    return n_grams


## Metryka Levenshteina

In [22]:
def Levenstein(a, b):
    size_a: int = len(a)
    size_b: int = len(b)
    maximum = max(size_b, size_a)
    L = [[0 for _ in range(size_b + 1)] for _ in range(size_a + 1)]
    for i in range(size_b + 1):
        L[0][i] = i

    for i in range(size_a + 1):
        L[i][0] = i

    for i in range(1, size_a + 1):
        for j in range(1, size_b + 1):
            cost: int = 0
            if a[i - 1] != b[j - 1]:
                cost = 1
            L[i][j] = min(L[i - 1][j] + 1, L[i][j - 1] + 1, L[i - 1][j - 1] + cost)
    return L[-1][-1]/maximum

## Metryka Dice

In [23]:
def DICE_distance(line1, line2):
    ngrams1 = make_ngrams(line1)
    ngrams2 = make_ngrams(line2)
    intersection = set(ngrams1.keys()) & set(ngrams2.keys())
    return 1 - 2 * len(intersection) / (len(ngrams1.keys()) + len(ngrams2.keys()))

## Metryka euklidesowa

In [24]:
def euclidean_distance(line1, line2):
    ngrams1 = list(make_ngrams(line1).values())
    ngrams2 = list(make_ngrams(line2).values())
    if len(ngrams1) > len(ngrams2):
        for i in range(len(ngrams1) - len(ngrams2)):
            ngrams2.append(0)
    else:
        for i in range(len(ngrams2) - len(ngrams1)):
            ngrams1.append(0)
    n1 = np.array(ngrams1)
    n2 = np.array(ngrams2)
    dist = np.linalg.norm(n1 - n2)
    return 1 / (1 + dist)

## Metryka LCS

In [25]:
def LCS_distance(line1, line2):
    len_a = len(line1)
    len_b = len(line2)
    maximum: float = 0.0
    C = [[0] * (len_b + 1) for _ in range(len_a + 1)]
    for i in range(1, len_a + 1):
        for j in range(1, len_b + 1):
            if line1[i - 1] == line2[j - 1]:
                C[i][j] = C[i - 1][j - 1] + 1
                maximum = max(maximum, C[i][j])
    return 1.0 - maximum / max(len_a, len_b)

## Stoplista

In [26]:
def create_stoplist(lines, words_to_remove):
    signs_to_remove = '\'"/:.;,()'
    lines_dpcp = deepcopy(lines)
    for i in range(len(lines_dpcp)):
        for sign in signs_to_remove:
            lines_dpcp[i] = lines_dpcp[i].replace(sign, ' ')
    words_count = Counter()
    split_lines = []
    for line in lines_dpcp:
        split_lines.append(line.split(' '))
    for line in split_lines:
        for word in line:
            if word != '':
                words_count[word] += 1
    for i in range(len(lines_dpcp)):
        for word in words_count:
            lines_dpcp[i] = lines_dpcp[i].replace(word[0], ' ')
    return lines_dpcp

## Macierz odległości

In [27]:
def create_dist_matrix(lines, metric_function: Callable):
    n = len(lines)
    matrix = np.zeros((n, n))
    for i in range(len(lines)):
        line1 = lines[i]
        for j in range(i, len(lines)):
            line2 = lines[j]
            p = metric_function(line1, line2)
            matrix[i][j] = p
            matrix[j][i] = p
    return matrix

## Klasteryzacja

In [28]:
def clustering(lines, metric_fun: Callable, epsilon):
    X: np.array = create_dist_matrix(lines, metric_fun)
    clusters = DBSCAN(eps=epsilon, min_samples=2, metric="precomputed").fit(X)
    return clusters

In [29]:
def find_centroid(cluster, dist_matrix):
    min_index = 0
    min_val = 10**5
    mean = 0
    for elem in cluster:
        elem_sum = 0
        for neighbour in cluster:
            if neighbour != elem:
                elem_sum += dist_matrix[elem][neighbour]
        mean += elem_sum
        if elem_sum < min_val:
            min_val = elem_sum
            min_index = elem
    mean = mean/(2*len(cluster))
    return min_index, mean

## Dunn index

In [30]:
def Dunn_index(clusters_set: DBSCAN, dist_matrix):
    labels = clusters_set.labels_
    n = max(labels) + 1
    T = [[] for _ in range(n)]
    min_d = 10 ** 5
    max_size = 0
    centroids = []
    for i in range(len(labels)):
        if labels[i] != -1:
            T[labels[i]].append(i)
    for elem in T:
        centroids.append(find_centroid(elem, dist_matrix)[0])
    for i in range(n):
        max_size = max(max_size, len(T[i]))
        for j in range(i+1, n):
            p = dist_matrix[centroids[i]][centroids[j]]
            min_d = min(p, min_d)
    return min_d / max_size

## Davies-Bouldin index

In [31]:
def DB_index(clusters_set: DBSCAN, dist_matrix):
    labels = clusters_set.labels_
    n = max(labels) + 1
    T = [[] for _ in range(n)]
    centroids = [0 for _ in range(n)]
    means = [0 for _ in range(n)]
    max_val = 0
    for i in range(len(labels)):
        if labels[i] != -1:
            T[labels[i]].append(i)
    for i in range(n):
        elems = T[i]
        c, m = find_centroid(elems, dist_matrix)
        means[i] = m
        centroids[i] = c
    for i in range(n):
        for j in range(n):
            if i != j:
                val = (means[i] + means[j])/dist_matrix[i][j]
                max_val = max(max_val, val)
    return max_val

## Testy

In [32]:
def run_test(lines_set, algorithm: Callable, lines_num, slist_size, epsilon):
    data = lines_set[:lines_num]
    if slist_size > 0:
        data = create_stoplist(data, slist_size)
    start = default_timer()
    clusters, dist_matrix = clustering(data, algorithm, epsilon)
    end = default_timer()
    d_ind = Dunn_index(clusters, dist_matrix)
    db_ind = DB_index(clusters, dist_matrix)
    print("Ilość linii:", lines_num)
    print("Czas realizacji:", end-start)
    print("Indeks Dunna:", d_ind)
    print("Indeks Daviesa-Bouldina", db_ind)

In [33]:
with open("lines.txt", "r") as file:
    lines = file.readlines()

## Metryka euklidesowa

In [34]:
print("Z użyciem stoplisty")
run_test(lines, euclidean_distance, 100, 10, 0.3)
print("Bez użycia stoplisty")
run_test(lines, euclidean_distance, 100, 10, 0.3)

Z użyciem stoplisty


TypeError: cannot unpack non-iterable DBSCAN object