<a href="https://colab.research.google.com/github/kapoor-a/nlp/blob/main/sentiment_analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install the third-party packages this notebook imports below (trax and nltk).
# NOTE(review): versions are unpinned, so a fresh run may pull different releases — consider pinning.
!pip install trax
!pip install nltk

In [None]:
import os 
import shutil
import random as rnd

# import relevant libraries
import trax
import trax.fastmath.numpy as np
from trax import layers as tl
from trax import fastmath

In [None]:
import re
import nltk
nltk.download('twitter_samples')
nltk.download('stopwords')
from nltk.tokenize import TweetTokenizer
from nltk.corpus import stopwords, twitter_samples 
from nltk.stem import PorterStemmer
import string

In [None]:
# English stop-word list and Porter stemmer, created once here and used by process_tweet().
stopwords_english = stopwords.words('english')
stemmer = PorterStemmer()

In [None]:
def process_tweet(tweet):
    """Clean, tokenize and stem a single tweet.

    Stock tickers, a leading "RT" marker, hyperlinks and '#' signs are
    stripped from the text. The remainder is tokenized with NLTK's
    TweetTokenizer (preserve_case=False, strip_handles=True, reduce_len=True),
    stop words and punctuation tokens are discarded, and every surviving
    token is stemmed with the module-level Porter stemmer.

    Args:
        tweet: raw tweet text (str).

    Returns:
        list of stemmed tokens, in the order they appear in the tweet.
    """
    # remove stock market tickers like $GE
    tweet = re.sub(r'\$\w*', '', tweet)
    # remove old style retweet text "RT"
    tweet = re.sub(r'^RT[\s]+', '', tweet)
    # remove hyperlinks: match only the URL itself (up to the next whitespace)
    # so that any words following a link on the same line are kept
    tweet = re.sub(r'https?://\S*', '', tweet)
    # only remove the hash sign, keeping the hashtag's word
    tweet = re.sub(r'#', '', tweet)
    # tokenize tweets
    tokenizer = TweetTokenizer(preserve_case=False, strip_handles=True, reduce_len=True)
    tweet_tokens = tokenizer.tokenize(tweet)
    # drop stop words and punctuation, then stem what is left
    return [
        stemmer.stem(word)
        for word in tweet_tokens
        if word not in stopwords_english and word not in string.punctuation
    ]

In [None]:
def load_tweets():
    """Read the positive and negative tweets from NLTK's twitter_samples corpus.

    Returns:
        tuple (positive_tweets, negative_tweets), each being what
        twitter_samples.strings() returns for the corresponding JSON file.
    """
    corpus_files = ('positive_tweets.json', 'negative_tweets.json')
    # Unpacking the generator reads the positive file first, then the negative one.
    positive, negative = (twitter_samples.strings(name) for name in corpus_files)
    return positive, negative

In [None]:
def train_val_split(train_fraction=0.9):
    """Load the tweets and split them into training and validation sets.

    Each class is split at its own cut-off, so the requested proportion holds
    even when the number of positive and negative tweets differs.

    Args:
        train_fraction: share of each class placed in the training set; the
            remainder goes to the validation set. Defaults to 0.9.

    Returns:
        tuple (train_pos, train_neg, train_x, train_y,
               val_pos, val_neg, val_x, val_y) where *_x is the positive
        tweets followed by the negative tweets and *_y holds the matching
        labels (1 for positive, 0 for negative).
    """
    all_positive_tweets, all_negative_tweets = load_tweets()

    # View the total number of positive and negative tweets.
    print(f"The number of positive tweets: {len(all_positive_tweets)}")
    print(f"The number of negative tweets: {len(all_negative_tweets)}")

    # One cut-off per class: reusing the positive cut-off for the negatives
    # would mis-split them whenever the two classes differ in size.
    pos_split = int(train_fraction * len(all_positive_tweets))
    neg_split = int(train_fraction * len(all_negative_tweets))

    train_pos = all_positive_tweets[:pos_split]
    val_pos = all_positive_tweets[pos_split:]

    train_neg = all_negative_tweets[:neg_split]
    val_neg = all_negative_tweets[neg_split:]

    train_x = train_pos + train_neg
    val_x = val_pos + val_neg

    # Set the labels for the training set (1 for positive, 0 for negative)
    train_y = np.append(np.ones(len(train_pos)), np.zeros(len(train_neg)))

    # Set the labels for the validation set (1 for positive, 0 for negative)
    val_y = np.append(np.ones(len(val_pos)), np.zeros(len(val_neg)))

    return train_pos, train_neg, train_x, train_y, val_pos, val_neg, val_x, val_y

In [None]:
def get_vocab(train_x):
    """Build a token -> integer id vocabulary from the training tweets.

    Ids 0, 1 and 2 are reserved for the padding, end-of-line and unknown
    tokens. Every other token receives the next free id the first time it is
    seen, in the order produced by process_tweet().

    Args:
        train_x: iterable of raw tweet strings.

    Returns:
        dict mapping each token to its id.
    """
    vocab = {'__PAD__': 0, '__</e>__': 1, '__UNK__': 2}

    for raw_tweet in train_x:
        for token in process_tweet(raw_tweet):
            # setdefault leaves already-known tokens untouched; len(vocab) is
            # evaluated before the insertion, so new ids stay consecutive.
            vocab.setdefault(token, len(vocab))

    return vocab

In [None]:
def tweet_to_tensor(tweet, vocab_dict, unk_token='__UNK__'):
    """Convert a raw tweet into a list of vocabulary ids.

    Args:
        tweet: raw tweet text.
        vocab_dict: dict mapping tokens to integer ids; must contain unk_token.
        unk_token: key whose id stands in for tokens missing from vocab_dict.

    Returns:
        list with one id per token produced by process_tweet(tweet).
    """
    tokens = process_tweet(tweet)
    fallback_id = vocab_dict[unk_token]
    return [vocab_dict.get(token, fallback_id) for token in tokens]

In [None]:
def data_generator(data_pos, data_neg, batch_size, vocab_dict, loop, shuffle=False):
    """Yield class-balanced, zero-padded batches of tweet tensors.

    Every batch holds batch_size // 2 positive tweets followed by the same
    number of negative tweets, each converted with tweet_to_tensor() and
    right-padded with 0 to the length of the longest tweet in the batch.

    Args:
        data_pos: list of positive tweets.
        data_neg: list of negative tweets.
        batch_size: number of examples per batch; must be even.
        vocab_dict: token -> id mapping passed on to tweet_to_tensor().
        loop: if True, wrap around (reshuffling when shuffle is set) each
            time a class is exhausted; if False, stop as soon as either class
            runs out, discarding the batch that was being assembled.
        shuffle: if True, visit the tweets of each class in random order.

    Yields:
        tuple (inputs, targets, weights): inputs is the padded id array,
        targets is 1 for the first half of the batch and 0 for the second,
        weights is all ones. The same targets and weights arrays are yielded
        for every batch.
    """
    assert batch_size % 2 == 0
    half = batch_size // 2

    n_pos, n_neg = len(data_pos), len(data_neg)
    pos_order = list(range(n_pos))
    neg_order = list(range(n_neg))
    if shuffle:
        rnd.shuffle(pos_order)
        rnd.shuffle(neg_order)

    # Labels and weights do not depend on the batch content, so build them once.
    targets = np.array([1] * half + [0] * half)
    weights = np.array([1] * batch_size)

    pos_cursor = 0
    neg_cursor = 0
    exhausted = False

    while not exhausted:
        batch = []

        # First half of the batch: positive tweets.
        for _ in range(half):
            if pos_cursor >= n_pos:
                if not loop:
                    exhausted = True
                    break
                # Wrap around and start a new pass over the positives.
                pos_cursor = 0
                if shuffle:
                    rnd.shuffle(pos_order)
            batch.append(tweet_to_tensor(data_pos[pos_order[pos_cursor]], vocab_dict))
            pos_cursor += 1

        # Second half of the batch: negative tweets.
        for _ in range(half):
            if neg_cursor >= n_neg:
                if not loop:
                    exhausted = True
                    break
                # Wrap around and start a new pass over the negatives.
                neg_cursor = 0
                if shuffle:
                    rnd.shuffle(neg_order)
            batch.append(tweet_to_tensor(data_neg[neg_order[neg_cursor]], vocab_dict))
            neg_cursor += 1

        # An incomplete batch is never yielded.
        if exhausted:
            break

        # Right-pad every tensor with 0 up to the longest one in this batch.
        longest = max(len(ids) for ids in batch)
        padded = [ids + [0] * (longest - len(ids)) for ids in batch]

        inputs = np.array(padded)
        yield inputs, targets, weights

In [None]:
# Split the corpus, then build the vocabulary from the training tweets only
# (validation-only tokens will map to the unknown id in tweet_to_tensor()).
train_pos, train_neg, train_x, train_y, val_pos, val_neg, val_x, val_y = train_val_split()
vocab = get_vocab(train_x)

In [None]:
# Sizes used by the data streams and the model.
vocab_size = len(vocab)  # number of vocabulary entries, special tokens included
embedding_size = 256  # passed to classifier() as embedding_dim
batch_size = 16  # must be even: data_generator() takes half of each batch from each class

In [None]:
# Endless, shuffled batch streams for training and evaluation.
train_stream = data_generator(
    train_pos, train_neg, batch_size, vocab, loop=True, shuffle=True
)
eval_stream = data_generator(
    val_pos, val_neg, batch_size, vocab, loop=True, shuffle=True
)

In [None]:
# Inspect one (inputs, targets, weights) batch; note that this consumes it from eval_stream.
next(eval_stream)

In [None]:
def classifier(vocab_size, embedding_dim, output_dim, mode='train'):
    """Build the sentiment classifier as a trax Serial model.

    Layers, in order: embedding lookup, dense layer of width embedding_dim,
    mean over axis 1, dense projection to output_dim, log-softmax.

    Args:
        vocab_size: number of rows in the embedding table.
        embedding_dim: embedding width, also used for the first dense layer.
        output_dim: number of output classes.
        mode: accepted for interface compatibility; not used by this function.

    Returns:
        the assembled tl.Serial model.
    """
    layers = [
        tl.Embedding(vocab_size=vocab_size, d_feature=embedding_dim),
        tl.Dense(embedding_dim),
        # presumably averages over the token axis of (batch, tokens, features) — TODO confirm
        tl.Mean(axis=1),
        tl.Dense(output_dim),
        tl.LogSoftmax(),
    ]
    return tl.Serial(*layers)

In [None]:
from trax.supervised import training

def training_loop(model, vocab, train_stream, eval_stream, output_dir="model/"):
    """Assemble a trax training.Loop for the given model and data streams.

    Args:
        model: trax model to train.
        vocab: accepted for interface compatibility; not used by this function.
        train_stream: generator of training batches.
        eval_stream: generator of evaluation batches.
        output_dir: directory handed to training.Loop as its output_dir.

    Returns:
        the configured training.Loop; call .run(n_steps=...) on it to train.
    """
    # Training task: cross-entropy loss optimised with Adam (0.01),
    # with n_steps_per_checkpoint set to 10.
    task_for_training = training.TrainTask(
        labeled_data=train_stream,
        loss_layer=tl.CrossEntropyLoss(),
        optimizer=trax.optimizers.Adam(0.01),
        n_steps_per_checkpoint=10,
    )

    # Evaluation task: report cross-entropy loss and accuracy.
    task_for_eval = training.EvalTask(
        labeled_data=eval_stream,
        metrics=[tl.CrossEntropyLoss(), tl.Accuracy()],
    )

    return training.Loop(
        model,
        tasks=task_for_training,
        eval_tasks=task_for_eval,
        output_dir=output_dir,
    )

In [None]:
# Remove any previous output directory before building the loop
# (presumably so training.Loop does not pick up old checkpoints — confirm).
# shutil.rmtree replaces the shell-only `rm -rf`; ignore_errors=True keeps
# the "no error if the directory is missing" behaviour.
model_dir = '/content/model/'
shutil.rmtree(model_dir, ignore_errors=True)
model = classifier(vocab_size, embedding_size, 2)
loop = training_loop(model, vocab, train_stream, eval_stream, output_dir=model_dir)

In [None]:
# Train for 500 steps (training_loop() sets n_steps_per_checkpoint=10 on the train task).
loop.run(n_steps = 500)

In [None]:
def predict(sentence):
    """Classify one sentence with the module-level `model` and `vocab`.

    The sentence is converted to ids with tweet_to_tensor(), given a leading
    batch axis and passed through the model. The raw model output is printed,
    and the class with the larger score wins.

    Args:
        sentence: raw text to classify.

    Returns:
        tuple (preds, sentiment): preds is 1 when the score at index 1 exceeds
        the score at index 0 and 0 otherwise; sentiment is 'positive' or
        "negative" accordingly.
    """
    token_ids = np.array(tweet_to_tensor(sentence, vocab_dict=vocab))
    # Add a leading batch axis: the model is called on a batch of one.
    batch = token_ids[None, :]
    preds_probs = model(batch)
    print(preds_probs)
    preds = int(preds_probs[0, 1] > preds_probs[0, 0])
    sentiment = 'positive' if preds == 1 else "negative"
    return preds, sentiment

In [None]:
# Try the model on an example sentence; the cell displays the returned (preds, sentiment) tuple.
predict("it feels bad when others are sad")

In [None]:
from sklearn.decomposition import PCA #Import PCA from scikit-learn
pca = PCA(n_components=2) #PCA with two dimensions

# Fit the PCA on model.weights[0] and project it to two dimensions.
# NOTE(review): weights[0] is presumably the Embedding table, since Embedding
# is the first layer built in classifier() — confirm.
emb_2dim = pca.fit_transform(model.weights[0])

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt

# Hand-picked negative and positive words to compare in the 2-D projection
neg_words = ['worst', 'bad', 'hurt', 'sad', 'hate']
pos_words = ['best', 'good', 'nice', 'better', 'love']

# Vocabulary index of each selected word
neg_n = [vocab[w] for w in neg_words]
pos_n = [vocab[w] for w in pos_words]


def _plot_word_group(words, indices, color):
    """Scatter the 2-D embeddings at `indices` in `color` and label each point with its word."""
    points = emb_2dim[indices]
    plt.scatter(points[:, 0], points[:, 1], color=color)
    for label, point in zip(words, points):
        plt.annotate(label, (point[0], point[1]))


plt.figure()

# Negative words in red, then positive words in green
_plot_word_group(neg_words, neg_n, 'r')
_plot_word_group(pos_words, pos_n, 'g')

plt.title('Word embeddings in 2d')

plt.show()