## Using Traditional Machine Learning Methods For Twitter Sentiment Analysis

Import the required packages / modules.

In [32]:
import nltk
import re
import pickle
nltk.download("stopwords")
nltk.download("twitter_samples")
from nltk.corpus import twitter_samples
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import emoji
from nltk.tokenize.casual import TweetTokenizer

[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/peiyuns/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package twitter_samples to
[nltk_data]     /Users/peiyuns/nltk_data...
[nltk_data]   Package twitter_samples is already up-to-date!


For ML

In [33]:
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction import DictVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score
from sklearn.metrics import accuracy_score

Dataset to use

In [34]:
twitter_samples.fileids()

['negative_tweets.json', 'positive_tweets.json', 'tweets.20150430-223406.json']

In [35]:
# Read positive and negative tweets
positive_tweets = twitter_samples.strings("positive_tweets.json")
negative_tweets = twitter_samples.strings("negative_tweets.json")

In [36]:
stop_words = set(stopwords.words('english'))
vec = DictVectorizer()  # Vectorizer
lemmatizer = WordNetLemmatizer()

In [37]:
def handle_hashtag(hashtag):
    i = 0
    while(hashtag[i] == "#"):
        i += 1
        if i == len(hashtag):
            return -1
    
    # multiple hashtags in the word
    if "#" in hashtag[i:]:
        return -1
    
    return hashtag[i:]

In [38]:
frequent_symbols = [":)", ":(", ":D", ":-)", ":p"]

In [39]:
tokenizer = TweetTokenizer()

In [40]:
def preprocess(tweets):
    new_tweets = []
    for tweet in tweets:
        new_tweet = []
        for word in tokenizer.tokenize(tweet):
            
#             # lowercase
#             word = word.lower()
            
            # regular words
            if not (word in stop_words or re.search("[^a-zA-Z]",word)):  # if not stopwords and non-alphabetic
                new_tweet.append(lemmatizer.lemmatize(word))
            
            # hashtags
            elif word[0] == "#":  
                hashtag = handle_hashtag(word)
                if hashtag != -1:
                    new_tweet.append("#" + lemmatizer.lemmatize(hashtag))
            
            # symbols / emojis
            elif word in frequent_symbols or word in emoji.UNICODE_EMOJI:
                new_tweet.append(word)
                
        new_tweets.append(new_tweet)
    
    # split into training, development, and testing
    training, develop_test = train_test_split(new_tweets, test_size=0.2)
    development, testing = train_test_split(develop_test, test_size=0.5)   
    
    return (training, development, testing)

In [41]:
# Excluding hashtags containing stop words and non-alphabetic chars
(positive_training, positive_develop, positive_test)  = preprocess(positive_tweets)
(negative_training, negative_develop, negative_test)  = preprocess(negative_tweets)

In [42]:
# Get array of dictionary according to the list of tweets 
def getDict(tweets):
    dict_arr = []
    for tweet in tweets:
        tweet_dict = {}
        for word in tweet:
            if word in tweet_dict:
                tweet_dict[word] += 1
            else:
                tweet_dict[word] = 1
        dict_arr.append(tweet_dict)
    return dict_arr

In [43]:
# Training, development, testing sets and targets
X_train = vec.fit_transform(getDict(positive_training + negative_training))
y_train = len(positive_training)*[1] + len(negative_training)*[0]

X_dev = vec.transform(getDict(positive_develop + negative_develop))
y_dev = len(positive_develop)*[1] + len(negative_develop)*[0]

X_test = vec.transform(getDict(positive_test + negative_test))
y_test = len(positive_test)*[1] + len(negative_test)*[0]

In [44]:
# Hyperparameter sets
alphas = [0.001, 0.01, 0.1, 1, 10, 100]
penaltys = ['l1', 'l2']
Cs = [0.001, 0.01, 0.1, 1, 10, 100, 1000]
kernels = ["linear", "poly", "rbf", "sigmoid"]

# Init best hyperparameters
best_bayes_params = {}
best_bayes_score = 0
best_logistic_params = {}
best_logistic_score = 0

print("Naive Bayes hyperparameters:")
# Tune naive bayes hyperparameters
for alpha in alphas:
    bayes = MultinomialNB(alpha = alpha) # Create classifier
    bayes.fit(X_train, y_train) # Train model
    score = bayes.score(X_dev, y_dev) # Calculate score 
    print ("alpha = %7.3f, Score = %.4f" % (alpha, score)) # Print result
    
    # Check if better
    if score > best_bayes_score:
        best_bayes_params = {'alpha':alpha}
        best_bayes_score = score

print("\nLogistic Regression hyperparameters")
# Tune logistic regression hyperparameters
for C in Cs:
    for penalty in penaltys:
        logistic = LogisticRegression(C = C, penalty = penalty) # Create classifier
        logistic.fit(X_train, y_train) # Train model
        score = logistic.score(X_dev, y_dev) # Calculate score  
        print("penalty = %s, C = %8.3f, Score = %.4f" % (penalty, C, score))
    
        # Check if better
        if score > best_logistic_score:
            best_logistic_params = {'C':C, 'penalty':penalty}
            best_logistic_score = score
        
print("\nNaive Bayes: best parameter: %s, score = %f" % (str(best_bayes_params), best_bayes_score))
print("Logistic Regression: best parameter: %s, score = %f" % (str(best_logistic_params), best_logistic_score))

Naive Bayes hyperparameters:
alpha =   0.001, Score = 0.9510
alpha =   0.010, Score = 0.9670
alpha =   0.100, Score = 0.9830
alpha =   1.000, Score = 0.9860
alpha =  10.000, Score = 0.9830
alpha = 100.000, Score = 0.9810

Logistic Regression hyperparameters
penalty = l1, C =    0.001, Score = 0.8600
penalty = l2, C =    0.001, Score = 0.9550
penalty = l1, C =    0.010, Score = 0.9850
penalty = l2, C =    0.010, Score = 0.9560
penalty = l1, C =    0.100, Score = 0.9960
penalty = l2, C =    0.100, Score = 0.9960
penalty = l1, C =    1.000, Score = 0.9960
penalty = l2, C =    1.000, Score = 0.9960
penalty = l1, C =   10.000, Score = 0.9960




penalty = l2, C =   10.000, Score = 0.9960
penalty = l1, C =  100.000, Score = 0.9960
penalty = l2, C =  100.000, Score = 0.9960
penalty = l1, C = 1000.000, Score = 0.9960
penalty = l2, C = 1000.000, Score = 0.9960

Naive Bayes: best parameter: {'alpha': 1}, score = 0.986000
Logistic Regression: best parameter: {'C': 0.1, 'penalty': 'l1'}, score = 0.996000


In [45]:
from sklearn.metrics import f1_score
from sklearn.metrics import accuracy_score

# Classifiers
bayes_clf = MultinomialNB(alpha = best_bayes_params['alpha'])
logistic_clf = LogisticRegression(C = best_logistic_params['C'], penalty = best_logistic_params['penalty'])

# Train classifiers
bayes_clf.fit(X_train, y_train)
logistic_clf.fit(X_train, y_train)

print ("Naive Bayes: f-score = %.4f, accuracy = %.4f" % (f1_score(bayes_clf.predict(X_test),y_test, average = 'macro'), accuracy_score(bayes_clf.predict(X_test),y_test)))
print ("Logistic Regression: f-score = %.4f, accuracy = %.4f" % (f1_score(logistic_clf.predict(X_test),y_test, average = 'macro'), accuracy_score(logistic_clf.predict(X_test),y_test)))


Naive Bayes: f-score = 0.9870, accuracy = 0.9870
Logistic Regression: f-score = 0.9950, accuracy = 0.9950




In [46]:
# save the classifier and vectorizer
pickle.dump(logistic_clf, open("Logistic_{C0.1}_{l1}.pk1", "wb"))  
pickle.dump(vec, open("DictVectorizer.pk1", "wb"))

In [47]:
# load the classifier and vectorizer
logistic_clf = pickle.load(open("Logistic_{C0.1}_{l1}.pk1", "rb"))  
vec = pickle.load(open("DictVectorizer.pk1", "rb"))

In [48]:
logistic_clf

LogisticRegression(C=0.1, class_weight=None, dual=False, fit_intercept=True,
          intercept_scaling=1, max_iter=100, multi_class='warn',
          n_jobs=None, penalty='l1', random_state=None, solver='warn',
          tol=0.0001, verbose=0, warm_start=False)

In [49]:
vec

DictVectorizer(dtype=<class 'numpy.float64'>, separator='=', sort=True,
        sparse=True)