In [15]:
# Damerau-Levenshtein distance computation

def damerau_levenshtein_distance(s1, s2):
    """Return the edit distance between two sequences, counting adjacent swaps.

    Allowed operations are deletion, insertion, substitution and the
    transposition of two adjacent items. This is the restricted variant
    (optimal string alignment): the transposition step only looks back two
    positions in both sequences, so a swapped pair is never edited again.

    Args:
        s1: First sequence (anything supporting ``len`` and integer indexing).
        s2: Second sequence, same requirements.

    Returns:
        The minimum number of operations as an ``int``; the length of the
        other sequence when one of them is empty.
    """
    rows, cols = len(s1), len(s2)

    # table[r][c] is the distance between the first r items of s1 and the
    # first c items of s2; row 0 / column 0 are the "empty prefix" borders.
    table = [[0] * (cols + 1) for _ in range(rows + 1)]
    for r in range(rows + 1):
        table[r][0] = r
    for c in range(cols + 1):
        table[0][c] = c

    for r in range(1, rows + 1):
        for c in range(1, cols + 1):
            cost = 0 if s1[r - 1] == s2[c - 1] else 1
            best = min(
                table[r - 1][c] + 1,         # deletion
                table[r][c - 1] + 1,         # insertion
                table[r - 1][c - 1] + cost,  # substitution
            )
            # Transposition needs at least two items on both sides and the
            # last two items of each prefix to be each other's mirror.
            if r > 1 and c > 1 and s1[r - 1] == s2[c - 2] and s1[r - 2] == s2[c - 1]:
                best = min(best, table[r - 2][c - 2] + cost)
            table[r][c] = best

    return table[rows][cols]

In [16]:
from sklearn.base import BaseEstimator

In [17]:
class NLP_Classifier(BaseEstimator):
    """Nearest-neighbour text classifier driven by edit distance.

    ``fit`` stores every training string under its class label. For a test
    string, the score of a class is ``1 / (d + 1)`` where ``d`` is the smallest
    ``damerau_levenshtein_distance`` to any training string of that class.
    The predicted class is the one with the highest score.

    Attributes:
        classes: Sorted list of the labels seen by ``fit``; score columns
            follow this order.
        dictionary: Maps each class label to its list of training strings.
        probs: Scores returned by the most recent ``predict_proba`` call.
    """

    # Denominator used for a class that has no training strings.
    MAX_DIST = 1000

    def __init__(self):
        # Per-instance state: mutable class attributes would be shared by
        # every instance and would leak data between separate fits.
        self.dictionary = {}
        self.probs = []

    def fit(self, X_train, y_train):
        """Memorise the training strings grouped by class.

        Args:
            X_train: Mapping/DataFrame whose ``'metadata'`` entry holds the
                training strings.
            y_train: Class labels aligned positionally with the strings.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            ValueError: If the number of strings and labels differ.
        """
        words = X_train['metadata']
        if len(words) != len(y_train):
            raise ValueError(
                "X_train['metadata'] and y_train must have the same length"
            )

        # A fixed, sorted order makes the score columns reproducible and lets
        # predict() translate an argmax position back into a real label.
        self.classes = sorted(set(y_train))
        # Rebuilt from scratch so a refit never keeps strings of a previous fit.
        self.dictionary = {c: [] for c in self.classes}

        # zip pairs by position, which also works when y_train is a pandas
        # Series whose index is not 0..n-1.
        for word, label in zip(words, y_train):
            self.dictionary[label].append(word)

        self.probs = []
        return self

    def predict_proba(self, X_test):
        """Score every test string against every class.

        Args:
            X_test: Mapping/DataFrame whose ``'metadata'`` entry holds the
                strings to classify.

        Returns:
            A list with one NumPy array per test string; entry ``k`` is the
            score of ``self.classes[k]``. The scores are inverse distances and
            are not normalised to sum to 1.
        """
        scores = []
        for word in X_test['metadata']:
            denominators = []
            for c1 in self.classes:
                nearest = min(
                    (damerau_levenshtein_distance(word, field)
                     for field in self.dictionary[c1]),
                    default=None,
                )
                if nearest is None:
                    denominators.append(self.MAX_DIST)
                else:
                    # +1 so an exact match (distance 0) does not divide by zero.
                    denominators.append(nearest + 1)
            scores.append(np.true_divide(1, denominators))

        self.probs = scores
        return scores

    def predict(self, X_test):
        """Return the best-scoring class label for every test string.

        The scores are always recomputed for ``X_test``; reusing the result of
        an earlier call would return predictions for a different dataset.

        Args:
            X_test: Mapping/DataFrame whose ``'metadata'`` entry holds the
                strings to classify.

        Returns:
            A list of class labels, one per test string.
        """
        return [
            self.classes[int(np.argmax(series))]
            for series in self.predict_proba(X_test)
        ]

In [18]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.neighbors import KNeighborsClassifier
import matplotlib.pyplot as plt

import warnings
warnings.filterwarnings('ignore')

In [19]:
path = ""

# Read the metadata file line by line: field 1 of each comma-separated row is
# kept as the 'metadata' text, field 8 is parsed as the integer class label.
metadata = []
y = []
with open(path + 'ThingspeakEU.meta.csv', 'r', encoding='utf-8') as meta_file:
    for line in meta_file:
        fields = line.strip().split(',')
        y.append(int(fields[8]))
        metadata.append(fields[1])

X = pd.DataFrame({'metadata': metadata})

# 70/30 split with a fixed seed so the partition is reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=100
)

In [20]:
# Create the edit-distance classifier.
nlp = NLP_Classifier()

In [21]:
# Fit on the training split.
nlp.fit(X_train, y_train)

In [22]:
# Per-class scores for every row of the test split.
probs = nlp.predict_proba(X_test)

In [23]:
# Predictions for the test split, compared with y_test below.
y_pred = nlp.predict(X_test)

In [25]:
from sklearn.metrics import accuracy_score

# Ground truth first, predictions second, matching the documented
# (y_true, y_pred) signature; accuracy itself is symmetric.
accuracy_score(y_test, y_pred)

0.6891679748822606