In [None]:
import csv
import datetime
import math
import random
import time
from random import randrange, shuffle

import matplotlib.pyplot as plt
import numpy as np
from enchant.utils import levenshtein # pip install pyenchant
from pyts.approximation import SymbolicAggregateApproximation # pip install pyts

### _________________________ Algorithms _________________________

#### String representation

In [None]:
def String_representation(X, n_bins):
    """Turn every time series in ``X`` into a SAX symbol string.

    Parameters
    ----------
    X : sequence of sequences
        ``X[c]`` holds the series of class ``c``. Each series must provide
        ``.mean()`` and ``.std()`` and support element-wise arithmetic
        (e.g. a NumPy array).
    n_bins : int
        Passed as ``n_bins`` to ``SymbolicAggregateApproximation``.

    Returns
    -------
    list of list of str
        ``result[c][j]`` is the string built from series ``j`` of class ``c``.
    """
    # z-normalisation: subtract the mean and divide by the standard deviation,
    # independently for every series
    normalised = [
        [(series - series.mean()) / series.std() for series in class_series]
        for class_series in X
    ]

    # Symbolic Aggregate Approximation, applied to one series at a time
    encoder = SymbolicAggregateApproximation(n_bins=n_bins, strategy='normal')
    strings = []
    for class_series in normalised:
        strings.append(
            [''.join(encoder.fit_transform([series])[0]) for series in class_series]
        )
    return strings

#### String grammar clustering

In [None]:
def StringGrammarClustering(S, C, m, eta, a, b): # normally (m , eta > 1)  (beta ,a , b > 0)
    """Select prototype strings for one class by iterative fuzzy/possibilistic median clustering.

    Starting from ``C`` randomly chosen strings, the prototype positions are
    re-estimated until they stop changing, or until a configuration that was
    already visited comes back (the update is deterministic, so that would
    otherwise repeat forever).

    Parameters
    ----------
    S : list of str
        Strings to cluster.
    C : int
        Requested number of prototypes; capped at ``len(S)``.
    m : float
        Fuzzifier of the membership values. Must differ from 1 because the
        exponent ``1/(m-1)`` is used.
    eta : float
        Exponent applied to the possibilistic values.
    a, b : float
        Weights of the membership and possibilistic terms in the prototype
        update; ``b`` also scales the exponent of the possibilistic value.

    Returns
    -------
    list of str
        The distinct prototype strings in cluster order, so fewer than ``C``
        strings may be returned.

    Raises
    ------
    ValueError
        If ``S`` is empty.
    """
    N = len(S) # Number of strings
    if N == 0:
        raise ValueError('S must contain at least one string')
    C = min(C, N) # there are only N positions to draw initial prototypes from

    V_pos = list(range(N))
    shuffle(V_pos)
    V_pos = V_pos[:C]

    print('Initial : V_pos(random) =', V_pos)

    # Pairwise Levenshtein distances Lev(s_i, s_j); the matrix is symmetric,
    # so only the upper triangle is computed and then mirrored
    Lev = [[0]*N for _ in range(N)]
    for i in range(N):
        for j in range(i, N):
            Lev[i][j] = Lev[j][i] = levenshtein(S[i], S[j])

    # Compute beta using fuzzy median equation (3): mean distance from the
    # median string (smallest total distance) to all strings
    row_sums = [sum(row) for row in Lev]
    Med_pos = int(np.argmin(row_sums))
    beta = row_sums[Med_pos] / N

    exponent = 1/(m-1)
    visited = set() # prototype configurations seen so far (cycle detection)
    epoch = 1
    while True:
        visited.add(tuple(V_pos))
        is_prototype = set(V_pos)

        U = [[0]*N for _c in range(C)] # membership matrix [u_ik]_CxN
        T = [[0]*N for _c in range(C)] # possibilistic matrix [t_ik]_CxN

        for k in range(N):
            # distance from string k to every current prototype
            d = [Lev[p][k] for p in V_pos]

            # Update possibilistic value using equation (6).
            # beta == 0 means every string equals the median string, so every
            # distance is 0 and the value is exp(0).
            for i in range(C):
                T[i][k] = math.exp(-b * eta * math.sqrt(C) * d[i] / beta) if beta else 1.0

            if k in is_prototype:
                continue # prototype memberships are assigned below

            # Update membership value using equation (5)
            if 0 in d:
                # s_k is identical to at least one prototype: equation (5)
                # would divide by zero, so treat s_k like that prototype
                for i in range(C):
                    U[i][k] = 1 if d[i] == 0 else 0
            else:
                for i in range(C):
                    U[i][k] = 1/sum([(d[i]/d[j])**exponent for j in range(C)])

        # Set membership value sc_i = 1 (prototype)
        for i in range(C):
            U[i][V_pos[i]] = 1

        # Update center string of each cluster i (sc_i) using equation (10)
        V_pos_updated = []
        for i in range(C):
            # the weight of string k does not depend on the candidate j
            weight = [a*U[i][k]**m + b*T[i][k]**eta for k in range(N)]
            cost = [sum([weight[k]*Lev[j][k] for k in range(N)]) for j in range(N)]
            V_pos_updated.append(int(np.argmin(cost)))

        print('epoch', epoch, end=': ')
        print('V_pos =', V_pos_updated)
        if V_pos_updated == V_pos:
            break
        if tuple(V_pos_updated) in visited:
            print('Prototype positions started to cycle; stopping without convergence')
            V_pos = V_pos_updated
            break

        V_pos = V_pos_updated
        epoch += 1

    # return Multi-prototypes: distinct strings, first occurrence kept
    return list(dict.fromkeys(S[p] for p in V_pos))
        

#### FKNN Classification

In [None]:
def FKNN(SC, S_test, C, K = 3, m = 2):
    """Classify strings with a fuzzy K-nearest-prototype vote.

    Parameters
    ----------
    SC : list of list of str
        ``SC[c]`` holds the prototype strings of class ``c``.
    S_test : list of list of str
        ``S_test[c]`` holds the strings whose true class is ``c``.
    C : int
        Number of classes in ``S_test``.
    K : int
        Number of nearest prototypes that vote; capped at the total number of
        prototypes.
    m : float
        Fuzzifier; each neighbour votes with weight ``(1/lev)**(1/(m-1))``.
        Must differ from 1.

    Returns
    -------
    list of list of int
        ``pred[c][j]`` is the predicted class of ``S_test[c][j]``.

    Raises
    ------
    ValueError
        If ``K < 1`` or ``SC`` contains no prototype.
    """
    if K < 1:
        raise ValueError('K must be at least 1')

    # (class, prototype) pairs in class-major order
    labelled = [(cls, proto) for cls, protos in enumerate(SC) for proto in protos]
    if not labelled:
        raise ValueError('SC must contain at least one prototype')
    K = min(K, len(labelled)) # cannot have more neighbours than prototypes
    exponent = 1/(m-1)

    pred = [[] for _ in range(C)]

    for idx in range(C):
        for _s in S_test[idx]:

            # lowest levenshtein distance K prototypes; the sort is stable, so
            # among equal distances the earliest prototype comes first
            scored = [(levenshtein(_s, proto), cls) for cls, proto in labelled]
            nearest = sorted(scored, key=lambda pair: pair[0])[:K]

            if nearest[0][0] == 0:
                # 1/lev grows without bound as lev -> 0, so exact matches
                # outweigh every other neighbour: only they vote
                votes = [(1.0, cls) for lev, cls in nearest if lev == 0]
            else:
                votes = [((1/lev)**exponent, cls) for lev, cls in nearest]

            # class prediction: share of the total weight held by each class
            total = sum(weight for weight, _ in votes)
            per_class = {}
            for weight, cls in votes:
                per_class[cls] = per_class.get(cls, 0) + weight

            # max() keeps the first maximum, i.e. ties go to the nearest neighbour
            best = max(votes, key=lambda vote: per_class[vote[1]]/total)
            pred[idx].append(best[1])
    return pred

#### Accuracy calculation and Confusion Matrix

In [None]:
def accuracy(pred, C):
    """Print the confusion matrix and return the accuracy in percent.

    Parameters
    ----------
    pred : list of list of int
        ``pred[i]`` holds the predicted classes of the samples whose true
        class is ``i``.
    C : int
        Number of classes.

    Returns
    -------
    float
        Percentage of correct predictions rounded to two decimals; ``0.0``
        when there are no predictions at all.
    """
    # rows are predicted classes, columns are true classes
    c_matrix = [[0]*C for _ in range(C)]
    sum_true = 0
    sum_all = 0
    for i in range(C):
        for p in pred[i]:
            c_matrix[p][i] += 1
            sum_all += 1
            if p == i:
                sum_true += 1
    print('\tConfusion Matrix')
    for row in c_matrix:
        # one centred 5-character field per class, for any number of classes
        print(' [ ' + ''.join(f'{count: ^5}' for count in row) + ' ]')
    print()
    if sum_all == 0:
        return 0.0
    return round(sum_true*100/sum_all, 2)

#### Load dataset
File Header : [Date, Open, High, Low, Close, Adj Close, Volume]

In [None]:
def load_dataset(path, C):
    """Read labelled price patterns from a text file and group them by class.

    The file is split on whitespace and every token on commas. A token with
    exactly two fields starts a new pattern; its second field is the 1-based
    class number. Every other token is a data row that contributes its fields
    2 and 3 (High and Low in the documented file header) to the current
    pattern.

    Parameters
    ----------
    path : str
        Path of the file to read.
    C : int
        Number of classes; class numbers in the file must lie in ``1..C``.

    Returns
    -------
    list of list of numpy.ndarray
        ``result[c]`` holds one 1-D array ``[high, low, high, low, ...]`` per
        pattern of class ``c`` (0-based).

    Raises
    ------
    ValueError
        If a data row precedes the first class row or a class number is
        outside ``1..C``.
    """
    Elliott_patterns = [[] for _ in range(C)]

    # the context manager closes the file even if parsing fails later
    with open(path) as file:
        data = [token.split(',') for token in file.read().split()]

    pattern = []
    _y = None # 0-based class of the pattern being collected
    for ds in data:
        if len(ds) == 2:
            # a class row closes the previous pattern (if it has any data)
            if pattern:
                Elliott_patterns[_y].append(np.array(pattern))
            pattern = []

            _y = int(ds[1]) - 1 # class number
            if not 0 <= _y < C:
                raise ValueError(f'class number {ds[1]} is outside 1..{C}')
        else:
            if _y is None:
                raise ValueError('data row found before the first class row')
            pattern.append(float(ds[2])) # High price
            pattern.append(float(ds[3])) # Low Price

    # the last pattern has no following class row, so store it here; this also
    # keeps the values of the final data row
    if pattern:
        Elliott_patterns[_y].append(np.array(pattern))

    n_spc = [len(_x) for _x in Elliott_patterns]
    print(f'> Data {sum(n_spc)} samples')
    print('> Samples per class :',n_spc)

    return Elliott_patterns

### _________________________ Experiment _________________________


#### Load Training set and Test set 

In [None]:
C = 5 # number of classes

# Forward slashes are accepted by open() on every platform and avoid the
# invalid '\D' escape sequence of the backslash form.
print('Training set')
training_path = './Dataset/Elliott_training_set.csv'
X_train = load_dataset(training_path, C)

print('\nTest set')
test_path = './Dataset/Elliott_test_set.csv'
X_test = load_dataset(test_path, C)

#### Preprocessing 

In [None]:
# Encode both splits as symbol strings with the same alphabet size.
n_bins = 8
S_train, S_test = (String_representation(split, n_bins) for split in (X_train, X_test))

#### Training (create prototypes for each class)

In [None]:
SEED = 42 # fixes the shuffled prototype initialisation so reruns give the same prototypes
cluster = [45, 30, 40, 35, 20] # requested number of prototypes for each class

random.seed(SEED)
SC = []

start = time.time()
for i in range(C):
    print(f'\n<Class {i+1}>')
    # C = number of prototype(that you want) in each class
    SC.append(StringGrammarClustering(S_train[i], C = cluster[i], m = 2, eta = 2, a = 0.1, b = 0.16))
end = time.time()
print('time:', datetime.timedelta(seconds=end-start),end='\n\n')
print('# Prototypes per class :', [len(sc) for sc in SC])

#### Testing 

In [None]:
# Classify the test strings with the K = 1 nearest prototype; accuracy()
# prints the confusion matrix and returns the accuracy in percent.
pred = FKNN(SC, S_test, C, K = 1, m = 2)
# print(pred)
print('accuracy :', accuracy(pred, C))

In [None]:
# Test strings, K = 3 nearest prototypes.
pred = FKNN(SC, S_test, C, K = 3, m = 2)
print('accuracy :', accuracy(pred, C))

In [None]:
# Test strings, K = 5 nearest prototypes.
pred = FKNN(SC, S_test, C, K = 5, m = 2)
print('accuracy :', accuracy(pred, C))

In [None]:
# Test strings, K = 7 nearest prototypes.
pred = FKNN(SC, S_test, C, K = 7, m = 2)
print('accuracy :', accuracy(pred, C))

In [None]:
# Test strings, K = 9 nearest prototypes.
pred = FKNN(SC, S_test, C, K = 9, m = 2)
print('accuracy :', accuracy(pred, C))

In [None]:
# Same evaluation on the training strings (S_train), K = 1 nearest prototype.
pred = FKNN(SC, S_train, C, K = 1, m = 2)
print('accuracy :', accuracy(pred, C))

In [None]:
# Training strings, K = 3 nearest prototypes.
pred = FKNN(SC, S_train, C, K = 3, m = 2)
print('accuracy :', accuracy(pred, C))

In [None]:
# Training strings, K = 5 nearest prototypes.
pred = FKNN(SC, S_train, C, K = 5, m = 2)
print('accuracy :', accuracy(pred, C))

In [None]:
# Training strings, K = 7 nearest prototypes.
pred = FKNN(SC, S_train, C, K = 7, m = 2)
print('accuracy :', accuracy(pred, C))

In [None]:
# Save the learned prototypes, one row per prototype: [0-based class index, string].
# csv is imported with the other modules at the top of the notebook.
with open("multiprototype_Elliott.csv", 'w', encoding='UTF8', newline='') as f:
    writer = csv.writer(f)
    for i in range(C):
        for sc_ij in SC[i]:
            writer.writerow([i, "".join(sc_ij)])