Characteristic words from TT descriptions
===================================

This experiment focuses on the descriptions of the Twitter accounts of Polish Sejm MPs.
We ask whether there are any characteristic words used in these descriptions.

This is a first toy example: the number of descriptions is rather small (<400), so the results are not statistically significant. Also, only the biggest parties were used in the experiment (and results were obtained only for some of them). Still, it is a good first try before further, more challenging tasks.

In the solution I used TF-IDF vectorization of each description and built a simple classifier on top of it.
Descriptions were preprocessed so that only word-like tokens were left, and these were transformed to their canonical forms (getting rid of inflection).

The classifier (even though it is a very simple model) achieved a good result on the test set (0.73 F1-score with 6 labels to classify). However, the task was fairly simple: in most cases the descriptions contain a direct indication of the MP's affiliation, and such words were identified as the top features used in classification. Still, there are some other words which were selected as important features... and those are interesting to check.

Data collection: around 2023-01-11


In [None]:
from aipolit.utils.text import read_tsv

In [None]:
# You need to create this file on your own.
# Later cells read the 'party' and 'description' fields of every loaded entry.
polit_data = read_tsv('../local_data/politycy-dane.tsv')

In [None]:
# For pretty printing of table data
import pandas as pd
from IPython.display import display, HTML

def show_pretty_table(raw_data, header):
    """Display tabular data as an HTML table in the notebook output.

    raw_data -- rows of the table, handed to ``pandas.DataFrame``
    header   -- column names used for the table
    """
    table = pd.DataFrame(raw_data, columns=header)
    html_markup = table.to_html()
    display(HTML(html_markup))

In [None]:
# Source for parts of this code: 
# https://datascience.stackexchange.com/questions/103735/methods-for-finding-characteristic-words-for-a-group-of-documents-in-comparison

import numpy as np
from collections import defaultdict
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import make_pipeline
from tqdm import tqdm

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, precision_recall_fscore_support

In [None]:
import random

In [None]:
# Used for lemmatization
import spacy
from spacy.lang.pl.examples import sentences

# Load the spaCy pipeline "pl_core_news_sm"; the preprocessing step below
# reads the lemma of every token it produces.
nlp_pl = spacy.load("pl_core_news_sm")

In [None]:
# Remove emojis preprocessing step
# Source: https://stackoverflow.com/questions/33404752/removing-emojis-from-a-string-in-python
import re

# Character ranges (in regex character-class syntax) that are treated as emoji.
# The literals are wrapped in parentheses so that implicit string concatenation
# joins ALL of them. With a bare backslash continuation only the first literal
# is assigned and every following line is a no-op expression statement, which
# left flags and most symbols untouched.
# NOTE: some ranges are deliberately broad (e.g. U+24C2-U+1F251 and everything
# above U+FFFF), so non-emoji characters from those planes are removed as well.
EMOJI_STRING = (
    u"\U0001F600-\U0001F64F"  # emoticons
    u"\U0001F300-\U0001F5FF"  # symbols & pictographs
    u"\U0001F680-\U0001F6FF"  # transport & map symbols
    u"\U0001F1E0-\U0001F1FF"  # flags (regional indicator symbols)
    u"\U00002500-\U00002BEF"  # box drawing, geometric shapes, misc symbols, arrows
    u"\U00002702-\U000027B0"  # dingbats
    u"\U000024C2-\U0001F251"
    u"\U0001f926-\U0001f937"
    u"\U00010000-\U0010ffff"  # all supplementary planes
    u"\u2640-\u2642"
    u"\u2600-\u2B55"
    u"\u200d"  # zero width joiner (glues emoji sequences together)
    u"\u23cf"
    u"\u23e9"
    u"\u231a"
    u"\ufe0f"  # variation selector-16 (emoji presentation)
    u"\u3030"
)

# Compiled once at import time instead of on every call.
_EMOJI_PATTERN = re.compile("[" + EMOJI_STRING + "]+", re.UNICODE)


def remove_emojis(data):
    """Return ``data`` with every character listed in EMOJI_STRING removed.

    data -- input string

    Characters outside the listed ranges (including Polish diacritics) are
    left unchanged.
    """
    return _EMOJI_PATTERN.sub('', data)

In [None]:
# Remove urls
# Source:
# https://stackoverflow.com/questions/11331982/how-to-remove-any-url-within-a-string-in-python

def remove_urls(text):
    """Return ``text`` with URLs removed.

    A URL is an optional ``http``/``https`` scheme, ``://`` and the following
    run of URL characters, ending at a word boundary so that trailing
    punctuation (e.g. a sentence-ending dot) stays in the text.

    The character set also covers ``-``, ``#``, ``~``, ``+`` and ``:``;
    without them a URL such as ``http://my-site.com`` was cut at the hyphen
    and ``-site.com`` leaked into the output.
    """
    return re.sub(r"(?:https?)?://[\w./?=&%#~+:-]*\b", '', text)

In [None]:
def remove_emails(text):
    """Return ``text`` with e-mail addresses removed.

    The local part is a sequence of alphanumeric chunks separated by ``.``,
    ``_`` or ``-``; the domain is a host label followed by one or more
    dot-separated alphabetic parts of at least two letters.

    Two character classes were corrected:
    * ``[.-_]`` was an accidental range from ``.`` to ``_``. It excluded the
      hyphen, included digits and uppercase letters, and made the repeated
      group ambiguous (a source of heavy backtracking). It is now ``[._-]``.
    * ``[A-Z|a-z]`` also matched a literal ``|``. It is now ``[A-Za-z]``.
    """
    return re.sub(
        r"(?:[A-Za-z0-9]+[._-])*[A-Za-z0-9]+@[A-Za-z0-9-]+(?:\.[A-Za-z]{2,})+",
        "",
        text,
    )

In [None]:
# Leave only word-alike tokens
# remove emojis
# run lemmatization

def preprocess_sentence(nlp, text):
    """Clean a raw description and return it as space-joined lemmas.

    Steps: remove emojis, URLs and e-mail addresses, replace every run of
    non-word characters with a single space, trim the ends, then lemmatize.

    nlp  -- callable that turns a string into an iterable of tokens exposing
            ``lemma_`` (a loaded spaCy pipeline in this notebook)
    text -- raw description text

    Returns the token lemmas joined with single spaces.
    """
    text = remove_emojis(text)
    text = remove_urls(text)
    text = remove_emails(text)

    # Every run of non-word characters (whitespace included) collapses into
    # one space, so only a single leading/trailing space can remain and
    # stripping the ends fully normalizes the whitespace.
    text = re.sub(r"\W+", " ", text).strip()

    # Use the pipeline handed in by the caller; the argument used to be
    # ignored in favour of the global ``nlp_pl``. The leftover debug print of
    # the intermediate text was dropped as well.
    return " ".join(token.lemma_ for token in nlp(text))

In [None]:
# Demonstrating how preprocessing works
# (other inputs worth trying: "🇵🇱" or "😀")
sample_sentence = "2022 🇵🇱 Minister ds. Unii Europejskiej #babieslivesmatter http://example.com s@a.com @@@"

preprocessed_sentence = preprocess_sentence(nlp_pl, sample_sentence)

print(f"Input:\n  {sample_sentence}")
print()
print(f"After preprocessing:\n  {preprocessed_sentence}")


In [None]:
# Loading sample data

# not enough data for all the parties :(
# (for a quick two-class run use ['PiS', 'PO'] instead)
processed_parties = ['PiS', 'PO', 'Lewica', 'Solidarna Polska', 'PSL', 'Polska2050']

descriptions_texts = []
descriptions_labels = []

for entry in polit_data:
    party, desc = entry['party'], entry['description']
    # Skip accounts with no description or from a party outside the experiment
    if not desc or party not in processed_parties:
        continue

    desc = preprocess_sentence(nlp_pl, desc)
    # We take only descriptions with at least 10 characters (after preprocessing)
    if len(desc) >= 10:
        descriptions_texts.append(desc)
        descriptions_labels.append(party)

print("Descriptions:", len(descriptions_texts))
print("Labels:", len(descriptions_labels))

In [None]:

from collections import Counter

def show_label_distribution():
    """Show how many collected descriptions carry each party label."""
    print("Label distribution")

    # Counter keeps labels in first-seen order, same as counting one by one
    label_count = Counter(descriptions_labels)
    show_pretty_table(list(label_count.items()), ["Label", "Count"])


show_label_distribution()

In [None]:
print("Sample descriptions:")
desc_and_labels = list(zip(descriptions_texts, descriptions_labels))


def show_desc_sample(sample_size=10):
    """Show a random sample of (label, description) pairs as a table.

    sample_size -- number of rows to draw; clamped to the number of available
                   descriptions, because ``random.sample`` raises ValueError
                   when asked for more items than the population holds.
    """
    picked = random.sample(desc_and_labels, k=min(sample_size, len(desc_and_labels)))
    data = [(label, desc) for desc, label in picked]
    show_pretty_table(data, ['label', 'tt description'])


show_desc_sample()

In [None]:
# Build classifier

# 1. Vectorize descriptions with TF-IDF over word 1- to 3-grams
tfidf = TfidfVectorizer(
    min_df=0.001, max_df=0.2, max_features=10_000, ngram_range=(1, 3),
    token_pattern=r"(?u)\b\w+\b") # one char tokens are also valid


X_train, X_test, y_train, y_test = train_test_split(
    descriptions_texts,
    descriptions_labels,
    test_size=0.2,
    random_state=42)


# The vectorizer is fitted on the training part only
X_train_tfidf_matrix = tfidf.fit_transform(X_train)

# 2. Train classifier
# The forest is seeded like the split above; without a seed every run grows
# different trees, so scores and feature rankings could not be reproduced.
clf = RandomForestClassifier(random_state=42)
clf.fit(X_train_tfidf_matrix, y_train)


In [None]:
# Chain the already fitted vectorizer and classifier so that later cells can
# call predict / predict_proba directly on raw (preprocessed) text.
clf_pipeline = make_pipeline(tfidf, clf)

In [None]:
# Sample classifier predictions on TEST set

def show_sample_predictions():
    """Tabulate predicted vs. expected labels for the first test descriptions.

    In the table "Actual" is the label predicted by the pipeline and
    "Expected" is the label from the data.
    """
    MAX_SAMPLE_TO_SHOW = 100

    shown_texts = X_test[0:MAX_SAMPLE_TO_SHOW]
    shown_expected = y_test[0:MAX_SAMPLE_TO_SHOW]
    shown_predicted = clf_pipeline.predict(shown_texts)

    raw_data = [
        ("SUCCESS" if predicted == expected else "FAIL", predicted, expected, desc)
        for desc, predicted, expected in zip(shown_texts, shown_predicted, shown_expected)
    ]
    show_pretty_table(raw_data, ['Result', "Actual", "Expected", "TT Description"])


show_sample_predictions()


In [None]:
# Score of the classifier
from sklearn.metrics import classification_report


def show_score(clf, X_input, y_expected, dataset_name):
    """Print weighted precision/recall/F-score and a per-class report.

    clf          -- fitted classifier; only its ``classes_`` are read here
    X_input      -- preprocessed description texts
    y_expected   -- true labels for ``X_input``
    dataset_name -- name printed in the header

    Predictions come from the module-level ``clf_pipeline``, because
    ``X_input`` is raw text that has to be vectorized first.
    """
    y_actual = clf_pipeline.predict(X_input)
    precision, recall, fscore, _ = precision_recall_fscore_support(
        y_expected, y_actual, average='weighted')
    print(f"Score for {dataset_name} is:")
    print(f"  precision: {precision}")
    print(f"  recall: {recall}")
    print(f"  f-score: {fscore}")
    # With an averaged score sklearn reports no support (None), so show the
    # number of evaluated samples instead.
    print(f"  Support: {len(y_expected)}")
    print("")

    print("Classification report")
    # Passing ``labels`` keeps the report aligned with ``target_names`` even
    # when a small class is missing from this split; otherwise
    # classification_report raises ValueError on the length mismatch.
    print(classification_report(
        y_expected, y_actual, labels=clf.classes_, target_names=clf.classes_))


show_score(clf, X_train, y_train, "TRAIN SET")
show_score(clf, X_test, y_test, "TEST SET")

In [None]:
# Vocabulary kept by the fitted vectorizer
feature_names = tfidf.get_feature_names_out()
print(f"Feature count: {len(feature_names)}")
print(f"Number of stopwords: {len(tfidf.stop_words_)}")
# To inspect the actual terms, print tfidf.stop_words_ or feature_names

In [None]:
TOP_N_FEATURES = 50  # Top N features to be considered
MIN_CONFIDENCE = 0.6  # Take only results with high confidence

# 3. Get feature importances
feature_importances = clf.feature_importances_

# 4. Sort and get important features (indices ordered from most to least important)
word_indices = np.argsort(feature_importances)[::-1]

top_words_per_class = defaultdict(list)

for word_idx in word_indices[:TOP_N_FEATURES]:
    word = feature_names[word_idx]
    # Classify the feature on its own, as if it were a whole description
    class_probs = clf_pipeline.predict_proba([word])[0]
    class_idx = np.argmax(class_probs)
    # Assign the word to its most probable class, but only when confident enough
    if class_probs[class_idx] >= MIN_CONFIDENCE:
        top_words_per_class[clf.classes_[class_idx]].append(word)

In [None]:
# One header line per class, then its words (indented), then a blank line
for label, top_words in top_words_per_class.items():
    print(f"Top words characteristic for class: {label}")
    print("".join(f"  {word}\n" for word in top_words))

LIME Explanations
===============

Another way to identify which words are "characteristic" of a given class is to use Explainable AI techniques to find which features most commonly contribute to the "correct" decision.

We will use the LIME algorithm in this experiment.

The general idea is as follows:

1. Run the explainer on each entry of both the train and test sets
2. Keep only the explanations whose prediction is correct and has "high" confidence
3. Check the top input features (words) contributing to the final score (removing those with low weight - another hyperparameter)
4. Count each feature as keyword candidate
5. Show most common keywords collected for each label.

This method has an advantage over the previous "naive" approach, as it allows us to think about keywords "independently" for each class. It may turn out that some words (especially for parties with a similar ideology) are keywords for multiple parties. In such cases they will appear on several lists.


In [None]:
from lime.lime_text import LimeTextExplainer

In [None]:
# LIME text explainer; class names are taken from the trained classifier
explainer = LimeTextExplainer(class_names=clf.classes_)

In [None]:
def explain_single(index):
    """Print the expected label of one description and render its LIME explanation.

    index -- position of the description in ``descriptions_texts``
    """
    text = descriptions_texts[index]
    expected = descriptions_labels[index]
    print(f"Expected label: {expected}")

    explanation = explainer.explain_instance(
        text,
        clf_pipeline.predict_proba,
        num_features=10,
        top_labels=2,
    )
    explanation.show_in_notebook(text=True)


explain_single(41)

In [None]:
# Keep one explanation object around so the following cells can inspect it
index = 41
exp = explainer.explain_instance(
    descriptions_texts[index],
    clf_pipeline.predict_proba,
    num_features=10,
    top_labels=2,
)

In [None]:
# The explanation was built with top_labels=2, so only the two most probable
# classes have explanations. as_list() defaults to label=1 and raises KeyError
# whenever class index 1 is not one of them, so ask for the most probable
# explained class explicitly.
exp.as_list(label=exp.available_labels()[0])

In [None]:
# Map class name -> its position in the explanation's class_names
exp_class_to_index = {
    class_name: position for position, class_name in enumerate(exp.class_names)
}

print(exp_class_to_index)

In [None]:
# label -> Counter of keyword candidates collected for that label
exp_extracted_keywords = defaultdict(Counter)

In [None]:
KEYWORD_THRESHOLD = 0.02  # a word counts as a keyword only above this LIME weight
PRED_THRESHOLD = 0.7  # minimal predicted probability of the expected class


def is_pred_confident_enough(exp, y_expected):
    """Tell whether the expected class received at least PRED_THRESHOLD probability.

    exp        -- LIME explanation; its ``predict_proba`` is read
    y_expected -- expected class name (a key of ``exp_class_to_index``)
    """
    expected_prob = exp.predict_proba[exp_class_to_index[y_expected]]
    return bool(expected_prob >= PRED_THRESHOLD)

def explain_all(X, y_true):
    """Collect keyword candidates from LIME explanations of confident predictions.

    X      -- preprocessed description texts
    y_true -- expected class name for every text in ``X``

    For every text whose expected class is predicted with at least
    PRED_THRESHOLD probability, each word with a LIME weight above
    KEYWORD_THRESHOLD is counted in ``exp_extracted_keywords[label]``.
    The counters are module-level, so calling this again adds to the
    existing counts.
    """
    # Give tqdm the length when it is known, so it can show a bar and an ETA
    total = len(X) if hasattr(X, "__len__") else None
    for x, y_exp in tqdm(zip(X, y_true), total=total):
        exp = explainer.explain_instance(
            x,
            clf_pipeline.predict_proba,
            num_features=10,
            top_labels=2)

        # only take if prediction is correct and higher than threshold
        if not is_pred_confident_enough(exp, y_exp):
            continue

        label_idx = exp_class_to_index[y_exp]
        # Explanations exist only for the top_labels classes. With a low
        # PRED_THRESHOLD the expected class may not be one of them, and
        # as_list() would raise KeyError for it.
        if label_idx not in exp.available_labels():
            continue

        for keyword, score in exp.as_list(label=label_idx):
            if score > KEYWORD_THRESHOLD:
                exp_extracted_keywords[y_exp][keyword] += 1


# To analyse only the held-out part use: explain_all(X_test, y_test)
explain_all(descriptions_texts, descriptions_labels)

In [None]:
# Ten most frequent keyword candidates per label, followed by a blank line
for label, keyword_counts in exp_extracted_keywords.items():
    print(f"Label: {label}")
    top_keywords = keyword_counts.most_common(10)
    print("".join(f"{word} => {freq}\n" for word, freq in top_keywords))