### Import Libraries

In [1]:
# ! pip install pyspellchecker

In [2]:
import re
import math
from typing import Literal, Any
import warnings

import numpy as np
import pandas as pd
from pandas._typing import ArrayLike
from scipy.sparse import csr_matrix

from spellchecker import SpellChecker

from nltk.corpus import stopwords as nltk_stopwords, words as nltk_words
from nltk.stem import WordNetLemmatizer, PorterStemmer

from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.metrics import accuracy_score
from sklearn.metrics.pairwise import cosine_similarity

from sklearn.naive_bayes import BernoulliNB, GaussianNB
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier

from gensim.models import Word2Vec

### Download NLTK Ressources

In [3]:
# import nltk
# nltk.download('stopwords')
# nltk.download('words')
# nltk.download('wordnet')

### Load and Clean Dataset

In [4]:
def __tweet__clean(sent: str) -> str:
  sent = re.sub(r'http\S+', '', sent)
  sent = re.sub(r'[^A-Za-z\s]', ' ', sent)
  sent = re.sub(r'^\s+|\s+$', '', sent, flags=re.MULTILINE)
  sent = re.sub(r'\s{2,}', ' ', sent)
  return sent.lower()

def __tweet__preprocess(
    tweets_df: pd.Series, method: Literal['stem', 'lemmatize']|None=None, misspelling=False, stopword=False
    ) -> pd.Series:
  stop_words = set(nltk_stopwords.words('english'))
  lemmatizer = WordNetLemmatizer()
  stemmer    = PorterStemmer()
  speller    = SpellChecker(distance=1)
  speller.word_frequency.load_words(nltk_words.words())
  def __preprocess_helper(tweet: str) -> str:
    # words = word_tokenize(tweet)
    words = tweet.split()
    clean_words: list[str] = []
    for word in words:
      if misspelling:
        correct_word = speller.correction(word)
        word = word if correct_word is None else correct_word
      if stopword and word in stop_words:
        continue
      if method == 'stem':
        word = stemmer.stem(word)
      if method == 'lemmatize':
        word = lemmatizer.lemmatize(word)
      clean_words.append(word)
    return ' '.join(clean_words)
  # begin preprocess
  processed_tweet = tweets_df.apply(__tweet__clean)
  if method:
    processed_tweet = processed_tweet.apply(__preprocess_helper)
  return processed_tweet

def __tweet__vectorizer(
  tweets: pd.Series, *, method: Literal['count', 'tfidf', 'word2vec'], binary=False, ngram: Literal['11', '12', '22']='11'
  )-> np.ndarray:
  ngram_range = {'11': (1, 1), '12': (1, 2), '22': (2, 2)}.get(ngram, (1, 1))
  if method == 'tfidf':
    bow = TfidfVectorizer(ngram_range=ngram_range).fit_transform(tweets)
    return bow # type: ignore
  elif method == 'count':
    bow = CountVectorizer(binary=binary, ngram_range=ngram_range).fit_transform(tweets)
    return bow # type: ignore
  # block for Word2Vec
  bow_w2v: np.ndarray = np.empty((0, 133+7), np.float32)
  sentences           = [ line.split() for line in tweets ]
  w2v                 = Word2Vec(sentences, vector_size=133+7, epochs=13, min_count=1)
  for sent in sentences:
    tweet_vec = np.zeros((1, 133+7))
    for word in sent:
      tweet_vec += w2v.wv[word]
    bow_w2v = np.append(bow_w2v, tweet_vec, axis=0)
  return bow_w2v

def __load_tweets_and_labels_into_dataframe() -> pd.DataFrame:
  # load dataset
  neg, neu, pos = './data/negative.csv', './data/neutral.csv', './data/positive.csv'
  with open(neg, 'r') as neg_f, open(neu, 'r') as neu_f, open(pos, 'r') as pos_f:
    negative, neutral, positive = neg_f.read(), neu_f.read(), pos_f.read()
  # sent tokenizing
  sentence_pattern = r',([A-Z])'
  replacement_pattern = r'\n\1'
  negative = re.sub(sentence_pattern, replacement_pattern, negative).splitlines()
  neutral  = re.sub(sentence_pattern, replacement_pattern, neutral ).splitlines()
  positive = re.sub(sentence_pattern, replacement_pattern, positive).splitlines()
  # create DataFrame for each label
  negative = pd.DataFrame({'tweet': negative, 'label': 0})
  neutral  = pd.DataFrame({'tweet': neutral, 'label': 1})
  positive = pd.DataFrame({'tweet': positive, 'label': 2})
  # concat DataFrame
  tweets_df = pd.concat([negative, neutral, positive])
  # remove empty tweets
  tweets_df = tweets_df[tweets_df['tweet'].str.strip() != '']
  tweets_df = tweets_df.drop_duplicates(subset=['tweet'], ignore_index=True)
  tweets_df = tweets_df.sample(frac=1, ignore_index=True) # here shuffle tweets
  return tweets_df

def tweets_load() -> tuple[ArrayLike, ArrayLike, dict[str, np.ndarray]]:
  """
  Return:
  -------
  tweets, y_labels, bows
  """
  # processing and vectorizing with its params
  processing  = ['just_tokenization', 'stemming', 'lemmatization', 'stemming+misspelling', 'lemmatization+misspelling', 'lemmatization+stopwords']
  processing_params  = [
    {'method': None}, {'method': 'stem'}, {'method': 'lemmatize'}, {'method': 'stem', 'misspelling': True},
    {'method': 'lemmatize', 'misspelling': True}, {'method': 'lemmatize', 'stopword': True}
    ]
  vectorizing = ['binary', 'word_counts', 'tfidf', 'word2vec']
  vectorizing_params = [{'method': 'count', 'binary': True}, {'method': 'count'}, {'method': 'tfidf'}, {'method': 'word2vec'}]
  # other varibles
  bows: dict[str, np.ndarray] = {}
  # load datasets
  tweets_df  = __load_tweets_and_labels_into_dataframe()
  # fit each processing method a vectorizer
  __i, __len = 0, len(processing) * len(vectorizing)
  for proc, proc_params in zip(processing, processing_params):
    proc_tweets = __tweet__preprocess(tweets_df['tweet'], **proc_params)
    for vect, vect_params in zip(vectorizing, vectorizing_params):
      bows[f'{proc:27} and   {vect:13} vectorizing'] = __tweet__vectorizer(proc_tweets, **vect_params)
      __i += 1
      print(f'{__i:2}/{__len}:   {proc:27} and   {vect:13} vectorizing', end='\r')
  # return pure tweets, labels and bows
  tweets, y = tweets_df['tweet'].values, tweets_df['label'].values
  return tweets, y, bows


### Similarity

In [5]:
# def get_top_10_most_similar_tweets(bow: np.ndarray, tweets: pd.DataFrame, tweet_idx: int) -> np.ndarray:
#   bow_sum       = np.sqrt(np.sum(np.square(bow), axis=1))
#   bow_dot_tweet = np.dot(bow, bow[tweet_idx])
#   similarity    = np.divide(bow_dot_tweet, bow_sum * bow_sum[tweet_idx] + 1e-7)
#   top_10_df     = pd.DataFrame({'0': similarity}).sort_values(by='0', ascending=False).drop(index=tweet_idx)[:10]
#   top_10_index  = top_10_df[top_10_df['0'] > 0].index
#   top_10_tweets = tweets['tweet'].loc[top_10_index].values
#   return top_10_tweets # type: ignore


def __top_similar_pairs(bow: np.ndarray) -> dict[tuple[int, int], float]:
  similarity = pd.DataFrame(cosine_similarity(bow))
  similar_pairs: dict[tuple[int, int], float] = {}
  for tweet_idx in similarity.columns.values:
    tweet_similiraty = similarity[tweet_idx].sort_values(ascending=False)
    top_value, top_idx = tweet_similiraty[1], tweet_similiraty.index[1]
    similar_pairs[(tweet_idx, top_idx)] = top_value
  sorted_similar_pairs_by_value = {key: similar_pairs[key] for key in sorted(similar_pairs, key=lambda itm: similar_pairs[itm], reverse=True)}
  return sorted_similar_pairs_by_value

def __print_top_similar_tweets_pair(tweets: ArrayLike, bows: dict[str, np.ndarray]) -> dict[str, list]:
  for bow_name in bows:
    X = bows[bow_name]
    top_similar_pairs = __top_similar_pairs(X)
    print("top similar pairs of  '", bow_name, "':", sep='')
    __i = 1
    for pair in top_similar_pairs:
      first, second = pair
      print(' ' * 15, '-' * 150)
      print(' ' * 5, f'{1.0:3f}: {tweets[first]}')
      print(' ' * 5, f'{top_similar_pairs[pair]:3f}: {tweets[second]}')
      if __i == 10: break
      __i += 1
    print('\n')

### Just Load Dataset

In [6]:
tweets, y, bows = tweets_load()

24/24:   lemmatization+stopwords     and   word2vec      vectorizing

### Global Varibles

In [7]:
train_frac = 0.8

train_size: int = int(y.shape[0] * train_frac)

y_train, y_test = y[: train_size], y[train_size: ]

### Start

In [8]:
__print_top_similar_tweets_pair(tweets, bows)

top similar pairs of  'just_tokenization           and   binary        vectorizing':
                ------------------------------------------------------------------------------------------------------------------------------------------------------
      1.000000: The video will definitely be a 30 minute episode if not longer.
      1.000000: GOD NOT THE KOALAS 
                ------------------------------------------------------------------------------------------------------------------------------------------------------
      1.000000: GOD NOT THE KOALAS 
      0.316228: I'm not lost
                ------------------------------------------------------------------------------------------------------------------------------------------------------
      1.000000: If it's spring, it must be a varsity 
      0.258199: It will be at random on twitch happy
                --------------------------------------------------------------------------------------------------------------

In [9]:
warnings.filterwarnings("ignore", category=Warning)

bow_to_model: dict[str, Any] = {}

__i, __len = 0, len(bows) * 4 # ( * number ) the number changes respectively with how many model you use in models: list
for bow_name in bows:
  X = bows[bow_name]
  if isinstance(X, csr_matrix):
    X = X.toarray()
  X_train, X_test = X[: train_size], X[train_size: ]
  models: list = [
      LogisticRegression(multi_class='multinomial', solver='lbfgs', max_iter=37),
      DecisionTreeClassifier(),
      BernoulliNB(),
      GaussianNB(),
    ]
  for model in models:
    model.fit(X_train, y_train)
    y_predicted = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_predicted)
    model_name = model.__class__.__name__
    bow_to_model[f'{bow_name} using   {model_name:25} model'] = accuracy
    __i += 1
    print(f'{__i:2}/{__len}:   {bow_name} using   {model_name:25} model: {accuracy}', end='\r')


96/96:   lemmatization+stopwords     and   word2vec      vectorizing using   GaussianNB                model: 0.6056338028169014

In [10]:
sorted_bow_to_model__keys = sorted(bow_to_model, key = lambda item: bow_to_model[item], reverse=True)

for key in sorted_bow_to_model__keys:
  value = bow_to_model[key]
  print(f'{key}:   {value}')


lemmatization               and   binary        vectorizing using   LogisticRegression        model:   0.9460093896713615
just_tokenization           and   binary        vectorizing using   LogisticRegression        model:   0.9436619718309859
just_tokenization           and   word_counts   vectorizing using   LogisticRegression        model:   0.9436619718309859
lemmatization+misspelling   and   binary        vectorizing using   LogisticRegression        model:   0.9436619718309859
lemmatization+misspelling   and   word_counts   vectorizing using   LogisticRegression        model:   0.9436619718309859
stemming                    and   binary        vectorizing using   LogisticRegression        model:   0.9413145539906104
lemmatization               and   word_counts   vectorizing using   LogisticRegression        model:   0.9413145539906104
stemming+misspelling        and   word_counts   vectorizing using   LogisticRegression        model:   0.9413145539906104
stemming                