In [1]:
from collections import defaultdict, Counter
from nltk import word_tokenize
from math import log, sqrt

In [2]:
# TF-IDF variants: https://en.wikipedia.org/wiki/Tf–idf
def tf1(frequency, N, maxTf, k): return 1 if frequency else 0 # binary 
def tf2(frequency, N, maxTf, k): return frequency # raw count
def tf3(frequency, N, maxTf, k): return frequency / N # term frequency
def tf4(frequency, N, maxTf, k): return log(1 + frequency) # log normalization
def tf6(frequency, N, maxTf, k): return k+(1-k)*(frequency/maxTf) # double normalization k
def tf5(frequency, N, maxTf, k): return tf6(frequency, maxTf, 0.5) # double normaliztion 0.5

tfSet = [tf1, tf2, tf3, tf4, tf5, tf6]

def idf1(df, N): return 1 # unary
def idf2(df, N): return log(N/df) # inverse document frequency
def idf3(df, N): return log(N/(1+df)) # inverse document frequency smooth
# idf4 = lambda df, N, maxDf: log(maxDf/(1+df)) # inverse document frequency max
def idf5(df, N): return log((N-df)/df) # probabilistic inverse document frequency

idfSet = [idf1, idf2, idf3, None, idf5]

distance = lambda x1, x2: (x1-x2)**2

In [3]:
class kNN_Classifier:
    def __init__(self, tokenizer=None, tf=6, idf=2):
        tokenizer = tokenizer if not tokenizer == None else word_tokenize
        self.params = {"tokenizer":tokenizer, 
                       "tf":tfSet[tf-1], 
                       "idf":idfSet[idf-1]}
        
    def fit(self, X, y):
        """doc string"""
        # check valid dataset
        assert len(X) == len(y)
        
        # define objects
        self.X = X
        self.y = y
        self.classes = list(set(y))
        self.doc2idx = lambda d: X.index(d)
        self.idx2doc = lambda i: X[i]
        
        self.vocabulary = list()
        self._posting = list()
        self._lexicon = defaultdict(lambda :-1)
        self._docInfo = defaultdict(lambda: {"maxTf":0, "vecLen":0.0})
        self._weights = []
        self._wLexicon = defaultdict(lambda:{"wptr":-1, "df":0})

        # TDM
        for doc in X:
            docID = self.doc2idx(doc)
            localPosting = defaultdict(int)
            for token in self.params["tokenizer"](doc):
                if token not in self.vocabulary:
                    self.vocabulary.append(token)
                localPosting[token] += 1
                
            maxTf = max(localPosting.values())
            self._docInfo[docID]["maxTf"] = maxTf
            
            for token, freq in localPosting.items():
                ptr = self._lexicon[token]
                nextPtr = len(self._posting)
                self._posting.append((docID, freq, ptr))
                self._lexicon[token] = nextPtr
                self._wLexicon[token]["df"] += 1
        
        # weight        
        N = len(X)
        for token in self.vocabulary:
            ptr = self._lexicon[token]
            self._wLexicon[token]["wptr"] = len(self._weights)
            while ptr != -1:
                _struct = self._posting[ptr]
                weight = self.params["tf"](_struct[1], N, 
                           self._docInfo[_struct[0]]["maxTf"], 0) * \
                         self.params["idf"](self._wLexicon[token]["df"], N)
                self._weights.append((_struct[0], weight))
                self._docInfo[_struct[0]]["vecLen"] += \
                                            distance(0,weight)**2
                ptr = _struct[-1]
        
        
    def refit(tf, idf):
        """doc string"""
        self.params["tf"] = tfSet[tf-1]
        self.params["idf"] = idfset[idf-1]
        
        # weight        
        N = len(X)
        for token in self.vocabulary:
            ptr = self._lexicon[token]
            self._wLexicon[token]["wptr"] = len(self._weights)
            while ptr != -1:
                _struct = self._posting[ptr]
                weight = self.params["tf"](_struct[1], N, 
                           self._docInfo[_struct[0]]["maxTf"], 0) * \
                         self.params["idf"](df, N)
                self._weights.append((_struct[-1], weight))
                self._docInfo[_struct[-1]]["vecLen"] += \
                                            distance(0,weight)**2
                ptr = _struct[-1]
        
        
    def predict_prob(self, test, *, k=5, method="cosine"):
        """doc string"""
        assert method in ["euclide", "cosine"], "Invalid method"
        
        # test indexing
        qRepr = defaultdict(int)
        qWeight = defaultdict(float)
        qVecLen = 0.0
        for token in self.params["tokenizer"](test):
            if token in self.vocabulary:
                qRepr[token] += 1
        maxQtf = max(qRepr.values())
        
        # test weight
        N = len(self.X)
        for token, freq in qRepr.items():
            qWeight[token] = self.params["tf"](freq, N, maxQtf, 0) *\
                             self.params["idf"](
                                        self._wLexicon[token]["df"], N)
            qVecLen += distance(0, qWeight[token])**2
            
        # find neighbors
        if method == "euclide":
            result = defaultdict(float)
            for token, _wptr_df in self._wLexicon.items():
                wptr = _wptr_df["wptr"]
                df = _wptr_df["df"]
                for _ in range(df):
                    _struct = self._weights[wptr]
                    result[_struct[0]] += distance(_struct[-1],
                                                    qWeight[token])
                    wptr += 1
            result = {self.y[docID]:sqrt(v) for docID, v in result.items()}
            return sorted(result.items(), 
                          key=lambda x:x[1], reverse=True)[:k]
        
        elif method == "cosine":
            result = defaultdict(float)
            for token, _wptr_df in self._wLexicon.items():
                wptr = _wptr_df["wptr"]
                df = _wptr_df["df"]
                for _ in range(df):
                    _struct = self._weights[wptr]
                    result[_struct[0]] += _struct[-1] *\
                                                    qWeight[token]
                    wptr += 1
            result = [
                (self.y[docID], v/(sqrt(qVecLen) * sqrt(self._docInfo[docID]["vecLen"])))
                for docID, v in result.items()
            ]
            return sorted(result, 
                          key=lambda x:x[1], reverse=True)[:k]
    
    
    def predict(self, test, *, k=5, method="cosine"):
        """doc string"""
        assert method in ["euclide", "cosine"], "Invalid method"
        assert isinstance(test, str)
        # predict
        result = self.predict_prob(test, k=k, method=method)    
        k_class_count = Counter([_[0] for _ in result]).most_common()
        k_most_freqeunt = k_class_count[0][0]
        candidates = [_[0] for _ in k_class_count if _[0] == k_most_freqeunt]
        # sorting
        if len(candidates) == 1:
            return candidates[0]
        else:
            for cand, prob in result:
                if cand in candidates:
                    return cand
        
    def predict_many(self, testset, k=5, method="cosine"):
        from collections import Container
        assert issubclass(type(testset), Container)
        
        result = []
        for test in testset:
            result.append(self.predict(test, k=k, method=method))
        return result

---

In [4]:
# test

import os
from sklearn.metrics import classification_report

basedir = "C:/Users/JINHYO/1. My Training/IPA NLP Class/1.수업/NLP_Class/practice/8일차_실습_project/헤드라인/"
Class = ["IT 과학", "경제", "사회", "생활 문화", "정치"]
fileList = [_ for _ in os.listdir(basedir) if 10 < len(_)]
train_X = []
train_y = []
for c in Class:
    for file in [basedir + _ for _ in fileList if _.startswith(c+"-")]:
        train_X.append(open(file, encoding="utf-8").read())
        train_y.append(c)
        
test_X = []
test_y = []
for filename in os.listdir("C:/Users/JINHYO/1. My Training/IPA NLP Class/1.수업/NLP_Class/practice/testData"):
    if not filename.startswith("세계"):
        test_X.append(open("C:/Users/JINHYO/1. My Training/IPA NLP Class/1.수업/NLP_Class/practice/testData/" + filename, encoding="utf-8").read())
        testcls = filename.split("-")[0].replace("&", " ")
        test_y.append(testcls)

In [5]:
knn = kNN_Classifier()

In [6]:
knn.fit(train_X, train_y)

In [7]:
knn.predict_prob(test_X[19])

[('생활 문화', 0.20858041472462496),
 ('경제', 0.163400543332466),
 ('경제', 0.10947826737509088),
 ('생활 문화', 0.09904514223272988),
 ('사회', 0.09809072927475788)]

In [8]:
knn.predict_prob(test_X[19], method="euclide")

[('사회', 4.338281692521411),
 ('경제', 4.327098108728728),
 ('정치', 4.290565724481992),
 ('생활 문화', 4.154136172981419),
 ('IT 과학', 4.114677442426481)]

In [9]:
knn.predict(test_X[19])

'생활 문화'

In [10]:
knn.predict(test_X[19], method="euclide")

'사회'

In [11]:
predicts = knn.predict_many(test_X, k=20)
print(classification_report(test_y, predicts))



              precision    recall  f1-score   support

       IT 과학       1.00      0.10      0.18        10
          경제       0.33      0.30      0.32        10
          사회       0.45      0.50      0.48        10
       생활 문화       1.00      0.80      0.89        10
          정치       0.43      0.90      0.58        10

    accuracy                           0.52        50
   macro avg       0.64      0.52      0.49        50
weighted avg       0.64      0.52      0.49        50



In [12]:
predicts = knn.predict_many(test_X, method="euclide", k=20)
print(classification_report(test_y, predicts))

              precision    recall  f1-score   support

       IT 과학       0.00      0.00      0.00        10
          경제       0.15      0.40      0.22        10
          사회       0.33      0.40      0.36        10
       생활 문화       0.00      0.00      0.00        10
          정치       0.30      0.30      0.30        10

    accuracy                           0.22        50
   macro avg       0.16      0.22      0.18        50
weighted avg       0.16      0.22      0.18        50



  'precision', 'predicted', average, warn_for)
