# Functions for text classification

In [1]:
import pandas as pd
import numpy as np
import statsmodels.formula.api as smf
import sys
import scipy.sparse
import sklearn.metrics
import sklearn.naive_bayes
import sklearn.model_selection
#import keras
import nltk
from nltk.collocations import BigramCollocationFinder
import matplotlib.pyplot as plt
import seaborn as sns
# Seaborn context and style applied to the figures drawn by this notebook.
sns.set_context('notebook') 
sns.set_style('ticks')
# IPython magic: render matplotlib figures inline in the notebook output.
%matplotlib inline
# Default matplotlib figure size (width, height).
plt.rcParams['figure.figsize'] = (7, 5)

Using TensorFlow backend.


In [2]:
# Seed NumPy's global random number generator with a fixed value.
seed = 5
np.random.seed(seed)

## Functions

In [3]:
# IPython magic: execute the notebook "tqdm.ipynb".
# NOTE(review): presumably this is what defines `tqdm`, which the selection
# loops in this notebook call without importing it here -- confirm.
%run "tqdm.ipynb"

### Tokenize the text

In [5]:
def tok_text(text):
    """Tokenize ``text`` into lower-cased, stop-word-free Porter stems.

    Steps, in order: NLTK word tokenization, keep purely alphabetic tokens,
    lower-case them, drop tokens found in ``stopwords``, stem the rest.

    ``stopwords`` is read from the global namespace; it is not defined in
    this notebook section and must exist before the function is called.

    Returns a list of stemmed tokens.
    """
    # One stemmer per call instead of a fresh instance for every word.
    stemmer = nltk.stem.porter.PorterStemmer()
    lowered = (word.lower() for word in nltk.word_tokenize(text) if word.isalpha())
    return [stemmer.stem(word) for word in lowered if word not in stopwords]

### Count the total frequency of words

In [5]:
def word_freq(data, column_name):
    """Count every token occurrence across all rows of ``data[column_name]``.

    Each entry of the column is iterated as a sequence of tokens; repeated
    tokens inside one entry are counted each time they appear.

    Returns an ``nltk.FreqDist`` mapping token -> total count.
    """
    counts = nltk.FreqDist()
    all_tokens = (token for tokens in data[column_name] for token in tokens)
    for token in all_tokens:
        counts[token] += 1
    return counts

### Count the relative frequency of words

In [6]:
def word_freq_class(data, n_words, column_name, ref_value, col_feature):
    """Share of rows in one class that contain each token.

    Rows are selected where ``data[column_name] == ref_value``. For each
    selected row the distinct tokens of ``col_feature`` are counted once,
    and the counts are divided by the number of selected rows.

    Returns ``(n_class, top)``: the number of selected rows and a Series with
    the ``n_words`` highest shares, sorted descending and rounded to 3
    decimals.
    """
    class_rows = data[data[column_name] == ref_value]
    n_class = len(class_rows)

    doc_counts = nltk.FreqDist()
    for tokens in class_rows[col_feature]:
        # np.unique de-duplicates so a token counts at most once per row.
        for token in np.unique(tokens):
            doc_counts[token] += 1

    shares = pd.Series(dict(doc_counts)) / n_class
    ranked = shares.sort_values(ascending=False)
    return n_class, ranked.head(n_words).round(3)

In [None]:
def word_freq_class_total(data, n_words, column_name, ref_value, col_feature):
    """Share of ALL rows that belong to one class and contain each token.

    Rows are selected where ``data[column_name] == ref_value``; each distinct
    token of ``col_feature`` is counted once per selected row, and the counts
    are divided by the total number of rows in ``data``.

    NOTE(review): the original body divided by (and returned) ``n_class``
    without ever defining it, so it raised ``NameError``. The total row count
    is assumed to be the intended denominator, based on the ``_total`` suffix
    -- confirm against the callers.

    Returns ``(n_total, top)``: the total number of rows and a Series with
    the ``n_words`` highest shares, sorted descending and rounded to 3
    decimals.
    """
    n_total = len(data)

    fdist = nltk.FreqDist()
    for words in data[data[column_name] == ref_value][col_feature]:
        # np.unique de-duplicates so a token counts at most once per row.
        for word in np.unique(words):
            fdist[word] += 1

    common = pd.Series(dict(fdist)) / n_total
    common = common.sort_values(ascending=False)
    finalList = common.head(n_words).round(3)

    return n_total, finalList

### Split the dataset into train and test sets

In [7]:
def split_train_test(data, frac, random_state=5, label_col='Class_n'):
    """Randomly split ``data`` into a train and a test DataFrame.

    Parameters
    ----------
    data : DataFrame to split.
    frac : fraction of rows sampled (without replacement) into the train set.
    random_state : seed passed to ``DataFrame.sample``; the default keeps the
        split that was previously hard-coded.
    label_col : column whose normalized class distribution is printed for
        both parts.

    Returns
    -------
    (train, test) : both with a fresh 0..n-1 index. ``test`` holds every row
    whose original index label was not sampled into ``train``.
    """
    train = data.sample(frac=frac, random_state=random_state)
    # Boolean negation instead of comparing the mask with `== False`.
    test = data[~data.index.isin(train.index)].copy()
    train = train.reset_index(drop=True)
    test = test.reset_index(drop=True)
    print('Classification on train dataset: ', train[label_col].value_counts(normalize=True))
    print('Classification on test dataset: ', test[label_col].value_counts(normalize=True))

    return train, test

### Cross-entropy to rank the features

In [1]:
def training_error(feature, data, y_train, col_feature):
    """Training log-loss of a Bernoulli Naive Bayes using a single feature.

    The one-column indicator matrix for ``feature`` is built from
    ``data[col_feature]``; the classifier is fitted and scored on that same
    data.

    Returns the value of ``sklearn.metrics.log_loss`` for ``y_train``.
    """
    indicator = design_matrix_one(feature, data[col_feature])
    classifier = sklearn.naive_bayes.BernoulliNB().fit(indicator, np.ravel(y_train))
    class_probabilities = classifier.predict_proba(indicator)
    return sklearn.metrics.log_loss(y_train, class_probabilities)

### Sparse matrix

In [9]:
def design_matrix_one(feature, series):
    """Build the design matrix for one feature.

    For every entry of ``series`` the value is 1 when ``feature in entry``
    and 0 otherwise.

    Returns a NumPy array of shape (len(series), 1).
    """
    has_feature = series.apply(lambda tokens: feature in tokens).astype(int)
    # Single column: estimators expect a 2-D array.
    return has_feature.values.reshape((-1, 1))

In [10]:
def design_matrix(features, series):
    """Build the design matrix for a list of features.

    Cell (i, j) is 1.0 when ``features[j]`` is contained in the i-th entry of
    ``series`` and is left empty (zero) otherwise.

    Returns a ``scipy.sparse.lil_matrix`` of shape
    (len(series), len(features)).
    """
    X = scipy.sparse.lil_matrix((len(series), len(features)))
    for row, tokens in enumerate(series):
        present = [col for col, feature in enumerate(features) if feature in tokens]
        for col in present:
            X[row, col] = 1.0
    return X

### Naive Bayes 

* with one feature

In [11]:
def naive_bayes_one(feature, train, test, col_feature):
    """Fit a Bernoulli Naive Bayes on one word feature and report test error.

    Parameters
    ----------
    feature : token whose presence is the only predictor.
    train, test : DataFrames; the labels are read from the third column
        (position 2) of each.
    col_feature : name of the column holding the tokens.

    Returns
    -------
    Misclassification rate on ``test`` (also printed, rounded to 3 decimals).
    """
    X_train = design_matrix_one(feature, train[col_feature])
    X_test = design_matrix_one(feature, test[col_feature])
    y_train = train.iloc[:, 2].values
    y_test = test.iloc[:, 2].values

    model = sklearn.naive_bayes.BernoulliNB().fit(X_train, np.ravel(y_train))
    pred = model.predict(X_test)
    error = 1 - sklearn.metrics.accuracy_score(y_test, pred)
    # Builtin round() works for both NumPy scalars and plain Python floats,
    # whereas `.round` only exists on the former.
    print('Classification error on the test dataset - one feat.: ', round(error, 3))
    return error

* with **n** features ranked according to cross-entropy

In [12]:
def naive_bayes_n(n, train, test, rank_features, col_feature):
    """Fit a Bernoulli Naive Bayes on the first ``n`` ranked features.

    Parameters
    ----------
    n : number of leading entries of ``rank_features`` to use.
    train, test : DataFrames; the labels are read from the third column
        (position 2) of each.
    rank_features : features ordered from best to worst.
    col_feature : name of the column holding the tokens.

    Returns
    -------
    Misclassification rate on ``test`` (also printed, rounded to 3 decimals).
    """
    y_train = train.iloc[:, 2].values
    y_test = test.iloc[:, 2].values

    feat_to_use = rank_features[:n]

    X_train = design_matrix(feat_to_use, train[col_feature])
    X_test = design_matrix(feat_to_use, test[col_feature])

    model = sklearn.naive_bayes.BernoulliNB().fit(X_train, np.ravel(y_train))
    y_pred = model.predict(X_test)
    error = 1 - sklearn.metrics.accuracy_score(y_test, y_pred)
    # Report the number of features actually used (the message used to say
    # "10" whatever n was); builtin round() also accepts plain floats.
    print('Classification error on the test dataset - {} feat.: '.format(n), round(error, 3))
    return error

* model selection for the number of features

In [13]:
def naive_bayes_selection(train, test, rank_features, col_feature):
    """Evaluate Bernoulli Naive Bayes for a growing number of ranked features.

    Models are fitted with the first 1, 10, 20, ... entries of
    ``rank_features``. For each size the 10-fold cross-validation error on
    the train set and the error on the test set are recorded.

    Parameters
    ----------
    train, test : DataFrames; the labels are read from the third column
        (position 2) of each.
    rank_features : features ordered from best to worst.
    col_feature : name of the column holding the tokens.

    Returns
    -------
    (n_features, cv_errors, test_errors) : the array of feature counts and
    two lists of misclassification rates aligned with it.
    """
    y_train = np.ravel(train.iloc[:, 2].values)
    y_test = test.iloc[:, 2].values

    test_errors = []
    cv_errors = []

    n_features = np.arange(0, len(rank_features) + 1, 10)
    n_features[0] = 1  # the first model has 1 feature, then 10, 20, etc

    # Column j only depends on rank_features[j], so the matrix for the first
    # n features is the first n columns of the full matrix: build it once.
    X_train_all = design_matrix(rank_features, train[col_feature]).tocsr()
    X_test_all = design_matrix(rank_features, test[col_feature]).tocsr()

    for n in tqdm(n_features):
        X_train = X_train_all[:, :n]
        X_test = X_test_all[:, :n]
        model = sklearn.naive_bayes.BernoulliNB().fit(X_train, y_train)

        scores = sklearn.model_selection.cross_val_score(model, X_train, y_train, cv=10, scoring='accuracy')
        cv_errors.append(1 - np.mean(scores))

        y_pred = model.predict(X_test)
        test_errors.append(1 - sklearn.metrics.accuracy_score(y_test, y_pred))

    return n_features, cv_errors, test_errors

### Plot classification error

In [14]:
def plot_error(n_features, cv_errors, test_errors, classifier):
    """Plot test and CV misclassification rates against the feature count.

    ``classifier`` is only used in the plot title. After showing the figure,
    the feature count with the lowest CV error, that CV error and the
    matching test error are printed.
    """
    fig, ax = plt.subplots(figsize=(7.5, 5))
    curves = (
        (test_errors, '#1F77B4', 'Test error'),
        (cv_errors, '#FF7F0E', 'CV error'),
    )
    for errors, colour, label in curves:
        ax.plot(n_features, errors, color=colour, label=label)
    ax.set_xlabel('Number of features')
    ax.set_ylabel('Misclassification rate')
    ax.set_ylim([0, 1])
    plt.title('Misclassification rate for %s' %(classifier))
    plt.legend()
    sns.despine()
    plt.show()

    # Position of the model selected by cross-validation.
    best = np.argmin(cv_errors)
    best_k = n_features[best]
    print('Lowest CV error: K = {}'.format(best_k))
    print('CV error with %s features:' %str(best_k), round(min(cv_errors),3))
    print('Test error in this selected model = {:.3f}'.format(test_errors[best]))


### Model evaluation

In [3]:
def model_evaluation(n, train, test, rank_features, col_feature, cat):
    """Draw absolute and row-normalized confusion matrices for Naive Bayes.

    A Bernoulli Naive Bayes is fitted on the first ``n`` entries of
    ``rank_features`` and evaluated on ``test``. Two heatmaps are drawn: raw
    counts, and counts divided by the row (true label) totals.

    Parameters
    ----------
    n : number of leading ranked features to use.
    train, test : DataFrames; the labels are read from the third column
        (position 2) of each.
    rank_features : features ordered from best to worst.
    col_feature : name of the column holding the tokens.
    cat : number of classes; rows and columns are labelled 1..cat.
    """
    X_train = design_matrix(rank_features[:n], train[col_feature])
    X_test = design_matrix(rank_features[:n], test[col_feature])
    y_train = train.iloc[:, 2].values
    y_test = test.iloc[:, 2].values

    model = sklearn.naive_bayes.BernoulliNB().fit(X_train, np.ravel(y_train))
    y_pred = model.predict(X_test)

    labels = list(range(1, cat + 1))
    confusion = sklearn.metrics.confusion_matrix(y_test, y_pred)
    df_cm = pd.DataFrame(confusion, index=labels, columns=labels)
    plt.figure(figsize=(5, 3))
    sns.heatmap(df_cm, annot=True, fmt="d")
    plt.title('Confusion matrix for multiclass NB')
    plt.xlabel('Naive Bayes classifier')
    plt.ylabel('Real label')

    # Divide every row by its total in one step; this yields a float frame
    # directly instead of writing fractions cell by cell into integer data.
    df_relative = df_cm.div(df_cm.sum(axis=1), axis=0)

    plt.figure(figsize=(5, 3))
    sns.heatmap(df_relative, annot=True)
    plt.title('Confusion matrix for multiclass NB (%)')
    plt.xlabel('Naive Bayes classifier')
    plt.ylabel('Real label')

### All the steps

In [2]:
def naive_bayes_test(data, frac, col_feature, features, cat):
    """Run the whole Naive Bayes workflow on ``data``.

    Steps: split into train/test, rank every feature in ``features.index``
    by single-feature training log-loss, report the test error with the best
    feature and with the best 10, run the feature-count selection, plot the
    error curves and draw the confusion matrices of the CV-selected model.

    Parameters
    ----------
    data : DataFrame; labels are read from its third column (position 2).
    frac : fraction of rows used for training.
    col_feature : name of the column holding the tokens.
    features : object whose ``.index`` lists the candidate features.
    cat : number of classes, forwarded to ``model_evaluation``.

    Returns
    -------
    (train, test, ranked) : the two splits and a Series of log-losses indexed
    by feature, sorted ascending.
    """
    train, test = split_train_test(data, frac)

    y_train = train.iloc[:, 2].values
    losses = [training_error(feature, train, y_train, col_feature)
              for feature in features.index]

    ranked = pd.Series(losses, index=features.index).sort_values()
    rank_features = list(ranked.index)

    # Called for their printed reports; the returned errors are not needed.
    naive_bayes_one(ranked.index[0], train, test, col_feature)
    naive_bayes_n(10, train, test, rank_features, col_feature)

    n_features, cv_errors, test_errors = naive_bayes_selection(train, test, rank_features, col_feature)
    # plot_error requires the classifier name for its title; the call used to
    # omit it and raised TypeError.
    plot_error(n_features, cv_errors, test_errors, 'Naive Bayes')

    model_evaluation(n_features[np.argmin(cv_errors)], train, test, rank_features, col_feature, cat)

    return train, test, ranked

### Naive Bayes Feature selection

In [None]:
def NB_selection(train, test, rank_features, col_feature, col_label, list_min, list_type):
    """Feature-count selection for Bernoulli Naive Bayes (alpha=0.01).

    The columns ``list_min + list_type`` are always included; word features
    are added as the first 1, 10, 20, ... entries of ``rank_features``. For
    each size the 10-fold CV error on train and the error on test are
    recorded.

    Parameters
    ----------
    train, test : DataFrames.
    rank_features : word features ordered from best to worst.
    col_feature : name of the column holding the tokens.
    col_label : name of the label column.
    list_min, list_type : lists of column names of the fixed features.

    Returns
    -------
    (n_features, cv_errors, test_errors) : the array of word-feature counts
    and two lists of misclassification rates aligned with it.
    """
    y_train = np.ravel(train[col_label].values)
    y_test = test[col_label].values

    test_errors = []
    cv_errors = []

    n_features = np.arange(0, len(rank_features) + 1, 10)  # the first model has 1 feature, then 10, 20, etc
    n_features[0] = 1

    train_min_feat = train[list_min + list_type].values
    test_min_feat = test[list_min + list_type].values

    # Column j only depends on rank_features[j]: build the word matrices once
    # and take the leading n columns inside the loop.
    train_words = design_matrix(rank_features, train[col_feature]).tocsr()
    test_words = design_matrix(rank_features, test[col_feature]).tocsr()

    for n in tqdm(n_features):
        # Fully qualified names: bare `hstack` / `BernoulliNB` are not
        # imported in this notebook.
        X_train = scipy.sparse.hstack((train_min_feat, train_words[:, :n]), format='csr')
        X_test = scipy.sparse.hstack((test_min_feat, test_words[:, :n]), format='csr')

        model = sklearn.naive_bayes.BernoulliNB(alpha=0.01).fit(X_train, y_train)

        scores = sklearn.model_selection.cross_val_score(model, X_train, y_train, cv=10, scoring='accuracy')
        cv_errors.append(1 - np.mean(scores))

        y_pred = model.predict(X_test)
        test_errors.append(1 - sklearn.metrics.accuracy_score(y_test, y_pred))

    return n_features, cv_errors, test_errors

### SVM Feature selection

In [1]:
def SVM_selection(train, test, rank_features, col_feature, col_label, list_min, list_type):
    """Feature-count selection for a linear SVM trained with SGD.

    The columns ``list_min + list_type`` are always included; word features
    are added as the first 1, 10, 20, ... entries of ``rank_features``. For
    each size the 10-fold CV error on train and the error on test are
    recorded.

    Parameters
    ----------
    train, test : DataFrames.
    rank_features : word features ordered from best to worst.
    col_feature : name of the column holding the tokens.
    col_label : name of the label column.
    list_min, list_type : lists of column names of the fixed features.

    Returns
    -------
    (n_features, cv_errors, test_errors) : the array of word-feature counts
    and two lists of misclassification rates aligned with it.
    """
    # Imported here because the notebook never imports sklearn.linear_model.
    from sklearn.linear_model import SGDClassifier

    y_train = np.ravel(train[col_label].values)
    y_test = test[col_label].values

    test_errors = []
    cv_errors = []

    n_features = np.arange(0, len(rank_features) + 1, 10)  # the first model has 1 feature, then 10, 20, etc
    n_features[0] = 1

    train_min_feat = train[list_min + list_type].values
    test_min_feat = test[list_min + list_type].values

    # Column j only depends on rank_features[j]: build the word matrices once
    # and take the leading n columns inside the loop.
    train_words = design_matrix(rank_features, train[col_feature]).tocsr()
    test_words = design_matrix(rank_features, test[col_feature]).tocsr()

    for n in tqdm(n_features):
        # Qualified name: bare `hstack` is not imported in this notebook.
        X_train = scipy.sparse.hstack((train_min_feat, train_words[:, :n]), format='csr')
        X_test = scipy.sparse.hstack((test_min_feat, test_words[:, :n]), format='csr')

        svm = SGDClassifier(loss='hinge', penalty='l2', alpha=0.001, max_iter=5, random_state=5)
        model = svm.fit(X_train, y_train)

        scores = sklearn.model_selection.cross_val_score(model, X_train, y_train, cv=10, scoring='accuracy')
        cv_errors.append(1 - np.mean(scores))

        y_pred = model.predict(X_test)
        test_errors.append(1 - sklearn.metrics.accuracy_score(y_test, y_pred))

    return n_features, cv_errors, test_errors

### Logistic Regression Feature Selection

In [2]:
def LR_selection(train, test, rank_features, col_feature, col_label, list_min, list_type):
    """Feature-count selection for class-balanced L2 logistic regression.

    The columns ``list_min + list_type`` are always included; word features
    are added as the first 1, 10, 20, ... entries of ``rank_features``. For
    each size the 10-fold CV error on train and the error on test are
    recorded.

    Parameters
    ----------
    train, test : DataFrames.
    rank_features : word features ordered from best to worst.
    col_feature : name of the column holding the tokens.
    col_label : name of the label column.
    list_min, list_type : lists of column names of the fixed features.

    Returns
    -------
    (n_features, cv_errors, test_errors) : the array of word-feature counts
    and two lists of misclassification rates aligned with it.
    """
    # Imported here because the notebook never imports sklearn.linear_model.
    from sklearn.linear_model import LogisticRegression

    y_train = np.ravel(train[col_label].values)
    y_test = test[col_label].values

    test_errors = []
    cv_errors = []

    n_features = np.arange(0, len(rank_features) + 1, 10)  # the first model has 1 feature, then 10, 20, etc
    n_features[0] = 1

    train_min_feat = train[list_min + list_type].values
    test_min_feat = test[list_min + list_type].values

    # Column j only depends on rank_features[j]: build the word matrices once
    # and take the leading n columns inside the loop.
    train_words = design_matrix(rank_features, train[col_feature]).tocsr()
    test_words = design_matrix(rank_features, test[col_feature]).tocsr()

    for n in tqdm(n_features):
        # Qualified name: bare `hstack` is not imported in this notebook.
        X_train = scipy.sparse.hstack((train_min_feat, train_words[:, :n]), format='csr')
        X_test = scipy.sparse.hstack((test_min_feat, test_words[:, :n]), format='csr')

        logistic = LogisticRegression(penalty='l2', C=3, class_weight="balanced")
        model = logistic.fit(X_train, y_train)

        scores = sklearn.model_selection.cross_val_score(model, X_train, y_train, cv=10, scoring='accuracy')
        cv_errors.append(1 - np.mean(scores))

        y_pred = model.predict(X_test)
        test_errors.append(1 - sklearn.metrics.accuracy_score(y_test, y_pred))

    return n_features, cv_errors, test_errors

### Plot confusion matrix

In [5]:
def _draw_confusion_heatmap(matrix, cat, fmt, title, classifier, palette):
    """Draw one annotated confusion heatmap in a new (6, 4) figure."""
    plt.figure(figsize=(6, 4))
    sns.heatmap(matrix, annot=True, fmt=fmt, cmap=palette)
    plt.title(title)
    plt.xlabel('%s classifier' % (classifier))
    plt.ylabel('Real label')
    if cat == 10:
        tick_marks = ['FL', 'DR+SF', 'FD', 'IM', 'MM', 'IF', 'OB', 'RH', 'DOC', 'INFO+RE']
    else:
        tick_marks = ['other', 'FL+DR+SF']
    plt.xticks(np.arange(cat) + 0.5, tick_marks, rotation=90)
    # NOTE(review): the y labels are applied in reverse order, exactly as the
    # original code did; confirm this matches the row orientation produced by
    # the installed seaborn version.
    plt.yticks(np.arange(cat) + 0.5, tick_marks[::-1], rotation=0)
    # Applied once the figure has content, so the rotated labels fit; it used
    # to run before any figure existed.
    plt.tight_layout()


def plot_confusion(y_test, y_pred, cat, classifier, palette):
    """Draw count, recall and precision confusion matrices as heatmaps.

    Parameters
    ----------
    y_test : true labels.
    y_pred : predicted labels.
    cat : number of classes. 10 selects the ten category tick labels; any
        other value uses the two binary labels.
    classifier : classifier name used in titles and the x-axis label.
    palette : colormap passed to ``sns.heatmap``.

    Three figures are produced: raw counts, counts divided by row totals
    (recall) and counts divided by column totals (precision).
    """
    labels = list(range(1, cat + 1))
    confusion = sklearn.metrics.confusion_matrix(y_test, y_pred)
    df_cm = pd.DataFrame(confusion, index=labels, columns=labels)

    # Vectorised normalisation yields float frames directly instead of
    # writing fractions cell by cell into integer-typed frames.
    df_recall = df_cm.div(df_cm.sum(axis=1), axis=0)
    df_precision = df_cm.div(df_cm.sum(axis=0), axis=1)

    _draw_confusion_heatmap(df_cm, cat, "d",
                            'Confusion matrix for %s classifier' % (classifier),
                            classifier, palette)
    _draw_confusion_heatmap(df_recall, cat, '.0%',
                            'Recall confusion matrix for %s' % (classifier),
                            classifier, palette)
    _draw_confusion_heatmap(df_precision, cat, '.0%',
                            'Precision confusion matrix for %s' % (classifier),
                            classifier, palette)

### CV for all algorithms together

In [None]:
def _cv_and_test_error(estimator, X_train, y_train, X_test, y_test):
    """Fit ``estimator``; return (10-fold CV error on train, error on test)."""
    model = estimator.fit(X_train, y_train)
    scores = sklearn.model_selection.cross_val_score(model, X_train, y_train, cv=10, scoring='accuracy')
    y_pred = model.predict(X_test)
    return 1 - np.mean(scores), 1 - sklearn.metrics.accuracy_score(y_test, y_pred)


def feat_selection_all(train, test, rank_features, col_feature, col_label, list_min, list_type):
    """Feature-count selection for logistic regression, SVM and Naive Bayes.

    The columns ``list_min + list_type`` are always included; word features
    are added as the first 1, 10, 20, ... entries of ``rank_features``. For
    each size and each of the three classifiers the 10-fold CV error on train
    and the error on test are recorded.

    Parameters
    ----------
    train, test : DataFrames.
    rank_features : word features ordered from best to worst.
    col_feature : name of the column holding the tokens.
    col_label : name of the label column.
    list_min, list_type : lists of column names of the fixed features.

    Returns
    -------
    (n_features, cv_errors_lr, test_errors_lr, cv_errors_svm,
    test_errors_svm, cv_errors_nb, test_errors_nb)
    """
    # Imported here because the notebook never imports sklearn.linear_model.
    from sklearn.linear_model import LogisticRegression, SGDClassifier

    y_train = np.ravel(train[col_label].values)
    y_test = test[col_label].values

    test_errors_lr = []
    cv_errors_lr = []
    test_errors_svm = []
    cv_errors_svm = []
    test_errors_nb = []
    cv_errors_nb = []

    n_features = np.arange(0, len(rank_features) + 1, 10)  # the first model has 1 feature, then 10, 20, etc
    n_features[0] = 1

    train_min_feat = train[list_min + list_type].values
    test_min_feat = test[list_min + list_type].values

    # Column j only depends on rank_features[j]: build the word matrices once
    # and take the leading n columns inside the loop.
    train_words = design_matrix(rank_features, train[col_feature]).tocsr()
    test_words = design_matrix(rank_features, test[col_feature]).tocsr()

    for n in tqdm(n_features):
        # Qualified name: bare `hstack` is not imported in this notebook.
        X_train = scipy.sparse.hstack((train_min_feat, train_words[:, :n]), format='csr')
        X_test = scipy.sparse.hstack((test_min_feat, test_words[:, :n]), format='csr')

        cv_err, test_err = _cv_and_test_error(
            LogisticRegression(penalty='l2', C=3),
            X_train, y_train, X_test, y_test)
        cv_errors_lr.append(cv_err)
        test_errors_lr.append(test_err)

        cv_err, test_err = _cv_and_test_error(
            SGDClassifier(loss='hinge', penalty='l2', alpha=0.001, max_iter=5, random_state=5),
            X_train, y_train, X_test, y_test)
        cv_errors_svm.append(cv_err)
        test_errors_svm.append(test_err)

        cv_err, test_err = _cv_and_test_error(
            sklearn.naive_bayes.BernoulliNB(alpha=0.01),
            X_train, y_train, X_test, y_test)
        cv_errors_nb.append(cv_err)
        test_errors_nb.append(test_err)

    return n_features, cv_errors_lr, test_errors_lr, cv_errors_svm, test_errors_svm, cv_errors_nb, test_errors_nb

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin

class MinistryDummies(BaseEstimator, TransformerMixin):
    """Transformer that dummy-encodes the ``Orgao_Sup_cod`` column.

    ``fit`` learns nothing; ``transform`` returns the indicator columns of
    ``df['Orgao_Sup_cod']`` with the first level dropped.
    """

    def __init__(self):
        pass

    def fit(self, df, y=None):
        """Stateless: return ``self`` unchanged."""
        return self

    def transform(self, df, y=None):
        """Return ``pd.get_dummies`` of ``Orgao_Sup_cod`` with ``drop_first=True``.

        The output columns are derived from the values present in ``df``
        itself, not from anything seen during ``fit``.
        """
        ministry_codes = df['Orgao_Sup_cod']
        return pd.get_dummies(ministry_codes, drop_first=True)